-
Notifications
You must be signed in to change notification settings - Fork 6.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
CLICKHOUSE-3837 Add support for table function in remote() and cluster() #2708
CLICKHOUSE-3837 Add support for table function in remote() and cluster() #2708
Conversation
@@ -47,6 +47,8 @@ class ConnectionPoolWithFailover : public IConnectionPool, private PoolWithFailo | |||
*/ | |||
std::vector<Entry> getMany(const Settings * settings, PoolMode pool_mode); | |||
|
|||
std::vector<TryResult> getManyForTableFunc(const Settings * settings, PoolMode pool_mode); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing comment.
Let's use getManyForTableFunction
name.
if (table_func_ptr) | ||
{ | ||
auto table_function = static_cast<ASTFunction *>(table_func_ptr.get()); | ||
main_table_storage = TableFunctionFactory::instance().get(table_function->name, context)->execute(table_func_ptr, context); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's better to not execute table function on initiating server.
Because it may execute successfully only on remote nodes.
For example:
remote('host', merge(db, '^table$'))
- database
db
may be absent locally and it is Ok.
{ | ||
auto stream = std::make_shared<RemoteBlockInputStream>(shard_info.pool, query, header, context, nullptr, throttler, external_tables, processed_stage); | ||
stream->setPoolMode(PoolMode::GET_MANY); | ||
stream->setMainTable(main_table); | ||
res.emplace_back(std::move(stream)); | ||
}; | ||
|
||
auto emplace_remote_stream_for_func = [&]() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is almost identical, why don't just branch on table_func_ptr
inside?
const Tables & external_tables); | ||
|
||
SelectStreamFactory( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing comment.
@@ -51,14 +53,19 @@ std::pair<Field, std::shared_ptr<const IDataType>> evaluateConstantExpression(co | |||
|
|||
|
|||
ASTPtr evaluateConstantExpressionAsLiteral(const ASTPtr & node, const Context & context) | |||
{ | |||
{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Excessive whitespace.
if (typeid_cast<const ASTLiteral *>(node.get())) | ||
return node; | ||
|
||
|
||
if (auto table_func_ptr = typeid_cast<ASTFunction *>(node.get())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing comment.
dbms/src/Parsers/ASTSelectQuery.cpp
Outdated
table_expression->database_and_table_name = std::make_shared<ASTIdentifier>(database_name + "." + table_name, ASTIdentifier::Table); | ||
table_expression->database_and_table_name->children = {database, table}; | ||
|
||
if (table_function_ptr) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Style.
dbms/src/Parsers/ASTSelectQuery.h
Outdated
@@ -46,7 +46,7 @@ class ASTSelectQuery : public IAST | |||
bool array_join_is_left() const; | |||
bool final() const; | |||
void setDatabaseIfNeeded(const String & database_name); | |||
void replaceDatabaseAndTable(const String & database_name, const String & table_name); | |||
void replaceDatabaseAndTable(const String & database_name, const String & table_name, ASTPtr table_function_name = nullptr); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe better to have two different methods?
@@ -67,10 +67,10 @@ namespace | |||
|
|||
/// select query has database and table names as AST pointers | |||
/// Creates a copy of query, changes database and table names. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Obsolete comment.
@@ -154,7 +154,7 @@ StorageDistributed::StorageDistributed( | |||
bool attach) | |||
: IStorage{columns_}, | |||
table_name(table_name_), | |||
remote_database(remote_database_), remote_table(remote_table_), | |||
remote_database(remote_database_), remote_table(remote_table_), remote_table_function_ptr(nullptr), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't make sense.
if (typeid_cast<const ASTLiteral *>(node.get())) | ||
return node; | ||
|
||
|
||
if (auto table_func_ptr = typeid_cast<ASTFunction *>(node.get())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing comment.
ClusterProxy::SelectStreamFactory select_stream_factory( | ||
header, processed_stage, remote_table_function_ptr, context.getExternalTables()); | ||
return ClusterProxy::executeQuery( | ||
select_stream_factory, cluster, modified_query_ast, context, settings); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indent.
{ | ||
ClusterProxy::SelectStreamFactory select_stream_factory( | ||
header, processed_stage, QualifiedTableName{remote_database, remote_table}, context.getExternalTables()); | ||
return ClusterProxy::executeQuery( | ||
select_stream_factory, cluster, modified_query_ast, context, settings); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indent.
{ | ||
ClusterProxy::SelectStreamFactory select_stream_factory( | ||
header, processed_stage, QualifiedTableName{remote_database, remote_table}, context.getExternalTables()); | ||
return ClusterProxy::executeQuery( | ||
select_stream_factory, cluster, modified_query_ast, context, settings); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indent.
static StoragePtr createWithOwnCluster( | ||
const std::string & table_name_, | ||
const ColumnsDescription & columns_, | ||
const String & remote_database_, /// database on remote servers. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why we need these two parameters: remote_database, remote_table, if we have remote_table_function_ptr?
const ColumnsDescription & columns_, | ||
const String & remote_database_, /// database on remote servers. | ||
const String & remote_table_, /// The name of the table on the remote servers. | ||
ASTPtr remote_table_function_ptr_, /// Table function ptr. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not pass by reference?
return TableFunctionFactory::instance().get(table_function->name, context)->execute(table_func_ptr, context)->getColumns(); | ||
} | ||
|
||
auto table_func_name = table_func_ptr->getAliasOrColumnName(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Better to use queryToString
, because column name doesn't necessarily correspond to serialized expression in query.
input->readPrefix(); | ||
|
||
const DataTypeFactory & data_type_factory = DataTypeFactory::instance(); | ||
|
||
ParserExpression expr_parser; | ||
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Excessive whitespace.
@@ -196,8 +196,9 @@ StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & ast_function, const C | |||
|
|||
String cluster_name; | |||
String cluster_description; | |||
String remote_database; | |||
String remote_table; | |||
String remote_database = ""; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It doesn't make sense.
size_t dot = remote_database.find('.'); | ||
if (dot != String::npos) | ||
|
||
auto table_function = static_cast<ASTFunction *>(args[arg_num].get()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
const
Missing functional test. At least as simple as |
Also should test |
You have a test only for local shard, and only for the case of single shard. |
I hereby agree to the terms of the CLA available at: https://yandex.ru/legal/cla/?lang=en