Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def initialize(client:, task_token:, id_reference:)
# @param details [Array<Object>] Details of the heartbeat.
# @param detail_hints [Array<Object>, nil] Converter hints for the details.
# @param rpc_options [RPCOptions, nil] Advanced RPC options.
# @raise [Error::AsyncActivityCanceledError] If the activity was canceled, paused, and/or reset.
def heartbeat(*details, detail_hints: nil, rpc_options: nil)
@client._impl.heartbeat_async_activity(Interceptor::HeartbeatAsyncActivityInput.new(
task_token_or_id_reference:,
Expand Down
6 changes: 5 additions & 1 deletion temporalio/lib/temporalio/error.rb
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,13 @@ def initialize

# Error that occurs when an async activity handle tries to heartbeat and the activity is marked as canceled.
class AsyncActivityCanceledError < Error
# @return [Activity::CancellationDetails]
attr_reader :details

# @!visibility private
def initialize
def initialize(details)
super('Activity canceled')
@details = details
end
end

Expand Down
9 changes: 7 additions & 2 deletions temporalio/lib/temporalio/internal/client/implementation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

require 'google/protobuf/well_known_types'
require 'securerandom'
require 'temporalio/activity'
require 'temporalio/api'
require 'temporalio/client/activity_id_reference'
require 'temporalio/client/async_activity_handle'
Expand Down Expand Up @@ -829,9 +830,13 @@ def heartbeat_async_activity(input)
rpc_options: Implementation.with_default_rpc_options(input.rpc_options)
)
end
raise Error::AsyncActivityCanceledError if resp.cancel_requested
return unless resp.cancel_requested || resp.activity_paused || resp.activity_reset

nil
raise Error::AsyncActivityCanceledError, Activity::CancellationDetails.new(
cancel_requested: resp.cancel_requested,
paused: resp.activity_paused,
reset: resp.activity_reset
)
end

def complete_async_activity(input)
Expand Down
10 changes: 10 additions & 0 deletions temporalio/sig/temporalio/error.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@ module Temporalio
def initialize: -> void
end

class ScheduleAlreadyRunningError < Error
def initialize: -> void
end

class AsyncActivityCanceledError < Error
attr_reader details: Activity::CancellationDetails

def initialize: (Activity::CancellationDetails details) -> void
end

class RPCError < Error
attr_reader code: Code::enum

Expand Down
77 changes: 76 additions & 1 deletion temporalio/test/worker_activity_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -562,16 +562,27 @@ def test_worker_shutdown
class AsyncCompletionActivity < Temporalio::Activity::Definition
def initialize
@task_token = Queue.new
@id_ref = Queue.new
end

def execute
@task_token.push(Temporalio::Activity::Context.current.info.task_token)
info = Temporalio::Activity::Context.current.info
@task_token.push(info.task_token)
@id_ref.push(Temporalio::Client::ActivityIDReference.new(
workflow_id: info.workflow_id,
run_id: info.workflow_run_id,
activity_id: info.activity_id
))
raise Temporalio::Activity::CompleteAsyncError
end

def wait_task_token
@task_token.pop
end

def wait_id_ref
@id_ref.pop
end
end

def test_async_completion_success
Expand Down Expand Up @@ -629,6 +640,70 @@ def test_async_completion_cancel
end
end

def test_async_completion_cancel_details
# Cancel
act = AsyncCompletionActivity.new
execute_activity(act, wait_for_cancellation: true) do |handle|
task_token = act.wait_task_token
handle.cancel
assert_eventually do
err = assert_raises(Temporalio::Error::AsyncActivityCanceledError) do
env.client.async_activity_handle(task_token).heartbeat
end
assert err.details.cancel_requested?
refute err.details.paused?
refute err.details.reset?
end
end

# Pause
act = AsyncCompletionActivity.new
execute_activity(act, wait_for_cancellation: true) do
id_ref = act.wait_id_ref
env.client.workflow_service.pause_activity(Temporalio::Api::WorkflowService::V1::PauseActivityRequest.new(
namespace: env.client.namespace,
execution: Temporalio::Api::Common::V1::WorkflowExecution.new(
workflow_id: id_ref.workflow_id,
run_id: id_ref.run_id
),
identity: env.client.connection.options.identity,
id: id_ref.activity_id,
reason: 'my reason'
))
assert_eventually do
err = assert_raises(Temporalio::Error::AsyncActivityCanceledError) do
env.client.async_activity_handle(id_ref).heartbeat
end
refute err.details.cancel_requested?
assert err.details.paused?
refute err.details.reset?
end
end

# Reset
act = AsyncCompletionActivity.new
execute_activity(act, wait_for_cancellation: true) do
id_ref = act.wait_id_ref
env.client.workflow_service.reset_activity(Temporalio::Api::WorkflowService::V1::ResetActivityRequest.new(
namespace: env.client.namespace,
execution: Temporalio::Api::Common::V1::WorkflowExecution.new(
workflow_id: id_ref.workflow_id,
run_id: id_ref.run_id
),
identity: env.client.connection.options.identity,
id: id_ref.activity_id
))
assert_eventually do
err = assert_raises(Temporalio::Error::AsyncActivityCanceledError) do
env.client.async_activity_handle(id_ref).heartbeat
end
refute err.details.cancel_requested?
refute err.details.paused?
assert err.details.reset?
end
end
end

def test_async_completion_timeout
act = AsyncCompletionActivity.new
execute_activity(act, start_to_close_timeout: 0.5, wait_for_cancellation: true) do
Expand Down
28 changes: 26 additions & 2 deletions temporalio/test/worker_workflow_activity_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ def execute(swallow)
raise unless swallow

det = Temporalio::Activity::Context.current.cancellation_details
"canceled - paused: #{det&.paused?}, requested: #{det&.cancel_requested?}"
"canceled - paused: #{det&.paused?}, requested: #{det&.cancel_requested?}, reset: #{det&.reset?}"
end
end

Expand Down Expand Up @@ -312,7 +312,7 @@ def test_cancellation_pause
reason: 'my reason'
)
env.client.workflow_service.pause_activity(req)
assert_equal 'canceled - paused: true, requested: false', handle.result
assert_equal 'canceled - paused: true, requested: false, reset: false', handle.result
end

# Re-raise
Expand Down Expand Up @@ -344,4 +344,28 @@ def test_cancellation_pause
end
end
end

def test_cancellation_reset
queue = Queue.new
execute_workflow(
CancellationDetailsWorkflow, true,
activities: [CancellationDetailsActivity.new(queue)]
) do |handle|
# Wait for activity to start
activity_id = queue.pop(timeout: 10)
assert activity_id
# Send reset, and confirm we get what we expect
req = Temporalio::Api::WorkflowService::V1::ResetActivityRequest.new(
namespace: env.client.namespace,
execution: Temporalio::Api::Common::V1::WorkflowExecution.new(
workflow_id: handle.id,
run_id: handle.result_run_id
),
identity: env.client.connection.options.identity,
id: activity_id
)
env.client.workflow_service.reset_activity(req)
assert_equal 'canceled - paused: false, requested: false, reset: true', handle.result
end
end
end
Loading