Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
257 changes: 158 additions & 99 deletions ydb/library/yql/providers/solomon/actors/dq_solomon_metrics_queue.cpp

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions ydb/library/yql/providers/solomon/common/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ struct TTimeseries {
std::vector<double> Values;
};

struct TLabelValues {
TString Name;
bool Absent;
bool Truncated;
std::vector<TString> Values;
};

struct TMetricTimeRange {
TSelectors Selectors;
TString Program;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@ TSolomonConfiguration::TSolomonConfiguration()
{
REGISTER_SETTING(*this, _EnableReading);
REGISTER_SETTING(*this, _EnableRuntimeListing);
REGISTER_SETTING(*this, _EnableSolomonClientPostApi);
REGISTER_SETTING(*this, _TruePointsFindRange);
REGISTER_SETTING(*this, MetricsQueuePageSize);
REGISTER_SETTING(*this, MetricsQueuePrefetchSize);
REGISTER_SETTING(*this, _MaxListingPageSize);
REGISTER_SETTING(*this, MetricsQueueBatchCountLimit);
REGISTER_SETTING(*this, SolomonClientDefaultReplica);
REGISTER_SETTING(*this, ComputeActorBatchSize);
REGISTER_SETTING(*this, MaxApiInflight);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@ struct TSolomonSettings {
public:
NCommon::TConfSetting<bool, Static> _EnableReading;
NCommon::TConfSetting<bool, Static> _EnableRuntimeListing;
NCommon::TConfSetting<bool, Static> _EnableSolomonClientPostApi;
NCommon::TConfSetting<ui64, Static> _TruePointsFindRange;
NCommon::TConfSetting<ui64, Static> MetricsQueuePageSize;
NCommon::TConfSetting<ui64, Static> MetricsQueuePrefetchSize;
NCommon::TConfSetting<ui64, Static> _MaxListingPageSize;
NCommon::TConfSetting<ui64, Static> MetricsQueueBatchCountLimit;
NCommon::TConfSetting<TString, Static> SolomonClientDefaultReplica;
NCommon::TConfSetting<ui64, Static> ComputeActorBatchSize;
NCommon::TConfSetting<ui64, Static> MaxApiInflight;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,29 +349,24 @@ class TSolomonDqIntegration: public TDqIntegrationBase {
source.AddRequiredLabelNames(labelAsString);
}

auto defaultReplica = (source.GetClusterType() == NSo::NProto::CT_SOLOMON ? "sas" : "cloud-prod-a");

auto& solomonConfig = State_->Configuration;
auto& sourceSettings = *source.MutableSettings();

auto metricsQueuePageSize = solomonConfig->MetricsQueuePageSize.Get().OrElse(2000);
sourceSettings.insert({"metricsQueuePageSize", ToString(metricsQueuePageSize)});

auto metricsQueuePrefetchSize = solomonConfig->MetricsQueuePrefetchSize.Get().OrElse(4000);
sourceSettings.insert({"metricsQueuePrefetchSize", ToString(metricsQueuePrefetchSize)});

auto metricsQueueBatchCountLimit = solomonConfig->MetricsQueueBatchCountLimit.Get().OrElse(10);
auto metricsQueueBatchCountLimit = solomonConfig->MetricsQueueBatchCountLimit.Get().OrElse(500);
sourceSettings.insert({"metricsQueueBatchCountLimit", ToString(metricsQueueBatchCountLimit)});

auto solomonClientDefaultReplica = solomonConfig->SolomonClientDefaultReplica.Get().OrElse(defaultReplica);
sourceSettings.insert({"solomonClientDefaultReplica", ToString(solomonClientDefaultReplica)});
auto enableSolomonClientPostApi = solomonConfig->_EnableSolomonClientPostApi.Get().OrElse(false);
sourceSettings.insert({"enableSolomonClientPostApi", ToString(enableSolomonClientPostApi)});

auto computeActorBatchSize = solomonConfig->ComputeActorBatchSize.Get().OrElse(100);
sourceSettings.insert({"computeActorBatchSize", ToString(computeActorBatchSize)});

auto truePointsFindRange = solomonConfig->_TruePointsFindRange.Get().OrElse(301);
sourceSettings.insert({"truePointsFindRange", ToString(truePointsFindRange)});

auto maxListingPageSize = solomonConfig->_MaxListingPageSize.Get().OrElse(20000);
sourceSettings.insert({"maxListingPageSize", ToString(maxListingPageSize)});

auto maxApiInflight = solomonConfig->MaxApiInflight.Get().OrElse(40);
sourceSettings.insert({"maxApiInflight", ToString(maxApiInflight)});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace {
struct TLoadSolomonMetaRequest {
NSo::ISolomonAccessorClient::TPtr SolomonClient;
NThreading::TFuture<NSo::TGetLabelsResponse> LabelNamesRequest;
NThreading::TFuture<NSo::TListMetricsResponse> ListMetricsRequest;
NThreading::TFuture<NSo::TListMetricsLabelsResponse> LabelValuesRequest;
};

TMaybe<TString> ExtractSetting(const TExprNode& settings, const TString& settingName) {
Expand Down Expand Up @@ -93,25 +93,24 @@ class TSolomonLoadTableMetadataTransformer : public TGraphTransformerBase {
return TStatus::Error;
}

auto defaultReplica = (source.GetClusterType() == NSo::NProto::CT_SOLOMON ? "sas" : "cloud-prod-a");
auto solomonClientDefaultReplica = State_->Configuration->SolomonClientDefaultReplica.Get().OrElse(defaultReplica);
source.MutableSettings()->insert({ "solomonClientDefaultReplica", ToString(solomonClientDefaultReplica) });
auto enableSolomonClientPostApi = State_->Configuration->_EnableSolomonClientPostApi.Get().OrElse(false);
source.MutableSettings()->insert({ "enableSolomonClientPostApi", ToString(enableSolomonClientPostApi) });

auto providerFactory = CreateCredentialsProviderFactoryForStructuredToken(State_->CredentialsFactory, State_->Configuration->Tokens.at(clusterName));
auto credentialsProvider = providerFactory->CreateProvider();

auto solomonClient = NSo::ISolomonAccessorClient::Make(std::move(source), credentialsProvider);
auto labelNamesFuture = solomonClient->GetLabelNames(selectors, from, to);
auto listMetricsFuture = solomonClient->ListMetrics(selectors, from, to, 30, 0);
auto listMetricsLabelsFuture = solomonClient->ListMetricsLabels(selectors, from, to);

LabelNamesRequests_[soReadObject.Raw()] = {
.SolomonClient = solomonClient,
.LabelNamesRequest = labelNamesFuture,
.ListMetricsRequest = listMetricsFuture
.LabelValuesRequest = listMetricsLabelsFuture
};

futures.push_back(labelNamesFuture.IgnoreResult());
futures.push_back(listMetricsFuture.IgnoreResult());
futures.push_back(listMetricsLabelsFuture.IgnoreResult());
}
}

Expand Down Expand Up @@ -142,10 +141,10 @@ class TSolomonLoadTableMetadataTransformer : public TGraphTransformerBase {
return TStatus::Error;
}

auto listMetricsValue = request.ListMetricsRequest.GetValue();
if (listMetricsValue.Status != NSo::EStatus::STATUS_OK) {
auto listMetricLabelsValue = request.LabelValuesRequest.GetValue();
if (listMetricLabelsValue.Status != NSo::EStatus::STATUS_OK) {
ctx.AddError(TIssue(ctx.GetPosition(node->Pos()),
TStringBuilder() << "Failed to get total metrics count, details: " << listMetricsValue.Error));
TStringBuilder() << "Failed to get total metrics count, details: " << listMetricLabelsValue.Error));
return TStatus::Error;
}

Expand All @@ -161,7 +160,7 @@ class TSolomonLoadTableMetadataTransformer : public TGraphTransformerBase {
.RequiredLabelNames()
.Add(labelNames)
.Build()
.TotalMetricsCount<TCoAtom>().Build(ToString(listMetricsValue.Result.TotalCount))
.TotalMetricsCount<TCoAtom>().Build(ToString(listMetricLabelsValue.Result.TotalCount))
.Done().Ptr());
}

Expand Down
Loading
Loading