Permalink
Browse files

style cleanup

  • Loading branch information...
1 parent cd5396e commit 981147b48af2d4a76edc199203135bd74e59da0a @japerk committed May 17, 2010
Showing with 75 additions and 64 deletions.
  1. +1 −1 ebin/erldis.app
  2. +74 −63 src/erldis_client.erl
View
2 ebin/erldis.app
@@ -1,7 +1,7 @@
{application, erldis, [
{description, "Erlang Redis application"},
{vsn, "0.2.1"},
- {registered, [erldis_sup]},
+ {registered, [erldis_sup, erldis_client]},
{mod, {erldis_app, []}},
{applications, [kernel, stdlib]},
{modules, [
View
137 src/erldis_client.erl
@@ -23,8 +23,8 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-export([subscribe/4, unsubscribe/3]).
--define(EOL, "\r\n").
+-define(EOL, "\r\n").
-define(default_timeout, 5000). %% same as in gen.erl in stdlib
%%%%%%%%%%%%%
@@ -40,10 +40,8 @@ trim2(S) ->
app_get_env(AppName, Varname, Default) ->
case application:get_env(AppName, Varname) of
- undefined ->
- {ok, Default};
- V ->
- V
+ undefined -> {ok, Default};
+ V -> V
end.
%%%%%%%%%%%%%%%%%%%
@@ -65,34 +63,33 @@ sr_scall(Client, Args) ->
% This is the simple send with a single row of commands
scall(Client, Args) -> scall(Client, Args, ?default_timeout).
-scall(Client, Args, Timeout) ->
- send(Client, erldis:multibulk_cmd(Args), Timeout).
+scall(Client, Args, Timeout) -> send(Client, erldis:multibulk_cmd(Args), Timeout).
% This is the complete send with multiple rows
call(Client, Args) -> call(Client, Args, ?default_timeout).
-call(Client, Args, Timeout) ->
- send(Client, erldis:multibulk_cmd(Args), Timeout).
+call(Client, Args, Timeout) -> send(Client, erldis:multibulk_cmd(Args), Timeout).
% Blocking call with server-side timeout added as final command arg
bcall(Client, Args, Timeout) ->
- scall(Client, Args ++ [server_timeout(Timeout)], erlang_timeout(Timeout)).
+ scall(Client, Args ++ [server_timeout(Timeout)], erlang_timeout(Timeout)).
set_call(Client, Cmd, Key, Val) when is_binary(Val) ->
call(Client, Cmd, [[Key, erlang:size(Val)], [Val]]);
set_call(Client, Cmd, Key, Val) ->
set_call(Client, Cmd, Key, erldis_binaries:to_binary(Val)).
subscribe(Client, Cmd, Class, Pid)->
- case gen_server2:call(Client, {subscribe, Cmd, Class, Pid}, ?default_timeout) of
- {error, Reason} -> throw({error, Reason});
- Retval -> Retval
- end.
+ case gen_server2:call(Client, {subscribe, Cmd, Class, Pid}, ?default_timeout) of
+ {error, Reason} -> throw({error, Reason});
+ Retval -> Retval
+ end.
+
unsubscribe(Client, Cmd, Class)->
- case gen_server2:call(Client, {unsubscribe, Cmd, Class}, ?default_timeout) of
- {error, Reason} -> throw({error, Reason});
- Retval -> Retval
- end.
+ case gen_server2:call(Client, {unsubscribe, Cmd, Class}, ?default_timeout) of
+ {error, Reason} -> throw({error, Reason});
+ Retval -> Retval
+ end.
% Erlang uses milliseconds, with symbol "infinity" for "wait forever";
% redis uses seconds, with 0 for "wait forever".
@@ -106,6 +103,7 @@ erlang_timeout(V) when is_number(V) -> V + ?default_timeout.
send(Client, Cmd, Timeout) ->
Piped = gen_server2:call(Client, is_pipelined),
+
if
Piped ->
gen_server2:cast(Client, {send, Cmd});
@@ -278,18 +276,19 @@ connect_socket(State, _) ->
% Solves issue of remaining getting reset while still accumulating multi-bulk
% reply
dont_reset_remaining(State, Queue) ->
- case State#redis.remaining of
- 0 -> State#redis{calls=Queue, remaining=1};
- _ -> State#redis{calls=Queue}
- end.
+ case State#redis.remaining of
+ 0 -> State#redis{calls=Queue, remaining=1};
+ _ -> State#redis{calls=Queue}
+ end.
+
dont_reset_remaining(State, Queue, DB) ->
- case State#redis.remaining of
- 0 -> State#redis{calls=Queue, remaining=1, db=DB};
- _ -> State#redis{calls=Queue, db=DB}
- end.
+ case State#redis.remaining of
+ 0 -> State#redis{calls=Queue, remaining=1, db=DB};
+ _ -> State#redis{calls=Queue, db=DB}
+ end.
handle_call(is_pipelined, _From, State)->
- {reply, State#redis.pipeline, State};
+ {reply, State#redis.pipeline, State};
handle_call(get_all_results, From, #redis{pipeline=true, calls=Calls} = State) ->
case queue:len(Calls) of
0 ->
@@ -323,27 +322,32 @@ handle_call({send, Cmd}, From, State1) ->
{stop, timeout, {error, Reason}, State}
end;
handle_call({subscribe, Cmd, Class, Pid}, From, State1)->
- State = ensure_started(State1),
- case gen_tcp:send(State#redis.socket, [Cmd | <<?EOL>>]) of
+ State = ensure_started(State1),
+
+ case gen_tcp:send(State#redis.socket, [Cmd | <<?EOL>>]) of
ok ->
- Queue = queue:in(From, State#redis.calls),
- Subscribers = dict:store(Class, Pid, State#redis.subscribers),
- {noreply, State#redis{calls=Queue, remaining=1, subscribers=Subscribers}};
+ Queue = queue:in(From, State#redis.calls),
+ Subscribers = dict:store(Class, Pid, State#redis.subscribers),
+ {noreply, State#redis{calls=Queue, remaining=1, subscribers=Subscribers}};
{error, Reason} ->
error_logger:error_report([{send, Cmd}, {error, Reason}]),
{stop, timeout, {error, Reason}, State}
end;
handle_call({unsubscribe, Cmd, Class}, From, State1)->
- State = ensure_started(State1),
- case gen_tcp:send(State#redis.socket, [Cmd | <<?EOL>>]) of
+ State = ensure_started(State1),
+
+ case gen_tcp:send(State#redis.socket, [Cmd | <<?EOL>>]) of
ok ->
- Queue = queue:in(From, State#redis.calls),
- Subscribers = if Class == <<"">> ->
- dict:new();
- true ->
- dict:erase(Class, State#redis.subscribers)
- end,
- {noreply, State#redis{calls=Queue, remaining=1, subscribers=Subscribers}};
+ Queue = queue:in(From, State#redis.calls),
+
+ if
+ Class == <<"">> ->
+ Subscribers = dict:new();
+ true ->
+ Subscribers = dict:erase(Class, State#redis.subscribers)
+ end,
+
+ {noreply, State#redis{calls=Queue, remaining=1, subscribers=Subscribers}};
{error, Reason} ->
error_logger:error_report([{send, Cmd}, {error, Reason}]),
{stop, timeout, {error, Reason}, State}
@@ -362,10 +366,9 @@ handle_cast({pipelining, Bool}, State) ->
handle_cast(disconnect, State) ->
{stop, shutdown, State};
handle_cast({send, Cmd}, #redis{remaining=Remaining, calls=Calls} = State1) ->
- End = <<?EOL>>,
State = ensure_started(State1),
Queue = queue:in(async, Calls),
- gen_tcp:send(State#redis.socket, [Cmd|End]),
+ gen_tcp:send(State#redis.socket, [Cmd | <<?EOL>>]),
case Remaining of
0 -> {noreply, State#redis{remaining=1, calls=Queue}};
@@ -380,9 +383,10 @@ handle_cast(_, State) ->
recv_value(Socket, NBytes) ->
inet:setopts(Socket, [{packet, 0}]), % go into raw mode to read bytes
+
case gen_tcp:recv(Socket, NBytes+2) of
{ok, Packet} ->
- %error_logger:error_report({line, Packet}),
+ %error_logger:error_report({line, Packet}),
inet:setopts(Socket, [{packet, line}]), % go back to line mode
trim2(Packet);
{error, Reason} ->
@@ -391,9 +395,9 @@ recv_value(Socket, NBytes) ->
end.
send_reply(#redis{pipeline=true, calls=Calls, results=Results, reply_caller=ReplyCaller}=State)->
- Result = case lists:reverse(State#redis.buffer) of
- [V] when is_atom(V) -> V;
- R -> R
+ case lists:reverse(State#redis.buffer) of
+ [Result] when is_atom(Result) -> Result;
+ Result -> Result
end,
{_, Queue} = queue:out(Calls),
@@ -410,11 +414,15 @@ send_reply(#redis{pipeline=true, calls=Calls, results=Results, reply_caller=Repl
State#redis{results=[]}
end,
- NewState#redis{remaining=0, pstate=empty,
- reply_caller=undefined, buffer=[],
- calls=Queue};
+ NewState#redis{
+ remaining=0, pstate=empty, reply_caller=undefined,
+ buffer=[], calls=Queue
+ };
_ ->
- State#redis{results=[Result|Results], remaining=1, pstate=empty, buffer=[], calls=Queue}
+ State#redis{
+ results=[Result|Results], remaining=1, pstate=empty,
+ buffer=[], calls=Queue
+ }
end;
send_reply(State) ->
@@ -430,6 +438,7 @@ send_reply(State) ->
parse_state(State, Socket, Data) ->
Parse = erldis_proto:parse(State#redis.pstate, trim2(Data)),
+
case {State#redis.remaining-1, Parse} of
{0, error} ->
% next line is the error string
@@ -449,17 +458,19 @@ parse_state(State, Socket, Data) ->
{0, {read, NBytes}} ->
% reply with Value added to buffer
Value = recv_value(Socket, NBytes),
+
case [Value | State#redis.buffer] of
- [PubSubValue, Class, <<"message">>] = M ->
- case dict:find(Class, State#redis.subscribers) of
- {ok, Pid} ->
- Pid ! {message, Class, PubSubValue};
- _ ->
- error_logger:error_report([lost_message, {class, Class}])
- end,
- send_reply(State#redis{buffer=[]});
- Buffer ->
- send_reply(State#redis{buffer=Buffer})
+ [PubSubValue, Class, <<"message">>] ->
+ case dict:find(Class, State#redis.subscribers) of
+ {ok, Pid} ->
+ Pid ! {message, Class, PubSubValue};
+ _ ->
+ error_logger:error_report([lost_message, {class, Class}])
+ end,
+
+ send_reply(State#redis{buffer=[]});
+ Buffer ->
+ send_reply(State#redis{buffer=Buffer})
end;
{N, {read, NBytes}} ->
% accumulate multi bulk reply
@@ -476,8 +487,8 @@ parse_state(State, Socket, Data) ->
end.
handle_info({tcp, Socket, Data}, State) ->
- %error_logger:error_report([{data, Data}, {state, State}]),
- case ( parse_state(State, Socket, Data)) of
+ %error_logger:error_report([{data, Data}, {state, State}]),
+ case parse_state(State, Socket, Data) of
{error, Reason} ->
{stop, Reason, State};
NewState ->

0 comments on commit 981147b

Please sign in to comment.