[SDK] - improved batch provider (#440)
jnsdls committed Dec 2, 2022
1 parent 564b17e commit efc56fa
Showing 3 changed files with 137 additions and 88 deletions.
5 changes: 5 additions & 0 deletions .changeset/rotten-cougars-behave.md
@@ -0,0 +1,5 @@
---
"@thirdweb-dev/sdk": patch
---

[EVM] - Providers are now re-used when their constructor options are identical, which leads to better batching. Also introduces a max batch size parameter (250 by default).
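A minimal sketch of the re-use behavior described above (the RPC URL is a placeholder and the import path is an assumption about the package entry point; `getReadOnlyProvider` is the function changed in the diff below):

import { getReadOnlyProvider } from "@thirdweb-dev/sdk/evm"; // import path is an assumption

// Identical constructor options now resolve to the same cached provider,
// so requests made through both references share a single batch queue.
const a = getReadOnlyProvider("https://rpc.example.com", 1);
const b = getReadOnlyProvider("https://rpc.example.com", 1);
console.log(a === b); // true — same instance, re-used from the cache

// A different chain id produces a different cache key and a new provider.
const c = getReadOnlyProvider("https://rpc.example.com", 5);
console.log(a === c); // false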
15 changes: 14 additions & 1 deletion packages/sdk/src/evm/constants/urls.ts
@@ -123,6 +123,11 @@ export function getProviderForNetwork(network: ChainOrRpc | SignerOrProvider) {
}
}

const READONLY_PROVIDER_MAP: Map<
  string,
  StaticJsonRpcBatchProvider | providers.JsonRpcBatchProvider
> = new Map();

/**
*
* @param network - the chain name or rpc url
@@ -136,11 +141,19 @@ export function getReadOnlyProvider(network: string, chainId?: number) {
  if (match) {
    switch (match[1]) {
      case "http":
        const serializedOpts = `${network}-${chainId || -1}`;
        const existingProvider = READONLY_PROVIDER_MAP.get(serializedOpts);
        if (existingProvider) {
          return existingProvider;
        }

        const newProvider = chainId
          ? // if we know the chainId we should use the StaticJsonRpcBatchProvider
            new StaticJsonRpcBatchProvider(network, chainId)
          : // otherwise fall back to the built in json rpc batch provider
            new providers.JsonRpcBatchProvider(network, chainId);
        READONLY_PROVIDER_MAP.set(serializedOpts, newProvider);
        return newProvider;

      case "ws":
        return new providers.WebSocketProvider(network, chainId);
205 changes: 118 additions & 87 deletions packages/sdk/src/evm/lib/static-batch-rpc.ts
@@ -1,99 +1,130 @@
import { providers, utils } from "ethers";

const DEFAULT_BATCH_TIME_LIMIT_MS = 50;
const DEFAULT_BATCH_SIZE_LIMIT = 250;

const DEFAULT_BATCH_OPTIONS = {
  timeLimitMs: DEFAULT_BATCH_TIME_LIMIT_MS,
  sizeLimit: DEFAULT_BATCH_SIZE_LIMIT,
};

export type BatchOptions = Partial<typeof DEFAULT_BATCH_OPTIONS>;

// mostly copied from ethers.js directly but make it a StaticJsonRpcProvider
export class StaticJsonRpcBatchProvider extends providers.StaticJsonRpcProvider {
  private _timeLimitMs: number;
  private _sizeLimit: number;
  _pendingBatchAggregator: NodeJS.Timer | null;
  _pendingBatch: Array<{
    request: { method: string; params: Array<any>; id: number; jsonrpc: "2.0" };
    resolve: (result: any) => void;
    reject: (error: Error) => void;
  }> | null;

  constructor(
    url: string | utils.ConnectionInfo | undefined,
    network: providers.Networkish | undefined,
    batchOptions: BatchOptions = DEFAULT_BATCH_OPTIONS,
  ) {
    super(url, network);
    // each option falls back to its matching default
    this._timeLimitMs = batchOptions.timeLimitMs || DEFAULT_BATCH_TIME_LIMIT_MS;
    this._sizeLimit = batchOptions.sizeLimit || DEFAULT_BATCH_SIZE_LIMIT;
    this._pendingBatchAggregator = null;
    this._pendingBatch = null;
  }

  private sendCurrentBatch() {
    // if we still have a timeout clear that first
    if (this._pendingBatchAggregator) {
      clearTimeout(this._pendingBatchAggregator);
    }
    // Get the current batch and clear it, so new requests
    // go into the next batch
    const batch = this._pendingBatch || [];
    this._pendingBatch = null;
    this._pendingBatchAggregator = null;

    // Get the request as an array of requests
    const request_ = batch.map((inflight) => inflight.request);

    this.emit("debug", {
      action: "requestBatch",
      request: utils.deepCopy(request_),
      provider: this,
    });

    return utils.fetchJson(this.connection, JSON.stringify(request_)).then(
      (result) => {
        this.emit("debug", {
          action: "response",
          request: request_,
          response: result,
          provider: this,
        });

        // For each result, feed it to the correct Promise, depending
        // on whether it was a success or error
        batch.forEach((inflightRequest_, index) => {
          const payload = result[index];
          if (payload.error) {
            const error = new Error(payload.error.message);
            (error as any).code = payload.error.code;
            (error as any).data = payload.error.data;
            inflightRequest_.reject(error);
          } else {
            inflightRequest_.resolve(payload.result);
          }
        });
      },
      (error) => {
        this.emit("debug", {
          action: "response",
          error: error,
          request: request_,
          provider: this,
        });

        // If there was an error, reject all the requests
        batch.forEach((inflightRequest_) => {
          inflightRequest_.reject(error);
        });
      },
    );
  }

  send(method: string, params: Array<any>): Promise<any> {
    const request = {
      method: method,
      params: params,
      id: this._nextId++,
      jsonrpc: "2.0",
    };

    if (this._pendingBatch === null) {
      this._pendingBatch = [];
    }

    const inflightRequest: any = { request, resolve: null, reject: null };

    const promise = new Promise((resolve, reject) => {
      inflightRequest.resolve = resolve;
      inflightRequest.reject = reject;
    });

    // if we would go *over* the size limit of the batch with this request, send the batch now
    if (this._pendingBatch.length === this._sizeLimit) {
      this.sendCurrentBatch();
      // sendCurrentBatch cleared the pending batch, so start a fresh one for this request
      this._pendingBatch = [];
    }

    this._pendingBatch.push(inflightRequest);

    if (!this._pendingBatchAggregator) {
      // Schedule batch for next event loop + short duration
      this._pendingBatchAggregator = setTimeout(() => {
        this.sendCurrentBatch();
      }, this._timeLimitMs);
    }

    return promise;
  }
}
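
For illustration, a minimal sketch of constructing the provider with the new batch options (the RPC URL is a placeholder and the import path is an assumption about the package entry point):

import { StaticJsonRpcBatchProvider } from "@thirdweb-dev/sdk/evm"; // import path is an assumption

async function main() {
  // Flush a batch after 20ms, or as soon as 100 requests have queued up,
  // whichever comes first; omitted options fall back to the defaults (50ms / 250).
  const provider = new StaticJsonRpcBatchProvider(
    "https://rpc.example.com", // placeholder RPC URL
    1, // Ethereum mainnet chain id
    { timeLimitMs: 20, sizeLimit: 100 },
  );

  // Both calls are issued in the same tick, so they travel in one JSON-RPC batch.
  const [blockNumber, gasPrice] = await Promise.all([
    provider.getBlockNumber(),
    provider.getGasPrice(),
  ]);
  console.log({ blockNumber, gasPrice: gasPrice.toString() });
}

main();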
