diff --git a/ydb/library/yql/providers/pq/common/yql_names.h b/ydb/library/yql/providers/pq/common/yql_names.h index 3bb8c20d1ab8..4597917dd2d8 100644 --- a/ydb/library/yql/providers/pq/common/yql_names.h +++ b/ydb/library/yql/providers/pq/common/yql_names.h @@ -17,6 +17,7 @@ constexpr TStringBuf WatermarksGranularityUsSetting = "WatermarksGranularityUs"; constexpr TStringBuf WatermarksLateArrivalDelayUsSetting = "WatermarksLateArrivalDelayUs"; constexpr TStringBuf WatermarksIdleTimeoutUsSetting = "WatermarksIdleTimeoutUs"; constexpr TStringBuf WatermarksIdlePartitionsSetting = "WatermarksIdlePartitions"; +constexpr TStringBuf WatermarksLateEventsPolicySetting = "WatermarksLateEventsPolicy"; constexpr TStringBuf ReconnectPeriod = "ReconnectPeriod"; constexpr TStringBuf ReadGroup = "ReadGroup"; constexpr TStringBuf SkipJsonErrors = "SkipJsonErrors"; diff --git a/ydb/library/yql/providers/pq/expr_nodes/yql_pq_expr_nodes.json b/ydb/library/yql/providers/pq/expr_nodes/yql_pq_expr_nodes.json index d878cbb5d9b8..1f4e2a2b3122 100644 --- a/ydb/library/yql/providers/pq/expr_nodes/yql_pq_expr_nodes.json +++ b/ydb/library/yql/providers/pq/expr_nodes/yql_pq_expr_nodes.json @@ -58,8 +58,8 @@ {"Index": 3, "Name": "Columns", "Type": "TExprBase"}, {"Index": 4, "Name": "Format", "Type": "TCoAtom"}, {"Index": 5, "Name": "Compression", "Type": "TCoAtom"}, - {"Index": 6, "Name": "LimitHint", "Type": "TExprBase", "Optional": true}, - {"Index": 7, "Name": "Settings", "Type": "TExprList", "Optional": true}, + {"Index": 6, "Name": "LimitHint", "Type": "TExprBase"}, + {"Index": 7, "Name": "Settings", "Type": "TCoNameValueTupleList"}, {"Index": 8, "Name": "Watermark", "Type": "TCoLambda", "Optional": true} ] }, diff --git a/ydb/library/yql/providers/pq/provider/ya.make b/ydb/library/yql/providers/pq/provider/ya.make index 6030bebb1d55..c910a5f4a47c 100644 --- a/ydb/library/yql/providers/pq/provider/ya.make +++ b/ydb/library/yql/providers/pq/provider/ya.make @@ -23,21 +23,11 @@ SRCS( PEERDIR( library/cpp/random_provider library/cpp/time_provider - yql/essentials/ast - yql/essentials/core - yql/essentials/core/type_ann + ydb/library/yql/dq/expr_nodes - yql/essentials/core/dq_integration ydb/library/yql/dq/opt - yql/essentials/minikql/comp_nodes - yql/essentials/providers/common/config ydb/library/yql/providers/common/db_id_async_resolver - yql/essentials/providers/common/dq - yql/essentials/providers/common/proto - yql/essentials/providers/common/provider ydb/library/yql/providers/common/pushdown - yql/essentials/providers/common/structured_token - yql/essentials/providers/common/transform ydb/library/yql/providers/dq/common ydb/library/yql/providers/dq/expr_nodes ydb/library/yql/providers/dq/provider/exec @@ -46,9 +36,22 @@ PEERDIR( ydb/library/yql/providers/pq/common ydb/library/yql/providers/pq/expr_nodes ydb/library/yql/providers/pq/proto + ydb/public/sdk/cpp/src/client/driver + + yql/essentials/ast + yql/essentials/core + yql/essentials/core/type_ann + yql/essentials/core/dq_integration + yql/essentials/minikql + yql/essentials/minikql/comp_nodes + yql/essentials/providers/common/config + yql/essentials/providers/common/dq + yql/essentials/providers/common/proto + yql/essentials/providers/common/provider + yql/essentials/providers/common/structured_token + yql/essentials/providers/common/transform yql/essentials/providers/result/expr_nodes yql/essentials/public/udf - ydb/public/sdk/cpp/src/client/driver ) YQL_LAST_ABI_VERSION() diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_datasource.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_datasource.cpp index a35f720675b5..dd12b4b0f1d7 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_datasource.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_datasource.cpp @@ -122,91 +122,110 @@ class TPqDataSourceProvider : public TDataProviderBase { .Metadata().Add(sourceMetadata).Build() .Done(); + TExprNode::TPtr columns; + if (auto columnOrder = topicKeyParser.GetColumnOrder()) { + columns = std::move(columnOrder); + } else { + columns = Build(ctx, read.Pos()).Done().Ptr(); + } + auto format = topicKeyParser.GetFormat(); if (format.empty()) { format = "raw"; } - auto settings = Build(ctx, read.Pos()); + auto settings = Build(ctx, read.Pos()); + bool hasDateTimeFormat = false; bool hasDateTimeFormatName = false; - bool hasTimestampFormat = false; - bool hasTimestampFormatName = false; - if (topicKeyParser.GetDateTimeFormatName()) { - settings.Add(topicKeyParser.GetDateTimeFormatName()); - hasDateTimeFormatName = true; - if (!NCommon::ValidateDateTimeFormatName(topicKeyParser.GetDateTimeFormatName()->Child(1)->Content(), ctx)) { + if (auto dateTimeFormatName = topicKeyParser.GetDateTimeFormatName()) { + if (!NCommon::ValidateDateTimeFormatName(dateTimeFormatName->Child(1)->Content(), ctx)) { return nullptr; } + settings.Add(std::move(dateTimeFormatName)); + hasDateTimeFormatName = true; } - if (topicKeyParser.GetDateTimeFormat()) { - settings.Add(topicKeyParser.GetDateTimeFormat()); + if (auto dateTimeFormat = topicKeyParser.GetDateTimeFormat()) { + settings.Add(std::move(dateTimeFormat)); hasDateTimeFormat = true; } - if (topicKeyParser.GetTimestampFormatName()) { - settings.Add(topicKeyParser.GetTimestampFormatName()); - hasTimestampFormatName = true; - if (!NCommon::ValidateTimestampFormatName(topicKeyParser.GetTimestampFormatName()->Child(1)->Content(), ctx)) { + if (hasDateTimeFormat && hasDateTimeFormatName) { + ctx.AddError(TIssue(ctx.GetPosition(read.Pos()), "Don't use data.datetime.format_name and data.datetime.format together")); + return nullptr; + } + + if (!hasDateTimeFormat && !hasDateTimeFormatName) { + settings.Add() + .Add().Build("data.datetime.formatname") + .Add().Build("POSIX") + .Build(); + } + + bool hasTimestampFormat = false; + bool hasTimestampFormatName = false; + if (auto timestampFormatName = topicKeyParser.GetTimestampFormatName()) { + if (!NCommon::ValidateTimestampFormatName(timestampFormatName->Child(1)->Content(), ctx)) { return nullptr; } + settings.Add(std::move(timestampFormatName)); + hasTimestampFormatName = true; } - if (topicKeyParser.GetTimestampFormat()) { - settings.Add(topicKeyParser.GetTimestampFormat()); + if (auto timestampFormat = topicKeyParser.GetTimestampFormat()) { + settings.Add(std::move(timestampFormat)); hasTimestampFormat = true; } - if (hasDateTimeFormat && hasDateTimeFormatName) { - ctx.AddError(TIssue(ctx.GetPosition(read.Pos()), "Don't use data.datetime.format_name and data.datetime.format together")); - return nullptr; - } - if (hasTimestampFormat && hasTimestampFormatName) { ctx.AddError(TIssue(ctx.GetPosition(read.Pos()), "Don't use data.timestamp.format_name and data.timestamp.format together")); return nullptr; } - if (!hasDateTimeFormat && !hasDateTimeFormatName) { - TExprNode::TListType pair; - pair.push_back(ctx.NewAtom(read.Pos(), "data.datetime.formatname")); - pair.push_back(ctx.NewAtom(read.Pos(), "POSIX")); - settings.Add(ctx.NewList(read.Pos(), std::move(pair))); + if (!hasTimestampFormat && !hasTimestampFormatName) { + settings.Add() + .Add().Build("data.timestamp.formatname") + .Add().Build("POSIX") + .Build(); } - if (!hasTimestampFormat && !hasTimestampFormatName) { - TExprNode::TListType pair; - pair.push_back(ctx.NewAtom(read.Pos(), "data.timestamp.formatname")); - pair.push_back(ctx.NewAtom(read.Pos(), "POSIX")); - settings.Add(ctx.NewList(read.Pos(), std::move(pair))); + if (auto dateFormat = topicKeyParser.GetDateFormat()) { + settings.Add(std::move(dateFormat)); } - if (topicKeyParser.GetDateFormat()) { - settings.Add(topicKeyParser.GetDateFormat()); + if (auto watermarkAdjustLateEvents = topicKeyParser.GetWatermarkAdjustLateEvents()) { + settings.Add(std::move(watermarkAdjustLateEvents)); } - if (topicKeyParser.GetSkipJsonErrors()) { - settings.Add(topicKeyParser.GetSkipJsonErrors()); + if (auto watermarkDropLateEvents = topicKeyParser.GetWatermarkDropLateEvents()) { + settings.Add(std::move(watermarkDropLateEvents)); + } + + if (auto watermarkGranularity = topicKeyParser.GetWatermarkGranularity()) { + settings.Add(std::move(watermarkGranularity)); + } + + if (auto watermarkIdleTimeout = topicKeyParser.GetWatermarkIdleTimeout()) { + settings.Add(std::move(watermarkIdleTimeout)); + } + + if (auto skipJsonErrors = topicKeyParser.GetSkipJsonErrors()) { + settings.Add(std::move(skipJsonErrors)); } auto builder = Build(ctx, read.Pos()) .World(read.World()) .DataSource(read.DataSource()) .Topic(std::move(topicNode)) + .Columns(std::move(columns)) .Format().Value(format).Build() .Compression().Value(topicKeyParser.GetCompression()).Build() .LimitHint().Build() .Settings(settings.Done()); - if (topicKeyParser.GetColumnOrder()) { - builder.Columns(topicKeyParser.GetColumnOrder()); - } else { - builder.Columns().Build(); - } - - if (topicKeyParser.GetWatermark()) { - builder.Watermark(topicKeyParser.GetWatermark()); + if (auto watermark = topicKeyParser.GetWatermark()) { + builder.Watermark(std::move(watermark)); } return Build(ctx, read.Pos()) diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp index 61f93f41da13..47faa5990014 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp @@ -173,6 +173,7 @@ class TPqDataSourceTypeAnnotationTransformer : public TVisitorTransformerBase { const auto columns = input->Child(TPqReadTopic::idx_Columns); const auto format = input->Child(TPqReadTopic::idx_Format); const auto compression = input->Child(TPqReadTopic::idx_Compression); + const auto settings = input->Child(TPqReadTopic::idx_Settings); if (!EnsureWorldType(*world, ctx)) { return TStatus::Error; @@ -214,6 +215,20 @@ class TPqDataSourceTypeAnnotationTransformer : public TVisitorTransformerBase { return TStatus::Error; } + if (!EnsureTuple(*settings, ctx)) { + return TStatus::Error; + } + + for (const auto& setting : settings->Children()) { + if (!EnsureTupleMinSize(*setting, 1, ctx)) { + return TStatus::Error; + } + + if (!EnsureAtom(setting->Head(), ctx)) { + return TStatus::Error; + } + } + if (TPqReadTopic::idx_Watermark < input->ChildrenSize()) { auto& watermark = input->ChildRef(TPqReadTopic::idx_Watermark); const auto status = ConvertToLambda(watermark, ctx, 1, 1); diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp index 7034f03a86f7..df40d0c72bf1 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp @@ -81,7 +81,6 @@ class TPqDqIntegration: public TDqIntegrationBase { ->Cast()->GetItems().back()->Cast() ->GetItemType()->Cast(); const auto& clusterName = pqReadTopic.DataSource().Cluster().StringValue(); - const auto format = pqReadTopic.Format().Ref().Content(); const auto token = "cluster:default_" + clusterName; const auto& typeItems = pqReadTopic.Topic().RowSpec().Ref().GetTypeAnn()->Cast()->GetType()->Cast()->GetItems(); @@ -95,11 +94,17 @@ class TPqDqIntegration: public TDqIntegrationBase { }); auto columnNames = ctx.NewList(pos, std::move(colNames)); + auto settings = BuildTopicReadSettings(pqReadTopic, ctx, wrSettings); + if (!settings) { + return {}; + } + TString serializedWatermarkExpr; if (const auto maybeWatermark = pqReadTopic.Watermark()) { const auto watermark = maybeWatermark.Cast(); + const auto enableWatermarks = wrSettings.WatermarksMode.GetOrElse("") == "default"; - if (wrSettings.WatermarksMode.GetOrElse("") != "default") { + if (!enableWatermarks) { ctx.AddError(TIssue(ctx.GetPosition(pqReadTopic.Pos()), R"(Enable watermarks using "PRAGMA dq.WatermarksMode="default";")")); return {}; } @@ -116,32 +121,12 @@ class TPqDqIntegration: public TDqIntegrationBase { } } - bool skipJsonErrors = false; - if (auto settingsList = pqReadTopic.Settings().Maybe()) { - for (const TExprNode::TPtr& s : pqReadTopic.Settings().Raw()->Children()) { - if (s->ChildrenSize() >= 2 && s->Child(0)->Content() == "skip.json.errors"sv) { - if (!TryFromString(s->Child(1)->Content(), skipJsonErrors)) { - ctx.AddError(TIssue(ctx.GetPosition(pqReadTopic.Pos()), R"("skip.json.errors" must be boolean type))")); - return {}; - } - } - } - } - - if (skipJsonErrors) { - auto clusterConfiguration = GetClusterConfiguration(clusterName); - if (!UseSharedReading(clusterConfiguration, format) ) { - ctx.AddError(TIssue(ctx.GetPosition(pqReadTopic.Pos()), R"("skip.json.errors" is supported only in shared reading mode))")); - return {}; - } - } - return Build(ctx, pos) .Input() .World(pqReadTopic.World()) .Topic(pqReadTopic.Topic()) .Columns(std::move(columnNames)) - .Settings(BuildTopicReadSettings(clusterName, wrSettings, pos, format, skipJsonErrors, ctx)) + .Settings(std::move(settings)) .Token() .Name().Build(token) .Build() @@ -411,21 +396,20 @@ class TPqDqIntegration: public TDqIntegrationBase { return Nothing(); } - NNodes::TCoNameValueTupleList BuildTopicReadSettings( - const TString& cluster, - const IDqIntegration::TWrapReadSettings& wrSettings, - TPositionHandle pos, - std::string_view format, - bool skipJsonErrors, - TExprContext& ctx) const - { + TExprNode::TPtr BuildTopicReadSettings( + const TPqReadTopic& pqReadTopic, + TExprContext& ctx, + const IDqIntegration::TWrapReadSettings& wrSettings + ) const { + const auto pos = pqReadTopic.Pos(); + const auto& cluster = pqReadTopic.DataSource().Cluster().StringValue(); + const auto format = pqReadTopic.Format().Ref().Content(); + const auto& settings = pqReadTopic.Settings(); + TVector props; - { - TMaybe consumer = State_->Configuration->Consumer.Get(); - if (consumer) { - Add(props, ConsumerSetting, *consumer, pos, ctx); - } + if (auto consumer = State_->Configuration->Consumer.Get()) { + Add(props, ConsumerSetting, *consumer, pos, ctx); } auto clusterConfiguration = GetClusterConfiguration(cluster); @@ -435,9 +419,6 @@ class TPqDqIntegration: public TDqIntegrationBase { Add(props, ReconnectPeriod, ToString(clusterConfiguration->ReconnectPeriod), pos, ctx); Add(props, Format, format, pos, ctx); Add(props, ReadGroup, clusterConfiguration->ReadGroup, pos, ctx); - if (skipJsonErrors) { - Add(props, SkipJsonErrors, ToString(skipJsonErrors), pos, ctx); - } if (clusterConfiguration->UseSsl) { Add(props, UseSslSetting, "1", pos, ctx); @@ -447,11 +428,128 @@ class TPqDqIntegration: public TDqIntegrationBase { Add(props, AddBearerToTokenSetting, "1", pos, ctx); } + TMaybe watermarksLateEventsPolicy; + TMaybe watermarksGranularityMs; + TMaybe watermarksIdleTimeoutMs; + for (const auto& setting : settings.Raw()->Children()) { + const auto settingName = setting->Child(0)->Content(); + if ("skip.json.errors" == settingName) { + if (setting->ChildrenSize() != 2) { + ctx.AddError(TIssue(ctx.GetPosition(pqReadTopic.Pos()), "Expected `skip.json.errors` = value")); + return {}; + } + const auto settingValue = setting->Child(1); + if (!EnsureAtom(*settingValue, ctx)) { + return {}; + } + bool skipJsonErrors = true; + if (!TryFromString(settingValue->Content(), skipJsonErrors)) { + ctx.AddError(TIssue(ctx.GetPosition(pqReadTopic.Pos()), "`skip.json.errors` must be boolean type")); + return {}; + } + if (!skipJsonErrors) { + continue; + } + if (!UseSharedReading(clusterConfiguration, format)) { + ctx.AddError(TIssue(ctx.GetPosition(pqReadTopic.Pos()), "`skip.json.errors` is supported only in shared reading mode")); + return {}; + } + + Add(props, SkipJsonErrors, ToString(skipJsonErrors), pos, ctx); + } else if ("watermarkadjustlateevents" == settingName) { + if (setting->ChildrenSize() > 2) { + ctx.AddError(TIssue(ctx.GetPosition(pqReadTopic.Pos()), "Expected WATERMARK_ADJUST_LATE_EVENTS (= false|true)")); + return {}; + } + bool watermarkAdjustLateEvents = true; + if (setting->ChildrenSize() == 2) { + const auto settingValue = setting->Child(1); + if (!EnsureAtom(*settingValue, ctx)) { + return {}; + } + if (!TryFromString(settingValue->Content(), watermarkAdjustLateEvents)) { + ctx.AddError(TIssue(ctx.GetPosition(pqReadTopic.Pos()), "WATERMARK_ADJUST_LATE_EVENTS must be boolean type")); + return {}; + } + } + if (!watermarkAdjustLateEvents) { + continue; + } + if (!watermarksLateEventsPolicy.Empty()) { + ctx.AddError(TIssue(ctx.GetPosition(pqReadTopic.Pos()), + TStringBuilder() << "Cannot adjust and " << *watermarksLateEventsPolicy << " late events at the same time")); + return {}; + } + + watermarksLateEventsPolicy = "adjust"; + } else if ("watermarkdroplateevents" == settingName) { + if (setting->ChildrenSize() > 2) { + ctx.AddError(TIssue(ctx.GetPosition(pqReadTopic.Pos()), "Expected WATERMARK_DROP_LATE_EVENTS (= false|true)")); + return {}; + } + bool watermarkDropLateEvents = true; + if (setting->ChildrenSize() == 2) { + const auto settingValue = setting->Child(1); + if (!EnsureAtom(*settingValue, ctx)) { + return {}; + } + if (!TryFromString(settingValue->Content(), watermarkDropLateEvents)) { + ctx.AddError(TIssue(ctx.GetPosition(pqReadTopic.Pos()), "WATERMARK_DROP_LATE_EVENTS must be boolean type")); + return {}; + } + } + if (!watermarkDropLateEvents) { + continue; + } + if (!watermarksLateEventsPolicy.Empty()) { + ctx.AddError(TIssue(ctx.GetPosition(pqReadTopic.Pos()), + TStringBuilder() << "Cannot drop and " << *watermarksLateEventsPolicy << " late events at the same time")); + return {}; + } + + watermarksLateEventsPolicy = "drop"; + } else if ("watermarkgranularity" == settingName) { + if (setting->ChildrenSize() != 2) { + ctx.AddError(TIssue(ctx.GetPosition(pqReadTopic.Pos()), "Expected WATERMARK_GRANULARITY = value")); + return {}; + } + const auto settingValue = setting->Child(1); + if (!EnsureAtom(*settingValue, ctx)) { + return {}; + } + const auto out = NKikimr::NMiniKQL::ValueFromString(NUdf::EDataSlot::Interval, settingValue->Content()); + if (!out) { + ctx.AddError(TIssue(ctx.GetPosition(pqReadTopic.Pos()), + TStringBuilder() << "Invalid value " << settingValue->Content() << " for WATERMARK_GRANULARITY")); + return {}; + } + + watermarksGranularityMs = TDuration::MicroSeconds(out.Get()).MilliSeconds(); + } else if ("watermarkidletimeout" == settingName) { + if (setting->ChildrenSize() != 2) { + ctx.AddError(TIssue(ctx.GetPosition(pqReadTopic.Pos()), "Expected WATERMARK_IDLE_TIMEOUT = value")); + return {}; + } + const auto settingValue = setting->Child(1); + if (!EnsureAtom(*settingValue, ctx)) { + return {}; + } + const auto out = NKikimr::NMiniKQL::ValueFromString(NUdf::EDataSlot::Interval, settingValue->Content()); + if (!out) { + ctx.AddError(TIssue(ctx.GetPosition(pqReadTopic.Pos()), + TStringBuilder() << "Invalid value " << settingValue->Content() << " for WATERMARK_IDLE_TIMEOUT")); + return {}; + } + + watermarksIdleTimeoutMs = TDuration::MicroSeconds(out.Get()).MilliSeconds(); + } + } + if (wrSettings.WatermarksMode.GetOrElse("") == "default") { Add(props, WatermarksEnableSetting, ToString(true), pos, ctx); - const auto granularity = TDuration::MilliSeconds(wrSettings - .WatermarksGranularityMs + const auto granularity = TDuration::MilliSeconds(watermarksGranularityMs + .OrElse(wrSettings.WatermarksGranularityMs) .GetOrElse(TDqSettings::TDefault::WatermarksGranularityMs)); Add(props, WatermarksGranularityUsSetting, ToString(granularity.MicroSeconds()), pos, ctx); @@ -460,19 +558,23 @@ class TPqDqIntegration: public TDqIntegrationBase { .GetOrElse(TDqSettings::TDefault::WatermarksLateArrivalDelayMs)); Add(props, WatermarksLateArrivalDelayUsSetting, ToString(lateArrivalDelay.MicroSeconds()), pos, ctx); + const auto lateEventsPolicy = watermarksLateEventsPolicy + .GetOrElse("adjust"); + Add(props, WatermarksLateEventsPolicySetting, lateEventsPolicy, pos, ctx); } if (wrSettings.WatermarksEnableIdlePartitions.GetOrElse(false)) { Add(props, WatermarksIdlePartitionsSetting, ToString(true), pos, ctx); - const auto idleTimeout = TDuration::MilliSeconds(wrSettings - .WatermarksIdleTimeoutMs + + const auto idleTimeout = TDuration::MilliSeconds(watermarksIdleTimeoutMs + .OrElse(wrSettings.WatermarksIdleTimeoutMs) .GetOrElse(TDqSettings::TDefault::WatermarksIdleTimeoutMs)); Add(props, WatermarksIdleTimeoutUsSetting, ToString(idleTimeout.MicroSeconds()), pos, ctx); } return Build(ctx, pos) .Add(props) - .Done(); + .Done().Ptr(); } NNodes::TCoNameValueTupleList BuildDqSourceWrapSettings(const TPqReadTopic& pqReadTopic, TPositionHandle pos, TExprContext& ctx) const { diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_topic_key_parser.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_topic_key_parser.cpp index eec209b95b0f..9524f48c5cee 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_topic_key_parser.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_topic_key_parser.cpp @@ -71,6 +71,22 @@ bool TTopicKeyParser::Parse(const TExprNode& expr, TExprNode::TPtr readSettings, DateFormat = readSettings->Child(i); continue; } + if (readSettings->Child(i)->Head().IsAtom("watermarkadjustlateevents")) { + WatermarkAdjustLateEvents = readSettings->Child(i); + continue; + } + if (readSettings->Child(i)->Head().IsAtom("watermarkdroplateevents")) { + WatermarkDropLateEvents = readSettings->Child(i); + continue; + } + if (readSettings->Child(i)->Head().IsAtom("watermarkgranularity")) { + WatermarkGranularity = readSettings->Child(i); + continue; + } + if (readSettings->Child(i)->Head().IsAtom("watermarkidletimeout")) { + WatermarkIdleTimeout = readSettings->Child(i); + continue; + } if (readSettings->Child(i)->Head().IsAtom("watermark")) { Watermark = readSettings->Child(i)->ChildPtr(1); continue; diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_topic_key_parser.h b/ydb/library/yql/providers/pq/provider/yql_pq_topic_key_parser.h index 8f4674071b3d..e74d509850ad 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_topic_key_parser.h +++ b/ydb/library/yql/providers/pq/provider/yql_pq_topic_key_parser.h @@ -50,6 +50,22 @@ class TTopicKeyParser { return DateFormat; } + TExprNode::TPtr GetWatermarkAdjustLateEvents() const { + return WatermarkAdjustLateEvents; + } + + TExprNode::TPtr GetWatermarkDropLateEvents() const { + return WatermarkDropLateEvents; + } + + TExprNode::TPtr GetWatermarkGranularity() const { + return WatermarkGranularity; + } + + TExprNode::TPtr GetWatermarkIdleTimeout() const { + return WatermarkIdleTimeout; + } + TExprNode::TPtr GetWatermark() const { return Watermark; } @@ -75,6 +91,10 @@ class TTopicKeyParser { TExprNode::TPtr DateFormat; TExprNode::TPtr UserSchema; TExprNode::TPtr ColumnOrder; + TExprNode::TPtr WatermarkAdjustLateEvents; + TExprNode::TPtr WatermarkDropLateEvents; + TExprNode::TPtr WatermarkGranularity; + TExprNode::TPtr WatermarkIdleTimeout; TExprNode::TPtr Watermark; TExprNode::TPtr SkipJsonErrors; }; diff --git a/ydb/tests/fq/streaming_optimize/canondata/result.json b/ydb/tests/fq/streaming_optimize/canondata/result.json index a6e9dbfe9651..bd426686daf7 100644 --- a/ydb/tests/fq/streaming_optimize/canondata/result.json +++ b/ydb/tests/fq/streaming_optimize/canondata/result.json @@ -232,5 +232,29 @@ "Plan": { "uri": "file://test_sql_streaming.test_watermarks-watermarks-default.txt_/plan.json" } + }, + "test_sql_streaming.test[watermarks-watermarks_adjust-default.txt]": { + "Ast": { + "uri": "file://test_sql_streaming.test_watermarks-watermarks_adjust-default.txt_/ast.txt" + }, + "Plan": { + "uri": "file://test_sql_streaming.test_watermarks-watermarks_adjust-default.txt_/plan.json" + } + }, + "test_sql_streaming.test[watermarks-watermarks_as-default.txt]": { + "Ast": { + "uri": "file://test_sql_streaming.test_watermarks-watermarks_as-default.txt_/ast.txt" + }, + "Plan": { + "uri": "file://test_sql_streaming.test_watermarks-watermarks_as-default.txt_/plan.json" + } + }, + "test_sql_streaming.test[watermarks-watermarks_drop-default.txt]": { + "Ast": { + "uri": "file://test_sql_streaming.test_watermarks-watermarks_drop-default.txt_/ast.txt" + }, + "Plan": { + "uri": "file://test_sql_streaming.test_watermarks-watermarks_drop-default.txt_/plan.json" + } } } diff --git a/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindow-default.txt_/ast.txt b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindow-default.txt_/ast.txt index df020508465a..93718bf413f8 100644 --- a/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindow-default.txt_/ast.txt +++ b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindow-default.txt_/ast.txt @@ -18,7 +18,7 @@ (let $17 '('"Endpoint" '"")) (let $18 '('"SharedReading" '"1")) (let $19 '('"UseSsl" '"1")) -(let $20 '('('"Consumer" '"test_client") $17 $18 '('"ReconnectPeriod" '"") '('"Format" '"json_each_row") '('"ReadGroup" '"fqrun") $19 '('"WatermarksEnable" '"1") '('"WatermarksGranularityUs" '"1000000") '('"WatermarksLateArrivalDelayUs" '"5000000"))) +(let $20 '('('"Consumer" '"test_client") $17 $18 '('"ReconnectPeriod" '"") '('"Format" '"json_each_row") '('"ReadGroup" '"fqrun") $19 '('"WatermarksEnable" '"1") '('"WatermarksGranularityUs" '"1000000") '('"WatermarksLateArrivalDelayUs" '"5000000") '('"WatermarksLateEventsPolicy" '"adjust"))) (let $21 (SecureParam '"cluster:default_pq")) (let $22 (DqPqTopicSource $6 $15 $16 $20 $21 '"" $14 '"")) (let $23 (DqStage '((DqSource $7 $22)) (lambda '($27) (block '( diff --git a/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowByStringKey-default.txt_/ast.txt b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowByStringKey-default.txt_/ast.txt index df020508465a..93718bf413f8 100644 --- a/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowByStringKey-default.txt_/ast.txt +++ b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowByStringKey-default.txt_/ast.txt @@ -18,7 +18,7 @@ (let $17 '('"Endpoint" '"")) (let $18 '('"SharedReading" '"1")) (let $19 '('"UseSsl" '"1")) -(let $20 '('('"Consumer" '"test_client") $17 $18 '('"ReconnectPeriod" '"") '('"Format" '"json_each_row") '('"ReadGroup" '"fqrun") $19 '('"WatermarksEnable" '"1") '('"WatermarksGranularityUs" '"1000000") '('"WatermarksLateArrivalDelayUs" '"5000000"))) +(let $20 '('('"Consumer" '"test_client") $17 $18 '('"ReconnectPeriod" '"") '('"Format" '"json_each_row") '('"ReadGroup" '"fqrun") $19 '('"WatermarksEnable" '"1") '('"WatermarksGranularityUs" '"1000000") '('"WatermarksLateArrivalDelayUs" '"5000000") '('"WatermarksLateEventsPolicy" '"adjust"))) (let $21 (SecureParam '"cluster:default_pq")) (let $22 (DqPqTopicSource $6 $15 $16 $20 $21 '"" $14 '"")) (let $23 (DqStage '((DqSource $7 $22)) (lambda '($27) (block '( diff --git a/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowExprKey-default.txt_/ast.txt b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowExprKey-default.txt_/ast.txt index d96fdc59a893..085e04f912c8 100644 --- a/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowExprKey-default.txt_/ast.txt +++ b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowExprKey-default.txt_/ast.txt @@ -16,7 +16,7 @@ (let $15 '('"Endpoint" '"")) (let $16 '('"SharedReading" '"1")) (let $17 '('"UseSsl" '"1")) -(let $18 '('('"Consumer" '"test_client") $15 $16 '('"ReconnectPeriod" '"") '('"Format" '"json_each_row") '('"ReadGroup" '"fqrun") $17 '('"WatermarksEnable" '"1") '('"WatermarksGranularityUs" '"1000000") '('"WatermarksLateArrivalDelayUs" '"5000000"))) +(let $18 '('('"Consumer" '"test_client") $15 $16 '('"ReconnectPeriod" '"") '('"Format" '"json_each_row") '('"ReadGroup" '"fqrun") $17 '('"WatermarksEnable" '"1") '('"WatermarksGranularityUs" '"1000000") '('"WatermarksLateArrivalDelayUs" '"5000000") '('"WatermarksLateEventsPolicy" '"adjust"))) (let $19 (SecureParam '"cluster:default_pq")) (let $20 (DqPqTopicSource $6 $13 $14 $18 $19 '"" $12 '"")) (let $21 (DqStage '((DqSource $7 $20)) (lambda '($25) (block '( diff --git a/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowListKey-default.txt_/ast.txt b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowListKey-default.txt_/ast.txt index 3d10d7f5e2b1..8b04407107e0 100644 --- a/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowListKey-default.txt_/ast.txt +++ b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowListKey-default.txt_/ast.txt @@ -16,7 +16,7 @@ (let $15 '('"Endpoint" '"")) (let $16 '('"SharedReading" '1)) (let $17 '('"UseSsl" '1)) -(let $18 '('('"Consumer" '"test_client") $15 $16 '('"ReconnectPeriod" '"") '('"Format" '"json_each_row") '('"ReadGroup" '"fqrun") $17 '('"WatermarksEnable" '1) '('"WatermarksGranularityUs" '"1000000") '('"WatermarksLateArrivalDelayUs" '"5000000"))) +(let $18 '('('"Consumer" '"test_client") $15 $16 '('"ReconnectPeriod" '"") '('"Format" '"json_each_row") '('"ReadGroup" '"fqrun") $17 '('"WatermarksEnable" '1) '('"WatermarksGranularityUs" '"1000000") '('"WatermarksLateArrivalDelayUs" '"5000000") '('"WatermarksLateEventsPolicy" '"adjust"))) (let $19 (SecureParam '"cluster:default_pq")) (let $20 (DqPqTopicSource $6 $13 $14 $18 $19 '"" $12 '"")) (let $21 (DqStage '((DqSource $7 $20)) (lambda '($25) (block '( diff --git a/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowNoKey-default.txt_/ast.txt b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowNoKey-default.txt_/ast.txt index 16447d13ceb5..c1f47712b8a6 100644 --- a/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowNoKey-default.txt_/ast.txt +++ b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowNoKey-default.txt_/ast.txt @@ -15,7 +15,7 @@ (let $14 '('"Endpoint" '"")) (let $15 '('"SharedReading" '"1")) (let $16 '('"UseSsl" '"1")) -(let $17 '('('"Consumer" '"test_client") $14 $15 '('"ReconnectPeriod" '"") '('"Format" '"json_each_row") '('"ReadGroup" '"fqrun") $16 '('"WatermarksEnable" '"1") '('"WatermarksGranularityUs" '"1000000") '('"WatermarksLateArrivalDelayUs" '"5000000"))) +(let $17 '('('"Consumer" '"test_client") $14 $15 '('"ReconnectPeriod" '"") '('"Format" '"json_each_row") '('"ReadGroup" '"fqrun") $16 '('"WatermarksEnable" '"1") '('"WatermarksGranularityUs" '"1000000") '('"WatermarksLateArrivalDelayUs" '"5000000") '('"WatermarksLateEventsPolicy" '"adjust"))) (let $18 (SecureParam '"cluster:default_pq")) (let $19 (DqPqTopicSource $6 $13 '('"t" '"v") $17 $18 '"" $12 '"")) (let $20 (DqStage '((DqSource $7 $19)) (lambda '($24) (block '( diff --git a/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowPercentile-default.txt_/ast.txt b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowPercentile-default.txt_/ast.txt index 7ed2e1c77831..f51632447b41 100644 --- a/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowPercentile-default.txt_/ast.txt +++ b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowPercentile-default.txt_/ast.txt @@ -15,7 +15,7 @@ (let $14 '('"Endpoint" '"")) (let $15 '('"SharedReading" '1)) (let $16 '('"UseSsl" '1)) -(let $17 '('('"Consumer" '"test_client") $14 $15 '('"ReconnectPeriod" '"") '('"Format" '"json_each_row") '('"ReadGroup" '"fqrun") $16 '('"WatermarksEnable" '1) '('"WatermarksGranularityUs" '"1000000") '('"WatermarksLateArrivalDelayUs" '"5000000"))) +(let $17 '('('"Consumer" '"test_client") $14 $15 '('"ReconnectPeriod" '"") '('"Format" '"json_each_row") '('"ReadGroup" '"fqrun") $16 '('"WatermarksEnable" '1) '('"WatermarksGranularityUs" '"1000000") '('"WatermarksLateArrivalDelayUs" '"5000000") '('"WatermarksLateEventsPolicy" '"adjust"))) (let $18 (SecureParam '"cluster:default_pq")) (let $19 (DqPqTopicSource $6 $13 '('"t" '"v") $17 $18 '"" $12 '"")) (let $20 (DqStage '((DqSource $7 $19)) (lambda '($25) (block '( diff --git a/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowTimeExtractorUnusedColumns-default.txt_/ast.txt b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowTimeExtractorUnusedColumns-default.txt_/ast.txt index 7ed2e1c77831..f51632447b41 100644 --- a/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowTimeExtractorUnusedColumns-default.txt_/ast.txt +++ b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_hopping_window-GroupByHoppingWindowTimeExtractorUnusedColumns-default.txt_/ast.txt @@ -15,7 +15,7 @@ (let $14 '('"Endpoint" '"")) (let $15 '('"SharedReading" '1)) (let $16 '('"UseSsl" '1)) -(let $17 '('('"Consumer" '"test_client") $14 $15 '('"ReconnectPeriod" '"") '('"Format" '"json_each_row") '('"ReadGroup" '"fqrun") $16 '('"WatermarksEnable" '1) '('"WatermarksGranularityUs" '"1000000") '('"WatermarksLateArrivalDelayUs" '"5000000"))) +(let $17 '('('"Consumer" '"test_client") $14 $15 '('"ReconnectPeriod" '"") '('"Format" '"json_each_row") '('"ReadGroup" '"fqrun") $16 '('"WatermarksEnable" '1) '('"WatermarksGranularityUs" '"1000000") '('"WatermarksLateArrivalDelayUs" '"5000000") '('"WatermarksLateEventsPolicy" '"adjust"))) (let $18 (SecureParam '"cluster:default_pq")) (let $19 (DqPqTopicSource $6 $13 '('"t" '"v") $17 $18 '"" $12 '"")) (let $20 (DqStage '((DqSource $7 $19)) (lambda '($25) (block '( diff --git a/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_watermarks-watermarks-default.txt_/ast.txt b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_watermarks-watermarks-default.txt_/ast.txt index 2ea4364f2ed4..412595bd51c2 100644 --- a/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_watermarks-watermarks-default.txt_/ast.txt +++ b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_watermarks-watermarks-default.txt_/ast.txt @@ -1,20 +1,26 @@ ( (let $1 (Configure! world (DataSource '"config") '"DqEngine" '"force")) -(let $2 (Configure! $1 (DataSource '"dq" '"$all") '"Attr" '"watermarksmode" '"default")) -(let $3 (Configure! $2 (DataSource '"pq" '"$all") '"Attr" '"consumer" '"test_client")) -(let $4 (DataSink 'result)) -(let $5 (DataSource '"pq" '"pq")) -(let $6 (StructType '('"ts" (DataType 'Timestamp)))) -(let $7 (PqTopic '"pq" '"local" '"test_topic_input" '('('"PartitionsCount" '"1")) '() $6)) -(let $8 '('"SharedReading" '"1")) -(let $9 '('('"Consumer" '"test_client") '('"Endpoint" '"") $8 '('"ReconnectPeriod" '"") '('"Format" '"json_each_row") '('"ReadGroup" '"fqrun") '('"UseSsl" '"1") '('"WatermarksEnable" '"1") '('"WatermarksGranularityUs" '"1000000") '('"WatermarksLateArrivalDelayUs" '"5000000"))) -(let $10 (DqPqTopicSource world $7 '('"ts") $9 (SecureParam '"cluster:default_pq") '"" $6 '"B\x1F\n\x1D\x1A\x1B\b\x03\x12\x04\x12\x02ts\x1A\x11\n\x0F\n\x02\b3\x12\t!@KL\x00\x00\x00\x00\x00")) -(let $11 (DqStage '((DqSource $5 $10)) (lambda '($14) (block '( - (let $15 '('('"format" '"json_each_row") '('"formatSettings" '('('"data.datetime.formatname" '"POSIX") '('"data.timestamp.formatname" '"POSIX"))) '('"settings" '($8)))) - (let $16 (DqSourceWideWrap $14 $5 $6 $15)) - (return (NarrowMap $16 (lambda '($17) (AsStruct '('"ts" $17))))) +(let $2 (DataSource '"dq" '"$all")) +(let $3 (Configure! $1 $2 '"Attr" '"watermarksmode" '"default")) +(let $4 (Configure! $3 $2 '"Attr" '"watermarksgranularityms" '"4")) +(let $5 (Configure! $4 $2 '"Attr" '"watermarksidletimeoutms" '"5")) +(let $6 (Configure! $5 $2 '"Attr" '"watermarkslatearrivaldelayms" '"6")) +(let $7 (Configure! $6 $2 '"Attr" '"watermarksenableidlepartitions" '"true")) +(let $8 (Configure! $7 (DataSource '"pq" '"$all") '"Attr" '"consumer" '"test_client")) +(let $9 (DataSink 'result)) +(let $10 (DataSource '"pq" '"pq")) +(let $11 (StructType '('"ts" (DataType 'Timestamp)))) +(let $12 (PqTopic '"pq" '"local" '"test_topic_input" '('('"PartitionsCount" '"1")) '() $11)) +(let $13 '('"SharedReading" '"1")) +(let $14 '('('"Consumer" '"test_client") '('"Endpoint" '"") $13 '('"ReconnectPeriod" '"") '('"Format" '"json_each_row") '('"ReadGroup" '"fqrun") '('"UseSsl" '"1") '('"WatermarksEnable" '"1") '('"WatermarksGranularityUs" '"2000000") '('"WatermarksLateArrivalDelayUs" '"6000") '('"WatermarksLateEventsPolicy" '"adjust") '('"WatermarksIdlePartitions" '"1") '('"WatermarksIdleTimeoutUs" '"3000000"))) +(let $15 (DqPqTopicSource world $12 '('"ts") $14 (SecureParam '"cluster:default_pq") '"" $11 '"\x1A\x1B\b\x03\x12\x04\x12\x02ts\x1A\x11\n\x0F\n\x02\b3\x12\t!@KL\x00\x00\x00\x00\x00")) +(let $16 (DqStage '((DqSource $10 $15)) (lambda '($19) (block '( + (let $20 '('('"data.datetime.formatname" '"POSIX") '('"data.timestamp.formatname" '"POSIX") '('"watermarkgranularity" '"PT2S") '('"watermarkidletimeout" '"PT3S"))) + (let $21 '('('"format" '"json_each_row") '('"formatSettings" $20) '('"settings" '($13)))) + (let $22 (DqSourceWideWrap $19 $10 $11 $21)) + (return (NarrowMap $22 (lambda '($23) (AsStruct '('"ts" $23))))) ))) '('('"_logical_id" '0)))) -(let $12 (DqStage '((DqCnUnionAll (TDqOutput $11 '"0"))) (lambda '($18) $18) '('('"_logical_id" '0)))) -(let $13 (ResPull! $3 $4 (Key) (DqCnResult (TDqOutput $12 '"0") '()) '('('type) '('autoref)) '"dq")) -(return (Commit! (Commit! $13 $4) (DataSink '"pq" '"pq"))) +(let $17 (DqStage '((DqCnUnionAll (TDqOutput $16 '"0"))) (lambda '($24) $24) '('('"_logical_id" '0)))) +(let $18 (ResPull! $8 $9 (Key) (DqCnResult (TDqOutput $17 '"0") '()) '('('type) '('autoref)) '"dq")) +(return (Commit! (Commit! $18 $9) (DataSink '"pq" '"pq"))) ) diff --git a/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_watermarks-watermarks_adjust-default.txt_/ast.txt b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_watermarks-watermarks_adjust-default.txt_/ast.txt new file mode 100644 index 000000000000..198821b472d8 --- /dev/null +++ b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_watermarks-watermarks_adjust-default.txt_/ast.txt @@ -0,0 +1,26 @@ +( +(let $1 (Configure! world (DataSource '"config") '"DqEngine" '"force")) +(let $2 (DataSource '"dq" '"$all")) +(let $3 (Configure! $1 $2 '"Attr" '"watermarksmode" '"default")) +(let $4 (Configure! $3 $2 '"Attr" '"watermarksgranularityms" '"4")) +(let $5 (Configure! $4 $2 '"Attr" '"watermarksidletimeoutms" '"5")) +(let $6 (Configure! $5 $2 '"Attr" '"watermarkslatearrivaldelayms" '"6")) +(let $7 (Configure! $6 $2 '"Attr" '"watermarksenableidlepartitions" '"true")) +(let $8 (Configure! $7 (DataSource '"pq" '"$all") '"Attr" '"consumer" '"test_client")) +(let $9 (DataSink 'result)) +(let $10 (DataSource '"pq" '"pq")) +(let $11 (StructType '('"ts" (DataType 'Timestamp)))) +(let $12 (PqTopic '"pq" '"local" '"test_topic_input" '('('"PartitionsCount" '"1")) '() $11)) +(let $13 '('"SharedReading" '"1")) +(let $14 '('('"Consumer" '"test_client") '('"Endpoint" '"") $13 '('"ReconnectPeriod" '"") '('"Format" '"json_each_row") '('"ReadGroup" '"fqrun") '('"UseSsl" '"1") '('"WatermarksEnable" '"1") '('"WatermarksGranularityUs" '"2000000") '('"WatermarksLateArrivalDelayUs" '"6000") '('"WatermarksLateEventsPolicy" '"adjust") '('"WatermarksIdlePartitions" '"1") '('"WatermarksIdleTimeoutUs" '"3000000"))) +(let $15 (DqPqTopicSource world $12 '('"ts") $14 (SecureParam '"cluster:default_pq") '"" $11 '"\x1A\x1B\b\x03\x12\x04\x12\x02ts\x1A\x11\n\x0F\n\x02\b3\x12\t!@KL\x00\x00\x00\x00\x00")) +(let $16 (DqStage '((DqSource $10 $15)) (lambda '($19) (block '( + (let $20 '('('"data.datetime.formatname" '"POSIX") '('"data.timestamp.formatname" '"POSIX") '('"watermarkadjustlateevents") '('"watermarkdroplateevents" '"false") '('"watermarkgranularity" '"PT2S") '('"watermarkidletimeout" '"PT3S"))) + (let $21 '('('"format" '"json_each_row") '('"formatSettings" $20) '('"settings" '($13)))) + (let $22 (DqSourceWideWrap $19 $10 $11 $21)) + (return (NarrowMap $22 (lambda '($23) (AsStruct '('"ts" $23))))) +))) '('('"_logical_id" '0)))) +(let $17 (DqStage '((DqCnUnionAll (TDqOutput $16 '"0"))) (lambda '($24) $24) '('('"_logical_id" '0)))) +(let $18 (ResPull! $8 $9 (Key) (DqCnResult (TDqOutput $17 '"0") '()) '('('type) '('autoref)) '"dq")) +(return (Commit! (Commit! $18 $9) (DataSink '"pq" '"pq"))) +) diff --git a/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_watermarks-watermarks_adjust-default.txt_/plan.json b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_watermarks-watermarks_adjust-default.txt_/plan.json new file mode 100644 index 000000000000..a0919980b586 --- /dev/null +++ b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_watermarks-watermarks_adjust-default.txt_/plan.json @@ -0,0 +1,93 @@ +{ + "Detailed" : { + "Operations" : [ + { + "Id" : 10, + "Name" : "DqStage", + "Streams" : { + "Program" : [ + { + "Name" : "DqSourceWideWrap" + }, + { + "Name" : "NarrowMap" + } + ] + } + }, + { + "Id" : 8, + "Name" : "DqStage", + "Streams" : { + "Program" : [ ] + }, + "DependsOn" : [ + 10 + ] + }, + { + "Id" : 3, + "Name" : "ResPull!", + "DependsOn" : [ + 8 + ] + }, + { + "Id" : 1, + "Name" : "Commit!", + "DependsOn" : [ + 3 + ] + } + ], + "OperationRoot" : 1, + "Providers" : [ ], + "OperationStats" : { + "Commit!" : 1, + "DqStage" : 2, + "ResPull!" : 1 + } + }, + "Basic" : { + "nodes" : [ + { + "id" : 10, + "level" : 1, + "name" : "DqStage #10", + "type" : "op" + }, + { + "id" : 8, + "level" : 2, + "name" : "DqStage #8", + "type" : "op" + }, + { + "id" : 3, + "level" : 3, + "name" : "ResPull!", + "type" : "op" + }, + { + "id" : 1, + "level" : 4, + "name" : "Commit!", + "type" : "op" + } + ], + "links" : [ + { + "source" : 10, + "target" : 8 + }, + { + "source" : 8, + "target" : 3 + }, + { + "source" : 3, + "target" : 1 + } + ] + } +} \ No newline at end of file diff --git a/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_watermarks-watermarks_as-default.txt_/ast.txt b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_watermarks-watermarks_as-default.txt_/ast.txt new file mode 100644 index 000000000000..df238f27277e --- /dev/null +++ b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_watermarks-watermarks_as-default.txt_/ast.txt @@ -0,0 +1,25 @@ +( +(let $1 (Configure! world (DataSource '"config") '"DqEngine" '"force")) +(let $2 (DataSource '"dq" '"$all")) +(let $3 (Configure! $1 $2 '"Attr" '"watermarksmode" '"default")) +(let $4 (Configure! $3 $2 '"Attr" '"watermarksgranularityms" '"4")) +(let $5 (Configure! $4 $2 '"Attr" '"watermarksidletimeoutms" '"5")) +(let $6 (Configure! $5 $2 '"Attr" '"watermarkslatearrivaldelayms" '"6")) +(let $7 (Configure! $6 $2 '"Attr" '"watermarksenableidlepartitions" '"true")) +(let $8 (Configure! $7 (DataSource '"pq" '"$all") '"Attr" '"consumer" '"test_client")) +(let $9 (DataSink 'result)) +(let $10 (DataSource '"pq" '"pq")) +(let $11 (StructType '('"ts" (DataType 'Timestamp)))) +(let $12 (PqTopic '"pq" '"local" '"test_topic_input" '('('"PartitionsCount" '"1")) '() $11)) +(let $13 '('"SharedReading" '"1")) +(let $14 '('('"Consumer" '"test_client") '('"Endpoint" '"") $13 '('"ReconnectPeriod" '"") '('"Format" '"json_each_row") '('"ReadGroup" '"fqrun") '('"UseSsl" '"1") '('"WatermarksEnable" '"1") '('"WatermarksGranularityUs" '"4000") '('"WatermarksLateArrivalDelayUs" '"6000") '('"WatermarksLateEventsPolicy" '"adjust") '('"WatermarksIdlePartitions" '"1") '('"WatermarksIdleTimeoutUs" '"5000"))) +(let $15 (DqPqTopicSource world $12 '('"ts") $14 (SecureParam '"cluster:default_pq") '"" $11 '"B\x1F\n\x1D\x1A\x1B\b\x03\x12\x04\x12\x02ts\x1A\x11\n\x0F\n\x02\b3\x12\t!@KL\x00\x00\x00\x00\x00")) +(let $16 (DqStage '((DqSource $10 $15)) (lambda '($19) (block '( + (let $20 '('('"format" '"json_each_row") '('"formatSettings" '('('"data.datetime.formatname" '"POSIX") '('"data.timestamp.formatname" '"POSIX"))) '('"settings" '($13)))) + (let $21 (DqSourceWideWrap $19 $10 $11 $20)) + (return (NarrowMap $21 (lambda '($22) (AsStruct '('"ts" $22))))) +))) '('('"_logical_id" '0)))) +(let $17 (DqStage '((DqCnUnionAll (TDqOutput $16 '"0"))) (lambda '($23) $23) '('('"_logical_id" '0)))) +(let $18 (ResPull! $8 $9 (Key) (DqCnResult (TDqOutput $17 '"0") '()) '('('type) '('autoref)) '"dq")) +(return (Commit! (Commit! $18 $9) (DataSink '"pq" '"pq"))) +) diff --git a/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_watermarks-watermarks_as-default.txt_/plan.json b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_watermarks-watermarks_as-default.txt_/plan.json new file mode 100644 index 000000000000..a0919980b586 --- /dev/null +++ b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_watermarks-watermarks_as-default.txt_/plan.json @@ -0,0 +1,93 @@ +{ + "Detailed" : { + "Operations" : [ + { + "Id" : 10, + "Name" : "DqStage", + "Streams" : { + "Program" : [ + { + "Name" : "DqSourceWideWrap" + }, + { + "Name" : "NarrowMap" + } + ] + } + }, + { + "Id" : 8, + "Name" : "DqStage", + "Streams" : { + "Program" : [ ] + }, + "DependsOn" : [ + 10 + ] + }, + { + "Id" : 3, + "Name" : "ResPull!", + "DependsOn" : [ + 8 + ] + }, + { + "Id" : 1, + "Name" : "Commit!", + "DependsOn" : [ + 3 + ] + } + ], + "OperationRoot" : 1, + "Providers" : [ ], + "OperationStats" : { + "Commit!" : 1, + "DqStage" : 2, + "ResPull!" : 1 + } + }, + "Basic" : { + "nodes" : [ + { + "id" : 10, + "level" : 1, + "name" : "DqStage #10", + "type" : "op" + }, + { + "id" : 8, + "level" : 2, + "name" : "DqStage #8", + "type" : "op" + }, + { + "id" : 3, + "level" : 3, + "name" : "ResPull!", + "type" : "op" + }, + { + "id" : 1, + "level" : 4, + "name" : "Commit!", + "type" : "op" + } + ], + "links" : [ + { + "source" : 10, + "target" : 8 + }, + { + "source" : 8, + "target" : 3 + }, + { + "source" : 3, + "target" : 1 + } + ] + } +} \ No newline at end of file diff --git a/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_watermarks-watermarks_drop-default.txt_/ast.txt b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_watermarks-watermarks_drop-default.txt_/ast.txt new file mode 100644 index 000000000000..1e576fce4ccd --- /dev/null +++ b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_watermarks-watermarks_drop-default.txt_/ast.txt @@ -0,0 +1,26 @@ +( +(let $1 (Configure! world (DataSource '"config") '"DqEngine" '"force")) +(let $2 (DataSource '"dq" '"$all")) +(let $3 (Configure! $1 $2 '"Attr" '"watermarksmode" '"default")) +(let $4 (Configure! $3 $2 '"Attr" '"watermarksgranularityms" '"4")) +(let $5 (Configure! $4 $2 '"Attr" '"watermarksidletimeoutms" '"5")) +(let $6 (Configure! $5 $2 '"Attr" '"watermarkslatearrivaldelayms" '"6")) +(let $7 (Configure! $6 $2 '"Attr" '"watermarksenableidlepartitions" '"true")) +(let $8 (Configure! $7 (DataSource '"pq" '"$all") '"Attr" '"consumer" '"test_client")) +(let $9 (DataSink 'result)) +(let $10 (DataSource '"pq" '"pq")) +(let $11 (StructType '('"ts" (DataType 'Timestamp)))) +(let $12 (PqTopic '"pq" '"local" '"test_topic_input" '('('"PartitionsCount" '"1")) '() $11)) +(let $13 '('"SharedReading" '"1")) +(let $14 '('('"Consumer" '"test_client") '('"Endpoint" '"") $13 '('"ReconnectPeriod" '"") '('"Format" '"json_each_row") '('"ReadGroup" '"fqrun") '('"UseSsl" '"1") '('"WatermarksEnable" '"1") '('"WatermarksGranularityUs" '"2000000") '('"WatermarksLateArrivalDelayUs" '"6000") '('"WatermarksLateEventsPolicy" '"drop") '('"WatermarksIdlePartitions" '"1") '('"WatermarksIdleTimeoutUs" '"3000000"))) +(let $15 (DqPqTopicSource world $12 '('"ts") $14 (SecureParam '"cluster:default_pq") '"" $11 '"\x1A\x1B\b\x03\x12\x04\x12\x02ts\x1A\x11\n\x0F\n\x02\b3\x12\t!@KL\x00\x00\x00\x00\x00")) +(let $16 (DqStage '((DqSource $10 $15)) (lambda '($19) (block '( + (let $20 '('('"data.datetime.formatname" '"POSIX") '('"data.timestamp.formatname" '"POSIX") '('"watermarkadjustlateevents" '"false") '('"watermarkdroplateevents") '('"watermarkgranularity" '"PT2S") '('"watermarkidletimeout" '"PT3S"))) + (let $21 '('('"format" '"json_each_row") '('"formatSettings" $20) '('"settings" '($13)))) + (let $22 (DqSourceWideWrap $19 $10 $11 $21)) + (return (NarrowMap $22 (lambda '($23) (AsStruct '('"ts" $23))))) +))) '('('"_logical_id" '0)))) +(let $17 (DqStage '((DqCnUnionAll (TDqOutput $16 '"0"))) (lambda '($24) $24) '('('"_logical_id" '0)))) +(let $18 (ResPull! $8 $9 (Key) (DqCnResult (TDqOutput $17 '"0") '()) '('('type) '('autoref)) '"dq")) +(return (Commit! (Commit! $18 $9) (DataSink '"pq" '"pq"))) +) diff --git a/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_watermarks-watermarks_drop-default.txt_/plan.json b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_watermarks-watermarks_drop-default.txt_/plan.json new file mode 100644 index 000000000000..a0919980b586 --- /dev/null +++ b/ydb/tests/fq/streaming_optimize/canondata/test_sql_streaming.test_watermarks-watermarks_drop-default.txt_/plan.json @@ -0,0 +1,93 @@ +{ + "Detailed" : { + "Operations" : [ + { + "Id" : 10, + "Name" : "DqStage", + "Streams" : { + "Program" : [ + { + "Name" : "DqSourceWideWrap" + }, + { + "Name" : "NarrowMap" + } + ] + } + }, + { + "Id" : 8, + "Name" : "DqStage", + "Streams" : { + "Program" : [ ] + }, + "DependsOn" : [ + 10 + ] + }, + { + "Id" : 3, + "Name" : "ResPull!", + "DependsOn" : [ + 8 + ] + }, + { + "Id" : 1, + "Name" : "Commit!", + "DependsOn" : [ + 3 + ] + } + ], + "OperationRoot" : 1, + "Providers" : [ ], + "OperationStats" : { + "Commit!" : 1, + "DqStage" : 2, + "ResPull!" : 1 + } + }, + "Basic" : { + "nodes" : [ + { + "id" : 10, + "level" : 1, + "name" : "DqStage #10", + "type" : "op" + }, + { + "id" : 8, + "level" : 2, + "name" : "DqStage #8", + "type" : "op" + }, + { + "id" : 3, + "level" : 3, + "name" : "ResPull!", + "type" : "op" + }, + { + "id" : 1, + "level" : 4, + "name" : "Commit!", + "type" : "op" + } + ], + "links" : [ + { + "source" : 10, + "target" : 8 + }, + { + "source" : 8, + "target" : 3 + }, + { + "source" : 3, + "target" : 1 + } + ] + } +} \ No newline at end of file diff --git a/ydb/tests/fq/streaming_optimize/suites/watermarks/watermarks.sql b/ydb/tests/fq/streaming_optimize/suites/watermarks/watermarks.sql index 809c2cd56348..888605e09230 100644 --- a/ydb/tests/fq/streaming_optimize/suites/watermarks/watermarks.sql +++ b/ydb/tests/fq/streaming_optimize/suites/watermarks/watermarks.sql @@ -1,4 +1,8 @@ PRAGMA dq.WatermarksMode="default"; +PRAGMA dq.WatermarksGranularityMs="4"; +PRAGMA dq.WatermarksIdleTimeoutMs="5"; +PRAGMA dq.WatermarksLateArrivalDelayMs="6"; +PRAGMA dq.WatermarksEnableIdlePartitions="true"; PRAGMA pq.Consumer="test_client"; SELECT @@ -9,5 +13,7 @@ WITH( SCHEMA( ts Timestamp NOT NULL ), - WATERMARK AS (UNWRAP(ts - Interval("PT5S"))) + WATERMARK = ts - Interval("PT5S"), + WATERMARK_GRANULARITY="PT2S", + WATERMARK_IDLE_TIMEOUT="PT3S" ); diff --git a/ydb/tests/fq/streaming_optimize/suites/watermarks/watermarks_adjust.sql b/ydb/tests/fq/streaming_optimize/suites/watermarks/watermarks_adjust.sql new file mode 100644 index 000000000000..0e01c56c4184 --- /dev/null +++ b/ydb/tests/fq/streaming_optimize/suites/watermarks/watermarks_adjust.sql @@ -0,0 +1,21 @@ +PRAGMA dq.WatermarksMode="default"; +PRAGMA dq.WatermarksGranularityMs="4"; +PRAGMA dq.WatermarksIdleTimeoutMs="5"; +PRAGMA dq.WatermarksLateArrivalDelayMs="6"; +PRAGMA dq.WatermarksEnableIdlePartitions="true"; +PRAGMA pq.Consumer="test_client"; + +SELECT + * +FROM pq.test_topic_input +WITH( + FORMAT=json_each_row, + SCHEMA( + ts Timestamp NOT NULL + ), + WATERMARK = ts - Interval("PT5S"), + WATERMARK_ADJUST_LATE_EVENTS, + WATERMARK_DROP_LATE_EVENTS="false", + WATERMARK_GRANULARITY="PT2S", + WATERMARK_IDLE_TIMEOUT="PT3S" +); diff --git a/ydb/tests/fq/streaming_optimize/suites/watermarks/watermarks_as.sql b/ydb/tests/fq/streaming_optimize/suites/watermarks/watermarks_as.sql new file mode 100644 index 000000000000..2792c94b726c --- /dev/null +++ b/ydb/tests/fq/streaming_optimize/suites/watermarks/watermarks_as.sql @@ -0,0 +1,17 @@ +PRAGMA dq.WatermarksMode="default"; +PRAGMA dq.WatermarksGranularityMs="4"; +PRAGMA dq.WatermarksIdleTimeoutMs="5"; +PRAGMA dq.WatermarksLateArrivalDelayMs="6"; +PRAGMA dq.WatermarksEnableIdlePartitions="true"; +PRAGMA pq.Consumer="test_client"; + +SELECT + * +FROM pq.test_topic_input +WITH( + FORMAT=json_each_row, + SCHEMA( + ts Timestamp NOT NULL + ), + WATERMARK AS (UNWRAP(ts - Interval("PT5S"))) +); diff --git a/ydb/tests/fq/streaming_optimize/suites/watermarks/watermarks_drop.sql b/ydb/tests/fq/streaming_optimize/suites/watermarks/watermarks_drop.sql new file mode 100644 index 000000000000..e8c758d49f91 --- /dev/null +++ b/ydb/tests/fq/streaming_optimize/suites/watermarks/watermarks_drop.sql @@ -0,0 +1,21 @@ +PRAGMA dq.WatermarksMode="default"; +PRAGMA dq.WatermarksGranularityMs="4"; +PRAGMA dq.WatermarksIdleTimeoutMs="5"; +PRAGMA dq.WatermarksLateArrivalDelayMs="6"; +PRAGMA dq.WatermarksEnableIdlePartitions="true"; +PRAGMA pq.Consumer="test_client"; + +SELECT + * +FROM pq.test_topic_input +WITH( + FORMAT=json_each_row, + SCHEMA( + ts Timestamp NOT NULL + ), + WATERMARK = ts - Interval("PT5S"), + WATERMARK_ADJUST_LATE_EVENTS="false", + WATERMARK_DROP_LATE_EVENTS, + WATERMARK_GRANULARITY="PT2S", + WATERMARK_IDLE_TIMEOUT="PT3S" +);