Skip to content
Browse files

Manage websocket server state for callback like gen_server does.

- Let endpoint with advanced callback specify initial state.
- Use state as fragment accumulator for basic callbacks,
  default to empty initial fragment accumulator.
- lots of renames to try and make the above clearer.
  • Loading branch information...
1 parent 458cf8a commit 20d78ae64901afc8b8760367ca96c06cb132f947 @jbothma committed Nov 26, 2011
Showing with 65 additions and 52 deletions.
  1. +40 −33 src/yaws_websockets.erl
  2. +23 −18 www/advanced_echo_callback.erl
  3. +2 −1 www/websockets_autobahn_endpoint.yaws
View
73 src/yaws_websockets.erl
@@ -91,11 +91,17 @@ handshake(Arg, SC, CallbackMod, Opts) ->
gen_tcp:send(CliSock, Handshake), % TODO: use the yaws way of supporting normal
% and ssl sockets
{callback, CallbackType} = lists:keyfind(callback, 1, Opts),
- State = #ws_state{ sock = CliSock,
+ WSState = #ws_state{ sock = CliSock,
vsn = ProtocolVersion,
frag_type = none
},
- loop(CallbackMod, State, {none, <<>>}, CallbackType)
+ CallbackState = case CallbackType of
+ basic ->
+ {none, <<>>};
+ {advanced, InitialState} ->
+ InitialState
+ end,
+ loop(CallbackMod, WSState, CallbackState, CallbackType)
end.
origin_check(_Origin, {origin, any}) ->
@@ -116,29 +122,32 @@ handshake(8, Arg, _CliSock, _WebSocketLocation, _Origin, _Protocol) ->
"Sec-WebSocket-Accept: ", AcceptHash , "\r\n",
"\r\n"].
-loop(CallbackMod, WSState = #ws_state{sock=Socket}, Acc, CallbackType) ->
+loop(CallbackMod, WSState = #ws_state{sock=Socket}, CallbackState, CallbackType) ->
receive
{tcp, Socket, FirstPacket} ->
FrameInfos = yaws_api:websocket_unframe(WSState, FirstPacket),
case CallbackType of
basic ->
- {BasicMessages, NewAcc} = basic_messages(FrameInfos, Acc),
+ {BasicMessages, NewCallbackState} =
+ basic_messages(FrameInfos, CallbackState),
CallbackResults = lists:map({CallbackMod, handle_message}, BasicMessages),
lists:map(handle_result_fun(WSState), CallbackResults);
- advanced ->
+ {advanced,_} ->
% TODO: use return from callback to reply or otherwise,
% instead of callback calling yaws_api:send
- NewAcc = lists:foldl(do_callback_fun(WSState, CallbackMod), Acc, FrameInfos)
+ NewCallbackState = lists:foldl(do_callback_fun(WSState, CallbackMod),
+ CallbackState,
+ FrameInfos)
end,
Last = lists:last(FrameInfos),
NewWSState = Last#ws_frame_info.ws_state,
- loop(CallbackMod, NewWSState, NewAcc, CallbackType);
+ loop(CallbackMod, NewWSState, NewCallbackState, CallbackType);
{tcp_closed, Socket} ->
io:format("Websocket closed. Terminating echo_server...~n");
Any ->
io:format("websocket server loop received msg:~p~n", [Any]),
- loop(CallbackMod, WSState, Acc, CallbackType)
+ loop(CallbackMod, WSState, CallbackState, CallbackType)
end.
handle_result_fun(WSState) ->
@@ -155,23 +164,22 @@ handle_result_fun(WSState) ->
end.
do_callback_fun(WSState, CallbackMod) ->
- fun(FrameInfo, AccStuff) ->
- case CallbackMod:handle_message(FrameInfo, AccStuff) of
- {reply, {Type, Data}, NewAccStuff} ->
+ fun(FrameInfo, CallbackState) ->
+ case CallbackMod:handle_message(FrameInfo, CallbackState) of
+ {reply, {Type, Data}, NewCallbackState} ->
yaws_api:websocket_send(WSState, {Type, Data}),
- NewAccStuff;
- {noreply, NewAccStuff} ->
- NewAccStuff;
+ NewCallbackState;
+ {noreply, NewCallbackState} ->
+ NewCallbackState;
{close, Reason} ->
exit(Reason)
end
end.
-basic_messages(FrameInfos, {FragType, BuffRemainder}=Acc) ->
- io:format("basic messages ~p~n~p~n", [FrameInfos,Acc]),
- {Messages, NewFragType, NewBuffRemainder}
- = lists:foldl(fun handle_message/2, {[], FragType, BuffRemainder}, FrameInfos),
- {Messages, {NewFragType, NewBuffRemainder}}.
+basic_messages(FrameInfos, {FragType, FragAcc}) ->
+ {Messages, NewFragType, NewFragAcc}
+ = lists:foldl(fun handle_message/2, {[], FragType, FragAcc}, FrameInfos),
+ {Messages, {NewFragType, NewFragAcc}}.
%% start of a fragmented message
handle_message( #ws_frame_info{ fin=0,
@@ -184,23 +192,22 @@ handle_message( #ws_frame_info{ fin=0,
handle_message( #ws_frame_info{ fin=0,
data=Data,
opcode=continuation},
- {Messages, FragType, Acc}) ->
- {Messages, FragType, <<Acc/binary,Data/binary>>};
+ {Messages, FragType, FragAcc}) ->
+ {Messages, FragType, <<FragAcc/binary,Data/binary>>};
%% end of text fragmented message
handle_message( #ws_frame_info{ fin=1,
opcode=continuation,
data=Data
},
- {Messages, text, Acc}) ->
- Unfragged = <<Acc/binary, Data/binary>>,
+ {Messages, text, FragAcc}) ->
+ Unfragged = <<FragAcc/binary, Data/binary>>,
NewMessage = {text, Unfragged},
{[NewMessage | Messages], none, <<>>};
%% unfragmented text message
handle_message( #ws_frame_info{opcode=text, data=Data},
{Messages, none, <<>>}) ->
- io:format("new message ~p~n", [Data]),
NewMessage = {text, Data},
{[NewMessage | Messages], none, <<>>};
@@ -209,8 +216,8 @@ handle_message( #ws_frame_info{ fin=1,
opcode=continuation,
data=Data
},
- {Messages, binary, Acc}) ->
- Unfragged = <<Acc/binary, Data/binary>>,
+ {Messages, binary, FragAcc}) ->
+ Unfragged = <<FragAcc/binary, Data/binary>>,
NewMessage = {binary, Unfragged},
{[NewMessage|Messages], none, <<>>};
@@ -224,21 +231,21 @@ handle_message( #ws_frame_info{ opcode=binary,
handle_message( #ws_frame_info{ opcode=ping,
data=Data,
ws_state=State},
- AccStuff) ->
+ Acc) ->
io:format("Replying pong to ping on behalf of basic callback module.~n",[]),
yaws_api:websocket_send(State, {pong, Data}),
- AccStuff;
+ Acc;
-handle_message(#ws_frame_info{opcode=pong}, AccStuff) ->
+handle_message(#ws_frame_info{opcode=pong}, Acc) ->
% A response to an unsolicited pong frame is not expected.
% http://tools.ietf.org/html/draft-ietf-hybi-thewebsocketprotocol-08#section-4
io:format("ignoring unsolicited pong~n",[]),
- AccStuff;
+ Acc;
-handle_message(FrameInfo=#ws_frame_info{}, AccStuff) ->
+handle_message(FrameInfo=#ws_frame_info{}, Acc) ->
io:format("WS Endpoint Ignoring message for basic callback. ~p~n~p~n",
- [FrameInfo, AccStuff]),
- AccStuff.
+ [FrameInfo, Acc]),
+ Acc.
View
41 www/advanced_echo_callback.erl
@@ -8,61 +8,66 @@
-include("yaws_api.hrl").
+% define callback state to accumulate a fragmented WS message
+% which we echo back when all fragments are in, returning to
+% initial initial state.
+-record(state, {frag_type = none, % fragment type
+ acc = <<>>}). % accumulate fragment data
%% start of a fragmented message
handle_message( #ws_frame_info{ fin=0,
opcode=FragType,
data=Data },
- {none, <<>>}) ->
- {noreply, {FragType, Data}};
+ #state{frag_type=none, acc = <<>>}) ->
+ {noreply, #state{frag_type=FragType, acc=Data}};
%% non-final continuation of a fragmented message
handle_message( #ws_frame_info{ fin=0,
data=Data,
opcode=continuation},
- {FragType, Acc}) ->
- {noreply, {FragType, <<Acc/binary,Data/binary>>}};
+ #state{frag_type = FragType, acc = Acc}) ->
+ {noreply, #state{frag_type=FragType, acc = <<Acc/binary,Data/binary>>}};
%% end of text fragmented message
handle_message( #ws_frame_info{ fin=1,
opcode=continuation,
data=Data},
- {text, Acc}) ->
+ #state{frag_type=text, acc=Acc}) ->
Unfragged = <<Acc/binary, Data/binary>>,
- {reply, {text, Unfragged}, {none, <<>>}};
+ {reply, {text, Unfragged}, #state{frag_type=none, acc = <<>>}};
%% one full non-fragmented message
-handle_message( #ws_frame_info{opcode=text, data=Data}, AccStuff) ->
- {reply, {text, Data}, AccStuff};
+handle_message( #ws_frame_info{opcode=text, data=Data}, State) ->
+ {reply, {text, Data}, State};
%% end of binary fragmented message
handle_message( #ws_frame_info{ fin=1,
opcode=continuation,
data=Data },
- {binary, Acc}) ->
+ #state{frag_type=binary, acc=Acc}) ->
Unfragged = <<Acc/binary, Data/binary>>,
io:format("echoing back binary message~n",[]),
- {reply, {binary, Unfragged}, {none, <<>>}};
+ {reply, {binary, Unfragged}, #state{frag_type=none, acc = <<>>}};
% one full non-fragmented binary message
handle_message( #ws_frame_info{ opcode=binary,
data=Data},
- AccStuff) ->
+ State) ->
io:format("echoing back binary message~n",[]),
- {reply, {binary, Data}, AccStuff};
+ {reply, {binary, Data}, State};
handle_message( #ws_frame_info{ opcode=ping,
data=Data},
- AccStuff) ->
+ State) ->
io:format("replying pong to ping~n",[]),
- {reply, {pong, Data}, AccStuff};
+ {reply, {pong, Data}, State};
-handle_message(#ws_frame_info{opcode=pong}, AccStuff) ->
+handle_message(#ws_frame_info{opcode=pong}, State) ->
% A response to an unsolicited pong frame is not expected.
% http://tools.ietf.org/html/draft-ietf-hybi-thewebsocketprotocol-08#section-4
io:format("ignoring unsolicited pong~n",[]),
- {noreply, AccStuff};
+ {noreply, State};
-handle_message(FrameInfo=#ws_frame_info{}, AccStuff) ->
- io:format("WS Endpoint Unhandled message: ~p~n~p~n", [FrameInfo, AccStuff]),
+handle_message(FrameInfo=#ws_frame_info{}, State) ->
+ io:format("WS Endpoint Unhandled message: ~p~n~p~n", [FrameInfo, State]),
{close, {error, {unhandled_message, FrameInfo}}}.
View
3 www/websockets_autobahn_endpoint.yaws
@@ -1,6 +1,7 @@
<erl>
out(A) ->
CallbackMod = advanced_echo_callback,
- Opts = [{callback, advanced}],
+ InitialState = {state, none, <<>>},
+ Opts = [{callback, {advanced, InitialState}}],
{websocket, CallbackMod, Opts}.
</erl>

0 comments on commit 20d78ae

Please sign in to comment.
Something went wrong with that request. Please try again.