Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support default value and json format import for Tensor and TensorArray data type #1241

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/executor/expression/expression_evaluator.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

module;

export module expression_evaluator;

import stl;
import base_expression;
import aggregate_expression;
Expand All @@ -28,8 +30,6 @@ import data_block;
import column_vector;
import expression_state;

export module expression_evaluator;

namespace infinity {

export class ExpressionEvaluator {
Expand Down
4 changes: 2 additions & 2 deletions src/executor/expression/expression_state.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

module;

export module expression_state;

import stl;
import base_expression;
import aggregate_expression;
Expand All @@ -25,8 +27,6 @@ import value_expression;
import in_expression;
import column_vector;

export module expression_state;

namespace infinity {

export enum class AggregateFlag : i8 {
Expand Down
151 changes: 77 additions & 74 deletions src/executor/operator/physical_import.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -539,46 +539,80 @@ void PhysicalImport::CSVRowHandler(void *context) {
parser_context->block_entry_ = std::move(block_entry);
}

template <typename T>
void AppendJsonTensorToColumn(const nlohmann::json &line_json,
                              const String &column_name,
                              ColumnVector &column_vector,
                              EmbeddingInfo *embedding_info) {
    // Pull the column's JSON array out as a flat list of tensor elements.
    auto elements = line_json[column_name].get<Vector<T>>();
    const auto dimension = embedding_info->Dimension();
    // A tensor is a sequence of fixed-size embeddings, so the flat element
    // count must divide evenly by the embedding dimension.
    if (elements.size() % dimension != 0) {
        Status status = Status::ImportFileFormatError(
            fmt::format("Tensor element count {} isn't multiple of dimension {}.", elements.size(), dimension));
        LOG_ERROR(status.message());
        RecoverableError(status);
    }
    const auto byte_count = elements.size() * sizeof(T);
    // Wrap the raw element bytes in a tensor Value and append it to the column.
    const Value tensor_value =
        Value::MakeTensor(reinterpret_cast<const_ptr_t>(elements.data()), byte_count, column_vector.data_type()->type_info());
    column_vector.AppendValue(tensor_value);
}

template <>
void AppendJsonTensorToColumn<bool>(const nlohmann::json &line_json,
const String &column_name,
ColumnVector &column_vector,
EmbeddingInfo *embedding_info) {
Vector<float> &&embedding = line_json[column_name].get<Vector<float>>();
if (embedding.size() % embedding_info->Dimension() != 0) {
Status status = Status::ImportFileFormatError(
fmt::format("Tensor element count {} isn't multiple of dimension {}.", embedding.size(), embedding_info->Dimension()));
LOG_ERROR(status.message());
RecoverableError(status);
}
const auto input_bytes = (embedding.size() + 7) / 8;
auto input_data = MakeUnique<u8[]>(input_bytes);
for (SizeT i = 0; i < embedding.size(); ++i) {
if (embedding[i]) {
input_data[i / 8] |= (1u << (i % 8));
// Convert a JSON value (scalar or possibly-nested array) into a ConstantExpr
// literal so it can be appended to a column through the generic
// constant-expression path. On malformed or unsupported input (e.g. an empty
// array) it raises a RecoverableError and returns nullptr.
SharedPtr<ConstantExpr> BuildConstantExprFromJson(const nlohmann::json &json_object) {
    switch (json_object.type()) {
        case nlohmann::json::value_t::boolean: {
            auto res = MakeShared<ConstantExpr>(LiteralType::kBoolean);
            res->bool_value_ = json_object.get<bool>();
            return res;
        }
        case nlohmann::json::value_t::number_unsigned:
        case nlohmann::json::value_t::number_integer: {
            // NOTE(review): unsigned values above INT64_MAX will wrap when
            // narrowed to i64 — confirm whether such inputs can occur.
            auto res = MakeShared<ConstantExpr>(LiteralType::kInteger);
            res->integer_value_ = json_object.get<i64>();
            return res;
        }
        case nlohmann::json::value_t::number_float: {
            auto res = MakeShared<ConstantExpr>(LiteralType::kDouble);
            res->double_value_ = json_object.get<double>();
            return res;
        }
        case nlohmann::json::value_t::string: {
            auto res = MakeShared<ConstantExpr>(LiteralType::kString);
            // Parse the string once and duplicate it; the original parsed it
            // twice and left the first copy unused. ConstantExpr takes
            // ownership of the strdup'd buffer (presumably freed in its
            // destructor — verify).
            const auto str = json_object.get<String>();
            res->str_value_ = strdup(str.c_str());
            return res;
        }
        case nlohmann::json::value_t::array: {
            const u32 array_size = json_object.size();
            if (array_size == 0) {
                const auto error_info = "Empty json array!";
                LOG_ERROR(error_info);
                RecoverableError(Status::ImportFileFormatError(error_info));
                return nullptr;
            }
            // The first element decides the array's literal type; elements of
            // a different type will fail inside the per-element get<>() below.
            switch (json_object[0].type()) {
                case nlohmann::json::value_t::boolean:
                case nlohmann::json::value_t::number_unsigned:
                case nlohmann::json::value_t::number_integer: {
                    auto res = MakeShared<ConstantExpr>(LiteralType::kIntegerArray);
                    res->long_array_.resize(array_size);
                    for (u32 i = 0; i < array_size; ++i) {
                        res->long_array_[i] = json_object[i].get<i64>();
                    }
                    return res;
                }
                case nlohmann::json::value_t::number_float: {
                    auto res = MakeShared<ConstantExpr>(LiteralType::kDoubleArray);
                    res->double_array_.resize(array_size);
                    for (u32 i = 0; i < array_size; ++i) {
                        res->double_array_[i] = json_object[i].get<double>();
                    }
                    return res;
                }
                case nlohmann::json::value_t::array: {
                    // Nested arrays: recurse to build the sub-array literals
                    // (used for tensor / tensor-array imports).
                    auto res = MakeShared<ConstantExpr>(LiteralType::kSubArrayArray);
                    res->sub_array_array_.resize(array_size);
                    for (u32 i = 0; i < array_size; ++i) {
                        res->sub_array_array_[i] = BuildConstantExprFromJson(json_object[i]);
                    }
                    return res;
                }
                default: {
                    const auto error_info = fmt::format("Unrecognized json object type in array: {}", json_object.type_name());
                    LOG_ERROR(error_info);
                    RecoverableError(Status::ImportFileFormatError(error_info));
                    return nullptr;
                }
            }
        }
        default: {
            const auto error_info = fmt::format("Unrecognized json object type: {}", json_object.type_name());
            LOG_ERROR(error_info);
            RecoverableError(Status::ImportFileFormatError(error_info));
            return nullptr;
        }
    }
}

void PhysicalImport::JSONLRowHandler(const nlohmann::json &line_json, Vector<ColumnVector> &column_vectors) {
Expand Down Expand Up @@ -667,42 +701,11 @@ void PhysicalImport::JSONLRowHandler(const nlohmann::json &line_json, Vector<Col
}
break;
}
case kTensor: {
auto embedding_info = static_cast<EmbeddingInfo *>(column_vector.data_type()->type_info().get());
// SizeT dim = embedding_info->Dimension();
switch (embedding_info->Type()) {
case kElemBit: {
AppendJsonTensorToColumn<bool>(line_json, column_def->name_, column_vector, embedding_info);
break;
}
case kElemInt8: {
AppendJsonTensorToColumn<i8>(line_json, column_def->name_, column_vector, embedding_info);
break;
}
case kElemInt16: {
AppendJsonTensorToColumn<i16>(line_json, column_def->name_, column_vector, embedding_info);
break;
}
case kElemInt32: {
AppendJsonTensorToColumn<i32>(line_json, column_def->name_, column_vector, embedding_info);
break;
}
case kElemInt64: {
AppendJsonTensorToColumn<i64>(line_json, column_def->name_, column_vector, embedding_info);
break;
}
case kElemFloat: {
AppendJsonTensorToColumn<float>(line_json, column_def->name_, column_vector, embedding_info);
break;
}
case kElemDouble: {
AppendJsonTensorToColumn<double>(line_json, column_def->name_, column_vector, embedding_info);
break;
}
default: {
UnrecoverableError("Not implement: Embedding type.");
}
}
case kTensor:
case kTensorArray: {
// build ConstantExpr
SharedPtr<ConstantExpr> const_expr = BuildConstantExprFromJson(line_json[column_def->name_]);
column_vector.AppendByConstantExpr(const_expr.get());
break;
}
default: {
Expand Down
14 changes: 0 additions & 14 deletions src/parser/type/complex/embedding_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,20 +196,6 @@ struct EmbeddingType {
EmbeddingType &operator=(const EmbeddingType &other) = delete;

EmbeddingType &operator=(EmbeddingType &&other) = delete;
/*
EmbeddingType &operator=(EmbeddingType &&other) noexcept {
if (this == &other)
return *this;
if (ptr != nullptr) {
// LOG_TRACE("Target embedding isn't null, need to manually SetNull or Reset");
Reset();
}
const_cast<bool &>(new_allocated_) = other.new_allocated_;
ptr = other.ptr;
other.ptr = nullptr;
return *this;
}
*/

void Init(const void *ptr, size_t size);

Expand Down
4 changes: 0 additions & 4 deletions src/parser/type/complex/tensor_array_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@

#pragma once

#include "tensor_type.h"

namespace infinity {

#pragma pack(1)
Expand All @@ -24,8 +22,6 @@ struct TensorArrayType {
uint16_t tensor_num_ = 0;
uint16_t chunk_id_ = 0;
uint32_t chunk_offset_ = 0;

//[[nodiscard]] static std::string TensorArray2String(char *tensor_ptr, EmbeddingDataType type, size_t embedding_dimension, size_t embedding_num);
};

static_assert(sizeof(TensorArrayType) == sizeof(uint64_t));
Expand Down
139 changes: 25 additions & 114 deletions src/storage/column_vector/column_vector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ import buffer_manager;
import status;
import logical_type;
import embedding_info;
import base_expression;
import value_expression;
import expression_binder;
import cast_function;
import bound_cast_func;
import cast_expression;
import expression_evaluator;
import expression_state;

import block_column_entry;

Expand Down Expand Up @@ -1635,120 +1643,23 @@ void ColumnVector::AppendByStringView(std::string_view sv, char delimiter) {
}

void ColumnVector::AppendByConstantExpr(const ConstantExpr *const_expr) {
switch (data_type_->type()) {
case kBoolean: {
bool v = const_expr->bool_value_;
AppendByPtr(reinterpret_cast<const_ptr_t>(&v));
break;
}
case kTinyInt: {
i8 v = static_cast<i8>(const_expr->integer_value_);
AppendByPtr(reinterpret_cast<const_ptr_t>(&v));
break;
}
case kSmallInt: {
i16 v = static_cast<i16>(const_expr->integer_value_);
AppendByPtr(reinterpret_cast<const_ptr_t>(&v));
break;
}
case kInteger: {
i32 v = static_cast<i32>(const_expr->integer_value_);
AppendByPtr(reinterpret_cast<const_ptr_t>(&v));
break;
}
case kBigInt: {
i64 v = const_expr->integer_value_;
AppendByPtr(reinterpret_cast<const_ptr_t>(&v));
break;
}
case kFloat: {
float v = static_cast<float>(const_expr->double_value_);
AppendByPtr(reinterpret_cast<const_ptr_t>(&v));
break;
}
case kDouble: {
double v = const_expr->double_value_;
AppendByPtr(reinterpret_cast<const_ptr_t>(&v));
break;
}
case kVarchar: {
std::string_view str_view = const_expr->str_value_;
AppendByStringView(str_view, ',');
break;
}
case kTensor:
case kTensorArray: {
// TODO: used by default value?
UnrecoverableError("Need fix!");
break;
}
case kEmbedding: {
auto embedding_info = static_cast<EmbeddingInfo *>(data_type_->type_info().get());
// SizeT dim = embedding_info->Dimension();
switch (embedding_info->Type()) {
case kElemInt8: {
Vector<i8> embedding;
embedding.reserve(const_expr->long_array_.size());
std::transform(const_expr->long_array_.begin(), const_expr->long_array_.end(), std::back_inserter(embedding), [](auto &v) {
return static_cast<i8>(v);
});
AppendByPtr(reinterpret_cast<const_ptr_t>(embedding.data()));
break;
}
case kElemInt16: {
Vector<i16> embedding;
embedding.reserve(const_expr->long_array_.size());
std::transform(const_expr->long_array_.begin(), const_expr->long_array_.end(), std::back_inserter(embedding), [](auto &v) {
return static_cast<i16>(v);
});
AppendByPtr(reinterpret_cast<const_ptr_t>(embedding.data()));
break;
}
case kElemInt32: {
Vector<i32> embedding;
embedding.reserve(const_expr->long_array_.size());
std::transform(const_expr->long_array_.begin(), const_expr->long_array_.end(), std::back_inserter(embedding), [](auto &v) {
return static_cast<i32>(v);
});
AppendByPtr(reinterpret_cast<const_ptr_t>(embedding.data()));
break;
}
case kElemInt64: {
Vector<i64> embedding;
embedding.reserve(const_expr->long_array_.size());
std::transform(const_expr->long_array_.begin(), const_expr->long_array_.end(), std::back_inserter(embedding), [](auto &v) {
return v;
});
AppendByPtr(reinterpret_cast<const_ptr_t>(embedding.data()));
break;
}
case kElemFloat: {
Vector<float> embedding;
embedding.reserve(const_expr->double_array_.size());
std::transform(const_expr->double_array_.begin(), const_expr->double_array_.end(), std::back_inserter(embedding), [](auto &v) {
return static_cast<float>(v);
});
AppendByPtr(reinterpret_cast<const_ptr_t>(embedding.data()));
break;
}
case kElemDouble: {
Vector<i8> embedding;
embedding.reserve(const_expr->double_array_.size());
std::transform(const_expr->double_array_.begin(), const_expr->double_array_.end(), std::back_inserter(embedding), [](auto &v) {
return v;
});
AppendByPtr(reinterpret_cast<const_ptr_t>(embedding.data()));
break;
}
default: {
UnrecoverableError("Not implement: Embedding type.");
}
}
break;
}
default: {
UnrecoverableError("Not implement: Invalid data type.");
}
ExpressionBinder tmp_binder(nullptr);
auto expr = tmp_binder.BuildValueExpr(*const_expr, nullptr, 0, false);
auto value_expr = std::dynamic_pointer_cast<ValueExpression>(expr);
if (value_expr->Type() == *data_type()) {
auto value_to_insert = value_expr->GetValue();
AppendValue(value_to_insert);
} else {
// try cast
BoundCastFunc cast = CastFunction::GetBoundFunc(value_expr->Type(), *data_type());
SharedPtr<BaseExpression> cast_expr = MakeShared<CastExpression>(cast, expr, *data_type());
SharedPtr<ExpressionState> expr_state = ExpressionState::CreateState(cast_expr);
SharedPtr<ColumnVector> output_column_vector = ColumnVector::Make(data_type());
output_column_vector->Initialize(ColumnVectorType::kConstant, 1);
ExpressionEvaluator evaluator;
evaluator.Init(nullptr);
evaluator.Execute(cast_expr, expr_state, output_column_vector);
AppendWith(*output_column_vector, 0, 1);
}
}

Expand Down
Loading
Loading