Permalink
Browse files

Complete gen_server implementation (statsderl.erl) + Add RED to limit…

… memory usage
  • Loading branch information...
lpgauth committed Sep 9, 2011
1 parent e8bd054 commit cc2e43a03c550fb05023bbb80f779dc669ecc110
Showing with 95 additions and 4 deletions.
  1. +1 −1 src/statsderl.app.src
  2. +86 −3 src/statsderl.erl
  3. +8 −0 src/statsderl_sup.erl
View
@@ -8,7 +8,7 @@
]},
{mod, { statsderl_app, []}},
{env, [
- {hostname, {127, 0, 0, 1}},
+ {hostname, {127,0,0,1}},
{port, 8125}
]}
]}.
View
@@ -1,12 +1,21 @@
-module(statsderl).
-behaviour(gen_server).
+
-define(SERVER, ?MODULE).
+-define(LOW_BACKLOG, 5000).
+-define(HIGH_BACKLOG, 10000).
+
+-record(state, {
+ hostname,
+ port,
+ socket
+}).
%% ------------------------------------------------------------------
%% API Function Exports
%% ------------------------------------------------------------------
--export([start_link/0]).
+-export([start_link/0, increment/3, decrement/3, timing/3]).
%% ------------------------------------------------------------------
%% gen_server Function Exports
@@ -21,17 +30,43 @@
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+
+increment(Key, Magnitude, SampleRate) ->
+ Stats = io_lib:format("~s:~B|c|@~f", [Key, Magnitude, SampleRate]),
+ udp_send(Stats, SampleRate).
+
+decrement(Key, Magnitude, SampleRate) ->
+ Stats = io_lib:format("~s:-~B|c|@~f", [Key, Magnitude, SampleRate]),
+ udp_send(Stats, SampleRate).
+
+timing(Key, Timestamp, SampleRate) ->
+ Timing = timer:now_diff(erlang:now(), Timestamp) div 1000,
+ Stats = io_lib:format("~s:~B|ms|@~f", [Key, Timing, SampleRate]),
+ udp_send(Stats, SampleRate).
%% ------------------------------------------------------------------
%% gen_server Function Definitions
%% ------------------------------------------------------------------
-init(Args) ->
- {ok, Args}.
+init(_Args) ->
+ {ok, Hostname} = application:get_env(statsderl, hostname),
+ {ok, Port} = application:get_env(statsderl, port),
+ {ok, Socket} = gen_udp:open(0, [{active, false}]),
+ State = #state {
+ hostname = Hostname,
+ port = Port,
+ socket = Socket
+ },
+ {ok, State}.
handle_call(_Request, _From, State) ->
{noreply, ok, State}.
+handle_cast({udp_send, Stats},
+ #state{hostname=Hostname, port=Port, socket=Socket}=State) ->
+ gen_udp:send(Socket, Hostname, Port, Stats),
+ decrease_backlog(),
+ {noreply, State};
handle_cast(_Msg, State) ->
{noreply, State}.
@@ -48,3 +83,51 @@ code_change(_OldVsn, State, _Extra) ->
%% Internal Function Definitions
%% ------------------------------------------------------------------
+udp_send(Stats, SampleRate) ->
+ case sample(SampleRate) of
+ true ->
+ increase_backlog(),
+ gen_server:cast(?MODULE, {udp_send, Stats});
+ false ->
+ ok
+ end.
+
+sample(SampleRate) ->
+ case red() of
+ true ->
+ false;
+ false ->
+ random:seed(erlang:now()),
+ random:uniform() =< SampleRate
+ end.
+
+%% ------------------------------------------------------------------
+%% Random Early Drop
+%% http://en.wikipedia.org/wiki/Random_early_detection
+%% ------------------------------------------------------------------
+
+red() ->
+ Requests = ets:lookup_element(statsderl, backlog, 2),
+ case Requests of
+ _ when ?LOW_BACKLOG >= Requests->
+ false;
+ _ when Requests >= ?HIGH_BACKLOG ->
+ true;
+ _ ->
+ random_drop_function(Requests)
+ end.
+
+random_drop_function(Requests) ->
+ Distribution = ?HIGH_BACKLOG - Requests,
+ case erlang:phash2({self(), now()}, Distribution) + 1 of
+ Distribution ->
+ true;
+ _ ->
+ false
+ end.
+
+increase_backlog() ->
+ ets:update_counter(statsderl, backlog, 1).
+
+decrease_backlog() ->
+ ets:update_counter(statsderl, backlog, -1).
View
@@ -17,6 +17,7 @@
%% ===================================================================
start_link() ->
+ init_red(),
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
%% ===================================================================
@@ -26,3 +27,10 @@ start_link() ->
init([]) ->
{ok, { {one_for_one, 5, 10}, [?CHILD(statsderl, worker)]} }.
+%% ------------------------------------------------------------------
+%% Internal Function Definitions
+%% ------------------------------------------------------------------
+
+init_red() ->
+ statsderl = ets:new(statsderl, [set, public, named_table, {read_concurrency, true}]),
+ true = ets:insert(statsderl, {backlog, 0}).

0 comments on commit cc2e43a

Please sign in to comment.