diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 70af684f77b9..45cafc0067e0 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -212,14 +212,14 @@ class TKqpDataExecuter : public TKqpExecuterBaseEndRowColumnIds.empty()) { - for (auto keyId : info.GetEndRowColumnIds()) { - ResponseEv->EndRowColumnIds.push_back(keyId); + if (!BatchOperationSettings.Empty() && info.HasBatchOperationMaxKey()) { + if (ResponseEv->BatchOperationMaxKeys.empty()) { + for (auto keyId : info.GetBatchOperationKeyIds()) { + ResponseEv->BatchOperationKeyIds.push_back(keyId); } } - ResponseEv->SerializedEndRows.emplace_back(info.GetSerializedEndRow()); + ResponseEv->BatchOperationMaxKeys.emplace_back(info.GetBatchOperationMaxKey()); } } else if (data.GetData().template Is()) { NKikimrKqp::TEvKqpOutputActorResultInfo info; diff --git a/ydb/core/kqp/executer_actor/kqp_executer.h b/ydb/core/kqp/executer_actor/kqp_executer.h index d97e44c2fbf7..14610c072ff8 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer.h +++ b/ydb/core/kqp/executer_actor/kqp_executer.h @@ -39,8 +39,8 @@ struct TEvKqpExecuter { THashSet ParticipantNodes; // For BATCH operations only - TVector SerializedEndRows; - TVector EndRowColumnIds; + TVector BatchOperationMaxKeys; + TVector BatchOperationKeyIds; enum class EExecutionType { Data, diff --git a/ydb/core/kqp/executer_actor/kqp_partitioned_executer.cpp b/ydb/core/kqp/executer_actor/kqp_partitioned_executer.cpp index d28447c6e7b6..25c00c501297 100644 --- a/ydb/core/kqp/executer_actor/kqp_partitioned_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_partitioned_executer.cpp @@ -549,9 +549,9 @@ class TKqpPartitionedExecuter : public TActorBootstrappedSerializedEndRows), std::move(ev->EndRowColumnIds)); - if (maxKey) { - partInfo->BeginRange = TKeyDesc::TPartitionRangeInfo(maxKey, /* IsInclusive */ false, /* IsPoint */ false); + TSerializedCellVec minKey = GetMinCellVecKey(std::move(ev->BatchOperationMaxKeys), std::move(ev->BatchOperationKeyIds)); + if (minKey) { + partInfo->BeginRange = TKeyDesc::TPartitionRangeInfo(minKey, /* IsInclusive */ false, /* IsPoint */ false); return RetryPartExecution(partInfo); } @@ -619,25 +619,43 @@ class TKqpPartitionedExecuter : public TActorBootstrapped&& rows, TVector&& rowColumnIds) const { - YQL_ENSURE(rowColumnIds.empty() || KeyIds.size() <= rowColumnIds.size()); + bool IsColumnsNeedReorder(const TVector& rowColumnIds) { + if (KeyColumnIdToPos.empty()) { + for (size_t i = 0; i < rowColumnIds.size(); ++i) { + KeyColumnIdToPos[rowColumnIds[i]] = i; + } + } - // Sometimes SchemeCache and KqpReadActor return keys in the different order, so we need to reorder the second ones - std::transform(rows.begin(), rows.end(), rows.begin(), [&](TSerializedCellVec& key) { - TVector newKey; - newKey.reserve(KeyIds.size()); + for (size_t i = 0; i < KeyIds.size(); ++i) { + auto it = KeyColumnIdToPos.find(KeyIds[i]); + YQL_ENSURE(it != KeyColumnIdToPos.end()); - for (auto keyId : KeyIds) { - auto it = std::find(rowColumnIds.begin(), rowColumnIds.end(), keyId); - if (it != rowColumnIds.end()) { - newKey.emplace_back(key.GetCells()[it - rowColumnIds.begin()]); - } else { - YQL_ENSURE(false, "KeyId " << keyId << " not found in readKeyIds"); - } + if (it->second != i) { + return true; } + } - return TSerializedCellVec(std::move(newKey)); - }); + return false; + } + + TSerializedCellVec GetMinCellVecKey(TVector&& rows, TVector&& rowColumnIds) { + if (!rowColumnIds.empty() && IsColumnsNeedReorder(rowColumnIds)) { + std::transform(rows.begin(), rows.end(), rows.begin(), [&](TSerializedCellVec& key) { + TVector newKey; + newKey.reserve(KeyIds.size()); + + for (auto keyId : KeyIds) { + auto it = std::find(rowColumnIds.begin(), rowColumnIds.end(), keyId); + if (it != rowColumnIds.end()) { + newKey.emplace_back(key.GetCells()[it - rowColumnIds.begin()]); + } else { + YQL_ENSURE(false, "KeyId " << keyId << " not found in readKeyIds"); + } + } + + return TSerializedCellVec(std::move(newKey)); + }); + } TSerializedCellVec result; @@ -706,6 +724,7 @@ class TKqpPartitionedExecuter : public TActorBootstrapped KeyIds; TVector KeyColumnTypes; + THashMap KeyColumnIdToPos; std::shared_ptr> TablePartitioning; THashMap StartedPartitions; diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp index 2524ef4cf7ba..c36c87512ec6 100644 --- a/ydb/core/kqp/runtime/kqp_read_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp @@ -805,12 +805,24 @@ class TKqpReadActor : public TActorBootstrapped, public NYql::NDq state->FillEvRead(*ev, KeyColumnTypes, Settings->GetReverse()); - for (const auto& column : Settings->GetColumns()) { + BatchOperationReadColumns.clear(); + + auto columnsSize = static_cast(Settings->GetColumns().size()); + size_t notSystemColumnsIndex = 0; + for (size_t i = 0; i < columnsSize; ++i) { + const auto& column = Settings->GetColumns()[i]; if (!IsSystemColumn(column.GetId())) { record.AddColumns(column.GetId()); + + if (Settings->GetIsBatch() && column.GetIsPrimary()) { + BatchOperationReadColumns.emplace_back(notSystemColumnsIndex, column.GetId()); + } + notSystemColumnsIndex++; } } + YQL_ENSURE(!Settings->GetIsBatch() || BatchOperationReadColumns.size() == KeyColumnTypes.size()); + if (CollectDuplicateStats) { for (const auto& column : DuplicateCheckExtraColumns) { record.AddColumns(column.Tag); @@ -919,7 +931,8 @@ class TKqpReadActor : public TActorBootstrapped, public NYql::NDq } void HandleRead(TEvDataShard::TEvReadResult::TPtr ev) { - const auto& record = ev->Get()->Record; + auto& msg = *ev->Get(); + const auto& record = msg.Record; auto id = record.GetReadId(); if (!Reads[id] || Reads[id].Finished) { // dropped read @@ -1039,19 +1052,20 @@ class TKqpReadActor : public TActorBootstrapped, public NYql::NDq CA_LOG_D("Taken " << Locks.size() << " locks"); Reads[id].SerializedContinuationToken = record.GetContinuationToken(); - ui64 seqNo = ev->Get()->Record.GetSeqNo(); - Reads[id].RegisterMessage(*ev->Get()); + ui64 seqNo = record.GetSeqNo(); + Reads[id].RegisterMessage(msg); - if (Settings->GetIsBatch() && ev->Get()->GetRowsCount() > 0) { - SerializedEndRow = TSerializedCellVec{ev->Get()->GetCells(ev->Get()->GetRowsCount() - 1)}; + if (Settings->GetIsBatch() && msg.GetRowsCount() > 0) { + auto cells = msg.GetCells(msg.GetRowsCount() - 1); + BatchOperationMaxRow = TSerializedCellVec{cells}; } - ReceivedRowCount += ev->Get()->GetRowsCount(); + ReceivedRowCount += msg.GetRowsCount(); CA_LOG_D(TStringBuilder() << "new data for read #" << id - << " seqno = " << ev->Get()->Record.GetSeqNo() - << " finished = " << ev->Get()->Record.GetFinished()); - CA_LOG_T(TStringBuilder() << "read #" << id << " pushed " << DebugPrintCells(ev->Get()) << " continuation token " << DebugPrintContionuationToken(record.GetContinuationToken())); + << " seqno = " << seqNo + << " finished = " << record.GetFinished()); + CA_LOG_T(TStringBuilder() << "read #" << id << " pushed " << DebugPrintCells(&msg) << " continuation token " << DebugPrintContionuationToken(record.GetContinuationToken())); Results.push({Reads[id].Shard->TabletId, THolder>(ev.Release()), id, seqNo}); NotifyCA(); @@ -1495,15 +1509,19 @@ class TKqpReadActor : public TActorBootstrapped, public NYql::NDq for (auto& lock : BrokenLocks) { resultInfo.AddLocks()->CopyFrom(lock); } + if (Settings->GetIsBatch() && !BatchOperationMaxRow.GetCells().empty()) { + std::vector keyRow; + auto cells = BatchOperationMaxRow.GetCells(); - if (Settings->GetIsBatch() && SerializedEndRow) { - for (const auto& column : Settings->GetColumns()) { - if (!IsSystemColumn(column.GetId())) { - resultInfo.AddEndRowColumnIds(column.GetId()); - } + for (const auto& meta : BatchOperationReadColumns) { + keyRow.push_back(cells[meta.ReadIndex]); + resultInfo.AddBatchOperationKeyIds(meta.ColumnId); } - resultInfo.SetSerializedEndRow(SerializedEndRow.ReleaseBuffer()); + if (!keyRow.empty()) { + YQL_ENSURE(keyRow.size() == KeyColumnTypes.size()); + resultInfo.SetBatchOperationMaxKey(TSerializedCellVec::Serialize(keyRow)); + } } result.PackFrom(resultInfo); @@ -1626,13 +1644,14 @@ class TKqpReadActor : public TActorBootstrapped, public NYql::NDq TVector DuplicateCheckExtraColumns; TVector DuplicateCheckColumnRemap; - struct TReadColumnInfo { - ui32 Id; - NScheme::TTypeInfo TypeInfo; - bool IsPrimary = false; + struct TBatchOperationColumnMeta { + size_t ReadIndex; + ui32 ColumnId; }; - TSerializedCellVec SerializedEndRow; // For BATCH operations only + // For BATCH operations only + TVector BatchOperationReadColumns; + TSerializedCellVec BatchOperationMaxRow; }; diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index a0dee02b4fb3..07a48b335e89 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -235,8 +235,8 @@ message TKqpTransaction { message TEvKqpInputActorResultInfo { repeated NKikimrDataEvents.TLock Locks = 1; - optional string SerializedEndRow = 2; - repeated uint32 EndRowColumnIds = 3; + optional string BatchOperationMaxKey = 2; + repeated uint32 BatchOperationKeyIds = 3; } message TKqpReadRangesSourceSettings {