Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/fix-load-events-cursor-null.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@workflow/core": patch
"workflow": patch
---

Fix spurious "Event cursor missing after initial load" warning
134 changes: 131 additions & 3 deletions packages/core/src/runtime/helpers.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import type { World } from '@workflow/world';
import { describe, expect, it, vi } from 'vitest';
import { getWorkflowQueueName, healthCheck } from './helpers.js';
import type { Event, World } from '@workflow/world';
import { beforeEach, describe, expect, it, vi } from 'vitest';
import {
getWorkflowQueueName,
getWorkflowRunEvents,
healthCheck,
} from './helpers.js';

// Mock the logger to suppress output during tests
vi.mock('../logger.js', () => ({
Expand All @@ -12,6 +16,25 @@ vi.mock('../logger.js', () => ({
},
}));

const eventsListMock = vi.fn();

vi.mock('./world.js', () => ({
getWorld: vi.fn(() => ({
events: {
list: eventsListMock,
},
})),
}));

const makeEvent = (eventId: string): Event =>
({
eventId,
runId: 'wrun_mockidnumber0001',
eventType: 'step_created',
correlationId: 'step_mock',
createdAt: new Date(),
}) as unknown as Event;

describe('getWorkflowQueueName', () => {
it('should return a valid queue name for a simple workflow name', () => {
expect(getWorkflowQueueName('myWorkflow')).toBe(
Expand Down Expand Up @@ -137,3 +160,108 @@ describe('healthCheck', () => {
);
});
});

describe('getWorkflowRunEvents', () => {
beforeEach(() => {
eventsListMock.mockReset();
});

it('returns the cursor from the last page when pagination terminates normally', async () => {
const page1 = [makeEvent('evnt_a'), makeEvent('evnt_b')];
eventsListMock.mockResolvedValueOnce({
data: page1,
cursor: 'eid:evnt_b',
hasMore: false,
});

const result = await getWorkflowRunEvents('wrun_test');

expect(result.events).toHaveLength(2);
expect(result.cursor).toBe('eid:evnt_b');
expect(eventsListMock).toHaveBeenCalledTimes(1);
});

// Regression test for the "Event cursor missing after initial load" warning.
//
// A World may legitimately return `{ data: [], cursor: null, hasMore: false }`
// on a trailing empty page — workflow-server does this whenever the previous
// page's DynamoDB query hit `Limit` exactly and DynamoDB returned a
// `LastEvaluatedKey` "just in case." If the pagination loop overwrites the
// cursor with `null` on that trailing page, the runtime's incremental-load
// path can't proceed and falls back to a full reload on every replay
// iteration, logging "Event cursor missing after initial load" each time.
it('preserves the cursor from the previous page when the final page is empty', async () => {
const page1 = [makeEvent('evnt_a'), makeEvent('evnt_b')];
eventsListMock.mockResolvedValueOnce({
data: page1,
cursor: 'eid:evnt_b',
hasMore: true,
});
eventsListMock.mockResolvedValueOnce({
data: [],
cursor: null,
hasMore: false,
});

const result = await getWorkflowRunEvents('wrun_test');

expect(result.events).toHaveLength(2);
expect(result.cursor).toBe('eid:evnt_b');
expect(eventsListMock).toHaveBeenCalledTimes(2);
});

it('returns null cursor only when no events exist at all', async () => {
eventsListMock.mockResolvedValueOnce({
data: [],
cursor: null,
hasMore: false,
});

const result = await getWorkflowRunEvents('wrun_test');

expect(result.events).toHaveLength(0);
expect(result.cursor).toBeNull();
});

it('uses the latest cursor when paginating through multiple non-empty pages', async () => {
eventsListMock.mockResolvedValueOnce({
data: [makeEvent('evnt_a')],
cursor: 'eid:evnt_a',
hasMore: true,
});
eventsListMock.mockResolvedValueOnce({
data: [makeEvent('evnt_b')],
cursor: 'eid:evnt_b',
hasMore: true,
});
eventsListMock.mockResolvedValueOnce({
data: [makeEvent('evnt_c')],
cursor: 'eid:evnt_c',
hasMore: false,
});

const result = await getWorkflowRunEvents('wrun_test');

expect(result.events.map((e) => e.eventId)).toEqual([
'evnt_a',
'evnt_b',
'evnt_c',
]);
expect(result.cursor).toBe('eid:evnt_c');
});

it('falls back to the afterCursor when an incremental load returns no events', async () => {
eventsListMock.mockResolvedValueOnce({
data: [],
cursor: null,
hasMore: false,
});

const result = await getWorkflowRunEvents('wrun_test', 'eid:evnt_z');

expect(result.events).toHaveLength(0);
// Preserving the input cursor avoids the runtime treating "no new events
// since last poll" as "I have no idea where I am in the log."
expect(result.cursor).toBe('eid:evnt_z');
});
});
10 changes: 9 additions & 1 deletion packages/core/src/runtime/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,15 @@ export async function getWorkflowRunEvents(

allEvents.push(...response.data);
hasMore = response.hasMore;
nextCursor = response.cursor;
// Preserve the last non-null cursor across pages. A World may
// legitimately return `{ data: [], cursor: null, hasMore: false }`
// on a trailing empty page — for example when the previous page's
// underlying DynamoDB query hit `Limit` exactly and returned a
// `LastEvaluatedKey` "just in case". Overwriting with that null
// would lose the position past the last real event we loaded and
// force the runtime into the "no cursor after initial load" full-
// reload fallback on every subsequent replay iteration.
nextCursor = response.cursor ?? nextCursor;
pagesLoaded++;

runtimeLogger.debug('Loaded event page', {
Expand Down
Loading