Skip to content
This repository has been archived by the owner on May 27, 2019. It is now read-only.

Commit

Permalink
Add callback stuff to manager
Browse files Browse the repository at this point in the history
  • Loading branch information
lucaspiller committed May 5, 2013
1 parent 7a22c03 commit 2f96cd7
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 12 deletions.
96 changes: 96 additions & 0 deletions spec/stream_manager_spec.erl
Expand Up @@ -195,5 +195,101 @@ spec() ->
% check two seperate processes were started
?assertNotEqual(Child1, Child2)
end)
end),

describe("#set_callback", fun() ->
it("sets the callback to call with data", fun() ->
Parent = self(),

HandleConnection = fun(Self, Callback) ->
receive
{data, Data} ->
Callback(Data),
Parent ! {self(), callback},
Self(Self, Callback);
_ ->
{ok, terminate}
end
end,

meck:expect(stream_client, connect,
fun(_, _, _, Callback) ->
Parent ! {self(), started},
HandleConnection(HandleConnection, Callback)
end
),

ok = stream_manager:start_stream(test_stream_manager),

% wait for child to start
Child = receive
{ChildPid, started} ->
ChildPid
after 100 ->
?assert(timeout)
end,

% send some data
Child ! {data, data1},

% wait for callback to be called
receive
{Child, callback} ->
ok
after 100 ->
?assert(timeout)
end,

Callback1 = fun(Data) ->
Parent ! {callback1, Data}
end,

Callback2 = fun(Data) ->
Parent ! {callback2, Data}
end,

% set a callback
stream_manager:set_callback(test_stream_manager, Callback1),

% send some more data
Child ! {data, data2},

% wait for callback to be called
receive
{Child, callback} ->
ok
after 100 ->
?assert(timeout)
end,

% set another callback
stream_manager:set_callback(test_stream_manager, Callback2),

% send some more data
Child ! {data, data3},

% wait for callback to be called
receive
{Child, callback} ->
ok
after 100 ->
?assert(timeout)
end,

% check callbacks called correctly
receive
{callback1, data2} ->
ok
after 100 ->
?assert(timeout)
end,

receive
{callback2, data3} ->
ok
after 100 ->
?assert(timeout)
end
end)
end)
end).
43 changes: 31 additions & 12 deletions src/stream_manager.erl
Expand Up @@ -14,6 +14,7 @@
start_stream/1,
stop_stream/1,
set_params/2,
set_callback/2,
status/1
]).

Expand Down Expand Up @@ -47,6 +48,9 @@ stop_stream(ServerRef) ->
set_params(ServerRef, Params) ->
gen_server:call(ServerRef, {set_params, Params}).

set_callback(ServerRef, Callback) ->
gen_server:call(ServerRef, {set_callback, Callback}).

status(ServerRef) ->
gen_server:call(ServerRef, status).

Expand Down Expand Up @@ -81,7 +85,7 @@ handle_call(start_stream, _From, State = #state{client_pid = Pid}) ->
case Pid of
undefined ->
% not started, start client
NewPid = spawn_link(client_connect(State));
NewPid = client_connect(State);
_ ->
% alrady started, ignore
NewPid = Pid
Expand All @@ -108,11 +112,14 @@ handle_call({set_params, Params}, _From, State = #state{client_pid = Pid, params
_ ->
% already started, restart
ok = client_shutdown(State),
NewPid = spawn_link(client_connect(State#state{ params = Params }))
NewPid = client_connect(State#state{ params = Params })
end,
{reply, ok, State#state{ params = Params, client_pid = NewPid }}
end;

handle_call({set_callback, Callback}, _From, State) ->
{reply, ok, State#state{ callback = Callback }};

handle_call(status, _From, State = #state{status = Status}) ->
{reply, Status, State};

Expand All @@ -125,6 +132,17 @@ handle_call(_Request, _From, State) ->
%% {stop, Reason, State}
%% Description: Handling cast messages
%%--------------------------------------------------------------------
handle_cast({client_data, Data}, State = #state{ callback = Callback }) ->
case Callback of
undefined ->
% no callback set, ignore data
ok;
_ ->
% callback set, call with data
spawn(fun() -> Callback(Data) end)
end,
{noreply, State};

handle_cast(_Msg, State) ->
{noreply, State}.

Expand Down Expand Up @@ -181,17 +199,18 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions
%%--------------------------------------------------------------------
-spec client_connect(record()) -> pid().
client_connect(#state{headers = Headers, params = Params, callback = StateCallback}) ->
% Callback can be undefined, other options have defaults
case StateCallback of
undefined ->
Callback = fun(_) -> ok end;
_ ->
Callback = StateCallback
client_connect(#state{headers = Headers, params = Params}) ->
Parent = self(),

% We don't use the callback from the state, as we want to be able to change
% it without restarting the client. As such we call back into the manager
% which deals with the data as it sees fit
Callback = fun(Data) ->
gen_server:cast(Parent, {client_data, Data})
end,

Url = stream_client_util:filter_url(),
Parent = self(),
fun() ->
spawn_link(fun() ->
case stream_client:connect(Url, Headers, Params, Callback) of
{error, unauthorised} ->
% Didn't connect, unauthorised
Expand All @@ -206,7 +225,7 @@ client_connect(#state{headers = Headers, params = Params, callback = StateCallba
% Connection closed due to error
Parent ! {self(), client_exit, Error}
end
end.
end).

-spec client_shutdown(record()) -> ok.
client_shutdown(#state{client_pid = Pid}) ->
Expand Down

0 comments on commit 2f96cd7

Please sign in to comment.