diff --git a/agent/src/agents/twitch.ts b/agent/src/agents/twitch.ts index a442b4181..db6388b0c 100644 --- a/agent/src/agents/twitch.ts +++ b/agent/src/agents/twitch.ts @@ -478,35 +478,48 @@ async function join( .orderBy("createdAt", "desc") .onSnapshot(async (snapshot) => { for (const change of snapshot.docChanges()) { - if (change.type == "added") { - // verify that the user id matches the channel id. - const userId = change.doc.get("userId"); - if (userId) { - const channelId = await getChannelId(userId, "twitch"); - if (channelId != `twitch:${authProvider.providerId}`) { - continue; - } - } - const targetChannel = change.doc.get("targetChannel"); - const message = change.doc.get("message"); - if (!targetChannel || !message) { + if (change.type != "added") { + continue; + } + // verify that the user id matches the channel id. + const userId = change.doc.get("userId"); + if (userId) { + const channelId = await getChannelId(userId, "twitch"); + if (channelId != `twitch:${authProvider.providerId}`) { continue; } - try { - await send.say(targetChannel, message); - await change.doc.ref.update({ - sentAt: admin.firestore.FieldValue.serverTimestamp(), - }); - } catch (e: any) { - log.error( - { error: e, targetChannel, message }, - "error sending message" - ); - await change.doc.ref.update({ - error: e.message, + } + const targetChannel = change.doc.get("targetChannel"); + const message = change.doc.get("message"); + if (!targetChannel || !message) { + continue; + } + try { + await admin.firestore().runTransaction(async (transaction) => { + const doc = await transaction.get(change.doc.ref); + if (doc.get("sentAt")) { + return; + } + transaction.update(change.doc.ref, { sentAt: admin.firestore.FieldValue.serverTimestamp(), }); - } + }); + } catch (error) { + // transaction failed, probably because it was already sent. + continue; + } + try { + await send.say(targetChannel, message); + await change.doc.ref.update({ isComplete: true }); + } catch (e: any) { + log.error( + { error: e, targetChannel, message }, + "error sending message" + ); + await change.doc.ref.update({ + isComplete: true, + error: e.message, + }); } } });