Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: increase network performance by reducing unnecessary rpc calls #165

Merged
merged 10 commits into from
Sep 29, 2022
1 change: 1 addition & 0 deletions packages/network/src/createBlockNumberStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { Providers } from "./createProvider";

/**
* Creates a stream of block numbers based on the `block` event of the currently connected provider.
* In case `initialSync` is provided, this stream will also output a stream of past block numbers to drive replaying events.
*
* @param providers Mobx computed providers object (created by {@link createReconnectingProvider}).
* @param options
Expand Down
1 change: 1 addition & 0 deletions packages/network/src/createNetwork.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const config = {
pollingInterval: 1000,
skipNetworkCheck: true,
},
chainId: 4242,
},
chainId: 100,
};
Expand Down
2 changes: 1 addition & 1 deletion packages/network/src/createNetwork.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ export async function createNetwork(initialConfig: NetworkConfig) {
// Sync the local time to the chain time in regular intervals
const syncBlockSub = combineLatest([blockNumber$, computedToStream(providers)])
.pipe(
throttleTime(config.clock.syncInterval, undefined, { leading: true, trailing: true }), // Update time max once per 5s
throttleTime(config.clock.syncInterval, undefined, { leading: true, trailing: true }),
concatMap(([blockNumber, currentProviders]) =>
currentProviders ? fetchBlock(currentProviders.json, blockNumber) : EMPTY
), // Fetch the latest block if a provider is available
Expand Down
15 changes: 11 additions & 4 deletions packages/network/src/createProvider.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { JsonRpcBatchProvider, JsonRpcProvider, WebSocketProvider } from "@ethersproject/providers";
import { Networkish, WebSocketProvider } from "@ethersproject/providers";
import { callWithRetry, observableToComputed, timeoutAfter } from "@latticexyz/utils";
import { IComputedValue, IObservableValue, observable, reaction, runInAction } from "mobx";
import { ensureNetworkIsUp } from "./networkUtils";
import { MUDJsonRpcBatchProvider, MUDJsonRpcProvider } from "./provider";
import { ProviderConfig } from "./types";

export type Providers = ReturnType<typeof createProvider>;
Expand All @@ -15,10 +16,16 @@ export type Providers = ReturnType<typeof createProvider>;
* ws: WebSocketProvider
* }
*/
export function createProvider({ jsonRpcUrl, wsRpcUrl, options }: ProviderConfig) {
export function createProvider({ chainId, jsonRpcUrl, wsRpcUrl, options }: ProviderConfig) {
const network: Networkish = {
chainId,
name: "mudChain",
};
const providers = {
json: options?.batch ? new JsonRpcBatchProvider(jsonRpcUrl) : new JsonRpcProvider(jsonRpcUrl),
ws: wsRpcUrl ? new WebSocketProvider(wsRpcUrl) : undefined,
json: options?.batch
? new MUDJsonRpcBatchProvider(jsonRpcUrl, network)
: new MUDJsonRpcProvider(jsonRpcUrl, network),
ws: wsRpcUrl ? new WebSocketProvider(wsRpcUrl, network) : undefined,
};

if (options?.pollingInterval) {
Expand Down
6 changes: 4 additions & 2 deletions packages/network/src/createSystemExecutor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { Contract, ContractInterface, Signer } from "ethers";
import { observable, runInAction } from "mobx";
import { createTxQueue } from "./createTxQueue";
import { Network } from "./createNetwork";
import { BehaviorSubject } from "rxjs";

/**
* Create a system executor object.
Expand All @@ -23,7 +24,8 @@ export function createSystemExecutor<T extends { [key: string]: Contract }>(
network: Network,
systems: Component<{ value: Type.String }>,
interfaces: { [key in keyof T]: ContractInterface },
options?: { devMode?: boolean }
gasPrice$: BehaviorSubject<number>,
options?: { devMode?: boolean; concurrency?: number }
) {
const systemContracts = observable.box({} as T);
const systemIdPreimages: { [key: string]: string } = Object.keys(interfaces).reduce((acc, curr) => {
Expand All @@ -47,7 +49,7 @@ export function createSystemExecutor<T extends { [key: string]: Contract }>(
runInAction(() => systemContracts.set({ ...systemContracts.get(), [system.id]: system.contract }));
});

const { txQueue, dispose } = createTxQueue<T>(systemContracts, network, options);
const { txQueue, dispose } = createTxQueue<T>(systemContracts, network, gasPrice$, options);
world.registerDisposer(dispose);

return txQueue;
Expand Down
71 changes: 53 additions & 18 deletions packages/network/src/createTxQueue.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import { BaseContract, BigNumberish, CallOverrides, Overrides } from "ethers";
import { action, autorun, computed, IComputedValue, IObservableValue, observable, runInAction } from "mobx";
import { mapObject, deferred, uuid, awaitValue, cacheUntilReady, extractEncodedArguments } from "@latticexyz/utils";
import { autorun, computed, IComputedValue, IObservableValue, observable, runInAction } from "mobx";
import { mapObject, deferred, uuid, awaitValue, cacheUntilReady } from "@latticexyz/utils";
import { Mutex } from "async-mutex";
import { TransactionResponse } from "@ethersproject/providers";
import { JsonRpcProvider, TransactionReceipt } from "@ethersproject/providers";
import { Contracts, TxQueue } from "./types";
import { ConnectionState } from "./createProvider";
import { Network } from "./createNetwork";
import { getRevertReason } from "./networkUtils";
import { BehaviorSubject } from "rxjs";

type ReturnTypeStrict<T> = T extends (...args: any) => any ? ReturnType<T> : never;

Expand All @@ -23,11 +24,16 @@ type ReturnTypeStrict<T> = T extends (...args: any) => any ? ReturnType<T> : nev
export function createTxQueue<C extends Contracts>(
computedContracts: IComputedValue<C> | IObservableValue<C>,
network: Network,
gasPrice$: BehaviorSubject<number>,
options?: { concurrency?: number; devMode?: boolean }
): { txQueue: TxQueue<C>; dispose: () => void; ready: IComputedValue<boolean | undefined> } {
const { concurrency } = options || {};

const queue = createPriorityQueue<{
execute: (nonce: number, gasLimit: BigNumberish) => Promise<TransactionResponse>;
execute: (
nonce: number,
gasLimit: BigNumberish
) => Promise<{ hash: string; wait: () => Promise<TransactionReceipt> }>;
estimateGas: () => BigNumberish | Promise<BigNumberish>;
cancel: () => void;
stateMutability?: string;
Expand Down Expand Up @@ -71,8 +77,16 @@ export function createTxQueue<C extends Contracts>(
target: C[keyof C],
prop: keyof C[keyof C],
args: unknown[]
): Promise<ReturnTypeStrict<typeof target[typeof prop]>> {
const [resolve, reject, promise] = deferred<ReturnTypeStrict<typeof target[typeof prop]>>();
): Promise<{
hash: string;
wait: () => Promise<TransactionReceipt>;
response: Promise<ReturnTypeStrict<typeof target[typeof prop]>>;
}> {
const [resolve, reject, promise] = deferred<{
hash: string;
wait: () => Promise<TransactionReceipt>;
response: Promise<ReturnTypeStrict<typeof target[typeof prop]>>;
}>();

// Extract existing overrides from function call
const hasOverrides = args.length > 0 && isOverrides(args[args.length - 1]);
Expand All @@ -90,7 +104,7 @@ export function createTxQueue<C extends Contracts>(
// Create a function that executes the tx when called
const execute = async (nonce: number, gasLimit: BigNumberish) => {
try {
const member = target[prop];
const member = target.populateTransaction[prop as string];
if (member == undefined) {
throw new Error("Member does not exist.");
}
Expand All @@ -103,12 +117,34 @@ export function createTxQueue<C extends Contracts>(
);
}

const configOverrides = { ...overrides, nonce, gasLimit };
// Populate config
const configOverrides = {
gasPrice: gasPrice$.getValue(),
...overrides,
nonce,
gasLimit,
};
if (options?.devMode) configOverrides.gasPrice = 0;

const result = await member(...argsWithoutOverrides, configOverrides);
resolve(result);
return result;
// Populate tx
const populatedTx = await member(...argsWithoutOverrides, configOverrides);
populatedTx.nonce = nonce;
populatedTx.chainId = network.config.chainId;

// Execute tx
const signedTx = await target.signer.signTransaction(populatedTx);
const hash = await (target.provider as JsonRpcProvider).perform("sendTransaction", {
signedTransaction: signedTx,
});
const response = target.provider.getTransaction(hash) as Promise<ReturnTypeStrict<typeof target[typeof prop]>>;
// This promise is awaited asynchronously in the tx queue and the action queue to catch errors
const wait = async () => (await response).wait();

// Resolved value goes to the initiator of the transaction
resolve({ hash, wait, response });

// Returned value gets processed inside the tx queue
return { hash, wait };
} catch (e) {
reject(e as Error);
throw e; // Rethrow error to catch when processing the queue
Expand Down Expand Up @@ -137,6 +173,11 @@ export function createTxQueue<C extends Contracts>(
// Increase utilization to prevent executing more tx than allowed by capacity
utilization++;

// Start processing another request from the queue
// Note: we start processing again after increasing the utilization to process up to `concurrency` tx request in parallel.
// At the end of this function after decreasing the utilization we call processQueue again trigger tx requests waiting for capacity.
processQueue();

// Run exclusive to avoid two tx requests awaiting the nonce in parallel and submitting with the same nonce.
const txResult = await submissionMutex.runExclusive(async () => {
// Define variables in scope visible to finally block
Expand Down Expand Up @@ -173,7 +214,7 @@ export function createTxQueue<C extends Contracts>(
(("code" in error && error.code === "NONCE_EXPIRED") ||
JSON.stringify(error).includes("transaction already imported"));

console.log("TxQueue:", {
console.info("TxQueue:", {
error,
isNonViewTransaction,
shouldIncreaseNonce,
Expand Down Expand Up @@ -203,16 +244,10 @@ export function createTxQueue<C extends Contracts>(
const worldAddress = params.get("worldAddress");
// Log useful commands that can be used to replay this tx
const trace = `mud trace --config deploy.json --world ${worldAddress} --tx ${txResult.hash}`;
const call = `mud call-system --world ${worldAddress} --caller ${network.connectedAddress.get()} --calldata ${extractEncodedArguments(
txResult.data
)} --systemId <SYSTEM_ID>`;

console.log("---------- DEBUG COMMANDS (RUN IN TERMINAL) -------------");
console.log("Trace:");
console.log(trace);
console.log("//");
console.log("Call system:");
console.log(call);
console.log("---------------------------------------------------------");
}
}
Expand Down
28 changes: 28 additions & 0 deletions packages/network/src/provider.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { JsonRpcBatchProvider, JsonRpcProvider, Network, Networkish } from "@ethersproject/providers";
import { ConnectionInfo } from "ethers/lib/utils";

export class MUDJsonRpcProvider extends JsonRpcProvider {
constructor(url: string | ConnectionInfo | undefined, network: Networkish) {
super(url, network);
}
async detectNetwork(): Promise<Network> {
const network = this.network;
if (network == null) {
throw new Error("No network");
}
return network;
}
}

export class MUDJsonRpcBatchProvider extends JsonRpcBatchProvider {
constructor(url?: string | ConnectionInfo | undefined, network?: Networkish | undefined) {
super(url, network);
}
async detectNetwork(): Promise<Network> {
const network = this.network;
if (network == null) {
throw new Error("No network");
}
return network;
}
}
1 change: 1 addition & 0 deletions packages/network/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ export type Clock = {
};

export interface ProviderConfig {
chainId: number;
jsonRpcUrl: string;
wsRpcUrl?: string;
options?: { batch?: boolean; pollingInterval?: number; skipNetworkCheck?: boolean };
Expand Down
49 changes: 41 additions & 8 deletions packages/network/src/workers/SyncWorker.spec.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import { JsonRpcProvider } from "@ethersproject/providers";
import { keccak256, sleep } from "@latticexyz/utils";
import { computed } from "mobx";
Expand Down Expand Up @@ -143,7 +144,11 @@ describe("Sync.worker", () => {
checkpointServiceUrl: "",
chainId: 4242,
worldContract: { address: "0x00", abi: [] },
provider: { jsonRpcUrl: "", options: { batch: false, pollingInterval: 1000, skipNetworkCheck: true } },
provider: {
jsonRpcUrl: "",
options: { batch: false, pollingInterval: 1000, skipNetworkCheck: true },
chainId: 4242,
},
initialBlockNumber: 0,
});

Expand Down Expand Up @@ -171,7 +176,11 @@ describe("Sync.worker", () => {
streamServiceUrl: "",
chainId: 4242,
worldContract: { address: "0x00", abi: [] },
provider: { jsonRpcUrl: "", options: { batch: false, pollingInterval: 1000, skipNetworkCheck: true } },
provider: {
jsonRpcUrl: "",
options: { batch: false, pollingInterval: 1000, skipNetworkCheck: true },
chainId: 4242,
},
initialBlockNumber: 0,
});

Expand Down Expand Up @@ -201,7 +210,11 @@ describe("Sync.worker", () => {
streamServiceUrl: "",
chainId: 4242,
worldContract: { address: "0x00", abi: [] },
provider: { jsonRpcUrl: "", options: { batch: false, pollingInterval: 1000, skipNetworkCheck: true } },
provider: {
chainId: 4242,
jsonRpcUrl: "",
options: { batch: false, pollingInterval: 1000, skipNetworkCheck: true },
},
initialBlockNumber: 0,
});
await sleep(0);
Expand All @@ -215,7 +228,11 @@ describe("Sync.worker", () => {
streamServiceUrl: "http://localhost:50052",
chainId: 4242,
worldContract: { address: "0x00", abi: [] },
provider: { jsonRpcUrl: "", options: { batch: false, pollingInterval: 1000, skipNetworkCheck: true } },
provider: {
chainId: 4242,
jsonRpcUrl: "",
options: { batch: false, pollingInterval: 1000, skipNetworkCheck: true },
},
initialBlockNumber: 0,
});
await sleep(0);
Expand All @@ -229,7 +246,11 @@ describe("Sync.worker", () => {
streamServiceUrl: "",
chainId: 4242,
worldContract: { address: "0x00", abi: [] },
provider: { jsonRpcUrl: "", options: { batch: false, pollingInterval: 1000, skipNetworkCheck: true } },
provider: {
chainId: 4242,
jsonRpcUrl: "",
options: { batch: false, pollingInterval: 1000, skipNetworkCheck: true },
},
initialBlockNumber: 0,
});

Expand All @@ -253,7 +274,11 @@ describe("Sync.worker", () => {
streamServiceUrl: "",
chainId: 4242,
worldContract: { address: "0x00", abi: [] },
provider: { jsonRpcUrl: "", options: { batch: false, pollingInterval: 1000, skipNetworkCheck: true } },
provider: {
chainId: 4242,
jsonRpcUrl: "",
options: { batch: false, pollingInterval: 1000, skipNetworkCheck: true },
},
initialBlockNumber: 0,
});

Expand All @@ -277,7 +302,11 @@ describe("Sync.worker", () => {
streamServiceUrl: "",
chainId: 4242,
worldContract: { address: "0x00", abi: [] },
provider: { jsonRpcUrl: "", options: { batch: false, pollingInterval: 1000, skipNetworkCheck: true } },
provider: {
chainId: 4242,
jsonRpcUrl: "",
options: { batch: false, pollingInterval: 1000, skipNetworkCheck: true },
},
initialBlockNumber: 0,
});

Expand Down Expand Up @@ -311,7 +340,11 @@ describe("Sync.worker", () => {
streamServiceUrl: "",
chainId: 4242,
worldContract: { address: "0x00", abi: [] },
provider: { jsonRpcUrl: "", options: { batch: false, pollingInterval: 1000, skipNetworkCheck: true } },
provider: {
chainId: 4242,
jsonRpcUrl: "",
options: { batch: false, pollingInterval: 1000, skipNetworkCheck: true },
},
initialBlockNumber: 0,
});

Expand Down
6 changes: 4 additions & 2 deletions packages/network/src/workers/SyncWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,9 @@ export class SyncWorker<C extends Components> implements DoWork<SyncWorkerConfig
const cacheBlockNumber = await getIndexDBCacheStoreBlockNumber(indexDbCache);
const snapshotBlockNumber = await getSnapshotBlockNumber(snapshotClient, worldContract.address);
console.log(
`[SyncWorker] cache block: ${cacheBlockNumber}, snapshot block: ${snapshotBlockNumber}, start sync at ${initialBlockNumber}`
`[SyncWorker] cache block: ${cacheBlockNumber}, snapshot block: ${
snapshotBlockNumber > 0 ? snapshotBlockNumber : "Unavailable"
}, start sync at ${initialBlockNumber}`
);
let initialState = createCacheStore();
if (initialBlockNumber > Math.max(cacheBlockNumber, snapshotBlockNumber)) {
Expand Down Expand Up @@ -184,7 +186,7 @@ export class SyncWorker<C extends Components> implements DoWork<SyncWorkerConfig
);

console.log(
`[SyncWorker] got ${gapStateEvents.length} items from block range ${initialState.blockNumber} -> ${streamStartBlockNumber}`
`[SyncWorker || via JSON-RPC] got ${gapStateEvents.length} items from block range ${initialState.blockNumber} -> ${streamStartBlockNumber}`
);

// Merge initial state, gap state and live events since initial sync started
Expand Down
Loading