Permalink
Browse files

common tests work, code api stabilized

  • Loading branch information...
1 parent a69a64b commit df25362f9cde560de680c76780d540afd43109f3 Bart van Deenen committed Apr 19, 2012
Showing with 140 additions and 61 deletions.
  1. +1 −1 include/emysql.hrl
  2. +11 −6 src/emysql_conn.erl
  3. +104 −28 src/emysql_transactions.erl
  4. +24 −26 test/transaction_SUITE.erl
View
@@ -28,7 +28,7 @@
% -define(transaction_debug,1).
-ifdef(transaction_debug).
--define(tdb(Format, Args), io:format(Format ++ "~n", Args)).
+-define(tdb(Format, Args), ct:print(Format ++ "~n", Args)).
-else.
-define(tdb(Format, Args), ok).
-endif.
View
@@ -50,27 +50,32 @@ execute_transaction( Connection, Query, Pid, Args ) when is_list(Query) ->
Pid, Args);
execute_transaction( Connection, Query, Pid, Args ) ->
- %-% io:format("~p:execute_transaction Query=~p~n", [?MODULE, Query]),
+ %-% ct:print("~p:execute_transaction conn = ~p Query=~p~n", [?MODULE, Connection, Query]),
StartTransaction = <<"START TRANSACTION; ">>,
Packet = <<?COM_QUERY, StartTransaction/binary, Query/binary>>,
Results = emysql_tcp:send_and_recv_packet( Connection#emysql_connection.socket,
Packet, 0) ,
- %-% io:format("~p:execute_transaction Results=~p~n", [?MODULE, Results]),
+ %-% ct:print("~p:execute_transaction conn = ~p Results=~p~n", [?MODULE, Connection, Results]),
Pid ! {self(), {Results, Args} } ,
Sql = receive
{Pid, commit} ->
<<"COMMIT">>;
- _ ->
+ {Pid, rollback} ->
<<"ROLLBACK">>
after
- 1500 ->
- io:format("~p:execute_transaction ~p timeout -> rollback ~n", [?MODULE, self()]),
+ 5000 ->
+ ct:print("~p:execute_transaction ~p timeout -> rollback ~n", [?MODULE, self()]),
<<"ROLLBACK">>
end,
emysql_tcp:send_and_recv_packet( Connection#emysql_connection.socket,
- <<?COM_QUERY, Sql/binary>>, 0) .
+ <<?COM_QUERY, Sql/binary>>, 0) ,
+
+ % we do this to make the transaction blocking, so that a consecutive query
+ % does not get the uncommitted results from the database (I ran into this
+ % during testing)
+ Pid ! {self(), done} .
View
@@ -1,54 +1,130 @@
-module(emysql_transactions).
--compile(export_all).
-
-include_lib("emysql/include/emysql.hrl").
+
+-export( [transaction/3, transaction/4,
+ parallel_transaction_handler/3, parallel_transaction_handler/4] ).
+
+
+transaction( Type, Sequential, Queries) ->
+ transaction( Type, Sequential, Queries, emysql:default_timeout() ).
+
+%% @doc do a list of queries in parallel, and only commit them when ALL of
+%% them have no mysql errors.
+%%
+%% TODO: implement sequential
+%%
+-spec transaction( blocking|nonblocking, parallel|sequential,
+ [ { #pool{}, string() | binary(), term() } ], pos_integer() ) ->
+ ok |
+ {atomic, [ [#ok_packet{} | #result_packet{} ] ]} |
+ {error, [ [#error_packet{} | #ok_packet{} | #result_packet{} ] ] } |
+ {error, timeout}.
+
+
+transaction( blocking, parallel, Queries, Timeout) ->
+
+ Pid = spawn_link( ?MODULE, parallel_transaction_handler, [self(), Timeout, Queries] ),
+
+ receive
+ {Pid, error, Results} ->
+ {error, Results};
+ {Pid, _Responses } ->
+ {atomic, _Responses}
+ after
+ Timeout ->
+ {error, timeout}
+ end;
+
+transaction( nonblocking, parallel, Queries, Timeout) ->
+ spawn( ?MODULE, parallel_transaction_handler, [undefined, Timeout,
+ Queries] ),
+ ok.
+
+
+
%%
%% @doc entry point for transaction handler
%%
-transaction( List ) ->
- ?tdb("~p:transaction( ~n~p )", [?MODULE, List]),
- transaction(List, 0).
+parallel_transaction_handler( OwnerPid, Timeout, List ) ->
+ parallel_transaction_handler(OwnerPid, Timeout, List, 0).
+
%%
%% @doc emysql:execute_transaction process spawner
%%
-transaction( [], Ct) ->
- transaction(Ct, 0, 0, []);
+parallel_transaction_handler(OwnerPid, Timeout, [], Ct) ->
+ parallel_transaction_handler(OwnerPid, Ct, 0, 0, [], [], Timeout);
%
% Args is just a term() that we pass around to the pool process.
%
-transaction( [ Parms | Rest ], Ct ) ->
- spawn_link( emysql, execute_transaction, [self()|Parms] ),
- transaction( Rest, Ct+1).
+parallel_transaction_handler( OwnerPid, Timeout, [ Parms | Rest ], Ct ) ->
+ _Pid = spawn_link( emysql, execute_transaction, [self()|Parms] ),
+ ?tdb("spawned ~p ~p", [_Pid, [self()|Parms]]),
+ parallel_transaction_handler( OwnerPid, Timeout, Rest, Ct+1).
+
+
+
+
+finish_workers( Pids, Message)
+ when Message =:= commit orelse Message =:= rollback ->
+ lists:map( fun(P) ->
+ P ! {self(), Message },
+ receive
+ {P, done} -> ok
+ after
+ 100 ->
+ ct:print("too late", [])
+ end
+ end, Pids).
+
%%
%% @doc transaction response handler
%%
% everyting ok
-transaction( 0, _, 0, Pids) ->
- lists:map( fun(P) -> P ! {self(), commit } end, Pids);
+parallel_transaction_handler( OwnerPid, 0, _, 0, Pids, Responses, _Timeout) ->
+ finish_workers(Pids, commit),
+ case OwnerPid of
+ undefined ->
+ ok;
+ OwnerPid ->
+ OwnerPid ! {self(), Responses}
+ end;
+
% at least one bad
-transaction( 0, _, Bads, Pids) ->
- ?tdb("~p actions went bad, we're doing a rollback ~n", [Bads]),
- lists:map( fun(P) -> P ! {self(), rollback } end, Pids);
+parallel_transaction_handler( OwnerPid, 0, _, _Bads, Pids, Responses, _Timeout) ->
+ ?tdb("~p actions went bad, we're doing a rollback ~n", [_Bads]),
+ finish_workers(Pids, rollback),
+ case OwnerPid of
+ undefined ->
+ ok;
+ OwnerPid ->
+ OwnerPid ! {self(), error, Responses}
+ end;
% still handling responses from transaction pool processes.
-transaction( Remaining, Goods, Bads, Pids) ->
+parallel_transaction_handler( OwnerPid, Remaining, Goods, Bads, Pids,
+Responses, Timeout) ->
+ ?tdb("parallel_transaction_handler(~p,~p,~p,~p,~p,~p)", [
+ OwnerPid, Remaining, Goods, Bads, Pids, Responses
+ ]),
receive
- {Pid, {Results, Args}} ->
+ {Pid, {Results, Args}= Re} ->
case is_not_ok(Results, Args) of
- true -> transaction(Remaining-1, Goods, Bads+1, [Pid|Pids]);
- false -> transaction(Remaining-1, Goods+1, Bads, [Pid|Pids])
+ true -> parallel_transaction_handler(OwnerPid, Remaining-1, Goods, Bads+1,
+ [Pid|Pids], [Re|Responses] , Timeout);
+ false -> parallel_transaction_handler(OwnerPid, Remaining-1, Goods+1,
+ Bads, [Pid|Pids], [Re|Responses], Timeout )
end;
- Other ->
- ?tdb("transaction received unexpected: ~p", [Other])
- after 1000 ->
- ?tdb("Aborting all transaction actions due to a timeout", []),
+ _Other ->
+ ?tdb("parallel_transaction_handler received unexpected: ~p", [_Other])
+ after Timeout ->
+ ct:print("Aborting all transaction actions due to a timeout", []),
lists:map( fun(P) -> P ! {self(), rollback } end, Pids)
end.
@@ -58,14 +134,14 @@ transaction( Remaining, Goods, Bads, Pids) ->
%%
-spec is_not_ok( [tuple()], [term()] ) -> boolean().
-is_not_ok( Results , Args) ->
+is_not_ok( Results , _Args) ->
lists:any( fun(R) ->
case R of
- #eof_packet{status=M} ->
- ?tdb("Args: ~p action eof ~p", [Args, M]),
+ #eof_packet{status=_M} ->
+ ?tdb("Args: ~p action eof ~p", [_Args, _M]),
true;
- #error_packet{msg=M} ->
- ?tdb("Args: ~p action error ~p", [Args, M]),
+ #error_packet{msg=_M} ->
+ ?tdb("Args: ~p action error ~p", [_Args, _M]),
true;
_ -> false
end
View
@@ -34,11 +34,11 @@ init_per_suite(Config) ->
crypto:start(),
application:start(emysql),
- emysql:add_pool(uniq_db, 1,
+ emysql:add_pool(uniq_db, 5,
"hello_username", "hello_password", "localhost", 3306,
"hello_database", utf8),
- emysql:add_pool(nonuniq_db, 1,
+ emysql:add_pool(nonuniq_db, 5,
"hello_username", "hello_password", "localhost", 3306,
"hello_database", utf8),
@@ -49,34 +49,36 @@ init_per_suite(Config) ->
end_per_suite(_) ->
ok.
-
-%% Test Case 1
-%%--------------------------------------------------------------------
-tx_1(_) ->
-
+init_per_testcase(_Case,_Config) ->
emysql:execute(uniq_db, <<"truncate uniq">>),
emysql:execute(nonuniq_db, <<"truncate nonuniq">>),
emysql:execute(uniq_db, <<"insert into uniq values (1,1), (2,2)">>),
emysql:execute(nonuniq_db, <<"insert into nonuniq values (1,1), (2,2)">>),
+ _Config.
+
+end_per_testcase(_Case, _Config) ->
+ ok.
+
+%% Test Case 1
+%%--------------------------------------------------------------------
+tx_1(_) ->
+
check([[1,1],[2,2]],[[1,1],[2,2]]).
%% Test Case 2
%%--------------------------------------------------------------------
tx_2(_) ->
- tx_1(undefined),
- % create a transient transaction process that spawns two processes that
- % perform jobs in a transaction.
- %
- spawn_link( emysql_transactions, transaction, [[
+ %% This shall cause an error because the first insert violates unique key
+ %% constraints
+ {error, _Results} = emysql_transactions:transaction( blocking, parallel, [
[ uniq_db, "insert into uniq values (1,1), (2,2)", 1 ],
[ nonuniq_db, "insert into nonuniq values (1,1), (2,2)", 2 ]
- ]]),
+ ]),
- timer:sleep(100),
check([[1,1],[2,2]],
[[1,1],[2,2] ]).
@@ -88,26 +90,22 @@ tx_2(_) ->
%%--------------------------------------------------------------------
tx_3(_) ->
- tx_1(undefined),
-
- spawn_link( emysql_transactions, transaction, [[
+ {atomic, _Results} = emysql_transactions:transaction(blocking, parallel, [
[ uniq_db, <<"insert into uniq values (3,3), (4,4)">>, 1 ],
[ nonuniq_db, <<"insert into nonuniq values (3,3), (4,4)">>, 2 ]
- ]]),
+ ]),
- timer:sleep(100),
- check([[1,1],[2,2],[3,3],[4,4]],
- [[1,1],[2,2],[3,3],[4,4]]).
+ check( [ [ 1 , 1] , [ 2 , 2] , [ 3 , 3] , [ 4 , 4]] ,
+ [ [ 1 , 1] , [ 2 , 2] , [ 3 , 3] , [ 4 , 4]]).
tx_4(_) ->
tx_2(undefined),
% this should rollback, even though the first statement is ok.
- spawn_link( emysql_transactions, transaction, [[
- [ uniq_db, <<"update uniq set `value` = 8 where `key` = 1">>, 1] ,
- [ uniq_db, <<"insert into uniq values (3,3), (4,4)">>, 2 ]
- ]]),
- timer:sleep(100),
+ {error, _Results} = emysql_transactions:transaction(blocking, parallel, [
+ [ uniq_db, <<"update uniq set `value` = 8 where `key` = 1;insert into uniq values (1,1)">>, 2 ]
+ ]),
+
check([[1,1],[2,2]],
[[1,1],[2,2] ]).

0 comments on commit df25362

Please sign in to comment.