From 03d83cf076b2bfdc4c26c062157304f58701f941 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Lisen=20=E6=9D=A8?=
Date: Wed, 17 Jan 2024 13:14:29 +0800
Subject: [PATCH 1/2] fix incorrect subscribe check for internal subquery
---
src/Interpreters/InterpreterSelectWithUnionQuery.cpp | 2 +-
src/Processors/QueryPlan/QueryExecuteMode.cpp | 8 +++++++-
src/Processors/QueryPlan/QueryExecuteMode.h | 2 +-
src/Processors/QueryPlan/QueryPlan.cpp | 2 --
4 files changed, 9 insertions(+), 5 deletions(-)
diff --git a/src/Interpreters/InterpreterSelectWithUnionQuery.cpp b/src/Interpreters/InterpreterSelectWithUnionQuery.cpp
index 2d701bbb2e2..4cf7a29d09e 100644
--- a/src/Interpreters/InterpreterSelectWithUnionQuery.cpp
+++ b/src/Interpreters/InterpreterSelectWithUnionQuery.cpp
@@ -400,7 +400,7 @@ BlockIO InterpreterSelectWithUnionQuery::execute()
BuildQueryPipelineSettings::fromContext(context), context);
/// proton : starts, setup execute mode
- builder->setExecuteMode(queryExecuteMode(query_plan.isStreaming(), context->getSettingsRef()));
+ builder->setExecuteMode(queryExecuteMode(query_plan.isStreaming(), options.is_subquery, context->getSettingsRef()));
/// proton : ends
res.pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
diff --git a/src/Processors/QueryPlan/QueryExecuteMode.cpp b/src/Processors/QueryPlan/QueryExecuteMode.cpp
index 2f34866b4b2..91841dc7647 100644
--- a/src/Processors/QueryPlan/QueryExecuteMode.cpp
+++ b/src/Processors/QueryPlan/QueryExecuteMode.cpp
@@ -10,7 +10,7 @@ extern const int INVALID_SETTING_VALUE;
extern const int NOT_IMPLEMENTED;
}
-ExecuteMode queryExecuteMode(bool is_streaming, const Settings & settings)
+ExecuteMode queryExecuteMode(bool is_streaming, bool is_subquery, const Settings & settings)
{
switch (settings.exec_mode)
{
@@ -20,7 +20,13 @@ ExecuteMode queryExecuteMode(bool is_streaming, const Settings & settings)
case ExecuteMode::UNSUBSCRIBE:
case ExecuteMode::RECOVER:
if (!is_streaming)
+ {
+ /// Assume it's a historical subquery in streaming query, we only execute it in normal mode.
+ if (is_subquery)
+ return ExecuteMode::NORMAL;
+
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "SUBSCRIBE can only work with streaming query");
+ }
break;
}
diff --git a/src/Processors/QueryPlan/QueryExecuteMode.h b/src/Processors/QueryPlan/QueryExecuteMode.h
index 075cbf3c648..4ab4acb370f 100644
--- a/src/Processors/QueryPlan/QueryExecuteMode.h
+++ b/src/Processors/QueryPlan/QueryExecuteMode.h
@@ -5,5 +5,5 @@
namespace DB
{
struct Settings;
-ExecuteMode queryExecuteMode(bool is_streaming, const Settings & settings);
+ExecuteMode queryExecuteMode(bool is_streaming, bool is_subquery, const Settings & settings);
}
diff --git a/src/Processors/QueryPlan/QueryPlan.cpp b/src/Processors/QueryPlan/QueryPlan.cpp
index f6b2e4f113a..7abe6ae5674 100644
--- a/src/Processors/QueryPlan/QueryPlan.cpp
+++ b/src/Processors/QueryPlan/QueryPlan.cpp
@@ -198,8 +198,6 @@ QueryPipelineBuilderPtr QueryPlan::buildQueryPipeline(
last_pipeline->addResources(std::move(resources));
/// proton : starts
- last_pipeline->setExecuteMode(queryExecuteMode(isStreaming(), query_context->getSettingsRef()));
-
if (isStreaming() != last_pipeline->isStreaming())
throw Exception(
ErrorCodes::LOGICAL_ERROR,
From c9f7772c261d6102e8ac654827c92019f049c190 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Lisen=20=E6=9D=A8?=
Date: Wed, 17 Jan 2024 13:51:25 +0800
Subject: [PATCH 2/2] add smoke test
---
.../test_stream_smoke/0099_fixed_issues.json | 27 +++++++++++++++++++
1 file changed, 27 insertions(+)
diff --git a/tests/stream/test_stream_smoke/0099_fixed_issues.json b/tests/stream/test_stream_smoke/0099_fixed_issues.json
index fafb631514d..1422c6d81a7 100644
--- a/tests/stream/test_stream_smoke/0099_fixed_issues.json
+++ b/tests/stream/test_stream_smoke/0099_fixed_issues.json
@@ -668,6 +668,33 @@
]
}
]
+ },
+ {
+ "id": 23,
+ "tags": ["in set", "checkpoint"],
+ "name": "#498",
+ "description": "mv over versioned_kv with where in set",
+ "steps":[
+ {
+ "statements": [
+ {"client":"python", "query_type": "table", "query": "drop view if exists test_mv_498"},
+ {"client":"python", "query_type": "table", "query": "drop stream if exists test_dim_498"},
+ {"client":"python", "query_type": "table", "query": "drop stream if exists test_vk_498"},
+ {"client":"python", "query_type": "table", "exist": "test_dim_498", "exist_wait":2, "wait":1, "query": "create stream test_dim_498(id int, type string)"},
+ {"client":"python", "query_type": "table", "exist": "test_vk_498", "exist_wait":2, "wait":1, "query": "create stream test_vk_498(id int, value float) primary key id settings mode='versioned_kv'"},
+ {"client":"python", "query_type": "table", "depends_on_stream": "test_dim_498", "wait": 1, "query": "insert into test_dim_498(id, type) values(1, 'old')(2, 'old')(3, 'new');"},
+ {"client":"python", "query_type": "table", "wait":2, "query":"create materialized view test_mv_498 as with list as (select id from table(test_dim_498)) select id, value from test_vk_498 where id in list;"},
+ {"client":"python", "query_type": "stream", "depends_on_stream":"test_mv_498", "wait": 1, "query_id":"9923", "query":"select id, value from test_mv_498;"},
+ {"client":"python", "query_type": "table", "depends_on":"9923", "wait": 1, "kill":"9923", "kill_wait":2, "drop_view":"test_mv_498", "drop_view_wait":2, "query": "insert into test_vk_498(id, value) values(3, 20)(3, 30)(1, 10)(4, 40)(5, 50);"}
+ ]
+ }
+ ],
+ "expected_results": [
+ {
+ "query_id":"9923",
+ "expected_results":[[1, 10], [3, 20], [3, 30]]
+ }
+ ]
}
]
}