Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parity #14

Merged
merged 21 commits into from
Mar 5, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
9c792d4
updated all the scripts to start/stop kafka
HernanRivasAcosta Sep 23, 2014
15c108d
changed the default replication factor
HernanRivasAcosta Sep 23, 2014
4481d18
the default replication factor is sometimes ignored, so this script i…
HernanRivasAcosta Sep 23, 2014
a64c13e
Merge branch 'master' of https://github.com/HernanRivasAcosta/kafkerl
HernanRivasAcosta Sep 23, 2014
cd75d79
fixed a bug where you could not unsubscribe from kafkerl events
HernanRivasAcosta Oct 2, 2014
09d67b2
Merge pull request #29 from HernanRivasAcosta/hernan.unsubscribe_fix
HernanRivasAcosta Oct 2, 2014
40acd85
naive handling (by retrying) of an ets error
HernanRivasAcosta Oct 3, 2014
6e225a7
Merge pull request #30 from HernanRivasAcosta/hernan.handling_ets_rea…
HernanRivasAcosta Oct 3, 2014
f0def04
Added Gitter badge
gitter-badger Oct 10, 2014
df96a0e
Merge pull request #31 from gitter-badger/gitter-badge
HernanRivasAcosta Oct 10, 2014
3abab42
fixed a pattern match error when the produce response parsing failed
HernanRivasAcosta Feb 26, 2015
ae2b4ed
improved the handling of invalid topic metadata
HernanRivasAcosta Feb 26, 2015
d9b2d0b
Merge pull request #32 from HernanRivasAcosta/hernan.fixes
HernanRivasAcosta Feb 26, 2015
7bcdd17
fixed an issue where parsing errors were able to bring down the system
HernanRivasAcosta Mar 3, 2015
65e3ab2
Merge pull request #33 from inaka/hernan.stability_fix
HernanRivasAcosta Mar 3, 2015
3bbc53b
fixed not properly parsing packets larger than the MTU
HernanRivasAcosta Mar 4, 2015
0d70083
Merge pull request #34 from HernanRivasAcosta/hernan.quickfix.metadat…
HernanRivasAcosta Mar 4, 2015
b3cc259
fixed some issues with the metadata parsing
HernanRivasAcosta Mar 4, 2015
1798bc7
Merge pull request #35 from HernanRivasAcosta/hernan.fixes.parsing
HernanRivasAcosta Mar 4, 2015
89db034
fixes an error with the protocol
HernanRivasAcosta Mar 5, 2015
4a4a159
Merge pull request #36 from HernanRivasAcosta/hernan.quickfix.protoco…
HernanRivasAcosta Mar 5, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
kafkerl v1.0.2
==============
[![Gitter](https://badges.gitter.im/Join Chat.svg)](https://gitter.im/HernanRivasAcosta/kafkerl?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)

Apache Kafka 0.8 high-performance producer for Erlang.
Developed thanks to the support and sponsorship of [TigerText](http://www.tigertext.com/).
Expand Down
28 changes: 21 additions & 7 deletions src/kafkerl_broker_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,10 @@ handle_info({tcp_closed, _Socket}, State = #state{name = Name,
NewState = handle_tcp_close(State),
{noreply, NewState};
handle_info({tcp, _Socket, Bin}, State) ->
NewState = handle_tcp_data(Bin, State),
{noreply, NewState};
case handle_tcp_data(Bin, State) of
{ok, NewState} -> {noreply, NewState};
{error, Reason} -> {stop, {error, Reason}, State}
end;
handle_info({flush, Time}, State) ->
{ok, _Tref} = queue_flush(Time),
handle_flush(State);
Expand Down Expand Up @@ -185,20 +187,21 @@ handle_tcp_data(Bin, State = #state{connector = Connector, ets = EtsName,
end),
case handle_errors(Errors, Messages, Name) of
ignore ->
State;
{ok, State};
{request_metadata, MessagesToResend} ->
kafkerl_connector:request_metadata(Connector),
ok = resend_messages(MessagesToResend, Connector),
State
{ok, State}
end;
_ ->
lager:warning("~p was unable to properly process produce response",
[Name])
[Name]),
{error, invalid_produce_response}
end;
Other ->
lager:critical("~p got unexpected response when parsing message: ~p",
[Name, Other]),
State
{ok, State}
end.

%%==============================================================================
Expand Down Expand Up @@ -314,4 +317,15 @@ get_all_messages(Buffers) ->
%% Concatenates the contents of every ETS buffer in the given list,
%% preserving buffer order. Each buffer is read via get_messages_from/2
%% with a retry budget of 20, so a persistently failing buffer
%% contributes an empty list instead of crashing the caller.
get_all_messages([], Acc) ->
  Acc;
get_all_messages([H | T], Acc) ->
  % The stale pre-merge line calling ets_buffer:read_all/1 directly was
  % removed; only the retrying helper is used now.
  get_all_messages(T, Acc ++ get_messages_from(H, 20)).

%% Reads all messages from the given ETS buffer. Transient read failures
%% are retried up to Retries extra times; once the budget is exhausted a
%% warning is logged and the buffer is treated as empty.
get_messages_from(Ets, Retries) ->
  ReadResult = ets_buffer:read_all(Ets),
  buffer_read_result(ReadResult, Ets, Retries).

% A list means the read succeeded; anything else is treated as an error.
buffer_read_result(Messages, _Ets, _Retries) when is_list(Messages) ->
  Messages;
buffer_read_result(_Error, Ets, Retries) when Retries > 0 ->
  buffer_read_result(ets_buffer:read_all(Ets), Ets, Retries - 1);
buffer_read_result(_Error, _Ets, _Retries) ->
  lager:warning("giving up on reading from the ETS buffer"),
  [].
95 changes: 60 additions & 35 deletions src/kafkerl_connector.erl
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
-behaviour(gen_server).

%% API
-export([send/3, request_metadata/1, request_metadata/2, subscribe/2,
subscribe/3, get_partitions/1, unsubscribe/2]).
-export([send/3, request_metadata/1, request_metadata/2, request_metadata/3,
subscribe/2, subscribe/3, get_partitions/1, unsubscribe/2]).
% Only for internal use
-export([request_metadata/6]).
-export([do_request_metadata/6, make_metadata_request/1]).
% Only for broker connections
-export([produce_succeeded/2]).
% Supervisors
Expand Down Expand Up @@ -85,9 +85,13 @@ unsubscribe(ServerRef, Callback) ->
request_metadata(ServerRef) ->
gen_server:call(ServerRef, {request_metadata}).

-spec request_metadata(server_ref(), [topic()]) -> ok.
request_metadata(ServerRef, Topics) ->
gen_server:call(ServerRef, {request_metadata, Topics}).
-spec request_metadata(server_ref(), [topic()] | boolean()) -> ok.
request_metadata(ServerRef, TopicsOrForced) ->
gen_server:call(ServerRef, {request_metadata, TopicsOrForced}).

-spec request_metadata(server_ref(), [topic()], boolean()) -> ok.
request_metadata(ServerRef, Topics, Forced) ->
gen_server:call(ServerRef, {request_metadata, Topics, Forced}).

-spec produce_succeeded(server_ref(),
[{topic(), partition(), [binary()], integer()}]) -> ok.
Expand All @@ -112,8 +116,12 @@ handle_call({send, Message}, _From, State) ->
handle_send(Message, State);
handle_call({request_metadata}, _From, State) ->
{reply, ok, handle_request_metadata(State, [])};
handle_call({request_metadata, Forced}, _From, State) when is_boolean(Forced) ->
{reply, ok, handle_request_metadata(State, [], true)};
handle_call({request_metadata, Topics}, _From, State) ->
{reply, ok, handle_request_metadata(State, Topics)};
handle_call({request_metadata, Topics, Forced}, _From, State) ->
{reply, ok, handle_request_metadata(State, Topics, Forced)};
handle_call({get_partitions}, _From, State) ->
{reply, handle_get_partitions(State), State};
handle_call({subscribe, Callback}, _From, State) ->
Expand All @@ -124,7 +132,8 @@ handle_call({subscribe, Callback}, _From, State) ->
{reply, {error, invalid_callback}, State}
end;
handle_call({unsubscribe, Callback}, _From, State) ->
  % Callbacks are stored as tuples with the callback in element 2 (hence
  % keydelete/3 on position 2); the stale pre-merge body using plain list
  % subtraction (State#state.callbacks -- [Callback]) was removed — it
  % could never match the wrapping tuple, which is why unsubscribing
  % previously failed.
  NewCallbacks = lists:keydelete(Callback, 2, State#state.callbacks),
  {reply, ok, State#state{callbacks = NewCallbacks}}.

handle_info(metadata_timeout, State) ->
{stop, {error, unable_to_retrieve_metadata}, State};
Expand All @@ -149,6 +158,13 @@ handle_info({metadata_updated, Mapping}, State) ->
{noreply, State#state{broker_mapping = NewBrokerMapping, pending = [],
callbacks = NewCallbacks,
known_topics = NewKnownTopics}};
handle_info({'DOWN', Ref, process, _, normal}, State) ->
true = demonitor(Ref),
{noreply, State};
handle_info({'DOWN', Ref, process, _, Reason}, State) ->
lager:error("metadata request failed, reason: ~p", [Reason]),
true = demonitor(Ref),
{noreply, handle_request_metadata(State, [], true)};
handle_info(Msg, State) ->
lager:notice("Unexpected info message received: ~p on ~p", [Msg, State]),
{noreply, State}.
Expand Down Expand Up @@ -187,11 +203,7 @@ init([Config]) ->
autocreate_topics = AutocreateTopics,
max_metadata_retries = MaxMetadataRetries,
metadata_request_cd = MetadataRequestCooldown},
Request = metadata_request(State, Topics),
% Start requesting metadata
Params = [self(), Brokers, get_metadata_tcp_options(), MaxMetadataRetries,
RetryInterval, Request],
_Pid = spawn_link(?MODULE, request_metadata, Params),
{_Pid, _Ref} = make_metadata_request(State),
{ok, State};
{errors, Errors} ->
lists:foreach(fun(E) ->
Expand Down Expand Up @@ -241,25 +253,24 @@ handle_get_partitions(#state{broker_mapping = void}) ->
handle_get_partitions(#state{broker_mapping = Mapping}) ->
{ok, Mapping}.


handle_request_metadata(State, Topics) ->
handle_request_metadata(State, Topics, false).

% Ignore it if the topic mapping is void, we are already requesting the metadata
handle_request_metadata(State = #state{broker_mapping = void}, _Topics) ->
handle_request_metadata(State = #state{broker_mapping = void}, _, false) ->
State;
handle_request_metadata(State, NewTopics) ->
handle_request_metadata(State, NewTopics, _) ->
SortedNewTopics = lists:sort(NewTopics),
NewKnownTopics = lists:umerge(State#state.known_topics, SortedNewTopics),
Request = metadata_request(State, NewKnownTopics),
Params = [self(), State#state.brokers, get_metadata_tcp_options(),
State#state.max_metadata_retries, State#state.retry_interval,
Request],
{A, B, C} = erlang:now(),
Now = (A * 1000000 + B) * 1000 + C div 1000,
Now = get_timestamp(),
LastRequest = State#state.last_metadata_request,
Cooldown = State#state.metadata_request_cd,
_ = case Cooldown - (Now - LastRequest) of
Negative when Negative < 0 ->
_ = spawn_link(?MODULE, request_metadata, Params);
NonNegative ->
_ = timer:apply_after(NonNegative, ?MODULE, request_metadata, Params)
Negative when Negative =< 0 ->
_ = make_metadata_request(State);
Time ->
_ = timer:apply_after(Time, ?MODULE, request_metadata, [self(), true])
end,
State#state{broker_mapping = void, known_topics = NewKnownTopics,
last_metadata_request = Now}.
Expand All @@ -280,12 +291,12 @@ get_ets_dump_name({OldName, Counter}) ->
end.

%% TCP options for the one-off metadata connections. {packet, 4} makes
%% gen_tcp deliver complete 4-byte-length-prefixed frames, so metadata
%% responses larger than the MTU arrive as a single message (the stale
%% pre-merge body without {packet, 4} was removed).
get_metadata_tcp_options() ->
  kafkerl_utils:get_tcp_options([{active, false}, {packet, 4}]).

request_metadata(Pid, _Brokers, _TCPOpts, 0, _RetryInterval, _Request) ->
do_request_metadata(Pid, _Brokers, _TCPOpts, 0, _RetryInterval, _Request) ->
Pid ! metadata_timeout;
request_metadata(Pid, Brokers, TCPOpts, Retries, RetryInterval, Request) ->
case request_metadata(Brokers, TCPOpts, Request) of
do_request_metadata(Pid, Brokers, TCPOpts, Retries, RetryInterval, Request) ->
case do_request_metadata(Brokers, TCPOpts, Request) of
{ok, TopicMapping} ->
Pid ! {metadata_updated, TopicMapping};
_Error ->
Expand All @@ -294,41 +305,41 @@ request_metadata(Pid, Brokers, TCPOpts, Retries, RetryInterval, Request) ->
-1 -> -1;
N -> N - 1
end,
request_metadata(Pid, Brokers, TCPOpts, NewRetries, RetryInterval,
do_request_metadata(Pid, Brokers, TCPOpts, NewRetries, RetryInterval,
Request)
end.

request_metadata([], _TCPOpts, _Request) ->
do_request_metadata([], _TCPOpts, _Request) ->
{error, all_down};
request_metadata([{Host, Port} = _Broker | T] = _Brokers, TCPOpts, Request) ->
do_request_metadata([{Host, Port} = _Broker | T] = _Brokers, TCPOpts, Request) ->
lager:debug("Attempting to connect to broker at ~s:~p", [Host, Port]),
% Connect to the Broker
case gen_tcp:connect(Host, Port, TCPOpts) of
{error, Reason} ->
warn_metadata_request(Host, Port, Reason),
% Failed, try with the next one in the list
request_metadata(T, TCPOpts, Request);
do_request_metadata(T, TCPOpts, Request);
{ok, Socket} ->
% On success, send the metadata request
case gen_tcp:send(Socket, Request) of
{error, Reason} ->
warn_metadata_request(Host, Port, Reason),
% Unable to send request, try the next broker
request_metadata(T, TCPOpts, Request);
do_request_metadata(T, TCPOpts, Request);
ok ->
case gen_tcp:recv(Socket, 0, 6000) of
{error, Reason} ->
warn_metadata_request(Host, Port, Reason),
gen_tcp:close(Socket),
% Nothing received (probably a timeout), try the next broker
request_metadata(T, TCPOpts, Request);
do_request_metadata(T, TCPOpts, Request);
{ok, Data} ->
gen_tcp:close(Socket),
case kafkerl_protocol:parse_metadata_response(Data) of
{error, Reason} ->
warn_metadata_request(Host, Port, Reason),
% The parsing failed, try the next broker
request_metadata(T, TCPOpts, Request);
do_request_metadata(T, TCPOpts, Request);
{ok, _CorrelationId, Metadata} ->
% We received a metadata response, make sure it has brokers
{ok, get_topic_mapping(Metadata)}
Expand Down Expand Up @@ -458,6 +469,20 @@ send_mapping_to(NewCallback, #state{broker_mapping = Mapping}) ->
Partitions = get_partitions_from_mapping(Mapping),
send_event({partition_update, Partitions}, NewCallback).

%% Starts a monitored worker process that requests metadata for all the
%% currently known topics; returns the {Pid, Ref} pair from spawn_monitor/3.
make_metadata_request(State) ->
  #state{brokers = Brokers,
         known_topics = KnownTopics,
         max_metadata_retries = MaxRetries,
         retry_interval = RetryInterval} = State,
  MetadataRequest = metadata_request(State, KnownTopics),
  % The worker reports back via a message, or via its 'DOWN' notification
  % (handled in handle_info/2) if it dies.
  spawn_monitor(?MODULE, do_request_metadata,
                [self(), Brokers, get_metadata_tcp_options(), MaxRetries,
                 RetryInterval, MetadataRequest]).

%% Returns the current wall-clock time in milliseconds, used to rate-limit
%% metadata requests against the configured cooldown.
get_timestamp() ->
  % os:timestamp/0 replaces the deprecated erlang:now/0: it returns the
  % same {MegaSecs, Secs, MicroSecs} tuple without taking a global lock
  % or forcing monotonic uniqueness, neither of which a cooldown
  % timestamp needs.
  {MegaSecs, Secs, MicroSecs} = os:timestamp(),
  (MegaSecs * 1000000 + Secs) * 1000 + MicroSecs div 1000.

%%==============================================================================
%% Error handling
%%==============================================================================
Expand Down
33 changes: 21 additions & 12 deletions src/kafkerl_protocol.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ build_fetch_request(Data, ClientId, CorrelationId, MaxWait, MinBytes) ->
-spec build_metadata_request(topic() | [topic()], correlation_id(),
client_id()) -> iodata().
build_metadata_request(Topics, CorrelationId, ClientId) ->
{Size, Request} = build_metadata_request(Topics),
[build_request_header(ClientId, ?METADATA_KEY, CorrelationId, Size), Request].
{_Size, Request} = build_metadata_request(Topics),
[build_request_header(ClientId, ?METADATA_KEY, CorrelationId), Request].

% Message parsing
-spec parse_produce_response(binary()) -> produce_response().
Expand Down Expand Up @@ -60,8 +60,7 @@ parse_fetch_response(Bin, {Remainder, CorrelationId, Steps}) ->
parse_steps(NewBin, CorrelationId, Steps).

-spec parse_metadata_response(binary()) -> metadata_response().
parse_metadata_response(<<_Size:32/unsigned-integer,
CorrelationId:32/unsigned-integer,
parse_metadata_response(<<CorrelationId:32/unsigned-integer,
BrokerCount:32/unsigned-integer,
BrokersBin/binary>>) ->
case parse_brokers(BrokerCount, BrokersBin) of
Expand All @@ -81,19 +80,22 @@ parse_metadata_response(_Other) ->
%%==============================================================================
%% Message Building
%%==============================================================================
%% Builds the common Kafka request header (api key, api version,
%% correlation id, client id) WITHOUT the leading 4-byte message size,
%% for requests streamed with {packet, 4}. Wire format: http://goo.gl/5SNNTV
%% (The stale pre-merge head/body lines from the diff were removed; they
%% duplicated the clause head and referenced the dropped RequestSize.)
build_request_header(ClientId, ApiKey, CorrelationId) ->
  ApiVersion = 0, % The version should be 0, it's not a placeholder
  ClientIdSize = byte_size(ClientId),
  [<<ApiKey:16/unsigned-integer,
     ApiVersion:16/unsigned-integer,
     CorrelationId:32/unsigned-integer,
     ClientIdSize:16/unsigned-integer>>,
   ClientId].

%% Same header, prefixed with the total message size, for requests whose
%% payload size is known up front.
build_request_header(ClientId, ApiKey, CorrelationId, RequestSize) ->
  % 10 is the size of the header
  MessageSize = byte_size(ClientId) + RequestSize + 10,
  [<<MessageSize:32/unsigned-integer>>,
   build_request_header(ClientId, ApiKey, CorrelationId)].

%% PRODUCE REQUEST
build_produce_request([{Topic, Partition, Messages}], Compression) ->
build_produce_request({Topic, Partition, Messages}, Compression);
Expand Down Expand Up @@ -471,14 +473,21 @@ parse_topic_metadata(Count, <<>>, Acc) when Count =< 0 ->
% Count exhausted but bytes remain: keep what was parsed, log the rest
parse_topic_metadata(Count, Bin, Acc) when Count =< 0 ->
  lager:warning("Finished parsing topic metadata, ignoring bytes: ~p", [Bin]),
  {ok, lists:reverse(Acc)};
% Error code 0: a well-formed topic entry with a name and partition list
parse_topic_metadata(Count, <<0:16/signed-integer,
                              TopicSize:16/unsigned-integer,
                              TopicName:TopicSize/binary,
                              PartitionCount:32/unsigned-integer,
                              PartitionsBin/binary>>, Acc) ->
  {ok, PartitionsMetadata, Remainder} = parse_partition_metadata(PartitionCount,
                                                                 PartitionsBin),
  % The stale pre-merge line binding TopicMetadata from an unbound
  % ErrorCode was removed; this clause only matches error code 0.
  TopicMetadata = {0, TopicName, PartitionsMetadata},
  parse_topic_metadata(Count - 1, Remainder, [TopicMetadata | Acc]);
% Non-zero error code: no topic name (-1 size) and zero partitions
parse_topic_metadata(Count, <<ErrorCode:16/signed-integer,
                              -1:16/signed-integer, % TopicSize
                              0:32/unsigned-integer, % PartitionCount
                              Remainder/binary>>, Acc) ->
  % Bind the returned remainder to a fresh name: reusing the already
  % bound Remainder turned this match into an equality assertion that
  % would badmatch if parse_partition_metadata/2 consumed any bytes.
  {ok, PartitionsMetadata, NewRemainder} = parse_partition_metadata(0, Remainder),
  % "unkown" typo fixed to "unknown"; confirm no caller matches on the
  % old spelling of this placeholder topic name.
  TopicMetadata = {ErrorCode, <<"unknown">>, PartitionsMetadata},
  parse_topic_metadata(Count - 1, NewRemainder, [TopicMetadata | Acc]).

parse_partition_metadata(Count, Bin) ->
Expand Down