Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

BOSH support for TSUNG (TECH-1044).

To use BOSH, you need to change the server config of tsung, instead of

<server .... type="tcp"  port="5222"></server>
you must use:
<server .... type="bosh" port="5280"></server>

Note that BOSH uses xmpp 1.0, one must use SASL to authenticate.
In the generated reports, a new graph is included, showing
the # of HTTP connections and requests made. # of requests should
be much bigger than # of connections, as we are using persistent
connections.

Reported network traffic includes overhead by BOSH/HTTP protocol for POST
request it *DO NOT* include the HTTP overhead for responses.

TODO:
	normal HTTPS sessions are broken, I need to add ts_ssl implementing
        the same API than ts_tcp, ts_bosh.
  • Loading branch information...
commit fa26412d79a4dd8ffced6b41f0ae9c5d9ec66369 1 parent 132b8cc
@ppolv ppolv authored
View
8 src/templates/graph.thtml
@@ -43,6 +43,7 @@
[% IF async %]
<tr>
<th>Noack/Bidi</th>
+ [% IF bosh %]<th>BOSH</th>[% END %]
</tr>
<tr>
<td>
@@ -50,6 +51,13 @@
<img class="graph" src="images/graphes-Async-rate.png" alt="req/sec"/>
</a>
</td>
+[% IF bosh %]
+ <td>
+ <a href="images/graphes-Bosh-rate.ps">
+ <img class="graph" src="images/graphes-Bosh-rate.png" alt="req/sec"/>
+ </a>
+ </td>
+[% END %]
</tr>
[% END %]
<tr><th>Network traffic</th><th>New Users</th></tr>
View
466 src/tsung/ts_bosh.erl
@@ -0,0 +1,466 @@
+-module(ts_bosh).
+
+-export([ connect/3, send/3, close/1, set_opts/2, protocol_options/1 ]).
+
+-behaviour(gen_ts_transport).
+
+-include("ts_profile.hrl").
+-include("ts_config.hrl").
+-include_lib("xmerl/include/xmerl.hrl").
+
+-define(CONTENT_TYPE, "text/xml; charset=utf-8").
+-define(VERSION, "1.8").
+-define(WAIT, 60). %1 minute
+-define(HOLD, 1). %only 1 request pending
+
+-define(CONNECT_TIMEOUT, 20 * 1000).
+-define(MAX_QUEUE_SIZE, 5). %% at most 5 messages queued, after that close the connection.
+ %% In practice we never had more than 1 pending packet, as we are blocking
+ %% the client process until we sent the packet. But I keep this functionality in place,
+ %% in case we decide to do the sending() of data asynchronous.
+
+-record(state, {
+ host,
+ path,
+ port,
+ % {Host::string(), Port:integer(), Path::string(), Ssl::bool()}
+ domain = undefined,
+ sid,
+ rid,
+ parent_pid,
+ max_requests, %TODO: use this, now fixed on 2
+ queue = [], %% stanzas that have been queued because we reach the limit of requets
+ open = [],
+ free = [],
+ local_ip,
+ local_port,
+ is_fresh = true,
+ pending_ref
+ }).
+
+connect(Host, Port, Opts) ->
+ Parent = self(),
+ Path = "/http-bind/",
+ Pid = spawn_link(fun() -> loop(Host, Port, Path, Opts, Parent) end),
+ {ok, Pid}.
+
+extract_domain("to='" ++ Rest) ->
+ lists:takewhile(fun(C) -> C =/= $' end, Rest);
+extract_domain([_|Rest]) -> extract_domain(Rest).
+
+send(Pid, Data, _Opts) ->
+ Ref = make_ref(),
+ Msg = case Data of
+ <<"<?xml", Rest/binary>> -> %%HACK: use this to detect stream start (or restarts)
+ Domain = extract_domain(binary_to_list(Rest)),
+ {stream, Domain, Ref};
+ <<"</stream:stream>", _/binary>> -> %%Use this to detect stream end
+ {stream, terminate, Ref};
+ _ ->
+ {send, Data, Ref}
+ end,
+ Pid ! Msg,
+ MonitorRef = erlang:monitor(process,Pid),
+ receive
+ {'DOWN', MonitorRef, _Type, _Object, _Info} ->
+ {error, no_bosh_connection};
+ {ok, Ref} ->
+ erlang:demonitor(MonitorRef),
+ ok
+ after
+ 30000 ->
+ erlang:demonitor(MonitorRef),
+ {error, timeout}
+ end.
+
+close(Pid) ->
+ Pid ! close.
+
+set_opts(Pid, _Opts) ->
+ Pid.
+
+protocol_options(#proto_opts{}) ->
+ [].
+
+loop(Host, Port, Path, Opts, Parent) ->
+ {A,B,C} = now(),
+ random:seed(A,B,C),
+ process_flag(trap_exit, true),
+ loop(#state{is_fresh = true,
+ port = Port,
+ path = Path,
+ parent_pid = Parent,
+ host = Host,
+ local_ip = proplists:get_value(ip, Opts, undefined),
+ local_port = proplists:get_value(port, Opts, undefined)
+ }).
+
+loop(#state{parent_pid = ParentPid} = State) ->
+ receive
+ {'EXIT', ParentPid, _Reason} -> %%even 'normal' terminates this
+ ok;
+ close ->
+ ok;
+ {send, Data, Ref} ->
+ case do_send(State, Data) of
+ {sent, NewState} ->
+ ParentPid ! {ok, Ref},
+ loop(NewState);
+ {queued, #state{queue =Q} = NewState} when length(Q) < ?MAX_QUEUE_SIZE -> %%do not return yet..
+ loop(NewState#state{pending_ref = Ref});
+ {queued, NewState} -> %% we reach the max allowed queued messages.. close the connection.
+ ?LOGF("Client reached max bosh requests queue size: ~p. Closing session",
+ [length(NewState#state.queue)], ?ERR),
+ ts_mon:add({count, error_bosh_maxqueued}),
+ ParentPid ! {ok, Ref},
+ ParentPid ! {gen_ts_transport, self(), closed}
+ end;
+ {stream, terminate, Ref} ->
+ #state{host = Host,
+ path = Path,
+ sid = Sid,
+ rid = Rid} = State,
+ {NewState, Socket} = new_socket(State, false),
+ ok = make_raw_request(Socket, Host, Path, close_stream_msg(Sid, Rid)),
+ ParentPid ! {ok, Ref},
+ loop(NewState);
+ {stream, Domain, Ref} when State#state.domain == undefined ->
+ NewState = do_connect(State, Domain),
+ ParentPid ! {ok, Ref},
+ loop(NewState);
+ {stream, _Domain, Ref} -> %%here we must do a reset
+ NewState = do_reset(State),
+ ParentPid ! {ok, Ref},
+ loop(NewState);
+ {http, Socket, {http_response, Vsn, 200, <<"OK">>}} ->
+ case do_receive_http_response(State, Socket, Vsn) of
+ {ok, NewState} ->
+ loop(NewState);
+ terminate ->
+ ?LOG("Session terminated by server", ?INFO),
+ State#state.parent_pid ! {gen_ts_transport, self(), closed}
+ end;
+ {tcp_closed, Socket} ->
+ case lists:keymember(Socket, 1, State#state.open) of
+ true ->
+ %%ERROR, a current request is closed
+ ?LOG("Open request closed by server", ?ERR),
+ ts_mon:add({count, error_bosh_socket_closed}),
+ State#state.parent_pid ! {gen_ts_transport, self(), closed};
+ false ->
+ %% A HTTP persistent connection, currently not in use, is closed by the server.
+ %% We can continue without trouble, just remove it, it will be reopened when needed.
+ loop(State#state{free = lists:delete(Socket, State#state.free)})
+ end;
+ {http, _Socket, {http_response, _Vsn, ResponseCode, _StatusLine}} ->
+ State#state.parent_pid ! {gen_ts_transport, self(), error, list_to_atom(integer_to_list(ResponseCode))};
+ Unexpected ->
+ ?LOGF("Bosh process received unexpected message: ~p", [Unexpected], ?ERR),
+ State#state.parent_pid ! {gen_ts_transport, self(), error, unexpected_data}
+ end.
+
+do_receive_http_response(State, Socket, Vsn) ->
+ #state{open = Open,
+ sid = Sid,
+ rid = Rid,
+ queue = Queue,
+ host = Host,
+ path = Path,
+ parent_pid = ParentPid} = State,
+ {ok, {{200, <<"OK">>}, _Hdrs, Resp}} = read_response(Socket, Vsn, {200, <<"OK">>}, [], <<>>),
+ {_El = #xmlElement{name = body,
+ attributes = Attrs,
+ content = Content}, []}= xmerl_scan:string(binary_to_list(Resp)),
+ case get_attr(Attrs, type) of
+ "terminate" ->
+ ts_mon:add({count, error_bosh_terminated}),
+ State#state.parent_pid ! {gen_ts_transport, self(), closed},
+ terminate;
+ _ ->
+ NewOpen = lists:keydelete(Socket, 1, Open),
+ NewState2 = if
+ NewOpen == [] andalso State#state.is_fresh =:= false ->
+ inet:setopts(Socket, [{packet, http_bin}, {active, once}]),
+ ok = make_empty_request(Socket,Sid, Rid, Queue, Host, Path),
+ case length(Queue) of
+ 0 -> ok;
+ _ ->
+ ParentPid ! {ok, State#state.pending_ref}
+ %% we just sent the pending packet, wakeup the client
+ end,
+ State#state{open = [{Socket, Rid}], rid = Rid +1, queue = []};
+ length(NewOpen) == 1 andalso length(State#state.queue) > 0 ->
+ %%there are pending packet, sent it if the RID is ok, otherwise wait
+ case NewOpen of
+ [{_, R}] when (Rid - R) =< 1 ->
+ inet:setopts(Socket, [{packet, http_bin}, {active, once}]),
+ ok = make_empty_request(Socket,Sid, Rid, Queue, Host, Path),
+ ParentPid ! {ok, State#state.pending_ref},
+ %% we just sent the pending packet, wakeup the client
+ State#state{open = [{Socket, Rid}], rid = Rid +1, queue = []};
+ _ ->
+ NewState = return_socket(State, Socket),
+ NewState#state{open = NewOpen}
+ end;
+ true ->
+ NewState = return_socket(State, Socket),
+ NewState#state{open = NewOpen}
+ end,
+ case Content of
+ [] ->
+ %%empty response, do not bother the ts_client process with this
+ %% (so Noack/Bidi won't count this bosh specific thing, only async stanzas)
+ %% since ts_client don't see this, we need to count the size received
+ ts_mon:add({ sum, size_rcv, iolist_size(Resp)});
+ _ ->
+ ParentPid ! {gen_ts_transport, self(), Resp}
+ end,
+ {ok, NewState2}
+ end.
+
+do_connect(#state{host = Host, path = Path, parent_pid = ParentPid} = State, Domain) ->
+ Rid = 1000 + random:uniform(100000),
+ %%Port= proplists:get_value(local_port, Options, undefined),
+ NewState = State#state{
+ domain = Domain,
+ rid = Rid,
+ open = [],
+ queue = [],
+ free = []
+ },
+ {NewState2, Socket} = new_socket(NewState, false),
+ ok = make_raw_request(Socket, Host, Path, create_session_msg(Rid, Domain, ?WAIT, ?HOLD)),
+ {ok, {{200, <<"OK">>}, _Hdrs, Resp}} = read_response(Socket, nil, nil, [], <<>>),
+ NewState3 = return_socket(NewState2, Socket),
+ {_El = #xmlElement{name = body,
+ attributes = Attrs,
+ content = _Content}, []} = xmerl_scan:string(binary_to_list(Resp)),
+ ParentPid ! {gen_ts_transport, self(), Resp},
+ NewState3#state{rid = Rid +1,
+ open = [],
+ sid = get_attr(Attrs, sid),
+ max_requests = 2
+ }.
+
+do_reset(State) ->
+ #state{sid = Sid,
+ rid = Rid,
+ host = Host,
+ path = Path,
+ domain = Domain} = State,
+ {NewState, Socket} = new_socket(State, once),
+ ok = make_raw_request(Socket, Host, Path, restart_stream_msg(Sid, Rid, Domain)),
+ NewState#state{is_fresh = false, rid = Rid +1, open = [{Socket, Rid}|State#state.open]}.
+
+get_attr([], _Name) -> undefined;
+get_attr([#xmlAttribute{name = Name, value = Value}|_], Name) -> Value;
+get_attr([_|Rest], Name) -> get_attr(Rest, Name).
+
+do_send(State, Data) ->
+ #state{open = Open,
+ rid = Rid,
+ sid = Sid,
+ host = Host,
+ path = Path,
+ queue = Queue} = State,
+ Result = if
+ Open == [] -> send;
+ true ->
+ Min = lists:min(lists:map(fun({_S,R}) -> R end, Open)),
+ if
+ (Rid -Min) =< 1 ->
+ send;
+ true ->
+ queue
+ end
+ end,
+ case Result of
+ send ->
+ {NewState, Socket} = new_socket(State, once),
+ ok = make_request(Socket, Sid, Rid, Queue, Host, Path, Data),
+ {sent, NewState#state{rid = Rid +1, open = [{Socket, Rid}|Open], queue = []}};
+ queue ->
+ Queue = State#state.queue,
+ NewQueue = [Data|Queue],
+ {queued, State#state{queue = NewQueue}}
+ end.
+
+make_empty_request(Socket, Sid, Rid, Queue, Host, Path) ->
+ StanzasText = lists:reverse(Queue),
+ Body = stanzas_msg(Sid, Rid, StanzasText),
+ make_request(Socket, Host, Path, Body, iolist_size(StanzasText)).
+
+make_raw_request(Socket, Host, Path, Body) ->
+ make_request(Socket, Host, Path, Body, 0).
+
+make_request(Socket, Sid, Rid, Queue, Host, Path, Packet) ->
+ StanzasText = lists:reverse([Packet|Queue]),
+ Body = stanzas_msg(Sid, Rid, StanzasText),
+ make_request(Socket, Host, Path, Body, iolist_size(StanzasText)).
+make_request(Socket,Host, Path, Body, OriginalSize) ->
+ ts_mon:add({count, bosh_http_req}),
+ Hdrs = [{"Content-Type", ?CONTENT_TYPE}, {"keep-alive", "true"}],
+ Request = format_request(Path, "POST", Hdrs, Host, Body),
+ ok = gen_tcp:send(Socket, Request),
+ ts_mon:add({ sum, size_sent, iolist_size(Request) - OriginalSize}).
+ %% add the http overhead. The size of the stanzas are already counted by ts_client code.
+
+
+new_socket(State = #state{free = [Socket | Rest]}, Active) ->
+ inet:setopts(Socket, [{active, Active}, {packet, http_bin}]),
+ {State#state{free = Rest}, Socket};
+new_socket(State = #state{host = Host, port = Port, local_ip = LocalIp, local_port = LocalPort}, Active) ->
+ Options = case LocalIp of
+ undefined -> [{active, Active}, {packet, http_bin}];
+ _ -> case LocalPort of
+ undefined -> [{active, Active}, {packet, http_bin},{ip, LocalIp}];
+ _ ->
+ {ok, LPort} = ts_config_server:get_user_port(LocalIp),
+ [{active, Active}, {packet, http_bin},{ip, LocalIp}, {port, LPort}]
+ end
+ end,
+ {ok, Socket} = gen_tcp:connect(Host, Port, Options, ?CONNECT_TIMEOUT),
+ ts_mon:add({count, bosh_http_conn}),
+ {State, Socket}.
+
+return_socket(State, Socket) ->
+ inet:setopts(Socket, [{active, once}]),
+ %receive data from it, we want to know if something happens
+ State#state{free = [Socket | State#state.free]}.
+
+create_session_msg(Rid, To, Wait, Hold) ->
+ [ "<body xmlns='http://jabber.org/protocol/httpbind'"
+ " content='text/xml; charset=utf-8'",
+ " ver='1.8'"
+ " to='", To, "'",
+ " rid='", integer_to_list(Rid), "'"
+ " xmlns:xmpp='urn:xmpp:xbosh'",
+ " xmpp:version='1.0'",
+ " wait='", integer_to_list(Wait), "'"
+ " hold='", integer_to_list(Hold), "'/>"].
+
+stanzas_msg(Sid, Rid, Text) ->
+ [ "<body xmlns='http://jabber.org/protocol/httpbind' "
+ " rid='", integer_to_list(Rid), "'"
+ " sid='", Sid, "'>", Text, "</body>"].
+
+restart_stream_msg(Sid, Rid, Domain) ->
+ [ "<body xmlns='http://jabber.org/protocol/httpbind' "
+ " rid='", integer_to_list(Rid), "'",
+ " sid='", Sid, "'",
+ " xmpp:restart='true'",
+ " xmlns:xmpp='urn:xmpp:xbosh'",
+ " to='", Domain, "'",
+ "/>"].
+
+close_stream_msg(Sid, Rid) ->
+ [ "<body xmlns='http://jabber.org/protocol/httpbind' "
+ " rid='", integer_to_list(Rid), "'",
+ " sid='", Sid, "'",
+ " type='terminate'",
+ " xmlns:xmpp='urn:xmpp:xbosh'",
+ "/>"].
+
+read_response(Socket, Vsn, Status, Hdrs, Body) ->
+ inet:setopts(Socket, [{packet, http_bin}, {active, false}]),
+ case gen_tcp:recv(Socket, 0) of
+ {ok, {http_response, NewVsn, StatusCode, Reason}} ->
+ NewStatus = {StatusCode, Reason},
+ read_response(Socket, NewVsn, NewStatus, Hdrs, Body);
+ {ok, {http_header, _, Name, _, Value}} ->
+ Header = {Name, Value},
+ read_response(Socket, Vsn, Status, [Header | Hdrs], Body);
+ {ok, http_eoh} ->
+ inet:setopts(Socket, [{packet, raw}, binary]),
+ {NewBody, NewHdrs} = read_body(Vsn, Hdrs, Socket),
+ Response = {Status, NewHdrs, NewBody},
+ {ok, Response};
+ {error, closed} ->
+ erlang:error(closed);
+ {error, Reason} ->
+ erlang:error(Reason)
+ end.
+
+read_body(_Vsn, Hdrs, Socket) ->
+ % Find out how to read the entity body from the request.
+ % * If we have a Content-Length, just use that and read the complete
+ % entity.
+ % * If Transfer-Encoding is set to chunked, we should read one chunk at
+ % the time
+ % * If neither of this is true, we need to read until the socket is
+ % closed (AFAIK, this was common in versions before 1.1).
+ case proplists:get_value('Content-Length', Hdrs, undefined) of
+ undefined ->
+ throw({no_content_length, Hdrs});
+ ContentLength ->
+ read_length(Hdrs, Socket, list_to_integer(binary_to_list(ContentLength)))
+ end.
+
+read_length(Hdrs, Socket, Length) ->
+ case gen_tcp:recv(Socket, Length) of
+ {ok, Data} ->
+ {Data, Hdrs};
+ {error, Reason} ->
+ erlang:error(Reason)
+ end.
+
+%% @spec (Path, Method, Headers, Host, Body) -> Request
+%% Path = iolist()
+%% Method = atom() | string()
+%% Headers = [{atom() | string(), string()}]
+%% Host = string()
+%% Body = iolist()
+format_request(Path, Method, Hdrs, Host, Body) ->
+ [
+ Method, " ", Path, " HTTP/1.1\r\n",
+ format_hdrs(add_mandatory_hdrs(Method, Hdrs, Host, Body), []),
+ Body
+ ].
+
+%% spec normalize_method(AtomOrString) -> Method
+%% AtomOrString = atom() | string()
+%% Method = string()
+%% doc
+%% Turns the method in to a string suitable for inclusion in a HTTP request
+%% line.
+%% end
+%-spec normalize_method(atom() | string()) -> string().
+%normalize_method(Method) when is_atom(Method) ->
+% string:to_upper(atom_to_list(Method));
+%normalize_method(Method) ->
+% Method.
+
+format_hdrs([{Hdr, Value} | T], Acc) ->
+ NewAcc = [
+ Hdr, ":", Value, "\r\n" | Acc
+ ],
+ format_hdrs(T, NewAcc);
+format_hdrs([], Acc) ->
+ [Acc, "\r\n"].
+
+add_mandatory_hdrs(Method, Hdrs, Host, Body) ->
+ add_host(add_content_length(Method, Hdrs, Body), Host).
+
+add_content_length("POST", Hdrs, Body) ->
+ add_content_length(Hdrs, Body);
+add_content_length("PUT", Hdrs, Body) ->
+ add_content_length(Hdrs, Body);
+add_content_length(_, Hdrs, _) ->
+ Hdrs.
+
+add_content_length(Hdrs, Body) ->
+ case proplists:get_value("content-length", Hdrs, undefined) of
+ undefined ->
+ ContentLength = integer_to_list(iolist_size(Body)),
+ [{"Content-Length", ContentLength} | Hdrs];
+ _ -> % We have a content length
+ Hdrs
+ end.
+
+add_host(Hdrs, Host) ->
+ case proplists:get_value("host", Hdrs, undefined) of
+ undefined ->
+ [{"Host", Host } | Hdrs];
+ _ -> % We have a host
+ Hdrs
+ end.
View
161 src/tsung/ts_client.erl
@@ -161,41 +161,26 @@ handle_sync_event(_Event, _From, StateName, StateData) ->
%% {next_state, StateName, State, Timeout} |
%% {stop, Reason, State} (terminate/2 is called)
%%----------------------------------------------------------------------
-%% inet data
-handle_info({NetEvent, _Socket, Data}, wait_ack, State) when NetEvent==tcp;
- NetEvent==ssl ->
- ?DebugF("TCP data received: size=~p ~n",[size(Data)]),
+%% received data
+handle_info({gen_ts_transport, _Socket, Data}, wait_ack, State) when is_binary(Data)->
+ ?DebugF("data received: size=~p ~n",[size(Data)]),
case handle_data_msg(Data, State) of
- {NewState=#state_rcv{ack_done=true}, Opts} ->
- NewSocket = ts_utils:inet_setopts(NewState#state_rcv.protocol,
- NewState#state_rcv.socket,
- [{active, once} | Opts]),
+ {NewState=#state_rcv{ack_done=true, protocol = Transport, socket = Socket}, Opts} ->
+ NewSocket = Transport:set_opts(Socket, [{active, once} | Opts]),
handle_next_action(NewState#state_rcv{socket=NewSocket,
ack_done=false});
- {NewState, Opts} ->
- NewSocket = ts_utils:inet_setopts(NewState#state_rcv.protocol,
- NewState#state_rcv.socket,
- [{active, once} | Opts]),
+ {NewState=#state_rcv{protocol = Transport, socket = Socket}, Opts} ->
+ NewSocket = Transport:set_opts(Socket, [{active, once} | Opts]),
TimeOut=(NewState#state_rcv.proto_opts)#proto_opts.idle_timeout,
{next_state, wait_ack, NewState#state_rcv{socket=NewSocket}, TimeOut}
end;
-handle_info({erlang, _Socket, Data}, wait_ack, State) ->
- ?DebugF("erlang function result received: size=~p ~n",[size(term_to_binary(Data))]),
- case handle_data_msg(Data, State) of
- {NewState=#state_rcv{ack_done=true}, _Opts} ->
- handle_next_action(NewState#state_rcv{ack_done=false});
- {NewState, _Opts} ->
- TimeOut=(NewState#state_rcv.proto_opts)#proto_opts.idle_timeout,
- {next_state, wait_ack, NewState, TimeOut}
- end;
-handle_info({udp, Socket,_IP,_InPortNo, Data}, StateName, State) ->
- ?DebugF("UDP packet received: size=~p ~n",[size(Data)]),
- %% we don't care about IP,InPortNo, do the same as for a tcp connection:
- handle_info({tcp, Socket, Data}, StateName, State);
+%% for erlang data
+handle_info({gen_ts_transport, _Socket, Data}, wait_ack, State) ->
+ handle_info({gen_ts_transport, _Socket, term_to_binary(Data)}, wait_ack, State);
+
%% inet close messages; persistent session, waiting for ack
-handle_info({NetEvent, _Socket}, wait_ack,
- State = #state_rcv{persistent=true}) when NetEvent==tcp_closed;
- NetEvent==ssl_closed ->
+handle_info({gen_ts_transport, _Socket, closed}, wait_ack,
+ State = #state_rcv{persistent=true}) ->
?LOG("connection closed while waiting for ack",?INFO),
set_connected_status(false),
{NewState, _Opts} = handle_data_msg(closed, State),
@@ -203,28 +188,25 @@ handle_info({NetEvent, _Socket}, wait_ack,
handle_next_action(NewState#state_rcv{socket=none});
%% inet close messages; persistent session
-handle_info({NetEvent, Socket}, think,
- State = #state_rcv{persistent=true}) when NetEvent==tcp_closed;
- NetEvent==ssl_closed ->
+handle_info({gen_ts_transport, _Socket, closed}, think,
+ State = #state_rcv{persistent=true}) ->
?LOG("connection closed, stay alive (persistent)",?INFO),
set_connected_status(false),
- catch ts_utils:close_socket(State#state_rcv.protocol, Socket), % mandatory for ssl
+ catch (State#state_rcv.protocol):close(State#state_rcv.socket), % mandatory for ssl
{next_state, think, State#state_rcv{socket = none}};
%% inet close messages
-handle_info({NetEvent, Socket}, _StateName, State) when NetEvent==tcp_closed;
- NetEvent==ssl_closed ->
+handle_info({gen_ts_transport, _Socket, closed}, _StateName, State) ->
?LOG("connection closed, abort", ?WARN),
%% the connexion was closed after the last msg was sent, stop quietly
ts_mon:add({ count, error_closed }),
set_connected_status(false),
- ts_utils:close_socket(State#state_rcv.protocol, Socket), % mandatory for ssl
+ catch (State#state_rcv.protocol):close(State#state_rcv.socket), % mandatory for ssl
{stop, normal, State#state_rcv{socket = none}};
%% inet errors
-handle_info({NetError, _Socket, Reason}, wait_ack, State) when NetError==tcp_error;
- NetError==ssl_error ->
- ?LOGF("Net error (~p): ~p~n",[NetError, Reason], ?WARN),
+handle_info({gen_ts_transport, _Socket, error, Reason}, _StateName, State) ->
+ ?LOGF("Net error: ~p~n",[Reason], ?WARN),
CountName="error_inet_"++atom_to_list(Reason),
ts_mon:add({ count, list_to_atom(CountName) }),
set_connected_status(false),
@@ -244,8 +226,8 @@ handle_info(timeout, StateName, State ) ->
ts_mon:add({ count, timeout }),
{stop, normal, State};
% bidirectional protocol
-handle_info({NetEvent, Socket, Data}, think,State=#state_rcv{
- clienttype=Type, bidi=true,host=Host,port=Port}) when ((NetEvent == tcp) or (NetEvent==ssl)) ->
+handle_info({gen_ts_transport, Socket, Data}, think,State=#state_rcv{
+ clienttype=Type, bidi=true,host=Host,port=Port}) ->
ts_mon:rcvmes({State#state_rcv.dump, self(), Data}),
ts_mon:add({ sum, size_rcv, size(Data)}),
Proto = State#state_rcv.protocol,
@@ -262,27 +244,25 @@ handle_info({NetEvent, Socket, Data}, think,State=#state_rcv{
send(Proto,Socket,Data2,Host,Port), %FIXME: handle errors ?
State2
end,
- NewSocket = ts_utils:inet_setopts(Proto, State#state_rcv.socket,
- [{active, once}]),
+ NewSocket = (State#state_rcv.protocol):set_opts(Socket, [{active, once}]),
{next_state, think, NewState#state_rcv{socket=NewSocket}};
% bidi is false, but parse is also false: continue even if we get data
-handle_info({NetEvent, _Socket, Data}, think, State = #state_rcv{request=Req} )
- when (Req#ts_request.ack /= parse) and ((NetEvent == tcp) or (NetEvent==ssl)) ->
+handle_info({gen_ts_transport, Socket, Data}, think, State = #state_rcv{request=Req} )
+ when (Req#ts_request.ack /= parse) ->
ts_mon:rcvmes({State#state_rcv.dump, self(), Data}),
ts_mon:add({ sum, size_rcv, size(Data)}),
?LOGF("Data receive from socket in state think, ack=~p, skip~n",
[Req#ts_request.ack],?NOTICE),
?DebugF("Data was ~p~n",[Data]),
- NewSocket = ts_utils:inet_setopts(State#state_rcv.protocol, State#state_rcv.socket,
- [{active, once}]),
+ NewSocket = (State#state_rcv.protocol):set_opts(Socket, [{active, once}]),
{next_state, think, State#state_rcv{socket=NewSocket}};
-handle_info({NetEvent, _Socket, Data}, think, State)
- when (NetEvent == tcp) or (NetEvent==ssl) ->
+handle_info({gen_ts_transport, _Socket, Data}, think, State) ->
ts_mon:rcvmes({State#state_rcv.dump, self(), Data}),
ts_mon:add({ count, error_unknown_data }),
?LOG("Data receive from socket in state think, stop~n", ?ERR),
?DebugF("Data was ~p~n",[Data]),
{stop, normal, State};
+%% pablo TODO: when this could happen??
handle_info({inet_reply, _Socket,ok}, StateName, State ) ->
?LOGF("inet_reply ok received in state ~p~n",[StateName],?NOTICE),
{next_state, StateName, State};
@@ -369,7 +349,7 @@ handle_next_action(State) ->
true -> % keep state
put({state, State#state_rcv.clienttype} , {State#state_rcv.socket,State#state_rcv.session,DynData#dyndata.proto});
false -> % don't keep state of old type, close connection
- ts_utils:close_socket(State#state_rcv.protocol, State#state_rcv.socket),
+ (State#state_rcv.protocol):close(State#state_rcv.socket),
set_connected_status(false)
end,
{Socket,Session,ProtoDynData} = case {Restore, get({state,NewCType})} of
@@ -551,8 +531,7 @@ handle_next_request(Request, State) ->
State#state_rcv.socket;
_ ->
?Debug("Change server configuration inside a session ~n"),
- ts_utils:close_socket(State#state_rcv.protocol,
- State#state_rcv.socket),
+ (State#state_rcv.protocol):close(State#state_rcv.socket),
set_connected_status(false),
none
end,
@@ -563,7 +542,7 @@ handle_next_request(Request, State) ->
%% reconnect if needed
Proto = {Protocol,State#state_rcv.proto_opts},
case reconnect(Socket,Host,Port,Proto,State#state_rcv.ip) of
- {ok, NewSocket} ->
+ {ok, NewSocket} when is_pid(NewSocket) ->
case catch send(Protocol, NewSocket, Message, Host, Port) of
ok ->
PageTimeStamp = case State#state_rcv.page_timestamp of
@@ -663,7 +642,7 @@ finish_session(State) ->
handle_close_while_sending(State=#state_rcv{persistent = true,
protocol = Proto,
proto_opts = PO})->
- ts_utils:close_socket(Proto, State#state_rcv.socket),
+ Proto:close(State#state_rcv.socket),
set_connected_status(false),
Think = PO#proto_opts.retry_timeout,
%%FIXME: report the error to ts_mon ?
@@ -709,7 +688,7 @@ reconnect(none, ServerName, Port, {Protocol, Proto_opts}, {IP,CPort}) ->
Opts = protocol_options(Protocol, Proto_opts) ++ [{ip, IP},{port,CPort}],
Before= now(),
case connect(Protocol,ServerName, Port, Opts) of
- {ok, Socket} ->
+ {ok, Socket} when is_pid(Socket) ->
Elapsed = ts_utils:elapsed(Before, now()),
ts_mon:add({ sample, connect, Elapsed }),
set_connected_status(true),
@@ -723,7 +702,7 @@ reconnect(none, ServerName, Port, {Protocol, Proto_opts}, {IP,CPort}) ->
ts_mon:add({ count, list_to_atom(CountName) }),
{error, Reason}
end;
-reconnect(Socket, _Server, _Port, _Protocol, _IP) ->
+reconnect(Socket, _Server, _Port, _Protocol, _IP) when is_pid(Socket) ->
{ok, Socket}.
%%----------------------------------------------------------------------
@@ -731,49 +710,57 @@ reconnect(Socket, _Server, _Port, _Protocol, _IP) ->
%% Purpose: wrapper function for send
%% Return: ok | {error, Reason}
%%----------------------------------------------------------------------
-send(gen_tcp,Socket,Message,_,_) -> gen_tcp:send(Socket,Message);
-send(ssl,Socket,Message,_,_) -> ssl:send(Socket,Message);
-send(gen_udp,Socket,Message,Host,Port) ->gen_udp:send(Socket,Host,Port,Message);
-send(erlang,Pid,Message,_,_) ->
- Pid ! Message,
- ok.
+
+send(Proto, Socket, Message, Host, Port) when is_pid (Socket) ->
+ Proto:send(Socket, Message, [{host, Host}, {port, Port}]).
+%send(gen_tcp,Socket,Message,_,_) -> gen_tcp:send(Socket,Message);
+%send(ssl,Socket,Message,_,_) -> ssl:send(Socket,Message);
+%send(gen_udp,Socket,Message,Host,Port) ->gen_udp:send(Socket,Host,Port,Message);
+%send(erlang,Pid,Message,_,_) ->
+% Pid ! Message,
+% ok.
%%----------------------------------------------------------------------
%% Func: connect/4
%% Return: {ok, Socket} | {error, Reason}
%%----------------------------------------------------------------------
-connect(gen_tcp,Server, Port, Opts) -> gen_tcp:connect(Server, Port, Opts);
-connect(ssl,Server, Port,Opts) -> ssl:connect(Server, Port, Opts);
-connect(gen_udp,_Server, _Port, Opts)-> gen_udp:open(0,Opts);
-connect(erlang,Server,Port,Opts) ->
- Pid=spawn_link(ts_erlang,client,[self(),Server,Port,Opts]),
- {ok, Pid}.
+connect(Proto, Server, Port, Opts) ->
+ Proto:connect(Server, Port, Opts).
+
+%connect(gen_tcp,Server, Port, Opts) -> gen_tcp:connect(Server, Port, Opts);
+%connect(ssl,Server, Port,Opts) -> ssl:connect(Server, Port, Opts);
+%connect(gen_udp,_Server, _Port, Opts)-> gen_udp:open(0,Opts);
+%connect(erlang,Server,Port,Opts) ->
+% Pid=spawn_link(ts_erlang,client,[self(),Server,Port,Opts]),
+% {ok, Pid}.
%%----------------------------------------------------------------------
%% Func: protocol_options/1
%% Purpose: set connection's options for the given protocol
%%----------------------------------------------------------------------
-protocol_options(ssl,#proto_opts{ssl_ciphers=negociate}) ->
- [binary, {active, once} ];
-protocol_options(ssl,#proto_opts{ssl_ciphers=Ciphers}) ->
- ?DebugF("cipher is ~p~n",[Ciphers]),
- [binary, {active, once}, {ciphers, Ciphers} ];
+protocol_options(Proto, #proto_opts{} = ProtoOpts) ->
+ Proto:protocol_options(ProtoOpts).
+%protocol_options(ssl,#proto_opts{ssl_ciphers=negociate}) ->
+% [binary, {active, once} ];
+%protocol_options(ssl,#proto_opts{ssl_ciphers=Ciphers}) ->
+% ?DebugF("cipher is ~p~n",[Ciphers]),
+% [binary, {active, once}, {ciphers, Ciphers} ];
-protocol_options(gen_tcp,#proto_opts{tcp_rcv_size=Rcv, tcp_snd_size=Snd}) ->
- [binary,
- {active, once},
- {recbuf, Rcv},
- {sndbuf, Snd},
- {keepalive, true} %% FIXME: should be an option
- ];
-protocol_options(gen_udp,#proto_opts{udp_rcv_size=Rcv, udp_snd_size=Snd}) ->
- [binary,
- {active, once},
- {recbuf, Rcv},
- {sndbuf, Snd}
- ];
-protocol_options(erlang,_) -> [].
+%protocol_options(gen_tcp,#proto_opts{tcp_rcv_size=Rcv, tcp_snd_size=Snd}) ->
+% [binary,
+% {active, once},
+% {recbuf, Rcv},
+% {sndbuf, Snd},
+% {keepalive, true} %% FIXME: should be an option
+% ];
+%protocol_options(gen_udp,#proto_opts{udp_rcv_size=Rcv, udp_snd_size=Snd}) ->
+% [binary,
+% {active, once},
+% {recbuf, Rcv},
+% {sndbuf, Snd}
+% ];
+%protocol_options(erlang,_) -> [].
@@ -827,7 +814,7 @@ handle_data_msg(Data,State=#state_rcv{request=Req,clienttype=Type,maxcount=MaxCo
case Close of
true ->
?Debug("Close connection required by protocol~n"),
- ts_utils:close_socket(State#state_rcv.protocol,State#state_rcv.socket),
+ (State#state_rcv.protocol):close(State#state_rcv.socket),
set_connected_status(false),
{NewState#state_rcv{ page_timestamp = PageTimeStamp,
socket = none,
View
5 src/tsung/ts_jabber_common.erl
@@ -354,11 +354,12 @@ auth_sasl(#jabber{username=Name,passwd=Passwd},Mechanism)->
%%----------------------------------------------------------------------
%% Func: auth_sasl/2
%%----------------------------------------------------------------------
-auth_sasl(Username, _Passwd, Mechanism) ->
+auth_sasl(Username, Passwd, Mechanism) ->
S = <<0>>,
N = list_to_binary(Username),
+ P = list_to_binary(Passwd),
list_to_binary(["<auth xmlns='urn:ietf:params:xml:ns:xmpp-sasl' mechanism='",Mechanism,"' >",
- ssl_base64:encode(<<S/binary,N/binary,S/binary,N/binary>>) ,"</auth>"]).
+ ssl_base64:encode(<<S/binary,N/binary,S/binary,P/binary>>) ,"</auth>"]).
%%----------------------------------------------------------------------
View
76 src/tsung/ts_tcp.erl
@@ -0,0 +1,76 @@
+-module(ts_tcp).
+
+-export([ connect/3, send/3, close/1, set_opts/2, protocol_options/1 ]).
+
+-behaviour(gen_ts_transport).
+
+-include("ts_profile.hrl").
+-include("ts_config.hrl").
+
+%% TODO:
+%% Refactor thhe gen_ts_transport API so we don't need to use a separate process
+%% here only to transform the incomming data packets tuple.
+
+protocol_options(#proto_opts{tcp_rcv_size=Rcv, tcp_snd_size=Snd}) ->
+ [binary,
+ {active, once},
+ {recbuf, Rcv},
+ {sndbuf, Snd},
+ {keepalive, true} %% FIXME: should be an option
+ ].
+
+
+
+connect(Host, Port, Opts) ->
+ case gen_tcp:connect(Host, Port, opts_to_tcp_opts(Opts)) of
+ {ok, Socket} ->
+ ParentPid = self(),
+ Pid = spawn_link(fun() -> enter_loop(Socket, ParentPid) end),
+ ok = gen_tcp:controlling_process(Socket, Pid),
+ {ok, Pid};
+ {error, Reason} ->
+ {error, Reason}
+ end.
+
+opts_to_tcp_opts(Opts) -> Opts.
+
+send(Socket, Data, _Opts) when is_pid(Socket) ->
+ Socket ! {send, Data},
+ ok.
+
+close(Socket) ->
+ Socket ! close.
+
+
+set_opts(Socket, Opts) ->
+ Socket ! {set_opts, Opts},
+ Socket.
+
+enter_loop(Socket, ParentPid) ->
+ process_flag(trap_exit, true),
+ loop(Socket, ParentPid).
+
+loop(Socket, ParentPid) ->
+ receive
+ {'EXIT', ParentPid, _Reason} ->
+ gen_tcp:close(Socket);
+ %% terminate (even 'normal' termination of the
+ %% parent must trigger this process to terminate)
+ {tcp, Socket, Data} ->
+ ParentPid ! {gen_ts_transport, self(), Data},
+ loop(Socket, ParentPid);
+ {tcp_closed, Socket} ->
+ ParentPid ! {gen_ts_transport, self(), closed};
+ {tcp_error, Socket, Error} ->
+ ParentPid ! {gen_ts_transport, self(), error, Error };
+ {send, Data} ->
+ gen_tcp:send(Socket, Data),
+ %% TODO: ack the sender, so it is blocked in the same
+ %% way that in tsung before introducing this process
+ loop(Socket, ParentPid);
+ {set_opts, Opts} ->
+ inet:setopts(Socket, Opts),
+ loop(Socket, ParentPid);
+ close ->
+ gen_tcp:close(Socket)
+ end.
View
4 src/tsung_controller/ts_config.erl
@@ -101,7 +101,9 @@ parse(Element = #xmlElement{name=server, attributes=Attrs}, Conf=#config{servers
Port = getAttr(integer, Attrs, port),
Type = case getAttr(Attrs, type) of
"ssl" -> ssl;
- "tcp" -> gen_tcp;
+ %"tcp" -> gen_tcp;
+ "tcp" -> ts_tcp;
+ "bosh" -> ts_bosh;
"udp" -> gen_udp;
"erlang" -> erlang
end,
View
11 src/tsung_controller/ts_config_server.erl
@@ -51,7 +51,7 @@
get_client_config/1, newbeam/1, newbeam/2, start_slave/5,
get_monitor_hosts/0, encode_filename/1, decode_filename/1,
endlaunching/1, status/0, start_file_server/1, get_user_agents/0,
- get_client_config/2, get_user_param/1 ]).
+ get_client_config/2, get_user_param/1, get_user_port/1 ]).
%%debug
-export([choose_client_ip/1, choose_session/1]).
@@ -155,6 +155,9 @@ get_next_session(Host)->
get_user_param(Host)->
gen_server:call({global, ?MODULE},{get_user_param, Host}).
+get_user_port(Ip) ->
+ gen_server:call({global, ?MODULE},{get_user_port, Ip}).
+
endlaunching(Node) ->
gen_server:cast({global, ?MODULE},{end_launching, Node}).
@@ -256,6 +259,12 @@ handle_call({get_user_param, HostName}, _From, State=#state{users=UserId,ports=P
ts_mon:newclient({static,now()}),
{reply, {ok, { IPParam, Server, UserId}}, State#state{users=UserId+1,ports=NewPorts}};
+%% get user port. This is needed by bosh, as there are more than one socket per bosh connection.
+handle_call({get_user_port, IP}, _From, State=#state{ports=Ports}) ->
+ Config = State#state.config,
+ {NewPorts,CPort} = choose_port(IP, Ports,Config#config.ports_range),
+ {reply, {ok, CPort}, State#state{ports = NewPorts}};
+
%% get a new session id and user parameters for the given node
handle_call({get_next_session, HostName}, _From, State=#state{users=Users,ports=Ports}) ->
Config = State#state.config,
View
7 src/tsung_stats.pl.in
@@ -80,6 +80,7 @@ my $datadir = "data"; # all data files are created in this subdirectory
my $imgdir = "images"; # all data files are created in this subdirectory
my $gplotdir = "gnuplot_scripts"; # all data files are created in this subdirectory
my $http; # true if http. add status code graphs in the HTML output
+my $bosh; # true if bosh. add bosh specific graphs in the HTML output
foreach my $dir ($datadir, $imgdir, $gplotdir) {
unless (-d $dir) {
@@ -271,6 +272,7 @@ sub parse_stats_file {
my @time;
my @errors;
my @tps;
+ my @bosh_tps;
my @code;
my %extra_info = ();
my @session;
@@ -326,6 +328,9 @@ sub parse_stats_file {
push @async, "$key.txt";
} elsif ($key =~ /request$/ or $key eq "connect" or $key eq "async_rcv") {
push @tps, "$key.txt";
+ } elsif ($key eq "bosh_http_conn" or $key eq "bosh_http_req") {
+ $bosh = 1;
+ push @bosh_tps, "$key.txt";
} elsif ($key =~ /^tr_/ or $key eq "page") {
push @transactions, "$key.txt";
} elsif ($key =~ /^\d+$/) {
@@ -342,6 +347,7 @@ sub parse_stats_file {
plot_stats(\@col,"Session",undef,\@session,["sessions/sec"],$logy) unless $noplot;
plot_stats(\@colcount,"HTTP_CODE",undef,\@code,["number/sec","total"],$logy) if not $noplot and @code;
plot_stats(\@col,"Perfs",undef,\@tps,["rate","msec"],$logy) unless $noplot;
+ plot_stats(\@colcount,"Bosh",undef,\@bosh_tps,["rate"],$logy) unless $noplot;
plot_stats(\@col,"Transactions",undef,\@transactions,["transactions/sec","msec"],$logy) unless $noplot;
plot_stats(\@colcount,"Match",undef,\@match,["rate","rate"],$logy) unless $noplot;
plot_stats(\@colcount,"Event",undef,\@time,["rate","msec"],$logy) unless $noplot;
@@ -425,6 +431,7 @@ sub html_report {
http => $http,
match => $match,
async => $async,
+ bosh => $bosh,
title => $titre,
stats_subtitle => "Stats Report ",
graph_subtitle => "Graphs Report ",
View
2  tsung-1.0.dtd
@@ -20,7 +20,7 @@
<!ATTLIST server
host NMTOKEN #REQUIRED
port NMTOKEN #REQUIRED
- type (ssl | tcp | udp | erlang) #REQUIRED>
+ type (ssl | tcp | udp | erlang | bosh) #REQUIRED>
<!ELEMENT clients (client+)>
<!ELEMENT client (ip*) >
Please sign in to comment.
Something went wrong with that request. Please try again.