Skip to content

Commit

Permalink
Parallelize Construct and Call calls in the Node.js SDK
Browse files Browse the repository at this point in the history
Due to global state issues in the nodejs runtime, we serialized MLC construct requests in #6452.
This had the downside that cloud resources were not created in parallel if multiple instances
of the same component were instantiated in the same program.

This change removes the serialization of Construct and Call calls.
The global state issues were already fixed in #10568
  • Loading branch information
flostadler committed Jun 20, 2024
1 parent 05574aa commit 2725d4e
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 63 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
changes:
- type: feat
scope: sdk/nodejs
description: Parallelize Construct and Call calls in the Node.js SDK
86 changes: 23 additions & 63 deletions sdk/nodejs/provider/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,26 @@ class Server implements grpc.UntypedServiceImplementation {
engineAddr: string | undefined;
readonly provider: Provider;
readonly uncaughtErrors: Set<Error>;

/** Queue of construct calls. */
constructCallQueue = Promise.resolve();
private readonly _callbacks = new Map<string, any>();

constructor(engineAddr: string | undefined, provider: Provider, uncaughtErrors: Set<Error>) {
this.engineAddr = engineAddr;
this.provider = provider;
this.uncaughtErrors = uncaughtErrors;

// When we catch an uncaught error, we need to respond to the inflight call/construct gRPC requests
// with the error to avoid a hang.
const uncaughtHandler = (err: Error) => {
if (!this.uncaughtErrors.has(err)) {
this.uncaughtErrors.add(err);
}
// terminate the outstanding gRPC requests.
this._callbacks.forEach((callback) => callback(err, undefined));
};
process.on("uncaughtException", uncaughtHandler);
// @ts-ignore 'unhandledRejection' will almost always invoke uncaughtHandler with an Error. so
// just suppress the TS strictness here.
process.on("unhandledRejection", uncaughtHandler);
}

// Satisfy the grpc.UntypedServiceImplementation interface.
Expand Down Expand Up @@ -272,33 +284,8 @@ class Server implements grpc.UntypedServiceImplementation {
}

public async construct(call: any, callback: any): Promise<void> {
// Serialize invocations of `construct` and `call` so that each call runs one after another, avoiding concurrent
// runs. We do this because `construct` and `call` modify global state to reset the SDK's runtime options.
// This is a short-term workaround to provide correctness, but likely isn't sustainable long-term due to the
// limits it places on parallelism. We will likely want to investigate if it's possible to run each invocation
// in its own context, possibly using Node's `createContext` API to avoid modifying global state:
// https://nodejs.org/api/vm.html#vm_vm_createcontext_contextobject_options
const res = this.constructCallQueue.then(() => this.constructImpl(call, callback));
/* eslint-disable no-empty,no-empty-function,@typescript-eslint/no-empty-function */
this.constructCallQueue = res.catch(() => {});
return res;
}

async constructImpl(call: any, callback: any): Promise<void> {
// given that construct calls are serialized, we can attach an uncaught handler to pick up exceptions
// in underlying user code. When we catch the error, we need to respond to the gRPC request with the error
// to avoid a hang.
const uncaughtHandler = (err: Error) => {
if (!this.uncaughtErrors.has(err)) {
this.uncaughtErrors.add(err);
}
// bubble the uncaught error in the user code back and terminate the outstanding gRPC request.
callback(err, undefined);
};
process.on("uncaughtException", uncaughtHandler);
// @ts-ignore 'unhandledRejection' will almost always invoke uncaughtHandler with an Error. so
// just suppress the TS strictness here.
process.on("unhandledRejection", uncaughtHandler);
const callbackId = Symbol("id");
this._callbacks.set(callbackId.toString(), callback);
try {
const req: any = call.request;
const type = req.getType();
Expand Down Expand Up @@ -359,40 +346,14 @@ class Server implements grpc.UntypedServiceImplementation {
console.error(`${e}: ${e.stack}`);
callback(e, undefined);
} finally {
// remove these uncaught handlers that are specific to this gRPC callback context
process.off("uncaughtException", uncaughtHandler);
process.off("unhandledRejection", uncaughtHandler);
// remove the gRPC callback context from the map of in-flight callbacks
this._callbacks.delete(callbackId.toString());
}
}

public async call(call: any, callback: any): Promise<void> {
// Serialize invocations of `construct` and `call` so that each call runs one after another, avoiding concurrent
// runs. We do this because `construct` and `call` modify global state to reset the SDK's runtime options.
// This is a short-term workaround to provide correctness, but likely isn't sustainable long-term due to the
// limits it places on parallelism. We will likely want to investigate if it's possible to run each invocation
// in its own context, possibly using Node's `createContext` API to avoid modifying global state:
// https://nodejs.org/api/vm.html#vm_vm_createcontext_contextobject_options
const res = this.constructCallQueue.then(() => this.callImpl(call, callback));
/* eslint-disable no-empty, no-empty-function, @typescript-eslint/no-empty-function */
this.constructCallQueue = res.catch(() => {});
return res;
}

async callImpl(call: any, callback: any): Promise<void> {
// given that call calls are serialized, we can attach an uncaught handler to pick up exceptions
// in underlying user code. When we catch the error, we need to respond to the gRPC request with the error
// to avoid a hang.
const uncaughtHandler = (err: Error) => {
if (!this.uncaughtErrors.has(err)) {
this.uncaughtErrors.add(err);
}
// bubble the uncaught error in the user code back and terminate the outstanding gRPC request.
callback(err, undefined);
};
process.on("uncaughtException", uncaughtHandler);
// @ts-ignore 'unhandledRejection' will almost always invoke uncaughtHandler with an Error. so
// just suppress the TS strictness here.
process.on("unhandledRejection", uncaughtHandler);
const callbackId = Symbol("id");
this._callbacks.set(callbackId.toString(), callback);
try {
const req: any = call.request;
if (!this.provider.call) {
Expand Down Expand Up @@ -441,9 +402,8 @@ class Server implements grpc.UntypedServiceImplementation {
console.error(`${e}: ${e.stack}`);
callback(e, undefined);
} finally {
// remove these uncaught handlers that are specific to this gRPC callback context
process.off("uncaughtException", uncaughtHandler);
process.off("unhandledRejection", uncaughtHandler);
// remove the gRPC callback context from the map of in-flight callbacks
this._callbacks.delete(callbackId.toString());
}
}

Expand Down

0 comments on commit 2725d4e

Please sign in to comment.