Permalink
Browse files

completed first version of persister actor

  • Loading branch information...
1 parent 5cf1824 commit 60f94d481ce755039d8c50bd8d49d801a313a0c1 @ohler55 committed Feb 17, 2014
Showing with 508 additions and 78 deletions.
  1. +4 −0 README.md
  2. +151 −22 lib/oflow/actors/persister.rb
  3. +6 −1 lib/oflow/hastasks.rb
  4. +1 −1 lib/oflow/task.rb
  5. +5 −1 lib/oflow/test/actorwrap.rb
  6. +1 −1 lib/oflow/version.rb
  7. +0 −36 notes
  8. +337 −13 test/actors/persister_test.rb
  9. +3 −3 test/all_tests.rb
View
@@ -25,6 +25,10 @@ Follow [@peterohler on Twitter](http://twitter.com/#!/peterohler) for announceme
## Release Notes
+### Release 0.5
+
+ - Added Persister Actor that acts as a simple local store database.
+
### Release 0.4
- Added support for dynamic Timer options updates.
@@ -3,15 +3,34 @@
module OFlow
module Actors
- # TBD
+ # Actor that persists records to the local file system as JSON
+ # representations of the records. Records can be the whole contents of the
+ # box received or a sub element of the contents. The key to the records are
+ # keys provided either in the record data or outside the data but somewhere
+ # else in the box received. Options for maintaining historic records and
+ # sequence number locking are included. If no sequence number is provide the
+ # Persister will assume there is no checking required and write anyway.
+ #
+ # Records are stored as JSON with the filename as the key and sequence
+ # number. The format of the file name is <key>~<seq>.json. As an example, a
+ # record stored with a key of 'first' and a sequence number of 3 (third time
+ # saved) would be 'first~3.json.
class Persister < Actor
attr_reader :dir
attr_reader :key_path
attr_reader :seq_path
- attr_reader :single_file
+ attr_reader :data_path
attr_reader :historic
+ # Initializes the persister with options of:
+ # @param [Hash] options with keys of
+ # - :dir [String] directory to store the persisted records
+ # - :key_data [String] path to record data (default: nil (all))
+ # - :key_path [String] path to key for the record (default: 'key')
+ # - :seq_path [String] path to sequence for the record (default: 'seq')
+ # - :cache [Boolean] if true, cache records in memory
+ # - :historic [Boolean] if true, do not delete previous versions
def initialize(task, options)
super
@dir = options[:dir]
@@ -20,19 +39,26 @@ def initialize(task, options)
end
@key_path = options.fetch(:key_path, 'key')
@seq_path = options.fetch(:seq_path, 'seq')
+ @data_path = options.fetch(:data_path, nil) # nil means all contents
if options.fetch(:cache, true)
+ # key is record key, value is [seq, rec]
@cache = {}
else
@cache = nil
end
- @single_file = options.fetch(:single_file, false)
@historic = options.fetch(:historic, false)
if Dir.exist?(@dir)
unless @cache.nil?
Dir.glob(File.join('**', '*.json')).each do |path|
path = File.join(@dir, path)
- _load(path) if File.symlink?(path)
+ if File.symlink?(path)
+ rec = load(path)
+ unless @cache.nil?
+ key, seq = key_seq_from_path(path)
+ @cache[key] = [seq, rec]
+ end
+ end
end
end
else
@@ -64,75 +90,178 @@ def perform(op, box)
def insert(box)
key = box.get(@key_path)
- raise "no key found" if key.nil? # TBD specialized errors
+ raise KeyError.new(:insert) if key.nil?
box = box.set(@seq_path, 1)
- _save(box.contents, key, 1)
+ rec = box.get(@data_path)
+ @cache[key] = [1, rec] unless @cache.nil?
+ save(rec, key, 1)
end
+ # Returns true if the actor is caching records.
def caching?()
!@cache.nil?
end
def read(box)
# Should be a Hash.
key = box.contents[:key]
- raise "no key found" if key.nil?
- if @cache.nil? || true # TBD
+ raise KeyError(:read) if key.nil?
+ if @cache.nil?
linkpath = File.join(@dir, "#{key}.json")
- rec = _load(linkpath)
+ rec = load(linkpath)
else
- rec = @cache[key]
+ unless (seq_rec = @cache[key]).nil?
+ rec = seq_rec[1]
+ end
end
# If not found rec will be nil, that is okay.
rec
end
def update(box)
key = box.get(@key_path)
+ raise KeyError.new(:update) if key.nil?
seq = box.get(@seq_path)
- raise "no key found" if key.nil? # TBD specialized errors
- # TBD if seq not set then lookup cached one or from file
+ if @cache.nil?
+ if (seq_rec = @cache[key]).nil?
+ raise NotFoundError.new(key)
+ end
+ seq = seq_rec[0] if seq.nil?
+ else
+ seq = 0
+ has_rec = false
+ Dir.glob(File.join(@dir, '**', "#{key}*.json")).each do |path|
+ if File.symlink?(path)
+ has_rec = true
+ next
+ end
+ _, s = key_seq_from_path(path)
+ seq = s if seq < s
+ end
+ end
+ raise NotFoundError.new(key) unless has_rec
+ raise SeqError.new(:update, key) if seq.nil? || 0 == seq
+
seq += 1
box = box.set(@seq_path, seq)
- _save(box.contents, key, seq)
+ rec = box.get(@data_path)
+ @cache[key] = [seq, rec] unless @cache.nil?
+ rec = save(rec, key, seq)
+ delete_historic(key, seq) unless @historic
+ rec
end
def delete(box)
- # TBD
+ key = box.get(@key_path)
+ @cache.delete(key) unless @cache.nil?
+ linkpath = File.join(@dir, "#{key}.json")
+ File.delete(linkpath)
+ delete_historic(key, nil) unless @historic
+ nil
end
def query(box)
- # TBD
- # Send to provided destination or nil destination if none provided.
+ recs = {}
+ expr = box.get('expr')
+ if expr.nil?
+ if @cache.nil?
+ Dir.glob(File.join(@dir, '**/*.json')).each do |path|
+ recs[File.basename(path)[0..-6]] = load(path) if File.symlink?(path)
+ end
+ else
+ @cache.each do |key,seq_rec|
+ recs[key] = seq_rec[1]
+ end
+ end
+ elsif expr.is_a?(Proc)
+ if @cache.nil?
+ Dir.glob(File.join(@dir, '**/*.json')).each do |path|
+ next unless File.symlink?(path)
+ rec = load(path)
+ key, seq = key_seq_from_path(path)
+ recs[key] = rec if expr.call(rec, key, seq)
+ end
+ else
+ @cache.each do |key,seq_rec|
+ rec = seq_rec[1]
+ recs[key] = rec if expr.call(rec, key, seq_rec[0])
+ end
+ end
+ else
+ # TBD add support for string safe expressions in the future
+ raise Exception.new("expr can only be a Proc, not a #{expr.class}")
+ end
+ recs
end
def clear(box)
+ @cache = {} unless @cache.nil?
`rm -rf #{@dir}`
+ # remake the dir in preparation for future inserts
+ `mkdir -p #{@dir}`
nil
end
# internal use only
- def _save(rec, key, seq)
+ def save(rec, key, seq)
filename = "#{key}~#{seq}.json"
path = File.join(@dir, filename)
linkpath = File.join(@dir, "#{key}.json")
- raise "#{path} already exists" if File.exist?(path) # TBD specialized errors (incorrect seq num)
+ raise ExistsError.new(key, seq) if File.exist?(path)
Oj.to_file(path, rec, :mode => :object)
- # TBD move then delete old symlink if it exists
begin
File.delete(linkpath)
- rescue Exception => e
+ rescue Exception
# ignore
end
File.symlink(filename, linkpath)
- # TBD old prev file if not historic
rec
end
- def _load(path)
+ def load(path)
+ return nil unless File.exist?(path)
Oj.load_file(path, :mode => :object)
end
+ def delete_historic(key, seq)
+ Dir.glob(File.join(@dir, '**', "#{key}~*.json")).each do |path|
+ _, s = key_seq_from_path(path)
+ next if s == seq
+ File.delete(path)
+ end
+ end
+
+ def key_seq_from_path(path)
+ path = File.readlink(path) if File.symlink?(path)
+ base = File.basename(path)[0..-6] # strip off '.json'
+ a = base.split('~')
+ [a[0..-2].join('~'), a[-1].to_i]
+ end
+
+ class KeyError < Exception
+ def initialize(op)
+ super("No key found for #{op}")
+ end
+ end # KeyError
+
+ class SeqError < Exception
+ def initialize(op, key)
+ super("No sequence number found for #{op} of #{key}")
+ end
+ end # SeqError
+
+ class ExistsError < Exception
+ def initialize(key, seq)
+ super("#{key}:#{seq} already exists")
+ end
+ end # ExistsError
+
+ class NotFoundError < Exception
+ def initialize(key)
+ super("#{key} not found")
+ end
+ end # NotFoundError
+
end # Persister
end # Actors
end # OFlow
View
@@ -21,7 +21,10 @@ def flow(name, options={}, &block)
yield(f) if block_given?
f.resolve_all_links()
# Wait to validate until at the top so up-links don't fail validation.
- f.validate() if Env == self
+ if Env == self
+ f.validate()
+ f.start()
+ end
f
end
@@ -33,6 +36,8 @@ def flow(name, options={}, &block)
# @param block [Proc] block to yield to with the new Task instance
# @return [Task] new Task
def task(name, actor_class, options={}, &block)
+ has_state = options.has_key?(:state)
+ options[:state] = Task::STOPPED unless has_state
t = Task.new(self, name, actor_class, options)
@tasks[t.name] = t
yield(t) if block_given?
View
@@ -55,7 +55,7 @@ def initialize(flow, name, actor_class, options={})
@actor = actor_class.new(self, options)
raise Exception.new("#{actor} does not respond to the perform() method.") unless @actor.respond_to?(:perform)
- @state = RUNNING
+ @state = options.fetch(:state, RUNNING)
return unless @actor.with_own_thread()
@loop = Thread.start(self) do |me|
@@ -49,7 +49,11 @@ def receive(op, box)
if @starting
@before << [op, box]
else
- @actor.perform(op, box)
+ begin
+ @actor.perform(op, box)
+ rescue Exception => e
+ ship(:error, Box.new([e, full_name()]))
+ end
end
nil
end
View
@@ -1,5 +1,5 @@
module OFlow
# Current version of the module.
- VERSION = '0.5.0a1'
+ VERSION = '0.5.0'
end
View
36 notes
@@ -7,8 +7,6 @@
- add log entries to actors
- features
- - Persister Actor (write to disk and ready on start)
- - 0.5.0
- HttpServer Actor
- 0.6.0
- .graffle OmniGraffle input
@@ -22,40 +20,6 @@
- 1.0.0
-- Persister
- - impl
- - test for config
- - make sure dir is created
- -
-
- - ops to handle
- - insert (or create)
- - read (or get)
- - update
- - delete (or remove)
- - query
- - clear/dump
- - options
- - dir to write to
- - default is db/full_name.gsub(':', '/')
- - key - path to key for the box
- - cache flag - future feature
- - option for write to one file or separate
- - first separate files
- - single file - future feature
- - general feature, error can follow error link
- - store using oj object mode
- - option to send results to different places
- - specify link dest in request
- - box
- - get { dest: xxx, key: xxx }
- - insert { key: optional, data: xxx }
- - update { key: optional, data: xxx }
- - delete { key: optional }
- - query { dest: xxx, expr: xxx }
- - expr is a block or proc
- - string expr - future feature
-
- TBDs
- example
Oops, something went wrong.

0 comments on commit 60f94d4

Please sign in to comment.