Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

adding trace module

  • Loading branch information...
commit baed3c4c58402dc036ffa5a75c4c376da172a8b2 1 parent 3d53596
Jacob Vorreuter authored
View
5 include/emysql.hrl
@@ -23,7 +23,7 @@
%% FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
%% OTHER DEALINGS IN THE SOFTWARE.
-record(pool, {pool_id, size, user, password, host, port, database, encoding, connections=[]}).
--record(connection, {id, pool_id, state=available, socket, version, thread_id, caps, language, prepared=gb_trees:empty()}).
+-record(connection, {id, pool_id, state=available, socket, version, thread_id, caps, language, prepared=gb_trees:empty(), locked_at}).
-record(greeting, {protocol_version, server_version, thread_id, salt1, salt2, caps, language, status, seq_num}).
-record(field, {seq_num, catalog, db, table, org_table, name, org_name, type, default, charset_nr, length, flags, decimals}).
-record(packet, {size, seq_num, data}).
@@ -32,7 +32,8 @@
-record(result_packet, {seq_num, field_list, rows, extra}).
-define(TIMEOUT, 8000).
--define(MAXPACKETBYTES, 1000000).
+-define(LOCK_TIMEOUT, 5000).
+-define(MAXPACKETBYTES, 50000000).
-define(LONG_PASSWORD, 1).
-define(LONG_FLAG, 4).
-define(PROTOCOL_41, 512).
View
3  src/emysql.erl
@@ -169,9 +169,8 @@ monitor_work(Connection, Timeout, {M,F,A}) when is_record(Connection, connection
%% if the process returns data, unlock the
%% connection and collect the normal 'DOWN'
%% message send from the child process
- erlang:demonitor(Mref),
+ erlang:demonitor(Mref, [flush]),
emysql_conn_mgr:unlock_connection(Connection),
- receive {'DOWN', Mref, process, Pid, normal} -> ok after 0 -> ok end,
Result
after Timeout ->
%% if we timeout waiting for the process to return,
View
2  src/emysql_conn.erl
@@ -123,7 +123,7 @@ reset_connection(Pools, Conn) ->
%% the socket must be closed and the connection reset
%% in the conn_mgr state. Also a new connection needs
%% to be opened to replace the old one.
- close_connection(Conn),
+ spawn(fun() -> close_connection(Conn) end),
%% OPEN NEW SOCKET
case emysql_conn_mgr:find_pool(Conn#connection.pool_id, Pools, []) of
{Pool, _} ->
View
35 src/emysql_conn_mgr.erl
@@ -68,15 +68,15 @@ remove_connections(PoolId, Num) when is_atom(PoolId), is_integer(Num) ->
lock_connection(PoolId) when is_atom(PoolId) ->
do_gen_call({lock_connection, PoolId}).
-wait_for_connection(Arg) when is_atom(Arg); is_record(Arg, connection) ->
+wait_for_connection(PoolId) when is_atom(PoolId) ->
%% try to lock a connection. if no connections are available then
%% wait to be notified of the next available connection
- case lock_connection(Arg) of
+ case lock_connection(PoolId) of
unavailable ->
gen_server:call(?MODULE, start_wait, infinity),
receive
{connection, Connection} -> Connection
- after 5000 ->
+ after lock_timeout() ->
exit(connection_lock_timeout)
end;
Connection ->
@@ -187,7 +187,7 @@ handle_call({lock_connection, PoolId}, {From, _Mref}, State) ->
%% find the next available connection in the pool identified by PoolId
case find_next_connection_in_pool(State#state.pools, PoolId) of
[Pool, OtherPools, Conn, OtherConns] ->
- NewConn = Conn#connection{state=From},
+ NewConn = Conn#connection{state=From, locked_at=lists:nth(2, tuple_to_list(now()))},
State1 = State#state{pools = [Pool#pool{connections=[NewConn|OtherConns]}|OtherPools]},
{reply, NewConn, State1};
Other ->
@@ -293,15 +293,22 @@ find_connection(ConnID, [Conn|Tail], OtherConns) ->
find_connection(ConnID, Tail, [Conn|OtherConns]).
find_next_available(List) ->
- find_next_available(List, []).
+ find_next_available(List, [], lists:nth(2, tuple_to_list(now()))).
-find_next_available([#connection{state=available}=Conn|Tail], Rest) ->
+find_next_available([#connection{state=available}=Conn|Tail], Rest, _) ->
{Conn, lists:append(Rest, Tail)};
-find_next_available([Conn|Tail], Rest) ->
- find_next_available(Tail, [Conn|Rest]);
+find_next_available([#connection{state=Pid, locked_at=LockedAt}=Conn|Tail], Rest, NowSecs) when is_pid(Pid) andalso is_integer(LockedAt) andalso (LockedAt < (NowSecs - 120)) ->
+ spawn(fun() ->
+ exit(Pid, connection_locked_too_long),
+ emysql_conn:reset_connection(emysql_conn_mgr:pools(), Conn)
+ end),
+ find_next_available(Tail, [Conn|Rest], NowSecs);
-find_next_available([], _) ->
+find_next_available([Conn|Tail], Rest, NowSecs) ->
+ find_next_available(Tail, [Conn|Rest], NowSecs);
+
+find_next_available([], _, _) ->
undefined.
find_connection_in_pool(Pools, Connection) ->
@@ -347,7 +354,7 @@ pass_connection_to_waiting_pid(State, Connection, Waiting) ->
%% if not processes are waiting then unlock the connection
case find_connection_in_pool(State#state.pools, Connection) of
[Pool, OtherPools, Conn, OtherConns] ->
- State1 = State#state{pools = [Pool#pool{connections=[Conn#connection{state=available}|OtherConns]}|OtherPools]},
+ State1 = State#state{pools = [Pool#pool{connections=[Conn#connection{state=available, locked_at=undefined}|OtherConns]}|OtherPools]},
{ok, State1};
Other ->
{Other, State}
@@ -365,4 +372,10 @@ pass_connection_to_waiting_pid(State, Connection, Waiting) ->
_ ->
pass_connection_to_waiting_pid(State, Connection, Waiting1)
end
- end.
+ end.
+
+lock_timeout() ->
+ case application:get_env(emysql, lock_timeout) of
+ undefined -> ?LOCK_TIMEOUT;
+ {ok, Timeout} -> Timeout
+ end.
View
12 src/emysql_tcp.erl
@@ -161,7 +161,9 @@ type_cast_row_data(Data, #field{type=Type})
{ok, [Year, Month, Day], _} ->
{date, {Year, Month, Day}};
{error, _} ->
- binary_to_list(Data)
+ binary_to_list(Data);
+ Other ->
+ io:format("[~p:~b] unexpected ~p~n", [?MODULE, ?LINE, Other])
end;
type_cast_row_data(Data, #field{type=Type})
@@ -170,7 +172,9 @@ type_cast_row_data(Data, #field{type=Type})
{ok, [Hour, Minute, Second], _} ->
{time, {Hour, Minute, Second}};
{error, _} ->
- binary_to_list(Data)
+ binary_to_list(Data);
+ Other ->
+ io:format("[~p:~b] unexpected ~p~n", [?MODULE, ?LINE, Other])
end;
type_cast_row_data(Data, #field{type=Type})
@@ -180,7 +184,9 @@ type_cast_row_data(Data, #field{type=Type})
{ok, [Year, Month, Day, Hour, Minute, Second], _} ->
{datetime, {{Year, Month, Day}, {Hour, Minute, Second}}};
{error, _} ->
- binary_to_list(Data)
+ binary_to_list(Data);
+ Other ->
+ io:format("[~p:~b] unexpected ~p~n", [?MODULE, ?LINE, Other])
end;
type_cast_row_data(Data, #field{type=Type})
View
43 src/emysql_tracer.erl
@@ -0,0 +1,43 @@
+-module(emysql_tracer).
+-compile(export_all).
+
+trace_module(Fun) ->
+ spawn(fun() -> trace_module1(Fun) end).
+
+trace_module1(Fun) ->
+ Modules =
+ [emysql] ++
+ [emysql_auth] ++
+ [emysql_conn] ++
+ [emysql_conn_mgr] ++
+ [emysql_statements] ++
+ %%[emysql_tcp] ++
+ [],
+ [erlang:trace_pattern({Mod, '_', '_'},[{'_', [], [{return_trace}]}],[local]) || Mod <- Modules],
+ S = self(),
+ Pid = spawn(fun() -> do_trace(S, Fun) end),
+ erlang:trace(all, true, [call,procs,exiting]),
+ Pid ! {self(), start},
+ trace_loop().
+
+do_trace(Parent, Fun) ->
+ receive
+ {Parent, start} ->
+ Fun()
+ end.
+
+trace_loop() ->
+ receive
+ % {trace, _, call, X} ->
+ % io:format("Call: ~p~n", [X]),
+ % trace_loop();
+ % {trace, _, return_from, Call, Ret} ->
+ % io:format("Return From: ~p => ~p~n", [Call, Ret]),
+ % trace_loop();
+ % {trace, _, send_to_non_existing_process, Msg, To} ->
+ % io:format("send_to_non_existing_process ~p to ~p~n", [Msg, To]),
+ % trace_loop();
+ _Other ->
+ io:format("~p~n", [_Other]),
+ trace_loop()
+ end.
View
101 t/emysql_load_test.erl
@@ -1,9 +1,15 @@
%% erlc -o ebin t/emysql_load_test.erl -W0
%% erl -pa ebin -boot start_sasl -name emysql_load_test@`hostname` -config priv/load-test -eval 'emysql_load_test:start_link()'
+%% erl -pa ebin -boot start_sasl -name emysql_load_test@`hostname` -config priv/load-test -eval 'emysql_tracer:trace_module(emysql_load_test, fun emysql_load_test:start_link/0)'
+%% emysql_tracer:trace_module(emysql_load_test, fun emysql_load_test:start_link/0).
+%% emysql_tracer:trace_module(emysql_load_test, fun emysql_load_test:start_link/0, [call,procs,send,'receive']).
%% emysql_load_test:add_pool().
+%% emysql_load_test:select_all().
%% emysql_load_test:increment_pool_size().
%% emysql_load_test:increment_pool_size().
%% [emysql_load_test:select_all() || _ <- lists:seq(1,10)].
+%% emysql_tracer:trace_module(emysql_load_test, fun emysql_load_test:select_all/0).
+%% emysql_tracer:trace_module(emysql_load_test, fun emysql_load_test:select_all/0).
-module(emysql_load_test).
-behaviour(gen_server).
@@ -14,7 +20,7 @@
-include_lib("emysql/include/emysql.hrl").
--record(state, {tables=[]}).
+-record(state, {}).
start_link() ->
application:start(crypto),
@@ -104,14 +110,11 @@ handle_call(decrement_pool_size, _From, State) ->
handle_call(select_all, _From, State) ->
Pool = get_pool(),
- State1 = get_table_list(State),
- I = random:uniform(length(State1#state.tables)),
- spawn_link(fun() ->
- {TableName, _} = lists:nth(I, State1#state.tables),
- emysql:prepare(list_to_atom(TableName), <<"SELECT * FROM `", (list_to_binary(TableName))/binary, "`">>),
- case (catch emysql:execute(Pool#pool.pool_id, list_to_atom(TableName))) of
+ I = random:uniform(length(tables())),
+ spawn(fun() ->
+ {TableName, _} = lists:nth(I, tables()),
+ case (catch emysql:execute(Pool#pool.pool_id, <<"SELECT * FROM `", (list_to_binary(TableName))/binary, "`">>)) of
Result when is_record(Result, result_packet) ->
- %io:format("~p rows: ~p~n", [TableName, length(Result:rows())]);
ok;
{'EXIT',connection_lock_timeout} ->
ok;
@@ -121,7 +124,7 @@ handle_call(select_all, _From, State) ->
io:format("~p error: ~p~n", [TableName, Err])
end
end),
- {reply, ok, State1};
+ {reply, ok, State};
handle_call(_, _From, State) -> {reply, {error, invalid_call}, State}.
@@ -168,10 +171,76 @@ get_pool() ->
[] -> undefined;
[Pool|_] -> Pool
end.
-
-get_table_list(#state{tables=[]}=State) ->
- Pool = get_pool(),
- Result = emysql:execute(Pool#pool.pool_id, <<"SHOW TABLES">>),
- Tables = [{binary_to_list(Table), []} || [{_, Table}] <- Result:zip()],
- State#state{tables=Tables};
-get_table_list(State) -> State.
+
+tables() ->
+ [{"applications",[]},
+ {"asset_tokens",[]},
+ {"assets",[]},
+ {"auth_tokens",[]},
+ {"avatars",[]},
+ {"blog_posts",[]},
+ {"box_art",[]},
+ {"challenge_issuance_watches",[]},
+ {"challenge_issuances",[]},
+ {"challenge_participations",[]},
+ {"challenge_ratings",[]},
+ {"challenges",[]},
+ {"challenges_events",[]},
+ {"client_igo_stats",[]},
+ {"client_session_stats",[]},
+ {"comment_photos",[]},
+ {"comments",[]},
+ {"conversions",[]},
+ {"critic_reviews",[]},
+ {"datasource_games",[]},
+ {"datasources",[]},
+ {"event_media",[]},
+ {"event_medium_photos",[]},
+ {"events",[]},
+ {"facebook_applications",[]},
+ {"facebook_identities",[]},
+ {"facebook_invitations",[]},
+ {"feedbacks",[]},
+ {"feeds",[]},
+ {"flags",[]},
+ {"game_aliases",[]},
+ {"game_client_data",[]},
+ {"game_objective_types",[]},
+ {"game_objectives",[]},
+ {"game_sessions",[]},
+ {"games",[]},
+ {"identities",[]},
+ {"identity_group_definitions",[]},
+ {"identity_group_memberships",[]},
+ {"identity_groups",[]},
+ {"identity_properties",[]},
+ {"identity_providers",[]},
+ {"image_sizes",[]},
+ {"images",[]},
+ {"invitations",[]},
+ {"items",[]},
+ {"lobby_dataset_stats",[]},
+ {"lobby_datasets",[]},
+ {"lobby_game_servers",[]},
+ {"messages",[]},
+ {"metagame_event_types",[]},
+ {"metagame_events",[]},
+ {"plasma_game_stats",[]},
+ {"platforms",[]},
+ {"privacy_lists",[]},
+ {"promotions",[]},
+ {"rostergroups",[]},
+ {"rupture_invitations",[]},
+ {"schema_migrations",[]},
+ {"server_settings",[]},
+ {"sessions",[]},
+ {"socialapplication_identity_associations",[]},
+ {"socialapplication_networks",[]},
+ {"subscriptions",[]},
+ {"test_groups",[]},
+ {"top_games",[]},
+ {"top_users",[]},
+ {"user_games",[]},
+ {"user_invitations",[]},
+ {"user_presence",[]},
+ {"users",[]}].
Please sign in to comment.
Something went wrong with that request. Please try again.