diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp index f8d4b2f5e221..985824e39f09 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp @@ -645,6 +645,7 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf kqpConfig.EnableOltpSink = serviceConfig.GetEnableOltpSink(); kqpConfig.EnableHtapTx = serviceConfig.GetEnableHtapTx(); kqpConfig.EnableStreamWrite = serviceConfig.GetEnableStreamWrite(); + kqpConfig.EnableBatchUpdates = serviceConfig.GetEnableBatchUpdates(); kqpConfig.BlockChannelsMode = serviceConfig.GetBlockChannelsMode(); kqpConfig.IdxLookupJoinsPrefixPointLimit = serviceConfig.GetIdxLookupJoinPointsLimit(); kqpConfig.DefaultCostBasedOptimizationLevel = serviceConfig.GetDefaultCostBasedOptimizationLevel(); diff --git a/ydb/core/kqp/opt/kqp_opt_kql.cpp b/ydb/core/kqp/opt/kqp_opt_kql.cpp index 1266744519ca..28c3eb7da0ba 100644 --- a/ydb/core/kqp/opt/kqp_opt_kql.cpp +++ b/ydb/core/kqp/opt/kqp_opt_kql.cpp @@ -964,12 +964,18 @@ TExprNode::TPtr HandleWriteTable(const TKiWriteTable& write, TExprContext& ctx, TExprNode::TPtr HandleUpdateTable(const TKiUpdateTable& update, TExprContext& ctx, TKqpOptimizeContext& kqpCtx, const TKikimrTablesData& tablesData, bool withSystemColumns) { - Y_UNUSED(kqpCtx); const auto& tableData = GetTableData(tablesData, update.DataSink().Cluster(), update.Table().Value()); if (!CheckWriteToIndex(update, tableData, ctx) || !CheckDisabledWriteToUniqIndex(update, tableData, ctx)) { return nullptr; } + const bool allowBatchUpdates = kqpCtx.Config->EnableBatchUpdates && kqpCtx.Config->EnableOltpSink; + if (!allowBatchUpdates && update.IsBatch() == "true") { + const TString err = "BATCH operations are not supported at the current time."; + ctx.AddError(YqlIssue(ctx.GetPosition(update.Pos()), TIssuesIds::KIKIMR_PRECONDITION_FAILED, err)); + return nullptr; + } + if (HasIndexesToWrite(tableData)) { return BuildUpdateTableWithIndex(update, tableData, withSystemColumns, ctx).Ptr(); } else { @@ -980,12 +986,18 @@ TExprNode::TPtr HandleUpdateTable(const TKiUpdateTable& update, TExprContext& ct TExprNode::TPtr HandleDeleteTable(const TKiDeleteTable& del, TExprContext& ctx, TKqpOptimizeContext& kqpCtx, const TKikimrTablesData& tablesData, bool withSystemColumns) { - Y_UNUSED(kqpCtx); auto& tableData = GetTableData(tablesData, del.DataSink().Cluster(), del.Table().Value()); if (!CheckWriteToIndex(del, tableData, ctx) || !CheckDisabledWriteToUniqIndex(del, tableData, ctx)) { return nullptr; } + const bool allowBatchUpdates = kqpCtx.Config->EnableBatchUpdates && kqpCtx.Config->EnableOltpSink; + if (!allowBatchUpdates && del.IsBatch() == "true") { + const TString err = "BATCH operations are not supported at the current time."; + ctx.AddError(YqlIssue(ctx.GetPosition(del.Pos()), TIssuesIds::KIKIMR_PRECONDITION_FAILED, err)); + return nullptr; + } + if (HasIndexesToWrite(tableData)) { return BuildDeleteTableWithIndex(del, tableData, withSystemColumns, ctx).Ptr(); } else { diff --git a/ydb/core/kqp/provider/yql_kikimr_settings.h b/ydb/core/kqp/provider/yql_kikimr_settings.h index 943257194619..0f10deb1c79a 100644 --- a/ydb/core/kqp/provider/yql_kikimr_settings.h +++ b/ydb/core/kqp/provider/yql_kikimr_settings.h @@ -194,6 +194,7 @@ struct TKikimrConfiguration : public TKikimrSettings, public NCommon::TSettingDi bool EnableOltpSink = false; bool EnableHtapTx = false; bool EnableStreamWrite = false; + bool EnableBatchUpdates = false; NKikimrConfig::TTableServiceConfig_EBlockChannelsMode BlockChannelsMode; bool EnableSpilling = true; ui32 DefaultCostBasedOptimizationLevel = 4; diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 43c330a05138..ad2414c81a58 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -1394,7 +1394,7 @@ class TKqpSessionActor : public TActorBootstrapped { return; } - if (QueryState->TxCtx->EnableOltpSink.value_or(false) && isBatchQuery && (!tx || !tx->IsLiteralTx())) { + if (isBatchQuery && (!tx || !tx->IsLiteralTx())) { ExecutePartitioned(tx); } else if (QueryState->TxCtx->ShouldExecuteDeferredEffects(tx)) { ExecuteDeferredEffectsImmediately(tx); @@ -1408,7 +1408,12 @@ class TKqpSessionActor : public TActorBootstrapped { void ExecutePartitioned(const TKqpPhyTxHolder::TConstPtr& tx) { if (!Settings.TableService.GetEnableBatchUpdates()) { return ReplyQueryError(Ydb::StatusIds::PRECONDITION_FAILED, - "BATCH operations are disabled by EnableBatchUpdates flag."); + "BATCH operations are not supported at the current time."); + } + + if (!QueryState->TxCtx->EnableOltpSink.value_or(false)) { + return ReplyQueryError(Ydb::StatusIds::PRECONDITION_FAILED, + "BATCH operations are not supported at the current time."); } if (QueryState->TxCtx->HasOlapTable) { @@ -1416,9 +1421,8 @@ class TKqpSessionActor : public TActorBootstrapped { "BATCH operations are not supported for column tables at the current time."); } - if (QueryState->HasTxControl()) { - NYql::TIssues issues; - return ReplyQueryError(::Ydb::StatusIds::StatusCode::StatusIds_StatusCode_BAD_REQUEST, + if (!QueryState->HasImplicitTx()) { + return ReplyQueryError(Ydb::StatusIds::PRECONDITION_FAILED, "BATCH operation can be executed only in the implicit transaction mode."); } diff --git a/ydb/core/kqp/ut/batch_operations/kqp_batch_delete_ut.cpp b/ydb/core/kqp/ut/batch_operations/kqp_batch_delete_ut.cpp index d4bf0c759484..9bd067c4d9b9 100644 --- a/ydb/core/kqp/ut/batch_operations/kqp_batch_delete_ut.cpp +++ b/ydb/core/kqp/ut/batch_operations/kqp_batch_delete_ut.cpp @@ -13,11 +13,11 @@ using namespace NYdb::NQuery; namespace { -NKikimrConfig::TAppConfig GetAppConfig(size_t maxBatchSize = 10000, size_t partitionLimit = 10) { +NKikimrConfig::TAppConfig GetAppConfig(size_t maxBatchSize = 10000, size_t partitionLimit = 10, bool enableOltpSink = true, bool enableBatchUpdates = true) { auto app = NKikimrConfig::TAppConfig(); app.MutableTableServiceConfig()->SetEnableOlapSink(true); - app.MutableTableServiceConfig()->SetEnableOltpSink(true); - app.MutableTableServiceConfig()->SetEnableBatchUpdates(true); + app.MutableTableServiceConfig()->SetEnableOltpSink(enableOltpSink); + app.MutableTableServiceConfig()->SetEnableBatchUpdates(enableBatchUpdates); app.MutableTableServiceConfig()->MutableBatchOperationSettings()->SetMaxBatchSize(maxBatchSize); app.MutableTableServiceConfig()->MutableBatchOperationSettings()->SetPartitionExecutionLimit(partitionLimit); return app; @@ -578,10 +578,31 @@ Y_UNIT_TEST_SUITE(KqpBatchDelete) { )"); auto result = session.ExecuteQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::BAD_REQUEST); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::PRECONDITION_FAILED); UNIT_ASSERT_STRING_CONTAINS_C(result.GetIssues().ToString(), "BATCH operation can be executed only in the implicit transaction mode.", result.GetIssues().ToString()); } } + + Y_UNIT_TEST_QUAD(DisableFlags, UseSink, UseBatchUpdates) { + TKikimrRunner kikimr(GetAppConfig(10000, 10, UseSink, UseBatchUpdates)); + auto db = kikimr.GetQueryClient(); + auto session = db.GetSession().GetValueSync().GetSession(); + + { + auto query = Q_(R"( + BATCH DELETE FROM KeyValue + WHERE Key >= 3; + )"); + + auto result = session.ExecuteQuery(query, TTxControl::NoTx()).ExtractValueSync(); + if (UseSink && UseBatchUpdates) { + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } else { + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::PRECONDITION_FAILED); + UNIT_ASSERT_STRING_CONTAINS_C(result.GetIssues().ToString(), "BATCH operations are not supported at the current time.", result.GetIssues().ToString()); + } + } + } } } // namespace NKqp diff --git a/ydb/core/kqp/ut/batch_operations/kqp_batch_update_ut.cpp b/ydb/core/kqp/ut/batch_operations/kqp_batch_update_ut.cpp index 4e5a07e3f3c1..d302bed4e382 100644 --- a/ydb/core/kqp/ut/batch_operations/kqp_batch_update_ut.cpp +++ b/ydb/core/kqp/ut/batch_operations/kqp_batch_update_ut.cpp @@ -13,11 +13,11 @@ using namespace NYdb::NQuery; namespace { -NKikimrConfig::TAppConfig GetAppConfig(size_t maxBatchSize = 10000, size_t partitionLimit = 10) { +NKikimrConfig::TAppConfig GetAppConfig(size_t maxBatchSize = 10000, size_t partitionLimit = 10, bool enableOltpSink = true, bool enableBatchUpdates = true) { auto app = NKikimrConfig::TAppConfig(); app.MutableTableServiceConfig()->SetEnableOlapSink(true); - app.MutableTableServiceConfig()->SetEnableOltpSink(true); - app.MutableTableServiceConfig()->SetEnableBatchUpdates(true); + app.MutableTableServiceConfig()->SetEnableOltpSink(enableOltpSink); + app.MutableTableServiceConfig()->SetEnableBatchUpdates(enableBatchUpdates); app.MutableTableServiceConfig()->MutableBatchOperationSettings()->SetMaxBatchSize(maxBatchSize); app.MutableTableServiceConfig()->MutableBatchOperationSettings()->SetPartitionExecutionLimit(partitionLimit); return app; @@ -717,10 +717,32 @@ Y_UNIT_TEST_SUITE(KqpBatchUpdate) { )"); auto result = session.ExecuteQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::BAD_REQUEST); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::PRECONDITION_FAILED); UNIT_ASSERT_STRING_CONTAINS_C(result.GetIssues().ToString(), "BATCH operation can be executed only in the implicit transaction mode.", result.GetIssues().ToString()); } } + + Y_UNIT_TEST_QUAD(DisableFlags, UseSink, UseBatchUpdates) { + TKikimrRunner kikimr(GetAppConfig(10000, 10, UseSink, UseBatchUpdates)); + auto db = kikimr.GetQueryClient(); + auto session = db.GetSession().GetValueSync().GetSession(); + + { + auto query = Q_(R"( + BATCH UPDATE KeyValue + SET Value = "None" + WHERE Key IN [1, 3, 5]; + )"); + + auto result = session.ExecuteQuery(query, TTxControl::NoTx()).ExtractValueSync(); + if (UseSink && UseBatchUpdates) { + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } else { + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::PRECONDITION_FAILED); + UNIT_ASSERT_STRING_CONTAINS_C(result.GetIssues().ToString(), "BATCH operations are not supported at the current time.", result.GetIssues().ToString()); + } + } + } } } // namespace NKqp