From 5211fb63f5fce77b156eddfc04dc15becd76ac64 Mon Sep 17 00:00:00 2001 From: Louis-Philippe Gauthier Date: Wed, 24 May 2017 14:21:14 -0400 Subject: [PATCH] Instrument with statsderl --- include/shackle_internal.hrl | 2 ++ rebar.config | 4 +++- rebar.config.script | 4 +++- rebar.lock | 8 ++++++-- src/shackle.app.src | 2 +- src/shackle_pool.erl | 23 +++++++++++++++++++++++ src/shackle_server_utils.erl | 17 +++++++++++------ src/shackle_ssl_server.erl | 10 +++++++++- src/shackle_tcp_server.erl | 10 +++++++++- src/shackle_udp_server.erl | 10 +++++++++- 10 files changed, 76 insertions(+), 14 deletions(-) diff --git a/include/shackle_internal.hrl b/include/shackle_internal.hrl index 075f563..8d1c073 100644 --- a/include/shackle_internal.hrl +++ b/include/shackle_internal.hrl @@ -7,6 +7,8 @@ -define(LOOKUP(Key, List, Default), shackle_utils:lookup(Key, List, Default)). -define(MSG_CONNECT, connect). -define(SERVER, shackle_server). +-define(SR, 0.005). +-define(STATS_INCR(Key), statsderl:increment(Key, 1, ?SR)). -define(SUPERVISOR, shackle_sup). -define(SERVER_UTILS, shackle_server_utils). -define(WARN(PoolName, Format, Data), shackle_utils:warning_msg(PoolName, Format, Data)). diff --git a/rebar.config b/rebar.config index 7a53c9a..6478d3a 100644 --- a/rebar.config +++ b/rebar.config @@ -17,7 +17,8 @@ {deps, [ {foil, "0.1.1"}, {granderl, "0.1.5"}, - {metal, "0.1.1"} + {metal, "0.1.1"}, + {statsderl, "0.5.2"} ]}. {edoc_opts, [ @@ -32,6 +33,7 @@ {erl_opts, [ debug_info, + {parse_transform, statsderl_transform}, {platform_define, "18|19|^2", 'ETS_TAKE'}, {platform_define, "19|^2", 'UDP_HEADER'} ]}. diff --git a/rebar.config.script b/rebar.config.script index bd8318b..eae7f07 100644 --- a/rebar.config.script +++ b/rebar.config.script @@ -8,7 +8,9 @@ Config = case erlang:function_exported(rebar3, main, 1) of {granderl, ".*", {git, "https://github.com/tokenrove/granderl.git", {tag, "v0.1.5"}}}, {metal, ".*", - {git, "https://github.com/lpgauth/metal.git", {tag, "0.1.1"}}} + {git, "https://github.com/lpgauth/metal.git", {tag, "0.1.1"}}}, + {statsderl, ".*", + {git, "https://github.com/lpgauth/statsderl.git", {tag, "0.5.2"}}} ]} | lists:keydelete(deps, 1, CONFIG)] end, diff --git a/rebar.lock b/rebar.lock index 5d59106..2c80ed6 100644 --- a/rebar.lock +++ b/rebar.lock @@ -1,10 +1,14 @@ {"1.1.0", [{<<"foil">>,{pkg,<<"foil">>,<<"0.1.1">>},0}, {<<"granderl">>,{pkg,<<"granderl">>,<<"0.1.5">>},0}, - {<<"metal">>,{pkg,<<"metal">>,<<"0.1.1">>},0}]}. + {<<"metal">>,{pkg,<<"metal">>,<<"0.1.1">>},0}, + {<<"parse_trans">>,{pkg,<<"parse_trans">>,<<"3.0.0">>},1}, + {<<"statsderl">>,{pkg,<<"statsderl">>,<<"0.5.2">>},0}]}. [ {pkg_hash,[ {<<"foil">>, <<"4D07B62C114636BBC3EEBD5CEE04B23A7AAB1262B0F68AA79005A6FBC3790472">>}, {<<"granderl">>, <<"F20077A68BD80B8D8783BD15A052813C6483771DEC1A5B837D307CBE92F14122">>}, - {<<"metal">>, <<"5D3D1322DA7BCD34B94FED5486F577973685298883954F7A3E517EF5EF6953F5">>}]} + {<<"metal">>, <<"5D3D1322DA7BCD34B94FED5486F577973685298883954F7A3E517EF5EF6953F5">>}, + {<<"parse_trans">>, <<"9E96B1C9C3A0DF54E7B76F8F685D38BFA1EB21B31E042B1D1A5A70258E4DB1E3">>}, + {<<"statsderl">>, <<"9FBE68445DA83A86E2509D007823E94CB8A71561F0428058BD5ABE9B62386A32">>}]} ]. diff --git a/src/shackle.app.src b/src/shackle.app.src index e4aa67e..ecd371c 100644 --- a/src/shackle.app.src +++ b/src/shackle.app.src @@ -1,5 +1,5 @@ {application, shackle, [ - {applications, [kernel, stdlib, granderl, metal, foil, ssl]}, + {applications, [kernel, stdlib, granderl, metal, foil, ssl, statsderl]}, {description, "High-Performance Erlang Network Client Framework"}, {env, []}, {licenses, ["MIT"]}, diff --git a/src/shackle_pool.erl b/src/shackle_pool.erl index 9a73d77..d82f3df 100644 --- a/src/shackle_pool.erl +++ b/src/shackle_pool.erl @@ -85,6 +85,8 @@ server(Name) -> true -> {ok, Client, Server}; false -> + Key = ["shackle.", client_bin(Client), ".backlog_full"], + ?STATS_INCR(Key), {error, backlog_full} end; {error, Reson} -> @@ -112,6 +114,27 @@ cleanup_foil(Name, #pool_options {pool_size = PoolSize}) -> [foil:delete(?MODULE, {Name, N}) || N <- lists:seq(1, PoolSize)], foil:load(?MODULE). +client_bin(acr_historical_client) -> + <<"acr_historical_client">>; +client_bin(acr_instant_client) -> + <<"acr_instant_client">>; +client_bin(anchor_client) -> + <<"anchor_client">>; +client_bin(buoy_client) -> + <<"buoy_client">>; +client_bin(flare_client) -> + <<"flare_client">>; +client_bin(identifyd_client) -> + <<"identifyd_client">>; +client_bin(iplists_client) -> + <<"iplists_client">>; +client_bin(marina_client) -> + <<"marina_client">>; +client_bin(pacingderl_client) -> + <<"pacingderl_client">>; +client_bin(Atom) -> + atom_to_binary(Atom, latin1). + options(Name) -> case foil:lookup(?MODULE, Name) of {ok, Options} -> diff --git a/src/shackle_server_utils.erl b/src/shackle_server_utils.erl index d9df225..62fc64f 100644 --- a/src/shackle_server_utils.erl +++ b/src/shackle_server_utils.erl @@ -8,7 +8,7 @@ -export([ cancel_timer/1, client/5, - process_responses/2, + process_responses/3, reconnect_state/1, reconnect_state_reset/1, reply/3, @@ -35,20 +35,25 @@ client(Client, PoolName, InitOptions, SocketType, Socket) -> {error, Reason, undefined} end. --spec process_responses([response()], server_name()) -> +-spec process_responses([response()], server_name(), binary()) -> ok. -process_responses([], _Name) -> +process_responses([], _Name, _ClientBin) -> ok; -process_responses([{ExtRequestId, Reply} | T], Name) -> +process_responses([{ExtRequestId, Reply} | T], Name, ClientBin) -> + ?STATS_INCR(["shackle.", ClientBin, ".replies"]), case shackle_queue:remove(Name, ExtRequestId) of - {ok, Cast, TimerRef} -> + {ok, #cast {timestamp = Timestamp} = Cast, TimerRef} -> + ?STATS_INCR(["shackle.", ClientBin, ".found"]), + Diff = timer:now_diff(os:timestamp(), Timestamp), + statsderl:timing(["shackle.", ClientBin, ".reply"], Diff, ?SR), erlang:cancel_timer(TimerRef), reply(Name, Reply, Cast); {error, not_found} -> + ?STATS_INCR(["shackle.", ClientBin, ".not_found"]), ok end, - process_responses(T, Name). + process_responses(T, Name, ClientBin). -spec reconnect_state(client_options()) -> undefined | reconnect_state(). diff --git a/src/shackle_ssl_server.erl b/src/shackle_ssl_server.erl index 9ce2e73..773f5a2 100644 --- a/src/shackle_ssl_server.erl +++ b/src/shackle_ssl_server.erl @@ -18,6 +18,7 @@ -record(state, { client :: client(), init_options :: init_options(), + client_bin :: binary(), ip :: inet:ip_address() | inet:hostname(), name :: server_name(), parent :: pid(), @@ -60,6 +61,7 @@ init(Name, Parent, Opts) -> {ok, {#state { client = Client, init_options = InitOptions, + client_bin = atom_to_binary(Client, latin1), ip = Ip, name = Name, parent = Parent, @@ -83,6 +85,7 @@ handle_msg({Request, #cast { timeout = Timeout } = Cast}, {#state { client = Client, + client_bin = ClientBin, name = Name, pool_name = PoolName, socket = Socket @@ -92,6 +95,7 @@ handle_msg({Request, #cast { {ok, ExtRequestId, Data, ClientState2} -> case ssl:send(Socket, Data) of ok -> + ?STATS_INCR(["shackle.", ClientBin, ".send"]), Msg = {timeout, ExtRequestId}, TimerRef = erlang:send_after(Timeout, self(), Msg), shackle_queue:add(ExtRequestId, Cast, TimerRef), @@ -111,14 +115,16 @@ handle_msg({Request, #cast { end; handle_msg({ssl, Socket, Data}, {#state { client = Client, + client_bin = ClientBin, name = Name, pool_name = PoolName, socket = Socket } = State, ClientState}) -> + ?STATS_INCR(["shackle.", ClientBin, ".recv"]), try Client:handle_data(Data, ClientState) of {ok, Replies, ClientState2} -> - ?SERVER_UTILS:process_responses(Replies, Name), + ?SERVER_UTILS:process_responses(Replies, Name, ClientBin), {ok, {State, ClientState2}}; {error, Reason, ClientState2} -> ?WARN(PoolName, "handle_data error: ~p", [Reason]), @@ -132,11 +138,13 @@ handle_msg({ssl, Socket, Data}, {#state { close(State, ClientState) end; handle_msg({timeout, ExtRequestId}, {#state { + client_bin = ClientBin, name = Name } = State, ClientState}) -> case shackle_queue:remove(Name, ExtRequestId) of {ok, Cast, _TimerRef} -> + ?STATS_INCR(["shackle.", ClientBin, ".timeout"]), ?SERVER_UTILS:reply(Name, {error, timeout}, Cast); {error, not_found} -> ok diff --git a/src/shackle_tcp_server.erl b/src/shackle_tcp_server.erl index a7006fb..9951c21 100644 --- a/src/shackle_tcp_server.erl +++ b/src/shackle_tcp_server.erl @@ -18,6 +18,7 @@ -record(state, { client :: client(), init_options :: init_options(), + client_bin :: binary(), ip :: inet:ip_address() | inet:hostname(), name :: server_name(), parent :: pid(), @@ -60,6 +61,7 @@ init(Name, Parent, Opts) -> {ok, {#state { client = Client, init_options = InitOptions, + client_bin = atom_to_binary(Client, latin1), ip = Ip, name = Name, parent = Parent, @@ -83,6 +85,7 @@ handle_msg({Request, #cast { timeout = Timeout } = Cast}, {#state { client = Client, + client_bin = ClientBin, name = Name, pool_name = PoolName, socket = Socket @@ -92,6 +95,7 @@ handle_msg({Request, #cast { {ok, ExtRequestId, Data, ClientState2} -> case gen_tcp:send(Socket, Data) of ok -> + ?STATS_INCR(["shackle.", ClientBin, ".send"]), Msg = {timeout, ExtRequestId}, TimerRef = erlang:send_after(Timeout, self(), Msg), shackle_queue:add(ExtRequestId, Cast, TimerRef), @@ -111,14 +115,16 @@ handle_msg({Request, #cast { end; handle_msg({tcp, Socket, Data}, {#state { client = Client, + client_bin = ClientBin, name = Name, pool_name = PoolName, socket = Socket } = State, ClientState}) -> + ?STATS_INCR(["shackle.", ClientBin, ".recv"]), try Client:handle_data(Data, ClientState) of {ok, Replies, ClientState2} -> - ?SERVER_UTILS:process_responses(Replies, Name), + ?SERVER_UTILS:process_responses(Replies, Name, ClientBin), {ok, {State, ClientState2}}; {error, Reason, ClientState2} -> ?WARN(PoolName, "handle_data error: ~p", [Reason]), @@ -132,11 +138,13 @@ handle_msg({tcp, Socket, Data}, {#state { close(State, ClientState) end; handle_msg({timeout, ExtRequestId}, {#state { + client_bin = ClientBin, name = Name } = State, ClientState}) -> case shackle_queue:remove(Name, ExtRequestId) of {ok, Cast, _TimerRef} -> + ?STATS_INCR(["shackle.", ClientBin, ".timeout"]), ?SERVER_UTILS:reply(Name, {error, timeout}, Cast); {error, not_found} -> ok diff --git a/src/shackle_udp_server.erl b/src/shackle_udp_server.erl index 845761c..0df2046 100644 --- a/src/shackle_udp_server.erl +++ b/src/shackle_udp_server.erl @@ -17,6 +17,7 @@ -record(state, { client :: client(), + client_bin :: binary(), header :: undefined | iodata(), init_options :: init_options(), ip :: inet:ip_address() | inet:hostname(), @@ -64,6 +65,7 @@ init(Name, Parent, Opts) -> {ok, {#state { client = Client, init_options = InitOptions, + client_bin = atom_to_binary(Client, latin1), ip = Ip, name = Name, parent = Parent, @@ -87,6 +89,7 @@ handle_msg({Request, #cast { timeout = Timeout } = Cast}, {#state { client = Client, + client_bin = ClientBin, header = Header, name = Name, pool_name = PoolName, @@ -97,6 +100,7 @@ handle_msg({Request, #cast { {ok, ExtRequestId, Data, ClientState2} -> case send(Socket, Header, Data) of ok -> + ?STATS_INCR(["shackle.", ClientBin, ".send"]), Msg = {timeout, ExtRequestId}, TimerRef = erlang:send_after(Timeout, self(), Msg), shackle_queue:add(ExtRequestId, Cast, TimerRef), @@ -118,14 +122,16 @@ handle_msg({inet_reply, _Socket, ok}, {State, ClientState}) -> {ok, {State, ClientState}}; handle_msg({udp, Socket, _Ip, _InPortNo, Data}, {#state { client = Client, + client_bin = ClientBin, name = Name, pool_name = PoolName, socket = Socket } = State, ClientState}) -> + ?STATS_INCR(["shackle.", ClientBin, ".recv"]), try Client:handle_data(Data, ClientState) of {ok, Replies, ClientState2} -> - ?SERVER_UTILS:process_responses(Replies, Name), + ?SERVER_UTILS:process_responses(Replies, Name, ClientBin), {ok, {State, ClientState2}}; {error, Reason, ClientState2} -> ?WARN(PoolName, "handle_data error: ~p", [Reason]), @@ -139,11 +145,13 @@ handle_msg({udp, Socket, _Ip, _InPortNo, Data}, {#state { close(State, ClientState) end; handle_msg({timeout, ExtRequestId}, {#state { + client_bin = ClientBin, name = Name } = State, ClientState}) -> case shackle_queue:remove(Name, ExtRequestId) of {ok, Cast, _TimerRef} -> + ?STATS_INCR(["shackle.", ClientBin, ".timeout"]), ?SERVER_UTILS:reply(Name, {error, timeout}, Cast); {error, not_found} -> ok