diff --git a/src/kudu/tools/kudu-admin-test.cc b/src/kudu/tools/kudu-admin-test.cc index 4d9706b4c9..1480ce6f38 100644 --- a/src/kudu/tools/kudu-admin-test.cc +++ b/src/kudu/tools/kudu-admin-test.cc @@ -25,7 +25,6 @@ #include #include #include -#include #include #include #include @@ -142,7 +141,7 @@ using std::deque; using std::endl; using std::ostringstream; using std::string; -using std::thread; +using std::pair; using std::tuple; using std::unique_ptr; using std::vector; @@ -3214,6 +3213,162 @@ TEST_F(AdminCliTest, TestAddAndDropRangePartitionForMultipleRangeColumnsTable) { }); } +/* End-to-end check of the kudu CLI for range partitions with custom hash schemas. Steps: create a table whose only range is unbounded below (upper bound c0 = 0) and uses the table-wide hash schema (2 buckets on c1); add range [0, 1) with a 5-bucket schema on c1 via --hash_schema; add range [1, unbounded) with a 3-bucket schema on (c0, c1); after each addition verify the table describe output and insert one row; finally drop all three ranges and expect the table to contain 0 rows. */TEST_F(AdminCliTest, AddAndDropRangeWithCustomHashSchema) { + FLAGS_num_tablet_servers = 1; + FLAGS_num_replicas = 1; + + NO_FATALS(BuildAndStart()); + + const string& master_addr = cluster_->master()->bound_rpc_addr().ToString(); + constexpr const char* const kTestTableName = "custom_hash_schemas"; + constexpr const char* const kC0 = "c0"; + constexpr const char* const kC1 = "c1"; + constexpr const char* const kC2 = "c2"; + + { + KuduSchemaBuilder builder; + builder.AddColumn(kC0)->Type(KuduColumnSchema::INT8)->NotNull(); + builder.AddColumn(kC1)->Type(KuduColumnSchema::INT16)->NotNull(); + builder.AddColumn(kC2)->Type(KuduColumnSchema::STRING); + builder.SetPrimaryKey({ kC0, kC1 }); + KuduSchema schema; + ASSERT_OK(builder.Build(&schema)); + + // Create a table with left-unbounded range partition having the + // table-wide hash schema. + unique_ptr l(schema.NewRow()); + unique_ptr u(schema.NewRow()); + ASSERT_OK(u->SetInt8(kC0, 0)); + + unique_ptr table_creator(client_->NewTableCreator()); + ASSERT_OK(table_creator->table_name(kTestTableName) + .schema(&schema) + .set_range_partition_columns({ kC0 }) + .add_hash_partitions({ kC1 }, 2) + .add_range_partition(l.release(), u.release()) + .num_replicas(FLAGS_num_replicas) + .Create()); + } + + string stdout; + string stderr; + + // Add a range partition with custom hash schema using the kudu CLI tool. 
+ { + constexpr const char* const kHashSchemaJson = R"*({ + "hash_schema": [ + { "columns": ["c1"], "num_buckets": 5, "seed": 8 } + ] + })*"; + const auto s = RunKuduTool({ + "table", + "add_range_partition", + master_addr, + kTestTableName, + "[0]", + "[1]", + Substitute("--hash_schema=$0", kHashSchemaJson), + }, &stdout, &stderr); + ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr); + } + { + const auto s = RunKuduTool({ + "table", + "describe", + master_addr, + kTestTableName, + }, &stdout, &stderr); + ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr); + ASSERT_STR_CONTAINS(stdout, + "PARTITION 0 <= VALUES < 1 HASH(c1) PARTITIONS 5"); + } + + // Insert a row into the newly added range partition. + { + client::sp::shared_ptr table; + ASSERT_OK(client_->OpenTable(kTestTableName, &table)); + + auto session = client_->NewSession(); + unique_ptr insert(table->NewInsert()); + auto* row = insert->mutable_row(); + ASSERT_OK(row->SetInt8(kC0, 0)); + ASSERT_OK(row->SetInt16(kC1, 0)); + ASSERT_OK(row->SetString(kC2, "0")); + ASSERT_OK(session->Apply(insert.release())); + ASSERT_OK(session->Flush()); + ASSERT_EQ(1, CountTableRows(table.get())); + } + + // Add unbounded range using the kudu CLI tool. 
+ { + constexpr const char* const kHashSchemaJson = R"*({ + "hash_schema": [ { "columns": ["c0", "c1"], "num_buckets": 3 } ] })*"; + const auto s = RunKuduTool({ + "table", + "add_range_partition", + master_addr, + kTestTableName, + "[1]", + "[]", + Substitute("--hash_schema=$0", kHashSchemaJson), + }, &stdout, &stderr); + ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr); + } + { + const auto s = RunKuduTool({ + "table", + "describe", + master_addr, + kTestTableName, + }, &stdout, &stderr); + ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr); + ASSERT_STR_CONTAINS(stdout, + "PARTITION 0 <= VALUES < 1 HASH(c1) PARTITIONS 5"); + ASSERT_STR_CONTAINS(stdout, + "PARTITION 1 <= VALUES HASH(c0, c1) PARTITIONS 3"); + } + + // Insert a row into the newly added range partition. + { + client::sp::shared_ptr table; + ASSERT_OK(client_->OpenTable(kTestTableName, &table)); + auto session = client_->NewSession(); + unique_ptr insert(table->NewInsert()); + auto* row = insert->mutable_row(); + ASSERT_OK(row->SetInt8(kC0, 10)); + ASSERT_OK(row->SetInt16(kC1, 10)); + ASSERT_OK(row->SetString(kC2, "10")); + ASSERT_OK(session->Apply(insert.release())); + ASSERT_OK(session->Flush()); + ASSERT_EQ(2, CountTableRows(table.get())); + } + + // Drop all the ranges one-by-one. + const vector> kRangesStr = { + {"", "0"}, {"0", "1"}, {"1", ""}, + }; + + /* Each bound is passed to the tool as [<value>]; an empty string produces [], the same form this test used above for the unbounded upper bound. */for (const std::pair& r : kRangesStr) { + SCOPED_TRACE(Substitute("range ['$0', '$1')", r.first, r.second)); + const auto s = RunKuduTool({ + "table", + "drop_range_partition", + master_addr, + kTestTableName, + Substitute("[$0]", r.first), + Substitute("[$0]", r.second), + }, &stdout, &stderr); + ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr); + } + + // There should be 0 rows left. 
+ client::sp::shared_ptr table; + ASSERT_OK(client_->OpenTable(kTestTableName, &table)); + ASSERT_EVENTUALLY([&]() { + ASSERT_EQ(0, CountTableRows(table.get())); + }); +} + namespace { constexpr const char* kPrincipal = "oryx"; diff --git a/src/kudu/tools/tool.proto b/src/kudu/tools/tool.proto index f98d8cd84f..f80e462baf 100644 --- a/src/kudu/tools/tool.proto +++ b/src/kudu/tools/tool.proto @@ -454,6 +454,11 @@ message PartitionPB { repeated RangeWithHashSchemaPB custom_hash_schema_ranges = 4; } + // A standalone message representing a hash schema. + message HashSchemaPB { + repeated HashPartitionPB hash_schema = 1; + } + // Table-wide hash schema. repeated HashPartitionPB hash_partitions = 1; // Range partitioning information. diff --git a/src/kudu/tools/tool_action_table.cc b/src/kudu/tools/tool_action_table.cc index b764781def..d43c2b299c 100644 --- a/src/kudu/tools/tool_action_table.cc +++ b/src/kudu/tools/tool_action_table.cc @@ -84,9 +84,11 @@ using kudu::client::KuduTableCreator; using kudu::client::KuduTableStatistics; using kudu::client::KuduValue; using kudu::client::internal::ReplicaController; +using kudu::tools::PartitionPB; using std::cerr; using std::cout; using std::endl; +using std::make_unique; using std::map; using std::pair; using std::set; @@ -150,6 +152,10 @@ DEFINE_string(lower_bound_type, "INCLUSIVE_BOUND", DEFINE_string(upper_bound_type, "EXCLUSIVE_BOUND", "The type of the upper bound, either inclusive or exclusive. " "Defaults to exclusive. This flag is case-insensitive."); +DEFINE_string(hash_schema, "", + "String representation of range-specific hash schema as a JSON " + "object, e.g. " + "{\"hash_schema\": [{\"columns\": [\"c0\"], \"num_buckets\": 5}]}"); DEFINE_int32(scan_batch_size, -1, "The size for scan results batches, in bytes. 
A negative value " "means the server-side default is used, where the server-side " @@ -1000,18 +1006,52 @@ Status ModifyRangePartition(const RunnerContext& context, PartitionAction action upper_bound.get())); KuduTableCreator::RangePartitionBound lower_bound_type; - KuduTableCreator::RangePartitionBound upper_bound_type; + RETURN_NOT_OK(convert_bounds_type( + "lower bound", FLAGS_lower_bound_type, &lower_bound_type)); - RETURN_NOT_OK(convert_bounds_type("lower bound", FLAGS_lower_bound_type, &lower_bound_type)); - RETURN_NOT_OK(convert_bounds_type("upper bound", FLAGS_upper_bound_type, &upper_bound_type)); + KuduTableCreator::RangePartitionBound upper_bound_type; + RETURN_NOT_OK(convert_bounds_type( + "upper bound", FLAGS_upper_bound_type, &upper_bound_type)); + + const auto& hash_schema_str = FLAGS_hash_schema; + PartitionPB::HashSchemaPB hash_schema; + if (!hash_schema_str.empty()) { + JsonParseOptions opts; + opts.case_insensitive_enum_parsing = true; + if (const auto& s = JsonStringToMessage( + hash_schema_str, &hash_schema, opts); !s.ok()) { + return Status::InvalidArgument( + Substitute("unable to parse JSON: $0", hash_schema_str), + s.error_message().ToString()); + } + } unique_ptr alterer(client->NewTableAlterer(table_name)); if (action == PartitionAction::ADD) { - return alterer->AddRangePartition(lower_bound.release(), - upper_bound.release(), - lower_bound_type, - upper_bound_type)->Alter(); + if (hash_schema_str.empty()) { + // Add range partition with table-wide hash schema. + return alterer->AddRangePartition(lower_bound.release(), + upper_bound.release(), + lower_bound_type, + upper_bound_type)->Alter(); + } + + // Add range partition with custom hash schema. 
+ auto p = make_unique(lower_bound.release(), + upper_bound.release(), + lower_bound_type, + upper_bound_type); + for (const auto& dimension_pb : hash_schema.hash_schema()) { + vector columns; + for (const auto& column : dimension_pb.columns()) { + columns.emplace_back(column); + } + p->add_hash_partitions( + columns, dimension_pb.num_buckets(), dimension_pb.seed()); + } + return alterer->AddRangePartition(p.release())->Alter(); } + DCHECK_EQ(PartitionAction::DROP, action); return alterer->DropRangePartition(lower_bound.release(), upper_bound.release(), @@ -1790,6 +1830,7 @@ unique_ptr BuildTableMode() { "the upper range partition will be unbounded" }) .AddOptionalParameter("lower_bound_type") .AddOptionalParameter("upper_bound_type") + .AddOptionalParameter("hash_schema") .Build(); unique_ptr column_set_default =