Skip to content

Commit

Permalink
Add try-finally block to subscribe generator
Browse files Browse the repository at this point in the history
  • Loading branch information
PabloSzx committed Jul 28, 2022
1 parent d2a8bdc commit 7dcf49a
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 25 deletions.
5 changes: 5 additions & 0 deletions .changeset/afraid-cars-cheat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@soundxyz/redis-pubsub": patch
---

Add try-finally block to subscribe generator
52 changes: 27 additions & 25 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -382,41 +382,43 @@ export function RedisPubSub({

dataPromises.add(dataPromise);

const subscribing = redisSubscribe({
channel,
})?.then(subscriptionValue.ready.resolve, subscriptionValue.ready.reject);
try {
const subscribing = redisSubscribe({
channel,
})?.then(subscriptionValue.ready.resolve, subscriptionValue.ready.reject);

if (subscribing) await subscribing;
if (subscribing) await subscribing;

while (true) {
await dataPromise.current.promise;
while (true) {
await dataPromise.current.promise;

for (const value of dataPromise.current.values as Output[]) {
if (filter && !filter(value)) {
if (intLogLevel) {
logMessage("SUBSCRIPTION_MESSAGE_FILTERED_OUT", {
channel,
});
}
continue;
}

for (const value of dataPromise.current.values as Output[]) {
if (filter && !filter(value)) {
yield value;
}

if (dataPromise.current.isDone) {
if (intLogLevel) {
logMessage("SUBSCRIPTION_MESSAGE_FILTERED_OUT", {
logMessage("SUBSCRIPTION_FINISHED", {
channel,
});
}
continue;
}

yield value;
}

if (dataPromise.current.isDone) {
if (intLogLevel) {
logMessage("SUBSCRIPTION_FINISHED", {
channel,
});
break;
} else {
dataPromise.current = pubsubDeferredPromise();
}
break;
} else {
dataPromise.current = pubsubDeferredPromise();
}
} finally {
await dataPromise.unsubscribe();
}

await dataPromise.unsubscribe();
}

async function unsubscribe(
Expand Down

0 comments on commit 7dcf49a

Please sign in to comment.