Permalink
Browse files

Workaholic implements workaholic

  • Loading branch information...
1 parent 740c510 commit 1a534b8b47439467f0a069d1e7e5ba9dab705498 @mewlist committed May 15, 2012
Showing with 155 additions and 4 deletions.
  1. +1 −1 .gitignore
  2. +4 −0 lib/workaholic.rb
  3. +7 −0 lib/workaholic/job.rb
  4. +60 −0 lib/workaholic/worker.rb
  5. +8 −0 spec/workaholic/job_spec.rb
  6. +45 −0 spec/workaholic/worker_spec.rb
  7. +30 −3 spec/workaholic_spec.rb
View
@@ -42,7 +42,7 @@ pkg
#.\#*
# For vim:
-#*.swp
+*.swp
# For redcar:
#.redcar
View
@@ -1,2 +1,6 @@
+require 'workaholic/job'
+require 'workaholic/worker'
+
module Workaholic
+
end
@@ -0,0 +1,7 @@
+module Workaholic
+ class Job
+ def run
+ # override me
+ end
+ end
+end
@@ -0,0 +1,60 @@
+require 'thread'
+
+module Workaholic
+ class RunningError < StandardError; end
+ class Worker
+ attr_reader :queue
+ attr_reader :state
+
+ def initialize
+ @queue = Queue.new
+ @state = :stopped
+ @threads ||= []
+ end
+
+ def start( thread_count = 2 )
+ raise RunningError unless state == :stopped
+
+ @state = :running
+ thread_count.times do |i|
+ t = Thread.start do
+ while [:running, :stopping].include? @state
+ run @queue.pop( true )
+ sleep 0.016
+ break if @state == :stopping
+ end
+ end
+ @threads.push t
+ end
+
+ if block_given?
+ yield self
+ sleep 0.1 while queue.size > 0
+ stop
+ end
+ end
+
+ def stop
+ @state = :stopping
+ while !@threads.select{ |t| t.alive? }.empty?
+ sleep 1
+ end
+ @state = :stopped
+ @threads = []
+ end
+
+ def run( job )
+ begin
+ job.run
+ rescue ThreadError # when queue is empty
+ sleep 0.1
+ rescue e
+ raise e
+ end
+ end
+
+ def push( job )
+ @queue.push job
+ end
+ end
+end
@@ -0,0 +1,8 @@
+require File.expand_path(File.dirname(__FILE__) + '/../spec_helper')
+
+describe Workaholic::Job do
+ describe '#run' do
+ subject { Workaholic::Job.new }
+ it { should respond_to :run}
+ end
+end
@@ -0,0 +1,45 @@
+require File.expand_path(File.dirname(__FILE__) + '/../spec_helper')
+
+describe Workaholic do
+ let(:worker) { Workaholic::Worker.new }
+
+ describe '#start' do
+ subject { worker.start( 1 ) }
+
+ it 'should start threads' do
+ Thread.should_receive(:start)
+ subject
+ end
+
+ it 'should raise error when worker is already running' do
+ worker.start 2
+ expect { subject }.to raise_error( Workaholic::RunningError )
+ end
+
+ it 'should call stop when block given' do
+ worker.should_receive(:stop)
+ worker.start 2 do
+ worker.push Workaholic::Job.new
+ end
+ end
+ end
+
+ describe '#run' do
+ subject { worker.run job }
+ let(:job) { Workaholic::Job.new }
+
+ it 'should run job' do
+ job.should_receive(:run)
+ subject
+ end
+ end
+
+ describe '#push' do
+ subject { worker.push job }
+ let(:job) { Workaholic::Job.new }
+
+ it 'should push job in queue' do
+ expect{ subject }.to change( worker.queue, :size ).by(1)
+ end
+ end
+end
@@ -1,7 +1,34 @@
require File.expand_path(File.dirname(__FILE__) + '/spec_helper')
-describe "Workaholic" do
- it "fails" do
- fail "hey buddy, you should probably rename this file and start specing for real"
+
+class TestJob < Workaholic::Job
+ attr_accessor :counter
+ def run
+ self.class.count_up!
+ end
+
+ @count = 0
+ def self.count; @count end
+ def self.count_up!; @count+=1 end
+end
+
+describe Workaholic do
+ let(:worker) { Workaholic::Worker.new }
+ let(:job1) { TestJob.new }
+ let(:job2) { TestJob.new }
+
+ describe 'run parallel job' do
+ subject { TestJob.count }
+before do
+ worker.start 100
+ 1000.times do
+ worker.push job1
+ worker.push job2
+ end
+ sleep 0.1 while worker.queue.size > 0
+ worker.stop
+ end
+
+ it { should == 2000 }
end
end

0 comments on commit 1a534b8

Please sign in to comment.