diff --git a/ydb/core/testlib/basics/feature_flags.h b/ydb/core/testlib/basics/feature_flags.h index 4fe1f6170b02..9613af3142a5 100644 --- a/ydb/core/testlib/basics/feature_flags.h +++ b/ydb/core/testlib/basics/feature_flags.h @@ -84,6 +84,7 @@ class TTestFeatureFlagsHolder { FEATURE_FLAG_SETTER(EnableStreamingQueries) FEATURE_FLAG_SETTER(EnableSecureScriptExecutions) FEATURE_FLAG_SETTER(DisableMissingDefaultColumnsInBulkUpsert) + FEATURE_FLAG_SETTER(EnableTopicAutopartitioningForReplication) #undef FEATURE_FLAG_SETTER }; diff --git a/ydb/core/tx/replication/controller/stream_creator.cpp b/ydb/core/tx/replication/controller/stream_creator.cpp index d02fc7c0a815..8e006b5eeb64 100644 --- a/ydb/core/tx/replication/controller/stream_creator.cpp +++ b/ydb/core/tx/replication/controller/stream_creator.cpp @@ -274,18 +274,19 @@ IActor* CreateStreamCreator(TReplication* replication, ui64 targetId, const TAct const auto* target = replication->FindTarget(targetId); Y_ABORT_UNLESS(target); - const auto& config = replication->GetConfig().GetConsistencySettings(); - const auto resolvedTimestamps = config.HasGlobal() - ? std::make_optional(TDuration::MilliSeconds(config.GetGlobal().GetCommitIntervalMilliSeconds())) + const auto& config = replication->GetConfig(); + const auto& consistency = config.GetConsistencySettings(); + const auto resolvedTimestamps = consistency.HasGlobal() + ? std::make_optional(TDuration::MilliSeconds(consistency.GetGlobal().GetCommitIntervalMilliSeconds())) : std::nullopt; - const bool needCreate = !replication->GetConfig().HasTransferSpecific() || - !replication->GetConfig().GetTransferSpecific().GetTarget().HasConsumerName(); + const bool needCreate = !config.HasTransferSpecific() || !config.GetTransferSpecific().GetTarget().HasConsumerName(); + const bool supportsTopicAutopartitioning = !consistency.HasGlobal() && AppData()->FeatureFlags.GetEnableTopicAutopartitioningForReplication(); return CreateStreamCreator(ctx.SelfID, replication->GetYdbProxy(), replication->GetId(), target->GetId(), target->GetConfig(), target->GetStreamName(), target->GetStreamConsumerName(), TDuration::Seconds(AppData()->ReplicationConfig.GetRetentionPeriodSeconds()), resolvedTimestamps, - AppData()->FeatureFlags.GetEnableTopicAutopartitioningForReplication(), needCreate); + supportsTopicAutopartitioning, needCreate); } IActor* CreateStreamCreator(const TActorId& parent, const TActorId& proxy, ui64 rid, ui64 tid, diff --git a/ydb/core/tx/replication/controller/stream_creator_ut.cpp b/ydb/core/tx/replication/controller/stream_creator_ut.cpp index 87b521b8ce5c..c2c8effdb4cf 100644 --- a/ydb/core/tx/replication/controller/stream_creator_ut.cpp +++ b/ydb/core/tx/replication/controller/stream_creator_ut.cpp @@ -71,6 +71,51 @@ Y_UNIT_TEST_SUITE(StreamCreator) { Y_UNIT_TEST(WithResolvedTimestamps) { Basic(TDuration::Seconds(10)); } + + void TopicAutoPartitioning(bool enabled) { + TEnv env(TFeatureFlags().SetEnableTopicAutopartitioningForCDC(true)); + env.GetRuntime().SetLogPriority(NKikimrServices::REPLICATION_CONTROLLER, NLog::PRI_TRACE); + + env.CreateTable("/Root", *MakeTableDescription(TTestTableDescription{ + .Name = "Table", + .KeyColumns = {"key"}, + .Columns = { + {.Name = "key", .Type = "Uint32"}, + {.Name = "value", .Type = "Utf8"}, + }, + .ReplicationConfig = Nothing(), + })); + + env.GetRuntime().Register(CreateStreamCreator( + env.GetSender(), env.GetYdbProxy(), 1 /* rid */, 1 /* tid */, + std::make_shared("/Root/Table", "/Root/Replica"), + "Stream", "replicationConsumer", TDuration::Hours(1), std::nullopt, enabled + )); + { + auto ev = env.GetRuntime().GrabEdgeEvent(env.GetSender()); + env.GetRuntime().Send(ev->Sender, env.GetSender(), new TEvPrivate::TEvAllowCreateStream()); + } + { + auto ev = env.GetRuntime().GrabEdgeEvent(env.GetSender()); + UNIT_ASSERT(ev->Get()->IsSuccess()); + } + + auto desc = env.GetDescription("/Root/Table/Stream/streamImpl"); + + const auto& pqconfig = desc.GetPathDescription().GetPersQueueGroup().GetPQTabletConfig(); + const auto& strategy = pqconfig.GetPartitionStrategy(); + + if (enabled) { + UNIT_ASSERT_EQUAL(strategy.GetPartitionStrategyType(), NKikimrPQ::TPQTabletConfig::CAN_SPLIT); + } else { + UNIT_ASSERT_EQUAL(strategy.GetPartitionStrategyType(), NKikimrPQ::TPQTabletConfig::DISABLED); + } + } + + Y_UNIT_TEST(TopicAutoPartitioning) { + TopicAutoPartitioning(true); + TopicAutoPartitioning(false); + } } }