Skip to content
This repository has been archived by the owner on Feb 26, 2024. It is now read-only.

Commit

Permalink
Tidy up request-coordinator and add tests at EthereumProvider level
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffsmale90 committed Aug 4, 2022
1 parent 01d62f8 commit a59b00d
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 42 deletions.
23 changes: 12 additions & 11 deletions src/chains/ethereum/ethereum/src/provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,9 @@ export class EthereumProvider
}>
implements Provider<EthereumApi>
{
#options: EthereumInternalOptions;
#api: EthereumApi;
#wallet: Wallet;
readonly #options: EthereumInternalOptions;
readonly #api: EthereumApi;
readonly #wallet: Wallet;
readonly #executor: Executor;
readonly #blockchain: Blockchain;

Expand Down Expand Up @@ -419,19 +419,20 @@ export class EthereumProvider
};

/**
* Disconnect the provider instance. This will cause the underlying blockchain to be stopped, and any pending
* tasks to be rejected. Await the returned Promise to ensure that everything has been cleanly shut down
* before terminating the process.
* @return Promise<void> - indicating that the provider has been cleanly disconnected
* Disconnect the provider, the underlying blockchain will be stopped. Any tasks currently executing will be completed,
* any pending tasks will be rejected. The returned Promise will resolve when the provider is disconnected, and a
* "disconnect" event will be emitted.
* @returns Promise<void> - resolves when the provider has disconnected
*/
public disconnect = async () => {
const executor = this.#executor;
// We make a best effort to resolve any currently executing tasks, before rejecting pending tasks. This relies on
// this.#blockchain.stop() waiting to resolve until after all executing tasks have settled. Executor does not
// guarantee that no tasks are currently executing, before rejecting any remaining pending tasks.

// await executor.stop() to ensure that all currently processing tasks are complete
// we await executor.stop() here to ensure that any currently executing tasks are complete before pulling the
// rug out by stopping the blockchain.
await executor.stop();

// we call rejectPendingTasks() _after_ executor.stop() has resolved, to ensure that all tasks are executed in
// FIFO order
executor.rejectPendingTasks();
await this.#blockchain.stop();

Expand Down
110 changes: 100 additions & 10 deletions src/chains/ethereum/ethereum/tests/provider.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -423,16 +423,6 @@ describe("provider", () => {
}
);
});

it("stops responding to RPC methods once disconnected", async () => {
const provider = await getProvider();
await provider.disconnect();

await assert.rejects(
provider.send("eth_getBlockByNumber", ["latest"]),
new Error("Cannot process request, Ganache is disconnected.")
);
});
});

describe("web3 compatibility", () => {
Expand Down Expand Up @@ -471,4 +461,104 @@ describe("provider", () => {
assert.notStrictEqual(hash, "");
});
});

describe("disconnect()", () => {
let provider: EthereumProvider;

[true, false].forEach(asyncRequestProcessing => {
describe(`asyncRequestProcessing: ${asyncRequestProcessing}`, () => {
beforeEach("Instantiate provider", async () => {
provider = await getProvider({
chain: { asyncRequestProcessing }
});
});

it("stops responding to RPC methods once disconnected", async () => {
await provider.disconnect();

await assert.rejects(
provider.send("eth_getBlockByNumber", ["latest"]),
new Error("Cannot process request, Ganache is disconnected.")
);
});

it("raises the 'disconnect' event", async () => {
const whenDisconnected = provider.once("disconnect");
await provider.disconnect();
await assert.doesNotReject(
whenDisconnected,
'The provider should raise the "disconnect" event'
);
});

it("successfully processes requests executed before disconnect is called", async () => {
const whenBlockByNumber = provider.request({
method: "eth_getBlockByNumber",
params: ["latest"]
});
const whenDisconnected = provider.disconnect();

await assert.doesNotReject(
whenBlockByNumber,
"A call to .request() on the provider before disconnect is called should succeed"
);
await assert.doesNotReject(
whenDisconnected,
'The provider should raise the "disconnect" event'
);
});

it("rejects requests after disconnect is called", async () => {
const whenDisconnected = provider.disconnect();
const whenBlockByNumber = provider.request({
method: "eth_getBlockByNumber",
params: ["latest"]
});

await assert.rejects(
whenBlockByNumber,
new Error("Cannot process request, Ganache is disconnected.")
);
await assert.doesNotReject(
whenDisconnected,
'The provider should raise the "disconnect" event'
);
});
});
});

describe("without asyncRequestProcessing", () => {
beforeEach("Instantiate provider", async () => {
provider = await getProvider({
chain: { asyncRequestProcessing: false }
});
});

// we only test this with asyncRequestProcessing: false, because it's impossible to force requests
// to be "pending" when asyncRequestProcessing: true
it("successfully processes started requests, but reject pending requests", async () => {
const active = provider.request({
method: "eth_getBlockByNumber",
params: ["latest"]
});
const pending = provider.request({
method: "eth_getBlockByNumber",
params: ["latest"]
});

const whenDisconnected = provider.disconnect();

await assert.rejects(
pending,
new Error("Cannot process request, Ganache is disconnected."),
"pending tasks should reject"
);
await assert.doesNotReject(active, "active tasks should not reject");
await assert.doesNotReject(
whenDisconnected,
'The provider should raise the "disconnect" event'
);
});
});
});
});
42 changes: 22 additions & 20 deletions src/packages/utils/src/utils/request-coordinator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,46 +83,48 @@ export class RequestCoordinator {
.catch(noop)
.finally(() => {
this.runningTasks--;

if (this.runningTasks === 0 && this.pending.length === 0) {
this.workComplete();
} else {
this.#process();
if (this.runningTasks === 0) {
this.#executingTasksFinished();
}
this.#process();
});
}
};

#whenFinished: () => void;
private workComplete() {
if (this.#whenFinished) {
this.#whenFinished();
this.#whenFinished = undefined;
// resolver which will be set by stop() function if it needs to wait on currently executing tasks to complete
#resolveStopAwaiter: () => void = undefined;
#executingTasksFinished = () => {
if (this.#resolveStopAwaiter !== undefined) {
this.#resolveStopAwaiter();
this.#resolveStopAwaiter = undefined;
}
}
};

/**
* Stop processing tasks - calls to queue(), and resume() will reject with an error indicating that Ganache is
* Stop processing tasks - calls to queue(), and resume() will reject or throw with an error indicating that Ganache is
* disconnected. This is an irreversible action. If you wish to be able to resume processing, use pause() instead.
*
* Note: This will _not_ reject any pending tasks - see rejectPendingTasks()
* @returns Promise<void> - indicating that all currently executing tasks are complete
* @returns Promise<void> - resolves once all currently executing tasks are complete
*/
public async stop(): Promise<void> {
public stop = async () => {
if (this.#stopped) {
throw new Error("Already stopped.");
}

this.pause();
this.#stopped = true;

if (this.runningTasks > 0) {
await new Promise<void>((resolve, _) => {
this.#whenFinished = resolve;
});
if (this.runningTasks > 0 && this.#resolveStopAwaiter === undefined) {
await new Promise<void>(
(resolve, _) => (this.#resolveStopAwaiter = resolve)
);
}
}

/**
* Reject any pending tasks with an error indicating that Ganache is disconnected. Tasks are rejected in FIFO order.
*/
public rejectPendingTasks() {
public rejectPendingTasks = () => {
let current: RejectableTask;
while ((current = this.pending.shift())) {
current.reject(
Expand Down
7 changes: 6 additions & 1 deletion src/packages/utils/tests/request-coordinator.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ describe("request-coordinator", () => {
});

it("should stop when tasks are queued", async () => {
const uncompletable = coordinator.queue(() => new Promise<void>(noop), this, []);
coordinator.queue(() => new Promise<void>(noop), this, []);
await coordinator.stop();

assert(coordinator.paused);
Expand Down Expand Up @@ -79,6 +79,11 @@ describe("request-coordinator", () => {
await assert.doesNotReject(stopped);
});
});

it("should reject if called a second time", async () => {
coordinator.stop();
await assert.rejects(coordinator.stop(), new Error("Already stopped."));
});
});

describe("rejectAllPendingRequests()", () => {
Expand Down

0 comments on commit a59b00d

Please sign in to comment.