Skip to content

Commit

Permalink
[#8543] PITR: Add test for need to increase table version on restore
Browse files Browse the repository at this point in the history
Summary:
This diff adds that fails if we don't patch schema version on restore.
Consider the following scenario, w/o patching:
1) Create table. Schema version - 0.
2) Add text column to table. Schema version - 1.
3) Insert values into table. Each CQL proxy suppose schema version 1 for this table.
4) Restore to time between (1) and (2). Schema version - 0.
5) Add int column to table. Schema version - 1.
6) Try insert values with wrong type into table.

So table has schema version 1, but new column is INT.
CQL proxy suppose schema version is also 1, but the last column is TEXT.

If we patch schema version on restore and set it to last version + 1, then each CQL proxy will have to reload schema for this table.

Test Plan: ybd --gtest_filter YbAdminSnapshotScheduleTest.AlterTable

Reviewers: bogdan

Reviewed By: bogdan

Subscribers: ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D11693
  • Loading branch information
spolitov committed May 24, 2021
1 parent 588542a commit f2a2d0b
Show file tree
Hide file tree
Showing 7 changed files with 214 additions and 23 deletions.
36 changes: 18 additions & 18 deletions src/yb/integration-tests/cassandra_cpp_driver-test.cc
Expand Up @@ -111,7 +111,7 @@ class CppCassandraDriverTest : public ExternalMiniClusterITestBase {
hosts.push_back(cluster_->tablet_server(i)->bind_host());
}
driver_.reset(new CppCassandraDriver(
hosts, cluster_->tablet_server(0)->cql_rpc_port(), UsePartitionAwareRouting()));
hosts, cluster_->tablet_server(0)->cql_rpc_port(), use_partition_aware_routing()));

// Create and use default keyspace.
auto deadline = CoarseMonoClock::now() + 15s;
Expand Down Expand Up @@ -153,8 +153,8 @@ class CppCassandraDriverTest : public ExternalMiniClusterITestBase {
return 1;
}

virtual bool UsePartitionAwareRouting() {
return true;
virtual UsePartitionAwareRouting use_partition_aware_routing() {
return UsePartitionAwareRouting::kTrue;
}

protected:
Expand Down Expand Up @@ -203,10 +203,10 @@ class CppCassandraDriverTestIndex : public CppCassandraDriverTest {
};
}

bool UsePartitionAwareRouting() override {
UsePartitionAwareRouting use_partition_aware_routing() override {
// Disable partition aware routing in this test because of TSAN issue (#1837).
// Should be reenabled when issue is fixed.
return false;
return UsePartitionAwareRouting::kFalse;
}

protected:
Expand Down Expand Up @@ -2077,7 +2077,7 @@ TEST_F_EX(CppCassandraDriverTest, TestDeleteAndCreateIndex, CppCassandraDriverTe
}
for (int i = 0; i <= kNumLoops; i++) {
drivers.emplace_back(new CppCassandraDriver(
hosts, cluster_->tablet_server(0)->cql_rpc_port(), false /*UsePartitionAwareRouting()*/));
hosts, cluster_->tablet_server(0)->cql_rpc_port(), UsePartitionAwareRouting::kFalse));
}

for (int i = 0; i <= kNumLoops; i++) {
Expand Down Expand Up @@ -2563,10 +2563,10 @@ class CppCassandraDriverBackpressureTest : public CppCassandraDriverTest {
return {"--tablet_server_svc_queue_length=10"s, "--max_time_in_queue_ms=-1"s};
}

bool UsePartitionAwareRouting() override {
UsePartitionAwareRouting use_partition_aware_routing() override {
// TODO: Disable partition aware routing in this test because of TSAN issue (#1837).
// Should be reenabled when issue is fixed.
return false;
return UsePartitionAwareRouting::kFalse;
}
};

Expand Down Expand Up @@ -2609,10 +2609,10 @@ class CppCassandraDriverTransactionalWriteTest : public CppCassandraDriverTest {
return {"--TEST_transaction_inject_flushed_delay_ms=10"s};
}

bool UsePartitionAwareRouting() override {
UsePartitionAwareRouting use_partition_aware_routing() override {
// TODO: Disable partition aware routing in this test because of TSAN issue (#1837).
// Should be reenabled when issue is fixed.
return false;
return UsePartitionAwareRouting::kFalse;
}
};

Expand Down Expand Up @@ -2645,10 +2645,10 @@ class CppCassandraDriverTestThreeMasters : public CppCassandraDriverTestNoPartit
return 3;
}

bool UsePartitionAwareRouting() override {
UsePartitionAwareRouting use_partition_aware_routing() override {
// TODO: Disable partition aware routing in this test because of TSAN issue (#1837).
// Should be reenabled when issue is fixed.
return false;
return UsePartitionAwareRouting::kFalse;
}
};

Expand Down Expand Up @@ -2753,10 +2753,10 @@ class CppCassandraDriverTestPartitionsVtableCache : public CppCassandraDriverTes
return flags;
}

bool UsePartitionAwareRouting() override {
UsePartitionAwareRouting use_partition_aware_routing() override {
// TODO: Disable partition aware routing in this test because of TSAN issue (#1837).
// Should be reenabled when issue is fixed.
return false;
return UsePartitionAwareRouting::kFalse;
}

int table_idx_ = 0;
Expand Down Expand Up @@ -2817,10 +2817,10 @@ class CppCassandraDriverRejectionTest : public CppCassandraDriverTest {
"--linear_backoff_ms=10"};
}

bool UsePartitionAwareRouting() override {
UsePartitionAwareRouting use_partition_aware_routing() override {
// Disable partition aware routing in this test because of TSAN issue (#1837).
// Should be reenabled when issue is fixed.
return false;
return UsePartitionAwareRouting::kFalse;
}
};

Expand Down Expand Up @@ -2921,10 +2921,10 @@ class CppCassandraDriverSmallSoftLimitTest : public CppCassandraDriverTest {
};
}

bool UsePartitionAwareRouting() override {
UsePartitionAwareRouting use_partition_aware_routing() override {
// Disable partition aware routing in this test because of TSAN issue (#1837).
// Should be reenabled when issue is fixed.
return false;
return UsePartitionAwareRouting::kFalse;
}
};

Expand Down
2 changes: 1 addition & 1 deletion src/yb/integration-tests/cql_test_base.cc
Expand Up @@ -49,7 +49,7 @@ void CqlTestBase::SetUp() {
ASSERT_OK(cql_server_->Start());

driver_ = std::make_unique<CppCassandraDriver>(
std::vector<std::string>{ cql_host }, cql_port, false);
std::vector<std::string>{ cql_host }, cql_port, UsePartitionAwareRouting::kFalse);
}

void CqlTestBase::DoTearDown() {
Expand Down
39 changes: 38 additions & 1 deletion src/yb/integration-tests/cql_test_util.cc
Expand Up @@ -163,6 +163,18 @@ CassandraRowIterator CassandraRow::CreateIterator() const {
return CassandraRowIterator(cass_iterator_from_row(cass_row_));
}

std::string CassandraRow::RenderToString(const std::string& separator) {
std::string result;
auto iter = CreateIterator();
while (iter.Next()) {
if (!result.empty()) {
result += separator;
}
result += iter.Value().ToString();
}
return result;
}

void CassandraRow::TakeIterator(CassIteratorPtr iterator) {
cass_iterator_ = std::move(iterator);
}
Expand All @@ -183,6 +195,20 @@ CassandraIterator CassandraResult::CreateIterator() const {
return CassandraIterator(cass_iterator_from_result(cass_result_.get()));
}

std::string CassandraResult::RenderToString(
const std::string& line_separator, const std::string& value_separator) const {
std::string result;
auto iter = CreateIterator();
while (iter.Next()) {
auto row = iter.Row();
if (!result.empty()) {
result += ";";
}
result += row.RenderToString();
}
return result;
}

bool CassandraFuture::Ready() const {
return cass_future_ready(future_.get());
}
Expand Down Expand Up @@ -315,6 +341,10 @@ Result<CassandraResult> CassandraSession::ExecuteWithResult(const CassandraState
return future.Result();
}

Result<std::string> CassandraSession::ExecuteAndRenderToString(const std::string& statement) {
return VERIFY_RESULT(ExecuteWithResult(statement)).RenderToString();
}

CassandraFuture CassandraSession::ExecuteGetFuture(const CassandraStatement& statement) {
return CassandraFuture(
cass_session_execute(cass_session_.get(), statement.cass_statement_.get()));
Expand Down Expand Up @@ -373,11 +403,18 @@ CassandraStatement CassandraPrepared::Bind() {
const MonoDelta kCassandraTimeOut = RegularBuildVsSanitizers(12s, 60s);

CppCassandraDriver::CppCassandraDriver(
const std::vector<std::string>& hosts, uint16_t port, bool use_partition_aware_routing) {
const std::vector<std::string>& hosts, uint16_t port,
UsePartitionAwareRouting use_partition_aware_routing) {

// Enable detailed tracing inside driver.
if (VLOG_IS_ON(4)) {
cass_log_set_level(CASS_LOG_TRACE);
} else if (VLOG_IS_ON(3)) {
cass_log_set_level(CASS_LOG_DEBUG);
} else if (VLOG_IS_ON(2)) {
cass_log_set_level(CASS_LOG_INFO);
} else if (VLOG_IS_ON(1)) {
cass_log_set_level(CASS_LOG_WARN);
}

auto hosts_str = JoinStrings(hosts, ",");
Expand Down
12 changes: 11 additions & 1 deletion src/yb/integration-tests/cql_test_util.h
Expand Up @@ -97,6 +97,8 @@ class CassandraRow {

CassandraRowIterator CreateIterator() const;

std::string RenderToString(const std::string& separator = ",");

void TakeIterator(CassIteratorPtr iterator);

private:
Expand Down Expand Up @@ -127,6 +129,9 @@ class CassandraResult {

CassandraIterator CreateIterator() const;

std::string RenderToString(const std::string& line_separator = ";",
const std::string& value_separator = ",") const;

private:
CassResultPtr cass_result_;
};
Expand Down Expand Up @@ -236,6 +241,8 @@ class CassandraSession {

Result<CassandraResult> ExecuteWithResult(const std::string& query);

Result<std::string> ExecuteAndRenderToString(const std::string& statement);

template <class Action>
CHECKED_STATUS ExecuteAndProcessOneRow(
const CassandraStatement& statement, const Action& action) {
Expand Down Expand Up @@ -279,10 +286,13 @@ class CassandraSession {
CassSessionPtr cass_session_;
};

YB_STRONGLY_TYPED_BOOL(UsePartitionAwareRouting);

class CppCassandraDriver {
public:
CppCassandraDriver(
const std::vector<std::string>& hosts, uint16_t port, bool use_partition_aware_routing);
const std::vector<std::string>& hosts, uint16_t port,
UsePartitionAwareRouting use_partition_aware_routing);

~CppCassandraDriver();

Expand Down
59 changes: 58 additions & 1 deletion src/yb/tools/yb-admin-snapshot-schedule-test.cc
Expand Up @@ -24,6 +24,7 @@

#include "yb/util/date_time.h"
#include "yb/util/random_util.h"
#include "yb/util/range.h"

#include "yb/yql/pgwrapper/libpq_utils.h"

Expand Down Expand Up @@ -188,8 +189,9 @@ class YbAdminSnapshotScheduleTest : public AdminTestBase {
for (int i = 0; i < cluster_->num_tablet_servers(); ++i) {
hosts.push_back(cluster_->tablet_server(i)->bind_host());
}
LOG(INFO) << "CQL hosts: " << AsString(hosts);
cql_driver_ = std::make_unique<CppCassandraDriver>(
hosts, cluster_->tablet_server(0)->cql_rpc_port(), true);
hosts, cluster_->tablet_server(0)->cql_rpc_port(), UsePartitionAwareRouting::kTrue);
}
auto result = VERIFY_RESULT(cql_driver_->CreateSession());
if (!db_name.empty()) {
Expand All @@ -212,6 +214,11 @@ class YbAdminSnapshotScheduleTest : public AdminTestBase {

void TestUndeleteTable(bool restart_masters);

void UpdateMiniClusterOptions(ExternalMiniClusterOptions* options) override {
options->bind_to_unique_loopback_addresses = true;
options->use_same_ts_ports = true;
}

std::unique_ptr<CppCassandraDriver> cql_driver_;
};

Expand Down Expand Up @@ -430,5 +437,55 @@ TEST_F(YbAdminSnapshotScheduleTest, UndeleteIndex) {
ASSERT_EQ(res, 1);
}

// This test is for schema version patching after restore.
// Consider the following scenario, w/o patching:
//
// 1) Create table.
// 2) Add text column to table. Schema version - 1.
// 3) Insert values into table. Each CQL proxy suppose schema version 1 for this table.
// 4) Restore to time between (1) and (2). Schema version - 0.
// 5) Add int column to table. Schema version - 1.
// 6) Try insert values with wrong type into table.
//
// So table has schema version 1, but new column is INT.
// CQL proxy suppose schema version is also 1, but the last column is TEXT.
TEST_F(YbAdminSnapshotScheduleTest, AlterTable) {
const auto kKeys = Range(10);

auto schedule_id = ASSERT_RESULT(PrepareCql());

auto conn = ASSERT_RESULT(CqlConnect(client::kTableName.namespace_name()));

ASSERT_OK(conn.ExecuteQuery(
"CREATE TABLE test_table (key INT PRIMARY KEY, value TEXT)"));

for (auto key : kKeys) {
ASSERT_OK(conn.ExecuteQuery(Format(
"INSERT INTO test_table (key, value) VALUES ($0, 'A')", key)));
}

Timestamp time(ASSERT_RESULT(WallClock()->Now()).time_point);

ASSERT_OK(conn.ExecuteQuery(
"ALTER TABLE test_table ADD value2 TEXT"));

for (auto key : kKeys) {
ASSERT_OK(conn.ExecuteQuery(Format(
"INSERT INTO test_table (key, value, value2) VALUES ($0, 'B', 'X')", key)));
}

ASSERT_OK(RestoreSnapshotSchedule(schedule_id, time));

ASSERT_OK(conn.ExecuteQuery(
"ALTER TABLE test_table ADD value2 INT"));

for (auto key : kKeys) {
// It would succeed on some TServers if we would not refresh metadata after restore.
// But it should not succeed because of last column type.
ASSERT_NOK(conn.ExecuteQuery(Format(
"INSERT INTO test_table (key, value, value2) VALUES ($0, 'D', 'Y')", key)));
}
}

} // namespace tools
} // namespace yb

0 comments on commit f2a2d0b

Please sign in to comment.