Skip to content

Commit

Permalink
Merge pull request Eonblast#12 from lsowen/master
Browse files Browse the repository at this point in the history
Rename record 'connection' to 'emysql_connection'. This is an internal symbol name change to avoid conflicts with RabbitMQ. It doesn't make the code more readable but makes sense in the long run.
  • Loading branch information
Eonblast committed Oct 28, 2011
2 parents 3db0e45 + 0fce8c9 commit 0d33806
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 34 deletions.
2 changes: 1 addition & 1 deletion include/emysql.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@


-record(pool, {pool_id, size, user, password, host, port, database, encoding, available=queue:new(), locked=gb_trees:empty()}).
-record(connection, {id, pool_id, socket, version, thread_id, caps, language, prepared=gb_trees:empty(), locked_at, alive=true}).
-record(emysql_connection, {id, pool_id, socket, version, thread_id, caps, language, prepared=gb_trees:empty(), locked_at, alive=true}).
-record(greeting, {protocol_version, server_version, thread_id, salt1, salt2, caps, caps_high, language, status, seq_num, plugin}).
-record(field, {seq_num, catalog, db, table, org_table, name, org_name, type, default, charset_nr, length, flags, decimals}).
-record(packet, {size, seq_num, data}).
Expand Down
10 changes: 5 additions & 5 deletions src/emysql.erl
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ decrement_pool_size(PoolId, Num) when is_integer(Num) ->
%% ParamNamesBin = list_to_binary(string:join([[$@ | integer_to_list(I)] || I <- lists:seq(1, length(Args))], ", ")),
%% StmtNameBin = atom_to_binary(StmtName, utf8),
%% Packet = <<?COM_QUERY, "EXECUTE ", StmtNameBin/binary, " USING ", ParamNamesBin/binary>>,
%% emysql_tcp:send_and_recv_packet(Connection#connection.socket, Packet, 0);
%% emysql_tcp:send_and_recv_packet(Connection#emysql_connection.socket, Packet, 0);
%% Error ->
%% Error
%% end.
Expand Down Expand Up @@ -509,15 +509,15 @@ execute(PoolId, StmtName, Args, Timeout) when is_atom(StmtName), is_list(Args) a
%%
execute(PoolId, Query, Args, Timeout, nonblocking) when (is_list(Query) orelse is_binary(Query)) andalso is_list(Args) andalso is_integer(Timeout) ->
case emysql_conn_mgr:lock_connection(PoolId) of
Connection when is_record(Connection, connection) ->
Connection when is_record(Connection, emysql_connection) ->
monitor_work(Connection, Timeout, {emysql_conn, execute, [Connection, Query, Args]});
Other ->
Other
end;

execute(PoolId, StmtName, Args, Timeout, nonblocking) when is_atom(StmtName), is_list(Args) andalso is_integer(Timeout) ->
case emysql_conn_mgr:lock_connection(PoolId) of
Connection when is_record(Connection, connection) ->
Connection when is_record(Connection, emysql_connection) ->
monitor_work(Connection, Timeout, {emysql_conn, execute, [Connection, StmtName, Args]});
Other ->
Other
Expand Down Expand Up @@ -553,7 +553,7 @@ execute(PoolId, StmtName, Args, Timeout, nonblocking) when is_atom(StmtName), is
%% @private
%% @end doc: hd feb 11
%%
monitor_work(Connection, Timeout, {M,F,A}) when is_record(Connection, connection) ->
monitor_work(Connection, Timeout, {M,F,A}) when is_record(Connection, emysql_connection) ->
%% spawn a new process to do work, then monitor that process until
%% it either dies, returns data or times out.
Parent = self(),
Expand All @@ -569,7 +569,7 @@ monitor_work(Connection, Timeout, {M,F,A}) when is_record(Connection, connection
{'DOWN', Mref, process, Pid, {_, closed}} ->
%-% io:format("monitor_work: ~p DOWN/closed -> renew~n", [Pid]),
case emysql_conn:reset_connection(emysql_conn_mgr:pools(), Connection, keep) of
NewConnection when is_record(NewConnection, connection) ->
NewConnection when is_record(NewConnection, emysql_connection) ->
% re-loop, with new connection.
[_OldConn | RestArgs] = A,
NewA = [NewConnection | RestArgs],
Expand Down
34 changes: 17 additions & 17 deletions src/emysql_conn.erl
Original file line number Diff line number Diff line change
Expand Up @@ -38,22 +38,22 @@
set_database(_, undefined) -> ok;
set_database(Connection, Database) ->
Packet = <<?COM_QUERY, "use ", (iolist_to_binary(Database))/binary>>,
emysql_tcp:send_and_recv_packet(Connection#connection.socket, Packet, 0).
emysql_tcp:send_and_recv_packet(Connection#emysql_connection.socket, Packet, 0).

set_encoding(Connection, Encoding) ->
Packet = <<?COM_QUERY, "set names '", (erlang:atom_to_binary(Encoding, utf8))/binary, "'">>,
emysql_tcp:send_and_recv_packet(Connection#connection.socket, Packet, 0).
emysql_tcp:send_and_recv_packet(Connection#emysql_connection.socket, Packet, 0).

execute(Connection, Query, []) when is_list(Query); is_binary(Query) ->
%-% io:format("~n~p~n", [iolist_to_binary(Query)]),
Packet = <<?COM_QUERY, (iolist_to_binary(Query))/binary>>,
emysql_tcp:send_and_recv_packet(Connection#connection.socket, Packet, 0);
emysql_tcp:send_and_recv_packet(Connection#emysql_connection.socket, Packet, 0);

execute(Connection, StmtName, []) when is_atom(StmtName) ->
prepare_statement(Connection, StmtName),
StmtNameBin = atom_to_binary(StmtName, utf8),
Packet = <<?COM_QUERY, "EXECUTE ", StmtNameBin/binary>>,
emysql_tcp:send_and_recv_packet(Connection#connection.socket, Packet, 0);
emysql_tcp:send_and_recv_packet(Connection#emysql_connection.socket, Packet, 0);

execute(Connection, Query, Args) when (is_list(Query) orelse is_binary(Query)) andalso is_list(Args) ->
StmtName = "stmt_"++integer_to_list(erlang:phash2(Query)),
Expand All @@ -63,7 +63,7 @@ execute(Connection, Query, Args) when (is_list(Query) orelse is_binary(Query)) a
OK when is_record(OK, ok_packet) ->
ParamNamesBin = list_to_binary(string:join([[$@ | integer_to_list(I)] || I <- lists:seq(1, length(Args))], ", ")),
Packet = <<?COM_QUERY, "EXECUTE ", (list_to_binary(StmtName))/binary, " USING ", ParamNamesBin/binary>>,
emysql_tcp:send_and_recv_packet(Connection#connection.socket, Packet, 0);
emysql_tcp:send_and_recv_packet(Connection#emysql_connection.socket, Packet, 0);
Error ->
Error
end,
Expand All @@ -77,7 +77,7 @@ execute(Connection, StmtName, Args) when is_atom(StmtName), is_list(Args) ->
ParamNamesBin = list_to_binary(string:join([[$@ | integer_to_list(I)] || I <- lists:seq(1, length(Args))], ", ")),
StmtNameBin = atom_to_binary(StmtName, utf8),
Packet = <<?COM_QUERY, "EXECUTE ", StmtNameBin/binary, " USING ", ParamNamesBin/binary>>,
emysql_tcp:send_and_recv_packet(Connection#connection.socket, Packet, 0);
emysql_tcp:send_and_recv_packet(Connection#emysql_connection.socket, Packet, 0);
Error ->
Error
end.
Expand All @@ -86,13 +86,13 @@ prepare(Connection, Name, Statement) when is_atom(Name) ->
prepare(Connection, atom_to_list(Name), Statement);
prepare(Connection, Name, Statement) ->
Packet = <<?COM_QUERY, "PREPARE ", (list_to_binary(Name))/binary, " FROM '", (iolist_to_binary(Statement))/binary, "'">>,
emysql_tcp:send_and_recv_packet(Connection#connection.socket, Packet, 0).
emysql_tcp:send_and_recv_packet(Connection#emysql_connection.socket, Packet, 0).

unprepare(Connection, Name) when is_atom(Name)->
unprepare(Connection, atom_to_list(Name));
unprepare(Connection, Name) ->
Packet = <<?COM_QUERY, "DEALLOCATE PREPARE ", (list_to_binary(Name))/binary>>,
emysql_tcp:send_and_recv_packet(Connection#connection.socket, Packet, 0).
emysql_tcp:send_and_recv_packet(Connection#emysql_connection.socket, Packet, 0).

open_n_connections(PoolId, N) ->
%-% io:format("open ~p connections for pool ~p~n", [N, PoolId]),
Expand Down Expand Up @@ -127,7 +127,7 @@ open_connection(#pool{pool_id=PoolId, host=Host, port=Port, user=User, password=
%-% io:format("~p open connection: ... greeting~n", [self()]),
Greeting = emysql_auth:do_handshake(Sock, User, Password),
%-% io:format("~p open connection: ... make new connection~n", [self()]),
Connection = #connection{
Connection = #emysql_connection{
id = erlang:port_to_list(Sock),
pool_id = PoolId,
socket = Sock,
Expand Down Expand Up @@ -175,11 +175,11 @@ reset_connection(Pools, Conn, StayLocked) ->
%-% io:format("spawn process to close connection~n"),
spawn(fun() -> close_connection(Conn) end),
%% OPEN NEW SOCKET
case emysql_conn_mgr:find_pool(Conn#connection.pool_id, Pools, []) of
case emysql_conn_mgr:find_pool(Conn#emysql_connection.pool_id, Pools, []) of
{Pool, _} ->
%-% io:format("... open new connection to renew~n"),
case catch open_connection(Pool) of
NewConn when is_record(NewConn, connection) ->
NewConn when is_record(NewConn, emysql_connection) ->
%-% io:format("... got it, replace old (~p)~n", [StayLocked]),
case StayLocked of
pass -> emysql_conn_mgr:replace_connection_as_available(Conn, NewConn);
Expand All @@ -188,7 +188,7 @@ reset_connection(Pools, Conn, StayLocked) ->
%-% io:format("... done, return new connection~n"),
NewConn;
Error ->
DeadConn = Conn#connection{alive=false},
DeadConn = Conn#emysql_connection{alive=false},
emysql_conn_mgr:replace_connection_as_available(Conn, DeadConn),
%-% io:format("... failed to re-open. Shelving dead connection as available.~n"),
{error, {cannot_reopen_in_reset, Error}}
Expand All @@ -199,9 +199,9 @@ reset_connection(Pools, Conn, StayLocked) ->

close_connection(Conn) ->
%% DEALLOCATE PREPARED STATEMENTS
[(catch unprepare(Conn, Name)) || Name <- emysql_statements:remove(Conn#connection.id)],
[(catch unprepare(Conn, Name)) || Name <- emysql_statements:remove(Conn#emysql_connection.id)],
%% CLOSE SOCKET
gen_tcp:close(Conn#connection.socket),
gen_tcp:close(Conn#emysql_connection.socket),
ok.

%%--------------------------------------------------------------------
Expand All @@ -213,21 +213,21 @@ set_params(Connection, Num, [Val|Tail], _) ->
NumBin = emysql_util:encode(Num, true),
ValBin = emysql_util:encode(Val, true),
Packet = <<?COM_QUERY, "SET @", NumBin/binary, "=", ValBin/binary>>,
Result = emysql_tcp:send_and_recv_packet(Connection#connection.socket, Packet, 0),
Result = emysql_tcp:send_and_recv_packet(Connection#emysql_connection.socket, Packet, 0),
set_params(Connection, Num+1, Tail, Result).

prepare_statement(Connection, StmtName) ->
case emysql_statements:fetch(StmtName) of
undefined ->
exit(statement_has_not_been_prepared);
{Version, Statement} ->
case emysql_statements:version(Connection#connection.id, StmtName) of
case emysql_statements:version(Connection#emysql_connection.id, StmtName) of
Version ->
ok;
_ ->
case prepare(Connection, StmtName, Statement) of
OK when is_record(OK, ok_packet) ->
emysql_statements:prepare(Connection#connection.id, StmtName, Version);
emysql_statements:prepare(Connection#emysql_connection.id, StmtName, Version);
Err when is_record(Err, error_packet) ->
exit({failed_to_prepare_statement, Err#error_packet.msg})
end
Expand Down
22 changes: 11 additions & 11 deletions src/emysql_conn_mgr.erl
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,8 @@ handle_call({lock_connection, PoolId}, _From, State) ->
case find_next_connection_in_pool(State#state.pools, PoolId) of
[Pool, OtherPools, Conn, OtherConns] ->
%-% io:format("gen srv: lock connection ... found a good next connection~n", []),
NewConn = Conn#connection{locked_at=lists:nth(2, tuple_to_list(now()))},
Locked = gb_trees:enter(NewConn#connection.id, NewConn, Pool#pool.locked),
NewConn = Conn#emysql_connection{locked_at=lists:nth(2, tuple_to_list(now()))},
Locked = gb_trees:enter(NewConn#emysql_connection.id, NewConn, Pool#pool.locked),
State1 = State#state{pools = [Pool#pool{available=OtherConns, locked=Locked}|OtherPools]},
{reply, NewConn, State1};
Other ->
Expand All @@ -222,11 +222,11 @@ handle_call({replace_connection_as_available, OldConn, NewConn}, _From, State) -
%% passed in to serve as the replacement for the old one.
%% But i.e. if the sql server is down, it can be fed a dead
%% old connection as new connection, to preserve the pool size.
case find_pool(OldConn#connection.pool_id, State#state.pools, []) of
case find_pool(OldConn#emysql_connection.pool_id, State#state.pools, []) of
{Pool, OtherPools} ->
Pool1 = Pool#pool{
available = queue:in(NewConn, Pool#pool.available),
locked = gb_trees:delete_any(OldConn#connection.id, Pool#pool.locked)
locked = gb_trees:delete_any(OldConn#emysql_connection.id, Pool#pool.locked)
},
{reply, ok, State#state{pools=[Pool1|OtherPools]}};
undefined ->
Expand All @@ -237,10 +237,10 @@ handle_call({replace_connection_as_locked, OldConn, NewConn}, _From, State) ->
%% replace an existing, locked condition with the newly supplied one
%% and keep it in the locked list so that the caller can continue to use it
%% without having to lock another connection.
case find_pool(OldConn#connection.pool_id, State#state.pools, []) of
case find_pool(OldConn#emysql_connection.pool_id, State#state.pools, []) of
{Pool, OtherPools} ->
LockedStripped = gb_trees:delete_any(OldConn#connection.id, Pool#pool.locked),
LockedAdded = gb_trees:enter(NewConn#connection.id, NewConn, LockedStripped),
LockedStripped = gb_trees:delete_any(OldConn#emysql_connection.id, Pool#pool.locked),
LockedAdded = gb_trees:enter(NewConn#emysql_connection.id, NewConn, LockedStripped),
Pool1 = Pool#pool{locked = LockedAdded},
{reply, ok, State#state{pools=[Pool1|OtherPools]}};
undefined ->
Expand Down Expand Up @@ -334,16 +334,16 @@ pass_on_or_queue_as_available(State, Connection, Waiting) ->
case queue:is_empty(Waiting) of
true ->
%% if no processes are waiting then unlock the connection
case find_pool(Connection#connection.pool_id, State#state.pools, []) of
case find_pool(Connection#emysql_connection.pool_id, State#state.pools, []) of
{Pool, OtherPools} ->
%% find connection in locked tree
case gb_trees:lookup(Connection#connection.id, Pool#pool.locked) of
case gb_trees:lookup(Connection#emysql_connection.id, Pool#pool.locked) of
{value, Conn} ->
%%%
%% add it to the available queue and remove from locked tree
Pool1 = Pool#pool{
available = queue:in(Conn#connection{locked_at=undefined}, Pool#pool.available),
locked = gb_trees:delete_any(Connection#connection.id, Pool#pool.locked)
available = queue:in(Conn#emysql_connection{locked_at=undefined}, Pool#pool.available),
locked = gb_trees:delete_any(Connection#emysql_connection.id, Pool#pool.locked)
},
{ok, State#state{pools = [Pool1|OtherPools]}};
%%%
Expand Down

0 comments on commit 0d33806

Please sign in to comment.