Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
Joshua Cohen committed Dec 20, 2012
0 parents commit 370ee9f
Show file tree
Hide file tree
Showing 7 changed files with 391 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -0,0 +1 @@
node_modules
20 changes: 20 additions & 0 deletions LICENSE
@@ -0,0 +1,20 @@
Copyright (c) 2012 Yahoo! Inc. All rights reserved.

Permission is hereby granted, free of charge, to any person obtaining a
copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:

The above copyright notice and this permission notice shall be included
in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
125 changes: 125 additions & 0 deletions README.md
@@ -0,0 +1,125 @@
# redis-locking-worker

## Overview

An event driven implementation of global locks using Redis.

## Install

npm install redis-locking-worker

## Usage

```js
var RedisLockingWorker = require("redis-locking-worker");

var worker = new RedisLockingWorker({
"lockKey" : "mylock",
"statusLevel" : RedisLockingWorker.StatusLevels.Verbose,
"lockTimeout" : 5000,
"maxAttempts" : 5
});

worker.on("acquired", function(lastAttempt) {
if (Math.random() <= SUCCESS_CHANCE) {
console.log("Completed work successfully, cleaning up!");
lock.done(lastAttempt);
} else {
// oh no, we failed to do work!
console.log("Failed to do work, hopefully someone else will have better luck!");
}
});

worker.on("locked", function() {
console.log("Someone else acquired the lock");
});

worker.on("error", function(error) {
console.error("Error from lock: %j", error);
});

worker.on("status", function(message) {
console.log("Status message from lock: %s", message);
});

worker.acquire();
```

## Options

You can specify a variety of options when creating a new lock instance:

{
"client" : null, /* Instance of a node-redis redis client (https://github.com/mranney/node_redis) */
"host" : "localhost", /* Redis host to connect to if a client wasn't explicitly passed in */
"port" : 6379, /* Redis port to connect to if a client wasn't explicitly passed in */
"lockKey" : null, /* Name of the key to use for this lock, defaults to null, not optional */
"statusLevel" : RedisLockingWorker.StatusLevel.Normal, /* Verbosity to use when emitting status events */
"lockTimeout" : 5000, /* Time, in milliseconds, before a lock should expire. */
"maxAttempts" : 5, /* Number of attempts to complete work before giving up */
}

## Example

You can run:

node examples/cluster.js

For an example implementation that uses cluster to fork one worker per CPU core, each one with a 15% chance of completing the "work" successfully. You should see output along these lines:

Worker 1 failed to do work
Worker 2 did not acquire lock
Worker 3 did not acquire lock
Worker 5 did not acquire lock
Worker 4 did not acquire lock
Worker 6 did not acquire lock
Worker 7 did not acquire lock
Worker 8 did not acquire lock
Worker 2 Status message from lock: Checking status of work from attempt #1
Worker 2 Status message from lock: Work was not completed, trying to reacquire lock for attempt #1
Worker 2 Status message from lock: Trying to reacquire lock
Worker 2 Status message from lock: Current attempt count: 1
Worker 2 Status message from lock: This is attempt #2
Worker 2 failed to do work
Worker 3 Status message from lock: Checking status of work from attempt #1
Worker 3 Status message from lock: Attempt count has been incremented (expected 1, but it's 2), someone else got the lock
Worker 5 Status message from lock: Checking status of work from attempt #1
Worker 5 Status message from lock: Attempt count has been incremented (expected 1, but it's 2), someone else got the lock
Worker 4 Status message from lock: Checking status of work from attempt #1
Worker 4 Status message from lock: Attempt count has been incremented (expected 1, but it's 2), someone else got the lock
Worker 6 Status message from lock: Checking status of work from attempt #1
Worker 6 Status message from lock: Attempt count has been incremented (expected 1, but it's 2), someone else got the lock
Worker 7 Status message from lock: Checking status of work from attempt #1
Worker 7 Status message from lock: Attempt count has been incremented (expected 1, but it's 2), someone else got the lock
Worker 8 Status message from lock: Checking status of work from attempt #1
Worker 8 Status message from lock: Attempt count has been incremented (expected 1, but it's 2), someone else got the lock
Worker 3 Status message from lock: Checking status of work from attempt #2
Worker 3 Status message from lock: Work was not completed, trying to reacquire lock for attempt #2
Worker 3 Status message from lock: Trying to reacquire lock
Worker 3 Status message from lock: Current attempt count: 2
Worker 3 Status message from lock: This is attempt #3
Worker 3 failed to do work
Worker 5 Status message from lock: Checking status of work from attempt #2
Worker 5 Status message from lock: Attempt count has been incremented (expected 2, but it's 3), someone else got the lock
Worker 4 Status message from lock: Checking status of work from attempt #2
Worker 4 Status message from lock: Attempt count has been incremented (expected 2, but it's 3), someone else got the lock
Worker 6 Status message from lock: Checking status of work from attempt #2
Worker 6 Status message from lock: Attempt count has been incremented (expected 2, but it's 3), someone else got the lock
Worker 7 Status message from lock: Checking status of work from attempt #2
Worker 7 Status message from lock: Attempt count has been incremented (expected 2, but it's 3), someone else got the lock
Worker 8 Status message from lock: Checking status of work from attempt #2
Worker 8 Status message from lock: Attempt count has been incremented (expected 2, but it's 3), someone else got the lock
Worker 5 Status message from lock: Checking status of work from attempt #3
Worker 5 Status message from lock: Work was not completed, trying to reacquire lock for attempt #3
Worker 5 Status message from lock: Trying to reacquire lock
Worker 5 Status message from lock: Current attempt count: 3
Worker 5 Status message from lock: This is attempt #4
Worker 5 completed work successfully, last attempt? false
Worker 4 Status message from lock: Checking status of work from attempt #3
Worker 4 Status message from lock: Work completed successfully by primary process, deleting lock
Worker 6 Status message from lock: Checking status of work from attempt #3
Worker 6 Status message from lock: Lock key is gone, someone else completed the work and deleted the lock
Worker 7 Status message from lock: Checking status of work from attempt #3
Worker 7 Status message from lock: Lock key is gone, someone else completed the work and deleted the lock
Worker 8 Status message from lock: Checking status of work from attempt #3
Worker 8 Status message from lock: Lock key is gone, someone else completed the work and deleted the lock
17 changes: 17 additions & 0 deletions examples/cluster.js
@@ -0,0 +1,17 @@
/*
Copyright (c) 2012, Yahoo! Inc. All rights reserved.
Copyrights licensed under the MIT License. See the accompanying LICENSE file for terms.
*/

var cluster = require("cluster");
var os = require("os");

var cpuCount = os.cpus().length;

cluster.setupMaster({
exec : __dirname + "/worker.js",
});

for (var i = 0; i < cpuCount; i++) {
cluster.fork();
}
41 changes: 41 additions & 0 deletions examples/worker.js
@@ -0,0 +1,41 @@
/*
Copyright (c) 2012, Yahoo! Inc. All rights reserved.
Copyrights licensed under the MIT License. See the accompanying LICENSE file for terms.
*/

var cluster = require("cluster");

var RedisLock = require("../");

var SUCCESS_CHANCE = 0.15;

var lock = new RedisLock({
"lockKey" : "mylock",
"statusLevel" : RedisLock.StatusLevels.Verbose,
"lockTimeout" : 5000,
"maxAttempts" : 5
});

lock.on("acquired", function(lastAttempt) {
if (Math.random() <= SUCCESS_CHANCE) {
console.log("Worker %d completed work successfully, last attempt?", cluster.worker.id, lastAttempt);
lock.done(lastAttempt);
} else {
// oh no, we failed to do work!
console.log("Worker %d failed to do work", cluster.worker.id);
}
});

lock.on("locked", function() {
console.log("Worker %d did not acquire lock", cluster.worker.id);
});

lock.on("error", function(error) {
console.error("Worker %d Error from lock: %j", cluster.worker.id, error);
});

lock.on("status", function(message) {
console.log("Worker %d Status message from lock: %s", cluster.worker.id, message);
});

lock.acquire();
162 changes: 162 additions & 0 deletions index.js
@@ -0,0 +1,162 @@
/*
Copyright (c) 2012, Yahoo! Inc. All rights reserved.
Copyrights licensed under the MIT License. See the accompanying LICENSE file for terms.
*/

var events = require("events");
var util = require("util");

var redis = require("redis");

var TIMEOUT_DEFAULT = 5000;
var MAX_LOCK_ATTEMPTS_DEFAULT = 5;
var DONE_VALUE = "DONE";

var RedisLockingWorker = module.exports = function RedisLockingWorker(settings) {
if (!(this instanceof RedisLockingWorker)) {
return new RedisLockingWorker(settings);
}

if (settings.client) {
this.client = client;
} else {
settings.port = settings.port || 6379;
settings.host = settings.host || "localhost";
this.client = redis.createClient(settings.port, settings.host);
}

this.lockKey = settings.lockKey;

this.statusLevel = settings.statusLevel || StatusLevels.Normal;
this.lockTimeout = settings.lockTimeout || TIMEOUT_DEFAULT;
this.maxAttempts = settings.maxAttempts || MAX_LOCK_ATTEMPTS_DEFAULT;
};
util.inherits(RedisLockingWorker, events.EventEmitter);

var StatusLevels = RedisLockingWorker.StatusLevels = {
"Verbose" : 1,
"Normal" : 2
};

RedisLockingWorker.prototype.acquire = function acquire() {
var that = this;

this.client.setnx(this.lockKey, 1, function(error, result) {
if (error) {
console.error("Attempt to acquire lock failed: %j", error);
return;
}

if (result) {
that.emit("acquired", false);
} else {
that.emit("locked");
setTimeout(checkLock.bind(that), that.lockTimeout, 1);
}
});
};

RedisLockingWorker.prototype.done = function done(lastAttempt) {
if (lastAttempt) {
this.client.del(this.lockKey);
} else {
this.client.set(this.lockKey, DONE_VALUE);
}
};

function emitStatus(level, message) {
if (level >= this.statusLevel) {
this.emit("status", message);
}
}

function reacquireLock(attemptCount) {
var that = this;

var emit = emitStatus.bind(this, StatusLevels.Normal);
var emitVerbose = emitStatus.bind(this, StatusLevels.Verbose);

emitVerbose("Trying to reacquire lock");

this.client.watch(this.lockKey);
this.client.get(this.lockKey, function(error, currentAttemptCount) {
if (!currentAttemptCount) {
emitVerbose("Lock is gone, someone else completed the work!");
return;
}

emitVerbose("Current attempt count: " + currentAttemptCount);

var attempts = parseInt(currentAttemptCount, 10) + 1;
emitVerbose("This is attempt #" + attempts);

if (attempts > that.maxAttempts) {
emitVerbose("Exceeded maximum attempts, giving up!");
that.emit("max-attempts");
that.client.unwatch();
that.client.expire(that.lockKey, ((that.lockTimeout / 1000) * 2));
return;
}

that.client.multi()
.set(that.lockKey, attempts)
.exec(function(error, replies) {
if (error) {
that.emit("error", error);
return;
}

if (!replies) {
// The value changed out from under us, we didn't get the lock!
that.emit("locked");
that.client.get(that.lockKey, function(error, currentAttemptCount) {
setTimeout(checkLock.bind(that), that.lockTimeout, currentAttemptCount);
});
} else {
that.emit("acquired", attempts === that.maxAttempts);
}
});
});
}

function checkLock(lastCount) {
var that = this;

var emit = emitStatus.bind(this, StatusLevels.Normal);
var emitVerbose = emitStatus.bind(this, StatusLevels.Verbose);

emitVerbose("Checking status of work from attempt #" + lastCount);

this.client.watch(this.lockKey);
this.client.multi()
.get(this.lockKey)
.exec(function(error, replies) {
if (error) {
that.emit("error", error);
return;
}

if (!replies) {
emit("Lock value has changed while we were checking it, someone else got the lock");
that.client.get(that.lockKey, function(error, newCount) {
setTimeout(checkLock.bind(that), that.lockTimeout, newCount);
});
return;
}

var currentCount = replies[0];
if (currentCount === null) {
emit("Lock key is gone, someone else completed the work and deleted the lock");
return;
} else if (currentCount === DONE_VALUE) {
emit("Work completed successfully by primary process, deleting lock");
that.client.del(that.lockKey);
} else if (currentCount == lastCount) {
emit("Work was not completed, trying to reacquire lock for attempt #" + currentCount);
reacquireLock.call(that, currentCount);
} else {
emitVerbose("Attempt count has been incremented (expected " + lastCount + ", but it's " + currentCount + "), someone else got the lock");
setTimeout(checkLock.bind(that), that.lockTimeout, currentCount);
}
});
};

0 comments on commit 370ee9f

Please sign in to comment.