-
Notifications
You must be signed in to change notification settings - Fork 0
/
subscription.ts
109 lines (99 loc) · 2.89 KB
/
subscription.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import { inArray } from 'drizzle-orm';
import { check } from './ai.js';
import { post } from './db/schema.js';
import {
OutputSchema as RepoEvent,
isCommit,
} from './lexicon/types/com/atproto/sync/subscribeRepos.js';
import { FirehoseSubscriptionBase, getOpsByType } from './util/subscription.js';
import fs from 'node:fs/promises';
const known_svelte_words = [
'sveltekit',
'svelte-kit',
'svelte kit',
'sveltejs',
'svelte js',
'svelte.dev',
'sveltesociety.dev',
'svelte.london',
];
let known_dids: Set<string> | undefined;
let known_dids_last_read_at: Date | undefined;
export class FirehoseSubscription extends FirehoseSubscriptionBase {
async handleEvent(evt: RepoEvent) {
if (!isCommit(evt)) return;
if (evt.blocks.length === 0) return;
let ops: Awaited<ReturnType<typeof getOpsByType> | undefined>;
try {
ops = await getOpsByType(evt);
} catch {}
if (!ops) {
console.log("can't get ops by type", evt.repo, evt.blocks.length);
return;
}
const postsToDelete = ops.posts.deletes.map((del) => del.uri);
const postsToCreatePromises = await Promise.allSettled(
ops.posts.creates
.filter((create) => {
// only svelte-related posts
return create.record.text.toLowerCase().includes('svelte');
})
.map(async (create) => {
try {
const stat = await fs.stat('known-dids.json');
if (
!known_dids ||
!known_dids_last_read_at ||
stat.mtime.getTime() > known_dids_last_read_at.getTime()
) {
let known_dids_string = await fs.readFile(
'known-dids.json',
'utf-8',
);
known_dids = new Set(JSON.parse(known_dids_string));
known_dids_last_read_at = stat.mtime;
console.log('known dids read at time', stat.mtime.toString());
}
} catch {
console.log('something went wrong reading the file');
}
let text = create.record.text.toLowerCase();
let include = true;
if (
(known_dids == null || !known_dids.has(create.author)) &&
!known_svelte_words.some((word) => text.includes(word))
) {
// if we don't have any known svelte word in the post we can check with
// claude 💰💰💰
console.log('using claude to determine');
include = await check(create.record.text);
}
console.log(include, text);
// map svelte-related posts to a db row
return {
uri: create.uri,
cid: create.cid,
indexedAt: new Date().toISOString(),
include,
};
}),
);
const postsToCreate = postsToCreatePromises
.filter((post) => post.status === 'fulfilled')
.filter((post) => post.value.include)
.map((post) => post.value);
if (postsToDelete.length > 0) {
await this.db
.delete(post)
.where(inArray(post.uri, postsToDelete))
.execute();
}
if (postsToCreate.length > 0) {
await this.db
.insert(post)
.values(postsToCreate)
.onConflictDoNothing()
.execute();
}
}
}