diff --git a/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp b/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp index 495056314e8f..fd87f36fa171 100644 --- a/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp @@ -288,16 +288,33 @@ THolder CreateChangefeedPropose( cdcStream.SetRetentionPeriodSeconds(topic.retention_period().seconds()); } + auto tableDesc = GetTableDescription(ss, dstPath->PathId); + Y_ABORT_UNLESS(!tableDesc.GetKeyColumnIds().empty()); + const auto& keyId = tableDesc.GetKeyColumnIds()[0]; + bool isPartitioningAvailable = false; + + // Explicit specification of the number of partitions when creating CDC + // is possible only if the first component of the primary key + // of the source table is Uint32 or Uint64 + for (const auto& column : tableDesc.GetColumns()) { + if (column.GetId() == keyId) { + isPartitioningAvailable = column.GetType() == "Uint32" || column.GetType() == "Uint64"; + break; + } + } + if (topic.has_partitioning_settings()) { - i64 minActivePartitions = - topic.partitioning_settings().min_active_partitions(); - if (minActivePartitions < 0) { - error = "minActivePartitions must be >= 0"; - return nullptr; - } else if (minActivePartitions == 0) { - minActivePartitions = 1; + if (isPartitioningAvailable) { + i64 minActivePartitions = + topic.partitioning_settings().min_active_partitions(); + if (minActivePartitions < 0) { + error = "minActivePartitions must be >= 0"; + return nullptr; + } else if (minActivePartitions == 0) { + minActivePartitions = 1; + } + cdcStream.SetTopicPartitions(minActivePartitions); } - cdcStream.SetTopicPartitions(minActivePartitions); if (topic.partitioning_settings().has_auto_partitioning_settings()) { auto& partitioningSettings = topic.partitioning_settings().auto_partitioning_settings(); diff --git a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp index 6927f9054ee0..d4a9e006f9d8 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp +++ b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp @@ -1145,6 +1145,18 @@ void NoMaxPartitionsCount(const NKikimrScheme::TEvDescribeSchemeResult& record) UNIT_ASSERT(!record.GetPathDescription().GetTable().GetPartitionConfig().GetPartitioningPolicy().HasMaxPartitionsCount()); } +TCheckFunc MinTopicPartitionsCountEqual(ui32 count) { + return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) { + UNIT_ASSERT_VALUES_EQUAL(record.GetPathDescription().GetPersQueueGroup().GetPQTabletConfig().GetPartitionStrategy().GetMinPartitionCount(), count); + }; +} + +TCheckFunc MaxTopicPartitionsCountEqual(ui32 count) { + return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) { + UNIT_ASSERT_VALUES_EQUAL(record.GetPathDescription().GetPersQueueGroup().GetPQTabletConfig().GetPartitionStrategy().GetMaxPartitionCount(), count); + }; +} + TCheckFunc PartitioningByLoadStatus(bool status) { return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) { UNIT_ASSERT_VALUES_EQUAL(record.GetPathDescription().GetTable().GetPartitionConfig().GetPartitioningPolicy().GetSplitByLoadSettings().GetEnabled(), status); diff --git a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h index 64a5bd1f35fe..0cd4fa0bdc56 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h +++ b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h @@ -138,6 +138,8 @@ namespace NLs { TCheckFunc MaxPartitionsCountEqual(ui32 count); void HasMaxPartitionsCount(const NKikimrScheme::TEvDescribeSchemeResult& record); void NoMaxPartitionsCount(const NKikimrScheme::TEvDescribeSchemeResult& record); + TCheckFunc MinTopicPartitionsCountEqual(ui32 count); + TCheckFunc MaxTopicPartitionsCountEqual(ui32 count); TCheckFunc PartitioningByLoadStatus(bool status); TCheckFunc ColumnFamiliesCount(ui32 size); TCheckFunc ColumnFamiliesHas(ui32 familyId); diff --git a/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp b/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp index 6955aed70987..6ff4f4bc953f 100644 --- a/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp +++ b/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp @@ -5238,7 +5238,7 @@ Y_UNIT_TEST_SUITE(TImportTests) { std::function Checker; }; - TGeneratedChangefeed GenChangefeed(ui64 num = 1) { + TGeneratedChangefeed GenChangefeed(ui64 num = 1, bool isPartitioningAvailable = true) { const TString changefeedName = TStringBuilder() << "updates_feed" << num; const auto changefeedPath = TStringBuilder() << "/" << changefeedName; @@ -5251,10 +5251,10 @@ Y_UNIT_TEST_SUITE(TImportTests) { const auto topicDesc = R"( partitioning_settings { - min_active_partitions: 1 - max_active_partitions: 1 + min_active_partitions: 2 + max_active_partitions: 3 auto_partitioning_settings { - strategy: AUTO_PARTITIONING_STRATEGY_DISABLED + strategy: AUTO_PARTITIONING_STRATEGY_SCALE_UP partition_write_speed { stabilization_window { seconds: 300 @@ -5303,40 +5303,50 @@ Y_UNIT_TEST_SUITE(TImportTests) { attr.emplace(NAttr::EKeys::TOPIC_DESCRIPTION, topicDesc); return { {changefeedPath, GenerateTestData({EPathTypeCdcStream, changefeedDesc, std::move(attr)})}, - [changefeedPath = TString(changefeedPath)](TTestBasicRuntime& runtime) { + [changefeedPath = TString(changefeedPath), isPartitioningAvailable](TTestBasicRuntime& runtime) { TestDescribeResult(DescribePath(runtime, "/MyRoot/Table" + changefeedPath, false, false, true), { NLs::PathExist, }); TestDescribeResult(DescribePath(runtime, "/MyRoot/Table" + changefeedPath + "/streamImpl", false, false, true), { - NLs::ConsumerExist("my_consumer") + NLs::ConsumerExist("my_consumer"), + NLs::MinTopicPartitionsCountEqual(isPartitioningAvailable ? 2 : 1), + NLs::MaxTopicPartitionsCountEqual(3), }); } }; } - TVector> GenChangefeeds(THashMap& bucketContent, ui64 count = 1) { + TVector> GenChangefeeds( + THashMap& bucketContent, + ui64 count = 1, + bool isPartitioningAvailable = true) + { TVector> checkers; checkers.reserve(count); for (ui64 i = 1; i <= count; ++i) { - auto genChangefeed = GenChangefeed(i); + auto genChangefeed = GenChangefeed(i, isPartitioningAvailable); bucketContent.emplace(genChangefeed.Changefeed); checkers.push_back(genChangefeed.Checker); } return checkers; } - std::function AddedSchemeCommon(THashMap& bucketContent, const TString& permissions) { - const auto data = GenerateTestData(R"( + std::function AddedSchemeCommon( + THashMap& bucketContent, + const TString& permissions, + const TString& pkType) + { + const auto data = GenerateTestData(Sprintf(R"( columns { name: "key" - type { optional_type { item { type_id: UTF8 } } } + type { optional_type { item { type_id: %s } } } } columns { name: "value" type { optional_type { item { type_id: UTF8 } } } } primary_key: "key" - )", {{"a", 1}}, permissions); + )", pkType.c_str()), {{pkType == "UTF8" ? "a" : "", 1}}, permissions); bucketContent.emplace("", data); return [](TTestBasicRuntime& runtime) { @@ -5346,11 +5356,17 @@ Y_UNIT_TEST_SUITE(TImportTests) { }; } - std::function AddedScheme(THashMap& bucketContent) { - return AddedSchemeCommon(bucketContent, ""); + std::function AddedScheme( + THashMap& bucketContent, + const TString& pkType) + { + return AddedSchemeCommon(bucketContent, "", pkType); } - std::function AddedSchemeWithPermissions(THashMap& bucketContent) { + std::function AddedSchemeWithPermissions( + THashMap& bucketContent, + const TString& pkType) + { const auto permissions = R"( actions { change_owner: "eve" @@ -5374,34 +5390,22 @@ Y_UNIT_TEST_SUITE(TImportTests) { } } )"; - return AddedSchemeCommon(bucketContent, permissions); + return AddedSchemeCommon(bucketContent, permissions, pkType); } - using SchemeFunction = std::function(THashMap&)>; + using SchemeFunction = std::function(THashMap&, const TString&)>; - void TestImportChangefeeds(ui64 countChangefeed, SchemeFunction addedScheme) { + void TestImportChangefeeds(ui64 countChangefeed, SchemeFunction addedScheme, const TString& pkType = "UTF8") { TTestBasicRuntime runtime; TTestEnv env(runtime); ui64 txId = 100; runtime.GetAppData().FeatureFlags.SetEnableChangefeedsImport(true); runtime.SetLogPriority(NKikimrServices::IMPORT, NActors::NLog::PRI_TRACE); - const auto data = GenerateTestData(R"( - columns { - name: "key" - type { optional_type { item { type_id: UTF8 } } } - } - columns { - name: "value" - type { optional_type { item { type_id: UTF8 } } } - } - primary_key: "key" - )"); - THashMap bucketContent(countChangefeed + 1); - auto checkerTable = addedScheme(bucketContent); - auto checkersChangefeeds = GenChangefeeds(bucketContent, countChangefeed); + auto checkerTable = addedScheme(bucketContent, pkType); + auto checkersChangefeeds = GenChangefeeds(bucketContent, countChangefeed, pkType == "UINT32" || pkType == "UINT64"); TPortManager portManager; const ui16 port = portManager.GetPort(); @@ -5431,6 +5435,17 @@ Y_UNIT_TEST_SUITE(TImportTests) { TestImportChangefeeds(1, AddedScheme); } + // Explicit specification of the number of partitions when creating CDC + // is possible only if the first component of the primary key + // of the source table is Uint32 or Uint64 + Y_UNIT_TEST(ChangefeedWithPartitioning) { + TestImportChangefeeds(1, AddedScheme, "UINT32"); + } + + Y_UNIT_TEST(ChangefeedsWithPartitioning) { + TestImportChangefeeds(3, AddedScheme, "UINT64"); + } + Y_UNIT_TEST(Changefeeds) { TestImportChangefeeds(3, AddedScheme); }