Retry policies implementation #11
Conversation
Signed-off-by: Deepanshu Agarwal <deepanshu.agarwal1984@gmail.com>
Signed-off-by: Deepanshu Agarwal <deepanshu.agarwal1984@gmail.com>
Signed-off-by: Deepanshu Agarwal <deepanshu.agarwal1984@gmail.com>
durabletask/worker.py
Outdated
self.call_activity_util(id, activity, input, retry_policy)
return self._pending_tasks.get(id, task.CompletableTask())

def call_activity_util(self, id: Optional[int],
Let's call this call_activity_helper. "Util" seems a little unnatural in this context.
Done. Merged both helper methods; now calling call_activity_function_helper.
durabletask/worker.py
Outdated
if act_task is None:
    encoded_input = shared.to_json(input) if input else None
else:
    encoded_input = input.__str__()
Why do we do __str__() here? Can you add a comment explaining this?
Added comment:
# Here, we don't need to convert the input to JSON because it is already converted.
# We just need to take the string representation of it.
But isn't it already a str in these cases?
input here is of type TInput@call_activity_function_helper. And below, new_schedule_task_action expects encoded_input to be of type str. I have removed __str__() and instead cast it to str now.
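For illustration, here is a small self-contained sketch of the logic being described (the helper name encode_activity_input and its is_retry flag are hypothetical, and json.dumps stands in for shared.to_json):

```python
import json
from typing import Any, Optional

def encode_activity_input(value: Any, is_retry: bool) -> Optional[str]:
    # Sketch only. On the first attempt the raw input is serialized to JSON
    # (json.dumps stands in for shared.to_json); on a retry the input was
    # already JSON-encoded, so it is only cast to str, because
    # new_schedule_task_action expects encoded_input to be a str.
    if not is_retry:
        return json.dumps(value) if value else None
    return str(value)
```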
durabletask/worker.py
Outdated
    retry_policy=retry_policy)
return self._pending_tasks.get(id, task.CompletableTask())

def call_sub_orchestrator_util(self, id: Optional[int],
Prefer call_sub_orchestrator_helper.
Merged both helper methods and am now calling call_activity_function_helper.
durabletask/worker.py
Outdated
if timer_task._retryable_parent is not None and timer_task._retryable_type is not None:
    activity_action = timer_task._retryable_parent._action

    if timer_task._retryable_type == "activity":
I'm wondering if there's a way to simplify the design so that we don't need the _retryable_type field. It feels a bit fragile. I don't have any specific suggestions right now, but I'd like us to consider it.
I have tried to modify the logic to look more standardized, using an enum and setting it as part of RetryableTask.
Modified to use a boolean instead of an enum.
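As a rough sketch of the simplified shape (the class and field names follow the snippets in this thread, but the constructor and its parameters are assumptions, not the final implementation):

```python
from durabletask import task

class RetryableTask(task.CompletableTask):
    """Sketch: a retryable task that tracks its retry state directly."""

    def __init__(self, retry_policy, action, start_time, is_sub_orch: bool):
        super().__init__()
        self._retry_policy = retry_policy
        self._action = action
        self._start_time = start_time
        # A plain boolean replaces the earlier _retryable_type string/enum:
        # True for sub-orchestrations, False for activities.
        self._is_sub_orch = is_sub_orch
        self._attempt_count = 1
```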
Signed-off-by: Deepanshu Agarwal <deepanshu.agarwal1984@gmail.com>
Signed-off-by: Deepanshu Agarwal <deepanshu.agarwal1984@gmail.com>
Signed-off-by: Deepanshu Agarwal <deepanshu.agarwal1984@gmail.com>
Signed-off-by: Deepanshu Agarwal <deepanshu.agarwal1984@gmail.com>
Signed-off-by: Deepanshu Agarwal <deepanshu.agarwal1984@gmail.com>
One thing I'm concerned about with this PR is whether it correctly handles cases where retry policies are mixed with when_all and when_any use cases. Can we add some test cases that cover these scenarios?
…able activity Signed-off-by: Deepanshu Agarwal <deepanshu.agarwal1984@gmail.com>
Now I have added tests for when_any and when_all with a retryable task.
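For reference, such a test orchestration might look roughly like this (dummy_activity, the retry-policy arguments, and the when_any/when_all calls mirror snippets elsewhere in this thread; the remaining names and signatures are assumptions):

```python
from datetime import timedelta
from durabletask import task

def dummy_activity(ctx: task.ActivityContext, city: str) -> str:
    return f"Hello, {city}!"

def mixed_retry_orchestrator(ctx: task.OrchestrationContext, _):
    retry_policy = task.RetryPolicy(
        first_retry_interval=timedelta(seconds=1),
        max_number_of_attempts=3,
        retry_timeout=timedelta(seconds=50))
    t1 = ctx.call_activity(dummy_activity, input="Tokyo", retry_policy=retry_policy)
    t2 = ctx.call_activity(dummy_activity, input="Seattle")
    # when_any completes as soon as either task finishes (including after retries).
    winner = yield task.when_any([t1, t2])
    # when_all waits for both tasks and yields a list of their results.
    results = yield task.when_all([t1, t2])
    return results
```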
durabletask/task.py
Outdated
self._is_sub_orch = is_sub_orch

def try_completion(self) -> bool:
    if self._retry_count >= self._retry_policy.max_number_of_attempts - 1:
Instead of tracking the number of retries, would it be more intuitive to track the number of attempts, since we're comparing to max_number_of_attempts?
How is the number of attempts different from the number of retries? The number of retries is attempts - 1. Or is it a nomenclature suggestion?
I have updated it to be attempt_count and updated the logic accordingly.
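In other words, with attempt-based counting the comparison becomes the following (should_retry is a hypothetical standalone helper used only to illustrate the check):

```python
def should_retry(attempt_count: int, max_number_of_attempts: int) -> bool:
    # Attempt-based form of the earlier check
    # `self._retry_count >= self._retry_policy.max_number_of_attempts - 1`:
    # keep retrying only while fewer attempts than the maximum have been made.
    return attempt_count < max_number_of_attempts
```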
…etry-policies-impl
Signed-off-by: Deepanshu Agarwal <deepanshu.agarwal1984@gmail.com>
Signed-off-by: Deepanshu Agarwal <deepanshu.agarwal1984@gmail.com>
@cgillum Please review this PR. It has been a quite drawn-out PR time-wise.
Changes are looking good. Just a few fairly minor things.
next_delay = sub_orch_task.compute_next_delay()
if next_delay == None:
    sub_orch_task.fail(
        f"Sub-orchestration task #{task_id} failed: {failedEvent.failureDetails.errorMessage}",
One potential improvement we could add here is including the number of retries in this error message.
This is logged only when the task finally fails, after either (1) all retries have been exhausted or (2) the retry has timed out. It is not actually logged with each retry.
I can add such logging for each attempt, but that would need some new bookkeeping to know which attempt number it is, whether the retry has timed out, etc. Do you think that is required at each retry/attempt?
I'm not suggesting we log on each retry (though we could consider this too). I'm simply suggesting that the error message could include the number of retries that were attempted. For example,
f"Sub-orchestration task #{task_id} failed: {failedEvent.failureDetails.errorMessage}", | |
f"Sub-orchestration task #{task_id} failed after {attempt_count} attempt(s): {failedEvent.failureDetails.errorMessage}", |
tests/test_orchestration_executor.py
Outdated
        retry_timeout=timedelta(seconds=50)),
    input="Tokyo")
t2 = ctx.call_activity(dummy_activity, input="Seattle")
winner = yield task.when_all([t1, t2])
This orchestration logic doesn't make sense. The result of when_all is not a "winner"; rather, it's a list containing the results of both t1 and t2. This means that your if/else check below will always execute the else block no matter what. Please rewrite this orchestration so that it correctly uses the task.when_all API.
Yeah, updated it to return the list of results as when_all does.
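For reference, the corrected shape might look like this (a sketch reusing dummy_activity from the test snippet above; assertions omitted):

```python
from durabletask import task

def when_all_orchestrator(ctx: task.OrchestrationContext, _):
    t1 = ctx.call_activity(dummy_activity, input="Tokyo")
    t2 = ctx.call_activity(dummy_activity, input="Seattle")
    # when_all yields a list with the results of t1 and t2 (in order),
    # so the orchestrator branches on the list contents, not on a "winner".
    results = yield task.when_all([t1, t2])
    return results
```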
Signed-off-by: Deepanshu Agarwal <deepanshu.agarwal1984@gmail.com>
This PR introduces retry policies.
Retry policies can be configured with several attributes, such as the maximum number of attempts and an overall retry timeout; see the sketch below.
Once this PR is merged, we would also need to add documentation for retry policies and attach the design approach taken for them.
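As a rough illustration of how a policy might be configured (max_number_of_attempts and retry_timeout appear in the review above; the remaining attribute names are assumptions about the final API):

```python
from datetime import timedelta
from durabletask import task

# Sketch: configure a retry policy to pass to an activity or sub-orchestration call.
retry_policy = task.RetryPolicy(
    first_retry_interval=timedelta(seconds=1),   # assumed attribute
    backoff_coefficient=2.0,                     # assumed attribute
    max_retry_interval=timedelta(seconds=30),    # assumed attribute
    max_number_of_attempts=3,                    # referenced in this review
    retry_timeout=timedelta(seconds=50))         # referenced in this review
```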