Skip to content

Commit

Permalink
[BACKPORT 2.6][#10504] docdb - Move value TTL metadata collection to …
Browse files Browse the repository at this point in the history
…DocWriteBatch

Summary:
Request TTL metadata is currently missing from tablet replicas, leading to replica SST files
missing value TTL frontier metadata necessary for expiration. This is happening because the metadata
was being stored in operations (which are rebuilt on replicas) rather than being stored in the doc
batch write requests themselves.

This commit moves the value TTL metadata collection into DocWriteBatch and adds a field for ttl in the
KeyValueWriteBatchPB. It also modifies the logic on the tablet side to calculate the expiration timestamp
based on the KeyValueWriteBatchPB metadata.

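Also adds the FileExpirationWithRF3 test fixture, which runs replicated (three tablet server) file-expiration tests under the CompactionTestWithFileExpiration test case name.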

Adjusted compaction-test.cc for 2.6 constructors

Original commit: rYBDBbd2626d2b50f / D13855

Test Plan:
Jenkins: hot
Jenkins: rebase: 2.6
ybd --cxx-test integration-tests_compaction-test --gtest_filter CompactionTestWithFileExpiration.ReplicatedMetadataCanExpireFile
ybd --cxx-test integration-tests_compaction-test --gtest_filter CompactionTestWithFileExpiration.ReplicatedNoMetadataUsesTableTTL
ybd --cxx-test docdb_docdb-test --gtest_filter DocDBTests/DocDBTestWrapper.TestUpdateDocWriteBatchTTL/0

Reviewers: timur, rsami, sergei

Reviewed By: sergei

Subscribers: bogdan, anallan, ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D13927
  • Loading branch information
jmeehan16 committed Nov 14, 2021
1 parent d4cef7b commit 8200c41
Show file tree
Hide file tree
Showing 8 changed files with 175 additions and 26 deletions.
18 changes: 18 additions & 0 deletions src/yb/docdb/doc_write_batch.cc
Expand Up @@ -416,6 +416,7 @@ Status DocWriteBatch::ExtendSubDocument(
RETURN_NOT_OK(SetPrimitive(doc_path, Value(value, ttl, user_timestamp),
read_ht, deadline, query_id));
}
UpdateMaxValueTtl(ttl);
return Status::OK();
}

Expand Down Expand Up @@ -588,6 +589,17 @@ Status DocWriteBatch::ReplaceRedisInList(
}
}

// Folds `ttl` into the batch-wide maximum value TTL (ttl_).
//
// An uninitialized `ttl`, or one equal to Value::kMaxTtl (i.e. use table TTL), leaves ttl_
// untouched. Otherwise ttl_ takes the value of `ttl` when it has not been set yet or when `ttl`
// is strictly larger than the current ttl_.
void DocWriteBatch::UpdateMaxValueTtl(const MonoDelta& ttl) {
  // Equals() is only consulted for an initialized delta, exactly as the short-circuit requires.
  const bool is_value_level_ttl = ttl.Initialized() && !ttl.Equals(Value::kMaxTtl);
  if (!is_value_level_ttl) {
    return;
  }

  // The first qualifying TTL is adopted as-is; afterwards only a larger one replaces it.
  const bool exceeds_current_max = !ttl_.Initialized() || ttl > ttl_;
  if (exceeds_current_max) {
    ttl_ = ttl;
  }
}

Status DocWriteBatch::ReplaceCqlInList(
const DocPath& doc_path,
const int target_cql_index,
Expand Down Expand Up @@ -697,6 +709,9 @@ void DocWriteBatch::MoveToWriteBatchPB(KeyValueWriteBatchPB *kv_pb) {
kv_pair->mutable_key()->swap(entry.first);
kv_pair->mutable_value()->swap(entry.second);
}
if (has_ttl()) {
kv_pb->set_ttl(ttl_ns());
}
}

void DocWriteBatch::TEST_CopyToWriteBatchPB(KeyValueWriteBatchPB *kv_pb) const {
Expand All @@ -706,6 +721,9 @@ void DocWriteBatch::TEST_CopyToWriteBatchPB(KeyValueWriteBatchPB *kv_pb) const {
kv_pair->mutable_key()->assign(entry.first);
kv_pair->mutable_value()->assign(entry.second);
}
if (has_ttl()) {
kv_pb->set_ttl(ttl_ns());
}
}

// ------------------------------------------------------------------------------------------------
Expand Down
12 changes: 12 additions & 0 deletions src/yb/docdb/doc_write_batch.h
Expand Up @@ -211,6 +211,16 @@ class DocWriteBatch {
return cache_.Get(encoded_key_prefix);
}

// Records `ttl` as the batch's maximum value TTL if it is larger than the one stored so far.
// Uninitialized TTLs and Value::kMaxTtl are ignored (see the definition in doc_write_batch.cc).
void UpdateMaxValueTtl(const MonoDelta& ttl);

// Returns the stored maximum value TTL (ttl_) converted to nanoseconds.
// NOTE(review): the callers visible in this change check has_ttl() first; confirm what
// MonoDelta::ToNanoseconds() does for an uninitialized value before calling this unguarded.
int64_t ttl_ns() const {
return ttl_.ToNanoseconds();
}

// Returns true once ttl_ has been initialized, i.e. a value TTL has been recorded for the batch.
bool has_ttl() const {
return ttl_.Initialized();
}

private:
// This member function performs the necessary operations to set a primitive value for a given
// docpath assuming the appropriate operations have been taken care of for subkeys with index <
Expand Down Expand Up @@ -247,6 +257,8 @@ class DocWriteBatch {
KeyBytes key_prefix_;
bool subdoc_exists_ = true;
DocWriteBatchCache::Entry current_entry_;

MonoDelta ttl_;
};

// Converts a RocksDB WriteBatch to a string.
Expand Down
46 changes: 46 additions & 0 deletions src/yb/docdb/docdb-test.cc
Expand Up @@ -19,6 +19,7 @@
#include "yb/common/doc_hybrid_time.h"
#include "yb/common/ql_value.h"
#include "yb/docdb/doc_key.h"
#include "yb/docdb/docdb.pb.h"
#include "yb/docdb/primitive_value.h"
#include "yb/rocksdb/cache.h"
#include "yb/rocksdb/db.h"
Expand Down Expand Up @@ -50,6 +51,7 @@
#include "yb/util/path_util.h"
#include "yb/util/random_util.h"
#include "yb/util/size_literals.h"
#include "yb/util/status.h"
#include "yb/util/string_trim.h"
#include "yb/util/test_macros.h"
#include "yb/util/test_util.h"
Expand Down Expand Up @@ -3313,6 +3315,50 @@ SubDocKey(DocKey([], ["k1"]), ["s3", "s5"; HT{ physical: 10000 w: 1 }]) -> "v1";
)#");
}

// Test helper: inserts a fixed one-entry subdocument ("sk1" -> "v1") for document key "k1" at
// the subkey path ("s1", "s2") into *dwb, using the supplied ttl. Returns the insert status.
CHECKED_STATUS InsertToWriteBatchWithTTL(DocWriteBatch* dwb, const MonoDelta ttl) {
  // Payload written by every call; only the TTL differs between invocations.
  SubDocument payload;
  payload.SetChildPrimitive(PrimitiveValue("sk1"), PrimitiveValue("v1"));

  const DocKey key(PrimitiveValues("k1"));
  KeyBytes encoded_key(key.Encode());
  const DocPath target_path(encoded_key, PrimitiveValue("s1"), PrimitiveValue("s2"));

  return dwb->InsertSubDocument(
      target_path, payload, ReadHybridTime::Max(), CoarseTimePoint::max(),
      rocksdb::kDefaultQueryId, ttl);
}

// Verifies that the TTL copied into KeyValueWriteBatchPB tracks the largest value TTL written
// to the DocWriteBatch, and that kMaxTtl writes never set or change it.
// The same dwb and kv_pb are reused throughout, so each step builds on the previous ones.
TEST_P(DocDBTestWrapper, TestUpdateDocWriteBatchTTL) {
auto dwb = MakeDocWriteBatch();
KeyValueWriteBatchPB kv_pb;
// An empty batch carries no TTL.
dwb.TEST_CopyToWriteBatchPB(&kv_pb);
ASSERT_FALSE(kv_pb.has_ttl());

// Write a subdoc with kMaxTtl, which should not show up in the kv ttl.
ASSERT_OK(InsertToWriteBatchWithTTL(&dwb, Value::kMaxTtl));
dwb.TEST_CopyToWriteBatchPB(&kv_pb);
ASSERT_FALSE(kv_pb.has_ttl());

// Write a subdoc with 10s TTL, which should show up in the kv ttl.
ASSERT_OK(InsertToWriteBatchWithTTL(&dwb, 10s));
dwb.TEST_CopyToWriteBatchPB(&kv_pb);
ASSERT_EQ(kv_pb.ttl(), 10 * MonoTime::kNanosecondsPerSecond);

// Write a subdoc with 5s TTL, which should make the kv ttl unchanged.
ASSERT_OK(InsertToWriteBatchWithTTL(&dwb, 5s));
dwb.TEST_CopyToWriteBatchPB(&kv_pb);
ASSERT_EQ(kv_pb.ttl(), 10 * MonoTime::kNanosecondsPerSecond);

// Write a subdoc with 15s TTL, which should show up in the kv ttl.
ASSERT_OK(InsertToWriteBatchWithTTL(&dwb, 15s));
dwb.TEST_CopyToWriteBatchPB(&kv_pb);
ASSERT_EQ(kv_pb.ttl(), 15 * MonoTime::kNanosecondsPerSecond);

// Write a subdoc with kMaxTtl, which should make the kv ttl unchanged.
ASSERT_OK(InsertToWriteBatchWithTTL(&dwb, Value::kMaxTtl));
dwb.TEST_CopyToWriteBatchPB(&kv_pb);
ASSERT_EQ(kv_pb.ttl(), 15 * MonoTime::kNanosecondsPerSecond);
}

TEST_P(DocDBTestWrapper, TestCompactionWithUserTimestamp) {
const DocKey doc_key(PrimitiveValues("k1"));
HybridTime t3000 = 3000_usec_ht;
Expand Down
2 changes: 2 additions & 0 deletions src/yb/docdb/docdb.proto
Expand Up @@ -40,6 +40,8 @@ message KeyValueWriteBatchPB {
repeated KeyValuePairPB read_pairs = 5;
optional RowMarkType row_mark_type = 6;
repeated ApplyExternalTransactionPB apply_external_transactions = 7;

optional int64 ttl = 9;
}

message ConsensusFrontierPB {
Expand Down
96 changes: 94 additions & 2 deletions src/yb/integration-tests/compaction-test.cc
Expand Up @@ -16,7 +16,9 @@
#include "yb/client/transaction_pool.h"

#include "yb/common/table_properties_constants.h"
#include "yb/consensus/consensus.h"
#include "yb/docdb/compaction_file_filter.h"
#include "yb/docdb/consensus_frontier.h"
#include "yb/gutil/integral_types.h"
#include "yb/integration-tests/test_workload.h"
#include "yb/integration-tests/mini_cluster.h"
Expand All @@ -32,6 +34,7 @@

#include "yb/tserver/ts_tablet_manager.h"

#include "yb/util/monotime.h"
#include "yb/util/size_literals.h"
#include "yb/util/test_util.h"

Expand Down Expand Up @@ -130,14 +133,16 @@ class CompactionTest : public YBTest {

// Start cluster.
MiniClusterOptions opts;
opts.num_tablet_servers = 1;
opts.num_tablet_servers = NumTabletServers();
cluster_.reset(new MiniCluster(env_.get(), opts));
ASSERT_OK(cluster_->Start());
// These flags should be set after minicluster start, so it wouldn't override them.
FLAGS_db_write_buffer_size = kMemStoreSize;
FLAGS_rocksdb_level0_file_num_compaction_trigger = 3;
// Patch tablet options inside tablet manager, will be applied to newly created tablets.
cluster_->GetTabletManager(0)->TEST_tablet_options()->listeners.push_back(rocksdb_listener_);
for (int i = 0 ; i < NumTabletServers(); i++) {
cluster_->GetTabletManager(i)->TEST_tablet_options()->listeners.push_back(rocksdb_listener_);
}

client_ = ASSERT_RESULT(cluster_->CreateClient());
transaction_manager_ = std::make_unique<client::TransactionManager>(
Expand Down Expand Up @@ -180,6 +185,10 @@ class CompactionTest : public YBTest {
return -1;
}

// Number of tablet servers the mini cluster is started with. Defaults to 1; virtual so that
// derived fixtures can request a larger cluster.
virtual int NumTabletServers() {
return 1;
}

size_t BytesWritten() {
return workload_->rows_inserted() * kPayloadBytes;
}
Expand Down Expand Up @@ -788,5 +797,88 @@ TEST_F(CompactionTestWithFileExpiration, TableTTLChangesWillChangeWhetherFilesEx
AssertAllFilesExpired();
}

// File-expiration fixture that runs against three tablet servers and reports a one-second TTL
// through ttl_to_use().
class FileExpirationWithRF3 : public CompactionTestWithFileExpiration {
 public:
  void SetUp() override {
    // Nothing fixture-specific to prepare; defer entirely to the parent fixture.
    CompactionTestWithFileExpiration::SetUp();
  }

 protected:
  // TTL, in seconds, returned by ttl_to_use().
  const int kTTLSec = 1;

  int NumTabletServers() override {
    return 3;
  }

  int ttl_to_use() override {
    return kTTLSec;
  }

  // Returns true only if every live SST file carries value-level TTL expiration metadata.
  bool AllFilesHaveTTLMetadata();

  // Waits, up to `timeout`, for each tablet peer to apply all of its committed operations.
  void WaitUntilAllCommittedOpsApplied(const MonoDelta timeout);

  // Shared test body; `withValueTTL` selects value-level TTL versus table-level TTL expiry.
  void ExpirationWhenReplicated(bool withValueTTL);
};

// Returns true only if every live SST file in every RocksDB instance of the cluster has a
// usable value-level TTL expiration time recorded in its largest user frontier.
//
// A file counts as lacking TTL metadata when:
//   - it has no user frontier at all,
//   - the expiration time is not valid (it was never initialized), or
//   - the expiration time is HybridTime::kInitial (the table-level TTL will be used).
bool FileExpirationWithRF3::AllFilesHaveTTLMetadata() {
  auto dbs = GetAllRocksDbs(cluster_.get(), false);
  for (auto* db : dbs) {
    auto metas = db->GetLiveFilesMetaData();
    // Iterate by reference: each metadata entry is only inspected, never modified.
    for (const auto& file : metas) {
      const auto& frontier_ptr = file.largest.user_frontier;
      if (!frontier_ptr) {
        // No frontier means no TTL metadata; report that instead of dereferencing null.
        return false;
      }
      // Bind by reference rather than copying the whole frontier just to read one field.
      const docdb::ConsensusFrontier& largest =
          down_cast<docdb::ConsensusFrontier&>(*frontier_ptr);
      const auto max_ttl_expiry = largest.max_value_level_ttl_expiration_time();
      // If value is not valid, then it wasn't initialized.
      // If value is kInitial, then the table-level TTL will be used (no value metadata).
      if (!max_ttl_expiry.is_valid() || max_ttl_expiry == HybridTime::kInitial) {
        return false;
      }
    }
  }
  return true;
}

// Waits until each tablet peer in the cluster that exposes a consensus instance has a
// last-applied op id at least as large as its last-committed op id.
//
// A single deadline (now + timeout) is shared by all peers, so the total wait is bounded by
// `timeout`. Peers without a consensus instance are skipped. A timed-out or failed wait raises
// a fatal test assertion.
void FileExpirationWithRF3::WaitUntilAllCommittedOpsApplied(const MonoDelta timeout) {
  const auto completion_deadline = MonoTime::Now() + timeout;
  const auto peers = ListTabletPeers(cluster_.get(), ListPeersFilter::kAll);
  for (auto& peer : peers) {
    auto consensus = peer->shared_consensus();
    if (!consensus) {
      continue;
    }
    auto all_committed_applied = [consensus]() -> Result<bool> {
      return consensus->GetLastAppliedOpId() >= consensus->GetLastCommittedOpId();
    };
    ASSERT_OK(Wait(
        all_committed_applied, completion_deadline,
        "Waiting for all committed ops to be applied"));
  }
}

void FileExpirationWithRF3::ExpirationWhenReplicated(bool withValueTTL) {
ANNOTATE_UNPROTECTED_WRITE(FLAGS_rocksdb_level0_file_num_compaction_trigger) = -1;
SetupWorkload(IsolationLevel::NON_TRANSACTIONAL);
if (withValueTTL) {
// Change the table TTL to a large value that won't expire.
ASSERT_OK(ChangeTableTTL(workload_->table_name(), 1000000));
} else {
// Set workload to not have value TTL.
workload_->set_ttl(-1);
}
ANNOTATE_UNPROTECTED_WRITE(FLAGS_file_expiration_value_ttl_overrides_table_ttl) = withValueTTL;

WriteAtLeastFilesPerDb(5);
WaitUntilAllCommittedOpsApplied(15s);
ASSERT_EQ(AllFilesHaveTTLMetadata(), withValueTTL);

LOG(INFO) << "Sleeping to expire files according to value TTL";
auto timeToSleep = 2 * (withValueTTL ? kTTLSec : kTableTTLSec);
SleepFor(MonoDelta::FromSeconds(timeToSleep));

ASSERT_OK(ExecuteManualCompaction());
// Assert that all data has been deleted, and that we're filtering SST files.
AssertAllFilesExpired();
}

// Replicated case with value TTL: files are expected to expire based on the value TTL metadata
// present on every replica.
TEST_F_EX(
CompactionTestWithFileExpiration, ReplicatedMetadataCanExpireFile, FileExpirationWithRF3) {
ExpirationWhenReplicated(true);
}

// Replicated case without value TTL: no value TTL metadata is expected on the files, and they
// are expected to expire according to the table TTL instead.
TEST_F_EX(
CompactionTestWithFileExpiration, ReplicatedNoMetadataUsesTableTTL, FileExpirationWithRF3) {
ExpirationWhenReplicated(false);
}

} // namespace tserver
} // namespace yb
7 changes: 0 additions & 7 deletions src/yb/tablet/operations/operation.cc
Expand Up @@ -118,13 +118,6 @@ std::string OperationState::LogPrefix() const {
return Format("$0: ", this);
}

// Replaces the stored ttl_ with `ttl` when ttl_ is uninitialized or `ttl` is larger.
// The check and the update both happen while holding mutex_.
void OperationState::UpdateIfMaxTtl(const MonoDelta& ttl) {
std::lock_guard<simple_spinlock> l(mutex_);
if (!ttl_.Initialized() || ttl > ttl_) {
ttl_ = ttl;
}
}

HybridTime OperationState::WriteHybridTime() const {
return hybrid_time();
}
Expand Down
14 changes: 0 additions & 14 deletions src/yb/tablet/operations/operation.h
Expand Up @@ -227,18 +227,6 @@ class OperationState {
return op_id_;
}

void UpdateIfMaxTtl(const MonoDelta& ttl);

// Returns a copy of ttl_, read while holding mutex_.
const MonoDelta ttl() const {
std::lock_guard<simple_spinlock> l(mutex_);
return ttl_;
}

// Returns true when ttl_ has been initialized; the check is made while holding mutex_.
bool has_ttl() const {
std::lock_guard<simple_spinlock> l(mutex_);
return ttl_.Initialized();
}

bool has_completion_callback() const {
return completion_clbk_ != nullptr;
}
Expand Down Expand Up @@ -273,8 +261,6 @@ class OperationState {
// This OpId stores the canonical "anchor" OpId for this transaction.
OpIdPB op_id_;

MonoDelta ttl_ GUARDED_BY(mutex_);

scoped_refptr<consensus::ConsensusRound> consensus_round_;

// Lock that protects access to operation state.
Expand Down
6 changes: 3 additions & 3 deletions src/yb/tablet/tablet.cc
Expand Up @@ -1054,7 +1054,9 @@ Status Tablet::ApplyOperationState(
operation_state.hybrid_time(),
&frontiers);
if (frontiers_ptr) {
auto ttl = operation_state.has_ttl() ? operation_state.ttl() : docdb::Value::kMaxTtl;
auto ttl = write_batch.has_ttl()
? MonoDelta::FromNanoseconds(write_batch.ttl())
: docdb::Value::kMaxTtl;
frontiers_ptr->Largest().set_max_value_level_ttl_expiration_time(
docdb::FileExpirationFromValueTTL(operation_state.hybrid_time(), ttl));
}
Expand Down Expand Up @@ -1453,8 +1455,6 @@ void Tablet::CompleteQLWriteBatch(std::unique_ptr<WriteOperation> operation, con

for (size_t i = 0; i < doc_ops.size(); i++) {
QLWriteOperation* ql_write_op = down_cast<QLWriteOperation*>(doc_ops[i].get());
const MonoDelta ttl = ql_write_op->request_ttl();
operation->state()->UpdateIfMaxTtl(ttl);
if (metadata_->is_unique_index() &&
ql_write_op->request().type() == QLWriteRequestPB::QL_STMT_INSERT &&
ql_write_op->response()->has_applied() && !ql_write_op->response()->applied()) {
Expand Down

0 comments on commit 8200c41

Please sign in to comment.