-
Notifications
You must be signed in to change notification settings - Fork 1
/
index.ts
83 lines (72 loc) · 2.81 KB
/
index.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import { PubSub, Message } from "@google-cloud/pubsub";
// Module-level Pub/Sub client; pullFromSubscriptionAndProcess uses it to open
// the subscription it reads from.
// NOTE: the keyFilename value below is a placeholder and must be replaced with
// the path to a GCP credentials file before this module is used.
const pubSubClient = new PubSub({
keyFilename: "<INSERT FILEPATH TO GCP CREDENTIALS HERE>",
});
/**
 * Entry point: reads messages from a subscription and publishes each one to
 * a topic.
 *
 * @param subscriptionName - Subscription to read messages from.
 * @param topicName - Topic every received message is published to.
 * @param timeout - Milliseconds, forwarded unchanged to the subscription reader.
 * @param maxSimultaneousMessages - Forwarded unchanged to the subscription reader.
 * @returns A promise that settles when the subscription reader settles.
 */
export async function republishMessages(
  subscriptionName: string,
  topicName: string,
  timeout = 3000,
  maxSimultaneousMessages = 20,
): Promise<void> {
  // Per-message callback: forward the message to the destination topic.
  const forwardToTopic = (message: Message): Promise<void> =>
    publishToTopic(topicName, message);

  await pullFromSubscriptionAndProcess(
    subscriptionName,
    forwardToTopic,
    timeout,
    maxSimultaneousMessages,
  );
}
/**
 * Streams messages from a subscription, runs `processMessage` on each, and
 * returns once the subscription has gone quiet.
 *
 * There is no API to explicitly pull all messages enqueued in a subscription,
 * so this subscribes and keeps listening until no message has arrived for
 * `timeout` milliseconds and every in-flight handler has finished.
 *
 * A message is acked when `processMessage` resolves; if it rejects, the error
 * is logged and the message is nacked.
 *
 * @param subscriptionName - Subscription to listen on.
 * @param processMessage - Async handler invoked once per received message.
 * @param timeout - Initial wait, and the quiet period (ms) that ends the run.
 * @param maxSimultaneousMessages - Passed as `flowControl.maxMessages`.
 * @throws The first error emitted by the subscription, after in-flight
 *   handlers have finished and the subscription has been closed.
 */
async function pullFromSubscriptionAndProcess(
  subscriptionName: string,
  processMessage: (message: Message) => Promise<void>,
  timeout = 3000,
  maxSimultaneousMessages = 20,
): Promise<void> {
  // Lower bound for each sleep in the wait loop. Without it, a handler that
  // outlives the quiet window makes `timeout - elapsed` non-positive and the
  // loop spins on back-to-back zero-delay timers.
  const minPollDelayMs = 50;

  let mostRecentMessageTimestamp = 0;
  // Counter rather than a set of ids, so that two concurrent deliveries of the
  // same message id are both waited for.
  let inFlightCount = 0;
  const subscriptionErrors: Error[] = [];

  const subscription = pubSubClient.subscription(subscriptionName, {
    flowControl: { maxMessages: maxSimultaneousMessages },
  });

  const getTimeSinceMostRecent = (): number => Date.now() - mostRecentMessageTimestamp;
  const sleep = (ms: number): Promise<void> =>
    new Promise<void>((resolve) => setTimeout(resolve, ms));

  const handleMessage = async (message: Message): Promise<void> => {
    inFlightCount += 1;
    try {
      mostRecentMessageTimestamp = Date.now();
      await processMessage(message);
      message.ack();
    } catch (err) {
      console.error(err);
      message.nack();
    } finally {
      inFlightCount -= 1;
    }
  };

  // Record stream errors so they surface as a rejection of this function
  // instead of an unhandled "error" event on the emitter.
  subscription.on("error", (err: Error) => {
    subscriptionErrors.push(err);
  });
  subscription.on("message", (message: Message) => {
    // handleMessage never rejects in practice (it catches processing errors),
    // so the promise is intentionally not awaited by the emitter.
    void handleMessage(message);
  });

  try {
    // initial wait
    await sleep(timeout);
    // Keep waiting while handlers are still running, or while the stream is
    // healthy and a message arrived within the last `timeout` ms.
    while (
      inFlightCount > 0 ||
      (subscriptionErrors.length === 0 && getTimeSinceMostRecent() < timeout)
    ) {
      await sleep(Math.max(timeout - getTimeSinceMostRecent(), minPollDelayMs));
    }
  } finally {
    await subscription.close();
  }

  if (subscriptionErrors.length > 0) {
    throw subscriptionErrors[0];
  }
}
// Tail of the publish chain for each topic. Publishes to a topic are run one
// at a time, otherwise we hit a rate limit with GCP.
const promiseQueueMap = new Map<string, Promise<void>>();

/**
 * Publishes the id, data and attributes of `message` to `topicName`, queued
 * behind any publish to the same topic that is still pending.
 *
 * @param topicName - Destination topic.
 * @param message - Received message whose id, data and attributes are republished.
 * @returns A promise that resolves once this message has been published.
 * @throws Whatever the publish call rejects with. A failure only rejects the
 *   call that caused it; later publishes to the same topic still run.
 */
async function publishToTopic(topicName: string, message: Message): Promise<void> {
  const { id, data, attributes } = message;
  const previous = promiseQueueMap.get(topicName) ?? Promise.resolve();

  const publishTask = previous
    // A failed predecessor must not stop this publish from running; its
    // error was already delivered to the caller that awaited it.
    .catch(() => undefined)
    .then(async () => {
      await pubSubClient.topic(topicName).publishMessage({ messageId: id, attributes, data });
    });

  promiseQueueMap.set(topicName, publishTask);
  await publishTask;
}