Skip to content
This repository has been archived by the owner on Oct 30, 2018. It is now read-only.

Commit

Permalink
Add locking to cron model
Browse files Browse the repository at this point in the history
  • Loading branch information
Braydon Fuller committed Nov 13, 2017
1 parent 51955c7 commit a72e25c
Show file tree
Hide file tree
Showing 2 changed files with 207 additions and 18 deletions.
103 changes: 85 additions & 18 deletions lib/models/cron.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
'use strict';

const assert = require('assert');
const mongoose = require('mongoose');
const SchemaOptions = require('../options');
const errors = require('storj-service-error-types');

const CronJob = new mongoose.Schema({
name: {
Expand All @@ -13,40 +12,108 @@ const CronJob = new mongoose.Schema({
},
locked: {
type: Boolean,
default: false
required: true
},
lockedEnd: {
type: Date,
required: true
},
started: {
type: Date,
default: Date.now,
required: true
},
finished: {
type: Date,
default: Date.now,
required: true
required: false
},
data: {
type: String,
required: false
}
});

Partner.set('toObject', {
transform: function(doc, ret) {
ret.id = ret._id;
delete ret.__v;
delete ret._id;
CronJob.statics.lock = function(name, expires, callback) {
const self = this;
const now = new Date();
const end = new Date(now.getTime() + expires);
assert(Number.isInteger(expires), 'Expires argument is expected');

function isDuplicate(err) {
return (err && err.code === 11000);
}
});

Partner.set('toJSON', {
transform: function(doc, ret) {
ret.id = ret._id;
delete ret.__v;
delete ret._id;
function getLock(done) {
const query = {
name: name,
locked: false
};
const sort = {};
const update = {
$set: {
name: name,
locked: true,
lockedEnd: end,
started: now
}
};
const options = {
new: true,
upsert: true,
writeConcern: 'majority'
};
self.collection.findAndModify(query, sort, update, options, (err, res) => {
if (isDuplicate(err)) {
return done(null, false);
} else if (err) {
return done(err, false);
}
done(null, true, res);
});
}
});

function getLockFromExpired(done) {
console.log('NOW', now);
const query = {
name: name,
locked: true,
lockedEnd: {
$lte: now
}
};
const sort = {};
const update = {
$set: {
name: name,
locked: true,
lockedEnd: end,
started: now
}
};
const options = {
new: true,
upsert: true,
writeConcern: 'majority'
};
self.collection.findAndModify(query, sort, update, options, (err, res) => {
if (isDuplicate(err)) {
return done(null, false);
} else if (err) {
return done(err, false);
}
done(null, true, res);
});
}

getLock((err, success, res) => {
if (err) {
return callback(err);
}
if (!success) {
return getLockFromExpired(callback);
}
callback(err, success, res);
});
};

module.exports = function(connection) {
return connection.model('CronJob', CronJob);
Expand Down
122 changes: 122 additions & 0 deletions test/cron.unit.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
'use strict';

const chai = require('chai');
const expect = chai.expect;
const sinon = require('sinon');
const chaiDate = require('chai-datetime');
const mongoose = require('mongoose');

chai.use(chaiDate);
require('mongoose-types').loadTypes(mongoose);

const CronSchema = require('../lib/models/cron');

var Cron;
var connection;

before(function(done) {
connection = mongoose.createConnection(
'mongodb://127.0.0.1:27017/__storj-bridge-test',
function() {
Cron = CronSchema(connection);
Cron.remove({}, function() {
done();
});
}
);
});

after(function(done) {
connection.close(done);
});

describe('/Storage/models/Cron', function() {
const sandbox = sinon.sandbox.create();
afterEach(() => sandbox.restore());

describe('@constructor', function() {

it('should create new work', function (done) {
const now = new Date();
const end = new Date(now.getTime() + 1000);
const job = new Cron({
name: 'StorageEventsFinality',
locked: true,
lockedEnd: end,
started: now
});

job.save(function(err, job) {
if (err) {
return done(err);
}

expect(job.name).to.equal('StorageEventsFinality');
expect(job.locked).to.equal(true);
expect(job.lockedEnd).to.equal(end);
expect(job.started).to.equal(now);
done();
});
});
});

describe('#lock', function () {

it('should get lock without document', function(done) {

Cron.lock('SingletonOne', 1000, function(err) {
if (err) {
return done(err);
}
done();
});

});

it('should not get lock with existing locked document', function(done) {

Cron.lock('SingletonTwo', 10000, function(err, locked) {
if (err) {
return done(err);
}
expect(locked).to.equal(true);

Cron.lock('SingletonTwo', 10000, function(err, locked) {
if (err) {
return done(err);
}
expect(locked).to.equal(false);
done();
});
});

});

it('should get lock with expired locked document', function(done) {
const now = new Date();
const clock = sandbox.useFakeTimers();
clock.tick(now.getTime());
const expires = 10000;

Cron.lock('SingletonThree', expires, function(err, locked, res) {
if (err) {
return done(err);
}
expect(locked).to.equal(true);
expect(res);
clock.tick(expires);

Cron.lock('SingletonThree', expires, function(err, locked, res) {
if (err) {
return done(err);
}
expect(locked).to.equal(true);
expect(res);
done();
});
});
});

});

});

0 comments on commit a72e25c

Please sign in to comment.