Skip to content

Commit

Permalink
Refactor WebSockets and add support of optional callback functions
Browse files Browse the repository at this point in the history
Main changes:
  * Fix some bugs about UTF-8 encoding and messages fragmentation
  * Add support of optional callback functions
  * Add support of many startup options
  * Add support of outgoing fragmented messages
  * Add a websocket testsuite

                                 - * -
*** bug fixes ***

First of all, an huge part of yaws_websocket.erl was rewritten to fix bugs
about the messages fragmentation and the UTF-8 encoding of incoming text
messages:

  * UTF-8 encoding
    before, when a text message was fragmented, only the first frame was
    checked and partial UTF-8 sequences were not supported. Now, checks
    are done on each message part and a partial UTF-8 sequence at the end
    of a frame is accumulated and checked with the next frame (for basic
    callback only).

  * Messages fragmentation
    for basic callback modules, because of a buggy mapping between frames
    and messages, the messages fragmentation was almost unusable. To fix
    this, the message handling was rewritten.

Now, all tests in the autobahn testsuite[1] pass successfully.

                                 - * -
*** Optional callback functions ***

Then, from an idea of François de Metz[2], yaws_websocket module was
extended to support optional callback functions. See the documentation for
details (www/websockets.yaws).

Quickly, optional callback functions are:

  * Module:init/1           (for basic and advanced callback modules)
  * Module:terminate/2      (for basic and advanced callback modules)
  * Module:handle_open/2    (for basic and advanced callback modules)
  * Module:handle_info/2    (for basic and advanced callback modules)
  * Module:handle_message/2 (for basic callback modules only, used in place
                             of Module:handle_message/1)

Thanks to Pablo Vieytes[3] which added handle_info to optional callback
functions.
                                 - * -
*** Startup options ***

To start a websocket process a script must return the following term from
its out/1 function:

  {websocket, CallbackMod, Options}

where 'Options' is a (possibly empty) proplist. Following parameters are
supported:

  * {origin, Orig}
  * {callback, Type}
  * {keepalive, Boolean}
  * {keepalive_timeout, Tout}
  * {keepalive_grace_period, Time}
  * {drop_on_timeout, Boolean}
  * {close_timeout, Tout}
  * {close_if_unmasked, Boolean}
  * {max_frame_size, Int}
  * {max_message_size, Int}
  * {auto_fragment_message, Boolean}
  * {auto_fragment_threshold, Int}

See the documentation for details (www/websockets.yaws).

                                 - * -
*** Outgoing fragmented messages ***

A callback module can now send fragmented messages to clients using the
record #ws_frame{}:

 #ws_frame{fin     = true,  %% true | false
           rsv     = 0,
           opcode,          %% text | binary | continuation...
           payload = <<>>}. %% binary(), unmasked data

--
[1] http://autobahn.ws/testsuite
[2] #99
[3] https://github.com/pvieytes
  • Loading branch information
Christopher Faulet committed Feb 11, 2013
1 parent c341c31 commit 29a7989
Show file tree
Hide file tree
Showing 20 changed files with 4,295 additions and 704 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Expand Up @@ -28,6 +28,8 @@ test/support/include.sh
test/t1/localhost:8000/
test/t[1-9]/logs/
test/t[1-9]/yaws.conf
test/t10/logs/
test/t10/yaws.conf
test/t4/www2/8388608.bin
www/yaws.pdf
www/yaws.ps
Expand Down
473 changes: 363 additions & 110 deletions doc/yaws.tex

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions examples/src/Makefile
Expand Up @@ -10,9 +10,10 @@ endif


MODULES= advanced_echo_callback \
authmod_gssapi \
basic_echo_callback \
server_sent_events
authmod_gssapi \
basic_echo_callback \
basic_echo_callback_extended \
server_sent_events

EBIN_FILES=$(MODULES:%=../ebin/%.$(EMULATOR))
ERLC_FLAGS+=-Werror $(DEBUG_FLAGS)
Expand Down
147 changes: 105 additions & 42 deletions examples/src/advanced_echo_callback.erl
Expand Up @@ -14,65 +14,128 @@
-record(state, {frag_type = none, % fragment type
acc = <<>>}). % accumulate fragment data

%% start of a fragmented message
handle_message(#ws_frame_info{fin=0,
opcode=FragType,
data=Data},

%% unfragmented text message
handle_message(#ws_frame_info{fin=1, opcode=text, data=Data},
#state{frag_type=none, acc = <<>>}=State) ->
case unicode:characters_to_binary(Data, utf8, utf8) of
Data -> {reply, {text, Data}, State};
_ -> {close, {1007, <<"invalid utf-8">>}}
end;

%% start of a fragmented text message
handle_message(#ws_frame_info{fin=0, opcode=text, data=Data},
#state{frag_type=none, acc = <<>>}) ->
{noreply, #state{frag_type=FragType, acc=Data}};
case unicode:characters_to_binary(Data, utf8, utf8) of
Data ->
{noreply, #state{frag_type=text, acc={[Data], <<>>}}};
{incomplete, Dec, Rest} ->
{noreply, #state{frag_type=text, acc={[Dec], Rest}}};
_ ->
{close, {1007, <<"invalid utf-8">>}}
end;

%% non-final continuation of a fragmented message
handle_message(#ws_frame_info{fin=0,
data=Data,
opcode=continuation},
#state{frag_type = FragType, acc = Acc}) ->
{noreply, #state{frag_type=FragType, acc = <<Acc/binary,Data/binary>>}};
%% non-final continuation of a fragmented text message
handle_message(#ws_frame_info{fin=0, data=Data, opcode=continuation},
#state{frag_type=text, acc={Dec0, Rest0}}=State) ->
Data1 = <<Rest0/binary, Data/binary>>,
case unicode:characters_to_binary(Data1, utf8, utf8) of
Data1 ->
{noreply, State#state{acc={[Data1|Dec0], <<>>}}};
{incomplete, Dec1, Rest1} ->
{noreply, State#state{acc={[Dec1|Dec0], Rest1}}};
_ ->
{close, {1007, <<"invalid utf-8">>}}
end;

%% end of text fragmented message
handle_message(#ws_frame_info{fin=1,
opcode=continuation,
data=Data},
#state{frag_type=text, acc=Acc}) ->
Unfragged = <<Acc/binary, Data/binary>>,
{reply, {text, Unfragged}, #state{frag_type=none, acc = <<>>}};
handle_message(#ws_frame_info{fin=1, opcode=continuation, data=Data},
#state{frag_type=text, acc={Dec, Rest}}) ->
Data1 = <<Rest/binary, Data/binary>>,
case unicode:characters_to_binary(Data1, utf8, utf8) of
Data1 ->
Msg = list_to_binary(lists:reverse([Data1|Dec])),
{reply, {text, Msg}, #state{}};
_ ->
{close, {1007, <<"invalid utf-8">>}}
end;

%% unfragmented binary message
handle_message(#ws_frame_info{fin=1, opcode=binary, data=Data},
#state{frag_type=none, acc = <<>>}=State) ->
{reply, {binary, Data}, State};

%% start of a fragmented binary message
handle_message(#ws_frame_info{fin=0, opcode=binary, data=Data},
#state{frag_type=none, acc = <<>>}) ->
{noreply, #state{frag_type=binary, acc=Data}};

%% one full non-fragmented message
handle_message(#ws_frame_info{opcode=text, data=Data}, State) ->
{reply, {text, Data}, State};
%% non-final continuation of a fragmented binary message
handle_message(#ws_frame_info{fin=0, data=Data, opcode=continuation},
#state{frag_type=binary, acc=FragAcc}=State) ->
{noreply, State#state{acc = <<FragAcc/binary,Data/binary>>}};

%% end of binary fragmented message
handle_message(#ws_frame_info{fin=1,
opcode=continuation,
data=Data},
#state{frag_type=binary, acc=Acc}) ->
Unfragged = <<Acc/binary, Data/binary>>,
io:format("echoing back binary message~n",[]),
{reply, {binary, Unfragged}, #state{frag_type=none, acc = <<>>}};

%% one full non-fragmented binary message
handle_message(#ws_frame_info{opcode=binary,
data=Data},
State) ->
io:format("echoing back binary message~n",[]),
{reply, {binary, Data}, State};
handle_message(#ws_frame_info{fin=1, opcode=continuation, data=Data},
#state{frag_type=binary, acc=FragAcc}) ->
Unfragged = <<FragAcc/binary, Data/binary>>,
{reply, {binary, Unfragged}, #state{}};


handle_message(#ws_frame_info{opcode=ping,
data=Data},
State) ->
handle_message(#ws_frame_info{opcode=ping, data=Data}, State) ->
io:format("replying pong to ping~n",[]),
{reply, {pong, Data}, State};

handle_message(#ws_frame_info{opcode=pong}, State) ->
%% A response to an unsolicited pong frame is not expected.
%% http://tools.ietf.org/html/\
%% draft-ietf-hybi-thewebsocketprotocol-08#section-4
%% draft-ietf-hybi-thewebsocketprotocol-08#section-4
io:format("ignoring unsolicited pong~n",[]),
{noreply, State};

handle_message(#ws_frame_info{opcode=close}, _State) ->
io:format("got close~n",[]),
{close, normal};
%% According to RFC 6455 section 5.4, control messages like close
%% MAY be injected in the middle of a fragmented message, which is
%% why we pass FragType and FragAcc along below. Whether any clients
%% actually do this in practice, I don't know.
handle_message(#ws_frame_info{opcode=close, length=Len,
data=Data, ws_state=WSState},
_State) ->
Reason = case Len of
0 -> {1000, <<>>};
1 -> {1002, <<"protocol error">>};
_ ->
<<Status:16/big, Msg/binary>> = Data,
case unicode:characters_to_binary(Msg, utf8, utf8) of
Msg -> {check_close_code(Status, WSState), Msg};
_ -> {1007, <<"invalid utf-8">>}
end
end,
io:format("got close. reply reason: ~p~n", [Reason]),
{close, Reason};

handle_message(#ws_frame_info{}=FrameInfo, State) ->
io:format("WS Endpoint Unhandled message: ~p~n~p~n", [FrameInfo, State]),
{close, {error, {unhandled_message, FrameInfo}}}.
{close, {1002, <<"protocol error">>}};

handle_message({fail_connection, Status, Msg}, State) ->
io:format("Connection failure: ~p:~p~n~p~n", [Status, Msg, State]),
{close, {Status, Msg}}.


%% The checks for close status codes here are based on RFC 6455 and on the
%% autobahn testsuite (http://autobahn.ws/testsuite).
check_close_code(Code, WSState) ->
if
Code >= 3000 andalso Code =< 4999 ->
Code;
Code < 1000 ->
1002;
Code == 1006 andalso WSState#ws_state.sock == undefined ->
Code;
Code >= 1004 andalso Code =< 1006 ->
1002;
Code > 1011 ->
1002;
true ->
Code
end.
4 changes: 2 additions & 2 deletions examples/src/basic_echo_callback.erl
Expand Up @@ -30,8 +30,8 @@ handle_message({text, Message}) ->
handle_message({binary, Message}) ->
{reply, {binary, Message}};

handle_message({close, _Status, _Reason}) ->
{close, normal}.
handle_message({close, Status, _Reason}) ->
{close, Status}.


say_hi(Pid) ->
Expand Down
82 changes: 82 additions & 0 deletions examples/src/basic_echo_callback_extended.erl
@@ -0,0 +1,82 @@
%%%===========================================================
%%% compiled using erlc -I include src/basic_echo_callback.erl
%%%===========================================================

-module(basic_echo_callback_extended).

-include("yaws_api.hrl").

%% Export for websocket callbacks
-export([init/1, terminate/2, handle_open/2, handle_message/2, handle_info/2]).

%% Export for apply
-export([say_hi/1]).

-record(state, {nb_texts=0, nb_bins=0}).

init([_Arg, Params]) ->
io:format("Initalize ~p: ~p~n", [self(), Params]),
{ok, #state{}}.

handle_open(WSState, State) ->
yaws_websockets:send(WSState, {text, <<"Welcome !">>}),
{ok, State}.

handle_message({text, <<"bye">>}, #state{nb_texts=N, nb_bins=M}=State) ->
io:format("User said bye. ~p text / ~p binary messages echoed ~n", [N, M]),
NbTexts = list_to_binary(integer_to_list(N)),
NbBins = list_to_binary(integer_to_list(M)),
Messages = [
{text, <<"Goodbye !">>},
{text, <<NbTexts/binary, " text messages echoed">>},
{text, <<NbBins/binary, " binary messages echoed">>}
],
{close, {1000, <<"bye">>}, Messages, State};

handle_message({text, <<"something">>}, State) ->
io:format("Some action without a reply~n", []),
{noreply, State};

handle_message({text, <<"say hi later">>}, State) ->
timer:apply_after(3000, ?MODULE, say_hi, [self()]),
{noreply, State};

handle_message({text, <<"fragmented message">>}, State) ->
io:format("Send a message fragmented in 3 frames~n", []),
Frag1 = #ws_frame{fin = false,
opcode = text,
payload = <<"frag1">>},
Frag2 = #ws_frame{fin = false,
opcode = continuation,
payload = <<"frag2">>},
Frag3 = #ws_frame{fin = true,
opcode = continuation,
payload = <<"frag3">>},
{reply, [Frag1, Frag2, Frag3], State};

handle_message({text, Msg}, #state{nb_texts=N}=State) ->
io:format("Receive text message (N=~p): ~p bytes~n", [N, byte_size(Msg)]),
{reply, {text, Msg}, State#state{nb_texts=N+1}};

handle_message({binary, Msg}, #state{nb_bins=M}=State) ->
io:format("Receive binary message (M=~p): ~p bytes~n", [M, byte_size(Msg)]),
{reply, {binary, Msg}, State#state{nb_bins=M+1}};

handle_message({close, Status, Reason}, _) ->
io:format("Close connection: ~p - ~p~n", [Status, Reason]),
{close, Status}.


handle_info(timeout, State) ->
io:format("process timed out~n", []),
{reply, {text, <<"Anybody Else ?">>}, State};
handle_info(_Info, State) ->
{noreply, State}.

terminate(Reason, State) ->
io:format("terminate ~p: ~p (state:~p)~n", [self(), Reason, State]),
ok.

say_hi(Pid) ->
io:format("asynchronous greeting~n", []),
yaws_api:websocket_send(Pid, {text, <<"hi there!">>}).
13 changes: 11 additions & 2 deletions include/yaws_api.hrl
Expand Up @@ -114,8 +114,8 @@
}).

%% Corresponds to the frame sections as in
%% http://tools.ietf.org/html/draft-ietf-hybi-thewebsocketprotocol-08#section-4
%% plus 'data' and 'ws_state'
%% http://tools.ietf.org/html/rfc6455#section-5.2
%% plus 'data' and 'ws_state'. Used for incoming frames.
-record(ws_frame_info, {
fin,
rsv,
Expand All @@ -130,6 +130,15 @@
% fragment a potentially fragmented message is.
}).

%% Used for outgoing frames. No checks are done on the validity of a frame. This
%% is the application's responsability to send valid frames.
-record(ws_frame, {
fin = true,
rsv = 0,
opcode,
payload = <<>>
}).

%%----------------------------------------------------------------------
%% The state of a WebSocket connection.
%% This is held by the ws owner process and passed in calls to yaws_api.
Expand Down
5 changes: 4 additions & 1 deletion src/yaws_api.erl
Expand Up @@ -979,7 +979,10 @@ stream_process_end(Sock, YawsPid) ->

%% Pid must the the process in control of the websocket connection.
websocket_send(Pid, {Type, Data}) ->
yaws_websockets:send(Pid, {Type, Data}).
yaws_websockets:send(Pid, {Type, Data});
websocket_send(Pid, #ws_frame{}=Frame) ->
yaws_websockets:send(Pid, Frame).


%% returns {ok, SSL socket} if an SSL socket, undefined otherwise
get_sslsocket({ssl, SslSocket}) ->
Expand Down

0 comments on commit 29a7989

Please sign in to comment.