Permalink
Browse files

add transaction for emysql

  • Loading branch information...
slepher committed Feb 3, 2012
1 parent a2ec8da commit 50548682bb3eaa513a82883b40e326269476a6a4
Showing with 54 additions and 4 deletions.
  1. +12 −0 src/emysql.erl
  2. +42 −4 src/emysql_conn.erl
@@ -99,6 +99,7 @@
add_pool/8, remove_pool/1, increment_pool_size/2, decrement_pool_size/2,
prepare/2,
execute/2, execute/3, execute/4, execute/5,
transaction/2, transaction/3,
default_timeout/0,
modules/0
]).
@@ -527,6 +528,17 @@ execute(PoolId, StmtName, Args, Timeout, nonblocking) when is_atom(StmtName), is
Other
end.
transaction(PoolId, Fun) ->
transaction(PoolId, Fun, default_timeout()).
transaction(PoolId, Fun, Timeout) ->
case emysql_conn_mgr:lock_connection(PoolId) of
Connection when is_record(Connection, emysql_connection) ->
monitor_work(Connection, Timeout, {emysql_conn, transaction, [Connection, Fun]});
Other ->
Other
end.
%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------
@@ -27,10 +27,11 @@
-module(emysql_conn).
-export([set_database/2, set_encoding/2,
execute/3, prepare/3, unprepare/2,
open_connections/1, open_connection/1,
reset_connection/3, close_connection/1,
open_n_connections/2, hstate/1
execute/3, prepare/3, unprepare/2,
transaction/2,
open_connections/1, open_connection/1,
reset_connection/3, close_connection/1,
open_n_connections/2, hstate/1
]).
-include("emysql.hrl").
@@ -101,6 +102,43 @@ unprepare(Connection, Name) ->
Packet = <<?COM_QUERY, "DEALLOCATE PREPARE ", (list_to_binary(Name))/binary>>, % todo: utf8?
emysql_tcp:send_and_recv_packet(Connection#emysql_connection.socket, Packet, 0).
transaction(Connection, Fun) ->
case begin_transaction(Connection) of
#ok_packet{} ->
try Fun() of
Val ->
case commit_transaction(Connection) of
#ok_packet{} ->
{atomic, Val};
#error_packet{} = ErrorPacket ->
{aborted, {commit_error, ErrorPacket}}
end
catch
_:Exception ->
rollback_transaction(Connection),
case Exception of
{aborted, Reason} ->
{aborted, Reason};
_ ->
exit(Exception)
end
end;
#error_packet{} = ErrorPacket ->
{aborted, {begin_error, ErrorPacket}}
end.
begin_transaction(Connection) ->
Packet = <<?COM_QUERY, "BEGIN">>,
emysql_tcp:send_and_recv_packet(Connection#emysql_connection.socket, Packet, 0).
rollback_transaction(Connection) ->
Packet = <<?COM_QUERY, "ROLLBACK">>,
emysql_tcp:send_and_recv_packet(Connection#emysql_connection.socket, Packet, 0).
commit_transaction(Connection) ->
Packet = <<?COM_QUERY, "COMMIT">>,
emysql_tcp:send_and_recv_packet(Connection#emysql_connection.socket, Packet, 0).
open_n_connections(PoolId, N) ->
%-% io:format("open ~p connections for pool ~p~n", [N, PoolId]),
case emysql_conn_mgr:find_pool(PoolId, emysql_conn_mgr:pools()) of

8 comments on commit 5054868

@Eonblast

This comment has been minimized.

Eonblast replied Feb 11, 2012

Interesting stuff slepher, how is it coming along?

Please take a look at this while you're at it. It's a Mnesia-style approach to transactions that Brendon Hogger started implementing a while ago. I believe they are running something very close to these lines at their company now but couldn't entice him to push.

%%%-------------------------------------------------------------------
%%% @author Brendon Hogger <brendonh@dev.brendonh.org>
%%% @copyright (C) 2011, Brendon Hogger
%%% @doc
%%% EMysql transaction wrapper
%%% @end
%%% Created : 31 Mar 2011 by Brendon Hogger <brendonh@dev.brendonh.org>
%%%-------------------------------------------------------------------
-module(bnb_transact).

-include("emysql.hrl").

-export([transact/2, transact/3, 
         test_transact/0]).


transact(PoolId, TransactFun) ->
    transact(PoolId, TransactFun, infinity).

transact(PoolId, TransactFun, Timeout) when is_atom(PoolId) ->
    Connection = emysql_conn_mgr:wait_for_connection(PoolId),
    transact(Connection, TransactFun, Timeout);


transact(Connection, TransactFun, Timeout) ->
    % This is almost straight from emysql:monitor_work/1

    Parent = self(),
    Pid = spawn(
            fun() ->
                    receive start ->
                            Parent ! {self(), wrap(Connection, TransactFun)}
                    end
            end),
    Mref = erlang:monitor(process, Pid),
    Pid ! start,
    receive
        {'DOWN', Mref, process, Pid, {_, closed}} ->
            NewConnection = emysql_conn:renew_connection(emysql_conn_mgr:pools(), Connection),
            transact(NewConnection, TransactFun, Timeout);
        {'DOWN', Mref, process, Pid, Reason} ->
            %% if the process dies, reset the connection
            %% and re-throw the error on the current pid
            emysql_conn:reset_connection(emysql_conn_mgr:pools(), Connection),
            exit(Reason);
        {Pid, Result} ->
            %% if the process returns data, unlock the
            %% connection and collect the normal 'DOWN'
            %% message send from the child process
            erlang:demonitor(Mref, [flush]),
            emysql_conn_mgr:unlock_connection(Connection),
            Result
    after Timeout ->
            %% if we timeout waiting for the process to return,
            %% then reset the connection and throw a timeout error
            erlang:demonitor(Mref),
            exit(Pid, normal),
            emysql_conn:reset_connection(emysql_conn_mgr:pools(), Connection),
            exit(mysql_timeout)
    end.


%% -------------------------------------------------------------


wrap(Connection, TransactFun) ->
    emysql_conn:execute(Connection, <<"START TRANSACTION WITH CONSISTENT SNAPSHOT">>, []),
    case catch(TransactFun(Connection)) of
        {ok, _}=Result ->
            emysql_conn:execute(Connection, <<"COMMIT">>, []),
            Result;
        {'EXIT', Reason} ->
            emysql_conn:execute(Connection, <<"ROLLBACK">>, []),
            {error, Reason};
        OtherResult ->
            emysql_conn:execute(Connection, <<"ROLLBACK">>, []),
            OtherResult
    end.


%% -------------------------------------------------------------


test_transact() ->
    crypto:start(),
    application:start(emysql),

    ok = emysql:add_pool(test, 2, "brendonh", "arthur", "localhost", 3306, "bnb", utf8),

    emysql:execute(test, <<"DROP TABLE IF EXISTS quicktest">>),
    emysql:execute(test, <<"CREATE TABLE quicktest (val INTEGER) engine=innodb">>),
    emysql:execute(test, <<"INSERT INTO quicktest VALUES (0)">>),

    Tests = [
             {success, fun(Conn) ->
                     emysql_conn:execute(
                       Conn, <<"UPDATE quicktest SET val = val + 1">>, []),
                     {ok, done}
             end},

             {crash, fun(Conn) ->
                     emysql_conn:execute(
                       Conn, <<"UPDATE quicktest SET val = val + 1">>, []),
                     1 / 0
             end},

             {error, fun(Conn) ->
                     emysql_conn:execute(
                       Conn, <<"UPDATE quicktest SET val = val + 1">>, []),
                     {error, whatever}
             end}
            ],

    lists:map(fun({Name, TestFun}) ->
                      io:format("-----------------------~n"),
                      io:format("Test: ~s~n", [Name]),
                      emysql:execute(test, <<"UPDATE quicktest SET val = 0">>),
                      TResult = transact(test, TestFun),
                      io:format("Result: ~p~n", [TResult]),
                      Result = emysql:execute(test, <<"SELECT val FROM quicktest">>),
                      io:format("Val after: ~p~n", [hd(hd(Result#result_packet.rows))])
              end,
              Tests).


%% $ erl -pa `agner prefix emysql`/ebin  -pa ebin -noshell -s bnb_transact test_transact -s init stop
%% -----------------------
%% Test: success
%% Result: {ok,done}
%% Val after: 1
%% -----------------------
%% Test: crash
%% Result: {error,{badarith,[{bnb_transact,'-test_transact/0-fun-1-',1},
%%                           {bnb_transact,wrap,2},
%%                           {bnb_transact,'-transact/3-fun-0-',3}]}}
%% Val after: 0
%% -----------------------
%% Test: error
%% Result: {error,whatever}
%% Val after: 0
@slepher

This comment has been minimized.

Owner

slepher replied Feb 22, 2012

it's a prototype moudle works on my company project,
I will send you a pull request when it become more stable if you need.

@Eonblast

This comment has been minimized.

Eonblast replied Feb 22, 2012

Yes, cool!

How do you like Brendon's approach?

Cheers,
Henning

@slepher

This comment has been minimized.

Owner

slepher replied Feb 22, 2012

it's same as my implementation except some exception processes,
as the return value, I prefer mnesia style {atomic, Val} | {aborted, Reason}, and use abort(Reason) to rollback the transaction.
For module style, do you mean you prefer the stand alone module?

@Eonblast

This comment has been minimized.

Eonblast replied Feb 22, 2012

I guess I did not understand that it's similar. From a superficial look it looked quite different.

The better. Let's use what works.

I don't think a stand alone module is better, that's not what I meant. That's fine.

If you want to send me some sample code, I'll be happy to roll some Common Tests for the exceptions.

Thanks,
Henning

@slepher

This comment has been minimized.

Owner

slepher replied Feb 22, 2012

plz checkout git://github.com/slepher/Emysql.git
see test_transaction.erl
and use sh test_transaction.sh to run the sample code

@Eonblast

This comment has been minimized.

Eonblast replied Feb 25, 2012

Sorry for the slow response, I'll have more time at my hands mid March. Can't wait.

@Eonblast

This comment has been minimized.

Eonblast replied Mar 14, 2012

Ok, what I'll do is adding your sample to the test cases, adding a Common Test suite specifically for transactions and make that available to you in a dev branch with all your changes in it. Hoping that you might contribute cases to the suite.

Best,
Henning

Please sign in to comment.