Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 66 additions & 12 deletions ydb/core/kqp/ut/tx/kqp_snapshot_isolation_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ Y_UNIT_TEST_SUITE(KqpSnapshotIsolation) {
}

class TConflictWrite : public TTableDataModificationTester {
std::string WriteOperation = "insert";

public:
TConflictWrite(const std::string& writeOperation) : WriteOperation(writeOperation) {}

protected:
void DoExecute() override {
auto client = Kikimr->GetQueryClient();
Expand All @@ -76,7 +81,6 @@ Y_UNIT_TEST_SUITE(KqpSnapshotIsolation) {
SELECT * FROM `/Root/KV`;
)"), TTxControl::BeginTx(TTxSettings::SnapshotRW())).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

auto tx1 = result.GetTransaction();
UNIT_ASSERT(tx1);

Expand All @@ -86,12 +90,39 @@ Y_UNIT_TEST_SUITE(KqpSnapshotIsolation) {
)"), TTxControl::BeginTx(TTxSettings::SnapshotRW()).CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

result = session1.ExecuteQuery(Q_(R"(
UPSERT INTO `/Root/Test` (Group, Name, Comment)
VALUES (1U, "Paul", "Changed");
)"), TTxControl::Tx(*tx1).CommitTx()).ExtractValueSync();
// Keys changed since taking snapshot.
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString());
if (WriteOperation == "insert") {
result = session1.ExecuteQuery(Q_(R"(
INSERT INTO `/Root/Test` (Group, Name, Comment)
VALUES (1U, "Paul", "Changed");
)"), TTxControl::Tx(*tx1).CommitTx()).ExtractValueSync();
} else if (WriteOperation == "upsert_partial") {
result = session1.ExecuteQuery(Q_(R"(
UPSERT INTO `/Root/Test` (Group, Name, Comment)
VALUES (1U, "Paul", "Changed");
)"), TTxControl::Tx(*tx1).CommitTx()).ExtractValueSync();
} else if (WriteOperation == "upsert_full") {
result = session1.ExecuteQuery(Q_(R"(
UPSERT INTO `/Root/Test` (Group, Name, Comment, Amount)
VALUES (1U, "Paul", "Changed", 301ul);
)"), TTxControl::Tx(*tx1).CommitTx()).ExtractValueSync();
} else if (WriteOperation == "replace") {
result = session1.ExecuteQuery(Q_(R"(
REPLACE INTO `/Root/Test` (Group, Name, Comment)
VALUES (1U, "Paul", "Changed");
)"), TTxControl::Tx(*tx1).CommitTx()).ExtractValueSync();
} else if (WriteOperation == "delete") {
result = session1.ExecuteQuery(Q_(R"(
DELETE FROM `/Root/Test` WHERE Name == "Paul";
)"), TTxControl::Tx(*tx1).CommitTx()).ExtractValueSync();
} else {
UNIT_ASSERT(false);
}

if (WriteOperation == "insert") {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::PRECONDITION_FAILED, result.GetIssues().ToString());
} else {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString());
}

result = session2.ExecuteQuery(Q_(R"(
SELECT * FROM `/Root/Test` WHERE Name == "Paul" ORDER BY Group, Name;
Expand All @@ -103,22 +134,45 @@ Y_UNIT_TEST_SUITE(KqpSnapshotIsolation) {

Y_UNIT_TEST(TConflictWriteOltp) {
return;
TConflictWrite tester;
TConflictWrite tester("upsert_partial");
tester.SetIsOlap(false);
tester.Execute();
}

Y_UNIT_TEST(TConflictWriteOltpNoSink) {
return;
TConflictWrite tester;
TConflictWrite tester("upsert_partial");
tester.SetIsOlap(false);
tester.SetDisableSinks(true);
tester.Execute();
}

Y_UNIT_TEST(TConflictWriteOlap) {
return;
TConflictWrite tester;
// Runs the TConflictWrite scenario with SetIsOlap(true) and an INSERT as the write
// issued in the snapshot transaction. For "insert" TConflictWrite::DoExecute expects
// PRECONDITION_FAILED (every other operation is expected to end with ABORTED).
Y_UNIT_TEST(TConflictWriteOlapInsert) {
TConflictWrite tester("insert");
tester.SetIsOlap(true);
tester.Execute();
}

// Runs the TConflictWrite scenario with SetIsOlap(true) and an UPSERT that sets only
// Group, Name and Comment (the Amount column is omitted).
// TConflictWrite::DoExecute expects the commit to finish with ABORTED.
Y_UNIT_TEST(TConflictWriteOlapUpsertPartial) {
TConflictWrite tester("upsert_partial");
tester.SetIsOlap(true);
tester.Execute();
}

// Runs the TConflictWrite scenario with SetIsOlap(true) and an UPSERT that also sets
// Amount in addition to Group, Name and Comment.
// TConflictWrite::DoExecute expects the commit to finish with ABORTED.
Y_UNIT_TEST(TConflictWriteOlapUpsertFull) {
TConflictWrite tester("upsert_full");
tester.SetIsOlap(true);
tester.Execute();
}

// Runs the TConflictWrite scenario with SetIsOlap(true) and a REPLACE INTO of
// (Group, Name, Comment) as the write issued in the snapshot transaction.
// TConflictWrite::DoExecute expects the commit to finish with ABORTED.
Y_UNIT_TEST(TConflictWriteOlapReplace) {
TConflictWrite tester("replace");
tester.SetIsOlap(true);
tester.Execute();
}

// Runs the TConflictWrite scenario with SetIsOlap(true) and a DELETE of the rows
// with Name == "Paul" as the write issued in the snapshot transaction.
// TConflictWrite::DoExecute expects the commit to finish with ABORTED.
Y_UNIT_TEST(TConflictWriteOlapDelete) {
TConflictWrite tester("delete");
tester.SetIsOlap(true);
tester.Execute();
}
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/columnshard/columnshard.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ namespace TEvColumnShard {
YDB_READONLY_DEF(NColumnShard::TUnifiedPathId, PathId);
YDB_READONLY(NOlap::TSnapshot, Snapshot, NOlap::TSnapshot::Zero());
YDB_READONLY_DEF(std::optional<ui64>, LockId);
YDB_READONLY_DEF(bool, ReadOnlyConflicts);
YDB_ACCESSOR(bool, Reverse, false);
YDB_ACCESSOR(ui32, ItemsLimit, 0);
YDB_READONLY_DEF(std::vector<ui32>, ColumnIds);
Expand All @@ -118,10 +119,11 @@ namespace TEvColumnShard {
ColumnIds.emplace_back(id);
}

TEvInternalScan(const NColumnShard::TUnifiedPathId pathId, const NOlap::TSnapshot& snapshot, const std::optional<ui64> lockId)
TEvInternalScan(const NColumnShard::TUnifiedPathId pathId, const NOlap::TSnapshot& snapshot, const std::optional<ui64> lockId, const bool readOnlyConflicts)
: PathId(pathId)
, Snapshot(snapshot)
, LockId(lockId)
, ReadOnlyConflicts(readOnlyConflicts)
{
AFL_VERIFY(Snapshot.Valid());
}
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/columnshard/columnshard__write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,8 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
} else {
lockId = record.GetLockTxId();
}
// Serializable by default
NKikimrDataEvents::ELockMode lockMode = record.HasLockMode() ? record.GetLockMode() : NKikimrDataEvents::OPTIMISTIC;

const bool isBulk = operation.HasIsBulk() && operation.GetIsBulk();

Expand Down Expand Up @@ -600,7 +602,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor

LWPROBE(EvWrite, TabletID(), source.ToString(), cookie, record.GetTxId(), writeTimeout.value_or(TDuration::Max()), arrowData->GetSize(), "", true, operation.GetIsBulk(), "", "");

WriteTasksQueue->Enqueue(TWriteTask(arrowData, schema, source, ev->Recipient, granuleShardingVersionId, pathId, cookie, mvccSnapshot, lockId, record.GetLockNodeId(), *mType, behaviour, writeTimeout, record.GetTxId(),
WriteTasksQueue->Enqueue(TWriteTask(arrowData, schema, source, ev->Recipient, granuleShardingVersionId, pathId, cookie, mvccSnapshot, lockId, record.GetLockNodeId(), lockMode, *mType, behaviour, writeTimeout, record.GetTxId(),
isBulk, record.HasOverloadSubscribe() ? record.GetOverloadSubscribe() : std::optional<ui64>()));
WriteTasksQueue->Drain(false, ctx);
}
Expand Down
16 changes: 11 additions & 5 deletions ydb/core/tx/columnshard/engines/reader/common/description.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,18 +61,20 @@ class TReadDescription {
ScanCursor = cursor;
}

void SetLock(std::optional<ui64> lockId, std::optional<NKikimrDataEvents::ELockMode> lockMode, const NColumnShard::TLockFeatures* lock) {
void SetLock(std::optional<ui64> lockId, std::optional<NKikimrDataEvents::ELockMode> lockMode, const NColumnShard::TLockFeatures* lock, const bool readOnlyConflicts) {
LockId = lockId;
LockMode = lockMode;

auto snapshotIsolation = lockMode.value_or(NKikimrDataEvents::OPTIMISTIC) == NKikimrDataEvents::OPTIMISTIC_SNAPSHOT_ISOLATION;
auto snapshotIsolation = lockId.has_value() && lockMode.value_or(NKikimrDataEvents::OPTIMISTIC) == NKikimrDataEvents::OPTIMISTIC_SNAPSHOT_ISOLATION;

// always true for now, will be false for reads that check only conflicts (coming soon with Snapshot Isolation)
readNonconflictingPortions = true;
readNonconflictingPortions = !readOnlyConflicts;

// do not check conflicts for Snapshot isolated txs or txs with no lock
readConflictingPortions = LockId.has_value() && !snapshotIsolation;
readConflictingPortions = (LockId.has_value() && !snapshotIsolation) || readOnlyConflicts;

// if we need conflicting portions, we just take all uncommitted portions (from other txs and own)
// but if we do not need conflicting portions, we need to remember own portions ids,
// so that we can pick only own uncommitted portions for the read
if (readNonconflictingPortions && !readConflictingPortions && lock != nullptr && lock->GetWriteOperations().size() > 0) {
ownPortions = THashSet<TInsertWriteId>();
for (auto& writeOperation : lock->GetWriteOperations()) {
Expand All @@ -87,6 +89,10 @@ class TReadDescription {
if (ownPortions.has_value() && !ownPortions->empty()) {
AFL_VERIFY(readNonconflictingPortions);
}
// we do not have cases (at the moment) when we need to read only conflicts for a scan with no transaction
if (!LockId.has_value()) {
AFL_VERIFY(!readOnlyConflicts);
}
}

TReadDescription(const ui64 tabletId, const TSnapshot& snapshot, const ERequestSorting sorting)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ void TTxInternalScan::Complete(const TActorContext& ctx) {
read.SetLock(
request.GetLockId(),
NKikimrDataEvents::OPTIMISTIC,
request.GetLockId().has_value() ? Self->GetOperationsManager().GetLockOptional(request.GetLockId().value()) : nullptr
request.GetLockId().has_value() ? Self->GetOperationsManager().GetLockOptional(request.GetLockId().value()) : nullptr,
request.GetReadOnlyConflicts()
);
read.DeduplicationPolicy = EDeduplicationPolicy::PREVENT_DUPLICATES;
std::unique_ptr<IScannerConstructor> scannerConstructor(new NSimple::TIndexScannerConstructor(context));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ void TTxScan::Complete(const TActorContext& ctx) {
read.SetLock(
request.HasLockTxId() ? std::make_optional(request.GetLockTxId()) : std::nullopt,
request.HasLockMode() ? std::make_optional(request.GetLockMode()) : std::nullopt,
Self->GetOperationsManager().GetLockOptional(request.GetLockTxId())
request.HasLockTxId() ? Self->GetOperationsManager().GetLockOptional(request.GetLockTxId()) : nullptr,
false
);

{
Expand Down
44 changes: 27 additions & 17 deletions ydb/core/tx/columnshard/operations/batch_builder/builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,35 @@ void TBuildBatchesTask::DoExecute(const std::shared_ptr<ITask>& /*taskPtr*/) {
auto batch = preparedConclusion.DetachResult();

std::shared_ptr<IMerger> merger;
bool readOnlyConflicts = false;
// Shared dispatch for writes whose incoming batch needs no defaults filled in
// (Upsert with no absent columns, Replace, Delete).
// Returns true when the caller must leave the switch and continue to the restore task
// built after it; returns false when the write has already been handed off here and
// the caller should simply return.
auto handleReplace = [&]() -> bool {
if (Context.GetNoTxWrite()) {
// Non-transactional write: hand a copy of the write data and the batch to the
// bufferization actor; nothing is left for the caller to do.
NActors::TActivationContext::ActorSystem()->Send(Context.GetBufferizationPortionsActorId(),
new NWritingPortions::TEvAddInsertedDataToBuffer(
std::make_shared<NEvWrite::TWriteData>(WriteData), batch, std::make_shared<TWritingContext>(Context)));
return false;
} else if (Context.IsSnapshotIsolated()) {
// Snapshot-isolated transaction: go through the restore task with an insert merger,
// and ask the internal scan to read only conflicting portions (readOnlyConflicts).
// NOTE(review): presumably TInsertMerger rejects the write when the scan returns
// rows for the written keys - confirm against TInsertMerger.
merger = std::make_shared<TInsertMerger>(batch, Context.GetActualSchema());
readOnlyConflicts = true;
return true;
} else {
// Transactional write without snapshot isolation: skip the restore scan and build
// slices directly. WriteData is moved from here, so it must not be used afterwards.
std::shared_ptr<NConveyor::ITask> task =
std::make_shared<NOlap::TBuildSlicesTask>(std::move(WriteData), batch.GetContainer(), Context);
NConveyorComposite::TInsertServiceOperator::SendTaskToExecute(task);
return false;
}
};
switch (WriteData.GetWriteMeta().GetModificationType()) {
case NEvWrite::EModificationType::Upsert: {
const std::vector<std::shared_ptr<arrow::Field>> defaultFields =
Context.GetActualSchema()->GetAbsentFields(batch.GetContainer()->schema());
if (defaultFields.empty()) {
if (!Context.GetNoTxWrite()) {
std::shared_ptr<NConveyor::ITask> task =
std::make_shared<NOlap::TBuildSlicesTask>(std::move(WriteData), batch.GetContainer(), Context);
NConveyorComposite::TInsertServiceOperator::SendTaskToExecute(task);
auto proceed = handleReplace();
if (proceed) {
break;
} else {
NActors::TActivationContext::ActorSystem()->Send(Context.GetBufferizationPortionsActorId(),
new NWritingPortions::TEvAddInsertedDataToBuffer(
std::make_shared<NEvWrite::TWriteData>(WriteData), batch, std::make_shared<TWritingContext>(Context)));
return;
}
return;
} else {
auto insertionConclusion = Context.GetActualSchema()->CheckColumnsDefault(defaultFields);
auto conclusion = Context.GetActualSchema()->BuildDefaultBatch(Context.GetActualSchema()->GetIndexInfo().ArrowSchema(), 1, true);
Expand All @@ -90,20 +104,16 @@ void TBuildBatchesTask::DoExecute(const std::shared_ptr<ITask>& /*taskPtr*/) {
}
case NEvWrite::EModificationType::Replace:
case NEvWrite::EModificationType::Delete: {
if (!Context.GetNoTxWrite()) {
std::shared_ptr<NConveyor::ITask> task =
std::make_shared<NOlap::TBuildSlicesTask>(std::move(WriteData), batch.GetContainer(), Context);
NConveyorComposite::TInsertServiceOperator::SendTaskToExecute(task);
auto proceed = handleReplace();
if (proceed) {
break;
} else {
NActors::TActivationContext::ActorSystem()->Send(Context.GetBufferizationPortionsActorId(),
new NWritingPortions::TEvAddInsertedDataToBuffer(
std::make_shared<NEvWrite::TWriteData>(WriteData), batch, std::make_shared<TWritingContext>(Context)));
return;
}
return;
}
}
std::shared_ptr<NDataReader::IRestoreTask> task =
std::make_shared<NOlap::TModificationRestoreTask>(std::move(WriteData), merger, batch, Context);
std::make_shared<NOlap::TModificationRestoreTask>(std::move(WriteData), merger, batch, Context, readOnlyConflicts);
NActors::TActivationContext::AsActorContext().Register(new NDataReader::TActor(task));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace NKikimr::NOlap {

std::unique_ptr<TEvColumnShard::TEvInternalScan> TModificationRestoreTask::DoBuildRequestInitiator() const {
const auto& writeMetaData = WriteData.GetWriteMeta();
auto request = std::make_unique<TEvColumnShard::TEvInternalScan>(writeMetaData.GetPathId(), Context.GetApplyToSnapshot(), Context.GetLockId());
auto request = std::make_unique<TEvColumnShard::TEvInternalScan>(writeMetaData.GetPathId(), Context.GetApplyToSnapshot(), Context.GetLockId(), ReadOnlyConflicts);
request->TaskIdentifier = GetTaskId();
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_RESTORE)("event", "restore_start")(
"count", IncomingData.HasContainer() ? IncomingData->num_rows() : 0)("task_id", WriteData.GetWriteMeta().GetId());
Expand Down Expand Up @@ -61,13 +61,14 @@ NKikimr::TConclusionStatus TModificationRestoreTask::DoOnFinished() {
return TConclusionStatus::Success();
}

TModificationRestoreTask::TModificationRestoreTask(NEvWrite::TWriteData&& writeData, const std::shared_ptr<IMerger>& merger, const NArrow::TContainerWithIndexes<arrow::RecordBatch>& incomingData, const TWritingContext& context)
TModificationRestoreTask::TModificationRestoreTask(NEvWrite::TWriteData&& writeData, const std::shared_ptr<IMerger>& merger, const NArrow::TContainerWithIndexes<arrow::RecordBatch>& incomingData, const TWritingContext& context, const bool readOnlyConflicts)
: TBase(context.GetTabletId(), context.GetTabletActorId(),
writeData.GetWriteMeta().GetId() + "::" + ::ToString(writeData.GetWriteMeta().GetWriteId()))
, WriteData(std::move(writeData))
, Merger(merger)
, IncomingData(incomingData)
, Context(context) {
, Context(context)
, ReadOnlyConflicts(readOnlyConflicts) {
AFL_VERIFY(context.GetApplyToSnapshot().Valid());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class TModificationRestoreTask: public NDataReader::IRestoreTask, public NColumn
std::shared_ptr<IMerger> Merger;
NArrow::TContainerWithIndexes<arrow::RecordBatch> IncomingData;
const TWritingContext Context;
bool ReadOnlyConflicts;
virtual std::unique_ptr<TEvColumnShard::TEvInternalScan> DoBuildRequestInitiator() const override;

virtual TConclusionStatus DoOnDataChunk(const std::shared_ptr<arrow::Table>& data) override;
Expand All @@ -35,7 +36,7 @@ class TModificationRestoreTask: public NDataReader::IRestoreTask, public NColumn
virtual TDuration GetTimeout() const override;

TModificationRestoreTask(NEvWrite::TWriteData&& writeData, const std::shared_ptr<IMerger>& merger,
const NArrow::TContainerWithIndexes<arrow::RecordBatch>& incomingData, const TWritingContext& context);
const NArrow::TContainerWithIndexes<arrow::RecordBatch>& incomingData, const TWritingContext& context, const bool readOnlyConflicts);
};

} // namespace NKikimr::NOlap
10 changes: 8 additions & 2 deletions ydb/core/tx/columnshard/operations/common/context.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class TWritingContext {
YDB_READONLY_DEF(std::shared_ptr<NColumnShard::TWriteCounters>, WritingCounters);
YDB_READONLY(TSnapshot, ApplyToSnapshot, TSnapshot::Zero());
YDB_READONLY_DEF(std::optional<ui64>, LockId);
YDB_READONLY_DEF(std::optional<NKikimrDataEvents::ELockMode>, LockMode);
const std::shared_ptr<TActivityChecker> ActivityChecker;
YDB_READONLY(bool, NoTxWrite, false);
YDB_READONLY(bool, IsBulk, false);
Expand All @@ -63,14 +64,18 @@ class TWritingContext {
bool IsActive() const {
return ActivityChecker->IsActive();
}


// True when this write belongs to a transaction (a lock id is set) whose lock mode is
// OPTIMISTIC_SNAPSHOT_ISOLATION; an unset lock mode is treated as OPTIMISTIC.
bool IsSnapshotIsolated() const {
    if (!LockId.has_value()) {
        return false;
    }
    const auto effectiveMode = LockMode.value_or(NKikimrDataEvents::OPTIMISTIC);
    return effectiveMode == NKikimrDataEvents::OPTIMISTIC_SNAPSHOT_ISOLATION;
}

TString GetErrorMessage() const {
return ActivityChecker->GetErrorMessage();
}

TWritingContext(const ui64 tabletId, const NActors::TActorId& tabletActorId, const std::shared_ptr<ISnapshotSchema>& actualSchema,
const std::shared_ptr<IStoragesManager>& operators, const std::shared_ptr<NColumnShard::TSplitterCounters>& splitterCounters,
const std::shared_ptr<NColumnShard::TWriteCounters>& writingCounters, const TSnapshot& applyToSnapshot, const std::optional<ui64>& lockId,
const std::shared_ptr<NColumnShard::TWriteCounters>& writingCounters, const TSnapshot& applyToSnapshot, const std::optional<ui64>& lockId, const std::optional<NKikimrDataEvents::ELockMode>& lockMode,
const std::shared_ptr<TActivityChecker>& activityChecker, const bool noTxWrite, const NActors::TActorId& bufferizationPortionsActorId,
const bool isBulk)
: TabletId(tabletId)
Expand All @@ -82,6 +87,7 @@ class TWritingContext {
, WritingCounters(writingCounters)
, ApplyToSnapshot(applyToSnapshot)
, LockId(lockId)
, LockMode(lockMode)
, ActivityChecker(activityChecker)
, NoTxWrite(noTxWrite)
, IsBulk(isBulk)
Expand Down
Loading
Loading