Skip to content

Commit

Permalink
remove workflow gem and refactored functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
virtualstaticvoid committed Aug 28, 2015
1 parent d4172f0 commit 7c34808
Show file tree
Hide file tree
Showing 24 changed files with 519 additions and 406 deletions.
6 changes: 4 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
v0.2.1 - 25 Aug 2015
v0.3.0 - 28 Aug 2015
---
Added created_at and updated_at to process and task as attributes.
Improved serialization visitor to include an optional converter block for deserialization of attribute values.
Corrections to lazy loader logic and speed improvements.
Removed JobWorker as it's no longer necessary.
Improvements to instrumentation
Improvements to instrumentation.
Removed workflow gem, and refactored process and task to implement the basics instead.
Several bug fixes.

v0.2.0 - 31 Jul 2015
---
Expand Down
3 changes: 0 additions & 3 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@ source 'https://rubygems.org'
# Specify your gem's dependencies in Taskinator.gemspec
gemspec

# add as a git gem dependency until version 1.3.0 is released to RubyGems.
gem 'workflow', :github => 'virtualstaticvoid/workflow', :branch => :master

# queues
gem 'sidekiq' , '>= 3.5.0', :github => "mperham/sidekiq"
gem 'rspec-sidekiq' , '>= 2.1.0'
Expand Down
10 changes: 1 addition & 9 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,10 @@ GIT
redis (~> 3.2, >= 3.2.1)
redis-namespace (~> 1.5, >= 1.5.2)

GIT
remote: git://github.com/virtualstaticvoid/workflow.git
revision: 403a9e44bf49f4d154156d5701e3d67b115ed6da
branch: master
specs:
workflow (1.3.0)

PATH
remote: .
specs:
taskinator (0.2.1)
taskinator (0.3.0)
connection_pool (>= 2.2.0)
json (>= 1.8.2)
redis (>= 3.2.1)
Expand Down Expand Up @@ -194,4 +187,3 @@ DEPENDENCIES
rspec-sidekiq (>= 2.1.0)
sidekiq (>= 3.5.0)!
taskinator!
workflow!
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,7 @@ To monitor the state of the processes, use the `Taskinator::Api::Processes` clas
processes = Taskinator::Api::Processes.new
processes.each do |process|
# => output the unique process identifier and current state
puts [:process, process.uuid, process.current_state.name]
puts [:process, process.uuid, process.current_state]
end
```

Expand Down Expand Up @@ -613,11 +613,14 @@ The following instrumentation events are issued:
| `taskinator.process.created` | After a root process gets created |
| `taskinator.process.saved` | After a root process has been persisted to Redis |
| `taskinator.process.enqueued` | After a process or subprocess is enqueued for processing |
| `taskinator.process.processing` | When a process or subprocess is processing |
| `taskinator.process.paused` | When a process or subprocess is paused |
| `taskinator.process.resumed` | When a process or subprocess is resumed |
| `taskinator.process.completed` | After a process or subprocess has completed processing |
| `taskinator.process.cancelled` | After a process or subprocess has been cancelled |
| `taskinator.process.failed` | After a process or subprocess has failed |
| `taskinator.task.enqueued` | After a task has been enqueued |
| `taskinator.task.executed` | After a task has executed |
| `taskinator.task.processing` | When a task is processing |
| `taskinator.task.completed` | After a task has completed |
| `taskinator.task.cancelled` | After a task has been cancelled |
| `taskinator.task.failed` | After a task has failed |
Expand Down
3 changes: 2 additions & 1 deletion lib/taskinator.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
require 'json'
require 'yaml'
require 'securerandom'
require 'workflow'

require 'taskinator/version'

Expand All @@ -11,6 +10,8 @@

require 'taskinator/definition'

require 'taskinator/workflow'

require 'taskinator/visitor'
require 'taskinator/persistence'
require 'taskinator/instrumentation'
Expand Down
50 changes: 33 additions & 17 deletions lib/taskinator/instrumentation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,18 @@ def enqueued_payload(additional={})
payload_for(:enqueued, additional)
end

def started_payload(additional={})
def processing_payload(additional={})
payload_for(:processing, additional)
end

def paused_payload(additional={})
payload_for(:paused, additional)
end

def resumed_payload(additional={})
payload_for(:resumed, additional)
end

def completed_payload(additional={})
payload_for(:completed, additional)
end
Expand All @@ -25,8 +33,8 @@ def cancelled_payload(additional={})
payload_for(:cancelled, additional)
end

def failed_payload(additional={})
payload_for(:failed, additional)
def failed_payload(exception, additional={})
payload_for(:failed, { :exception => exception.to_s, :backtrace => exception.backtrace }.merge(additional))
end

private
Expand All @@ -36,24 +44,32 @@ def payload_for(state, additional={})
# need to cache here, since this method hits redis, so can't be part of multi statement following
process_key = self.process_key

tasks_count, completed_count, cancelled_count, failed_count = Taskinator.redis do |conn|
conn.hmget process_key, :tasks_count, :completed, :cancelled, :failed
tasks_count, processing_count, completed_count, cancelled_count, failed_count = Taskinator.redis do |conn|
conn.hmget process_key,
:tasks_count,
:tasks_processing,
:tasks_completed,
:tasks_cancelled,
:tasks_failed
end

tasks_count = tasks_count.to_f

return {
:type => self.class.name,
:process_uuid => process_uuid,
:process_options => process_options,
:uuid => uuid,
:options => options,
:state => state,
:percentage_failed => (tasks_count > 0) ? (failed_count.to_i / tasks_count) * 100.0 : 0.0,
:percentage_cancelled => (tasks_count > 0) ? (cancelled_count.to_i / tasks_count) * 100.0 : 0.0,
:percentage_completed => (tasks_count > 0) ? (completed_count.to_i / tasks_count) * 100.0 : 0.0,
:instance => self
}.merge(additional)
return OpenStruct.new(
{
:type => self.class,
:process_uuid => process_uuid,
:process_options => process_options,
:uuid => uuid,
:options => options,
:state => state,
:percentage_failed => (tasks_count > 0) ? (failed_count.to_i / tasks_count) * 100.0 : 0.0,
:percentage_cancelled => (tasks_count > 0) ? (cancelled_count.to_i / tasks_count) * 100.0 : 0.0,
:percentage_processing => (tasks_count > 0) ? (processing_count.to_i / tasks_count) * 100.0 : 0.0,
:percentage_completed => (tasks_count > 0) ? (completed_count.to_i / tasks_count) * 100.0 : 0.0,
:instance => self
}.merge(additional)
).freeze

end

Expand Down
55 changes: 28 additions & 27 deletions lib/taskinator/persistence.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,6 @@ def key_for(uuid)
"taskinator:#{base_key}:#{uuid}"
end

# retrieves the workflow state for the given identifier
# this prevents to need to load the entire object when
# querying for the status of an instance
def state_for(uuid)
key = key_for(uuid)
state = Taskinator.redis do |conn|
conn.hget(key, :state) || 'initial'
end
state.to_sym
end

# fetches the instance for given identifier
# optionally, provide a hash to use for the instance cache
# this argument is defaulted, so top level callers don't
Expand All @@ -68,10 +57,11 @@ def save
visitor = RedisSerializationVisitor.new(conn, self).visit
conn.hmset(
Taskinator::Process.key_for(uuid),
:tasks_count, visitor.task_count,
:tasks_failed, 0,
:tasks_completed, 0,
:tasks_cancelled, 0,
:tasks_count, visitor.task_count,
:tasks_failed, 0,
:tasks_processing, 0,
:tasks_completed, 0,
:tasks_cancelled, 0,
)
true
end
Expand All @@ -95,39 +85,36 @@ def process_key
@process_key ||= Taskinator::Process.key_for(process_uuid)
end

# cache of the current state (eventually consistent!)
attr_reader :state

# retrieves the workflow state
# this method is called from the workflow gem
def load_workflow_state
state = Taskinator.redis do |conn|
conn.hget(self.key, :state)
conn.hget(self.key, :state) || 'initial'
end
@state = (state || 'initial').to_sym
state.to_sym
end

# persists the workflow state
# this method is called from the workflow gem
def persist_workflow_state(new_state)
time_now = Time.now.utc
@updated_at = Time.now.utc
Taskinator.redis do |conn|
process_key = self.process_key
conn.multi do
conn.hmset(
self.key,
:state, new_state,
:updated_at, time_now
:updated_at, @updated_at
)

# also update the "root" process
conn.hset(
process_key,
:updated_at, time_now
:updated_at, @updated_at
)
end
end
@state = new_state
new_state
end

# persists the error information
Expand All @@ -152,7 +139,7 @@ def error
error_type, error_message, error_backtrace =
conn.hmget(self.key, :error_type, :error_message, :error_backtrace)

[error_type, error_message, JSON.parse(error_backtrace)]
[error_type, error_message, JSON.parse(error_backtrace || '[]')]
end
end

Expand All @@ -168,12 +155,13 @@ def tasks_count
%w(
failed
cancelled
processing
completed
).each do |status|

define_method "count_#{status}" do
count = Taskinator.redis do |conn|
conn.hget self.process_key, status
conn.hget self.process_key, "tasks_#{status}"
end
count.to_i
end
Expand All @@ -182,7 +170,7 @@ def tasks_count
Taskinator.redis do |conn|
process_key = self.process_key
conn.multi do
conn.hincrby process_key, status, 1
conn.hincrby process_key, "tasks_#{status}", 1
conn.hset process_key, :updated_at, Time.now.utc
end
end
Expand Down Expand Up @@ -273,6 +261,10 @@ def visit_attribute_time(attribute)
visit_attribute(attribute)
end

def visit_attribute_enum(attribute, type)
visit_attribute(attribute)
end

def visit_process_reference(attribute)
process = @instance.send(attribute)
@hmset += [attribute, process.uuid] if process
Expand Down Expand Up @@ -391,6 +383,15 @@ def visit_attribute_time(attribute)
end
end

# NB: assumes the enum type's members have integer values!
# NB: assumes the type defines a "Default" member
def visit_attribute_enum(attribute, type)
visit_attribute(attribute) do |value|
const_value = type.constants.select {|c| type.const_get(c) == value.to_i }.first
const_value ? type.const_get(const_value) : type::Default
end
end

def visit_type(attribute)
value = @attribute_values[attribute]
if value
Expand Down
Loading

0 comments on commit 7c34808

Please sign in to comment.