Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

updating structure of client to allow for better throughput

  • Loading branch information...
commit 4ae4b8e67911085f5189462c7fdeef63e09a5d02 1 parent c12f126
Jacob Vorreuter authored committed
View
528 src/mcerlang.erl
@@ -1,5 +1,4 @@
%% Copyright (c) 2009
-%% Nick Gerakines <nick@gerakines.net>
%% Jacob Vorreuter <jacob.vorreuter@gmail.com>
%%
%% Permission is hereby granted, free of charge, to any person
@@ -26,387 +25,218 @@
%% http://code.google.com/p/memcached/wiki/MemcacheBinaryProtocol
%% @doc a binary protocol memcached client
-module(mcerlang).
--behaviour(gen_server).
-%% gen_server callbacks
--export([start_link/1, init/1, handle_call/3, handle_cast/2,
- handle_info/2, terminate/2, code_change/3]).
+-export([start/0, start/1, start_link/0, start_link/1, init/2,
+ add_server/3, remove_server/2, add_connection/2, remove_connection/2]).
%% api callbacks
-export([get/1, get_many/1, add/2, add/3, set/2, set/3,
replace/2, replace/3, delete/1, increment/4, decrement/4,
append/2, prepend/2, stats/0, flush/0, flush/1, quit/0,
- version/0, continuum/0]).
-
--export([find_next_largest/2]).
+ version/0]).
-include("mcerlang.hrl").
--define(TIMEOUT, 3000).
-
--record(state, {continuum, sockets}).
+-define(TIMEOUT, 60000).
-%% @spec start_link(CacheServers) -> {ok, pid()}
-%% CacheServers = [{Host, Port, ConnectionPoolSize}]
-%% Host = string()
-%% Port = integer()
-%% ConnectionPoolSize = integer()
+%%--------------------------------------------------------------------
+%%% API
+%%--------------------------------------------------------------------
+start() -> start([{"localhost", 11211, 1}]).
+start(CacheServers) when is_list(CacheServers) ->
+ random:seed(now()),
+ case proc_lib:start(?MODULE, init, [self(), CacheServers], 5000) of
+ {ok, _Pid} -> ok;
+ Error -> Error
+ end.
+
+start_link() -> start_link([{"localhost", 11211, 1}]).
start_link(CacheServers) when is_list(CacheServers) ->
- gen_server:start_link({local, ?MODULE}, ?MODULE, CacheServers, []).
-
-get(Key) ->
- gen_server:call(?MODULE, {get, Key}).
+ random:seed(now()),
+ proc_lib:start_link(?MODULE, init, [self(), CacheServers], 5000).
+
+add_server(Host, Port, PoolSize) ->
+ erlang:send(?MODULE, {add_server, Host, Port, PoolSize}),
+ ok.
+
+remove_server(Host, Port) ->
+ erlang:send(?MODULE, {remove_server, Host, Port}),
+ ok.
+
+add_connection(Host, Port) ->
+ erlang:send(?MODULE, {add_connection, Host, Port}),
+ ok.
+
+remove_connection(Host, Port) ->
+ erlang:send(?MODULE, {remove_connection, Host, Port}),
+ ok.
+
+get(Key0) ->
+ Key = package_key(Key0),
+ gen_server:call(map_key(Key), {get, Key}, ?TIMEOUT).
get_many(Keys) ->
- gen_server:call(?MODULE, {get_many, Keys}).
+ Self = self(),
+ Pids = [spawn(fun() ->
+ Res = (catch ?MODULE:get(Key)),
+ Self ! {self(), {Key, Res}}
+ end) || Key <- Keys],
+ lists:reverse(lists:foldl(
+ fun(Pid, Acc) ->
+ receive
+ {Pid, {Key, Res}} -> [{Key, Res}|Acc]
+ after ?TIMEOUT ->
+ Acc
+ end
+ end, [], Pids)).
add(Key, Value) ->
add(Key, Value, 0).
-add(Key, Value, Expiration) when is_binary(Value), is_integer(Expiration) ->
- gen_server:call(?MODULE, {add, Key, Value, Expiration}).
+add(Key0, Value, Expiration) when is_binary(Value), is_integer(Expiration) ->
+ Key = package_key(Key0),
+ gen_server:call(map_key(Key), {add, Key, Value, Expiration}, ?TIMEOUT).
set(Key, Value) ->
set(Key, Value, 0).
-set(Key, Value, Expiration) when is_binary(Value), is_integer(Expiration) ->
- gen_server:call(?MODULE, {set, Key, Value, Expiration}).
+set(Key0, Value, Expiration) when is_binary(Value), is_integer(Expiration) ->
+ Key = package_key(Key0),
+ gen_server:call(map_key(Key), {set, Key, Value, Expiration}, ?TIMEOUT).
replace(Key, Value) ->
replace(Key, Value, 0).
-replace(Key, Value, Expiration) when is_binary(Value), is_integer(Expiration) ->
- gen_server:call(?MODULE, {replace, Key, Value, Expiration}).
+replace(Key0, Value, Expiration) when is_binary(Value), is_integer(Expiration) ->
+ Key = package_key(Key0),
+ gen_server:call(map_key(Key), {replace, Key, Value, Expiration}, ?TIMEOUT).
-delete(Key) ->
- gen_server:call(?MODULE, {delete, Key}).
+delete(Key0) ->
+ Key = package_key(Key0),
+ gen_server:call(map_key(Key), {delete, Key}, ?TIMEOUT).
-increment(Key, Value, Initial, Expiration) when is_binary(Value), is_binary(Initial), is_integer(Expiration) ->
- gen_server:call(?MODULE, {increment, Key, Value, Initial, Expiration}).
+increment(Key0, Value, Initial, Expiration) when is_binary(Value), is_binary(Initial), is_integer(Expiration) ->
+ Key = package_key(Key0),
+ gen_server:call(map_key(Key), {increment, Key, Value, Initial, Expiration}, ?TIMEOUT).
-decrement(Key, Value, Initial, Expiration) when is_binary(Value), is_binary(Initial), is_integer(Expiration) ->
- gen_server:call(?MODULE, {decrement, Key, Value, Initial, Expiration}).
+decrement(Key0, Value, Initial, Expiration) when is_binary(Value), is_binary(Initial), is_integer(Expiration) ->
+ Key = package_key(Key0),
+ gen_server:call(map_key(Key), {decrement, Key, Value, Initial, Expiration}, ?TIMEOUT).
-append(Key, Value) when is_binary(Value) ->
- gen_server:call(?MODULE, {append, Key, Value}).
+append(Key0, Value) when is_binary(Value) ->
+ Key = package_key(Key0),
+ gen_server:call(map_key(Key), {append, Key, Value}, ?TIMEOUT).
-prepend(Key, Value) when is_binary(Value) ->
- gen_server:call(?MODULE, {prepend, Key, Value}).
+prepend(Key0, Value) when is_binary(Value) ->
+ Key = package_key(Key0),
+ gen_server:call(map_key(Key), {prepend, Key, Value}, ?TIMEOUT).
stats() ->
- gen_server:call(?MODULE, stats).
-
+ multi_call(stats).
+
flush() ->
- gen_server:call(?MODULE, flush).
+ multi_call(flush).
flush(Expiration) when is_integer(Expiration) ->
- gen_server:call(?MODULE, {flush, Expiration}).
+ multi_call({flush, Expiration}).
quit() ->
- gen_server:call(?MODULE, quit).
+ [begin
+ {Key, [
+ {'EXIT',{shutdown,{gen_server,call,[Pid,quit,?TIMEOUT]}}} ==
+ (catch gen_server:call(Pid, quit, ?TIMEOUT)) || Pid <- Pids]}
+ end || {Key, Pids} <- unique_connections()].
version() ->
- gen_server:call(?MODULE, version).
-
-continuum() ->
- gen_server:call(?MODULE, continuum).
-
-%%====================================================================
-%% gen_server callbacks
-%%====================================================================
+ multi_call(version).
+multi_call(Msg) ->
+ [begin
+ Pid = lists:nth(random:uniform(length(Pids)), Pids),
+ {{Host, Port}, gen_server:call(Pid, Msg, ?TIMEOUT)}
+ end || {{Host, Port}, Pids} <- unique_connections()].
+
%%--------------------------------------------------------------------
-%% Function: init(Args) -> {ok, State} |
-%% {ok, State, Timeout} |
-%% ignore |
-%% {stop, Reason}
-%% Description: Initiates the server
-%% @hidden
-%%--------------------------------------------------------------------
-init(CacheServers) ->
- {A,B,C} = now(),
- random:seed(A,B,C),
+%%% Stateful loop
+%%--------------------------------------------------------------------
+init(Parent, CacheServers) ->
+ process_flag(trap_exit, true),
+ register(mcerlang, self()),
+ ets:new(mcerlang_continuum, [ordered_set, protected, named_table]),
+ ets:new(mcerlang_connections, [bag, protected, named_table]),
%% Continuum = [{uint(), {Host, Port}}]
- Continuum = lists:sort(dict:to_list(lists:foldl(
- fun({Host, Port, _}, Dict) ->
- lists:foldl(
- fun(_, Dict1) ->
- dict:store(hash_to_uint(Host, Port), {Host, Port}, Dict1)
- end, Dict, lists:seq(1,100))
- end, dict:new(), CacheServers))),
+ [add_server_to_continuum(Host, Port) || {Host, Port, _} <- CacheServers],
- %% Sockets = [{{Host,Port}, [socket()]}]
- Sockets = lists:foldl(
- fun({Host, Port, ConnPoolSize}, Acc1) ->
- SList = lists:foldl(
- fun(_, Acc2) ->
- case gen_tcp:connect(Host, Port, [binary, {packet, 0}, {active, false}]) of
- {ok, S} -> [S|Acc2];
- _ -> Acc2
- end
- end, [], lists:seq(1,ConnPoolSize)),
- case SList of
- [] ->
- Acc1;
- _ ->
- [{{Host, Port}, SList}|Acc1]
- end
- end, [], CacheServers),
+ %% Connections = [{{Host,Port}, ConnPid}]
+ [begin
+ [start_connection(Host, Port) || _ <- lists:seq(1, ConnPoolSize)]
+ end || {Host, Port, ConnPoolSize} <- CacheServers],
- {ok, #state{continuum=Continuum, sockets=Sockets}}.
-
-%%--------------------------------------------------------------------
-%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
-%% {reply, Reply, State, Timeout} |
-%% {noreply, State} |
-%% {noreply, State, Timeout} |
-%% {stop, Reason, Reply, State} |
-%% {stop, Reason, State}
-%% Description: Handling call messages
-%% @hidden
-%%--------------------------------------------------------------------
-handle_call({get, Key0}, _From, State) ->
- Key = package_key(Key0),
- Socket = map_key(State, Key),
- #response{key=Key1, value=Value} = send_recv(Socket, #request{op_code=?OP_GetK, key=list_to_binary(Key)}),
- case binary_to_list(Key1) of
- Key -> {reply, Value, State};
- _ -> {reply, <<>>, State}
- end;
-
-handle_call({get_many, Keys}, _From, State) ->
- SocketDicts = lists:foldl(
- fun(Key, Dict) ->
- Socket = map_key(State, Key),
- send(Socket, #request{op_code=?OP_GetKQ, key=list_to_binary(Key)}),
- case dict:find(Socket, Dict) of
- {ok, Count} -> dict:store(Socket, Count+1, Dict);
- error -> dict:store(Socket, 1, Dict)
- end
- end, dict:new(), Keys),
- Resps = lists:flatten([begin
- send(Socket, #request{op_code=?OP_Noop}),
- [recv(Socket) || _ <- lists:seq(1,Count)]
- end || {Socket, Count} <- dict:to_list(SocketDicts)]),
- Reply = [begin
- case lists:keysearch(list_to_binary(Key), 8, Resps) of
- {value, Resp} -> {Key, Resp#response.value};
- false -> {Key, <<>>}
- end
- end || Key <- Keys],
- {reply, Reply, State};
-
-handle_call({add, Key0, Value, Expiration}, _From, State) ->
- Key = package_key(Key0),
- Socket = map_key(State, Key),
- Resp = send_recv(Socket, #request{op_code=?OP_Add, extras = <<16#deadbeef:32, Expiration:32>>, key=list_to_binary(Key), value=Value}),
- {reply, Resp#response.value, State};
-
-handle_call({set, Key0, Value, Expiration}, _From, State) ->
- Key = package_key(Key0),
- Socket = map_key(State, Key),
- Resp = send_recv(Socket, #request{op_code=?OP_Set, extras = <<16#deadbeef:32, Expiration:32>>, key=list_to_binary(Key), value=Value}),
- {reply, Resp#response.value, State};
-
-handle_call({replace, Key0, Value, Expiration}, _From, State) ->
- Key = package_key(Key0),
- Socket = map_key(State, Key),
- Resp = send_recv(Socket, #request{op_code=?OP_Replace, extras = <<16#deadbeef:32, Expiration:32>>, key=list_to_binary(Key), value=Value}),
- {reply, Resp#response.value, State};
-
-handle_call({delete, Key0}, _From, State) ->
- Key = package_key(Key0),
- Socket = map_key(State, Key),
- Resp = send_recv(Socket, #request{op_code=?OP_Delete, key=list_to_binary(Key)}),
- {reply, Resp#response.value, State};
-
-handle_call({increment, Key0, Value, Initial, Expiration}, _From, State) ->
- Key = package_key(Key0),
- Socket = map_key(State, Key),
- Resp = send_recv(Socket, #request{op_code=?OP_Increment, extras = <<Value:64, Initial:64, Expiration:32>>, key=list_to_binary(Key)}),
- {reply, Resp, State};
+ proc_lib:init_ack(Parent, {ok, self()}),
-handle_call({decrement, Key0, Value, Initial, Expiration}, _From, State) ->
- Key = package_key(Key0),
- Socket = map_key(State, Key),
- Resp = send_recv(Socket, #request{op_code=?OP_Decrement, extras = <<Value:64, Initial:64, Expiration:32>>, key=list_to_binary(Key)}),
- {reply, Resp, State};
-
-handle_call({append, Key0, Value}, _From, State) ->
- Key = package_key(Key0),
- Socket = map_key(State, Key),
- Resp = send_recv(Socket, #request{op_code=?OP_Append, key=list_to_binary(Key), value=Value}),
- {reply, Resp#response.value, State};
-
-handle_call({prepend, Key0, Value}, _From, State) ->
- Key = package_key(Key0),
- Socket = map_key(State, Key),
- Resp = send_recv(Socket, #request{op_code=?OP_Prepend, key=list_to_binary(Key), value=Value}),
- {reply, Resp#response.value, State};
+ loop().
-handle_call(stats, _From, State) ->
- Sockets = [begin
- {{Host, Port}, begin
- send(Socket, #request{op_code=?OP_Stat}),
- Socket
- end}
- end || {{Host, Port}, [Socket|_]} <- State#state.sockets],
- Reply = [begin
- {HostPortKey, collect_stats_from_socket(Socket)}
- end || {HostPortKey, Socket} <- Sockets],
- {reply, Reply, State};
-
-handle_call(flush, _From, State) ->
- Reply = send_all(State, #request{op_code=?OP_Flush}),
- {reply, Reply, State};
-
-handle_call({flush, Expiration}, _From, State) ->
- Reply = send_all(State, #request{op_code=?OP_Flush, extras = <<Expiration:32>>}),
- {reply, Reply, State};
-
-handle_call(quit, _From, State) ->
- Reply = send_all(State, #request{op_code=?OP_Quit}),
- {reply, Reply, State};
-
-handle_call(version, _From, State) ->
- Reply = send_all(State, #request{op_code=?OP_Version}),
- {reply, Reply, State};
-
-handle_call(continuum, _From, State) ->
- {reply, State#state.continuum, State};
+loop() ->
+ receive
+ {add_server, Host, Port, ConnPoolSize} ->
+ add_server_to_continuum(Host, Port),
+ [start_connection(Host, Port) || _ <- lists:seq(1, ConnPoolSize)];
+ {remove_server, Host, Port} ->
+ [gen_server:call(Pid, quit, ?TIMEOUT) || [Pid] <- ets:match(mcerlang_connections, {{Host, Port}, '$1'})],
+ remove_server_from_continuum(Host, Port);
+ {add_connection, Host, Port} ->
+ start_connection(Host, Port);
+ {remove_connection, Host, Port} ->
+ [[Pid]|_] = ets:match(mcerlang_connections, {{Host, Port}, '$1'}),
+ (catch gen_server:call(Pid, quit, ?TIMEOUT));
+ {'EXIT', Pid, Err} ->
+ case ets:match(mcerlang_connections, {'$1', Pid}) of
+ [[{Host, Port}]] ->
+ ets:delete_object(mcerlang_connections, {{Host, Port}, Pid}),
+ case Err of
+ shutdown -> ok;
+ _ -> start_connection(Host, Port)
+ end;
+ _ ->
+ ok
+ end
+ end,
+ loop().
-handle_call(_, _From, State) -> {reply, {error, invalid_call}, State}.
-
-%%--------------------------------------------------------------------
-%% Function: handle_cast(Msg, State) -> {noreply, State} |
-%% {noreply, State, Timeout} |
-%% {stop, Reason, State}
-%% Description: Handling cast messages
-%% @hidden
-%%--------------------------------------------------------------------
-handle_cast(_Message, State) -> {noreply, State}.
-
-%%--------------------------------------------------------------------
-%% Function: handle_info(Info, State) -> {noreply, State} |
-%% {noreply, State, Timeout} |
-%% {stop, Reason, State}
-%% Description: Handling all non call/cast messages
-%% @hidden
-%%--------------------------------------------------------------------
-handle_info(_Info, State) -> {noreply, State}.
-
-%%--------------------------------------------------------------------
-%% Function: terminate(Reason, State) -> void()
-%% Description: This function is called by a gen_server when it is about to
-%% terminate. It should be the opposite of Module:init/1 and do any necessary
-%% cleaning up. When it returns, the gen_server terminates with Reason.
-%% The return value is ignored.
-%% @hidden
-%%--------------------------------------------------------------------
-terminate(_Reason, _State) -> ok.
-
-%%--------------------------------------------------------------------
-%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
-%% Description: Convert process state when code is changed
-%% @hidden
-%%--------------------------------------------------------------------
-code_change(_OldVsn, State, _Extra) -> {ok, State}.
+start_connection(Host, Port) ->
+ case mcerlang_conn:start_link([Host, Port]) of
+ {ok, Pid} -> ets:insert(mcerlang_connections, {{Host, Port}, Pid});
+ _ -> ok
+ end.
%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------
-send_all(State, Request) ->
- [begin
- {{Host, Port}, begin
- Resp = send_recv(Socket, Request),
- Resp#response.value
- end}
- end || {{Host, Port}, [Socket|_]} <- State#state.sockets].
-
-collect_stats_from_socket(Socket) ->
- collect_stats_from_socket(Socket, []).
-
-collect_stats_from_socket(Socket, Acc) ->
- case recv(Socket) of
- #response{body_size=0} ->
- Acc;
- #response{key=Key, value=Value} ->
- collect_stats_from_socket(Socket, [{binary_to_atom(Key, utf8), binary_to_list(Value)}|Acc])
- end.
-
-send_recv(Socket, Request) ->
- ok = send(Socket, Request),
- recv(Socket).
-
-send(Socket, Request) ->
- Bin = encode_request(Request),
- gen_tcp:send(Socket, Bin).
-
-recv(Socket) ->
- Resp1 = recv_header(Socket),
- Resp2 = recv_body(Socket, Resp1),
- Resp2.
-
-encode_request(Request) when is_record(Request, request) ->
- Magic = 16#80,
- Opcode = Request#request.op_code,
- KeySize = size(Request#request.key),
- Extras = Request#request.extras,
- ExtrasSize = size(Extras),
- DataType = Request#request.data_type,
- Reserved = Request#request.reserved,
- Body = <<Extras:ExtrasSize/binary, (Request#request.key)/binary, (Request#request.value)/binary>>,
- BodySize = size(Body),
- Opaque = Request#request.opaque,
- CAS = Request#request.cas,
- <<Magic:8, Opcode:8, KeySize:16, ExtrasSize:8, DataType:8, Reserved:16, BodySize:32, Opaque:32, CAS:64, Body:BodySize/binary>>.
-
-recv_header(Socket) ->
- decode_response_header(recv_bytes(Socket, 24)).
-
-recv_body(Socket, #response{key_size = KeySize, extras_size = ExtrasSize, body_size = BodySize}=Resp) ->
- decode_response_body(recv_bytes(Socket, BodySize), ExtrasSize, KeySize, Resp).
-
-decode_response_header(<<16#81:8, Opcode:8, KeySize:16, ExtrasSize:8, DataType:8, Status:16, BodySize:32, Opaque:32, CAS:64>>) ->
- #response{
- op_code = Opcode,
- data_type = DataType,
- status = Status,
- opaque = Opaque,
- cas = CAS,
- key_size = KeySize,
- extras_size = ExtrasSize,
- body_size = BodySize
- }.
-
-decode_response_body(Bin, ExtrasSize, KeySize, Resp) ->
- <<Extras:ExtrasSize/binary, Key:KeySize/binary, Value/binary>> = Bin,
- Resp#response{
- extras = Extras,
- key = Key,
- value = Value
- }.
-
-recv_bytes(_, 0) -> <<>>;
-recv_bytes(Socket, NumBytes) ->
- case gen_tcp:recv(Socket, NumBytes) of
- {ok, Bin} -> Bin;
- {error, closed} -> exit({error, memcached_connection_closed})
- end.
+add_server_to_continuum(Host, Port) ->
+ ets:insert(mcerlang_continuum, {hash_to_uint(Host, Port), {Host, Port}}).
+remove_server_from_continuum(Host, Port) ->
+ ets:delete(mcerlang_continuum, hash_to_uint(Host, Port)).
+
package_key(Key) when is_atom(Key) ->
atom_to_list(Key);
-
+
package_key(Key) when is_list(Key) ->
Key;
-
+
package_key(Key) when is_binary(Key) ->
binary_to_list(Key);
-
+
package_key(Key) ->
lists:flatten(io_lib:format("~p", [Key])).
-
+
+unique_connections() ->
+ dict:to_list(lists:foldl(
+ fun({Key, Val}, Dict) ->
+ dict:append_list(Key, [Val], Dict)
+ end, dict:new(), ets:tab2list(mcerlang_connections))).
+
%% Consistent hashing functions
%%
%% First, hash memcached servers to unsigned integers on a continuum. To
@@ -420,33 +250,35 @@ hash_to_uint(Host, Port) when is_list(Host), is_integer(Port) ->
hash_to_uint(Key) when is_list(Key) ->
<<Int:128/unsigned-integer>> = erlang:md5(Key), Int.
-map_key(#state{continuum=Continuum, sockets=Sockets}, Key) when is_list(Key) ->
- {Host, Port} = find_next_largest(hash_to_uint(Key), Continuum),
- Pool = proplists:get_value({Host, Port}, Sockets),
- lists:nth(random:uniform(length(Pool)), Pool).
-
-find_next_largest(Int, Continuum) ->
- {A,B} = lists:split(length(Continuum) div 2, Continuum),
- case find_next_largest(Int, A, B) of
- undefined ->
- [{_, Val}|_] = Continuum,
- Val;
- Val -> Val
+%% @spec map_key(Key) -> Conn
+%% Key = string()
+%% Conn = pid()
+map_key(Key) when is_list(Key) ->
+ First = ets:first(mcerlang_continuum),
+ {Host, Port} =
+ case find_next_largest(hash_to_uint(Key), First) of
+ undefined ->
+ case First of
+ '$end_of_table' -> exit(mcerlang_continuum_empty);
+ _ ->
+ [{_, Value}] = ets:lookup(mcerlang_continuum, First),
+ Value
+ end;
+ Value -> Value
+ end,
+ case ets:lookup(mcerlang_connections, {Host, Port}) of
+ [] -> exit({error, {connection_not_found, {Host, Port}}});
+ Pids ->
+ {_, Pid} = lists:nth(random:uniform(length(Pids)), Pids),
+ Pid
end.
-find_next_largest(Int, [], [{Pivot, _}|_]) when Int >= Pivot -> undefined;
-
-find_next_largest(Int, [], [{Pivot, Val}|_]) when Int < Pivot -> Val;
+find_next_largest(_, '$end_of_table') ->
+ undefined;
-find_next_largest(Int, Front, [{Pivot, Val} | _]) when Int < Pivot ->
- {Last, _} = lists:last(Front),
- case Int >= Last of
- true -> Val;
- false ->
- {A, B} = lists:split(length(Front) div 2, Front),
- find_next_largest(Int, A, B)
- end;
-
-find_next_largest(Int, _, [{Pivot,_} | _]=Back) when Int >= Pivot ->
- {A, B} = lists:split(length(Back) div 2, Back),
- find_next_largest(Int, A, B).
+find_next_largest(Int, Key) when Key > Int ->
+ [{_, Val}] = ets:lookup(mcerlang_continuum, Key),
+ Val;
+
+find_next_largest(Int, Key) ->
+ find_next_largest(Int, ets:next(mcerlang_continuum, Key)).
View
244 src/mcerlang_conn.erl
@@ -0,0 +1,244 @@
+%% Copyright (c) 2009
+%% Jacob Vorreuter <jacob.vorreuter@gmail.com>
+%%
+%% Permission is hereby granted, free of charge, to any person
+%% obtaining a copy of this software and associated documentation
+%% files (the "Software"), to deal in the Software without
+%% restriction, including without limitation the rights to use,
+%% copy, modify, merge, publish, distribute, sublicense, and/or sell
+%% copies of the Software, and to permit persons to whom the
+%% Software is furnished to do so, subject to the following
+%% conditions:
+%%
+%% The above copyright notice and this permission notice shall be
+%% included in all copies or substantial portions of the Software.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+%% EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+%% OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+%% NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+%% HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+%% WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+%% FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+%% OTHER DEALINGS IN THE SOFTWARE.
+%%
+%% http://code.google.com/p/memcached/wiki/MemcacheBinaryProtocol
+%% @doc a binary protocol memcached client
+-module(mcerlang_conn).
+-behaviour(gen_server).
+
+-include("mcerlang.hrl").
+
+%% gen_server callbacks
+-export([start_link/1, init/1, handle_call/3, handle_cast/2,
+ handle_info/2, terminate/2, code_change/3]).
+
+%% API functions
+start_link([Host, Port]) ->
+ gen_server:start_link(?MODULE, [Host, Port], []).
+
+%%====================================================================
+%% gen_server callbacks
+%%====================================================================
+
+%%--------------------------------------------------------------------
+%% Function: init(Args) -> {ok, State} |
+%% {ok, State, Timeout} |
+%% ignore |
+%% {stop, Reason}
+%% Description: Initiates the server
+%% @hidden
+%%--------------------------------------------------------------------
+init([Host, Port]) ->
+ case gen_tcp:connect(Host, Port, [binary, {packet, 0}, {active, false}]) of
+ {ok, Socket} ->
+ {ok, Socket};
+ Error ->
+ exit(Error)
+ end.
+
+%%--------------------------------------------------------------------
+%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
+%% {reply, Reply, State, Timeout} |
+%% {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, Reply, State} |
+%% {stop, Reason, State}
+%% Description: Handling call messages
+%% @hidden
+%%--------------------------------------------------------------------
+handle_call({get, Key}, _From, Socket) ->
+ #response{key=Key1, value=Value} = send_recv(Socket, #request{op_code=?OP_GetK, key=list_to_binary(Key)}),
+ case binary_to_list(Key1) of
+ Key -> {reply, Value, Socket};
+ _ -> {reply, <<>>, Socket}
+ end;
+
+handle_call({add, Key, Value, Expiration}, _From, Socket) ->
+ Resp = send_recv(Socket, #request{op_code=?OP_Add, extras = <<16#deadbeef:32, Expiration:32>>, key=list_to_binary(Key), value=Value}),
+ {reply, Resp#response.value, Socket};
+
+handle_call({set, Key, Value, Expiration}, _From, Socket) ->
+ Resp = send_recv(Socket, #request{op_code=?OP_Set, extras = <<16#deadbeef:32, Expiration:32>>, key=list_to_binary(Key), value=Value}),
+ {reply, Resp#response.value, Socket};
+
+handle_call({replace, Key, Value, Expiration}, _From, Socket) ->
+ Resp = send_recv(Socket, #request{op_code=?OP_Replace, extras = <<16#deadbeef:32, Expiration:32>>, key=list_to_binary(Key), value=Value}),
+ {reply, Resp#response.value, Socket};
+
+handle_call({delete, Key}, _From, Socket) ->
+ Resp = send_recv(Socket, #request{op_code=?OP_Delete, key=list_to_binary(Key)}),
+ {reply, Resp#response.value, Socket};
+
+handle_call({increment, Key, Value, Initial, Expiration}, _From, Socket) ->
+ Resp = send_recv(Socket, #request{op_code=?OP_Increment, extras = <<Value:64, Initial:64, Expiration:32>>, key=list_to_binary(Key)}),
+ {reply, Resp, Socket};
+
+handle_call({decrement, Key, Value, Initial, Expiration}, _From, Socket) ->
+ Resp = send_recv(Socket, #request{op_code=?OP_Decrement, extras = <<Value:64, Initial:64, Expiration:32>>, key=list_to_binary(Key)}),
+ {reply, Resp, Socket};
+
+handle_call({append, Key, Value}, _From, Socket) ->
+ Resp = send_recv(Socket, #request{op_code=?OP_Append, key=list_to_binary(Key), value=Value}),
+ {reply, Resp#response.value, Socket};
+
+handle_call({prepend, Key, Value}, _From, Socket) ->
+ Resp = send_recv(Socket, #request{op_code=?OP_Prepend, key=list_to_binary(Key), value=Value}),
+ {reply, Resp#response.value, Socket};
+
+handle_call(stats, _From, Socket) ->
+ send(Socket, #request{op_code=?OP_Stat}),
+ Reply = collect_stats_from_socket(Socket),
+ {reply, Reply, Socket};
+
+handle_call(flush, _From, Socket) ->
+ Resp = send_recv(Socket, #request{op_code=?OP_Flush}),
+ {reply, Resp#response.value, Socket};
+
+handle_call({flush, Expiration}, _From, Socket) ->
+ Resp = send_recv(Socket, #request{op_code=?OP_Flush, extras = <<Expiration:32>>}),
+ {reply, Resp#response.value, Socket};
+
+handle_call(quit, _From, Socket) ->
+ send_recv(Socket, #request{op_code=?OP_Quit}),
+ gen_tcp:close(Socket),
+ {stop, shutdown, undefined};
+
+handle_call(version, _From, Socket) ->
+ Resp = send_recv(Socket, #request{op_code=?OP_Version}),
+ {reply, Resp#response.value, Socket};
+
+handle_call(_, _From, Socket) -> {reply, {error, invalid_call}, Socket}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_cast(Msg, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% Description: Handling cast messages
+%% @hidden
+%%--------------------------------------------------------------------
+handle_cast(_Message, State) -> {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_info(Info, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% Description: Handling all non call/cast messages
+%% @hidden
+%%--------------------------------------------------------------------
+handle_info(_Info, State) -> {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: terminate(Reason, State) -> void()
+%% Description: This function is called by a gen_server when it is about to
+%% terminate. It should be the opposite of Module:init/1 and do any necessary
+%% cleaning up. When it returns, the gen_server terminates with Reason.
+%% The return value is ignored.
+%% @hidden
+%%--------------------------------------------------------------------
+terminate(_Reason, Socket) ->
+ case is_port(Socket) of
+ true -> gen_tcp:close(Socket);
+ false -> ok
+ end, ok.
+
+%%--------------------------------------------------------------------
+%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% Description: Convert process state when code is changed
+%% @hidden
+%%--------------------------------------------------------------------
+code_change(_OldVsn, State, _Extra) -> {ok, State}.
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------
+collect_stats_from_socket(Socket) ->
+ collect_stats_from_socket(Socket, []).
+
+collect_stats_from_socket(Socket, Acc) ->
+ case recv(Socket) of
+ #response{body_size=0} ->
+ Acc;
+ #response{key=Key, value=Value} ->
+ collect_stats_from_socket(Socket, [{binary_to_atom(Key, utf8), binary_to_list(Value)}|Acc])
+ end.
+
+send_recv(Socket, Request) ->
+ ok = send(Socket, Request),
+ recv(Socket).
+
+send(Socket, Request) ->
+ Bin = encode_request(Request),
+ gen_tcp:send(Socket, Bin).
+
+recv(Socket) ->
+ Resp1 = recv_header(Socket),
+ Resp2 = recv_body(Socket, Resp1),
+ Resp2.
+
+encode_request(Request) when is_record(Request, request) ->
+ Magic = 16#80,
+ Opcode = Request#request.op_code,
+ KeySize = size(Request#request.key),
+ Extras = Request#request.extras,
+ ExtrasSize = size(Extras),
+ DataType = Request#request.data_type,
+ Reserved = Request#request.reserved,
+ Body = <<Extras:ExtrasSize/binary, (Request#request.key)/binary, (Request#request.value)/binary>>,
+ BodySize = size(Body),
+ Opaque = Request#request.opaque,
+ CAS = Request#request.cas,
+ <<Magic:8, Opcode:8, KeySize:16, ExtrasSize:8, DataType:8, Reserved:16, BodySize:32, Opaque:32, CAS:64, Body:BodySize/binary>>.
+
+recv_header(Socket) ->
+ decode_response_header(recv_bytes(Socket, 24)).
+
+recv_body(Socket, #response{key_size = KeySize, extras_size = ExtrasSize, body_size = BodySize}=Resp) ->
+ decode_response_body(recv_bytes(Socket, BodySize), ExtrasSize, KeySize, Resp).
+
+decode_response_header(<<16#81:8, Opcode:8, KeySize:16, ExtrasSize:8, DataType:8, Status:16, BodySize:32, Opaque:32, CAS:64>>) ->
+ #response{
+ op_code = Opcode,
+ data_type = DataType,
+ status = Status,
+ opaque = Opaque,
+ cas = CAS,
+ key_size = KeySize,
+ extras_size = ExtrasSize,
+ body_size = BodySize
+ }.
+
+decode_response_body(Bin, ExtrasSize, KeySize, Resp) ->
+ <<Extras:ExtrasSize/binary, Key:KeySize/binary, Value/binary>> = Bin,
+ Resp#response{
+ extras = Extras,
+ key = Key,
+ value = Value
+ }.
+
+recv_bytes(_, 0) -> <<>>;
+recv_bytes(Socket, NumBytes) ->
+ case gen_tcp:recv(Socket, NumBytes) of
+ {ok, Bin} -> Bin;
+ {error, closed} -> exit({error, memcached_connection_closed})
+ end.
View
61 t/mcerlang_t_001.t
@@ -4,28 +4,45 @@
main(_) ->
etap:plan(unknown),
- case (catch start()) of
- {'EXIT', Err} ->
- io:format("Err ~p~n", [Err]),
- etap:bail();
- _ ->
- etap:end_tests()
- end,
- ok.
-
-start() ->
- {ok, Socket} = gen_tcp:connect("localhost", 11211, [binary, {packet, 0}, {active, false}]),
- gen_tcp:send(Socket, <<128,10,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0>>),
- etap:ok((fun({ok, _}) -> true; (_) -> false end)(gen_tcp:recv(Socket, 0, 2000)), "noop"),
-
- etap:is(mcerlang:find_next_largest(4, [{1,a}, {2,b}, {5,c}, {7,d}, {14,e}]), c, "find next largest"),
- etap:is(mcerlang:find_next_largest(1, [{1,a}, {2,b}, {5,c}, {7,d}, {14,e}]), b, "find next largest"),
- etap:is(mcerlang:find_next_largest(0, [{1,a}, {2,b}, {5,c}, {7,d}, {14,e}]), a, "find next largest"),
- etap:is(mcerlang:find_next_largest(13, [{1,a}, {2,b}, {5,c}, {7,d}, {14,e}]), e, "find next largest"),
- etap:is(mcerlang:find_next_largest(14, [{1,a}, {2,b}, {5,c}, {7,d}, {14,e}]), a, "find next largest"),
- etap:is(mcerlang:find_next_largest(15, [{1,a}, {2,b}, {5,c}, {7,d}, {14,e}]), a, "find next largest"),
+ (fun() ->
+ {ok, Socket} = gen_tcp:connect("localhost", 11211, [binary, {packet, 0}, {active, false}]),
+
+ gen_tcp:send(Socket, <<128,10,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0>>),
+ etap:ok((fun({ok, _}) -> true; (_) -> false end)(gen_tcp:recv(Socket, 0, 2000)), "noop"),
- gen_tcp:send(Socket, <<128,7,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0>>),
+ gen_tcp:send(Socket, <<128,7,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0>>),
+
+ ok
+ end)(),
+
+ (fun() ->
+ etap:is(mcerlang:start(), ok, "mcerlang connect to default memcached server ok"),
+
+ etap:is(mcerlang:set("Hello", <<"World">>), <<>>, "set ok"),
+ etap:is(mcerlang:add("Hello", <<"Fail">>), <<"Data exists for key.">>, "add ok"),
+ etap:is(mcerlang:get("Hello"), <<"World">>, "get ok"),
+ etap:is(mcerlang:delete("Hello"), <<>>, "delete ok"),
+ etap:is(mcerlang:add("Hello", <<"World2">>), <<>>, "add ok"),
+ etap:is(mcerlang:get("Hello"), <<"World2">>, "get ok"),
+ etap:is(mcerlang:append("Hello", <<"!!!">>), <<>>, "append ok"),
+ etap:is(mcerlang:get("Hello"), <<"World2!!!">>, "get ok"),
+ etap:is(mcerlang:prepend("Hello", <<"$$$">>), <<>>, "prepend ok"),
+ etap:is(mcerlang:get("Hello"), <<"$$$World2!!!">>, "get ok"),
+ etap:is(mcerlang:delete("Hello"), <<>>, "delete ok"),
+ etap:is(mcerlang:get("Hello"), <<>>, "get ok"),
+
+ mcerlang:set("One", <<"A">>),
+ mcerlang:set("Two", <<"B">>),
+ mcerlang:set("Three", <<"C">>),
+
+ etap:is(mcerlang:get_many(["One", "Two", "Two-and-a-half", "Three"]), [{"One",<<"A">>},{"Two",<<"B">>},{"Two-and-a-half",<<>>},{"Three",<<"C">>}], "get_many ok"),
+
+ etap:is(mcerlang:flush(0), [{{"localhost",11211},<<>>}], "flush ok"),
+
+ etap:is(mcerlang:quit(), [{{"localhost",11211},[true]}], "quit ok"),
+
+ ok
+ end)(),
- ok.
+ etap:end_tests().
View
44 t/mcerlang_t_002.t
@@ -1,44 +0,0 @@
-#!/usr/bin/env escript
-%% -*- erlang -*-
-%%! -pa ./ebin -sasl errlog_type error -boot start_sasl -noshell
-
-main(_) ->
- etap:plan(unknown),
- case (catch start()) of
- {'EXIT', Err} ->
- io:format("Err ~p~n", [Err]),
- etap:bail();
- _ ->
- etap:end_tests()
- end,
- ok.
-
-start() ->
- {ok, _} = mcerlang:start_link([{"127.0.0.1", 11211, 1}]),
-
- etap:is(mcerlang:set("Hello", <<"World">>), <<>>, "set ok"),
- etap:is(mcerlang:add("Hello", <<"Fail">>), <<"Data exists for key.">>, "add ok"),
- etap:is(mcerlang:get("Hello"), <<"World">>, "get ok"),
- etap:is(mcerlang:delete("Hello"), <<>>, "delete ok"),
- etap:is(mcerlang:add("Hello", <<"World2">>), <<>>, "add ok"),
- etap:is(mcerlang:get("Hello"), <<"World2">>, "get ok"),
- etap:is(mcerlang:append("Hello", <<"!!!">>), <<>>, "append ok"),
- etap:is(mcerlang:get("Hello"), <<"World2!!!">>, "get ok"),
- etap:is(mcerlang:prepend("Hello", <<"$$$">>), <<>>, "prepend ok"),
- etap:is(mcerlang:get("Hello"), <<"$$$World2!!!">>, "get ok"),
- etap:is(mcerlang:delete("Hello"), <<>>, "delete ok"),
- etap:is(mcerlang:get("Hello"), <<>>, "get ok"),
-
- mcerlang:set("One", <<"A">>),
- mcerlang:set("Two", <<"B">>),
- mcerlang:set("Three", <<"C">>),
-
- io:format("stats ~p~n", [mcerlang:stats()]),
-
- etap:is(mcerlang:get_many(["One", "Two", "Two-and-a-half", "Three"]), [{"One",<<"A">>},{"Two",<<"B">>},{"Two-and-a-half",<<>>},{"Three",<<"C">>}], "get_many ok"),
-
- etap:is(mcerlang:flush(0), [{{"127.0.0.1",11211},<<>>}], "flush ok"),
-
- etap:is(mcerlang:quit(), [{{"127.0.0.1",11211},<<>>}], "quit ok"),
-
- ok.
Please sign in to comment.
Something went wrong with that request. Please try again.