Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

updating error handling

  • Loading branch information...
commit 26bd829272e89166497bc8ac3386ad6d002218f3 1 parent 8dbb958
Jacob Vorreuter authored
Showing with 107 additions and 43 deletions.
  1. +15 −10 src/erlmc.erl
  2. +92 −33 src/erlmc_conn.erl
View
25 src/erlmc.erl
@@ -87,7 +87,7 @@ remove_connection(Host, Port) ->
get(Key0) ->
Key = package_key(Key0),
- gen_server:call(map_key(Key), {get, Key}, ?TIMEOUT).
+ call(map_key(Key), {get, Key}, ?TIMEOUT).
get_many(Keys) ->
Self = self(),
@@ -109,41 +109,41 @@ add(Key, Value) ->
add(Key0, Value, Expiration) when is_binary(Value), is_integer(Expiration) ->
Key = package_key(Key0),
- gen_server:call(map_key(Key), {add, Key, Value, Expiration}, ?TIMEOUT).
+ call(map_key(Key), {add, Key, Value, Expiration}, ?TIMEOUT).
set(Key, Value) ->
set(Key, Value, 0).
set(Key0, Value, Expiration) when is_binary(Value), is_integer(Expiration) ->
Key = package_key(Key0),
- gen_server:call(map_key(Key), {set, Key, Value, Expiration}, ?TIMEOUT).
+ call(map_key(Key), {set, Key, Value, Expiration}, ?TIMEOUT).
replace(Key, Value) ->
replace(Key, Value, 0).
replace(Key0, Value, Expiration) when is_binary(Value), is_integer(Expiration) ->
Key = package_key(Key0),
- gen_server:call(map_key(Key), {replace, Key, Value, Expiration}, ?TIMEOUT).
+ call(map_key(Key), {replace, Key, Value, Expiration}, ?TIMEOUT).
delete(Key0) ->
Key = package_key(Key0),
- gen_server:call(map_key(Key), {delete, Key}, ?TIMEOUT).
+ call(map_key(Key), {delete, Key}, ?TIMEOUT).
increment(Key0, Value, Initial, Expiration) when is_binary(Value), is_binary(Initial), is_integer(Expiration) ->
Key = package_key(Key0),
- gen_server:call(map_key(Key), {increment, Key, Value, Initial, Expiration}, ?TIMEOUT).
+ call(map_key(Key), {increment, Key, Value, Initial, Expiration}, ?TIMEOUT).
decrement(Key0, Value, Initial, Expiration) when is_binary(Value), is_binary(Initial), is_integer(Expiration) ->
Key = package_key(Key0),
- gen_server:call(map_key(Key), {decrement, Key, Value, Initial, Expiration}, ?TIMEOUT).
+ call(map_key(Key), {decrement, Key, Value, Initial, Expiration}, ?TIMEOUT).
append(Key0, Value) when is_binary(Value) ->
Key = package_key(Key0),
- gen_server:call(map_key(Key), {append, Key, Value}, ?TIMEOUT).
+ call(map_key(Key), {append, Key, Value}, ?TIMEOUT).
prepend(Key0, Value) when is_binary(Value) ->
Key = package_key(Key0),
- gen_server:call(map_key(Key), {prepend, Key, Value}, ?TIMEOUT).
+ call(map_key(Key), {prepend, Key, Value}, ?TIMEOUT).
stats() ->
multi_call(stats).
@@ -177,6 +177,11 @@ host_port_call(Host, Port, Msg) ->
Pid = unique_connection(Host, Port),
gen_server:call(Pid, Msg, ?TIMEOUT).
+call(Pid, Msg, Timeout) ->
+ case gen_server:call(Pid, Msg, Timeout) of
+ {error, Error} -> exit({erlmc, Error});
+ Resp -> Resp
+ end.
%%--------------------------------------------------------------------
%%% Stateful loop
@@ -291,7 +296,7 @@ unique_connections() ->
unique_connection(Host, Port) ->
case ets:lookup(erlmc_connections, {Host, Port}) of
- [] -> exit({error, {connection_not_found, {Host, Port}}});
+ [] -> exit({erlmc, {connection_not_found, {Host, Port}}});
Pids ->
{_, Pid} = lists:nth(random:uniform(length(Pids)), Pids),
Pid
View
125 src/erlmc_conn.erl
@@ -68,56 +68,104 @@ init([Host, Port]) ->
%% @hidden
%%--------------------------------------------------------------------
handle_call({get, Key}, _From, Socket) ->
- #response{key=Key1, value=Value} = send_recv(Socket, #request{op_code=?OP_GetK, key=list_to_binary(Key)}),
- case binary_to_list(Key1) of
- Key -> {reply, Value, Socket};
- _ -> {reply, <<>>, Socket}
- end;
+ case send_recv(Socket, #request{op_code=?OP_GetK, key=list_to_binary(Key)}) of
+ {error, Err} ->
+ {stop, Err, {error, Err}, Socket};
+ #response{key=Key1, value=Value} ->
+ case binary_to_list(Key1) of
+ Key -> {reply, Value, Socket};
+ _ -> {reply, <<>>, Socket}
+ end
+ end;
handle_call({add, Key, Value, Expiration}, _From, Socket) ->
- Resp = send_recv(Socket, #request{op_code=?OP_Add, extras = <<16#deadbeef:32, Expiration:32>>, key=list_to_binary(Key), value=Value}),
- {reply, Resp#response.value, Socket};
+ case send_recv(Socket, #request{op_code=?OP_Add, extras = <<16#deadbeef:32, Expiration:32>>, key=list_to_binary(Key), value=Value}) of
+ {error, Err} ->
+ {stop, Err, {error, Err}, Socket};
+ Resp ->
+ {reply, Resp#response.value, Socket}
+ end;
handle_call({set, Key, Value, Expiration}, _From, Socket) ->
- Resp = send_recv(Socket, #request{op_code=?OP_Set, extras = <<16#deadbeef:32, Expiration:32>>, key=list_to_binary(Key), value=Value}),
- {reply, Resp#response.value, Socket};
+ case send_recv(Socket, #request{op_code=?OP_Set, extras = <<16#deadbeef:32, Expiration:32>>, key=list_to_binary(Key), value=Value}) of
+ {error, Err} ->
+ {stop, Err, {error, Err}, Socket};
+ Resp ->
+ {reply, Resp#response.value, Socket}
+ end;
handle_call({replace, Key, Value, Expiration}, _From, Socket) ->
- Resp = send_recv(Socket, #request{op_code=?OP_Replace, extras = <<16#deadbeef:32, Expiration:32>>, key=list_to_binary(Key), value=Value}),
- {reply, Resp#response.value, Socket};
+ case send_recv(Socket, #request{op_code=?OP_Replace, extras = <<16#deadbeef:32, Expiration:32>>, key=list_to_binary(Key), value=Value}) of
+ {error, Err} ->
+ {stop, Err, {error, Err}, Socket};
+ Resp ->
+ {reply, Resp#response.value, Socket}
+ end;
handle_call({delete, Key}, _From, Socket) ->
- Resp = send_recv(Socket, #request{op_code=?OP_Delete, key=list_to_binary(Key)}),
- {reply, Resp#response.value, Socket};
+ case send_recv(Socket, #request{op_code=?OP_Delete, key=list_to_binary(Key)}) of
+ {error, Err} ->
+ {stop, Err, {error, Err}, Socket};
+ Resp ->
+ {reply, Resp#response.value, Socket}
+ end;
handle_call({increment, Key, Value, Initial, Expiration}, _From, Socket) ->
- Resp = send_recv(Socket, #request{op_code=?OP_Increment, extras = <<Value:64, Initial:64, Expiration:32>>, key=list_to_binary(Key)}),
- {reply, Resp, Socket};
+ case send_recv(Socket, #request{op_code=?OP_Increment, extras = <<Value:64, Initial:64, Expiration:32>>, key=list_to_binary(Key)}) of
+ {error, Err} ->
+ {stop, Err, {error, Err}, Socket};
+ Resp ->
+ {reply, Resp#response.value, Socket}
+ end;
handle_call({decrement, Key, Value, Initial, Expiration}, _From, Socket) ->
- Resp = send_recv(Socket, #request{op_code=?OP_Decrement, extras = <<Value:64, Initial:64, Expiration:32>>, key=list_to_binary(Key)}),
- {reply, Resp, Socket};
+ case send_recv(Socket, #request{op_code=?OP_Decrement, extras = <<Value:64, Initial:64, Expiration:32>>, key=list_to_binary(Key)}) of
+ {error, Err} ->
+ {stop, Err, {error, Err}, Socket};
+ Resp ->
+ {reply, Resp#response.value, Socket}
+ end;
handle_call({append, Key, Value}, _From, Socket) ->
- Resp = send_recv(Socket, #request{op_code=?OP_Append, key=list_to_binary(Key), value=Value}),
- {reply, Resp#response.value, Socket};
+ case send_recv(Socket, #request{op_code=?OP_Append, key=list_to_binary(Key), value=Value}) of
+ {error, Err} ->
+ {stop, Err, {error, Err}, Socket};
+ Resp ->
+ {reply, Resp#response.value, Socket}
+ end;
handle_call({prepend, Key, Value}, _From, Socket) ->
- Resp = send_recv(Socket, #request{op_code=?OP_Prepend, key=list_to_binary(Key), value=Value}),
- {reply, Resp#response.value, Socket};
+ case send_recv(Socket, #request{op_code=?OP_Prepend, key=list_to_binary(Key), value=Value}) of
+ {error, Err} ->
+ {stop, Err, {error, Err}, Socket};
+ Resp ->
+ {reply, Resp#response.value, Socket}
+ end;
handle_call(stats, _From, Socket) ->
send(Socket, #request{op_code=?OP_Stat}),
- Reply = collect_stats_from_socket(Socket),
- {reply, Reply, Socket};
+ case collect_stats_from_socket(Socket) of
+ {error, Err} ->
+ {stop, Err, {error, Err}, Socket};
+ Reply ->
+ {reply, Reply, Socket}
+ end;
handle_call(flush, _From, Socket) ->
- Resp = send_recv(Socket, #request{op_code=?OP_Flush}),
- {reply, Resp#response.value, Socket};
+ case send_recv(Socket, #request{op_code=?OP_Flush}) of
+ {error, Err} ->
+ {stop, Err, {error, Err}, Socket};
+ Resp ->
+ {reply, Resp#response.value, Socket}
+ end;
handle_call({flush, Expiration}, _From, Socket) ->
- Resp = send_recv(Socket, #request{op_code=?OP_Flush, extras = <<Expiration:32>>}),
- {reply, Resp#response.value, Socket};
+ case send_recv(Socket, #request{op_code=?OP_Flush, extras = <<Expiration:32>>}) of
+ {error, Err} ->
+ {stop, Err, {error, Err}, Socket};
+ Resp ->
+ {reply, Resp#response.value, Socket}
+ end;
handle_call(quit, _From, Socket) ->
send_recv(Socket, #request{op_code=?OP_Quit}),
@@ -125,8 +173,12 @@ handle_call(quit, _From, Socket) ->
{stop, shutdown, undefined};
handle_call(version, _From, Socket) ->
- Resp = send_recv(Socket, #request{op_code=?OP_Version}),
- {reply, Resp#response.value, Socket};
+ case send_recv(Socket, #request{op_code=?OP_Version}) of
+ {error, Err} ->
+ {stop, Err, {error, Err}, Socket};
+ Resp ->
+ {reply, Resp#response.value, Socket}
+ end;
handle_call(_, _From, Socket) -> {reply, {error, invalid_call}, Socket}.
@@ -177,6 +229,8 @@ collect_stats_from_socket(Socket) ->
collect_stats_from_socket(Socket, Acc) ->
case recv(Socket) of
+ {error, Err} ->
+ {error, Err};
#response{body_size=0} ->
Acc;
#response{key=Key, value=Value} ->
@@ -192,9 +246,12 @@ send(Socket, Request) ->
gen_tcp:send(Socket, Bin).
recv(Socket) ->
- Resp1 = recv_header(Socket),
- Resp2 = recv_body(Socket, Resp1),
- Resp2.
+ case recv_header(Socket) of
+ {error, Err} ->
+ {error, Err};
+ HdrResp ->
+ recv_body(Socket, HdrResp)
+ end.
encode_request(Request) when is_record(Request, request) ->
Magic = 16#80,
@@ -216,6 +273,7 @@ recv_header(Socket) ->
recv_body(Socket, #response{key_size = KeySize, extras_size = ExtrasSize, body_size = BodySize}=Resp) ->
decode_response_body(recv_bytes(Socket, BodySize), ExtrasSize, KeySize, Resp).
+decode_response_header({error, Err}) -> {error, Err};
decode_response_header(<<16#81:8, Opcode:8, KeySize:16, ExtrasSize:8, DataType:8, Status:16, BodySize:32, Opaque:32, CAS:64>>) ->
#response{
op_code = Opcode,
@@ -228,6 +286,7 @@ decode_response_header(<<16#81:8, Opcode:8, KeySize:16, ExtrasSize:8, DataType:8
body_size = BodySize
}.
+decode_response_body({error, Err}, _, _, _) -> {error, Err};
decode_response_body(Bin, ExtrasSize, KeySize, Resp) ->
<<Extras:ExtrasSize/binary, Key:KeySize/binary, Value/binary>> = Bin,
Resp#response{
@@ -240,5 +299,5 @@ recv_bytes(_, 0) -> <<>>;
recv_bytes(Socket, NumBytes) ->
case gen_tcp:recv(Socket, NumBytes) of
{ok, Bin} -> Bin;
- {error, closed} -> exit({error, memcached_connection_closed})
+ Err -> Err
end.
Please sign in to comment.
Something went wrong with that request. Please try again.