From 2f96cd7b162620b984737b03fc5b7d8cffd55d65 Mon Sep 17 00:00:00 2001 From: Luca Spiller Date: Sun, 5 May 2013 17:49:37 +0100 Subject: [PATCH] Add callback stuff to manager --- spec/stream_manager_spec.erl | 96 ++++++++++++++++++++++++++++++++++++ src/stream_manager.erl | 43 +++++++++++----- 2 files changed, 127 insertions(+), 12 deletions(-) diff --git a/spec/stream_manager_spec.erl b/spec/stream_manager_spec.erl index c9f539a..07203cc 100644 --- a/spec/stream_manager_spec.erl +++ b/spec/stream_manager_spec.erl @@ -195,5 +195,101 @@ spec() -> % check two seperate processes were started ?assertNotEqual(Child1, Child2) end) + end), + + describe("#set_callback", fun() -> + it("sets the callback to call with data", fun() -> + Parent = self(), + + HandleConnection = fun(Self, Callback) -> + receive + {data, Data} -> + Callback(Data), + Parent ! {self(), callback}, + Self(Self, Callback); + _ -> + {ok, terminate} + end + end, + + meck:expect(stream_client, connect, + fun(_, _, _, Callback) -> + Parent ! {self(), started}, + HandleConnection(HandleConnection, Callback) + end + ), + + ok = stream_manager:start_stream(test_stream_manager), + + % wait for child to start + Child = receive + {ChildPid, started} -> + ChildPid + after 100 -> + ?assert(timeout) + end, + + % send some data + Child ! {data, data1}, + + % wait for callback to be called + receive + {Child, callback} -> + ok + after 100 -> + ?assert(timeout) + end, + + Callback1 = fun(Data) -> + Parent ! {callback1, Data} + end, + + Callback2 = fun(Data) -> + Parent ! {callback2, Data} + end, + + % set a callback + stream_manager:set_callback(test_stream_manager, Callback1), + + % send some more data + Child ! {data, data2}, + + % wait for callback to be called + receive + {Child, callback} -> + ok + after 100 -> + ?assert(timeout) + end, + + % set another callback + stream_manager:set_callback(test_stream_manager, Callback2), + + % send some more data + Child ! {data, data3}, + + % wait for callback to be called + receive + {Child, callback} -> + ok + after 100 -> + ?assert(timeout) + end, + + % check callbacks called correctly + receive + {callback1, data2} -> + ok + after 100 -> + ?assert(timeout) + end, + + receive + {callback2, data3} -> + ok + after 100 -> + ?assert(timeout) + end + end) end) end). diff --git a/src/stream_manager.erl b/src/stream_manager.erl index aee4704..aacac33 100644 --- a/src/stream_manager.erl +++ b/src/stream_manager.erl @@ -14,6 +14,7 @@ start_stream/1, stop_stream/1, set_params/2, + set_callback/2, status/1 ]). @@ -47,6 +48,9 @@ stop_stream(ServerRef) -> set_params(ServerRef, Params) -> gen_server:call(ServerRef, {set_params, Params}). +set_callback(ServerRef, Callback) -> + gen_server:call(ServerRef, {set_callback, Callback}). + status(ServerRef) -> gen_server:call(ServerRef, status). @@ -81,7 +85,7 @@ handle_call(start_stream, _From, State = #state{client_pid = Pid}) -> case Pid of undefined -> % not started, start client - NewPid = spawn_link(client_connect(State)); + NewPid = client_connect(State); _ -> % alrady started, ignore NewPid = Pid @@ -108,11 +112,14 @@ handle_call({set_params, Params}, _From, State = #state{client_pid = Pid, params _ -> % already started, restart ok = client_shutdown(State), - NewPid = spawn_link(client_connect(State#state{ params = Params })) + NewPid = client_connect(State#state{ params = Params }) end, {reply, ok, State#state{ params = Params, client_pid = NewPid }} end; +handle_call({set_callback, Callback}, _From, State) -> + {reply, ok, State#state{ callback = Callback }}; + handle_call(status, _From, State = #state{status = Status}) -> {reply, Status, State}; @@ -125,6 +132,17 @@ handle_call(_Request, _From, State) -> %% {stop, Reason, State} %% Description: Handling cast messages %%-------------------------------------------------------------------- +handle_cast({client_data, Data}, State = #state{ callback = Callback }) -> + case Callback of + undefined -> + % no callback set, ignore data + ok; + _ -> + % callback set, call with data + spawn(fun() -> Callback(Data) end) + end, + {noreply, State}; + handle_cast(_Msg, State) -> {noreply, State}. @@ -181,17 +199,18 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%-------------------------------------------------------------------- -spec client_connect(record()) -> pid(). -client_connect(#state{headers = Headers, params = Params, callback = StateCallback}) -> - % Callback can be undefined, other options have defaults - case StateCallback of - undefined -> - Callback = fun(_) -> ok end; - _ -> - Callback = StateCallback +client_connect(#state{headers = Headers, params = Params}) -> + Parent = self(), + + % We don't use the callback from the state, as we want to be able to change + % it without restarting the client. As such we call back into the manager + % which deals with the data as it sees fit + Callback = fun(Data) -> + gen_server:cast(Parent, {client_data, Data}) end, + Url = stream_client_util:filter_url(), - Parent = self(), - fun() -> + spawn_link(fun() -> case stream_client:connect(Url, Headers, Params, Callback) of {error, unauthorised} -> % Didn't connect, unauthorised @@ -206,7 +225,7 @@ client_connect(#state{headers = Headers, params = Params, callback = StateCallba % Connection closed due to error Parent ! {self(), client_exit, Error} end - end. + end). -spec client_shutdown(record()) -> ok. client_shutdown(#state{client_pid = Pid}) ->