Skip to content

Commit

Permalink
Merge remote-tracking branch 'jvanvleet/master' into github
Browse files Browse the repository at this point in the history
  • Loading branch information
nniclausse committed Oct 8, 2013
2 parents 9d7000c + 7579a6f commit f643e58
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 14 deletions.
9 changes: 7 additions & 2 deletions examples/amqp.xml.in
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,17 @@
<request>
<amqp type="confirm.select"></amqp>
</request>

<for from="1" to="200" incr="1" var="loops">
<for from="1" to="10000" incr="1" var="counter">
<transaction name="publish">
<!-- specify payload_size to have tsung generate a payload for you -->
<request>
<amqp type="basic.publish" exchange="test_exchange" routing_key="4.queue_lb" persistent="true" payload_size="100"></amqp>
</request>
<!-- substitutions are supported on the payload. Payload will override payload_size. -->
<request>
<amqp type="basic.publish" exchange="test_exchange" routing_key="4.queue_lb" persistent="true" size="100"></amqp>
<amqp type="basic.publish" exchange="test_exchange" routing_key="4.queue_lb" persistent="true" payload="Test Payload"></amqp>
</request>
</transaction>
</for>
Expand Down
3 changes: 2 additions & 1 deletion include/ts_amqp.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
prefetch_size,
prefetch_count,
persistent,
size,
payload,
payload_size,
queue,
ack
}).
Expand Down
21 changes: 14 additions & 7 deletions src/tsung/ts_amqp.erl
Original file line number Diff line number Diff line change
Expand Up @@ -140,20 +140,23 @@ get_message(#amqp_request{type = 'basic.qos', prefetch_size = PrefetchSize,
{Frame, NewAMQPSession};

get_message(#amqp_request{type = 'basic.publish', exchange = Exchange,
routing_key = RoutingKey, size = Size,
persistent = Persistent},
routing_key = RoutingKey, payload_size = Size,
payload=Payload, persistent = Persistent},
#state_rcv{session = AMQPSession}) ->
Protocol = AMQPSession#amqp_session.protocol,
Payload = list_to_binary(ts_utils:urandomstr_noflat(Size)),
MsgPayload = case Payload of
"" ->list_to_binary(ts_utils:urandomstr_noflat(Size));
_ -> list_to_binary(Payload)
end,
Publish = #'basic.publish'{exchange = list_to_binary(Exchange),
routing_key = list_to_binary(RoutingKey)},
Msg = case Persistent of
true ->
Props = #'P_basic'{delivery_mode = 2}, %% persistent message
build_content(Props, Payload);
build_content(Props, MsgPayload);
false ->
Props = #'P_basic'{},
build_content(Props, Payload)
build_content(Props, MsgPayload)
end,
Frame = assemble_frames(1, Publish, Msg, ?FRAME_MIN_SIZE, Protocol),
NewAMQPSession = case AMQPSession#amqp_session.next_pub_seqno of
Expand Down Expand Up @@ -362,8 +365,12 @@ parse_config(Element, Conf) ->
%% Purpose: we dont actually do anything
%% Returns: #amqp_request
%%----------------------------------------------------------------------
add_dynparams(_Bool, _DynData, Param, _HostData) ->
Param.
add_dynparams(false, {_DynVars, _Session}, Param, _HostData) ->
Param;

add_dynparams(true, {DynVars, _Session}, Req=#amqp_request{payload=Payload}, _HostData) ->
SubstPayload=ts_search:subst(Payload, DynVars),
Req#amqp_request{payload=SubstPayload}.

%%----------------------------------------------------------------------
confirm_ack_buf(AckBuf) ->
Expand Down
8 changes: 5 additions & 3 deletions src/tsung_controller/ts_config_amqp.erl
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,15 @@ parse_request(Element, Type = 'basic.publish', _Tab) ->
RoutingKey = ts_config:getAttr(string, Element#xmlElement.attributes,
routing_key, "/"),
Size = ts_config:getAttr(float_or_integer, Element#xmlElement.attributes,
size, 100),
payload_size, 100),
PersistentStr = ts_config:getAttr(string, Element#xmlElement.attributes,
persistent, "false"),
Payload = ts_config:getAttr(string, Element#xmlElement.attributes,
payload, ""),
Persistent = list_to_atom(PersistentStr),
Request = #amqp_request{type = Type, exchange = Exchange,
routing_key = RoutingKey, size = Size,
persistent = Persistent},
routing_key = RoutingKey, payload_size = Size,
payload = Payload, persistent = Persistent},
{no_ack, Request};
parse_request(Element, Type = 'basic.consume', _Tab) ->
Queue = ts_config:getAttr(string,
Expand Down
3 changes: 2 additions & 1 deletion tsung-1.0.dtd
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,8 @@ repeat | if | change_type | foreach | set_option | interaction )*>
vhost CDATA "/"
exchange CDATA ""
routing_key CDATA ""
size CDATA "100"
payload CDATA ""
payload_size CDATA "100"
prefetch_size CDATA "0"
prefetch_count CDATA "0"
persistent CDATA "false"
Expand Down

0 comments on commit f643e58

Please sign in to comment.