Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#6796
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
guo-shaoge authored and ti-chi-bot committed Feb 13, 2023
1 parent 4816a7c commit 631499f
Show file tree
Hide file tree
Showing 9 changed files with 522 additions and 1 deletion.
97 changes: 97 additions & 0 deletions dbms/src/DataStreams/GeneratedColumnPlaceholderBlockInputStream.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Columns/ColumnNullable.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <Core/ColumnsWithTypeAndName.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <Flash/Coprocessor/TiDBTableScan.h>

namespace DB
{
class GeneratedColumnPlaceholderBlockInputStream : public IProfilingBlockInputStream
{
public:
GeneratedColumnPlaceholderBlockInputStream(
const BlockInputStreamPtr & input,
const std::vector<std::tuple<UInt64, String, DataTypePtr>> & generated_column_infos_,
const String & req_id_)
: generated_column_infos(generated_column_infos_)
, log(Logger::get(req_id_))
{
children.push_back(input);
}

String getName() const override { return NAME; }
Block getHeader() const override
{
Block block = children.back()->getHeader();
insertColumns(block, /*insert_data=*/false);
return block;
}

static String getColumnName(UInt64 col_index)
{
return "generated_column_" + std::to_string(col_index);
}

protected:
void readPrefix() override
{
RUNTIME_CHECK(!generated_column_infos.empty());
// Validation check.
for (size_t i = 1; i < generated_column_infos.size(); ++i)
{
RUNTIME_CHECK(std::get<0>(generated_column_infos[i]) > std::get<0>(generated_column_infos[i - 1]));
}
}

Block readImpl() override
{
Block block = children.back()->read();
insertColumns(block, /*insert_data=*/true);
return block;
}

private:
void insertColumns(Block & block, bool insert_data) const
{
if (!block)
return;

for (const auto & ele : generated_column_infos)
{
const auto & col_index = std::get<0>(ele);
const auto & col_name = std::get<1>(ele);
const auto & data_type = std::get<2>(ele);
ColumnPtr column = nullptr;
if (insert_data)
column = data_type->createColumnConstWithDefaultValue(block.rows());
else
column = data_type->createColumn();
block.insert(col_index, ColumnWithTypeAndName{column, data_type, col_name});
}
}

static constexpr auto NAME = "GeneratedColumnPlaceholder";
const std::vector<std::tuple<UInt64, String, DataTypePtr>> generated_column_infos;
const LoggerPtr log;
};

} // namespace DB
2 changes: 2 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ void DAGQueryBlockInterpreter::handleMockTableScan(const TiDBTableScan & table_s
analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(names_and_types), context);
pipeline.streams.insert(pipeline.streams.end(), mock_table_scan_streams.begin(), mock_table_scan_streams.end());
}

// GeneratedColumnPlaceholderBlockInputStream is not handled here for now, because the test framework does not support generated columns.
}


Expand Down
12 changes: 12 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <Common/TiFlashMetrics.h>
#include <DataStreams/ExpressionBlockInputStream.h>
#include <DataStreams/FilterBlockInputStream.h>
#include <DataStreams/GeneratedColumnPlaceholderBlockInputStream.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/MultiplexInputStream.h>
#include <DataStreams/NullBlockInputStream.h>
Expand Down Expand Up @@ -375,6 +376,8 @@ void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline)

/// handle timezone/duration cast for local and remote table scan.
executeCastAfterTableScan(remote_read_streams_start_index, pipeline);
/// handle generated column if necessary.
executeGeneratedColumnPlaceholder(remote_read_streams_start_index, generated_column_infos, log, pipeline);
recordProfileStreams(pipeline, table_scan.getTableScanExecutorID());

/// handle pushed down filter for local and remote table scan.
Expand Down Expand Up @@ -1043,6 +1046,15 @@ std::tuple<Names, NamesAndTypes, std::vector<ExtraCastAfterTSMode>> DAGStorageIn
auto const & ci = table_scan.getColumns()[i];
ColumnID cid = ci.column_id();

if (ci.hasGeneratedColumnFlag())
{
LOG_DEBUG(log, "got column({}) with generated column flag", i);
const auto & data_type = getDataTypeByColumnInfoForComputingLayer(ci);
const auto & col_name = GeneratedColumnPlaceholderBlockInputStream::getColumnName(i);
generated_column_infos.push_back(std::make_tuple(i, col_name, data_type));
source_columns_tmp.emplace_back(NameAndTypePair{col_name, data_type});
continue;
}
// Column ID -1 return the handle column
String name;
if (cid == TiDBPkColumnID)
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ class DAGStorageInterpreter
ManageableStoragePtr storage_for_logical_table;
Names required_columns;
NamesAndTypes source_columns;
// Generated columns only need a placeholder here; TiDB will fill in these columns.
std::vector<std::tuple<UInt64, String, DataTypePtr>> generated_column_infos;
};

} // namespace DB
74 changes: 74 additions & 0 deletions dbms/src/Flash/Coprocessor/InterpreterUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@

#include <DataStreams/CreatingSetsBlockInputStream.h>
#include <DataStreams/ExpressionBlockInputStream.h>
#include <DataStreams/FilterBlockInputStream.h>
#include <DataStreams/GeneratedColumnPlaceholderBlockInputStream.h>
#include <DataStreams/MergeSortingBlockInputStream.h>
#include <DataStreams/PartialSortingBlockInputStream.h>
#include <DataStreams/SharedQueryBlockInputStream.h>
Expand Down Expand Up @@ -194,4 +199,73 @@ void executeCreatingSets(
log->identifier());
}
}
<<<<<<< HEAD
=======

std::tuple<ExpressionActionsPtr, String, ExpressionActionsPtr> buildPushDownFilter(
const FilterConditions & filter_conditions,
DAGExpressionAnalyzer & analyzer)
{
assert(filter_conditions.hasValue());

ExpressionActionsChain chain;
analyzer.initChain(chain);
String filter_column_name = analyzer.appendWhere(chain, filter_conditions.conditions);
ExpressionActionsPtr before_where = chain.getLastActions();
chain.addStep();

// remove useless tmp column and keep the schema of local streams and remote streams the same.
NamesWithAliases project_cols;
for (const auto & col : analyzer.getCurrentInputColumns())
{
chain.getLastStep().required_output.push_back(col.name);
project_cols.emplace_back(col.name, col.name);
}
chain.getLastActions()->add(ExpressionAction::project(project_cols));
ExpressionActionsPtr project_after_where = chain.getLastActions();
chain.finalize();
chain.clear();

return {before_where, filter_column_name, project_after_where};
}

void executePushedDownFilter(
size_t remote_read_streams_start_index,
const FilterConditions & filter_conditions,
DAGExpressionAnalyzer & analyzer,
LoggerPtr log,
DAGPipeline & pipeline)
{
auto [before_where, filter_column_name, project_after_where] = ::DB::buildPushDownFilter(filter_conditions, analyzer);

assert(remote_read_streams_start_index <= pipeline.streams.size());
// for remote read, filter had been pushed down, don't need to execute again.
for (size_t i = 0; i < remote_read_streams_start_index; ++i)
{
auto & stream = pipeline.streams[i];
stream = std::make_shared<FilterBlockInputStream>(stream, before_where, filter_column_name, log->identifier());
stream->setExtraInfo("push down filter");
// after filter, do project action to keep the schema of local streams and remote streams the same.
stream = std::make_shared<ExpressionBlockInputStream>(stream, project_after_where, log->identifier());
stream->setExtraInfo("projection after push down filter");
}
}

void executeGeneratedColumnPlaceholder(
size_t remote_read_streams_start_index,
const std::vector<std::tuple<UInt64, String, DataTypePtr>> & generated_column_infos,
LoggerPtr log,
DAGPipeline & pipeline)
{
if (generated_column_infos.empty())
return;
assert(remote_read_streams_start_index <= pipeline.streams.size());
for (size_t i = 0; i < remote_read_streams_start_index; ++i)
{
auto & stream = pipeline.streams[i];
stream = std::make_shared<GeneratedColumnPlaceholderBlockInputStream>(stream, generated_column_infos, log->identifier());
stream->setExtraInfo("generated column placeholder above table scan");
}
}
>>>>>>> e84ed489e6 (add GeneratedColumnPlaceholderInputStream (#6796))
} // namespace DB
20 changes: 20 additions & 0 deletions dbms/src/Flash/Coprocessor/InterpreterUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,24 @@ void executeCreatingSets(
const Context & context,
size_t max_streams,
const LoggerPtr & log);
<<<<<<< HEAD
=======

std::tuple<ExpressionActionsPtr, String, ExpressionActionsPtr> buildPushDownFilter(
const FilterConditions & filter_conditions,
DAGExpressionAnalyzer & analyzer);

void executePushedDownFilter(
size_t remote_read_streams_start_index,
const FilterConditions & filter_conditions,
DAGExpressionAnalyzer & analyzer,
LoggerPtr log,
DAGPipeline & pipeline);

void executeGeneratedColumnPlaceholder(
size_t remote_read_streams_start_index,
const std::vector<std::tuple<UInt64, String, DataTypePtr>> & generated_column_infos,
LoggerPtr log,
DAGPipeline & pipeline);
>>>>>>> e84ed489e6 (add GeneratedColumnPlaceholderInputStream (#6796))
} // namespace DB

0 comments on commit 631499f

Please sign in to comment.