Permalink
Browse files

Separate out hanoid_backend API

First step towards a hanoidb backend API, which
will allow replacing the storage engine with
any ordered storage.

We still need an API for how to choose backend
and perhaps also how to discover the backend
of an existing store.
  • Loading branch information...
1 parent e3689c3 commit 3a43d9235bf18d06d9672d18b6e0c793e939fe41 @krestenkrab committed Oct 22, 2012
Showing with 356 additions and 72 deletions.
  1. +2 −0 include/plain_rpc.hrl
  2. +174 −0 src/hanoidb_backend.erl
  3. +67 −0 src/hanoidb_han2_backend.erl
  4. +90 −66 src/hanoidb_level.erl
  5. +23 −6 src/hanoidb_reader.erl
@@ -27,3 +27,5 @@
-define(REPLY(Ref,Msg), {'$reply', Ref, Msg}).
-define(CAST(From,Msg), {'$cast', From, Msg}).
+-type caller() :: { pid(), reference() }.
+
@@ -0,0 +1,174 @@
+
+-module(hanoidb_backend).
+
+-include("include/hanoidb.hrl").
+-include("src/hanoidb.hrl").
+
+-type options() :: [ atom() | { atom(), term() } ].
+-type kvexp_entry() :: { Key :: key(), Value :: value(), TimeOut :: expiry() }.
+-type batch_reader() :: any().
+-type batch_writer() :: any().
+-type random_reader() :: any().
+
+-export([merge/7]).
+
+%%%=========================================================================
+%%% API
+%%%=========================================================================
+
+%% batch_reader and batch_writer are used by the merging logic. A batch_reader
+%% must return the values in lexicographical order of the binary keys.
+
+-callback open_batch_reader(File :: string(), Options :: options())
+ -> {ok, batch_reader()} | { error, term() }.
+-callback read_next(batch_reader())
+ -> { [kvexp_entry(), ...], batch_reader()} | 'done'.
+-callback close_batch_reader( batch_reader() )
+ -> ok | {error, term()}.
+
+
+-callback open_batch_writer(File :: string(), Options :: options())
+ -> {ok, batch_writer()} | {error, term()}.
+-callback write_next( kvexp_entry() , batch_writer() )
+ -> {ok, batch_writer()} | {error, term()}.
+-callback write_count( batch_writer() ) ->
+ {ok, non_neg_integer()} | {error, term()}.
+-callback close_batch_writer( batch_writer() )
+ -> ok | {error, term()}.
+
+
+-callback open_random_reader(File :: string(), Options :: options()) ->
+ {ok, random_reader()} | {error, term()}.
+-callback file_name( random_reader() ) ->
+ {ok, string()} | {error, term()}.
+-callback lookup( Key :: key(), random_reader() ) ->
+ not_found | {ok, value()}.
+-callback range_fold( fun( (key(), value(), term()) -> term() ),
+ Acc0 :: term(),
+ Reader :: random_reader(),
+ Range :: #key_range{} ) ->
+ {limit, term(), LastKey :: binary()} | {ok, term()}.
+-callback close_random_reader(random_reader()) ->
+ ok | {error, term()}.
+
+
+
+
+-spec merge(atom(), string(), string(), string(), integer(), boolean(), list()) -> {ok, integer()}.
+merge(Mod,A,B,C, Size, IsLastLevel, Options) ->
+ {ok, IXA} = Mod:open_batch_reader(A, Options),
+ {ok, IXB} = Mod:open_batch_reader(B, Options),
+ {ok, Out} = Mod:open_batch_writer(C, [{size, Size} | Options]),
+ scan(Mod,IXA, IXB, Out, IsLastLevel, [], [], {0, none}).
+
+terminate(Mod, Out) ->
+ {ok, Count} = Mod:write_count( Out ),
+ ok = Mod:close_batch_writer( Out ),
+ {ok, Count}.
+
+step(S) ->
+ step(S, 1).
+
+step({N, From}, Steps) ->
+ {N-Steps, From}.
+
+
+scan(Mod, IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N, FromPID}) when N < 1, AKVs =/= [], BKVs =/= [] ->
+ case FromPID of
+ none ->
+ ok;
+ {PID, Ref} ->
+ PID ! {Ref, step_done}
+ end,
+
+ receive
+ {step, From, HowMany} ->
+ scan(Mod, IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N+HowMany, From})
+ end;
+
+scan(Mod, IXA, IXB, Out, IsLastLevel, [], BKVs, Step) ->
+ case Mod:read_next(IXA) of
+ {AKVs, IXA2} ->
+ scan(Mod, IXA2, IXB, Out, IsLastLevel, AKVs, BKVs, Step);
+ done ->
+ ok = Mod:close_batch_reader(IXA),
+ scan_only(Mod, IXB, Out, IsLastLevel, BKVs, Step)
+ end;
+
+scan(Mod, IXA, IXB, Out, IsLastLevel, AKVs, [], Step) ->
+ case Mod:read_next(IXB) of
+ {BKVs, IXB2} ->
+ scan(Mod, IXA, IXB2, Out, IsLastLevel, AKVs, BKVs, Step);
+ done ->
+ ok = Mod:close_batch_reader(IXB),
+ scan_only(Mod, IXA, Out, IsLastLevel, AKVs, Step)
+ end;
+
+scan(Mod, IXA, IXB, Out, IsLastLevel, [{Key1,_,_}=Entry|AT], [{Key2,_,_}|_]=BKVs, Step)
+ when Key1 < Key2 ->
+ case Entry of
+ {_, ?TOMBSTONE, _} when IsLastLevel ->
+ scan(Mod, IXA, IXB, Out, true, AT, BKVs, step(Step));
+ _ ->
+ {ok, Out3} = Mod:write_next( Entry, Out ),
+ scan(Mod, IXA, IXB, Out3, IsLastLevel, AT, BKVs, step(Step))
+ end;
+scan(Mod, IXA, IXB, Out, IsLastLevel, [{Key1,_,_}|_]=AKVs, [{Key2,_,_}=Entry|BT], Step)
+ when Key1 > Key2 ->
+ case Entry of
+ {_, ?TOMBSTONE, _} when IsLastLevel ->
+ scan(Mod, IXA, IXB, Out, true, AKVs, BT, step(Step));
+ _ ->
+ {ok, Out3} = Mod:write_next( Entry, Out ),
+ scan(Mod, IXA, IXB, Out3, IsLastLevel, AKVs, BT, step(Step))
+ end;
+scan(Mod, IXA, IXB, Out, IsLastLevel, [_|AT], [Entry|BT], Step) ->
+ case Entry of
+ {_, ?TOMBSTONE, _} when IsLastLevel ->
+ scan(Mod, IXA, IXB, Out, true, AT, BT, step(Step));
+ _ ->
+ {ok, Out3} = Mod:write_next( Entry, Out ),
+ scan(Mod, IXA, IXB, Out3, IsLastLevel, AT, BT, step(Step, 2))
+ end.
+
+
+scan_only(Mod, IX, Out, IsLastLevel, KVs, {N, FromPID}) when N < 1, KVs =/= [] ->
+ case FromPID of
+ none ->
+ ok;
+ {PID, Ref} ->
+ PID ! {Ref, step_done}
+ end,
+
+ receive
+ {step, From, HowMany} ->
+ scan_only(Mod, IX, Out, IsLastLevel, KVs, {N+HowMany, From})
+ end;
+
+scan_only(Mod, IX, Out, IsLastLevel, [], {_, FromPID}=Step) ->
+ case Mod:read_next(IX) of
+ {KVs, IX2} ->
+ scan_only(Mod, IX2, Out, IsLastLevel, KVs, Step);
+ done ->
+ case FromPID of
+ none ->
+ ok;
+ {PID, Ref} ->
+ PID ! {Ref, step_done}
+ end,
+ ok = Mod:close_batch_reader(IX),
+ terminate(Mod, Out)
+ end;
+
+scan_only(Mod, IX, Out, true, [{_,?TOMBSTONE,_}|Rest], Step) ->
+ scan_only(Mod, IX, Out, true, Rest, step(Step));
+
+scan_only(Mod, IX, Out, IsLastLevel, [Entry|Rest], Step) ->
+ {ok, Out3} = Mod:write_next( Entry, Out ),
+ scan_only(Mod, IX, Out3, IsLastLevel, Rest, step(Step)).
+
+
+
+
+
+
@@ -0,0 +1,67 @@
+-module(hanoidb_han2_backend).
+
+-include("hanoidb.hrl").
+
+-behavior(hanoidb_backend).
+
+-export([open_random_reader/2, file_name/1, range_fold/4, lookup/2, close_random_reader/1]).
+-export([open_batch_reader/2, read_next/1, close_batch_reader/1]).
+-export([open_batch_writer/2, write_next/2, write_count/1, close_batch_writer/1]).
+
+
+open_random_reader(Name, Options) ->
+ hanoidb_reader:open(Name, [random|Options]).
+
+file_name(Reader) ->
+ hanoidb_reader:file_name(Reader).
+
+lookup(Key, Reader) ->
+ hanoidb_reader:lookup(Reader, Key).
+
+range_fold(Fun, Acc, Reader, Range) ->
+ hanoidb_reader:range_fold(Fun, Acc, Reader, Range).
+
+close_random_reader(Reader) ->
+ hanoidb_reader:close(Reader).
+
+
+
+open_batch_reader(Name, Options) ->
+ hanoidb_reader:open(Name, [sequential|Options]).
+
+read_next(Reader) ->
+ case hanoidb_reader:next_node(Reader) of
+ {node, KVs} ->
+ {[ unfold(KV) || KV <- KVs], Reader};
+ end_of_data ->
+ 'done';
+ {error, _}=Err ->
+ Err
+ end.
+
+unfold({Key,{Value, Expiry}}) when is_binary(Value); ?TOMBSTONE =:= Value ->
+ {Key,Value,Expiry};
+unfold({Key,Value}) ->
+ {Key, Value, infinity}.
+
+close_batch_reader(Reader) ->
+ hanoidb_reader:close(Reader).
+
+open_batch_writer(Name, Options) ->
+ hanoidb_writer:init([Name, Options]).
+
+write_next( {Key, Value, Expiry}, Writer) ->
+ {noreply, Writer2} = hanoidb_writer:handle_cast({add, Key, {Value, Expiry}}, Writer),
+ {ok, Writer2}.
+
+write_count( Writer ) ->
+ case hanoidb_writer:handle_call(count, self(), Writer) of
+ {ok, Count, _} ->
+ {ok, Count};
+ Err ->
+ {error, Err}
+ end.
+
+close_batch_writer(Writer) ->
+ {stop, normal, ok, _} = hanoidb_writer:handle_call(close, self(), Writer),
+ ok.
Oops, something went wrong.

0 comments on commit 3a43d92

Please sign in to comment.