Skip to content

Commit

Permalink
Remove hacky Job.accumulate_instances.
Browse files Browse the repository at this point in the history
  • Loading branch information
myronmarston committed Mar 17, 2015
1 parent 22221a3 commit 6f675bd
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 103 deletions.
13 changes: 6 additions & 7 deletions lib/plines/dependency_graph.rb
Expand Up @@ -12,16 +12,15 @@ class DependencyGraph
class CircularDependencyError < StandardError; end

def initialize(pipeline, batch_data)
@steps = Job.accumulate_instances do
jobs_by_klass = self.class.jobs_by_klass_for(pipeline, batch_data)
jobs_by_klass = self.class.jobs_by_klass_for(pipeline, batch_data)

jobs_by_klass.values.flatten.each do |job|
job.add_dependencies_for(batch_data, jobs_by_klass)
end

@terminal_jobs = jobs_by_klass.fetch(pipeline.terminal_step)
jobs_by_klass.values.flatten.each do |job|
job.add_dependencies_for(batch_data, jobs_by_klass)
end

@terminal_jobs = jobs_by_klass.fetch(pipeline.terminal_step)
@steps = jobs_by_klass.values.flatten

setup_terminal_dependencies
detect_circular_dependencies!
end
Expand Down
46 changes: 0 additions & 46 deletions lib/plines/job.rb
Expand Up @@ -47,52 +47,6 @@ def add_dependencies_for(batch_data, jobs_by_klass)
def external_dependencies
klass.external_dependencies_for(data)
end

class << self
# Prevent users of this class from constructing a new instance directly;
# Instead, they should use #build.
#
# Note: I tried to override #new (w/ a `super` call) but it didn't work...
# I think it was overriding Struct.new rather than Job.new
# or something.
private :new

# Ensures all "identical" instances (same klass and data)
# created within the block are in fact the same object.
# This is important when constructing the dependency graph,
# so that all the dependency/dependee relationships point to
# the right objects (rather than duplicate objects).
def accumulate_instances
self.repository = Hash.new { |h,k| h[k] = new(*k) }

begin
yield
return repository.values
ensure
self.repository = nil
end
end

def build(*args, &block)
repository[args, &block]
end

private

def repository=(value)
Thread.current[:plines_job_repository] = value
end

NullRepository = Class.new do
def self.[](args, &block)
Job.send(:new, *args, &block)
end
end

def repository
Thread.current[:plines_job_repository] || NullRepository
end
end
end
end

2 changes: 1 addition & 1 deletion lib/plines/step.rb
Expand Up @@ -129,7 +129,7 @@ def jobs_for(batch_data)
@fan_out_blocks.inject([batch_data]) do |job_data_hashes, fan_out_block|
job_data_hashes.flat_map { |job_data| fan_out_block.call(job_data) }
end.map do |job_data|
Job.build(self, job_data)
Job.new(self, job_data)
end
end

Expand Down
60 changes: 11 additions & 49 deletions spec/unit/plines/job_spec.rb
Expand Up @@ -9,11 +9,11 @@ module Plines
step_class(:StepB)
step_class(:StepC)

let(:a1_1) { Job.build(P::StepA, a: 1) }
let(:a1_2) { Job.build(P::StepA, a: 1) }
let(:a2) { Job.build(P::StepA, a: 2) }
let(:b) { Job.build(P::StepB, a: 1) }
let(:c) { Job.build(P::StepC, a: 1) }
let(:a1_1) { Job.new(P::StepA, a: 1) }
let(:a1_2) { Job.new(P::StepA, a: 1) }
let(:a2) { Job.new(P::StepA, a: 2) }
let(:b) { Job.new(P::StepB, a: 1) }
let(:c) { Job.new(P::StepC, a: 1) }

it 'is uniquely identified by the class/data combination' do
steps = Set.new
Expand All @@ -24,7 +24,7 @@ module Plines

it 'raises an error if given data that is not a hash' do
expect {
Job.build(P::StepA, 5)
Job.new(P::StepA, 5)
}.to raise_error(NotAHashError)
end

Expand Down Expand Up @@ -70,48 +70,10 @@ module Plines

it 'yields when constructed if passed a block' do
yielded_object = nil
si = Job.build(P::StepA, a: 5) { |a| yielded_object = a }
si = Job.new(P::StepA, a: 5) { |a| yielded_object = a }
expect(yielded_object).to be(si)
end

it 'does not allow consumers to construct instances using .new (since we need accumulation behavior and we cannot override .new)' do
expect { Job.new(P::StepA, a: 3) }.to raise_error(NoMethodError)
end

describe '.accumulate_instances' do
it 'causes .build to return identical object instances for the same arguments for the duration of the block' do
expect(Job.build(P::StepA, a: 1)).not_to be(Job.build(P::StepA, a: 1))
s1 = s2 = nil

Job.accumulate_instances do
s1 = Job.build(P::StepA, a: 1)
s2 = Job.build(P::StepA, a: 1)
end

expect(s1).to be(s2)
end

it 'returns the accumulated instances' do
s1 = s2 = s3 = nil

instances = Job.accumulate_instances do
s1 = Job.build(P::StepA, a: 1)
s2 = Job.build(P::StepA, a: 1)
s3 = Job.build(P::StepA, a: 2)
end

expect(instances).to match_array [s1, s3]
end

it 'correctly restores the initial state if an error is raised in the block' do
expect {
Job.accumulate_instances { raise "boom" }
}.to raise_error("boom")

expect(Job.build(P::StepA, a: 1)).not_to be(Job.build(P::StepA, a: 1))
end
end

describe "#external_dependencies" do
it 'returns each of the external dependencies of the job' do
step_class(:F) do
Expand All @@ -121,7 +83,7 @@ module Plines
end
end

j = Job.build(P::F, a: 1)
j = Job.new(P::F, a: 1)
expect(j.external_dependencies.map(&:name)).to eq(["foo", "bar"])
end

Expand All @@ -133,13 +95,13 @@ module Plines
end
end

j = Job.build(P::F, a: 1)
j = Job.new(P::F, a: 1)
expect(j.external_dependencies).to eq([])

j = Job.build(P::F, foo: true)
j = Job.new(P::F, foo: true)
expect(j.external_dependencies.map(&:name)).to eq(["foo"])

j = Job.build(P::F, foo: true, bar: true)
j = Job.new(P::F, foo: true, bar: true)
expect(j.external_dependencies.map(&:name)).to eq(["foo", "bar"])
end
end
Expand Down

0 comments on commit 6f675bd

Please sign in to comment.