Skip to content

Commit

Permalink
Merge pull request #3 from virtualstaticvoid/sub_process_optimization
Browse files Browse the repository at this point in the history
Sub process optimization
  • Loading branch information
virtualstaticvoid committed Nov 1, 2016
2 parents 2699524 + 9485394 commit 48698e1
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 5 deletions.
2 changes: 1 addition & 1 deletion Gemfile.lock
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
taskinator (0.3.9)
taskinator (0.3.10)
builder (>= 3.2.2)
connection_pool (>= 2.2.0)
globalid (~> 0.3)
Expand Down
126 changes: 125 additions & 1 deletion lib/taskinator/persistence.rb
Expand Up @@ -68,6 +68,15 @@ def save
end
end

def to_xml
builder = ::Builder::XmlMarkup.new
builder.instruct!
builder.tag!('process', :key => self.key) do |xml|
XmlSerializationVisitor.new(xml, self).visit
end
builder
end

# the persistence key
def key
@key ||= self.class.key_for(self.uuid)
Expand Down Expand Up @@ -262,7 +271,10 @@ def visit_tasks(tasks)
tasks.each do |task|
RedisSerializationVisitor.new(@conn, task, @base_visitor).visit
@conn.rpush "#{@key}:tasks", task.uuid
@base_visitor.incr_task_count unless task.is_a?(Task::SubProcess)
unless task.is_a?(Task::SubProcess)
incr_task_count unless self == @base_visitor
@base_visitor.incr_task_count
end
end
end

Expand Down Expand Up @@ -315,6 +327,118 @@ def incr_task_count
end
end

class XmlSerializationVisitor < Taskinator::Visitor::Base

#
# the redis connection is passed in since it is
# in the multi statement mode in order to produce
# one roundtrip to the redis server
#

attr_reader :builder
attr_reader :instance

def initialize(builder, instance, base_visitor=self)
@builder = builder
@instance = instance
@key = instance.key
@root = base_visitor.instance
@base_visitor = base_visitor
@task_count = 0
end

# the starting point for serializing the instance
def visit
@attributes = []
@attributes << [:type, @instance.class.name]
@attributes << [:process_uuid, @root.uuid]
@attributes << [:state, :initial]

@instance.accept(self)

@attributes << [:task_count, @task_count]

@attributes.each do |(name, value)|
builder.tag!('attribute', name => value)
end

self
end

def visit_process(attribute)
process = @instance.send(attribute)
if process
@attributes << [attribute, process.uuid]

builder.tag!('process', :key => process.key) do |xml|
XmlSerializationVisitor.new(xml, process, @base_visitor).visit
end
end
end

def visit_tasks(tasks)
builder.tag!('tasks') do |xml|
tasks.each do |task|
xml.tag!('task', :key => task.key) do |xml2|
XmlSerializationVisitor.new(xml2, task, @base_visitor).visit
unless task.is_a?(Task::SubProcess)
incr_task_count unless self == @base_visitor
@base_visitor.incr_task_count
end
end
end
end
end

def visit_attribute(attribute)
value = @instance.send(attribute)
@attributes << [attribute, value] if value
end

def visit_attribute_time(attribute)
visit_attribute(attribute)
end

def visit_attribute_enum(attribute, type)
visit_attribute(attribute)
end

def visit_process_reference(attribute)
process = @instance.send(attribute)
@attributes << [attribute, process.uuid] if process
end

def visit_task_reference(attribute)
task = @instance.send(attribute)
@attributes << [attribute, task.uuid] if task
end

def visit_type(attribute)
type = @instance.send(attribute)
@attributes << [attribute, type.name] if type
end

def visit_args(attribute)
values = @instance.send(attribute)
yaml = Taskinator::Persistence.serialize(values)

# greater than 2 MB?
if (yaml.bytesize / (1024.0**2)) > 2
Taskinator.logger.warn("Large argument data detected for '#{self.to_s}'. Consider using intrinsic types instead, or try to reduce the amount of data provided.")
end

@attributes << [attribute, yaml]
end

def task_count
@task_count
end

def incr_task_count
@task_count += 1
end
end

class RedisDeserializationVisitor < Taskinator::Visitor::Base

#
Expand Down
1 change: 1 addition & 0 deletions lib/taskinator/process.rb
Expand Up @@ -41,6 +41,7 @@ def initialize(definition, options={})
@queue = options.delete(:queue)
@created_at = Time.now.utc
@updated_at = created_at
@current_state = :initial
end

def parent=(value)
Expand Down
1 change: 1 addition & 0 deletions lib/taskinator/task.rb
Expand Up @@ -39,6 +39,7 @@ def initialize(process, options={})
@queue = options.delete(:queue)
@created_at = Time.now.utc
@updated_at = created_at
@current_state = :initial
end

def accept(visitor)
Expand Down
2 changes: 1 addition & 1 deletion lib/taskinator/version.rb
@@ -1,3 +1,3 @@
module Taskinator
VERSION = "0.3.9"
VERSION = "0.3.10"
end
4 changes: 2 additions & 2 deletions lib/taskinator/workflow.rb
Expand Up @@ -2,11 +2,11 @@ module Taskinator
module Workflow

def current_state
# NB: don't memoize this value (i.e. re-read it each time)
@current_state = load_workflow_state
@current_state ||= load_workflow_state
end

def current_state=(new_state)
return if new_state == @current_state
@current_state = persist_workflow_state(new_state)
end

Expand Down
4 changes: 4 additions & 0 deletions spec/taskinator/persistence_spec.rb
Expand Up @@ -210,6 +210,10 @@ def initialize
pending __FILE__
end

describe "#to_xml" do
pending __FILE__
end

describe "#key" do
it {
expect(subject.key).to match(/#{subject.uuid}/)
Expand Down

0 comments on commit 48698e1

Please sign in to comment.