14 changes: 6 additions & 8 deletions executors/src/eoa/worker/send.rs
@@ -262,10 +262,10 @@ impl<C: Chain> EoaExecutorWorker<C> {
if should_update_balance_threshold(inner_error) {
balance_threshold_update_needed = true;
}
} else if let EoaExecutorWorkerError::RpcError { inner_error, .. } = &e {
if should_update_balance_threshold(inner_error) {
balance_threshold_update_needed = true;
}
} else if let EoaExecutorWorkerError::RpcError { inner_error, .. } = &e
&& should_update_balance_threshold(inner_error)
{
balance_threshold_update_needed = true;
}

// For deterministic build failures, fail the transaction immediately
@@ -284,10 +284,8 @@ impl<C: Chain> EoaExecutorWorker<C> {
}
}

if balance_threshold_update_needed {
if let Err(e) = self.update_balance_threshold().await {
tracing::error!(error = ?e, "Failed to update balance threshold");
}
if balance_threshold_update_needed && let Err(e) = self.update_balance_threshold().await {
tracing::error!(error = ?e, "Failed to update balance threshold");
}

Ok(cleaned_results)
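
Both hunks above collapse a nested `if` inside an `if let` into a single let chain, which requires the Rust 2024 edition (let chains were stabilized in Rust 1.88 for that edition only). A minimal, self-contained sketch of the same pattern, using hypothetical names rather than the crate's own types:

```rust
// Sketch only: illustrative error type and predicate, not the executor's real ones.
enum WorkerError {
    Rpc { inner: String },
    Other,
}

fn needs_balance_update(inner: &str) -> bool {
    inner.contains("insufficient funds")
}

// Before the change this was `if let ... { if needs_balance_update(..) { .. } }`;
// the let chain expresses the same check with one less level of nesting.
fn check(e: &WorkerError) -> bool {
    if let WorkerError::Rpc { inner } = e
        && needs_balance_update(inner)
    {
        return true;
    }
    false
}

fn main() {
    assert!(check(&WorkerError::Rpc { inner: "insufficient funds".into() }));
    assert!(!check(&WorkerError::Other));
}
```

The second hunk uses the same feature in the other direction, chaining a plain boolean with a `let` binding: `if balance_threshold_update_needed && let Err(e) = ...`.
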
67 changes: 50 additions & 17 deletions twmq/src/lib.rs
@@ -947,25 +947,39 @@ impl<H: DurableExecution> Queue<H> {
local job_data_hash = KEYS[3]
local results_hash = KEYS[4] -- e.g., "myqueue:results"
local dedupe_set_name = KEYS[5]
local active_hash = KEYS[6]
local pending_list = KEYS[7]
local delayed_zset = KEYS[8]

local max_len = tonumber(ARGV[1])

local job_ids_to_delete = redis.call('LRANGE', list_name, max_len, -1)
local actually_deleted = 0

if #job_ids_to_delete > 0 then
for _, j_id in ipairs(job_ids_to_delete) do
local job_meta_hash = 'twmq:' .. queue_id .. ':job:' .. j_id .. ':meta'
local errors_list_name = 'twmq:' .. queue_id .. ':job:' .. j_id .. ':errors'

redis.call('SREM', dedupe_set_name, j_id)
redis.call('HDEL', job_data_hash, j_id)
redis.call('DEL', job_meta_hash)
redis.call('HDEL', results_hash, j_id)
redis.call('DEL', errors_list_name)
-- CRITICAL FIX: Check if this job_id is currently active/pending/delayed
-- This prevents the race where we prune metadata for a job that's currently running
local is_active = redis.call('HEXISTS', active_hash, j_id) == 1
local is_pending = redis.call('LPOS', pending_list, j_id) ~= false
local is_delayed = redis.call('ZSCORE', delayed_zset, j_id) ~= false

-- Only delete if the job is NOT currently in the system
if not is_active and not is_pending and not is_delayed then
local job_meta_hash = 'twmq:' .. queue_id .. ':job:' .. j_id .. ':meta'
local errors_list_name = 'twmq:' .. queue_id .. ':job:' .. j_id .. ':errors'

redis.call('SREM', dedupe_set_name, j_id)
redis.call('HDEL', job_data_hash, j_id)
redis.call('DEL', job_meta_hash)
redis.call('HDEL', results_hash, j_id)
redis.call('DEL', errors_list_name)
actually_deleted = actually_deleted + 1
end
end
redis.call('LTRIM', list_name, 0, max_len - 1)
end
return #job_ids_to_delete
return actually_deleted
"#,
);

@@ -975,6 +989,9 @@ impl<H: DurableExecution> Queue<H> {
.key(self.job_data_hash_name())
.key(self.job_result_hash_name()) // results_hash
.key(self.dedupe_set_name())
.key(self.active_hash_name()) // Check if job is active
.key(self.pending_list_name()) // Check if job is pending
.key(self.delayed_zset_name()) // Check if job is delayed
.arg(self.options.max_success) // max_len (LTRIM is 0 to max_success-1)
.invoke_async(&mut self.redis.clone())
.await?;
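
The three `.key()` calls added above (`active_hash_name`, `pending_list_name`, `delayed_zset_name`) must stay in the same order as the new `KEYS[6]` to `KEYS[8]` bindings at the top of the script, because keys are passed positionally. A generic sketch of that contract, using what appears to be the same `redis` crate `Script` API as the calls above (demo key names and a simplified prune, not twmq's):

```rust
// Sketch only: shows how .key()/.arg() positions map to KEYS[n]/ARGV[n].
use redis::Script;

async fn prune_demo(conn: &mut redis::aio::MultiplexedConnection) -> redis::RedisResult<i64> {
    let script = Script::new(
        r#"
        local list_name   = KEYS[1]           -- first .key()
        local active_hash = KEYS[2]           -- second .key()
        local max_len     = tonumber(ARGV[1]) -- first .arg()

        local ids = redis.call('LRANGE', list_name, max_len, -1)
        local deleted = 0
        for _, id in ipairs(ids) do
            if redis.call('HEXISTS', active_hash, id) == 0 then
                deleted = deleted + 1
            end
        end
        redis.call('LTRIM', list_name, 0, max_len - 1)
        return deleted
        "#,
    );

    script
        .key("demo:success") // KEYS[1]
        .key("demo:active")  // KEYS[2]
        .arg(100)            // ARGV[1]
        .invoke_async(conn)
        .await
}
```
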
@@ -1099,24 +1116,37 @@ impl<H: DurableExecution> Queue<H> {
local list_name = KEYS[2]
local job_data_hash = KEYS[3]
local dedupe_set_name = KEYS[4]
local active_hash = KEYS[5]
local pending_list = KEYS[6]
local delayed_zset = KEYS[7]

local max_len = tonumber(ARGV[1])

local job_ids_to_delete = redis.call('LRANGE', list_name, max_len, -1)
local actually_deleted = 0

if #job_ids_to_delete > 0 then
for _, j_id in ipairs(job_ids_to_delete) do
local errors_list_name = 'twmq:' .. queue_id .. ':job:' .. j_id .. ':errors'
local job_meta_hash = 'twmq:' .. queue_id .. ':job:' .. j_id .. ':meta'

redis.call('SREM', dedupe_set_name, j_id)
redis.call('HDEL', job_data_hash, j_id)
redis.call('DEL', job_meta_hash)
redis.call('DEL', errors_list_name)
-- CRITICAL FIX: Check if this job_id is currently active/pending/delayed
local is_active = redis.call('HEXISTS', active_hash, j_id) == 1
local is_pending = redis.call('LPOS', pending_list, j_id) ~= false
local is_delayed = redis.call('ZSCORE', delayed_zset, j_id) ~= false

-- Only delete if the job is NOT currently in the system
if not is_active and not is_pending and not is_delayed then
local errors_list_name = 'twmq:' .. queue_id .. ':job:' .. j_id .. ':errors'
local job_meta_hash = 'twmq:' .. queue_id .. ':job:' .. j_id .. ':meta'

redis.call('SREM', dedupe_set_name, j_id)
redis.call('HDEL', job_data_hash, j_id)
redis.call('DEL', job_meta_hash)
redis.call('DEL', errors_list_name)
actually_deleted = actually_deleted + 1
end
end
redis.call('LTRIM', list_name, 0, max_len - 1)
end
return #job_ids_to_delete
return actually_deleted
"#,
);

@@ -1125,6 +1155,9 @@ impl<H: DurableExecution> Queue<H> {
.key(self.failed_list_name())
.key(self.job_data_hash_name())
.key(self.dedupe_set_name())
.key(self.active_hash_name()) // Check if job is active
.key(self.pending_list_name()) // Check if job is pending
.key(self.delayed_zset_name()) // Check if job is delayed
.arg(self.options.max_failed)
.invoke_async(&mut self.redis.clone())
.await?;
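
Both pruning scripts probe membership by comparing against `false` rather than `nil`: inside Redis's Lua environment a nil reply, which is what `LPOS` and `ZSCORE` return for a missing element, is converted to the boolean `false`, so a `~= nil` test would count every job as pending or delayed and the prune would never delete anything. A small self-contained check of that conversion rule (demo key, not twmq's):

```rust
// Sketch only: verifies the Redis-to-Lua conversion the pruning scripts rely on.
use redis::Script;

async fn nil_becomes_false(conn: &mut redis::aio::MultiplexedConnection) -> redis::RedisResult<()> {
    let script = Script::new(
        r#"
        -- ZSCORE on a missing member replies nil, which the Lua bridge turns into false.
        local score = redis.call('ZSCORE', KEYS[1], 'missing-member')
        return { tostring(score ~= nil), tostring(score ~= false) }
        "#,
    );
    let (ne_nil, ne_false): (String, String) =
        script.key("demo:empty-zset").invoke_async(conn).await?;
    assert_eq!(ne_nil, "true");    // the pitfall: `~= nil` still holds for a missing member
    assert_eq!(ne_false, "false"); // `~= false` is the correct emptiness test
    Ok(())
}
```

Note also that `LPOS`, used for the pending-list check, is only available on Redis 6.0.6 and newer.
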
82 changes: 67 additions & 15 deletions twmq/src/multilane.rs
@@ -961,25 +961,51 @@ impl<H: DurableExecution> MultilaneQueue<H> {
local job_data_hash = KEYS[3]
local results_hash = KEYS[4]
local dedupe_set_name = KEYS[5]
local lanes_zset = KEYS[6]

local max_len = tonumber(ARGV[1])

local job_ids_to_delete = redis.call('LRANGE', list_name, max_len, -1)
local actually_deleted = 0

if #job_ids_to_delete > 0 then
for _, j_id in ipairs(job_ids_to_delete) do
-- Get the lane_id for this job to check if it's active/pending/delayed
local job_meta_hash = 'twmq_multilane:' .. queue_id .. ':job:' .. j_id .. ':meta'
local errors_list_name = 'twmq_multilane:' .. queue_id .. ':job:' .. j_id .. ':errors'

redis.call('SREM', dedupe_set_name, j_id)
redis.call('HDEL', job_data_hash, j_id)
redis.call('DEL', job_meta_hash)
redis.call('HDEL', results_hash, j_id)
redis.call('DEL', errors_list_name)
local lane_id = redis.call('HGET', job_meta_hash, 'lane_id')

local should_delete = true

if lane_id then
-- Check if job is in any active state for this lane
local lane_active_hash = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':active'
local lane_pending_list = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':pending'
local lane_delayed_zset = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':delayed'

local is_active = redis.call('HEXISTS', lane_active_hash, j_id) == 1
local is_pending = redis.call('LPOS', lane_pending_list, j_id) ~= false
local is_delayed = redis.call('ZSCORE', lane_delayed_zset, j_id) ~= false

-- Don't delete if job is currently in the system
if is_active or is_pending or is_delayed then
should_delete = false
end
end

if should_delete then
local errors_list_name = 'twmq_multilane:' .. queue_id .. ':job:' .. j_id .. ':errors'

redis.call('SREM', dedupe_set_name, j_id)
redis.call('HDEL', job_data_hash, j_id)
redis.call('DEL', job_meta_hash)
redis.call('HDEL', results_hash, j_id)
redis.call('DEL', errors_list_name)
actually_deleted = actually_deleted + 1
end
end
redis.call('LTRIM', list_name, 0, max_len - 1)
end
return #job_ids_to_delete
return actually_deleted
"#,
);

@@ -989,6 +1015,7 @@ impl<H: DurableExecution> MultilaneQueue<H> {
.key(self.job_data_hash_name())
.key(self.job_result_hash_name())
.key(self.dedupe_set_name())
.key(self.lanes_zset_name()) // Need to check lanes
.arg(self.options.max_success)
.invoke_async(&mut self.redis.clone())
.await?;
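
Unlike the single-lane queue, the multilane variant cannot pass the active, pending, and delayed keys up front, because they are scoped to whichever lane a job belongs to; the script therefore reads `lane_id` from the job's meta hash and assembles the lane-scoped names itself. Hypothetical helpers mirroring the layout the script concatenates by hand (illustrative names only; the crate may expose different accessors):

```rust
// Sketch only: mirrors the key naming used inside the multilane prune script.
fn job_meta_hash_name(queue_id: &str, job_id: &str) -> String {
    format!("twmq_multilane:{queue_id}:job:{job_id}:meta")
}

fn lane_active_hash_name(queue_id: &str, lane_id: &str) -> String {
    format!("twmq_multilane:{queue_id}:lane:{lane_id}:active")
}

fn lane_pending_list_name(queue_id: &str, lane_id: &str) -> String {
    format!("twmq_multilane:{queue_id}:lane:{lane_id}:pending")
}

fn lane_delayed_zset_name(queue_id: &str, lane_id: &str) -> String {
    format!("twmq_multilane:{queue_id}:lane:{lane_id}:delayed")
}
```

Because those names are computed inside the script rather than declared as `KEYS`, the approach assumes a non-cluster Redis deployment; Redis Cluster requires every key a script touches to be declared in `KEYS`. The `lanes_zset` key added as `KEYS[6]` is bound at the top of the script but not referenced elsewhere in the hunk shown.
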
@@ -1087,20 +1114,45 @@ impl<H: DurableExecution> MultilaneQueue<H> {
local max_len = tonumber(ARGV[1])

local job_ids_to_delete = redis.call('LRANGE', list_name, max_len, -1)
local actually_deleted = 0

if #job_ids_to_delete > 0 then
for _, j_id in ipairs(job_ids_to_delete) do
local errors_list_name = 'twmq_multilane:' .. queue_id .. ':job:' .. j_id .. ':errors'
-- Get the lane_id for this job to check if it's active/pending/delayed
local job_meta_hash = 'twmq_multilane:' .. queue_id .. ':job:' .. j_id .. ':meta'

redis.call('SREM', dedupe_set_name, j_id)
redis.call('HDEL', job_data_hash, j_id)
redis.call('DEL', job_meta_hash)
redis.call('DEL', errors_list_name)
local lane_id = redis.call('HGET', job_meta_hash, 'lane_id')

local should_delete = true

if lane_id then
-- Check if job is in any active state for this lane
local lane_active_hash = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':active'
local lane_pending_list = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':pending'
local lane_delayed_zset = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':delayed'

local is_active = redis.call('HEXISTS', lane_active_hash, j_id) == 1
local is_pending = redis.call('LPOS', lane_pending_list, j_id) ~= false
local is_delayed = redis.call('ZSCORE', lane_delayed_zset, j_id) ~= false

-- Don't delete if job is currently in the system
if is_active or is_pending or is_delayed then
should_delete = false
end
end

if should_delete then
local errors_list_name = 'twmq_multilane:' .. queue_id .. ':job:' .. j_id .. ':errors'

redis.call('SREM', dedupe_set_name, j_id)
redis.call('HDEL', job_data_hash, j_id)
redis.call('DEL', job_meta_hash)
redis.call('DEL', errors_list_name)
actually_deleted = actually_deleted + 1
end
end
redis.call('LTRIM', list_name, 0, max_len - 1)
end
return #job_ids_to_delete
return actually_deleted
"#,
);
