diff --git a/ydb/library/yql/providers/solomon/actors/dq_solomon_metrics_queue.cpp b/ydb/library/yql/providers/solomon/actors/dq_solomon_metrics_queue.cpp index 69f0013f24e9..35c89c2fb8da 100644 --- a/ydb/library/yql/providers/solomon/actors/dq_solomon_metrics_queue.cpp +++ b/ydb/library/yql/providers/solomon/actors/dq_solomon_metrics_queue.cpp @@ -13,6 +13,8 @@ #include #include +#include + #define LOG_E(name, stream) \ LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, name << ": " << this->SelfId() << ", queued metrics: " << this->Metrics.size() << ". " << stream) #define LOG_W(name, stream) \ @@ -36,7 +38,8 @@ class TDqSolomonMetricsQueueActor : public NActors::TActorBootstrapped { + struct TEvNextLabelsListingChunkReceived : public NActors::TEventLocal { + NSo::TSelectors Selectors; + NSo::TListMetricsLabelsResponse Response; + explicit TEvNextLabelsListingChunkReceived(NSo::TSelectors&& selectors, NSo::TListMetricsLabelsResponse&& response) + : Selectors(std::move(selectors)) + , Response(std::move(response)) {} + }; + + struct TEvNextMetricsListingChunkReceived : public NActors::TEventLocal { NSo::TListMetricsResponse Response; - explicit TEvNextListingChunkReceived(NSo::TListMetricsResponse&& response) + explicit TEvNextMetricsListingChunkReceived(NSo::TListMetricsResponse&& response) : Response(std::move(response)) {} }; @@ -66,19 +77,18 @@ class TDqSolomonMetricsQueueActor : public NActors::TActorBootstrapped credentialsProvider) - : CurrentPage(0) - , ConsumersCount(consumersCount) + : ConsumersCount(consumersCount) , ReadParams(std::move(readParams)) - , PageSize(pageSize) - , PrefetchSize(prefetchSize) , BatchCountLimit(batchCountLimit) , TrueRangeFrom(TInstant::Seconds(ReadParams.Source.GetFrom()) - truePointsFindRange) , TrueRangeTo(TInstant::Seconds(ReadParams.Source.GetTo()) + truePointsFindRange) + , MaxListingPageSize(maxListingPageSize) + , MaxApiInflight(maxApiInflight) , CredentialsProvider(credentialsProvider) , SolomonClient(NSo::ISolomonAccessorClient::Make(ReadParams.Source, CredentialsProvider)) {} @@ -88,6 +98,11 @@ class TDqSolomonMetricsQueueActor : public NActors::TActorBootstrappedSender, ev->Get()->Record.GetTransportMeta()); - TryPreFetch(); } else { LOG_I("TDqSolomonMetricsQueueActor", "HandleGetNextBatch doesn't have enough to send, trying to fetch"); ScheduleRequest(ev->Sender, ev->Get()->Record.GetTransportMeta()); @@ -168,22 +182,77 @@ class TDqSolomonMetricsQueueActor : public NActors::TActorBootstrappedGet()->Response)) { - AnswerPendingRequests(true); - if (!HasPendingRequests) { - LOG_D("TDqSolomonMetricsQueueActor", "HandleNextListingChunkReceived no pending requests. Trying to prefetch"); - TryPreFetch(); - } else { - LOG_D("TDqSolomonMetricsQueueActor", "HandleNextListingChunkReceived there are pending requests. Fetching more metrics"); - TryFetch(); - } + void HandleNextLabelsListingChunkReceived(TEvPrivatePrivate::TEvNextLabelsListingChunkReceived::TPtr& ev) { + LOG_D("TDqSolomonMetricsQueueActor", "HandleNextLabelsListingChunkReceived"); + auto& batch = *ev->Get(); + CurrentInflight--; + DownloadedBytes += batch.Response.DownloadedBytes; + + if (batch.Response.Status != NSo::EStatus::STATUS_OK) { + MaybeIssues = batch.Response.Error; + TransitToErrorState(); + return; + } + + auto listLabelsResult = batch.Response.Result; + if (listLabelsResult.TotalCount <= MaxListingPageSize) { + PendingListingRequests.push_back(batch.Selectors); } else { + auto selectors = batch.Selectors; + auto& labels = listLabelsResult.Labels; + auto maxSizeLabelIt = std::max_element(labels.begin(), labels.end(), + [](const NSo::TLabelValues& a, const NSo::TLabelValues& b) { + return std::make_pair(!a.Truncated, a.Values.size()) < std::make_pair(!b.Truncated, b.Values.size()); + } + ); + + if (maxSizeLabelIt->Truncated) { + MaybeIssues = "couldn't list metrics, all label values are too big for listing"; + TransitToErrorState(); + return; + } + + auto& label = *maxSizeLabelIt; + + if (label.Values.empty()) { + return; + } + + ui64 metricsPerLabelValue = std::max(1, listLabelsResult.TotalCount / label.Values.size()); + ui64 batchSize = std::max(1, MaxListingPageSize * 0.75 / metricsPerLabelValue); + + for (ui64 i = 0; i * batchSize < label.Values.size(); i++) { + auto batchFromIt = label.Values.begin() + i * batchSize; + auto batchToIt = label.Values.begin() + std::min((i + 1) * batchSize, label.Values.size()); + + selectors[label.Name] = { "=", JoinRange("|", batchFromIt, batchToIt) }; + PendingLabelRequests.push_back(selectors); + } + if (label.Absent) { + selectors[label.Name] = { "=", "-" }; + PendingLabelRequests.push_back(selectors); + } + + } + + while (TryFetch()) {} + } + + void HandleNextMetricsListingChunkReceived(TEvPrivatePrivate::TEvNextMetricsListingChunkReceived::TPtr& ev) { + LOG_D("TDqSolomonMetricsQueueActor", "HandleNextMetricsListingChunkReceived"); + auto& batch = *ev->Get(); + CurrentInflight--; + DownloadedBytes += batch.Response.DownloadedBytes; + + if (batch.Response.Status != NSo::EStatus::STATUS_OK) { + MaybeIssues = batch.Response.Error; TransitToErrorState(); + return; } + + SaveRetrievedResults(batch.Response); + AnswerPendingRequests(true); + while (TryFetch()) {} } void HandleRoundRobinStageTimeout() { @@ -194,11 +263,6 @@ class TDqSolomonMetricsQueueActor : public NActors::TActorBootstrappedGet()->Issues; - TransitToErrorState(); - } - void HandlePoison() { AnswerPendingRequests(); PassAway(); @@ -228,20 +292,8 @@ class TDqSolomonMetricsQueueActor : public NActors::TActorBootstrapped= response.Result.PagesCount) { - LOG_I("TDqSolomonMetricsQueueActor", "SaveRetrievedResults no more metrics to list"); - HasMoreMetrics = false; - Become(&TDqSolomonMetricsQueueActor::NoMoreMetricsState); - } LOG_D("TDqSolomonMetricsQueueActor", "SaveRetrievedResults saving: " << response.Result.Metrics.size() << " metrics"); for (const auto& metric : response.Result.Metrics) { @@ -250,56 +302,63 @@ class TDqSolomonMetricsQueueActor : public NActors::TActorBootstrapped= MaxApiInflight) { + LOG_D("TDqSolomonMetricsQueueActor", "TryFetch can't start fetching, have " << CurrentInflight << " inflight requests, current max: " << MaxApiInflight); + return false; } - if (HasMoreMetrics) { - LOG_D("TDqSolomonMetricsQueueActor", "TryFetch fetching metrics"); - Fetch(); - return true; + if (PendingLabelRequests.empty() && PendingListingRequests.empty()) { + LOG_D("TDqSolomonMetricsQueueActor", "TryFetch doesn't have anything to fetch yet, current inflight: " << CurrentInflight); + + if (!CurrentInflight) { + Become(&TDqSolomonMetricsQueueActor::NoMoreMetricsState); + AnswerPendingRequests(); + } + return false; } - LOG_D("TDqSolomonMetricsQueueActor", "TryFetch couldn't start fetching"); - AnswerPendingRequests(); - return false; + LOG_D("TDqSolomonMetricsQueueActor", "TryFetch fetching metrics"); + Fetch(); + return true; } void Fetch() { + YQL_ENSURE(!PendingLabelRequests.empty() || !PendingListingRequests.empty()); NActors::TActorSystem* actorSystem = NActors::TActivationContext::ActorSystem(); - NSo::TSelectors selectors; - NSo::ProtoToSelectors(ReadParams.Source.GetSelectors(), selectors); - ListingFuture = - SolomonClient - ->ListMetrics(selectors, TrueRangeFrom, TrueRangeTo, PageSize, CurrentPage++) - .Subscribe([actorSystem, selfId = SelfId()]( - NThreading::TFuture future) -> void { - try { - actorSystem->Send( - selfId, - new TEvPrivatePrivate::TEvNextListingChunkReceived(future.ExtractValue())); - } catch (const std::exception& e) { - actorSystem->Send( - selfId, - new TEvPrivatePrivate::TEvTransitToErrorState(TStringBuilder() << "An unknown exception has occurred: '" << e.what() << "'")); - } - }); - } - - bool FetchingInProgress() const { - return ListingFuture.Defined(); + CurrentInflight++; + + if (!PendingLabelRequests.empty()) { + auto selectors = PendingLabelRequests.back(); + PendingLabelRequests.pop_back(); + + auto labelsListingFuture = SolomonClient->ListMetricsLabels(selectors, TrueRangeFrom, TrueRangeTo); + labelsListingFuture.Subscribe([actorSystem, selectors = std::move(selectors), selfId = SelfId()] + (NThreading::TFuture future) mutable { + actorSystem->Send( + selfId, + new TEvPrivatePrivate::TEvNextLabelsListingChunkReceived(std::move(selectors), future.ExtractValue())); + }); + + return; + } + + if (!PendingListingRequests.empty()) { + auto selectors = PendingListingRequests.back(); + PendingListingRequests.pop_back(); + + auto metricsListingFuture = SolomonClient->ListMetrics(selectors, TrueRangeFrom, TrueRangeTo); + metricsListingFuture.Subscribe([actorSystem, selfId = SelfId()] + (NThreading::TFuture future) { + actorSystem->Send( + selfId, + new TEvPrivatePrivate::TEvNextMetricsListingChunkReceived(future.ExtractValue())); + }); + + return; + } } void AnswerPendingRequests(bool earlyStop = false) { @@ -350,7 +409,7 @@ class TDqSolomonMetricsQueueActor : public NActors::TActorBootstrapped StartedConsumers; THashSet UpdatedConsumers; THashSet FinishedConsumers; THashMap FinishingConsumerToLastSeqNo; - TMaybe> ListingFuture; bool HasPendingRequests; THashMap> PendingRequests; + std::vector PendingLabelRequests; + std::vector PendingListingRequests; std::vector Metrics; ui64 DownloadedBytes = 0; TMaybe MaybeIssues; const TDqSolomonReadParams ReadParams; - const ui64 PageSize; - const ui64 PrefetchSize; const ui64 BatchCountLimit; const TInstant TrueRangeFrom; const TInstant TrueRangeTo; + const ui64 MaxListingPageSize; + const ui64 MaxApiInflight; const std::shared_ptr CredentialsProvider; const NSo::ISolomonAccessorClient::TPtr SolomonClient; @@ -450,16 +509,6 @@ NActors::IActor* CreateSolomonMetricsQueueActor( { const auto& settings = readParams.Source.settings(); - ui64 pageSize = 0; - if (auto it = settings.find("metricsQueuePageSize"); it != settings.end()) { - pageSize = FromString(it->second); - } - - ui64 prefetchSize = 0; - if (auto it = settings.find("metricsQueuePrefetchSize"); it != settings.end()) { - prefetchSize = FromString(it->second); - } - ui64 batchCountLimit = 0; if (auto it = settings.find("metricsQueueBatchCountLimit"); it != settings.end()) { batchCountLimit = FromString(it->second); @@ -470,7 +519,17 @@ NActors::IActor* CreateSolomonMetricsQueueActor( truePointsFindRange = FromString(it->second); } - return new TDqSolomonMetricsQueueActor(consumersCount, std::move(readParams), pageSize, prefetchSize, batchCountLimit, TDuration::Seconds(truePointsFindRange), credentialsProvider); + ui64 maxListingPageSize = 20000; + if (auto it = settings.find("maxListingPageSize"); it != settings.end()) { + maxListingPageSize = FromString(it->second); + } + + ui64 maxInflight = 40; + if (auto it = settings.find("maxApiInflight"); it != settings.end()) { + maxInflight = FromString(it->second); + } + + return new TDqSolomonMetricsQueueActor(consumersCount, std::move(readParams), batchCountLimit, TDuration::Seconds(truePointsFindRange), maxListingPageSize, maxInflight, credentialsProvider); } } // namespace NYql::NDq diff --git a/ydb/library/yql/providers/solomon/common/util.h b/ydb/library/yql/providers/solomon/common/util.h index 0cd28713e569..a3293435d5aa 100644 --- a/ydb/library/yql/providers/solomon/common/util.h +++ b/ydb/library/yql/providers/solomon/common/util.h @@ -30,6 +30,13 @@ struct TTimeseries { std::vector Values; }; +struct TLabelValues { + TString Name; + bool Absent; + bool Truncated; + std::vector Values; +}; + struct TMetricTimeRange { TSelectors Selectors; TString Program; diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_config.cpp b/ydb/library/yql/providers/solomon/provider/yql_solomon_config.cpp index ec55f37bed87..eb3ef2d4719c 100644 --- a/ydb/library/yql/providers/solomon/provider/yql_solomon_config.cpp +++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_config.cpp @@ -8,11 +8,10 @@ TSolomonConfiguration::TSolomonConfiguration() { REGISTER_SETTING(*this, _EnableReading); REGISTER_SETTING(*this, _EnableRuntimeListing); + REGISTER_SETTING(*this, _EnableSolomonClientPostApi); REGISTER_SETTING(*this, _TruePointsFindRange); - REGISTER_SETTING(*this, MetricsQueuePageSize); - REGISTER_SETTING(*this, MetricsQueuePrefetchSize); + REGISTER_SETTING(*this, _MaxListingPageSize); REGISTER_SETTING(*this, MetricsQueueBatchCountLimit); - REGISTER_SETTING(*this, SolomonClientDefaultReplica); REGISTER_SETTING(*this, ComputeActorBatchSize); REGISTER_SETTING(*this, MaxApiInflight); } diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_config.h b/ydb/library/yql/providers/solomon/provider/yql_solomon_config.h index 1d8470af1efa..c112d3a946e0 100644 --- a/ydb/library/yql/providers/solomon/provider/yql_solomon_config.h +++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_config.h @@ -14,11 +14,10 @@ struct TSolomonSettings { public: NCommon::TConfSetting _EnableReading; NCommon::TConfSetting _EnableRuntimeListing; + NCommon::TConfSetting _EnableSolomonClientPostApi; NCommon::TConfSetting _TruePointsFindRange; - NCommon::TConfSetting MetricsQueuePageSize; - NCommon::TConfSetting MetricsQueuePrefetchSize; + NCommon::TConfSetting _MaxListingPageSize; NCommon::TConfSetting MetricsQueueBatchCountLimit; - NCommon::TConfSetting SolomonClientDefaultReplica; NCommon::TConfSetting ComputeActorBatchSize; NCommon::TConfSetting MaxApiInflight; }; diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp b/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp index 34fb99a3a999..8fb111367d08 100644 --- a/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp +++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp @@ -349,22 +349,14 @@ class TSolomonDqIntegration: public TDqIntegrationBase { source.AddRequiredLabelNames(labelAsString); } - auto defaultReplica = (source.GetClusterType() == NSo::NProto::CT_SOLOMON ? "sas" : "cloud-prod-a"); - auto& solomonConfig = State_->Configuration; auto& sourceSettings = *source.MutableSettings(); - auto metricsQueuePageSize = solomonConfig->MetricsQueuePageSize.Get().OrElse(2000); - sourceSettings.insert({"metricsQueuePageSize", ToString(metricsQueuePageSize)}); - - auto metricsQueuePrefetchSize = solomonConfig->MetricsQueuePrefetchSize.Get().OrElse(4000); - sourceSettings.insert({"metricsQueuePrefetchSize", ToString(metricsQueuePrefetchSize)}); - - auto metricsQueueBatchCountLimit = solomonConfig->MetricsQueueBatchCountLimit.Get().OrElse(10); + auto metricsQueueBatchCountLimit = solomonConfig->MetricsQueueBatchCountLimit.Get().OrElse(500); sourceSettings.insert({"metricsQueueBatchCountLimit", ToString(metricsQueueBatchCountLimit)}); - auto solomonClientDefaultReplica = solomonConfig->SolomonClientDefaultReplica.Get().OrElse(defaultReplica); - sourceSettings.insert({"solomonClientDefaultReplica", ToString(solomonClientDefaultReplica)}); + auto enableSolomonClientPostApi = solomonConfig->_EnableSolomonClientPostApi.Get().OrElse(false); + sourceSettings.insert({"enableSolomonClientPostApi", ToString(enableSolomonClientPostApi)}); auto computeActorBatchSize = solomonConfig->ComputeActorBatchSize.Get().OrElse(100); sourceSettings.insert({"computeActorBatchSize", ToString(computeActorBatchSize)}); @@ -372,6 +364,9 @@ class TSolomonDqIntegration: public TDqIntegrationBase { auto truePointsFindRange = solomonConfig->_TruePointsFindRange.Get().OrElse(301); sourceSettings.insert({"truePointsFindRange", ToString(truePointsFindRange)}); + auto maxListingPageSize = solomonConfig->_MaxListingPageSize.Get().OrElse(20000); + sourceSettings.insert({"maxListingPageSize", ToString(maxListingPageSize)}); + auto maxApiInflight = solomonConfig->MaxApiInflight.Get().OrElse(40); sourceSettings.insert({"maxApiInflight", ToString(maxApiInflight)}); diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_load_meta.cpp b/ydb/library/yql/providers/solomon/provider/yql_solomon_load_meta.cpp index 833e0a4b521a..647a95212c7e 100644 --- a/ydb/library/yql/providers/solomon/provider/yql_solomon_load_meta.cpp +++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_load_meta.cpp @@ -16,7 +16,7 @@ namespace { struct TLoadSolomonMetaRequest { NSo::ISolomonAccessorClient::TPtr SolomonClient; NThreading::TFuture LabelNamesRequest; - NThreading::TFuture ListMetricsRequest; + NThreading::TFuture LabelValuesRequest; }; TMaybe ExtractSetting(const TExprNode& settings, const TString& settingName) { @@ -93,25 +93,24 @@ class TSolomonLoadTableMetadataTransformer : public TGraphTransformerBase { return TStatus::Error; } - auto defaultReplica = (source.GetClusterType() == NSo::NProto::CT_SOLOMON ? "sas" : "cloud-prod-a"); - auto solomonClientDefaultReplica = State_->Configuration->SolomonClientDefaultReplica.Get().OrElse(defaultReplica); - source.MutableSettings()->insert({ "solomonClientDefaultReplica", ToString(solomonClientDefaultReplica) }); + auto enableSolomonClientPostApi = State_->Configuration->_EnableSolomonClientPostApi.Get().OrElse(false); + source.MutableSettings()->insert({ "enableSolomonClientPostApi", ToString(enableSolomonClientPostApi) }); auto providerFactory = CreateCredentialsProviderFactoryForStructuredToken(State_->CredentialsFactory, State_->Configuration->Tokens.at(clusterName)); auto credentialsProvider = providerFactory->CreateProvider(); auto solomonClient = NSo::ISolomonAccessorClient::Make(std::move(source), credentialsProvider); auto labelNamesFuture = solomonClient->GetLabelNames(selectors, from, to); - auto listMetricsFuture = solomonClient->ListMetrics(selectors, from, to, 30, 0); + auto listMetricsLabelsFuture = solomonClient->ListMetricsLabels(selectors, from, to); LabelNamesRequests_[soReadObject.Raw()] = { .SolomonClient = solomonClient, .LabelNamesRequest = labelNamesFuture, - .ListMetricsRequest = listMetricsFuture + .LabelValuesRequest = listMetricsLabelsFuture }; futures.push_back(labelNamesFuture.IgnoreResult()); - futures.push_back(listMetricsFuture.IgnoreResult()); + futures.push_back(listMetricsLabelsFuture.IgnoreResult()); } } @@ -142,10 +141,10 @@ class TSolomonLoadTableMetadataTransformer : public TGraphTransformerBase { return TStatus::Error; } - auto listMetricsValue = request.ListMetricsRequest.GetValue(); - if (listMetricsValue.Status != NSo::EStatus::STATUS_OK) { + auto listMetricLabelsValue = request.LabelValuesRequest.GetValue(); + if (listMetricLabelsValue.Status != NSo::EStatus::STATUS_OK) { ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), - TStringBuilder() << "Failed to get total metrics count, details: " << listMetricsValue.Error)); + TStringBuilder() << "Failed to get total metrics count, details: " << listMetricLabelsValue.Error)); return TStatus::Error; } @@ -161,7 +160,7 @@ class TSolomonLoadTableMetadataTransformer : public TGraphTransformerBase { .RequiredLabelNames() .Add(labelNames) .Build() - .TotalMetricsCount().Build(ToString(listMetricsValue.Result.TotalCount)) + .TotalMetricsCount().Build(ToString(listMetricLabelsValue.Result.TotalCount)) .Done().Ptr()); } diff --git a/ydb/library/yql/providers/solomon/solomon_accessor/client/solomon_accessor_client.cpp b/ydb/library/yql/providers/solomon/solomon_accessor/client/solomon_accessor_client.cpp index 72983f07cbce..8170b41e9d71 100644 --- a/ydb/library/yql/providers/solomon/solomon_accessor/client/solomon_accessor_client.cpp +++ b/ydb/library/yql/providers/solomon/solomon_accessor/client/solomon_accessor_client.cpp @@ -159,6 +159,62 @@ TListMetricsResponse ProcessListMetricsResponse(NYql::IHTTPGateway::TResult&& re return TListMetricsResponse(std::move(result), response.Content.size() + response.Content.Headers.size()); } +TListMetricsLabelsResponse ProcessListMetricsLabelsResponse(NYql::IHTTPGateway::TResult&& response) { + TListMetricsLabelsResult result; + + if (response.CurlResponseCode != CURLE_OK) { + return TListMetricsLabelsResponse(TStringBuilder{} << "Monitoring api list metrics labels response: " << response.Issues.ToOneLineString() << + ", internal code: " << static_cast(response.CurlResponseCode)); + } + + if (response.Content.HttpResponseCode < 200 || response.Content.HttpResponseCode >= 300) { + return TListMetricsLabelsResponse(TStringBuilder{} << "Monitoring api list metrics labels response: " << response.Content.data() << + ", internal code: " << response.Content.HttpResponseCode); + } + + NJson::TJsonValue json; + try { + NJson::ReadJsonTree(response.Content.data(), &json, /*throwOnError*/ true); + } catch (const std::exception& e) { + return TListMetricsLabelsResponse(TStringBuilder{} << "Monitoring api list metrics labels response is not a valid json: " << e.what()); + } + + if (!json.IsMap() || !json.Has("labels") || !json.Has("totalCount")) { + return TListMetricsLabelsResponse("Monitoring api list metrics labels response doesn't contain requested info"); + } + + if (!json["totalCount"].IsInteger() || !json["labels"].IsArray()) { + return TListMetricsLabelsResponse("Monitoring api list metrics labels response contains invalid data"); + } + + result.TotalCount = json["totalCount"].GetInteger(); + + for (const auto& label : json["labels"].GetArray()) { + try { + TString name = label["name"].GetStringSafe(); + bool absent = label["absent"].GetBooleanSafe(); + bool truncated = label["truncated"].GetBooleanSafe(); + const auto& jsonValues = label["values"].GetArraySafe(); + std::vector values; + + values.reserve(jsonValues.size()); + for (const auto& labelValue : jsonValues) { + if (!labelValue.IsString()) { + return TListMetricsLabelsResponse("Monitoring api list metrics labels response contains invalid label values"); + } + values.push_back(labelValue.GetString()); + } + + result.Labels.emplace_back(name, absent, truncated, std::move(values)); + } catch (const NJson::TJsonException& e) { + return TListMetricsLabelsResponse(TStringBuilder{} << "Monitoring api list metrics labels response contains invalid labels: " << e.what()); + } + + } + + return TListMetricsLabelsResponse(std::move(result), response.Content.size() + response.Content.Headers.size()); +} + TGetPointsCountResponse ProcessGetPointsCountResponse(NYql::IHTTPGateway::TResult&& response, ui64 downsampledPointsCount) { static std::set whitelistIssues = { "Not able to apply function count on vector with size 0" @@ -246,11 +302,13 @@ TGetDataResponse ProcessGetDataResponse(NYdbGrpc::TGrpcStatus&& status, ReadResp class TSolomonAccessorClient : public ISolomonAccessorClient, public std::enable_shared_from_this { public: TSolomonAccessorClient( - const TString& defaultReplica, + bool enableSolomonClientPostApi, + ui64 maxListingPageSize, ui64 maxApiInflight, NYql::NSo::NProto::TDqSolomonSource&& settings, std::shared_ptr credentialsProvider) - : DefaultReplica(defaultReplica) + : EnableSolomonClientPostApi(enableSolomonClientPostApi) + , MaxListingPageSize(maxListingPageSize) , Settings(std::move(settings)) , CredentialsProvider(credentialsProvider) { @@ -269,7 +327,7 @@ class TSolomonAccessorClient : public ISolomonAccessorClient, public std::enable public: NThreading::TFuture GetLabelNames(const TSelectors& selectors, TInstant from, TInstant to) const override final { - auto requestUrl = BuildGetLabelsUrl(selectors, from, to); + auto [url, body] = BuildGetLabelsHttpParams(selectors, from, to); auto resultPromise = NThreading::NewPromise(); @@ -279,14 +337,15 @@ class TSolomonAccessorClient : public ISolomonAccessorClient, public std::enable DoHttpRequest( std::move(cb), - std::move(requestUrl) + std::move(url), + std::move(body) ); return resultPromise.GetFuture(); } - NThreading::TFuture ListMetrics(const TSelectors& selectors, TInstant from, TInstant to, int pageSize, int page) const override final { - auto requestUrl = BuildListMetricsUrl(selectors, from, to, pageSize, page); + NThreading::TFuture ListMetrics(const TSelectors& selectors, TInstant from, TInstant to) const override final { + auto [url, body] = BuildListMetricsHttpParams(selectors, from, to); auto resultPromise = NThreading::NewPromise(); @@ -296,7 +355,26 @@ class TSolomonAccessorClient : public ISolomonAccessorClient, public std::enable DoHttpRequest( std::move(cb), - std::move(requestUrl) + std::move(url), + std::move(body) + ); + + return resultPromise.GetFuture(); + } + + NThreading::TFuture ListMetricsLabels(const TSelectors& selectors, TInstant from, TInstant to) const override final { + auto [url, body] = BuildListMetricsLabelsHttpParams(selectors, from, to); + + auto resultPromise = NThreading::NewPromise(); + + auto cb = [resultPromise](NYql::IHTTPGateway::TResult&& result) mutable { + resultPromise.SetValue(ProcessListMetricsLabelsResponse(std::move(result))); + }; + + DoHttpRequest( + std::move(cb), + std::move(url), + std::move(body) ); return resultPromise.GetFuture(); @@ -317,8 +395,7 @@ class TSolomonAccessorClient : public ISolomonAccessorClient, public std::enable auto fullSelectors = AddRequiredLabels(selectors); TString program = TStringBuilder() << "count(" << BuildSelectorsProgram(fullSelectors) << ")"; - auto requestUrl = BuildGetPointsCountUrl(); - auto requestBody = BuildGetPointsCountBody(program, downsamplingTo, to); + auto [url, body] = BuildGetPointsCountHttpParams(program, downsamplingTo, to); auto cb = [resultPromise, downsampledPointsCount](NYql::IHTTPGateway::TResult&& response) mutable { resultPromise.SetValue(ProcessGetPointsCountResponse(std::move(response), downsampledPointsCount)); @@ -326,8 +403,8 @@ class TSolomonAccessorClient : public ISolomonAccessorClient, public std::enable DoHttpRequest( std::move(cb), - std::move(requestUrl), - std::move(requestBody) + std::move(url), + std::move(body) ); } else { @@ -470,7 +547,7 @@ class TSolomonAccessorClient : public ISolomonAccessorClient, public std::enable } } - TString BuildGetLabelsUrl(const TSelectors& selectors, TInstant from, TInstant to) const { + std::tuple BuildGetLabelsHttpParams(const TSelectors& selectors, TInstant from, TInstant to) const { TUrlBuilder builder(GetHttpSolomonEndpoint()); builder.AddPathComponent("api"); @@ -480,16 +557,26 @@ class TSolomonAccessorClient : public ISolomonAccessorClient, public std::enable builder.AddPathComponent("sensors"); builder.AddPathComponent("names"); - builder.AddUrlParam("projectId", GetProjectId()); - builder.AddUrlParam("selectors", BuildSelectorsProgram(selectors)); - builder.AddUrlParam("forceCluster", DefaultReplica); - builder.AddUrlParam("from", from.ToString()); - builder.AddUrlParam("to", to.ToString()); + NJsonWriter::TBuf w; - return builder.Build(); + if (EnableSolomonClientPostApi) { + w.BeginObject() + .UnsafeWriteKey("projectId").WriteString(GetProjectId()) + .UnsafeWriteKey("selectors").WriteString(BuildSelectorsProgram(selectors)) + .UnsafeWriteKey("from").WriteString(from.ToString()) + .UnsafeWriteKey("to").WriteString(to.ToString()) + .EndObject(); + } else { + builder.AddUrlParam("projectId", GetProjectId()); + builder.AddUrlParam("selectors", BuildSelectorsProgram(selectors)); + builder.AddUrlParam("from", from.ToString()); + builder.AddUrlParam("to", to.ToString()); + } + + return { builder.Build(), w.Str() }; } - TString BuildListMetricsUrl(const TSelectors& selectors, TInstant from, TInstant to, int pageSize, int page) const { + std::tuple BuildListMetricsHttpParams(const TSelectors& selectors, TInstant from, TInstant to) const { TUrlBuilder builder(GetHttpSolomonEndpoint()); builder.AddPathComponent("api"); @@ -498,18 +585,28 @@ class TSolomonAccessorClient : public ISolomonAccessorClient, public std::enable builder.AddPathComponent(Settings.GetProject()); builder.AddPathComponent("sensors"); - builder.AddUrlParam("projectId", GetProjectId()); - builder.AddUrlParam("selectors", BuildSelectorsProgram(selectors)); - builder.AddUrlParam("forceCluster", DefaultReplica); - builder.AddUrlParam("from", from.ToString()); - builder.AddUrlParam("to", to.ToString()); - builder.AddUrlParam("pageSize", std::to_string(pageSize)); - builder.AddUrlParam("page", std::to_string(page)); + NJsonWriter::TBuf w; - return builder.Build(); + if (EnableSolomonClientPostApi) { + w.BeginObject() + .UnsafeWriteKey("projectId").WriteString(GetProjectId()) + .UnsafeWriteKey("selectors").WriteString(BuildSelectorsProgram(selectors)) + .UnsafeWriteKey("from").WriteString(from.ToString()) + .UnsafeWriteKey("to").WriteString(to.ToString()) + .UnsafeWriteKey("pageSize").WriteLongLong(MaxListingPageSize) + .EndObject(); + } else { + builder.AddUrlParam("projectId", GetProjectId()); + builder.AddUrlParam("selectors", BuildSelectorsProgram(selectors)); + builder.AddUrlParam("from", from.ToString()); + builder.AddUrlParam("to", to.ToString()); + builder.AddUrlParam("pageSize", ToString(MaxListingPageSize)); + } + + return { builder.Build(), w.Str() }; } - TString BuildGetPointsCountUrl() const { + std::tuple BuildListMetricsLabelsHttpParams(const TSelectors& selectors, TInstant from, TInstant to) const { TUrlBuilder builder(GetHttpSolomonEndpoint()); builder.AddPathComponent("api"); @@ -517,21 +614,47 @@ class TSolomonAccessorClient : public ISolomonAccessorClient, public std::enable builder.AddPathComponent("projects"); builder.AddPathComponent(Settings.GetProject()); builder.AddPathComponent("sensors"); - builder.AddPathComponent("data"); + builder.AddPathComponent("labels"); - builder.AddUrlParam("projectId", GetProjectId()); + NJsonWriter::TBuf w; + + if (EnableSolomonClientPostApi) { + w.BeginObject() + .UnsafeWriteKey("projectId").WriteString(GetProjectId()) + .UnsafeWriteKey("selectors").WriteString(BuildSelectorsProgram(selectors)) + .UnsafeWriteKey("from").WriteString(from.ToString()) + .UnsafeWriteKey("to").WriteString(to.ToString()) + .UnsafeWriteKey("limit").WriteLongLong(100000) + .EndObject(); + } else { + builder.AddUrlParam("projectId", GetProjectId()); + builder.AddUrlParam("selectors", BuildSelectorsProgram(selectors)); + builder.AddUrlParam("from", from.ToString()); + builder.AddUrlParam("to", to.ToString()); + builder.AddUrlParam("limit", "100000"); + } - return builder.Build(); + return { builder.Build(), w.Str() }; } - TString BuildGetPointsCountBody(const TString& program, TInstant from, TInstant to) const { + std::tuple BuildGetPointsCountHttpParams(const TString& program, TInstant from, TInstant to) const { + TUrlBuilder builder(GetHttpSolomonEndpoint()); + + builder.AddPathComponent("api"); + builder.AddPathComponent("v2"); + builder.AddPathComponent("projects"); + builder.AddPathComponent(Settings.GetProject()); + builder.AddPathComponent("sensors"); + builder.AddPathComponent("data"); + + builder.AddUrlParam("projectId", GetProjectId()); + const auto& ds = Settings.GetDownsampling(); NJsonWriter::TBuf w; w.BeginObject() .UnsafeWriteKey("from").WriteString(from.ToString()) .UnsafeWriteKey("to").WriteString(to.ToString()) .UnsafeWriteKey("program").WriteString(program) - .UnsafeWriteKey("forceCluster").WriteString(DefaultReplica) .UnsafeWriteKey("downsampling") .BeginObject() .UnsafeWriteKey("disabled").WriteBool(ds.GetDisabled()); @@ -544,7 +667,7 @@ class TSolomonAccessorClient : public ISolomonAccessorClient, public std::enable } w.EndObject().EndObject(); - return w.Str(); + return { builder.Build(), w.Str() }; } ReadRequest BuildGetDataRequest(const TString& program, TInstant from, TInstant to) const { @@ -605,8 +728,9 @@ class TSolomonAccessorClient : public ISolomonAccessorClient, public std::enable } private: - const TString DefaultReplica; - const ui64 ListSizeLimit = 1ull << 20; + const bool EnableSolomonClientPostApi; + const ui64 MaxListingPageSize; + const ui64 ListSizeLimit = 100 * 1024 * 1024 * 8; const NYql::NSo::NProto::TDqSolomonSource Settings; const std::shared_ptr CredentialsProvider; @@ -625,9 +749,14 @@ ISolomonAccessorClient::Make( std::shared_ptr credentialsProvider) { const auto& settings = source.settings(); - TString defaultReplica; - if (auto it = settings.find("solomonClientDefaultReplica"); it != settings.end()) { - defaultReplica = it->second; + bool enableSolomonClientPostApi = false; + if (auto it = settings.find("enableSolomonClientPostApi"); it != settings.end()) { + enableSolomonClientPostApi = FromString(it->second); + } + + ui64 maxListingPageSize = 20000; + if (auto it = settings.find("maxListingPageSize"); it != settings.end()) { + maxListingPageSize = FromString(it->second); } ui64 maxApiInflight = 40; @@ -635,7 +764,7 @@ ISolomonAccessorClient::Make( maxApiInflight = FromString(it->second); } - return std::make_shared(defaultReplica, maxApiInflight, std::move(source), credentialsProvider); + return std::make_shared(enableSolomonClientPostApi, maxListingPageSize, maxApiInflight, std::move(source), credentialsProvider); } } // namespace NYql::NSo diff --git a/ydb/library/yql/providers/solomon/solomon_accessor/client/solomon_accessor_client.h b/ydb/library/yql/providers/solomon/solomon_accessor/client/solomon_accessor_client.h index a57503b27e31..1d2b22f5079d 100644 --- a/ydb/library/yql/providers/solomon/solomon_accessor/client/solomon_accessor_client.h +++ b/ydb/library/yql/providers/solomon/solomon_accessor/client/solomon_accessor_client.h @@ -20,7 +20,8 @@ class ISolomonAccessorClient { public: virtual NThreading::TFuture GetLabelNames(const TSelectors& selectors, TInstant from, TInstant to) const = 0; - virtual NThreading::TFuture ListMetrics(const TSelectors& selectors, TInstant from, TInstant to, int pageSize, int page) const = 0; + virtual NThreading::TFuture ListMetrics(const TSelectors& selectors, TInstant from, TInstant to) const = 0; + virtual NThreading::TFuture ListMetricsLabels(const TSelectors& selectors, TInstant from, TInstant to) const = 0; virtual NThreading::TFuture GetPointsCount(const TSelectors& selectors, TInstant from, TInstant to) const = 0; virtual NThreading::TFuture GetData(const TSelectors& selectors, TInstant from, TInstant to) const = 0; virtual NThreading::TFuture GetData(const TString& program, TInstant from, TInstant to) const = 0; diff --git a/ydb/library/yql/providers/solomon/solomon_accessor/client/solomon_client_utils.cpp b/ydb/library/yql/providers/solomon/solomon_accessor/client/solomon_client_utils.cpp index 32b3437e88a6..1d414371bbc1 100644 --- a/ydb/library/yql/providers/solomon/solomon_accessor/client/solomon_client_utils.cpp +++ b/ydb/library/yql/providers/solomon/solomon_accessor/client/solomon_client_utils.cpp @@ -25,6 +25,7 @@ TSolomonClientResponse::TSolomonClientResponse(T&& result, ui64 downloadedByt template class TSolomonClientResponse; template class TSolomonClientResponse; +template class TSolomonClientResponse; template class TSolomonClientResponse; template class TSolomonClientResponse; diff --git a/ydb/library/yql/providers/solomon/solomon_accessor/client/solomon_client_utils.h b/ydb/library/yql/providers/solomon/solomon_accessor/client/solomon_client_utils.h index 3eb5fa067ad7..b3957c5306b4 100644 --- a/ydb/library/yql/providers/solomon/solomon_accessor/client/solomon_client_utils.h +++ b/ydb/library/yql/providers/solomon/solomon_accessor/client/solomon_client_utils.h @@ -41,6 +41,12 @@ struct TListMetricsResult { }; using TListMetricsResponse = TSolomonClientResponse; +struct TListMetricsLabelsResult { + std::vector Labels; + ui64 TotalCount; +}; +using TListMetricsLabelsResponse = TSolomonClientResponse; + struct TGetPointsCountResult { ui64 PointsCount; }; diff --git a/ydb/library/yql/tools/solomon_emulator/lib/shard.py b/ydb/library/yql/tools/solomon_emulator/lib/shard.py index 94111ffa5ceb..6383b91996cb 100644 --- a/ydb/library/yql/tools/solomon_emulator/lib/shard.py +++ b/ydb/library/yql/tools/solomon_emulator/lib/shard.py @@ -57,13 +57,17 @@ def match(self, selectors): if key == "project" or key == "cluster" or key == "service": continue - if value == "-": - if key in self.labels: - return False - elif value == "*": - return key in self.labels + if value.find("|") == -1: + if value == "-": + if key in self.labels: + return False + elif value == "*": + return key in self.labels + else: + if key not in self.labels or self.labels[key] != value: + return False else: - if key not in self.labels or self.labels[key] != value: + if key not in self.labels or self.labels[key] not in set(value.split("|")): return False return True @@ -134,6 +138,42 @@ def get_label_names(self, selectors): return list(result) + def get_labels(self, selectors): + all_label_values = dict() + + label_names = self.get_label_names(selectors) + for key, value in selectors.items(): + label_names.append(key) + + for name in label_names: + all_label_values[name] = { + "values": set(), + "absent": False + } + + matching_metrics = self.get_matching_metrics(selectors) + for metric in matching_metrics: + metric_labels = set() + for key, value in metric.labels.items(): + all_label_values[key]["values"].add(value) + metric_labels.add(key) + + for name, data in all_label_values.items(): + if name not in metric_labels: + data["absent"] = True + + result = [] + for name, data in all_label_values.items(): + if len(data["values"]) > 1: + result.append({ + "name": name, + "absent": data["absent"], + "truncated": False, + "values": list(data["values"]) + }) + + return (result, len(matching_metrics)) + def get_metrics(self, selectors): result = [] @@ -145,7 +185,10 @@ def get_metrics(self, selectors): labels["service"] = self._service result.append({"labels": labels, "type": metric.kind}) - return sorted(result, key=lambda x: str(x)) + if len(matching_metrics) > 1000: + return (None, "Too many lines for one listing request, should be under 2k, have: {}".format(len(matching_metrics))) + + return (sorted(result, key=lambda x: str(x)), None) def get_data(self, selectors, time_from, time_to, downsampling): result = dict() diff --git a/ydb/library/yql/tools/solomon_emulator/lib/webapp.py b/ydb/library/yql/tools/solomon_emulator/lib/webapp.py index 2ae1166fd692..269bd96406d4 100644 --- a/ydb/library/yql/tools/solomon_emulator/lib/webapp.py +++ b/ydb/library/yql/tools/solomon_emulator/lib/webapp.py @@ -2,7 +2,6 @@ import json import logging import re -from math import ceil from concurrent import futures from library.python.monlib.encoder import loads @@ -111,7 +110,8 @@ async def data_write(self, request): return web.json_response({"writtenMetricsCount": shard.add_metrics(metrics_json)}) async def sensor_names(self, request): - selectors, success = _parse_selectors(request.rel_url.query["selectors"]) + json = await request.json() + selectors, success = _parse_selectors(json["selectors"]) if not success: return web.HTTPBadRequest(text="Invalid selectors") @@ -128,10 +128,28 @@ async def sensor_names(self, request): return web.json_response({"names": result}) + async def sensor_labels(self, request): + json = await request.json() + selectors, success = _parse_selectors(json["selectors"]) + + if not success: + return web.HTTPBadRequest(text="Invalid selectors") + + if "project" not in selectors or "cluster" not in selectors or "service" not in selectors: + return web.HTTPBadRequest(text="project, cluster and service labels must be specified") + + project = selectors["project"] + cluster = selectors["cluster"] + service = selectors["service"] + + shard = self._get_shard(project, cluster, service) + labels, totalCount = shard.get_labels(selectors) + + return web.json_response({"labels": labels, "totalCount": totalCount}) + async def sensors(self, request): - selectors, success = _parse_selectors(request.rel_url.query["selectors"]) - page_size = int(request.rel_url.query['pageSize']) - page = int(request.rel_url.query['page']) + json = await request.json() + selectors, success = _parse_selectors(json["selectors"]) if not success: return web.HTTPBadRequest(text="Invalid selectors") @@ -144,12 +162,12 @@ async def sensors(self, request): service = selectors["service"] shard = self._get_shard(project, cluster, service) - metrics = shard.get_metrics(selectors) + metrics, error = shard.get_metrics(selectors) - from_ind = page_size * page - to_ind = min(len(metrics), page_size * (page + 1)) + if error is not None: + return web.HTTPBadRequest(text=error) - return web.json_response({"result": metrics[from_ind:to_ind], "page": {"pagesCount": ceil(len(metrics) / page_size), "totalCount": len(metrics)}}) + return web.json_response({"result": metrics, "page": {"pagesCount": 1, "totalCount": len(metrics)}}) async def metrics_get(self, request): cluster = request.rel_url.query.get('cluster', None) or request.rel_url.query['folderId'] @@ -296,8 +314,9 @@ def _build_read_response(labels, type, timestamps, values): def create_web_app(emulator): webapp = web.Application() webapp.add_routes([ - web.get("/api/v2/projects/{project}/sensors/names", emulator.sensor_names), - web.get("/api/v2/projects/{project}/sensors", emulator.sensors), + web.post("/api/v2/projects/{project}/sensors/names", emulator.sensor_names), + web.post("/api/v2/projects/{project}/sensors/labels", emulator.sensor_labels), + web.post("/api/v2/projects/{project}/sensors", emulator.sensors), web.get("/metrics/get", emulator.metrics_get), web.get("/ping", emulator.get_ping), web.post("/api/v2/push", emulator.api_v2_push), diff --git a/ydb/tests/solomon/reading/base.py b/ydb/tests/solomon/reading/base.py index b2e10dc2e945..474b4ca6a583 100644 --- a/ydb/tests/solomon/reading/base.py +++ b/ydb/tests/solomon/reading/base.py @@ -16,7 +16,7 @@ def setup_class(cls): cls.basic_reading_timestamps = [0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55] cls.basic_reading_values = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11] - cls.listing_paging_metrics_size = 1000 + cls.listing_paging_metrics_size = 2500 cls.data_paging_timeseries_size = 25000 cls.data_paging_timestamps, cls.data_paging_values = cls._generate_data_paging_timeseries(cls.data_paging_timeseries_size) @@ -71,6 +71,18 @@ def setup_class(cls): { "name": "_EnableRuntimeListing", "value": "true" + }, + { + "name": "_EnableSolomonClientPostApi", + "value": "true" + }, + { + "name": "_MaxListingPageSize", + "value": 1000 + }, + { + "name": "MaxApiInflight", + "value": 2500 } ] }