Permalink
Browse files

first step to handle pgsql extended protocol

  • Loading branch information...
1 parent 9c00c3c commit 64c5bde4b1433109bf9505791b60e9a4d40886e7 @nniclausse nniclausse committed Aug 4, 2011
Showing with 82 additions and 29 deletions.
  1. +6 −0 include/ts_pgsql.hrl
  2. +69 −28 src/tsung_recorder/ts_proxy_pgsql.erl
  3. +7 −1 tsung-1.0.dtd
View
@@ -35,6 +35,12 @@
salt,
auth_method,
database,
+ name_portal,
+ name_prepared,
+ equery,
+ parameters,
+ formats,
+ formats_results,
sql
}).
@@ -87,37 +87,54 @@ parse(State=#proxy{},_,ServerSocket,String) ->
NewString = lists:append(State#proxy.buffer, String),
Data = list_to_binary(NewString),
?LOGF("Received data from client: ~p~n",[Data],?DEB),
- Reply = case process_data(Data) of
- more ->
- ?LOG("need more~n",?DEB),
- {ok, State#proxy{buffer=NewString} };
- {ok, {sql, SQL}, _Tail } ->
- SQLStr= binary_to_list(SQL),
- ?LOGF("sql = ~s~n",[SQLStr],?DEB),
- ts_proxy_recorder:dorecord({#pgsql_request{type=sql, sql=SQLStr}}),
- {ok, State#proxy{buffer=[]}};
- {ok, terminate, _Tail } ->
- ts_proxy_recorder:dorecord({#pgsql_request{type=close}}),
- {ok, State#proxy{buffer=[]}};
- {ok, {password, Password}, _Tail } ->
- PwdStr= binary_to_list(Password),
- ?LOGF("password = ~s~n",[PwdStr],?DEB),
- ts_proxy_recorder:dorecord({#pgsql_request{type=authenticate, passwd=PwdStr}}),
- {ok, State#proxy{buffer=[]}}
- end,
+ NewState = process_data(State,Data),
ts_client_proxy:send(ServerSocket, String, ?MODULE),
- Reply.
+ {ok,NewState}.
-process_data(<<Code:8/integer, Size:4/integer-unit:8, Tail/binary>>) ->
+
+
+process_data(State,<< >>) ->
+ State;
+process_data(State,RawData = <<Code:8/integer, Size:4/integer-unit:8, Tail/binary>>) ->
?LOGF("PGSQL: received [~p] size=~p Pckt size= ~p ~n",[Code, Size, size(Tail)],?DEB),
RealSize = Size-4,
case RealSize =< size(Tail) of
true ->
<< Packet:RealSize/binary, Data/binary >> = Tail,
- {ok, Pair} = decode_packet(Code, Packet),
- ?LOGF("PGSQL: Pair=~p ~n",[Pair],?DEB),
- {ok, Pair, Data };
- false -> more
+ NewState=case decode_packet(Code, Packet) of
+ {sql, SQL} ->
+ SQLStr= binary_to_list(SQL),
+ ?LOGF("sql = ~s~n",[SQLStr],?DEB),
+ ts_proxy_recorder:dorecord({#pgsql_request{type=sql, sql=SQLStr}}),
+ State#proxy{buffer=[]};
+ terminate ->
+ ts_proxy_recorder:dorecord({#pgsql_request{type=close}}),
+ State#proxy{buffer=[]};
+ {password, Password} ->
+ PwdStr= binary_to_list(Password),
+ ?LOGF("password = ~s~n",[PwdStr],?DEB),
+ ts_proxy_recorder:dorecord({#pgsql_request{type=authenticate, passwd=PwdStr}}),
+ State#proxy{buffer=[]};
+ {parse,{StringName,StringQuery,NParams,Params} } ->
+ %%TODO: record
+ State;
+ {portal,{PortalName,StringQuery,NParams,Params} } ->
+ %%TODO: record
+ State;
+ sync ->
+ %%TODO: record
+ State;
+ {describe,{Type,Name} } ->
+ %%TODO: record
+ State;
+ {execute,{NamePortal,MaxParams} } ->
+ %%TODO: record
+ State
+ end,
+ process_data(NewState,Data);
+ false ->
+ ?LOG("need more~n",?DEB),
+ State#proxy{buffer=RawData}
end.
get_db_user(Arg) ->
@@ -134,13 +151,37 @@ get_db_user([_| Rest], Req) ->
decode_packet($Q, Data)->
Size= size(Data)-1,
<<SQL:Size/binary, 0:8 >> = Data,
- {ok, {sql, SQL}};
+ {sql, SQL};
decode_packet($p, Data) ->
Size= size(Data)-1,
<<Password:Size/binary, 0:8 >> = Data,
- {ok, {password, Password}};
+ {password, Password};
decode_packet($X, _) ->
- {ok, terminate}.
+ terminate;
+decode_packet($D, << Type:1/binary, Name/binary >>) -> %describe
+ ?LOGF("Extended protocol: describe ~s ~p~n",[Type,Name], ?DEB),
+ {describe,{Type,Name}};
+decode_packet($S, Data) -> %sync
+ sync;
+decode_packet($E, Data) -> %execute
+ {NamePortal,PortalSize} = pgsql_util:to_string(Data),
+ S1=PortalSize+1,
+ << _:S1/binary, MaxParams:32/integer >> = Data,
+ ?LOGF("Extended protocol: execute ~p ~p~n",[NamePortal,MaxParams], ?DEB),
+ {execute,{NamePortal,MaxParams}};
+decode_packet($B, Data) -> %bind
+ [NamePortal, StringQuery | Tail] = binary:split(Data,<<0>>,[global,trim]),
+ Size = size(NamePortal)+size(StringQuery)+2,
+ << _:Size/binary, NParams:16/integer,Params/binary >> = Data,
+ ?LOGF("Extended protocol: bind ~p ~p ~p~n",[NamePortal,StringQuery,NParams], ?DEB),
+ {portal,{NamePortal,StringQuery,NParams,Params}};
+decode_packet($P, Data) -> % parse
+ [StringName, StringQuery | Tail] = binary:split(Data,<<0>>,[global,trim]),
+ Size = size(StringName)+size(StringQuery)+2,
+ << _:Size/binary, NParams:16/integer,Params/binary >> = Data,
+ ?LOGF("Extended protocol: parse ~p ~p ~p~n",[StringName,StringQuery,NParams], ?DEB),
+ {parse,{StringName,StringQuery,NParams,Params}}.
+
%%--------------------------------------------------------------------
%% Func: record_request/2
@@ -153,7 +194,7 @@ record_request(State=#state_rec{logfd=Fd},
io:format(Fd,"</request>~n",[]),
{ok,State};
record_request(State=#state_rec{logfd=Fd}, #pgsql_request{type=sql, sql=SQL})->
- io:format(Fd,"<request><pgsql type='sql'><![CDATA[~s]]></pgsql>", [SQL]),
+ io:format(Fd,"<request> <pgsql type='sql'><![CDATA[~s]]></pgsql>", [SQL]),
io:format(Fd,"</request>~n",[]),
{ok,State};
record_request(State=#state_rec{logfd=Fd}, #pgsql_request{type=close})->
View
@@ -263,7 +263,13 @@ repeat | if | change_type | foreach | set_option)*>
password CDATA #IMPLIED
database CDATA #IMPLIED
username CDATA #IMPLIED
- type (connect | authenticate | sql | close) #REQUIRED >
+ name_portal CDATA #IMPLIED
+ name_prepared CDATA #IMPLIED
+ query CDATA #IMPLIED
+ parameters CDATA #IMPLIED
+ formats CDATA #IMPLIED
+ formats_results CDATA #IMPLIED
+ type (connect | authenticate | sql | close | bind | parse | sync | execute | describe) #REQUIRED >
<!ELEMENT mysql (#PCDATA) >
<!ATTLIST mysql

0 comments on commit 64c5bde

Please sign in to comment.