Allow ns_proxy extensions to modify parent state
Change-Id: I19fad853b632438f33950d210c98d4f0254d243e
Tested-by: Artem Stemkovski <>
Reviewed-by: Aliaksey Kandratsenka <>
vzasade authored and alk committed Apr 19, 2014
1 parent 96a5029 commit 77b371e
Showing 4 changed files with 82 additions and 77 deletions.
75 changes: 39 additions & 36 deletions src/upr_consumer_conn.erl
Expand Up @@ -24,7 +24,7 @@
setup_streams/2, takeover/2, maybe_close_stream/2]).

-export([init/1, handle_packet/5, handle_call/4, handle_cast/3]).
-export([init/2, handle_packet/5, handle_call/4, handle_cast/3]).

-record(stream_state, {owner :: {pid(), any()},
to_add :: [vbucket_id()],
Expand All @@ -48,16 +48,16 @@
start_link(ConnName, Bucket) ->
upr_proxy:start_link(consumer, ConnName, node(), Bucket, ?MODULE, []).

init([]) ->
partitions = [],
state = idle
init([], ParentState) ->
partitions = [],
state = idle
}, ParentState}.

handle_packet(response, ?UPR_ADD_STREAM, Packet,
#state{state = #stream_state{to_add = ToAdd, errors = Errors} = StreamState}
= State, _ParentState) ->
= State, ParentState) ->
{Header, Body} = mc_binary:decode_packet(Packet),

{Partition, NewToAdd, NewErrors} = process_add_close_stream_response(Header, ToAdd, Errors),
Expand All @@ -73,20 +73,20 @@ handle_packet(response, ?UPR_ADD_STREAM, Packet,
[Partition, Opaque, "0x"]),
add_partition(Partition, State)
{block, maybe_reply_setup_streams(NewState#state{state = NewStreamState})};
{block, maybe_reply_setup_streams(NewState#state{state = NewStreamState}), ParentState};

handle_packet(response, ?UPR_ADD_STREAM, Packet,
#state{state = #takeover_state{owner = From, partition = Partition, open_ack = false}
= TakeoverState} = State, _ParentState) ->
= TakeoverState} = State, ParentState) ->

{Header, Body} = mc_binary:decode_packet(Packet),
case {upr_commands:process_response(Header, Body), Header#mc_header.opaque} of
{{ok, _}, Partition} ->
NewTakeoverState = TakeoverState#takeover_state{open_ack = true},
{block, maybe_reply_takeover(From, NewTakeoverState, State)};
{block, maybe_reply_takeover(From, NewTakeoverState, State), ParentState};
{Error, Partition} ->
gen_server:reply(From, Error),
{block, State#state{state = idle}};
{block, State#state{state = idle}, ParentState};
{_, WrongOpaque} ->
?rebalance_error("Unexpected response. Unrecognized opaque ~p~nHeader: ~p~nPartition: ~p",
[WrongOpaque, Header, Partition]),
Expand All @@ -95,16 +95,16 @@ handle_packet(response, ?UPR_ADD_STREAM, Packet,

handle_packet(request, ?UPR_STREAM_REQ, Packet,
#state{state = #takeover_state{state = requested, partition = Partition}
= TakeoverState} = State, _ParentState) ->
= TakeoverState} = State, ParentState) ->
{Header, _Body} = mc_binary:decode_packet(Packet),
Partition = Header#mc_header.vbucket,
NewTakeoverState = TakeoverState#takeover_state{state = opaque_known,
opaque = Header#mc_header.opaque},
{proxy, State#state{state = NewTakeoverState}};
{proxy, State#state{state = NewTakeoverState}, ParentState};

handle_packet(response, ?UPR_CLOSE_STREAM, Packet,
#state{state = #stream_state{to_close = ToClose, errors = Errors} = StreamState}
= State, _ParentState) ->
= State, ParentState) ->
{Header, _Body} = mc_binary:decode_packet(Packet),

{Partition, NewToClose, NewErrors} = process_add_close_stream_response(Header, ToClose, Errors),
Expand All @@ -117,14 +117,14 @@ handle_packet(response, ?UPR_CLOSE_STREAM, Packet,
_ ->
del_partition(Partition, State)
{block, maybe_reply_setup_streams(NewState#state{state = NewStreamState})};
{block, maybe_reply_setup_streams(NewState#state{state = NewStreamState}), ParentState};

handle_packet(response, ?UPR_SET_VBUCKET_STATE, Packet,
#state{state = #takeover_state{opaque = Opaque, state = opaque_known,
partition = Partition,
owner = From,
requested_partition_state = VbState} = TakeoverState}
= State, _ParentState) ->
= State, ParentState) ->
{Header, _Body} = mc_binary:decode_packet(Packet),

case {Header#mc_header.opaque, Header#mc_header.status} of
Expand All @@ -134,23 +134,24 @@ handle_packet(response, ?UPR_SET_VBUCKET_STATE, Packet,
case VbState of
NewTakeoverState = TakeoverState#takeover_state{state = active},
{proxy, maybe_reply_takeover(From, NewTakeoverState, State)};
{proxy, maybe_reply_takeover(From, NewTakeoverState, State), ParentState};
_ ->
{proxy, State#state{state =
requested_partition_state = none
_ ->
{proxy, State}
{proxy, State, ParentState}

handle_packet(_, _, _, State, _) ->
{proxy, State}.
handle_packet(_, _, _, State, ParentState) ->
{proxy, State, ParentState}.

handle_call(get_partitions, _From, State, _ParentState) ->
{reply, get_partitions(State), State};
handle_call(get_partitions, _From, State, ParentState) ->
{reply, get_partitions(State), State, ParentState};

handle_call({maybe_close_stream, Partition}, From,
#state{state=idle} = State, ParentState) ->
Expand All @@ -169,7 +170,7 @@ handle_call({setup_streams, Partitions}, From,

case {StreamsToStart, StreamsToStop} of
{[], []} ->
{reply, ok, State};
{reply, ok, State, ParentState};
_ ->
StartStreamRequests = lists:map(fun (Partition) ->
upr_commands:add_stream(Sock, Partition,
Expand All @@ -194,14 +195,14 @@ handle_call({setup_streams, Partitions}, From,
to_close_on_producer = StopStreamRequests,
errors = []
}, ParentState}

handle_call({takeover, Partition}, From, #state{state=idle} = State, ParentState) ->
Sock = upr_proxy:get_socket(ParentState),
case has_partition(Partition, State) of
true ->
{reply, {error, takeover_on_open_stream_is_not_allowed}, State};
{reply, {error, takeover_on_open_stream_is_not_allowed}, State, ParentState};
false ->
upr_commands:add_stream(Sock, Partition, Partition, takeover),
{noreply, State#state{state = #takeover_state{
Expand All @@ -210,18 +211,19 @@ handle_call({takeover, Partition}, From, #state{state=idle} = State, ParentState
opaque = Partition,
partition = Partition

handle_call(Msg, _From, State, _ParentState) ->
handle_call(Msg, _From, State, ParentState) ->
?rebalance_warning("Unhandled call: Msg = ~p, State = ~p", [Msg, State]),
{reply, refused, State}.
{reply, refused, State, ParentState}.

handle_cast({producer_stream_closed, Packet},
#state{state = #stream_state{to_close_on_producer = ToClose,
errors = Errors} = StreamState} = State,
_ParentState) ->
ParentState) ->
{Header, _Body} = mc_binary:decode_packet(Packet),

{Partition, NewToClose, NewErrors} = process_add_close_stream_response(Header, ToClose, Errors),
Expand All @@ -235,13 +237,13 @@ handle_cast({producer_stream_closed, Packet},
_ ->
del_partition(Partition, State)
{noreply, maybe_reply_setup_streams(NewState#state{state = NewStreamState})};
{noreply, maybe_reply_setup_streams(NewState#state{state = NewStreamState}), ParentState};

handle_cast({set_vbucket_state, Packet},
#state{state = #takeover_state{opaque = Opaque, state = opaque_known,
partition = Partition,
requested_partition_state = none} = TakeoverState}
= State, _ParentState) ->
= State, ParentState) ->
{Header, Body} = mc_binary:decode_packet(Packet),
<<VbState:8>> = Body#mc_entry.ext,

Expand All @@ -250,15 +252,16 @@ handle_cast({set_vbucket_state, Packet},
?rebalance_debug("Partition ~p is about to change status to ~p",
[Partition, mc_client_binary:vbucket_state_to_atom(VbState)]),
{noreply, State#state{state =
TakeoverState#takeover_state{requested_partition_state = VbState}}};
TakeoverState#takeover_state{requested_partition_state = VbState}},
_ ->
{noreply, State}
{noreply, State, ParentState}

handle_cast(Msg, State, _ParentState) ->
handle_cast(Msg, State, ParentState) ->
?rebalance_warning("Unhandled cast: Msg = ~p, State = ~p", [Msg, State]),
{noreply, State}.
{noreply, State, ParentState}.

process_add_close_stream_response(Header, PendingPartitions, Errors) ->
case lists:keytake(Header#mc_header.opaque, 1, PendingPartitions) of
Expand Down
32 changes: 17 additions & 15 deletions src/upr_notifier.erl
Expand Up @@ -29,7 +29,7 @@

-export([start_link/1, subscribe/5]).

-export([init/1, handle_packet/5, handle_call/4, handle_cast/3]).
-export([init/2, handle_packet/5, handle_call/4, handle_cast/3]).

start_link(Bucket) ->
Expand All @@ -43,10 +43,10 @@ start_link(Bucket) ->
subscribe(Bucket, Partition, StartSeqNo, UUID, HighSeqNo) ->
gen_server:call(server_name(Bucket), {subscribe, Partition, StartSeqNo, UUID, HighSeqNo}, infinity).

init([Bucket]) ->
init([Bucket], ParentState) ->
erlang:register(server_name(Bucket), self()),
{[], ParentState}.

server_name(Bucket) ->
list_to_atom(?MODULE_STRING "-" ++ Bucket).
Expand All @@ -55,27 +55,29 @@ handle_call({subscribe, Partition, StartSeqNo, UUID, HighSeqNo}, From, State, Pa
PartitionState = get_partition(Partition, State),
do_subscribe(From, {StartSeqNo, UUID, HighSeqNo}, PartitionState, State, ParentState).

handle_cast(Msg, State, _ParentState) ->
handle_cast(Msg, State, ParentState) ->
?log_warning("Unhandled cast: ~p", [Msg]),
{noreply, State}.
{noreply, State, ParentState}.

handle_packet(request, ?NOOP, _Packet, State, ParentState) ->
{ok, quiet} = mc_client_binary:respond(?NOOP, upr_proxy:get_socket(ParentState),
{#mc_header{status = ?SUCCESS},
{block, State};
{block, State, ParentState};

handle_packet(request, Opcode, Packet, State, ParentState) ->
{Header, Body} = mc_binary:decode_packet(Packet),
PartitionState = get_partition(Header#mc_header.opaque, State),
set_partition(handle_request(Opcode, Header, Body, PartitionState, ParentState), State)};
set_partition(handle_request(Opcode, Header, Body, PartitionState, ParentState), State),

handle_packet(response, Opcode, Packet, State, ParentState) ->
{Header, Body} = mc_binary:decode_packet(Packet),
PartitionState = get_partition(Header#mc_header.opaque, State),
set_partition(handle_response(Opcode, Header, Body, PartitionState, ParentState), State)}.
set_partition(handle_response(Opcode, Header, Body, PartitionState, ParentState), State),

handle_response(?UPR_STREAM_REQ, Header, Body,
#partition{stream_state = pending,
Expand Down Expand Up @@ -144,9 +146,9 @@ add_subscriber(From, #partition{subscribers = Subscribers} = PartitionState) ->
%% and return immediately since the data is already available
do_subscribe(_From, {StartSeqNo, UUID, HighSeqNo},
#partition{last_known_pos = {LNStartSeqNo, UUID, HighSeqNo},
stream_state = closed} = PartitionState, State, _ParentState)
stream_state = closed} = PartitionState, State, ParentState)
when StartSeqNo =< LNStartSeqNo ->
{reply, ok, set_partition(PartitionState, State)};
{reply, ok, set_partition(PartitionState, State), ParentState};

do_subscribe(From, {StartSeqNo, UUID, HighSeqNo} = Pos,
#partition{partition = Partition,
Expand All @@ -157,13 +159,13 @@ do_subscribe(From, {StartSeqNo, UUID, HighSeqNo} = Pos,

PartitionState1 = PartitionState#partition{last_known_pos = Pos,
stream_state = pending},
{noreply, set_partition(add_subscriber(From, PartitionState1), State)};
{noreply, set_partition(add_subscriber(From, PartitionState1), State), ParentState};

do_subscribe(From, Pos,
#partition{last_known_pos = Pos} = PartitionState,
State, _ParentState) ->
{noreply, set_partition(add_subscriber(From, PartitionState), State)};
State, ParentState) ->
{noreply, set_partition(add_subscriber(From, PartitionState), State), ParentState};

do_subscribe(_From, _Pos,
PartitionState, State, _ParentState) ->
{reply, ok, set_partition(PartitionState, State)}.
PartitionState, State, ParentState) ->
{reply, ok, set_partition(PartitionState, State), ParentState}.
24 changes: 12 additions & 12 deletions src/upr_producer_conn.erl
Expand Up @@ -21,35 +21,35 @@

-export([start_link/3, init/1, handle_packet/5, handle_call/4, handle_cast/3]).
-export([start_link/3, init/2, handle_packet/5, handle_call/4, handle_cast/3]).

start_link(ConnName, ProducerNode, Bucket) ->
upr_proxy:start_link(producer, ConnName, ProducerNode, Bucket, ?MODULE, []).

init([]) ->
init([], ParentState) ->
{[], ParentState}.

handle_packet(request, ?UPR_SET_VBUCKET_STATE, Packet, State, ParentState) ->
Consumer = upr_proxy:get_partner(ParentState),
gen_server:cast(Consumer, {set_vbucket_state, Packet}),
{proxy, State};
{proxy, State, ParentState};

handle_packet(response, ?UPR_CLOSE_STREAM, Packet, State, ParentState) ->
Consumer = upr_proxy:get_partner(ParentState),
gen_server:cast(Consumer, {producer_stream_closed, Packet}),
{block, State};
{block, State, ParentState};

handle_packet(_, _, _, State, _ParentState) ->
{proxy, State}.
handle_packet(_, _, _, State, ParentState) ->
{proxy, State, ParentState}.

handle_call(Msg, _From, State, _ParentState) ->
handle_call(Msg, _From, State, ParentState) ->
?rebalance_warning("Unhandled call: Msg = ~p, State = ~p", [Msg, State]),
{reply, refused, State}.
{reply, refused, State, ParentState}.

handle_cast({close_stream, Partition}, State, ParentState) ->
upr_commands:close_stream(upr_proxy:get_socket(ParentState), Partition, Partition),
{noreply, State};
{noreply, State, ParentState};

handle_cast(Msg, State, _ParentState) ->
handle_cast(Msg, State, ParentState) ->
?rebalance_warning("Unhandled cast: Msg = ~p, State = ~p", [Msg, State]),
{noreply, State}.
{noreply, State, ParentState}.

