-
Notifications
You must be signed in to change notification settings - Fork 194
/
index.ts
120 lines (106 loc) · 3.42 KB
/
index.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import fetch from 'cross-fetch';
import beams from '../helpers/beams';
import db from '../helpers/mysql';
import subscribers from './subscribers.json';
import chunk from 'lodash/chunk';
import { getProposalScores } from '../scores';
import { sha256 } from '../helpers/utils';
// Grace period, in seconds, subtracted from the current Unix time when
// selecting events that are due for processing.
const delay = 5;
// Polling period, in seconds, between two runs of the event processor.
const interval = 30;
// Service switches and salt come from the environment. The explicit radix
// keeps a value such as "0x10" from being read as hexadecimal.
const serviceEvents = Number.parseInt(process.env.SERVICE_EVENTS || '0', 10);
// Salt mixed into the token/secret hash sent to subscribers.
// NOTE(review): a non-numeric SERVICE_EVENTS_SALT parses to NaN — confirm the
// deployed value is always an integer string.
const serviceEventsSalt = Number.parseInt(
  process.env.SERVICE_EVENTS_SALT || '12345',
  10
);
const servicePushNotifications = Number.parseInt(
  process.env.SERVICE_PUSH_NOTIFICATIONS || '0',
  10
);
/**
 * Loads a single proposal row by id.
 *
 * @param {string} proposalId - Proposal id, without the `proposal/` prefix.
 * @returns {Promise<object>} The first row of `proposals` matching the id.
 * @throws {Error} If the query fails, or if no row matches the id.
 */
const getProposal = async proposalId => {
  let proposals;
  try {
    proposals = await db.queryAsync('SELECT * FROM proposals WHERE id = ?', [
      proposalId
    ]);
  } catch (error) {
    // A failed query is not the same as a missing row: keep the underlying
    // reason in the message so a database outage is not reported as "not found".
    throw new Error(
      `Failed to load proposal with id ${proposalId}: ${error}`
    );
  }

  const [proposal] = proposals;
  // An empty result set used to fall through as `undefined` and blow up later
  // at the first property access; fail here with a meaningful message instead.
  if (!proposal) {
    throw new Error(`Proposal not found with id ${proposalId}`);
  }
  return proposal;
};
/**
 * Lists the wallet addresses subscribed to a space.
 *
 * @param {string} space - Space identifier matched against `subscriptions.space`.
 * @returns {Promise<Array>} The `address` column of every matching row, in
 *   the order the query returned them.
 */
const getSubscribedWallets = async space => {
  const sql = 'SELECT * FROM subscriptions WHERE space = ?';
  const rows = await db.queryAsync(sql, [space]);
  return rows.map(({ address }) => address);
};
/**
 * Publishes a web push notification about a proposal to every wallet
 * subscribed to the proposal's space.
 *
 * @param {object} event - Event row; reads `space` and `id` (`proposal/<id>`).
 * @returns {Promise<void>} Resolves once every batch has been published.
 * @throws {Error} If the proposal cannot be loaded or a publish call rejects.
 */
const sendPushNotification = async event => {
  const subscribedWallets = await getSubscribedWallets(event.space);
  // Nobody to notify: skip the proposal lookup entirely.
  if (subscribedWallets.length === 0) return;

  const proposalId = event.id.replace('proposal/', '');
  const proposal = await getProposal(proposalId);
  // Guard against a lookup that yields nothing, so the failure names the
  // proposal instead of surfacing as a TypeError on `proposal.title`.
  if (!proposal) {
    throw new Error(`Proposal not found with id ${proposalId}`);
  }

  const deepLink = `${process.env.SNAPSHOT_URI}/#/${event.space}/${event.id}`;
  // Wallets are published in batches of 100, one request at a time
  // (presumably a Beams per-request interest limit — confirm).
  for (const walletsChunk of chunk(subscribedWallets, 100)) {
    await beams.publishToInterests(walletsChunk, {
      web: {
        notification: {
          title: event.space,
          body: proposal.title,
          deep_link: deepLink
        }
      }
    });
  }
};
/**
 * POSTs an event to a subscriber URL as JSON and returns the parsed JSON
 * response body.
 *
 * The request body is a copy of the event with `token` and `secret` added;
 * both hold sha256 of the target URL concatenated with the service salt.
 *
 * @param {object} event - Event to deliver; it is not modified.
 * @param {string} to - Subscriber URL that receives the POST.
 * @returns {Promise<*>} Whatever `res.json()` resolves to.
 * @throws If the request fails or the response body is not valid JSON.
 */
async function sendEvent(event, to) {
  const signature = sha256(`${to}${serviceEventsSalt}`);
  // Build a per-subscriber payload instead of writing onto the caller's
  // object: the same event instance is handed to every subscriber.
  const payload = { ...event, token: signature, secret: signature };
  const res = await fetch(to, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(payload)
  });
  return res.json();
}
/**
 * Processes every queued event whose `expire` timestamp is at least `delay`
 * seconds in the past.
 *
 * For each event, in order:
 *  - on `proposal/end`, calls `getProposalScores` and logs the resulting
 *    `scores_state` (errors are logged and do not stop processing);
 *  - on `proposal/start`, when push notifications are enabled, starts a push
 *    notification without waiting for it;
 *  - starts delivery to every subscriber that has no `spaces` filter or whose
 *    filter includes the event's space, without waiting for it;
 *  - deletes the event row, whether or not the deliveries succeed.
 *
 * @returns {Promise<void>}
 * @throws If the initial SELECT on `events` fails.
 */
async function processEvents() {
  // Current Unix time in whole seconds, shifted back by the grace period.
  const ts = Math.round(Date.now() / 1e3) - delay;
  const events = await db.queryAsync('SELECT * FROM events WHERE expire <= ?', [
    ts
  ]);
  console.log('[events] Process event start', ts, events.length);

  for (const event of events) {
    if (event.event === 'proposal/end') {
      try {
        const proposalId = event.id.replace('proposal/', '');
        const scores = await getProposalScores(proposalId);
        console.log(
          '[events] Stored scores on proposal/end',
          scores.scores_state,
          proposalId
        );
      } catch (e) {
        console.log('[events]', e);
      }
    }

    if (servicePushNotifications && event.event === 'proposal/start') {
      // Fire-and-forget, but never without a rejection handler: a failed push
      // would otherwise surface as an unhandled promise rejection.
      sendPushNotification(event).catch(e =>
        console.log('[events] Push notification failed', e)
      );
    }

    // Subscriber deliveries run in the background; the loop does not wait.
    const deliveries = subscribers
      .filter(
        subscriber =>
          !subscriber.spaces || subscriber.spaces.includes(event.space)
      )
      .map(subscriber => sendEvent(event, subscriber.url));
    Promise.all(deliveries)
      .then(() => console.log('[events] Process event done'))
      .catch(e => console.log('[events] Process event failed', e));

    try {
      await db.queryAsync(
        'DELETE FROM events WHERE id = ? AND event = ? LIMIT 1',
        [event.id, event.event]
      );
      console.log(`[events] Event sent ${event.id} ${event.event}`);
    } catch (e) {
      console.log('[events]', e);
    }
  }
}
// Start the polling loop only when the events service flag is non-zero.
// NOTE(review): runs are not serialized — a run that takes longer than
// `interval` seconds can overlap the next tick.
if (serviceEvents) {
  setInterval(() => {
    // A rejected run (e.g. the initial SELECT failing) must not become an
    // unhandled rejection; log it and let the next tick try again.
    processEvents().catch(e =>
      console.log('[events] Process events failed', e)
    );
  }, interval * 1e3);
}