Skip to content

Commit

Permalink
remove workflow gem and refactored functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
virtualstaticvoid committed Aug 28, 2015
1 parent d4172f0 commit 48b3fae
Show file tree
Hide file tree
Showing 22 changed files with 513 additions and 402 deletions.
3 changes: 0 additions & 3 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@ source 'https://rubygems.org'
# Specify your gem's dependencies in Taskinator.gemspec
gemspec

# add as a git gem dependency until version 1.3.0 is released to RubyGems.
gem 'workflow', :github => 'virtualstaticvoid/workflow', :branch => :master

# queues
gem 'sidekiq' , '>= 3.5.0', :github => "mperham/sidekiq"
gem 'rspec-sidekiq' , '>= 2.1.0'
Expand Down
8 changes: 0 additions & 8 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,6 @@ GIT
redis (~> 3.2, >= 3.2.1)
redis-namespace (~> 1.5, >= 1.5.2)

GIT
remote: git://github.com/virtualstaticvoid/workflow.git
revision: 403a9e44bf49f4d154156d5701e3d67b115ed6da
branch: master
specs:
workflow (1.3.0)

PATH
remote: .
specs:
Expand Down Expand Up @@ -194,4 +187,3 @@ DEPENDENCIES
rspec-sidekiq (>= 2.1.0)
sidekiq (>= 3.5.0)!
taskinator!
workflow!
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,7 @@ To monitor the state of the processes, use the `Taskinator::Api::Processes` clas
processes = Taskinator::Api::Processes.new
processes.each do |process|
# => output the unique process identifier and current state
puts [:process, process.uuid, process.current_state.name]
puts [:process, process.uuid, process.current_state]
end
```

Expand Down Expand Up @@ -613,11 +613,14 @@ The following instrumentation events are issued:
| `taskinator.process.created` | After a root process gets created |
| `taskinator.process.saved` | After a root process has been persisted to Redis |
| `taskinator.process.enqueued` | After a process or subprocess is enqueued for processing |
| `taskinator.process.processing` | When a process or subprocess is processing |
| `taskinator.process.paused` | When a process or subprocess is paused |
| `taskinator.process.resumed` | When a process or subprocess is resumed |
| `taskinator.process.completed` | After a process or subprocess has completed processing |
| `taskinator.process.cancelled` | After a process or subprocess has been cancelled |
| `taskinator.process.failed` | After a process or subprocess has failed |
| `taskinator.task.enqueued` | After a task has been enqueued |
| `taskinator.task.executed` | After a task has executed |
| `taskinator.task.processing` | When a task is processing |
| `taskinator.task.completed` | After a task has completed |
| `taskinator.task.cancelled` | After a task has been cancelled |
| `taskinator.task.failed` | After a task has failed |
Expand Down
3 changes: 2 additions & 1 deletion lib/taskinator.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
require 'json'
require 'yaml'
require 'securerandom'
require 'workflow'

require 'taskinator/version'

Expand All @@ -11,6 +10,8 @@

require 'taskinator/definition'

require 'taskinator/workflow'

require 'taskinator/visitor'
require 'taskinator/persistence'
require 'taskinator/instrumentation'
Expand Down
50 changes: 33 additions & 17 deletions lib/taskinator/instrumentation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,18 @@ def enqueued_payload(additional={})
payload_for(:enqueued, additional)
end

def started_payload(additional={})
def processing_payload(additional={})
payload_for(:processing, additional)
end

def paused_payload(additional={})
payload_for(:paused, additional)
end

def resumed_payload(additional={})
payload_for(:resumed, additional)
end

def completed_payload(additional={})
payload_for(:completed, additional)
end
Expand All @@ -25,8 +33,8 @@ def cancelled_payload(additional={})
payload_for(:cancelled, additional)
end

def failed_payload(additional={})
payload_for(:failed, additional)
def failed_payload(exception, additional={})
payload_for(:failed, { :exception => exception.to_s, :backtrace => exception.backtrace }.merge(additional))
end

private
Expand All @@ -36,24 +44,32 @@ def payload_for(state, additional={})
# need to cache here, since this method hits redis, so can't be part of multi statement following
process_key = self.process_key

tasks_count, completed_count, cancelled_count, failed_count = Taskinator.redis do |conn|
conn.hmget process_key, :tasks_count, :completed, :cancelled, :failed
tasks_count, processing_count, completed_count, cancelled_count, failed_count = Taskinator.redis do |conn|
conn.hmget process_key,
:tasks_count,
:tasks_processing,
:tasks_completed,
:tasks_cancelled,
:tasks_failed
end

tasks_count = tasks_count.to_f

return {
:type => self.class.name,
:process_uuid => process_uuid,
:process_options => process_options,
:uuid => uuid,
:options => options,
:state => state,
:percentage_failed => (tasks_count > 0) ? (failed_count.to_i / tasks_count) * 100.0 : 0.0,
:percentage_cancelled => (tasks_count > 0) ? (cancelled_count.to_i / tasks_count) * 100.0 : 0.0,
:percentage_completed => (tasks_count > 0) ? (completed_count.to_i / tasks_count) * 100.0 : 0.0,
:instance => self
}.merge(additional)
return OpenStruct.new(
{
:type => self.class,
:process_uuid => process_uuid,
:process_options => process_options,
:uuid => uuid,
:options => options,
:state => state,
:percentage_failed => (tasks_count > 0) ? (failed_count.to_i / tasks_count) * 100.0 : 0.0,
:percentage_cancelled => (tasks_count > 0) ? (cancelled_count.to_i / tasks_count) * 100.0 : 0.0,
:percentage_processing => (tasks_count > 0) ? (processing_count.to_i / tasks_count) * 100.0 : 0.0,
:percentage_completed => (tasks_count > 0) ? (completed_count.to_i / tasks_count) * 100.0 : 0.0,
:instance => self
}.merge(additional)
).freeze

end

Expand Down
55 changes: 28 additions & 27 deletions lib/taskinator/persistence.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,6 @@ def key_for(uuid)
"taskinator:#{base_key}:#{uuid}"
end

# retrieves the workflow state for the given identifier
# this prevents to need to load the entire object when
# querying for the status of an instance
def state_for(uuid)
key = key_for(uuid)
state = Taskinator.redis do |conn|
conn.hget(key, :state) || 'initial'
end
state.to_sym
end

# fetches the instance for given identifier
# optionally, provide a hash to use for the instance cache
# this argument is defaulted, so top level callers don't
Expand All @@ -68,10 +57,11 @@ def save
visitor = RedisSerializationVisitor.new(conn, self).visit
conn.hmset(
Taskinator::Process.key_for(uuid),
:tasks_count, visitor.task_count,
:tasks_failed, 0,
:tasks_completed, 0,
:tasks_cancelled, 0,
:tasks_count, visitor.task_count,
:tasks_failed, 0,
:tasks_processing, 0,
:tasks_completed, 0,
:tasks_cancelled, 0,
)
true
end
Expand All @@ -95,39 +85,36 @@ def process_key
@process_key ||= Taskinator::Process.key_for(process_uuid)
end

# cache of the current state (eventually consistent!)
attr_reader :state

# retrieves the workflow state
# this method is called from the workflow gem
def load_workflow_state
state = Taskinator.redis do |conn|
conn.hget(self.key, :state)
conn.hget(self.key, :state) || 'initial'
end
@state = (state || 'initial').to_sym
state.to_sym
end

# persists the workflow state
# this method is called from the workflow gem
def persist_workflow_state(new_state)
time_now = Time.now.utc
@updated_at = Time.now.utc
Taskinator.redis do |conn|
process_key = self.process_key
conn.multi do
conn.hmset(
self.key,
:state, new_state,
:updated_at, time_now
:updated_at, @updated_at
)

# also update the "root" process
conn.hset(
process_key,
:updated_at, time_now
:updated_at, @updated_at
)
end
end
@state = new_state
new_state
end

# persists the error information
Expand All @@ -152,7 +139,7 @@ def error
error_type, error_message, error_backtrace =
conn.hmget(self.key, :error_type, :error_message, :error_backtrace)

[error_type, error_message, JSON.parse(error_backtrace)]
[error_type, error_message, JSON.parse(error_backtrace || '[]')]
end
end

Expand All @@ -168,12 +155,13 @@ def tasks_count
%w(
failed
cancelled
processing
completed
).each do |status|

define_method "count_#{status}" do
count = Taskinator.redis do |conn|
conn.hget self.process_key, status
conn.hget self.process_key, "tasks_#{status}"
end
count.to_i
end
Expand All @@ -182,7 +170,7 @@ def tasks_count
Taskinator.redis do |conn|
process_key = self.process_key
conn.multi do
conn.hincrby process_key, status, 1
conn.hincrby process_key, "tasks_#{status}", 1
conn.hset process_key, :updated_at, Time.now.utc
end
end
Expand Down Expand Up @@ -273,6 +261,10 @@ def visit_attribute_time(attribute)
visit_attribute(attribute)
end

def visit_attribute_enum(attribute, type)
visit_attribute(attribute)
end

def visit_process_reference(attribute)
process = @instance.send(attribute)
@hmset += [attribute, process.uuid] if process
Expand Down Expand Up @@ -391,6 +383,15 @@ def visit_attribute_time(attribute)
end
end

# NB: assumes the enum type's members have integer values!
# NB: assumes the type defines a "Default" member
def visit_attribute_enum(attribute, type)
visit_attribute(attribute) do |value|
const_value = type.constants.select {|c| type.const_get(c) == value.to_i }.first
const_value ? type.const_get(const_value) : type::Default
end
end

def visit_type(attribute)
value = @attribute_values[attribute]
if value
Expand Down
Loading

0 comments on commit 48b3fae

Please sign in to comment.