From 49e5ac35aff9bc36627c495a9f0540ba45f05995 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Lisen=20=E6=9D=A8?=
Date: Wed, 10 Jan 2024 22:51:29 +0800
Subject: [PATCH 1/4] Fix smoke test for disabled backfill sorting
---
src/Interpreters/InterpreterSelectQuery.cpp | 10 +-
.../Streaming/RewriteAsSubquery.cpp | 45 +++---
.../Streaming/RewriteAsSubquery.h | 4 +-
src/Processors/Chunk.h | 2 +-
.../Streaming/HopWatermarkStamper.cpp | 2 +-
.../Streaming/HopWatermarkStamper.h | 2 +-
.../Streaming/TumbleWatermarkStamper.cpp | 2 +-
.../Streaming/TumbleWatermarkStamper.h | 2 +-
.../Transforms/Streaming/WatermarkStamper.cpp | 131 ++++++++++++------
.../Transforms/Streaming/WatermarkStamper.h | 9 +-
.../Streaming/WatermarkTransform.cpp | 27 ++--
.../Transforms/Streaming/WatermarkTransform.h | 2 +-
.../WatermarkTransformWithSubstream.cpp | 41 ++++--
.../WatermarkTransformWithSubstream.h | 2 +-
src/Storages/StorageView.cpp | 3 +-
src/Storages/Streaming/ProxyStream.cpp | 3 +-
16 files changed, 200 insertions(+), 87 deletions(-)
diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp
index 26c7ede79d4..03d273b8dc5 100644
--- a/src/Interpreters/InterpreterSelectQuery.cpp
+++ b/src/Interpreters/InterpreterSelectQuery.cpp
@@ -93,6 +93,7 @@
#include
#include
#include
+#include
#include
#include
#include
@@ -2278,6 +2279,10 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
if (!subquery)
throw Exception("Subquery expected", ErrorCodes::LOGICAL_ERROR);
+ /// proton: starts.
+ Streaming::rewriteSubqueryByQueryInfo(subquery->as(), query_info);
+ /// proton: ends.
+
interpreter_subquery = std::make_unique(
subquery, getSubqueryContext(context),
options.copy().subquery().noModify(), required_columns);
@@ -3437,7 +3442,10 @@ void InterpreterSelectQuery::finalCheckAndOptimizeForStreamingQuery()
/// Usually, we don't care whether the backfilled data is in order. Excepts:
/// 1) User require backfill data in order
/// 2) User need window aggr emit result during backfill (it expects that process data in ascending event time)
- if (settings.force_backfill_in_order.value || (settings.emit_aggregated_during_backfill.value && hasAggregation() && hasStreamingWindowFunc()))
+ if (settings.emit_aggregated_during_backfill.value && hasAggregation() && hasStreamingWindowFunc())
+ context->setSetting("force_backfill_in_order", true);
+
+ if (settings.force_backfill_in_order.value)
query_info.require_in_order_backfill = true;
}
else
diff --git a/src/Interpreters/Streaming/RewriteAsSubquery.cpp b/src/Interpreters/Streaming/RewriteAsSubquery.cpp
index 5ca12831f06..74bb76428eb 100644
--- a/src/Interpreters/Streaming/RewriteAsSubquery.cpp
+++ b/src/Interpreters/Streaming/RewriteAsSubquery.cpp
@@ -9,6 +9,7 @@
#include
#include
#include
+#include
namespace DB
{
@@ -19,6 +20,29 @@ extern const int ALIAS_REQUIRED;
namespace Streaming
{
+namespace
+{
+bool rewriteAsChangelogQuery(ASTSelectWithUnionQuery & query)
+{
+ if (query.list_of_selects->children.size() != 1)
+ throw Exception(ErrorCodes::LOGICAL_ERROR, "Only expect one select query to rewrite as changelog query");
+
+ auto & select_query = query.list_of_selects->children[0]->as();
+
+ /// Emit changelog
+ auto emit_query = select_query.emit();
+ if (!emit_query)
+ emit_query = std::make_shared();
+
+ if (emit_query->as().stream_mode == ASTEmitQuery::StreamMode::CHANGELOG)
+ return false;
+
+ emit_query->as().stream_mode = ASTEmitQuery::StreamMode::CHANGELOG;
+ select_query.setExpression(ASTSelectQuery::Expression::EMIT, std::move(emit_query));
+ return true;
+}
+}
+
ASTPtr rewriteAsSubquery(ASTTableExpression & table_expr)
{
if (table_expr.subquery)
@@ -113,24 +137,13 @@ bool rewriteAsChangelogSubquery(ASTTableExpression & table_expression, bool only
return rewriteAsChangelogQuery(query);
}
-bool rewriteAsChangelogQuery(ASTSelectWithUnionQuery & query)
+bool rewriteSubqueryByQueryInfo(ASTSelectWithUnionQuery & query, const SelectQueryInfo & query_info)
{
- if (query.list_of_selects->children.size() != 1)
- throw Exception(ErrorCodes::LOGICAL_ERROR, "Only expect one select query to rewrite as changelog query");
-
- auto & select_query = query.list_of_selects->children[0]->as();
+ bool rewriten = false;
+ if (query_info.left_input_tracking_changes)
+ rewriten |= rewriteAsChangelogQuery(query);
- /// Emit changelog
- auto emit_query = select_query.emit();
- if (!emit_query)
- emit_query = std::make_shared();
-
- if (emit_query->as().stream_mode == ASTEmitQuery::StreamMode::CHANGELOG)
- return false;
-
- emit_query->as().stream_mode = ASTEmitQuery::StreamMode::CHANGELOG;
- select_query.setExpression(ASTSelectQuery::Expression::EMIT, std::move(emit_query));
- return true;
+ return rewriten;
}
}
}
diff --git a/src/Interpreters/Streaming/RewriteAsSubquery.h b/src/Interpreters/Streaming/RewriteAsSubquery.h
index 26a7dd28feb..53287bcf64e 100644
--- a/src/Interpreters/Streaming/RewriteAsSubquery.h
+++ b/src/Interpreters/Streaming/RewriteAsSubquery.h
@@ -6,6 +6,7 @@ namespace DB
{
struct ASTTableExpression;
class ASTSelectWithUnionQuery;
+struct SelectQueryInfo;
namespace Streaming
{
@@ -24,6 +25,7 @@ ASTPtr rewriteAsSubquery(ASTTableExpression & table_expression);
/// Return true if rewritten subquery, otherwise false (if already is changelog subquery or skip storage/table_function)
bool rewriteAsChangelogSubquery(ASTTableExpression & table_expression, bool only_rewrite_subquery);
-bool rewriteAsChangelogQuery(ASTSelectWithUnionQuery & query);
+
+bool rewriteSubqueryByQueryInfo(ASTSelectWithUnionQuery & query, const SelectQueryInfo & query_info);
}
}
diff --git a/src/Processors/Chunk.h b/src/Processors/Chunk.h
index c31045a0728..4ab68624b5c 100644
--- a/src/Processors/Chunk.h
+++ b/src/Processors/Chunk.h
@@ -70,7 +70,7 @@ struct ChunkContext : public COW
if (hasWatermark())
{
flags &= ~WATERMARK_FLAG;
- ts_1 = 0;
+ ts_1 = Streaming::INVALID_WATERMARK;
}
}
diff --git a/src/Processors/Transforms/Streaming/HopWatermarkStamper.cpp b/src/Processors/Transforms/Streaming/HopWatermarkStamper.cpp
index 7e43a3475d2..78d118c810a 100644
--- a/src/Processors/Transforms/Streaming/HopWatermarkStamper.cpp
+++ b/src/Processors/Transforms/Streaming/HopWatermarkStamper.cpp
@@ -18,7 +18,7 @@ HopWatermarkStamper::HopWatermarkStamper(const WatermarkStamperParams & params_,
throw Exception(ErrorCodes::INCORRECT_QUERY, "{} doesn't support emit mode '{}'", getName(), magic_enum::enum_name(params.mode));
}
-Int64 HopWatermarkStamper::calculateWatermark(Int64 event_ts) const
+Int64 HopWatermarkStamper::calculateWatermarkBasedOnWindowImpl(Int64 event_ts) const
{
auto last_finalized_window = HopHelper::getLastFinalizedWindow(event_ts, window_params);
if (likely(last_finalized_window.isValid()))
diff --git a/src/Processors/Transforms/Streaming/HopWatermarkStamper.h b/src/Processors/Transforms/Streaming/HopWatermarkStamper.h
index 8ac25754daa..c73071757b1 100644
--- a/src/Processors/Transforms/Streaming/HopWatermarkStamper.h
+++ b/src/Processors/Transforms/Streaming/HopWatermarkStamper.h
@@ -17,7 +17,7 @@ class HopWatermarkStamper final : public WatermarkStamper
WatermarkStamperPtr clone() const override { return std::make_unique(*this); }
private:
- Int64 calculateWatermark(Int64 event_ts) const override;
+ Int64 calculateWatermarkBasedOnWindowImpl(Int64 event_ts) const override;
HopWindowParams & window_params;
};
diff --git a/src/Processors/Transforms/Streaming/TumbleWatermarkStamper.cpp b/src/Processors/Transforms/Streaming/TumbleWatermarkStamper.cpp
index 66cf928e7fd..97ef6a4d84f 100644
--- a/src/Processors/Transforms/Streaming/TumbleWatermarkStamper.cpp
+++ b/src/Processors/Transforms/Streaming/TumbleWatermarkStamper.cpp
@@ -16,7 +16,7 @@ TumbleWatermarkStamper::TumbleWatermarkStamper(const WatermarkStamperParams & pa
throw Exception(ErrorCodes::INCORRECT_QUERY, "{} doesn't support emit mode '{}'", getName(), magic_enum::enum_name(params.mode));
}
-Int64 TumbleWatermarkStamper::calculateWatermark(Int64 event_ts) const
+Int64 TumbleWatermarkStamper::calculateWatermarkBasedOnWindowImpl(Int64 event_ts) const
{
return toStartTime(
event_ts, window_params.interval_kind, window_params.window_interval, *window_params.time_zone, window_params.time_scale);
diff --git a/src/Processors/Transforms/Streaming/TumbleWatermarkStamper.h b/src/Processors/Transforms/Streaming/TumbleWatermarkStamper.h
index bafe590b565..df8e639eab2 100644
--- a/src/Processors/Transforms/Streaming/TumbleWatermarkStamper.h
+++ b/src/Processors/Transforms/Streaming/TumbleWatermarkStamper.h
@@ -18,7 +18,7 @@ class TumbleWatermarkStamper final : public WatermarkStamper
WatermarkStamperPtr clone() const override { return std::make_unique(*this); }
private:
- Int64 calculateWatermark(Int64 event_ts) const override;
+ Int64 calculateWatermarkBasedOnWindowImpl(Int64 event_ts) const override;
TumbleWindowParams & window_params;
};
diff --git a/src/Processors/Transforms/Streaming/WatermarkStamper.cpp b/src/Processors/Transforms/Streaming/WatermarkStamper.cpp
index e37221da507..29f54ddd8b8 100644
--- a/src/Processors/Transforms/Streaming/WatermarkStamper.cpp
+++ b/src/Processors/Transforms/Streaming/WatermarkStamper.cpp
@@ -146,8 +146,36 @@ void WatermarkStamper::preProcess(const Block & header)
initTimeoutTimer(params.timeout_interval);
}
-void WatermarkStamper::process(Chunk & chunk)
+template
+ALWAYS_INLINE Int64 WatermarkStamper::calculateWatermark(Int64 event_ts) const
{
+ if (params.delay_interval)
+ {
+ auto event_ts_bias = addTime(
+ event_ts,
+ params.delay_interval.unit,
+ -1 * params.delay_interval.interval,
+ *params.window_params->time_zone,
+ params.window_params->time_scale);
+
+ if constexpr (apply_watermark_per_row)
+ return event_ts_bias;
+ else
+ return calculateWatermarkBasedOnWindowImpl(event_ts_bias);
+ }
+ else
+ {
+ if constexpr (apply_watermark_per_row)
+ return event_ts;
+ else
+ return calculateWatermarkBasedOnWindowImpl(event_ts);
+ }
+}
+
+void WatermarkStamper::processAfterUnmuted(Chunk & chunk)
+{
+ assert(!chunk.hasRows() && chunk.isHistoricalDataEnd());
+
switch (params.mode)
{
case WatermarkStamperParams::EmitMode::PERIODIC: {
@@ -155,24 +183,69 @@ void WatermarkStamper::process(Chunk & chunk)
break;
}
case WatermarkStamperParams::EmitMode::WATERMARK: {
- assert(params.window_params);
- if (params.window_params->time_col_is_datetime64)
- processWatermark(chunk);
- else
- processWatermark(chunk);
+ auto muted_watermark_ts = calculateWatermark(max_event_ts);
+ if (muted_watermark_ts != INVALID_WATERMARK)
+ chunk.setWatermark(muted_watermark_ts);
break;
}
case WatermarkStamperParams::EmitMode::WATERMARK_PER_ROW: {
- assert(params.window_params);
- if (params.window_params->time_col_is_datetime64)
- processWatermark(chunk);
- else
- processWatermark(chunk);
+ auto muted_watermark_ts = calculateWatermark(max_event_ts);
+ if (muted_watermark_ts != INVALID_WATERMARK)
+ chunk.setWatermark(muted_watermark_ts);
break;
}
default:
break;
}
+}
+
+template
+void WatermarkStamper::process(Chunk & chunk)
+{
+ if constexpr (mute_watermark)
+ {
+ /// NOTE: In order to avoid that when there is only backfill data and no new data, the window aggregation don't emit results after the backfill is completed.
+ /// Even mute watermark, we still need collect `max_event_ts` which will be used in "processAfterUnmuted()" to emit a watermark as soon as the backfill is completed
+ if (params.window_params && chunk.hasRows())
+ {
+ if (params.window_params->time_col_is_datetime64)
+ max_event_ts = std::max(
+ max_event_ts,
+ *std::ranges::max_element(assert_cast(*chunk.getColumns()[time_col_pos]).getData()));
+ else
+ max_event_ts = std::max(
+ max_event_ts,
+ *std::ranges::max_element(assert_cast(*chunk.getColumns()[time_col_pos]).getData()));
+ }
+ }
+ else
+ {
+ switch (params.mode)
+ {
+ case WatermarkStamperParams::EmitMode::PERIODIC: {
+ processPeriodic(chunk);
+ break;
+ }
+ case WatermarkStamperParams::EmitMode::WATERMARK: {
+ assert(params.window_params);
+ if (params.window_params->time_col_is_datetime64)
+ processWatermark(chunk);
+ else
+ processWatermark(chunk);
+ break;
+ }
+ case WatermarkStamperParams::EmitMode::WATERMARK_PER_ROW: {
+ assert(params.window_params);
+ if (params.window_params->time_col_is_datetime64)
+ processWatermark(chunk);
+ else
+ processWatermark(chunk);
+ break;
+ }
+ default:
+ break;
+ }
+ }
processTimeout(chunk);
logLateEvents();
@@ -238,31 +311,6 @@ void WatermarkStamper::processWatermark(Chunk & chunk)
assert(params.window_params);
- std::function calc_watermark_ts;
- if (params.delay_interval)
- {
- calc_watermark_ts = [this](Int64 event_ts) {
- auto event_ts_bias = addTime(
- event_ts,
- params.delay_interval.unit,
- -1 * params.delay_interval.interval,
- *params.window_params->time_zone,
- params.window_params->time_scale);
-
- if constexpr (apply_watermark_per_row)
- return event_ts_bias;
- else
- return calculateWatermark(event_ts_bias);
- };
- }
- else
- {
- if constexpr (apply_watermark_per_row)
- calc_watermark_ts = [](Int64 event_ts) { return event_ts; };
- else
- calc_watermark_ts = [this](Int64 event_ts) { return calculateWatermark(event_ts); };
- }
-
Int64 event_ts_watermark = watermark_ts;
/// [Process chunks]
@@ -284,7 +332,7 @@ void WatermarkStamper::processWatermark(Chunk & chunk)
max_event_ts = event_ts;
if constexpr (apply_watermark_per_row)
- event_ts_watermark = calc_watermark_ts(max_event_ts);
+ event_ts_watermark = calculateWatermark(max_event_ts);
}
if (unlikely(event_ts < event_ts_watermark))
@@ -295,7 +343,7 @@ void WatermarkStamper::processWatermark(Chunk & chunk)
}
if constexpr (!apply_watermark_per_row)
- event_ts_watermark = calc_watermark_ts(max_event_ts);
+ event_ts_watermark = calculateWatermark(max_event_ts);
if (late_events_in_chunk > 0)
{
@@ -315,9 +363,9 @@ void WatermarkStamper::processWatermark(Chunk & chunk)
}
}
-Int64 WatermarkStamper::calculateWatermark(Int64 event_ts) const
+Int64 WatermarkStamper::calculateWatermarkBasedOnWindowImpl(Int64 event_ts) const
{
- throw Exception(ErrorCodes::NOT_IMPLEMENTED, "calculateWatermark() not implemented in {}", getName());
+ throw Exception(ErrorCodes::NOT_IMPLEMENTED, "calculateWatermarkBasedOnWindowImpl() not implemented in {}", getName());
}
void WatermarkStamper::initPeriodicTimer(const WindowInterval & interval)
@@ -385,5 +433,8 @@ void WatermarkStamper::deserialize(ReadBuffer & rb)
readIntBinary(last_logged_late_events, rb);
readIntBinary(last_logged_late_events_ts, rb);
}
+
+template void WatermarkStamper::process(Chunk &);
+template void WatermarkStamper::process(Chunk &);
}
}
diff --git a/src/Processors/Transforms/Streaming/WatermarkStamper.h b/src/Processors/Transforms/Streaming/WatermarkStamper.h
index bfec5056fe6..441d9f67d62 100644
--- a/src/Processors/Transforms/Streaming/WatermarkStamper.h
+++ b/src/Processors/Transforms/Streaming/WatermarkStamper.h
@@ -58,8 +58,12 @@ SERDE class WatermarkStamper
virtual String getName() const { return "WatermarkStamper"; }
void preProcess(const Block & header);
+
+ template
void process(Chunk & chunk);
+ void processAfterUnmuted(Chunk & chunk);
+
bool requiresPeriodicOrTimeoutEmit() const { return periodic_interval || timeout_interval; }
VersionType getVersion() const;
@@ -80,7 +84,10 @@ SERDE class WatermarkStamper
void logLateEvents();
- virtual Int64 calculateWatermark(Int64 event_ts) const;
+ template
+ ALWAYS_INLINE Int64 calculateWatermark(Int64 event_ts) const;
+
+ virtual Int64 calculateWatermarkBasedOnWindowImpl(Int64 event_ts) const;
void initPeriodicTimer(const WindowInterval & interval);
diff --git a/src/Processors/Transforms/Streaming/WatermarkTransform.cpp b/src/Processors/Transforms/Streaming/WatermarkTransform.cpp
index 37c3d1a36b1..e561408db14 100644
--- a/src/Processors/Transforms/Streaming/WatermarkTransform.cpp
+++ b/src/Processors/Transforms/Streaming/WatermarkTransform.cpp
@@ -54,15 +54,26 @@ void WatermarkTransform::transform(Chunk & chunk)
{
chunk.clearWatermark();
- if (chunk.isHistoricalDataStart())
- is_backfilling_data = true;
- else if (chunk.isHistoricalDataEnd())
- is_backfilling_data = false;
+ if (chunk.isHistoricalDataStart() && skip_stamping_for_backfill_data) [[unlikely]]
+ {
+ mute_watermark = true;
+ return;
+ }
+
+ if (chunk.isHistoricalDataEnd() && skip_stamping_for_backfill_data) [[unlikely]]
+ {
+ mute_watermark = false;
+ watermark->processAfterUnmuted(chunk);
+ return;
+ }
+
+ if (chunk.avoidWatermark())
+ return;
- bool avoid_watermark = chunk.avoidWatermark();
- avoid_watermark |= is_backfilling_data && skip_stamping_for_backfill_data;
- if (!avoid_watermark)
- watermark->process(chunk);
+ if (mute_watermark)
+ watermark->process(chunk);
+ else
+ watermark->process(chunk);
}
void WatermarkTransform::checkpoint(CheckpointContextPtr ckpt_ctx)
diff --git a/src/Processors/Transforms/Streaming/WatermarkTransform.h b/src/Processors/Transforms/Streaming/WatermarkTransform.h
index c6c89e5a045..388404025a3 100644
--- a/src/Processors/Transforms/Streaming/WatermarkTransform.h
+++ b/src/Processors/Transforms/Streaming/WatermarkTransform.h
@@ -33,7 +33,7 @@ class WatermarkTransform final : public ISimpleTransform
SERDE WatermarkStamperPtr watermark;
bool skip_stamping_for_backfill_data;
- bool is_backfilling_data = false;
+ bool mute_watermark = false;
};
}
}
diff --git a/src/Processors/Transforms/Streaming/WatermarkTransformWithSubstream.cpp b/src/Processors/Transforms/Streaming/WatermarkTransformWithSubstream.cpp
index 96b43e537ed..557c8a6754f 100644
--- a/src/Processors/Transforms/Streaming/WatermarkTransformWithSubstream.cpp
+++ b/src/Processors/Transforms/Streaming/WatermarkTransformWithSubstream.cpp
@@ -114,13 +114,31 @@ void WatermarkTransformWithSubstream::work()
process_chunk.clearWatermark();
- if (process_chunk.isHistoricalDataStart())
- is_backfilling_data = true;
- else if (process_chunk.isHistoricalDataEnd())
- is_backfilling_data = false;
+ if (process_chunk.isHistoricalDataStart() && skip_stamping_for_backfill_data) [[unlikely]]
+ {
+ mute_watermark = true;
+ /// Propagate historical data start flag
+ output_chunks.emplace_back(std::move(process_chunk));
+ return;
+ }
+
+ if (process_chunk.isHistoricalDataEnd() && skip_stamping_for_backfill_data) [[unlikely]]
+ {
+ mute_watermark = false;
+ output_chunks.reserve(substream_watermarks.size() + 1);
+ /// Propagate historical data end flag first
+ output_chunks.emplace_back(process_chunk.clone());
+ for (auto & [id, watermark] : substream_watermarks)
+ {
+ auto chunk_ctx =ChunkContext::create();
+ chunk_ctx->setSubstreamID(std::move(id));
+ process_chunk.setChunkContext(std::move(chunk_ctx)); /// reset context
- bool avoid_watermark = process_chunk.avoidWatermark();
- avoid_watermark |= is_backfilling_data && skip_stamping_for_backfill_data;
+ watermark->processAfterUnmuted(process_chunk);
+ output_chunks.emplace_back(process_chunk.clone());
+ }
+ return;
+ }
if (unlikely(process_chunk.requestCheckpoint()))
{
@@ -133,8 +151,13 @@ void WatermarkTransformWithSubstream::work()
auto & watermark = getOrCreateSubstreamWatermark(process_chunk.getSubstreamID());
- if (!avoid_watermark)
- watermark.process(process_chunk);
+ if (!process_chunk.avoidWatermark())
+ {
+ if (mute_watermark)
+ watermark.process(process_chunk);
+ else
+ watermark.process(process_chunk);
+ }
assert(process_chunk);
output_chunks.emplace_back(std::move(process_chunk));
@@ -146,7 +169,7 @@ void WatermarkTransformWithSubstream::work()
/// It's possible to generate periodic or timeout watermark for each substream via an empty chunk
/// FIXME: This is a very ugly and inefficient implementation and needs to revisit.
- if (!avoid_watermark && watermark_template->requiresPeriodicOrTimeoutEmit())
+ if (!mute_watermark && watermark_template->requiresPeriodicOrTimeoutEmit())
{
output_chunks.reserve(substream_watermarks.size());
for (auto & [id, watermark] : substream_watermarks)
diff --git a/src/Processors/Transforms/Streaming/WatermarkTransformWithSubstream.h b/src/Processors/Transforms/Streaming/WatermarkTransformWithSubstream.h
index b94cbd40201..95dd74405a1 100644
--- a/src/Processors/Transforms/Streaming/WatermarkTransformWithSubstream.h
+++ b/src/Processors/Transforms/Streaming/WatermarkTransformWithSubstream.h
@@ -41,7 +41,7 @@ class WatermarkTransformWithSubstream final : public IProcessor
SERDE SubstreamHashMap substream_watermarks;
bool skip_stamping_for_backfill_data;
- bool is_backfilling_data = false;
+ bool mute_watermark = false;
Poco::Logger * log;
};
diff --git a/src/Storages/StorageView.cpp b/src/Storages/StorageView.cpp
index 7df8185888d..b460cb61a2b 100644
--- a/src/Storages/StorageView.cpp
+++ b/src/Storages/StorageView.cpp
@@ -135,8 +135,7 @@ void StorageView::read(
}
/// proton: starts.
- if (query_info.left_input_tracking_changes)
- Streaming::rewriteAsChangelogQuery(current_inner_query->as());
+ Streaming::rewriteSubqueryByQueryInfo(current_inner_query->as(), query_info);
/// proton: ends.
auto options = SelectQueryOptions(QueryProcessingStage::Complete, 0, true, query_info.settings_limit_offset_done);
diff --git a/src/Storages/Streaming/ProxyStream.cpp b/src/Storages/Streaming/ProxyStream.cpp
index b335b2f3965..ac43d4daaf5 100644
--- a/src/Storages/Streaming/ProxyStream.cpp
+++ b/src/Storages/Streaming/ProxyStream.cpp
@@ -193,8 +193,7 @@ void ProxyStream::doRead(
if (current_subquery)
{
- if (query_info.left_input_tracking_changes)
- Streaming::rewriteAsChangelogQuery(current_subquery->as());
+ Streaming::rewriteSubqueryByQueryInfo(current_subquery->as(), query_info);
auto sub_context = createProxySubqueryContext(context_, query_info, isStreamingQuery());
auto interpreter_subquery = std::make_unique(
From 4bd28d56349714dfa2d9ffb286aef4e5bd82052a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Lisen=20=E6=9D=A8?=
Date: Thu, 11 Jan 2024 12:59:34 +0800
Subject: [PATCH 2/4] Collect max_event_ts while muted only in watermark emit modes
---
src/Processors/Transforms/Streaming/WatermarkStamper.cpp | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/src/Processors/Transforms/Streaming/WatermarkStamper.cpp b/src/Processors/Transforms/Streaming/WatermarkStamper.cpp
index 29f54ddd8b8..34619b551ee 100644
--- a/src/Processors/Transforms/Streaming/WatermarkStamper.cpp
+++ b/src/Processors/Transforms/Streaming/WatermarkStamper.cpp
@@ -206,8 +206,9 @@ void WatermarkStamper::process(Chunk & chunk)
{
/// NOTE: In order to avoid that when there is only backfill data and no new data, the window aggregation don't emit results after the backfill is completed.
/// Even mute watermark, we still need collect `max_event_ts` which will be used in "processAfterUnmuted()" to emit a watermark as soon as the backfill is completed
- if (params.window_params && chunk.hasRows())
+ if (chunk.hasRows() && (params.mode == WatermarkStamperParams::EmitMode::WATERMARK || params.mode == WatermarkStamperParams::EmitMode::WATERMARK_PER_ROW))
{
+ assert(params.window_params);
if (params.window_params->time_col_is_datetime64)
max_event_ts = std::max(
max_event_ts,
From 8ce6bedf783e8cfbfb1b309161ed16a52d5366d6 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Lisen=20=E6=9D=A8?=
Date: Mon, 15 Jan 2024 19:13:10 +0800
Subject: [PATCH 3/4] Address review comments: rename backfill setting and watermark helpers
---
src/Core/Settings.h | 2 +-
src/Interpreters/InterpreterSelectQuery.cpp | 6 +-
.../Streaming/RewriteAsSubquery.cpp | 7 +-
.../Streaming/RewriteAsSubquery.h | 8 +-
.../Streaming/HopWatermarkStamper.cpp | 2 +-
.../Streaming/HopWatermarkStamper.h | 2 +-
.../Streaming/TumbleWatermarkStamper.cpp | 2 +-
.../Streaming/TumbleWatermarkStamper.h | 2 +-
.../Transforms/Streaming/WatermarkStamper.cpp | 134 +++++++++---------
.../Transforms/Streaming/WatermarkStamper.h | 8 +-
.../Streaming/WatermarkTransform.cpp | 4 +-
.../WatermarkTransformWithSubstream.cpp | 6 +-
src/Storages/StorageView.cpp | 2 +-
src/Storages/Streaming/ProxyStream.cpp | 2 +-
.../0030_two_level_global_aggr.yaml | 6 +-
15 files changed, 99 insertions(+), 94 deletions(-)
diff --git a/src/Core/Settings.h b/src/Core/Settings.h
index 55c79af6230..2307b4c21f4 100644
--- a/src/Core/Settings.h
+++ b/src/Core/Settings.h
@@ -790,7 +790,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(UInt64, keep_windows, 0, "How many streaming windows to keep from recycling", 0) \
M(String, seek_to, "", "Seeking to an offset of the streaming/historical store to seek", 0) \
M(Bool, enable_backfill_from_historical_store, true, "Enable backfill data from historical data store", 0) \
- M(Bool, emit_aggregated_during_backfill, false, "Enable emit intermediate aggr result during backfill historical data", 0) \
+ M(Bool, emit_during_backfill, false, "Enable emit intermediate aggr result during backfill historical data", 0) \
M(Bool, force_backfill_in_order, false, "Requires backfill data in order", 0) \
M(Bool, include_internal_streams, false, "Show internal streams on SHOW streams query.", 0) \
M(UInt64, join_max_buffered_bytes, 524288000, "Max buffered bytes for stream to stream join", 0) \
diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp
index 03d273b8dc5..49558771789 100644
--- a/src/Interpreters/InterpreterSelectQuery.cpp
+++ b/src/Interpreters/InterpreterSelectQuery.cpp
@@ -2280,7 +2280,7 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
throw Exception("Subquery expected", ErrorCodes::LOGICAL_ERROR);
/// proton: starts.
- Streaming::rewriteSubqueryByQueryInfo(subquery->as(), query_info);
+ Streaming::rewriteSubquery(subquery->as(), query_info);
/// proton: ends.
interpreter_subquery = std::make_unique(
@@ -3442,7 +3442,7 @@ void InterpreterSelectQuery::finalCheckAndOptimizeForStreamingQuery()
/// Usually, we don't care whether the backfilled data is in order. Excepts:
/// 1) User require backfill data in order
/// 2) User need window aggr emit result during backfill (it expects that process data in ascending event time)
- if (settings.emit_aggregated_during_backfill.value && hasAggregation() && hasStreamingWindowFunc())
+ if (settings.emit_during_backfill.value && hasAggregation() && hasStreamingWindowFunc())
context->setSetting("force_backfill_in_order", true);
if (settings.force_backfill_in_order.value)
@@ -3508,7 +3508,7 @@ void InterpreterSelectQuery::buildWatermarkQueryPlan(QueryPlan & query_plan) con
auto params = std::make_shared(
query_info.query, query_info.syntax_analyzer_result, query_info.streaming_window_params);
- bool skip_stamping_for_backfill_data = !context->getSettingsRef().emit_aggregated_during_backfill.value;
+ bool skip_stamping_for_backfill_data = !context->getSettingsRef().emit_during_backfill.value;
if (query_info.hasPartitionByKeys())
query_plan.addStep(std::make_unique(
diff --git a/src/Interpreters/Streaming/RewriteAsSubquery.cpp b/src/Interpreters/Streaming/RewriteAsSubquery.cpp
index 74bb76428eb..416b5d5ae11 100644
--- a/src/Interpreters/Streaming/RewriteAsSubquery.cpp
+++ b/src/Interpreters/Streaming/RewriteAsSubquery.cpp
@@ -137,13 +137,12 @@ bool rewriteAsChangelogSubquery(ASTTableExpression & table_expression, bool only
return rewriteAsChangelogQuery(query);
}
-bool rewriteSubqueryByQueryInfo(ASTSelectWithUnionQuery & query, const SelectQueryInfo & query_info)
+bool rewriteSubquery(ASTSelectWithUnionQuery & query, const SelectQueryInfo & query_info)
{
- bool rewriten = false;
if (query_info.left_input_tracking_changes)
- rewriten |= rewriteAsChangelogQuery(query);
+ return rewriteAsChangelogQuery(query);
- return rewriten;
+ return false;
}
}
}
diff --git a/src/Interpreters/Streaming/RewriteAsSubquery.h b/src/Interpreters/Streaming/RewriteAsSubquery.h
index 53287bcf64e..8a458755f2f 100644
--- a/src/Interpreters/Streaming/RewriteAsSubquery.h
+++ b/src/Interpreters/Streaming/RewriteAsSubquery.h
@@ -14,7 +14,7 @@ namespace Streaming
/// 1) `stream1` => `(select * from stream1) as stream1`
/// 2) `stream2 as t` => `(select * from stream2) as t`
/// 3) `table_func(...) as t1` => `(select * from table_func(...)) as t1`
-/// Return rewritten subquery (return `nullptr` if is subquery)
+/// \return rewritten subquery (return `nullptr` if is subquery)
ASTPtr rewriteAsSubquery(ASTTableExpression & table_expression);
/// Rewrite `table/table_function` to subquery (emit changelog):
@@ -22,10 +22,10 @@ ASTPtr rewriteAsSubquery(ASTTableExpression & table_expression);
/// 2) `stream2 as t` => `(select * from stream2 emit changelog) as t`
/// 3) `table_func(...) as t1` => `(select * from table_func(...) emit changelog) as t1`
/// 4) `(select * from stream1) as s` => `(select * from stream1 emit changelog) as s`
-/// Return true if rewritten subquery, otherwise false (if already is changelog subquery or skip storage/table_function)
+/// \return true if rewritten subquery, otherwise false (if already is changelog subquery or skip storage/table_function)
bool rewriteAsChangelogSubquery(ASTTableExpression & table_expression, bool only_rewrite_subquery);
-
-bool rewriteSubqueryByQueryInfo(ASTSelectWithUnionQuery & query, const SelectQueryInfo & query_info);
+/// \return true if query was rewritten and false otherwise
+bool rewriteSubquery(ASTSelectWithUnionQuery & query, const SelectQueryInfo & query_info);
}
}
diff --git a/src/Processors/Transforms/Streaming/HopWatermarkStamper.cpp b/src/Processors/Transforms/Streaming/HopWatermarkStamper.cpp
index 78d118c810a..def7080683c 100644
--- a/src/Processors/Transforms/Streaming/HopWatermarkStamper.cpp
+++ b/src/Processors/Transforms/Streaming/HopWatermarkStamper.cpp
@@ -18,7 +18,7 @@ HopWatermarkStamper::HopWatermarkStamper(const WatermarkStamperParams & params_,
throw Exception(ErrorCodes::INCORRECT_QUERY, "{} doesn't support emit mode '{}'", getName(), magic_enum::enum_name(params.mode));
}
-Int64 HopWatermarkStamper::calculateWatermarkBasedOnWindowImpl(Int64 event_ts) const
+Int64 HopWatermarkStamper::calculateWatermarkImpl(Int64 event_ts) const
{
auto last_finalized_window = HopHelper::getLastFinalizedWindow(event_ts, window_params);
if (likely(last_finalized_window.isValid()))
diff --git a/src/Processors/Transforms/Streaming/HopWatermarkStamper.h b/src/Processors/Transforms/Streaming/HopWatermarkStamper.h
index c73071757b1..2321b0a1415 100644
--- a/src/Processors/Transforms/Streaming/HopWatermarkStamper.h
+++ b/src/Processors/Transforms/Streaming/HopWatermarkStamper.h
@@ -17,7 +17,7 @@ class HopWatermarkStamper final : public WatermarkStamper
WatermarkStamperPtr clone() const override { return std::make_unique(*this); }
private:
- Int64 calculateWatermarkBasedOnWindowImpl(Int64 event_ts) const override;
+ Int64 calculateWatermarkImpl(Int64 event_ts) const override;
HopWindowParams & window_params;
};
diff --git a/src/Processors/Transforms/Streaming/TumbleWatermarkStamper.cpp b/src/Processors/Transforms/Streaming/TumbleWatermarkStamper.cpp
index 97ef6a4d84f..9e5b068a810 100644
--- a/src/Processors/Transforms/Streaming/TumbleWatermarkStamper.cpp
+++ b/src/Processors/Transforms/Streaming/TumbleWatermarkStamper.cpp
@@ -16,7 +16,7 @@ TumbleWatermarkStamper::TumbleWatermarkStamper(const WatermarkStamperParams & pa
throw Exception(ErrorCodes::INCORRECT_QUERY, "{} doesn't support emit mode '{}'", getName(), magic_enum::enum_name(params.mode));
}
-Int64 TumbleWatermarkStamper::calculateWatermarkBasedOnWindowImpl(Int64 event_ts) const
+Int64 TumbleWatermarkStamper::calculateWatermarkImpl(Int64 event_ts) const
{
return toStartTime(
event_ts, window_params.interval_kind, window_params.window_interval, *window_params.time_zone, window_params.time_scale);
diff --git a/src/Processors/Transforms/Streaming/TumbleWatermarkStamper.h b/src/Processors/Transforms/Streaming/TumbleWatermarkStamper.h
index df8e639eab2..2a7127bdd27 100644
--- a/src/Processors/Transforms/Streaming/TumbleWatermarkStamper.h
+++ b/src/Processors/Transforms/Streaming/TumbleWatermarkStamper.h
@@ -18,7 +18,7 @@ class TumbleWatermarkStamper final : public WatermarkStamper
WatermarkStamperPtr clone() const override { return std::make_unique(*this); }
private:
- Int64 calculateWatermarkBasedOnWindowImpl(Int64 event_ts) const override;
+ Int64 calculateWatermarkImpl(Int64 event_ts) const override;
TumbleWindowParams & window_params;
};
diff --git a/src/Processors/Transforms/Streaming/WatermarkStamper.cpp b/src/Processors/Transforms/Streaming/WatermarkStamper.cpp
index 34619b551ee..d961fd9d3fc 100644
--- a/src/Processors/Transforms/Streaming/WatermarkStamper.cpp
+++ b/src/Processors/Transforms/Streaming/WatermarkStamper.cpp
@@ -146,7 +146,6 @@ void WatermarkStamper::preProcess(const Block & header)
initTimeoutTimer(params.timeout_interval);
}
-template
ALWAYS_INLINE Int64 WatermarkStamper::calculateWatermark(Int64 event_ts) const
{
if (params.delay_interval)
@@ -158,39 +157,47 @@ ALWAYS_INLINE Int64 WatermarkStamper::calculateWatermark(Int64 event_ts) const
*params.window_params->time_zone,
params.window_params->time_scale);
- if constexpr (apply_watermark_per_row)
- return event_ts_bias;
- else
- return calculateWatermarkBasedOnWindowImpl(event_ts_bias);
+ return calculateWatermarkImpl(event_ts_bias);
}
else
- {
- if constexpr (apply_watermark_per_row)
- return event_ts;
- else
- return calculateWatermarkBasedOnWindowImpl(event_ts);
- }
+ return calculateWatermarkImpl(event_ts);
+}
+
+ALWAYS_INLINE Int64 WatermarkStamper::calculateWatermarkPerRow(Int64 event_ts) const
+{
+ if (params.delay_interval)
+ return addTime(
+ event_ts,
+ params.delay_interval.unit,
+ -1 * params.delay_interval.interval,
+ *params.window_params->time_zone,
+ params.window_params->time_scale);
+ else
+ return event_ts;
}
void WatermarkStamper::processAfterUnmuted(Chunk & chunk)
{
- assert(!chunk.hasRows() && chunk.isHistoricalDataEnd());
+ assert(!chunk.hasRows());
switch (params.mode)
{
- case WatermarkStamperParams::EmitMode::PERIODIC: {
+ case WatermarkStamperParams::EmitMode::PERIODIC:
+ {
processPeriodic(chunk);
break;
}
- case WatermarkStamperParams::EmitMode::WATERMARK: {
- auto muted_watermark_ts = calculateWatermark(max_event_ts);
- if (muted_watermark_ts != INVALID_WATERMARK)
+ case WatermarkStamperParams::EmitMode::WATERMARK:
+ {
+ auto muted_watermark_ts = calculateWatermark(max_event_ts);
+ if (muted_watermark_ts != INVALID_WATERMARK) [[likely]]
chunk.setWatermark(muted_watermark_ts);
break;
}
- case WatermarkStamperParams::EmitMode::WATERMARK_PER_ROW: {
- auto muted_watermark_ts = calculateWatermark(max_event_ts);
- if (muted_watermark_ts != INVALID_WATERMARK)
+ case WatermarkStamperParams::EmitMode::WATERMARK_PER_ROW:
+ {
+ auto muted_watermark_ts = calculateWatermarkPerRow(max_event_ts);
+ if (muted_watermark_ts != INVALID_WATERMARK) [[likely]]
chunk.setWatermark(muted_watermark_ts);
break;
}
@@ -199,53 +206,53 @@ void WatermarkStamper::processAfterUnmuted(Chunk & chunk)
}
}
-template
+void WatermarkStamper::processWithMutedWatermark(Chunk & chunk)
+{
+ /// NOTE: If there is only backfill data and no new data, the window aggregation would not emit results after the backfill is completed.
+ /// To avoid that, even while the watermark is muted, we still need to collect `max_event_ts`, which will be used in "processAfterUnmuted()" to emit a watermark as soon as the backfill is completed.
+ if (chunk.hasRows() && (params.mode == WatermarkStamperParams::EmitMode::WATERMARK || params.mode == WatermarkStamperParams::EmitMode::WATERMARK_PER_ROW))
+ {
+ assert(params.window_params);
+ if (params.window_params->time_col_is_datetime64)
+ max_event_ts = std::max(
+ max_event_ts,
+ *std::ranges::max_element(assert_cast(*chunk.getColumns()[time_col_pos]).getData()));
+ else
+ max_event_ts = std::max(
+ max_event_ts,
+ *std::ranges::max_element(assert_cast(*chunk.getColumns()[time_col_pos]).getData()));
+ }
+
+ processTimeout(chunk);
+ logLateEvents();
+}
+
void WatermarkStamper::process(Chunk & chunk)
{
- if constexpr (mute_watermark)
+ switch (params.mode)
{
- /// NOTE: In order to avoid that when there is only backfill data and no new data, the window aggregation don't emit results after the backfill is completed.
- /// Even mute watermark, we still need collect `max_event_ts` which will be used in "processAfterUnmuted()" to emit a watermark as soon as the backfill is completed
- if (chunk.hasRows() && (params.mode == WatermarkStamperParams::EmitMode::WATERMARK || params.mode == WatermarkStamperParams::EmitMode::WATERMARK_PER_ROW))
- {
+ case WatermarkStamperParams::EmitMode::PERIODIC: {
+ processPeriodic(chunk);
+ break;
+ }
+ case WatermarkStamperParams::EmitMode::WATERMARK: {
assert(params.window_params);
if (params.window_params->time_col_is_datetime64)
- max_event_ts = std::max(
- max_event_ts,
- *std::ranges::max_element(assert_cast(*chunk.getColumns()[time_col_pos]).getData()));
+ processWatermark(chunk);
else
- max_event_ts = std::max(
- max_event_ts,
- *std::ranges::max_element(assert_cast(*chunk.getColumns()[time_col_pos]).getData()));
+ processWatermark(chunk);
+ break;
}
- }
- else
- {
- switch (params.mode)
- {
- case WatermarkStamperParams::EmitMode::PERIODIC: {
- processPeriodic(chunk);
- break;
- }
- case WatermarkStamperParams::EmitMode::WATERMARK: {
- assert(params.window_params);
- if (params.window_params->time_col_is_datetime64)
- processWatermark(chunk);
- else
- processWatermark(chunk);
- break;
- }
- case WatermarkStamperParams::EmitMode::WATERMARK_PER_ROW: {
- assert(params.window_params);
- if (params.window_params->time_col_is_datetime64)
- processWatermark(chunk);
- else
- processWatermark(chunk);
- break;
- }
- default:
- break;
+ case WatermarkStamperParams::EmitMode::WATERMARK_PER_ROW: {
+ assert(params.window_params);
+ if (params.window_params->time_col_is_datetime64)
+ processWatermark(chunk);
+ else
+ processWatermark(chunk);
+ break;
}
+ default:
+ break;
}
processTimeout(chunk);
@@ -333,7 +340,7 @@ void WatermarkStamper::processWatermark(Chunk & chunk)
max_event_ts = event_ts;
if constexpr (apply_watermark_per_row)
- event_ts_watermark = calculateWatermark(max_event_ts);
+ event_ts_watermark = calculateWatermarkPerRow(max_event_ts);
}
if (unlikely(event_ts < event_ts_watermark))
@@ -344,7 +351,7 @@ void WatermarkStamper::processWatermark(Chunk & chunk)
}
if constexpr (!apply_watermark_per_row)
- event_ts_watermark = calculateWatermark(max_event_ts);
+ event_ts_watermark = calculateWatermark(max_event_ts);
if (late_events_in_chunk > 0)
{
@@ -364,9 +371,9 @@ void WatermarkStamper::processWatermark(Chunk & chunk)
}
}
-Int64 WatermarkStamper::calculateWatermarkBasedOnWindowImpl(Int64 event_ts) const
+Int64 WatermarkStamper::calculateWatermarkImpl(Int64 event_ts) const
{
- throw Exception(ErrorCodes::NOT_IMPLEMENTED, "calculateWatermarkBasedOnWindowImpl() not implemented in {}", getName());
+ throw Exception(ErrorCodes::NOT_IMPLEMENTED, "calculateWatermarkImpl() not implemented in {}", getName());
}
void WatermarkStamper::initPeriodicTimer(const WindowInterval & interval)
@@ -434,8 +441,5 @@ void WatermarkStamper::deserialize(ReadBuffer & rb)
readIntBinary(last_logged_late_events, rb);
readIntBinary(last_logged_late_events_ts, rb);
}
-
-template void WatermarkStamper::process(Chunk &);
-template void WatermarkStamper::process(Chunk &);
}
}
diff --git a/src/Processors/Transforms/Streaming/WatermarkStamper.h b/src/Processors/Transforms/Streaming/WatermarkStamper.h
index 441d9f67d62..5eb3fc7b439 100644
--- a/src/Processors/Transforms/Streaming/WatermarkStamper.h
+++ b/src/Processors/Transforms/Streaming/WatermarkStamper.h
@@ -59,9 +59,11 @@ SERDE class WatermarkStamper
void preProcess(const Block & header);
- template
void process(Chunk & chunk);
+ /// While the watermark is muted, we still need to process the chunk to update max_event_ts
+ void processWithMutedWatermark(Chunk & chunk);
+
void processAfterUnmuted(Chunk & chunk);
bool requiresPeriodicOrTimeoutEmit() const { return periodic_interval || timeout_interval; }
@@ -84,10 +86,10 @@ SERDE class WatermarkStamper
void logLateEvents();
- template
ALWAYS_INLINE Int64 calculateWatermark(Int64 event_ts) const;
+ ALWAYS_INLINE Int64 calculateWatermarkPerRow(Int64 event_ts) const;
- virtual Int64 calculateWatermarkBasedOnWindowImpl(Int64 event_ts) const;
+ virtual Int64 calculateWatermarkImpl(Int64 event_ts) const;
void initPeriodicTimer(const WindowInterval & interval);
diff --git a/src/Processors/Transforms/Streaming/WatermarkTransform.cpp b/src/Processors/Transforms/Streaming/WatermarkTransform.cpp
index e561408db14..fd07f066740 100644
--- a/src/Processors/Transforms/Streaming/WatermarkTransform.cpp
+++ b/src/Processors/Transforms/Streaming/WatermarkTransform.cpp
@@ -71,9 +71,9 @@ void WatermarkTransform::transform(Chunk & chunk)
return;
if (mute_watermark)
- watermark->process(chunk);
+ watermark->processWithMutedWatermark(chunk);
else
- watermark->process(chunk);
+ watermark->process(chunk);
}
void WatermarkTransform::checkpoint(CheckpointContextPtr ckpt_ctx)
diff --git a/src/Processors/Transforms/Streaming/WatermarkTransformWithSubstream.cpp b/src/Processors/Transforms/Streaming/WatermarkTransformWithSubstream.cpp
index 557c8a6754f..370059514c7 100644
--- a/src/Processors/Transforms/Streaming/WatermarkTransformWithSubstream.cpp
+++ b/src/Processors/Transforms/Streaming/WatermarkTransformWithSubstream.cpp
@@ -130,7 +130,7 @@ void WatermarkTransformWithSubstream::work()
output_chunks.emplace_back(process_chunk.clone());
for (auto & [id, watermark] : substream_watermarks)
{
- auto chunk_ctx =ChunkContext::create();
+ auto chunk_ctx = ChunkContext::create();
chunk_ctx->setSubstreamID(std::move(id));
process_chunk.setChunkContext(std::move(chunk_ctx)); /// reset context
@@ -154,9 +154,9 @@ void WatermarkTransformWithSubstream::work()
if (!process_chunk.avoidWatermark())
{
if (mute_watermark)
- watermark.process(process_chunk);
+ watermark.processWithMutedWatermark(process_chunk);
else
- watermark.process(process_chunk);
+ watermark.process(process_chunk);
}
assert(process_chunk);
diff --git a/src/Storages/StorageView.cpp b/src/Storages/StorageView.cpp
index b460cb61a2b..fdde026173c 100644
--- a/src/Storages/StorageView.cpp
+++ b/src/Storages/StorageView.cpp
@@ -135,7 +135,7 @@ void StorageView::read(
}
/// proton: starts.
- Streaming::rewriteSubqueryByQueryInfo(current_inner_query->as(), query_info);
+ Streaming::rewriteSubquery(current_inner_query->as(), query_info);
/// proton: ends.
auto options = SelectQueryOptions(QueryProcessingStage::Complete, 0, true, query_info.settings_limit_offset_done);
diff --git a/src/Storages/Streaming/ProxyStream.cpp b/src/Storages/Streaming/ProxyStream.cpp
index ac43d4daaf5..849f0bfc18b 100644
--- a/src/Storages/Streaming/ProxyStream.cpp
+++ b/src/Storages/Streaming/ProxyStream.cpp
@@ -193,7 +193,7 @@ void ProxyStream::doRead(
if (current_subquery)
{
- Streaming::rewriteSubqueryByQueryInfo(current_subquery->as(), query_info);
+ Streaming::rewriteSubquery(current_subquery->as(), query_info);
auto sub_context = createProxySubqueryContext(context_, query_info, isStreamingQuery());
auto interpreter_subquery = std::make_unique(
diff --git a/tests/stream/test_stream_smoke/0030_two_level_global_aggr.yaml b/tests/stream/test_stream_smoke/0030_two_level_global_aggr.yaml
index 5d315f12fd4..09a1c4d9240 100644
--- a/tests/stream/test_stream_smoke/0030_two_level_global_aggr.yaml
+++ b/tests/stream/test_stream_smoke/0030_two_level_global_aggr.yaml
@@ -131,7 +131,7 @@ tests:
query_id: 3100
depends_on_stream: test_31_multishards_stream
query: |
- subscribe to with cte as (select i as key, count() from test_31_multishards_stream where _tp_time > earliest_ts() group by key settings group_by_two_level_threshold=50) select count() from cte settings checkpoint_interval=2, emit_aggregated_during_backfill=false;
+ subscribe to with cte as (select i as key, count() from test_31_multishards_stream where _tp_time > earliest_ts() group by key settings group_by_two_level_threshold=50) select count() from cte settings checkpoint_interval=2, emit_during_backfill=false;
- client: python
query_type: table
@@ -207,7 +207,7 @@ tests:
depends_on_stream: test_31_multishards_stream
wait: 1
query: |
- subscribe to with cte as (select i as key, count() from changelog(test_31_multishards_stream, i) where _tp_time > earliest_ts() group by key emit changelog settings group_by_two_level_threshold=50) select count() from cte settings checkpoint_interval=2, emit_aggregated_during_backfill=false;
+ subscribe to with cte as (select i as key, count() from changelog(test_31_multishards_stream, i) where _tp_time > earliest_ts() group by key emit changelog settings group_by_two_level_threshold=50) select count() from cte settings checkpoint_interval=2, emit_during_backfill=false;
- client: python
query_type: table
@@ -279,7 +279,7 @@ tests:
wait: 1
query_end_timer: 5
query: |
- with cte as (select i, count() from test_31_multishards_stream where _tp_time > earliest_ts() shuffle by i group by i settings group_by_two_level_threshold=10) select count() from cte settings emit_aggregated_during_backfill=false;
+ with cte as (select i, count() from test_31_multishards_stream where _tp_time > earliest_ts() shuffle by i group by i settings group_by_two_level_threshold=10) select count() from cte settings emit_during_backfill=false;
- client: python
query_type: table
From 4498680cb5a38c3a6e1a7091e661a2a38ef41026 Mon Sep 17 00:00:00 2001
From: haohang <113408135+yokofly@users.noreply.github.com>
Date: Tue, 16 Jan 2024 11:23:40 +0800
Subject: [PATCH 4/4] increase volume size from 60GiB->80GiB
---
.github/workflows/proton_ci.yml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/.github/workflows/proton_ci.yml b/.github/workflows/proton_ci.yml
index 1f709a771e4..47f784e8b13 100644
--- a/.github/workflows/proton_ci.yml
+++ b/.github/workflows/proton_ci.yml
@@ -141,7 +141,7 @@ jobs:
with:
ec2-instance-type: ${{ vars.X64_INSTANCE_TYPE }}
ec2-image-id: ${{ vars.X64_TEST_AMI }}
- ec2-volume-size: '60'
+ ec2-volume-size: '80'
submodules: false
sanitizer: "address"
arch: ${{ vars.X64_ARCH }}