Permalink
Browse files

pgsql: handle COPY msgs (extended protocol)

  • Loading branch information...
1 parent dea1ecd commit c0e9c7653cdd805c4ec53cd52454418f2f7b0e74 @nniclausse nniclausse committed Aug 16, 2011
View
@@ -123,6 +123,15 @@ get_message(#pgsql_request{type=describe, name_portal=undefined,name_prepared=Na
%% sync
get_message(#pgsql_request{type=sync},#state_rcv{session=S}) ->
{pgsql_proto:encode_message(sync,[]), S};
+%% copyfail
+get_message(#pgsql_request{type=copyfail,equery=Msg},#state_rcv{session=S}) ->
+ {pgsql_proto:encode_message(copyfail,Msg), S};
+%% copydone
+get_message(#pgsql_request{type=copydone},#state_rcv{session=S}) ->
+ {pgsql_proto:encode_message(copydone,<< >> ), S};
+%% copy
+get_message(#pgsql_request{type=copy,equery=Data},#state_rcv{session=S}) ->
+ {pgsql_proto:encode_message(copy,Data), S};
%% flush
get_message(#pgsql_request{type=flush},#state_rcv{session=S}) ->
{pgsql_proto:encode_message(flush,[]), S}.
@@ -176,6 +185,9 @@ parse(Data, State=#state_rcv{acc = [], dyndata=DynData}) ->
NewDynData=DynData#dyndata{proto=#pgsql_dyndata{auth_method=AuthType}},
{State#state_rcv{ack_done = true, dyndata=NewDynData},[],false};
+ {ok, {copy_response, {_Format,_ColsFormat}},_ } ->
+ ?LOG("PGSQL: Copy response ~n",?DEB),
+ {State#state_rcv{ack_done = true},[],false};
{ok, _Pair, Tail } ->
parse(Tail, State);
View
@@ -43,7 +43,7 @@
decode_base64/1, encode_base64/1, to_lower/1, release_is_newer_or_eq/1,
randomstr/1,urandomstr/1,urandomstr_noflat/1, eval/1, list_to_number/1,
time2sec/1, time2sec_hires/1, read_file_raw/1, init_seed/1, jsonpath/2, pmap/2,
- concat_atoms/1, ceiling/1, accept_loop/3
+ concat_atoms/1, ceiling/1, accept_loop/3, append_to_filename/3, splitchar/2
]).
level2int("debug") -> ?DEB;
@@ -466,10 +466,23 @@ join(Sep, List) when is_list(List)->
end,
string:join(lists:map(ToStr,List), Sep).
-%% split a string (at first occurence of char)
+%% split a string given a string (at first occurence of char)
split(String,Chr) ->
re:split(String,Chr,[{return,list}]).
+%% split a string given a char (faster)
+splitchar(String,Chr) ->
+ splitchar2(String,Chr,[],[]).
+splitchar2([],_,[],Acc) ->
+ lists:reverse(Acc);
+splitchar2([],_,AccChr,Acc) ->
+ lists:reverse([lists:reverse(AccChr)|Acc]);
+splitchar2([Chr|String],Chr,AccChr,Acc) ->
+ splitchar2(String,Chr,[],[lists:reverse(AccChr)|Acc]);
+splitchar2([Other|String],Chr,AccChr,Acc) ->
+ splitchar2(String,Chr,[Other|AccChr],Acc).
+
+
%% split a string in 2 (at first occurence of char)
split2(String,Chr) ->
split2(String,Chr,nostrip).
@@ -798,3 +811,11 @@ accept_loop(PPid, Tag, ServerSock)->
_->
normal
end.
+
+
+append_to_filename(Filename, From, To) ->
+ case re:replace(Filename,From,To, [{return,list},global] ) of
+ Filename -> Filename ++"." ++ To;
+ RealName -> RealName
+ end.
+
@@ -52,59 +52,75 @@ parse_config(Element = #xmlElement{name=pgsql},
sessions = [CurS | _], dynvar=DynVar,
subst = SubstFlag, match=MatchRegExp}) ->
- {Ack,Request} = case ts_config:getAttr(atom, Element#xmlElement.attributes, type) of
- sql ->
- ValRaw = ts_config:getText(Element#xmlElement.content),
- SQL = list_to_binary(ts_utils:clean_str(ValRaw)),
- ?LOGF("Got SQL query: ~p~n",[SQL], ?NOTICE),
- {parse,#pgsql_request{sql=SQL, type= sql}};
- close ->
- {parse,#pgsql_request{type=close}};
- sync ->
- {parse,#pgsql_request{type=sync}};
- flush ->
- {parse,#pgsql_request{type=flush}};
- execute ->
- Portal = ts_config:getAttr(Element#xmlElement.attributes, name_portal),
- Limit = ts_config:getAttr(integer,Element#xmlElement.attributes, max_rows,0),
- {no_ack,#pgsql_request{type=execute,name_portal=Portal,max_rows=Limit}};
- parse ->
- Name = ts_config:getAttr(Element#xmlElement.attributes, name_prepared),
- Query = list_to_binary(ts_config:getText(Element#xmlElement.content)),
- Params=case ts_config:getAttr(Element#xmlElement.attributes, parameters) of
- "" -> "";
- P ->
- lists:map(fun(S)-> list_to_integer(S) end, ts_utils:split(P,","))
- end,
- {no_ack,#pgsql_request{type=parse,name_prepared=Name,equery=Query,parameters=Params}};
- bind ->
- Portal = ts_config:getAttr(Element#xmlElement.attributes, name_portal),
- Prep = ts_config:getAttr(Element#xmlElement.attributes, name_prepared),
- Formats = case ts_config:getAttr(Element#xmlElement.attributes, formats_results) of
- "" -> "";
- FR ->
- lists:map(fun(A)->list_to_atom(A) end,ts_utils:split(FR,","))
- end,
- Params=case ts_config:getAttr(Element#xmlElement.attributes, parameters) of
- "" -> "";
- P ->
- ts_utils:split(P,",")
- end,
- ParamsFormat = ts_config:getAttr(atom,Element#xmlElement.attributes, formats, none),
- {no_ack,#pgsql_request{type=bind,name_portal=Portal,name_prepared=Prep,
- formats=ParamsFormat,formats_results=Formats,parameters=Params}};
- describe ->
- Portal=ts_config:getAttr(string,Element#xmlElement.attributes, name_portal,undefined),
- Prep = ts_config:getAttr(string,Element#xmlElement.attributes, name_prepared,undefined),
- {no_ack,#pgsql_request{type=describe,name_portal=Portal,name_prepared=Prep}};
- authenticate ->
- Passwd = ts_config:getAttr(Element#xmlElement.attributes, password),
- {parse,#pgsql_request{passwd=Passwd, type= authenticate}};
- connect ->
- Database = ts_config:getAttr(Element#xmlElement.attributes, database),
- User = ts_config:getAttr(Element#xmlElement.attributes, username),
- {parse,#pgsql_request{username=User, database=Database, type=connect}}
- end,
+ {Ack,Request} =
+ case ts_config:getAttr(atom, Element#xmlElement.attributes, type) of
+ sql ->
+ ValRaw = ts_config:getText(Element#xmlElement.content),
+ SQL = list_to_binary(ts_utils:clean_str(ValRaw)),
+ ?LOGF("Got SQL query: ~p~n",[SQL], ?NOTICE),
+ {parse,#pgsql_request{sql=SQL, type= sql}};
+ close ->
+ {parse,#pgsql_request{type=close}};
+ sync ->
+ {parse,#pgsql_request{type=sync}};
+ flush ->
+ {parse,#pgsql_request{type=flush}};
+ copydone ->
+ {parse,#pgsql_request{type=copydone}};
+ execute ->
+ Portal = ts_config:getAttr(Element#xmlElement.attributes, name_portal),
+ Limit = ts_config:getAttr(integer,Element#xmlElement.attributes, max_rows,0),
+ {no_ack,#pgsql_request{type=execute,name_portal=Portal,max_rows=Limit}};
+ parse ->
+ Name = ts_config:getAttr(Element#xmlElement.attributes, name_prepared),
+ Query = list_to_binary(ts_config:getText(Element#xmlElement.content)),
+ Params=case ts_config:getAttr(Element#xmlElement.attributes, parameters) of
+ "" -> "";
+ P ->
+ lists:map(fun(S)-> list_to_integer(S) end, ts_utils:splitchar(P,$,))
+ end,
+ {no_ack,#pgsql_request{type=parse,name_prepared=Name,equery=Query,parameters=Params}};
+ bind ->
+ Portal = ts_config:getAttr(Element#xmlElement.attributes, name_portal),
+ Prep = ts_config:getAttr(Element#xmlElement.attributes, name_prepared),
+ Formats = case ts_config:getAttr(Element#xmlElement.attributes, formats_results) of
+ "" -> "";
+ FR ->
+ lists:map(fun(A)->list_to_atom(A) end,ts_utils:splitchar(FR,$,))
+ end,
+ Params=case ts_config:getAttr(Element#xmlElement.attributes, parameters) of
+ "" -> "";
+ P ->
+ ts_utils:split(P,",")
+ end,
+ ParamsFormat = ts_config:getAttr(atom,Element#xmlElement.attributes, formats, none),
+ {no_ack,#pgsql_request{type=bind,name_portal=Portal,name_prepared=Prep,
+ formats=ParamsFormat,formats_results=Formats,parameters=Params}};
+ copy ->
+ Contents = case ts_config:getAttr(string, Element#xmlElement.attributes, contents_from_file) of
+ [] ->
+ P=ts_config:getText(Element#xmlElement.content),
+ list_to_binary(lists:map(fun(S)-> list_to_integer(S) end, ts_utils:splitchar(P,$,)));
+ FileName ->
+ {ok, FileContent} = file:read_file(FileName),
+ FileContent
+ end,
+ {no_ack,#pgsql_request{type=copy,equery=Contents}};
+ copyfail ->
+ Str = ts_config:getAttr(string,Element#xmlElement.attributes, equery,undefined),
+ {parse,#pgsql_request{type=copyfail,equery=list_to_binary(Str)}};
+ describe ->
+ Portal=ts_config:getAttr(string,Element#xmlElement.attributes, name_portal,undefined),
+ Prep = ts_config:getAttr(string,Element#xmlElement.attributes, name_prepared,undefined),
+ {no_ack,#pgsql_request{type=describe,name_portal=Portal,name_prepared=Prep}};
+ authenticate ->
+ Passwd = ts_config:getAttr(Element#xmlElement.attributes, password),
+ {parse,#pgsql_request{passwd=Passwd, type= authenticate}};
+ connect ->
+ Database = ts_config:getAttr(Element#xmlElement.attributes, database),
+ User = ts_config:getAttr(Element#xmlElement.attributes, username),
+ {parse,#pgsql_request{username=User, database=Database, type=connect}}
+ end,
Msg= #ts_request{ack = Ack,
endpage = true,
dynvar_specs = DynVar,
@@ -47,7 +47,7 @@
%%--------------------------------------------------------------------
%% External exports
--export([start_link/1, read_config/1, get_req/2, get_next_session/1,
+-export([start_link/1, read_config/1, read_config/2, get_req/2, get_next_session/1,
get_client_config/1, newbeams/1, newbeam/2,
get_monitor_hosts/0, encode_filename/1, decode_filename/1,
endlaunching/1, status/0, start_file_server/1, get_user_agents/0,
@@ -124,7 +124,9 @@ get_user_agents()->
%% Returns: ok | {error, Reason}
%%--------------------------------------------------------------------
read_config(ConfigFile)->
- gen_server:call({global,?MODULE},{read_config, ConfigFile},?config_timeout).
+ read_config(ConfigFile,?config_timeout).
+read_config(ConfigFile,Timeout)->
+ gen_server:call({global,?MODULE},{read_config, ConfigFile},Timeout).
%%--------------------------------------------------------------------
%% Function: get_client_config/1
@@ -30,6 +30,7 @@
-behaviour(application).
-include("ts_profile.hrl").
+-include_lib("kernel/include/file.hrl").
%%----------------------------------------------------------------------
%% Func: start/2
@@ -59,7 +60,18 @@ start(_Type, _StartArgs) ->
end.
start_phase(load_config, _StartType, _PhaseArgs) ->
- case ts_config_server:read_config(?config(config_file)) of
+ Conf = ?config(config_file),
+ Timeout = case file:read_file_info(Conf) of
+ {ok, #file_info{size=Size}} when Size > 10000000 -> % > 10MB
+ erlang:display(["Can take up to 5mn to read config ",Size]),
+ 300000; % 10mn
+ {ok, #file_info{size=Size}} when Size > 1000000 -> % > 1MB
+ erlang:display(["Can take up to 3mn to read config ",Size]),
+ 180000; % 5mn
+ {ok, #file_info{size=Size}} ->
+ 120000 % 2mn
+ end,
+ case ts_config_server:read_config(Conf,Timeout) of
{error,Reason}->
erlang:display(["Config Error, aborting ! ", Reason]),
init:stop();
@@ -289,7 +289,7 @@ record_request(State=#state_rec{prev_host=Host, prev_port=Port, prev_scheme=Sche
Id=State#state_rec.ext_file_id,
case ts_utils:key1search(ParsedHeader,"content-type") of
"multipart/form-data" ++ _Tail->
- FileName=append_to_filename(State#state_rec.log_file,".xml","-"++integer_to_list(Id)++".bin"),
+ FileName=ts_utils:append_to_filename(State#state_rec.log_file,".xml","-"++integer_to_list(Id)++".bin"),
?LOGF("multipart/form-data, write body data in external binary file ~s~n",[FileName],?NOTICE),
ok = file:write_file(FileName,list_to_binary(Body)),
io:format(Fd," contents_from_file='~s' ", [FileName]),
@@ -349,10 +349,3 @@ record_header(Fd, Headers,HeaderName, Msg, Fun)->
Value -> io:format(Fd,Msg,[Fun(Value)])
end.
-append_to_filename(Filename, From, To) ->
- case re:replace(Filename,From,To, [{return,list},global] ) of
- Filename -> Filename ++"." ++ To;
- RealName -> RealName
- end.
-
-
Oops, something went wrong.

0 comments on commit c0e9c76

Please sign in to comment.