Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

use ImmutableDrip

  • Loading branch information...
commit ab5a71e925ac091c1d458e841c8a15ebd6a9cfe3 1 parent f0a792b
@seki authored
Showing with 83 additions and 41 deletions.
  1. +83 −41 lib/drip.rb
View
124 lib/drip.rb
@@ -9,9 +9,9 @@ def inspect; to_s; end
class ImmutableDrip
class Generator
- def initialize
- @pool = []
- @tag = []
+ def initialize(pool=[], tag=[])
+ @pool = pool
+ @tag = tag
@shared = Hash.new {|h, k| h[k] = k; k}
end
@@ -19,7 +19,7 @@ def add(key, value, *tag)
@pool << [key, value]
idx = @pool.size - 1
tag.uniq.each do |t|
- @tag << [[@shared[t], key], idx]
+ @tag << [[@shared[t], key]]
end
end
@@ -32,6 +32,35 @@ def generate
end
ImmutableDrip.new(@pool.sort, tag)
end
+
+ def prepare_store(dir)
+ Dir.mkdir(dir) rescue nil
+ dump = Dir.glob(File.join(dir, '*.dump')).max_by do |fn|
+ File.basename(fn).to_i(36)
+ end
+ if dump
+ @pool, @tag, _ = File.open(dump, 'rb') {|fp| Marshal.load(fp)}
+ File.unlink(dump)
+ end
+ dump = false
+ loaded = dump ? File.basename(dump).to_i(36) : 0
+ Dir.glob(File.join(dir, '*.log')) do |fn|
+ next if loaded > File.basename(fn).to_i(36)
+ begin
+ SimpleStore.reader(fn).each do |k, v, attic|
+ obj, *tags = v
+ attic.forget
+ add(k, attic, *tags)
+ end
+ rescue
+ end
+ end
+ name = Drip.time_to_key(Time.now).to_s(36)
+ File.open(File.join(dir, name + '.dump'), 'wb') {|fp|
+ Marshal.dump([@pool, @tag], fp)
+ }
+ SimpleStore.new(File.join(dir, name + '.log'))
+ end
end
INF = 1.0/0
@@ -59,7 +88,7 @@ def read_tag(key, tag, n=1)
idx = lower_boundary(@tag, [tag, key + 1])
return [] unless idx
@tag[idx, n].find_all {|kv| kv[0][0] == tag}.collect {|kv|
- [kv[0][1], *@pool[kv[1]][1].to_a]
+ [kv[0][1], *fetch(kv[0][1])]
}
end
@@ -68,7 +97,7 @@ def head_tag(n, tag)
upper = upper_boundary(@tag, [tag, INF])
lower = [lower, upper - n].max
@tag[lower ... upper].collect {|kv|
- [kv[0][1], *@pool[kv[1]][1].to_a]
+ [kv[0][1], *fetch(kv[0][1])]
}
end
@@ -83,10 +112,11 @@ def head(n=1, tag=nil)
def older_tag(key, tag)
idx = upper_boundary(@tag, [tag, key-1])
k, v = @tag[idx - 1]
- k && k[0] == tag ? [k[1], *@pool[v][1].to_a] : nil
+ k && k[0] == tag ? [k[1], *fetch(k[1])] : nil
end
def older(key, tag=nil)
+ return nil if @pool.empty?
key = @pool[-1][0] + 1 unless key
return older_tag(key, tag) if tag
idx = upper_boundary(@pool, key - 1)
@@ -129,11 +159,12 @@ def upper_boundary(ary, key)
end
def initialize(dir, option={})
+ @past = prepare_store(dir, option)
+ @fence = (@past.head[0][0] rescue 0) || 0
@pool = RBTree.new
@tag = RBTree.new
@event = Rinda::TupleSpace.new(5)
- @event.write([:last, 0])
- prepare_store(dir, option)
+ @event.write([:last, @fence])
end
def write(obj, *tags)
@@ -155,6 +186,7 @@ def write_at(at, *value)
end
def fetch(key)
+ return @past.fetch(key) if @fence >= key
@pool[key].to_a
end
alias [] fetch
@@ -171,6 +203,34 @@ def make_renewer(timeout)
end
def read(key, n=1, at_least=1, timeout=nil)
+ return curr_read(key, n, at_least, timeout) if key > @fence
+ ary = @past.read(key, n)
+ return ary if ary.size >= n
+ ary + curr_read(key, n - ary.size, at_least - ary.size, timeout)
+ end
+
+ def read_tag(key, tag, n=1, at_least=1, timeout=nil)
+ return curr_read_tag(key, tag, n, at_least, timeout) if key > @fence
+ ary = @past.read_tag(key, tag, n)
+ return ary if ary.size >= n
+ ary + curr_read_tag(key, tag, n - ary.size, at_least - ary.size, timeout)
+ end
+
+ def head(n=1, tag=nil)
+ ary = curr_head(n, tag)
+ return ary if ary.size == n
+ @past.head(n - ary.size, tag) + ary
+ end
+
+ def older(key, tag=nil)
+ curr_older(key, tag) || @past.older(key, tag)
+ end
+
+ def newer(key, tag=nil)
+ @past.newer(key, tag) || curr_newer(key, tag)
+ end
+
+ def curr_read(key, n=1, at_least=1, timeout=nil)
renewer = make_renewer(timeout)
key = time_to_key(Time.now) unless key
ary = []
@@ -187,7 +247,7 @@ def read(key, n=1, at_least=1, timeout=nil)
ary
end
- def read_tag(key, tag, n=1, at_least=1, timeout=nil)
+ def curr_read_tag(key, tag, n=1, at_least=1, timeout=nil)
renewer = make_renewer(timeout)
key = time_to_key(Time.now) unless key
ary = []
@@ -205,7 +265,7 @@ def read_tag(key, tag, n=1, at_least=1, timeout=nil)
ary
end
- def head(n=1, tag=nil)
+ def curr_head(n=1, tag=nil)
ary = []
key = nil
while it = older(key, tag)
@@ -217,7 +277,7 @@ def head(n=1, tag=nil)
ary
end
- def older(key, tag=nil)
+ def curr_older(key, tag=nil)
key = time_to_key(Time.now) unless key
unless tag
k, v = @pool.upper_bound(key - 1)
@@ -229,15 +289,19 @@ def older(key, tag=nil)
[it[1]] + fetch(it[1])
end
- def newer(key, tag=nil)
+ def curr_newer(key, tag=nil)
return read(key, 1, 0)[0] unless tag
read_tag(key, tag, 1, 0)[0]
end
- def time_to_key(time)
+ def self.time_to_key(time)
time.tv_sec * 1000000 + time.tv_usec
end
+ def time_to_key(time)
+ self.class.time_to_key(time)
+ end
+
def key_to_time(key)
Time.at(*key.divmod(1000000))
end
@@ -314,34 +378,12 @@ def write(key, value)
def prepare_store(dir, option={})
if dir.nil?
@store = SimpleStore.new(nil, option)
- return
- end
-
- Dir.mkdir(dir) rescue nil
- dump = Dir.glob(File.join(dir, '*.dump')).max_by do |fn|
- File.basename(fn).to_i(36)
- end
- if dump
- @pool, @tag, last = File.open(dump, 'rb') {|fp| Marshal.load(fp)}
- @event.take([:last, nil])
- @event.write([:last, last])
- File.unlink(dump)
+ return ImmutableDrip.new
end
- loaded = dump ? File.basename(dump).to_i(36) : 0
- Dir.glob(File.join(dir, '*.log')) do |fn|
- next if loaded > File.basename(fn).to_i(36)
- begin
- store = SimpleStore.reader(fn)
- restore(store)
- rescue
- end
- end
- name = time_to_key(Time.now).to_s(36)
- _, last = @event.read([:last, nil])
- File.open(File.join(dir, name + '.dump'), 'wb') {|fp|
- Marshal.dump([@pool, @tag, last], fp)
- }
- @store = SimpleStore.new(File.join(dir, name + '.log'))
+
+ gen = ImmutableDrip::Generator.new
+ @store = gen.prepare_store(dir)
+ return gen.generate
end
def shared_text(str)
Please sign in to comment.
Something went wrong with that request. Please try again.