Skip to content

Commit

Permalink
First pass at constructing dependency graph.
Browse files Browse the repository at this point in the history
  • Loading branch information
myronmarston committed Apr 23, 2012
1 parent 64390b8 commit de4a07e
Show file tree
Hide file tree
Showing 7 changed files with 362 additions and 2 deletions.
2 changes: 1 addition & 1 deletion .rspec
@@ -1,3 +1,3 @@
--color --color
--format documentation --format documentation
--random --order random
20 changes: 19 additions & 1 deletion lib/plines.rb
@@ -1,5 +1,23 @@
require "plines/version" require "plines/version"
require 'plines/step'
require 'qless'


module Plines module Plines
# Your code goes here... extend self
attr_writer :qless

def qless
@qless ||= Qless::Client.new
end

def default_queue
@default_queue ||= qless.queue("plines")
end

def start_processing(data = {})
Plines::Step.all.each do |klass|
default_queue.put(klass, {})
end
end
end end

17 changes: 17 additions & 0 deletions lib/plines/dependency_graph.rb
@@ -0,0 +1,17 @@
module Plines
class DependencyGraph
def initialize
@step_repository = Hash.new { |h,k| h[k] = StepInstance.new(*k) }
yield self if block_given?
end

def step_for(*args)
@step_repository[args]
end

def steps
@step_repository.values
end
end
end

75 changes: 75 additions & 0 deletions lib/plines/step.rb
@@ -0,0 +1,75 @@
require 'plines/dependency_graph'

module Plines
StepInstance = Struct.new(:klass, :data) do
attr_reader :dependencies, :dependees

def initialize(*args)
super
@dependencies = Set.new
@dependees = Set.new
yield self if block_given?
end

def add_dependency(step)
dependencies << step
step.dependees << self
self
end
end

module Step
def self.all
@all ||= []
end

def self.to_dependency_graph(job_data)
DependencyGraph.new do |graph|
all.each do |step_klass|
step = graph.step_for(step_klass, job_data)
step_klass.dependencies_for(job_data).each do |dep|
step.add_dependency(graph.step_for(dep.klass, dep.data))
end
end
end
end

def self.included(klass)
klass.extend ClassMethods
Plines::Step.all << klass
end

module ClassMethods
def dependency_declarations
@dependency_declarations ||= []
end

def depends_on(*args, &block)
args.each do |klass_name|
depends_on do |data|
StepInstance.new(module_namespace.const_get(klass_name), data)
end
end

dependency_declarations << block if block
end

def dependencies_for(job_data)
dependency_declarations.flat_map { |dd| dd[job_data] }
end

def has_no_dependencies?
dependency_declarations.none?
end

private

def module_namespace
namespaces = name.split('::')
namespaces.pop # ignore the last one
namespaces.inject(Object) { |ns, mod| ns.const_get(mod) }
end
end
end
end

51 changes: 51 additions & 0 deletions spec/spec_helper.rb
@@ -1,9 +1,60 @@
if File.exist?('./config/redis_connection_url.txt')
ENV['REDIS_URL'] = File.read('./config/redis_connection_url.txt')

# use a different db number for test environment
if db_num = ENV['REDIS_URL'][%r|\/(\d{1,2})\z|, 1]
db_num = db_num.to_i
ENV['REDIS_URL'].gsub!(%r|\/#{db_num}\z|, "/#{db_num + 1}")
end
end

require_relative '../config/setup_load_paths' require_relative '../config/setup_load_paths'
require 'debugger'
Debugger.start
require 'rspec/fire' require 'rspec/fire'


RSpec::Matchers.define :have_enqueued_waiting_jobs_for do |*klasses|
match do |_|
jobs = Plines.default_queue.peek(klasses.size + 1)
jobs.map { |j| j.klass.to_s }.should =~ klasses.map(&:to_s)
end
end

module PlinesSpecHelpers
def step_class(name, &block)
block ||= Proc.new { }
klass = Class.new { include Plines::Step; module_eval(&block) }
stub_const(name.to_s, klass)
end

def enqueued_waiting_job_klass_names(expected)
jobs = Plines.default_queue.peek(expected + 1)
jobs.map { |j| j.klass.to_s }
end

module ClassMethods
def step_class(name, &block)
before(:each) { step_class(name, &block) }
end
end
end

RSpec.configure do |config| RSpec.configure do |config|
config.treat_symbols_as_metadata_keys_with_true_values = true config.treat_symbols_as_metadata_keys_with_true_values = true
config.run_all_when_everything_filtered = true config.run_all_when_everything_filtered = true
config.filter_run :f config.filter_run :f
config.include RSpec::Fire config.include RSpec::Fire
config.include PlinesSpecHelpers
config.extend PlinesSpecHelpers::ClassMethods
config.before(:each) do
if defined?(Plines::Step)
Plines::Step.all.clear
end
end
end end

shared_context "redis", :redis do
before(:all) { $_redis ||= ::Redis.connect }
before(:each) { $_redis.flushdb }
end

147 changes: 147 additions & 0 deletions spec/unit/plines/step_spec.rb
@@ -0,0 +1,147 @@
require 'spec_helper'
require 'plines/step'
require 'set'

module Plines
describe StepInstance do
step_class(:StepA)
step_class(:StepB)

let(:a1_1) { StepInstance.new(StepA, 1) }
let(:a1_2) { StepInstance.new(StepA, 1) }
let(:a2) { StepInstance.new(StepA, 2) }
let(:b) { StepInstance.new(StepB, 1) }

it 'is uniquely identified by the class/data combination' do
steps = Set.new
steps << a1_1 << a1_2 << a2 << b
steps.size.should eq(3)
steps.map(&:object_id).should =~ [a1_1, a2, b].map(&:object_id)
end

it 'initializes #dependencies and #dependees to empty sets' do
b.dependencies.should eq(Set.new)
b.dependees.should eq(Set.new)
end

it 'sets up the dependency/dependee relationship when a dependency is added' do
a2.dependencies.should be_empty
b.dependencies.should be_empty
a2.add_dependency(b)
a2.dependencies.to_a.should eq([b])
b.dependees.to_a.should eq([a2])
end

it 'yields when constructed if passed a block' do
yielded_object = nil
si = StepInstance.new(StepA, 5) { |a| yielded_object = a }
yielded_object.should be(si)
end
end

describe Step do
describe ".all" do
step_class(:StepA)
step_class(:StepB)

it 'includes all classes that includes the Plines::Step module' do
Plines::Step.all.should eq([StepA, StepB])
end
end

describe "#dependencies_for" do
it "returns an empty array for a step with no declared dependencies" do
step_class(:StepFoo)
StepFoo.dependencies_for(:data).should eq([])
end
end

describe "#has_no_dependencies?" do
step_class(:StepA)

it "returns true for steps that have no dependencies" do
StepA.should have_no_dependencies
end

it "returns false for steps that have dependencies" do
step_class(:StepC) { depends_on :StepA }
StepC.should_not have_no_dependencies
end
end

describe "#depends_on" do
step_class(:StepA)
step_class(:StepB)

it "adds static dependencies when given a class name" do
step_class(:StepC) do
depends_on :StepA, :StepB
end

dependencies = StepC.dependencies_for(:foo)
dependencies.map(&:klass).should eq([StepA, StepB])
dependencies.map(&:data).should eq([:foo, :foo])
end

it "resolves step class names in the enclosing module" do
stub_const("MySteps::A", Class.new { include Plines::Step })

stub_const("MySteps::B", Class.new {
include Plines::Step
depends_on :A
})

dependencies = MySteps::B.dependencies_for([])
dependencies.map(&:klass).should eq([MySteps::A])
end

it "adds a dynamic dependency when given a block" do
step_class(:StepC) do
depends_on do |data|
[1, 2].map { |i| StepInstance.new(StepA, data + i) }
end
end

dependencies = StepC.dependencies_for(17)
dependencies.map(&:klass).should eq([StepA, StepA])
dependencies.map(&:data).should eq([18, 19])
end
end

describe "#to_dependency_graph" do
step_class(:A) { depends_on :B, :C, :D }
step_class(:B) { depends_on :E }
step_class(:C) { depends_on :E, :F }
step_class(:D); step_class(:E); step_class(:F)
let(:graph) { graph = Plines::Step.to_dependency_graph("args") }
let(:steps_by_klass) { Hash.new { |h, k| h[k] = graph.steps.find { |s| s.klass == k } } }

def step(klass)
steps_by_klass[klass]
end

it 'constructs a full dependency graph from the given declarations' do
graph.should have(6).steps

step(A).dependencies.to_a.should =~ [step(B), step(C), step(D)]
step(A).dependees.to_a.should =~ []

step(B).dependencies.to_a.should =~ [step(E)]
step(B).dependees.to_a.should =~ [step(A)]

step(C).dependencies.to_a.should =~ [step(E), step(F)]
step(C).dependees.to_a.should =~ [step(A)]

step(D).dependencies.to_a.should =~ []
step(D).dependees.to_a.should =~ [step(A)]

step(E).dependencies.to_a.should =~ []
step(E).dependees.to_a.should =~ [step(B), step(C)]

step(F).dependencies.to_a.should =~ []
step(F).dependees.to_a.should =~ [step(C)]
end
end
end
end

52 changes: 52 additions & 0 deletions spec/unit/plines_spec.rb
@@ -0,0 +1,52 @@
require 'spec_helper'
require 'plines'

describe Plines do
describe ".qless" do
before { Plines.instance_variable_set(:@qless, nil) }
after(:all) { Plines.instance_variable_set(:@qless, nil) }

it 'returns a memoized Qless::Client instance' do
Plines.qless.should be_a(Qless::Client)
Plines.qless.should be(Plines.qless)
end

it 'can be overridden' do
orig_instance = Plines.qless
new_instance = Qless::Client.new
Plines.qless = new_instance
Plines.qless.should be(new_instance)
Plines.qless.should_not be(orig_instance)
end
end

describe ".default_queue" do
it "returns the 'plines' queue, memoized" do
Plines.default_queue.should be_a(Qless::Queue)
Plines.default_queue.should be(Plines.default_queue)
Plines.default_queue.name.should eq("plines")
end
end

describe ".start_processing", :redis do
step_class(:Step1)
step_class(:Step2)

before { Plines.default_queue.peek.should be_nil }

it "enqueues all steps that have no declared dependencies" do
Plines.start_processing
enqueued_waiting_job_klass_names(2).should =~ %w[ Step1 Step2 ]
end

it "enqueues dependent jobs as dependencies", :pending do
step_class(:DependsOnStep1) do
depends_on :Step1
end

Plines.start_processing
enqueued_waiting_job_klass_names(3).should_not include("DependsOnStep1")
end
end
end

0 comments on commit de4a07e

Please sign in to comment.