Permalink
Browse files

pubsub test has some error

  • Loading branch information...
litaocheng committed Nov 6, 2011
1 parent adced1d commit 47d9757f59afa73529dd1033bc7cbba933529a0f
Showing with 126 additions and 55 deletions.
  1. +8 −2 Makefile
  2. +1 −0 src/redis.erl
  3. +39 −16 src/redis_client.erl
  4. +77 −36 test/redis_pubsub_SUITE.erl
  5. +1 −1 test/test.spec
View
@@ -8,15 +8,21 @@ ERL_LIB := $(shell erl -noshell -eval 'io:format("~s",[code:lib_dir()]),erlang:h
2> /dev/null)
APP_FULLNAME := $(APP_NAME)-$(APP_VSN)
+ifdef LOG
+ MAKE_ARGS =
+else
+ MAKE_ARGS ='NOLOG=true'
+endif
+
all: compile
compile:
(mkdir -p ./ebin)
- (cd src;$(MAKE) NOLOG=true)
+ (cd src;$(MAKE) $(MAKE_ARGS))
test_compile:
(mkdir -p ./ebin)
- (cd src;$(MAKE) TEST=true NOLOG=true)
+ (cd src;$(MAKE) TEST=true $(MAKE_ARGS))
test: clean unit_test comm_test
@#(make clean)
View
@@ -1106,6 +1106,7 @@ is_empty_str(<<>>) -> true;
is_empty_str(_) -> false.
%% convert to list
+may_single_to_list([]) -> [];
may_single_to_list([H|_] = V) when is_list(H); is_binary(H) -> V;
may_single_to_list(V) -> [V].
View
@@ -145,11 +145,11 @@ punsubscribe(Client, Patterns, Callback) ->
%% it will return 'ok' immediately. you must do the working in
%% callback functions
subscribe(Client, Channels, CbSub, CbMsg) ->
- %?DEBUG2("subscribe to channels:~p", [Channels]),
+ ?DEBUG2("subscribe to channels:~p", [Channels]),
call(Client, {subscribe, ?PUBSUB_CHANNEL, Channels, CbSub, CbMsg}).
unsubscribe(Client, Channels, Callback) ->
- %?DEBUG2("subscribe to channels:~p", [Channels]),
+ ?DEBUG2("unsubscribe to channels:~p", [Channels]),
call(Client, {unsubscribe, ?PUBSUB_CHANNEL, Channels, Callback}).
-spec quit(client()) ->
@@ -214,12 +214,12 @@ init({Server = {Host, Port}, Passwd}) ->
handle_call({command, {Data, Timeout}}, _From,
State = #state{sock = Sock, server = _Server, ctx = normal}) ->
- ?DEBUG2("redis client command:~n~p~n\t=> ~p", [Data, _Server]),
+ ?DEBUG2("command:~n~p~n\t=> ~p", [Data, _Server]),
Reply = do_send_recv(Data, Sock, Timeout),
{reply, Reply, State};
handle_call({command, {Data, Len, Timeout}}, _From,
State = #state{sock = Sock, server = _Server, ctx = normal}) ->
- ?DEBUG2("redis client multi_command(~p):~p=> ~p", [Len, Data, _Server]),
+ ?DEBUG2("multi_command(~p):~p=> ~p", [Len, Data, _Server]),
Reply = do_send_multi_recv(Data, Len, Sock, Timeout),
{reply, Reply, State};
handle_call({command, _}, _From, State) ->
@@ -230,8 +230,8 @@ handle_call({subscribe, Type, L, CbSub, CbMsg}, _From,
State = #state{sock = Sock, pubsub_tid = Tid}) ->
Cmd =
case Type of
- ?PUBSUB_CHANNEL -> <<"subscribe">>;
- ?PUBSUB_PATTERN -> <<"psubscribe">>
+ ?PUBSUB_CHANNEL -> <<"SUBSCRIBE">>;
+ ?PUBSUB_PATTERN -> <<"PSUBSCRIBE">>
end,
Data = redis_proto:mbulk_list([Cmd | L]),
do_send(Sock, Data),
@@ -243,8 +243,8 @@ handle_call({unsubscribe, Type, L, Callback}, _From,
State = #state{sock = Sock, pubsub_tid = Tid}) ->
Cmd =
case Type of
- ?PUBSUB_CHANNEL -> <<"unsubscribe">>;
- ?PUBSUB_PATTERN -> <<"punsubscribe">>
+ ?PUBSUB_CHANNEL -> <<"UNSUBSCRIBE">>;
+ ?PUBSUB_PATTERN -> <<"PUNSUBSCRIBE">>
end,
Data = redis_proto:mbulk_list([Cmd | L]),
do_send(Sock, Data),
@@ -304,7 +304,7 @@ call(Client, Req) ->
%% create pubsub table
do_create_table() ->
- ets:new(dummy, [set, private, {keypos, 1},
+ ets:new(dummy, [set, private, {keypos, #pubsub.id},
{read_concurrency, true}]).
to_name(Host, Port, First) ->
@@ -427,8 +427,9 @@ do_auth(Sock, Passwd) ->
do_add_pubsub(Type, L, CbSub, CbMsg, Table) ->
lists:foreach(
fun(E) ->
+ B = ?IF(is_binary(E), E, ?S2B(E)),
PubSub = #pubsub{
- id = {Type, E},
+ id = {Type, B},
cb_sub = CbSub,
cb_msg = CbMsg
},
@@ -468,7 +469,9 @@ do_get_pubsub(Type, Data, Table) ->
%% handle the pubsub tcp data
do_handle_pubsub(Sock, Packet, State = #state{pubsub_tid = Table}) ->
- case do_handle_packet(Sock, Packet, null, ?COMMAND_TIMEOUT) of
+ List = do_handle_packet(Sock, Packet, null, ?COMMAND_TIMEOUT),
+ ?DEBUG2("***:~p", [List]),
+ case List of
[<<"subscribe">>, Channel, N] ->
do_handle_subscribe(?PUBSUB_CHANNEL, Channel, N, Table),
State;
@@ -481,26 +484,46 @@ do_handle_pubsub(Sock, Packet, State = #state{pubsub_tid = Table}) ->
do_handle_unsubscribe(?PUBSUB_PATTERN, Channel, N, State);
[<<"message">>, Channel, Msg] ->
#pubsub{cb_unsub = Fun} = do_get_pubsub(?PUBSUB_CHANNEL, Channel, Table),
- catch Fun(Channel, Msg),
+ try
+ Fun(Channel, Msg)
+ catch
+ _T:_R ->
+ ?ERROR2("message callback ~p:~p", [_T, _R])
+ end,
State;
[<<"pmessage">>, Pattern, Channel, Msg] ->
#pubsub{cb_unsub = Fun} = do_get_pubsub(?PUBSUB_PATTERN, Pattern, Table),
- catch Fun(Pattern, Channel, Msg),
+ try
+ Fun(Pattern, Channel, Msg)
+ catch
+ _T:_R ->
+ ?ERROR2("pmessage callback ~p:~p", [_T, _R])
+ end,
State
end.
%% handle the subscribe
do_handle_subscribe(Type, Data, N, Table) ->
#pubsub{cb_sub = Fun} = do_get_pubsub(Type, Data, Table),
- catch Fun(Data, N).
+ try
+ Fun(Data, N)
+ catch
+ _T:_R ->
+ ?ERROR2("subscribe callback ~p:~p", [_T, _R])
+ end.
%% handle the unsubscribe
do_handle_unsubscribe(_Type, _Data, 0, #state{pubsub_tid = Table} = State) ->
- ets:delete(Table),
+ ets:delete_all_objects(Table),
State#state{ctx = normal};
do_handle_unsubscribe(Type, Data, N, #state{pubsub_tid = Table} = State) ->
#pubsub{cb_unsub = Fun} = do_del_pubsub(Type, Data, Table),
- catch Fun(Data, N),
+ try
+ Fun(Data, N)
+ catch
+ _T:_R ->
+ ?ERROR2("unsubscribe callback ~p:~p", [_T, _R])
+ end,
State.
%%---------------
View
@@ -16,7 +16,7 @@ init_per_suite(Config) ->
code:add_path("../ebin"),
{ok, PidSub} = redis_client:start(localhost, 6379, ""),
RedisSub = redis_client:handler(PidSub),
- ok = RedisSub:flush_all(),
+ ok = RedisSub:flushall(),
{ok, PidPub} = redis_client:start(localhost, 6379, ""),
RedisPub = redis_client:handler(PidPub),
io:format("Redis sub: ~p pub:~p~n", [RedisSub, RedisPub]),
@@ -40,74 +40,115 @@ end_per_testcase(Name, Config) ->
all() ->
[
- test_sub,
- test_pub
+ test_channel,
+ test_pattern
].
%%-------------------------------------------------------------------------
%% Test cases sthreets here.
%%-------------------------------------------------------------------------
-%% test generic commands
-test_sub(Config) ->
+%% test channel pub/sub
+test_channel(Config) ->
RedisSub = ?config(redis_sub, Config),
+ RedisPub = ?config(redis_pub, Config),
- RedisSub:subscribe([<<"one">>, <<"two">>],
- fun callback_sub/2,
- fun callback_msg/2),
-
- {error, _} = RedisSub:get("k1"),
+ ok = RedisSub:subscribe([<<"one">>, <<"two">>],
+ fun cb_channel_sub/2,
+ fun cb_msg/2),
- RedisSub:subscribe([<<"three">>],
- fun callback_sub/2,
- fun callback_msg/2),
+ badmodel = (catch RedisSub:get("k1")),
- RedisSub:unsubscribe([<<"one">>],
- fun callback_unsub/2),
- RedisSub:unsubscribe(fun callback_sub/2),
- ok.
+ ok = RedisSub:subscribe([<<"three">>],
+ fun cb_channel_sub/2,
+ fun cb_msg/2),
-test_pub(Config) ->
- RedisSub = ?config(redis_sub, Config),
- RedisPub = ?config(redis_pub, Config),
+ ok = RedisSub:unsubscribe([<<"one">>],
+ fun cb_channel_unsub/2),
+ ok = RedisSub:unsubscribe(fun cb_channel_unsub/2),
- 0 = RedisPub:publish(<<"one">>, "one-msg"),
+ 0 = RedisPub:publish(<<"one">>, "no_sub"),
RedisSub:subscribe([<<"one">>],
- fun callback_sub/2,
- fun callback_msg/2),
- 1 = RedisPub:publish(<<"one">>, "one-msg"),
-
+ fun cb_channel_sub/2,
+ fun cb_msg/2),
+ erlang:put('one-msg', true),
+ 1 = RedisPub:publish(<<"one">>, "erase"),
+ 1 = RedisPub:publish(<<"one">>, "get"),
ok.
-
-callback_sub(<<"one">>, 1) ->
+%% subscribe callbacks
+cb_channel_sub(<<"one">>, 1) ->
?P("sub callback with channel one", []),
ok;
-callback_sub(<<"two">>, 2) ->
+cb_channel_sub(<<"two">>, 2) ->
?P("sub callback with channel two", []),
ok;
-callback_sub(<<"three">>, 3) ->
+cb_channel_sub(<<"three">>, 3) ->
?P("sub callback with channel three", []),
ok.
-callback_unsub(<<"one">>, 2) ->
+cb_channel_unsub(<<"one">>, 2) ->
?P("unsub callback with channel one", []),
ok;
-callback_unsub(<<"two">>, 1) ->
+cb_channel_unsub(<<"two">>, 1) ->
?P("unsub callback with channel two", []),
ok;
-callback_unsub(<<"three">>, 0) ->
+cb_channel_unsub(<<"three">>, 0) ->
?P("unsub callback with channel three", []),
ok.
-callback_msg(<<"one">>, <<"one-msg">>) ->
- ?P("msg callback with channel one", []),
+cb_msg(<<"one">>, <<"erase">>) ->
+ erlang:erase('one-msg'),
ok;
-callback_msg(<<"two">>, <<"two-msg">>) ->
+cb_msg(<<"one">>, <<"get">>) ->
+ undefined = erlang:get('one-msg'),
+ ok;
+cb_msg(<<"two">>, <<"two-msg">>) ->
?P("msg callback with channel two", []),
ok;
-callback_msg(<<"three">>, <<"three-msg">>) ->
+cb_msg(<<"three">>, <<"three-msg">>) ->
?P("msg callback with channel three", []),
ok.
+%%-----------------
+%% pattern pub/sub
+%%-----------------
+test_pattern(Config) ->
+ RedisSub = ?config(redis_sub, Config),
+ RedisPub = ?config(redis_pub, Config),
+
+ RedisSub:psubscribe("news.*",
+ fun(<<"news.*">>, 1) ->
+ "subscribe pattern ok"
+ end,
+ fun cb_pmessage1/3),
+ RedisSub:psubscribe("news.china.*",
+ fun(<<"news.china.*">>, 2) ->
+ "subscribe pattern ok"
+ end,
+ fun cb_pmessage2/3),
+ RedisSub:subscribe("news.china.edu",
+ fun(<<"news.china.edu">>, 1) ->
+ "subscribe channel ok"
+ end,
+ fun(<<"news.china.edu">>, <<"news 1">>) ->
+ ok
+ end),
+ badmodel = (catch RedisSub:publish("news", "news")),
+ 3 = RedisPub:publish("news.china.edu", "news 1"),
+ 2 = RedisPub:publish("news.china.food", "news 2"),
+ 1 = RedisPub:publish("news.china", "news 3"),
+ 0 = RedisPub:publish("other_topic", "news 4"),
+ ok.
+
+cb_pmessage1(<<"new.*">>, <<"news.china.edu">>, <<"news 1">>) ->
+ ok;
+cb_pmessage1(<<"new.*">>, <<"news.china.food">>, <<"news 2">>) ->
+ ok;
+cb_pmessage1(<<"new.*">>, <<"news.china">>, <<"news 3">>) ->
+ ok.
+cb_pmessage2(<<"new.china.*">>, <<"news.china.edu">>, <<"news 1">>) ->
+ ok;
+cb_pmessage2(<<"new.china.*">>, <<"news.china.food">>, <<"news 2">>) ->
+ ok.
View
@@ -1 +1 @@
-{suites, ".", [redis_SUITE]}.
+{suites, ".", [redis_SUITE, redis_pubsub_SUITE]}.

0 comments on commit 47d9757

Please sign in to comment.