Skip to content

Commit

Permalink
[#6476] Set local_limit and read restart time in a better way to redu…
Browse files Browse the repository at this point in the history
…ce the number of read restarts

Summary:
Background
==========

"Read restart" is a mechanism that we use to make sure that a snapshot isolation read request returns all the records that the application "knows" have been written to the database, e.g. records that were written or read earlier by the same client or a different client that notified the client that is sending the current read request. It works as follows. For each read request, we select an MVCC timestamp to read at (also known as read hybrid time, read_ht, or read_time), and we try to return all records with commit timestamp <= read_time. However, sometimes we see records with commit timestamp slightly higher than read_time, and if we cannot rule out a possibility that a record like that might have been written prior to the beginning of our snapshot isolation transaction, we have no choice but to restart the entire transaction with a read time that is >= this commit timestamp. Read restarts are detrimental to performance and we want to minimize them.

Each read request from the YQL engine to the DocDB layer of the tablet server is parameterized with multiple components in addition to read_time, including the following:
- global_limit is the upper bound on physical time (and, therefore, on hybrid time) in the cluster prior to the beginning of the transaction, computed as current time on the YQL node that received the read request + max clock skew.
- local_limit is a value maintained for each tablet, separately for each transaction. It starts off as global_limit for the first request to a tablet as part of the transaction, but then. for second and later requests to that tablet, is set to the safe time on that tablet returned to the YQL engine by the response to the first request. After it was set to a tablet's safe time, it is not supposed to change until the end of the transaction.

Both local_limit and global_limit help us "prove" that a particular record we see in RocksDB could not have been committed prior to the beginning of our transaction.

For the intents read path, the logic for deciding when to trigger read restarts is implemented in IntentAwareIterator::ProcessIntent.

For regular RocksDB records, the corresponding logic is the IntentAwareIterator::SkipFutureRecords function, which skips all records for which it can "prove" that they could not have been committed prior to the beginning of our transaction and, independently, are not visible as of read_time. This function operates on encoded hybrid times and comparisons are inverted, which is unintuitive. Also it looks at intent hybrid times stored in values in regular RocksDB, which are lower than the commit hybrid times that are part of the key.

Also note that in any case, records committed after the beginning of the transaction, and therefore not causing a read restart, could still have commit time <= read_time due to clock skew and therefore have to be included in the read result.

Changes in this diff
====================

Prior to this diff, the value of local_limit that YQL engine sends to a tablet server would actually be set to max(read_time, local_limit_for_tablet) where local_limit_for_tablet is the current value local limit for this tablet maintained by the YQL engine for the transaction. This was needed due to how the logic in IntentAwareIterator::SkipFutureRecords was set up, in order to capture all records with commit time <= read_time, which are required for consistency. However, due to the artificially increased effective value of local_limit, this approach was causing us to miss some opportunities to avoid read restarts. E.g. when we are trying to read a key that is constantly being overwritten, the read operation would keep getting read restarts, but local_limit would always be set to the new read_time, and it is likely that new intents would have been written with intent hybrid time lower than that read_time, not allowing us to ignore them. This would result in many unnecessary read restarts.

With this diff, we are no longer updating local_limit to be greater than or equal to read_time in `ConsistentReadPoint::GetReadTime`, so we need to take extra care to ensure we include all records with commit time <= read_time elsewhere in the read path.

The new version of the logic can be summarized as follows:
- If intent_ht > local_limit, meaning the intent was certainly written after the read operation started: include the record if commit_ht <= read_time. No read restart is possible.
- If intent_ht <= local_limit, meaning the intent could have been written before the read operation started: include the record if commit_ht <= global_limit. Read restart is still possible if commit_ht > read_time.

Another important change in this diff is how we now set read restart time:

  min(max(restart_time, safe_ht_to_read), global_limit)

Here, restart_time is the time that the tablet returned to a YQL engine's request, and safe_ht_to_read is the safe time on that tablet. By restarting at safe_ht_to_read we would avoid doing more than one read restart per transaction per tablet. However, we still need to cap the read restart time at global_limit for slow transactions, because global_limit is the latest MVCC timestamp we every have to read at to avoid stale results.

Backward compatibility
======================

The old local_limit_ht protobuf field is renamed to DEPRECATED_max_of_read_time_and_local_limit_ht, and is set to what its name implies, keeping the old logic. The new local_limit_ht field is set to the real value of local limit, which could be smaller than read_time. The deprecated field can be removed after all YugabyteDB clusters are upgraded to the new version.

Related work
============

Please also see https://phabricator.dev.yugabyte.com/D8510 ( 26260e0 ), which introduced cleanup of intent hybrid times from records in regular RocksDB that were created by applying transactions committed before the history cutoff time.

Test Plan: ybd release --gtest_filter PgMiniTest.ReadRestartSnapshot -n 8

Reviewers: mbautin

Reviewed By: mbautin

Subscribers: kannan, ybase, bogdan

Differential Revision: https://phabricator.dev.yugabyte.com/D10047
  • Loading branch information
spolitov committed Dec 23, 2020
1 parent 4c53f23 commit c784595
Show file tree
Hide file tree
Showing 9 changed files with 123 additions and 29 deletions.
2 changes: 1 addition & 1 deletion src/yb/client/ql-transaction-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ TEST_F(QLTransactionTest, WriteAfterReadRestart) {
auto result = WriteRow(
session, KeyForTransactionAndIndex(0, r),
ValueForTransactionAndIndex(0, r, WriteOpType::UPDATE), WriteOpType::UPDATE);
ASSERT_TRUE(!result.ok() && result.status().IsTryAgain()) << result;
ASSERT_OK(result);
}

txn2->Abort();
Expand Down
4 changes: 3 additions & 1 deletion src/yb/common/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -498,11 +498,13 @@ message TransactionMetadataPB {
message ReadHybridTimePB {
optional fixed64 read_ht = 1;

optional fixed64 local_limit_ht = 2;
optional fixed64 DEPRECATED_max_of_read_time_and_local_limit_ht = 2;

optional fixed64 global_limit_ht = 3;

optional fixed64 in_txn_limit_ht = 4;

optional fixed64 local_limit_ht = 5;
}

// For clarification of field meaning see comments of appropriate fields in YBTransaction::Impl
Expand Down
2 changes: 1 addition & 1 deletion src/yb/common/consistent_read_point.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ ReadHybridTime ConsistentReadPoint::GetReadTime(const TabletId& tablet) const {
// Use the local limit for the tablet but no earlier than the read time we want.
const auto it = local_limits_.find(tablet);
if (it != local_limits_.end()) {
read_time.local_limit = std::max(it->second, read_time.read);
read_time.local_limit = it->second;
}
}
return read_time;
Expand Down
15 changes: 10 additions & 5 deletions src/yb/common/read_hybrid_time.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,16 @@ struct ReadHybridTime {
template <class PB>
static ReadHybridTime FromPB(const PB& read_time) {
return {
HybridTime(read_time.read_ht()),
HybridTime(read_time.local_limit_ht()),
HybridTime(read_time.global_limit_ht()),
.read = HybridTime(read_time.read_ht()),
.local_limit = HybridTime(read_time.has_local_limit_ht()
? read_time.local_limit_ht()
: read_time.deprecated_max_of_read_time_and_local_limit_ht()),
.global_limit = HybridTime(read_time.global_limit_ht()),
// Use max hybrid time for backward compatibility.
read_time.in_txn_limit_ht() ? HybridTime(read_time.in_txn_limit_ht()) : HybridTime::kMax,
0
.in_txn_limit = read_time.in_txn_limit_ht()
? HybridTime(read_time.in_txn_limit_ht())
: HybridTime::kMax,
.serial_no = 0,
};
}

Expand All @@ -96,6 +100,7 @@ struct ReadHybridTime {
out->set_global_limit_ht(global_limit.ToUint64());
out->set_in_txn_limit_ht(
in_txn_limit.is_valid() ? in_txn_limit.ToUint64() : HybridTime::kMax.ToUint64());
out->set_deprecated_max_of_read_time_and_local_limit_ht(std::max(local_limit, read).ToUint64());
}

template <class PB>
Expand Down
96 changes: 82 additions & 14 deletions src/yb/docdb/intent_aware_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -194,17 +194,45 @@ namespace {
struct DecodeStrongWriteIntentResult {
Slice intent_prefix;
Slice intent_value;
DocHybridTime intent_time;
DocHybridTime value_time;
IntentTypeSet intent_types;

// Whether this intent from the same transaction as specified in context.
bool same_transaction = false;

std::string ToString() const {
return Format("{ intent_prefix: $0 intent_value: $1 value_time: $2 same_transaction: $3 "
"intent_types: $4 }",
intent_prefix.ToDebugHexString(), intent_value.ToDebugHexString(), value_time,
same_transaction, intent_types);
return Format("{ intent_prefix: $0 intent_value: $1 intent_time: $2 value_time: $3 "
"same_transaction: $4 intent_types: $5 }",
intent_prefix.ToDebugHexString(), intent_value.ToDebugHexString(), intent_time,
value_time, same_transaction, intent_types);
}

// Returns the upper limit for the "value time" of an intent in order for the intent to be visible
// in the read results. The "value time" is defined as follows:
// - For uncommitted transactions, the "value time" is the time when the intent was written.
// Note that same_transaction or in_txn_limit could only be set for uncommited transactions.
// - For committed transactions, the "value time" is the commit time.
//
// The logic here is as follows:
// - When a transaction is reading its own intents, the in_txn_limit allows a statement to
// avoid seeing its own partial results. This is necessary for statements such as INSERT ...
// SELECT to avoid reading rows that the same statement generated and going into an infinite
// loop.
// - If an intent's hybrid time is greater than the tablet's local limit, then this intent
// cannot lead to a read restart and we only need to see it if its commit time is less than or
// equal to read_time.
// - If an intent's hybrid time is <= than the tablet's local limit, then we cannot claim that
// the intent was written after the read transaction began based on the local limit, and we
// must compare the intent's commit time with global_limit and potentially perform a read
// restart, because the transaction that wrote the intent might have been committed before our
// read transaction begin.
HybridTime MaxAllowedValueTime(const ReadHybridTime& read_time) const {
if (same_transaction) {
return read_time.in_txn_limit;
}
return intent_time.hybrid_time() > read_time.local_limit
? read_time.read : read_time.global_limit;
}
};

Expand Down Expand Up @@ -237,6 +265,7 @@ Result<DecodeStrongWriteIntentResult> DecodeStrongWriteIntent(
result.intent_value.consume_byte();
IntraTxnWriteId in_txn_write_id = BigEndian::Load32(result.intent_value.data());
result.intent_value.remove_prefix(sizeof(IntraTxnWriteId));
result.intent_time = decoded_intent_key.doc_ht;
if (result.intent_value.starts_with(ValueTypeAsChar::kRowLock)) {
result.value_time = DocHybridTime::kMin;
} else if (result.same_transaction) {
Expand Down Expand Up @@ -276,6 +305,10 @@ bool DebugHasHybridTime(const Slice& subdoc_key_encoded) {
return subdoc_key.has_hybrid_time();
}

std::string EncodeHybridTime(HybridTime value) {
return DocHybridTime(value, kMaxWriteId).EncodedInDocDbFormat();
}

} // namespace

IntentAwareIterator::IntentAwareIterator(
Expand All @@ -285,10 +318,12 @@ IntentAwareIterator::IntentAwareIterator(
const ReadHybridTime& read_time,
const TransactionOperationContextOpt& txn_op_context)
: read_time_(read_time),
encoded_read_time_local_limit_(
DocHybridTime(read_time_.local_limit, kMaxWriteId).EncodedInDocDbFormat()),
encoded_read_time_global_limit_(
DocHybridTime(read_time_.global_limit, kMaxWriteId).EncodedInDocDbFormat()),
encoded_read_time_read_(EncodeHybridTime(read_time_.read)),
encoded_read_time_local_limit_(EncodeHybridTime(read_time_.local_limit)),
encoded_read_time_global_limit_(EncodeHybridTime(read_time_.global_limit)),
encoded_read_time_regular_limit_(
read_time_.local_limit > read_time_.read ? Slice(encoded_read_time_local_limit_)
: Slice(encoded_read_time_read_)),
txn_op_context_(txn_op_context),
transaction_status_cache_(
txn_op_context ? &txn_op_context->txn_status_manager : nullptr, read_time, deadline) {
Expand Down Expand Up @@ -707,9 +742,7 @@ void IntentAwareIterator::ProcessIntent() {
}

// Ignore intent past read limit.
auto max_allowed_time = decode_result->same_transaction
? read_time_.in_txn_limit : read_time_.global_limit;
if (decode_result->value_time.hybrid_time() > max_allowed_time) {
if (decode_result->value_time.hybrid_time() > decode_result->MaxAllowedValueTime(read_time_)) {
return;
}

Expand Down Expand Up @@ -1087,12 +1120,47 @@ void IntentAwareIterator::SkipFutureRecords(const Direction direction) {
// Value came from a transaction, we could try to filter it by original intent time.
Slice encoded_intent_doc_ht = value;
encoded_intent_doc_ht.consume_byte();
if (encoded_intent_doc_ht.compare(Slice(encoded_read_time_local_limit_)) > 0 &&
encoded_doc_ht.compare(Slice(encoded_read_time_global_limit_)) > 0) {
// The logic here replicates part of the logic in
// DecodeStrongWriteIntentResult:: MaxAllowedValueTime for intents that have been committed
// and applied to regular RocksDB only. Note that here we are comparing encoded hybrid times,
// so comparisons are reversed vs. the un-encoded case. If a value is found "invalid", it
// can't cause a read restart. If it is found "valid", it will cause a read restart if it is
// greater than read_time.read. That last comparison is done outside this function.
Slice max_allowed = encoded_intent_doc_ht.compare(encoded_read_time_local_limit_) > 0
? Slice(encoded_read_time_global_limit_)
: Slice(encoded_read_time_read_);
if (encoded_doc_ht.compare(max_allowed) > 0) {
iter_valid_ = true;
return;
}
} else if (encoded_doc_ht.compare(Slice(encoded_read_time_local_limit_)) > 0) {
} else if (encoded_doc_ht.compare(encoded_read_time_regular_limit_) > 0) {
// If a value does not contain the hybrid time of the intent that wrote the original
// transaction, then it either (a) originated from a single-shard transaction or (b) the
// intent hybrid time has already been garbage-collected during a compaction because the
// corresponding transaction's commit time (stored in the key) became lower than the history
// cutoff. See the following commit for the details of this intent hybrid time GC.
//
// https://github.com/yugabyte/yugabyte-db/commit/26260e0143e521e219d93f4aba6310fcc030a628
//
// encoded_read_time_regular_limit_ is simply the encoded value of max(read_ht, local_limit).
// The above condition
//
// encoded_doc_ht.compare(encoded_read_time_regular_limit_) >= 0
//
// corresponds to the following in terms of decoded hybrid times (order is reversed):
//
// commit_ht <= max(read_ht, local_limit)
//
// and the inverse of that can be written as
//
// commit_ht > read_ht && commit_ht > local_limit
//
// The reason this is correct here is that in case (a) the event of writing a single-shard
// record to the tablet would certainly be after our read transaction's start time in case
// commit_ht > local_limit, so it can never cause a read restart. In case (b) we know that
// commit_ht < history_cutoff and read_ht >= history_cutoff (by definition of history cutoff)
// so commit_ht < read_ht, and in this case read restart is impossible regardless of the
// value of local_limit.
iter_valid_ = true;
return;
}
Expand Down
11 changes: 9 additions & 2 deletions src/yb/docdb/intent_aware_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -301,8 +301,15 @@ class IntentAwareIterator {
bool NextRegular(Direction direction);

const ReadHybridTime read_time_;
const string encoded_read_time_local_limit_;
const string encoded_read_time_global_limit_;
const std::string encoded_read_time_read_;
const std::string encoded_read_time_local_limit_;
const std::string encoded_read_time_global_limit_;

// The encoded hybrid time to use to filter records in regular RocksDB. This is the maximum of
// read_time and local_limit (in terms of hybrid time comparison), and this slice points to
// one of the strings above.
Slice encoded_read_time_regular_limit_;

const TransactionOperationContextOpt txn_op_context_;
docdb::BoundedRocksDbIterator intent_iter_;
docdb::BoundedRocksDbIterator iter_;
Expand Down
1 change: 1 addition & 0 deletions src/yb/tablet/operations/write_operation.cc
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ void WriteOperation::DoStartSynchronization(const Status& status) {
state()->CompleteWithStatus(local_limit.status());
return;
}
restart_time->set_deprecated_max_of_read_time_and_local_limit_ht(local_limit->ToUint64());
restart_time->set_local_limit_ht(local_limit->ToUint64());
// Global limit is ignored by caller, so we don't set it.
state()->CompleteWithStatus(Status::OK());
Expand Down
4 changes: 3 additions & 1 deletion src/yb/tserver/tablet_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1648,7 +1648,7 @@ struct ReadContext {
DCHECK_GT(restart_time, read_time.read);
VLOG(1) << "Restart read required at: " << restart_time << ", original: " << read_time;
auto result = read_time;
result.read = restart_time;
result.read = std::min(std::max(restart_time, safe_ht_to_read), read_time.global_limit);
result.local_limit = safe_ht_to_read;
return result;
}
Expand Down Expand Up @@ -1995,6 +1995,8 @@ void TabletServiceImpl::CompleteRead(ReadContext* read_context) {
read_context->resp->Clear();
auto restart_read_time = read_context->resp->mutable_restart_read_time();
restart_read_time->set_read_ht(read_context->read_time.read.ToUint64());
restart_read_time->set_deprecated_max_of_read_time_and_local_limit_ht(
read_context->read_time.local_limit.ToUint64());
restart_read_time->set_local_limit_ht(read_context->read_time.local_limit.ToUint64());
// Global limit is ignored by caller, so we don't set it.
down_cast<Tablet*>(read_context->tablet.get())->metrics()->restart_read_requests->Increment();
Expand Down
17 changes: 13 additions & 4 deletions src/yb/yql/pgwrapper/pg_mini-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,6 @@ void PgMiniTest::TestReadRestart(const bool deferrable) {
std::atomic<int> num_read_successes(0);
TestThreadHolder thread_holder;

SetAtomicFlag(250000ULL, &FLAGS_max_clock_skew_usec);

// Set up table
auto setup_conn = ASSERT_RESULT(Connect());
ASSERT_OK(setup_conn.Execute("CREATE TABLE t (key INT PRIMARY KEY, value INT)"));
Expand Down Expand Up @@ -196,6 +194,7 @@ void PgMiniTest::TestReadRestart(const bool deferrable) {
ASSERT_STR_CONTAINS(result.status().ToString(), "Restart read");
++num_read_restarts;
ASSERT_OK(read_conn.Execute("ABORT"));
break;
} else {
ASSERT_OK(read_conn.Execute("COMMIT"));
++num_read_successes;
Expand Down Expand Up @@ -231,11 +230,21 @@ void PgMiniTest::TestReadRestart(const bool deferrable) {
ASSERT_GT(num_read_successes.load(std::memory_order_acquire), kRequiredNumReads);
}

TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(ReadRestartSerializableDeferrable)) {
class PgMiniLargeClockSkewTest : public PgMiniTest {
public:
void SetUp() override {
SetAtomicFlag(250000ULL, &FLAGS_max_clock_skew_usec);
PgMiniTestBase::SetUp();
}
};

TEST_F_EX(PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(ReadRestartSerializableDeferrable),
PgMiniLargeClockSkewTest) {
TestReadRestart(true /* deferrable */);
}

TEST_F(PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(ReadRestartSnapshot)) {
TEST_F_EX(PgMiniTest, YB_DISABLE_TEST_IN_SANITIZERS(ReadRestartSnapshot),
PgMiniLargeClockSkewTest) {
TestReadRestart(false /* deferrable */);
}

Expand Down

0 comments on commit c784595

Please sign in to comment.