diff --git a/README.md b/README.md index 99c6226..7c8541b 100644 --- a/README.md +++ b/README.md @@ -35,12 +35,12 @@ docker run --network=host --rm ubercadence/cli:master --do sample domain registe ## Installation cadence-python ``` -pip install cadence-client==1.0.0b2 +pip install cadence-client==1.0.0b3 ``` ## Hello World Sample -``` +```python import sys import logging from cadence.activity_method import activity_method @@ -63,7 +63,7 @@ class GreetingActivities: # Activities Implementation class GreetingActivitiesImpl: def compose_greeting(self, greeting: str, name: str): - return greeting + " " + name + "!" + return f"{greeting} {name}!" # Workflow Interface diff --git a/cadence/decision_loop.py b/cadence/decision_loop.py index 954ca7c..afdf0d5 100644 --- a/cadence/decision_loop.py +++ b/cadence/decision_loop.py @@ -27,7 +27,7 @@ HistoryEvent, EventType, WorkflowType, ScheduleActivityTaskDecisionAttributes, \ CancelWorkflowExecutionDecisionAttributes, StartTimerDecisionAttributes, TimerFiredEventAttributes, \ FailWorkflowExecutionDecisionAttributes, RecordMarkerDecisionAttributes, Header, WorkflowQuery, \ - RespondQueryTaskCompletedRequest, QueryTaskCompletedType, QueryWorkflowResponse + RespondQueryTaskCompletedRequest, QueryTaskCompletedType, QueryWorkflowResponse, DecisionTaskFailedCause from cadence.conversions import json_to_args, args_to_json from cadence.decisions import DecisionId, DecisionTarget from cadence.exception_handling import serialize_exception, deserialize_exception @@ -537,6 +537,7 @@ class ReplayDecider: decision_events: DecisionEvents = None decisions: OrderedDict[DecisionId, DecisionStateMachine] = field(default_factory=OrderedDict) decision_context: DecisionContext = None + workflow_id: str = None activity_id_to_scheduled_event_id: Dict[str, int] = field(default_factory=dict) @@ -674,6 +675,11 @@ def handle_activity_task_failed(self, event: HistoryEvent): def handle_activity_task_timed_out(self, event: HistoryEvent): self.decision_context.handle_activity_task_timed_out(event) + def handle_decision_task_failed(self, event: HistoryEvent): + attr = event.decision_task_failed_event_attributes + if attr and attr.cause == DecisionTaskFailedCause.RESET_WORKFLOW: + self.decision_context.set_current_run_id(attr.new_run_id) + def handle_workflow_execution_signaled(self, event: HistoryEvent): signaled_event_attributes = event.workflow_execution_signaled_event_attributes signal_input = signaled_event_attributes.input @@ -816,6 +822,7 @@ def on_timer_canceled(self: ReplayDecider, event: HistoryEvent): EventType.DecisionTaskScheduled: noop, EventType.DecisionTaskStarted: noop, # Filtered by HistoryHelper EventType.DecisionTaskTimedOut: noop, # TODO: check + EventType.DecisionTaskFailed: ReplayDecider.handle_decision_task_failed, EventType.ActivityTaskScheduled: ReplayDecider.handle_activity_task_scheduled, EventType.ActivityTaskStarted: ReplayDecider.handle_activity_task_started, EventType.ActivityTaskCompleted: ReplayDecider.handle_activity_task_completed, @@ -904,14 +911,16 @@ def poll(self) -> Optional[PollForDecisionTaskResponse]: def process_task(self, decision_task: PollForDecisionTaskResponse) -> List[Decision]: execution_id = str(decision_task.workflow_execution) - decider = ReplayDecider(execution_id, decision_task.workflow_type, self.worker) + decider = ReplayDecider(execution_id, decision_task.workflow_type, self.worker, + workflow_id=decision_task.workflow_execution.workflow_id) decisions: List[Decision] = decider.decide(decision_task.history.events) decider.destroy() return decisions def process_query(self, decision_task: PollForDecisionTaskResponse) -> bytes: execution_id = str(decision_task.workflow_execution) - decider = ReplayDecider(execution_id, decision_task.workflow_type, self.worker) + decider = ReplayDecider(execution_id, decision_task.workflow_type, self.worker, + workflow_id=decision_task.workflow_execution.workflow_id) decider.decide(decision_task.history.events) try: result = decider.query(decision_task, decision_task.query) diff --git a/cadence/tests/test_decision_loop.py b/cadence/tests/test_decision_loop.py index 2d4ba10..255225c 100644 --- a/cadence/tests/test_decision_loop.py +++ b/cadence/tests/test_decision_loop.py @@ -6,7 +6,8 @@ from cadence.cadence_types import HistoryEvent, EventType, PollForDecisionTaskResponse, \ ScheduleActivityTaskDecisionAttributes, WorkflowExecutionStartedEventAttributes, Decision, \ - ActivityTaskStartedEventAttributes, MarkerRecordedEventAttributes + ActivityTaskStartedEventAttributes, MarkerRecordedEventAttributes, DecisionTaskFailedEventAttributes, \ + DecisionTaskFailedCause from cadence.clock_decision_context import VERSION_MARKER_NAME from cadence.decision_loop import HistoryHelper, is_decision_event, DecisionTaskLoop, ReplayDecider, DecisionEvents, \ nano_to_milli @@ -365,6 +366,18 @@ def test_handle_activity_task_started(self): args, kwargs = state_machine.handle_started_event.call_args_list[0] self.assertIn(event, args) + def test_handle_decision_task_failed(self): + event = HistoryEvent(event_id=15) + event.event_type = EventType.DecisionTaskFailed + event.decision_task_failed_event_attributes = DecisionTaskFailedEventAttributes() + event.decision_task_failed_event_attributes.cause = DecisionTaskFailedCause.RESET_WORKFLOW + event.decision_task_failed_event_attributes.new_run_id = "the-new-run-id" + self.decider.decision_context = decision_context = MagicMock() + self.decider.handle_decision_task_failed(event) + decision_context.set_current_run_id.assert_called() + args, kwargs = decision_context.set_current_run_id.call_args_list[0] + assert args[0] == "the-new-run-id" + def tearDown(self) -> None: self.decider.destroy() diff --git a/cadence/workflow.py b/cadence/workflow.py index ea3ecc6..be570a8 100644 --- a/cadence/workflow.py +++ b/cadence/workflow.py @@ -8,7 +8,6 @@ from typing import Callable, List, Type, Dict, Tuple from uuid import uuid4 -from six import reraise from cadence.activity import ActivityCompletionClient from cadence.activity_method import RetryParameters, ActivityOptions @@ -90,6 +89,19 @@ def get_logger(name): task: ITask = ITask.current() return task.decider.decision_context.get_logger(name) + @staticmethod + def get_workflow_id(): + from cadence.decision_loop import ITask + task: ITask = ITask.current() + return task.decider.workflow_id + + @staticmethod + def get_execution_id(): + from cadence.decision_loop import ITask + task: ITask = ITask.current() + return task.decider.execution_id + + class WorkflowStub: pass diff --git a/requirements.txt b/requirements.txt index 9d6fbd7..5b30432 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,5 @@ more-itertools==7.0.0 ply==3.11 -six==1.12.0 tblib==1.6.0 thriftrw==1.7.2 dataclasses-json==0.3.8 diff --git a/setup.py b/setup.py index 299424c..ea3d576 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ setuptools.setup( name="cadence-client", - version="1.0.0-beta2", + version="1.0.0-beta3", author="Mohammed Firdaus", author_email="firdaus.halim@gmail.com", description="Python framework for Cadence Workflow Service", @@ -17,12 +17,13 @@ "dataclasses-json>=0.3.8", "more-itertools>=7.0.0", "ply>=3.11", - "six>=1.12.0", "tblib>=1.6.0", "thriftrw>=1.7.2", ], classifiers=[ "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", "License :: OSI Approved :: MIT License", "Operating System :: OS Independent", ],