Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
ui64 computeActorBatchSize,
TDuration truePointsFindRange,
ui64 metricsQueueConsumersCountDelta,
ui64 maxInflight,
NActors::TActorId metricsQueueActor,
const ::NMonitoring::TDynamicCounterPtr& counters,
std::shared_ptr<NYdb::ICredentialsProvider> credentialsProvider
Expand All @@ -97,6 +98,7 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
, TrueRangeFrom(TInstant::Seconds(ReadParams.Source.GetFrom()) - truePointsFindRange)
, TrueRangeTo(TInstant::Seconds(ReadParams.Source.GetTo()) + truePointsFindRange)
, MetricsQueueConsumersCountDelta(metricsQueueConsumersCountDelta)
, MaxInflight(maxInflight)
, MetricsQueueActor(metricsQueueActor)
, CredentialsProvider(credentialsProvider)
, SolomonClient(NSo::ISolomonAccessorClient::Make(ReadParams.Source, CredentialsProvider))
Expand All @@ -113,9 +115,9 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
}
return ERetryErrorClass::NoRetry;
},
TDuration::MilliSeconds(25),
TDuration::MilliSeconds(50),
TDuration::MilliSeconds(200),
TDuration::MilliSeconds(500),
TDuration::MilliSeconds(1000),
10
);

Expand All @@ -128,8 +130,14 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
}

void FillSystemColumnPositionIndex() {
YQL_ENSURE(ReadParams.Source.GetLabelNameAliases().size() == ReadParams.Source.GetLabelNames().size());

for (int i = 0; i < ReadParams.Source.GetLabelNameAliases().size(); ++i) {
AliasIndex[ReadParams.Source.GetLabelNameAliases()[i]] = ReadParams.Source.GetLabelNames()[i];
}

std::vector<TString> names(ReadParams.Source.GetSystemColumns().begin(), ReadParams.Source.GetSystemColumns().end());
names.insert(names.end(), ReadParams.Source.GetLabelNames().begin(), ReadParams.Source.GetLabelNames().end());
names.insert(names.end(), ReadParams.Source.GetLabelNameAliases().begin(), ReadParams.Source.GetLabelNameAliases().end());
std::sort(names.begin(), names.end());
size_t index = 0;
for (auto& n : names) {
Expand Down Expand Up @@ -250,7 +258,7 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
ParsePointsCount(metric, pointsCount);
CompletedMetricsCount++;

TryRequestData();
while (TryRequestData()) {}
}

void HandleNewDataBatch(TEvSolomonProvider::TEvNewDataBatch::TPtr& newDataBatch) {
Expand Down Expand Up @@ -368,9 +376,9 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
items[it->second] = dictValue;
}

for (const auto& c : ReadParams.Source.GetLabelNames()) {
for (const auto& c : ReadParams.Source.GetLabelNameAliases()) {
auto& v = items[Index[c]];
auto it = labels.find(c);
auto it = labels.find(AliasIndex[c]);
if (it != labels.end()) {
v = NKikimr::NMiniKQL::MakeString(it->second);
} else {
Expand Down Expand Up @@ -438,6 +446,7 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct

bool TryRequestPointsCount() {
TryRequestMetrics();

if (ListedMetrics.empty()) {
return false;
}
Expand All @@ -463,12 +472,19 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
});
}

void TryRequestData() {
bool TryRequestData() {
TryRequestPointsCount();
while (!MetricsWithTimeRange.empty()) {
RequestData();
TryRequestPointsCount();

if (MetricsWithTimeRange.empty()) {
return false;
}

if (CurrentInflight >= MaxInflight) {
return false;
}

RequestData();
return true;
}

void RequestData() {
Expand All @@ -477,6 +493,7 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct

auto request = MetricsWithTimeRange.back();
MetricsWithTimeRange.pop_back();
CurrentInflight++;

if (UseMetricsQueue) {
dataRequestFuture = SolomonClient->GetData(request.Selectors, request.From, request.To);
Expand Down Expand Up @@ -548,6 +565,7 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
}

PendingDataRequests_.erase(request);
CurrentInflight--;

if (batch.Response.Status != NSo::EStatus::STATUS_OK) {
TIssues issues { TIssue(batch.Response.Error) };
Expand Down Expand Up @@ -579,6 +597,7 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
const TInstant TrueRangeFrom;
const TInstant TrueRangeTo;
const ui64 MetricsQueueConsumersCountDelta;
const ui64 MaxInflight;
IRetryPolicy<NSo::TGetDataResponse>::TPtr RetryPolicy;

bool UseMetricsQueue;
Expand All @@ -596,6 +615,7 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
ui64 CompletedMetricsCount = 0;
ui64 ListedTimeRanges = 0;
ui64 CompletedTimeRanges = 0;
ui64 CurrentInflight = 0;
const ui64 MaxPointsPerOneRequest = 10000;

TString SourceId;
Expand All @@ -604,6 +624,7 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
TType* DictType = nullptr;
std::vector<size_t> SystemColumnPositionIndex;
THashMap<TString, size_t> Index;
THashMap<TString, TString> AliasIndex;
};


Expand Down Expand Up @@ -649,6 +670,11 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqSolom
truePointsFindRange = FromString<ui64>(it->second);
}

ui64 maxInflight = 40;
if (auto it = settings.find("maxApiInflight"); it != settings.end()) {
maxInflight = FromString<ui64>(it->second);
}

auto credentialsProviderFactory = CreateCredentialsProviderFactoryForStructuredToken(credentialsFactory, token);
auto credentialsProvider = credentialsProviderFactory->CreateProvider();

Expand All @@ -663,6 +689,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqSolom
computeActorBatchSize,
TDuration::Seconds(truePointsFindRange),
metricsQueueConsumersCountDelta,
maxInflight,
metricsQueueActor,
counters,
credentialsProvider);
Expand Down
25 changes: 24 additions & 1 deletion ydb/library/yql/providers/solomon/common/util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ THolder<re2::RE2> CompileRE2WithCheck(const std::string& pattern) {
}

const TString LABEL_NAME_PATTERN = R"( *[a-zA-Z0-9-._/]{1,50} *)";
const TString LABEL_VALUE_PATTERN = R"( *"[ -!#-&(-)+->@-_a-{}-~]{1,200}" *)";
const TString LABEL_VALUE_PATTERN = R"( *"[ -!#-&(-)+->@-_a-{}-~*|-]{1,200}" *)";

const TString SENSOR_NAME_PATTERN = "(" + LABEL_VALUE_PATTERN + ")?({.*})";
THolder<re2::RE2> SENSOR_NAME_RE = CompileRE2WithCheck(SENSOR_NAME_PATTERN);
Expand All @@ -28,6 +28,9 @@ THolder<re2::RE2> SELECTOR_RE = CompileRE2WithCheck(SELECTOR_PATTERN);
const TString SELECTORS_FULL_PATTERN = "{((" + SELECTOR_PATTERN + ",)*" + SELECTOR_PATTERN + ")?}";
THolder<re2::RE2> SELECTORS_FULL_RE = CompileRE2WithCheck(SELECTORS_FULL_PATTERN);

// Matches a single user-specified label item: "<name>" or "<name> as <alias>".
// Capture group 1 is the label name; capture group 2 (optional) is the alias.
// The "as" keyword is matched case-insensitively via (?i:as) and must be
// surrounded by single literal spaces; any extra spaces are absorbed by the
// leading/trailing " *" parts of LABEL_NAME_PATTERN, so captured values may
// carry surrounding spaces.
const TString USER_LABELS_PATTERN = "(" + LABEL_NAME_PATTERN + ")(?: (?i:as) (" + LABEL_NAME_PATTERN + "))?";
THolder<re2::RE2> USER_LABELS_RE = CompileRE2WithCheck(USER_LABELS_PATTERN);

TMaybe<TString> InsertOrCheck(std::map<TString, TString>& selectors, const TString& name, const TString& value) {
auto [it, inserted] = selectors.emplace(name, value);
if (!inserted && it->second != value) {
Expand Down Expand Up @@ -101,6 +104,26 @@ TMaybe<TString> BuildSelectorValues(const NSo::NProto::TDqSolomonSource& source,
return {};
}

// Parses a comma-separated list of label names written as
// "label1 [as alias1], label2 [as alias2], ..." and appends the result to the
// output vectors.
//
// labelNames - the raw user string; empty items between commas are skipped.
// names      - receives the stripped label name of every item, in input order.
// aliases    - receives the stripped alias of every item; an item without an
//              explicit alias uses its own name, so both vectors always grow
//              by the same number of elements.
//
// Returns an empty TMaybe on success, or an error message when some item does
// not fully match USER_LABELS_RE. On failure both output vectors are rolled
// back to the sizes they had on entry, so callers never observe a partially
// parsed list.
TMaybe<TString> ParseLabelNames(const TString& labelNames, TVector<TString>& names, TVector<TString>& aliases) {
    auto labels = StringSplitter(labelNames).Split(',').SkipEmpty().ToList<TString>();

    // Remember the initial sizes: the function appends, and must be able to
    // undo its own appends if a later item turns out to be malformed.
    const size_t namesOnEntry = names.size();
    const size_t aliasesOnEntry = aliases.size();

    // Reserve relative to the current size so pre-populated vectors do not reallocate.
    names.reserve(namesOnEntry + labels.size());
    aliases.reserve(aliasesOnEntry + labels.size());

    for (TString& label : labels) {
        TString name;
        std::optional<TString> alias; // stays empty when the "as <alias>" part is absent

        if (!RE2::FullMatch(label, *USER_LABELS_RE, &name, &alias)) {
            // Drop everything appended by this call before reporting the error.
            names.erase(names.begin() + namesOnEntry, names.end());
            aliases.erase(aliases.begin() + aliasesOnEntry, aliases.end());
            return "Label names should be specified in \"label1 [as alias1], label2 [as alias2], ...\" format";
        }

        // The pattern tolerates surrounding spaces, so strip them from the captures.
        names.push_back(StripString(name));
        aliases.push_back(StripString(alias ? *alias : name));
    }

    return {};
}

NSo::NProto::ESolomonClusterType MapClusterType(TSolomonClusterConfig::ESolomonClusterType clusterType) {
switch (clusterType) {
case TSolomonClusterConfig::SCT_SOLOMON:
Expand Down
2 changes: 2 additions & 0 deletions ydb/library/yql/providers/solomon/common/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ namespace NYql::NSo {

TMaybe<TString> ParseSelectorValues(const TString& selectors, std::map<TString, TString>& result);
TMaybe<TString> BuildSelectorValues(const NSo::NProto::TDqSolomonSource& source, const TString& selectors, std::map<TString, TString>& result);

// Parses "label1 [as alias1], label2 [as alias2], ..." into parallel lists:
// `names` receives the label names and `aliases` the corresponding aliases
// (a label without an explicit alias uses its own name as the alias).
// Returns an empty TMaybe on success or an error message on malformed input;
// the contents of `names`/`aliases` should not be relied upon after an error.
TMaybe<TString> ParseLabelNames(const TString& labelNames, TVector<TString>& names, TVector<TString>& aliases);

NSo::NProto::ESolomonClusterType MapClusterType(TSolomonClusterConfig::ESolomonClusterType clusterType);

Expand Down
94 changes: 94 additions & 0 deletions ydb/library/yql/providers/solomon/common/util_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,98 @@ Y_UNIT_TEST_SUITE(TestSolomonParseSelectors) {
}
}

// Unit tests for ParseLabelNames: plain names, explicit aliases, mixed lists,
// case-insensitive "as", and the malformed inputs that must be rejected.
Y_UNIT_TEST_SUITE(TestSolomonParseLabelNames) {
    // Labels without aliases: every alias equals the label name.
    Y_UNIT_TEST(Basic) {
        TString input = "label1, label2";
        TVector<TString> parsedNames;
        TVector<TString> parsedAliases;

        const TVector<TString> wantNames = {"label1", "label2"};
        const TVector<TString> wantAliases = {"label1", "label2"};

        UNIT_ASSERT_EQUAL(ParseLabelNames(input, parsedNames, parsedAliases), TMaybe<TString>{});
        UNIT_ASSERT_EQUAL(parsedNames, wantNames);
        UNIT_ASSERT_EQUAL(parsedAliases, wantAliases);
    }

    // Every label carries an explicit alias.
    Y_UNIT_TEST(WithAliases) {
        TString input = "label1 as alias1, label2 as alias2";
        TVector<TString> parsedNames;
        TVector<TString> parsedAliases;

        const TVector<TString> wantNames = {"label1", "label2"};
        const TVector<TString> wantAliases = {"alias1", "alias2"};

        UNIT_ASSERT_EQUAL(ParseLabelNames(input, parsedNames, parsedAliases), TMaybe<TString>{});
        UNIT_ASSERT_EQUAL(parsedNames, wantNames);
        UNIT_ASSERT_EQUAL(parsedAliases, wantAliases);
    }

    // Only the middle label is aliased; the others fall back to their names.
    Y_UNIT_TEST(OneAlias) {
        TString input = "label1, label2 as alias2, label3";
        TVector<TString> parsedNames;
        TVector<TString> parsedAliases;

        const TVector<TString> wantNames = {"label1", "label2", "label3"};
        const TVector<TString> wantAliases = {"label1", "alias2", "label3"};

        UNIT_ASSERT_EQUAL(ParseLabelNames(input, parsedNames, parsedAliases), TMaybe<TString>{});
        UNIT_ASSERT_EQUAL(parsedNames, wantNames);
        UNIT_ASSERT_EQUAL(parsedAliases, wantAliases);
    }

    // The "as" keyword is accepted in upper case as well.
    Y_UNIT_TEST(CaseSensitivity) {
        TString input = "label1, label2 AS alias2, label3";
        TVector<TString> parsedNames;
        TVector<TString> parsedAliases;

        const TVector<TString> wantNames = {"label1", "label2", "label3"};
        const TVector<TString> wantAliases = {"label1", "alias2", "label3"};

        UNIT_ASSERT_EQUAL(ParseLabelNames(input, parsedNames, parsedAliases), TMaybe<TString>{});
        UNIT_ASSERT_EQUAL(parsedNames, wantNames);
        UNIT_ASSERT_EQUAL(parsedAliases, wantAliases);
    }

    // Characters outside the label-name alphabet are rejected.
    Y_UNIT_TEST(InvalidLabelName) {
        TString input = "{}, {}";
        TVector<TString> parsedNames;
        TVector<TString> parsedAliases;

        UNIT_ASSERT_EQUAL(ParseLabelNames(input, parsedNames, parsedAliases), "Label names should be specified in \"label1 [as alias1], label2 [as alias2], ...\" format");
    }

    // Two words without the "as" keyword between them are rejected.
    Y_UNIT_TEST(NoAs) {
        TString input = "label1 alias1";
        TVector<TString> parsedNames;
        TVector<TString> parsedAliases;

        UNIT_ASSERT_EQUAL(ParseLabelNames(input, parsedNames, parsedAliases), "Label names should be specified in \"label1 [as alias1], label2 [as alias2], ...\" format");
    }

    // An "as" keyword with no alias after it is rejected.
    Y_UNIT_TEST(EmptyAlias) {
        TString input = "label1 as, label2";
        TVector<TString> parsedNames;
        TVector<TString> parsedAliases;

        UNIT_ASSERT_EQUAL(ParseLabelNames(input, parsedNames, parsedAliases), "Label names should be specified in \"label1 [as alias1], label2 [as alias2], ...\" format");
    }
}

} // namespace NYql::NSo
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@
{"Index": 12, "Name": "DownsamplingFill", "Type": "TCoAtom"},
{"Index": 13, "Name": "DownsamplingGridSec", "Type": "TCoUint32"},
{"Index": 14, "Name": "RequiredLabelNames", "Type": "TCoAtomList"},
{"Index": 15, "Name": "TotalMetricsCount", "Type": "TCoAtom"}
{"Index": 15, "Name": "TotalMetricsCount", "Type": "TCoAtom"},
{"Index": 16, "Name": "LabelNameAliases", "Type": "TCoAtomList"}
]
},
{
Expand All @@ -85,10 +86,11 @@
{"Index": 2, "Name": "Object", "Type": "TSoObject"},
{"Index": 3, "Name": "SystemColumns", "Type": "TCoAtomList"},
{"Index": 4, "Name": "LabelNames", "Type": "TCoAtomList"},
{"Index": 5, "Name": "RequiredLabelNames", "Type": "TCoAtomList"},
{"Index": 6, "Name": "TotalMetricsCount", "Type": "TCoAtom"},
{"Index": 7, "Name": "RowType", "Type": "TExprBase"},
{"Index": 8, "Name": "ColumnOrder", "Type": "TExprBase", "Optional": true}
{"Index": 5, "Name": "LabelNameAliases", "Type": "TCoAtomList"},
{"Index": 6, "Name": "RequiredLabelNames", "Type": "TCoAtomList"},
{"Index": 7, "Name": "TotalMetricsCount", "Type": "TCoAtom"},
{"Index": 8, "Name": "RowType", "Type": "TExprBase"},
{"Index": 9, "Name": "ColumnOrder", "Type": "TExprBase", "Optional": true}
]
},
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,5 @@ message TDqSolomonSource {
string GrpcEndpoint = 17;
optional string Cluster = 18;
optional string Service = 19;
repeated string LabelNameAliases = 20;
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class TSolomonDataSourceTypeAnnotationTransformer : public TVisitorTransformerBa
}

TStatus HandleSoSourceSettings(const TExprNode::TPtr& input, TExprContext& ctx) {
if (!EnsureArgsCount(*input, 16, ctx)) {
if (!EnsureArgsCount(*input, 17, ctx)) {
return TStatus::Error;
}

Expand Down Expand Up @@ -71,6 +71,11 @@ class TSolomonDataSourceTypeAnnotationTransformer : public TVisitorTransformerBa
return TStatus::Error;
}

auto& labelNameAliases = *input->Child(TSoSourceSettings::idx_LabelNameAliases);
if (!EnsureTupleOfAtoms(labelNameAliases, ctx)) {
return TStatus::Error;
}

auto& requiredLabelNames = *input->Child(TSoSourceSettings::idx_RequiredLabelNames);
if (!EnsureTupleOfAtoms(requiredLabelNames, ctx)) {
return TStatus::Error;
Expand Down Expand Up @@ -155,7 +160,7 @@ class TSolomonDataSourceTypeAnnotationTransformer : public TVisitorTransformerBa
}

TStatus HandleRead(const TExprNode::TPtr& input, TExprContext& ctx) {
if (!EnsureMinMaxArgsCount(*input, 8U, 9U, ctx)) {
if (!EnsureMinMaxArgsCount(*input, 9U, 10U, ctx)) {
return TStatus::Error;
}

Expand All @@ -177,6 +182,11 @@ class TSolomonDataSourceTypeAnnotationTransformer : public TVisitorTransformerBa
return TStatus::Error;
}

auto& labelNameAliases = *input->Child(TSoReadObject::idx_LabelNameAliases);
if (!EnsureTupleOfAtoms(labelNameAliases, ctx)) {
return TStatus::Error;
}

auto& requiredLabelNames = *input->Child(TSoReadObject::idx_RequiredLabelNames);
if (!EnsureTupleOfAtoms(requiredLabelNames, ctx)) {
return TStatus::Error;
Expand Down
Loading
Loading