Resumable secondary index construction.
Also reduces the impact of index construction on the performance of write queries.
Fixes #4959.
Daniel Mewes committed Nov 3, 2015
1 parent 0f59e55 commit 0f47dc7
Showing 41 changed files with 1,282 additions and 726 deletions.
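
The user-visible effect of this commit: `indexStatus()` now reports a single fractional `progress` field in place of the old `blocks_processed`/`blocks_total` pair, as the driver and web UI changes below show. A minimal sketch of polling the new field from the Python driver (the table name and connection parameters are illustrative, not part of this commit):

import time
import rethinkdb as r

conn = r.connect(host="localhost", port=28015, db="test")

# Poll until every secondary index on the table has finished construction.
while True:
    statuses = r.table("users").index_status().run(conn)
    pending = [s for s in statuses if not s["ready"]]
    if not pending:
        break
    for s in pending:
        # `progress` is a fraction in [0, 1].
        print("%s: %.1f%% built" % (s["index"], 100 * s["progress"]))
    time.sleep(1)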
11 changes: 5 additions & 6 deletions admin/static/coffee/tables/table.coffee
@@ -141,7 +141,7 @@ class TableContainer extends Backbone.View
r.db(table_config("db"))
.table(table_config("name"), read_mode: "single")
.indexStatus()
.pluck('index', 'ready', 'blocks_processed', 'blocks_total')
.pluck('index', 'ready', 'progress')
.merge( (index) -> {
id: index("index")
db: table_config("db")
@@ -701,7 +701,7 @@ class SecondaryIndexesView extends Backbone.View

if build_in_progress and @model.get('db')?
query = r.db(@model.get('db')).table(@model.get('name')).indexStatus()
.pluck('index', 'ready', 'blocks_processed', 'blocks_total')
.pluck('index', 'ready', 'progress')
.merge( (index) => {
id: index("index")
db: @model.get("db")
@@ -886,7 +886,7 @@ class SecondaryIndexView extends Backbone.View
initialize: (data) =>
@container = data.container

@model.on 'change:blocks_processed', @update
@model.on 'change:progress', @update
@model.on 'change:ready', @update

update: (args) =>
@@ -910,8 +910,7 @@ class SecondaryIndexView extends Backbone.View
@

render_progress_bar: =>
blocks_processed = @model.get 'blocks_processed'
blocks_total = @model.get 'blocks_total'
progress = @model.get 'progress'

if @progress_bar?
if @model.get('ready') is true
@@ -920,7 +919,7 @@ class SecondaryIndexView extends Backbone.View
check: true
, => @render()
else
@progress_bar.render blocks_processed, blocks_total,
@progress_bar.render progress, 1,
got_response: true
check: true
, => @render()
7 changes: 1 addition & 6 deletions drivers/python/rethinkdb/_index_rebuild.py
@@ -130,12 +130,7 @@ def get_index_progress(progress, conn, index):
if status['ready']:
return None
else:
processed = float(status.get('blocks_processed', 0))
total = float(status.get('blocks_total', 1))
if total != 0:
return processed / total
else:
return 0.0
return float(status.get('progress'))

def rename_index(progress, conn, index):
r.db(index['db']).table(index['table']).index_rename(index['temp_name'], index['name'], overwrite=True).run(conn)
4 changes: 4 additions & 0 deletions scripts/generate_serialize_macros.py
@@ -75,6 +75,10 @@ def generate_make_serializable_macro(nfields):
print "#define RDB_IMPL_SERIALIZABLE_%d_SINCE_v2_1(type_t%s) \\" % (nfields, fields)
print " RDB_IMPL_SERIALIZABLE_%d(type_t%s); \\" % (nfields, fields)
print " INSTANTIATE_SERIALIZABLE_SINCE_v2_1(type_t)"
print
print "#define RDB_IMPL_SERIALIZABLE_%d_SINCE_v2_2(type_t%s) \\" % (nfields, fields)
print " RDB_IMPL_SERIALIZABLE_%d(type_t%s); \\" % (nfields, fields)
print " INSTANTIATE_SERIALIZABLE_SINCE_v2_2(type_t)"

print "#define RDB_MAKE_ME_SERIALIZABLE_%d(type_t%s) \\" % \
(nfields, fields)
51 changes: 49 additions & 2 deletions src/btree/secondary_operations.cc
@@ -8,9 +8,56 @@
#include "containers/archive/vector_stream.hpp"
#include "containers/archive/versioned.hpp"

RDB_IMPL_SERIALIZABLE_5_SINCE_v1_13(
RDB_IMPL_SERIALIZABLE_5_SINCE_v2_2(
secondary_index_t, superblock, opaque_definition,
post_construction_complete, being_deleted, id);
needs_post_construction_range, being_deleted, id);

// Pre-2.2 we didn't have the `needs_post_construction_range` field; instead we had
// a boolean `post_construction_complete`.
// We need to specify a custom deserialization function for those versions:
template <cluster_version_t W>
archive_result_t pre_2_2_deserialize(
read_stream_t *s, secondary_index_t *sindex) {
archive_result_t res = archive_result_t::SUCCESS;
res = deserialize<W>(s, deserialize_deref(sindex->superblock));
if (bad(res)) { return res; }
res = deserialize<W>(s, deserialize_deref(sindex->opaque_definition));
if (bad(res)) { return res; }

bool post_construction_complete = false;
res = deserialize<W>(s, &post_construction_complete);
if (bad(res)) { return res; }
sindex->needs_post_construction_range =
post_construction_complete
? key_range_t::empty()
: key_range_t::universe();

res = deserialize<W>(s, deserialize_deref(sindex->being_deleted));
if (bad(res)) { return res; }
res = deserialize<W>(s, deserialize_deref(sindex->id));
if (bad(res)) { return res; }
return res;
}
template <> archive_result_t deserialize<cluster_version_t::v1_14>(
read_stream_t *s, secondary_index_t *sindex) {
return pre_2_2_deserialize<cluster_version_t::v1_14>(s, sindex);
}
template <> archive_result_t deserialize<cluster_version_t::v1_15>(
read_stream_t *s, secondary_index_t *sindex) {
return pre_2_2_deserialize<cluster_version_t::v1_15>(s, sindex);
}
template <> archive_result_t deserialize<cluster_version_t::v1_16>(
read_stream_t *s, secondary_index_t *sindex) {
return pre_2_2_deserialize<cluster_version_t::v1_16>(s, sindex);
}
template <> archive_result_t deserialize<cluster_version_t::v2_0>(
read_stream_t *s, secondary_index_t *sindex) {
return pre_2_2_deserialize<cluster_version_t::v2_0>(s, sindex);
}
template <> archive_result_t deserialize<cluster_version_t::v2_1>(
read_stream_t *s, secondary_index_t *sindex) {
return pre_2_2_deserialize<cluster_version_t::v2_1>(s, sindex);
}

RDB_IMPL_SERIALIZABLE_2_SINCE_v1_13(sindex_name_t, name, being_deleted);

28 changes: 21 additions & 7 deletions src/btree/secondary_operations.hpp
@@ -1,11 +1,12 @@
// Copyright 2010-2014 RethinkDB, all rights reserved.
// Copyright 2010-2015 RethinkDB, all rights reserved.
#ifndef BTREE_SECONDARY_OPERATIONS_HPP_
#define BTREE_SECONDARY_OPERATIONS_HPP_

#include <map>
#include <string>
#include <vector>

#include "btree/keys.hpp"
#include "buffer_cache/types.hpp"
#include "containers/archive/archive.hpp"
#include "containers/uuid.hpp"
@@ -24,20 +25,33 @@ class buf_lock_t;
struct secondary_index_t {
secondary_index_t()
: superblock(NULL_BLOCK_ID),
post_construction_complete(false),
needs_post_construction_range(key_range_t::universe()),
being_deleted(false),
/* TODO(2014-08): This generate_uuid() is weird. */
id(generate_uuid()) { }

/* A virtual superblock. */
block_id_t superblock;

/* Whether the index is has completed post construction, and/or is being deleted.
* Note that an index can be in any combination of those states. */
bool post_construction_complete;
/* Whether the index still needs to be post-constructed, and/or is being deleted.
Note that an index can be in any combination of those states. */
key_range_t needs_post_construction_range;
bool being_deleted;

/* Note that this is still relevant even if the index is being deleted. In that
case it tells us whether the index had completed post-construction before it was
deleted. That matters because once an index has been post-constructed, there can
be snapshotted read queries that are still accessing it, and we must detach any
values that we delete from the index.
If on the other hand the index never finished post-construction, we must not detach
values, because they might point to blocks that no longer exist (in general a
partially constructed index can be in an inconsistent state). */
bool post_construction_complete() const {
return needs_post_construction_range.is_empty();
}

/* Determines whether it's ok to query the index. */
bool is_ready() const {
return post_construction_complete && !being_deleted;
return post_construction_complete() && !being_deleted;
}

/* An opaque blob that describes the index. See serialize_sindex_info and
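
The core of resumability is visible in this header: the boolean `post_construction_complete` becomes a persisted `needs_post_construction_range` that shrinks as construction proceeds. A runnable sketch of that scheme (illustrative Python with made-up names, not RethinkDB's actual C++ API):

primary = {"a": 1, "b": 2, "c": 3, "d": 4, "e": 5}  # primary key -> row

class SecondaryIndex(object):
    def __init__(self, keys):
        self.entries = {}
        # Initially the whole keyspace still needs post-construction.
        self.remaining = sorted(keys)  # stand-in for a persisted key_range_t

    def post_construction_complete(self):
        return not self.remaining

def post_construct_batch(index, batch_size=2):
    # Build index entries for one batch, then shrink the remaining range.
    # Persisting `remaining` after each batch is what makes construction
    # resumable across restarts and lets writes interleave between batches.
    batch, index.remaining = index.remaining[:batch_size], index.remaining[batch_size:]
    for key in batch:
        index.entries[key] = primary[key]

idx = SecondaryIndex(primary.keys())
while not idx.post_construction_complete():
    post_construct_batch(idx)  # a restart at this point resumes from `remaining`
print(sorted(idx.entries))

Presumably, writes that land inside the still-unconstructed range can also skip the secondary index update, since post construction will index them when it reaches that range; that would account for the commit's reduced impact on write queries.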
12 changes: 12 additions & 0 deletions src/buffer_cache/alt.cc
@@ -741,6 +741,12 @@ page_t *buf_lock_t::get_held_page_for_read() {
guarantee(cpa != NULL);
// We only wait here so that we can guarantee(!empty()) after it's pulsed.
cpa->read_acq_signal()->wait();
#ifndef NDEBUG
// Occasionally block, as if the block had to be fetched from disk
if (randint(10) == 0) {
coro_t::yield();
}
#endif

ASSERT_FINITE_CORO_WAITING;
guarantee(!empty());
@@ -752,6 +758,12 @@ page_t *buf_lock_t::get_held_page_for_write() {
rassert(snapshot_node_ == NULL);
// We only wait here so that we can guarantee(!empty()) after it's pulsed.
current_page_acq_->write_acq_signal()->wait();
#ifndef NDEBUG
// Occasionally block, as if the block had to be fetched from disk
if (randint(10) == 0) {
coro_t::yield();
}
#endif

ASSERT_FINITE_CORO_WAITING;
guarantee(!empty());
4 changes: 2 additions & 2 deletions src/clustering/administration/jobs/manager.cc
@@ -138,8 +138,8 @@ void jobs_manager_t::on_get_job_reports(
table_id,
status.first,
status.second.second.ready,
status.second.second.blocks_processed,
status.second.second.blocks_total);
status.second.second.progress_numerator,
status.second.second.progress_denominator);
}

std::map<region_t, backfill_progress_tracker_t::progress_tracker_t> backfills =
5 changes: 2 additions & 3 deletions src/clustering/immediate_consistency/backfill_metadata.cc
@@ -12,10 +12,9 @@ RDB_IMPL_SERIALIZABLE_4_FOR_CLUSTER(backfill_config_t,
item_queue_mem_size, item_chunk_mem_size, pre_item_queue_mem_size,
pre_item_chunk_mem_size);

RDB_IMPL_SERIALIZABLE_9_FOR_CLUSTER(backfiller_bcard_t::intro_2_t,
RDB_IMPL_SERIALIZABLE_8_FOR_CLUSTER(backfiller_bcard_t::intro_2_t,
common_version, final_version_history, pre_items_mailbox, begin_session_mailbox,
end_session_mailbox, ack_items_mailbox, num_changes_estimate, distribution_counts,
distribution_counts_sum);
end_session_mailbox, ack_items_mailbox, num_changes_estimate, progress_estimator);

RDB_IMPL_SERIALIZABLE_7_FOR_CLUSTER(backfiller_bcard_t::intro_1_t,
config, initial_version, initial_version_history, intro_mailbox, items_mailbox,
6 changes: 3 additions & 3 deletions src/clustering/immediate_consistency/backfill_metadata.hpp
@@ -6,6 +6,7 @@
#include "clustering/generic/registration_metadata.hpp"
#include "clustering/immediate_consistency/backfill_item_seq.hpp"
#include "clustering/immediate_consistency/history.hpp"
#include "rdb_protocol/distribution_progress.hpp"
#include "rdb_protocol/protocol.hpp"
#include "rpc/mailbox/typed.hpp"

@@ -160,9 +161,8 @@ class backfiller_bcard_t {
/* This is used to determine the backfill priority. */
uint64_t num_changes_estimate;

/* These are used to estimate backfill progress. */
std::map<store_key_t, int64_t> distribution_counts;
int64_t distribution_counts_sum;
/* This is used to estimate backfill progress. */
distribution_progress_estimator_t progress_estimator;
};

typedef mailbox_t<void(
16 changes: 4 additions & 12 deletions src/clustering/immediate_consistency/backfillee.cc
@@ -186,18 +186,10 @@ class backfillee_t::session_t {
}

if (!new_threshold.unbounded) {
const auto &distribution_counts =
parent->parent->intro.distribution_counts;
auto lower_bound =
distribution_counts.lower_bound(new_threshold.key());
if (lower_bound != distribution_counts.end()) {
double distribution_counts_sum =
parent->parent->intro.distribution_counts_sum;
parent->parent->progress_tracker->progress =
lower_bound->second / distribution_counts_sum;
} else {
parent->parent->progress_tracker->progress = 1.0;
}
const distribution_progress_estimator_t &estimator =
parent->parent->intro.progress_estimator;
parent->parent->progress_tracker->progress =
estimator.estimate_progress(new_threshold.key());
} else {
parent->parent->progress_tracker->progress = 1.0;
}
35 changes: 3 additions & 32 deletions src/clustering/immediate_consistency/backfiller.cc
@@ -2,6 +2,7 @@
#include "clustering/immediate_consistency/backfiller.hpp"

#include "clustering/immediate_consistency/history.hpp"
#include "rdb_protocol/distribution_progress.hpp"
#include "rdb_protocol/protocol.hpp"
#include "store_view.hpp"

@@ -87,36 +88,7 @@ backfiller_t::client_t::client_t(

/* Fetch the key distribution from the store; the backfillee uses it to
calculate the progress of backfill jobs. */
std::map<store_key_t, int64_t> distribution_counts;
int64_t distribution_counts_sum = 0;
{
static const int max_depth = 2;
static const size_t result_limit = 128;

#ifndef NDEBUG
metainfo_checker_t metainfo_checker(
parent->store->get_region(),
[](const region_t &, const binary_blob_t &) { });
#endif

distribution_read_t distribution_read(max_depth, result_limit);
read_t read(
distribution_read, profile_bool_t::DONT_PROFILE, read_mode_t::OUTDATED);
read_response_t read_response;
read_token_t read_token;
parent->store->read(
DEBUG_ONLY(metainfo_checker, )
read, &read_response, &read_token, interruptor);
distribution_counts = std::move(
boost::get<distribution_read_response_t>(read_response.response).key_counts);

/* For the progress calculation we need partial sums for each key thus we
calculate those from the results that the distribution query returns. */
for (auto &&distribution_count : distribution_counts) {
distribution_count.second =
(distribution_counts_sum += distribution_count.second);
}
}
distribution_progress_estimator_t progress_estimator(parent->store, interruptor);

/* Estimate the total number of changes that will need to be backfilled, by comparing
`our_version`, `intro.initial_version`, and `common_version`. We estimate the number
@@ -147,8 +119,7 @@
our_intro.end_session_mailbox = end_session_mailbox.get_address();
our_intro.ack_items_mailbox = ack_items_mailbox.get_address();
our_intro.num_changes_estimate = num_changes_estimate;
our_intro.distribution_counts = std::move(distribution_counts);
our_intro.distribution_counts_sum = distribution_counts_sum;
our_intro.progress_estimator = std::move(progress_estimator);
send(parent->mailbox_manager, intro.intro_mailbox, our_intro);
}

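
The distribution-based progress arithmetic that used to live inline here (and in `backfillee.cc` above) is now encapsulated in `distribution_progress_estimator_t` from `rdb_protocol/distribution_progress.hpp`. A sketch of the estimation idea in Python (the class and data below are stand-ins for the C++): the estimator keeps partial sums of the distribution query's key counts and maps a backfill threshold key to a fraction of the total, just as the removed `lower_bound` code did:

import bisect

class DistributionProgressEstimator(object):
    def __init__(self, key_counts):
        # key_counts: sorted (key, count) pairs from a distribution query.
        self.keys = [k for k, _ in key_counts]
        self.partial_sums = []
        total = 0
        for _, count in key_counts:
            total += count
            self.partial_sums.append(total)
        self.total = float(total)

    def estimate_progress(self, threshold_key):
        # First distribution key at or past the backfill threshold; its
        # partial sum approximates how much of the keyspace is done.
        i = bisect.bisect_left(self.keys, threshold_key)
        if i >= len(self.keys):
            return 1.0
        return self.partial_sums[i] / self.total

est = DistributionProgressEstimator([("d", 40), ("m", 30), ("t", 30)])
print(est.estimate_progress("m"))  # 0.7 -- about 70% of the keyspace done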
src/clustering/immediate_consistency/remote_replicator_client.cc
@@ -283,9 +283,11 @@ remote_replicator_client_t::remote_replicator_client_t(
parent->next_write_waiter_->pulse_if_not_already_pulsed();
}
}
/* If the backfill throttler is telling us to pause, then interrupt
/* If the backfill throttler is telling us to pause, or it is no longer
ok to backfill because of secondary index construction, then interrupt
`backfillee.go()` */
return !preempt_signal->is_pulsed();
return parent->store_->check_ok_to_receive_backfill()
&& !preempt_signal->is_pulsed();
}
remote_replicator_client_t *parent;
signal_t *preempt_signal;
@@ -297,7 +299,6 @@ remote_replicator_client_t::remote_replicator_client_t(
interruptor);

if (tracker_->get_backfill_threshold() != region_.inner.right) {
guarantee(backfill_throttler_lock.get_preempt_signal()->is_pulsed());
/* Switch mode to `PAUSED` so that writes can proceed while we wait to
reacquire the throttler lock */
mutex_assertion_t::acq_t mutex_assertion_acq(&mutex_assertion_);
3 changes: 3 additions & 0 deletions src/clustering/table_manager/sindex_manager.cc
@@ -127,6 +127,9 @@ void sindex_manager_t::update_blocking(signal_t *interruptor) {
}
store->sindex_rename_multi(to_rename, &ct_interruptor);
for (const auto &pair : to_create) {
/* Note that this crashes if the index already exists. Luckily we are running
in a `pump_coro_t`, and we should be the only thing creating indexes on the
store, so this shouldn't be an issue. */
store->sindex_create(pair.first, pair.second, &ct_interruptor);
}
}
