fix(cjson): santize bullmq payloads accurately#3892
Conversation
PR SummaryMedium Risk Overview Simplifies async backend selection by removing Adjusts BullMQ operational settings: BullMQ is now enabled solely by presence of Written by Cursor Bugbot for commit 07716cd. Configure here. |
|
The latest updates on your projects. Learn more about Vercel for GitHub. |
Greptile SummaryThis PR fixes a known Lua Alongside the core cjson fix, the PR also:
Key items to verify before merge:
Confidence Score: 4/5
Important Files Changed
Sequence DiagramsequenceDiagram
participant R as Route/Webhook
participant Q as WorkspaceDispatch
participant B as BullMQ Worker
participant S as sanitizeBullMQPayload
participant E as ExecutionEngine
R->>R: isBullMQEnabled()?<br/>(now: Boolean(REDIS_URL))
alt BullMQ enabled
R->>Q: enqueueWorkspaceDispatch(bullmqPayload)
Q->>Q: Lua: atomically claim<br/>(add lease + remove from lane)<br/>returns jobId only
Q->>Q: TypeScript: fetch record,<br/>update status to 'admitting',<br/>save back to Redis
Q->>B: dispatch BullMQ job
B->>S: sanitizeBullMQPayload(payload)<br/>fix cjson {} → [] for empty arrays
S->>E: executeQueuedWorkflowJob(sanitized)
else TriggerDev enabled
R->>Q: getJobQueue().enqueue(...)
Q->>E: executeWorkflowJob(payload)<br/>ensureArray(callChain) only
else Database/inline mode
R->>E: executeInline()
end
Reviews (1): Last reviewed commit: "fix(cjson): santization bullmq payloads ..." | Re-trigger Greptile |
| export function isBullMQEnabled(): boolean { | ||
| return isTruthy(env.CONCURRENCY_CONTROL_ENABLED) && Boolean(env.REDIS_URL) | ||
| return Boolean(env.REDIS_URL) | ||
| } |
There was a problem hiding this comment.
Implicit BullMQ activation breaks deployments without workers
isBullMQEnabled() now returns true whenever REDIS_URL is set. Previously it also required CONCURRENCY_CONTROL_ENABLED=true, which was the explicit opt-in gate. Many self-hosted deployments configure REDIS_URL for caching, rate-limiting, or session storage without running BullMQ workers. After this change, any such deployment will start routing async workflow executions to BullMQ queues that are never drained — jobs silently queue up and never run.
The old guard (isTruthy(env.CONCURRENCY_CONTROL_ENABLED) && Boolean(env.REDIS_URL)) ensured both the infrastructure (Redis) and the operator intent (CONCURRENCY_CONTROL_ENABLED=true) were present. Removing CONCURRENCY_CONTROL_ENABLED entirely eliminates that intent signal with no migration path.
If the goal is to consolidate configuration, consider keeping REDIS_URL as the sole gate but documenting this clearly, and ensuring all deployment guides explicitly note that providing REDIS_URL now enables BullMQ.
| removeOnComplete: { age: 2 * 60 * 60, count: 1000 }, | ||
| removeOnFail: { age: 2 * 60 * 60, count: 500 }, | ||
| } | ||
| case 'webhook-execution': | ||
| return { |
There was a problem hiding this comment.
Aggressive TTL reduction limits observability and retry windows
All BullMQ queue retention has been reduced from 24 h / 7 days down to 2 hours, and the same change applies to JOB_TTL_SECONDS in redis-store.ts (previously 48 h). While shorter retention helps control Redis memory, 2 hours is a very tight window:
- A failed workflow job that fails at 01:00 and isn't noticed until 03:30 will have already been evicted — no state to inspect or manually retry.
removeOnFailat 2 h means failed jobs disappear before an on-call engineer in a different timezone can investigate them.
The previous removeOnFail values (7 days for workflow-execution, 3 days for webhook-execution and schedule-execution) provided a more reasonable recovery window. Consider retaining a longer failure TTL (e.g., 24–72 h) while keeping the shorter success TTL for completed jobs.
| export function sanitizeBullMQPayload(payload: any): void { | ||
| if (!payload) return | ||
|
|
||
| payload.selectedOutputs = ensureArray(payload.selectedOutputs) | ||
|
|
||
| if (payload.metadata) { | ||
| payload.metadata.callChain = ensureArray(payload.metadata.callChain) | ||
|
|
||
| if (payload.metadata.pendingBlocks !== undefined) { | ||
| payload.metadata.pendingBlocks = ensureArray(payload.metadata.pendingBlocks) | ||
| } | ||
|
|
||
| if (payload.metadata.workflowStateOverride?.edges !== undefined) { | ||
| payload.metadata.workflowStateOverride.edges = ensureArray( | ||
| payload.metadata.workflowStateOverride.edges | ||
| ) | ||
| } | ||
| } | ||
|
|
||
| if (payload.runFromBlock?.sourceSnapshot) { | ||
| const state = payload.runFromBlock.sourceSnapshot | ||
| for (const field of EXECUTION_STATE_ARRAY_FIELDS) { | ||
| if (field in state && !Array.isArray(state[field])) { | ||
| state[field] = [] | ||
| } | ||
| } | ||
|
|
||
| if (state.dagIncomingEdges && typeof state.dagIncomingEdges === 'object') { | ||
| for (const key of Object.keys(state.dagIncomingEdges)) { | ||
| if (!Array.isArray(state.dagIncomingEdges[key])) { | ||
| state.dagIncomingEdges[key] = [] | ||
| } | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
sanitizeBullMQPayload has no unit tests
The PR checklist marks "Tests added/updated and passing", but no test file for json-sanitize.ts appears in the diff. The sanitization logic handles several nested paths (payload.metadata.callChain, payload.metadata.workflowStateOverride.edges, payload.runFromBlock.sourceSnapshot.*, and dagIncomingEdges per-key) that are easy to accidentally break.
Consider adding a dedicated test file that exercises:
- Top-level fields (
selectedOutputs: {}→[]) - Nested metadata fields
- The
EXECUTION_STATE_ARRAY_FIELDSloop with mixed present/absent keys - The
dagIncomingEdgesper-key fix - A
null/undefinedpayload (guard at line 34)
| ) | ||
|
|
||
| if (shouldExecuteInline()) { | ||
| if (!isBullMQEnabled() && !isTriggerDevEnabled) { |
There was a problem hiding this comment.
isTriggerDevEnabled used inconsistently as a value vs. function call
isBullMQEnabled() is called as a function (parentheses), while isTriggerDevEnabled is referenced as a constant (no parentheses). Both are correct — isBullMQEnabled is a function and isTriggerDevEnabled is a module-level constant — but the asymmetry can confuse readers who might expect both to be function calls given the naming convention isFoo.
The same pattern appears at apps/sim/app/api/workflows/[id]/execute/route.ts (line 246) and apps/sim/lib/webhooks/processor.ts (line 1268). No code change is strictly necessary, but adding an inline comment clarifying that isTriggerDevEnabled is a constant (evaluated at module load) would aid readability.
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
Summary
Sanitize bullmq payloads since cjson within bullmq is treating empty arrays as objects (known issue).
Type of Change
Testing
Tested manually
Checklist