-
Notifications
You must be signed in to change notification settings - Fork 239
/
static-batch-rpc.ts
130 lines (111 loc) · 3.89 KB
/
static-batch-rpc.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import { providers, utils } from "ethers";
// Default window (in milliseconds) during which individual JSON-RPC calls are
// aggregated before being flushed as a single batch request.
const DEFAULT_BATCH_TIME_LIMIT_MS = 50;
// Default maximum number of JSON-RPC calls allowed in one batch payload.
const DEFAULT_BATCH_SIZE_LIMIT = 250;
// Combined defaults; also serves as the shape for user-supplied overrides.
const DEFAULT_BATCH_OPTIONS = {
  timeLimitMs: DEFAULT_BATCH_TIME_LIMIT_MS,
  sizeLimit: DEFAULT_BATCH_SIZE_LIMIT,
};
// Callers may override any subset of the batching defaults.
export type BatchOptions = Partial<typeof DEFAULT_BATCH_OPTIONS>;
/**
 * A batching JSON-RPC provider that also behaves like a StaticJsonRpcProvider
 * (the network is assumed fixed, avoiding repeated chain-id polling).
 * Individual `send()` calls are buffered and flushed as one JSON-RPC batch
 * once either the time window elapses or the batch hits its size limit.
 * Mostly copied from ethers.js directly but made a StaticJsonRpcProvider.
 */
export class StaticJsonRpcBatchProvider extends providers.StaticJsonRpcProvider {
  // Max time (ms) a buffered request waits before the batch is flushed.
  private _timeLimitMs: number;
  // Max number of requests allowed in a single batch payload.
  private _sizeLimit: number;
  // Timer that flushes the pending batch when the time window expires.
  _pendingBatchAggregator: NodeJS.Timer | null;
  // Requests buffered for the next batch, paired with their promise handles.
  _pendingBatch: Array<{
    request: { method: string; params: Array<any>; id: number; jsonrpc: "2.0" };
    resolve: (result: any) => void;
    reject: (error: Error) => void;
  }> | null;

  /**
   * @param url JSON-RPC endpoint URL (or an ethers ConnectionInfo).
   * @param network Network served by the endpoint; assumed not to change.
   * @param batchOptions Optional overrides for the batching time/size limits.
   */
  constructor(
    url: string | utils.ConnectionInfo | undefined,
    network: providers.Networkish | undefined,
    batchOptions: BatchOptions = DEFAULT_BATCH_OPTIONS,
  ) {
    super(url, network);
    // BUG FIX: the fallbacks were swapped — timeLimitMs defaulted to the SIZE
    // limit (250) and sizeLimit to the TIME limit (50). Also use ?? instead of
    // || so an explicit 0 time limit (flush on next tick) is respected.
    this._timeLimitMs = batchOptions.timeLimitMs ?? DEFAULT_BATCH_TIME_LIMIT_MS;
    this._sizeLimit = batchOptions.sizeLimit ?? DEFAULT_BATCH_SIZE_LIMIT;
    this._pendingBatchAggregator = null;
    this._pendingBatch = null;
  }

  /**
   * Flush the pending batch: serialize all buffered requests into a single
   * JSON-RPC batch payload, POST it, and settle each buffered promise from the
   * corresponding entry of the response array. Rejects every buffered promise
   * if the transport itself fails.
   */
  private sendCurrentBatch(request: any) {
    // if we still have a timeout clear that first
    if (this._pendingBatchAggregator) {
      clearTimeout(this._pendingBatchAggregator);
    }
    // Get the current batch and clear it, so new requests
    // go into the next batch
    const batch = this._pendingBatch || [];
    this._pendingBatch = null;
    this._pendingBatchAggregator = null;
    // Get the request as an array of requests
    const request_ = batch.map((inflight) => inflight.request);
    this.emit("debug", {
      action: "requestBatch",
      // BUG FIX: previously deep-copied only the single triggering `request`;
      // the "requestBatch" debug event should carry the whole batch payload.
      request: utils.deepCopy(request_),
      provider: this,
    });
    return utils.fetchJson(this.connection, JSON.stringify(request_)).then(
      (result) => {
        this.emit("debug", {
          action: "response",
          request: request_,
          response: result,
          provider: this,
        });
        // For each result, feed it to the correct Promise, depending
        // on whether it was a success or error
        batch.forEach((inflightRequest_, index) => {
          const payload = result[index];
          if (payload.error) {
            const error = new Error(payload.error.message);
            (error as any).code = payload.error.code;
            (error as any).data = payload.error.data;
            inflightRequest_.reject(error);
          } else {
            inflightRequest_.resolve(payload.result);
          }
        });
      },
      (error) => {
        this.emit("debug", {
          action: "response",
          error: error,
          request: request_,
          provider: this,
        });
        // If there was an error, reject all the requests
        batch.forEach((inflightRequest_) => {
          inflightRequest_.reject(error);
        });
      },
    );
  }

  /**
   * Buffer a JSON-RPC call into the pending batch and return a promise that
   * settles with that call's individual result once the batch round-trips.
   */
  send(method: string, params: Array<any>): Promise<any> {
    const request = {
      method: method,
      params: params,
      id: this._nextId++,
      jsonrpc: "2.0",
    };
    if (this._pendingBatch === null) {
      this._pendingBatch = [];
    }
    const inflightRequest: any = { request, resolve: null, reject: null };
    const promise = new Promise((resolve, reject) => {
      inflightRequest.resolve = resolve;
      inflightRequest.reject = reject;
    });
    // If the batch is already at the size limit, flush it now so this request
    // starts a fresh batch.
    // BUG FIX: sendCurrentBatch() nulls _pendingBatch, so the push below used
    // to throw `Cannot read properties of null (reading 'push')` whenever the
    // size limit was hit. Re-initialize the batch after flushing.
    if (this._pendingBatch.length >= this._sizeLimit) {
      this.sendCurrentBatch(request);
      this._pendingBatch = [];
    }
    this._pendingBatch.push(inflightRequest);
    if (!this._pendingBatchAggregator) {
      // Schedule batch for next event loop + short duration
      this._pendingBatchAggregator = setTimeout(() => {
        this.sendCurrentBatch(request);
      }, this._timeLimitMs);
    }
    return promise;
  }
}