diff --git a/.github/workflows/proton_ci.yml b/.github/workflows/proton_ci.yml index 1f709a771e4..47f784e8b13 100644 --- a/.github/workflows/proton_ci.yml +++ b/.github/workflows/proton_ci.yml @@ -141,7 +141,7 @@ jobs: with: ec2-instance-type: ${{ vars.X64_INSTANCE_TYPE }} ec2-image-id: ${{ vars.X64_TEST_AMI }} - ec2-volume-size: '60' + ec2-volume-size: '80' submodules: false sanitizer: "address" arch: ${{ vars.X64_ARCH }} diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 55c79af6230..2307b4c21f4 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -790,7 +790,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value) M(UInt64, keep_windows, 0, "How many streaming windows to keep from recycling", 0) \ M(String, seek_to, "", "Seeking to an offset of the streaming/historical store to seek", 0) \ M(Bool, enable_backfill_from_historical_store, true, "Enable backfill data from historical data store", 0) \ - M(Bool, emit_aggregated_during_backfill, false, "Enable emit intermediate aggr result during backfill historical data", 0) \ + M(Bool, emit_during_backfill, false, "Enable emit intermediate aggr result during backfill historical data", 0) \ M(Bool, force_backfill_in_order, false, "Requires backfill data in order", 0) \ M(Bool, include_internal_streams, false, "Show internal streams on SHOW streams query.", 0) \ M(UInt64, join_max_buffered_bytes, 524288000, "Max buffered bytes for stream to stream join", 0) \ diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 26c7ede79d4..49558771789 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -93,6 +93,7 @@ #include #include #include +#include #include #include #include @@ -2278,6 +2279,10 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc if (!subquery) throw Exception("Subquery expected", ErrorCodes::LOGICAL_ERROR); + /// proton: starts. + Streaming::rewriteSubquery(subquery->as(), query_info); + /// proton: ends. + interpreter_subquery = std::make_unique( subquery, getSubqueryContext(context), options.copy().subquery().noModify(), required_columns); @@ -3437,7 +3442,10 @@ void InterpreterSelectQuery::finalCheckAndOptimizeForStreamingQuery() /// Usually, we don't care whether the backfilled data is in order. Excepts: /// 1) User require backfill data in order /// 2) User need window aggr emit result during backfill (it expects that process data in ascending event time) - if (settings.force_backfill_in_order.value || (settings.emit_aggregated_during_backfill.value && hasAggregation() && hasStreamingWindowFunc())) + if (settings.emit_during_backfill.value && hasAggregation() && hasStreamingWindowFunc()) + context->setSetting("force_backfill_in_order", true); + + if (settings.force_backfill_in_order.value) query_info.require_in_order_backfill = true; } else @@ -3500,7 +3508,7 @@ void InterpreterSelectQuery::buildWatermarkQueryPlan(QueryPlan & query_plan) con auto params = std::make_shared( query_info.query, query_info.syntax_analyzer_result, query_info.streaming_window_params); - bool skip_stamping_for_backfill_data = !context->getSettingsRef().emit_aggregated_during_backfill.value; + bool skip_stamping_for_backfill_data = !context->getSettingsRef().emit_during_backfill.value; if (query_info.hasPartitionByKeys()) query_plan.addStep(std::make_unique( diff --git a/src/Interpreters/Streaming/RewriteAsSubquery.cpp b/src/Interpreters/Streaming/RewriteAsSubquery.cpp index 5ca12831f06..416b5d5ae11 100644 --- a/src/Interpreters/Streaming/RewriteAsSubquery.cpp +++ b/src/Interpreters/Streaming/RewriteAsSubquery.cpp @@ -9,6 +9,7 @@ #include #include #include +#include namespace DB { @@ -19,6 +20,29 @@ extern const int ALIAS_REQUIRED; namespace Streaming { +namespace +{ +bool rewriteAsChangelogQuery(ASTSelectWithUnionQuery & query) +{ + if (query.list_of_selects->children.size() != 1) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Only expect one select query to rewrite as changelog query"); + + auto & select_query = query.list_of_selects->children[0]->as(); + + /// Emit changelog + auto emit_query = select_query.emit(); + if (!emit_query) + emit_query = std::make_shared(); + + if (emit_query->as().stream_mode == ASTEmitQuery::StreamMode::CHANGELOG) + return false; + + emit_query->as().stream_mode = ASTEmitQuery::StreamMode::CHANGELOG; + select_query.setExpression(ASTSelectQuery::Expression::EMIT, std::move(emit_query)); + return true; +} +} + ASTPtr rewriteAsSubquery(ASTTableExpression & table_expr) { if (table_expr.subquery) @@ -113,24 +137,12 @@ bool rewriteAsChangelogSubquery(ASTTableExpression & table_expression, bool only return rewriteAsChangelogQuery(query); } -bool rewriteAsChangelogQuery(ASTSelectWithUnionQuery & query) +bool rewriteSubquery(ASTSelectWithUnionQuery & query, const SelectQueryInfo & query_info) { - if (query.list_of_selects->children.size() != 1) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Only expect one select query to rewrite as changelog query"); - - auto & select_query = query.list_of_selects->children[0]->as(); + if (query_info.left_input_tracking_changes) + return rewriteAsChangelogQuery(query); - /// Emit changelog - auto emit_query = select_query.emit(); - if (!emit_query) - emit_query = std::make_shared(); - - if (emit_query->as().stream_mode == ASTEmitQuery::StreamMode::CHANGELOG) - return false; - - emit_query->as().stream_mode = ASTEmitQuery::StreamMode::CHANGELOG; - select_query.setExpression(ASTSelectQuery::Expression::EMIT, std::move(emit_query)); - return true; + return false; } } } diff --git a/src/Interpreters/Streaming/RewriteAsSubquery.h b/src/Interpreters/Streaming/RewriteAsSubquery.h index 26a7dd28feb..8a458755f2f 100644 --- a/src/Interpreters/Streaming/RewriteAsSubquery.h +++ b/src/Interpreters/Streaming/RewriteAsSubquery.h @@ -6,6 +6,7 @@ namespace DB { struct ASTTableExpression; class ASTSelectWithUnionQuery; +struct SelectQueryInfo; namespace Streaming { @@ -13,7 +14,7 @@ namespace Streaming /// 1) `stream1` => `(select * from stream1) as stream1` /// 2) `stream2 as t` => `(select * from stream2) as t` /// 3) `table_func(...) as t1` => `(select * from table_func(...)) as t1` -/// Return rewritten subquery (return `nullptr` if is subquery) +/// \return rewritten subquery (return `nullptr` if is subquery) ASTPtr rewriteAsSubquery(ASTTableExpression & table_expression); /// Rewrite `table/table_function` to subquery (emit changelog): @@ -21,9 +22,10 @@ ASTPtr rewriteAsSubquery(ASTTableExpression & table_expression); /// 2) `stream2 as t` => `(select * from stream2 emit changelog) as t` /// 3) `table_func(...) as t1` => `(select * from table_func(...) emit changelog) as t1` /// 4) `(select * from stream1) as s` => `(select * from stream1 emit changelog) as s` -/// Return true if rewritten subquery, otherwise false (if already is changelog subquery or skip storage/table_function) +/// \return true if rewritten subquery, otherwise false (if already is changelog subquery or skip storage/table_function) bool rewriteAsChangelogSubquery(ASTTableExpression & table_expression, bool only_rewrite_subquery); -bool rewriteAsChangelogQuery(ASTSelectWithUnionQuery & query); +/// \return true if query was rewritten and false otherwise +bool rewriteSubquery(ASTSelectWithUnionQuery & query, const SelectQueryInfo & query_info); } } diff --git a/src/Processors/Chunk.h b/src/Processors/Chunk.h index c31045a0728..4ab68624b5c 100644 --- a/src/Processors/Chunk.h +++ b/src/Processors/Chunk.h @@ -70,7 +70,7 @@ struct ChunkContext : public COW if (hasWatermark()) { flags &= ~WATERMARK_FLAG; - ts_1 = 0; + ts_1 = Streaming::INVALID_WATERMARK; } } diff --git a/src/Processors/Transforms/Streaming/HopWatermarkStamper.cpp b/src/Processors/Transforms/Streaming/HopWatermarkStamper.cpp index 7e43a3475d2..def7080683c 100644 --- a/src/Processors/Transforms/Streaming/HopWatermarkStamper.cpp +++ b/src/Processors/Transforms/Streaming/HopWatermarkStamper.cpp @@ -18,7 +18,7 @@ HopWatermarkStamper::HopWatermarkStamper(const WatermarkStamperParams & params_, throw Exception(ErrorCodes::INCORRECT_QUERY, "{} doesn't support emit mode '{}'", getName(), magic_enum::enum_name(params.mode)); } -Int64 HopWatermarkStamper::calculateWatermark(Int64 event_ts) const +Int64 HopWatermarkStamper::calculateWatermarkImpl(Int64 event_ts) const { auto last_finalized_window = HopHelper::getLastFinalizedWindow(event_ts, window_params); if (likely(last_finalized_window.isValid())) diff --git a/src/Processors/Transforms/Streaming/HopWatermarkStamper.h b/src/Processors/Transforms/Streaming/HopWatermarkStamper.h index 8ac25754daa..2321b0a1415 100644 --- a/src/Processors/Transforms/Streaming/HopWatermarkStamper.h +++ b/src/Processors/Transforms/Streaming/HopWatermarkStamper.h @@ -17,7 +17,7 @@ class HopWatermarkStamper final : public WatermarkStamper WatermarkStamperPtr clone() const override { return std::make_unique(*this); } private: - Int64 calculateWatermark(Int64 event_ts) const override; + Int64 calculateWatermarkImpl(Int64 event_ts) const override; HopWindowParams & window_params; }; diff --git a/src/Processors/Transforms/Streaming/TumbleWatermarkStamper.cpp b/src/Processors/Transforms/Streaming/TumbleWatermarkStamper.cpp index 66cf928e7fd..9e5b068a810 100644 --- a/src/Processors/Transforms/Streaming/TumbleWatermarkStamper.cpp +++ b/src/Processors/Transforms/Streaming/TumbleWatermarkStamper.cpp @@ -16,7 +16,7 @@ TumbleWatermarkStamper::TumbleWatermarkStamper(const WatermarkStamperParams & pa throw Exception(ErrorCodes::INCORRECT_QUERY, "{} doesn't support emit mode '{}'", getName(), magic_enum::enum_name(params.mode)); } -Int64 TumbleWatermarkStamper::calculateWatermark(Int64 event_ts) const +Int64 TumbleWatermarkStamper::calculateWatermarkImpl(Int64 event_ts) const { return toStartTime( event_ts, window_params.interval_kind, window_params.window_interval, *window_params.time_zone, window_params.time_scale); diff --git a/src/Processors/Transforms/Streaming/TumbleWatermarkStamper.h b/src/Processors/Transforms/Streaming/TumbleWatermarkStamper.h index bafe590b565..2a7127bdd27 100644 --- a/src/Processors/Transforms/Streaming/TumbleWatermarkStamper.h +++ b/src/Processors/Transforms/Streaming/TumbleWatermarkStamper.h @@ -18,7 +18,7 @@ class TumbleWatermarkStamper final : public WatermarkStamper WatermarkStamperPtr clone() const override { return std::make_unique(*this); } private: - Int64 calculateWatermark(Int64 event_ts) const override; + Int64 calculateWatermarkImpl(Int64 event_ts) const override; TumbleWindowParams & window_params; }; diff --git a/src/Processors/Transforms/Streaming/WatermarkStamper.cpp b/src/Processors/Transforms/Streaming/WatermarkStamper.cpp index e37221da507..d961fd9d3fc 100644 --- a/src/Processors/Transforms/Streaming/WatermarkStamper.cpp +++ b/src/Processors/Transforms/Streaming/WatermarkStamper.cpp @@ -146,6 +146,87 @@ void WatermarkStamper::preProcess(const Block & header) initTimeoutTimer(params.timeout_interval); } +ALWAYS_INLINE Int64 WatermarkStamper::calculateWatermark(Int64 event_ts) const +{ + if (params.delay_interval) + { + auto event_ts_bias = addTime( + event_ts, + params.delay_interval.unit, + -1 * params.delay_interval.interval, + *params.window_params->time_zone, + params.window_params->time_scale); + + return calculateWatermarkImpl(event_ts_bias); + } + else + return calculateWatermarkImpl(event_ts); +} + +ALWAYS_INLINE Int64 WatermarkStamper::calculateWatermarkPerRow(Int64 event_ts) const +{ + if (params.delay_interval) + return addTime( + event_ts, + params.delay_interval.unit, + -1 * params.delay_interval.interval, + *params.window_params->time_zone, + params.window_params->time_scale); + else + return event_ts; +} + +void WatermarkStamper::processAfterUnmuted(Chunk & chunk) +{ + assert(!chunk.hasRows()); + + switch (params.mode) + { + case WatermarkStamperParams::EmitMode::PERIODIC: + { + processPeriodic(chunk); + break; + } + case WatermarkStamperParams::EmitMode::WATERMARK: + { + auto muted_watermark_ts = calculateWatermark(max_event_ts); + if (muted_watermark_ts != INVALID_WATERMARK) [[likely]] + chunk.setWatermark(muted_watermark_ts); + break; + } + case WatermarkStamperParams::EmitMode::WATERMARK_PER_ROW: + { + auto muted_watermark_ts = calculateWatermarkPerRow(max_event_ts); + if (muted_watermark_ts != INVALID_WATERMARK) [[likely]] + chunk.setWatermark(muted_watermark_ts); + break; + } + default: + break; + } +} + +void WatermarkStamper::processWithMutedWatermark(Chunk & chunk) +{ + /// NOTE: In order to avoid that when there is only backfill data and no new data, the window aggregation don't emit results after the backfill is completed. + /// Even mute watermark, we still need collect `max_event_ts` which will be used in "processAfterUnmuted()" to emit a watermark as soon as the backfill is completed + if (chunk.hasRows() && (params.mode == WatermarkStamperParams::EmitMode::WATERMARK || params.mode == WatermarkStamperParams::EmitMode::WATERMARK_PER_ROW)) + { + assert(params.window_params); + if (params.window_params->time_col_is_datetime64) + max_event_ts = std::max( + max_event_ts, + *std::ranges::max_element(assert_cast(*chunk.getColumns()[time_col_pos]).getData())); + else + max_event_ts = std::max( + max_event_ts, + *std::ranges::max_element(assert_cast(*chunk.getColumns()[time_col_pos]).getData())); + } + + processTimeout(chunk); + logLateEvents(); +} + void WatermarkStamper::process(Chunk & chunk) { switch (params.mode) @@ -238,31 +319,6 @@ void WatermarkStamper::processWatermark(Chunk & chunk) assert(params.window_params); - std::function calc_watermark_ts; - if (params.delay_interval) - { - calc_watermark_ts = [this](Int64 event_ts) { - auto event_ts_bias = addTime( - event_ts, - params.delay_interval.unit, - -1 * params.delay_interval.interval, - *params.window_params->time_zone, - params.window_params->time_scale); - - if constexpr (apply_watermark_per_row) - return event_ts_bias; - else - return calculateWatermark(event_ts_bias); - }; - } - else - { - if constexpr (apply_watermark_per_row) - calc_watermark_ts = [](Int64 event_ts) { return event_ts; }; - else - calc_watermark_ts = [this](Int64 event_ts) { return calculateWatermark(event_ts); }; - } - Int64 event_ts_watermark = watermark_ts; /// [Process chunks] @@ -284,7 +340,7 @@ void WatermarkStamper::processWatermark(Chunk & chunk) max_event_ts = event_ts; if constexpr (apply_watermark_per_row) - event_ts_watermark = calc_watermark_ts(max_event_ts); + event_ts_watermark = calculateWatermarkPerRow(max_event_ts); } if (unlikely(event_ts < event_ts_watermark)) @@ -295,7 +351,7 @@ void WatermarkStamper::processWatermark(Chunk & chunk) } if constexpr (!apply_watermark_per_row) - event_ts_watermark = calc_watermark_ts(max_event_ts); + event_ts_watermark = calculateWatermark(max_event_ts); if (late_events_in_chunk > 0) { @@ -315,9 +371,9 @@ void WatermarkStamper::processWatermark(Chunk & chunk) } } -Int64 WatermarkStamper::calculateWatermark(Int64 event_ts) const +Int64 WatermarkStamper::calculateWatermarkImpl(Int64 event_ts) const { - throw Exception(ErrorCodes::NOT_IMPLEMENTED, "calculateWatermark() not implemented in {}", getName()); + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "calculateWatermarkImpl() not implemented in {}", getName()); } void WatermarkStamper::initPeriodicTimer(const WindowInterval & interval) diff --git a/src/Processors/Transforms/Streaming/WatermarkStamper.h b/src/Processors/Transforms/Streaming/WatermarkStamper.h index bfec5056fe6..5eb3fc7b439 100644 --- a/src/Processors/Transforms/Streaming/WatermarkStamper.h +++ b/src/Processors/Transforms/Streaming/WatermarkStamper.h @@ -58,8 +58,14 @@ SERDE class WatermarkStamper virtual String getName() const { return "WatermarkStamper"; } void preProcess(const Block & header); + void process(Chunk & chunk); + /// During mute watermark, we still need to process the chunk to update max_event_ts + void processWithMutedWatermark(Chunk & chunk); + + void processAfterUnmuted(Chunk & chunk); + bool requiresPeriodicOrTimeoutEmit() const { return periodic_interval || timeout_interval; } VersionType getVersion() const; @@ -80,7 +86,10 @@ SERDE class WatermarkStamper void logLateEvents(); - virtual Int64 calculateWatermark(Int64 event_ts) const; + ALWAYS_INLINE Int64 calculateWatermark(Int64 event_ts) const; + ALWAYS_INLINE Int64 calculateWatermarkPerRow(Int64 event_ts) const; + + virtual Int64 calculateWatermarkImpl(Int64 event_ts) const; void initPeriodicTimer(const WindowInterval & interval); diff --git a/src/Processors/Transforms/Streaming/WatermarkTransform.cpp b/src/Processors/Transforms/Streaming/WatermarkTransform.cpp index 37c3d1a36b1..fd07f066740 100644 --- a/src/Processors/Transforms/Streaming/WatermarkTransform.cpp +++ b/src/Processors/Transforms/Streaming/WatermarkTransform.cpp @@ -54,14 +54,25 @@ void WatermarkTransform::transform(Chunk & chunk) { chunk.clearWatermark(); - if (chunk.isHistoricalDataStart()) - is_backfilling_data = true; - else if (chunk.isHistoricalDataEnd()) - is_backfilling_data = false; + if (chunk.isHistoricalDataStart() && skip_stamping_for_backfill_data) [[unlikely]] + { + mute_watermark = true; + return; + } + + if (chunk.isHistoricalDataEnd() && skip_stamping_for_backfill_data) [[unlikely]] + { + mute_watermark = false; + watermark->processAfterUnmuted(chunk); + return; + } + + if (chunk.avoidWatermark()) + return; - bool avoid_watermark = chunk.avoidWatermark(); - avoid_watermark |= is_backfilling_data && skip_stamping_for_backfill_data; - if (!avoid_watermark) + if (mute_watermark) + watermark->processWithMutedWatermark(chunk); + else watermark->process(chunk); } diff --git a/src/Processors/Transforms/Streaming/WatermarkTransform.h b/src/Processors/Transforms/Streaming/WatermarkTransform.h index c6c89e5a045..388404025a3 100644 --- a/src/Processors/Transforms/Streaming/WatermarkTransform.h +++ b/src/Processors/Transforms/Streaming/WatermarkTransform.h @@ -33,7 +33,7 @@ class WatermarkTransform final : public ISimpleTransform SERDE WatermarkStamperPtr watermark; bool skip_stamping_for_backfill_data; - bool is_backfilling_data = false; + bool mute_watermark = false; }; } } diff --git a/src/Processors/Transforms/Streaming/WatermarkTransformWithSubstream.cpp b/src/Processors/Transforms/Streaming/WatermarkTransformWithSubstream.cpp index 96b43e537ed..370059514c7 100644 --- a/src/Processors/Transforms/Streaming/WatermarkTransformWithSubstream.cpp +++ b/src/Processors/Transforms/Streaming/WatermarkTransformWithSubstream.cpp @@ -114,13 +114,31 @@ void WatermarkTransformWithSubstream::work() process_chunk.clearWatermark(); - if (process_chunk.isHistoricalDataStart()) - is_backfilling_data = true; - else if (process_chunk.isHistoricalDataEnd()) - is_backfilling_data = false; + if (process_chunk.isHistoricalDataStart() && skip_stamping_for_backfill_data) [[unlikely]] + { + mute_watermark = true; + /// Propagate historical data start flag + output_chunks.emplace_back(std::move(process_chunk)); + return; + } + + if (process_chunk.isHistoricalDataEnd() && skip_stamping_for_backfill_data) [[unlikely]] + { + mute_watermark = false; + output_chunks.reserve(substream_watermarks.size() + 1); + /// Propagate historical data end flag first + output_chunks.emplace_back(process_chunk.clone()); + for (auto & [id, watermark] : substream_watermarks) + { + auto chunk_ctx = ChunkContext::create(); + chunk_ctx->setSubstreamID(std::move(id)); + process_chunk.setChunkContext(std::move(chunk_ctx)); /// reset context - bool avoid_watermark = process_chunk.avoidWatermark(); - avoid_watermark |= is_backfilling_data && skip_stamping_for_backfill_data; + watermark->processAfterUnmuted(process_chunk); + output_chunks.emplace_back(process_chunk.clone()); + } + return; + } if (unlikely(process_chunk.requestCheckpoint())) { @@ -133,8 +151,13 @@ void WatermarkTransformWithSubstream::work() auto & watermark = getOrCreateSubstreamWatermark(process_chunk.getSubstreamID()); - if (!avoid_watermark) - watermark.process(process_chunk); + if (!process_chunk.avoidWatermark()) + { + if (mute_watermark) + watermark.processWithMutedWatermark(process_chunk); + else + watermark.process(process_chunk); + } assert(process_chunk); output_chunks.emplace_back(std::move(process_chunk)); @@ -146,7 +169,7 @@ void WatermarkTransformWithSubstream::work() /// It's possible to generate periodic or timeout watermark for each substream via an empty chunk /// FIXME: This is a very ugly and inefficient implementation and needs to revisit. - if (!avoid_watermark && watermark_template->requiresPeriodicOrTimeoutEmit()) + if (!mute_watermark && watermark_template->requiresPeriodicOrTimeoutEmit()) { output_chunks.reserve(substream_watermarks.size()); for (auto & [id, watermark] : substream_watermarks) diff --git a/src/Processors/Transforms/Streaming/WatermarkTransformWithSubstream.h b/src/Processors/Transforms/Streaming/WatermarkTransformWithSubstream.h index b94cbd40201..95dd74405a1 100644 --- a/src/Processors/Transforms/Streaming/WatermarkTransformWithSubstream.h +++ b/src/Processors/Transforms/Streaming/WatermarkTransformWithSubstream.h @@ -41,7 +41,7 @@ class WatermarkTransformWithSubstream final : public IProcessor SERDE SubstreamHashMap substream_watermarks; bool skip_stamping_for_backfill_data; - bool is_backfilling_data = false; + bool mute_watermark = false; Poco::Logger * log; }; diff --git a/src/Storages/StorageView.cpp b/src/Storages/StorageView.cpp index 7df8185888d..fdde026173c 100644 --- a/src/Storages/StorageView.cpp +++ b/src/Storages/StorageView.cpp @@ -135,8 +135,7 @@ void StorageView::read( } /// proton: starts. - if (query_info.left_input_tracking_changes) - Streaming::rewriteAsChangelogQuery(current_inner_query->as()); + Streaming::rewriteSubquery(current_inner_query->as(), query_info); /// proton: ends. auto options = SelectQueryOptions(QueryProcessingStage::Complete, 0, true, query_info.settings_limit_offset_done); diff --git a/src/Storages/Streaming/ProxyStream.cpp b/src/Storages/Streaming/ProxyStream.cpp index b335b2f3965..849f0bfc18b 100644 --- a/src/Storages/Streaming/ProxyStream.cpp +++ b/src/Storages/Streaming/ProxyStream.cpp @@ -193,8 +193,7 @@ void ProxyStream::doRead( if (current_subquery) { - if (query_info.left_input_tracking_changes) - Streaming::rewriteAsChangelogQuery(current_subquery->as()); + Streaming::rewriteSubquery(current_subquery->as(), query_info); auto sub_context = createProxySubqueryContext(context_, query_info, isStreamingQuery()); auto interpreter_subquery = std::make_unique( diff --git a/tests/stream/test_stream_smoke/0030_two_level_global_aggr.yaml b/tests/stream/test_stream_smoke/0030_two_level_global_aggr.yaml index 5d315f12fd4..09a1c4d9240 100644 --- a/tests/stream/test_stream_smoke/0030_two_level_global_aggr.yaml +++ b/tests/stream/test_stream_smoke/0030_two_level_global_aggr.yaml @@ -131,7 +131,7 @@ tests: query_id: 3100 depends_on_stream: test_31_multishards_stream query: | - subscribe to with cte as (select i as key, count() from test_31_multishards_stream where _tp_time > earliest_ts() group by key settings group_by_two_level_threshold=50) select count() from cte settings checkpoint_interval=2, emit_aggregated_during_backfill=false; + subscribe to with cte as (select i as key, count() from test_31_multishards_stream where _tp_time > earliest_ts() group by key settings group_by_two_level_threshold=50) select count() from cte settings checkpoint_interval=2, emit_during_backfill=false; - client: python query_type: table @@ -207,7 +207,7 @@ tests: depends_on_stream: test_31_multishards_stream wait: 1 query: | - subscribe to with cte as (select i as key, count() from changelog(test_31_multishards_stream, i) where _tp_time > earliest_ts() group by key emit changelog settings group_by_two_level_threshold=50) select count() from cte settings checkpoint_interval=2, emit_aggregated_during_backfill=false; + subscribe to with cte as (select i as key, count() from changelog(test_31_multishards_stream, i) where _tp_time > earliest_ts() group by key emit changelog settings group_by_two_level_threshold=50) select count() from cte settings checkpoint_interval=2, emit_during_backfill=false; - client: python query_type: table @@ -279,7 +279,7 @@ tests: wait: 1 query_end_timer: 5 query: | - with cte as (select i, count() from test_31_multishards_stream where _tp_time > earliest_ts() shuffle by i group by i settings group_by_two_level_threshold=10) select count() from cte settings emit_aggregated_during_backfill=false; + with cte as (select i, count() from test_31_multishards_stream where _tp_time > earliest_ts() shuffle by i group by i settings group_by_two_level_threshold=10) select count() from cte settings emit_during_backfill=false; - client: python query_type: table