From 72395036ef44caff46ca07ea3146d44cc2abbe38 Mon Sep 17 00:00:00 2001 From: Ryan Zezeski Date: Sat, 3 Mar 2012 10:24:54 -0500 Subject: [PATCH] Centralize and expose preflist calculation This will help when testing/debugging/benchmarking. You need this info if you want to manually target a specific index instance. --- src/riak_search_op_term.erl | 27 ++++++++------------------- src/riak_search_utils.erl | 26 ++++++++++++++++++-------- 2 files changed, 26 insertions(+), 27 deletions(-) diff --git a/src/riak_search_op_term.erl b/src/riak_search_op_term.erl index 74c2da25..51c0cfe4 100644 --- a/src/riak_search_op_term.erl +++ b/src/riak_search_op_term.erl @@ -45,18 +45,18 @@ preplan(Op, State) -> Weights2 = [{Node, Count} || {_, Node, Count} <- Weights1], TotalCount = lists:sum([Count || {_, _, Count} <- Weights1]), case length(Weights1) == 0 of - true -> + true -> throw({error, data_not_available, {IndexName, FieldName, Term}}), DocFrequency = undefined; %% Make compiler happy. - false -> + false -> DocFrequency = TotalCount / length(Weights1) end, Op#term { weights=Weights2, doc_freq=DocFrequency }. chain_op(Op, OutputPid, OutputRef, State) -> - F = fun() -> + F = fun() -> erlang:link(State#search_state.parent), - start_loop(Op, OutputPid, OutputRef, State) + start_loop(Op, OutputPid, OutputRef, State) end, erlang:spawn_link(F), {ok, 1}. @@ -81,10 +81,7 @@ stream(Index, Field, Term, FilterFun) -> %% Get the primary preflist, minus any down nodes. (We don't use %% secondary nodes since we ultimately read results from one node %% anyway.) - DocIdx = riak_search_ring_utils:calc_partition(Index, Field, Term), - {ok, Schema} = riak_search_config:get_schema(Index), - NVal = Schema:n_val(), - Preflist = get_preflist(DocIdx, NVal), + Preflist = riak_search_utils:preflist(Index, Field, Term), %% Try to use the local node if possible. Otherwise choose %% randomly. @@ -103,11 +100,8 @@ info(Index, Field, Term) -> %% Get the primary preflist, minus any down nodes. (We don't use %% secondary nodes since we ultimately read results from one node %% anyway.) - DocIdx = riak_search_ring_utils:calc_partition(Index, Field, Term), - {ok, Schema} = riak_search_config:get_schema(Index), - NVal = Schema:n_val(), - Preflist = get_preflist(DocIdx, NVal), - + Preflist = riak_search_utils:preflist(Index, Field, Term), + {ok, Ref} = riak_search_vnode:info(Preflist, Index, Field, Term, self()), {ok, Results} = riak_search_backend:collect_info_response(length(Preflist), Ref, []), Results. @@ -144,7 +138,7 @@ calculate_score(ScoringVars, Props) -> TF = math:pow(Frequency, 0.5), IDF = (1 + math:log(NumDocs/DocFrequency)), Norm = DocFieldBoost, - + Score = TF * math:pow(IDF, 2) * TermBoost * Norm, ScoreList = case lists:keyfind(score, 1, Props) of {score, OldScores} -> @@ -153,8 +147,3 @@ calculate_score(ScoringVars, Props) -> [Score] end, lists:keystore(score, 1, Props, {score, ScoreList}). - --spec get_preflist(binary(), pos_integer()) -> list(). -get_preflist(DocIdx, NVal) -> - lists:map(fun({IdxNode, _}) -> IdxNode end, - riak_core_apl:get_primary_apl(DocIdx, NVal, riak_search)). diff --git a/src/riak_search_utils.erl b/src/riak_search_utils.erl index 30a67dcf..584c8639 100644 --- a/src/riak_search_utils.erl +++ b/src/riak_search_utils.erl @@ -1,6 +1,6 @@ %% ------------------------------------------------------------------- %% -%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved. +%% Copyright (c) 2007-2012 Basho Technologies, Inc. All Rights Reserved. %% %% ------------------------------------------------------------------- @@ -8,6 +8,7 @@ -export([ combine_terms/2, + preflist/3, to_atom/1, to_binary/1, to_utf8/1, @@ -108,8 +109,8 @@ from_binary(L) -> %% Return a key clock to use for revisioning IFTVPs current_key_clock() -> {MegaSeconds,Seconds,MilliSeconds}=erlang:now(), - (MegaSeconds * 1000000000000) + - (Seconds * 1000000) + + (MegaSeconds * 1000000000000) + + (Seconds * 1000000) + MilliSeconds. %% Choose a random element from the List. @@ -123,7 +124,7 @@ choose(List) -> coalesce(undefined, B) -> B; coalesce(A, _) -> A. -coalesce([undefined|T]) -> +coalesce([undefined|T]) -> coalesce(T); coalesce([H|_]) -> H; @@ -157,7 +158,7 @@ ets_keys_1(Table, Key) -> %% Given a binary, return an Erlang term. consult(Binary) -> case erl_scan:string(riak_search_utils:to_list(Binary)) of - {ok, Tokens, _} -> + {ok, Tokens, _} -> consult_1(Tokens); Error -> Error @@ -177,6 +178,15 @@ consult_2(AST) -> Error end. +%% @doc Get preflist for the given IFT. +-spec preflist(index(), field(), s_term()) -> list(). +preflist(Index, Field, Term) -> + DocIdx = riak_search_ring_utils:calc_partition(Index, Field, Term), + {ok, Schema} = riak_search_config:get_schema(Index), + NVal = Schema:n_val(), + [IdxNode || {IdxNode, _} <- riak_core_apl:get_primary_apl(DocIdx, + NVal, + riak_search)]. %% Run a transform operation in parallel. Results are returned as a %% list, ordering is not guaranteed in any way. This was implemented @@ -202,7 +212,7 @@ ptransform(F, List, NumProcesses) -> %% the number of processes. Batch size should be at least 1. ListLength = length(List), BatchSize = lists:max([1, ListLength div NumProcesses]), - + %% Create a ref, used to prevent later interference. Ref = make_ref(), Pids = ptransform_spawn(F, List, ListLength, Ref, BatchSize, []), @@ -215,7 +225,7 @@ ptransform_spawn(F, List, ListLength, Ref, BatchSize, Pids) when List /= [] -> true -> {Pre, Post} = {List, []}, NewListLength = 0; - false -> + false -> {Pre, Post} = lists:split(BatchSize, List), NewListLength = ListLength - BatchSize end, @@ -234,7 +244,7 @@ ptransform_spawn(_, [], 0, _, _, Pids) -> ptransform_collect(Ref, Pids, Acc) when Pids /= [] -> %% Collect a chunk, and concat results. - receive + receive {results, Results, Pid, Ref} -> NewPids = Pids -- [Pid], NewAcc = Results ++ Acc,