Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
surik committed Oct 21, 2015
0 parents commit ddabcc6
Show file tree
Hide file tree
Showing 5 changed files with 320 additions and 0 deletions.
4 changes: 4 additions & 0 deletions .gitignore
@@ -0,0 +1,4 @@
deps
ebin
.rebar
log
64 changes: 64 additions & 0 deletions README.md
@@ -0,0 +1,64 @@
# Exometer InfluxDB reporter

This reporter pushes data to [InfluxDB](https://influxdb.com/index.html).

## Usage

1. Add exometer_influxdb to your list of dependencies in rebar.config:

```erlang
{deps, [
{exometer_influxdb, ".*", {git, "https://github.com/surik/exometer_influxdb.git", "master"}}
]}.
```

2. Ensure exometer_influxdb is started before your application:

```erlang
{applications, [exometer_influxdb]}.
```

3. Configure it:

```erlang
{exometer,
{reporters, [
{exometer_influxdb, [{protocol, http},
{host, <<"localhost">>},
{port, 9090},
{db, <<"exometer">>},
{precision, n},
{tags, [region, ru]}]}
]}
}.
```

Available options:

* __host__ - InfluxDB host. `localhost` by default.
* __protocol__ - `http` or `udp` for operating with InfluxDB. `http` by default.
* __port__ - InfluxDB port. `8086` by default.
* __db__ - database on InfluxDB for writing data. `exometer` by default
* __username__ - username for authorization on InfluxDB. __Not implemented yet__.
* __password__ - password for authorization on InfluxDB. __Not implemented yet__.
* __precision__ = [n,u,ms,s,m,h] - sets the precision of the supplied Unix time values. `u` by default.
* __tags__ - list of default tags for each data point. Here always is `host` which local host name by default.

There is possibility to extend the default tag list which only has `host` by default. When you describe subscriptions list you can add tags to `Extra`.
For example:

```erlang
{exometer,
{subscriptions, [
{exometer_influxdb, [erlang, memory], total, 5000, true, [{tag, <<"value">>}]},
]}
}.

```

# TODO

* UDP support
* Auth support
* Tests
* Reconfiguration on runtime
4 changes: 4 additions & 0 deletions rebar.config
@@ -0,0 +1,4 @@
{deps, [
{hackney, ".*", {git, "git://github.com/benoitc/hackney.git", {branch, "master"}}},
{exometer_core, ".*", {git, "git://github.com/Feuerlabs/exometer_core.git", {branch, "master"}}}
]}.
13 changes: 13 additions & 0 deletions src/exometer_influxdb.app.src
@@ -0,0 +1,13 @@
{application, exometer_influxdb,
[
{description, ""},
{vsn, "0.1.0"},
{registered, []},
{applications, [
kernel,
stdlib,
hackney,
exometer_core
]},
{env, []}
]}.
235 changes: 235 additions & 0 deletions src/exometer_influxdb.erl
@@ -0,0 +1,235 @@
-module(exometer_influxdb).

-behaviour(exometer_report).

%% gen_server callbacks
-export([exometer_init/1,
exometer_info/2,
exometer_cast/2,
exometer_call/3,
exometer_report/5,
exometer_subscribe/5,
exometer_unsubscribe/4,
exometer_newentry/2,
exometer_setopts/4,
exometer_terminate/2]).


-define(DEFAULT_HOST, <<"127.0.0.1">>).
-define(DEFAULT_DB, <<"exometer">>).
-define(DEFAULT_PROTOCOL, http).
-define(DEFAULT_PORT, 8086).
-define(DEFAULT_USERNAME, undefined).
-define(DEFAULT_PASSWORD, undefined).
-define(DEFAULT_PRECISION, u).

-type options() :: [{atom(), any()}].
-type value() :: any().
-type callback_result() :: {ok, state()} | any().
-type precision() :: n | u | ms | s | n | h.
-type protocol() :: http | udp.

-record(state, {protocol :: protocol(),
db :: binary(),
username :: undefined | binary(), % for http
password :: undefined | binary(), % for http
precision :: precision(),
tags :: map(),
connection :: gen_udp:socket() | reference()}).
-type state() :: #state{}.


%% ===================================================================
%% Public API
%% ===================================================================
-spec exometer_init(options()) -> callback_result().
exometer_init(Opts) ->
Host = get_opt(host, Opts, ?DEFAULT_HOST),
Protocol = get_opt(protocol, Opts, ?DEFAULT_PROTOCOL),
Port = get_opt(port, Opts, ?DEFAULT_PORT),
DB = get_opt(db, Opts, ?DEFAULT_DB),
Username = get_opt(username, Opts, ?DEFAULT_USERNAME),
Password = get_opt(password, Opts, ?DEFAULT_PASSWORD),
Precision = get_opt(precision, Opts, ?DEFAULT_PRECISION),
Tags = [{key(Key), Value} || {Key, Value} <- get_opt(tags, Opts, [])],
{ok, Connection} = connect(Protocol, Host, Port),
{ok, #state{protocol = Protocol,
db = DB,
username = Username,
password = Password,
precision = Precision,
tags = merge_tags([{<<"host">>, net_adm:localhost()}], Tags),
connection = Connection}}.

-spec exometer_report(exometer_report:metric(),
exometer_report:datapoint(),
exometer_report:extra(),
value(),
state()) -> callback_result().
exometer_report(Metric, DataPoint, Extra, Value, #state{tags = Tags} = State) ->
ExtraTags = case Extra of undefined -> []; _ -> Extra end,
Packet = make_packet(Metric, merge_tags(Tags, ExtraTags),
#{DataPoint => Value}, State#state.precision),
send(Packet, State).

-spec exometer_subscribe(exometer_report:metric(),
exometer_report:datapoint(),
exometer_report:interval(),
exometer_report:extra(),
state()) -> callback_result().
exometer_subscribe(_Metric, _DataPoint, _Interval, _Extra, State) ->
{ok, State}.

-spec exometer_unsubscribe(exometer_report:metric(),
exometer_report:datapoint(),
exometer_report:extra(),
state()) -> callback_result().
exometer_unsubscribe(_Metric, _DataPoint, _Extra, State) ->
{ok, State}.

-spec exometer_call(any(), pid(), state()) ->
{reply, any(), state()} | {noreply, state()} | any().
exometer_call(_Unknown, _From, State) ->
{ok, State}.

-spec exometer_cast(any(), state()) -> {noreply, state()} | any().
exometer_cast(_Unknown, State) ->
{ok, State}.

-spec exometer_info(any(), state()) -> callback_result().
exometer_info(_Unknown, State) ->
{ok, State}.

-spec exometer_newentry(exometer:entry(), state()) -> callback_result().
exometer_newentry(_Entry, State) ->
{ok, State}.

-spec exometer_setopts(exometer:entry(), options(),
exometer:status(), state()) -> callback_result().
exometer_setopts(_Metric, _Options, _Status, State) ->
{ok, State}.

-spec exometer_terminate(any(), state()) -> any().
exometer_terminate(_, _) ->
ignore.


%% ===================================================================
%% Internal functions
%% ===================================================================
-spec connect(protocol(), binary(), integer()) ->
{ok, pid() | reference()} | {error, term()}.
connect(http, Host, Port) ->
hackney:connect(hackney_tcp_transport, Host, Port, []);
connect(udp, _, _) -> {error, {udp, not_implemented}};
connect(Protocol, _, _) -> {error, {Protocol, not_supported}}.

-spec send(binary() | list(), state()) ->
{ok, state()} | {error, term()}.
send(Packet, #state{protocol = http, connection= Connection,
precision = Precision, db = DB} = State) ->
Url = hackney_url:make_url(<<"/">>, <<"write">>,
[{<<"db">>, DB}, {<<"precision">>, Precision}]),
Req = {post, Url, [], Packet},
case hackney:send_request(Connection, Req) of
{ok, 204, _, Ref} ->
hackney:body(Ref),
{ok, State};
{ok, _, _Headers, Ref} ->
{ok, Body} = hackney:body(Ref),
{error, Body};
{error, _} = Error -> Error
end;
send(_, #state{protocol = udp}) -> {error, {udp, not_implemented}};
send(_, #state{protocol = Protocol}) -> {error, {Protocol, not_supported}}.

-spec merge_tags(list() | map(), list() | map()) -> map().
merge_tags(Tags, AdditionalTags) when is_list(Tags) ->
merge_tags(maps:from_list(Tags), AdditionalTags);
merge_tags(Tags, AdditionalTags) when is_list(AdditionalTags) ->
merge_tags(Tags, maps:from_list(AdditionalTags));
merge_tags(Tags, AdditionalTags) when not is_map(AdditionalTags) -> Tags;
merge_tags(Tags, AdditionalTags) -> maps:merge(Tags, AdditionalTags).

-spec get_opt(atom(), list(), any()) -> any().
get_opt(K, Opts, Default) ->
exometer_util:get_opt(K, Opts, Default).

%% LINE PROTOCOL
-define(SEP(V), case V of <<>> -> <<>>; [] -> <<>>; _ -> <<$,>> end).

-spec microsecs() -> integer().
microsecs() ->
{MegaSecs, Secs, MicroSecs} =
try erlang:timestamp()
catch error:undef -> apply(erlang, now, [])
end,
MegaSecs * 1000000 * 1000000 + Secs * 1000000 + MicroSecs.

-spec convert_time_unit(integer(), erlang:time_unit() | minute | hour) ->
integer().
convert_time_unit(MicroSecs, minute) ->
round(convert_time_unit(MicroSecs, seconds) / 60);
convert_time_unit(MicroSecs, hour) ->
round(convert_time_unit(MicroSecs, seconds) / 3660);
convert_time_unit(MicroSecs, To) ->
erlang:convert_time_unit(MicroSecs, micro_seconds, To).

-spec unix_time(precision() | undefined) -> integer() | undefined.
unix_time(n) -> convert_time_unit(microsecs(), nano_seconds);
unix_time(u) -> microsecs();
unix_time(ms) -> convert_time_unit(microsecs(), milli_second);
unix_time(s) -> convert_time_unit(microsecs(), seconds);
unix_time(m) -> convert_time_unit(microsecs(), minute);
unix_time(h) -> convert_time_unit(microsecs(), hour);
unix_time(_) -> undefined.

-spec metric_to_string(list()) -> string().
metric_to_string([Final]) -> metric_elem_to_list(Final);
metric_to_string([H | T]) ->
metric_elem_to_list(H) ++ "_" ++ metric_to_string(T).

-spec metric_elem_to_list(atom() | string() | integer()) -> string().
metric_elem_to_list(E) when is_atom(E) -> atom_to_list(E);
metric_elem_to_list(E) when is_list(E) -> E;
metric_elem_to_list(E) when is_integer(E) -> integer_to_list(E).

-spec name(exometer_report:metric()) -> binary().
name(Metric) -> iolist_to_binary(metric_to_string(Metric)).

-spec key(atom() | list() | binary()) -> binary().
key(K) when is_list(K) -> key(list_to_binary(K));
key(K) when is_atom(K) -> key(atom_to_binary(K, utf8));
key(K) ->
binary:replace(K, [<<" ">>, <<$,>>, <<$=>>], <<$\\>>,
[global, {insert_replaced, 1}]).

-spec value(any()) -> binary() | list().
value(K) when is_atom(K) -> key(atom_to_binary(K, utf8));
value(V) when is_integer(V) -> [integer_to_binary(V), $i];
value(V) when is_float(V) -> float_to_binary(V);
value(V) when is_atom(V) -> value(atom_to_binary(V, utf8));
value(V) when is_list(V) -> value(list_to_binary(V));
value(V) when is_binary(V) ->
[$", binary:replace(V, <<$">>, <<$\\, $">>, [global]), $"].

-spec flatten_fields(list()) -> list().
flatten_fields(Fields) ->
maps:fold(fun(K, V, Acc) ->
[Acc, ?SEP(Acc), key(K), $=, value(V)]
end, <<>>, Fields).

-spec flatten_tags(map() | list()) -> list().
flatten_tags(Tags) when is_map(Tags) -> flatten_tags(maps:to_list(Tags));
flatten_tags(Tags) ->
lists:foldl(fun({K, V}, Acc) ->
[Acc, ?SEP(Acc), key(K), $=, key(V)]
end, [], lists:keysort(1, Tags)).

-spec make_packet(exometer_report:metric(), map() | list(),
list(), precision()) -> list().
make_packet(Measurement, Tags, Fields, Precision) ->
BinaryTags = flatten_tags(Tags),
BinaryFields = flatten_fields(Fields),
[name(Measurement), ?SEP(BinaryTags), BinaryTags, " ", BinaryFields,
" ", integer_to_binary(unix_time(Precision))].

0 comments on commit ddabcc6

Please sign in to comment.