Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions ydb/library/yql/dq/opt/dq_opt_hopping.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,13 @@ TMaybeNode<TExprBase> RewriteAsHoppingWindowFullOutput(
.FinishHandler(finishLambda)
.SaveHandler(saveLambda)
.LoadHandler(loadLambda)
.template WatermarkMode<TCoAtom>().Build(ToString(*enableWatermarks));
.WatermarkMode<TCoAtom>().Build(ToString(*enableWatermarks))
.HoppingColumn<TCoAtom>().Build(hopTraits.Column);

if (*enableWatermarks) {
const auto hop = hopTraits.Hop;
const auto delay = lateArrivalDelay ? (lateArrivalDelay.MicroSeconds() + hop - 1) / hop * hop : hop;
multiHoppingCoreBuilder.template Delay<TCoInterval>()
multiHoppingCoreBuilder.Delay<TCoInterval>()
.Literal().Build(ToString(delay))
.Build();
} else {
Expand All @@ -181,9 +182,9 @@ TMaybeNode<TExprBase> RewriteAsHoppingWindowFullOutput(
.SortKeySelectorLambda(timeExtractorLambda)
.ListHandlerLambda()
.Args(streamArg)
.template Body<TCoForwardList>()
.Body<TCoForwardList>()
.Stream(multiHoppingCoreBuilder
.template Input<TCoIterator>()
.Input<TCoIterator>()
.List(streamArg)
.Build()
.Done())
Expand All @@ -208,7 +209,7 @@ TMaybeNode<TExprBase> RewriteAsHoppingWindowFullOutput(
.Args(streamArg)
.Body<TCoMap>()
.Input(multiHoppingCoreBuilder
.template Input<TCoFromFlow>()
.Input<TCoFromFlow>()
.Input(streamArg)
.Build()
.Done())
Expand Down
182 changes: 119 additions & 63 deletions ydb/tests/fq/streaming_optimize/canondata/result.json
Original file line number Diff line number Diff line change
@@ -1,170 +1,226 @@
{
"test_sql_streaming.test[suites-GroupByHop-default.txt]": {
"test_sql_streaming.test[hop-GroupByHop-default.txt]": {
"Ast": {
"uri": "file://test_sql_streaming.test_suites-GroupByHop-default.txt_/ast.txt"
"uri": "file://test_sql_streaming.test_hop-GroupByHop-default.txt_/ast.txt"
},
"Plan": {
"uri": "file://test_sql_streaming.test_suites-GroupByHop-default.txt_/plan.json"
"uri": "file://test_sql_streaming.test_hop-GroupByHop-default.txt_/plan.json"
}
},
"test_sql_streaming.test[suites-GroupByHopByStringKey-default.txt]": {
"test_sql_streaming.test[hop-GroupByHopByStringKey-default.txt]": {
"Ast": {
"uri": "file://test_sql_streaming.test_suites-GroupByHopByStringKey-default.txt_/ast.txt"
"uri": "file://test_sql_streaming.test_hop-GroupByHopByStringKey-default.txt_/ast.txt"
},
"Plan": {
"uri": "file://test_sql_streaming.test_suites-GroupByHopByStringKey-default.txt_/plan.json"
"uri": "file://test_sql_streaming.test_hop-GroupByHopByStringKey-default.txt_/plan.json"
}
},
"test_sql_streaming.test[suites-GroupByHopExprKey-default.txt]": {
"test_sql_streaming.test[hop-GroupByHopExprKey-default.txt]": {
"Ast": {
"uri": "file://test_sql_streaming.test_suites-GroupByHopExprKey-default.txt_/ast.txt"
"uri": "file://test_sql_streaming.test_hop-GroupByHopExprKey-default.txt_/ast.txt"
},
"Plan": {
"uri": "file://test_sql_streaming.test_suites-GroupByHopExprKey-default.txt_/plan.json"
"uri": "file://test_sql_streaming.test_hop-GroupByHopExprKey-default.txt_/plan.json"
}
},
"test_sql_streaming.test[suites-GroupByHopListKey-default.txt]": {
"test_sql_streaming.test[hop-GroupByHopListKey-default.txt]": {
"Ast": {
"uri": "file://test_sql_streaming.test_suites-GroupByHopListKey-default.txt_/ast.txt"
"uri": "file://test_sql_streaming.test_hop-GroupByHopListKey-default.txt_/ast.txt"
},
"Plan": {
"uri": "file://test_sql_streaming.test_suites-GroupByHopListKey-default.txt_/plan.json"
"uri": "file://test_sql_streaming.test_hop-GroupByHopListKey-default.txt_/plan.json"
}
},
"test_sql_streaming.test[suites-GroupByHopNoKey-default.txt]": {
"test_sql_streaming.test[hop-GroupByHopNoKey-default.txt]": {
"Ast": {
"uri": "file://test_sql_streaming.test_suites-GroupByHopNoKey-default.txt_/ast.txt"
"uri": "file://test_sql_streaming.test_hop-GroupByHopNoKey-default.txt_/ast.txt"
},
"Plan": {
"uri": "file://test_sql_streaming.test_suites-GroupByHopNoKey-default.txt_/plan.json"
"uri": "file://test_sql_streaming.test_hop-GroupByHopNoKey-default.txt_/plan.json"
}
},
"test_sql_streaming.test[suites-GroupByHopPercentile-default.txt]": {
"test_sql_streaming.test[hop-GroupByHopPercentile-default.txt]": {
"Ast": {
"uri": "file://test_sql_streaming.test_suites-GroupByHopPercentile-default.txt_/ast.txt"
"uri": "file://test_sql_streaming.test_hop-GroupByHopPercentile-default.txt_/ast.txt"
},
"Plan": {
"uri": "file://test_sql_streaming.test_suites-GroupByHopPercentile-default.txt_/plan.json"
"uri": "file://test_sql_streaming.test_hop-GroupByHopPercentile-default.txt_/plan.json"
}
},
"test_sql_streaming.test[suites-GroupByHopTimeExtractorUnusedColumns-default.txt]": {
"test_sql_streaming.test[hop-GroupByHopTimeExtractorUnusedColumns-default.txt]": {
"Ast": {
"uri": "file://test_sql_streaming.test_suites-GroupByHopTimeExtractorUnusedColumns-default.txt_/ast.txt"
"uri": "file://test_sql_streaming.test_hop-GroupByHopTimeExtractorUnusedColumns-default.txt_/ast.txt"
},
"Plan": {
"uri": "file://test_sql_streaming.test_suites-GroupByHopTimeExtractorUnusedColumns-default.txt_/plan.json"
"uri": "file://test_sql_streaming.test_hop-GroupByHopTimeExtractorUnusedColumns-default.txt_/plan.json"
}
},
"test_sql_streaming.test[suites-GroupByHopWithDataWatermarks-default.txt]": {
"test_sql_streaming.test[hop-GroupByHopWithDataWatermarks-default.txt]": {
"Ast": {
"uri": "file://test_sql_streaming.test_suites-GroupByHopWithDataWatermarks-default.txt_/ast.txt"
"uri": "file://test_sql_streaming.test_hop-GroupByHopWithDataWatermarks-default.txt_/ast.txt"
},
"Plan": {
"uri": "file://test_sql_streaming.test_suites-GroupByHopWithDataWatermarks-default.txt_/plan.json"
"uri": "file://test_sql_streaming.test_hop-GroupByHopWithDataWatermarks-default.txt_/plan.json"
}
},
"test_sql_streaming.test[suites-GroupByHoppingWithDataWatermarks-default.txt]": {
"test_sql_streaming.test[hopping_window-GroupByHoppingWindow-default.txt]": {
"Ast": {
"uri": "file://test_sql_streaming.test_suites-GroupByHoppingWithDataWatermarks-default.txt_/ast.txt"
"uri": "file://test_sql_streaming.test_hopping_window-GroupByHoppingWindow-default.txt_/ast.txt"
},
"Plan": {
"uri": "file://test_sql_streaming.test_suites-GroupByHoppingWithDataWatermarks-default.txt_/plan.json"
"uri": "file://test_sql_streaming.test_hopping_window-GroupByHoppingWindow-default.txt_/plan.json"
}
},
"test_sql_streaming.test[suites-ReadTopic-default.txt]": {
"test_sql_streaming.test[hopping_window-GroupByHoppingWindowByStringKey-default.txt]": {
"Ast": {
"uri": "file://test_sql_streaming.test_suites-ReadTopic-default.txt_/ast.txt"
"uri": "file://test_sql_streaming.test_hopping_window-GroupByHoppingWindowByStringKey-default.txt_/ast.txt"
},
"Plan": {
"uri": "file://test_sql_streaming.test_suites-ReadTopic-default.txt_/plan.json"
"uri": "file://test_sql_streaming.test_hopping_window-GroupByHoppingWindowByStringKey-default.txt_/plan.json"
}
},
"test_sql_streaming.test[suites-ReadTopicGroupWriteToSolomon-default.txt]": {
"test_sql_streaming.test[hopping_window-GroupByHoppingWindowExprKey-default.txt]": {
"Ast": {
"uri": "file://test_sql_streaming.test_suites-ReadTopicGroupWriteToSolomon-default.txt_/ast.txt"
"uri": "file://test_sql_streaming.test_hopping_window-GroupByHoppingWindowExprKey-default.txt_/ast.txt"
},
"Plan": {
"uri": "file://test_sql_streaming.test_suites-ReadTopicGroupWriteToSolomon-default.txt_/plan.json"
"uri": "file://test_sql_streaming.test_hopping_window-GroupByHoppingWindowExprKey-default.txt_/plan.json"
}
},
"test_sql_streaming.test[suites-ReadTopicWithMetadata-default.txt]": {
"test_sql_streaming.test[hopping_window-GroupByHoppingWindowListKey-default.txt]": {
"Ast": {
"uri": "file://test_sql_streaming.test_suites-ReadTopicWithMetadata-default.txt_/ast.txt"
"uri": "file://test_sql_streaming.test_hopping_window-GroupByHoppingWindowListKey-default.txt_/ast.txt"
},
"Plan": {
"uri": "file://test_sql_streaming.test_suites-ReadTopicWithMetadata-default.txt_/plan.json"
"uri": "file://test_sql_streaming.test_hopping_window-GroupByHoppingWindowListKey-default.txt_/plan.json"
}
},
"test_sql_streaming.test[suites-ReadTopicWithMetadataInsideFilter-default.txt]": {
"test_sql_streaming.test[hopping_window-GroupByHoppingWindowNoKey-default.txt]": {
"Ast": {
"uri": "file://test_sql_streaming.test_suites-ReadTopicWithMetadataInsideFilter-default.txt_/ast.txt"
"uri": "file://test_sql_streaming.test_hopping_window-GroupByHoppingWindowNoKey-default.txt_/ast.txt"
},
"Plan": {
"uri": "file://test_sql_streaming.test_suites-ReadTopicWithMetadataInsideFilter-default.txt_/plan.json"
"uri": "file://test_sql_streaming.test_hopping_window-GroupByHoppingWindowNoKey-default.txt_/plan.json"
}
},
"test_sql_streaming.test[suites-ReadTopicWithMetadataNestedDeep-default.txt]": {
"test_sql_streaming.test[hopping_window-GroupByHoppingWindowPercentile-default.txt]": {
"Ast": {
"uri": "file://test_sql_streaming.test_suites-ReadTopicWithMetadataNestedDeep-default.txt_/ast.txt"
"uri": "file://test_sql_streaming.test_hopping_window-GroupByHoppingWindowPercentile-default.txt_/ast.txt"
},
"Plan": {
"uri": "file://test_sql_streaming.test_suites-ReadTopicWithMetadataNestedDeep-default.txt_/plan.json"
"uri": "file://test_sql_streaming.test_hopping_window-GroupByHoppingWindowPercentile-default.txt_/plan.json"
}
},
"test_sql_streaming.test[suites-ReadTopicWithMetadataWithFilter-default.txt]": {
"test_sql_streaming.test[hopping_window-GroupByHoppingWindowTimeExtractorUnusedColumns-default.txt]": {
"Ast": {
"uri": "file://test_sql_streaming.test_suites-ReadTopicWithMetadataWithFilter-default.txt_/ast.txt"
"uri": "file://test_sql_streaming.test_hopping_window-GroupByHoppingWindowTimeExtractorUnusedColumns-default.txt_/ast.txt"
},
"Plan": {
"uri": "file://test_sql_streaming.test_suites-ReadTopicWithMetadataWithFilter-default.txt_/plan.json"
"uri": "file://test_sql_streaming.test_hopping_window-GroupByHoppingWindowTimeExtractorUnusedColumns-default.txt_/plan.json"
}
},
"test_sql_streaming.test[suites-ReadTopicWithSchema-default.txt]": {
"test_sql_streaming.test[pq-ReadTopic-default.txt]": {
"Ast": {
"uri": "file://test_sql_streaming.test_suites-ReadTopicWithSchema-default.txt_/ast.txt"
"uri": "file://test_sql_streaming.test_pq-ReadTopic-default.txt_/ast.txt"
},
"Plan": {
"uri": "file://test_sql_streaming.test_suites-ReadTopicWithSchema-default.txt_/plan.json"
"uri": "file://test_sql_streaming.test_pq-ReadTopic-default.txt_/plan.json"
}
},
"test_sql_streaming.test[suites-ReadTwoTopics-default.txt]": {
"test_sql_streaming.test[pq-ReadTopicWithMetadata-default.txt]": {
"Ast": {
"uri": "file://test_sql_streaming.test_suites-ReadTwoTopics-default.txt_/ast.txt"
"uri": "file://test_sql_streaming.test_pq-ReadTopicWithMetadata-default.txt_/ast.txt"
},
"Plan": {
"uri": "file://test_sql_streaming.test_suites-ReadTwoTopics-default.txt_/plan.json"
"uri": "file://test_sql_streaming.test_pq-ReadTopicWithMetadata-default.txt_/plan.json"
}
},
"test_sql_streaming.test[suites-ReadWriteSameTopic-default.txt]": {
"test_sql_streaming.test[pq-ReadTopicWithMetadataInsideFilter-default.txt]": {
"Ast": {
"uri": "file://test_sql_streaming.test_suites-ReadWriteSameTopic-default.txt_/ast.txt"
"uri": "file://test_sql_streaming.test_pq-ReadTopicWithMetadataInsideFilter-default.txt_/ast.txt"
},
"Plan": {
"uri": "file://test_sql_streaming.test_suites-ReadWriteSameTopic-default.txt_/plan.json"
"uri": "file://test_sql_streaming.test_pq-ReadTopicWithMetadataInsideFilter-default.txt_/plan.json"
}
},
"test_sql_streaming.test[suites-ReadWriteTopic-default.txt]": {
"test_sql_streaming.test[pq-ReadTopicWithMetadataNestedDeep-default.txt]": {
"Ast": {
"uri": "file://test_sql_streaming.test_suites-ReadWriteTopic-default.txt_/ast.txt"
"uri": "file://test_sql_streaming.test_pq-ReadTopicWithMetadataNestedDeep-default.txt_/ast.txt"
},
"Plan": {
"uri": "file://test_sql_streaming.test_suites-ReadWriteTopic-default.txt_/plan.json"
"uri": "file://test_sql_streaming.test_pq-ReadTopicWithMetadataNestedDeep-default.txt_/plan.json"
}
},
"test_sql_streaming.test[suites-ReadWriteTopicWithSchema-default.txt]": {
"test_sql_streaming.test[pq-ReadTopicWithMetadataWithFilter-default.txt]": {
"Ast": {
"uri": "file://test_sql_streaming.test_suites-ReadWriteTopicWithSchema-default.txt_/ast.txt"
"uri": "file://test_sql_streaming.test_pq-ReadTopicWithMetadataWithFilter-default.txt_/ast.txt"
},
"Plan": {
"uri": "file://test_sql_streaming.test_suites-ReadWriteTopicWithSchema-default.txt_/plan.json"
"uri": "file://test_sql_streaming.test_pq-ReadTopicWithMetadataWithFilter-default.txt_/plan.json"
}
},
"test_sql_streaming.test[suites-WriteTwoTopics-default.txt]": {
"test_sql_streaming.test[pq-ReadTopicWithSchema-default.txt]": {
"Ast": {
"uri": "file://test_sql_streaming.test_suites-WriteTwoTopics-default.txt_/ast.txt"
"uri": "file://test_sql_streaming.test_pq-ReadTopicWithSchema-default.txt_/ast.txt"
},
"Plan": {
"uri": "file://test_sql_streaming.test_suites-WriteTwoTopics-default.txt_/plan.json"
"uri": "file://test_sql_streaming.test_pq-ReadTopicWithSchema-default.txt_/plan.json"
}
},
"test_sql_streaming.test[pq-ReadTwoTopics-default.txt]": {
"Ast": {
"uri": "file://test_sql_streaming.test_pq-ReadTwoTopics-default.txt_/ast.txt"
},
"Plan": {
"uri": "file://test_sql_streaming.test_pq-ReadTwoTopics-default.txt_/plan.json"
}
},
"test_sql_streaming.test[pq-ReadWriteSameTopic-default.txt]": {
"Ast": {
"uri": "file://test_sql_streaming.test_pq-ReadWriteSameTopic-default.txt_/ast.txt"
},
"Plan": {
"uri": "file://test_sql_streaming.test_pq-ReadWriteSameTopic-default.txt_/plan.json"
}
},
"test_sql_streaming.test[pq-ReadWriteTopic-default.txt]": {
"Ast": {
"uri": "file://test_sql_streaming.test_pq-ReadWriteTopic-default.txt_/ast.txt"
},
"Plan": {
"uri": "file://test_sql_streaming.test_pq-ReadWriteTopic-default.txt_/plan.json"
}
},
"test_sql_streaming.test[pq-ReadWriteTopicWithSchema-default.txt]": {
"Ast": {
"uri": "file://test_sql_streaming.test_pq-ReadWriteTopicWithSchema-default.txt_/ast.txt"
},
"Plan": {
"uri": "file://test_sql_streaming.test_pq-ReadWriteTopicWithSchema-default.txt_/plan.json"
}
},
"test_sql_streaming.test[pq-WriteTwoTopics-default.txt]": {
"Ast": {
"uri": "file://test_sql_streaming.test_pq-WriteTwoTopics-default.txt_/ast.txt"
},
"Plan": {
"uri": "file://test_sql_streaming.test_pq-WriteTwoTopics-default.txt_/plan.json"
}
},
"test_sql_streaming.test[solomon-ReadTopicGroupWriteToSolomon-default.txt]": {
"Ast": {
"uri": "file://test_sql_streaming.test_solomon-ReadTopicGroupWriteToSolomon-default.txt_/ast.txt"
},
"Plan": {
"uri": "file://test_sql_streaming.test_solomon-ReadTopicGroupWriteToSolomon-default.txt_/plan.json"
}
},
"test_sql_streaming.test[watermarks-watermarks-default.txt]": {
"Ast": {
"uri": "file://test_sql_streaming.test_watermarks-watermarks-default.txt_/ast.txt"
},
"Plan": {
"uri": "file://test_sql_streaming.test_watermarks-watermarks-default.txt_/plan.json"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
(let $69 (CallableType '() '((OptionalType (DataType 'Timestamp))) $68))
(let $70 (Udf '"DateTime2.FromMilliseconds" (Void) (VoidType) '"" $69 (VoidType) '"" '('('"blocks") $16)))
(return (Apply $70 (Convert $67 $23)))
))))) (Interval '5000) $62 $62 'true (lambda '($71) (AsStruct '('Sum0 (Member $71 '"v")))) (lambda '($72 $73) (AsStruct '('Sum0 (AggrAdd (Member $72 '"v") (Member $73 'Sum0))))) $63 $63 (lambda '($75 $76) (AsStruct '('Sum0 (AggrAdd (Member $75 'Sum0) (Member $76 'Sum0))))) (lambda '($77 $78 $79) (AsStruct '('Sum0 (Member $78 'Sum0)) '('"_yql_time" $79) '('"k" $77))) '0))
))))) (Interval '5000) $62 $62 'true (lambda '($71) (AsStruct '('Sum0 (Member $71 '"v")))) (lambda '($72 $73) (AsStruct '('Sum0 (AggrAdd (Member $72 '"v") (Member $73 'Sum0))))) $63 $63 (lambda '($75 $76) (AsStruct '('Sum0 (AggrAdd (Member $75 'Sum0) (Member $76 'Sum0))))) (lambda '($77 $78 $79) (AsStruct '('Sum0 (Member $78 'Sum0)) '('"_yql_time" $79) '('"k" $77))) '0 '"_yql_time"))
(return (FlatMap (ExtractMembers $64 '('Sum0 '"k")) (lambda '($80) (block '(
(let $81 (Udf '"Yson2.SerializeText" (Void) (VoidType) '"" $20 (VoidType) '"" $17))
(let $82 (StructType '('"k" $21) '('"sum" $21)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@
(let $72 (CallableType '() '((OptionalType (DataType 'Timestamp))) $71))
(let $73 (Udf '"DateTime2.FromMilliseconds" (Void) (VoidType) '"" $72 (VoidType) '"" '('('"blocks") $16)))
(return (Apply $73 (Convert $70 $23)))
))))) (Interval '5000) $65 $65 'true (lambda '($74) (AsStruct '('Sum0 (Member $74 '"v")))) (lambda '($75 $76) (AsStruct '('Sum0 (AggrAdd (Member $75 '"v") (Member $76 'Sum0))))) $66 $66 (lambda '($78 $79) (AsStruct '('Sum0 (AggrAdd (Member $78 'Sum0) (Member $79 'Sum0))))) (lambda '($80 $81 $82) (AsStruct '('Sum0 (Member $81 'Sum0)) '('"_yql_time" $82) '('"k" $80))) '0))
))))) (Interval '5000) $65 $65 'true (lambda '($74) (AsStruct '('Sum0 (Member $74 '"v")))) (lambda '($75 $76) (AsStruct '('Sum0 (AggrAdd (Member $75 '"v") (Member $76 'Sum0))))) $66 $66 (lambda '($78 $79) (AsStruct '('Sum0 (AggrAdd (Member $78 'Sum0) (Member $79 'Sum0))))) (lambda '($80 $81 $82) (AsStruct '('Sum0 (Member $81 'Sum0)) '('"_yql_time" $82) '('"k" $80))) '0 '"_yql_time"))
(return (FlatMap (ExtractMembers $67 '('Sum0 '"k")) (lambda '($83) (block '(
(let $84 (Udf '"Yson2.SerializeText" (Void) (VoidType) '"" $20 (VoidType) '"" $17))
(let $85 (StructType '('"k" $21) '('"sum" $25)))
Expand Down
Loading
Loading