diff --git a/temporalio/lib/temporalio/client/async_activity_handle.rb b/temporalio/lib/temporalio/client/async_activity_handle.rb index 2010a6c3..f3b5678f 100644 --- a/temporalio/lib/temporalio/client/async_activity_handle.rb +++ b/temporalio/lib/temporalio/client/async_activity_handle.rb @@ -29,6 +29,7 @@ def initialize(client:, task_token:, id_reference:) # @param details [Array] Details of the heartbeat. # @param detail_hints [Array, nil] Converter hints for the details. # @param rpc_options [RPCOptions, nil] Advanced RPC options. + # @raise [Error::AsyncActivityCanceledError] If the activity was canceled, paused, and/or reset. def heartbeat(*details, detail_hints: nil, rpc_options: nil) @client._impl.heartbeat_async_activity(Interceptor::HeartbeatAsyncActivityInput.new( task_token_or_id_reference:, diff --git a/temporalio/lib/temporalio/error.rb b/temporalio/lib/temporalio/error.rb index e46df4a1..0170990a 100644 --- a/temporalio/lib/temporalio/error.rb +++ b/temporalio/lib/temporalio/error.rb @@ -97,9 +97,13 @@ def initialize # Error that occurs when an async activity handle tries to heartbeat and the activity is marked as canceled. class AsyncActivityCanceledError < Error + # @return [Activity::CancellationDetails] + attr_reader :details + # @!visibility private - def initialize + def initialize(details) super('Activity canceled') + @details = details end end diff --git a/temporalio/lib/temporalio/internal/client/implementation.rb b/temporalio/lib/temporalio/internal/client/implementation.rb index 8466d9b0..59cb9f6c 100644 --- a/temporalio/lib/temporalio/internal/client/implementation.rb +++ b/temporalio/lib/temporalio/internal/client/implementation.rb @@ -2,6 +2,7 @@ require 'google/protobuf/well_known_types' require 'securerandom' +require 'temporalio/activity' require 'temporalio/api' require 'temporalio/client/activity_id_reference' require 'temporalio/client/async_activity_handle' @@ -829,9 +830,13 @@ def heartbeat_async_activity(input) rpc_options: Implementation.with_default_rpc_options(input.rpc_options) ) end - raise Error::AsyncActivityCanceledError if resp.cancel_requested + return unless resp.cancel_requested || resp.activity_paused || resp.activity_reset - nil + raise Error::AsyncActivityCanceledError, Activity::CancellationDetails.new( + cancel_requested: resp.cancel_requested, + paused: resp.activity_paused, + reset: resp.activity_reset + ) end def complete_async_activity(input) diff --git a/temporalio/sig/temporalio/error.rbs b/temporalio/sig/temporalio/error.rbs index 720ffac6..c067e436 100644 --- a/temporalio/sig/temporalio/error.rbs +++ b/temporalio/sig/temporalio/error.rbs @@ -35,6 +35,16 @@ module Temporalio def initialize: -> void end + class ScheduleAlreadyRunningError < Error + def initialize: -> void + end + + class AsyncActivityCanceledError < Error + attr_reader details: Activity::CancellationDetails + + def initialize: (Activity::CancellationDetails details) -> void + end + class RPCError < Error attr_reader code: Code::enum diff --git a/temporalio/test/worker_activity_test.rb b/temporalio/test/worker_activity_test.rb index 169a750f..d39bd4f8 100644 --- a/temporalio/test/worker_activity_test.rb +++ b/temporalio/test/worker_activity_test.rb @@ -562,16 +562,27 @@ def test_worker_shutdown class AsyncCompletionActivity < Temporalio::Activity::Definition def initialize @task_token = Queue.new + @id_ref = Queue.new end def execute - @task_token.push(Temporalio::Activity::Context.current.info.task_token) + info = Temporalio::Activity::Context.current.info + @task_token.push(info.task_token) + @id_ref.push(Temporalio::Client::ActivityIDReference.new( + workflow_id: info.workflow_id, + run_id: info.workflow_run_id, + activity_id: info.activity_id + )) raise Temporalio::Activity::CompleteAsyncError end def wait_task_token @task_token.pop end + + def wait_id_ref + @id_ref.pop + end end def test_async_completion_success @@ -629,6 +640,70 @@ def test_async_completion_cancel end end + def test_async_completion_cancel_details + # Cancel + act = AsyncCompletionActivity.new + execute_activity(act, wait_for_cancellation: true) do |handle| + task_token = act.wait_task_token + handle.cancel + assert_eventually do + err = assert_raises(Temporalio::Error::AsyncActivityCanceledError) do + env.client.async_activity_handle(task_token).heartbeat + end + assert err.details.cancel_requested? + refute err.details.paused? + refute err.details.reset? + end + end + + # Pause + act = AsyncCompletionActivity.new + execute_activity(act, wait_for_cancellation: true) do + id_ref = act.wait_id_ref + env.client.workflow_service.pause_activity(Temporalio::Api::WorkflowService::V1::PauseActivityRequest.new( + namespace: env.client.namespace, + execution: Temporalio::Api::Common::V1::WorkflowExecution.new( + workflow_id: id_ref.workflow_id, + run_id: id_ref.run_id + ), + identity: env.client.connection.options.identity, + id: id_ref.activity_id, + reason: 'my reason' + )) + assert_eventually do + err = assert_raises(Temporalio::Error::AsyncActivityCanceledError) do + env.client.async_activity_handle(id_ref).heartbeat + end + refute err.details.cancel_requested? + assert err.details.paused? + refute err.details.reset? + end + end + + # Reset + act = AsyncCompletionActivity.new + execute_activity(act, wait_for_cancellation: true) do + id_ref = act.wait_id_ref + env.client.workflow_service.reset_activity(Temporalio::Api::WorkflowService::V1::ResetActivityRequest.new( + namespace: env.client.namespace, + execution: Temporalio::Api::Common::V1::WorkflowExecution.new( + workflow_id: id_ref.workflow_id, + run_id: id_ref.run_id + ), + identity: env.client.connection.options.identity, + id: id_ref.activity_id + )) + assert_eventually do + err = assert_raises(Temporalio::Error::AsyncActivityCanceledError) do + env.client.async_activity_handle(id_ref).heartbeat + end + refute err.details.cancel_requested? + refute err.details.paused? + assert err.details.reset? + end + end + end + def test_async_completion_timeout act = AsyncCompletionActivity.new execute_activity(act, start_to_close_timeout: 0.5, wait_for_cancellation: true) do diff --git a/temporalio/test/worker_workflow_activity_test.rb b/temporalio/test/worker_workflow_activity_test.rb index 850a7b90..d4079a79 100644 --- a/temporalio/test/worker_workflow_activity_test.rb +++ b/temporalio/test/worker_workflow_activity_test.rb @@ -277,7 +277,7 @@ def execute(swallow) raise unless swallow det = Temporalio::Activity::Context.current.cancellation_details - "canceled - paused: #{det&.paused?}, requested: #{det&.cancel_requested?}" + "canceled - paused: #{det&.paused?}, requested: #{det&.cancel_requested?}, reset: #{det&.reset?}" end end @@ -312,7 +312,7 @@ def test_cancellation_pause reason: 'my reason' ) env.client.workflow_service.pause_activity(req) - assert_equal 'canceled - paused: true, requested: false', handle.result + assert_equal 'canceled - paused: true, requested: false, reset: false', handle.result end # Re-raise @@ -344,4 +344,28 @@ def test_cancellation_pause end end end + + def test_cancellation_reset + queue = Queue.new + execute_workflow( + CancellationDetailsWorkflow, true, + activities: [CancellationDetailsActivity.new(queue)] + ) do |handle| + # Wait for activity to start + activity_id = queue.pop(timeout: 10) + assert activity_id + # Send reset, and confirm we get what we expect + req = Temporalio::Api::WorkflowService::V1::ResetActivityRequest.new( + namespace: env.client.namespace, + execution: Temporalio::Api::Common::V1::WorkflowExecution.new( + workflow_id: handle.id, + run_id: handle.result_run_id + ), + identity: env.client.connection.options.identity, + id: activity_id + ) + env.client.workflow_service.reset_activity(req) + assert_equal 'canceled - paused: false, requested: false, reset: true', handle.result + end + end end