Skip to content

Commit

Permalink
Add node filtering to search
Browse files Browse the repository at this point in the history
During ownership change there can be multiple copies of the same
replica on two different nodes.  The generated query needs to filter
on the current owner to avoid returning incorrect results during
ownership change.
  • Loading branch information
rzezeski committed Aug 28, 2012
1 parent 8434ceb commit 7df145a
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 66 deletions.
8 changes: 8 additions & 0 deletions include/yokozuna.hrl
Expand Up @@ -77,6 +77,13 @@
-type dist() :: non_neg_integer().
%% Mapping from logical partition to partition
-type logical_idx() :: [{lp(), p()}].
%% Logical partitions to include when filtering, or `all' to include
%% every replica.
-type logical_filter() :: all | [lp()].
%% Partitions to include when filtering, or `all' to include every
%% replica.
-type filter() :: all | [p()].
%% A partition paired with a node.
-type p_node() :: {p(), node()}.
%% A logical partition paired with a node.
-type lp_node() :: {lp(), node()}.
%% A list of partition/node pairs.
-type cover_set() :: [p_node()].
%% A list of logical-partition/node pairs.
-type logical_cover_set() :: [lp_node()].
%% Partition/node pairs, each tagged with the filter to apply to it.
-type filter_cover_set() :: [{p_node(), filter()}].

-type node_event() :: {node_event, node(), up | down}.
-type ring_event() :: {ring_event, riak_core_ring:riak_core_ring()}.
Expand All @@ -91,6 +98,7 @@
-define(ERROR(Fmt, Args), error_logger:error_msg(Fmt ++ "~n", Args)).
%% Log an informational message.  `Fmt' is an io:format/2 style format
%% string (a newline is appended) and `Args' its argument list.
%% Routed through info_msg so routine messages are not reported at
%% error severity.
-define(INFO(Fmt, Args), error_logger:info_msg(Fmt ++ "~n", Args)).

-define(ATOM_TO_BIN(A), list_to_binary(atom_to_list(A))).
-define(BIN_TO_INT(B), list_to_integer(binary_to_list(B))).
-define(INT_TO_BIN(I), list_to_binary(integer_to_list(I))).
-define(INT_TO_STR(I), integer_to_list(I)).
Expand Down
3 changes: 3 additions & 0 deletions priv/conf/schema.xml
Expand Up @@ -157,6 +157,9 @@
preflist, used for further filtering on overlapping partitions. -->
<field name="_fpn" type="string" indexed="true" stored="true"/>

<!-- Node: The name of the node that this doc was created on. -->
<field name="_node" type="string" indexed="true" stored="true"/>

<!-- Riak Key: The key of the Riak object this doc corresponds to. -->
<field name="_rk" type="string" indexed="true" stored="true"/>

Expand Down
125 changes: 64 additions & 61 deletions src/yz_cover.erl
Expand Up @@ -31,79 +31,68 @@

%% Build a coverage plan for `Index'.  The n_val is read from the
%% bucket properties of `Index' and handed to
%% riak_core_coverage_plan:create_plan/5 together with the `all'
%% selector and a single primary.  An `{error, Error}' result is
%% re-raised with throw(Error); otherwise the unique nodes of the
%% cover set are returned paired with the reified filter information.
plan(Index) ->
{ok, Ring} = riak_core_ring_manager:get_my_ring(),
Q = riak_core_ring:num_partitions(Ring),
BProps = riak_core_bucket:get_bucket(Index, Ring),
Selector = all,
NVal = riak_core_bucket:n_val(BProps),
NumPrimaries = 1,
%% Request id is a hash of the current time.
ReqId = erlang:phash2(erlang:now()),
Service = yokozuna,

Result = riak_core_coverage_plan:create_plan(Selector,
NVal,
NumPrimaries,
ReqId,
Service),
?YZ_SVC_NAME),
case Result of
{error, Error} ->
throw(Error);
{CoveringSet, _} ->
{Partitions, Nodes} = lists:unzip(CoveringSet),
{CoverSet, _} ->
{_Partitions, Nodes} = lists:unzip(CoverSet),
%% A node may cover several partitions; report each node once.
UniqNodes = lists:usort(Nodes),
FilterPairs = filter_pairs(Ring, NVal, Partitions),
{UniqNodes, reify(Ring, FilterPairs)}
LPI = logical_index(Ring),
LogicalCoverSet = add_filtering(NVal, Q, LPI, CoverSet),
{UniqNodes, reify(LPI, LogicalCoverSet)}
end.

%%%===================================================================
%%% Private
%%%===================================================================

%% @doc Create a `{LogicalPartition, Include}' filter pair for a given
%% `{LogicalPartition, Dist}' pair. `Include' indicates which
%% replicas should be included for the paired `LogicalPartition'.
%% The value `all' means all replicas. If the value is a list of
%% `lp()' then a replica must have one of the LPs as its first
%% primary partition on the preflist.
-spec filter_pair(n(), q(), {lp(), dist()}) -> {lp(), all | [lp()]}.
%% A distance equal to `N' selects every replica.
filter_pair(N, _Q, {LP, N}) ->
{LP, all};
%% Otherwise keep the first `Dist' entries of the reversed sequence
%% produced by lp_seq/3 for `LP'.
filter_pair(N, Q, {LP, Dist}) ->
LPSeq = lists:reverse(lp_seq(N, Q, LP)),
Filter = lists:sublist(LPSeq, Dist),
{LP, Filter}.

%% Build the filter pairs for the covering partitions `CPartitions':
%% translate them to logical partitions, pair each one with its
%% successor (the last is paired with the first), compute the distance
%% for every pair and turn those distances into filters.
filter_pairs(Ring, N, CPartitions) ->
Q = riak_core_ring:num_partitions(Ring),
LPI = logical_partitions(Ring),
Logical = make_logical(LPI, CPartitions),
Pairs = make_pairs(Logical, hd(Logical), []),
Dist = make_distance_pairs(Q, Pairs),
make_filter_pairs(N, Q, Dist).
%% @doc Translate the cover set `CS' to logical partitions and attach
%%      to every entry the filter required to eliminate overlap.  The
%%      entries are paired with their neighbours, the distance of each
%%      pair is computed, and the distances are turned into filters.
-spec add_filtering(n(), q(), logical_idx(), cover_set()) ->
                           [{lp_node(), logical_filter()}].
add_filtering(N, Q, LPI, CS) ->
    Neighbours = yz_misc:make_pairs(make_logical(LPI, CS)),
    make_filter_pairs(N, Q, make_distance_pairs(Q, Neighbours)).

%% @doc Get the distance between the logical partition `LPB' and
%% `LPA', measured from `LPA' forward to `LPB' on a ring of `Q'
%% partitions.  When `LPB' is smaller than `LPA' the distance wraps
%% around the end of the ring.
-spec get_distance(q(), lp(), lp()) -> dist().
get_distance(Q, LPA, LPB) when LPB < LPA ->
-spec get_distance(q(), lp_node(), lp_node()) -> dist().
get_distance(Q, {LPA,_}, {LPB,_}) when LPB < LPA ->
%% Wrap around: steps from `LPA' up to `Q', one step from `Q' to 1,
%% then steps from 1 up to `LPB'.
BottomDiff = LPB - 1,
TopDiff = Q - LPA,
BottomDiff + TopDiff + 1;
get_distance(_Q, LPA, LPB) ->
get_distance(_Q, {LPA,_}, {LPB,_}) ->
LPB - LPA.

%% @doc Map `Partition' to its logical partition by looking it up in
%% the second position of the `LogicalIndex' entries.  Fails with a
%% badmatch if `Partition' is not present.
-spec logical_partition(logical_idx(), p()) -> lp().
logical_partition(LogicalIndex, Partition) ->
{Logical, _} = lists:keyfind(Partition, 2, LogicalIndex),
Logical.

%% @doc Create a mapping from logical to actual partition.  The
%% logical numbers 1..Q are paired, in order, with the partitions
%% reported by riak_core_ring:all_owners/1 sorted ascending.
-spec logical_partitions(riak_core_ring:ring()) -> logical_idx().
logical_partitions(Ring) ->
-spec logical_index(riak_core_ring:ring()) -> logical_idx().
logical_index(Ring) ->
%% Only the partition half of each owner entry is needed.
{Partitions, _} = lists:unzip(riak_core_ring:all_owners(Ring)),
Q = riak_core_ring:num_partitions(Ring),
Logical = lists:seq(1, Q),
lists:zip(Logical, lists:sort(Partitions)).

%% @doc Look up the logical partition that corresponds to `Partition'.
%%      The lookup keys on the second element of the `LogicalIndex'
%%      entries; a partition missing from the index fails with a
%%      badmatch.
-spec logical_partition(logical_idx(), p()) -> lp().
logical_partition(LogicalIndex, Partition) ->
    {LP, _P} = lists:keyfind(Partition, 2, LogicalIndex),
    LP.

%% @doc Generate the sequence of `N' partitions leading up to `EndLP'.
%%
%% NOTE: Logical partition numbers start at 1
Expand All @@ -122,40 +111,54 @@ lp_seq(N, Q, EndLP) ->
%% `{LogicalPartition, Distance}' pairs. The list will contain
%% the second partition in the original pair and its distance
%% from the partition it was paired with.
-spec make_distance_pairs(q(), [{lp(), lp()}]) -> [{lp(), dist()}].
-spec make_distance_pairs(q(), [{lp_node(), lp_node()}]) ->
[{lp_node(), dist()}].
make_distance_pairs(Q, PartitionPairs) ->
%% Keep the second member of each pair, tagged with its distance from
%% the first member.
[{LPB, get_distance(Q, LPA, LPB)} || {LPA, LPB} <- PartitionPairs].

%% Convert every distance pair in `Dist' into a filter pair using
%% filter_pair/3, preserving order.
make_filter_pairs(N, Q, Dist) ->
[filter_pair(N, Q, DP) || DP <- Dist].

%% @doc Take a list of `Partitions' and create a list of logical
%% partitions, in the same order, by looking each one up in
%% `LogicalIndex'.
-spec make_logical(logical_idx(), [p()]) -> [lp()].
make_logical(LogicalIndex, Partitions) ->
[logical_partition(LogicalIndex, P) || P <- Partitions].
%% @doc Turn a `{LPNode, Dist}' pair into a `{LPNode, Include}' filter
%%      pair.  `Include' says which replicas to keep for the logical
%%      partition: `all' keeps every replica, while a list of `lp()'
%%      keeps only replicas whose first primary partition on the
%%      preflist is one of the listed LPs.
-spec make_filter_pair(n(), q(), {lp_node(), dist()}) ->
                              {lp_node(), all | [lp()]}.
make_filter_pair(N, _Q, {LPNode, Dist}) when Dist =:= N ->
    %% A distance of exactly N means nothing needs to be filtered out.
    {LPNode, all};
make_filter_pair(N, Q, {{LP, _Node} = LPNode, Dist}) ->
    %% Take the first `Dist' entries of the reversed lp_seq/3 result.
    Reversed = lists:reverse(lp_seq(N, Q, LP)),
    {LPNode, lists:sublist(Reversed, Dist)}.

%% @doc Apply make_filter_pair/3 to every `{LPNode, Dist}' entry of
%%      `Cover', preserving order.  Each result is an
%%      `{LPNode, Filter}' pair, so the return type is a list of such
%%      pairs rather than a bare list of `lp_node()'.
-spec make_filter_pairs(n(), q(), [{lp_node(), dist()}]) ->
                               [{lp_node(), logical_filter()}].
make_filter_pairs(N, Q, Cover) ->
    [make_filter_pair(N, Q, DP) || DP <- Cover].

%% @doc Take a list of logical partitions and pair them up. Since the
%% ring wraps the last and first logical partitions must also be
%% paired.  The wrap-around pair `{Last, First}' is placed at the head
%% of the result, followed by the adjacent pairs in list order.
-spec make_pairs([lp()], lp(), [{lp(), lp()}]) -> [{lp(), lp()}].
make_pairs([Last], First, Pairs) ->
%% Pairs were accumulated in reverse, so restore list order.
[{Last, First}|lists:reverse(Pairs)];
make_pairs([A,B|T], _First, Pairs) ->
make_pairs([B|T], _First, [{A,B}|Pairs]).
%% @doc Rewrite every `{Partition, Node}' entry of `Cover' so that it
%%      carries the logical partition found in `LogicalIndex' instead
%%      of the actual partition.  Nodes and ordering are unchanged.
-spec make_logical(logical_idx(), cover_set()) -> logical_cover_set().
make_logical(LogicalIndex, Cover) ->
    [{logical_partition(LogicalIndex, Partition), Owner}
     || {Partition, Owner} <- Cover].

%% @doc Look up the actual partition for the logical partition `LP'.
%%      The lookup keys on the first element of the `LogicalIndex'
%%      entries; an `LP' missing from the index fails with a badmatch.
-spec partition(logical_idx(), lp()) -> p().
partition(LogicalIndex, LP) ->
    {_LP, Partition} = lists:keyfind(LP, 1, LogicalIndex),
    Partition.

%% Convert every logical filter pair in `FilterPairs' to actual
%% partitions, using the logical index built from `Ring'.
reify(Ring, FilterPairs) ->
LPI = logical_partitions(Ring),
[reify_pair(LPI, FilterPair) || FilterPair <- FilterPairs].
%% @doc Make every entry of `LogicalCoverSet' concrete by passing it,
%%      together with the logical index `LPI', to reify_data/2.  The
%%      entries are `{LPNode, Filter}' pairs, not bare `lp_node()'
%%      values, and the spec reflects that.
-spec reify(logical_idx(), [{lp_node(), logical_filter()}]) ->
                   filter_cover_set().
reify(LPI, LogicalCoverSet) ->
    [reify_data(LPI, E) || E <- LogicalCoverSet].

-spec reify_data(logical_idx(), {lp_node(), logical_filter()}) ->
{{p(), node()}, filter()}.
%% @doc Take a logical filter and make it concrete: the logical
%% partition, and every logical partition listed in the filter, is
%% replaced by the actual partition found in `LPI'.  An `all' filter
%% is passed through unchanged.
reify_pair(LPI, {LP, all}) ->
{partition(LPI, LP), all};
reify_pair(LPI, {LP, IncludeLPs}) ->
{partition(LPI, LP), [partition(LPI, LP2) || LP2 <- IncludeLPs]}.
reify_data(LPI, {{LP, Node}, all}) ->
P = partition(LPI, LP),
{{P, Node}, all};
reify_data(LPI, {{LP, Node}, IncludeLPs}) ->
P = partition(LPI, LP),
%% The node is carried through untouched; only partitions are mapped.
Includes = [partition(LPI, LP2) || LP2 <- IncludeLPs],
{{P, Node}, Includes}.
1 change: 1 addition & 0 deletions src/yz_doc.erl
Expand Up @@ -41,6 +41,7 @@ make_doc(O, FPN, Partition) ->
{text, value(O)},
{?YZ_ENTROPY_DATA_FIELD, gen_vc(O)},
{'_fpn', FPN},
{'_node', ?ATOM_TO_BIN(node())},
{'_pn', Partition},
{'_rk', riak_key(O)}],
{doc, Fields}.
Expand Down
15 changes: 14 additions & 1 deletion src/yz_misc.erl
Expand Up @@ -30,6 +30,13 @@
%% Register every route in `Routes' by calling
%% webmachine_router:add_route/1 on each, in list order, and return
%% the list of results of those calls.
add_routes(Routes) ->
[webmachine_router:add_route(R) || R <- Routes].

%% @doc Pair every element of `L' with the element that follows it,
%%      and pair the last element with the first so the pairing wraps
%%      around.  `L' must be non-empty: hd/1 fails with badarg on an
%%      empty list.
-spec make_pairs([T]) -> [{T,T}].
make_pairs(L) ->
    First = hd(L),
    make_pairs(L, First, []).


%% @doc Return the list of partitions owned and about to be owned by
%% this `Node' for the given `Ring'.
-spec owned_and_next_partitions(node(), riak_core_ring:riak_core_ring()) ->
Expand All @@ -45,4 +52,10 @@ owned_and_next_partitions(Node, Ring) ->

%% @private
is_owner(Node) ->
fun({P,Owner}) -> Node == Owner end.
fun({_P, Owner}) -> Node == Owner end.

%% @private
%%
%% Worker for make_pairs/1.  Walks the list pairing each element with
%% its successor and collecting the pairs in reverse.  When only the
%% last element remains it is paired with `First' and that wrap-around
%% pair is placed in front of the adjacent pairs, which are restored
%% to list order.
make_pairs([A, B | Rest], First, Acc) ->
    make_pairs([B | Rest], First, [{A, B} | Acc]);
make_pairs([Last], First, Acc) ->
    [{Last, First} | lists:reverse(Acc)].
11 changes: 7 additions & 4 deletions src/yz_solr.erl
Expand Up @@ -195,12 +195,15 @@ build_fq(Partitions) ->
Fields = [filter_to_str(P) || P <- Partitions],
string:join(Fields, " OR ").

%% Render one cover-set entry as a filter query string.  The entry's
%% partition becomes a `_pn:' term and, where an owner is present, the
%% owner becomes a `_node:' term; the terms are joined with " AND ".
%% When the filter is a list rather than `all', each element is
%% rendered with fpn_str/1, the results are joined with " OR " and
%% the group is AND-ed onto the rest.
%% NOTE(review): the node name is inserted unescaped; confirm node
%% names never contain characters special to the query syntax.
filter_to_str({Partition, all}) ->
"_pn:" ++ integer_to_list(Partition);
filter_to_str({Partition, FPFilter}) ->
filter_to_str({{Partition, Owner}, all}) ->
OwnerQ = "_node:" ++ atom_to_list(Owner),
PNQ = "_pn:" ++ integer_to_list(Partition),
"(" ++ OwnerQ ++ " AND " ++ PNQ ++ ")";
filter_to_str({{Partition, Owner}, FPFilter}) ->
OwnerQ = "_node:" ++ atom_to_list(Owner),
PNQ = "_pn:" ++ integer_to_list(Partition),
FPQ = string:join(lists:map(fun fpn_str/1, FPFilter), " OR "),
"(" ++ PNQ ++ " AND (" ++ FPQ ++ "))".
"(" ++ OwnerQ ++ " AND " ++ PNQ ++ " AND (" ++ FPQ ++ "))".

%% Render the integer `FPN' as a `_fpn:' query term, e.g. 42 becomes
%% "_fpn:42".
fpn_str(FPN) ->
    Digits = integer_to_list(FPN),
    "_fpn:" ++ Digits.
Expand Down

0 comments on commit 7df145a

Please sign in to comment.