Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Pulled everything from japerk :

  • Loading branch information...
commit df4d58de588cc2b24d2aa834851d3cad9b04ac57 1 parent e45cdf6
@cstar cstar authored
View
5 Emakefile
@@ -0,0 +1,5 @@
+{"src/*", [
+ debug_info,
+ {i, "include/"},
+ {outdir, "ebin/"}
+]}.
View
31 Makefile
@@ -1,10 +1,10 @@
-LIBDIR=`erl -eval 'io:format("~s~n", [code:lib_dir()])' -s init stop -noshell`
+# store output so is only executed once
+LIBDIR=$(shell erl -eval 'io:format("~s~n", [code:lib_dir()])' -s init stop -noshell)
+# get application vsn from app file
+VSN=$(shell erl -pa ebin/ -eval 'application:load(erldis), {ok, Vsn} = application:get_key(erldis, vsn), io:format("~s~n", [Vsn])' -s init stop -noshell)
all:
- mkdir -p ebin/
- (cd src;$(MAKE))
- # test compile fails if eunit not present
- #(cd test;$(MAKE))
+ @erl -make
clean: clean_tests
(cd src;$(MAKE) clean)
@@ -24,12 +24,17 @@ testrun: all
mkdir -p ebin/
(cd test;$(MAKE) test)
-install: all
- # original "mkdir -p {LIBDIR}/erldis-0.0.1/{ebin,include}"
- # actually makes a directory called {ebin,include}
- mkdir -p ${LIBDIR}/erldis-0.0.1/ebin
- mkdir -p ${LIBDIR}/erldis-0.0.1/include
- for i in ebin/*.beam; do install $$i $(LIBDIR)/erldis-0.0.1/$$i ; done
- for i in include/*.hrl; do install $$i $(LIBDIR)/erldis-0.0.1/$$i ; done
+install: all
+ mkdir -p $(ERL_LIBS)/erldis-$(VSN)/ebin
+ mkdir -p $(ERL_LIBS)/erldis-$(VSN)/include
+ for i in ebin/*.beam; do install $$i $(ERL_LIBS)/erldis-$(VSN)/$$i ; done
+ for i in include/*.hrl; do install $$i $(ERL_LIBS)/erldis-$(VSN)/$$i ; done
# also install .app file
- install ebin/erldis.app $(LIBDIR)/erldis-0.0.1/ebin/erldis.app
+ install ebin/erldis.app $(ERL_LIBS)/erldis-$(VSN)/ebin/erldis.app
+ install ebin/erldis.appup $(ERL_LIBS)/erldis-$(VSN)/ebin/erldis.appup
+
+plt:
+ @dialyzer --build_plt --plt .plt -q -r . -I include/
+
+check: all
+ @dialyzer --check_plt --plt .plt -q -r . -I include/
View
11 doc/overview.edoc
@@ -0,0 +1,11 @@
+@doc Redis erlang client
+
+From dialtone on erldis_client:
+handle_info is the function that deals with parsing every line that comes from redis. save_or_reply is what is currently used to send replies back to the users.
+going through the State parameters:
+ * socket is of course the current connection to redis
+ * buffer contains intermediate results of parsing. it's used for multi-bulk replies. but it gets handy also for single bulk replies, although clearly it will be at most long 1 in those cases
+ * reply_caller is a function used to abstract the reply to the user. since the call is synchronous, the client might put itself in listening slower than redis can answer the request. when it manages to listen before the answer it adds a call that registers the call request and when it's done the system uses that function to send back the results. if it comes afterwards it simply gets the results in save_or_reply the function checks for reply_caller and if it's not there it just appends to results
+ * the field remaining is for how many remaining packets you need to handle before the end of this call. this is also especially useful for multi-bulk replies. when a call is made it's set to 1 then the rest of the state machine figures out if only 1 packet will be received or how many more (the state machine is in erldis_proto)
+ * then the field calls is the number of calls that are waiting. basically at the end of a call remaining would go to 0. but if it goes to 0 the state machine would parse the remaining stuff. so using calls we keep track of how many more calls are incoming and until calls is 0 remaining is reset to 1 when it goes to 0 at the end of a parsing. and this is done in save_or_reply
+ * then pstate is the parser state, there can be only 4 right now error, hold, read, empty. error means that the next line is an error message. hold means that the next number is the number of multi-bulk items in the reply and is used to set the remaining field to something else than 1. read tells you how long the next field is so you need to read those bytes + 2 (\r\n are added by redis but not counted in the bytes... I fought hard to avoid this but salvatore didn't change it...). then empty means that the system is ready to accept a new reply. the function trim2 exists just to remove the \r\n at the end of the reply
View
7 ebin/erldis.app
@@ -1,10 +1,11 @@
{application, erldis, [
{description, "Erlang Redis application"},
- {vsn, "0.0.1"},
+ {vsn, "0.0.7"},
{registered, [erldis_sup]},
{mod, {erldis_app, []}},
% TODO: include eunit?
{applications, [kernel, stdlib]},
- {modules, [erldis_client, erldis, erldis_proto, erldis_app, erldis_sup]},
+ {modules, [erldis_client, erldis, erldis_proto, erldis_app, erldis_sup,
+ erldis_sync_client, erldis_sets, erldis_dict, erldis_list]},
{env, [{host, "localhost"}, {port, 6379}, {timeout, 500}]}
-]}.
+]}.
View
65 ebin/erldis.appup
@@ -0,0 +1,65 @@
+{"0.0.7", [
+ {"0.0.6", [
+ {load_module, erldis_sync_client},
+ {load_module, erldis_dict},
+ {load_module, erldis_list}
+ ]},
+ {"0.0.5", [
+ {load_module, erldis_sync_client},
+ {load_module, erldis_sets},
+ {add_module, erldis_dict},
+ {add_module, erldis_list}
+ ]},
+ {"0.0.4", [
+ {load_module, erldis_sync_client},
+ {load_module, erldis_sets}
+ ]},
+ {"0.0.3", [
+ {load_module, erldis_sync_client},
+ {load_module, erldis_sets}
+ ]},
+ {"0.0.2", [
+ {add_module, erldis_sync_client},
+ {load_module, erldis_sets}
+ ]},
+ {"0.0.1", [
+ {add_module, erldis_client},
+ {add_module, erldis_proto},
+ {load_module, erldis},
+ {delete_module, client},
+ {delete_module, proto},
+ {add_module, erldis_sets}
+ ]}
+], [
+ {"0.0.6", [
+ {load_module, erldis_sync_client},
+ {load_module, erldis_dict},
+ {load_module, erldis_list}
+ ]},
+ {"0.0.5", [
+ {load_module, erldis_sync_client},
+ {load_module, erldis_sets},
+ {delete_module, erldis_dict},
+ {delete_module, erldis_list}
+ ]},
+ {"0.0.4", [
+ {load_module, erldis_sync_client},
+ {load_module, erldis_sets}
+ ]},
+ {"0.0.3", [
+ {load_module, erldis_sync_client},
+ {load_module, erldis_sets}
+ ]},
+ {"0.0.2", [
+ {delete_module, erldis_sync_client},
+ {load_module, erldis_sets}
+ ]},
+ {"0.0.1", [
+ {add_module, client},
+ {add_module, proto},
+ {load_module, erldis},
+ {delete_module, erldis_client},
+ {delete_module, erldis_proto},
+ {delete_module, erldis_sets}
+ ]}
+]}.
View
12 src/erldis_client.erl
@@ -256,18 +256,16 @@ handle_info({tcp, Socket, Data}, State=#redis{calls=Calls}) ->
inet:setopts(Socket, [{active, once}]),
{noreply, NewState};
handle_info({tcp_closed, Socket}, State=#redis{socket=Socket}) ->
- {stop, erldis_client_tcp_host_socket_closed, State};
+ % japerk: shutdown Reason does not generate an error message
+ {stop, shutdown, State};
handle_info(_Info, State) -> {noreply, State}.
terminate(_Reason, State) ->
case State#redis.socket of
- undefined ->
- pass;
- Socket ->
- gen_tcp:close(Socket)
- end,
- ok.
+ undefined -> ok;
+ Socket -> gen_tcp:close(Socket)
+ end.
code_change(_OldVsn, State, _Extra) -> {ok, State}.
View
77 src/erldis_dict.erl
@@ -0,0 +1,77 @@
+-module(erldis_dict).
+
+-export([append/3, append_list/3, erase/2, fetch/2, fetch_keys/2, find/2,
+ is_key/2, size/1, store/3, update/3, update/4,
+ update_counter/2, update_counter/3]).
+
+% NOTE: use erldis_lists instead, fetch & find won't work for lists
+append(Key, Value, Client) -> set_call(Client, rpush, Key, Value).
+
+append_list(Key, Values, Client) ->
+ lists:foreach(fun(Value) -> append(Key, Value, Client) end, Values).
+
+erase(Key, Client) -> scall(Client, del, [Key]).
+
+fetch(Key, Client) ->
+ case scall(Client, get, [Key]) of
+ [nil] -> undefined;
+ [Value] -> Value
+ end.
+
+% NOTE: this is only useful if keys have a known prefix
+fetch_keys(Pattern, Client) -> scall(Client, keys, [Pattern]).
+
+%filter(Pred, Client) -> ok.
+
+find(Key, Client) ->
+ case fetch(Key, Client) of
+ undefined -> error;
+ Value -> {ok, Value}
+ end.
+
+%fold(Fun, Acc0, Client) -> ok.
+
+%from_list(List, Client) -> ok.
+
+is_key(Key, Client) -> hd(scall(Client, exists, [Key])).
+
+size(Client) ->
+ numeric_value(erldis_sync_client:scall(Client, dbsize)).
+
+store(Key, [], Client) -> erase(Key, Client);
+store(Key, Value, Client) -> set_call(Client, set, Key, Value).
+
+%to_list(Client) -> ok.
+
+% NOTE: update/3 & update/4 are not atomic
+
+update(Key, Fun, Client) -> store(Key, Fun(fetch(Key, Client)), Client).
+
+update(Key, Fun, Initial, Client) ->
+ case find(Key, Client) of
+ {ok, Value} -> store(Key, Fun(Value), Client);
+ error -> store(Key, Initial, Client)
+ end.
+
+update_counter(Key, Client) -> update_counter(Key, 1, Client).
+
+% NOTE: this returns new count value, not a modified dict
+update_counter(Key, 1, Client) ->
+ numeric_value(scall(Client, incr, [Key]));
+update_counter(Key, Incr, Client) ->
+ numeric_value(scall(Client, incrby, [Key, Incr])).
+
+%%%%%%%%%%%%%
+%% helpers %%
+%%%%%%%%%%%%%
+
+numeric_value([false]) -> 0;
+numeric_value([true]) -> 1;
+numeric_value([Val]) -> Val.
+
+% TODO: copied from erldis_sets, abstract if possible, maybe use a macro
+
+scall(Client, Cmd, Args) -> erldis_sync_client:scall(Client, Cmd, Args).
+
+set_call(Client, Cmd, Key, Val) ->
+ erldis_sync_client:call(Client, Cmd, [[Key, length(Val)], [Val]]).
View
252 src/erldis_list.erl
@@ -0,0 +1,252 @@
+-module(erldis_list).
+
+% original queue
+-export([is_queue/2, is_empty/2, len/2, in/3, in_r/3, out/2, out_r/2]).
+% extra queue
+-export([out_foreach/3]).
+% extended queue
+-export([get/2, get_r/2, drop/2, drop_r/2, peek/2, peek_r/2]).
+% array
+-export([get/3, is_array/2, set/4, size/2]).
+% list
+-export([delete/3, foreach/3, is_list/2, last/2, merge/4, nth/3,
+ sublist/3, sublist/4, umerge/4]).
+% common
+-export([foldl/4, foldr/4, from_list/3, to_list/2, reverse/2]).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% original queue like api %%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+is_queue(Key, Client) -> is_list(Key, Client).
+
+is_empty(Key, Client) -> len(Key, Client) == 0.
+
+len(Key, Client) ->
+ case scall(Client, llen, [Key]) of
+ [false] -> 0;
+ [true] -> 1;
+ [Len] -> Len
+ end.
+
+in(Item, Key, Client) ->
+ [ok] = set_call(Client, rpush, Key, Item).
+
+in_r(Item, Key, Client) ->
+ [ok] = set_call(Client, lpush, Key, Item).
+
+out(Key, Client) ->
+ case hd(scall(Client, lpop, [Key])) of
+ nil -> empty;
+ Item -> {value, Item}
+ end.
+
+out_r(Key, Client) ->
+ case hd(scall(Client, rpop, [Key])) of
+ nil -> empty;
+ Item -> {value, Item}
+ end.
+
+%% @doc Call F on each element in queue until it's empty.
+out_foreach(F, Key, Client) ->
+ case out(Key, Client) of
+ empty ->
+ ok;
+ {value, Item} ->
+ F(Item),
+ out_foreach(F, Key, Client)
+ end.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% extended queue like api %%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+get(Key, Client) ->
+ case get(0, Key, Client) of
+ nil -> empty;
+ Item -> Item
+ end.
+
+get_r(Key, Client) ->
+ Len = len(Key, Client),
+
+ if
+ Len < 1 ->
+ empty;
+ true ->
+ case get(Len - 1, Key, Client) of
+ nil -> empty;
+ Item -> Item
+ end
+ end.
+
+drop(Key, Client) -> out(Key, Client).
+
+drop_r(Key, Client) -> out_r(Key, Client).
+
+peek(Key, Client) ->
+ case get(0, Key, Client) of
+ nil -> empty;
+ Item -> {value, Item}
+ end.
+
+peek_r(Key, Client) ->
+ case get_r(Key, Client) of
+ empty -> empty;
+ Item -> {value, Item}
+ end.
+
+%%%%%%%%%%%%%%%%%%%%
+%% array like api %%
+%%%%%%%%%%%%%%%%%%%%
+
+get(I, Key, Client) -> hd(scall(Client, lindex, [Key, I])).
+
+is_array(Key, Client) -> is_list(Key, Client).
+
+set(I, Value, Key, Client) -> call(Client, lset, Key, I, Value).
+
+size(Key, Client) -> len(Key, Client).
+
+%%%%%%%%%%%%%%%%%%%%
+%% lists like api %%
+%%%%%%%%%%%%%%%%%%%%
+
+% all
+% any
+% append
+
+delete(Elem, Key, Client) -> call(Client, lrem, Key, 1, Elem).
+
+% dropwhile
+
+foreach(F, Key, Client) -> foreach(0, F, Key, Client).
+
+foreach(I, F, Key, Client) ->
+ case get(I, Key, Client) of
+ nil -> ok;
+ Item -> F(Item), foreach(I+1, F, Key, Client)
+ end.
+
+is_list(Key, Client) -> ["list"] == scall(Client, type, [Key]).
+
+% keysort
+
+last(Key, Client) -> get_r(Key, Client).
+
+merge(F, L, Key, Client) -> merge(0, F, L, Key, Client).
+
+merge(_, _, [], _, _) ->
+ ok;
+merge(I, F, L, Key, Client) ->
+ case get(I, Key, Client) of
+ % append the rest of the list
+ nil -> lists:foreach(fun(Item) -> in(Item, Key, Client) end, L);
+ % compare A to head of L
+ A -> merge(I, F, A, L, Key, Client)
+ end.
+
+merge(I, F, A, [B | L], Key, Client) ->
+ case F(A, B) of
+ true ->
+ % B comes after A, so continue iterating
+ merge(I+1, F, [B | L], Key, Client);
+ false ->
+ % B comes before A, so replace A with B and continue merging with
+ % A merged into L
+ set(I, B, Key, Client),
+ merge(I+1, F, lists:merge(F, [A], L), Key, Client)
+ end.
+
+nth(N, Key, Client) -> get(N, Key, Client).
+
+% nthtail
+
+sublist(Key, Client, Len) -> sublist(Key, Client, 1, Len).
+
+sublist(Key, Client, Start, 1) ->
+ case get(Start, Key, Client) of
+ nil -> [];
+ Elem -> [Elem]
+ end;
+sublist(Key, Client, Start, Len) when Start > 0, Len > 1 ->
+ % erlang lists are 1-indexed
+ scall(Client, lrange, [Key, Start - 1, Start + Len - 2]);
+sublist(Key, Client, Start, Len) when Start < 0, Len > 1 ->
+ % can give a negative start where -1 is the last element
+ scall(Client, lrange, [Key, Start, Start - Len + 1]).
+
+% sort
+% takewhile
+
+umerge(F, L, Key, Client) -> umerge(0, F, L, Key, Client).
+
+umerge(_, _, [], _, _) ->
+ ok;
+umerge(I, F, L, Key, Client) ->
+ case get(I, Key, Client) of
+ nil -> lists:foreach(fun(Item) -> in(Item, Key, Client) end, L);
+ A -> umerge(I, F, A, L, Key, Client)
+ end.
+
+umerge(I, F, A, [B | L], Key, Client) when A == B ->
+ umerge(I+1, F, L, Key, Client);
+umerge(I, F, A, [B | L], Key, Client) ->
+ case F(A, B) of
+ true ->
+ umerge(I+1, F, [B | L], Key, Client);
+ false ->
+ set(I, B, Key, Client),
+ umerge(I+1, F, lists:umerge(F, [A], L), Key, Client)
+ end.
+
+%%%%%%%%%%%%
+%% common %%
+%%%%%%%%%%%%
+
+% TODO: foldl, foldr, to_list, foreach, and any other iterative functions
+% would probably be much faster and more efficient if they iterated on
+% fixed sized chunks using sublist, since each call to get/3 is avg O(n)
+% except when I is the first or last element.
+
+foldl(F, Acc0, Key, Client) -> foldl(0, F, Acc0, Key, Client).
+
+foldl(I, F, Acc0, Key, Client) ->
+ case get(I, Key, Client) of
+ nil -> Acc0;
+ Item -> foldl(I+1, F, F(Item, Acc0), Key, Client)
+ end.
+
+foldr(F, Acc0, Key, Client) -> foldr(len(Key, Client) - 1, F, Acc0, Key, Client).
+
+foldr(I, _, Acc0, _, _) when I < 0 ->
+ Acc0;
+foldr(I, F, Acc0, Key, Client) ->
+ case get(I, Key, Client) of
+ nil -> Acc0;
+ Item -> foldr(I-1, F, F(Item, Acc0), Key, Client)
+ end.
+
+from_list(L, Key, Client) ->
+ scall(Client, del, [Key]),
+ lists:foreach(fun(Item) -> in(Item, Key, Client) end, L).
+
+to_list(Key, Client) -> foldr(fun(Item, L) -> [Item | L] end, [], Key, Client).
+
+reverse(Key, Client) -> foldl(fun(Item, L) -> [Item | L] end, [], Key, Client).
+
+% filter
+% map
+% member
+
+%%%%%%%%%%%%%
+%% helpers %%
+%%%%%%%%%%%%%
+
+call(Client, Cmd, Key, N, Val) ->
+ erldis_sync_client:call(Client, Cmd, [[Key, N, length(Val)], [Val]]).
+
+scall(Client, Cmd, Args) -> erldis_sync_client:scall(Client, Cmd, Args).
+
+set_call(Client, Cmd, Key, Val) ->
+ erldis_sync_client:call(Client, Cmd, [[Key, length(Val)], [Val]]).
View
72 src/erldis_sets.erl
@@ -0,0 +1,72 @@
+%% @doc sets like interface to redis. Uses erldis_sync_client to ensure
+%% synchronous results.
+%%
+%% @author Jacob Perkins <japerk@gmail.com>
+-module(erldis_sets).
+
+-export([delete/1, is_set/2, size/2, to_list/2, from_list/3, is_element/3,
+ add_element/3, del_element/3, union/2, intersection/3, intersection/2,
+ is_disjoint/3, subtract/3, subtract/2, is_subset/3, fold/4, filter/3]).
+
+%%%%%%%%%%%%%%%%%%%
+%% sets-like api %%
+%%%%%%%%%%%%%%%%%%%
+
+delete(Client) -> erldis_sync_client:stop(Client).
+
+is_set(Client, Key) -> ["set"] == scall(Client, type, [Key]).
+
+size(Client, Key) ->
+ case scall(Client, scard, [Key]) of
+ % redis actually returns 0 & 1, but those are interpreted as false & true
+ [false] -> 0;
+ [true] -> 1;
+ [Size] -> Size
+ end.
+
+to_list(Client, Key) -> scall(Client, smembers, [Key]).
+
+from_list(Client, Key, List) ->
+ % delete existing set
+ scall(Client, del, [Key]),
+ lists:foreach(fun(Elem) -> add_element(Elem, Client, Key) end, List),
+ Client.
+
+is_element(Elem, Client, Key) ->
+ case set_call(Client, sismember, Key, Elem) of
+ [false] -> false;
+ [true] -> true
+ end.
+
+add_element(Elem, Client, Key) -> set_call(Client, sadd, Key, Elem).
+
+del_element(Elem, Client, Key) -> set_call(Client, srem, Key, Elem).
+
+union(Client, Keys) -> scall(Client, sunion, Keys).
+
+intersection(Client, Key1, Key2) -> intersection(Client, [Key1, Key2]).
+
+intersection(Client, Keys) -> scall(Client, sinter, Keys).
+
+is_disjoint(Client, Key1, Key2) -> [] == intersection(Client, [Key1, Key2]).
+
+subtract(Client, Key1, Key2) -> subtract(Client, [Key1, Key2]).
+
+subtract(Client, Keys) -> scall(Client, sdiff, Keys).
+
+is_subset(Client, Key1, Key2) -> [] == subtract(Client, [Key2, Key1]).
+
+fold(F, Acc0, Client, Key) -> lists:foldl(F, Acc0, to_list(Client, Key)).
+
+filter(Pred, Client, Key) -> lists:filter(Pred, to_list(Client, Key)).
+
+%%%%%%%%%%%%%
+%% helpers %%
+%%%%%%%%%%%%%
+
+% TODO: handle {error, Reason}. throw exception?
+
+scall(Client, Cmd, Args) -> erldis_sync_client:scall(Client, Cmd, Args).
+
+set_call(Client, Cmd, Key, Val) ->
+ erldis_sync_client:call(Client, Cmd, [[Key, length(Val)], [Val]]).
View
5 src/erldis_sup.erl
@@ -10,7 +10,7 @@ start_link() ->
client() ->
% Client pid() is undefined if not started
- [{client, Client, worker, [client]}] = supervisor:which_children(?MODULE),
+ [{erldis_client, Client, worker, [erldis_client]}] = supervisor:which_children(?MODULE),
% not trying to restart_child(client) because gen_tcp:connect will hang
% even if timeout is given
Client.
@@ -19,5 +19,6 @@ init(_Args) ->
{ok, {{one_for_one, 1, 60}, [
% transient restart so client can disconnect safely
% timeout so client has time to disconnect on exit
- {client, {client, connect, []}, transient, 500, worker, [client]}
+ {erldis_client, {erldis_client, connect, []},
+ transient, 500, worker, [erldis_client]}
]}}.
View
300 src/erldis_sync_client.erl
@@ -0,0 +1,300 @@
+%% @doc This is a gen_server very similar to erldis_client, but it does
+%% synchronous calls instead of async pipelining. Does so by keeping a queue
+%% of From pids in State.calls, then calls gen_server:reply when it receives
+%% handle_info({tcp, ...). Therefore, it must get commands on handle_call
+%% instead of handle_cast, which requires direct command sending instead of
+%% using the API in erldis.
+%%
+%% @todo Much of the code has been copied from erldis_client and should be
+%% abstracted & shared where possible
+%%
+%% @author Jacob Perkins <japerk@gmail.com>
+-module(erldis_sync_client).
+
+-behaviour(gen_server).
+
+-include("erldis.hrl").
+
+-export([scall/2, scall/3, call/2, call/3, stop/1, transact/2, info/1]).
+-export([connect/0, connect/1, connect/2, connect/3, connect/4]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-define(EOL, "\r\n").
+
+%%%%%%%%%%%%%
+%% helpers %%
+%%%%%%%%%%%%%
+
+% TODO: copied from erldis_client, should be abstracted & shared
+
+str(X) when is_list(X) ->
+ X;
+str(X) when is_atom(X) ->
+ atom_to_list(X);
+str(X) when is_binary(X) ->
+ binary_to_list(X);
+str(X) when is_integer(X) ->
+ integer_to_list(X);
+str(X) when is_float(X) ->
+ float_to_list(X).
+
+format([], Result) ->
+ string:join(lists:reverse(Result), ?EOL);
+format([Line|Rest], Result) ->
+ JoinedLine = string:join([str(X) || X <- Line], " "),
+ format(Rest, [JoinedLine|Result]).
+
+format(Lines) ->
+ format(Lines, []).
+sformat(Line) ->
+ format([Line], []).
+
+trim2({ok, S}) ->
+ string:substr(S, 1, length(S)-2);
+trim2(S) ->
+ trim2({ok, S}).
+
+app_get_env(AppName, Varname, Default) ->
+ case application:get_env(AppName, Varname) of
+ undefined ->
+ {ok, Default};
+ V ->
+ V
+ end.
+
+%%%%%%%%%%%%%%%%%%
+%% call command %%
+%%%%%%%%%%%%%%%%%%
+
+% This is the simple send with a single row of commands
+scall(Client, Cmd) -> scall(Client, Cmd, []).
+
+scall(Client, Cmd, Args) ->
+ case gen_server:call(Client, {send, sformat([Cmd|Args])}) of
+ {error, Reason} -> throw({error, Reason});
+ Retval -> Retval
+ end.
+
+% This is the complete send with multiple rows
+call(Client, Cmd) -> call(Client, Cmd, []).
+
+call(Client, Cmd, Args) ->
+ SCmd = string:join([str(Cmd), format(Args)], " "),
+
+ case gen_server:call(Client, {send, SCmd}) of
+ {error, Reason} -> throw({error, Reason});
+ Retval -> Retval
+ end.
+
+stop(Client) -> gen_server:cast(Client, disconnect).
+
+transact(DB, F) ->
+ % TODO: error handling in case of redis not being there, and catch errors
+ % in F so can still stop
+ {ok, Client} = connect(DB),
+
+ try F(Client) of
+ Result -> stop(Client), Result
+ catch
+ throw:Result -> stop(Client), throw(Result);
+ error:Result -> stop(Client), {error, Result};
+ exit:Result -> stop(Client), exit(Result)
+ end.
+
+info(Client) ->
+ F = fun(Stat) ->
+ case parse_stat(Stat) of
+ undefined -> false;
+ {Key, Val} -> {Key, Val}
+ end
+ end,
+
+ [S] = scall(Client, info),
+ elists:mapfilter(F, string:tokens(S, "\r\n")).
+
+parse_stat("redis_version:"++Vsn) ->
+ {version, Vsn};
+parse_stat("uptime_in_seconds:"++Val) ->
+ {uptime, list_to_integer(Val)};
+parse_stat("connected_clients:"++Val) ->
+ {clients, list_to_integer(Val)};
+parse_stat("connected_slaves:"++Val) ->
+ {slaves, list_to_integer(Val)};
+parse_stat("used_memory:"++Val) ->
+ {memory, list_to_integer(Val)};
+parse_stat("changes_since_last_save:"++Val) ->
+ {changes, list_to_integer(Val)};
+parse_stat("last_save_time:"++Val) ->
+ {last_save, list_to_integer(Val)};
+parse_stat("total_connections_received:"++Val) ->
+ {connections, list_to_integer(Val)};
+parse_stat("total_commands_processed:"++Val) ->
+ {commands, list_to_integer(Val)};
+parse_stat(_) ->
+ undefined.
+
+%%%%%%%%%%
+%% init %%
+%%%%%%%%%%
+
+connect() ->
+ {ok, Host} = app_get_env(erldis, host, "localhost"),
+ connect(Host).
+
+connect(Host) when is_list(Host) ->
+ {ok, Port} = app_get_env(erldis, port, 6379),
+ connect(Host, Port);
+connect(DB) when is_integer(DB) ->
+ case connect() of
+ {ok, Client} ->
+ [ok] = scall(Client, select, [DB]),
+ {ok, Client};
+ Other ->
+ Other
+ end.
+
+connect(Host, Port) ->
+ {ok, Timeout} = app_get_env(erldis, timeout, 500),
+ connect(Host, Port, [{timeout, Timeout}]).
+
+connect(Host, Port, Options) ->
+ % not using start_link because caller may not want to crash if this
+ % server is shutdown
+ gen_server:start(?MODULE, [Host, Port], Options).
+
+connect(Host, Port, Options, DB) ->
+ case connect(Host, Port, Options) of
+ {ok, Client} ->
+ [ok] = scall(Client, select, [DB]),
+ {ok, Client};
+ Other ->
+ Other
+ end.
+
+init([Host, Port]) ->
+ process_flag(trap_exit, true),
+ {ok, Timeout} = app_get_env(erldis, timeout, 500),
+ % presence of send_timeout_close Opt causes {error, badarg}
+ Opts = [list, {active, once}, {packet, line}, {nodelay, true},
+ {send_timeout, Timeout}],
+ % without timeout, default is infinity
+ case gen_tcp:connect(Host, Port, Opts, Timeout) of
+ {error, econnrefused} ->
+ % no redis server, return ignore for supervisor
+ ignore;
+ {error, Why} ->
+ {stop, {socket_error, Why}};
+ {ok, Socket} ->
+ % calls is a queue instead of a count
+ {ok, #redis{socket=Socket, calls=queue:new()}}
+ end.
+
+%%%%%%%%%%%%%%%%%
+%% handle_call %%
+%%%%%%%%%%%%%%%%%
+
+handle_call({send, Cmd}, From, State) ->
+ % NOTE: redis ignores sent commands it doesn't understand, which means
+ % we don't get a reply, which means callers will timeout
+ case gen_tcp:send(State#redis.socket, [Cmd|?EOL]) of
+ ok ->
+ %error_logger:info_report([{send, Cmd}, {from, From}]),
+ Queue = queue:in(From, State#redis.calls),
+ {noreply, State#redis{calls=Queue, remaining=1}};
+ {error, Reason} ->
+ error_logger:error_report([{send, Cmd}, {error, Reason}]),
+ {stop, timeout, {error, Reason}, State}
+ end;
+handle_call(_, _, State) ->
+ {reply, undefined, State}.
+
+handle_cast(disconnect, State) ->
+ {stop, shutdown, State};
+handle_cast(_, State) ->
+ {noreply, State}.
+
+%%%%%%%%%%%%%%%%%
+%% handle_info %%
+%%%%%%%%%%%%%%%%%
+
+recv_value(Socket, NBytes) ->
+ inet:setopts(Socket, [{packet, 0}]), % go into raw mode to read bytes
+
+ case gen_tcp:recv(Socket, NBytes+2) of
+ {ok, Packet} ->
+ inet:setopts(Socket, [{packet, line}]), % go back to line mode
+ trim2({ok, Packet});
+ {error, Reason} ->
+ error_logger:error_report([{recv, NBytes}, {error, Reason}]),
+ throw({error, Reason})
+ end.
+
+send_reply(State) ->
+ {{value, From}, Queue} = queue:out(State#redis.calls),
+ Reply = lists:reverse(State#redis.buffer),
+ %error_logger:info_report([{reply, Reply}, {to, From}]),
+ gen_server:reply(From, Reply),
+ State#redis{calls=Queue, buffer=[], pstate=empty}.
+
+parse_state(State, Socket, Data) ->
+ Parse = erldis_proto:parse(State#redis.pstate, trim2(Data)),
+
+ case {State#redis.remaining-1, Parse} of
+ {0, error} ->
+ % next line is the error string
+ State#redis{remaining=1, pstate=error};
+ {0, {hold, nil}} ->
+ % reply with values in buffer
+ send_reply(State);
+ {0, {hold, Remaining}} ->
+ % begin accumulation of multi bulk reply
+ State#redis{remaining=Remaining, pstate=read};
+ {_, {read, nil}} ->
+ % reply with nil
+ send_reply(State#redis{buffer=[nil]});
+ {0, {read, NBytes}} ->
+ % reply with Value added to buffer
+ Value = recv_value(Socket, NBytes),
+ Buffer = [Value | State#redis.buffer],
+ send_reply(State#redis{buffer=Buffer});
+ {N, {read, NBytes}} ->
+ % accumulate multi bulk reply
+ Value = recv_value(Socket, NBytes),
+ Buffer = [Value | State#redis.buffer],
+ State#redis{remaining=N, buffer=Buffer, pstate=read};
+ {0, Value} ->
+ % reply with Value
+ send_reply(State#redis{buffer=[Value]})
+ end.
+
+handle_info({tcp, Socket, Data}, State) ->
+ case (catch parse_state(State, Socket, Data)) of
+ {error, Reason} ->
+ error_logger:error_report([{parse_state, Data}, {error, Reason}]),
+ {stop, Reason, State};
+ NewState ->
+ inet:setopts(Socket, [{active, once}]),
+ {noreply, NewState}
+ end;
+handle_info({tcp_closed, Socket}, State=#redis{socket=Socket}) ->
+ error_logger:warning_report([{erldis_sync_client, tcp_closed}, State]),
+ {stop, shutdown, State};
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+%%%%%%%%%%%%%%%
+%% terminate %%
+%%%%%%%%%%%%%%%
+
+terminate(_Reason, State) ->
+ % NOTE: if supervised with brutal_kill, may not be able to reply
+ R = fun(From) -> gen_server:reply(From, {error, closed}) end,
+ lists:foreach(R, queue:to_list(State#redis.calls)),
+
+ case State#redis.socket of
+ undefined -> ok;
+ Socket -> gen_tcp:close(Socket)
+ end.
+
+code_change(_OldVsn, State, _Extra) -> {ok, State}.
View
30 test/erldis_dict_tests.erl
@@ -0,0 +1,30 @@
+-module(erldis_dict_tests).
+
+-include_lib("eunit/include/eunit.hrl").
+
+dict_test() ->
+ % setup
+ application:load(erldis),
+ {ok, Client} = erldis_sync_client:connect(),
+ ?assertEqual(erldis_sync_client:scall(Client, flushdb), [ok]),
+ % empty dict
+ ?assertEqual(0, erldis_dict:size(Client)),
+ ?assertEqual(undefined, erldis_dict:fetch("foo", Client)),
+ ?assertEqual(error, erldis_dict:find("foo", Client)),
+ ?assertEqual(false, erldis_dict:is_key("foo", Client)),
+ % add element
+ erldis_dict:store("foo", "bar", Client),
+ ?assertEqual("bar", erldis_dict:fetch("foo", Client)),
+ ?assertEqual({ok, "bar"}, erldis_dict:find("foo", Client)),
+ ?assertEqual(1, erldis_dict:size(Client)),
+ ?assertEqual(true, erldis_dict:is_key("foo", Client)),
+ % del element
+ erldis_dict:erase("foo", Client),
+ ?assertEqual(error, erldis_dict:find("foo", Client)),
+ ?assertEqual(0, erldis_dict:size(Client)),
+ ?assertEqual(false, erldis_dict:is_key("foo", Client)),
+ % update counter
+ ?assertEqual(1, erldis_dict:update_counter("count", Client)),
+ ?assertEqual(3, erldis_dict:update_counter("count", 2, Client)),
+ erldis_dict:erase("count", Client),
+ erldis_sync_client:stop(Client).
View
156 test/erldis_list_tests.erl
@@ -0,0 +1,156 @@
+-module(erldis_list_tests).
+
+-include_lib("eunit/include/eunit.hrl").
+
+queue_test() ->
+ Client = setup(),
+ % queue api
+ ?assertEqual(true, erldis_list:is_empty("foo", Client)),
+ ?assertEqual(0, erldis_list:len("foo", Client)),
+ ?assertEqual(empty, erldis_list:out("foo", Client)),
+ erldis_list:in("a", "foo", Client),
+ ?assertEqual(false, erldis_list:is_empty("foo", Client)),
+ erldis_list:in("b", "foo", Client),
+ ?assertEqual(2, erldis_list:len("foo", Client)),
+ ?assertEqual({value, "a"}, erldis_list:out("foo", Client)),
+ ?assertEqual(1, erldis_list:len("foo", Client)),
+ erldis_list:in_r("x", "foo", Client),
+ ?assertEqual({value, "b"}, erldis_list:out_r("foo", Client)),
+ ?assertEqual(false, erldis_list:is_empty("foo", Client)),
+ ?assertEqual({value, "x"}, erldis_list:out("foo", Client)),
+ ?assertEqual(0, erldis_list:len("foo", Client)),
+ ?assertEqual(empty, erldis_list:out("foo", Client)),
+ erldis_sync_client:stop(Client).
+
+extended_queue_test() ->
+ Client = setup(),
+ ?assertEqual(empty, erldis_list:get("foo", Client)),
+ ?assertEqual(empty, erldis_list:get_r("foo", Client)),
+ ?assertEqual(empty, erldis_list:peek("foo", Client)),
+ ?assertEqual(empty, erldis_list:peek_r("foo", Client)),
+ erldis_list:in("a", "foo", Client),
+ erldis_list:in("b", "foo", Client),
+ ?assertEqual("a", erldis_list:get("foo", Client)),
+ ?assertEqual("b", erldis_list:get_r("foo", Client)),
+ ?assertEqual(2, erldis_list:len("foo", Client)),
+ ?assertEqual({value, "a"}, erldis_list:peek("foo", Client)),
+ ?assertEqual({value, "b"}, erldis_list:peek_r("foo", Client)),
+ erldis_list:drop("foo", Client),
+ erldis_list:drop_r("foo", Client),
+ ?assertEqual(0, erldis_list:len("foo", Client)),
+ erldis_sync_client:stop(Client).
+
+array_test() ->
+ Client = setup(),
+ erldis_list:in("a", "foo", Client),
+ erldis_list:in("b", "foo", Client),
+ ?assertEqual("b", erldis_list:get(1, "foo", Client)),
+ erldis_list:set(1, "x", "foo", Client),
+ ?assertEqual("x", erldis_list:get(1, "foo", Client)),
+ ?assertEqual(2, erldis_list:size("foo", Client)),
+ ?assertEqual({value, "a"}, erldis_list:out("foo", Client)),
+ ?assertEqual({value, "x"}, erldis_list:out("foo", Client)),
+ erldis_sync_client:stop(Client).
+
+lists_test() ->
+ Client = setup(),
+ ?assertEqual(false, erldis_list:is_list("foo", Client)),
+ ?assertEqual([], erldis_list:sublist("foo", Client, 1)),
+ erldis_list:in("a", "foo", Client),
+ erldis_list:in("b", "foo", Client),
+ erldis_list:in("c", "foo", Client),
+ erldis_list:in("b", "foo", Client),
+ ?assertEqual(["b", "c"], erldis_list:sublist("foo", Client, 2, 2)),
+ ?assertEqual("b", erldis_list:nth(1, "foo", Client)),
+ erldis_list:delete("b", "foo", Client),
+ ?assertEqual("c", erldis_list:nth(1, "foo", Client)),
+ ?assertEqual(3, erldis_list:len("foo", Client)),
+ ?assertEqual(["c", "b"], erldis_list:sublist("foo", Client, 2, 2)),
+ erldis_list:drop("foo", Client),
+ erldis_list:drop("foo", Client),
+ erldis_list:drop("foo", Client),
+ erldis_sync_client:stop(Client).
+ % this last call always produces a timeout error
+ %?assertEqual([], erldis_list:sublist("foo", Client, 3)).
+ % TODO: test negative sublist start index
+
+foreach_test() ->
+ Client = setup(),
+ ?assertEqual(0, erldis_list:len("foo", Client)),
+ L = ["a", "b", "c"],
+ erldis_list:from_list(L, "foo", Client),
+ ?assertEqual(length(L), erldis_list:len("foo", Client)),
+ put(n, 1),
+
+ F = fun(Item) ->
+ N = get(n),
+ ?assertEqual(lists:nth(N, L), Item),
+ put(n, N+1)
+ end,
+
+ erldis_list:foreach(F, "foo", Client),
+ erldis_sync_client:stop(Client).
+
+merge_test() ->
+ Client = setup(),
+ ?assertEqual(0, erldis_list:len("foo", Client)),
+ L1 = ["a", "c", "e"],
+ erldis_list:from_list(L1, "foo", Client),
+ ?assertEqual(length(L1), erldis_list:len("foo", Client)),
+ L2 = ["b", "d", "f"],
+ F = fun(A, B) -> A =< B end,
+ erldis_list:merge(F, L2, "foo", Client),
+ Merged = lists:merge(F, L1, L2),
+ ?assertEqual(Merged, lists:merge(L1, L2)),
+ ?assertEqual(length(Merged), erldis_list:len("foo", Client)),
+ ?assertEqual(Merged, erldis_list:to_list("foo", Client)),
+
+ L3 = ["a", "c", "f", "g"],
+ erldis_list:umerge(F, L3, "foo", Client),
+ Merged2 = lists:umerge(F, Merged, L3),
+ ?assertEqual(Merged2, lists:umerge(Merged, L3)),
+ ?assertEqual(length(Merged2), erldis_list:len("foo", Client)),
+ ?assertEqual(Merged2, erldis_list:to_list("foo", Client)),
+
+ erldis_sync_client:stop(Client).
+
+common_test() ->
+ Client = setup(),
+ ?assertEqual(0, erldis_list:len("foo", Client)),
+ L = ["a", "b", "c"],
+ erldis_list:from_list(L, "foo", Client),
+ ?assertEqual(length(L), erldis_list:len("foo", Client)),
+ % to_list uses foldr
+ ?assertEqual(L, erldis_list:to_list("foo", Client)),
+ % reverse uses foldl
+ ?assertEqual(lists:reverse(L), erldis_list:reverse("foo", Client)),
+ % from_list overwrites current list if it exists
+ L2 = ["d" | L],
+ erldis_list:from_list(L2, "foo", Client),
+ ?assertEqual(length(L2), erldis_list:len("foo", Client)),
+ ?assertEqual(L2, erldis_list:to_list("foo", Client)),
+ erldis_sync_client:stop(Client).
+
+extra_queue_test() ->
+ Client = setup(),
+ L = ["a", "b", "c"],
+ Length = length(L),
+ ?assertEqual(0, erldis_list:len("foo", Client)),
+ erldis_list:from_list(L, "foo", Client),
+ ?assertEqual(Length, erldis_list:len("foo", Client)),
+
+ F = fun(Item) ->
+ N = Length - erldis_list:len("foo", Client),
+ ?assertEqual(lists:nth(N, L), Item)
+ end,
+
+ erldis_list:out_foreach(F, "foo", Client),
+ ?assertEqual(0, erldis_list:len("foo", Client)),
+ erldis_sync_client:stop(Client).
+
+setup() ->
+ % setup
+ application:load(erldis),
+ {ok, Client} = erldis_sync_client:connect(),
+ ?assertEqual(erldis_sync_client:scall(Client, flushdb), [ok]),
+ Client.
View
35 test/erldis_sets_tests.erl
@@ -0,0 +1,35 @@
+-module(erldis_sets_tests).
+
+-include_lib("eunit/include/eunit.hrl").
+
+sets_test() ->
+ % setup
+ application:load(erldis),
+ {ok, Client} = erldis_sync_client:connect(),
+ ?assertEqual(erldis_sync_client:scall(Client, flushdb), [ok]),
+ % non existent set
+ ?assertEqual(erldis_sets:is_set(Client, "foo"), false),
+ ?assertEqual(erldis_sets:to_list(Client, "foo"), []),
+ ?assertEqual(erldis_sets:size(Client, "foo"), 0),
+ ?assertEqual(erldis_sets:is_element("bar", Client, "foo"), false),
+ % add 1 element, values must be strings/lists, can't be integers
+ erldis_sets:add_element("1", Client, "foo"),
+ ?assertEqual(erldis_sets:is_set(Client, "foo"), true),
+ ?assertEqual(erldis_sets:size(Client, "foo"), 1),
+ ?assertEqual(erldis_sets:is_element("1", Client, "foo"), true),
+ ?assertEqual(erldis_sets:to_list(Client, "foo"), ["1"]),
+ % add 2 element
+ erldis_sets:add_element("2", Client, "foo"),
+ ?assertEqual(erldis_sets:size(Client, "foo"), 2),
+ ?assertEqual(erldis_sets:is_element("2", Client, "foo"), true),
+ ?assertEqual(lists:sort(erldis_sets:to_list(Client, "foo")), ["1", "2"]),
+ % del 2 element
+ erldis_sets:del_element("2", Client, "foo"),
+ ?assertEqual(erldis_sets:size(Client, "foo"), 1),
+ ?assertEqual(erldis_sets:is_element("2", Client, "foo"), false),
+ % from list
+ Elems = ["a", "b", "c"],
+ erldis_sets:from_list(Client, "foo", Elems),
+ ?assertEqual(lists:sort(erldis_sets:to_list(Client, "foo")), Elems),
+ erldis_sync_client:stop(Client).
+ % TODO: test union, intersection, is_disjoint, subtract.
Please sign in to comment.
Something went wrong with that request. Please try again.