Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

Initial commit

  • Loading branch information...
commit c68a8d824e1c08cb0f5d2b7221638fbb5f3e13df 0 parents
will authored

Showing 2 changed files with 88 additions and 0 deletions. Show diff stats Hide diff stats

  1. +2 0  .gitignore
  2. +86 0 kvs.erl
2  .gitignore
... ... @@ -0,0 +1,2 @@
  1 +*~
  2 +*.beam
86 kvs.erl
... ... @@ -0,0 +1,86 @@
  1 +-module(kvs).
  2 +-export([get/1, get/2, set/2, set/3, start/1, stop/0, store/1]).
  3 +-define(TIMEOUT, 500).
  4 +
  5 +%% @doc create N nodes in distributed key-value store
  6 +%% @spec start(integer()) -> started
  7 +start(N) ->
  8 + pg2:create(kvs),
  9 + lists:foreach(fun(_) ->
  10 + pg2:join(kvs, spawn(kvs, store, [[]]))
  11 + end, lists:seq(0, N)),
  12 + started.
  13 +
  14 +%% @doc stop all pids in KVS process group
  15 +%% stop() -> stopped.
  16 +stop() ->
  17 + lists:foreach(fun(Pid) ->
  18 + pg2:leave(kvs, Pid),
  19 + Pid ! stop
  20 + end, pg2:get_members(kvs)),
  21 + stopped.
  22 +
  23 +%% @doc retrieve value for key
  24 +%% @spec get(term()) -> value() | undefined
  25 +%% value = term()
  26 +get(Key) -> get(Key, ?TIMEOUT).
  27 +
  28 +%% @doc retrieve value for key, with timeout
  29 +%% @spec get(term(), integer()) -> val() | timeout()
  30 +%% val = {ok, term()} | {ok, undefined}
  31 +%% timeout = {error, timeout}
  32 +get(Key, Timeout) ->
  33 + Pid = pg2:get_closest_pid(kvs),
  34 + Pid ! {self(), get, Key},
  35 + receive
  36 + {Pid, got, Value} ->
  37 + {ok, Value}
  38 + after
  39 + Timeout ->
  40 + {error, timeout}
  41 + end.
  42 +
  43 +%% @doc update value for key
  44 +%% @spec set(term(), term()) -> {ok, updated} | {error, timeout}
  45 +set(Key, Val) -> set(Key, Val, ?TIMEOUT).
  46 +
  47 +%% @doc update value for key, with timeout
  48 +%% @spec set(term(), term()) -> {ok, updated} | {error, timeout}
  49 +set(Key, Val, Timeout) ->
  50 + Pid = pg2:get_closest_pid(kvs),
  51 + Pid ! {self(), set, Key, Val},
  52 + receive
  53 + {Pid, received, {set, Key, Val}} ->
  54 + {ok, updated}
  55 + after
  56 + Timeout ->
  57 + {error, timeout}
  58 + end.
  59 +
  60 +%% @doc implementation of distributed key-value store
  61 +%% @spec store(proplist()) -> term()
  62 +%% proplist = [{term(), term()}]
  63 +store(Data) ->
  64 + receive
  65 + {Sender, get, Key} ->
  66 + % client interface for retrieving values
  67 + Sender ! {self(), got, proplists:get_value(Key, Data)},
  68 + store(Data);
  69 + {Sender, set, Key, Value} ->
  70 + % client interface for updating values
  71 + lists:foreach(fun(Pid) ->
  72 + Pid ! {self(), update, Key, Value}
  73 + end, pg2:get_members(kvs)),
  74 + Sender ! {self(), received, {set, Key, Value}},
  75 + store(Data);
  76 + {Sender, update, Key, Value} ->
  77 + % sent to all nodes by first receiving node
  78 + Sender ! {self(), updated, Key, Value},
  79 + store([{Key, Value} | proplists:delete(Key, Data)]);
  80 + {_Sender, updated, _Key, _Value} ->
  81 + store(Data);
  82 + stop ->
  83 + ok
  84 + end.
  85 +
  86 +

0 comments on commit c68a8d8

Please sign in to comment.
Something went wrong with that request. Please try again.