Permalink
Browse files

handle pgsql extended protocol in the client and 'bind' should be fully

recorded and replayed (TSUN-185)
  • Loading branch information...
1 parent ec983ac commit 009d537d1475b19d1014fa3fa531377f32616637 @nniclausse nniclausse committed Aug 11, 2011
View
@@ -79,7 +79,7 @@ new_session() ->
%% Function: get_message/21
%% Purpose: Build a message/request ,
%% Args: record
-%% Returns: binary
+%% Returns: {binary,#pgsql}
%%----------------------------------------------------------------------
get_message(#pgsql_request{type=connect, database=DB, username=UserName},#state_rcv{session=S}) ->
Version = <<?PROTOCOL_MAJOR:16/integer, ?PROTOCOL_MINOR:16/integer>>,
@@ -105,7 +105,24 @@ get_message(#pgsql_request{type=authenticate, auth_method= {?PG_AUTH_MD5, Salt},
{pgsql_proto:encode_message(pass_md5, {User,PassString,Salt}),S};
get_message(#pgsql_request{type=authenticate, auth_method=AuthType},#state_rcv{session=S}) ->
?LOGF("PGSQL: Authentication method not implemented ! [~p] ~n",[AuthType],?ERR),
- {<<>>, S}.
+ {<<>>, S};
+get_message(#pgsql_request{type=execute,name_portal=Portal,max_rows=Max},#state_rcv{session=S}) ->
+ {pgsql_proto:encode_message(execute,{Portal,Max}), S};
+get_message(#pgsql_request{type=parse,name_prepared=Name,equery=Query},#state_rcv{session=S}) ->
+ {pgsql_proto:encode_message(parse,{Name,Query,""}), S};
+get_message(#pgsql_request{type=bind,formats=Formats,
+ name_portal=Portal,name_prepared=NPrep,
+ parameters=Params, formats_results=FormatsResults},
+ #state_rcv{session=S})->
+ {pgsql_proto:encode_message(bind,{Portal,NPrep,Params,Formats,FormatsResults}), S};
+%% describe
+get_message(#pgsql_request{type=describe, name_portal=Name,name_prepared=undefined}, #state_rcv{session=S})->
+ {pgsql_proto:encode_message(describe,{portal,Name}), S};
+get_message(#pgsql_request{type=describe, name_portal=undefined,name_prepared=Name}, #state_rcv{session=S})->
+ {pgsql_proto:encode_message(describe,{prepared_statement,Name}), S};
+%% sync
+get_message(#pgsql_request{type=sync},#state_rcv{session=S}) ->
+ {pgsql_proto:encode_message(sync,[]), S}.
parse_bidi(Data, State) ->
ts_plugin:parse_bidi(Data,State).
View
@@ -458,13 +458,13 @@ concat_atoms(Atoms) when is_list(Atoms) ->
%% separated by Sep.
join(_Sep, []) -> [];
join(Sep, List) when is_list(List)->
- join2(Sep, lists:reverse(List)).
-join2(Sep, [First | List]) when is_integer(First)->
- join2(Sep, [integer_to_list(First) | List]);
-join2(Sep, [First | List]) when is_float(First)->
- join2(Sep, [float_to_list(First) | List]);
-join2(Sep, [First | List]) when is_list(First)->
- lists:foldl(fun(X, Sum) -> X ++ Sep ++ Sum end, First, List).
+ ToStr = fun(A) when is_integer(A) -> integer_to_list(A);
+ (A) when is_list(A) -> A;
+ (A) when is_float(A) -> float_to_list(A);
+ (A) when is_atom(A) -> atom_to_list(A);
+ (A) when is_binary(A) -> binary_to_list(A)
+ end,
+ string:join(lists:map(ToStr,List), Sep).
%% split a string (at first occurence of char)
split(String,Chr) ->
@@ -52,23 +52,53 @@ parse_config(Element = #xmlElement{name=pgsql},
sessions = [CurS | _], dynvar=DynVar,
subst = SubstFlag, match=MatchRegExp}) ->
- Request = case ts_config:getAttr(atom, Element#xmlElement.attributes, type) of
+ {Ack,Request} = case ts_config:getAttr(atom, Element#xmlElement.attributes, type) of
sql ->
ValRaw = ts_config:getText(Element#xmlElement.content),
SQL = list_to_binary(ts_utils:clean_str(ValRaw)),
?LOGF("Got SQL query: ~p~n",[SQL], ?NOTICE),
- #pgsql_request{sql=SQL, type= sql};
+ {parse,#pgsql_request{sql=SQL, type= sql}};
close ->
- #pgsql_request{type= close};
+ {parse,#pgsql_request{type=close}};
+ sync ->
+ {parse,#pgsql_request{type=sync}};
+ execute ->
+ Portal = ts_config:getAttr(Element#xmlElement.attributes, name_portal),
+ Limit = ts_config:getAttr(integer,Element#xmlElement.attributes, max_rows,0),
+ {no_ack,#pgsql_request{type=execute,name_portal=Portal,max_rows=Limit}};
+ parse ->
+ Name = ts_config:getAttr(Element#xmlElement.attributes, name_prepared),
+ Query = list_to_binary(ts_config:getText(Element#xmlElement.content)),
+ {no_ack,#pgsql_request{type=parse,name_prepared=Name,equery=Query}};
+ bind ->
+ Portal = ts_config:getAttr(Element#xmlElement.attributes, name_portal),
+ Prep = ts_config:getAttr(Element#xmlElement.attributes, name_prepared),
+ Formats = case ts_config:getAttr(Element#xmlElement.attributes, formats_results) of
+ "" -> "";
+ FR ->
+ lists:map(fun(A)->list_to_atom(A) end,ts_utils:split(FR,","))
+ end,
+ Params=case ts_config:getAttr(Element#xmlElement.attributes, parameters) of
+ "" -> "";
+ P ->
+ ts_utils:split(P,",")
+ end,
+ ParamsFormat = ts_config:getAttr(atom,Element#xmlElement.attributes, formats, none),
+ {no_ack,#pgsql_request{type=bind,name_portal=Portal,name_prepared=Prep,
+ formats=ParamsFormat,formats_results=Formats,parameters=Params}};
+ describe ->
+ Portal=ts_config:getAttr(string,Element#xmlElement.attributes, name_portal,undefined),
+ Prep = ts_config:getAttr(string,Element#xmlElement.attributes, name_prepared,undefined),
+ {no_ack,#pgsql_request{type=describe,name_portal=Portal,name_prepared=Prep}};
authenticate ->
Passwd = ts_config:getAttr(Element#xmlElement.attributes, password),
- #pgsql_request{passwd=Passwd, type= authenticate};
+ {parse,#pgsql_request{passwd=Passwd, type= authenticate}};
connect ->
Database = ts_config:getAttr(Element#xmlElement.attributes, database),
User = ts_config:getAttr(Element#xmlElement.attributes, username),
- #pgsql_request{username=User, database=Database, type=connect}
+ {parse,#pgsql_request{username=User, database=Database, type=connect}}
end,
- Msg= #ts_request{ack = parse,
+ Msg= #ts_request{ack = Ack,
endpage = true,
dynvar_specs = DynVar,
subst = SubstFlag,
@@ -581,8 +581,8 @@ replace_str({A,B},X) ->
%% Func: print_info/0 Print system info
%%----------------------------------------------------------------------
print_info() ->
- VSN = case lists:keyfind(tsung_controller,1,application:loaded_applications()) of
- {_,_ ,V} -> V;
+ VSN = case lists:keysearch(tsung_controller,1,application:loaded_applications()) of
+ {value, {_,_ ,V}} -> V;
_ -> "unknown"
end,
?LOGF("SYSINFO:Tsung version: ~s~n",[VSN],?NOTICE),
@@ -123,12 +123,12 @@ process_data(State,RawData = <<Code:8/integer, Size:4/integer-unit:8, Tail/binar
%% TODO: handle Parameters if defined
ts_proxy_recorder:dorecord({#pgsql_request{type=parse,name_prepared=StringName,equery=StringQuery}}),
State#proxy{buffer=[]};
- {portal,{<<>>,<<>>,0,_Params} } ->
- ts_proxy_recorder:dorecord({#pgsql_request{type=bind}}),
- State#proxy{buffer=[]};
- {portal,{<<>>,StringQuery,0,_Params} } ->
- ts_proxy_recorder:dorecord({#pgsql_request{type=bind, name_prepared=StringQuery}}),
- %%TODO: handle other parameters
+ {bind,{Portal,StringQuery,Params, ParamsFormat,ResFormats} } ->
+ R={#pgsql_request{type=bind, name_prepared=StringQuery,
+ name_portal=Portal,
+ parameters=Params,formats=ParamsFormat,
+ formats_results=ResFormats}},
+ ts_proxy_recorder:dorecord(R),
State#proxy{buffer=[]};
sync ->
ts_proxy_recorder:dorecord({#pgsql_request{type=sync}}),
@@ -190,9 +190,18 @@ decode_packet($E, Data) -> %execute
decode_packet($B, Data) -> %bind
[NamePortal, StringQuery | _] = split(Data,<<0>>,[global,trim]),
Size = size(NamePortal)+size(StringQuery)+2,
- << _:Size/binary, NParams:16/integer,Params/binary >> = Data,
- ?LOGF("Extended protocol: bind ~p ~p ~p~n",[NamePortal,StringQuery,NParams], ?DEB),
- {portal,{NamePortal,StringQuery,NParams,Params}};
+ << _:Size/binary, NParamsFormat:16/integer,Tail1/binary >> = Data,
+ << Formats:NParamsFormat/binary, NParams:16/integer, Tail2/binary>> = Tail1,
+ ParamsFormat = case {NParamsFormat,Formats} of
+ {0,_} -> none;
+ {1,<< 0:16/integer >> } -> text;
+ {1,<< 1:16/integer >> } -> binary;
+ _ -> auto
+ end,
+ {Params,<< NFormatRes:16/integer,FormatsResBin/binary >> }=get_params(NParams,Tail2,[]),
+ ResFormats=get_params_format(FormatsResBin,[]),
+ ?LOGF("Extended protocol: bind ~p ~p ~p ~p ~p~n",[NamePortal,StringQuery,Params,ParamsFormat,ResFormats ], ?DEB),
+ {bind,{NamePortal,StringQuery,Params,ParamsFormat,ResFormats}};
decode_packet($P, Data) -> % parse
[StringName, StringQuery | _] = split(Data,<<0>>,[global,trim]),
Size = size(StringName)+size(StringQuery)+2,
@@ -201,6 +210,18 @@ decode_packet($P, Data) -> % parse
{parse,{StringName,StringQuery,NParams,Params}}.
+get_params_format(<<>>,Acc) ->
+ lists:reverse(Acc);
+get_params_format(<<0:16/integer,Tail/binary>>,Acc) ->
+ get_params_format(Tail,[text|Acc]);
+get_params_format(<<1:16/integer,Tail/binary>>,Acc) ->
+ get_params_format(Tail,[binary|Acc]).
+
+get_params(0,Tail,Acc) ->
+ {lists:reverse(Acc), Tail};
+get_params(N,<<Size:32/integer,S:Size/binary,Tail/binary>>,Acc) ->
+ get_params(N-1,Tail,[S|Acc]).
+
split(Bin,Pattern,Options)->
%% we should remove this once R13B and older are no longer supported by tsung
case ts_utils:release_is_newer_or_eq("5.8") of
@@ -245,23 +266,29 @@ record_request(State=#state_rec{logfd=Fd}, #pgsql_request{type=describe,name_por
io:format(Fd,"<request><pgsql type='describe' name_prepared='~s'/></request>~n", [Val]),
{ok,State};
record_request(State=#state_rec{logfd=Fd}, #pgsql_request{type=parse,name_portal=undefined,name_prepared=undefined,equery=Query})->
- io:format(Fd,"<request><pgsql type='parse' /><![CDATA[~s]]></request>~n", [Query]),
+ io:format(Fd,"<request><pgsql type='parse'><![CDATA[~s]]></pgsql></request>~n", [Query]),
{ok,State};
record_request(State=#state_rec{logfd=Fd}, #pgsql_request{type=parse,name_portal=undefined,name_prepared=Val,equery=Query})->
- io:format(Fd,"<request><pgsql type='parse' name_prepared='~s'/><![CDATA[~s]]></request>~n", [Val,Query]),
+ io:format(Fd,"<request><pgsql type='parse' name_prepared='~s'><![CDATA[~s]]></pgsql></request>~n", [Val,Query]),
{ok,State};
record_request(State=#state_rec{logfd=Fd}, #pgsql_request{type=parse,name_portal=Portal,name_prepared=Prep,equery=Query})->
- io:format(Fd,"<request><pgsql type='parse' name_portal='~s' name_prepared='~s'/><![CDATA[~s]]></request>~n", [Portal,Prep,Query]),
+ io:format(Fd,"<request><pgsql type='parse' name_portal='~s' name_prepared='~s'><![CDATA[~s]]></pgsql></request>~n", [Portal,Prep,Query]),
{ok,State};
-record_request(State=#state_rec{logfd=Fd}, #pgsql_request{type=bind,name_portal=undefined,name_prepared=undefined})->
- io:format(Fd,"<request><pgsql type='bind'/></request>~n", []),
- {ok,State};
-record_request(State=#state_rec{logfd=Fd}, #pgsql_request{type=bind,name_portal=undefined,name_prepared=Val})->
- io:format(Fd,"<request><pgsql type='bind' name_prepared='~s'/></request>~n", [Val]),
- {ok,State};
-record_request(State=#state_rec{logfd=Fd}, #pgsql_request{type=bind,name_portal=Portal,name_prepared=Prep})->
- io:format(Fd,"<request><pgsql type='bind' name_portal='~s' name_prepared='~s'/></request>~n", [Portal,Prep]),
+record_request(State=#state_rec{logfd=Fd}, #pgsql_request{type=bind,name_portal = <<>>,name_prepared=Val, parameters=[],formats=ParamsFormat, formats_results=ResFormats})->
+ ResFormatsStr=ts_utils:join(",",ResFormats),
+ io:format(Fd,"<request><pgsql type='bind' name_prepared='~s' formats='~s' formats_results='~s' /></request>~n", [Val,ParamsFormat,ResFormatsStr]),
+ {ok,State};
+record_request(State=#state_rec{logfd=Fd}, #pgsql_request{type=bind,
+ name_portal=Portal,
+ name_prepared=Prep,
+ parameters=Params,
+ formats=ParamsFormat,
+ formats_results=ResFormats})->
+ ParamsStr=ts_utils:join(",",Params),
+ ResFormatsStr=ts_utils:join(",",ResFormats),
+ io:format(Fd,"<request><pgsql type='bind' name_portal='~s' name_prepared='~s' formats='~s' formats_results='~s' parameters='~s'/></request>~n",
+ [Portal,Prep,ParamsFormat,ResFormatsStr,ParamsStr]),
{ok,State};
record_request(State=#state_rec{logfd=Fd}, #pgsql_request{type=execute,name_portal=[],max_rows=unlimited})->
io:format(Fd,"<request><pgsql type='execute'/></request>~n", []),
View
@@ -267,7 +267,7 @@ repeat | if | change_type | foreach | set_option)*>
name_prepared CDATA #IMPLIED
query CDATA #IMPLIED
parameters CDATA #IMPLIED
- max_rows CDATA "unlimited"
+ max_rows CDATA "0"
formats CDATA #IMPLIED
formats_results CDATA #IMPLIED
type (connect | authenticate | sql | close | bind | parse | sync | execute | describe) #REQUIRED >

0 comments on commit 009d537

Please sign in to comment.