Skip to content

Commit

Permalink
change parse_bidi API: add a way to jump to the next request (using t…
Browse files Browse the repository at this point in the history
…hinktime='wait_bidi' or ack='bidi_ack'). Used in xmpp:starttls
  • Loading branch information
nniclausse committed Jun 18, 2015
1 parent 8f8416d commit b553ddf
Show file tree
Hide file tree
Showing 11 changed files with 49 additions and 29 deletions.
6 changes: 6 additions & 0 deletions docs/conf-sessions.rst
Expand Up @@ -76,6 +76,12 @@ the global lock (the value can be set using the option ``<option
name="global_number" value ="XXX"/>`` and by setting `maxnumber=N` in
``<arrivalphase>``).

**Since version 1.5.2**, you can wait for a 'bidi' ack. If your protocol is bidirectionnal (e.g. xmpp, websocket, ...), you can wait until the server sends some data, and the code that handle this data exits the ``think`` state.

.. code-block:: xml
<thinktime value="wait_bidi"></thinktime> -
HTTP
^^^^

Expand Down
5 changes: 3 additions & 2 deletions examples/jabber_starttls.xml.in
Expand Up @@ -50,9 +50,10 @@

<thinktime value="5"></thinktime>

<request> <jabber type="starttls" ack="no_ack" /> </request>
<thinktime value="3"></thinktime>
<request> <jabber type="starttls" ack="bidi_ack" /> </request>
<request> <jabber type="connect" ack="local"></jabber> </request>
<!-- needed -->
<thinktime value="3"></thinktime>

<transaction name="authenticate">
<request> <jabber type="auth_sasl" ack="local"></jabber></request>
Expand Down
6 changes: 3 additions & 3 deletions src/tsung/ts_amqp.erl
Expand Up @@ -263,15 +263,15 @@ parse_bidi(<<>>, State=#state_rcv{acc = [], session = AMQPSession}) ->
AckBuf = AMQPSession#amqp_session.ack_buf,
NewAMQPSession = AMQPSession#amqp_session{ack_buf = <<>>},
?DebugF("ack buf: ~p~n", [AckBuf]),
{confirm_ack_buf(AckBuf), State#state_rcv{session = NewAMQPSession}};
{confirm_ack_buf(AckBuf), State#state_rcv{session = NewAMQPSession},think};
parse_bidi(Data, State=#state_rcv{acc = [], session = AMQPSession}) ->
?DebugF("parse bidi data: ~p ~p~n", [size(Data), Data]),
Protocol = AMQPSession#amqp_session.protocol,
AckBuf = AMQPSession#amqp_session.ack_buf,
case decode_frame(Protocol, Data) of
{error, _Reason} ->
?DebugF("decode error: ~p~n", [_Reason]),
{nodata, State};
{nodata, State, think};
{ok, heartbeat, Left} ->
?DebugF("receive bidi: ~p~n", [heartbeat]),
HB = list_to_binary(rabbit_binary_generator:build_heartbeat_frame()),
Expand All @@ -286,7 +286,7 @@ parse_bidi(Data, State=#state_rcv{acc = [], session = AMQPSession}) ->
parse_bidi(Left, State#state_rcv{session = NewAMQPSession});
{incomplete, Left} ->
?DebugF("incomplete frame: ~p~n", [Left]),
{confirm_ack_buf(AckBuf), State#state_rcv{acc = Left}}
{confirm_ack_buf(AckBuf), State#state_rcv{acc = Left},think}
end;
parse_bidi(Data, State=#state_rcv{acc = Acc, datasize = DataSize,
session = AMQPSession}) ->
Expand Down
22 changes: 14 additions & 8 deletions src/tsung/ts_client.erl
Expand Up @@ -297,20 +297,25 @@ handle_info2({gen_ts_transport, Socket, Data}, think,State=#state_rcv{
ts_mon:add({ sum, size_rcv, size(Data)}),
Proto = State#state_rcv.protocol,
?LOG("Data received from socket (bidi) in state think~n",?INFO),
NewState = case Type:parse_bidi(Data, State) of
{nodata, State2} ->
{NextAction, NewState} = case Type:parse_bidi(Data, State) of
{nodata, State2, Action} ->
?LOG("Bidi: no data ~n",?DEB),
ts_mon:add({count, async_unknown_data_rcv}),
State2;
{Data2, State2} ->
{Action, State2};
{Data2, State2, Action} ->
ts_mon:add([{ sum, size_sent, size(Data2)},{count, async_data_sent}]),
ts_mon:sendmes({State#state_rcv.dump, self(), Data2}),
?LOG("Bidi: send data back to server~n",?DEB),
send(Proto,Socket,Data2,Host,Port), %FIXME: handle errors ?
State2
{Action, State2}
end,
NewSocket = (NewState#state_rcv.protocol):set_opts(NewState#state_rcv.socket, [{active, once}]),
{next_state, think, NewState#state_rcv{socket=NewSocket}};
case NextAction of
think ->
{next_state, think, NewState#state_rcv{socket=NewSocket}};
continue ->
handle_next_action(NewState#state_rcv{socket=NewSocket})
end;
% bidi is false, but parse is also false: continue even if we get data
handle_info2({gen_ts_transport, Socket, Data}, think, State = #state_rcv{request=Req} )
when (Req#ts_request.ack /= parse) ->
Expand Down Expand Up @@ -823,6 +828,8 @@ handle_next_request(Request, State) ->
send_timestamp= Now,
timestamp= Now },
case Request#ts_request.ack of
bidi_ack ->
{next_state, think, NewState};
no_ack ->
{PTimeStamp, _} = update_stats_noack(NewState),
handle_next_action(NewState#state_rcv{ack_done=true, page_timestamp=PTimeStamp});
Expand Down Expand Up @@ -1163,8 +1170,7 @@ handle_data_msg(closed,State) ->
{State,[]};

%% ack = global
handle_data_msg(Data,State=#state_rcv{request=Req,datasize=OldSize})
when Req#ts_request.ack==global ->
handle_data_msg(Data,State=#state_rcv{request=Req,datasize=OldSize}) when Req#ts_request.ack==global ->
%% FIXME: we do not report size now (but after receiving the
%% global ack), the size stats may be not very accurate.
%% FIXME: should we set buffer and parse for dynvars ?
Expand Down
17 changes: 10 additions & 7 deletions src/tsung/ts_jabber.erl
Expand Up @@ -176,7 +176,7 @@ parse_bidi(Data, State) ->
_Else ->
Acc
end
end, {nodata, State}, BidiElements).
end, {nodata, State, think}, BidiElements).

presence_bidi(RcvdXml, State)->
{match,SubMatches} = re:run(RcvdXml,"<presence[^>]*subscribe[\"\'][^>]*>",[global]),
Expand All @@ -191,14 +191,14 @@ message_bidi(RcvdXml, State) ->
Mega = list_to_integer(MegaS),
Secs = list_to_integer(SecsS),
Micro = list_to_integer(MicroS),
Latency = timer:now_diff(erlang:now(), {Mega, Secs, Micro}),
Latency = timer:now_diff(?NOW, {Mega, Secs, Micro}),
ts_mon:add({ sample, xmpp_msg_latency, Latency / 1000});
_ ->
ignore
end,
{nodata, State}.
{nodata, State, think}.

starttls_bidi(_RcvdXml, #state_rcv{socket= Socket}=State)->
starttls_bidi(_RcvdXml, #state_rcv{socket= Socket, send_timestamp=SendTime}=State)->
ssl:start(),
Req = subst(State#state_rcv.request#ts_request.param, State#state_rcv.dynvars),
Opt = lists:filter(fun({_,V}) -> V /= undefined end,
Expand All @@ -208,7 +208,9 @@ starttls_bidi(_RcvdXml, #state_rcv{socket= Socket}=State)->
{cacertfile,Req#jabber.cacertfile}]),
{ok, SSL} = ts_ssl:connect(Socket, Opt),
?LOGF("Upgrading to TLS : ~p",[SSL],?INFO),
{nodata, State#state_rcv{socket=SSL,protocol=ts_ssl}}.
Latency = ts_utils:elapsed(SendTime, ?NOW),
ts_mon:add({ sample, xmpp_starttls, Latency}),
{nodata, State#state_rcv{socket=SSL,protocol=ts_ssl}, continue}.

%%----------------------------------------------------------------------
%% Function: bidi_resp/4
Expand All @@ -219,6 +221,7 @@ starttls_bidi(_RcvdXml, #state_rcv{socket= Socket}=State)->
%% State (record)
%% Returns: Data (binary)
%% NewState (record)
%% think|continue
%%----------------------------------------------------------------------
%% subscribed: Complete a pending subscription request
bidi_resp(subscribed,RcvdXml,SubMatches,State) ->
Expand All @@ -238,10 +241,10 @@ bidi_resp(subscribed,RcvdXml,SubMatches,State) ->
end,"",SubMatches),
case lists:flatten(JoinedXml) of
"" ->
{nodata,State};
{nodata,State, think};
_ ->
?LOGF("RESPONSE TO SEND : ~s~n",[JoinedXml],?DEB),
{list_to_binary(JoinedXml),State}
{list_to_binary(JoinedXml),State, think}
end.

%%
Expand Down
4 changes: 2 additions & 2 deletions src/tsung/ts_jabber_common.erl
Expand Up @@ -451,7 +451,7 @@ auth_sasl_bind(#jabber{username=Name,passwd=Passwd,domain=Domain, resource=Resou
%%----------------------------------------------------------------------
auth_sasl_bind(_Username, _Passwd, _Domain, Resource) ->
list_to_binary(["<iq type='set' id='",ts_msg_server:get_id(list),
"' ><bind xmlns='urn:ietf:params:xml:ns:xmpp-bind'>",
"'><bind xmlns='urn:ietf:params:xml:ns:xmpp-bind'>",
"<resource>",Resource,"</resource>",
"</bind></iq>"]).

Expand All @@ -468,7 +468,7 @@ auth_sasl_session(#jabber{username=Name,passwd=Passwd,domain=Domain})->
%%----------------------------------------------------------------------
auth_sasl_session(_Username, _Passwd, _Domain) ->
list_to_binary(["<iq type='set' id='",ts_msg_server:get_id(list),
"' ><session xmlns='urn:ietf:params:xml:ns:xmpp-session' /></iq>"]).
"'><session xmlns='urn:ietf:params:xml:ns:xmpp-session'/></iq>"]).

%%----------------------------------------------------------------------
%% Func: registration/1
Expand Down
2 changes: 1 addition & 1 deletion src/tsung/ts_ldap.erl
Expand Up @@ -80,7 +80,7 @@ dump(A,B)->
ts_plugin:dump(A,B).

parse_bidi(A, B) ->
ts_plugin:dump(A,B).
ts_plugin:parse_bidi(A,B).

%%----------------------------------------------------------------------
%% Function: parse/2
Expand Down
4 changes: 2 additions & 2 deletions src/tsung/ts_mqtt.erl
Expand Up @@ -215,7 +215,7 @@ parse_bidi(<<>>, State=#state_rcv{acc = [], session = MqttSession}) ->
end,
NewMqttSession = MqttSession#mqtt_session{ack_buf = <<>>},
?DebugF("ack buf: ~p~n", [AckBuf]),
{Ack, State#state_rcv{session = NewMqttSession}};
{Ack, State#state_rcv{session = NewMqttSession}, think};
parse_bidi(Data, State=#state_rcv{acc = [], session = MqttSession}) ->
AckBuf = MqttSession#mqtt_session.ack_buf,
case mqtt_frame:decode(Data) of
Expand All @@ -239,7 +239,7 @@ parse_bidi(Data, State=#state_rcv{acc = [], session = MqttSession}) ->
[mqtt_frame:command_for_type(_Type), _MqttMsg]),
parse_bidi(Left, State);
more ->
{nodata, State#state_rcv{acc = Data}}
{nodata, State#state_rcv{acc = Data},think}
end;
parse_bidi(Data, State=#state_rcv{acc = Acc, datasize = DataSize}) ->
NewSize = DataSize + size(Data),
Expand Down
8 changes: 5 additions & 3 deletions src/tsung/ts_plugin.erl
Expand Up @@ -50,10 +50,12 @@ dump(_Type,_Data) ->
ok.

%% @spec parse_bidi(Data::binary(),State::record(state_rcv)) ->
%% {NewData::binary()|nodata, NewState::record(state_rcv)}
%% {NewData::binary()|nodata, NewState::record(state_rcv), think|continue}
%% @doc Parse a block of data from the server. No reply will be sent
%% if the return value is nodata, otherwise the Data binary will be
%% sent back to the server immediately.
%% sent back to the server immediately. If the last argument is
%% 'think', it will continue to wait; if it's 'continue', it will
%% handle the next action (request, thinktime, ...)
%% @end
parse_bidi(_Data, State) ->
{nodata, State}.
{nodata, State, think}.
2 changes: 2 additions & 0 deletions src/tsung_controller/ts_config.erl
Expand Up @@ -926,6 +926,8 @@ parse(Element = #xmlElement{name=option, attributes=Attrs},
parse(Element = #xmlElement{name=thinktime, attributes=Attrs},
Conf = #config{curid=Id, session_tab = Tab, sessions = [CurS |_]}) ->
{RT,T} = case getAttr(Attrs, value) of
"wait_bidi" ->
{infinity, infinity};
"wait_global" ->
{wait_global,infinity};
"%%"++Tail -> % dynamic thinktime
Expand Down
2 changes: 1 addition & 1 deletion tsung-1.0.dtd
Expand Up @@ -257,7 +257,7 @@ repeat | if | change_type | foreach | set_option | interaction )*>

<!ELEMENT jabber (xmpp_authenticate?) >
<!ATTLIST jabber
ack (global | local | no_ack | parse) #REQUIRED
ack (global | local | no_ack | parse | bidi_ack) #REQUIRED
destination (online | offline | random | unique | previous) "random"
id NMTOKEN #IMPLIED
size NMTOKEN "0"
Expand Down

0 comments on commit b553ddf

Please sign in to comment.