Skip to content

Commit

Permalink
Added another property "timestamp_in_ms" into header.
Browse files Browse the repository at this point in the history
  • Loading branch information
hradilf committed Jul 21, 2017
1 parent 1a5c5cc commit 4f24180
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 3 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -1,6 +1,6 @@
# RabbitMQ Message Timestamp Plugin #

This plugin fills the `timestamp` property of a message as it enters
This plugin fills the `timestamp` and `timestamp_in_ms` properties of a message as it enters
RabbitMQ with the current (server node) timestamp value.

## Supported RabbitMQ Versions ##
Expand Down
17 changes: 17 additions & 0 deletions include/rabbit_message_timestamp.hrl
@@ -0,0 +1,17 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is Pivotal Software, Inc.
%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
%%

-define(TIMESTAMP_IN_MS, <<"timestamp_in_ms">>).
19 changes: 18 additions & 1 deletion src/rabbit_timestamp_interceptor.erl
Expand Up @@ -18,6 +18,7 @@

-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("rabbit_common/include/rabbit_framing.hrl").
-include_lib("rabbit_message_timestamp.hrl").

-behaviour(rabbit_channel_interceptor).

Expand All @@ -44,8 +45,10 @@ description() ->
intercept(#'basic.publish'{} = Method, Content, _IState) ->
DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
Timestamp = os:system_time(seconds),
TimestampMs = os:system_time(milli_seconds),
Content2 = set_content_timestamp(DecodedContent, Timestamp),
{Method, Content2};
Content3 = set_content_timestamp_millis(Content2, TimestampMs),
{Method, Content3};

intercept(Method, Content, _VHost) ->
{Method, Content}.
Expand All @@ -66,3 +69,17 @@ set_content_timestamp(#content{properties = Props} = Content, Timestamp)
%% get serialized when deliverying the message.
Content#content{properties = Props#'P_basic'{timestamp = Timestamp},
properties_bin = none}.

set_content_timestamp_millis(#content{properties = #'P_basic'{headers = Headers} = Props} = Content, TimestampMs) ->
NewHeaders = add_header(Headers, new_timestamp_millis_header(TimestampMs)),
Content#content{
properties = Props#'P_basic'{headers = NewHeaders},
properties_bin = none
}.

add_header(undefined, Header) -> [Header];
add_header(Headers, Header) ->
lists:keystore(element(1, Header), 1, Headers, Header).

new_timestamp_millis_header(TimestampMs) ->
{?TIMESTAMP_IN_MS, long, TimestampMs}.
9 changes: 8 additions & 1 deletion test/rabbit_message_timestamp_SUITE.erl
Expand Up @@ -21,6 +21,7 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("rabbit_message_timestamp.hrl").

-define(SEND_DELAY, 1000).

Expand Down Expand Up @@ -92,7 +93,9 @@ timestamp_test(Config) ->
[begin
?assertNotEqual(get_timestamp(Msg), undefined),
?assert(is_integer(get_timestamp(Msg))),
?assert(get_timestamp(Msg) > 0)
?assert(get_timestamp(Msg) > 0),
?assertNotEqual(get_header_property(?TIMESTAMP_IN_MS, Msg), false),
?assert(get_header_property(?TIMESTAMP_IN_MS, Msg) > 0)
end|| Msg <- Result],

amqp_channel:call(Chan, delete_queue(Q)),
Expand Down Expand Up @@ -149,6 +152,10 @@ get_payload(#amqp_msg{payload = P}) ->
get_timestamp(#amqp_msg{props = #'P_basic'{timestamp = T}}) ->
T.

get_header_property(<<_,_/binary>> = Target,
#amqp_msg{props = #'P_basic'{headers = Headers}}) ->
lists:keyfind(Target, 1, Headers).

setup_fabric(Chan, ExDeclare, QueueDeclare) ->
setup_fabric(Chan, ExDeclare, QueueDeclare, <<>>).

Expand Down

0 comments on commit 4f24180

Please sign in to comment.