Skip to content
Permalink
Browse files

Rewrote as gen_server.

  • Loading branch information...
lethain committed Dec 14, 2009
1 parent 88e977c commit 35324dbe17141548c3bba2218e34ce886f70e693
Showing with 107 additions and 117 deletions.
  1. +5 −30 kvs.erl
  2. +1 −1 kvs.hrl
  3. +101 −86 kvs_server.erl
35 kvs.erl
@@ -11,24 +11,17 @@
%% @doc create N nodes in distributed key-value store
%% @spec start(integer()) -> started
start(N) ->
pg2:create(kvs),
lists:foreach(fun(_) ->
Store = #kvs_store{data=[],
pending_reads=[],
pending_writes=[]},
pg2:join(kvs, spawn(kvs_server, store, [Store]))
kvs_server:start_link()
end, lists:seq(0, N)),
started.

%% @doc stop all pids in KVS process group
%% stop() -> stopped.
stop() ->
LocalFun = fun(X) -> erlang:node(X) == node() end,
LocalPids = lists:filter(LocalFun, pg2:get_members(kvs)),
lists:foreach(fun(Pid) ->
pg2:leave(kvs, Pid),
Pid ! stop
end, LocalPids),
gen_server:call(Pid, stop)
end, pg2:get_members(kvs)),
stopped.

%% @doc retrieve value for key
@@ -42,16 +35,7 @@ get(Key) -> get(Key, ?TIMEOUT).
%% timeout = {error, timeout}
get(Key, Timeout) ->
Pid = pg2:get_closest_pid(kvs),
Pid ! {self(), get, Key},
receive
{Pid, got, Value} ->
{ok, Value};
{error, Error} ->
{error, Error}
after
Timeout ->
{error, timeout}
end.
gen_server:call(Pid, {get, Key}, Timeout).

%% @doc update value for key
%% @spec set(term(), term()) -> {ok, updated} | {error, timeout}
@@ -61,13 +45,4 @@ set(Key, Val) -> set(Key, Val, ?TIMEOUT).
%% @spec set(term(), term()) -> {ok, updated} | {error, timeout}
set(Key, Val, Timeout) ->
Pid = pg2:get_closest_pid(kvs),
Pid ! {self(), set, Key, Val},
receive
{Pid, received, {set, Key, Val}} ->
{ok, updated};
{error, Error} ->
{error, Error}
after
Timeout ->
{error, timeout}
end.
gen_server:call(Pid, {set, Key, Val}, Timeout).
@@ -1,7 +1,7 @@
-define(TIMEOUT, infinity).
-define(KVS_WRITE_TIMEOUT, 2).
-define(KVS_READ_TIMEOUT, 1).
-define(KVS_POLL_PENDING, 500).
-define(KVS_POLL_PENDING, 1000).
-define(KVS_WRITES, 3).
-define(KVS_READS, 3).
-record(kvs_store, {data, pending_reads, pending_writes}).
@@ -1,76 +1,54 @@
%% @doc kvs_server implements a distributed key-value store.
-module(kvs_server).
-include("kvs.hrl").
-export([store/1]).

%% @doc implementation of distributed key-value store
%% @spec store(proplist()) -> term()
%% proplist = [{term(), term()}]
store(Store) ->
receive
{Sender, get, Key} ->
message_get(Store, Sender, Key);
{Sender, retrieve, Client, Key} ->
message_retrieve(Store, Sender, Client, Key);
{Sender, retrieved, Client, Key, Value} ->
message_retrieved(Store, Sender, Client, Key, Value);
{Sender, set, Key, Value} ->
message_set(Store, Sender, Key, Value);
{Sender, update, Client, Key, Value} ->
message_update(Store, Sender, Client, Key, Value);
{Sender, updated, Client, Key, Value} ->
message_updated(Store, Sender, Client, Key, Value);
stop ->
ok
after
?KVS_POLL_PENDING ->
Writes2 = lists:filter(fun filter_writes/1, Store#kvs_store.pending_writes),
Reads2 = lists:filter(fun filter_reads/1, Store#kvs_store.pending_reads),
store(Store#kvs_store{pending_writes=Writes2, pending_reads=Reads2})
end.
% interface for gen_servers
-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).

%% @doc filter expired writes from pending_writes.
filter_writes({{Client, _Key}, {_Count, Ts}}) ->
Now = ts(),
if Now > Ts + ?KVS_WRITE_TIMEOUT ->
Client ! {error, write_failed},
false;
true ->
true
end.
%% @doc performs initialization of gen_server, return value is the initial state value.
init([]) ->
process_flag(trap_exit, true),
pg2:create(kvs),
pg2:join(kvs, self()),
{ok, #kvs_store{data=[], pending_reads=[], pending_writes=[]}, ?KVS_POLL_PENDING}.

%% @doc filter expired reads from pending_reads.
filter_reads({{Client, _Key}, {_Count, _Values, Ts}}) ->
Now = ts(),
if Now > Ts + ?KVS_READ_TIMEOUT ->
Client ! {error, read_failed},
false;
true ->
true
end.
%% @doc called when gen_server is terminated.
%% (Note that init function must specify trap_exit
%% for terminate to be called.)
%% @spec terminate(term(), term()) -> ok.
terminate(_Reason, _State) ->
pg2:leave(kvs, self()),
ok.

%% @doc follow standard gen_server pattern of declaring a local start_link implementation
%% which itself calls gen_server:start_link/3 or gen_server:start_link/4.
start_link() ->
%gen_server:start_link(?MODULE, [], [{debug, [log, {log_to_file, "kvs.log"}]}]).
gen_server:start_link(?MODULE, [], []).

%% @doc called during code changes, which this implementation ignores.
code_change(_OldVsn, State, _Extra) -> {ok, State}.

%% @doc handles system messages, including timeouts.
handle_info(timeout, State) ->
Writes2 = lists:filter(fun filter_writes/1, State#kvs_store.pending_writes),
Reads2 = lists:filter(fun filter_reads/1, State#kvs_store.pending_reads),
{noreply, State#kvs_store{pending_writes=Writes2, pending_reads=Reads2}, ?KVS_POLL_PENDING};
handle_info(_Info, State) ->
{noreply, State, ?KVS_POLL_PENDING}.

%%
%% Message Handling Functions
%% All server->server communcation occurs via asynchronous calls
%%

message_get(Store = #kvs_store{pending_reads=Reads}, Sender, Key) ->
% client interface for retrieving values
lists:foreach(fun(Pid) ->
Pid ! {self(), retrieve, Sender, Key}
end, pg2:get_members(kvs)),
% ?KVS_READS is required # of nodes to read from
% [] is used to collect read values
Reads2 = [{{Sender, Key}, {?KVS_READS, [], ts()}} | Reads],
store(Store#kvs_store{pending_reads=Reads2}).

message_retrieve(Store = #kvs_store{data=Data}, Sender, Client, Key) ->
Sender ! {self(), retrieved, Client, Key, proplists:get_value(Key, Data)},
store(Store).
%% @doc handle asynchronous communication.
handle_cast({retrieve, Sender, Client, Key}, State=#kvs_store{data=Data}) ->
gen_server:cast(Sender, {retrieved, self(), Client, Key, proplists:get_value(Key, Data)}),
{noreply, State, ?KVS_POLL_PENDING};

message_retrieved(Store = #kvs_store{pending_reads=Reads}, _Sender, Client, Key, Value) ->
handle_cast({retrieved, _Sender, Client, Key, Value}, Store=#kvs_store{pending_reads=Reads}) ->
case proplists:get_value({Client, Key}, Reads) of
undefined ->
store(Store);
{noreply, Store, ?KVS_POLL_PENDING};
{0, Values, _Timestamp} ->
Freq = lists:foldr(fun(X, Acc) ->
case proplists:get_value(X, Acc) of
@@ -79,48 +57,85 @@ message_retrieved(Store = #kvs_store{pending_reads=Reads}, _Sender, Client, Key,
end
end, [], Values),
[{Popular, _} | _ ] = lists:reverse(lists:keysort(2, Freq)),
Client ! {self(), got, Popular},
store(Store#kvs_store{
pending_reads=proplists:delete({Key, Value}, Reads)});
gen_server:reply(Client, {ok, Popular}),
{noreply, (Store#kvs_store{pending_reads=proplists:delete({Key, Value}, Reads)}), ?KVS_POLL_PENDING};
{Count, Values, Timestamp} ->
store(Store#kvs_store{
{noreply, Store#kvs_store{
pending_reads=[{{Client, Key}, {Count-1, [Value | Values], Timestamp}} |
proplists:delete({Client, Key}, Reads)]})
end.

message_set(Store = #kvs_store{pending_writes=Writes}, Sender, Key, Value) ->
% client interface for updating values
lists:foreach(fun(Pid) ->
Pid ! {self(), update, Sender, Key, Value}
end, pg2:get_members(kvs)),
Writes2 = [{{Sender, Key}, {?KVS_WRITES, ts()}} | Writes],
store(Store#kvs_store{pending_writes=Writes2}).

proplists:delete({Client, Key}, Reads)]},
?KVS_POLL_PENDING}
end;

message_update(Store = #kvs_store{data=Data}, Sender, Client, Key, Value) ->
handle_cast({update, Sender, Client, Key, Value}, Store=#kvs_store{data=Data}) ->
% sent to all nodes by first receiving node
Sender ! {self(), updated, Client, Key, Value},
store(Store#kvs_store{data=[{Key, Value} | proplists:delete(Key, Data)]}).
gen_server:cast(Sender, {updated, self(), Client, Key, Value}),
{noreply, Store#kvs_store{data=[{Key, Value} | proplists:delete(Key, Data)]}, ?KVS_POLL_PENDING};

message_updated(Store = #kvs_store{pending_writes=Writes}, _Sender, Client, Key, Value) ->

handle_cast({updated, _Sender, Client, Key, Value}, Store=#kvs_store{pending_writes=Writes}) ->
{Count, Timestamp} = proplists:get_value({Client, Key}, Writes),
case Count of
undefined ->
store(Store);
{noreply, Store, ?KVS_POLL_PENDING};
0 ->
Client ! {self(), received, {set, Key, Value}},
store(Store#kvs_store{
pending_writes=proplists:delete({Key, Value}, Writes)});
gen_server:reply(Client, {ok, updated}),
{noreply, Store#kvs_store{pending_writes=proplists:delete({Key, Value}, Writes)}, ?KVS_POLL_PENDING};
_ ->
store(Store#kvs_store{
{noreply, Store#kvs_store{
pending_writes=[{{Client, Key}, {Count-1, Timestamp}} |
proplists:delete({Client, Key}, Writes)]})
proplists:delete({Client, Key}, Writes)]},
?KVS_POLL_PENDING}
end.

%%
%% All client->server communication occurs via synchronous calls.
%%

%% @doc handle explicit stop requests
handle_call(stop, _Sender, State) ->
{stop, stopped, stopped, State};

handle_call({get, Key}, Sender, Store=#kvs_store{pending_reads=Reads}) ->
lists:foreach(fun(Pid) ->
gen_server:cast(Pid, {retrieve, self(), Sender, Key})
end, pg2:get_members(kvs)),
% ?KVS_READS is required # of nodes to read from
% [] is used to collect read values
Reads2 = [{{Sender, Key}, {?KVS_READS, [], ts()}} | Reads],
{noreply, Store#kvs_store{pending_reads=Reads2}, ?KVS_POLL_PENDING};

handle_call({set, Key, Value}, Sender, Store=#kvs_store{pending_writes=Writes}) ->
% client interface for updating values
lists:foreach(fun(Pid) ->
gen_server:cast(Pid, {update, self(), Sender, Key, Value})
end, pg2:get_members(kvs)),
Writes2 = [{{Sender, Key}, {?KVS_WRITES, ts()}} | Writes],
{noreply, Store#kvs_store{pending_writes=Writes2}, ?KVS_POLL_PENDING}.

%%
%% Utility functions
%%

ts() ->
{Mega, Sec, _} = erlang:now(),
(Mega * 1000000) + Sec.

%% @doc filter expired writes from pending_writes.
filter_writes({{Client, _Key}, {_Count, Ts}}) ->
Now = ts(),
if Now > Ts + ?KVS_WRITE_TIMEOUT ->
gen_server:reply(Client, {error, write_failed}),
false;
true ->
true
end.

%% @doc filter expired reads from pending_reads.
filter_reads({{Client, _Key}, {_Count, _Values, Ts}}) ->
Now = ts(),
if Now > Ts + ?KVS_READ_TIMEOUT ->
gen_server:reply(Client, {error, read_failed}),
false;
true ->
true
end.

0 comments on commit 35324db

Please sign in to comment.
You can’t perform that action at this time.