-
Notifications
You must be signed in to change notification settings - Fork 176
Description
Steps to Reproduce the Problem
Take this Python program:
import asyncio
import logging
from datetime import timedelta
import time
from temporalio import workflow
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker
@workflow.defn
class SubWorkflow:
    """Child workflow that waits up to two days for ``some_signal``."""

    def __init__(self) -> None:
        # Set to True by the ``some_signal`` handler below.
        self.signal_received = False

    @workflow.run
    async def run(self) -> bool:
        """Return True if the signal arrived before the timeout, else False."""
        two_days = timedelta(days=2)
        try:
            await workflow.wait_condition(
                lambda: self.signal_received,
                timeout=two_days,
            )
        except asyncio.TimeoutError:
            # The two-day wait elapsed without the signal being received.
            return False
        return True

    @workflow.signal
    def some_signal(self) -> None:
        """Record that the signal was delivered so ``run`` can finish."""
        self.signal_received = True
@workflow.defn
class TopWorkflow:
    """Parent workflow: starts child ``swf1``, then waits for ``some_signal``.

    The child is started but its result is never awaited; the parent only
    waits (up to three days) for its own signal.
    """

    def __init__(self) -> None:
        # Set to True by the ``some_signal`` handler below.
        self.signal_received = False

    @workflow.run
    async def run(self) -> bool:
        """Return True if the signal arrived before the timeout, else False."""
        three_days = timedelta(days=3)
        try:
            # test passes if child workflow is not started.
            await workflow.start_child_workflow(
                SubWorkflow.run,
                id="swf1",
                task_queue="tq1",
            )
            await workflow.wait_condition(
                lambda: self.signal_received,
                timeout=three_days,
            )
        except asyncio.TimeoutError:
            # The three-day wait elapsed without the signal being received.
            return False
        return True

    @workflow.signal
    def some_signal(self) -> None:
        """Record that the signal was delivered so ``run`` can finish."""
        self.signal_received = True
async def main():
    """Drive the reproduction against a time-skipping test environment.

    Starts ``TopWorkflow`` (which starts ``SubWorkflow``), advances time
    manually with ``env.sleep``, signals the parent after one day, and then
    keeps advancing time.
    """
    logging.info("Starting time skipping")
    async with await WorkflowEnvironment.start_time_skipping() as env:
        logging.info("Disabling auto time skipping")
        with env.auto_time_skipping_disabled():
            logging.info("Starting worker")
            worker = Worker(
                env.client,
                task_queue="tq1",
                workflows=[TopWorkflow, SubWorkflow],
                debug_mode=True,
            )
            async with worker:
                logging.info("Starting workflow")
                top_handle = await env.client.start_workflow(
                    TopWorkflow.run,
                    id="wf1",
                    task_queue="tq1",
                )

                logging.info("Sleeping 1 day")
                await env.sleep(timedelta(days=1))

                logging.info("Signalling workflow")
                await top_handle.signal(TopWorkflow.some_signal)

                logging.info("Sleeping 1 day again")
                await env.sleep(timedelta(days=1))

                logging.info("Sleeping 2 days")
                await env.sleep(timedelta(days=2))

                logging.info("Sleeping 3 days")
                await env.sleep(timedelta(days=3))

                # NOTE(review): nesting was reconstructed from a paste that
                # lost its indentation; confirm whether this final log line
                # belongs inside or outside the worker context.
                logging.info("Done")
if __name__ == "__main__":
    # DEBUG level is needed so the worker's activation/completion messages
    # are emitted alongside the INFO progress lines.
    logging.basicConfig(level=logging.DEBUG)
    asyncio.run(main())
What that does is start a parent workflow that starts a child (but doesn't wait for the child to complete) and then completes itself when a signal is sent. Those `env.sleep` calls are `UnlockTimeSkippingWithSleep` calls to the test service. When run with logging of the core "activations" (events to handle) and "completions" (the commands to send to the server), the logs from my Windows scratch run are:
DEBUG:asyncio:Using proactor: IocpProactor
INFO:root:Starting time skipping
INFO:root:Disabling auto time skipping
INFO:root:Starting worker
INFO:root:Starting workflow
INFO:root:Sleeping 1 day
DEBUG:temporalio.worker._workflow:Received workflow activation:
run_id: "dd5d5ce5-81be-46cc-86b3-046d6ab3a027"
timestamp {
seconds: 1674677060
nanos: 338000000
}
history_length: 3
jobs {
start_workflow {
workflow_type: "TopWorkflow"
workflow_id: "wf1"
randomness_seed: 14816511849120529847
identity: "19760@cretz-laptop"
workflow_execution_timeout {
seconds: 315360000
}
workflow_run_timeout {
seconds: 315360000
}
workflow_task_timeout {
seconds: 10
}
first_execution_run_id: "dd5d5ce5-81be-46cc-86b3-046d6ab3a027"
attempt: 1
start_time {
seconds: 1674677060
nanos: 338000000
}
}
}
DEBUG:temporalio.worker._workflow:Sending workflow completion:
run_id: "dd5d5ce5-81be-46cc-86b3-046d6ab3a027"
successful {
commands {
start_child_workflow_execution {
seq: 1
namespace: "default"
workflow_id: "swf1"
workflow_type: "SubWorkflow"
task_queue: "tq1"
parent_close_policy: PARENT_CLOSE_POLICY_TERMINATE
workflow_id_reuse_policy: WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE
cancellation_type: WAIT_CANCELLATION_COMPLETED
}
}
}
DEBUG:temporalio.worker._workflow:Received workflow activation:
run_id: "de8567d0-7f5b-4447-a16d-d21c64db9b81"
timestamp {
seconds: 1674677060
nanos: 350000000
}
history_length: 3
jobs {
start_workflow {
workflow_type: "SubWorkflow"
workflow_id: "swf1"
randomness_seed: 15491585649214177938
parent_workflow_info {
namespace: "default"
workflow_id: "wf1"
run_id: "dd5d5ce5-81be-46cc-86b3-046d6ab3a027"
}
workflow_execution_timeout {
seconds: 315360000
}
workflow_run_timeout {
seconds: 315360000
}
workflow_task_timeout {
seconds: 10
}
first_execution_run_id: "de8567d0-7f5b-4447-a16d-d21c64db9b81"
attempt: 1
memo {
}
search_attributes {
}
start_time {
seconds: 1674677060
nanos: 350000000
}
}
}
DEBUG:temporalio.worker._workflow:Received workflow activation:
run_id: "dd5d5ce5-81be-46cc-86b3-046d6ab3a027"
timestamp {
seconds: 1674677060
nanos: 350000000
}
history_length: 8
jobs {
resolve_child_workflow_execution_start {
seq: 1
succeeded {
run_id: "de8567d0-7f5b-4447-a16d-d21c64db9b81"
}
}
}
DEBUG:temporalio.worker._workflow:Sending workflow completion:
run_id: "de8567d0-7f5b-4447-a16d-d21c64db9b81"
successful {
commands {
start_timer {
seq: 1
start_to_fire_timeout {
seconds: 172800
}
}
}
}
DEBUG:temporalio.worker._workflow:Sending workflow completion:
run_id: "dd5d5ce5-81be-46cc-86b3-046d6ab3a027"
successful {
commands {
start_timer {
seq: 1
start_to_fire_timeout {
seconds: 259200
}
}
}
}
INFO:root:Signalling workflow
INFO:root:Sleeping 1 day again
DEBUG:temporalio.worker._workflow:Received workflow activation:
run_id: "dd5d5ce5-81be-46cc-86b3-046d6ab3a027"
timestamp {
seconds: 1674763460
nanos: 341000000
}
history_length: 13
jobs {
signal_workflow {
signal_name: "some_signal"
identity: "19760@cretz-laptop"
}
}
DEBUG:temporalio.worker._workflow:Sending workflow completion:
run_id: "dd5d5ce5-81be-46cc-86b3-046d6ab3a027"
successful {
commands {
cancel_timer {
seq: 1
}
}
commands {
complete_workflow_execution {
result {
metadata {
key: "encoding"
value: "json/plain"
}
data: "true"
}
}
}
}
INFO:root:Sleeping 2 days
DEBUG:temporalio.worker._workflow:Received workflow activation:
run_id: "de8567d0-7f5b-4447-a16d-d21c64db9b81"
timestamp {
seconds: 1674849860
nanos: 360000000
}
history_length: 8
jobs {
fire_timer {
seq: 1
}
}
DEBUG:temporalio.worker._workflow:Sending workflow completion:
run_id: "de8567d0-7f5b-4447-a16d-d21c64db9b81"
successful {
commands {
complete_workflow_execution {
result {
metadata {
key: "encoding"
value: "json/plain"
}
data: "false"
}
}
}
}
INFO:temporalio.worker._worker:Beginning worker shutdown, will wait 0:00:00 before cancelling activities
Traceback (most recent call last):
File "c:\work\tem\tem-discover\scratch-python\.venv\Lib\site-packages\temporalio\service.py", line 723, in _rpc_call
resp = await client.call(
^^^^^^^^^^^^^^^^^^
File "c:\work\tem\tem-discover\scratch-python\.venv\Lib\site-packages\temporalio\bridge\client.py", line 126, in call
resp.ParseFromString(await resp_fut)
^^^^^^^^^^^^^^
temporal_sdk_bridge.RPCError: (1, 'Timeout expired', b'')
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "c:\work\tem\tem-discover\scratch-python\scratch6\scratch.py", line 91, in <module>
asyncio.run(main())
File "C:\Users\user\AppData\Local\Programs\Python\Python311\Lib\asyncio\runners.py", line 190, in run
return runner.run(main)
^^^^^^^^^^^^^^^^
File "C:\Users\user\AppData\Local\Programs\Python\Python311\Lib\asyncio\runners.py", line 118, in run
return self._loop.run_until_complete(task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\user\AppData\Local\Programs\Python\Python311\Lib\asyncio\base_events.py", line 650, in run_until_complete
return future.result()
^^^^^^^^^^^^^^^
File "c:\work\tem\tem-discover\scratch-python\scratch6\scratch.py", line 82, in main
await env.sleep(timedelta(days=2))
File "c:\work\tem\tem-discover\scratch-python\.venv\Lib\site-packages\temporalio\testing\_workflow.py", line 425, in sleep
await self._client.test_service.unlock_time_skipping_with_sleep(req)
File "c:\work\tem\tem-discover\scratch-python\.venv\Lib\site-packages\temporalio\service.py", line 658, in __call__
return await self.service_client._rpc_call(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "c:\work\tem\tem-discover\scratch-python\.venv\Lib\site-packages\temporalio\service.py", line 738, in _rpc_call
raise RPCError(message, RPCStatusCode(status), details)
temporalio.service.RPCError: Timeout expired
What seems to be happening is that the child is started and the signal is sent, causing the parent workflow to complete, but the child is not cancelled or otherwise closed. The 2-day sleep then properly causes the child's timeout timer to fire, so the child completes as expected, but that `env.sleep(timedelta(days=2))` call doesn't return; it hangs for several seconds until the test service gRPC call times out.
If you remove either the sending of the signal or the starting of the child workflow, there are no errors. Sorry that it's in Python and may be a big reproduction; I can probably write it in Java if needed.
Sorry if this is the same as #1540; I did not dig into that one.