Skip to content

Commit

Permalink
v1.4.7 Change everything (a little bit)
Browse files Browse the repository at this point in the history
  • Loading branch information
Roy van de Water committed Nov 22, 2016
1 parent 04fb9d6 commit c5bdf28
Show file tree
Hide file tree
Showing 3 changed files with 181 additions and 130 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "slurry-spreader",
"version": "1.4.6",
"version": "1.4.7",
"description": "Spread the Slurry around",
"main": "index.js",
"scripts": {
Expand Down
145 changes: 72 additions & 73 deletions src/spreader.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,21 @@ async = require 'async'
UUID = require 'uuid'

class SlurrySpreader extends EventEmitter2
constructor: (options={}, dependencies={}) ->
{
@redisUri
@namespace
} = options
{
@UUID
} = dependencies
@UUID ?= UUID
constructor: ({@redisUri, @namespace, @lockTimeout}, dependencies={}) ->
{@UUID} = dependencies

throw new Error('SlurrySpreader: @redisUri is required') unless @redisUri?
throw new Error('SlurrySpreader: @namespace is required') unless @namespace?

add: (slurry, callback) =>
{
uuid
} = slurry
@UUID ?= UUID
@lockTimeout ?= 60 * 1000

add: (slurry, callback) =>
slurry = _.cloneDeep slurry
{uuid} = slurry
debug 'add', uuid

nonce = @UUID.v4()
slurry.nonce = nonce
slurry.nonce = @UUID.v4()

tasks = [
async.apply @redisClient.set, "data:#{uuid}", JSON.stringify(slurry)
Expand All @@ -38,49 +32,59 @@ class SlurrySpreader extends EventEmitter2

async.series tasks, callback

close: (slurry, callback) =>
@_unclaimSlurry slurry.uuid, callback
close: ({uuid}, callback) =>
@_releaseLock uuid, callback

connect: (callback) =>
@slurries = {}
@redisClient = new RedisNS @namespace, redis.createClient(@redisUri, dropBufferSupport: true)
@queueClient = new RedisNS @namespace, redis.createClient(@redisUri, dropBufferSupport: true)

redlockOptions =
retryCount: 100
retryDelay: 100
@redlock = new Redlock [@queueClient], redlockOptions
@redlock = new Redlock [@queueClient], retryCount: 0
callback()

processQueue: (cb) =>
callback = (error) =>
return cb error if error?
return _.delay cb, 100

@queueClient.brpoplpush 'slurries', 'slurries', 30, (error, uuid) =>
debug 'brpoplpush'
return callback error if error?
return callback() unless uuid?

return @_extendOrReleaseLock uuid, callback if @_isSubscribed uuid
return @_acquireLock uuid, callback

processQueueForever: =>
async.until @_isStopped, @processQueue, (error) =>
throw error if error?

remove: ({ uuid }, callback) =>
debug 'remove', uuid

tasks = [
async.apply @redisClient.del, "data:#{uuid}"
async.apply @redisClient.lrem, 'slurries', 0, uuid
async.apply @_unclaimSlurry, uuid
]

async.series tasks, callback

start: (callback) =>
@connect (error) =>
return callback error if error?
@_processQueueForever()
@processQueueForever()
callback()

stop: (callback) =>
@stopped = true
async.eachSeries _.keys(@slurries), @_unclaimSlurry, callback
async.eachSeries _.keys(@slurries), @_releaseLock, callback

_acquireLock: (uuid, callback) =>
@redlock.lock "locks:#{uuid}", 60*1000, (error, lock) =>
debug '_acquireLock', "locks:#{uuid}", @lockTimeout
@redlock.lock "locks:#{uuid}", @lockTimeout, (error, lock) =>
return callback() if error?
return callback() unless lock?
@_handleSlurry uuid, (error) =>
lock.unlock (lockError) =>
return callback lockError if lockError?
callback error
@_createSlurry {uuid, lock}, callback

_checkClaimableSlurry: (uuid, callback) =>
@redisClient.exists "claim:#{uuid}", (error, exists) =>
Expand All @@ -94,67 +98,62 @@ class SlurrySpreader extends EventEmitter2
_claimSlurry: (uuid, callback) =>
@redisClient.setex "claim:#{uuid}", 60, Date.now(), callback

_createSlurry: (uuid, callback) =>
_createSlurry: ({uuid, lock}, callback) =>
return callback() if @_isSubscribed uuid
@_getSlurry uuid, (error, slurry) =>
return callback error if error?
return callback new Error('Slurry Not Found') unless slurry?
@slurries[uuid] = slurry.nonce
@slurries[uuid] = {lock, nonce: slurry?.nonce}
return @_releaseLockAndDelete uuid, callback unless slurry?
@emit 'create', slurry
callback()

_destroySlurry: (uuid, callback) =>
_extendLock: (uuid, callback) =>
debug '_extendLock', uuid
return callback() unless @_isSubscribed uuid
@_getSlurry uuid, (error, slurry) =>

slurry = @slurries[uuid]
slurry.lock.extend @lockTimeout, callback

_extendOrReleaseLock: (uuid, callback) =>
debug '_extendOrReleaseLock', uuid
return unless @_isSubscribed uuid

@_getSlurry uuid, (error, slurryData) =>
return callback error if error?
return callback() if slurry?.nonce? && @slurries[uuid] == slurry?.nonce
@_unclaimSlurry uuid, (error) =>
return callback error if error?
delete @slurries[uuid]
@emit 'destroy', slurry || {uuid}
callback()
return @_releaseLockAndDelete uuid, callback unless slurryData?
return @_extendLock uuid, callback if slurryData?.nonce == @slurries[uuid].nonce
return @_releaseLock uuid, callback

_getSlurry: (uuid, callback) =>
@redisClient.get "data:#{uuid}", (error, data) =>
return callback error if error?
try
slurry = JSON.parse(data)
catch error
return callback error
@_jsonParse data, callback

callback null, slurry

_handleSlurry: (uuid, callback) =>
@_checkClaimableSlurry uuid, (error, claimable) =>
return callback error if error?
return callback() unless claimable
async.series [
async.apply @_claimSlurry, uuid
async.apply @_createSlurry, uuid
async.apply @_destroySlurry, uuid
], callback
_isStopped: =>
@stopped

_isSubscribed: (uuid) =>
@slurries[uuid]?

_processQueue: (callback) =>
@queueClient.brpoplpush 'slurries', 'slurries', 30, (error, uuid) =>
return callback new Error('stopping') if @stopped
return callback error if error?
return callback() unless uuid?

@_acquireLock uuid, (error) =>
return callback error if error?
setTimeout callback, 100
_jsonParse: (data, callback) =>
return callback() if _.isEmpty data
try
return callback null, JSON.parse(data)
catch error
return callback error

_processQueueForever: =>
async.forever @_processQueue, (error) =>
throw error if error? && !@stopped

_unclaimSlurry: (uuid, callback) =>
debug '_unclaimSlurry', uuid
_releaseLock: (uuid, callback) =>
debug '_releaseLock', uuid
return callback() unless @_isSubscribed uuid
@emit 'destroy', {uuid}
slurry = @slurries[uuid]
slurry.lock.unlock callback

delete @slurries[uuid]
@redisClient.del "claim:#{uuid}", callback
_releaseLockAndDelete: (uuid, callback) =>
debug '_releaseLockAndDelete'
async.series [
async.apply @redisClient.lrem, 'slurries', 0, uuid
async.apply @_releaseLock, uuid
], callback

module.exports = SlurrySpreader
Loading

0 comments on commit c5bdf28

Please sign in to comment.