Skip to content

Commit

Permalink
KUDU-2671 range-specific hash schemas for 'kudu table add_range_parti…
Browse files Browse the repository at this point in the history
…tion'

This patch adds support for range-specific hash schemas into the
'kudu table add_range_partition' CLI tool.  This patch also contains
a test scenario to cover the newly introduced functionality.

An example of usage:

  kudu table add_range_partition $KUDU_MASTER my_table [0] [1] \
    --hash_schema='{"hash_schema": [{"columns": ["c"], "num_buckets": 5}]}'

Change-Id: I3832312b6ebfb397bb3083931f6d53039afc5e9b
Reviewed-on: http://gerrit.cloudera.org:8080/18814
Reviewed-by: Abhishek Chennaka <achennaka@cloudera.com>
Reviewed-by: Mahesh Reddy <mreddy@cloudera.com>
Tested-by: Alexey Serbin <alexey@apache.org>
Reviewed-by: Attila Bukor <abukor@apache.org>
  • Loading branch information
alexeyserbin committed Aug 4, 2022
1 parent 18d4067 commit d92f0f8
Show file tree
Hide file tree
Showing 3 changed files with 210 additions and 9 deletions.
159 changes: 157 additions & 2 deletions src/kudu/tools/kudu-admin-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
#include <memory>
#include <ostream>
#include <string>
#include <thread>
#include <tuple>
#include <unordered_map>
#include <unordered_set>
Expand Down Expand Up @@ -142,7 +141,7 @@ using std::deque;
using std::endl;
using std::ostringstream;
using std::string;
using std::thread;
using std::pair;
using std::tuple;
using std::unique_ptr;
using std::vector;
Expand Down Expand Up @@ -3214,6 +3213,162 @@ TEST_F(AdminCliTest, TestAddAndDropRangePartitionForMultipleRangeColumnsTable) {
});
}

TEST_F(AdminCliTest, AddAndDropRangeWithCustomHashSchema) {
FLAGS_num_tablet_servers = 1;
FLAGS_num_replicas = 1;

NO_FATALS(BuildAndStart());

const string& master_addr = cluster_->master()->bound_rpc_addr().ToString();
constexpr const char* const kTestTableName = "custom_hash_schemas";
constexpr const char* const kC0 = "c0";
constexpr const char* const kC1 = "c1";
constexpr const char* const kC2 = "c2";

{
KuduSchemaBuilder builder;
builder.AddColumn(kC0)->Type(KuduColumnSchema::INT8)->NotNull();
builder.AddColumn(kC1)->Type(KuduColumnSchema::INT16)->NotNull();
builder.AddColumn(kC2)->Type(KuduColumnSchema::STRING);
builder.SetPrimaryKey({ kC0, kC1 });
KuduSchema schema;
ASSERT_OK(builder.Build(&schema));

// Create a table with left-unbounded range partition having the
// table-wide hash schema.
unique_ptr<KuduPartialRow> l(schema.NewRow());
unique_ptr<KuduPartialRow> u(schema.NewRow());
ASSERT_OK(u->SetInt8(kC0, 0));

unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
ASSERT_OK(table_creator->table_name(kTestTableName)
.schema(&schema)
.set_range_partition_columns({ kC0 })
.add_hash_partitions({ kC1 }, 2)
.add_range_partition(l.release(), u.release())
.num_replicas(FLAGS_num_replicas)
.Create());
}

string stdout;
string stderr;

// Add a range partition with custom hash schema using the kudu CLI tool.
{
constexpr const char* const kHashSchemaJson = R"*({
"hash_schema": [
{ "columns": ["c1"], "num_buckets": 5, "seed": 8 }
]
})*";
const auto s = RunKuduTool({
"table",
"add_range_partition",
master_addr,
kTestTableName,
"[0]",
"[1]",
Substitute("--hash_schema=$0", kHashSchemaJson),
}, &stdout, &stderr);
ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr);
}
{
const auto s = RunKuduTool({
"table",
"describe",
master_addr,
kTestTableName,
}, &stdout, &stderr);
ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr);
ASSERT_STR_CONTAINS(stdout,
"PARTITION 0 <= VALUES < 1 HASH(c1) PARTITIONS 5");
}

// Insert a row into the newly added range partition.
{
client::sp::shared_ptr<KuduTable> table;
ASSERT_OK(client_->OpenTable(kTestTableName, &table));

auto session = client_->NewSession();
unique_ptr<KuduInsert> insert(table->NewInsert());
auto* row = insert->mutable_row();
ASSERT_OK(row->SetInt8(kC0, 0));
ASSERT_OK(row->SetInt16(kC1, 0));
ASSERT_OK(row->SetString(kC2, "0"));
ASSERT_OK(session->Apply(insert.release()));
ASSERT_OK(session->Flush());
ASSERT_EQ(1, CountTableRows(table.get()));
}

// Add unbounded range using the kudu CLI tool.
{
constexpr const char* const kHashSchemaJson = R"*({
"hash_schema": [ { "columns": ["c0", "c1"], "num_buckets": 3 } ] })*";
const auto s = RunKuduTool({
"table",
"add_range_partition",
master_addr,
kTestTableName,
"[1]",
"[]",
Substitute("--hash_schema=$0", kHashSchemaJson),
}, &stdout, &stderr);
ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr);
}
{
const auto s = RunKuduTool({
"table",
"describe",
master_addr,
kTestTableName,
}, &stdout, &stderr);
ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr);
ASSERT_STR_CONTAINS(stdout,
"PARTITION 0 <= VALUES < 1 HASH(c1) PARTITIONS 5");
ASSERT_STR_CONTAINS(stdout,
"PARTITION 1 <= VALUES HASH(c0, c1) PARTITIONS 3");
}

// Insert a row into the newly added range partition.
{
client::sp::shared_ptr<KuduTable> table;
ASSERT_OK(client_->OpenTable(kTestTableName, &table));
auto session = client_->NewSession();
unique_ptr<KuduInsert> insert(table->NewInsert());
auto* row = insert->mutable_row();
ASSERT_OK(row->SetInt8(kC0, 10));
ASSERT_OK(row->SetInt16(kC1, 10));
ASSERT_OK(row->SetString(kC2, "10"));
ASSERT_OK(session->Apply(insert.release()));
ASSERT_OK(session->Flush());
ASSERT_EQ(2, CountTableRows(table.get()));
}

// Drop all the ranges one-by-one.
const vector<pair<string, string>> kRangesStr = {
{"", "0"}, {"0", "1"}, {"1", ""},
};

for (const std::pair<string, string>& r : kRangesStr) {
SCOPED_TRACE(Substitute("range ['$0', '$1')", r.first, r.second));
const auto s = RunKuduTool({
"table",
"drop_range_partition",
master_addr,
kTestTableName,
Substitute("[$0]", r.first),
Substitute("[$0]", r.second),
}, &stdout, &stderr);
ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr);
}

// There should be 0 rows left.
client::sp::shared_ptr<KuduTable> table;
ASSERT_OK(client_->OpenTable(kTestTableName, &table));
ASSERT_EVENTUALLY([&]() {
ASSERT_EQ(0, CountTableRows(table.get()));
});
}

namespace {
constexpr const char* kPrincipal = "oryx";

Expand Down
5 changes: 5 additions & 0 deletions src/kudu/tools/tool.proto
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,11 @@ message PartitionPB {
repeated RangeWithHashSchemaPB custom_hash_schema_ranges = 4;
}

// A standalone message representing a hash schema.
message HashSchemaPB {
repeated HashPartitionPB hash_schema = 1;
}

// Table-wide hash schema.
repeated HashPartitionPB hash_partitions = 1;
// Range partitioning information.
Expand Down
55 changes: 48 additions & 7 deletions src/kudu/tools/tool_action_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,11 @@ using kudu::client::KuduTableCreator;
using kudu::client::KuduTableStatistics;
using kudu::client::KuduValue;
using kudu::client::internal::ReplicaController;
using kudu::tools::PartitionPB;
using std::cerr;
using std::cout;
using std::endl;
using std::make_unique;
using std::map;
using std::pair;
using std::set;
Expand Down Expand Up @@ -150,6 +152,10 @@ DEFINE_string(lower_bound_type, "INCLUSIVE_BOUND",
DEFINE_string(upper_bound_type, "EXCLUSIVE_BOUND",
"The type of the upper bound, either inclusive or exclusive. "
"Defaults to exclusive. This flag is case-insensitive.");
DEFINE_string(hash_schema, "",
"String representation of range-specific hash schema as a JSON "
"object, e.g. "
"{\"hash_schema\": [{\"columns\": [\"c0\"], \"num_buckets\": 5}]}");
DEFINE_int32(scan_batch_size, -1,
"The size for scan results batches, in bytes. A negative value "
"means the server-side default is used, where the server-side "
Expand Down Expand Up @@ -1000,18 +1006,52 @@ Status ModifyRangePartition(const RunnerContext& context, PartitionAction action
upper_bound.get()));

KuduTableCreator::RangePartitionBound lower_bound_type;
KuduTableCreator::RangePartitionBound upper_bound_type;
RETURN_NOT_OK(convert_bounds_type(
"lower bound", FLAGS_lower_bound_type, &lower_bound_type));

RETURN_NOT_OK(convert_bounds_type("lower bound", FLAGS_lower_bound_type, &lower_bound_type));
RETURN_NOT_OK(convert_bounds_type("upper bound", FLAGS_upper_bound_type, &upper_bound_type));
KuduTableCreator::RangePartitionBound upper_bound_type;
RETURN_NOT_OK(convert_bounds_type(
"upper bound", FLAGS_upper_bound_type, &upper_bound_type));

const auto& hash_schema_str = FLAGS_hash_schema;
PartitionPB::HashSchemaPB hash_schema;
if (!hash_schema_str.empty()) {
JsonParseOptions opts;
opts.case_insensitive_enum_parsing = true;
if (const auto& s = JsonStringToMessage(
hash_schema_str, &hash_schema, opts); !s.ok()) {
return Status::InvalidArgument(
Substitute("unable to parse JSON: $0", hash_schema_str),
s.error_message().ToString());
}
}

unique_ptr<KuduTableAlterer> alterer(client->NewTableAlterer(table_name));
if (action == PartitionAction::ADD) {
return alterer->AddRangePartition(lower_bound.release(),
upper_bound.release(),
lower_bound_type,
upper_bound_type)->Alter();
if (hash_schema_str.empty()) {
// Add range partition with table-wide hash schema.
return alterer->AddRangePartition(lower_bound.release(),
upper_bound.release(),
lower_bound_type,
upper_bound_type)->Alter();
}

// Add range partition with custom hash schema.
auto p = make_unique<KuduRangePartition>(lower_bound.release(),
upper_bound.release(),
lower_bound_type,
upper_bound_type);
for (const auto& dimension_pb : hash_schema.hash_schema()) {
vector<string> columns;
for (const auto& column : dimension_pb.columns()) {
columns.emplace_back(column);
}
p->add_hash_partitions(
columns, dimension_pb.num_buckets(), dimension_pb.seed());
}
return alterer->AddRangePartition(p.release())->Alter();
}

DCHECK_EQ(PartitionAction::DROP, action);
return alterer->DropRangePartition(lower_bound.release(),
upper_bound.release(),
Expand Down Expand Up @@ -1790,6 +1830,7 @@ unique_ptr<Mode> BuildTableMode() {
"the upper range partition will be unbounded" })
.AddOptionalParameter("lower_bound_type")
.AddOptionalParameter("upper_bound_type")
.AddOptionalParameter("hash_schema")
.Build();

unique_ptr<Action> column_set_default =
Expand Down

0 comments on commit d92f0f8

Please sign in to comment.