-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: local workflow replay #42
Conversation
- Build workflow directly
"/executions/{executionId}/history": [ | ||
{ | ||
methods: [HttpMethod.GET], | ||
entry: "executions/history.js", | ||
config: (fn) => { | ||
this.table.grantReadData(fn); | ||
}, | ||
}, | ||
], | ||
"/executions/{executionId}/workflow-history": [ | ||
{ | ||
methods: [HttpMethod.GET], | ||
entry: "executions/workflow-history.js", | ||
config: (fn) => { | ||
this.history.grantRead(fn); | ||
}, | ||
}, | ||
], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've been thinking that we should bundle all the handlers into a single lambda function:
- we should not over-create resources in the customer's account
- minimize number of cold starts
- minimize impact of account-wide concurrent executions
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is just for the API? Do we have a monolith lambda router we like to use?
How would a monolith lambda help with concurrent executions?
async function workflowHistory( | ||
event: APIGatewayProxyEventV2 | ||
): Promise<APIGatewayProxyResultV2<HistoryStateEvent[]>> { | ||
const executionId = event.pathParameters?.executionId; | ||
if (!executionId) { | ||
return { statusCode: 400, body: `Missing executionId` }; | ||
} | ||
|
||
const workflowClient = createWorkflowRuntimeClient(getService()); | ||
return workflowClient.getHistory(decodeExecutionId(executionId)); | ||
} | ||
|
||
export const handler = middy(workflowHistory).use(errorMiddleware); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These are starting to look redundant. See my earlier comment - should we move all of these into a single handler?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about a wrapper?
export const handler = withMiddy(workflowHandler);
runtime: Runtime.NODEJS_16_X, | ||
architecture: Architecture.ARM_64, | ||
bundling: { | ||
// https://github.com/aws/aws-cdk/issues/21329#issuecomment-1212336356 | ||
// cannot output as .mjs file as ulid does not support it. | ||
mainFields: ["module", "main"], | ||
esbuildArgs: { | ||
"--conditions": "module,import,require", | ||
}, | ||
metafile: true, | ||
}, | ||
...baseNodeFnProps, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we make a BaseNodeFn
construct instead of merging in props?
"/executions/{executionId}/history": [ | ||
{ | ||
methods: [HttpMethod.GET], | ||
entry: "executions/history.js", | ||
config: (fn) => { | ||
this.table.grantReadData(fn); | ||
}, | ||
}, | ||
], | ||
"/executions/{executionId}/workflow-history": [ | ||
{ | ||
methods: [HttpMethod.GET], | ||
entry: "executions/workflow-history.js", | ||
config: (fn) => { | ||
this.history.grantRead(fn); | ||
}, | ||
}, | ||
], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is just for the API? Do we have a monolith lambda router we like to use?
How would a monolith lambda help with concurrent executions?
packages/@eventual/aws-runtime/src/handlers/api/executions/list.ts
Outdated
Show resolved
Hide resolved
async function workflowHistory( | ||
event: APIGatewayProxyEventV2 | ||
): Promise<APIGatewayProxyResultV2<HistoryStateEvent[]>> { | ||
const executionId = event.pathParameters?.executionId; | ||
if (!executionId) { | ||
return { statusCode: 400, body: `Missing executionId` }; | ||
} | ||
|
||
const workflowClient = createWorkflowRuntimeClient(getService()); | ||
return workflowClient.getHistory(decodeExecutionId(executionId)); | ||
} | ||
|
||
export const handler = middy(workflowHistory).use(errorMiddleware); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about a wrapper?
export const handler = withMiddy(workflowHandler);
Co-authored-by: Chris Fraser <chris@functionless.com>
packages/@eventual/cli/src/api-ky.ts
Outdated
@@ -42,7 +45,7 @@ export async function apiKy(region?: string): Promise<KyInstance> { | |||
path: url.pathname, | |||
protocol: url.protocol, | |||
method: req.method.toUpperCase(), | |||
body: (await req.body?.getReader().read())?.value, | |||
body: req.body && (await streamToString(req.body)), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not req.text() ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It consumes the body, leaving it unreadable for the actual request
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is that a problem? Is it used twice?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, once for signing, and one to actually send the request
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like you overwrite the body in the new one with the signed body?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also I think getReader.read()
also consumes the stream.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok figured it out. When doing req.text() on node 14, polyfilled with node-fetch: it runs this in the consumeBody method:
data[INTERNALS].disturbed = true;
Which should be fine, since we're constructing a new request anyway. The new request uses a copy constructor, which unfortunately copied the disturbed flag, and hence would cause the new request to fail too. This was a quirk specific to node-fetch, on node 18, without the polyfill, text() was working fine.
I've changed the new request construction to just copy the minimum parameters it needs to work, url, method, headers, and body, so it doesnt copy the consumed flag. Now it is working fine on node 14.
//We box our cache in case our fn returns undefined | ||
let resMap = new Map<any, { value: R }>(); | ||
return (...args) => { | ||
let key = options?.cacheKey ? options.cacheKey(...args) : args; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are we making use of the cache key concept?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some of the function's we're memoing are parameterized. Although they may only ever currently be invoked with a single set of arguments currently, I think we're kind of breaking the function's contract by ignoring the parameters when memoing, and might lead to a nasty surprise down the road, so figured better to account for them.
packages/@eventual/cli/src/api-ky.ts
Outdated
@@ -42,7 +45,7 @@ export async function apiKy(region?: string): Promise<KyInstance> { | |||
path: url.pathname, | |||
protocol: url.protocol, | |||
method: req.method.toUpperCase(), | |||
body: (await req.body?.getReader().read())?.value, | |||
body: req.body && (await streamToString(req.body)), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like you overwrite the body in the new one with the signed body?
packages/@eventual/cli/src/api-ky.ts
Outdated
@@ -42,7 +45,7 @@ export async function apiKy(region?: string): Promise<KyInstance> { | |||
path: url.pathname, | |||
protocol: url.protocol, | |||
method: req.method.toUpperCase(), | |||
body: (await req.body?.getReader().read())?.value, | |||
body: req.body && (await streamToString(req.body)), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also I think getReader.read()
also consumes the stream.
"race", | ||
]; | ||
|
||
export class OuterVisitor extends Visitor { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is new? merge issue?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just moved from esbuild-plugin.ts isn't it?
"module": "esnext", | ||
"target": "esnext", | ||
"moduleResolution": "node", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This needs to stay, may be why the bundle size grew too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved these properites tsconfig-base.json a while ago, so they'll be consistent across all projects.
Also tweaked them a bit.
module - esnext - so esm is default.
target: es2019. so it''ll compile down to node 14 es support.
moduleResolution: nodenext. needed for esm
This file was identical between both commits in the bundle size comparison fwiw
eventual replay <entry> <workflow> <execution>.
TODO