Skip to content
This repository has been archived by the owner on Feb 26, 2024. It is now read-only.

fix: after provider.disconnect() is called, Ganache should stop serving requests #3433

Merged
merged 14 commits into from
Aug 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions src/chains/ethereum/ethereum/src/provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import {
MessageEvent,
VmConsoleLogEvent
} from "./provider-events";
import { ConsoleLogs } from "@ganache/console.log";

declare type RequestMethods = KnownKeys<EthereumApi>;

Expand Down Expand Up @@ -141,8 +140,8 @@ export class EthereumProvider
{
#options: EthereumInternalOptions;
#api: EthereumApi;
#executor: Executor;
#wallet: Wallet;
readonly #executor: Executor;
readonly #blockchain: Blockchain;

constructor(
Expand Down Expand Up @@ -419,10 +418,19 @@ export class EthereumProvider
}
};

/**
* Disconnect the provider instance. This will cause the underlying blockchain to be stopped, and any pending
* tasks to be rejected. Emits a `disconnect` event once successfully disconnected.
* @returns Fullfills with `undefined` once the provider has been disconnected.
*/
public disconnect = async () => {
// executor.stop() will stop accepting new tasks, but will not wait for inflight tasks. These may reject with
// (unhelpful) internal errors. See https://github.com/trufflesuite/ganache/issues/3499
this.#executor.stop();
await this.#blockchain.stop();

this.#executor.end();
this.emit("disconnect");
return;
};

//#region legacy
Expand Down
88 changes: 88 additions & 0 deletions src/chains/ethereum/ethereum/tests/provider.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -461,4 +461,92 @@ describe("provider", () => {
assert.notStrictEqual(hash, "");
});
});

describe("disconnect()", () => {
let provider: EthereumProvider;

[true, false].forEach(asyncRequestProcessing => {
describe(`asyncRequestProcessing: ${asyncRequestProcessing}`, () => {
beforeEach("Instantiate provider", async () => {
provider = await getProvider({
chain: { asyncRequestProcessing }
});
});

it("immediately and syncronously stops accepting request when `disconnect()` is called", async () => {
provider.disconnect();
jeffsmale90 marked this conversation as resolved.
Show resolved Hide resolved
const whenBlockByNumber = provider.request({
method: "eth_getBlockByNumber",
params: ["latest"]
});

await assert.rejects(
whenBlockByNumber,
new Error("Cannot process request, Ganache is disconnected."),
"Requests made after disconnect is called should reject"
);
});

it("emits the 'disconnect' event", async () => {
const whenDisconnected = provider.once("disconnect");
await provider.disconnect();
await assert.doesNotReject(
whenDisconnected,
'The provider should emit the "disconnect" event'
);
});

// todo: Reinstate this test when https://github.com/trufflesuite/ganache/issues/3499 is fixed
it.skip("processes requests executed before disconnect is called", async () => {
const whenBlockByNumber = provider.request({
method: "eth_getProof",
params: ["0xC7D9E2d5FE0Ff5C43102158C31BbC4aA2fDe10d8", [], "latest"]
});
const whenDisconnected = provider.disconnect();

await assert.doesNotReject(
whenBlockByNumber,
"Currently executing request should resolve"
);
await assert.doesNotReject(
whenDisconnected,
'The provider should emit the "disconnect" event'
);
});
});
});

// todo: Reinstate this test when https://github.com/trufflesuite/ganache/issues/3499 is fixed
describe.skip("without asyncRequestProcessing", () => {
// we only test this with asyncRequestProcessing: false, because it's impossible to force requests
// to be "pending" when asyncRequestProcessing: true
it("processes started requests, but reject pending requests", async () => {
provider = await getProvider({
chain: { asyncRequestProcessing: false }
});

const active = provider.request({
method: "eth_getProof",
params: ["0x4Ae2736a3b914C7597131fd1Ef30F74aC4B20874", [], "latest"]
});
const pending = provider.request({
method: "eth_getBlockByNumber",
params: ["latest"]
});

const whenDisconnected = provider.disconnect();

await assert.rejects(
pending,
new Error("Cannot process request, Ganache is disconnected."),
"pending tasks should reject"
);
await assert.doesNotReject(active, "active tasks should not reject");
await assert.doesNotReject(
whenDisconnected,
'The provider should emit the "disconnect" event'
);
});
});
});
});
5 changes: 4 additions & 1 deletion src/packages/core/src/connector-loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,12 @@ const initialize = <T extends FlavorName = typeof DefaultFlavor>(
// provider is ready we unpause.. This lets us accept queue requests before
// we've even fully initialized.

// The function referenced by requestcoordinator.resume will be changed when
// requestCoordinator.stop() is called. Ensure that no references to the
// function are held, otherwise internal errors may be surfaced.
return {
connector,
promise: connectPromise.then(requestCoordinator.resume)
promise: connectPromise.then(() => requestCoordinator.resume())
};
};

Expand Down
18 changes: 18 additions & 0 deletions src/packages/core/tests/connector-loader.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import assert from "assert";
import loader from "../src/connector-loader";

describe("connector-loader", () => {
describe("initialize", () => {
it("the returned promise should reject, if disconnect() is called before the provider is ready", async () => {
const { promise, connector } = loader.initialize({});
connector.provider.disconnect();

// This assertion ensures that the "stopped" queue() method that is
// assigned in request-coordinator.stop() is called correctly.
await assert.rejects(
promise,
new Error("Cannot resume processing requests, Ganache is disconnected.")
);
});
});
});
20 changes: 20 additions & 0 deletions src/packages/utils/src/utils/executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,21 @@ export class Executor {
this.#requestCoordinator = requestCoordinator;
}

/**
* Stop processing requests. We pass this call through to the requestCoordinator, which means that api
* validation will continue to work after calling stop() in execute().
*/
public stop() {
davidmurdoch marked this conversation as resolved.
Show resolved Hide resolved
this.#requestCoordinator.stop();
}

/**
* Finalise shutdown of the underlying RequestCoordinator.
*/
public end() {
this.#requestCoordinator.end();
}

/**
* Executes the method with the given methodName on the API
* @param methodName - The name of the JSON-RPC method to execute.
Expand Down Expand Up @@ -43,6 +58,11 @@ export class Executor {
// just double check, in case a API breaks the rules and adds non-fns
// to their API interface.
if (typeof fn === "function") {
// The function referenced by requestcoordinator.queue will be changed
// when requestCoordinator.stop() is called. Ensure that no references
// to the function are held, otherwise internal errors may be
// surfaced.

// queue up this method for actual execution:
return this.#requestCoordinator.queue(fn, api, params);
}
Expand Down
50 changes: 45 additions & 5 deletions src/packages/utils/src/utils/request-coordinator.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import { OverloadedParameters } from "../types";

const noop = () => {};
type RejectableTask = {
execute: (...args: any) => Promise<any>;
reject: (reason?: any) => void;
};

/**
* Responsible for managing global concurrent requests.
Expand All @@ -14,12 +18,13 @@ export class RequestCoordinator {
/**
* The pending requests. You can't do anything with this array.
*/
public readonly pending: ((...args: any) => Promise<any>)[] = [];
public readonly pending: RejectableTask[] = [];

/**
* The number of tasks currently being processed.
*/
public runningTasks: number = 0;

#paused: boolean = true;
public get paused(): boolean {
return this.#paused;
Expand Down Expand Up @@ -60,7 +65,8 @@ export class RequestCoordinator {
) {
const current = this.pending.shift();
this.runningTasks++;
current()
current
.execute()
// By now, we've resolved the fn's `value` by sending it to the parent scope.
// But over here, we're also waiting for this fn's _value_ to settle _itself_ (it might be a promise) before
// continuing through the `pending` queue. Because we wait for it again here, it could potentially throw here,
Expand All @@ -74,6 +80,40 @@ export class RequestCoordinator {
}
};

/**
* Stop processing tasks - calls to queue(), and resume() will reject with an
* error indicating that Ganache is disconnected. This is an irreversible
* action. If you wish to be able to resume processing, use pause() instead.
*
* Note: this changes the references of this.resume and this.queue. Any code
* that maintains references to the values referenced by this.resume or
* this.queue, could have unintended consequences after calling this.stop().
*/
public stop() {
this.pause();
this.resume = () => {
throw new Error(
"Cannot resume processing requests, Ganache is disconnected."
);
};

this.queue = async () => {
throw new Error("Cannot process request, Ganache is disconnected.");
};
}

/**
* Finalise shutdown of the RequestCoordinator. Rejects all pending tasks in order. Should be
* called after all in-flight tasks have resolved in order to maintain overall FIFO order.
*/
public end() {
while (this.pending.length > 0) {
this.pending
.shift()
.reject(new Error("Cannot process request, Ganache is disconnected."));
}
}

/**
* Insert a new function into the queue.
*/
Expand All @@ -83,8 +123,8 @@ export class RequestCoordinator {
argumentsList: OverloadedParameters<T>
) => {
return new Promise<{ value: ReturnType<typeof fn> }>((resolve, reject) => {
// const executor is `async` to force the return value into a Promise.
const executor = async () => {
// const execute is `async` to force the return value into a Promise.
const execute = async () => {
try {
const value = Reflect.apply(
fn,
Expand All @@ -97,7 +137,7 @@ export class RequestCoordinator {
reject(e);
}
};
this.pending.push(executor);
this.pending.push({ execute, reject });
this.#process();
});
};
Expand Down
100 changes: 100 additions & 0 deletions src/packages/utils/tests/request-coordinator.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import assert from "assert";
import { RequestCoordinator } from "../src/utils/request-coordinator";

describe("request-coordinator", () => {
const thisArg = {};
const paramsArg: [] = [];
const noop = () => undefined;
let coordinator: RequestCoordinator;

beforeEach("instantiate RequestCoordinator", () => {
coordinator = new RequestCoordinator(0);
});

describe("stop()", () => {
it("should set `paused` property to `true`", () => {
coordinator.stop();

assert(coordinator.paused);
});

it("should not allow processing to be resumed", () => {
coordinator.stop();

assert.throws(
() => coordinator.resume(),
new Error("Cannot resume processing requests, Ganache is disconnected.")
);
});

it("should not allow new requests to be queued", async () => {
coordinator.stop();

await assert.rejects(
coordinator.queue(noop, thisArg, paramsArg),
new Error("Cannot process request, Ganache is disconnected.")
);
});
});

describe("end()", () => {
it("should reject pending requests in the order that they were received", async () => {
coordinator.pause();

let nextRejectionIndex = 0;
const pendingAssertions: Promise<any>[] = [];

for (let taskIndex = 0; taskIndex < 10; taskIndex++) {
const task = coordinator.queue(noop, thisArg, paramsArg);

pendingAssertions.push(
task.catch(() => {
assert.strictEqual(
taskIndex,
nextRejectionIndex,
`Rejected in incorrect order, waiting on task at index ${nextRejectionIndex}, got ${taskIndex}.`
);
nextRejectionIndex++;
})
);

pendingAssertions.push(
assert.rejects(
task,
new Error("Cannot process request, Ganache is disconnected.")
)
);
}

coordinator.end();
await Promise.all(pendingAssertions);

assert.equal(
coordinator.pending.length,
0,
"Pending array should be empty"
);
});

it("should clear the pending tasks queue", () => {
coordinator.pause();

for (let i = 0; i < 10; i++) {
coordinator.queue(noop, thisArg, paramsArg);
}

assert.equal(
coordinator.pending.length,
10,
"Incorrect pending queue length before calling end()"
);

coordinator.end();
assert.equal(
coordinator.pending.length,
0,
"Incorrect pending queue length after calling end()"
);
});
});
});