Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -782,12 +782,15 @@ Y_UNIT_TEST_SUITE(KqpConstraints) {
TKikimrRunner kikimr(TKikimrSettings()
.SetUseRealThreads(false)
.SetEnableAddColumsWithDefaults(true)
.SetDisableMissingDefaultColumnsInBulkUpsert(true)
.SetWithSampleTables(false));

auto db = kikimr.RunCall([&] { return kikimr.GetQueryClient(); } );
auto session = kikimr.RunCall([&] { return db.GetSession().GetValueSync().GetSession(); } );
auto querySession = kikimr.RunCall([&] { return db.GetSession().GetValueSync().GetSession(); } );

auto tableClient = kikimr.RunCall([&] { return kikimr.GetTableClient(); } );

auto& runtime = *kikimr.GetTestServer().GetRuntime();

{
Expand Down Expand Up @@ -860,7 +863,7 @@ Y_UNIT_TEST_SUITE(KqpConstraints) {

auto alterQuery = R"(
ALTER TABLE `/Root/AddNonColumnDoesnotReturnInternalError`
ADD COLUMN Value3 Int32 NOT NULL DEFAULT 7;
ADD COLUMN Value3 Int32 DEFAULT 7;
)";

auto alterFuture = kikimr.RunInThreadPool([&] { return session.ExecuteQuery(alterQuery, TTxControl::NoTx()).GetValueSync(); });
Expand Down Expand Up @@ -924,8 +927,8 @@ Y_UNIT_TEST_SUITE(KqpConstraints) {

auto result = runtime.WaitFuture(alterFuture);
fCompareTable(R"([
[1u;"Changed";"Updated";7];
[2u;"New";"text";7]
[1u;"Changed";"Updated";[7]];
[2u;"New";"text";[7]]
])");
}

Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/feature_flags.proto
Original file line number Diff line number Diff line change
Expand Up @@ -213,4 +213,5 @@ message TFeatureFlags {
optional bool EnableTopicCompactificationByKey = 196 [default = true];
optional bool EnableCompactionOverloadDetection = 197 [default = true];
reserved 198; // DisableColumnShardBulkUpsertRequireAllColumns
optional bool DisableMissingDefaultColumnsInBulkUpsert = 215 [default = false];
}
1 change: 1 addition & 0 deletions ydb/core/testlib/basics/feature_flags.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ class TTestFeatureFlagsHolder {
FEATURE_FLAG_SETTER(EnablePermissionsExport)
FEATURE_FLAG_SETTER(EnableLocalDBBtreeIndex)
FEATURE_FLAG_SETTER(EnableSharedMetadataAccessorCache)
FEATURE_FLAG_SETTER(DisableMissingDefaultColumnsInBulkUpsert)

#undef FEATURE_FLAG_SETTER
};
Expand Down
24 changes: 24 additions & 0 deletions ydb/core/tx/tx_proxy/upload_rows_common_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
THashMap<TString, ui32> columnByName;
THashSet<TString> keyColumnsLeft;
THashSet<TString> notNullColumnsLeft = entry.NotNullColumns;
THashSet<TString> defaultColumnsLeft;
SrcColumns.reserve(entry.Columns.size());
THashSet<TString> HasInternalConversion;

Expand All @@ -426,6 +427,10 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
keyColumnIds[keyOrder] = id;
keyColumnsLeft.insert(name);
}

if (colInfo.IsDefaultFromLiteral()) {
defaultColumnsLeft.insert(name);
}
}

if (entry.ColumnTableInfo) {
Expand Down Expand Up @@ -524,6 +529,10 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
NotNullColumns.emplace(ci.Name);
}

if (defaultColumnsLeft.contains(ci.Name)) {
defaultColumnsLeft.erase(ci.Name);
}

if (ci.KeyOrder != -1) {
KeyColumnPositions[ci.KeyOrder] = TFieldDescription{ci.Id, ci.Name, (ui32)pos, ci.PType, pgTypeMod, notNull};
keyColumnsLeft.erase(ci.Name);
Expand Down Expand Up @@ -608,6 +617,21 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
return TConclusionStatus::Fail(Sprintf("Missing not null columns: %s", JoinSeq(", ", notNullColumnsLeft).c_str()));
}

if (!defaultColumnsLeft.empty() && UpsertIfExists) {
// some default columns are not specified in the request, but upsert will only update existing rows
// and only the columns specified in the request will be updated; unspecified default columns will not be changed.
defaultColumnsLeft.clear();
}

if (!defaultColumnsLeft.empty()) {
if (AppData(ctx)->FeatureFlags.GetDisableMissingDefaultColumnsInBulkUpsert()) {
return TConclusionStatus::Fail(Sprintf("Missing default columns: %s", JoinSeq(", ", defaultColumnsLeft).c_str()));
}

UploadCounters.OnMissingDefaultColumns();
LOG_WARN_S(ctx, NKikimrServices::RPC_REQUEST, "Missing default columns: " << JoinSeq(", ", defaultColumnsLeft).c_str());
}

TConclusionStatus res = TConclusionStatus::Success();
if (isColumnTable && HasAppData() && AppDataVerified().ColumnShardConfig.GetBulkUpsertRequireAllColumns()) {
res = CheckRequiredColumns(entry, *reqColumns);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/tx_proxy/upload_rows_counters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,5 +99,6 @@ TUploadCounters::TUploadCounters()
WrittenBytes = TBase::GetDeriviative("Replies/WrittenBytes");
FailedBytes = TBase::GetDeriviative("Replies/FailedBytes");
RequestsBytes = TBase::GetDeriviative("Requests/Bytes");
MissingDefaultColumnsCount = TBase::GetDeriviative("MissingDefaultColumns/Count");
}
}
6 changes: 6 additions & 0 deletions ydb/core/tx/tx_proxy/upload_rows_counters.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ class TUploadCounters: public NColumnShard::TCommonCountersOwner {
NMonitoring::TDynamicCounters::TCounterPtr FailedBytes;
NMonitoring::TDynamicCounters::TCounterPtr RequestsBytes;

NMonitoring::TDynamicCounters::TCounterPtr MissingDefaultColumnsCount;

THashMap<TUploadStatus, NMonitoring::TDynamicCounters::TCounterPtr, TUploadStatus::THasher> CodesCount;

NMonitoring::TDynamicCounters::TCounterPtr GetCodeCounter(const TUploadStatus& status);
Expand Down Expand Up @@ -142,6 +144,10 @@ class TUploadCounters: public NColumnShard::TCommonCountersOwner {
PackageSizeCountByRecords->Collect(rowsCount);
RequestsBytes->Add(requestBytes);
}

void OnMissingDefaultColumns() {
MissingDefaultColumnsCount->Inc();
}
};

} // namespace NKikimr
Loading