Centralize and expose preflist calculation
This will help when testing, debugging, or benchmarking. You need this
info if you want to manually target a specific index instance (see the
usage sketch below).
rzezeski committed Mar 8, 2012
1 parent 6f418c9 commit 7239503
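
The helper exposed by this commit, riak_search_utils:preflist/3, returns the primary preflist for an index/field/term (IFT) triple, which is exactly the information needed to aim a request at a specific index instance. A minimal usage sketch follows; the index, field, and term values are made-up placeholders, not anything from the commit itself.

    %% The index, field, and term here are placeholder values.
    Index = <<"books">>,
    Field = <<"title">>,
    Term  = <<"erlang">>,
    %% Each element of the result is a {Partition, Node} pair identifying
    %% one primary index instance for this index/field/term.
    Preflist = riak_search_utils:preflist(Index, Field, Term),
    %% Pull out the first entry if you want to address a single instance.
    [{Partition, Node} | _] = Preflist.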
Showing 2 changed files with 26 additions and 27 deletions.
27 changes: 8 additions & 19 deletions src/riak_search_op_term.erl
@@ -45,18 +45,18 @@ preplan(Op, State) ->
Weights2 = [{Node, Count} || {_, Node, Count} <- Weights1],
TotalCount = lists:sum([Count || {_, _, Count} <- Weights1]),
case length(Weights1) == 0 of
true ->
true ->
throw({error, data_not_available, {IndexName, FieldName, Term}}),
DocFrequency = undefined; %% Make compiler happy.
false ->
false ->
DocFrequency = TotalCount / length(Weights1)
end,
Op#term { weights=Weights2, doc_freq=DocFrequency }.

chain_op(Op, OutputPid, OutputRef, State) ->
F = fun() ->
F = fun() ->
erlang:link(State#search_state.parent),
start_loop(Op, OutputPid, OutputRef, State)
start_loop(Op, OutputPid, OutputRef, State)
end,
erlang:spawn_link(F),
{ok, 1}.
@@ -81,10 +81,7 @@ stream(Index, Field, Term, FilterFun) ->
%% Get the primary preflist, minus any down nodes. (We don't use
%% secondary nodes since we ultimately read results from one node
%% anyway.)
DocIdx = riak_search_ring_utils:calc_partition(Index, Field, Term),
{ok, Schema} = riak_search_config:get_schema(Index),
NVal = Schema:n_val(),
Preflist = get_preflist(DocIdx, NVal),
Preflist = riak_search_utils:preflist(Index, Field, Term),

%% Try to use the local node if possible. Otherwise choose
%% randomly.
@@ -103,11 +100,8 @@ info(Index, Field, Term) ->
%% Get the primary preflist, minus any down nodes. (We don't use
%% secondary nodes since we ultimately read results from one node
%% anyway.)
DocIdx = riak_search_ring_utils:calc_partition(Index, Field, Term),
{ok, Schema} = riak_search_config:get_schema(Index),
NVal = Schema:n_val(),
Preflist = get_preflist(DocIdx, NVal),

Preflist = riak_search_utils:preflist(Index, Field, Term),

{ok, Ref} = riak_search_vnode:info(Preflist, Index, Field, Term, self()),
{ok, Results} = riak_search_backend:collect_info_response(length(Preflist), Ref, []),
Results.
@@ -144,7 +138,7 @@ calculate_score(ScoringVars, Props) ->
TF = math:pow(Frequency, 0.5),
IDF = (1 + math:log(NumDocs/DocFrequency)),
Norm = DocFieldBoost,

Score = TF * math:pow(IDF, 2) * TermBoost * Norm,
ScoreList = case lists:keyfind(score, 1, Props) of
{score, OldScores} ->
@@ -153,8 +147,3 @@ calculate_score(ScoringVars, Props) ->
[Score]
end,
lists:keystore(score, 1, Props, {score, ScoreList}).

-spec get_preflist(binary(), pos_integer()) -> list().
get_preflist(DocIdx, NVal) ->
lists:map(fun({IdxNode, _}) -> IdxNode end,
riak_core_apl:get_primary_apl(DocIdx, NVal, riak_search)).
26 changes: 18 additions & 8 deletions src/riak_search_utils.erl
@@ -1,13 +1,14 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved.
%% Copyright (c) 2007-2012 Basho Technologies, Inc. All Rights Reserved.
%%
%% -------------------------------------------------------------------

-module(riak_search_utils).

-export([
combine_terms/2,
preflist/3,
to_atom/1,
to_binary/1,
to_utf8/1,
@@ -108,8 +109,8 @@ from_binary(L) ->
%% Return a key clock to use for revisioning IFTVPs
current_key_clock() ->
{MegaSeconds,Seconds,MilliSeconds}=erlang:now(),
(MegaSeconds * 1000000000000) +
(Seconds * 1000000) +
(MegaSeconds * 1000000000000) +
(Seconds * 1000000) +
MilliSeconds.

%% Choose a random element from the List.
@@ -123,7 +124,7 @@ choose(List) ->
coalesce(undefined, B) -> B;
coalesce(A, _) -> A.

coalesce([undefined|T]) ->
coalesce([undefined|T]) ->
coalesce(T);
coalesce([H|_]) ->
H;
@@ -157,7 +158,7 @@ ets_keys_1(Table, Key) ->
%% Given a binary, return an Erlang term.
consult(Binary) ->
case erl_scan:string(riak_search_utils:to_list(Binary)) of
{ok, Tokens, _} ->
{ok, Tokens, _} ->
consult_1(Tokens);
Error ->
Error
@@ -177,6 +178,15 @@ consult_2(AST) ->
Error
end.

%% @doc Get preflist for the given IFT.
-spec preflist(index(), field(), s_term()) -> list().
preflist(Index, Field, Term) ->
DocIdx = riak_search_ring_utils:calc_partition(Index, Field, Term),
{ok, Schema} = riak_search_config:get_schema(Index),
NVal = Schema:n_val(),
[IdxNode || {IdxNode, _} <- riak_core_apl:get_primary_apl(DocIdx,
NVal,
riak_search)].

%% Run a transform operation in parallel. Results are returned as a
%% list, ordering is not guaranteed in any way. This was implemented
@@ -202,7 +212,7 @@ ptransform(F, List, NumProcesses) ->
%% the number of processes. Batch size should be at least 1.
ListLength = length(List),
BatchSize = lists:max([1, ListLength div NumProcesses]),

%% Create a ref, used to prevent later interference.
Ref = make_ref(),
Pids = ptransform_spawn(F, List, ListLength, Ref, BatchSize, []),
@@ -215,7 +225,7 @@ ptransform_spawn(F, List, ListLength, Ref, BatchSize, Pids) when List /= [] ->
true ->
{Pre, Post} = {List, []},
NewListLength = 0;
false ->
false ->
{Pre, Post} = lists:split(BatchSize, List),
NewListLength = ListLength - BatchSize
end,
@@ -234,7 +244,7 @@ ptransform_spawn(_, [], 0, _, _, Pids) ->

ptransform_collect(Ref, Pids, Acc) when Pids /= [] ->
%% Collect a chunk, and concat results.
receive
receive
{results, Results, Pid, Ref} ->
NewPids = Pids -- [Pid],
NewAcc = Results ++ Acc,

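Following the pattern used in info/3 in riak_search_op_term.erl above, the exposed preflist can also be narrowed to a single entry to manually target one index instance, which is the debugging use case the commit message mentions. This is a rough sketch only: the manual_info/3 name is hypothetical, and it assumes riak_search_vnode:info/5 will fan out over a one-element preflist just as it does over the full one.

    %% Hypothetical helper: ask a single index instance for term info.
    manual_info(Index, Field, Term) ->
        %% Primary preflist for the IFT, via the new centralized helper.
        Preflist = riak_search_utils:preflist(Index, Field, Term),
        %% Keep one {Partition, Node} entry to target just that instance.
        Target = [hd(Preflist)],
        {ok, Ref} = riak_search_vnode:info(Target, Index, Field, Term, self()),
        {ok, Results} =
            riak_search_backend:collect_info_response(length(Target), Ref, []),
        Results.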