diff --git a/src/upr_consumer_conn.erl b/src/upr_consumer_conn.erl index 16b2b1a9e1..f63fb4b58f 100644 --- a/src/upr_consumer_conn.erl +++ b/src/upr_consumer_conn.erl @@ -24,7 +24,7 @@ -export([start_link/2, setup_streams/2, takeover/2, maybe_close_stream/2]). --export([init/1, handle_packet/5, handle_call/4, handle_cast/3]). +-export([init/2, handle_packet/5, handle_call/4, handle_cast/3]). -record(stream_state, {owner :: {pid(), any()}, to_add :: [vbucket_id()], @@ -48,16 +48,16 @@ start_link(ConnName, Bucket) -> upr_proxy:start_link(consumer, ConnName, node(), Bucket, ?MODULE, []). -init([]) -> - #state{ - partitions = [], - state = idle - }. +init([], ParentState) -> + {#state{ + partitions = [], + state = idle + }, ParentState}. handle_packet(response, ?UPR_ADD_STREAM, Packet, #state{state = #stream_state{to_add = ToAdd, errors = Errors} = StreamState} - = State, _ParentState) -> + = State, ParentState) -> {Header, Body} = mc_binary:decode_packet(Packet), {Partition, NewToAdd, NewErrors} = process_add_close_stream_response(Header, ToAdd, Errors), @@ -73,20 +73,20 @@ handle_packet(response, ?UPR_ADD_STREAM, Packet, [Partition, Opaque, "0x"]), add_partition(Partition, State) end, - {block, maybe_reply_setup_streams(NewState#state{state = NewStreamState})}; + {block, maybe_reply_setup_streams(NewState#state{state = NewStreamState}), ParentState}; handle_packet(response, ?UPR_ADD_STREAM, Packet, #state{state = #takeover_state{owner = From, partition = Partition, open_ack = false} - = TakeoverState} = State, _ParentState) -> + = TakeoverState} = State, ParentState) -> {Header, Body} = mc_binary:decode_packet(Packet), case {upr_commands:process_response(Header, Body), Header#mc_header.opaque} of {{ok, _}, Partition} -> NewTakeoverState = TakeoverState#takeover_state{open_ack = true}, - {block, maybe_reply_takeover(From, NewTakeoverState, State)}; + {block, maybe_reply_takeover(From, NewTakeoverState, State), ParentState}; {Error, Partition} -> gen_server:reply(From, Error), - {block, State#state{state = idle}}; + {block, State#state{state = idle}, ParentState}; {_, WrongOpaque} -> ?rebalance_error("Unexpected response. Unrecognized opaque ~p~nHeader: ~p~nPartition: ~p", [WrongOpaque, Header, Partition]), @@ -95,16 +95,16 @@ handle_packet(response, ?UPR_ADD_STREAM, Packet, handle_packet(request, ?UPR_STREAM_REQ, Packet, #state{state = #takeover_state{state = requested, partition = Partition} - = TakeoverState} = State, _ParentState) -> + = TakeoverState} = State, ParentState) -> {Header, _Body} = mc_binary:decode_packet(Packet), Partition = Header#mc_header.vbucket, NewTakeoverState = TakeoverState#takeover_state{state = opaque_known, opaque = Header#mc_header.opaque}, - {proxy, State#state{state = NewTakeoverState}}; + {proxy, State#state{state = NewTakeoverState}, ParentState}; handle_packet(response, ?UPR_CLOSE_STREAM, Packet, #state{state = #stream_state{to_close = ToClose, errors = Errors} = StreamState} - = State, _ParentState) -> + = State, ParentState) -> {Header, _Body} = mc_binary:decode_packet(Packet), {Partition, NewToClose, NewErrors} = process_add_close_stream_response(Header, ToClose, Errors), @@ -117,14 +117,14 @@ handle_packet(response, ?UPR_CLOSE_STREAM, Packet, _ -> del_partition(Partition, State) end, - {block, maybe_reply_setup_streams(NewState#state{state = NewStreamState})}; + {block, maybe_reply_setup_streams(NewState#state{state = NewStreamState}), ParentState}; handle_packet(response, ?UPR_SET_VBUCKET_STATE, Packet, #state{state = #takeover_state{opaque = Opaque, state = opaque_known, partition = Partition, owner = From, requested_partition_state = VbState} = TakeoverState} - = State, _ParentState) -> + = State, ParentState) -> {Header, _Body} = mc_binary:decode_packet(Packet), case {Header#mc_header.opaque, Header#mc_header.status} of @@ -134,23 +134,24 @@ handle_packet(response, ?UPR_SET_VBUCKET_STATE, Packet, case VbState of ?VB_STATE_ACTIVE -> NewTakeoverState = TakeoverState#takeover_state{state = active}, - {proxy, maybe_reply_takeover(From, NewTakeoverState, State)}; + {proxy, maybe_reply_takeover(From, NewTakeoverState, State), ParentState}; _ -> {proxy, State#state{state = TakeoverState#takeover_state{ requested_partition_state = none } - }} + }, + ParentState} end; _ -> - {proxy, State} + {proxy, State, ParentState} end; -handle_packet(_, _, _, State, _) -> - {proxy, State}. +handle_packet(_, _, _, State, ParentState) -> + {proxy, State, ParentState}. -handle_call(get_partitions, _From, State, _ParentState) -> - {reply, get_partitions(State), State}; +handle_call(get_partitions, _From, State, ParentState) -> + {reply, get_partitions(State), State, ParentState}; handle_call({maybe_close_stream, Partition}, From, #state{state=idle} = State, ParentState) -> @@ -169,7 +170,7 @@ handle_call({setup_streams, Partitions}, From, case {StreamsToStart, StreamsToStop} of {[], []} -> - {reply, ok, State}; + {reply, ok, State, ParentState}; _ -> StartStreamRequests = lists:map(fun (Partition) -> upr_commands:add_stream(Sock, Partition, @@ -194,14 +195,14 @@ handle_call({setup_streams, Partitions}, From, to_close_on_producer = StopStreamRequests, errors = [] } - }} + }, ParentState} end; handle_call({takeover, Partition}, From, #state{state=idle} = State, ParentState) -> Sock = upr_proxy:get_socket(ParentState), case has_partition(Partition, State) of true -> - {reply, {error, takeover_on_open_stream_is_not_allowed}, State}; + {reply, {error, takeover_on_open_stream_is_not_allowed}, State, ParentState}; false -> upr_commands:add_stream(Sock, Partition, Partition, takeover), {noreply, State#state{state = #takeover_state{ @@ -210,18 +211,19 @@ handle_call({takeover, Partition}, From, #state{state=idle} = State, ParentState opaque = Partition, partition = Partition } - }} + }, + ParentState} end; -handle_call(Msg, _From, State, _ParentState) -> +handle_call(Msg, _From, State, ParentState) -> ?rebalance_warning("Unhandled call: Msg = ~p, State = ~p", [Msg, State]), - {reply, refused, State}. + {reply, refused, State, ParentState}. handle_cast({producer_stream_closed, Packet}, #state{state = #stream_state{to_close_on_producer = ToClose, errors = Errors} = StreamState} = State, - _ParentState) -> + ParentState) -> {Header, _Body} = mc_binary:decode_packet(Packet), {Partition, NewToClose, NewErrors} = process_add_close_stream_response(Header, ToClose, Errors), @@ -235,13 +237,13 @@ handle_cast({producer_stream_closed, Packet}, _ -> del_partition(Partition, State) end, - {noreply, maybe_reply_setup_streams(NewState#state{state = NewStreamState})}; + {noreply, maybe_reply_setup_streams(NewState#state{state = NewStreamState}), ParentState}; handle_cast({set_vbucket_state, Packet}, #state{state = #takeover_state{opaque = Opaque, state = opaque_known, partition = Partition, requested_partition_state = none} = TakeoverState} - = State, _ParentState) -> + = State, ParentState) -> {Header, Body} = mc_binary:decode_packet(Packet), <> = Body#mc_entry.ext, @@ -250,15 +252,16 @@ handle_cast({set_vbucket_state, Packet}, ?rebalance_debug("Partition ~p is about to change status to ~p", [Partition, mc_client_binary:vbucket_state_to_atom(VbState)]), {noreply, State#state{state = - TakeoverState#takeover_state{requested_partition_state = VbState}}}; + TakeoverState#takeover_state{requested_partition_state = VbState}}, + ParentState}; _ -> - {noreply, State} + {noreply, State, ParentState} end; -handle_cast(Msg, State, _ParentState) -> +handle_cast(Msg, State, ParentState) -> ?rebalance_warning("Unhandled cast: Msg = ~p, State = ~p", [Msg, State]), - {noreply, State}. + {noreply, State, ParentState}. process_add_close_stream_response(Header, PendingPartitions, Errors) -> case lists:keytake(Header#mc_header.opaque, 1, PendingPartitions) of diff --git a/src/upr_notifier.erl b/src/upr_notifier.erl index 49fc644451..a07b7bd0f9 100644 --- a/src/upr_notifier.erl +++ b/src/upr_notifier.erl @@ -29,7 +29,7 @@ -export([start_link/1, subscribe/5]). --export([init/1, handle_packet/5, handle_call/4, handle_cast/3]). +-export([init/2, handle_packet/5, handle_call/4, handle_cast/3]). start_link(Bucket) -> single_bucket_sup:ignore_if_not_couchbase_bucket( @@ -43,10 +43,10 @@ start_link(Bucket) -> subscribe(Bucket, Partition, StartSeqNo, UUID, HighSeqNo) -> gen_server:call(server_name(Bucket), {subscribe, Partition, StartSeqNo, UUID, HighSeqNo}, infinity). -init([Bucket]) -> +init([Bucket], ParentState) -> upr_proxy:no_proxy_setup(self()), erlang:register(server_name(Bucket), self()), - []. + {[], ParentState}. server_name(Bucket) -> list_to_atom(?MODULE_STRING "-" ++ Bucket). @@ -55,27 +55,29 @@ handle_call({subscribe, Partition, StartSeqNo, UUID, HighSeqNo}, From, State, Pa PartitionState = get_partition(Partition, State), do_subscribe(From, {StartSeqNo, UUID, HighSeqNo}, PartitionState, State, ParentState). -handle_cast(Msg, State, _ParentState) -> +handle_cast(Msg, State, ParentState) -> ?log_warning("Unhandled cast: ~p", [Msg]), - {noreply, State}. + {noreply, State, ParentState}. handle_packet(request, ?NOOP, _Packet, State, ParentState) -> {ok, quiet} = mc_client_binary:respond(?NOOP, upr_proxy:get_socket(ParentState), {#mc_header{status = ?SUCCESS}, #mc_entry{}}), - {block, State}; + {block, State, ParentState}; handle_packet(request, Opcode, Packet, State, ParentState) -> {Header, Body} = mc_binary:decode_packet(Packet), PartitionState = get_partition(Header#mc_header.opaque, State), {block, - set_partition(handle_request(Opcode, Header, Body, PartitionState, ParentState), State)}; + set_partition(handle_request(Opcode, Header, Body, PartitionState, ParentState), State), + ParentState}; handle_packet(response, Opcode, Packet, State, ParentState) -> {Header, Body} = mc_binary:decode_packet(Packet), PartitionState = get_partition(Header#mc_header.opaque, State), {block, - set_partition(handle_response(Opcode, Header, Body, PartitionState, ParentState), State)}. + set_partition(handle_response(Opcode, Header, Body, PartitionState, ParentState), State), + ParentState}. handle_response(?UPR_STREAM_REQ, Header, Body, #partition{stream_state = pending, @@ -144,9 +146,9 @@ add_subscriber(From, #partition{subscribers = Subscribers} = PartitionState) -> %% and return immediately since the data is already available do_subscribe(_From, {StartSeqNo, UUID, HighSeqNo}, #partition{last_known_pos = {LNStartSeqNo, UUID, HighSeqNo}, - stream_state = closed} = PartitionState, State, _ParentState) + stream_state = closed} = PartitionState, State, ParentState) when StartSeqNo =< LNStartSeqNo -> - {reply, ok, set_partition(PartitionState, State)}; + {reply, ok, set_partition(PartitionState, State), ParentState}; do_subscribe(From, {StartSeqNo, UUID, HighSeqNo} = Pos, #partition{partition = Partition, @@ -157,13 +159,13 @@ do_subscribe(From, {StartSeqNo, UUID, HighSeqNo} = Pos, PartitionState1 = PartitionState#partition{last_known_pos = Pos, stream_state = pending}, - {noreply, set_partition(add_subscriber(From, PartitionState1), State)}; + {noreply, set_partition(add_subscriber(From, PartitionState1), State), ParentState}; do_subscribe(From, Pos, #partition{last_known_pos = Pos} = PartitionState, - State, _ParentState) -> - {noreply, set_partition(add_subscriber(From, PartitionState), State)}; + State, ParentState) -> + {noreply, set_partition(add_subscriber(From, PartitionState), State), ParentState}; do_subscribe(_From, _Pos, - PartitionState, State, _ParentState) -> - {reply, ok, set_partition(PartitionState, State)}. + PartitionState, State, ParentState) -> + {reply, ok, set_partition(PartitionState, State), ParentState}. diff --git a/src/upr_producer_conn.erl b/src/upr_producer_conn.erl index 8d014ec3c5..d26fe961e1 100644 --- a/src/upr_producer_conn.erl +++ b/src/upr_producer_conn.erl @@ -21,35 +21,35 @@ -include("mc_constants.hrl"). -include("mc_entry.hrl"). --export([start_link/3, init/1, handle_packet/5, handle_call/4, handle_cast/3]). +-export([start_link/3, init/2, handle_packet/5, handle_call/4, handle_cast/3]). start_link(ConnName, ProducerNode, Bucket) -> upr_proxy:start_link(producer, ConnName, ProducerNode, Bucket, ?MODULE, []). -init([]) -> - []. +init([], ParentState) -> + {[], ParentState}. handle_packet(request, ?UPR_SET_VBUCKET_STATE, Packet, State, ParentState) -> Consumer = upr_proxy:get_partner(ParentState), gen_server:cast(Consumer, {set_vbucket_state, Packet}), - {proxy, State}; + {proxy, State, ParentState}; handle_packet(response, ?UPR_CLOSE_STREAM, Packet, State, ParentState) -> Consumer = upr_proxy:get_partner(ParentState), gen_server:cast(Consumer, {producer_stream_closed, Packet}), - {block, State}; + {block, State, ParentState}; -handle_packet(_, _, _, State, _ParentState) -> - {proxy, State}. +handle_packet(_, _, _, State, ParentState) -> + {proxy, State, ParentState}. -handle_call(Msg, _From, State, _ParentState) -> +handle_call(Msg, _From, State, ParentState) -> ?rebalance_warning("Unhandled call: Msg = ~p, State = ~p", [Msg, State]), - {reply, refused, State}. + {reply, refused, State, ParentState}. handle_cast({close_stream, Partition}, State, ParentState) -> upr_commands:close_stream(upr_proxy:get_socket(ParentState), Partition, Partition), - {noreply, State}; + {noreply, State, ParentState}; -handle_cast(Msg, State, _ParentState) -> +handle_cast(Msg, State, ParentState) -> ?rebalance_warning("Unhandled cast: Msg = ~p, State = ~p", [Msg, State]), - {noreply, State}. + {noreply, State, ParentState}. diff --git a/src/upr_proxy.erl b/src/upr_proxy.erl index fbcdab55dd..9cded045dc 100644 --- a/src/upr_proxy.erl +++ b/src/upr_proxy.erl @@ -45,13 +45,13 @@ init([Type, ConnName, Node, Bucket, ExtModule, InitArgs]) -> erlang:process_flag(trap_exit, true), Sock = connect(Type, ConnName, Node, Bucket), - ExtState = ExtModule:init(InitArgs), + {ExtState, State} = ExtModule:init(InitArgs, #state{}), - {ok, #state{ - sock = Sock, - ext_module = ExtModule, - ext_state = ExtState - }, ?HIBERNATE_TIMEOUT}. + {ok, State#state{ + sock = Sock, + ext_module = ExtModule, + ext_state = ExtState + }, ?HIBERNATE_TIMEOUT}. start_link(Type, ConnName, Node, Bucket, ExtModule, InitArgs) -> gen_server:start_link(?MODULE, [Type, ConnName, Node, Bucket, ExtModule, InitArgs], []). @@ -71,8 +71,8 @@ handle_cast({setup_proxy, Partner, ProxyTo}, #state{sock = Socket} = State) -> {noreply, State#state{proxy_to = ProxyTo, partner = Partner}, ?HIBERNATE_TIMEOUT}; handle_cast(Msg, State = #state{ext_module = ExtModule, ext_state = ExtState}) -> - {noreply, NewExtState} = ExtModule:handle_cast(Msg, ExtState, State), - {noreply, State#state{ext_state = NewExtState}, ?HIBERNATE_TIMEOUT}. + {noreply, NewExtState, NewState} = ExtModule:handle_cast(Msg, ExtState, State), + {noreply, NewState#state{ext_state = NewExtState}, ?HIBERNATE_TIMEOUT}. terminate(_Reason, State) -> ?log_debug("Terminating. Disconnecting from socket ~p", [State#state.sock]), @@ -103,10 +103,10 @@ handle_call(get_socket, _From, State = #state{sock = Sock}) -> {reply, Sock, State, ?HIBERNATE_TIMEOUT}; handle_call(Command, From, State = #state{ext_module = ExtModule, ext_state = ExtState}) -> case ExtModule:handle_call(Command, From, ExtState, State) of - {ReplyType, Reply, NewExtState} -> - {ReplyType, Reply, State#state{ext_state = NewExtState}, ?HIBERNATE_TIMEOUT}; - {ReplyType, NewExtState} -> - {ReplyType, State#state{ext_state = NewExtState}, ?HIBERNATE_TIMEOUT} + {ReplyType, Reply, NewExtState, NewState} -> + {ReplyType, Reply, NewState#state{ext_state = NewExtState}, ?HIBERNATE_TIMEOUT}; + {ReplyType, NewExtState, NewState} -> + {ReplyType, NewState#state{ext_state = NewExtState}, ?HIBERNATE_TIMEOUT} end. handle_packet(<> = Packet, @@ -124,14 +124,14 @@ handle_packet(<> = Packet, ?RES_MAGIC -> response end, - {Action, NewExtState} = ExtModule:handle_packet(Type, Opcode, Packet, ExtState, State), + {Action, NewExtState, NewState} = ExtModule:handle_packet(Type, Opcode, Packet, ExtState, State), case Action of proxy -> ok = gen_tcp:send(ProxyTo, Packet); block -> ok end, - {ok, State#state{ext_state = NewExtState}}. + {ok, NewState#state{ext_state = NewExtState}}. suppress_logging(<>) -> true;