Skip to content

Conversation

@NathanFlurry
Copy link
Member

Description

Please include a summary of the changes and the related issue. Please also include relevant motivation and context.

Type of change

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • This change requires a documentation update

How Has This Been Tested?

Please describe the tests that you ran to verify your changes.

Checklist:

  • My code follows the style guidelines of this project
  • I have performed a self-review of my code
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • My changes generate no new warnings
  • I have added tests that prove my fix is effective or that my feature works
  • New and existing unit tests pass locally with my changes

Copy link
Member Author

NathanFlurry commented Feb 10, 2026

Warning

This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.
Learn more


How to use the Graphite Merge Queue

Add the label merge-queue to this PR to add it to the merge queue.

You must have a Graphite account in order to use the merge queue. Sign up using this link.

An organization admin has enabled the Graphite Merge Queue in this repository.

Please do not merge from GitHub as this will restart CI on PRs being processed by the merge queue.

This stack of pull requests is managed by Graphite. Learn more about stacking.

@claude
Copy link

claude bot commented Feb 10, 2026

PR Review: Add missing workflow functionality for queues

I've reviewed this PR which adds support for workflow queue message completion and improves workflow context access patterns. Here's my comprehensive analysis:

✅ Strengths

  1. Well-structured feature addition - The PR cleanly adds message completion functionality to the workflow engine, allowing workflows to acknowledge and respond to messages
  2. Good test coverage - Added tests for the new functionality including queue completion and access control validation
  3. Backward compatibility - The complete callback is optional, maintaining compatibility with existing code
  4. Proper error handling - The new workflowAccessActor test validates that db and client are only accessible within steps

🔍 Code Quality Issues

1. Typo in PR title

The PR title has "fucntionality" instead of "functionality"

2. Indentation inconsistency (rivetkit-typescript/packages/rivetkit/src/workflow/context.ts:48-68)

Lines 48-68 have inconsistent indentation. The step method body appears to have extra indentation that should be fixed.

3. Test indentation (rivetkit-typescript/packages/workflow-engine/tests/messages.test.ts:985)

Line 985 has an extra tab before it("should consume pending messages") that should be removed.

🏗️ Architecture & Design

1. Message completion flow is well-designed

The implementation properly handles completion through multiple paths:

  • Direct complete callback on Message objects (runtime-only)
  • Fallback to messageDriver.completeMessage() method
  • Proper tracking via #completionHandles Map in the driver

2. History replay handling (workflow-engine/src/context.ts:1156-1206)

The toHistoryListenMessage / fromHistoryListenMessage serialization pattern correctly preserves message metadata during replay. The use of LISTEN_HISTORY_MESSAGE_MARKER is a good pattern for versioning.

3. Multi-name listen support

The addition of array support for messageName parameter is well-implemented with proper deduplication in normalizeMessageNames().

⚠️ Potential Issues

1. Missing completion handling in listenWithTimeout

In the listenWithTimeout method, verify that all replay paths call completeConsumedMessage(). The code at line 1727-1732 looks correct, but double-check all branches.

2. Race condition prevention in driver completion (rivetkit/src/workflow/driver.ts:103-116)

The #completionHandles Map is properly protected against duplicate entries with the check at line 427-429. This is correct.

3. Completion without deletion (rivetkit/src/actor/instance/queue-manager.ts:443-448)

The completeById method resolves waiters without removing the message from the queue. This is correct behavior for workflows (message is already deleted via deleteMessages), but ensure this is intentional and documented.

🔒 Security Considerations

  1. Access control - The new tests verify that db and client are only accessible inside workflow steps via #ensureActorAccess(), which is good for preventing misuse
  2. No injection risks - Message data is passed through without modification
  3. Type safety - Generic types properly constrain message bodies

🚀 Performance Considerations

  1. Completion handle memory - The #completionHandles Map in the driver could grow unbounded if messages are never completed. Consider adding cleanup logic or size limits.
  2. Message filtering - In loadMessages(), the filter at line 50 (inFlight || availableAt > now) is efficient
  3. Serialization overhead - The history marker pattern adds minimal overhead

📝 Documentation & Testing

Strengths:

  • Good test coverage for new functionality
  • Tests verify both happy path and error conditions
  • Example usage in fixture files

Improvements needed:

  • Consider adding JSDoc comments explaining when to use complete() vs automatic completion
  • Document the behavior difference between listen() (manual completion) and listenN() (auto-completion)

✏️ Minor Issues

  1. Unused type import? Check if all imports in context.ts are used
  2. Consider extracting magic constant - LISTEN_HISTORY_MESSAGE_MARKER is well-named but could be in a constants file

📋 Summary

Verdict: Approve with minor fixes recommended

This is a well-implemented feature that adds important functionality to the workflow engine. The main issues are:

  1. Fix indentation in workflow/context.ts (lines 48-68)
  2. Fix indentation in messages.test.ts (line 985)
  3. Fix typo in PR title
  4. Consider memory management for #completionHandles Map

The architecture is sound, the feature is well-tested, and the implementation properly handles edge cases like replay and history serialization. Once the indentation issues are resolved, this is ready to merge.

Recommendations

Before merge:

  • Fix indentation issues
  • Fix PR title typo
  • Verify all replay paths call completeConsumedMessage()

Follow-up tasks:

  • Add JSDoc documentation for message completion patterns
  • Consider adding telemetry/metrics for completion timeouts
  • Add integration test for completion under high concurrency

@graphite-app
Copy link
Contributor

graphite-app bot commented Feb 10, 2026

Merge activity

  • Feb 10, 8:45 AM UTC: NathanFlurry added this pull request to the Graphite merge queue.
  • Feb 10, 8:46 AM UTC: CI is running for this pull request on a draft pull request (#4166) due to your merge queue CI optimization settings.
  • Feb 10, 8:48 AM UTC: The Graphite merge queue removed this pull request due to downstack failures on PR #4081.
  • Feb 10, 7:35 PM UTC: NathanFlurry added this pull request to the Graphite merge queue.
  • Feb 10, 7:36 PM UTC: CI is running for this pull request on a draft pull request (#4167) due to your merge queue CI optimization settings.
  • Feb 10, 7:36 PM UTC: Merged by the Graphite merge queue via draft PR: #4167.

graphite-app bot pushed a commit that referenced this pull request Feb 10, 2026
# Description

Please include a summary of the changes and the related issue. Please also include relevant motivation and context.

## Type of change

- [ ] Bug fix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected)
- [ ] This change requires a documentation update

## How Has This Been Tested?

Please describe the tests that you ran to verify your changes.

## Checklist:

- [ ] My code follows the style guidelines of this project
- [ ] I have performed a self-review of my code
- [ ] I have commented my code, particularly in hard-to-understand areas
- [ ] I have made corresponding changes to the documentation
- [ ] My changes generate no new warnings
- [ ] I have added tests that prove my fix is effective or that my feature works
- [ ] New and existing unit tests pass locally with my changes
@NathanFlurry NathanFlurry force-pushed the 02-09-chore_frontend_add_default_vite_sentry_url branch from f45f5d5 to eb969c8 Compare February 10, 2026 19:25
@NathanFlurry NathanFlurry force-pushed the 02-09-chore_add_missing_workflow_functionality branch from 33b1455 to 80f43be Compare February 10, 2026 19:25
Comment on lines +48 to +61
async step<T>(
nameOrConfig: string | Parameters<WorkflowContextInterface["step"]>[0],
run?: () => Promise<T>,
): Promise<T> {
if (typeof nameOrConfig === "string") {
if (!run) {
throw new Error("Step run function missing");
}
return await this.#wrapActive(() =>
this.#inner.step(nameOrConfig, () =>
this.#withActorAccess(run),
),
);
}
return await this.#wrapActive(() =>
this.#inner.step(nameOrConfig, () =>
this.#withActorAccess(run),
),
);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The step method is incorrectly indented. Remove the extra indentation at the beginning of the method definition to align it with other class methods.

Spotted by Graphite Agent (based on CI logs)

Fix in Graphite


Is this helpful? React 👍 or 👎 to let us know.

Comment on lines 1 to +6
import { Loop } from "@rivetkit/workflow-engine";
import { actor } from "@/actor/mod";
import { db } from "@/db/mod";
import { WORKFLOW_GUARD_KV_KEY } from "@/workflow/constants";
import { workflow, workflowQueueName } from "@/workflow/mod";
import type { registry } from "./registry";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The imports are not sorted according to Biome's rules. They should be sorted alphabetically. Run 'biome check --apply' to automatically fix the import sorting.

Spotted by Graphite Agent (based on CI logs)

Fix in Graphite


Is this helpful? React 👍 or 👎 to let us know.

graphite-app bot pushed a commit that referenced this pull request Feb 10, 2026
# Description

Please include a summary of the changes and the related issue. Please also include relevant motivation and context.

## Type of change

- [ ] Bug fix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected)
- [ ] This change requires a documentation update

## How Has This Been Tested?

Please describe the tests that you ran to verify your changes.

## Checklist:

- [ ] My code follows the style guidelines of this project
- [ ] I have performed a self-review of my code
- [ ] I have commented my code, particularly in hard-to-understand areas
- [ ] I have made corresponding changes to the documentation
- [ ] My changes generate no new warnings
- [ ] I have added tests that prove my fix is effective or that my feature works
- [ ] New and existing unit tests pass locally with my changes
@graphite-app graphite-app bot closed this Feb 10, 2026
@graphite-app graphite-app bot deleted the 02-09-chore_add_missing_workflow_functionality branch February 10, 2026 19:36
Comment on lines +2 to +4
import type { Client } from "@/client/client";
import type { Registry } from "@/registry";
import type { AnyDatabaseProvider, InferDatabaseClient } from "@/actor/database";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Imports are not sorted alphabetically. Biome linter typically requires imports to be sorted. Reorder these imports alphabetically.

Spotted by Graphite Agent (based on CI logs)

Fix in Graphite


Is this helpful? React 👍 or 👎 to let us know.

Comment on lines +48 to +61
async step<T>(
nameOrConfig: string | Parameters<WorkflowContextInterface["step"]>[0],
run?: () => Promise<T>,
): Promise<T> {
if (typeof nameOrConfig === "string") {
if (!run) {
throw new Error("Step run function missing");
}
return await this.#wrapActive(() =>
this.#inner.step(nameOrConfig, () =>
this.#withActorAccess(run),
),
);
}
return await this.#wrapActive(() =>
this.#inner.step(nameOrConfig, () =>
this.#withActorAccess(run),
),
);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indentation is inconsistent in the step method implementation. The return statement and its content are indented more than they should be. Fix the indentation to match the surrounding code structure.

Spotted by Graphite Agent (based on CI logs)

Fix in Graphite


Is this helpful? React 👍 or 👎 to let us know.

Comment on lines +3 to +6
import { db } from "@/db/mod";
import { WORKFLOW_GUARD_KV_KEY } from "@/workflow/constants";
import { workflow, workflowQueueName } from "@/workflow/mod";
import type { registry } from "./registry";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Imports are not sorted alphabetically. Biome linter typically requires imports to be sorted. Reorder these imports alphabetically.

Spotted by Graphite Agent (based on CI logs)

Fix in Graphite


Is this helpful? React 👍 or 👎 to let us know.

Comment on lines 58 to 171
@@ -68,7 +112,63 @@ for (const mode of modes) {
"wait-message",
"my-message",
);
return message;
return message.body;
};

const result = await runWorkflow(
"wf-1",
workflow,
undefined,
driver,
{
mode,
},
).result;

expect(result.state).toBe("completed");
expect(result.output).toBe("hello");
});

it("listen should return a completable message handle", async () => {
const completions: Array<{ id: string; response?: unknown }> = [];
const pending = [
buildMessagePayload("my-message", "hello", "msg-1") as {
id: string;
name: string;
data: unknown;
sentAt: number;
complete?: (response?: unknown) => Promise<void>;
},
];

const messageDriver: WorkflowMessageDriver = {
async loadMessages() {
return pending.map((message) => ({
...message,
complete: async (response?: unknown) => {
completions.push({ id: message.id, response });
},
}));
},
async addMessage(message) {
pending.push(message);
},
async deleteMessages(messageIds) {
const deleted = new Set(messageIds);
const remaining = pending.filter(
(message) => !deleted.has(message.id),
);
pending.length = 0;
pending.push(...remaining);
return messageIds;
},
};
driver.messageDriver = messageDriver;

const workflow = async (ctx: WorkflowContextInterface) => {
const message = await ctx.listen<string>("wait-message", "my-message");
await message.complete({ ok: true });
return message.body;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix indentation and formatting in the new test cases for message handling. Ensure consistent spacing and indentation throughout.

Spotted by Graphite Agent (based on CI logs)

Fix in Graphite


Is this helpful? React 👍 or 👎 to let us know.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant