generated from mkizka/typescript-template
-
Notifications
You must be signed in to change notification settings - Fork 0
/
subscription.ts
81 lines (73 loc) · 2.33 KB
/
subscription.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import { ids, lexicons } from "@atproto/bsky/dist/lexicon/lexicons.js";
import type { OutputSchema as RepoEvent } from "@atproto/bsky/dist/lexicon/types/com/atproto/sync/subscribeRepos.js";
import { isCommit } from "@atproto/bsky/dist/lexicon/types/com/atproto/sync/subscribeRepos.js";
import { cborToLexRecord, readCar } from "@atproto/repo";
import { Subscription } from "@atproto/xrpc-server";
type Operation = {
action: string;
uri: string;
cid: string;
repo: string;
collection: string;
record: Record<string, unknown>;
};
export abstract class FirehoseSubscriptionBase {
public sub: Subscription<RepoEvent>;
constructor(public service: string = "wss://bsky.network") {
this.sub = new Subscription({
service: service,
method: ids.ComAtprotoSyncSubscribeRepos,
getParams: () => undefined, // TODO: fix
validate: (value: unknown) => {
try {
return lexicons.assertValidXrpcMessage<RepoEvent>(
ids.ComAtprotoSyncSubscribeRepos,
value,
);
} catch (err) {
console.error("repo subscription skipped invalid message", err);
}
},
});
}
async handleEvent(event: RepoEvent) {
if (!isCommit(event)) return;
const car = await readCar(event.blocks);
const operations: Operation[] = [];
for (const op of event.ops) {
if (!op.cid) continue;
const recordBytes = car.blocks.get(op.cid);
if (!recordBytes) continue;
operations.push({
action: op.action,
cid: `${op.cid}`,
uri: `at://${event.repo}/${op.path}`,
collection: op.path.split("/")[0],
repo: event.repo,
record: cborToLexRecord(recordBytes),
});
}
return this.handle(operations, event);
}
abstract handle(
operations: Operation[],
event: RepoEvent,
): Promise<void> | void;
async run() {
for await (const evt of this.sub) {
try {
await this.handleEvent(evt);
} catch (e) {
console.error(e);
}
}
}
}
class FirehoseSubscription extends FirehoseSubscriptionBase {
handle(operations: Operation[], _event: RepoEvent): void | Promise<void> {
if (operations[0] && !operations[0].collection.startsWith("app.bsky")) {
console.log(JSON.stringify(operations, null, 2));
}
}
}
new FirehoseSubscription().run().catch(console.error);