Skip to content

Commit

Permalink
fix(worker): fix worker lock algorithm
Browse files Browse the repository at this point in the history
  • Loading branch information
gregberge committed Jul 26, 2022
1 parent bd25d89 commit ccf24ef
Showing 1 changed file with 9 additions and 5 deletions.
14 changes: 9 additions & 5 deletions lib/locky.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const resourceKey = require("./resource-key");
/**
* @typedef RedisPromises
* @property {ReturnType<import('util').promisify<RedisClient["get"]>>} get
* @property {ReturnType<import('util').promisify<RedisClient["set"]>>} set
* @property {ReturnType<import('util').promisify<RedisClient["pexpire"]>>} pexpire
* @property {ReturnType<import('util').promisify<RedisClient["quit"]>>} quit
* @property {ReturnType<import('util').promisify<RedisClient["smembers"]>>} smembers
Expand All @@ -24,6 +25,7 @@ const resourceKey = require("./resource-key");
const promisifyRedisClient = (client) => {
const promises = {
get: promisify(client.get.bind(client)),
set: promisify(client.set.bind(client)),
pexpire: promisify(client.pexpire.bind(client)),
quit: promisify(client.quit.bind(client)),
smembers: promisify(client.smembers.bind(client)),
Expand Down Expand Up @@ -317,12 +319,14 @@ class Locky extends EventEmitter {
const run = async () => {
try {
await this.runOperation(async () => {
const trx = this.expirationWorker.redis.multi();
const key = `${this.prefix}expirate:worker`;
trx.setnx(key, "OK");
trx.pexpire(key, /** @type {number} */ (this.ttl));
const [locked] = await promisify(trx.exec.bind(trx))();

const locked = await this.redis.promises.set(
key,
"OK",
"NX",
"PX",
this.ttl
);
if (locked) {
await this.expireKeys();
await this.redis.promises.del(key);
Expand Down

0 comments on commit ccf24ef

Please sign in to comment.