Skip to content

Commit

Permalink
Replace custom serialization code with YAML magic
Browse files Browse the repository at this point in the history
  • Loading branch information
bkeepers committed May 3, 2010
1 parent 31e6d07 commit 481d123
Show file tree
Hide file tree
Showing 10 changed files with 108 additions and 117 deletions.
1 change: 1 addition & 0 deletions Rakefile
Expand Up @@ -28,6 +28,7 @@ Jeweler::Tasks.new do |s|
s.add_development_dependency "dm-observer"
s.add_development_dependency "dm-aggregates"
s.add_development_dependency "dm-validations"
s.add_development_dependency "dm-serializer"
s.add_development_dependency "do_sqlite3"
s.add_development_dependency "database_cleaner"
end
Expand Down
18 changes: 9 additions & 9 deletions lib/delayed/backend/active_record.rb
@@ -1,16 +1,16 @@
require 'active_record'

YAML.add_domain_type("ActiveRecord,2007", "") do |type, val|
type.split(':', 3).last.constantize.find(val['attributes']['id'])
end

class ActiveRecord::Base
def self.load_for_delayed_job(id)
if id
find(id)
else
super
end
def to_yaml_type
"!ActiveRecord,2007/#{self.class}"
end
def dump_for_delayed_job
"#{self.class};#{id}"

def to_yaml_properties
['@attributes']
end
end

Expand Down
17 changes: 1 addition & 16 deletions lib/delayed/backend/data_mapper.rb
@@ -1,22 +1,7 @@
require 'dm-core'
require 'dm-observer'
require 'dm-aggregates'

module DataMapper
module Resource
module ClassMethods
def load_for_delayed_job(id)
find!(id)
end
end

module InstanceMethods
def dump_for_delayed_job
"#{self.class};#{id}"
end
end
end
end
require 'dm-serializer'

module Delayed
module Backend
Expand Down
26 changes: 14 additions & 12 deletions lib/delayed/backend/mongo_mapper.rb
@@ -1,18 +1,20 @@
require 'mongo_mapper'

module ::MongoMapper
module Document
module ClassMethods
def load_for_delayed_job(id)
find!(id)
end
end
YAML.add_domain_type("MongoMapper,2010", "") do |type, val|
begin
type.split(':', 3).last.constantize.find!(val['_id'])
rescue MongoMapper::DocumentNotFound
nil
end
end

module InstanceMethods
def dump_for_delayed_job
"#{self.class};#{id}"
end
end
module MongoMapper::Document
def to_yaml_type
"!MongoMapper,2010/#{self.class}"
end

def to_yaml_properties
['@_id']
end
end

Expand Down
37 changes: 37 additions & 0 deletions lib/delayed/class_to_yaml.rb
@@ -0,0 +1,37 @@
require 'yaml'

class Module
yaml_as "tag:ruby.yaml.org,2002:module"

def Module.yaml_new( klass, tag, val )
if String === val
val.split(/::/).inject(Object) {|m, n| m.const_get(n)}
else
raise YAML::TypeError, "Invalid Module: " + val.inspect
end
end

def to_yaml( opts = {} )
YAML::quick_emit( nil, opts ) { |out|
out.scalar( "tag:ruby.yaml.org,2002:module", self.name, :plain )
}
end
end

class Class
yaml_as "tag:ruby.yaml.org,2002:class"

def Class.yaml_new( klass, tag, val )
if String === val
val.split(/::/).inject(Object) {|m, n| m.const_get(n)}
else
raise YAML::TypeError, "Invalid Class: " + val.inspect
end
end

def to_yaml( opts = {} )
YAML::quick_emit( nil, opts ) { |out|
out.scalar( "tag:ruby.yaml.org,2002:class", self.name, :plain )
}
end
end
51 changes: 4 additions & 47 deletions lib/delayed/performable_method.rb
@@ -1,62 +1,19 @@
class Class
def load_for_delayed_job(arg)
self
end

def dump_for_delayed_job
name
end
end

module Delayed
class PerformableMethod < Struct.new(:object, :method, :args)
STRING_FORMAT = /^LOAD\;([A-Z][\w\:]+)(?:\;(\w+))?$/

class LoadError < StandardError
end

def initialize(object, method, args)
raise NoMethodError, "undefined method `#{method}' for #{object.inspect}" unless object.respond_to?(method)

self.object = dump(object)
self.args = args.map { |a| dump(a) }
self.object = object
self.args = args
self.method = method.to_sym
end

def display_name
if STRING_FORMAT === object
"#{$1}#{$2 ? '#' : '.'}#{method}"
else
"#{object.class}##{method}"
end
"#{object.class}##{method}"
end

def perform
load(object).send(method, *args.map{|a| load(a)})
rescue PerformableMethod::LoadError
# We cannot do anything about objects that can't be loaded
true
end

private

def load(obj)
if STRING_FORMAT === obj
$1.constantize.load_for_delayed_job($2)
else
obj
end
rescue => e
Delayed::Worker.logger.warn "Could not load object for job: #{e.message}"
raise PerformableMethod::LoadError
end

def dump(obj)
if obj.respond_to?(:dump_for_delayed_job)
"LOAD;#{obj.dump_for_delayed_job}"
else
obj
end
object.send(method, *args) if object
end
end
end
1 change: 1 addition & 0 deletions lib/delayed_job.rb
Expand Up @@ -2,6 +2,7 @@

require File.dirname(__FILE__) + '/delayed/message_sending'
require File.dirname(__FILE__) + '/delayed/performable_method'
require File.dirname(__FILE__) + '/delayed/class_to_yaml'
require File.dirname(__FILE__) + '/delayed/backend/base'
require File.dirname(__FILE__) + '/delayed/worker'
require File.dirname(__FILE__) + '/delayed/railtie' if defined?(::Rails::Railtie)
Expand Down
4 changes: 2 additions & 2 deletions spec/backend/mongo_mapper_job_spec.rb
Expand Up @@ -51,7 +51,7 @@ def tell
job = story.delay.tell

job.payload_object.class.should == Delayed::PerformableMethod
job.payload_object.object.should == "LOAD;MongoStory;#{story.id}"
job.payload_object.object.should == story
job.payload_object.method.should == :tell
job.payload_object.args.should == []
job.payload_object.perform.should == 'Once upon a time...'
Expand All @@ -62,7 +62,7 @@ def tell
job = MongoStoryReader.new.delay.read(story)
job.payload_object.class.should == Delayed::PerformableMethod
job.payload_object.method.should == :read
job.payload_object.args.should == ["LOAD;MongoStory;#{story.id}"]
job.payload_object.args.should == [story]
job.payload_object.perform.should == 'Epilog: Once upon a time...'
end
end
Expand Down
64 changes: 36 additions & 28 deletions spec/backend/shared_backend_spec.rb
@@ -1,3 +1,9 @@
class NamedJob < Struct.new(:perform)
def display_name
'named_job'
end
end

shared_examples_for 'a backend' do
def create_job(opts = {})
@backend.create(opts.merge(:payload_object => SimpleJob.new))
Expand Down Expand Up @@ -43,34 +49,36 @@ def create_job(opts = {})
job = @backend.enqueue M::ModuleJob.new
lambda { job.invoke_job }.should change { M::ModuleJob.runs }.from(0).to(1)
end

it "should raise an DeserializationError when the job class is totally unknown" do
job = @backend.new :handler => "--- !ruby/object:JobThatDoesNotExist {}"
lambda { job.payload_object.perform }.should raise_error(Delayed::Backend::DeserializationError)
end

it "should try to load the class when it is unknown at the time of the deserialization" do
job = @backend.new :handler => "--- !ruby/object:JobThatDoesNotExist {}"
job.should_receive(:attempt_to_load).with('JobThatDoesNotExist').and_return(true)
lambda { job.payload_object.perform }.should raise_error(Delayed::Backend::DeserializationError)
end

describe "payload_object" do
it "should raise a DeserializationError when the job class is totally unknown" do
job = @backend.new :handler => "--- !ruby/object:JobThatDoesNotExist {}"
lambda { job.payload_object.perform }.should raise_error(Delayed::Backend::DeserializationError)
end

it "should try to load the class when it is unknown at the time of the deserialization" do
job = @backend.new :handler => "--- !ruby/object:JobThatDoesNotExist {}"
job.should_receive(:attempt_to_load).with('JobThatDoesNotExist').and_return(true)
lambda { job.payload_object.perform }.should raise_error(Delayed::Backend::DeserializationError)
end

it "should try include the namespace when loading unknown objects" do
job = @backend.new :handler => "--- !ruby/object:Delayed::JobThatDoesNotExist {}"
job.should_receive(:attempt_to_load).with('Delayed::JobThatDoesNotExist').and_return(true)
lambda { job.payload_object.perform }.should raise_error(Delayed::Backend::DeserializationError)
end
it "should try include the namespace when loading unknown objects" do
job = @backend.new :handler => "--- !ruby/object:Delayed::JobThatDoesNotExist {}"
job.should_receive(:attempt_to_load).with('Delayed::JobThatDoesNotExist').and_return(true)
lambda { job.payload_object.perform }.should raise_error(Delayed::Backend::DeserializationError)
end

it "should also try to load structs when they are unknown (raises TypeError)" do
job = @backend.new :handler => "--- !ruby/struct:JobThatDoesNotExist {}"
job.should_receive(:attempt_to_load).with('JobThatDoesNotExist').and_return(true)
lambda { job.payload_object.perform }.should raise_error(Delayed::Backend::DeserializationError)
end
it "should also try to load structs when they are unknown (raises TypeError)" do
job = @backend.new :handler => "--- !ruby/struct:JobThatDoesNotExist {}"
job.should_receive(:attempt_to_load).with('JobThatDoesNotExist').and_return(true)
lambda { job.payload_object.perform }.should raise_error(Delayed::Backend::DeserializationError)
end

it "should try include the namespace when loading unknown structs" do
job = @backend.new :handler => "--- !ruby/struct:Delayed::JobThatDoesNotExist {}"
job.should_receive(:attempt_to_load).with('Delayed::JobThatDoesNotExist').and_return(true)
lambda { job.payload_object.perform }.should raise_error(Delayed::Backend::DeserializationError)
it "should try include the namespace when loading unknown structs" do
job = @backend.new :handler => "--- !ruby/struct:Delayed::JobThatDoesNotExist {}"
job.should_receive(:attempt_to_load).with('Delayed::JobThatDoesNotExist').and_return(true)
lambda { job.payload_object.perform }.should raise_error(Delayed::Backend::DeserializationError)
end
end

describe "find_available" do
Expand Down Expand Up @@ -171,10 +179,10 @@ def create_job(opts = {})
it "should be the class name of the job that was enqueued" do
@backend.create(:payload_object => ErrorJob.new ).name.should == 'ErrorJob'
end

it "should be the method that will be called if its a performable method object" do
@job = Story.delay.create
@job.name.should == "Story.create"
job = @backend.new(:payload_object => NamedJob.new)
job.name.should == 'named_job'
end

it "should be the instance method that will be called if its a performable method object" do
Expand Down
6 changes: 3 additions & 3 deletions spec/performable_method_spec.rb
Expand Up @@ -19,15 +19,15 @@ def read(story)
story = Story.create :text => 'Once upon...'
p = Delayed::PerformableMethod.new(story, :tell, [])
p.class.should == Delayed::PerformableMethod
p.object.should == "LOAD;Story;#{story.id}"
p.object.should == story
p.method.should == :tell
p.args.should == []
p.perform.should == 'Once upon...'
end

it "should allow class methods to be called on ActiveRecord models" do
p = Delayed::PerformableMethod.new(Story, :count, [])
lambda { p.send(:load, p.object) }.should_not raise_error
lambda { p.perform.should be_kind_of(Fixnum) }.should_not raise_error
end

it "should store arguments as string if they are active record objects" do
Expand All @@ -36,7 +36,7 @@ def read(story)
p = Delayed::PerformableMethod.new(reader, :read, [story])
p.class.should == Delayed::PerformableMethod
p.method.should == :read
p.args.should == ["LOAD;Story;#{story.id}"]
p.args.should == [story]
p.perform.should == 'Epilog: Once upon...'
end
end

0 comments on commit 481d123

Please sign in to comment.