Permalink
Browse files

Queue server implemented

  • Loading branch information...
1 parent d1b269c commit f530ec8e7657ac09fc080186146316aa10ce6119 @timclassic committed Dec 12, 2011
Showing with 59 additions and 16 deletions.
  1. +58 −15 src/tqserver.erl
  2. +1 −1 src/tqserver_app.erl
View
@@ -20,7 +20,7 @@
-define(ACCEPT_TIMEOUT, 100).
--record(state, {lsock}).
+-record(state, {lsock, queue}).
%%%=========================================================================
@@ -36,32 +36,29 @@ start_link(LSock) ->
%%%=========================================================================
init([LSock]) ->
- {ok, #state{lsock = LSock}, 0}.
+ Q = tqueue:new(),
+ {ok, #state{lsock = LSock, queue = Q}, 0}.
-handle_call(Request, From, State) ->
- io:format("unexpected handle_call:~nRequest: ~p~nFrom: ~p~n",
- [Request, From]),
+handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.
-handle_cast(Msg, State) ->
- io:format("unexpected handle_cast: Msg: ~p~n", [Msg]),
+handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(timeout, #state{lsock = LSock} = State) ->
{ok, _Socket} = gen_tcp:accept(LSock),
{ok, _Pid} = tqserver_sup:start_child(),
{noreply, State};
handle_info({tcp, Socket, Data}, State) ->
- ok = handle_line(Socket, Data),
- {noreply, State};
+ {ok, NewState} = do_comms(Socket, Data, State),
+ {noreply, NewState};
handle_info({tcp_closed, _Socket}, State) ->
{stop, normal, State};
handle_info({tcp_error, _Socket}, State) ->
{stop, normal, State};
-handle_info(Info, State) ->
- io:format("unexpected handle_info: Info: ~p~n", [Info]),
- {noreply, State, 0}.
+handle_info(_Info, State) ->
+ {noreply, State}.
terminate(_Reason, _State) ->
ok.
@@ -75,6 +72,52 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions
%%%=========================================================================
-handle_line(Socket, Data) ->
- io:format("~p: ~p~n", [Socket, Data]),
- ok.
+do_comms(Socket, Data, State) ->
+ {Msg, NewState} =
+ try
+ {Cmd, Term} = parse_line(Data),
+ {ok, OkMsg, OkState} = run_cmd(Cmd, Term, State),
+ {OkMsg, OkState}
+ catch
+ _Class:Err ->
+ case Err of
+ invalid_cmd ->
+ ErrMsg = io_lib:fwrite("ERROR: invalid command~n", []);
+ Else ->
+ ErrMsg = io_lib:fwrite("ERROR(unhandled): ~p~n", [Else])
+ end,
+ {ErrMsg, State}
+ end,
+ gen_tcp:send(Socket, Msg),
+ {ok, NewState}.
+
+
+parse_line(Data) ->
+ Line = re:replace(Data, "\r\n\$", "", [{return, list}]),
+
+ %% Command parsing is naive. For example, "outt" is considered
+ %% valid and will register as the out operation
+ case re:run(Line, "^(in|out) *(.*)\$", [{capture, [1, 2], list}]) of
+ {match, [Cmd, Term]} ->
+ {Cmd, Term};
+ _Else ->
+ throw(invalid_cmd)
+ end.
+
+run_cmd(Cmd, Term, #state{queue = Q} = State) ->
+ case Cmd of
+ "in" ->
+ NewQ = tqueue:in(Q, Term),
+ Msg = io_lib:fwrite("OK: data added to queue~n", []),
+ NewState = State#state{queue = NewQ};
+ "out" ->
+ case tqueue:out(Q) of
+ {ok, NewQ, OutTerm} ->
+ Msg = io_lib:fwrite("OK: ~p~n", [OutTerm]),
+ NewState = State#state{queue = NewQ};
+ {empty, _NewQ} ->
+ Msg = io_lib:fwrite("WARN: Queue is empty~n", []),
+ NewState = State
+ end
+ end,
+ {ok, Msg, NewState}.
View
@@ -21,7 +21,7 @@
start(_Type, _StartArgs) ->
ok = validate_env(),
{ok, Port} = application:get_env(listen_port),
- {ok, LSock} = gen_tcp:listen(Port, []),
+ {ok, LSock} = gen_tcp:listen(Port, [{reuseaddr, true}]),
case tqserver_sup:start_link(LSock) of
{ok, Pid} ->
tqserver_sup:start_child(),

0 comments on commit f530ec8

Please sign in to comment.