Skip to content
Browse files

Ensure that duplicate tasks aren't being ran more than once.

  • Loading branch information...
1 parent b4778da commit e483526e685b9e087a9ac5f05345dead90f94dc7 @viseztrance committed Apr 6, 2012
Showing with 60 additions and 5 deletions.
  1. +22 −3 lib/callisto/queue.rb
  2. +38 −2 spec/queue_spec.rb
View
25 lib/callisto/queue.rb
@@ -3,9 +3,10 @@ module Callisto
class Queue
attr_accessor :max_processes, :task
- @@stack = []
- @@processes = []
+ @@stack = []
+ @@processes = {}
@@max_processes = 10
+ @@identity = false
class << self
@@ -33,12 +34,25 @@ def processes
@@processes
end
+ def identity=(value)
+ @@identity = value
+ end
+
+ def identity
+ @@identity
+ end
+
def <<(task, options = {})
entry = new(task)
+ if identity # Ensure that the task hasn't been already enqueued
+ processes.each { |pid, running_task| return pid if entry.has?(running_task) }
+ stack.each { |current_entry| return nil if entry.has?(current_entry.task) }
+ end
if processes.size < max_processes
entry.process
else
self.stack << entry
+ nil
end
end
@@ -52,15 +66,20 @@ def initialize(task)
self.task = task
end
+ def has?(value)
+ self.class.identity.call(task) == self.class.identity.call(value)
+ end
+
def process
pid = fork do
self.class.callback.call(task)
end
- self.class.processes << pid
+ self.class.processes[pid] = task
Thread.new do
Process.wait(pid)
self.class.processes.delete(pid)
end
+ pid
end
end
View
40 spec/queue_spec.rb
@@ -4,10 +4,11 @@
describe "Queue" do
- after do
+ before do
Callisto::Queue.stack.replace([])
- Callisto::Queue.processes.replace([])
+ Callisto::Queue.processes.replace({})
Callisto::Queue.max_processes = 10
+ Callisto::Queue.identity = false
end
it "should run a task" do
@@ -39,6 +40,41 @@
File.unlink(file3.path)
end
+ describe "when running the same (identical) task more than once" do
+
+ before do
+ Callisto::Queue.identity = proc { |task| task[:id] }
+ Callisto::Queue.callback = proc { |task| task[:data].call }
+ end
+
+ it "should not run the task" do
+ Callisto::Queue << { :id => 1, :data => proc { sleep 1 } }
+ 4.times {
+ Callisto::Queue << { :id => 2, :data => proc { sleep 1 } }
+ }
+ Callisto::Queue.processes.count.must_equal 2
+ end
+
+ it "adding a task should return the original pid" do
+ pid1 = Callisto::Queue << { :id => 1, :data => proc { sleep 1 } }
+ pid2 = Callisto::Queue << { :id => 1, :data => proc { sleep 1 } }
+ pid3 = Callisto::Queue << { :id => 2, :data => proc { sleep 1 } }
+ pid1.must_equal pid2
+ pid1.wont_equal pid3
+ end
+
+ it "should not stack duplicates" do
+ Callisto::Queue.max_processes = 1
+ Callisto::Queue << { :id => 1, :data => proc { sleep 1 } }
+ 4.times {
+ Callisto::Queue << { :id => 2, :data => proc { sleep 1 } }
+ }
+ Callisto::Queue << { :id => 3, :data => proc { sleep 1 } }
+ Callisto::Queue.stack.count.must_equal 2
+ end
+
+ end
+
describe "when max tasks reached" do
before do

0 comments on commit e483526

Please sign in to comment.
Something went wrong with that request. Please try again.