Skip to content

Commit af859cd

Browse files
feat(workflows): lock/duplicate improvements for workflows (#4387)
* feat(workflows): lock/duplicate improvements * fix duplicate var remap bug * address comments * remove dead vars * fix tests * address comments * code cleanup * address comments * address comments * minor change * remove dead code
1 parent 50e118a commit af859cd

60 files changed

Lines changed: 17413 additions & 1063 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

apps/realtime/src/handlers/operations.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import {
1010
} from '@sim/realtime-protocol/constants'
1111
import { WorkflowOperationSchema } from '@sim/realtime-protocol/schemas'
1212
import { generateId } from '@sim/utils/id'
13+
import { assertWorkflowMutable, WorkflowLockedError } from '@sim/workflow-authz'
1314
import { ZodError } from 'zod'
1415
import { persistWorkflowOperation } from '@/database/operations'
1516
import type { AuthenticatedSocket } from '@/middleware/auth'
@@ -139,6 +140,24 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
139140
}
140141
}
141142

143+
try {
144+
await assertWorkflowMutable(workflowId)
145+
} catch (error) {
146+
if (error instanceof WorkflowLockedError) {
147+
emitOperationError(
148+
{
149+
type: 'WORKFLOW_LOCKED',
150+
message: error.message,
151+
operation,
152+
target,
153+
},
154+
{ error: error.message, retryable: false }
155+
)
156+
return
157+
}
158+
throw error
159+
}
160+
142161
// Broadcast first for position updates to minimize latency, then persist
143162
// For other operations, persist first for consistency
144163
if (isPositionUpdate) {

apps/realtime/src/handlers/subblocks.ts

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { db } from '@sim/db'
22
import { workflow, workflowBlocks } from '@sim/db/schema'
33
import { createLogger } from '@sim/logger'
44
import { SUBBLOCK_OPERATIONS } from '@sim/realtime-protocol/constants'
5+
import { assertWorkflowMutable, WorkflowLockedError } from '@sim/workflow-authz'
56
import { and, eq } from 'drizzle-orm'
67
import type { AuthenticatedSocket } from '@/middleware/auth'
78
import { checkRolePermission } from '@/middleware/permissions'
@@ -151,6 +152,28 @@ export function setupSubblocksHandlers(socket: AuthenticatedSocket, roomManager:
151152
return
152153
}
153154

155+
try {
156+
await assertWorkflowMutable(workflowId)
157+
} catch (error) {
158+
if (error instanceof WorkflowLockedError) {
159+
socket.emit('operation-forbidden', {
160+
type: 'WORKFLOW_LOCKED',
161+
message: error.message,
162+
operation: SUBBLOCK_OPERATIONS.UPDATE,
163+
target: 'subblock',
164+
})
165+
if (operationId) {
166+
socket.emit('operation-failed', {
167+
operationId,
168+
error: error.message,
169+
retryable: false,
170+
})
171+
}
172+
return
173+
}
174+
throw error
175+
}
176+
154177
// Update user activity
155178
await roomManager.updateUserActivity(workflowId, socket.id, { lastActivity: Date.now() })
156179

@@ -231,6 +254,22 @@ async function flushSubblockUpdate(
231254
return
232255
}
233256

257+
try {
258+
await assertWorkflowMutable(workflowId)
259+
} catch (error) {
260+
if (error instanceof WorkflowLockedError) {
261+
pending.opToSocket.forEach((socketId, opId) => {
262+
io.to(socketId).emit('operation-failed', {
263+
operationId: opId,
264+
error: error.message,
265+
retryable: false,
266+
})
267+
})
268+
return
269+
}
270+
throw error
271+
}
272+
234273
let updateSuccessful = false
235274
let blockLocked = false
236275
await db.transaction(async (tx) => {

apps/realtime/src/handlers/variables.ts

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { db } from '@sim/db'
22
import { workflow } from '@sim/db/schema'
33
import { createLogger } from '@sim/logger'
44
import { VARIABLE_OPERATIONS } from '@sim/realtime-protocol/constants'
5+
import { assertWorkflowMutable, WorkflowLockedError } from '@sim/workflow-authz'
56
import { eq } from 'drizzle-orm'
67
import type { AuthenticatedSocket } from '@/middleware/auth'
78
import { checkRolePermission } from '@/middleware/permissions'
@@ -140,6 +141,28 @@ export function setupVariablesHandlers(socket: AuthenticatedSocket, roomManager:
140141
return
141142
}
142143

144+
try {
145+
await assertWorkflowMutable(workflowId)
146+
} catch (error) {
147+
if (error instanceof WorkflowLockedError) {
148+
socket.emit('operation-forbidden', {
149+
type: 'WORKFLOW_LOCKED',
150+
message: error.message,
151+
operation: VARIABLE_OPERATIONS.UPDATE,
152+
target: 'variable',
153+
})
154+
if (operationId) {
155+
socket.emit('operation-failed', {
156+
operationId,
157+
error: error.message,
158+
retryable: false,
159+
})
160+
}
161+
return
162+
}
163+
throw error
164+
}
165+
143166
// Update user activity
144167
await roomManager.updateUserActivity(workflowId, socket.id, { lastActivity: Date.now() })
145168

@@ -218,6 +241,22 @@ async function flushVariableUpdate(
218241
return
219242
}
220243

244+
try {
245+
await assertWorkflowMutable(workflowId)
246+
} catch (error) {
247+
if (error instanceof WorkflowLockedError) {
248+
pending.opToSocket.forEach((socketId, opId) => {
249+
io.to(socketId).emit('operation-failed', {
250+
operationId: opId,
251+
error: error.message,
252+
retryable: false,
253+
})
254+
})
255+
return
256+
}
257+
throw error
258+
}
259+
221260
let updateSuccessful = false
222261
await db.transaction(async (tx) => {
223262
const [workflowRecord] = await tx

0 commit comments

Comments
 (0)