Merge branch 'aae-3'

commit 7a32c6fce7b3d30de0f5f3f0d7a9a6ac29460f9f (2 parents: 7de5a0b + eda7f66)
Authored by Ryan Zezeski
include/yokozuna.hrl (23 lines changed)

@@ -96,6 +96,8 @@
 -define(INT_TO_STR(I), integer_to_list(I)).
 -define(PARTITION_BINARY(S), S#state.partition_binary).
 
+-define(DATA_DIR, application:get_env(riak_core, platform_data_dir)).
+
 -define(YZ_DEFAULT_SOLR_PORT, "8983").
 -define(YZ_DEFAULT_SOLR_STARTUP_WAIT, 15).
 -define(YZ_DEFAULT_TICK_INTERVAL, 60000).
@@ -114,6 +116,22 @@
         "Not enough nodes are up to service this request.").
 
 %%%===================================================================
+%%% Anti Entropy
+%%%===================================================================
+
+-define(YZ_AE_DIR,
+        application:get_env(?YZ_APP_NAME, anti_entropy_data_dir)).
+-define(YZ_ENTROPY_TICK,
+        app_helper:get_env(?YZ_APP_NAME, entropy_tick, 60000)).
+
+-type hashtree() :: hashtree:hashtree().
+-type exchange() :: {p(), {p(), n()}}.
+-type exchange_mode() :: automatic | manual.
+-type tree() :: pid().
+-type trees() :: orddict(p(), tree()).
+
+
+%%%===================================================================
 %%% Riak KV
 %%%===================================================================
 
@@ -164,6 +182,9 @@
 -type index_info() :: #index_info{}.
 -type index_name() :: string().
 
+-define(YZ_DEFAULT_INDEX, "_yz_default").
+-define(YZ_INDEX_CONTENT, yz_index_content).
+
 %%%===================================================================
 %%% Schemas
 %%%===================================================================
@@ -204,3 +225,5 @@
 
 %% Riak key
 -define(YZ_RK_FIELD, '_yz_rk').
+-define(YZ_RK_FIELD_S, "_yz_rk").
+-define(YZ_RK_FIELD_B, <<"_yz_rk">>).
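A quick sketch (not part of the commit) of how the new macros resolve at
runtime; the example values in the comments are hypothetical:

    %% ?DATA_DIR and ?YZ_AE_DIR expand to application:get_env/2 calls, so
    %% they return {ok, Value} or undefined; ?YZ_ENTROPY_TICK falls back
    %% to 60000 ms when entropy_tick is unset.
    example_env() ->
        {ok, DataDir} = ?DATA_DIR,      %% e.g. {ok, "./data"}
        AEDir = ?YZ_AE_DIR,             %% e.g. {ok, "./data/yz_anti_entropy"}
        Tick = ?YZ_ENTROPY_TICK,        %% 60000 unless configured
        {DataDir, AEDir, Tick}.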
priv/java/com/basho/yokozuna/handler/EntropyData.java (81 lines changed)

@@ -16,16 +16,18 @@
 
 package com.basho.yokozuna.handler;
 
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
+import java.io.IOException;
 
 import org.apache.commons.codec.binary.Base64;
 
+import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.index.AtomicReader;
+import org.apache.lucene.index.DocsEnum;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.index.Terms;
 import org.apache.lucene.index.TermsEnum;
+import org.apache.lucene.search.DocIdSetIterator;
 
 import org.apache.solr.common.SolrDocument;
 import org.apache.solr.common.SolrDocumentList;
@@ -73,6 +75,12 @@ public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp)
            throw new Exception("Parameter 'before' is required");
        }
        int n = req.getParams().getInt("n", DEFAULT_N);
+
+       String partition = req.getParams().get("partition");
+       if (partition == null) {
+           throw new Exception("Parameter 'partition' is required");
+       }
+
        SolrDocumentList docs = new SolrDocumentList();
 
        // Add docs here and modify object inline in code
@@ -83,6 +91,12 @@ public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp)
        AtomicReader rdr = searcher.getAtomicReader();
        BytesRef tmp = null;
        Terms terms = rdr.terms(ENTROPY_DATA_FIELD);
+
+       if (terms == null) {
+           rsp.add("more", false);
+           return;
+       }
+
        TermsEnum te = terms.iterator(null);
 
        if (isContinue(cont)) {
@@ -111,32 +125,43 @@ public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp)
        String text = null;
        String[] vals = null;
        String ts = null;
-       String docId = null;
-       String vectorClock = null;
+       String docPartition = null;
+       String riakBucket = null;
+       String riakKey = null;
+       String hash = null;
        int count = 0;
        BytesRef current = null;
+       DocsEnum de = null;
+       Bits liveDocs = rdr.getLiveDocs();
 
        while(!endOfItr(tmp) && count < n) {
-           current = BytesRef.deepCopyOf(tmp);
-           text = tmp.utf8ToString();
-           log.debug("text: " + text);
-           vals = text.split(" ");
-           ts = vals[0];
-
-           // TODO: what if null?
-           if (! (ts.compareTo(before) < 0)) {
-               rsp.add("more", false);
-               docs.setNumFound(count);
-               return;
-           }
+           if (isLive(liveDocs, te)) {
+               current = BytesRef.deepCopyOf(tmp);
+               text = tmp.utf8ToString();
+               log.debug("text: " + text);
+               vals = text.split(" ");
+               ts = vals[0];
+
+               // TODO: what if null?
+               if (! (ts.compareTo(before) < 0)) {
+                   rsp.add("more", false);
+                   docs.setNumFound(count);
+                   return;
+               }
 
-           docId = vals[1];
-           vectorClock = vals[2];
-           SolrDocument tmpDoc = new SolrDocument();
-           tmpDoc.addField("doc_id", docId);
-           tmpDoc.addField("base64_vclock", Base64.encodeBase64String(sha(vectorClock)));
-           docs.add(tmpDoc);
-           count++;
+               docPartition = vals[1];
+               riakBucket = vals[2];
+               riakKey = vals[3];
+               hash = vals[4];
+               if (partition.equals(docPartition)) {
+                   SolrDocument tmpDoc = new SolrDocument();
+                   tmpDoc.addField("riak_bucket", riakBucket);
+                   tmpDoc.addField("riak_key", riakKey);
+                   tmpDoc.addField("base64_hash", hash);
+                   docs.add(tmpDoc);
+                   count++;
+               }
+           }
            tmp = te.next();
        }
 
@@ -157,6 +182,10 @@ public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp)
        }
    }
 
+    static boolean isLive(Bits liveDocs, TermsEnum te) throws IOException {
+        DocsEnum de = te.docs(liveDocs, null);
+        return de.nextDoc() != DocIdSetIterator.NO_MORE_DOCS;
+    }
 
    static BytesRef decodeCont(String cont) {
        byte[] bytes = Base64.decodeBase64(cont);
@@ -167,12 +196,6 @@ static boolean endOfItr(BytesRef returnValue) {
        return returnValue == null;
    }
 
-   static byte[] sha(String s) throws NoSuchAlgorithmException {
-       MessageDigest md = MessageDigest.getInstance("SHA");
-       md.update(s.getBytes());
-       return md.digest();
-   }
-
    static boolean isContinue(BytesRef cont) {
        return DEFAULT_CONT != cont;
    }
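For orientation, a hedged sketch of how a client might page through this
handler. The /solr/<index>/entropy_data mount point is an assumption (it
depends on how solrconfig.xml registers the handler); the "before",
"partition" and "n" parameters and the riak_bucket/riak_key/base64_hash
response fields come from the Java code above:

    %% Fetch one page of entropy data for a partition (Erlang, using
    %% ibrowse and mochijson2 as elsewhere in this commit).
    get_entropy_page(Host, Port, Index, Partition, Before) ->
        URL = lists:flatten(
                io_lib:format("http://~s:~B/solr/~s/entropy_data"
                              "?wt=json&before=~s&partition=~s&n=100",
                              [Host, Port, Index, Before, Partition])),
        {ok, "200", _, Body} = ibrowse:send_req(URL, [], get),
        %% The decoded body carries the docs plus a "more" flag and a
        %% continuation token for requesting the next page.
        mochijson2:decode(Body).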
rebar.config (2 lines changed)

@@ -4,8 +4,6 @@
 
 {deps,
  [
-  %% {esolr, ".*",
-  %%  {git, "git://github.com/lennart/esolr.git", {branch, "master"}}},
   {ibrowse, ".*",
    {git, "git://github.com/cmullaparthi/ibrowse.git", {tag, "v3.0.4"}}},
   {lager, ".*",
riak_test/yokozuna_essential.erl (118 lines changed)

@@ -7,22 +7,43 @@
 -define(INDEX_B, <<"fruit">>).
 -define(NUM_KEYS, 10000).
 -define(SUCCESS, 0).
+-define(CFG,
+        [{riak_kv,
+          [
+           %% build/expire often
+           {anti_entropy_build_limit, {100, 1000}},
+           {anti_entropy_expire, 10000},
+           {anti_entropy_concurrency, 12}
+          ]},
+         {yokozuna,
+          [
+           {entropy_tick, 1000}
+          ]},
+         {lager,
+          [{handlers,
+            [{lager_file_backend,
+              [{"./log/error.log",error,10485760,"$D0",5},
+               {"./log/console.log",debug,104857600,"$D0",10}]}]}]}
+        ]).
 
 confirm() ->
     YZBenchDir = rt:get_os_env("YZ_BENCH_DIR"),
     code:add_path(filename:join([YZBenchDir, "ebin"])),
     random:seed(now()),
-    Nodes = rt:deploy_nodes(4),
+    Nodes = rt:deploy_nodes(4, ?CFG),
     Cluster = join_three(Nodes),
     wait_for_joins(Cluster),
     setup_indexing(Cluster, YZBenchDir),
-    load_data(Cluster, YZBenchDir),
+    load_data(Cluster, "fruit", YZBenchDir),
     Ref = async_query(Cluster, YZBenchDir),
+    %% Verify data exists before running join
+    timer:sleep(10000),
     Cluster2 = join_rest(Cluster, Nodes),
     check_status(wait_for(Ref)),
     ok = test_tagging(Cluster),
     KeysDeleted = delete_some_data(Cluster2, reap_sleep()),
     verify_deletes(Cluster2, KeysDeleted, YZBenchDir),
+    ok = verify_aae(Cluster2, YZBenchDir),
     ok = test_siblings(Cluster),
     pass.
 
@@ -104,6 +125,60 @@ allow_mult(Cluster, Bucket) ->
     %%  end || N <- Cluster],
     ok.
 
+verify_aae(Cluster, YZBenchDir) ->
+    lager:info("Verify AAE"),
+    load_data(Cluster, "fruit_aae", YZBenchDir),
+    Keys = random_keys(),
+    {DelKeys, _ChangeKeys} = lists:split(length(Keys) div 2, Keys),
+    [ok = delete_ids(Cluster, "fruit_aae", K) || K <- DelKeys],
+    %% wait for soft commit
+    timer:sleep(1000),
+    %% ok = change_random_ids(Cluster, ChangeKeys),
+    HP = hd(host_entries(rt:connection_info(Cluster))),
+    ok = wait_for_aae(HP, "fruit_aae", ?NUM_KEYS).
+
+wait_for_aae(HP, Index, ExpectedNumFound) ->
+    wait_for_aae(HP, Index, ExpectedNumFound, 0).
+
+wait_for_aae(_, Index, _, 24) ->
+    lager:error("Hit limit waiting for AAE to repair indexes for ~p", [Index]),
+    aae_failed;
+wait_for_aae(HP, Index, ExpectedNumFound, Tries) ->
+    case search(HP, "fruit_aae", "text", "apricot", ExpectedNumFound) of
+        true -> ok;
+        _ ->
+            timer:sleep(5000),
+            wait_for_aae(HP, Index, ExpectedNumFound, Tries + 1)
+    end.
+
+delete_ids(Cluster, Index, Key) ->
+    BKey = {list_to_binary(Index), list_to_binary(Key)},
+    Node = hd(Cluster),
+    Preflist = get_preflist(Node, BKey),
+    SolrIds = solr_ids(Node, Preflist, BKey),
+    ok = solr_delete(Cluster, SolrIds).
+
+get_preflist(Node, BKey) ->
+    Ring = rpc:call(Node, yz_misc, get_ring, [transformed]),
+    DocIdx = rpc:call(Node, riak_core_util, chash_std_keyfun, [BKey]),
+    Preflist = rpc:call(Node, riak_core_ring, preflist, [DocIdx, Ring]),
+    lists:sublist(Preflist, 3).
+
+solr_ids(Node, Preflist, {B,K}) ->
+    LPL = rpc:call(Node, yz_misc, convert_preflist, [Preflist, logical]),
+    [begin
+         Suffix = "_" ++ integer_to_list(P),
+         {binary_to_list(B), binary_to_list(K) ++ Suffix}
+     end
+     || {P,_} <- LPL].
+
+solr_delete(Cluster, SolrIds) ->
+    [begin
+         lager:info("Deleting solr id ~p/~p", [B, Id]),
+         rpc:multicall(Cluster, yz_solr, delete, [B, Id])
+     end || {B, Id} <- SolrIds],
+    ok.
+
 test_tagging(Cluster) ->
     lager:info("Test tagging"),
     HP = hd(host_entries(rt:connection_info(Cluster))),
@@ -113,7 +188,8 @@ test_tagging(Cluster) ->
     R1 = search(HP, "tagging", "user_s", "rzezeski"),
     verify_count(1, R1),
     R2 = search(HP, "tagging", "desc_t", "description"),
-    verify_count(1, R2).
+    verify_count(1, R2),
+    ok.
 
 write_with_tag({Host, Port}) ->
     lager:info("Tag the object tagging/test"),
@@ -128,6 +204,10 @@ write_with_tag({Host, Port}) ->
     {ok, "204", _, _} = ibrowse:send_req(URL, Headers, put, Body, Opts),
     ok.
 
+search(HP, Index, Name, Term, Expect) ->
+    R = search(HP, Index, Name, Term),
+    verify_count(Expect, R).
+
 search({Host, Port}, Index, Name, Term) ->
     URL = lists:flatten(io_lib:format("http://~s:~s/search/~s?q=~s:~s&wt=json",
                                       [Host, integer_to_list(Port), Index, Name, Term])),
@@ -141,10 +221,12 @@ search({Host, Port}, Index, Name, Term) ->
             {bad_response, Other}
     end.
 
-verify_count(Expected, Resp) ->
+get_count(Resp) ->
     Struct = mochijson2:decode(Resp),
-    NumFound = yz_driver:get_path(Struct, [<<"response">>, <<"numFound">>]),
-    ?assertEqual(Expected, NumFound).
+    yz_driver:get_path(Struct, [<<"response">>, <<"numFound">>]).
+
+verify_count(Expected, Resp) ->
+    Expected == get_count(Resp).
 
 async_query(Cluster, YZBenchDir) ->
     lager:info("Run async query against cluster ~p", [Cluster]),
@@ -184,10 +266,8 @@ delete_key(Cluster, Key) ->
     C:delete(?INDEX_B, list_to_binary(Key)).
 
 delete_some_data(Cluster, ReapSleep) ->
-    Num = random:uniform(100),
-    lager:info("Deleting ~p keys", [Num]),
-    Keys = [integer_to_list(random:uniform(?NUM_KEYS))
-            || _ <- lists:seq(1, Num)],
+    Keys = random_keys(),
+    lager:info("Deleting ~p keys", [length(Keys)]),
     [delete_key(Cluster, K) || K <- Keys],
     lager:info("Sleeping ~ps to allow for reap", [ReapSleep]),
     timer:sleep(timer:seconds(ReapSleep)),
@@ -210,8 +290,8 @@ join_rest([NodeA|_]=Cluster, Nodes) ->
     [begin rt:join(Node, NodeA) end || Node <- ToJoin],
     Nodes.
 
-load_data(Cluster, YZBenchDir) ->
-    lager:info("Load data onto cluster ~p", [Cluster]),
+load_data(Cluster, Index, YZBenchDir) ->
+    lager:info("Load data for index ~p onto cluster ~p", [Index, Cluster]),
     Hosts = host_entries(rt:connection_info(Cluster)),
     KeyGen = {function, yz_driver, fruit_key_val_gen, [?NUM_KEYS]},
     Cfg = [{mode,max},
@@ -219,12 +299,12 @@ load_data(Cluster, YZBenchDir) ->
            {concurrent, 3},
            {code_paths, [YZBenchDir]},
            {driver, yz_driver},
-           {index_path, "/riak/fruit"},
+           {index_path, "/riak/" ++ Index},
           {http_conns, Hosts},
           {pb_conns, []},
           {key_generator, KeyGen},
           {operations, [{load_fruit, 1}]}],
-    File = "bb-load-" ++ ?INDEX_S,
+    File = "bb-load-" ++ Index,
     write_terms(File, Cfg),
     run_bb(sync, File).
 
@@ -257,6 +337,8 @@ setup_indexing(Cluster, YZBenchDir) ->
     ok = store_schema(Node, ?FRUIT_SCHEMA_NAME, RawSchema),
     ok = create_index(Node, ?INDEX_S, ?FRUIT_SCHEMA_NAME),
     ok = install_hook(Node, ?INDEX_B),
+    ok = create_index(Node, "fruit_aae", ?FRUIT_SCHEMA_NAME),
+    ok = install_hook(Node, <<"fruit_aae">>),
     ok = create_index(Node, "tagging"),
     ok = install_hook(Node, <<"tagging">>),
     ok = create_index(Node, "siblings"),
@@ -293,6 +375,7 @@ wait_for(Ref) ->
     rt:wait_for_cmd(Ref).
 
 wait_for_joins(Cluster) ->
+    lager:info("Waiting for ownership handoff to finish"),
     rt:wait_until_nodes_ready(Cluster),
     rt:wait_until_no_pending_changes(Cluster).
 
@@ -300,3 +383,10 @@ write_terms(File, Terms) ->
     {ok, IO} = file:open(File, [write]),
     [io:fwrite(IO, "~p.~n", [T]) || T <- Terms],
     file:close(IO).
+
+random_keys() ->
+    random_keys(random:uniform(100)).
+
+random_keys(Num) ->
+    lists:usort([integer_to_list(random:uniform(?NUM_KEYS))
+                 || _ <- lists:seq(1, Num)]).
src/yokozuna.erl (64 lines changed)

@@ -27,11 +27,6 @@
 %%% API
 %%%===================================================================
 
-%% @doc Index the given object `O'.
--spec index(string(), riak_object:riak_object()) -> ok | {error, term()}.
-index(Index, O) ->
-    yz_solr:index(Index, yz_doc:make_docs(O, <<"FPN">>, <<"Partition">>)).
-
 %% @doc Return the set of unique logical partitions stored on this
 %%      node for the given `Index'.
 -spec partition_list(string()) -> ordset(lp()).
@@ -51,62 +46,3 @@ search(Index, Query, Mapping) ->
 
 solr_port(Node, Ports) ->
     proplists:get_value(Node, Ports).
-
-%%%===================================================================
-%%% Private
-%%%===================================================================
-
-test_it(Index) ->
-    B = <<"fruit">>,
-    O1 = riak_object:new(B, <<"apples">>, <<"2">>),
-    O2 = riak_object:new(B, <<"oranges">>, <<"1">>),
-    O3 = riak_object:new(B, <<"strawberries">>, <<"6">>),
-    O4 = riak_object:new(B, <<"lemons">>, <<"1">>),
-    O5 = riak_object:new(B, <<"celery">>, <<"4">>),
-    O6 = riak_object:new(B, <<"lime">>, <<"1">>),
-    [index(Index, O) || O <- [O1, O2, O3, O4, O5, O6]],
-    yz_solr:commit(Index).
-
-demo_write_objs(Index) ->
-    ibrowse:start(),
-    write_n_objs(Index, 1000),
-    yz_solr:commit(Index).
-
-demo_build_tree(Index, Name) ->
-    ibrowse:start(),
-    TP = yz_entropy:new_tree_proc(Index, Name),
-    Pid = element(3, TP),
-    Ref = make_ref(),
-    Pid ! {get_tree, self(), Ref},
-    receive {tree, Ref, Tree} -> Tree end,
-    %% returning TreeProc too in case want to play with it
-    {Tree, TP}.
-
-demo_new_vclock(Index, N) ->
-    %% the timestamp will change causing hash to change
-    NS = list_to_binary(integer_to_list(N)),
-    B = <<"test">>,
-    K = <<"key_",NS/binary>>,
-    V = <<"val_",NS/binary>>,
-    O = riak_object:new(B, K, V),
-    O2 = riak_object:increment_vclock(O, dummy_node),
-    index(Index, O2),
-    yz_solr:commit(Index).
-
-demo_delete(Index, N) ->
-    NS = integer_to_list(N),
-    K = "key_" ++ NS,
-    ok = yz_solr:delete(Index, K),
-    ok = yz_solr:commit(Index).
-
-write_n_objs(_, 0) ->
-    ok;
-write_n_objs(Index, N) ->
-    NS = list_to_binary(integer_to_list(N)),
-    B = <<"test">>,
-    K = <<"key_",NS/binary>>,
-    V = <<"val_",NS/binary>>,
-    O = riak_object:new(B, K, V),
-    O2 = riak_object:increment_vclock(O, dummy_node),
-    index(Index, O2),
-    write_n_objs(Index, N-1).
src/yokozuna_sup.erl (12 lines changed)

@@ -45,4 +45,14 @@ init(_Args) ->
               {yz_events, start_link, []},
               permanent, 5000, worker, [yz_events]},
 
-    {ok, {{one_for_one, 5, 10}, [SolrProc, Events]}}.
+    HashtreeSup = {yz_index_hashtree_sup,
+                   {yz_index_hashtree_sup, start_link, []},
+                   permanent, infinity, supervisor, [yz_index_hashtree_sup]},
+
+    EntropyMgr = {yz_entropy_mgr,
+                  {yz_entropy_mgr, start_link, []},
+                  permanent, 5000, worker, [yz_entropy_mgr]},
+
+    Children = [SolrProc, Events, HashtreeSup, EntropyMgr],
+
+    {ok, {{one_for_one, 5, 10}, Children}}.
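A small sketch (not in the commit) for checking from a remote console that
the two new children come up, assuming the supervisor is locally registered
under its module name:

    check_children() ->
        %% which_children/1 returns [{Id, Child, Type, Modules}]
        Kids = supervisor:which_children(yokozuna_sup),
        true = lists:keymember(yz_index_hashtree_sup, 1, Kids),
        true = lists:keymember(yz_entropy_mgr, 1, Kids),
        ok.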
src/yz_doc.erl (57 lines changed)

@@ -33,9 +33,9 @@
 add_to_doc({doc, Fields}, Field) ->
     {doc, [Field|Fields]}.
 
--spec doc_id(riak_object:riak_object(), binary()) -> binary().
+-spec doc_id(obj(), binary()) -> binary().
 doc_id(O, Partition) ->
-    <<(riak_object:key(O))/binary,"_",Partition/binary>>.
+    <<(yz_kv:get_obj_key(O))/binary,"_",Partition/binary>>.
 
 doc_id(O, Partition, none) ->
     doc_id(O, Partition);
@@ -47,29 +47,36 @@ doc_id(O, Partition, Sibling) ->
 has_siblings(O) -> riak_object:value_count(O) > 1.
 
 %% @doc Given an object generate the doc to be indexed by Solr.
--spec make_docs(riak_object:riak_object(), binary(), binary()) -> [doc()].
-make_docs(O, FPN, Partition) ->
-    [make_doc(O, Content, FPN, Partition) || Content <- riak_object:get_contents(O)].
+-spec make_docs(obj(), binary(), binary(), boolean()) -> [doc()].
+make_docs(O, FPN, Partition, IndexContent) ->
+    [make_doc(O, Content, FPN, Partition, IndexContent)
+     || Content <- riak_object:get_contents(O)].
 
--spec make_doc(riak_object:riak_object(), {dict(), dict()}, binary(), binary()) -> doc().
-make_doc(O, {MD, V}, FPN, Partition) ->
+-spec make_doc(obj(), {dict(), dict()}, binary(), binary(), boolean()) -> doc().
+make_doc(O, {MD, V}, FPN, Partition, IndexContent) ->
     Vtag = get_vtag(O, MD),
     DocId = doc_id(O, Partition, Vtag),
-    Fields = make_fields({DocId, riak_key(O), gen_vc(O), FPN, Partition, Vtag}),
-    ExtractedFields = extract_fields({MD, V}),
+    EntropyData = gen_ed(O, Partition),
+    Fields = make_fields({DocId, yz_kv:get_obj_key(O), FPN,
+                          Partition, Vtag, EntropyData}),
+    ExtractedFields =
+        case IndexContent of
+            true -> extract_fields({MD, V});
+            false -> []
+        end,
     Tags = extract_tags(MD),
     {doc, lists:append([Tags, ExtractedFields, Fields])}.
 
-make_fields({DocId, Key, VC, FPN, Partition, none}) ->
+make_fields({DocId, Key, FPN, Partition, none, EntropyData}) ->
     [{id, DocId},
-     {?YZ_ED_FIELD, VC},
+     {?YZ_ED_FIELD, EntropyData},
      {?YZ_FPN_FIELD, FPN},
      {?YZ_NODE_FIELD, ?ATOM_TO_BIN(node())},
      {?YZ_PN_FIELD, Partition},
     {?YZ_RK_FIELD, Key}];
 
-make_fields({DocId, Key, VC, FPN, Partition, Vtag}) ->
-    make_fields({DocId, Key, VC, FPN, Partition, none}) ++
+make_fields({DocId, Key, FPN, Partition, Vtag, EntropyData}) ->
+    make_fields({DocId, Key, FPN, Partition, none, EntropyData}) ++
        [{?YZ_VTAG_FIELD, Vtag}].
 
 %% @doc If this is a sibling, return its binary vtag
@@ -79,7 +86,7 @@ get_vtag(O, MD) ->
        _ -> none
    end.
 
-% -spec extract_fields(obj()) -> fields() | {error, any()}.
+-spec extract_fields({obj_metadata(), term()}) -> fields() | {error, any()}.
 extract_fields({MD, V}) ->
     case yz_kv:is_tombstone(MD) of
         false ->
@@ -179,23 +186,19 @@ split_tag_names(TagNames) ->
 doc_ts(MD) ->
     dict:fetch(<<"X-Riak-Last-Modified">>, MD).
 
-doc_vclock(O) ->
-    riak_object:vclock(O).
-
 gen_ts() ->
     {{Year, Month, Day},
      {Hour, Min, Sec}} = calendar:now_to_universal_time(erlang:now()),
     list_to_binary(io_lib:format("~4..0B~2..0B~2..0BT~2..0B~2..0B~2..0B",
                                  [Year,Month,Day,Hour,Min,Sec])).
 
-gen_vc(O) ->
+%% NOTE: All of this data needs to be in one field to efficiently
+%%       iterate. Otherwise the doc would have to be fetched for each
+%%       entry.
+gen_ed(O, Partition) ->
     TS = gen_ts(),
-    RiakKey = riak_key(O),
-    VClock = base64:encode(crypto:sha(term_to_binary(doc_vclock(O)))),
-    <<TS/binary," ",RiakKey/binary," ",VClock/binary>>.
-
-riak_key(O) ->
-    riak_object:key(O).
-
-value(O) ->
-    riak_object:get_value(O).
+    RiakBucket = yz_kv:get_obj_bucket(O),
+    RiakKey = yz_kv:get_obj_key(O),
+    %% TODO: do this in KV vnode and pass to hook
+    Hash = base64:encode(yz_kv:hash_object(O)),
+    <<TS/binary," ",Partition/binary," ",RiakBucket/binary," ",RiakKey/binary," ",Hash/binary>>.
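The single-field layout written by gen_ed/2 above is exactly what
EntropyData.java splits on " ". A minimal sketch (not in the commit) of
decoding it on the Erlang side:

    %% e.g. <<"20121104T123456 63 fruit apples BASE64HASH">>
    parse_entropy_data(ED) when is_binary(ED) ->
        [TS, Partition, Bucket, Key, Hash] =
            binary:split(ED, <<" ">>, [global]),
        {TS, Partition, Bucket, Key, Hash}.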
src/yz_entropy.erl (65 lines changed)

@@ -30,21 +30,6 @@
 %% TODO: proper supervision and probably make tree proc a gen_server
 
 %%%===================================================================
-%%% API
-%%%===================================================================
-
--spec new_tree_proc(string(), tree_name()) -> tree_ref() | already_running.
-new_tree_proc(Index, Name) ->
-    case whereis(Name) of
-        undefined ->
-            {Pid, Ref} = spawn_monitor(?MODULE, tree_loop, [Index]),
-            register(Name, Pid),
-            #tree_ref{index=Index, name=Name, pid=Pid, ref=Ref};
-        Pid ->
-            {already_running, Pid}
-    end.
-
-%%%===================================================================
 %%% Private
 %%%===================================================================
 
@@ -52,25 +37,32 @@ gen_before() ->
     DateTime = calendar:now_to_universal_time(os:timestamp()),
     to_datetime(minus_period(DateTime, [{mins, 5}])).
 
-build_tree(Index) ->
-    Before = gen_before(),
-    T1 = hashtree:new(),
-    SV = yz_solr:get_vclocks(Index, Before, none, 100),
-    iterate_vclocks(Index, Before, T1, SV).
-
 ht_insert({Key, VCHash}, Tree) ->
     hashtree:insert(Key, VCHash, Tree).
 
-iterate_vclocks(Index, Before, Tree, #solr_vclocks{more=true,
-                                                   continuation=Cont,
-                                                   pairs=Pairs}) ->
-    Tree2 = lists:foldl(fun ht_insert/2, Tree, Pairs),
-    SV = yz_solr:get_vclocks(Index, Before, Cont, 100),
-    iterate_vclocks(Index, Before, Tree2, SV);
-iterate_vclocks(_, _, Tree, #solr_vclocks{more=false,
-                                          pairs=Pairs}) ->
-    Tree2 = lists:foldl(fun ht_insert/2, Tree, Pairs),
-    hashtree:update_tree(Tree2).
+%% @doc Iterate all the entropy data in `Index', calling `Fun' on each
+%%      entry and fetching 100 entries at a time.
+-spec iterate_entropy_data(index_name(), list(), function()) -> ok.
+iterate_entropy_data(Index, Filter, Fun) ->
+    case yz_solr:ping(Index) of
+        true ->
+            DateTime = calendar:now_to_universal_time(os:timestamp()),
+            Before = to_datetime(minus_period(DateTime, [{mins, 5}])),
+            SV = yz_solr:get_vclocks(Index, Before, Filter, none, 100),
+            iterate_entropy_data(Index, Before, Filter, Fun, SV);
+        false ->
+            ok
+    end.
+
+iterate_entropy_data(Index, Before, Filter, Fun, #solr_vclocks{more=true,
+                                                               continuation=Cont,
+                                                               pairs=Pairs}) ->
+    lists:foreach(Fun, Pairs),
+    SV = yz_solr:get_vclocks(Index, Before, Filter, Cont, 100),
+    iterate_entropy_data(Index, Before, Filter, Fun, SV);
+iterate_entropy_data(_, _, _, Fun, #solr_vclocks{more=false,
+                                                 pairs=Pairs}) ->
+    lists:foreach(Fun, Pairs).
 
 %% @doc Minus Period from DateTime.
 %%
@@ -101,14 +93,3 @@ to_datetime({_Mega, _Secs, _Micro}=Now) ->
 to_datetime({{Year, Month, Day}, {Hour, Min, Sec}}) ->
     list_to_binary(io_lib:format("~4..0B~2..0B~2..0BT~2..0B~2..0B~2..0B",
                                  [Year,Month,Day,Hour,Min,Sec])).
-
-tree_loop(Index) ->
-    Tree = build_tree(Index),
-    tree_cache_loop(Tree).
-
-tree_cache_loop(Tree) ->
-    receive
-        {get_tree, Pid, Ref} ->
-            Pid ! {tree, Ref, Tree},
-            tree_cache_loop(Tree)
-    end.
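A hedged sketch of how a caller might drive iterate_entropy_data/3: Fun is
applied to each {Key, Hash} pair (cf. ht_insert/2 above) and Filter is
passed through to yz_solr:get_vclocks/5. The yz_index_hashtree:insert/3
call here is an assumed tree-process API, not one defined in this commit:

    populate_tree(Index, TreePid) ->
        Fun = fun({Key, Hash}) ->
                      %% assumed API: hand each pair to the tree process
                      yz_index_hashtree:insert(Key, Hash, TreePid)
              end,
        yz_entropy:iterate_entropy_data(Index, [], Fun).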
src/yz_entropy_mgr.erl (new file, 570 lines; diff truncated below)

@@ -0,0 +1,570 @@
+%% -------------------------------------------------------------------
+%%
+%% Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%%   http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+-module(yz_entropy_mgr).
+-compile(export_all).
+-behaviour(gen_server).
+-include("yokozuna.hrl").
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+         terminate/2, code_change/3]).
+
+-record(state, {mode :: exchange_mode(),
+                trees :: trees(),
+                tree_queue :: trees(),
+                locks :: [{pid(),reference()}],
+                build_tokens = 0 :: non_neg_integer(),
+                exchange_queue :: [exchange()],
+                exchanges :: [{p(),reference(), pid()}]}).
+-type state() :: #state{}.
+
+-define(DEFAULT_CONCURRENCY, 2).
+-define(DEFAULT_BUILD_LIMIT, {1, 3600000}). %% Once per hour
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+%% @doc Acquire an exchange concurrency lock if available, and associate
+%%      the lock with the calling process.
+-spec get_lock(term()) -> ok | max_concurrency.
+get_lock(Type) ->
+    get_lock(Type, self()).
+
+-spec get_lock(term(), pid()) -> ok | max_concurrency.
+get_lock(Type, Pid) ->
+    gen_server:call(?MODULE, {get_lock, Type, Pid}, infinity).
+
+-spec get_tree(p()) -> {ok, tree()} | not_registered.
+get_tree(Index) ->
+    %% NOTE: This is called by yz_kv:get_tree, which runs on the KV
+    %%       vnode process. Think about putting the tree register in an
+    %%       ETS table and making it public for reads to avoid blocking
+    %%       the vnode when the entropy mgr is backed up.
+    gen_server:call(?MODULE, {get_tree, Index}, infinity).
+
+%% @doc Used by {@link yz_index_hashtree} to requeue a poke on build
+%%      failure.
+-spec requeue_poke(p()) -> ok.
+requeue_poke(Index) ->
+    gen_server:cast(?MODULE, {requeue_poke, Index}).
+
+%% @doc Used by {@link yz_exchange_fsm} to inform the entropy manager
+%%      about the status of an exchange (i.e. completed without issue,
+%%      failed, etc.)
+-spec exchange_status(p(), {p(), n()}, term()) -> ok.
+exchange_status(Index, IndexN, Status) ->
+    gen_server:cast(?MODULE, {exchange_status, self(), Index, IndexN, Status}).
+
+%% @doc Returns true if AAE is enabled, false otherwise.
+-spec enabled() -> boolean().
+enabled() ->
+    riak_kv_entropy_manager:enabled().
+
+%% @doc Set AAE to either `automatic' or `manual' mode. In automatic mode, the
+%%      entropy manager triggers all necessary hashtree exchanges. In manual
+%%      mode, exchanges must be triggered using {@link manual_exchange/1}.
+%%      Regardless of exchange mode, the entropy manager will always ensure
+%%      local hashtrees are built and rebuilt as necessary.
+-spec set_mode(automatic | manual) -> ok.
+set_mode(Mode=automatic) ->
+    ok = gen_server:call(?MODULE, {set_mode, Mode}, infinity);
+set_mode(Mode=manual) ->
+    ok = gen_server:call(?MODULE, {set_mode, Mode}, infinity).
+
+%% NOTE: Yokozuna only runs when KV AAE runs, but this API is needed
+%%       so that the Yokozuna hashtrees may be stopped.
+disable() ->
+    gen_server:call(?MODULE, disable, infinity).
+
+%% @doc Manually trigger hashtree exchanges.
+%%
+%%      -- If a partition is provided, trigger exchanges between Yokozuna
+%%         and KV for all preflists stored by the partition.
+%%
+%%      -- If both a partition and preflist are provided, trigger
+%%         exchange between Yokozuna and KV for that index/preflist.
+-spec manual_exchange(p() | {p(), {p(), n()}}) -> ok.
+manual_exchange(Exchange) ->
+    gen_server:call(?MODULE, {manual_exchange, Exchange}, infinity).
+
+%% @doc Stop the exchange currently executing for `Index', if there
+%%      is one.
+-spec cancel_exchange(p()) -> ok | undefined.
+cancel_exchange(Index) ->
+    gen_server:call(?MODULE, {cancel_exchange, Index}, infinity).
+
+%% @doc Stop all currently executing exchanges.
+-spec cancel_exchanges() -> [p()].
+cancel_exchanges() ->
+    gen_server:call(?MODULE, cancel_exchanges, infinity).
+
+%%%===================================================================
+%%% gen_server callbacks
+%%%===================================================================
+
+init([]) ->
+    Trees = get_trees_from_sup(),
+    schedule_tick(),
+    {_, Opts} = settings(),
+    Mode = case proplists:is_defined(manual, Opts) of
+               true -> manual;
+               false -> automatic
+           end,
+    S = #state{mode=Mode,
+               trees=Trees,
+               tree_queue=[],
+               locks=[],
+               exchanges=[],
+               exchange_queue=[]},
+    S2 = reset_build_tokens(S),
+    schedule_reset_build_tokens(),
+    {ok, S2}.
+
+handle_call({get_lock, Type, Pid}, _From, S) ->
+    {Reply, S2} = do_get_lock(Type, Pid, S),
+    {reply, Reply, S2};
+
+handle_call({get_tree, Index}, _From, S) ->
+    Resp = get_tree(Index, S),
+    {reply, Resp, S};
+
+handle_call({manual_exchange, Exchange}, _From, S) ->
+    S2 = enqueue_exchange(Exchange, S),
+    {reply, ok, S2};
+
+handle_call({cancel_exchange, Index}, _From, S) ->
+    case lists:keyfind(Index, 1, S#state.exchanges) of
+        false ->
+            {reply, undefined, S};
+        {Index, _Ref, Pid} ->
+            exit(Pid, kill),
+            {reply, ok, S}
+    end;
+
+handle_call(cancel_exchanges, _From, S=#state{exchanges=Exchanges}) ->
+    Indices = [begin
+                   exit(Pid, kill),
+                   Index
+               end || {Index, _Ref, Pid} <- Exchanges],
+    {reply, Indices, S};
+
+handle_call(disable, _From, S) ->
+    [yz_index_hashtree:stop(T) || {_,T} <- S#state.trees],
+    {reply, ok, S};
+
+handle_call({set_mode, Mode}, _From, S) ->
+    S2 = S#state{mode=Mode},
+    {reply, ok, S2};
+
+handle_call(Request, From, S) ->
+    lager:warning("Unexpected call: ~p from ~p", [Request, From]),
+    {reply, unexpected_call, S}.
+
+handle_cast({requeue_poke, Index}, S) ->
+    S2 = requeue_poke(Index, S),
+    {noreply, S2};
+
+handle_cast({exchange_status, Pid, Index, {StartIdx, N}, Status}, S) ->
+    S2 = do_exchange_status(Pid, Index, {StartIdx, N}, Status, S),
+    {noreply, S2};
+
+handle_cast(_Msg, S) ->
+    lager:warning("Unexpected cast: ~p", [_Msg]),
+    {noreply, S}.
+
+handle_info(tick, S) ->
+    S2 = maybe_tick(S),
+    {noreply, S2};
+
+handle_info(reset_build_tokens, S) ->
+    S2 = reset_build_tokens(S),
+    schedule_reset_build_tokens(),
+    {noreply, S2};
+
+handle_info({'DOWN', Ref, _, Obj, Status}, S) ->
+    %% NOTE: The down msg could be for an exchange FSM or a tree
+    S2 = maybe_release_lock(Ref, S),
+    S3 = maybe_clear_exchange(Ref, Status, S2),
+    S4 = maybe_clear_registered_tree(Obj, S3),
+    {noreply, S4};
+
+handle_info(_Msg, S) ->
+    lager:warning("Unexpected msg: ~p", [_Msg]),
+    {noreply, S}.
+
+terminate(_Reason, _S) ->
+    ok.
+
+code_change(_OldVsn, S, _Extra) ->
+    {ok, S}.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+
+schedule_reset_build_tokens() ->
+    {_, Reset} = app_helper:get_env(riak_kv, anti_entropy_build_limit,
+                                    ?DEFAULT_BUILD_LIMIT),
+    erlang:send_after(Reset, self(), reset_build_tokens).
+
+reset_build_tokens(S) ->
+    {Tokens, _} = app_helper:get_env(riak_kv, anti_entropy_build_limit,
+                                     ?DEFAULT_BUILD_LIMIT),
+    S#state{build_tokens=Tokens}.
+
+-spec settings() -> {boolean(), proplists:proplist()}.
+settings() ->
+    case app_helper:get_env(riak_kv, anti_entropy, {off, []}) of
+        {on, Opts} ->
+            {true, Opts};
+        {off, Opts} ->
+            {false, Opts};
+        X ->
+            lager:warning("Invalid setting for riak_kv/anti_entropy: ~p", [X]),
+            application:set_env(riak_kv, anti_entropy, {off, []}),
+            {false, []}
+    end.
+
+%% @private
+-spec get_tree(p(), state()) -> {ok, tree()} | not_registered.
+get_tree(Index, S) ->
+    case orddict:find(Index, S#state.trees) of
+        {ok, Tree} -> {ok, Tree};
+        error -> not_registered
+    end.
+
+%% @private
+%%
+%% @doc Generate a list of all the trees currently active. It enables
+%%      the entropy manager to rediscover the trees in the case of a
+%%      crash.
+-spec get_trees_from_sup() -> trees().
+get_trees_from_sup() ->
+    Trees = yz_index_hashtree_sup:trees(),
+    lists:foldl(fun get_index/2, [], Trees).
+
+%% @private
+%%
+%% @doc Get the index for the `Tree', make a pair, and add it to `Trees'.
+-spec get_index(tree(), trees()) -> trees().
+get_index(Tree, Trees) ->
+    case yz_index_hashtree:get_index(Tree) of
+        {error, _} -> Trees;
+        Index -> [{Index,Tree}|Trees]
+    end.
+
+reload_hashtrees(Ring, S) ->
+    reload_hashtrees(enabled(), Ring, S).
+
+-spec reload_hashtrees(boolean(), ring(), state()) -> state().
+reload_hashtrees(true, Ring, S=#state{mode=Mode, trees=Trees}) ->
+    Indices = riak_core_ring:my_indices(Ring),
+    Existing = orddict:from_list(Trees),
+
+    MissingIdx = [Idx || Idx <- Indices, not orddict:is_key(Idx, Existing)],
+    L = lists:foldl(fun(Idx, NewTrees) ->
+                            RPs = riak_kv_util:responsible_preflists(Idx),
+                            {ok, Tree} = yz_index_hashtree:start(Idx, RPs),
+                            [{Idx,Tree}|NewTrees]
+                    end, [], MissingIdx),
+    Trees2 = orddict:from_list(Trees ++ L),
+
+    Moved = [E || E={Idx,_} <- Trees2, not lists:member(Idx, Indices)],
+    Trees3 = remove_trees(Trees2, Moved),
+
+    S2 = S#state{trees=Trees3},
+    S3 = lists:foldl(fun({Idx,Pid}, SAcc) ->
+                             monitor(process, Pid),
+                             case Mode of
+                                 manual -> SAcc;
+                                 automatic -> enqueue_exchange(Idx, SAcc)
+                             end
+                     end, S2, L),
+    S3;
+reload_hashtrees(false, _, S) ->
+    S.
+
+%% @private
+%%
+%% @doc Remove trees from `Trees' and destroy the hashtrees.
+-spec remove_trees(trees(), trees()) -> trees().
+remove_trees(Trees, ToRemove) ->
+    F = fun({Idx, Tree}, TreesAcc) ->
+                yz_index_hashtree:destroy(Tree),
+                orddict:erase(Idx, TreesAcc)
+        end,
+    lists:foldl(F, Trees, ToRemove).
+
+-spec do_get_lock(term(), pid(), state()) ->
+          {ok | max_concurrency | build_limit_reached, state()}.
+do_get_lock(Type, Pid, S=#state{locks=Locks}) ->
+    Concurrency = app_helper:get_env(riak_kv,
+                                     anti_entropy_concurrency,
+                                     ?DEFAULT_CONCURRENCY),
+    case length(Locks) >= Concurrency of
+        true ->
+            {max_concurrency, S};
+        false ->
+            case check_lock_type(Type, S) of
+                {ok, S2} ->
+                    Ref = monitor(process, Pid),
+                    S3 = S2#state{locks=[{Pid,Ref}|Locks]},
+                    {ok, S3};
+                Error ->
+                    {Error, S}
+            end
+    end.
+
+-spec check_lock_type(term(), state()) -> {ok, state()} | build_limit_reached.
+check_lock_type(build, S=#state{build_tokens=Tokens}) ->
+    if Tokens > 0 ->
+            {ok, S#state{build_tokens=Tokens-1}};
+       true ->
+            build_limit_reached
+    end;
+check_lock_type(_Type, S) ->
+    {ok, S}.
+
+-spec maybe_release_lock(reference(), state()) -> state().
+maybe_release_lock(Ref, S) ->
+    Locks = lists:keydelete(Ref, 2, S#state.locks),
+    S#state{locks=Locks}.
+
+-spec maybe_clear_exchange(reference(), term(), state()) -> state().
+maybe_clear_exchange(Ref, Status, S) ->
+    case lists:keytake(Ref, 2, S#state.exchanges) of
+        false ->
+            S;
+        {value, {Idx,Ref,_Pid}, Exchanges} ->
+            lager:debug("Untracking exchange: ~p :: ~p", [Idx, Status]),
+            S#state{exchanges=Exchanges}
+    end.
+
+-spec maybe_clear_registered_tree(pid(), state()) -> state().
+maybe_clear_registered_tree(Pid, S) when is_pid(Pid) ->
+    Trees = lists:keydelete(Pid, 2, S#state.trees),
+    S#state{trees=Trees};
+maybe_clear_registered_tree(_, S) ->
+    S.
+
+-spec next_tree(state()) -> {pid(), state()} | {none, state()}.
+next_tree(S=#state{tree_queue=Queue, trees=Trees}) ->
+    More = fun() -> Trees end,
+    case yz_misc:queue_pop(Queue, More) of
+        {{_,Pid}, Rest} ->
+            S2 = S#state{tree_queue=Rest},
+            {Pid, S2};
+        empty ->
+            {none, S}
+    end.
+
+-spec schedule_tick() -> reference().
+schedule_tick() ->
+    erlang:send_after(?YZ_ENTROPY_TICK, ?MODULE, tick).
+
+maybe_tick(S) ->
+    case enabled() of
+        true ->
+            case riak_core_capability:get({riak_kv, anti_entropy}, disabled) of
+                disabled -> S2 = S;
+                enabled_v1 -> S2 = tick(S)
+            end;
+        false ->
+            %% Ensure we do not have any running index_hashtrees, which can
+            %% happen when disabling anti-entropy on a live system.
+            [yz_index_hashtree:stop(T) || {_,T} <- S#state.trees],
+            S2 = S
+    end,
+    schedule_tick(),
+    S2.
+
+-spec tick(state()) -> state().
+tick(S) ->
+    Ring = yz_misc:get_ring(transformed),
+    S2 = reload_hashtrees(Ring, S),
+    S3 = lists:foldl(fun(_,SAcc) ->
+                             maybe_poke_tree(SAcc)
+                     end, S2, lists:seq(1,10)),
+    maybe_exchange(Ring, S3).
+
+-spec maybe_poke_tree(state()) -> state().
+maybe_poke_tree(S) ->
+    case next_tree(S) of
+        {none, S2} ->
+            S2;
+        {Tree, S2} ->
+            yz_index_hashtree:poke(Tree),
+            S2
+    end.
+
+%%%===================================================================
+%%% Exchanging
+%%%===================================================================
+
+-spec do_exchange_status(pid(), p(), {p(), n()}, any(), state()) -> state().
+do_exchange_status(_Pid, Index, {StartIdx, N}, Status, S) ->
+    case Status of
+        ok ->
+            lager:debug("Finished exchange for partition ~p of preflist ~p",
+                        [Index, {StartIdx, N}]),
+            S;
+        _ ->
+            lager:debug("Requeue exchange for partition ~p of preflist ~p "
+                        "for reason ~p",
+                        [Index, {StartIdx, N}, Status]),
+            requeue_exchange(Index, {StartIdx, N}, S)
+    end.
+
+-spec start_exchange(p(), {p(),n()}, ring(), state()) -> {any(), state()}.
+start_exchange(Index, Preflist, Ring, S) ->
+    case riak_core_ring:index_owner(Ring, Index) == node() of
+        false ->
+            {not_responsible, S};
+        true ->
+            %% TODO: check for not_registered
+            {ok, YZTree} = get_tree(Index, S),
+            %% TODO: use async version in case vnode is backed up
+            %%
+            %% TODO: hashtree_pid can return {error, wrong_node}
+            %%       during ownership transfer, handle that case
+            {ok, KVTree} = riak_kv_vnode:hashtree_pid(Index),
+            case yz_exchange_fsm:start(Index, Preflist, YZTree,
+                                       KVTree, self()) of
+                {ok, FsmPid} ->
+                    Ref = monitor(process, FsmPid),
+                    E = S#state.exchanges,
+                    %% TODO: add timestamp so we know how long ago
+                    %%       exchange was started
+                    {ok, S#state{exchanges=[{Index,Ref,FsmPid}|E]}};
+                {error, Reason} ->
+                    {Reason, S}
+            end
+    end.
+
+-spec all_pairwise_exchanges(p(), ring()) -> [exchange()].
+all_pairwise_exchanges(Index, Ring) ->
+    RPs = riak_kv_util:responsible_preflists(Index, Ring),
+    [{Index, {StartIdx, N}} || {StartIdx, N} <- RPs].
+
+-spec all_exchanges(ring(), trees()) -> [exchange()].
+all_exchanges(Ring, Trees) ->
+    Indices = orddict:fetch_keys(Trees),
+    lists:flatmap(fun(Index) ->
+                          all_pairwise_exchanges(Index, Ring)
+                  end, Indices).
+
+-spec enqueue_exchange(p() | {p(), {p(),n()}}, state()) -> state().
+enqueue_exchange(E={_Index,_IndexN}, S) ->
+    case verify_exchange(E) of
+        true ->