From e9a0164643a5b0f679ada6ac5180d32b9200a0c9 Mon Sep 17 00:00:00 2001 From: Aliaksey Kandratsenka Date: Fri, 23 Dec 2011 11:38:25 +0300 Subject: [PATCH] send acks to upstream via separate process This fixes some rare cases where upstream memcached tries to send TAP data and does not receive ACKS/NACKS we're trying to send it. This leads to a deadlock where we're waiting for ack to be sent (blocking on memcached consuming some of its queued nacks/acks) and memcached waiting on us receiving TAP data. TAP ack design requires us to be able to queue up to tap_ack_interval nacks. So doing downstream -> upstream path asynchronously achieves that. Without, hopefully, introducing any really excessive queuing. Because of, seemingly, limited amount of queued acks/nacks required. NOTE: when we're shutting ebucketmigrator down, we're dropping queued to-upstream data on the floor. But that's what essentially happened before anyway. Change-Id: I1a518e55e3a539976f921dea1afc0c7ccff15f48 Reviewed-on: http://review.couchbase.org/11859 Tested-by: Aliaksey Kandratsenka Reviewed-by: Aliaksey Artamonau --- src/ebucketmigrator_srv.erl | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/src/ebucketmigrator_srv.erl b/src/ebucketmigrator_srv.erl index d810d2733a..20e921173d 100644 --- a/src/ebucketmigrator_srv.erl +++ b/src/ebucketmigrator_srv.erl @@ -32,6 +32,7 @@ -record(state, {bad_vbucket_count = 0 :: non_neg_integer(), upstream :: port(), downstream :: port(), + upstream_sender :: pid(), upbuf = <<>> :: binary(), downbuf = <<>> :: binary(), vbuckets, @@ -104,6 +105,9 @@ handle_info(check_for_timeout, State) -> false -> {noreply, State} end; +handle_info({'EXIT', Pid, Reason}, #state{upstream_sender = SenderPid} = State) when Pid =:= SenderPid -> + ?log_error("killing myself due to unexpected upstream sender exit with reason: ~p", [Reason]), + {stop, {unexpected_upstream_sender_exit, Reason}, State}; handle_info(Msg, State) -> ?log_info("handle_info(~p, 
~p)", [Msg, State]), {noreply, State}. @@ -167,9 +171,13 @@ init({Src, Dst, Opts}) -> Timeout = proplists:get_value(timeout, Opts, ?TIMEOUT_CHECK_INTERVAL), {ok, _TRef} = timer:send_interval(Timeout, check_for_timeout), + UpstreamSender = spawn_link(erlang, apply, [fun upstream_sender_loop/1, [Upstream]]), + ?log_info("upstream_sender pid: ~p", [UpstreamSender]), + State = #state{ upstream=Upstream, downstream=Downstream, + upstream_sender = UpstreamSender, vbuckets=sets:from_list(ReadyVBuckets), last_seen=now(), takeover=TakeOver, @@ -178,6 +186,13 @@ init({Src, Dst, Opts}) -> erlang:process_flag(trap_exit, true), gen_server:enter_loop(?MODULE, [], maybe_setup_dumping(State, Args)). +upstream_sender_loop(Upstream) -> + receive + Data -> + gen_tcp:send(Upstream, Data) + end, + upstream_sender_loop(Upstream). + -spec maybe_setup_dumping(#state{}, term()) -> #state{}. maybe_setup_dumping(State, Args) -> case os:getenv("MEMBASE_DUMP_TAP_STREAMS") of @@ -339,7 +354,7 @@ process_downstream(<> = Packet, undefined -> ok; File -> file:write(File, Packet) end, - ok = gen_tcp:send(State#state.upstream, Packet), + State#state.upstream_sender ! Packet, State.