Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,14 +212,14 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
}
}

if (!BatchOperationSettings.Empty() && info.HasSerializedEndRow()) {
if (ResponseEv->EndRowColumnIds.empty()) {
for (auto keyId : info.GetEndRowColumnIds()) {
ResponseEv->EndRowColumnIds.push_back(keyId);
if (!BatchOperationSettings.Empty() && info.HasBatchOperationMaxKey()) {
if (ResponseEv->BatchOperationMaxKeys.empty()) {
for (auto keyId : info.GetBatchOperationKeyIds()) {
ResponseEv->BatchOperationKeyIds.push_back(keyId);
}
}

ResponseEv->SerializedEndRows.emplace_back(info.GetSerializedEndRow());
ResponseEv->BatchOperationMaxKeys.emplace_back(info.GetBatchOperationMaxKey());
}
} else if (data.GetData().template Is<NKikimrKqp::TEvKqpOutputActorResultInfo>()) {
NKikimrKqp::TEvKqpOutputActorResultInfo info;
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/executer_actor/kqp_executer.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ struct TEvKqpExecuter {
THashSet<ui32> ParticipantNodes;

// For BATCH operations only
TVector<TSerializedCellVec> SerializedEndRows;
TVector<ui32> EndRowColumnIds;
TVector<TSerializedCellVec> BatchOperationMaxKeys;
TVector<ui32> BatchOperationKeyIds;

enum class EExecutionType {
Data,
Expand Down
55 changes: 37 additions & 18 deletions ydb/core/kqp/executer_actor/kqp_partitioned_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -549,9 +549,9 @@ class TKqpPartitionedExecuter : public TActorBootstrapped<TKqpPartitionedExecute
}

void OnSuccessResponse(TBatchPartitionInfo::TPtr& partInfo, TEvKqpExecuter::TEvTxResponse* ev) {
TSerializedCellVec maxKey = GetMinCellVecKey(std::move(ev->SerializedEndRows), std::move(ev->EndRowColumnIds));
if (maxKey) {
partInfo->BeginRange = TKeyDesc::TPartitionRangeInfo(maxKey, /* IsInclusive */ false, /* IsPoint */ false);
TSerializedCellVec minKey = GetMinCellVecKey(std::move(ev->BatchOperationMaxKeys), std::move(ev->BatchOperationKeyIds));
if (minKey) {
partInfo->BeginRange = TKeyDesc::TPartitionRangeInfo(minKey, /* IsInclusive */ false, /* IsPoint */ false);
return RetryPartExecution(partInfo);
}

Expand Down Expand Up @@ -619,25 +619,43 @@ class TKqpPartitionedExecuter : public TActorBootstrapped<TKqpPartitionedExecute
return StartedPartitions.empty();
}

TSerializedCellVec GetMinCellVecKey(TVector<TSerializedCellVec>&& rows, TVector<ui32>&& rowColumnIds) const {
YQL_ENSURE(rowColumnIds.empty() || KeyIds.size() <= rowColumnIds.size());
bool IsColumnsNeedReorder(const TVector<ui32>& rowColumnIds) {
if (KeyColumnIdToPos.empty()) {
for (size_t i = 0; i < rowColumnIds.size(); ++i) {
KeyColumnIdToPos[rowColumnIds[i]] = i;
}
}

// Sometimes SchemeCache and KqpReadActor return keys in the different order, so we need to reorder the second ones
std::transform(rows.begin(), rows.end(), rows.begin(), [&](TSerializedCellVec& key) {
TVector<TCell> newKey;
newKey.reserve(KeyIds.size());
for (size_t i = 0; i < KeyIds.size(); ++i) {
auto it = KeyColumnIdToPos.find(KeyIds[i]);
YQL_ENSURE(it != KeyColumnIdToPos.end());

for (auto keyId : KeyIds) {
auto it = std::find(rowColumnIds.begin(), rowColumnIds.end(), keyId);
if (it != rowColumnIds.end()) {
newKey.emplace_back(key.GetCells()[it - rowColumnIds.begin()]);
} else {
YQL_ENSURE(false, "KeyId " << keyId << " not found in readKeyIds");
}
if (it->second != i) {
return true;
}
}

return TSerializedCellVec(std::move(newKey));
});
return false;
}

TSerializedCellVec GetMinCellVecKey(TVector<TSerializedCellVec>&& rows, TVector<ui32>&& rowColumnIds) {
if (!rowColumnIds.empty() && IsColumnsNeedReorder(rowColumnIds)) {
std::transform(rows.begin(), rows.end(), rows.begin(), [&](TSerializedCellVec& key) {
TVector<TCell> newKey;
newKey.reserve(KeyIds.size());

for (auto keyId : KeyIds) {
auto it = std::find(rowColumnIds.begin(), rowColumnIds.end(), keyId);
if (it != rowColumnIds.end()) {
newKey.emplace_back(key.GetCells()[it - rowColumnIds.begin()]);
} else {
YQL_ENSURE(false, "KeyId " << keyId << " not found in readKeyIds");
}
}

return TSerializedCellVec(std::move(newKey));
});
}

TSerializedCellVec result;

Expand Down Expand Up @@ -706,6 +724,7 @@ class TKqpPartitionedExecuter : public TActorBootstrapped<TKqpPartitionedExecute

TVector<ui32> KeyIds;
TVector<NScheme::TTypeInfo> KeyColumnTypes;
THashMap<ui32, size_t> KeyColumnIdToPos;

std::shared_ptr<const TVector<TKeyDesc::TPartitionInfo>> TablePartitioning;
THashMap<TPartitionIndex, TBatchPartitionInfo::TPtr> StartedPartitions;
Expand Down
61 changes: 40 additions & 21 deletions ydb/core/kqp/runtime/kqp_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -805,12 +805,24 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq

state->FillEvRead(*ev, KeyColumnTypes, Settings->GetReverse());

for (const auto& column : Settings->GetColumns()) {
BatchOperationReadColumns.clear();

auto columnsSize = static_cast<size_t>(Settings->GetColumns().size());
size_t notSystemColumnsIndex = 0;
for (size_t i = 0; i < columnsSize; ++i) {
const auto& column = Settings->GetColumns()[i];
if (!IsSystemColumn(column.GetId())) {
record.AddColumns(column.GetId());

if (Settings->GetIsBatch() && column.GetIsPrimary()) {
BatchOperationReadColumns.emplace_back(notSystemColumnsIndex, column.GetId());
}
notSystemColumnsIndex++;
}
}

YQL_ENSURE(!Settings->GetIsBatch() || BatchOperationReadColumns.size() == KeyColumnTypes.size());

if (CollectDuplicateStats) {
for (const auto& column : DuplicateCheckExtraColumns) {
record.AddColumns(column.Tag);
Expand Down Expand Up @@ -919,7 +931,8 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
}

void HandleRead(TEvDataShard::TEvReadResult::TPtr ev) {
const auto& record = ev->Get()->Record;
auto& msg = *ev->Get();
const auto& record = msg.Record;
auto id = record.GetReadId();
if (!Reads[id] || Reads[id].Finished) {
// dropped read
Expand Down Expand Up @@ -1039,19 +1052,20 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
CA_LOG_D("Taken " << Locks.size() << " locks");
Reads[id].SerializedContinuationToken = record.GetContinuationToken();

ui64 seqNo = ev->Get()->Record.GetSeqNo();
Reads[id].RegisterMessage(*ev->Get());
ui64 seqNo = record.GetSeqNo();
Reads[id].RegisterMessage(msg);

if (Settings->GetIsBatch() && ev->Get()->GetRowsCount() > 0) {
SerializedEndRow = TSerializedCellVec{ev->Get()->GetCells(ev->Get()->GetRowsCount() - 1)};
if (Settings->GetIsBatch() && msg.GetRowsCount() > 0) {
auto cells = msg.GetCells(msg.GetRowsCount() - 1);
BatchOperationMaxRow = TSerializedCellVec{cells};
}

ReceivedRowCount += ev->Get()->GetRowsCount();
ReceivedRowCount += msg.GetRowsCount();

CA_LOG_D(TStringBuilder() << "new data for read #" << id
<< " seqno = " << ev->Get()->Record.GetSeqNo()
<< " finished = " << ev->Get()->Record.GetFinished());
CA_LOG_T(TStringBuilder() << "read #" << id << " pushed " << DebugPrintCells(ev->Get()) << " continuation token " << DebugPrintContionuationToken(record.GetContinuationToken()));
<< " seqno = " << seqNo
<< " finished = " << record.GetFinished());
CA_LOG_T(TStringBuilder() << "read #" << id << " pushed " << DebugPrintCells(&msg) << " continuation token " << DebugPrintContionuationToken(record.GetContinuationToken()));

Results.push({Reads[id].Shard->TabletId, THolder<TEventHandle<TEvDataShard::TEvReadResult>>(ev.Release()), id, seqNo});
NotifyCA();
Expand Down Expand Up @@ -1495,15 +1509,19 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
for (auto& lock : BrokenLocks) {
resultInfo.AddLocks()->CopyFrom(lock);
}
if (Settings->GetIsBatch() && !BatchOperationMaxRow.GetCells().empty()) {
std::vector<TCell> keyRow;
auto cells = BatchOperationMaxRow.GetCells();

if (Settings->GetIsBatch() && SerializedEndRow) {
for (const auto& column : Settings->GetColumns()) {
if (!IsSystemColumn(column.GetId())) {
resultInfo.AddEndRowColumnIds(column.GetId());
}
for (const auto& meta : BatchOperationReadColumns) {
keyRow.push_back(cells[meta.ReadIndex]);
resultInfo.AddBatchOperationKeyIds(meta.ColumnId);
}

resultInfo.SetSerializedEndRow(SerializedEndRow.ReleaseBuffer());
if (!keyRow.empty()) {
YQL_ENSURE(keyRow.size() == KeyColumnTypes.size());
resultInfo.SetBatchOperationMaxKey(TSerializedCellVec::Serialize(keyRow));
}
}

result.PackFrom(resultInfo);
Expand Down Expand Up @@ -1626,13 +1644,14 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
TVector<TResultColumn> DuplicateCheckExtraColumns;
TVector<ui32> DuplicateCheckColumnRemap;

struct TReadColumnInfo {
ui32 Id;
NScheme::TTypeInfo TypeInfo;
bool IsPrimary = false;
struct TBatchOperationColumnMeta {
size_t ReadIndex;
ui32 ColumnId;
};

TSerializedCellVec SerializedEndRow; // For BATCH operations only
// For BATCH operations only
TVector<TBatchOperationColumnMeta> BatchOperationReadColumns;
TSerializedCellVec BatchOperationMaxRow;
};


Expand Down
4 changes: 2 additions & 2 deletions ydb/core/protos/tx_datashard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,8 @@ message TKqpTransaction {

message TEvKqpInputActorResultInfo {
repeated NKikimrDataEvents.TLock Locks = 1;
optional string SerializedEndRow = 2;
repeated uint32 EndRowColumnIds = 3;
optional string BatchOperationMaxKey = 2;
repeated uint32 BatchOperationKeyIds = 3;
}

message TKqpReadRangesSourceSettings {
Expand Down
Loading