Skip to content

Commit

Permalink
Merge 94d5e18 into f2efea6
Browse files Browse the repository at this point in the history
  • Loading branch information
avevad committed May 16, 2024
2 parents f2efea6 + 94d5e18 commit 368b453
Showing 1 changed file with 11 additions and 13 deletions.
24 changes: 11 additions & 13 deletions ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1756,7 +1756,6 @@ class TYtNativeGateway : public IYtGateway {
auto pos = execCtx->Options_.Pos();
try {
auto entry = execCtx->GetOrCreateEntry();
auto deterministicMode = execCtx->Session_->DeterministicMode_;

TString prefix = execCtx->Options_.Prefix();
TString suffix = execCtx->Options_.Suffix();
Expand All @@ -1765,7 +1764,7 @@ class TYtNativeGateway : public IYtGateway {
with_lock(entry->Lock_) {
if (auto p = entry->RangeCache.FindPtr(cacheKey)) {
YQL_CLOG(INFO, ProviderYt) << "Found range in cache for key ('" << prefix << "','" << suffix << "',<filter with size " << filterLambda.Size() << ">) - number of items " << p->size();
return MakeFuture(MakeTableRangeResult(*p, deterministicMode));
return MakeFuture(MakeTableRangeResult(*p));
}
}

Expand Down Expand Up @@ -1890,7 +1889,7 @@ class TYtNativeGateway : public IYtGateway {

auto logCtx = execCtx->LogCtx_;
return ExecCalc(filterLambda, extraUsage, tmpTablePath, execCtx, entry, TNodeResultFactory())
.Apply([logCtx, prefix, suffix, entry, deterministicMode, pos, errors = std::move(errors), cacheKey = std::move(cacheKey)](const TFuture<NYT::TNode>& f) mutable {
.Apply([logCtx, prefix, suffix, entry, pos, errors = std::move(errors), cacheKey = std::move(cacheKey)](const TFuture<NYT::TNode>& f) mutable {
YQL_LOG_CTX_ROOT_SESSION_SCOPE(logCtx);
std::vector<TString> names;
try {
Expand All @@ -1903,20 +1902,20 @@ class TYtNativeGateway : public IYtGateway {
}
names.push_back(n.AsList().at(0).AsString());
}
return MakeTableRangeResult(std::move(names), std::move(cacheKey), prefix, suffix, entry, deterministicMode);
return MakeTableRangeResult(std::move(names), std::move(cacheKey), prefix, suffix, entry);
} catch (...) {
return ResultFromCurrentException<TTableRangeResult>(pos);
}
});
}
return MakeFuture(MakeTableRangeResult(std::move(names), std::move(cacheKey), prefix, suffix, entry, deterministicMode));
return MakeFuture(MakeTableRangeResult(std::move(names), std::move(cacheKey), prefix, suffix, entry));

} catch (...) {
return MakeFuture(ResultFromCurrentException<TTableRangeResult>(pos));
}
}

static TTableRangeResult MakeTableRangeResult(const std::vector<NYT::TRichYPath>& paths, bool deterministicMode) {
static TTableRangeResult MakeTableRangeResult(const std::vector<NYT::TRichYPath>& paths) {
TTableRangeResult rangeRes;
rangeRes.SetSuccess();

Expand All @@ -1929,14 +1928,14 @@ class TYtNativeGateway : public IYtGateway {
canonPath.Ranges = normalizedPath.GetRanges();
rangeRes.Tables.push_back(std::move(canonPath));
}
if (deterministicMode) {
SortBy(rangeRes.Tables, [] (const TCanonizedPath& path) { return path.Path; });
}

SortBy(rangeRes.Tables, [] (const TCanonizedPath& path) { return path.Path; });

return rangeRes;
}

static TTableRangeResult MakeTableRangeResult(std::vector<TString>&& names, std::tuple<TString, TString, TString>&& cacheKey,
TString prefix, TString suffix, const TTransactionCache::TEntry::TPtr& entry, bool deterministicMode)
TString prefix, TString suffix, const TTransactionCache::TEntry::TPtr& entry)
{
TTableRangeResult rangeRes;
rangeRes.SetSuccess();
Expand Down Expand Up @@ -1996,9 +1995,8 @@ class TYtNativeGateway : public IYtGateway {
entry->RangeCache.emplace(std::move(cacheKey), std::move(cached));
}

if (deterministicMode) {
SortBy(rangeRes.Tables, [] (const TCanonizedPath& path) { return path.Path; });
}
SortBy(rangeRes.Tables, [] (const TCanonizedPath& path) { return path.Path; });

return rangeRes;
}

Expand Down

0 comments on commit 368b453

Please sign in to comment.