Skip to content

Commit

Permalink
pgsql: handle COPY msgs (extended protocol)
Browse files Browse the repository at this point in the history
  • Loading branch information
nniclausse committed Aug 16, 2011
1 parent dea1ecd commit c0e9c76
Show file tree
Hide file tree
Showing 8 changed files with 191 additions and 99 deletions.
12 changes: 12 additions & 0 deletions src/tsung/ts_pgsql.erl
Expand Up @@ -123,6 +123,15 @@ get_message(#pgsql_request{type=describe, name_portal=undefined,name_prepared=Na
%% sync
get_message(#pgsql_request{type=sync},#state_rcv{session=S}) ->
{pgsql_proto:encode_message(sync,[]), S};
%% copyfail
get_message(#pgsql_request{type=copyfail,equery=Msg},#state_rcv{session=S}) ->
{pgsql_proto:encode_message(copyfail,Msg), S};
%% copydone
get_message(#pgsql_request{type=copydone},#state_rcv{session=S}) ->
{pgsql_proto:encode_message(copydone,<< >> ), S};
%% copy
get_message(#pgsql_request{type=copy,equery=Data},#state_rcv{session=S}) ->
{pgsql_proto:encode_message(copy,Data), S};
%% flush
get_message(#pgsql_request{type=flush},#state_rcv{session=S}) ->
{pgsql_proto:encode_message(flush,[]), S}.
Expand Down Expand Up @@ -176,6 +185,9 @@ parse(Data, State=#state_rcv{acc = [], dyndata=DynData}) ->
NewDynData=DynData#dyndata{proto=#pgsql_dyndata{auth_method=AuthType}},
{State#state_rcv{ack_done = true, dyndata=NewDynData},[],false};

{ok, {copy_response, {_Format,_ColsFormat}},_ } ->
?LOG("PGSQL: Copy response ~n",?DEB),
{State#state_rcv{ack_done = true},[],false};
{ok, _Pair, Tail } ->
parse(Tail, State);

Expand Down
25 changes: 23 additions & 2 deletions src/tsung/ts_utils.erl
Expand Up @@ -43,7 +43,7 @@
decode_base64/1, encode_base64/1, to_lower/1, release_is_newer_or_eq/1,
randomstr/1,urandomstr/1,urandomstr_noflat/1, eval/1, list_to_number/1,
time2sec/1, time2sec_hires/1, read_file_raw/1, init_seed/1, jsonpath/2, pmap/2,
concat_atoms/1, ceiling/1, accept_loop/3
concat_atoms/1, ceiling/1, accept_loop/3, append_to_filename/3, splitchar/2
]).

level2int("debug") -> ?DEB;
Expand Down Expand Up @@ -466,10 +466,23 @@ join(Sep, List) when is_list(List)->
end,
string:join(lists:map(ToStr,List), Sep).

%% split a string (at first occurence of char)
%% split a string given a string (at first occurence of char)
split(String,Chr) ->
re:split(String,Chr,[{return,list}]).

%% split a string given a char (faster)
splitchar(String,Chr) ->
splitchar2(String,Chr,[],[]).
splitchar2([],_,[],Acc) ->
lists:reverse(Acc);
splitchar2([],_,AccChr,Acc) ->
lists:reverse([lists:reverse(AccChr)|Acc]);
splitchar2([Chr|String],Chr,AccChr,Acc) ->
splitchar2(String,Chr,[],[lists:reverse(AccChr)|Acc]);
splitchar2([Other|String],Chr,AccChr,Acc) ->
splitchar2(String,Chr,[Other|AccChr],Acc).


%% split a string in 2 (at first occurence of char)
split2(String,Chr) ->
split2(String,Chr,nostrip).
Expand Down Expand Up @@ -798,3 +811,11 @@ accept_loop(PPid, Tag, ServerSock)->
_->
normal
end.


append_to_filename(Filename, From, To) ->
case re:replace(Filename,From,To, [{return,list},global] ) of
Filename -> Filename ++"." ++ To;
RealName -> RealName
end.

122 changes: 69 additions & 53 deletions src/tsung_controller/ts_config_pgsql.erl
Expand Up @@ -52,59 +52,75 @@ parse_config(Element = #xmlElement{name=pgsql},
sessions = [CurS | _], dynvar=DynVar,
subst = SubstFlag, match=MatchRegExp}) ->

{Ack,Request} = case ts_config:getAttr(atom, Element#xmlElement.attributes, type) of
sql ->
ValRaw = ts_config:getText(Element#xmlElement.content),
SQL = list_to_binary(ts_utils:clean_str(ValRaw)),
?LOGF("Got SQL query: ~p~n",[SQL], ?NOTICE),
{parse,#pgsql_request{sql=SQL, type= sql}};
close ->
{parse,#pgsql_request{type=close}};
sync ->
{parse,#pgsql_request{type=sync}};
flush ->
{parse,#pgsql_request{type=flush}};
execute ->
Portal = ts_config:getAttr(Element#xmlElement.attributes, name_portal),
Limit = ts_config:getAttr(integer,Element#xmlElement.attributes, max_rows,0),
{no_ack,#pgsql_request{type=execute,name_portal=Portal,max_rows=Limit}};
parse ->
Name = ts_config:getAttr(Element#xmlElement.attributes, name_prepared),
Query = list_to_binary(ts_config:getText(Element#xmlElement.content)),
Params=case ts_config:getAttr(Element#xmlElement.attributes, parameters) of
"" -> "";
P ->
lists:map(fun(S)-> list_to_integer(S) end, ts_utils:split(P,","))
end,
{no_ack,#pgsql_request{type=parse,name_prepared=Name,equery=Query,parameters=Params}};
bind ->
Portal = ts_config:getAttr(Element#xmlElement.attributes, name_portal),
Prep = ts_config:getAttr(Element#xmlElement.attributes, name_prepared),
Formats = case ts_config:getAttr(Element#xmlElement.attributes, formats_results) of
"" -> "";
FR ->
lists:map(fun(A)->list_to_atom(A) end,ts_utils:split(FR,","))
end,
Params=case ts_config:getAttr(Element#xmlElement.attributes, parameters) of
"" -> "";
P ->
ts_utils:split(P,",")
end,
ParamsFormat = ts_config:getAttr(atom,Element#xmlElement.attributes, formats, none),
{no_ack,#pgsql_request{type=bind,name_portal=Portal,name_prepared=Prep,
formats=ParamsFormat,formats_results=Formats,parameters=Params}};
describe ->
Portal=ts_config:getAttr(string,Element#xmlElement.attributes, name_portal,undefined),
Prep = ts_config:getAttr(string,Element#xmlElement.attributes, name_prepared,undefined),
{no_ack,#pgsql_request{type=describe,name_portal=Portal,name_prepared=Prep}};
authenticate ->
Passwd = ts_config:getAttr(Element#xmlElement.attributes, password),
{parse,#pgsql_request{passwd=Passwd, type= authenticate}};
connect ->
Database = ts_config:getAttr(Element#xmlElement.attributes, database),
User = ts_config:getAttr(Element#xmlElement.attributes, username),
{parse,#pgsql_request{username=User, database=Database, type=connect}}
end,
{Ack,Request} =
case ts_config:getAttr(atom, Element#xmlElement.attributes, type) of
sql ->
ValRaw = ts_config:getText(Element#xmlElement.content),
SQL = list_to_binary(ts_utils:clean_str(ValRaw)),
?LOGF("Got SQL query: ~p~n",[SQL], ?NOTICE),
{parse,#pgsql_request{sql=SQL, type= sql}};
close ->
{parse,#pgsql_request{type=close}};
sync ->
{parse,#pgsql_request{type=sync}};
flush ->
{parse,#pgsql_request{type=flush}};
copydone ->
{parse,#pgsql_request{type=copydone}};
execute ->
Portal = ts_config:getAttr(Element#xmlElement.attributes, name_portal),
Limit = ts_config:getAttr(integer,Element#xmlElement.attributes, max_rows,0),
{no_ack,#pgsql_request{type=execute,name_portal=Portal,max_rows=Limit}};
parse ->
Name = ts_config:getAttr(Element#xmlElement.attributes, name_prepared),
Query = list_to_binary(ts_config:getText(Element#xmlElement.content)),
Params=case ts_config:getAttr(Element#xmlElement.attributes, parameters) of
"" -> "";
P ->
lists:map(fun(S)-> list_to_integer(S) end, ts_utils:splitchar(P,$,))
end,
{no_ack,#pgsql_request{type=parse,name_prepared=Name,equery=Query,parameters=Params}};
bind ->
Portal = ts_config:getAttr(Element#xmlElement.attributes, name_portal),
Prep = ts_config:getAttr(Element#xmlElement.attributes, name_prepared),
Formats = case ts_config:getAttr(Element#xmlElement.attributes, formats_results) of
"" -> "";
FR ->
lists:map(fun(A)->list_to_atom(A) end,ts_utils:splitchar(FR,$,))
end,
Params=case ts_config:getAttr(Element#xmlElement.attributes, parameters) of
"" -> "";
P ->
ts_utils:split(P,",")
end,
ParamsFormat = ts_config:getAttr(atom,Element#xmlElement.attributes, formats, none),
{no_ack,#pgsql_request{type=bind,name_portal=Portal,name_prepared=Prep,
formats=ParamsFormat,formats_results=Formats,parameters=Params}};
copy ->
Contents = case ts_config:getAttr(string, Element#xmlElement.attributes, contents_from_file) of
[] ->
P=ts_config:getText(Element#xmlElement.content),
list_to_binary(lists:map(fun(S)-> list_to_integer(S) end, ts_utils:splitchar(P,$,)));
FileName ->
{ok, FileContent} = file:read_file(FileName),
FileContent
end,
{no_ack,#pgsql_request{type=copy,equery=Contents}};
copyfail ->
Str = ts_config:getAttr(string,Element#xmlElement.attributes, equery,undefined),
{parse,#pgsql_request{type=copyfail,equery=list_to_binary(Str)}};
describe ->
Portal=ts_config:getAttr(string,Element#xmlElement.attributes, name_portal,undefined),
Prep = ts_config:getAttr(string,Element#xmlElement.attributes, name_prepared,undefined),
{no_ack,#pgsql_request{type=describe,name_portal=Portal,name_prepared=Prep}};
authenticate ->
Passwd = ts_config:getAttr(Element#xmlElement.attributes, password),
{parse,#pgsql_request{passwd=Passwd, type= authenticate}};
connect ->
Database = ts_config:getAttr(Element#xmlElement.attributes, database),
User = ts_config:getAttr(Element#xmlElement.attributes, username),
{parse,#pgsql_request{username=User, database=Database, type=connect}}
end,
Msg= #ts_request{ack = Ack,
endpage = true,
dynvar_specs = DynVar,
Expand Down
6 changes: 4 additions & 2 deletions src/tsung_controller/ts_config_server.erl
Expand Up @@ -47,7 +47,7 @@

%%--------------------------------------------------------------------
%% External exports
-export([start_link/1, read_config/1, get_req/2, get_next_session/1,
-export([start_link/1, read_config/1, read_config/2, get_req/2, get_next_session/1,
get_client_config/1, newbeams/1, newbeam/2,
get_monitor_hosts/0, encode_filename/1, decode_filename/1,
endlaunching/1, status/0, start_file_server/1, get_user_agents/0,
Expand Down Expand Up @@ -124,7 +124,9 @@ get_user_agents()->
%% Returns: ok | {error, Reason}
%%--------------------------------------------------------------------
read_config(ConfigFile)->
gen_server:call({global,?MODULE},{read_config, ConfigFile},?config_timeout).
read_config(ConfigFile,?config_timeout).
read_config(ConfigFile,Timeout)->
gen_server:call({global,?MODULE},{read_config, ConfigFile},Timeout).

%%--------------------------------------------------------------------
%% Function: get_client_config/1
Expand Down
14 changes: 13 additions & 1 deletion src/tsung_controller/tsung_controller.erl
Expand Up @@ -30,6 +30,7 @@
-behaviour(application).

-include("ts_profile.hrl").
-include_lib("kernel/include/file.hrl").

%%----------------------------------------------------------------------
%% Func: start/2
Expand Down Expand Up @@ -59,7 +60,18 @@ start(_Type, _StartArgs) ->
end.

start_phase(load_config, _StartType, _PhaseArgs) ->
case ts_config_server:read_config(?config(config_file)) of
Conf = ?config(config_file),
Timeout = case file:read_file_info(Conf) of
{ok, #file_info{size=Size}} when Size > 10000000 -> % > 10MB
erlang:display(["Can take up to 5mn to read config ",Size]),
300000; % 10mn
{ok, #file_info{size=Size}} when Size > 1000000 -> % > 1MB
erlang:display(["Can take up to 3mn to read config ",Size]),
180000; % 5mn
{ok, #file_info{size=Size}} ->
120000 % 2mn
end,
case ts_config_server:read_config(Conf,Timeout) of
{error,Reason}->
erlang:display(["Config Error, aborting ! ", Reason]),
init:stop();
Expand Down
9 changes: 1 addition & 8 deletions src/tsung_recorder/ts_proxy_http.erl
Expand Up @@ -289,7 +289,7 @@ record_request(State=#state_rec{prev_host=Host, prev_port=Port, prev_scheme=Sche
Id=State#state_rec.ext_file_id,
case ts_utils:key1search(ParsedHeader,"content-type") of
"multipart/form-data" ++ _Tail->
FileName=append_to_filename(State#state_rec.log_file,".xml","-"++integer_to_list(Id)++".bin"),
FileName=ts_utils:append_to_filename(State#state_rec.log_file,".xml","-"++integer_to_list(Id)++".bin"),
?LOGF("multipart/form-data, write body data in external binary file ~s~n",[FileName],?NOTICE),
ok = file:write_file(FileName,list_to_binary(Body)),
io:format(Fd," contents_from_file='~s' ", [FileName]),
Expand Down Expand Up @@ -349,10 +349,3 @@ record_header(Fd, Headers,HeaderName, Msg, Fun)->
Value -> io:format(Fd,Msg,[Fun(Value)])
end.

append_to_filename(Filename, From, To) ->
case re:replace(Filename,From,To, [{return,list},global] ) of
Filename -> Filename ++"." ++ To;
RealName -> RealName
end.


0 comments on commit c0e9c76

Please sign in to comment.