Permalink
Browse files

Initial reimplementation of Erlang hash_ring module as a gen_server

  • Loading branch information...
1 parent dc60050 commit ae180011a6a94e97ad02e3819cb78b65afb1876d @yrashk committed Aug 1, 2011
Showing with 122 additions and 128 deletions.
  1. +122 −128 src/hash_ring.erl
View
@@ -1,200 +1,194 @@
-module(hash_ring).
+-behaviour(gen_server).
+%% gen_server
-export([
- start/0,
- start/2,
- stop/0,
- init/1
+ start_link/0,
+ start_link/2,
+ init/1,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ terminate/2,
+ code_change/3
]).
+%% API
-export([
- create_ring/2,
- delete_ring/1,
- add_node/2,
- remove_node/2,
- find_node/2
+ create_ring/2,
+ delete_ring/1,
+ add_node/2,
+ remove_node/2,
+ find_node/2,
+ stop/0
]).
+-define(SERVER, ?MODULE).
+
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Public API
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% @doc Starts the hash_ring driver
-start() ->
+start_link() ->
case code:priv_dir(hash_ring) of
{error, bad_name} ->
Path = filename:join([filename:dirname(code:which(hash_ring)),"..","priv"]);
Path ->
ok
end,
- start(Path, "hash_ring_drv").
+ start_link(Path, "hash_ring_drv").
%%
%% @doc Starts the hash_ring driver.
%% This function should be called with the Path to the shared library and the name
%% of the shared library. SharedLib should be <strong>hash_ring_drv</strong>.
%%
-start(Path, SharedLib) ->
+start_link(Path, SharedLib) ->
case erl_ddll:load_driver(Path, SharedLib) of
ok ->
- Pid = spawn(?MODULE, init, ["hash_ring_drv"]),
- register(hash_ring, Pid),
- ok;
+ gen_server:start_link({local, ?SERVER}, ?MODULE, "hash_ring_drv", []);
{error, already_loaded} -> {error, already_loaded};
Other -> exit({error, Other})
end.
stop() ->
- hash_ring ! stop,
- unregister(hash_ring).
+ (catch gen_server:call(?SERVER, stop)).
create_ring(Ring, NumReplicas) ->
- hash_ring ! {create_ring, self(), {Ring, NumReplicas}},
- receive
- ok -> ok
- end.
+ gen_server:call(?SERVER, {create_ring, {Ring, NumReplicas}}).
delete_ring(Ring) ->
- hash_ring ! {delete_ring, self(), Ring},
- receive
- ok -> ok;
- {error, Reason} -> {error, Reason}
- end.
+ gen_server:call(?SERVER, {delete_ring, Ring}).
add_node(Ring, Node) when is_list(Node) ->
add_node(Ring, list_to_binary(Node));
add_node(Ring, Node) when is_binary(Node) ->
- hash_ring ! {add_node, self(), {Ring, Node}},
- receive
- ok -> ok;
- {error, Error} -> {error, Error}
- end.
+ gen_server:call(?SERVER, {add_node, {Ring, Node}}).
remove_node(Ring, Node) when is_list(Node) ->
remove_node(Ring, list_to_binary(Node));
remove_node(Ring, Node) when is_binary(Node) ->
- hash_ring ! {remove_node, self(), {Ring, Node}},
- receive
- ok -> ok;
- {error, Error} -> {error, Error}
- end.
+ gen_server:call(?SERVER, {remove_node, {Ring, Node}}).
find_node(Ring, Key) when is_list(Key) ->
find_node(Ring, list_to_binary(Key));
find_node(Ring, Key) when is_binary(Key) ->
- hash_ring ! {find_node, self(), {Ring, Key}},
- receive
- {ok, Node} -> {ok, Node};
- {error, Reason} -> {error, Reason}
- end.
+ gen_server:call(?SERVER, {find_node, {Ring, Key}}).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Internal functions
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-record(state, {
-port,rings
+ port,
+ rings
}).
init(Drv) ->
Port = open_port({spawn, Drv}, [binary]),
- loop(#state{port=Port,rings = dict:new()}).
-
+ {ok, #state{
+ port = Port,
+ rings = dict:new() }}.
-loop(#state{port=Port,rings=Rings} = State) ->
- receive
- {create_ring, Pid, {Ring, NumReplicas}} ->
- Port ! {self(), {command, <<1:8, NumReplicas:32>>}},
- NewState = receive
- {Port, {data, <<Index:32>>}} ->
- Pid ! ok,
- State#state{rings = dict:store(Ring, Index, Rings)}
- end,
- loop(NewState);
- {delete_ring, Pid, Ring} ->
- NewState = case dict:find(Ring, Rings) of
- {ok, Index} ->
- Port ! {self(), {command, <<2:8, Index:32>>}},
- receive
- {Port, {data, <<0:8>>}} ->
- Pid ! ok,
- State#state{rings = dict:erase(Ring, Rings)};
- {Port, {data, <<1:8>>}} ->
- Pid ! {error, ring_not_found},
- State
- end;
- _ ->
- Pid ! {error, ring_not_found},
- State
- end,
- loop(NewState);
- {add_node, Pid, {Ring, Node}} when is_binary(Node) ->
- case dict:find(Ring, Rings) of
- {ok, Index} ->
- NodeSize = size(Node),
- Port ! {self(), {command, <<3:8, Index:32, NodeSize:32, Node/binary>>}},
- receive
- {Port, {data, <<0:8>>}} ->
- Pid ! ok;
- {Port, {data, <<1:8>>}} ->
- Pid ! {error, unknown_error}
- end;
- _ ->
- Pid ! {error, ring_not_found}
- end,
- loop(State);
- {remove_node, Pid, {Ring, Node}} when is_binary(Node) ->
- case dict:find(Ring, Rings) of
- {ok, Index} ->
- NodeSize = size(Node),
- Port ! {self(), {command, <<4:8, Index:32, NodeSize:32, Node/binary>>}},
- receive
- {Port, {data, <<0:8>>}} ->
- Pid ! ok;
- {Port, {data, <<1:8>>}} ->
- Pid ! {error, unknown_error}
- end;
- _ ->
- Pid ! {error, ring_not_found}
- end,
- loop(State);
- {find_node, Pid, {Ring, Key}} when is_binary(Key) ->
- case dict:find(Ring, Rings) of
- {ok, Index} ->
- KeySize = size(Key),
- Port ! {self(), {command, <<5:8, Index:32, KeySize:32, Key/binary>>}},
- receive
- {Port, {data, <<3:8>>}} ->
- Pid ! {error, invalid_ring};
- {Port, {data, <<2:8>>}} ->
- Pid ! {error, node_not_found};
- {Port, {data, <<1:8>>}} ->
- Pid ! {error, unknown_error};
- {Port, {data, <<Node/binary>>}} ->
- Pid ! {ok, Node}
- end;
- _ ->
- Pid ! {error, ring_not_found}
- end,
- loop(State);
- stop ->
- Port ! {self(), close},
+handle_call({create_ring, {Ring, NumReplicas}}, _From, #state{ port = Port, rings = Rings } = State) ->
+ Port ! {self(), {command, <<1:8, NumReplicas:32>>}},
+ receive
+ {Port, {data, <<Index:32>>}} ->
+ {reply, ok,
+ State#state{rings = dict:store(Ring, Index, Rings)}}
+ end;
+
+handle_call({delete_ring, Ring}, _From, #state{ port = Port, rings = Rings } = State) ->
+ case dict:find(Ring, Rings) of
+ {ok, Index} ->
+ Port ! {self(), {command, <<2:8, Index:32>>}},
receive
- {Port, closed} ->
- exit(normal)
+ {Port, {data, <<0:8>>}} ->
+ {reply, ok, State#state{rings = dict:erase(Ring, Rings)}};
+ {Port, {data, <<1:8>>}} ->
+ {reply, {error, ring_not_found}, State}
end;
- {'EXIT', Port, Reason} ->
- error_logger:error_msg("hash_ring port exited: ~p~n", [Reason]);
_ ->
- loop(State)
+ {reply, {error, ring_not_found}, State}
+ end;
+
+handle_call({add_node, {Ring, Node}}, _From, #state{ port = Port, rings = Rings } = State) ->
+ case dict:find(Ring, Rings) of
+ {ok, Index} ->
+ NodeSize = size(Node),
+ Port ! {self(), {command, <<3:8, Index:32, NodeSize:32, Node/binary>>}},
+ receive
+ {Port, {data, <<0:8>>}} ->
+ {reply, ok, State};
+ {Port, {data, <<1:8>>}} ->
+ {reply, {error, unknown_error}, State}
+ end;
+ _ ->
+ {reply, {error, ring_not_found}, State}
+ end;
+
+handle_call({remove_node, {Ring, Node}}, _From, #state{ port = Port, rings = Rings } = State) ->
+ case dict:find(Ring, Rings) of
+ {ok, Index} ->
+ NodeSize = size(Node),
+ Port ! {self(), {command, <<4:8, Index:32, NodeSize:32, Node/binary>>}},
+ receive
+ {Port, {data, <<0:8>>}} ->
+ {reply, ok, State};
+ {Port, {data, <<1:8>>}} ->
+ {reply, {error, unknown_error}, State}
+ end;
+ _ ->
+ {reply, {error, ring_not_found}, State}
+ end;
+
+handle_call({find_node, {Ring, Key}}, _From, #state{ port = Port, rings = Rings } = State) ->
+ case dict:find(Ring, Rings) of
+ {ok, Index} ->
+ KeySize = size(Key),
+ Port ! {self(), {command, <<5:8, Index:32, KeySize:32, Key/binary>>}},
+ receive
+ {Port, {data, <<3:8>>}} ->
+ {reply, {error, invalid_ring}, State};
+ {Port, {data, <<2:8>>}} ->
+ {reply, {error, node_not_found}, State};
+ {Port, {data, <<1:8>>}} ->
+ {reply, {error, unknown_error}, State};
+ {Port, {data, <<Node/binary>>}} ->
+ {reply, {ok, Node}, State}
+ end;
+ _ ->
+ {reply, {error, ring_not_found}, State}
+ end;
+
+handle_call(stop, _From, State) ->
+ {stop, normal, State}.
+
+handle_cast(_, State) ->
+ {noreply, State}.
+
+handle_info({'EXIT', Port, _Reason} = PortExit, #state{ port = Port } = State ) ->
+ {stop, PortExit, State}.
+
+terminate({'EXIT', Port, _Reason}, #state{ port = Port }) ->
+ ok;
+terminate(_, #state{ port = Port }) ->
+ Port ! {self(), close},
+ receive
+ {Port, closed} ->
+ ok
end.
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Tests
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
@@ -208,7 +202,7 @@ setup_driver() ->
undefined -> ok;
_ -> stop()
end,
- hash_ring:start().
+ hash_ring:start_link().
driver_starts_test() ->
setup_driver(),

0 comments on commit ae18001

Please sign in to comment.