Permalink
Browse files

more progress on persister

  • Loading branch information...
1 parent d434f11 commit 5cf1824477e23589cc49f0f36e1496c29f092d03 @ohler55 committed Feb 16, 2014
Showing with 171 additions and 19 deletions.
  1. +1 −0 lib/oflow/actors.rb
  2. +77 −19 lib/oflow/actors/persister.rb
  3. +1 −0 lib/oflow/test/actorwrap.rb
  4. +5 −0 notes
  5. +1 −0 test/actors/.gitignore
  6. +86 −0 test/actors/persister_test.rb
View
@@ -11,4 +11,5 @@ module Actors
require 'oflow/actors/balancer'
require 'oflow/actors/merger'
+require 'oflow/actors/persister'
require 'oflow/actors/timer'
@@ -1,65 +1,100 @@
+require 'oj'
module OFlow
module Actors
# TBD
class Persister < Actor
+ attr_reader :dir
+ attr_reader :key_path
+ attr_reader :seq_path
+ attr_reader :single_file
+ attr_reader :historic
+
def initialize(task, options)
super
@dir = options[:dir]
if @dir.nil?
- # TBD base on full_name
+ @dir = File.join('db', task.full_name.gsub(':', '/'))
end
- @key = options[:key_path]
+ @key_path = options.fetch(:key_path, 'key')
+ @seq_path = options.fetch(:seq_path, 'seq')
if options.fetch(:cache, true)
@cache = {}
else
@cache = nil
end
@single_file = options.fetch(:single_file, false)
- @with_tracker = options.fetch(:with_tracker, false)
- # TBD load existing
+ @historic = options.fetch(:historic, false)
+
+ if Dir.exist?(@dir)
+ unless @cache.nil?
+ Dir.glob(File.join('**', '*.json')).each do |path|
+ path = File.join(@dir, path)
+ _load(path) if File.symlink?(path)
+ end
+ end
+ else
+ `mkdir -p #{@dir}`
+ end
end
def perform(op, box)
+ dest = box.contents[:dest]
+ result = nil
case op
when :insert, :create
- insert(box)
+ result = insert(box)
when :get, :read
- read(box)
+ result = read(box)
when :update
- update(box)
+ result = update(box)
when :delete, :remove
- delete(box)
+ result = delete(box)
when :query
- query(box)
+ result = query(box)
when :clear
- clear(box)
+ result = clear(box)
else
- raise OpeError.new(task.full_name, op)
+ raise OpError.new(task.full_name, op)
end
+ task.ship(dest, Box.new(result, box.tracker))
end
def insert(box)
- # TBD
+ key = box.get(@key_path)
+ raise "no key found" if key.nil? # TBD specialized errors
+ box = box.set(@seq_path, 1)
+ _save(box.contents, key, 1)
+ end
+
+ def caching?()
+ !@cache.nil?
end
def read(box)
# Should be a Hash.
- dest = box.contents[:dest]
key = box.contents[:key]
- if @cache.nil?
- # TBD read from file
+ raise "no key found" if key.nil?
+ if @cache.nil? || true # TBD
+ linkpath = File.join(@dir, "#{key}.json")
+ rec = _load(linkpath)
else
rec = @cache[key]
end
- # Send to provided destination or nil destination if none provided.
- task.ship(dest, Box.new(rec, box.tracker))
+ # If not found rec will be nil, that is okay.
+ rec
end
def update(box)
- # TBD
+ key = box.get(@key_path)
+ seq = box.get(@seq_path)
+ raise "no key found" if key.nil? # TBD specialized errors
+ # TBD if seq not set then lookup cached one or from file
+ seq += 1
+ box = box.set(@seq_path, seq)
+ _save(box.contents, key, seq)
end
def delete(box)
@@ -72,7 +107,30 @@ def query(box)
end
def clear(box)
- # TBD
+ `rm -rf #{@dir}`
+ nil
+ end
+
+ # internal use only
+ def _save(rec, key, seq)
+ filename = "#{key}~#{seq}.json"
+ path = File.join(@dir, filename)
+ linkpath = File.join(@dir, "#{key}.json")
+ raise "#{path} already exists" if File.exist?(path) # TBD specialized errors (incorrect seq num)
+ Oj.to_file(path, rec, :mode => :object)
+ # TBD move then delete old symlink if it exists
+ begin
+ File.delete(linkpath)
+ rescue Exception => e
+ # ignore
+ end
+ File.symlink(filename, linkpath)
+ # TBD old prev file if not historic
+ rec
+ end
+
+ def _load(path)
+ Oj.load_file(path, :mode => :object)
end
end # Persister
@@ -51,6 +51,7 @@ def receive(op, box)
else
@actor.perform(op, box)
end
+ nil
end
# Task API that adds entry to history.
View
5 notes
@@ -23,6 +23,11 @@
- Persister
+ - impl
+ - test for config
+ - make sure dir is created
+ -
+
- ops to handle
- insert (or create)
- read (or get)
View
@@ -0,0 +1 @@
+db
@@ -0,0 +1,86 @@
+#!/usr/bin/env ruby
+# encoding: UTF-8
+
+[ File.dirname(__FILE__),
+ File.join(File.dirname(__FILE__), "../../lib"),
+ File.join(File.dirname(__FILE__), "..")
+].each { |path| $: << path unless $:.include?(path) }
+
+require 'test/unit'
+require 'oflow'
+require 'oflow/test'
+
+class PersisterTest < ::Test::Unit::TestCase
+
+ def test_persister_config
+ t = ::OFlow::Test::ActorWrap.new('test', ::OFlow::Actors::Persister, state: ::OFlow::Task::BLOCKED,
+ dir: 'db/something',
+ key_path: 'key',
+ cache: false,
+ single_file: true,
+ with_tracker: true,
+ with_seq_num: true,
+ historic: true,
+ seq_path: 'seq')
+ assert_equal('db/something', t.actor.dir, 'dir set from options')
+ assert_equal('key', t.actor.key_path, 'key_path set from options')
+ assert_equal('seq', t.actor.seq_path, 'seq_path set from options')
+ assert_equal(false, t.actor.caching?, 'cache set from options')
+ assert_equal(true, t.actor.single_file, 'single_file set from options')
+ assert_equal(true, t.actor.historic, 'historic set from options')
+ assert(Dir.exist?(t.actor.dir), 'dir exists')
+ `rm -r #{t.actor.dir}`
+
+ t = ::OFlow::Test::ActorWrap.new('persist', ::OFlow::Actors::Persister, state: ::OFlow::Task::BLOCKED)
+ assert_equal('db/test/persist', t.actor.dir, 'dir set from options')
+ assert_equal('key', t.actor.key_path, 'key_path set from options')
+ assert_equal('seq', t.actor.seq_path, 'seq_path set from options')
+ assert_equal(true, t.actor.caching?, 'cache set from options')
+ assert_equal(false, t.actor.single_file, 'single_file set from options')
+ assert_equal(false, t.actor.historic, 'historic set from options')
+ assert(Dir.exist?(t.actor.dir), 'dir exists')
+ `rm -r #{t.actor.dir}`
+ end
+
+ def test_persister_historic
+ `rm -rf db/test/persist`
+ t = ::OFlow::Test::ActorWrap.new('persist', ::OFlow::Actors::Persister, state: ::OFlow::Task::BLOCKED,
+ historic: true)
+ # insert
+ t.receive(:insert, ::OFlow::Box.new({dest: :here, key: 'one', data: 0}))
+ assert_equal(1, t.history.size, 'one entry should be in the history')
+ assert_equal(:here, t.history[0].dest, 'should have shipped to :here')
+ assert_equal({:dest=>:here, :key=>'one', :data=>0, :seq=>1},
+ t.history[0].box.contents, 'should correct contents in shipment')
+ t.reset()
+
+ # read
+ t.receive(:read, ::OFlow::Box.new({dest: :read, key: 'one'}))
+ assert_equal(1, t.history.size, 'one entry should be in the history')
+ assert_equal(:read, t.history[0].dest, 'should have shipped to :read')
+ assert_equal({:dest=>:here, :key=>'one', :data=>0, :seq=>1},
+ t.history[0].box.contents, 'should correct contents in shipment')
+
+ # update
+ t.receive(:update, ::OFlow::Box.new({dest: :here, key: 'one', seq: 1, data: 1}))
+ t.receive(:update, ::OFlow::Box.new({dest: :here, key: 'one', seq: 2, data: 2}))
+
+ # TBD verify file exist and link point to correct place
+ # TBD load and compare
+
+ # TBD delete
+
+ puts "*** result: #{t.history}"
+
+ # TBD clear and verify dir is empty
+
+ #`rm -r #{t.actor.dir}`
+ end
+
+ # TBD test non-historic
+
+ # TBD test query
+
+ # TBD read non-existant record
+
+end

0 comments on commit 5cf1824

Please sign in to comment.