Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,20 @@ class TDqSolomonMetricsQueueActor : public NActors::TActorBootstrapped<TDqSolomo

TDqSolomonMetricsQueueActor(
ui64 consumersCount,
TDqSolomonReadParams&& readParams,
ui64 pageSize,
ui64 prefetchSize,
ui64 batchCountLimit,
TDqSolomonReadParams&& readParams,
TDuration truePointsFindRange,
std::shared_ptr<NYdb::ICredentialsProvider> credentialsProvider)
: CurrentPage(0)
, ConsumersCount(consumersCount)
, ReadParams(std::move(readParams))
, PageSize(pageSize)
, PrefetchSize(prefetchSize)
, BatchCountLimit(batchCountLimit)
, ReadParams(std::move(readParams))
, TrueRangeFrom(TInstant::Seconds(ReadParams.Source.GetFrom()) - truePointsFindRange)
, TrueRangeTo(TInstant::Seconds(ReadParams.Source.GetTo()) + truePointsFindRange)
, CredentialsProvider(credentialsProvider)
, SolomonClient(NSo::ISolomonAccessorClient::Make(ReadParams.Source, CredentialsProvider))
{}
Expand Down Expand Up @@ -277,7 +280,7 @@ class TDqSolomonMetricsQueueActor : public NActors::TActorBootstrapped<TDqSolomo
std::map<TString, TString> selectors(ReadParams.Source.GetSelectors().begin(), ReadParams.Source.GetSelectors().end());
ListingFuture =
SolomonClient
->ListMetrics(selectors, PageSize, CurrentPage++)
->ListMetrics(selectors, TrueRangeFrom, TrueRangeTo, PageSize, CurrentPage++)
.Subscribe([actorSystem, selfId = SelfId()](
NThreading::TFuture<NSo::TListMetricsResponse> future) -> void {
try {
Expand Down Expand Up @@ -419,10 +422,12 @@ class TDqSolomonMetricsQueueActor : public NActors::TActorBootstrapped<TDqSolomo
std::vector<NSo::MetricQueue::TMetric> Metrics;
TMaybe<TString> MaybeIssues;

ui64 PageSize;
ui64 PrefetchSize;
ui64 BatchCountLimit;
const TDqSolomonReadParams ReadParams;
const ui64 PageSize;
const ui64 PrefetchSize;
const ui64 BatchCountLimit;
const TInstant TrueRangeFrom;
const TInstant TrueRangeTo;
const std::shared_ptr<NYdb::ICredentialsProvider> CredentialsProvider;
const NSo::ISolomonAccessorClient::TPtr SolomonClient;

Expand Down Expand Up @@ -455,7 +460,12 @@ NActors::IActor* CreateSolomonMetricsQueueActor(
batchCountLimit = FromString<ui64>(it->second);
}

return new TDqSolomonMetricsQueueActor(consumersCount, pageSize, prefetchSize, batchCountLimit, std::move(readParams), credentialsProvider);
ui64 truePointsFindRange = 301;
if (auto it = settings.find("truePointsFindRange"); it != settings.end()) {
truePointsFindRange = FromString<ui64>(it->second);
}

return new TDqSolomonMetricsQueueActor(consumersCount, std::move(readParams), pageSize, prefetchSize, batchCountLimit, TDuration::Seconds(truePointsFindRange), credentialsProvider);
}

} // namespace NYql::NDq
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
NKikimr::NMiniKQL::TProgramBuilder& programBuilder,
TDqSolomonReadParams&& readParams,
ui64 computeActorBatchSize,
TDuration truePointsFindRange,
ui64 metricsQueueConsumersCountDelta,
ui64 maxInflight,
NActors::TActorId metricsQueueActor,
const ::NMonitoring::TDynamicCounterPtr& counters,
std::shared_ptr<NYdb::ICredentialsProvider> credentialsProvider
Expand All @@ -93,7 +95,10 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
, LogPrefix(TStringBuilder() << "TxId: " << TxId << ", TDqSolomonReadActor: ")
, ReadParams(std::move(readParams))
, ComputeActorBatchSize(computeActorBatchSize)
, TrueRangeFrom(TInstant::Seconds(ReadParams.Source.GetFrom()) - truePointsFindRange)
, TrueRangeTo(TInstant::Seconds(ReadParams.Source.GetTo()) + truePointsFindRange)
, MetricsQueueConsumersCountDelta(metricsQueueConsumersCountDelta)
, MaxInflight(maxInflight)
, MetricsQueueActor(metricsQueueActor)
, CredentialsProvider(credentialsProvider)
, SolomonClient(NSo::ISolomonAccessorClient::Make(ReadParams.Source, CredentialsProvider))
Expand All @@ -110,9 +115,9 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
}
return ERetryErrorClass::NoRetry;
},
TDuration::MilliSeconds(25),
TDuration::MilliSeconds(50),
TDuration::MilliSeconds(200),
TDuration::MilliSeconds(500),
TDuration::MilliSeconds(1000),
10
);

Expand All @@ -125,8 +130,14 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
}

void FillSystemColumnPositionIndex() {
YQL_ENSURE(ReadParams.Source.GetLabelNameAliases().size() == ReadParams.Source.GetLabelNames().size());

for (int i = 0; i < ReadParams.Source.GetLabelNameAliases().size(); ++i) {
AliasIndex[ReadParams.Source.GetLabelNameAliases()[i]] = ReadParams.Source.GetLabelNames()[i];
}

std::vector<TString> names(ReadParams.Source.GetSystemColumns().begin(), ReadParams.Source.GetSystemColumns().end());
names.insert(names.end(), ReadParams.Source.GetLabelNames().begin(), ReadParams.Source.GetLabelNames().end());
names.insert(names.end(), ReadParams.Source.GetLabelNameAliases().begin(), ReadParams.Source.GetLabelNameAliases().end());
std::sort(names.begin(), names.end());
size_t index = 0;
for (auto& n : names) {
Expand Down Expand Up @@ -241,7 +252,7 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
ParsePointsCount(metric, pointsCount);
CompletedMetricsCount++;

TryRequestData();
while (TryRequestData()) {}
}

void HandleNewDataBatch(TEvSolomonProvider::TEvNewDataBatch::TPtr& newDataBatch) {
Expand Down Expand Up @@ -317,6 +328,9 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
YQL_ENSURE(!buffer.IsWide(), "Wide stream is not supported");
SOURCE_LOG_D("GetAsyncInputData sending " << MetricsData.size() << " metrics, finished = " << LastMetricProcessed());

TInstant from = TInstant::Seconds(ReadParams.Source.GetFrom());
TInstant to = TInstant::Seconds(ReadParams.Source.GetTo());

for (const auto& data : MetricsData) {
auto& labels = data.Metric.Labels;

Expand All @@ -331,6 +345,11 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
auto& type = data.Metric.Type;

for (size_t i = 0; i < timestamps.size(); ++i){
TInstant timestamp = TInstant::MilliSeconds(timestamps[i]);
if (timestamp < from || timestamp > to) {
continue;
}

NUdf::TUnboxedValue* items = nullptr;
auto value = HolderFactory.CreateDirectArrayHolder(ReadParams.Source.GetSystemColumns().size() + ReadParams.Source.GetLabelNames().size(), items);

Expand All @@ -351,9 +370,9 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
items[it->second] = dictValue;
}

for (const auto& c : ReadParams.Source.GetLabelNames()) {
for (const auto& c : ReadParams.Source.GetLabelNameAliases()) {
auto& v = items[Index[c]];
auto it = labels.find(c);
auto it = labels.find(AliasIndex[c]);
if (it != labels.end()) {
v = NKikimr::NMiniKQL::MakeString(it->second);
} else {
Expand Down Expand Up @@ -386,7 +405,7 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
private:
// IActor & IDqComputeActorAsyncInput
void PassAway() override { // Is called from Compute Actor
SOURCE_LOG_I("PassAway, processed " << CompletedMetricsCount << " metrics.");
SOURCE_LOG_I("PassAway, processed " << CompletedMetricsCount << " metrics, " << CompletedTimeRanges << " time ranges.");
if (UseMetricsQueue) {
MetricsQueueEvents.Unsubscribe();
}
Expand Down Expand Up @@ -421,6 +440,7 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct

bool TryRequestPointsCount() {
TryRequestMetrics();

if (ListedMetrics.empty()) {
return false;
}
Expand All @@ -433,7 +453,7 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
NSo::TMetric requestMetric = ListedMetrics.back();
ListedMetrics.pop_back();

auto getPointsCountFuture = SolomonClient->GetPointsCount(requestMetric.Labels);
auto getPointsCountFuture = SolomonClient->GetPointsCount(requestMetric.Labels, TrueRangeFrom, TrueRangeTo);

NActors::TActorSystem* actorSystem = NActors::TActivationContext::ActorSystem();
getPointsCountFuture.Subscribe([actorSystem, metric = std::move(requestMetric), selfId = SelfId()](
Expand All @@ -446,12 +466,19 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
});
}

void TryRequestData() {
bool TryRequestData() {
TryRequestPointsCount();
while (!MetricsWithTimeRange.empty()) {
RequestData();
TryRequestPointsCount();

if (MetricsWithTimeRange.empty()) {
return false;
}

if (CurrentInflight >= MaxInflight) {
return false;
}

RequestData();
return true;
}

void RequestData() {
Expand All @@ -460,6 +487,7 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct

auto request = MetricsWithTimeRange.back();
MetricsWithTimeRange.pop_back();
CurrentInflight++;

if (UseMetricsQueue) {
dataRequestFuture = SolomonClient->GetData(request.Selectors, request.From, request.To);
Expand All @@ -480,18 +508,18 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
}

void ParsePointsCount(const NSo::TMetric& metric, ui64 pointsCount) {
TInstant from = TInstant::Seconds(ReadParams.Source.GetFrom());
TInstant to = TInstant::Seconds(ReadParams.Source.GetTo());

auto ranges = SplitTimeIntervalIntoRanges(from, to, pointsCount);
auto ranges = SplitTimeIntervalIntoRanges(pointsCount);

for (const auto& [fromRange, toRange] : ranges) {
MetricsWithTimeRange.emplace_back(metric.Labels, "", fromRange, toRange);
}
ListedTimeRanges += ranges.size();
}

std::vector<std::pair<TInstant, TInstant>> SplitTimeIntervalIntoRanges(TInstant from, TInstant to, ui64 pointsCount) const {
std::vector<std::pair<TInstant, TInstant>> SplitTimeIntervalIntoRanges(ui64 pointsCount) const {
TInstant from = TrueRangeFrom;
TInstant to = TrueRangeTo;

std::vector<std::pair<TInstant, TInstant>> result;
if (pointsCount == 0) {
return result;
Expand Down Expand Up @@ -524,6 +552,7 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
}

PendingDataRequests_.erase(request);
CurrentInflight--;

if (batch.Response.Status != NSo::EStatus::STATUS_OK) {
TIssues issues { TIssue(batch.Response.Error) };
Expand Down Expand Up @@ -552,7 +581,10 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
const TString LogPrefix;
const TDqSolomonReadParams ReadParams;
const ui64 ComputeActorBatchSize;
const TInstant TrueRangeFrom;
const TInstant TrueRangeTo;
const ui64 MetricsQueueConsumersCountDelta;
const ui64 MaxInflight;
IRetryPolicy<NSo::TGetDataResponse>::TPtr RetryPolicy;

bool UseMetricsQueue;
Expand All @@ -570,6 +602,7 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
ui64 CompletedMetricsCount = 0;
ui64 ListedTimeRanges = 0;
ui64 CompletedTimeRanges = 0;
ui64 CurrentInflight = 0;
const ui64 MaxPointsPerOneRequest = 10000;

TString SourceId;
Expand All @@ -578,6 +611,7 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
TType* DictType = nullptr;
std::vector<size_t> SystemColumnPositionIndex;
THashMap<TString, size_t> Index;
THashMap<TString, TString> AliasIndex;
};


Expand Down Expand Up @@ -618,6 +652,16 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqSolom
computeActorBatchSize = FromString<ui64>(it->second);
}

ui64 truePointsFindRange = 301;
if (auto it = settings.find("truePointsFindRange"); it != settings.end()) {
truePointsFindRange = FromString<ui64>(it->second);
}

ui64 maxInflight = 40;
if (auto it = settings.find("maxApiInflight"); it != settings.end()) {
maxInflight = FromString<ui64>(it->second);
}

auto credentialsProviderFactory = CreateCredentialsProviderFactoryForStructuredToken(credentialsFactory, token);
auto credentialsProvider = credentialsProviderFactory->CreateProvider();

Expand All @@ -630,7 +674,9 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqSolom
programBuilder,
std::move(params),
computeActorBatchSize,
TDuration::Seconds(truePointsFindRange),
metricsQueueConsumersCountDelta,
maxInflight,
metricsQueueActor,
counters,
credentialsProvider);
Expand Down
7 changes: 7 additions & 0 deletions ydb/library/yql/providers/solomon/common/ut/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
UNITTEST_FOR(ydb/library/yql/providers/solomon/common)

SRCS(
util_ut.cpp
)

END()
Loading
Loading