forked from bluesky-social/feed-generator
-
Notifications
You must be signed in to change notification settings - Fork 1
/
subscription.ts
85 lines (75 loc) · 2.72 KB
/
subscription.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import { DidResolver, MemoryCache } from '@atproto/identity'
import {
OutputSchema as RepoEvent,
isCommit,
} from './lexicon/types/com/atproto/sync/subscribeRepos'
import { FirehoseSubscriptionBase, getOpsByType } from './util/subscription'
import { allTerms, excludeTerms, regex, communitySubdomains, dids } from './util/subscription-filters/filters'
// Resolver for DID documents, configured with the plc.directory endpoint and
// a MemoryCache instance for its lookups.
const didCache = new MemoryCache()
const didResolver = new DidResolver({ plcUrl: 'https://plc.directory', didCache })

// Per-author verdicts keyed by DID: whether a handle of that author was found
// to end with one of the community subdomains. A DID that is absent from the
// map has no recorded verdict yet.
const subdomainIncludedDids: Map<string, boolean> = new Map()
/**
 * Firehose consumer that keeps the `post` table in sync with the feed rules.
 *
 * A created post is indexed when it is tagged with the `pt` language, matches
 * at least one inclusion rule (a term from `allTerms`, an author listed in
 * `dids`, a pattern from `regex`, or an author whose handle ends with one of
 * `communitySubdomains`) and matches no term from `excludeTerms`.
 * Deleted posts are removed from the table.
 */
export class FirehoseSubscription extends FirehoseSubscriptionBase {
  /**
   * Whole-word, case-insensitive patterns for the inclusion terms, compiled
   * once instead of on every post.
   * NOTE(review): assumes `allTerms` is not mutated after module load.
   */
  private static readonly includePatterns = allTerms.map(
    (term) => new RegExp(`\\b${term}\\b`, 'i'),
  )

  /**
   * Whole-word, case-insensitive patterns for the exclusion terms, compiled
   * once instead of on every post.
   * NOTE(review): assumes `excludeTerms` is not mutated after module load.
   */
  private static readonly excludePatterns = excludeTerms.map(
    (term) => new RegExp(`\\b${term}\\b`, 'i'),
  )

  /**
   * Handles one firehose event: ignores anything that is not a commit,
   * deletes rows for removed posts and inserts rows for newly created posts
   * that pass the feed filter.
   *
   * @param evt - Event received from the repo subscription.
   */
  async handleEvent(evt: RepoEvent): Promise<void> {
    if (!isCommit(evt)) return
    const ops = await getOpsByType(evt)

    const postsToDelete = ops.posts.deletes.map((del) => del.uri)
    const postsToCreate = ops.posts.creates
      .filter((create) => {
        if (!create.record.langs?.includes('pt')) return false

        // The handle lookup runs in the background, so its verdict is only
        // available to posts processed after it completes; the post that
        // triggers the lookup is judged on the other rules alone.
        this.trackCommunityMembership(create.author)

        const text = create.record.text
        const included =
          FirehoseSubscription.includePatterns.some((re) => re.test(text)) ||
          dids.includes(create.author) ||
          regex.some((x) => x.test(text)) ||
          subdomainIncludedDids.get(create.author) === true
        if (!included) return false

        return !FirehoseSubscription.excludePatterns.some((re) => re.test(text))
      })
      .map((create) => {
        // map matching posts to a db row
        return {
          uri: create.uri,
          cid: create.cid,
          indexedAt: new Date().toISOString(),
        }
      })

    if (postsToDelete.length > 0) {
      await this.db
        .deleteFrom('post')
        .where('uri', 'in', postsToDelete)
        .execute()
    }
    if (postsToCreate.length > 0) {
      for (const post of postsToCreate) {
        console.log(post)
      }
      await this.db
        .insertInto('post')
        .values(postsToCreate)
        .onConflict((oc) => oc.doNothing())
        .execute()
    }
  }

  /**
   * Starts a background lookup of the author's handles the first time a DID
   * is seen and records in `subdomainIncludedDids` whether any handle ends
   * with a community subdomain.
   *
   * @param did - DID of the post author.
   */
  private trackCommunityMembership(did: string): void {
    if (subdomainIncludedDids.has(did)) return

    // Store a provisional "not included" verdict immediately so further posts
    // from the same author do not start duplicate lookups while this one is
    // still in flight.
    subdomainIncludedDids.set(did, false)

    void didResolver
      .resolve(did)
      .then((doc) => {
        // A missing document or one without handles yields an empty list and
        // therefore a "not included" verdict.
        const handles = (doc?.alsoKnownAs ?? []).map((h) => h.replace('at://', ''))
        // Any matching handle is enough; a later non-matching handle must not
        // overwrite an earlier match.
        const isMember = handles.some((handle) =>
          communitySubdomains.some((domain) => handle.endsWith(domain)),
        )
        subdomainIncludedDids.set(did, isMember)
        if (isMember) {
          console.log(`Including ${did} in feed`)
        }
      })
      .catch((err: unknown) => {
        // Drop the provisional verdict so the next post from this author
        // retries, and keep the failure from becoming an unhandled rejection.
        subdomainIncludedDids.delete(did)
        console.error(`Failed to resolve DID ${did}`, err)
      })
  }
}