From 31f160e24c1d11ef26a3fd8df9858a65e788064a Mon Sep 17 00:00:00 2001 From: Ilya Vassilevsky Date: Fri, 11 Jan 2019 03:46:11 +0400 Subject: [PATCH] Honor UDP packet size (MTU) limit UDP packets cannot exceed MTU. If they do, the app gets {error,emsgsize} from gen_udp:send. Similar issue: https://github.com/jaegertracing/jaeger-client-node/issues/124 This commit adds packet size check. Packets are sent immediately upon reaching max size, even before the end of batch window. Size calculation from CollectedMetrics map would've been costly. I changed the format to binary. Metrics are encoded into Line Protocol immediately now, instead of right before sending. This also helps avoid data loss. The map did not record all timestamps. It set the most recent timestamp for all fields of the same measurement. It also did not honor differences in tags, but I guess there aren't any when metrics go through Exometer. --- README.md | 5 +- rebar.config | 2 + src/exometer_report_influxdb.erl | 105 ++++++++++++++----------------- src/state.hrl | 23 +++++++ test/exometer_influxdb_test.erl | 39 +++++++++++- 5 files changed, 113 insertions(+), 61 deletions(-) create mode 100644 src/state.hrl diff --git a/README.md b/README.md index 1516d6c..5f75e1c 100644 --- a/README.md +++ b/README.md @@ -39,14 +39,15 @@ This reporter pushes data to [InfluxDB](https://influxdb.com/index.html). Available options: +* __protocol__ - `http`, `https` or `udp` for operating with InfluxDB. `http` by default. If you use `udp`, check __udp_mtu__ below to avoid `{error,emsgsize}`. * __host__ - InfluxDB host. `127.0.0.1` by default. -* __protocol__ - `http`, `https` or `udp` for operating with InfluxDB. `http` by default. * __port__ - InfluxDB port. `8086` by default. * __db__ - Database on InfluxDB for writing data. `exometer` by default. * __username__ - Username for authorization on InfluxDB. * __password__ - Password for authorization on InfluxDB. * __timestamping__ - Enable timestamping, `false` by default. To enable `timestamping` with the reporter you can use `true` or `{true, Precision}` where `Precision` is a unit taken from `[n,u,ms,s,m,h]`. The default unit is `u`. -* __batch_window_size__ - set window size in ms for batch sending. This means the reporter will collect measurements within this interval and send all measurements in one packet. `0` by default. +* __batch_window_size__ - Set window size in ms for batch sending. This means the reporter will collect measurements within this interval and send all measurements in one packet. `0` by default. +* __udp_mtu__ - (Used only with __protocol__ == `udp`.) MTU of the network interface through which UDP packets flow to the __host__. `65536` by default (Linux loopback interface MTU). Run `ifconfig` on the machine where this app will run to find it out. Metrics will be sent out if their size (in the Line Protocol format) exceeds this value, even if the current __batch_window_size__ is not over yet. (They will also be sent out at the end of __batch_window_size__ as usual, regardless of their size.) The following options can be set globally in the reporter config or locally in a specific subscription. The latter case overwrites the first. diff --git a/rebar.config b/rebar.config index c7d3bd9..3246aea 100644 --- a/rebar.config +++ b/rebar.config @@ -19,3 +19,5 @@ ]} ]} ]}. + +{cover_enabled, true}. diff --git a/src/exometer_report_influxdb.erl b/src/exometer_report_influxdb.erl index e8052a5..1ad1090 100644 --- a/src/exometer_report_influxdb.erl +++ b/src/exometer_report_influxdb.erl @@ -17,7 +17,9 @@ -ifdef(TEST). -export([evaluate_subscription_options/5, - make_packet/5]). + make_packet/5, + maybe_send/4, + name/1]). -endif. -include_lib("exometer_core/include/exometer.hrl"). @@ -33,40 +35,25 @@ -define(DEFAULT_FORMATTING, []). -define(DEFAULT_TIMESTAMP_OPT, false). -define(DEFAULT_BATCH_WINDOW_SIZE, 0). +-define(DEFAULT_UDP_MTU, 65536). -define(DEFAULT_AUTOSUBSCRIBE, false). -define(DEFAULT_SUBSCRIPTIONS_MOD, undefined). -define(VALID_PRECISIONS, [n, u, ms, s, m, h]). +% https://en.wikipedia.org/wiki/User_Datagram_Protocol#Packet_structure +-define(MAX_UDP_PACKET_SIZE, 65535). +-define(UDP_HEADER_SIZE, 8). +-define(IP_HEADER_SIZE, 20). + -define(HTTP(Proto), (Proto =:= http orelse Proto =:= https)). +-include("state.hrl"). -include("log.hrl"). -type options() :: [{atom(), any()}]. -type value() :: any(). -type callback_result() :: {ok, state()} | any(). --type precision() :: n | u | ms | s | m | h. --type protocol() :: http | udp. - --record(state, {protocol :: protocol(), - db :: binary(), % for http - username :: undefined | binary(), % for http - password :: undefined | binary(), % for http - host :: inet:ip_address() | inet:hostname(), % for udp - port :: inet:port_number(), % for udp - timestamping :: boolean(), - precision :: precision(), - collected_metrics = #{} :: map(), - batch_window_size = 0 :: integer(), - tags :: map(), - series_name :: atom() | binary(), - formatting :: list(), - metrics :: map(), - autosubscribe :: boolean(), - subscriptions_module :: module(), - connection :: gen_udp:socket() | reference()}). --type state() :: #state{}. - %% =================================================================== %% Public API @@ -81,6 +68,7 @@ exometer_init(Opts) -> Password = get_opt(password, Opts, ?DEFAULT_PASSWORD), TimestampOpt = get_opt(timestamping, Opts, ?DEFAULT_TIMESTAMP_OPT), BatchWinSize = get_opt(batch_window_size, Opts, ?DEFAULT_BATCH_WINDOW_SIZE), + UDP_MTU = get_opt(udp_mtu, Opts, ?DEFAULT_UDP_MTU), {Timestamping, Precision} = evaluate_timestamp_opt(TimestampOpt), Tags = [{key(Key), Value} || {Key, Value} <- get_opt(tags, Opts, [])], SeriesName = get_opt(series_name, Opts, ?DEFAULT_SERIES_NAME), @@ -100,6 +88,7 @@ exometer_init(Opts) -> series_name = SeriesName, formatting = Formatting, batch_window_size = BatchWinSize, + max_udp_size = max_udp_size(UDP_MTU), autosubscribe = Autosubscribe, subscriptions_module = SubscriptionsMod, metrics = maps:new()}, @@ -128,7 +117,7 @@ exometer_report(Metric, DataPoint, _Extra, Value, #state{metrics = Metrics} = State) -> case maps:get(Metric, Metrics, not_found) of {MetricName, Tags} -> - maybe_send(Metric, MetricName, Tags, + maybe_send(MetricName, Tags, maps:from_list([{DataPoint, Value}]), State); Error -> ?warning("InfluxDB reporter got trouble when looking ~p metric's tag: ~p", @@ -175,13 +164,9 @@ exometer_cast(_Unknown, State) -> exometer_info({exometer_influxdb, reconnect}, State) -> reconnect(State); exometer_info({exometer_influxdb, send}, - #state{precision = Precision, - collected_metrics = CollectedMetrics} = State) -> - if CollectedMetrics /= #{} -> - Packets = [make_packet(MetricName, Tags, Fileds, Timestamping, Precision) ++ "\n" - || {_, {MetricName, Tags, Fileds, Timestamping}} - <- maps:to_list(CollectedMetrics)], - send(Packets, State#state{collected_metrics = #{}}); + #state{collected_metrics = CollectedMetrics} = State) -> + if size(CollectedMetrics) > 0 -> + send(CollectedMetrics, State#state{collected_metrics = <<>>}); true -> {ok, State} end; exometer_info(_Unknown, State) -> @@ -272,36 +257,35 @@ prepare_batch_send(Time) -> prepare_reconnect() -> erlang:send_after(1000, self(), {exometer_influxdb, reconnect}). --spec maybe_send(list(), list(), map(), map(), state()) -> +-spec maybe_send(list(), map(), map(), state()) -> {ok, state()} | {error, term()}. -maybe_send(OriginMetricName, MetricName, Tags0, Fields, - #state{batch_window_size = BatchWinSize, +maybe_send(MetricName, Tags, Fields, + #state{batch_window_size = 0, + precision = Precision, + timestamping = Timestamping} = State) -> + Packet = make_packet(MetricName, Tags, Fields, Timestamping andalso unix_time(Precision), Precision), + send(Packet, State); +maybe_send(MetricName, Tags, Fields, + #state{protocol = Protocol, + batch_window_size = BatchWindowSize, + max_udp_size = MaxUDPSize, precision = Precision, timestamping = Timestamping, - collected_metrics = CollectedMetrics} = State) - when BatchWinSize > 0 -> - NewCollectedMetrics = case maps:get(OriginMetricName, CollectedMetrics, not_found) of - {MetricName, Tags, Fields1} -> - NewFields = maps:merge(Fields, Fields1), - maps:put(OriginMetricName, - {MetricName, Tags, NewFields, Timestamping andalso unix_time(Precision)}, - CollectedMetrics); - {MetricName, Tags, Fields1, _OrigTimestamp} -> - NewFields = maps:merge(Fields, Fields1), - maps:put(OriginMetricName, - {MetricName, Tags, NewFields, Timestamping andalso unix_time(Precision)}, - CollectedMetrics); - not_found -> - maps:put(OriginMetricName, - {MetricName, Tags0, Fields, Timestamping andalso unix_time(Precision)}, - CollectedMetrics) - end, - maps:size(CollectedMetrics) == 0 andalso prepare_batch_send(BatchWinSize), - {ok, State#state{collected_metrics = NewCollectedMetrics}}; -maybe_send(_, MetricName, Tags, Fields, - #state{timestamping = Timestamping, precision = Precision} = State) -> - Packet = make_packet(MetricName, Tags, Fields, Timestamping, Precision), - send(Packet, State). + collected_metrics = CollectedMetrics} = State) -> + maybe_start_new_window(BatchWindowSize, CollectedMetrics), + Packet = make_packet(MetricName, Tags, Fields, Timestamping andalso unix_time(Precision), Precision), + BinaryPacket = list_to_binary(Packet), + NewCollectedMetrics = <>, + if + Protocol == udp andalso size(CollectedMetrics) > 0 andalso size(NewCollectedMetrics) > MaxUDPSize -> + send(CollectedMetrics, State#state{collected_metrics = BinaryPacket}); + true -> + {ok, State#state{collected_metrics = NewCollectedMetrics}} + end. + +maybe_start_new_window(Window, Metrics) when size(Metrics) == 0 -> + prepare_batch_send(Window); +maybe_start_new_window(_, _) -> ok. -spec send(binary() | list(), state()) -> {ok, state()} | {error, term()}. @@ -338,6 +322,11 @@ send(Packet, #state{protocol = udp, connection = Socket, end; send(_, #state{protocol = Protocol}) -> {error, {Protocol, not_supported}}. +max_udp_size(MTU) when MTU > ?MAX_UDP_PACKET_SIZE -> + max_udp_size(?MAX_UDP_PACKET_SIZE); +max_udp_size(MaxPacketSize) -> + MaxPacketSize - ?UDP_HEADER_SIZE - ?IP_HEADER_SIZE. + -spec merge_tags(list() | map(), list() | map()) -> map(). merge_tags(Tags, AdditionalTags) when is_list(Tags) -> merge_tags(maps:from_list(Tags), AdditionalTags); diff --git a/src/state.hrl b/src/state.hrl new file mode 100644 index 0000000..6003481 --- /dev/null +++ b/src/state.hrl @@ -0,0 +1,23 @@ +-type precision() :: n | u | ms | s | m | h. +-type protocol() :: http | udp. + +-record(state, {protocol :: protocol(), + db :: binary(), % for http + username :: undefined | binary(), % for http + password :: undefined | binary(), % for http + host :: inet:ip_address() | inet:hostname(), % for udp + port :: inet:port_number(), % for udp + timestamping :: boolean(), + precision :: precision(), + collected_metrics = <<>> :: binary(), + batch_window_size :: non_neg_integer(), + max_udp_size :: pos_integer(), + tags :: map(), + series_name :: atom() | binary(), + formatting :: list(), + metrics :: map(), + autosubscribe :: boolean(), + subscriptions_module :: module(), + connection :: gen_udp:socket() | reference()}). + +-type state() :: #state{}. diff --git a/test/exometer_influxdb_test.erl b/test/exometer_influxdb_test.erl index eeadf2f..b37d759 100644 --- a/test/exometer_influxdb_test.erl +++ b/test/exometer_influxdb_test.erl @@ -3,8 +3,13 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("exometer_core/include/exometer.hrl"). +-include("state.hrl"). + -import(exometer_report_influxdb, [evaluate_subscription_options/5, - make_packet/5]). + exometer_info/2, + make_packet/5, + maybe_send/4, + name/1]). evaluate_subscription_options(MetricId, Options) -> @@ -145,5 +150,37 @@ subscribtions_module_test() -> gen_udp:close(Socket), ok. +send_udp_packet_when_it_hits_size_limit_test() -> + TestServerPort = 55555, + {ok, ServerSocket} = gen_udp:open(TestServerPort, [{active, false}]), + {ok, ClientSocket} = gen_udp:open(0), + State1 = #state{protocol = udp, + connection = ClientSocket, + host = "localhost", + port = TestServerPort, + batch_window_size = 99999, + max_udp_size = 70, + precision = s, + timestamping = false}, + {ok, State2} = maybe_send(flights, [{type, departure}], #{count => 3}, State1), % fits into packet - send + {ok, State3} = maybe_send(flights, [{type, arrival}], #{count => 2}, State2), % fits into packet - send + {ok, State4} = maybe_send(flights, [{type, arrival}], #{count => 1}, State3), % doesn't fit - save for later + {ok, {_Address, _Port, Packet1}} = gen_udp:recv(ServerSocket, 0, 9999), + ?assertEqual("flights,type=departure count=3i \n" + "flights,type=arrival count=2i \n", Packet1), + ?assertEqual(<<"flights,type=arrival count=1i ">>, State4#state.collected_metrics), + + % Send the remaining metrics when the current batch window ends + {ok, State5} = exometer_info({exometer_influxdb, send}, State4), + {ok, {_Address, _Port, Packet2}} = gen_udp:recv(ServerSocket, 0, 9999), + ?assertEqual("flights,type=arrival count=1i ", Packet2), + ?assertEqual(<<>>, State5#state.collected_metrics). + +name_test() -> + ?assertEqual(<<"things">>, name(things)), + ?assertEqual(<<"things">>, name(<<"things">>)), + ?assertEqual(<<"foo_bar">>, name([foo, bar])), + ?assertEqual(<<"97_98_99">>, name("abc")). + make_bin_packet(Name, Tags, Fields, Timestamping, Precision) -> binary:list_to_bin(make_packet(Name, Tags, Fields, Timestamping, Precision)).