Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1882,7 +1882,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
}

if ((stageInfo.Meta.IsOlap() && HasDmlOperationOnOlap(tx.Body->GetType(), stage))) {
auto error = TStringBuilder() << "Data manipulation queries do not support column shard tables.";
auto error = TStringBuilder()
<< "Data manipulation queries with column-oriented tables are supported only by API QueryService.";
LOG_E(error);
ReplyErrorAndDie(Ydb::StatusIds::PRECONDITION_FAILED,
YqlIssue({}, NYql::TIssuesIds::KIKIMR_PRECONDITION_FAILED, error));
Expand Down
22 changes: 22 additions & 0 deletions ydb/core/kqp/opt/kqp_opt_effects.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,17 @@ bool BuildUpsertRowsEffect(const TKqlUpsertRows& node, TExprContext& ctx, const
const bool isOlap = (table.Metadata->Kind == EKikimrTableKind::Olap);
const i64 priority = isOlap ? 0 : order;

if (isOlap && !(kqpCtx.IsGenericQuery() || (kqpCtx.IsDataQuery() && kqpCtx.Config->AllowOlapDataQuery))) {
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()),
TStringBuilder() << "Data manipulation queries with column-oriented tables are supported only by API QueryService."));
return false;
}
if (isOlap && !kqpCtx.Config->EnableOlapSink) {
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()),
TStringBuilder() << "Data manipulation queries with column-oriented tables are disabled."));
return false;
}

if (IsDqPureExpr(node.Input())) {
if (sinkEffect) {
stageInput = RebuildPureStageWithSink(
Expand Down Expand Up @@ -350,6 +361,17 @@ bool BuildDeleteRowsEffect(const TKqlDeleteRows& node, TExprContext& ctx, const
const bool isOlap = (table.Metadata->Kind == EKikimrTableKind::Olap);
const i64 priority = isOlap ? 0 : order;

if (isOlap && !(kqpCtx.IsGenericQuery() || (kqpCtx.IsDataQuery() && kqpCtx.Config->AllowOlapDataQuery))) {
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()),
TStringBuilder() << "Data manipulation queries with column-oriented tables are supported only by API QueryService."));
return false;
}
if (isOlap && !kqpCtx.Config->EnableOlapSink) {
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()),
TStringBuilder() << "Data manipulation queries with column-oriented tables are disabled."));
return false;
}

if (IsDqPureExpr(node.Input())) {
if (sinkEffect) {
stageInput = RebuildPureStageWithSink(
Expand Down
29 changes: 16 additions & 13 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1142,32 +1142,35 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor>, IActorExce
QueryState->TxCtx->HasOltpTable |= hasOltpWrite || hasOltpRead;
QueryState->TxCtx->HasTableWrite |= hasOlapWrite || hasOltpWrite;
QueryState->TxCtx->HasTableRead |= hasOlapRead || hasOltpRead;
if (QueryState->TxCtx->HasOlapTable && QueryState->TxCtx->HasOltpTable && QueryState->TxCtx->HasTableWrite
&& !QueryState->TxCtx->EnableHtapTx.value_or(false) && !QueryState->IsSplitted()) {
ReplyQueryError(Ydb::StatusIds::PRECONDITION_FAILED,
"Write transactions that use both row-oriented and column-oriented tables are disabled at current time.");
return false;
}
if (QueryState->TxCtx->EffectiveIsolationLevel == NKikimrKqp::ISOLATION_LEVEL_SNAPSHOT_RW
&& QueryState->TxCtx->HasOltpTable) {
ReplyQueryError(Ydb::StatusIds::PRECONDITION_FAILED,
"SnapshotRW can only be used with olap tables.");
return false;
}

if (QueryState->TxCtx->EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE
&& QueryState->TxCtx->EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SNAPSHOT_RO
&& QueryState->TxCtx->EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SNAPSHOT_RW
&& QueryState->GetType() != NKikimrKqp::QUERY_TYPE_SQL_SCAN
&& QueryState->GetType() != NKikimrKqp::QUERY_TYPE_AST_SCAN
&& QueryState->GetType() != NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT
&& QueryState->TxCtx->HasOlapTable) {
ReplyQueryError(Ydb::StatusIds::PRECONDITION_FAILED,
TStringBuilder()
<< "Read from column tables is not supported in Online Read-Only or Stale Read-Only transaction modes. "
<< "Read from column-oriented tables is not supported in Online Read-Only or Stale Read-Only transaction modes. "
<< "Use Serializable or Snapshot Read-Only mode instead.");
return false;
}

if (QueryState->TxCtx->EffectiveIsolationLevel == NKikimrKqp::ISOLATION_LEVEL_SNAPSHOT_RW
&& QueryState->TxCtx->HasOltpTable) {
ReplyQueryError(Ydb::StatusIds::PRECONDITION_FAILED,
"SnapshotRW can only be used with column-oriented tables.");
return false;
}

if (QueryState->TxCtx->HasOlapTable && QueryState->TxCtx->HasOltpTable && QueryState->TxCtx->HasTableWrite
&& !QueryState->TxCtx->EnableHtapTx.value_or(false) && !QueryState->IsSplitted()) {
ReplyQueryError(Ydb::StatusIds::PRECONDITION_FAILED,
"Write transactions that use both row-oriented and column-oriented tables are disabled at current time.");
return false;
}

QueryState->TxCtx->SetTempTables(QueryState->TempTablesState);
if (phyQuery.GetForceImmediateEffectsExecution()) {
Counters->ForcedImmediateEffectsExecution->Inc();
Expand Down
46 changes: 33 additions & 13 deletions ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2894,10 +2894,10 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
UNIT_ASSERT(TExtLocalHelper(kikimr).TryCreateTable("olapStore", "olapTable_9", 9));
}

void TestOlapUpsert(ui32 numShards) {
void TestOlapUpsert(ui32 numShards, bool allowOlapDataQuery) {
auto settings = TKikimrSettings().SetWithSampleTables(false);
settings.AppConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
settings.AppConfig.MutableTableServiceConfig()->SetAllowOlapDataQuery(true);
settings.AppConfig.MutableTableServiceConfig()->SetAllowOlapDataQuery(allowOlapDataQuery);
TKikimrRunner kikimr(settings);

auto tableClient = kikimr.GetTableClient();
Expand Down Expand Up @@ -2929,7 +2929,15 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
(1, 15, 'ccccccc', 23, 1);
)", TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync(); // TODO: snapshot isolation?

UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
if (allowOlapDataQuery) {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
} else {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString());
UNIT_ASSERT_C(
result.GetIssues().ToString().contains(
"Data manipulation queries with column-oriented tables are supported only by API QueryService."),
result.GetIssues().ToString());
}

{
TString query = R"(
Expand All @@ -2941,19 +2949,31 @@ Y_UNIT_TEST_SUITE(KqpOlap) {

auto it = session.ExecuteDataQuery(query, TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).GetValueSync();

UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
TString result = FormatResultSetYson(it.GetResultSet(0));
Cout << result << Endl;
CompareYson(result, R"([[15;0];[15;1]])");
if (allowOlapDataQuery) {
UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
TString result = FormatResultSetYson(it.GetResultSet(0));
Cout << result << Endl;
CompareYson(result, R"([[15;0];[15;1]])");
} else {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString());
UNIT_ASSERT_C(
result.GetIssues().ToString().contains(
"Data manipulation queries with column-oriented tables are supported only by API QueryService."),
result.GetIssues().ToString());
}
}
}

Y_UNIT_TEST(OlapUpsertImmediate) {
TestOlapUpsert(1);
Y_UNIT_TEST(OlapUpsertOneShard) {
TestOlapUpsert(1, true);
}

Y_UNIT_TEST(OlapUpsertTwoShards) {
TestOlapUpsert(2, true);
}

Y_UNIT_TEST(OlapUpsert) {
TestOlapUpsert(2);
Y_UNIT_TEST(OlapUpsertDisabled) {
TestOlapUpsert(2, false);
}

Y_UNIT_TEST(OlapDeleteImmediate) {
Expand Down Expand Up @@ -4713,7 +4733,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::PRECONDITION_FAILED, result.GetIssues().ToString());
UNIT_ASSERT_STRING_CONTAINS_C(
result.GetIssues().ToString(),
"Read from column tables is not supported in Online Read-Only or Stale Read-Only transaction modes.",
"Read from column-oriented tables is not supported in Online Read-Only or Stale Read-Only transaction modes.",
result.GetIssues().ToString());
}

Expand All @@ -4724,7 +4744,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::PRECONDITION_FAILED, result.GetIssues().ToString());
UNIT_ASSERT_STRING_CONTAINS_C(
result.GetIssues().ToString(),
"Read from column tables is not supported in Online Read-Only or Stale Read-Only transaction modes.",
"Read from column-oriented tables is not supported in Online Read-Only or Stale Read-Only transaction modes.",
result.GetIssues().ToString());
}
}
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5059,7 +5059,8 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT(!prepareResult.IsSuccess());
UNIT_ASSERT_C(
prepareResult.GetIssues().ToString().contains("Data manipulation queries do not support column shard tables."),
prepareResult.GetIssues().ToString().contains(
"Data manipulation queries with column-oriented tables are disabled."),
prepareResult.GetIssues().ToString());
}

Expand Down
Loading