Skip to content

Commit

Permalink
send acks to upstream via separate process
Browse files Browse the repository at this point in the history
This fixes some rare cases where upstream memcached tries to send TAP
data and does not receive ACKS/NACKS we're trying to send it. This
leads to deadlock where we're waiting for ack to be sent (blocking on
memcached consuming some of it's queued nacks/acks) and memcached waiting on
us receiving TAP data.

TAP ack design requires us to be able to queue up to tap_ack_interval
nacks. So doing downstream -> upstream path asynchronously achieves
that. Without, hopefully, introducing any really excessive
queuing. Because of, seemingly, limited amount of queued acks/nacks
required.

NOTE: when we're shutting ebucketmigrator down, we're dropping queued
to-upstream data on the floor. But that's what essentially happened
before anyways.

Change-Id: I1a518e55e3a539976f921dea1afc0c7ccff15f48
Reviewed-on: http://review.couchbase.org/11859
Tested-by: Aliaksey Kandratsenka <alkondratenko@gmail.com>
Reviewed-by: Aliaksey Artamonau <aliaksiej.artamonau@gmail.com>
  • Loading branch information
Aliaksey Kandratsenka authored and alk committed Dec 23, 2011
1 parent b01dd0e commit e9a0164
Showing 1 changed file with 16 additions and 1 deletion.
17 changes: 16 additions & 1 deletion src/ebucketmigrator_srv.erl
Expand Up @@ -32,6 +32,7 @@
-record(state, {bad_vbucket_count = 0 :: non_neg_integer(),
upstream :: port(),
downstream :: port(),
upstream_sender :: pid(),
upbuf = <<>> :: binary(),
downbuf = <<>> :: binary(),
vbuckets,
Expand Down Expand Up @@ -104,6 +105,9 @@ handle_info(check_for_timeout, State) ->
false ->
{noreply, State}
end;
handle_info({'EXIT', Pid, Reason}, #state{upstream_sender = SenderPid} = State) when Pid =:= SenderPid ->
?log_error("killing myself due to unexpected upstream sender exit with reason: ~p", [Reason]),
{stop, {unexpected_upstream_sender_exit, Reason}, State};
handle_info(Msg, State) ->
?log_info("handle_info(~p, ~p)", [Msg, State]),
{noreply, State}.
Expand Down Expand Up @@ -167,9 +171,13 @@ init({Src, Dst, Opts}) ->
Timeout = proplists:get_value(timeout, Opts, ?TIMEOUT_CHECK_INTERVAL),
{ok, _TRef} = timer:send_interval(Timeout, check_for_timeout),

UpstreamSender = spawn_link(erlang, apply, [fun upstream_sender_loop/1, [Upstream]]),
?log_info("upstream_sender pid: ~p", [UpstreamSender]),

State = #state{
upstream=Upstream,
downstream=Downstream,
upstream_sender = UpstreamSender,
vbuckets=sets:from_list(ReadyVBuckets),
last_seen=now(),
takeover=TakeOver,
Expand All @@ -178,6 +186,13 @@ init({Src, Dst, Opts}) ->
erlang:process_flag(trap_exit, true),
gen_server:enter_loop(?MODULE, [], maybe_setup_dumping(State, Args)).

upstream_sender_loop(Upstream) ->
receive
Data ->
gen_tcp:send(Upstream, Data)
end,
upstream_sender_loop(Upstream).

-spec maybe_setup_dumping(#state{}, term()) -> #state{}.
maybe_setup_dumping(State, Args) ->
case os:getenv("MEMBASE_DUMP_TAP_STREAMS") of
Expand Down Expand Up @@ -339,7 +354,7 @@ process_downstream(<<?RES_MAGIC:8, _/binary>> = Packet,
undefined -> ok;
File -> file:write(File, Packet)
end,
ok = gen_tcp:send(State#state.upstream, Packet),
State#state.upstream_sender ! Packet,
State.


Expand Down

0 comments on commit e9a0164

Please sign in to comment.