Skip to content

Commit

Permalink
Merge pull request #7 from chamorin/main
Browse files Browse the repository at this point in the history
Replaced @bufbuild/connect deprecated dependency with @connectrpc
  • Loading branch information
DenisCarriere committed Nov 17, 2023
2 parents 76f8c93 + dca7478 commit b5063de
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 68 deletions.
101 changes: 58 additions & 43 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@substreams/node",
"version": "0.3.0",
"version": "0.4.0",
"description": "Substreams for Node.js",
"license": "MIT",
"repository": "substreams-js/substreams-node",
Expand All @@ -19,11 +19,11 @@
"files": ["dist", "src", "example.js"],
"keywords": ["substreams", "streamingfast", "thegraph", "graph"],
"dependencies": {
"@bufbuild/connect": "latest",
"@bufbuild/connect-node": "latest",
"@substreams/core": "^0.7.0",
"@bufbuild/protobuf": "latest",
"@substreams/core": "^0.1.19",
"eventemitter3": "latest"
"eventemitter3": "latest",
"@connectrpc/connect": "latest",
"@connectrpc/connect-node": "latest"
},
"devDependencies": {
"@tsconfig/recommended": "latest",
Expand Down
32 changes: 15 additions & 17 deletions src/BlockEmitter.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { CallOptions, Transport } from "@bufbuild/connect";
import { createPromiseClient } from "@bufbuild/connect";
import { AnyMessage, IMessageTypeRegistry, JsonObject, Message } from "@bufbuild/protobuf";
import { Progress, createStateTracker, isEmptyMessage, unpackMapOutput } from "@substreams/core";
import type { CallOptions, Transport } from "@connectrpc/connect";
import { createPromiseClient } from "@connectrpc/connect";
import { isEmptyMessage, unpackMapOutput } from "@substreams/core";
import type {
BlockScopedData,
BlockUndoSignal,
Expand Down Expand Up @@ -73,16 +73,16 @@ export class TypedEventEmitter<TEvents extends Record<string, any>> {
type LocalEventTypes = {
// block
block: [block: BlockScopedData];
session: [session: SessionInit, state: Progress];
progress: [progress: ModulesProgress, state: Progress];
undo: [undo: BlockUndoSignal, state: Progress];
session: [session: SessionInit];
progress: [progress: ModulesProgress];
undo: [undo: BlockUndoSignal];

// debug (only available in development mode)
debugSnapshotData: [undo: InitialSnapshotData, state: Progress];
debugSnapshotComplete: [undo: InitialSnapshotComplete, state: Progress];
debugSnapshotData: [undo: InitialSnapshotData];
debugSnapshotComplete: [undo: InitialSnapshotComplete];

// response
response: [response: Response, state: Progress];
response: [response: Response];
cursor: [cursor: string, clock: Clock];
clock: [clock: Clock];
output: [message: Message<AnyMessage>, cursor: string, clock: Clock];
Expand Down Expand Up @@ -116,15 +116,13 @@ export class BlockEmitter extends TypedEventEmitter<LocalEventTypes> {
*/
public async start() {
this.stopped = false;
const track = createStateTracker();
const client = createPromiseClient(Stream, this.transport);

for await (const response of client.blocks(this.request, this.options)) {
if (this.stopped) {
break;
}
const state = track(response);
this.emit("response", response, state);
this.emit("response", response);

switch (response.message.case) {
case "blockScopedData": {
Expand All @@ -145,23 +143,23 @@ export class BlockEmitter extends TypedEventEmitter<LocalEventTypes> {
break;
}
case "progress": {
this.emit("progress", response.message.value, state);
this.emit("progress", response.message.value);
break;
}
case "session": {
this.emit("session", response.message.value, state);
this.emit("session", response.message.value);
break;
}
case "blockUndoSignal": {
this.emit("undo", response.message.value, state);
this.emit("undo", response.message.value);
break;
}
case "debugSnapshotData": {
this.emit("debugSnapshotData", response.message.value, state);
this.emit("debugSnapshotData", response.message.value);
break;
}
case "debugSnapshotComplete": {
this.emit("debugSnapshotComplete", response.message.value, state);
this.emit("debugSnapshotComplete", response.message.value);
break;
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/createDefaultTransport.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { createHeadersInterceptor } from "./createHeadersInterceptor.js";
import { Transport } from "@bufbuild/connect";
import { createGrpcTransport } from "@bufbuild/connect-node";
import { Transport } from "@connectrpc/connect";
import { createGrpcTransport } from "@connectrpc/connect-node";
import type { IMessageTypeRegistry } from "@bufbuild/protobuf";
import { createAuthInterceptor } from "@substreams/core";

Expand Down
2 changes: 1 addition & 1 deletion src/createHeadersInterceptor.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { Interceptor } from "@bufbuild/connect";
import type { Interceptor } from "@connectrpc/connect";

export function createHeadersInterceptor(headers?: Headers): Interceptor {
return (next) => async (req) => {
Expand Down

0 comments on commit b5063de

Please sign in to comment.