Skip to content

Commit

Permalink
[CH-123] Support short/byte/binary/decimal/array/map/struct (ClickHou…
Browse files Browse the repository at this point in the history
…se#128)

* support calculate backing length of different types

* remove comment

* rename symbols

* apply BackingDataLengthCalculator

* support decimal from ch column to spark row

* fix decimal issue in ch column to spark row

* refactor SparkRowInfo

* fix building error

* wip

* implement demo

* dev map

* finish map and tuple

* fix building error

* finish writer dev

* fix code style

* ready to improve spark row to ch column

* wip

* finish array/map/tuple reader

* fix building error

* add some uts

* finish debug

* commit again

* finish plan convert

* add benchmark

* improve performance

* try to optimize spark row to ch column

* continue

* optimize SparkRowInfo::SparkRowInfo

* wrap functions

* improve performance

* improve from 360ms to 240 ms

* finish optimizeing performance

* add benchmark for BM_SparkRowTOCHColumn_Lineitem

* refactor spark row reader

* finish tests

* revert cmake

* fix code style

* fix code style

* fix memory leak

* fix build error

* fix building error in debug mode

* add test data file

* add build type, convert ch type to substrait type

* refactor jni interface: native column type

* fixbug of decimal

* replace decimal.parquet

* add data array.parquet

* add test data map.parquet

* add test data file

* finish debug

* wip

* fix logging

* fix address problem

* fix core dump

* fix code style
  • Loading branch information
taiyang-li committed Oct 14, 2022
1 parent 48322be commit 729900c
Show file tree
Hide file tree
Showing 23 changed files with 2,404 additions and 1,077 deletions.
3 changes: 3 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,9 @@ if (COMPILER_CLANG)
set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Xclang -fuse-ctor-homing")
endif()
endif()

# set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fno-omit-frame-pointer -mno-omit-leaf-frame-pointer -fno-optimize-sibling-calls")
# set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fno-omit-frame-pointer -mno-omit-leaf-frame-pointer -fno-optimize-sibling-calls")
endif ()

# If compiler has support for -Wreserved-identifier. It is difficult to detect by clang version,
Expand Down
108 changes: 108 additions & 0 deletions utils/local-engine/Builder/SerializedPlanBuilder.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,24 @@
#include "SerializedPlanBuilder.h"
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypesDecimal.h>
#include <Functions/FunctionHelpers.h>

namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_TYPE;
}
}

namespace dbms
{

using namespace DB;
SchemaPtr SerializedSchemaBuilder::build()
{
for (const auto & [name, type] : this->type_map)
Expand Down Expand Up @@ -171,9 +188,11 @@ std::unique_ptr<substrait::Plan> SerializedPlanBuilder::build()
{
return std::move(this->plan);
}

SerializedPlanBuilder::SerializedPlanBuilder() : plan(std::make_unique<substrait::Plan>())
{
}

SerializedPlanBuilder & SerializedPlanBuilder::aggregate(std::vector<int32_t> /*keys*/, std::vector<substrait::AggregateRel_Measure *> aggregates)
{
substrait::Rel * rel = new substrait::Rel();
Expand All @@ -188,6 +207,7 @@ SerializedPlanBuilder & SerializedPlanBuilder::aggregate(std::vector<int32_t> /
this->prev_rel = rel;
return *this;
}

SerializedPlanBuilder & SerializedPlanBuilder::project(std::vector<substrait::Expression *> projections)
{
substrait::Rel * project = new substrait::Rel();
Expand All @@ -200,6 +220,94 @@ SerializedPlanBuilder & SerializedPlanBuilder::project(std::vector<substrait::Ex
return *this;
}

std::shared_ptr<substrait::Type> SerializedPlanBuilder::buildType(const DB::DataTypePtr & ch_type)
{
const auto * ch_type_nullable = checkAndGetDataType<DataTypeNullable>(ch_type.get());
const bool is_nullable = (ch_type_nullable != nullptr);
auto type_nullability
= is_nullable ? substrait::Type_Nullability_NULLABILITY_NULLABLE : substrait::Type_Nullability_NULLABILITY_REQUIRED;

const auto ch_type_without_nullable = DB::removeNullable(ch_type);
const DB::WhichDataType which(ch_type_without_nullable);

auto res = std::make_shared<substrait::Type>();
if (which.isUInt8())
res->mutable_bool_()->set_nullability(type_nullability);
else if (which.isInt8())
res->mutable_i8()->set_nullability(type_nullability);
else if (which.isInt16())
res->mutable_i16()->set_nullability(type_nullability);
else if (which.isInt32())
res->mutable_i32()->set_nullability(type_nullability);
else if (which.isInt64())
res->mutable_i64()->set_nullability(type_nullability);
else if (which.isString() || which.isAggregateFunction())
res->mutable_binary()->set_nullability(type_nullability); /// Spark Binary type is more similiar to CH String type
else if (which.isFloat32())
res->mutable_fp32()->set_nullability(type_nullability);
else if (which.isFloat64())
res->mutable_fp64()->set_nullability(type_nullability);
else if (which.isFloat64())
res->mutable_fp64()->set_nullability(type_nullability);
else if (which.isDateTime64())
{
const auto * ch_type_datetime64 = checkAndGetDataType<DataTypeDateTime64>(ch_type_without_nullable.get());
if (ch_type_datetime64->getScale() != 6)
throw Exception(ErrorCodes::UNKNOWN_TYPE, "Spark doesn't support converting from {}", ch_type->getName());
res->mutable_timestamp()->set_nullability(type_nullability);
}
else if (which.isDate32())
res->mutable_date()->set_nullability(type_nullability);
else if (which.isDecimal())
{
if (which.isDecimal256())
throw Exception(ErrorCodes::UNKNOWN_TYPE, "Spark doesn't support converting from {}", ch_type->getName());

const auto scale = getDecimalScale(*ch_type_without_nullable, 0);
const auto precision = getDecimalPrecision(*ch_type_without_nullable);
if (scale == 0 && precision == 0)
throw Exception(ErrorCodes::UNKNOWN_TYPE, "Spark doesn't support converting from {}", ch_type->getName());
res->mutable_decimal()->set_nullability(type_nullability);
res->mutable_decimal()->set_scale(scale);
res->mutable_decimal()->set_precision(precision);
}
else if (which.isTuple())
{
const auto * ch_tuple_type = checkAndGetDataType<DataTypeTuple>(ch_type_without_nullable.get());
const auto & ch_field_types = ch_tuple_type->getElements();
res->mutable_struct_()->set_nullability(type_nullability);
for (const auto & ch_field_type: ch_field_types)
res->mutable_struct_()->mutable_types()->Add(std::move(*buildType(ch_field_type)));
}
else if (which.isArray())
{
const auto * ch_array_type = checkAndGetDataType<DataTypeArray>(ch_type_without_nullable.get());
const auto & ch_nested_type = ch_array_type->getNestedType();
res->mutable_list()->set_nullability(type_nullability);
*(res->mutable_list()->mutable_type()) = *buildType(ch_nested_type);
}
else if (which.isMap())
{
const auto & ch_map_type = checkAndGetDataType<DataTypeMap>(ch_type_without_nullable.get());
const auto & ch_key_type = ch_map_type->getKeyType();
const auto & ch_val_type = ch_map_type->getValueType();
res->mutable_map()->set_nullability(type_nullability);
*(res->mutable_map()->mutable_key()) = *buildType(ch_key_type);
*(res->mutable_map()->mutable_value()) = *buildType(ch_val_type);
}
else
throw Exception(ErrorCodes::UNKNOWN_TYPE, "Spark doesn't support converting from {}", ch_type->getName());

return std::move(res);
}

void SerializedPlanBuilder::buildType(const DB::DataTypePtr & ch_type, String & substrait_type)
{
auto pb = buildType(ch_type);
substrait_type = pb->SerializeAsString();
}


substrait::Expression * selection(int32_t field_id)
{
substrait::Expression * rel = new substrait::Expression();
Expand Down
3 changes: 3 additions & 0 deletions utils/local-engine/Builder/SerializedPlanBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ class SerializedPlanBuilder
SchemaPtr schema);
std::unique_ptr<substrait::Plan> build();

static std::shared_ptr<substrait::Type> buildType(const DB::DataTypePtr & ch_type);
static void buildType(const DB::DataTypePtr & ch_type, String & substrait_type);

private:
void setInputToPrev(substrait::Rel * input);
substrait::Rel * prev_rel = nullptr;
Expand Down
1 change: 1 addition & 0 deletions utils/local-engine/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ target_compile_options(_icui18n PRIVATE -fPIC)
target_compile_options(_cpuid PRIVATE -fPIC)
target_compile_options(re2_st PRIVATE -fPIC)
target_compile_options(_boost_program_options PRIVATE -fPIC)
target_compile_options(_boost_context PRIVATE -fPIC)
target_compile_options(clickhouse_common_io PRIVATE -fPIC)
target_compile_options(clickhouse_dictionaries_embedded PRIVATE -fPIC)
target_compile_options(clickhouse_common_zookeeper PRIVATE -fPIC)
Expand Down
114 changes: 9 additions & 105 deletions utils/local-engine/Common/DebugUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,81 +17,20 @@ void headBlock(const DB::Block & block, size_t count)
std::cerr << block.dumpStructure() << std::endl;
// print header
for (const auto& name : block.getNames())
{
std::cerr << name << "\t";
}
std::cerr << std::endl;

// print rows
for (size_t row = 0; row < std::min(count, block.rows()); ++row)
{
for (size_t column = 0; column < block.columns(); ++column)
{
const auto type = block.getByPosition(column).type;
auto col = block.getByPosition(column).column;
auto nested_col = col;
DB::DataTypePtr nested_type = type;
if (const auto *nullable = DB::checkAndGetDataType<DB::DataTypeNullable>(type.get()))
{
nested_type = nullable->getNestedType();
const auto *nullable_column = DB::checkAndGetColumn<DB::ColumnNullable>(*col);
nested_col = nullable_column->getNestedColumnPtr();
}
DB::WhichDataType which(nested_type);
if (col->isNullAt(row))
{
std::cerr << "null" << "\t";
}
else if (which.isUInt())
{
auto value = nested_col->getUInt(row);
std::cerr << std::to_string(value) << "\t";
}
else if (which.isString())
{
auto value = DB::checkAndGetColumn<DB::ColumnString>(*nested_col)->getDataAt(row).toString();
std::cerr << value << "\t";
}
else if (which.isInt())
{
auto value = nested_col->getInt(row);
std::cerr << std::to_string(value) << "\t";
}
else if (which.isFloat32())
{
auto value = nested_col->getFloat32(row);
std::cerr << std::to_string(value) << "\t";
}
else if (which.isFloat64())
{
auto value = nested_col->getFloat64(row);
std::cerr << std::to_string(value) << "\t";
}
else if (which.isDate())
{
const auto * date_type = DB::checkAndGetDataType<DB::DataTypeDate>(nested_type.get());
String date_string;
DB::WriteBufferFromString wb(date_string);
date_type->getSerialization(DB::ISerialization::Kind::DEFAULT)->serializeText(*nested_col, row, wb, {});
std::cerr << date_string.substr(0, 10) << "\t";
}
else if (which.isDate32())
{
const auto * date_type = DB::checkAndGetDataType<DB::DataTypeDate32>(nested_type.get());
String date_string;
DB::WriteBufferFromString wb(date_string);
date_type->getSerialization(DB::ISerialization::Kind::DEFAULT)->serializeText(*nested_col, row, wb, {});
std::cerr << date_string.substr(0, 10) << "\t";
}
else if (which.isDateTime64())
{
const auto * datetime64_type = DB::checkAndGetDataType<DB::DataTypeDateTime64>(nested_type.get());
String datetime64_string;
DB::WriteBufferFromString wb(datetime64_string);
datetime64_type->getSerialization(DB::ISerialization::Kind::DEFAULT)->serializeText(*nested_col, row, wb, {});
std::cerr << datetime64_string << "\t";
}
else
std::cerr << "N/A" << "\t";

if (column > 0)
std::cerr << "\t";
std::cerr << toString((*col)[row]);
}
std::cerr << std::endl;
}
Expand All @@ -100,49 +39,14 @@ void headBlock(const DB::Block & block, size_t count)
void headColumn(const DB::ColumnPtr column, size_t count)
{
std::cerr << "============Column============" << std::endl;
// print header

// print header
std::cerr << column->getName() << "\t";
std::cerr << std::endl;

// print rows
for (size_t row = 0; row < std::min(count, column->size()); ++row)
{
auto type = column->getDataType();
const auto& col = column;
DB::WhichDataType which(type);
if (col->isNullAt(row))
{
std::cerr << "null" << "\t";
}
else if (which.isUInt())
{
auto value = col->getUInt(row);
std::cerr << std::to_string(value) << std::endl;
}
else if (which.isString())
{
auto value = DB::checkAndGetColumn<DB::ColumnString>(*col)->getDataAt(row).toString();
std::cerr << value << std::endl;
}
else if (which.isInt())
{
auto value = col->getInt(row);
std::cerr << std::to_string(value) << std::endl;
}
else if (which.isFloat32())
{
auto value = col->getFloat32(row);
std::cerr << std::to_string(value) << std::endl;
}
else if (which.isFloat64())
{
auto value = col->getFloat64(row);
std::cerr << std::to_string(value) << std::endl;
}
else
{
std::cerr << "N/A" << std::endl;
}
}
std::cerr << toString((*column)[row]) << std::endl;
}

}
Loading

0 comments on commit 729900c

Please sign in to comment.