Skip to content

Commit

Permalink
Merge pull request #9 from substreams-js/feature/callback-client
Browse files Browse the repository at this point in the history
Implement CallbackClient
  • Loading branch information
DenisCarriere authored Dec 8, 2023
2 parents fd508d0 + 88d40b9 commit 4046585
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 28 deletions.
25 changes: 21 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@ import { BlockEmitter, createNodeTransport } from "@substreams/node";

// auth API token
// https://app.streamingfast.io/
// https://app.pinax.network/
if (!process.env.SUBSTREAMS_API_TOKEN) {
throw new Error("SUBSTREAMS_API_TOKEN is require");
}

const token = process.env.SUBSTREAMS_API_TOKEN;
const baseUrl = "https://mainnet.eth.streamingfast.io:443";
const baseUrl = "https://eth.substreams.pinax.network:443";

// User parameters
const manifest = "https://github.com/pinax-network/subtivity-substreams/releases/download/v0.2.3/subtivity-ethereum-v0.2.3.spkg";
Expand All @@ -54,7 +55,6 @@ const request = createRequest({
outputModule,
startBlockNum,
stopBlockNum,
productionMode: true,
});

// NodeJS Events
Expand All @@ -72,6 +72,23 @@ emitter.on("anyMessage", (message, cursor, clock) => {
console.dir(clock);
});

await emitter.start();
console.log("✅ done");
// End of Stream
emitter.on("close", (error) => {
if (error) {
console.error(error);
}
console.timeEnd("🆗 close");
});

// Fatal Error
emitter.on("fatalError", (error) => {
console.error(error);
});

console.log("✅ start");
console.time("🆗 close");
const cancel = emitter.start();

// Cancel after 3 seconds
setTimeout(cancel, 3000);
```
22 changes: 19 additions & 3 deletions example.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,22 @@ emitter.on("anyMessage", (message, cursor, clock) => {
console.dir(clock);
});

console.time("✅ done");
await emitter.start();
console.timeEnd("✅ done");
// End of Stream
emitter.on("close", (error) => {
if (error) {
console.error(error);
}
console.timeEnd("🆗 close");
});

// Fatal Error
emitter.on("fatalError", (error) => {
console.error(error);
});

console.log("✅ start");
console.time("🆗 close");
const cancel = emitter.start();

// Cancel after 3 seconds
setTimeout(cancel, 3000);
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@substreams/node",
"version": "0.4.4",
"version": "0.5.0",
"description": "Substreams for Node.js",
"license": "MIT",
"repository": "substreams-js/substreams-node",
Expand Down
40 changes: 22 additions & 18 deletions src/BlockEmitter.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
// import { type Request, type Response, Stream } from "@substreams/core/proto";
import { AnyMessage, IMessageTypeRegistry, JsonObject, Message } from "@bufbuild/protobuf";
import type { CallOptions, Transport } from "@connectrpc/connect";
import { isEmptyMessage, streamBlocks, unpackMapOutput } from "@substreams/core";
import type {
import { type CallOptions, type ConnectError, type Transport, createCallbackClient } from "@connectrpc/connect";
import { isEmptyMessage, unpackMapOutput } from "@substreams/core";
import {
BlockScopedData,
BlockUndoSignal,
Clock,
Error as FatalError,
InitialSnapshotComplete,
InitialSnapshotData,
ModulesProgress,
Request,
Response,
SessionInit,
Stream,
} from "@substreams/core/proto";
import { EventEmitter } from "eventemitter3";

Expand Down Expand Up @@ -73,14 +76,17 @@ type LocalEventTypes = {
clock: [clock: Clock];
output: [message: Message<AnyMessage>, cursor: string, clock: Clock];
anyMessage: [message: JsonObject, cursor: string, clock: Clock];

// error
close: [error?: ConnectError];
fatalError: [error: FatalError];
};

export class BlockEmitter extends TypedEventEmitter<LocalEventTypes> {
public transport: Transport;
public request: Request;
public registry: IMessageTypeRegistry;
public options?: CallOptions;
private stopped = false;

constructor(transport: Transport, request: Request, registry: IMessageTypeRegistry, options?: CallOptions) {
super();
Expand All @@ -90,23 +96,15 @@ export class BlockEmitter extends TypedEventEmitter<LocalEventTypes> {
this.options = options;
}

/**
* Stop streaming blocks
*/
public stop() {
this.stopped = true;
}

/**
* Start streaming blocks
*/
public async start() {
this.stopped = false;
public start() {
const closeCallback = (error?: ConnectError) => {
this.emit("close", error);
};

for await (const response of streamBlocks(this.transport, this.request, this.options)) {
if (this.stopped) {
break;
}
const messageCallback = (response: Response) => {
this.emit("response", response);

switch (response.message.case) {
Expand Down Expand Up @@ -147,7 +145,13 @@ export class BlockEmitter extends TypedEventEmitter<LocalEventTypes> {
this.emit("debugSnapshotComplete", response.message.value);
break;
}
case "fatalError": {
this.emit("fatalError", response.message.value);
break;
}
}
}
};
const client = createCallbackClient(Stream, this.transport);
return client.blocks(this.request, messageCallback, closeCallback, this.options);
}
}

0 comments on commit 4046585

Please sign in to comment.