update to new jeweler to remove deprecation notices

commit 78c5750bde4769e9df6ec05eed596e24b3ab69bb 1 parent 1656075
Luke van der Hoeven authored
81 DESIGN.textile
@@ -0,0 +1,81 @@
+h1. Queues
+
+There are a number of important queues that we employ to track data:
+
+# queues - responsible for pushing/popping jobs. Only stores a unique id. Is
+pushed and popped constantly, and is therefore a fairly "volatile" dataset.
+# data-store:[name] - hash responsible for the different types of job data.
+Stored by job id. This data is persisted until a job succeeds or fails too
+many times, at which point the job is dropped and the failure recorded.
+# result-store:[name] - hash responsible for storing the results by job id.
+Data will remain here until removed, and can be fetched multiple times if
+desired, providing cached retrieval.
+# failure-store:[name] - hash responsible for storing any failed jobs. Stored
+until manually cleared.
+
+In addition, there is a unique id counter that gets incremented as the job
+queue grows. It is stored at "unique_id".
+
+On top of these queues, there is also a set of stores set up to keep track of
+workers and processes actively being worked.
+
+# workers - set that tracks all the registered workers. All workers not in this list
+should be destroyed.
+# working - set of workers that are actively working a job.
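The data flow through these stores can be sketched with an in-memory stand-in (a plain Hash and Array in place of Redis; the "math" queue name and "Adder" class are made-up examples, not part of the codebase):

```ruby
# In-memory stand-in for the Redis layout above; the real code uses the
# redis gem. Key names follow the scheme described in this document.
store = Hash.new { |h, k| h[k] = {} }
queue = []   # queues - volatile list of job ids
uniq  = 0    # the "unique_id" counter

# enqueue: bump the counter, push the id, persist the job data by id
uniq += 1
queue << uniq
store["data-store:math"][uniq] = { :klass => "Adder", :payload => [1, 2] }

# work: pop the id, look up its data, record the result under the same id
id  = queue.shift
job = store["data-store:math"][id]
store["result-store:math"][id] = job[:payload].inject(:+)

store["result-store:math"][id]  # => 3, cached until explicitly removed
```

Note that popping the queue leaves the data-store and result-store entries intact, which is what makes repeated result retrieval possible.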
+
+h1. Jobs
+
+I decided to give jobs a twofold purpose. First, a job is the storage
+mechanism through which work is placed on the queue for later execution by
+workers. Second, it actually performs the job execution, even though the
+worker is the one that calls it. This way all job-related tasks can be
+abstracted into the job class, while the worker busies itself with
+controlling job execution rather than performing it.
+
+Jobs will also be responsible for storing the state of the result and any
+failures that come from the job execution.
+
+A job can only live on one queue.
+
+A job will be tried 3 times before being taken out of the work queue and
+placed in the failure queue.
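A minimal sketch of that retry policy (the helper name and the plain-Hash failure store are illustrative, not the actual Job class):

```ruby
# Retry a block up to MAX_ATTEMPTS times; on the final failure, record the
# error in a failure store keyed by job id. Illustrative only.
MAX_ATTEMPTS = 3

def run_with_retries(failures, id)
  attempts = 0
  begin
    yield
  rescue => ex
    attempts += 1
    retry if attempts < MAX_ATTEMPTS
    failures[id] = ex.message  # out of the work queue, into the failure store
    nil
  end
end

failures = {}
calls = 0
run_with_retries(failures, 7) { calls += 1; raise "boom" }
calls        # => 3
failures[7]  # => "boom"
```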
+
+h1. Workers
+
+There was a big struggle in the beginning to decide how to work queued jobs.
+I originally wanted a more cooperative scheduling scheme but found it would
+not take advantage of multiple cores. Moreover, it is wicked hard to preempt
+fibers in a meaningful way without digging really deep into other code to
+retrofit it for Fibers or EventMachine. So I simply didn't.
+
+Instead I went with Ruby's Process library and decided to model it as closely
+as possible on a Unix-style processing queue so that in one fell swoop we
+could use multiple cores and gain true concurrency. One more design decision
+is that we will not attempt to run multiple processes per worker. This way we
+control the number of running processes by how many workers we choose to run,
+rather than by how many processes a worker is allowed to spawn.
+
+This presents a slight problem, however. Each worker then involves two
+processes: one parent/control-loop and one child/job-processor. This means
+that, in the end, we have more processes running at once, possibly chewing up
+more resources than ultimately necessary. This will hopefully be overcome by
+suspending the parent worker while the child process runs, so that a core is
+not chewed up by an essentially idle process. Hopefully this can be
+benchmarked to figure out whether it's faster to run one-to-many
+parent/children or one-to-one.
+
+A worker can work multiple queues.
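The parent/child split described above can be sketched like so (a pipe stands in for the Redis result store here, and the squaring "job" is made up; the real worker forks, waits, and reads results out of Redis):

```ruby
# One parent, one child: the parent forks a single job-processor and blocks
# in Process.wait until it exits, so each worker runs one job at a time.
reader, writer = IO.pipe

pid = fork do
  reader.close
  writer.write((6 * 6).to_s)  # the "job" runs in the child process
  writer.close
  exit! 0                     # skip at_exit hooks in the child
end

writer.close
Process.wait(pid)             # parent suspends (idles) while the child works
result = reader.read.to_i     # => 36
```

Because the parent is blocked in `Process.wait`, it consumes essentially no CPU while the child runs, which is the suspension behavior the design hopes for.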
+
+h1. Web Interface
+
+Intended to have a web frontend to control/observe workers, jobs and queues.
+
+h1. Job Observer Widget
+
+Intended to have a JS widget that will indicate when a job has been computed
+and the results are ready or a job has failed. Potential application for a
+Node.js implementation (using something like
+http://github.com/fictorial/redis-node-client). Should be easily embeddable in
+any page for simple notification.
+
7 Gemfile
@@ -10,4 +10,11 @@ group :development do
gem "bundler", "~> 1.0.0"
gem "jeweler", "~> 1.5.0.pre3"
gem "rcov", ">= 0"
+
end
+
+gem "redis"
+gem "yajl-ruby"
+gem "json"
+gem "mail"
+gem 'i18n'
38 Gemfile.lock
@@ -0,0 +1,38 @@
+GEM
+ remote: http://rubygems.org/
+ specs:
+ activesupport (3.0.0)
+ git (1.2.5)
+ i18n (0.4.1)
+ jeweler (1.5.0.pre3)
+ bundler (~> 1.0.0)
+ git (>= 1.2.5)
+ rake
+ json (1.4.6)
+ mail (2.2.6.1)
+ activesupport (>= 2.3.6)
+ mime-types
+ treetop (>= 1.4.5)
+ mime-types (1.16)
+ polyglot (0.3.1)
+ rake (0.8.7)
+ rcov (0.9.9)
+ redis (2.0.8)
+ shoulda (2.11.3)
+ treetop (1.4.8)
+ polyglot (>= 0.3.1)
+ yajl-ruby (0.7.7)
+
+PLATFORMS
+ ruby
+
+DEPENDENCIES
+ bundler (~> 1.0.0)
+ i18n
+ jeweler (~> 1.5.0.pre3)
+ json
+ mail
+ rcov
+ redis
+ shoulda
+ yajl-ruby
2  LICENSE
@@ -1,4 +1,4 @@
-Copyright (c) 2010 Luke van der Hoeven
+Copyright (c) 2009 Luke van der Hoeven
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
65 README.textile
@@ -0,0 +1,65 @@
+h1. kthxbye
+
+Kthxbye is the answer to a fairly unique-yet-common problem: Background job
+processing when we care about the result.
+
+There are a number of projects I can think of where a job takes longer than a
+user ought to be kept waiting (due to server timeout limits, or simply out of
+courtesy to the user in terms of app responsiveness), and yet I need the
+result of the operation returned to the user.
+
+Here's a real-world example. I work with a set of legacy Oracle databases that
+store much of our business logic as PLSQL procedures. Yes, this is not "The
+Rails Way™", but it's the only way for the company I work for right now.
+Many of the procedures I run as part of several of the applications I support
+take on average one minute or more, with a standard deviation of almost 2
+minutes (and a forced timeout of 5 minutes). That's kind of a long time to
+sit and wait on a web app.
+
+<img src="http://img.skitch.com/20100901-gadna641fj4wdeswgj74y2pssq.png" alt="RLA - IWP Analysis"/>
+
+We don't really want users sitting for up to 5 minutes (when the timeout
+forces failure) unable to do anything, or (even worse) hitting refresh or
+running the action again. This is especially bad when it means the HTTP
+server gets backed up as more and more people run these long-running
+processes.
+
+Moreover, the users need to get a response from the completed job before
+moving on. Most job processors (DJ, Resque) are set up for running jobs that
+do not require the result to be returned to the web app (think mass-mailers,
+queue population, image resizing). They just run, and the output goes to a
+database, an inbox or a file server.
+
+h2. Enter Kthxbye
+
+Kthxbye is an attempt to solve this problem. It is based heavily on
+"Resque":http://github.com/defunkt/resque. Why not make it an addition to
+Resque? I needed some hack time with Redis on my own, as I've never used it
+before...
+
+bq. I can learn any language or tool in a matter of days if you give me
+ 1. a good manual
+ 2. an even better project to work on.
+
+_- Prof. Shumacher_
+
+This project accomplishes both those goals. This is an attempt to learn
+something, using Resque and Redis docs as a manual, while at the same time
+creating a much needed solution to a problem.
+
+The idea is to be able to do the following:
+
+<script src="http://gist.github.com/560927.js"> </script>
+<a href="http://gist.github.com/560927">Gist</a>
+
+h2. Note on Patches/Pull Requests
+
+* Fork the project.
+* Make your feature addition or bug fix.
+* Add tests for it. This is important so I don't break it in a
+ future version unintentionally.
+* Commit; do not mess with the rakefile, version, or history.
+  (If you want to have your own version, that is fine, but bump the version in a commit by itself that I can ignore when I pull.)
+* Send me a pull request. Bonus points for topic branches.
+
+h2. Copyright
+
+Copyright (c) 2010 Luke van der Hoeven. See LICENSE for details.
12 Rakefile
@@ -11,21 +11,19 @@ require 'rake'
require 'jeweler'
Jeweler::Tasks.new do |gem|
- # gem is a Gem::Specification... see http://docs.rubygems.org/read/chapter/20 for more options
gem.name = "kthxbye"
- gem.summary = %Q{TODO: one-line summary of your gem}
- gem.description = %Q{TODO: longer description of your gem}
+ gem.summary = %Q{Async processing + results notification}
+ gem.description = %Q{Kthxbye is the answer to a fairly unique-yet-common problem: Background job processing when we care about the result.}
gem.email = "hungerandthirst@gmail.com"
gem.homepage = "http://github.com/plukevdh/kthxbye"
gem.authors = ["Luke van der Hoeven"]
- # Include your dependencies below. Runtime dependencies are required when using your gem,
- # and development dependencies are only needed for development (ie running rake tasks, tests, etc)
- # spec.add_runtime_dependency 'jabber4r', '> 0.1'
- # spec.add_development_dependency 'rspec', '> 1.2.3'
gem.add_development_dependency "shoulda", ">= 0"
gem.add_development_dependency "bundler", "~> 1.0.0"
gem.add_development_dependency "jeweler", "~> 1.5.0.pre3"
gem.add_development_dependency "rcov", ">= 0"
+ gem.add_dependency "redis", "~> 2.0.5"
+ gem.add_dependency "yajl-ruby", ">= 0.7.7"
+ gem.add_dependency "json", "~> 1.4.6"
end
Jeweler::RubygemsDotOrgTasks.new
1  VERSION
@@ -0,0 +1 @@
+0.0.2
108 lib/kthxbye.rb
@@ -0,0 +1,108 @@
+require 'redis'
+begin
+ require 'yajl'
+rescue LoadError
+ require 'json'
+end
+
+require 'kthxbye/config'
+require 'kthxbye/helper'
+require 'kthxbye/job'
+require 'kthxbye/worker'
+require 'kthxbye/failure'
+
+module Kthxbye
+ include Helper
+ extend self
+
+ #takes in an existing redis instance or simply connects a new instance
+ def connect( redis_instance=nil )
+ @redis = ( redis_instance || Redis.new( :host => Config.options[:redis_server], :port => Config.options[:redis_port] ) )
+ end
+
+ def redis
+ return @redis if @redis
+ Config.setup
+ self.connect
+ self.redis
+ end
+
+ # adds a job to the given queue; returns the job's unique id
+ def enqueue(queue, klass, *args)
+ Job.create(queue, klass, *args)
+ end
+
+ # gets the size of a given queue
+ def size(queue)
+ redis.llen("queue:#{queue}").to_i
+ end
+
+ # pops the oldest job off the given queue (FIFO)
+ # returns a Job object, or nil if the queue is empty
+ def salvage(q)
+ id = redis.lpop( "queue:#{q}" )
+ if id
+ payload = decode( redis.hget( "data-store:#{q}", id ) )
+ return Job.new(id, q, payload)
+ else
+ log "No jobs found in #{q}"
+ return nil
+ end
+ end
+
+ def poll(id)
+ Job.poll(id)
+ end
+
+ # lets us peek at the data for a given job
+ def payload_peek(queue, id)
+ Job.peek(queue, id)
+ end
+
+ # returns all the queues Kthxbye knows about
+ def queues
+ redis.smembers( :queues ).sort
+ end
+
+ # registers the queue in our "known queues" list
+ def register_queue(queue)
+ redis.sadd(:queues, queue) unless redis.sismember(:queues, queue)
+ end
+
+ # Removes the queue from the active queue listing, does not delete queue
+ # will lead to phantom queues. use delete_queue for complete removal of queue
+ def unregister_queue(queue)
+ redis.srem(:queues, queue)
+ end
+
+ # Completely removes queue: unregisters it then deletes it
+ # should return true in all cases
+ def delete_queue(queue)
+ unregister_queue(queue)
+ redis.del( "queue:#{queue}" ) || true
+ end
+
+ # returns all our registered workers
+ def workers
+ Array( redis.smembers( :workers ) )
+ end
+
+ # returns all our active workers
+ def working
+ Array( redis.smembers( :working ) )
+ end
+
+
+ # returns either the job results for a specific job (if id specified)
+ # or all the results for all the jobs on a queue
+ def job_results(queue, id=nil)
+ if id
+ decode( redis.hget( "result-store:#{queue}", id ) )
+ else
+ Array( redis.hgetall( "result-store:#{queue}" ) )
+ end
+ end
+end
+
+
+
BIN  lib/kthxbye/.failure.rb.swp
Binary file not shown
BIN  lib/kthxbye/.job.rb.swp
Binary file not shown
BIN  lib/kthxbye/.worker.rb.swp
Binary file not shown
34 lib/kthxbye/config.rb
@@ -0,0 +1,34 @@
+module Kthxbye
+ module Config
+
+ # default options for Kthxbye
+ #
+ # redis_server = the ip to connect to by default
+ #
+ # redis_port = default redis port
+ #
+ # attempts = default number of attempts on a failing job
+ # before moving to the failed job store
+ #
+ # verbose = more output
+ #
+ DEFAULT = {:redis_server => '127.0.0.1',
+ :redis_port => 9876,
+ :attempts => 3,
+ :verbose => false}.freeze
+
+ # configures any other args input by the user.
+ # can pull from a config.yaml file as well.
+ #
+ def self.setup( args=nil )
+ @options = DEFAULT.dup
+ @options.merge!( YAML.load_file('config.yaml') ) if File.exist?( 'config.yaml' )
+ @options.merge!( args ) if args
+ end
+
+ def self.options
+ return @options
+ end
+
+ end
+end
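A quick illustration of the merge precedence in @Config.setup@ — defaults are overridden by @config.yaml@, which is overridden by explicit args (the hash contents below are made-up examples):

```ruby
# Later merges win: defaults < config.yaml < setup args. Values are examples.
DEFAULTS  = { :redis_port => 9876, :attempts => 3, :verbose => false }.freeze
from_yaml = { :verbose => true }       # stand-in for parsed config.yaml data
args      = { :redis_port => 8080 }    # explicit arguments to setup

options = DEFAULTS.dup
options.merge!(from_yaml)              # file settings override defaults
options.merge!(args)                   # explicit args override everything
options  # => {:redis_port=>8080, :attempts=>3, :verbose=>true}
```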
75 lib/kthxbye/failure.rb
@@ -0,0 +1,75 @@
+require 'mail'
+
+Mail.defaults do
+ delivery_method :sendmail
+end
+
+class Mailer
+ def self.failure(id)
+ Mail.new do
+ from 'luke.vanderhoeven@htc.hargray.com'
+ to 'luke.vanderhoeven@htc.hargray.com'
+ subject 'Kthxbye failure'
+ body "There has been a failure reported by Kthxbye for job #{id}. Please review error log."
+ end
+ end
+end
+
+module Kthxbye
+ module Failure
+ include Helper
+ extend Helper
+
+ attr_accessor :job, :exception
+
+ # returns all the failed jobs
+ def self.all
+ redis.hvals( :failed ).map{|x| decode( x )}
+ end
+
+ # finds a single failed job by id
+ def self.find(id)
+ decode( redis.hget( :failed, id ) )
+ end
+
+ # gets count of all errors
+ def self.count
+ redis.hkeys( :failed ).size
+ end
+
+ # gets count of all errors of a specific type
+ def self.count_type(type)
+ # count matching entries rather than deleting from the list mid-iteration
+ redis.hvals( :failed ).count {|x| decode( x )['type'] =~ /#{type.to_s}/}
+ end
+
+ # creates a Failure object.
+ def self.create(job, exception)
+ error = {
+ :type => exception.class.to_s,
+ :error => exception.to_s,
+ :data => job.payload,
+ :job => job.id,
+ :queue => job.queue,
+ :time => Time.now,
+ :backtrace => Array( exception.backtrace )
+ }
+
+ redis.hset( :failed, job.id, encode( error ) )
+ job.dequeue
+ Mailer.failure(job.id).deliver
+ end
+
+ # the only method allowed to clear exceptions out of the exception store
+ def self.clear_exception(id)
+ redis.hdel( :failed, id )
+ end
+
+ # retrying does not remove the job from the failure store. it simply adds
+ # the job id back onto its original queue (its payload is still in the data store)
+ def self.force_retry(id)
+ job = Failure.find(id)
+ Job.add_to_queue( job['queue'], id )
+ end
+
+ end
+end
48 lib/kthxbye/helper.rb
@@ -0,0 +1,48 @@
+module Kthxbye
+ module Helper
+
+ def redis
+ Kthxbye.redis
+ end
+
+ def log(msg)
+ if Kthxbye::Config.options[:verbose]
+ puts "!! #{msg} - #{Time.now.strftime("%I:%M%p")}"
+ end
+ end
+
+ def queue_from_class(klass)
+ parts = klass.to_s.split("::")
+ qname = parts.last
+ qname.downcase unless qname.empty?
+ end
+
+ #
+ # encode/decode code taken and modified from Resque
+ # (http://github.com/defunkt/resque/blob/master/lib/resque/helpers.rb)
+ #
+ def encode( data )
+ if defined? Yajl
+ Yajl::Encoder.encode(data)
+ else
+ data.to_json
+ end
+ end
+
+ def decode( data )
+ return unless data
+
+ if defined? Yajl
+ begin
+ Yajl::Parser.parse( data, :check_utf8 => false )
+ rescue Yajl::ParseError
+ end
+ else
+ begin
+ JSON.parse( data )
+ rescue JSON::ParserError
+ end
+ end
+ end
+ end
+end
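One consequence of the JSON round trip in @encode@/@decode@ worth noting: symbol keys come back as strings, which is why the job code reads @data['klass']@ rather than @data[:klass]@. A small demonstration using the stdlib json fallback path:

```ruby
require 'json'

# Symbol keys are serialized as strings; after decoding, only string keys work.
encoded = { :klass => "SimpleJob", :payload => [1, 2] }.to_json
decoded = JSON.parse(encoded)

decoded["klass"]    # => "SimpleJob"
decoded[:klass]     # => nil -- the symbol key did not survive the round trip
decoded["payload"]  # => [1, 2]
```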
121 lib/kthxbye/job.rb
@@ -0,0 +1,121 @@
+module Kthxbye
+ class Job
+ include Helper
+ extend Helper
+
+ attr_accessor :id, :queue, :klass, :payload
+
+ # instantiates a job for the worker to run
+ def initialize(id, queue, data)
+ @id = id.to_i
+ @queue = queue
+ @klass = Object.const_get(data['klass'])
+ @payload = data['payload']
+ @failed_attempts = 0
+ end
+
+ def self.add_to_queue(queue, id)
+ raise "Error storing job id into queue" unless redis.rpush( "queue:#{queue}", id )
+ end
+
+ # insert a job into the queue
+ def self.create(queue, klass, *args)
+ raise "Need a queue to store job in" if queue.to_s.empty?
+ raise "No class to reference job type by" if klass.nil?
+
+ # atomically increment the unique id tracker and use the returned value
+ # (incr returns the new value, avoiding a race between incr and get)
+ id = redis.incr "uniq_id"
+
+ # push the job id to the queue
+ Job.add_to_queue( queue, id )
+
+ # register queue
+ Kthxbye.register_queue queue
+
+ # store the job data, hashed by unique_id
+ raise "Error storing job data for later use" unless redis.hset( "data-store:#{queue}", id, encode( {:klass => klass, :payload => args} ) )
+
+ log "Created job in queue #{queue} with an unique key of #{id}"
+
+ # return the id for use later
+ return id.to_i
+ end
+
+ def self.find(id, queue)
+ data = decode( redis.hget( "data-store:#{queue}", id ) )
+ Job.new(id, queue, data)
+ end
+
+ def rerun
+ Job.add_to_queue( @queue, @id )
+ end
+
+ # simply removes this job from the active queue and places it
+ # on the inactive list.
+ # does not remove job payload
+ def dequeue
+ redis.lrem("queue:#{@queue}", 0, @id)
+ redis.sadd("queue:#{@queue}:inactive", @id)
+ end
+
+ # removes a job and its data from their respective queues
+ def destroy
+ Job.remove(@queue, @id)
+ end
+
+ # removes all existence of this job and its data
+ def self.remove(queue, id)
+ # remove the element from the active queue
+ redis.lrem("queue:#{queue}", 0, id)
+ # be sure we also remove it from the inactive queue
+ redis.srem("queue:#{queue}:inactive", id)
+ # remove the job's data as well
+ redis.hdel("data-store:#{queue}", id) if redis.hexists( "data-store:#{queue}", id )
+ end
+
+ # does the heavy lifting of running a job
+ def perform
+ begin
+ result = @klass.send(:perform, *@payload)
+ redis.hset( "result-store:#{@queue}", @id, encode( result ) )
+ return result
+ rescue Exception => ex
+ @failed_attempts += 1
+ log "Error occurred: #{ex.message}. Try: #{@failed_attempts}/#{Kthxbye::Config.options[:attempts]}"
+ return Kthxbye::Failure.create( self, ex ) if @failed_attempts >= Kthxbye::Config.options[:attempts]
+ perform
+ end
+ end
+
+ def self.peek(queue, id)
+ decode( redis.hget( "data-store:#{queue}", id ) )
+ end
+
+ # will allow us to track when this job is being worked
+ def self.mark_active(id)
+ redis.sadd("jobs:active", id)
+ end
+
+ def self.mark_inactive(id)
+ redis.srem("jobs:active", id)
+ end
+
+ # true when job is being worked
+ def active?
+ redis.sismember("jobs:active", @id)
+ end
+
+ def self.poll(id)
+ redis.sismember("jobs:active", id)
+ end
+
+ def ==(obj)
+ @payload == obj.payload &&
+ @klass == obj.klass &&
+ @id == obj.id &&
+ @queue == obj.queue
+ end
+
+ end
+end
159 lib/kthxbye/worker.rb
@@ -0,0 +1,159 @@
+module Kthxbye
+ class Worker
+ include Helper
+ extend Helper
+
+ attr_accessor :sleep_for, :queues, :current_job, :current_queue
+
+ def initialize(queues, sleep_for=5)
+ setup_queues(queues)
+ @sleep_for = sleep_for
+ @current_job = nil
+ end
+
+ def setup_queues(queues)
+ if queues == "*"
+ @queues = Kthxbye.queues.sort
+ elsif queues.include? ?,
+ @queues = queues.split(",").compact
+ else
+ @queues = *queues
+ end
+ end
+
+ # major run loop. workhorse of a worker... sort of.
+ # in the end, this loop simply runs the jobs in separate
+ # processes by forking out the process then waiting for it
+ # to return. we only process one job at a time per worker.
+ def run
+ log "Starting Kthxbye::Worker on queue(s) #{@queues}"
+ startup
+
+ loop do
+ break if @terminate
+
+ if !@paused and job = grab_job
+ log "Found job #{job}"
+ working(job)
+
+ # store the child pid so kill_child can find it during shutdown!
+ @child = fork {
+ log "Forking..."
+ result = job.perform
+ log "Completed #{result}"
+ }
+
+ Process.wait(@child)
+ @child = nil
+ done
+ else
+ break if @sleep_for == 0
+ log "No jobs on #{@queues} - sleeping for #{@sleep_for}"
+ sleep sleep_for.to_i
+ end
+ end
+ ensure
+ unregister_worker
+ end
+
+ def queues
+ @queues.sort
+ end
+
+ # startup actions
+ def startup
+ register_worker
+ register_signals
+ end
+
+ # adds worker to the workers list
+ def register_worker
+ log "Registered worker #{self}"
+ redis.sadd( :workers, self ) if !exists?
+ end
+
+ # removes the worker from our workers list
+ def unregister_worker
+ log "Unregistered worker #{self}" if exists?
+ redis.srem :workers, self
+ end
+
+ # start working actions
+ def working(job)
+ redis.sadd( :working, self )
+ @current_job = job
+
+ Job.mark_active(job)
+ end
+
+ # must be in working list and have a current job
+ def working?
+ redis.sismember( :working, self ) &&
+ @current_job != nil
+ end
+
+ # job complete actions
+ def done
+ redis.srem( :working, self )
+ Job.mark_inactive(@current_job)
+ @current_job = nil
+ end
+
+ #
+ # thanks to http://github.com/defunkt/resque/blob/master/lib/resque/worker.rb for these signals
+ #
+ def register_signals
+ trap('TERM') { shutdown! }
+ trap('INT') { shutdown! }
+
+ begin
+ trap('QUIT') { shutdown }
+ trap('USR1') { shutdown }
+ trap('USR2') { log "Paused"; @paused = true }
+ trap('CONT') { log "Unpaused"; @paused = false }
+ rescue ArgumentError
+ warn "Signals QUIT, USR1, USR2, and/or CONT not supported."
+ end
+
+ log "Registered signals"
+ end
+
+ def shutdown
+ log "Shutting down worker #{self}"
+ unregister_worker
+ @terminate = true
+ end
+
+ def shutdown!
+ kill_child
+ shutdown
+ end
+
+ def kill_child
+ if @child
+ log "Killing child at #{@child}"
+ if system("ps -o pid,state -p #{@child}")
+ Process.kill("KILL", @child) rescue nil
+ else
+ log "Child #{@child} not found, restarting."
+ shutdown
+ end
+ end
+ end
+
+ def grab_job
+ job = nil
+ @queues.each do |q|
+ @current_queue = q
+ log "Checking \"#{q}\" queue for jobs"
+ job = Kthxbye.salvage(q)
+ break unless job.nil?
+ end
+
+ return job || false
+ end
+
+ def exists?
+ redis.sismember( :workers, self )
+ end
+
+ end
+end
115 test/redis-test.conf
@@ -0,0 +1,115 @@
+# Redis configuration file example
+
+# By default Redis does not run as a daemon. Use 'yes' if you need it.
+# Note that Redis will write a pid file in /var/run/redis.pid when daemonized.
+daemonize yes
+
+# When run as a daemon, Redis write a pid file in /var/run/redis.pid by default.
+# You can specify a custom pid file location here.
+pidfile ./test/redis-test.pid
+
+# Accept connections on the specified port, default is 6379
+port 9876
+
+# If you want you can bind a single interface, if the bind option is not
+# specified all the interfaces will listen for connections.
+#
+# bind 127.0.0.1
+
+# Close the connection after a client is idle for N seconds (0 to disable)
+timeout 300
+
+# Save the DB on disk:
+#
+# save <seconds> <changes>
+#
+# Will save the DB if both the given number of seconds and the given
+# number of write operations against the DB occurred.
+#
+# In the example below the behaviour will be to save:
+# after 900 sec (15 min) if at least 1 key changed
+# after 300 sec (5 min) if at least 10 keys changed
+# after 60 sec if at least 10000 keys changed
+save 900 1
+save 300 10
+save 60 10000
+
+# The filename where to dump the DB
+dbfilename dump.rdb
+
+# For default save/load DB in/from the working directory
+# Note that you must specify a directory not a file name.
+dir ./test/
+
+# Set server verbosity to 'debug'
+# it can be one of:
+# debug (a lot of information, useful for development/testing)
+# notice (moderately verbose, what you want in production probably)
+# warning (only very important / critical messages are logged)
+loglevel debug
+
+# Specify the log file name. Also 'stdout' can be used to force
+# the daemon to log on the standard output. Note that if you use standard
+# output for logging but daemonize, logs will be sent to /dev/null
+logfile stdout
+
+# Set the number of databases. The default database is DB 0, you can select
+# a different one on a per-connection basis using SELECT <dbid> where
+# dbid is a number between 0 and 'databases'-1
+databases 16
+
+################################# REPLICATION #################################
+
+# Master-Slave replication. Use slaveof to make a Redis instance a copy of
+# another Redis server. Note that the configuration is local to the slave
+# so for example it is possible to configure the slave to save the DB with a
+# different interval, or to listen to another port, and so on.
+
+# slaveof <masterip> <masterport>
+
+################################## SECURITY ###################################
+
+# Require clients to issue AUTH <PASSWORD> before processing any other
+# commands. This might be useful in environments in which you do not trust
+# others with access to the host running redis-server.
+#
+# This should stay commented out for backward compatibility and because most
+# people do not need auth (e.g. they run their own servers).
+
+# requirepass foobared
+
+################################### LIMITS ####################################
+
+# Set the max number of connected clients at the same time. By default there
+# is no limit, and it's up to the number of file descriptors the Redis process
+# is able to open. The special value '0' means no limits.
+# Once the limit is reached Redis will close all the new connections sending
+# an error 'max number of clients reached'.
+
+# maxclients 128
+
+# Don't use more memory than the specified amount of bytes.
+# When the memory limit is reached Redis will try to remove keys with an
+# EXPIRE set. It will try to start freeing keys that are going to expire
+# in little time and preserve keys with a longer time to live.
+# Redis will also try to remove objects from free lists if possible.
+#
+# If all this fails, Redis will start to reply with errors to commands
+# that will use more memory, like SET, LPUSH, and so on, and will continue
+# to reply to most read-only commands like GET.
+#
+# WARNING: maxmemory can be a good idea mainly if you want to use Redis as a
+# 'state' server or cache, not as a real DB. When Redis is used as a real
+# database the memory usage will grow over the weeks, it will be obvious if
+# it is going to use too much memory in the long run, and you'll have the time
+# to upgrade. With maxmemory after the limit is reached you'll start to get
+# errors for write operations, and this may even lead to DB inconsistency.
+
+# maxmemory <bytes>
+
+############################### ADVANCED CONFIG ###############################
+
+# Glue small output buffers together in order to send small replies in a
+# single TCP packet. Uses a bit more CPU but most of the times it is a win
+# in terms of number of queries per second. Use 'yes' if unsure.
+glueoutputbuf yes
86 test/test_helper.rb
@@ -0,0 +1,86 @@
+#
+# setup code used and modified from http://github.com/defunkt/resque/blob/master/test/test_helper.rb
+#
+require 'rubygems'
+require 'test/unit'
+require 'shoulda'
+
+$LOAD_PATH.unshift(File.join(File.dirname(__FILE__), '..', 'lib'))
+$LOAD_PATH.unshift(File.dirname(__FILE__))
+require 'kthxbye'
+
+dir = File.dirname(File.expand_path(__FILE__))
+require 'test/unit'
+require 'rubygems'
+
+
+# make sure we can run redis
+#
+
+if !system("which redis-server")
+ puts '', "** can't find `redis-server` in your path"
+ puts "** try running `sudo rake install`"
+ abort ''
+end
+
+
+#
+# start our own redis when the tests start,
+# kill it when they end
+#
+
+at_exit do
+ next if $!
+
+ if defined?(MiniTest)
+ exit_code = MiniTest::Unit.new.run(ARGV)
+ else
+ exit_code = Test::Unit::AutoRunner.run
+ end
+
+ pid = `ps -A -o pid,command | grep [r]edis-test`.split(" ")[0]
+ puts "Killing test redis server..."
+ `rm -f #{dir}/dump.rdb`
+ Process.kill("KILL", pid.to_i)
+ exit exit_code
+end
+
+puts "Starting redis for testing at localhost:9876..."
+`redis-server #{dir}/redis-test.conf`
+
+class SimpleJob
+ def self.perform(p1, p2)
+ end
+end
+
+class SomeQueueJob < SimpleJob
+ @queue = :test
+end
+
+class BadJob
+ def self.perform
+ raise "Bad job!"
+ end
+end
+
+class GoodJob
+ def self.perform(name)
+ "Good job, #{name}"
+ end
+end
+
+class LongJob
+ def self.perform(data)
+ sleep 10
+ puts "I just slept for 10 seconds with #{data} keeping me company"
+ data.gsub(" ", "_")
+ end
+end
+
+
+class BadJobWithSyntaxError
+ def self.perform
+ raise SyntaxError, "Extra Bad job!"
+ end
+end
+
90 test/test_kthxbye.rb
@@ -1,7 +1,91 @@
-require 'helper'
+require 'test_helper'
class TestKthxbye < Test::Unit::TestCase
- should "probably rename this file and start testing for real" do
- flunk "hey buddy, you should probably rename this file and start testing for real"
+ context "See Kthxbye Configuration" do
+
+ should "configure an app with default params" do
+ Kthxbye::Config.setup
+
+ assert_equal '127.0.0.1', Kthxbye::Config.options[:redis_server]
+ assert_equal 9876, Kthxbye::Config.options[:redis_port]
+ assert_equal false, Kthxbye::Config.options[:verbose]
+ end
+
+ should "configure an app with given params" do
+ k = Kthxbye::Config.setup(:redis_server => "localhost", :redis_port => 8080, :verbose => true)
+
+ assert_equal 'localhost', Kthxbye::Config.options[:redis_server]
+ assert_equal 8080, Kthxbye::Config.options[:redis_port]
+ assert_equal true, Kthxbye::Config.options[:verbose]
+ end
end
+
+ context "See Kthxbye" do
+ setup do
+ Kthxbye.redis.flushall
+ end
+
+ should "register unregister and delete queues" do
+ assert_equal 0, Kthxbye.queues.size
+
+ assert Kthxbye.register_queue "dogs"
+ assert Kthxbye.register_queue "cats"
+
+ assert_equal 2, Kthxbye.queues.size
+ assert_equal ["cats", "dogs"], Kthxbye.queues
+
+ assert Kthxbye.unregister_queue("dogs")
+ assert_equal ["cats"], Kthxbye.queues
+
+ assert Kthxbye.delete_queue("cats")
+ assert_equal [], Kthxbye.queues
+ end
+
+ should "store a couple of jobs and return good ids" do
+ assert id = Kthxbye.enqueue("test", SimpleJob, {:hello => "world"}, "test params")
+ assert id2 = Kthxbye.enqueue("test", GoodJob, "I am a sentence to print")
+ assert_equal 1, id
+ assert_equal 2, id2
+ end
+
+ should "enqueue a job and show a queue size of one" do
+ assert_equal 0, Kthxbye.size("test")
+ Kthxbye.enqueue("test", SimpleJob, {:hello => "world"}, "test params")
+ assert_equal 1, Kthxbye.size("test")
+ end
+
+ should "enqueue a job and peek at the datastores" do
+ id = Kthxbye.enqueue("test", SimpleJob, {:hello => "world"}, "test params")
+ assert_equal [{'hello' => "world"}, "test params"], Kthxbye.payload_peek("test", id)['payload']
+ end
+
+ should "show its queues correctly" do
+ Kthxbye.enqueue("these-jobs", SimpleJob, {:hello => "world"}, "test params")
+ Kthxbye.enqueue("those-jobs", SimpleJob, {:hello => "world"}, "test params")
+
+ assert_equal ["these-jobs", "those-jobs"], Kthxbye.queues
+
+ Kthxbye.delete_queue("these-jobs")
+ assert_equal ["those-jobs"], Kthxbye.queues
+ end
+
+ should "grab a job off the queue" do
+ Kthxbye.enqueue("these-jobs", SimpleJob, {:hello => "world"}, "test params")
+
+ assert job = Kthxbye.salvage("these-jobs")
+ assert_equal Kthxbye::Job, job.class
+ assert_equal 1, job.id
+ assert_equal "these-jobs", job.queue
+ assert_equal SimpleJob, job.klass
+ end
+
+ should "return a job" do
+ id = Kthxbye.enqueue( "test", SimpleJob, 1, 2 )
+ assert job = Kthxbye::Job.find( id, "test" )
+ assert_equal Kthxbye::Job, job.class
+ assert_equal [1, 2], job.payload
+ end
+ end
+
end
+
124 test/test_worker.rb
@@ -0,0 +1,124 @@
+require 'test_helper'
+class TestWorker < Test::Unit::TestCase
+ context "See Kthxbye::Worker" do
+ setup do
+ Kthxbye::Config.setup(:verbose => false, :redis_port => 9876)
+ Kthxbye.redis.flushall
+ end
+
+ should "register and show a worker" do
+ worker = Kthxbye::Worker.new("test", 0)
+ worker.run do
+ assert_equal [worker.to_s], Kthxbye.workers
+ end
+ end
+
+ should "startup a worker and register it as working" do
+ worker = Kthxbye::Worker.new("test", 0)
+ worker.run do
+ assert_equal [worker.to_s], Kthxbye.working
+ end
+ end
+
+ should "pull jobs from given queue and run them" do
+ id = Kthxbye.enqueue("test", SimpleJob, {:hello => "world"}, "test params")
+
+ assert worker = Kthxbye::Worker.new( "test", 0 )
+ worker.run
+ end
+
+ should "store a job's results and retrieve them" do
+ id = Kthxbye.enqueue("test", GoodJob, "Lukas")
+
+ worker = Kthxbye::Worker.new("test", 0 )
+ worker.run
+
+ assert_equal 1, Kthxbye.job_results("test").size
+ assert_equal "Good job, Lukas", Kthxbye.job_results("test", id)
+
+ end
+
+ should "capture and report failed jobs" do
+ id = Kthxbye.enqueue( "bad", BadJob )
+ id2 = Kthxbye.enqueue( "bad", BadJobWithSyntaxError )
+
+ assert_equal 0, Kthxbye::Failure.count
+
+ worker = Kthxbye::Worker.new("bad", 0 )
+ worker.run
+
+ assert_equal 2, Kthxbye::Failure.count
+ assert_equal "Bad job!", Kthxbye::Failure.find(id)["error"]
+ assert_equal "Extra Bad job!", Kthxbye::Failure.find(id2)["error"]
+
+ assert_equal ["Bad job!", "Extra Bad job!"], Kthxbye::Failure.all.map{|x| x['error']}
+
+ assert_equal 1, Kthxbye::Failure.count_type(RuntimeError)
+ end
+
+ should "report queues available to workers when added via csv (alphabetical)" do
+ worker = Kthxbye::Worker.new("good,bad,ugly")
+ assert_equal ["good", "bad", "ugly"].sort, worker.queues
+ end
+
+ should "report queues available to workers when using * (alphabetical)" do
+ Kthxbye.register_queue("hello")
+ Kthxbye.register_queue("world")
+
+ worker = Kthxbye::Worker.new("*")
+ assert_equal ["hello", "world"].sort, worker.queues
+ end
+
+ should "report queues available to worker when passed an array (alphabetical)" do
+ worker = Kthxbye::Worker.new(["one", "two", "three"])
+ assert_equal ["one", "two", "three"].sort, worker.queues
+ end
+
+ should "return job not active before worked" do
+ id = Kthxbye.enqueue( "test", SimpleJob, 1, 2 )
+ assert_equal false, Kthxbye.poll(id)
+ end
+
+
+ should "return job active once working it" do
+ id = Kthxbye.enqueue( "test", SimpleJob, 1, 2 )
+ worker = Kthxbye::Worker.new("test")
+ worker.working(id)
+
+ job = Kthxbye::Job.find(id, "test")
+
+ assert worker.working?
+ assert_equal Kthxbye::Job.find(worker.current_job, "test"), job
+
+ assert job.active?
+ end
+
+ should "know when it is working" do
+ id = Kthxbye.enqueue( "test", SimpleJob, 1, 2 )
+ worker = Kthxbye::Worker.new("test", 0)
+ worker.run do
+ assert worker.working?
+ assert Job.find(id).active?
+ end
+ end
+
+ should "know which is the current queue" do
+ Kthxbye.enqueue( "one", SimpleJob, 1, 2 )
+
+ worker = Kthxbye::Worker.new("*", 0)
+ worker.run do
+ assert_equal "one", worker.current_queue
+ end
+ end
+
+ should "know which is the current job id" do
+ id = Kthxbye.enqueue( "one", SimpleJob, 1, 2 )
+
+ worker = Kthxbye::Worker.new("*", 0)
+ worker.run do
+ assert_equal id, worker.current_job
+ end
+ end
+
+ end
+end