Skip to content

Commit

Permalink
StorageHistory : first steps
Browse files Browse the repository at this point in the history
  • Loading branch information
jmettraux committed Jan 6, 2010
1 parent a197498 commit dbd6c3c
Show file tree
Hide file tree
Showing 9 changed files with 182 additions and 14 deletions.
1 change: 1 addition & 0 deletions TODO.txt
Expand Up @@ -286,4 +286,5 @@

[ ] shell ? irb ? Shell.new(storage)
[ ] focus on fulldup or json.dup (via fulldup ?)
[ ] implement Storage#clear!(type)

4 changes: 3 additions & 1 deletion lib/ruote/context.rb
Expand Up @@ -73,7 +73,7 @@ def add_service (key, *args)

key = "s_#{key}" unless key.match(/^s\_/)

if klass
service = if klass

require(path)

Expand All @@ -87,6 +87,8 @@ def add_service (key, *args)
end

self.class.class_eval %{ def #{key[2..-1]}; @conf['#{key}']; end }

service
end

def shutdown
Expand Down
9 changes: 4 additions & 5 deletions lib/ruote/log/fs_history.rb
Expand Up @@ -67,9 +67,8 @@ def shutdown
@file.close rescue nil
end

# Returns an array of Ruote::Record instances, each record represents
# a ruote engine [history] event.
# Returns an empty array if no history was found for the given wfid.
# Returns the messages concerning the process given by its wfid,
# or an empty array if there were no such process seen.
#
def by_process (wfid)

Expand Down Expand Up @@ -118,8 +117,8 @@ def range
end
end

# Returns an array of Record instances for a given date, and any process
# instance.
# Returns an array of messages for a given day or an empty array if nothing
# happened at that date.
#
def by_date (date)

Expand Down
108 changes: 108 additions & 0 deletions lib/ruote/log/storage_history.rb
@@ -0,0 +1,108 @@
#--
# Copyright (c) 2005-2010, John Mettraux, jmettraux@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# Made in Japan.
#++


module Ruote

#
# Logs the ruote engine history to the storage underlying the worker.
#
# Warning : don't use this history implementation when the storage is
# HashStorage. It will fill up your memory... Keeping history for a
# transient ruote is a bit overkill (IMHO).
#
class StorageHistory

def initialize (context, options={})

@context = context
@options = options

if @context.worker

# only care about logging if there is a worker present

@context.storage.add_type('history')
@context.worker.subscribe(:all, self)
end
end

def by_process (wfid)

@context.storage.get_many('history', /!#{wfid}$/)
end
alias :by_wfid :by_process

# Returns an array [ most recent date, oldest date ] (Time instances).
#
def range

# TODO : implement me
end

def by_date (date)

# TODO : implement me
end

#def history_to_tree (wfid)
# # (NOTE why not ?)
#end

# The history system doesn't implement purge! so that when purge! is called
# on the engine, the history is not cleared.
#
# Call this *dangerous* clear! method to clean out any history file.
#
def clear!
end

# This is the method called by the workqueue. Incoming engine events
# are 'processed' here.
#
def notify (msg)

msg = msg.dup
# a shallow copy is sufficient

t = msg['_id'].split('-').last
# '_id' => "#{$$}-#{Thread.current.object_id}-#{Time.now.to_f.to_s}"

si = if fei = msg['fei']
Ruote::FlowExpressionId.to_storage_id(fei)
else
msg['wfid'] || 'no_wfid'
end

msg['original_id'] = msg['_id']
msg['_id'] = "#{t}!#{si}"
msg['type'] = 'history'
msg['original_put_at'] = msg['put_at']
msg.delete('_rev')

@context.storage.put(msg)
end
end
end

5 changes: 5 additions & 0 deletions lib/ruote/storage/fs_storage.rb
Expand Up @@ -77,6 +77,11 @@ def purge!
FileUtils.rm_rf(@cloche.dir)
end

# No need for that here (FsStorage adds type on the fly).
#
def add_type (type)
end

def dump (type)

s = "=== #{type} ===\n"
Expand Down
12 changes: 6 additions & 6 deletions lib/ruote/storage/hash_storage.rb
Expand Up @@ -70,7 +70,7 @@ def put (doc, opts={})
doc['put_at'] = Ruote.now_to_utc_s
doc['_rev'] = doc['_rev'] + 1

(@h[doc['type']] ||= {})[doc['_id']] = Rufus::Json.dup(doc)
@h[doc['type']][doc['_id']] = Rufus::Json.dup(doc)

nil
end
Expand Down Expand Up @@ -149,6 +149,11 @@ def purge!
@h['configurations']['engine'] = @options
end

def add_type (type)

@h[type] = {}
end

def dump (type)

s = "=== #{type} ===\n"
Expand All @@ -161,11 +166,6 @@ def dump (type)
end
end
end

def add_test_type (type)

@h[type] = {}
end
end
end

Expand Up @@ -18,7 +18,7 @@
require 'ruote/part/no_op_participant'


class FtHistoryTest < Test::Unit::TestCase
class FtFsHistoryTest < Test::Unit::TestCase
include FunctionalBase

def test_launch
Expand Down
53 changes: 53 additions & 0 deletions test/functional/ft_36_storage_history.rb
@@ -0,0 +1,53 @@

#
# testing ruote
#
# Tue Jan 5 17:51:10 JST 2010
#

require File.join(File.dirname(__FILE__), 'base')

require 'ruote/log/fs_history'
require 'ruote/part/no_op_participant'


class FtStorageHistoryTest < Test::Unit::TestCase
include FunctionalBase

def test_launch

pdef = Ruote.process_definition do
alpha
echo 'done.'
end

history = @engine.add_service(
'history', 'ruote/log/storage_history', 'Ruote::StorageHistory')

@engine.register_participant :alpha, Ruote::NoOpParticipant

#noisy

wfid0 = assert_trace(pdef, "done.")
wfid1 = assert_trace(pdef, "done.\ndone.")

sleep 0.100

assert_equal 17, @engine.storage.get_many('history').size

h = @engine.context.history.by_process(wfid0)
#h.each { |r| p r }
assert_equal 8, h.size

# testing record.to_h

h = @engine.context.history.by_process(wfid1)
#h.each { |r| p r }
assert_equal 8, h.size

history.clear!

assert_equal 0, @engine.storage.get_many('history')
end
end

2 changes: 1 addition & 1 deletion test/unit/ut_17_storage.rb
Expand Up @@ -24,7 +24,7 @@ class UtStorage < Test::Unit::TestCase

def setup
@s = determine_storage({})
@s.add_test_type('dogfood') if @s.respond_to?(:add_test_type)
@s.add_type('dogfood')
@s.put(
'_id' => 'toto',
'type' => 'dogfood',
Expand Down

0 comments on commit dbd6c3c

Please sign in to comment.