diff --git a/docs/conf-sessions.rst b/docs/conf-sessions.rst
index 09a978694..dea5a351b 100644
--- a/docs/conf-sessions.rst
+++ b/docs/conf-sessions.rst
@@ -76,6 +76,12 @@ the global lock (the value can be set using the option ```` and by setting `maxnumber=N` in
````).
+**Since version 1.5.2**, you can wait for a 'bidi' ack. If your protocol is bidirectionnal (e.g. xmpp, websocket, ...), you can wait until the server sends some data, and the code that handle this data exits the ``think`` state.
+
+.. code-block:: xml
+
+ -
+
HTTP
^^^^
diff --git a/examples/jabber_starttls.xml.in b/examples/jabber_starttls.xml.in
index 8e46ac69c..35ef4c008 100644
--- a/examples/jabber_starttls.xml.in
+++ b/examples/jabber_starttls.xml.in
@@ -50,9 +50,10 @@
-
-
+
+
+
diff --git a/src/tsung/ts_amqp.erl b/src/tsung/ts_amqp.erl
index 3ae0c17b2..dd67181ab 100644
--- a/src/tsung/ts_amqp.erl
+++ b/src/tsung/ts_amqp.erl
@@ -263,7 +263,7 @@ parse_bidi(<<>>, State=#state_rcv{acc = [], session = AMQPSession}) ->
AckBuf = AMQPSession#amqp_session.ack_buf,
NewAMQPSession = AMQPSession#amqp_session{ack_buf = <<>>},
?DebugF("ack buf: ~p~n", [AckBuf]),
- {confirm_ack_buf(AckBuf), State#state_rcv{session = NewAMQPSession}};
+ {confirm_ack_buf(AckBuf), State#state_rcv{session = NewAMQPSession},think};
parse_bidi(Data, State=#state_rcv{acc = [], session = AMQPSession}) ->
?DebugF("parse bidi data: ~p ~p~n", [size(Data), Data]),
Protocol = AMQPSession#amqp_session.protocol,
@@ -271,7 +271,7 @@ parse_bidi(Data, State=#state_rcv{acc = [], session = AMQPSession}) ->
case decode_frame(Protocol, Data) of
{error, _Reason} ->
?DebugF("decode error: ~p~n", [_Reason]),
- {nodata, State};
+ {nodata, State, think};
{ok, heartbeat, Left} ->
?DebugF("receive bidi: ~p~n", [heartbeat]),
HB = list_to_binary(rabbit_binary_generator:build_heartbeat_frame()),
@@ -286,7 +286,7 @@ parse_bidi(Data, State=#state_rcv{acc = [], session = AMQPSession}) ->
parse_bidi(Left, State#state_rcv{session = NewAMQPSession});
{incomplete, Left} ->
?DebugF("incomplete frame: ~p~n", [Left]),
- {confirm_ack_buf(AckBuf), State#state_rcv{acc = Left}}
+ {confirm_ack_buf(AckBuf), State#state_rcv{acc = Left},think}
end;
parse_bidi(Data, State=#state_rcv{acc = Acc, datasize = DataSize,
session = AMQPSession}) ->
diff --git a/src/tsung/ts_client.erl b/src/tsung/ts_client.erl
index 1d3a33941..26eabd731 100644
--- a/src/tsung/ts_client.erl
+++ b/src/tsung/ts_client.erl
@@ -297,20 +297,25 @@ handle_info2({gen_ts_transport, Socket, Data}, think,State=#state_rcv{
ts_mon:add({ sum, size_rcv, size(Data)}),
Proto = State#state_rcv.protocol,
?LOG("Data received from socket (bidi) in state think~n",?INFO),
- NewState = case Type:parse_bidi(Data, State) of
- {nodata, State2} ->
+ {NextAction, NewState} = case Type:parse_bidi(Data, State) of
+ {nodata, State2, Action} ->
?LOG("Bidi: no data ~n",?DEB),
ts_mon:add({count, async_unknown_data_rcv}),
- State2;
- {Data2, State2} ->
+ {Action, State2};
+ {Data2, State2, Action} ->
ts_mon:add([{ sum, size_sent, size(Data2)},{count, async_data_sent}]),
ts_mon:sendmes({State#state_rcv.dump, self(), Data2}),
?LOG("Bidi: send data back to server~n",?DEB),
send(Proto,Socket,Data2,Host,Port), %FIXME: handle errors ?
- State2
+ {Action, State2}
end,
NewSocket = (NewState#state_rcv.protocol):set_opts(NewState#state_rcv.socket, [{active, once}]),
- {next_state, think, NewState#state_rcv{socket=NewSocket}};
+ case NextAction of
+ think ->
+ {next_state, think, NewState#state_rcv{socket=NewSocket}};
+ continue ->
+ handle_next_action(NewState#state_rcv{socket=NewSocket})
+ end;
% bidi is false, but parse is also false: continue even if we get data
handle_info2({gen_ts_transport, Socket, Data}, think, State = #state_rcv{request=Req} )
when (Req#ts_request.ack /= parse) ->
@@ -823,6 +828,8 @@ handle_next_request(Request, State) ->
send_timestamp= Now,
timestamp= Now },
case Request#ts_request.ack of
+ bidi_ack ->
+ {next_state, think, NewState};
no_ack ->
{PTimeStamp, _} = update_stats_noack(NewState),
handle_next_action(NewState#state_rcv{ack_done=true, page_timestamp=PTimeStamp});
@@ -1163,8 +1170,7 @@ handle_data_msg(closed,State) ->
{State,[]};
%% ack = global
-handle_data_msg(Data,State=#state_rcv{request=Req,datasize=OldSize})
- when Req#ts_request.ack==global ->
+handle_data_msg(Data,State=#state_rcv{request=Req,datasize=OldSize}) when Req#ts_request.ack==global ->
%% FIXME: we do not report size now (but after receiving the
%% global ack), the size stats may be not very accurate.
%% FIXME: should we set buffer and parse for dynvars ?
diff --git a/src/tsung/ts_jabber.erl b/src/tsung/ts_jabber.erl
index 510878bdb..5dabeb863 100644
--- a/src/tsung/ts_jabber.erl
+++ b/src/tsung/ts_jabber.erl
@@ -176,7 +176,7 @@ parse_bidi(Data, State) ->
_Else ->
Acc
end
- end, {nodata, State}, BidiElements).
+ end, {nodata, State, think}, BidiElements).
presence_bidi(RcvdXml, State)->
{match,SubMatches} = re:run(RcvdXml,"]*subscribe[\"\'][^>]*>",[global]),
@@ -191,14 +191,14 @@ message_bidi(RcvdXml, State) ->
Mega = list_to_integer(MegaS),
Secs = list_to_integer(SecsS),
Micro = list_to_integer(MicroS),
- Latency = timer:now_diff(erlang:now(), {Mega, Secs, Micro}),
+ Latency = timer:now_diff(?NOW, {Mega, Secs, Micro}),
ts_mon:add({ sample, xmpp_msg_latency, Latency / 1000});
_ ->
ignore
end,
- {nodata, State}.
+ {nodata, State, think}.
-starttls_bidi(_RcvdXml, #state_rcv{socket= Socket}=State)->
+starttls_bidi(_RcvdXml, #state_rcv{socket= Socket, send_timestamp=SendTime}=State)->
ssl:start(),
Req = subst(State#state_rcv.request#ts_request.param, State#state_rcv.dynvars),
Opt = lists:filter(fun({_,V}) -> V /= undefined end,
@@ -208,7 +208,9 @@ starttls_bidi(_RcvdXml, #state_rcv{socket= Socket}=State)->
{cacertfile,Req#jabber.cacertfile}]),
{ok, SSL} = ts_ssl:connect(Socket, Opt),
?LOGF("Upgrading to TLS : ~p",[SSL],?INFO),
- {nodata, State#state_rcv{socket=SSL,protocol=ts_ssl}}.
+ Latency = ts_utils:elapsed(SendTime, ?NOW),
+ ts_mon:add({ sample, xmpp_starttls, Latency}),
+ {nodata, State#state_rcv{socket=SSL,protocol=ts_ssl}, continue}.
%%----------------------------------------------------------------------
%% Function: bidi_resp/4
@@ -219,6 +221,7 @@ starttls_bidi(_RcvdXml, #state_rcv{socket= Socket}=State)->
%% State (record)
%% Returns: Data (binary)
%% NewState (record)
+%% think|continue
%%----------------------------------------------------------------------
%% subscribed: Complete a pending subscription request
bidi_resp(subscribed,RcvdXml,SubMatches,State) ->
@@ -238,10 +241,10 @@ bidi_resp(subscribed,RcvdXml,SubMatches,State) ->
end,"",SubMatches),
case lists:flatten(JoinedXml) of
"" ->
- {nodata,State};
+ {nodata,State, think};
_ ->
?LOGF("RESPONSE TO SEND : ~s~n",[JoinedXml],?DEB),
- {list_to_binary(JoinedXml),State}
+ {list_to_binary(JoinedXml),State, think}
end.
%%
diff --git a/src/tsung/ts_jabber_common.erl b/src/tsung/ts_jabber_common.erl
index fa0d963ac..694d323ad 100644
--- a/src/tsung/ts_jabber_common.erl
+++ b/src/tsung/ts_jabber_common.erl
@@ -451,7 +451,7 @@ auth_sasl_bind(#jabber{username=Name,passwd=Passwd,domain=Domain, resource=Resou
%%----------------------------------------------------------------------
auth_sasl_bind(_Username, _Passwd, _Domain, Resource) ->
list_to_binary(["",
+ "'>",
"",Resource,"",
""]).
@@ -468,7 +468,7 @@ auth_sasl_session(#jabber{username=Name,passwd=Passwd,domain=Domain})->
%%----------------------------------------------------------------------
auth_sasl_session(_Username, _Passwd, _Domain) ->
list_to_binary([""]).
+"'>"]).
%%----------------------------------------------------------------------
%% Func: registration/1
diff --git a/src/tsung/ts_ldap.erl b/src/tsung/ts_ldap.erl
index 90129b77d..3c0e0d25d 100644
--- a/src/tsung/ts_ldap.erl
+++ b/src/tsung/ts_ldap.erl
@@ -80,7 +80,7 @@ dump(A,B)->
ts_plugin:dump(A,B).
parse_bidi(A, B) ->
- ts_plugin:dump(A,B).
+ ts_plugin:parse_bidi(A,B).
%%----------------------------------------------------------------------
%% Function: parse/2
diff --git a/src/tsung/ts_mqtt.erl b/src/tsung/ts_mqtt.erl
index 770c2e65c..4448e959b 100644
--- a/src/tsung/ts_mqtt.erl
+++ b/src/tsung/ts_mqtt.erl
@@ -215,7 +215,7 @@ parse_bidi(<<>>, State=#state_rcv{acc = [], session = MqttSession}) ->
end,
NewMqttSession = MqttSession#mqtt_session{ack_buf = <<>>},
?DebugF("ack buf: ~p~n", [AckBuf]),
- {Ack, State#state_rcv{session = NewMqttSession}};
+ {Ack, State#state_rcv{session = NewMqttSession}, think};
parse_bidi(Data, State=#state_rcv{acc = [], session = MqttSession}) ->
AckBuf = MqttSession#mqtt_session.ack_buf,
case mqtt_frame:decode(Data) of
@@ -239,7 +239,7 @@ parse_bidi(Data, State=#state_rcv{acc = [], session = MqttSession}) ->
[mqtt_frame:command_for_type(_Type), _MqttMsg]),
parse_bidi(Left, State);
more ->
- {nodata, State#state_rcv{acc = Data}}
+ {nodata, State#state_rcv{acc = Data},think}
end;
parse_bidi(Data, State=#state_rcv{acc = Acc, datasize = DataSize}) ->
NewSize = DataSize + size(Data),
diff --git a/src/tsung/ts_plugin.erl b/src/tsung/ts_plugin.erl
index c3abe954f..647241e07 100644
--- a/src/tsung/ts_plugin.erl
+++ b/src/tsung/ts_plugin.erl
@@ -50,10 +50,12 @@ dump(_Type,_Data) ->
ok.
%% @spec parse_bidi(Data::binary(),State::record(state_rcv)) ->
-%% {NewData::binary()|nodata, NewState::record(state_rcv)}
+%% {NewData::binary()|nodata, NewState::record(state_rcv), think|continue}
%% @doc Parse a block of data from the server. No reply will be sent
%% if the return value is nodata, otherwise the Data binary will be
-%% sent back to the server immediately.
+%% sent back to the server immediately. If the last argument is
+%% 'think', it will continue to wait; if it's 'continue', it will
+%% handle the next action (request, thinktime, ...)
%% @end
parse_bidi(_Data, State) ->
- {nodata, State}.
+ {nodata, State, think}.
diff --git a/src/tsung_controller/ts_config.erl b/src/tsung_controller/ts_config.erl
index 410979718..862be4952 100644
--- a/src/tsung_controller/ts_config.erl
+++ b/src/tsung_controller/ts_config.erl
@@ -926,6 +926,8 @@ parse(Element = #xmlElement{name=option, attributes=Attrs},
parse(Element = #xmlElement{name=thinktime, attributes=Attrs},
Conf = #config{curid=Id, session_tab = Tab, sessions = [CurS |_]}) ->
{RT,T} = case getAttr(Attrs, value) of
+ "wait_bidi" ->
+ {infinity, infinity};
"wait_global" ->
{wait_global,infinity};
"%%"++Tail -> % dynamic thinktime
diff --git a/tsung-1.0.dtd b/tsung-1.0.dtd
index 9afe1889a..7c1bd5c2d 100644
--- a/tsung-1.0.dtd
+++ b/tsung-1.0.dtd
@@ -257,7 +257,7 @@ repeat | if | change_type | foreach | set_option | interaction )*>