forked from YahooArchive/redis-locking-worker
-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.js
164 lines (135 loc) · 4.44 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
/*
Copyright (c) 2012, Yahoo! Inc. All rights reserved.
Copyrights licensed under the MIT License. See the accompanying LICENSE file for terms.
*/
var events = require("events");
var util = require("util");
var redis = require("redis");
// Default for `lockTimeout`, in milliseconds: used as the setTimeout delay between
// lock checks and, converted to seconds and doubled, as the expiry of an abandoned lock.
var TIMEOUT_DEFAULT = 5000;
// Default for `maxAttempts`: how many times the work may be attempted before
// a worker emits "max-attempts" instead of taking the lock again.
var MAX_LOCK_ATTEMPTS_DEFAULT = 5;
// Sentinel written to the lock key by done() on a non-final attempt; checkLock()
// deletes the key when it reads this value.
var DONE_VALUE = "DONE";
/**
 * Event-emitting worker that coordinates a piece of work through a Redis lock key.
 *
 * May be called with or without `new`. Instances emit "acquired" (with a
 * `lastAttempt` boolean), "locked", "max-attempts", "status" and "error".
 *
 * @param {Object} [settings] Optional configuration.
 * @param {Object} [settings.client] Existing Redis client to use; when given,
 *     `port` and `host` are ignored.
 * @param {number} [settings.port=6379] Redis port used when no client is supplied.
 * @param {string} [settings.host="localhost"] Redis host used when no client is supplied.
 * @param {number} [settings.statusLevel=StatusLevels.Normal] Minimum level of
 *     "status" messages to emit.
 * @param {number} [settings.lockTimeout=5000] Milliseconds to wait between lock checks.
 * @param {number} [settings.maxAttempts=5] Maximum number of attempts at the work.
 */
var RedisWorker = module.exports = function RedisWorker(settings) {
    if (!(this instanceof RedisWorker)) {
        return new RedisWorker(settings);
    }

    // Run the parent constructor so the emitter state is initialised the documented way.
    events.EventEmitter.call(this);

    // A missing settings object means "use every default" rather than a TypeError.
    var options = settings || {};

    if (options.client) {
        this.client = options.client;
    } else {
        // Defaults are resolved into locals so the caller's object is never modified.
        var port = options.port || 6379;
        var host = options.host || "localhost";
        this.client = redis.createClient(port, host);
    }

    // Set by acquire(); null until a lock has been requested.
    this.lockKey = null;
    this.statusLevel = options.statusLevel || StatusLevels.Normal;
    this.lockTimeout = options.lockTimeout || TIMEOUT_DEFAULT;
    this.maxAttempts = options.maxAttempts || MAX_LOCK_ATTEMPTS_DEFAULT;
};
util.inherits(RedisWorker, events.EventEmitter);
// Verbosity levels for "status" events. A message is emitted when its level is
// greater than or equal to the worker's `statusLevel`, so a worker configured
// with Verbose receives both kinds and one configured with Normal only Normal ones.
var StatusLevels = {
    Verbose: 1,
    Normal: 2
};
RedisWorker.StatusLevels = StatusLevels;
/**
 * Tries to take the lock stored under `customKey` by creating the key with an
 * attempt count of 1 (SETNX).
 *
 * Emits "acquired" with a `lastAttempt` flag when the key was created, or
 * "locked" when it already existed; in the latter case the lock is checked
 * again after `lockTimeout` milliseconds.
 *
 * @param {string} customKey Redis key that represents the lock.
 */
RedisWorker.prototype.acquire = function acquire(customKey) {
    this.lockKey = customKey;
    var that = this;
    this.client.setnx(this.lockKey, 1, function(error, result) {
        if (error) {
            // Surface the failure as an "error" event like the rest of this file,
            // but only when someone listens: an unhandled "error" event throws,
            // and callers without a listener previously only got this log line.
            if (that.listeners("error").length > 0) {
                that.emit("error", error);
            } else {
                console.error("Attempt to acquire lock failed: %j", error);
            }
            return;
        }
        if (result) {
            // This is attempt #1, so it is already the last one when only a
            // single attempt is allowed (same rule reacquireLock applies).
            that.emit("acquired", that.maxAttempts === 1);
        } else {
            that.emit("locked");
            setTimeout(checkLock.bind(that), that.lockTimeout, 1);
        }
    });
};
/**
 * Records that the work guarded by the current lock key has finished.
 *
 * @param {boolean} lastAttempt When truthy the lock key is deleted outright;
 *     otherwise the key is overwritten with the DONE sentinel.
 */
RedisWorker.prototype.done = function done(lastAttempt) {
    if (!lastAttempt) {
        this.client.set(this.lockKey, DONE_VALUE);
        return;
    }
    this.client.del(this.lockKey);
};
/**
 * Emits a "status" event carrying `message` when `level` is at least the
 * `statusLevel` of the object this function is invoked on.
 *
 * Must be called with `this` set to an object exposing `statusLevel` and `emit`.
 *
 * @param {number} level Level of this message.
 * @param {string} message Text delivered to "status" listeners.
 */
function emitStatus(level, message) {
    var meetsThreshold = level >= this.statusLevel;
    if (!meetsThreshold) {
        return;
    }
    this.emit("status", message);
}
/**
 * Attempts to take over the lock after the previous holder failed to finish.
 *
 * WATCHes the lock key, reads the stored attempt count and, inside MULTI/EXEC,
 * writes the count incremented by one. Emits "acquired" (with `true` when this
 * is the final permitted attempt) on success, "locked" when the key changed
 * before EXEC, "max-attempts" when the attempt limit is exceeded and "error"
 * when Redis reports a failure.
 *
 * Must be invoked with `this` bound to a RedisWorker.
 *
 * @param {*} attemptCount Attempt count observed by the caller. Unused: the
 *     value is re-read from Redis under WATCH. Kept for interface compatibility.
 */
function reacquireLock(attemptCount) {
    var that = this;
    var emitVerbose = emitStatus.bind(this, StatusLevels.Verbose);

    emitVerbose("Trying to reacquire lock");
    this.client.watch(this.lockKey);
    this.client.get(this.lockKey, function(error, currentAttemptCount) {
        if (error) {
            // A failed read is not the same thing as a missing key; report it.
            that.client.unwatch();
            that.emit("error", error);
            return;
        }
        if (!currentAttemptCount) {
            emitVerbose("Lock is gone, someone else completed the work!");
            // No EXEC will follow, so release the WATCH explicitly.
            that.client.unwatch();
            return;
        }
        if (currentAttemptCount === DONE_VALUE) {
            // The holder finished between checkLock's read and ours. Parsing
            // the sentinel would give NaN and clobber the lock, so clean up
            // the same way checkLock does for a completed lock.
            emitVerbose("Work was completed while reacquiring, deleting lock");
            that.client.unwatch();
            that.client.del(that.lockKey);
            return;
        }

        emitVerbose("Current attempt count: " + currentAttemptCount);
        var attempts = parseInt(currentAttemptCount, 10) + 1;
        emitVerbose("This is attempt #" + attempts);

        if (attempts > that.maxAttempts) {
            emitVerbose("Exceeded maximum attempts, giving up!");
            that.emit("max-attempts");
            that.client.unwatch();
            // EXPIRE takes whole seconds; round up so a lockTimeout that is not
            // a multiple of 500ms does not produce a fractional argument.
            that.client.expire(that.lockKey, Math.ceil((that.lockTimeout / 1000) * 2));
            return;
        }

        that.client.multi()
            .set(that.lockKey, attempts)
            .exec(function(error, replies) {
                if (error) {
                    that.emit("error", error);
                    return;
                }
                if (!replies) {
                    // The value changed out from under us, we didn't get the lock!
                    that.emit("locked");
                    that.client.get(that.lockKey, function(error, currentAttemptCount) {
                        setTimeout(checkLock.bind(that), that.lockTimeout, currentAttemptCount);
                    });
                } else {
                    that.emit("acquired", attempts === that.maxAttempts);
                }
            });
    });
}
/**
 * Inspects the lock key to find out what happened to the attempt numbered
 * `lastCount` and reacts to the stored value:
 *   - key missing: the work finished and the lock was removed; nothing to do;
 *   - DONE sentinel: the work finished; delete the lock key;
 *   - same count as `lastCount`: nobody finished or took over; try to reacquire;
 *   - any other count: another worker took over; check again after `lockTimeout`.
 *
 * The key is read inside MULTI/EXEC after a WATCH. A null EXEC reply means the
 * key changed in between, in which case it is read again and a new check is
 * scheduled. Must be invoked with `this` bound to a RedisWorker.
 *
 * @param {number|string} lastCount Attempt count seen at the previous check.
 */
function checkLock(lastCount) {
    var worker = this;
    var report = emitStatus.bind(worker, StatusLevels.Normal);
    var reportVerbose = emitStatus.bind(worker, StatusLevels.Verbose);

    // Queue another inspection of the lock one lockTimeout from now.
    function recheckLater(count) {
        setTimeout(checkLock.bind(worker), worker.lockTimeout, count);
    }

    // Handles the outcome of the watched read of the lock key.
    function onLockRead(error, replies) {
        if (error) {
            worker.emit("error", error);
            return;
        }

        if (!replies) {
            report("Lock value has changed while we were checking it, someone else got the lock");
            worker.client.get(worker.lockKey, function(ignoredError, freshCount) {
                recheckLater(freshCount);
            });
            return;
        }

        var currentCount = replies[0];

        if (currentCount === null) {
            report("Lock key is gone, someone else completed the work and deleted the lock");
            return;
        }

        if (currentCount === DONE_VALUE) {
            report("Work completed successfully by primary process, deleting lock");
            worker.client.del(worker.lockKey);
            return;
        }

        // Loose equality is deliberate: acquire() passes the number 1 while the
        // value read back from Redis is presumably a string.
        if (currentCount == lastCount) {
            report("Work was not completed, trying to reacquire lock for attempt #" + currentCount);
            reacquireLock.call(worker, currentCount);
            return;
        }

        reportVerbose("Attempt count has been incremented (expected " + lastCount + ", but it's " + currentCount + "), someone else got the lock");
        recheckLater(currentCount);
    }

    reportVerbose("Checking status of work from attempt #" + lastCount);
    worker.client.watch(worker.lockKey);
    worker.client.multi()
        .get(worker.lockKey)
        .exec(onLockRead);
}