Skip to content

Commit

Permalink
Instrument with statsderl
Browse files Browse the repository at this point in the history
  • Loading branch information
lpgauth committed Jul 20, 2018
1 parent 1cf730d commit 5211fb6
Show file tree
Hide file tree
Showing 10 changed files with 76 additions and 14 deletions.
2 changes: 2 additions & 0 deletions include/shackle_internal.hrl
Expand Up @@ -7,6 +7,8 @@
-define(LOOKUP(Key, List, Default), shackle_utils:lookup(Key, List, Default)).
-define(MSG_CONNECT, connect).
-define(SERVER, shackle_server).
-define(SR, 0.005).
-define(STATS_INCR(Key), statsderl:increment(Key, 1, ?SR)).
-define(SUPERVISOR, shackle_sup).
-define(SERVER_UTILS, shackle_server_utils).
-define(WARN(PoolName, Format, Data), shackle_utils:warning_msg(PoolName, Format, Data)).
Expand Down
4 changes: 3 additions & 1 deletion rebar.config
Expand Up @@ -17,7 +17,8 @@
{deps, [
{foil, "0.1.1"},
{granderl, "0.1.5"},
{metal, "0.1.1"}
{metal, "0.1.1"},
{statsderl, "0.5.2"}
]}.

{edoc_opts, [
Expand All @@ -32,6 +33,7 @@

{erl_opts, [
debug_info,
{parse_transform, statsderl_transform},
{platform_define, "18|19|^2", 'ETS_TAKE'},
{platform_define, "19|^2", 'UDP_HEADER'}
]}.
Expand Down
4 changes: 3 additions & 1 deletion rebar.config.script
Expand Up @@ -8,7 +8,9 @@ Config = case erlang:function_exported(rebar3, main, 1) of
{granderl, ".*",
{git, "https://github.com/tokenrove/granderl.git", {tag, "v0.1.5"}}},
{metal, ".*",
{git, "https://github.com/lpgauth/metal.git", {tag, "0.1.1"}}}
{git, "https://github.com/lpgauth/metal.git", {tag, "0.1.1"}}},
{statsderl, ".*",
{git, "https://github.com/lpgauth/statsderl.git", {tag, "0.5.2"}}}
]} | lists:keydelete(deps, 1, CONFIG)]
end,

Expand Down
8 changes: 6 additions & 2 deletions rebar.lock
@@ -1,10 +1,14 @@
{"1.1.0",
[{<<"foil">>,{pkg,<<"foil">>,<<"0.1.1">>},0},
{<<"granderl">>,{pkg,<<"granderl">>,<<"0.1.5">>},0},
{<<"metal">>,{pkg,<<"metal">>,<<"0.1.1">>},0}]}.
{<<"metal">>,{pkg,<<"metal">>,<<"0.1.1">>},0},
{<<"parse_trans">>,{pkg,<<"parse_trans">>,<<"3.0.0">>},1},
{<<"statsderl">>,{pkg,<<"statsderl">>,<<"0.5.2">>},0}]}.
[
{pkg_hash,[
{<<"foil">>, <<"4D07B62C114636BBC3EEBD5CEE04B23A7AAB1262B0F68AA79005A6FBC3790472">>},
{<<"granderl">>, <<"F20077A68BD80B8D8783BD15A052813C6483771DEC1A5B837D307CBE92F14122">>},
{<<"metal">>, <<"5D3D1322DA7BCD34B94FED5486F577973685298883954F7A3E517EF5EF6953F5">>}]}
{<<"metal">>, <<"5D3D1322DA7BCD34B94FED5486F577973685298883954F7A3E517EF5EF6953F5">>},
{<<"parse_trans">>, <<"9E96B1C9C3A0DF54E7B76F8F685D38BFA1EB21B31E042B1D1A5A70258E4DB1E3">>},
{<<"statsderl">>, <<"9FBE68445DA83A86E2509D007823E94CB8A71561F0428058BD5ABE9B62386A32">>}]}
].
2 changes: 1 addition & 1 deletion src/shackle.app.src
@@ -1,5 +1,5 @@
{application, shackle, [
{applications, [kernel, stdlib, granderl, metal, foil, ssl]},
{applications, [kernel, stdlib, granderl, metal, foil, ssl, statsderl]},
{description, "High-Performance Erlang Network Client Framework"},
{env, []},
{licenses, ["MIT"]},
Expand Down
23 changes: 23 additions & 0 deletions src/shackle_pool.erl
Expand Up @@ -85,6 +85,8 @@ server(Name) ->
true ->
{ok, Client, Server};
false ->
Key = ["shackle.", client_bin(Client), ".backlog_full"],
?STATS_INCR(Key),
{error, backlog_full}
end;
{error, Reson} ->
Expand Down Expand Up @@ -112,6 +114,27 @@ cleanup_foil(Name, #pool_options {pool_size = PoolSize}) ->
[foil:delete(?MODULE, {Name, N}) || N <- lists:seq(1, PoolSize)],
foil:load(?MODULE).

client_bin(acr_historical_client) ->
<<"acr_historical_client">>;
client_bin(acr_instant_client) ->
<<"acr_instant_client">>;
client_bin(anchor_client) ->
<<"anchor_client">>;
client_bin(buoy_client) ->
<<"buoy_client">>;
client_bin(flare_client) ->
<<"flare_client">>;
client_bin(identifyd_client) ->
<<"identifyd_client">>;
client_bin(iplists_client) ->
<<"iplists_client">>;
client_bin(marina_client) ->
<<"marina_client">>;
client_bin(pacingderl_client) ->
<<"pacingderl_client">>;
client_bin(Atom) ->
atom_to_binary(Atom, latin1).

options(Name) ->
case foil:lookup(?MODULE, Name) of
{ok, Options} ->
Expand Down
17 changes: 11 additions & 6 deletions src/shackle_server_utils.erl
Expand Up @@ -8,7 +8,7 @@
-export([
cancel_timer/1,
client/5,
process_responses/2,
process_responses/3,
reconnect_state/1,
reconnect_state_reset/1,
reply/3,
Expand All @@ -35,20 +35,25 @@ client(Client, PoolName, InitOptions, SocketType, Socket) ->
{error, Reason, undefined}
end.

-spec process_responses([response()], server_name()) ->
-spec process_responses([response()], server_name(), binary()) ->
ok.

process_responses([], _Name) ->
process_responses([], _Name, _ClientBin) ->
ok;
process_responses([{ExtRequestId, Reply} | T], Name) ->
process_responses([{ExtRequestId, Reply} | T], Name, ClientBin) ->
?STATS_INCR(["shackle.", ClientBin, ".replies"]),
case shackle_queue:remove(Name, ExtRequestId) of
{ok, Cast, TimerRef} ->
{ok, #cast {timestamp = Timestamp} = Cast, TimerRef} ->
?STATS_INCR(["shackle.", ClientBin, ".found"]),
Diff = timer:now_diff(os:timestamp(), Timestamp),
statsderl:timing(["shackle.", ClientBin, ".reply"], Diff, ?SR),
erlang:cancel_timer(TimerRef),
reply(Name, Reply, Cast);
{error, not_found} ->
?STATS_INCR(["shackle.", ClientBin, ".not_found"]),
ok
end,
process_responses(T, Name).
process_responses(T, Name, ClientBin).

-spec reconnect_state(client_options()) ->
undefined | reconnect_state().
Expand Down
10 changes: 9 additions & 1 deletion src/shackle_ssl_server.erl
Expand Up @@ -18,6 +18,7 @@
-record(state, {
client :: client(),
init_options :: init_options(),
client_bin :: binary(),
ip :: inet:ip_address() | inet:hostname(),
name :: server_name(),
parent :: pid(),
Expand Down Expand Up @@ -60,6 +61,7 @@ init(Name, Parent, Opts) ->
{ok, {#state {
client = Client,
init_options = InitOptions,
client_bin = atom_to_binary(Client, latin1),
ip = Ip,
name = Name,
parent = Parent,
Expand All @@ -83,6 +85,7 @@ handle_msg({Request, #cast {
timeout = Timeout
} = Cast}, {#state {
client = Client,
client_bin = ClientBin,
name = Name,
pool_name = PoolName,
socket = Socket
Expand All @@ -92,6 +95,7 @@ handle_msg({Request, #cast {
{ok, ExtRequestId, Data, ClientState2} ->
case ssl:send(Socket, Data) of
ok ->
?STATS_INCR(["shackle.", ClientBin, ".send"]),
Msg = {timeout, ExtRequestId},
TimerRef = erlang:send_after(Timeout, self(), Msg),
shackle_queue:add(ExtRequestId, Cast, TimerRef),
Expand All @@ -111,14 +115,16 @@ handle_msg({Request, #cast {
end;
handle_msg({ssl, Socket, Data}, {#state {
client = Client,
client_bin = ClientBin,
name = Name,
pool_name = PoolName,
socket = Socket
} = State, ClientState}) ->

?STATS_INCR(["shackle.", ClientBin, ".recv"]),
try Client:handle_data(Data, ClientState) of
{ok, Replies, ClientState2} ->
?SERVER_UTILS:process_responses(Replies, Name),
?SERVER_UTILS:process_responses(Replies, Name, ClientBin),
{ok, {State, ClientState2}};
{error, Reason, ClientState2} ->
?WARN(PoolName, "handle_data error: ~p", [Reason]),
Expand All @@ -132,11 +138,13 @@ handle_msg({ssl, Socket, Data}, {#state {
close(State, ClientState)
end;
handle_msg({timeout, ExtRequestId}, {#state {
client_bin = ClientBin,
name = Name
} = State, ClientState}) ->

case shackle_queue:remove(Name, ExtRequestId) of
{ok, Cast, _TimerRef} ->
?STATS_INCR(["shackle.", ClientBin, ".timeout"]),
?SERVER_UTILS:reply(Name, {error, timeout}, Cast);
{error, not_found} ->
ok
Expand Down
10 changes: 9 additions & 1 deletion src/shackle_tcp_server.erl
Expand Up @@ -18,6 +18,7 @@
-record(state, {
client :: client(),
init_options :: init_options(),
client_bin :: binary(),
ip :: inet:ip_address() | inet:hostname(),
name :: server_name(),
parent :: pid(),
Expand Down Expand Up @@ -60,6 +61,7 @@ init(Name, Parent, Opts) ->
{ok, {#state {
client = Client,
init_options = InitOptions,
client_bin = atom_to_binary(Client, latin1),
ip = Ip,
name = Name,
parent = Parent,
Expand All @@ -83,6 +85,7 @@ handle_msg({Request, #cast {
timeout = Timeout
} = Cast}, {#state {
client = Client,
client_bin = ClientBin,
name = Name,
pool_name = PoolName,
socket = Socket
Expand All @@ -92,6 +95,7 @@ handle_msg({Request, #cast {
{ok, ExtRequestId, Data, ClientState2} ->
case gen_tcp:send(Socket, Data) of
ok ->
?STATS_INCR(["shackle.", ClientBin, ".send"]),
Msg = {timeout, ExtRequestId},
TimerRef = erlang:send_after(Timeout, self(), Msg),
shackle_queue:add(ExtRequestId, Cast, TimerRef),
Expand All @@ -111,14 +115,16 @@ handle_msg({Request, #cast {
end;
handle_msg({tcp, Socket, Data}, {#state {
client = Client,
client_bin = ClientBin,
name = Name,
pool_name = PoolName,
socket = Socket
} = State, ClientState}) ->

?STATS_INCR(["shackle.", ClientBin, ".recv"]),
try Client:handle_data(Data, ClientState) of
{ok, Replies, ClientState2} ->
?SERVER_UTILS:process_responses(Replies, Name),
?SERVER_UTILS:process_responses(Replies, Name, ClientBin),
{ok, {State, ClientState2}};
{error, Reason, ClientState2} ->
?WARN(PoolName, "handle_data error: ~p", [Reason]),
Expand All @@ -132,11 +138,13 @@ handle_msg({tcp, Socket, Data}, {#state {
close(State, ClientState)
end;
handle_msg({timeout, ExtRequestId}, {#state {
client_bin = ClientBin,
name = Name
} = State, ClientState}) ->

case shackle_queue:remove(Name, ExtRequestId) of
{ok, Cast, _TimerRef} ->
?STATS_INCR(["shackle.", ClientBin, ".timeout"]),
?SERVER_UTILS:reply(Name, {error, timeout}, Cast);
{error, not_found} ->
ok
Expand Down
10 changes: 9 additions & 1 deletion src/shackle_udp_server.erl
Expand Up @@ -17,6 +17,7 @@

-record(state, {
client :: client(),
client_bin :: binary(),
header :: undefined | iodata(),
init_options :: init_options(),
ip :: inet:ip_address() | inet:hostname(),
Expand Down Expand Up @@ -64,6 +65,7 @@ init(Name, Parent, Opts) ->
{ok, {#state {
client = Client,
init_options = InitOptions,
client_bin = atom_to_binary(Client, latin1),
ip = Ip,
name = Name,
parent = Parent,
Expand All @@ -87,6 +89,7 @@ handle_msg({Request, #cast {
timeout = Timeout
} = Cast}, {#state {
client = Client,
client_bin = ClientBin,
header = Header,
name = Name,
pool_name = PoolName,
Expand All @@ -97,6 +100,7 @@ handle_msg({Request, #cast {
{ok, ExtRequestId, Data, ClientState2} ->
case send(Socket, Header, Data) of
ok ->
?STATS_INCR(["shackle.", ClientBin, ".send"]),
Msg = {timeout, ExtRequestId},
TimerRef = erlang:send_after(Timeout, self(), Msg),
shackle_queue:add(ExtRequestId, Cast, TimerRef),
Expand All @@ -118,14 +122,16 @@ handle_msg({inet_reply, _Socket, ok}, {State, ClientState}) ->
{ok, {State, ClientState}};
handle_msg({udp, Socket, _Ip, _InPortNo, Data}, {#state {
client = Client,
client_bin = ClientBin,
name = Name,
pool_name = PoolName,
socket = Socket
} = State, ClientState}) ->

?STATS_INCR(["shackle.", ClientBin, ".recv"]),
try Client:handle_data(Data, ClientState) of
{ok, Replies, ClientState2} ->
?SERVER_UTILS:process_responses(Replies, Name),
?SERVER_UTILS:process_responses(Replies, Name, ClientBin),
{ok, {State, ClientState2}};
{error, Reason, ClientState2} ->
?WARN(PoolName, "handle_data error: ~p", [Reason]),
Expand All @@ -139,11 +145,13 @@ handle_msg({udp, Socket, _Ip, _InPortNo, Data}, {#state {
close(State, ClientState)
end;
handle_msg({timeout, ExtRequestId}, {#state {
client_bin = ClientBin,
name = Name
} = State, ClientState}) ->

case shackle_queue:remove(Name, ExtRequestId) of
{ok, Cast, _TimerRef} ->
?STATS_INCR(["shackle.", ClientBin, ".timeout"]),
?SERVER_UTILS:reply(Name, {error, timeout}, Cast);
{error, not_found} ->
ok
Expand Down

0 comments on commit 5211fb6

Please sign in to comment.