Skip to content

Commit

Permalink
Support agg function for const vliteral and remove some useless sink …
Browse files Browse the repository at this point in the history
…code (apache#32)
  • Loading branch information
HappenLee committed Jul 1, 2021
1 parent 2b9a009 commit 06c7213
Show file tree
Hide file tree
Showing 12 changed files with 71 additions and 333 deletions.
2 changes: 1 addition & 1 deletion be/src/exec/data_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
if (is_vec) {
tmp_sink = new doris::vectorized::ResultSink(row_desc, output_exprs, thrift_sink.result_sink, 1024);
} else {
tmp_sink = new ResultSink(row_desc, output_exprs, thrift_sink.result_sink, 1024, config::is_vec);
tmp_sink = new ResultSink(row_desc, output_exprs, thrift_sink.result_sink, 1024);
}
sink->reset(tmp_sink);
break;
Expand Down
282 changes: 6 additions & 276 deletions be/src/runtime/mysql_result_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,23 +39,15 @@
namespace doris {

MysqlResultWriter::MysqlResultWriter(BufferControlBlock* sinker,
const std::vector<ExprContext*>& output_expr_ctxs, const std::vector<vectorized::VExprContext*>& output_vexpr_ctxs,
RuntimeProfile* parent_profile)
: ResultWriter(!output_vexpr_ctxs.empty()),
const std::vector<ExprContext*>& output_expr_ctxs, RuntimeProfile* parent_profile)
: ResultWriter(),
_sinker(sinker),
_output_expr_ctxs(output_expr_ctxs),
_output_vexpr_ctxs(output_vexpr_ctxs),
_row_buffer(NULL),
_parent_profile(parent_profile) {}

MysqlResultWriter::~MysqlResultWriter() {
if (_is_vec) {
for (auto buffer : _vec_buffers) {
delete buffer;
}
} else {
delete _row_buffer;
}
delete _row_buffer;
}

Status MysqlResultWriter::init(RuntimeState* state) {
Expand All @@ -64,16 +56,9 @@ Status MysqlResultWriter::init(RuntimeState* state) {
return Status::InternalError("sinker is NULL pointer.");
}

if (_is_vec) {
_vec_buffers.resize(state->batch_size());
for (int i = 0; i < state->batch_size() ; ++i) {
_vec_buffers[i] = new MysqlRowBuffer();
}
} else {
_row_buffer = new(std::nothrow) MysqlRowBuffer();
if (NULL == _row_buffer) {
return Status::InternalError("no memory to alloc.");
}
_row_buffer = new(std::nothrow) MysqlRowBuffer();
if (NULL == _row_buffer) {
return Status::InternalError("no memory to alloc.");
}

return Status::OK();
Expand Down Expand Up @@ -221,118 +206,7 @@ Status MysqlResultWriter::_add_one_row(TupleRow* row) {
return Status::OK();
}

/// Serializes one result column into the per-row MySQL buffers.
///
/// All buffers in `_vec_buffers` are reset first; afterwards row `i` of the
/// column is pushed into `_vec_buffers[i]`, so after the call each buffer holds
/// exactly one encoded cell.
///
/// @tparam type         primitive type selecting the encoder used for each cell.
/// @tparam is_nullable  when true, `column_ptr` is cast to a ColumnNullable: NULL
///                      checks go through `column_ptr`, values are read from its
///                      nested column.
/// @param column_ptr    the column to serialize.
/// @return Status::OK() on success; Status::InternalError when the column has
///         more rows than there are row buffers or when any push into a row
///         buffer reports failure.
template <PrimitiveType type, bool is_nullable>
Status MysqlResultWriter::_add_one_column(const doris::vectorized::ColumnPtr& column_ptr) {
    SCOPED_TIMER(_convert_tuple_timer);
    for (const auto buffer : _vec_buffers) {
        buffer->reset();
    }

    // Row i is written to _vec_buffers[i]; refuse columns that would index past
    // the end of the buffer vector instead of reading out of bounds.
    const size_t num_rows = column_ptr->size();
    if (num_rows > _vec_buffers.size()) {
        return Status::InternalError("column has more rows than mysql row buffers.");
    }

    doris::vectorized::ColumnPtr column;
    if constexpr (is_nullable) {
        column = assert_cast<const vectorized::ColumnNullable&>(*column_ptr).getNestedColumnPtr();
    } else {
        column = column_ptr;
    }

    for (size_t i = 0; i < num_rows; ++i) {
        int buf_ret = 0;

        if constexpr (is_nullable) {
            if (column_ptr->isNullAt(i)) {
                // Check the result here: skipping straight to the next row would
                // silently drop a failed push.
                if (0 != _vec_buffers[i]->push_null()) {
                    return Status::InternalError("pack mysql buffer failed.");
                }
                continue;
            }
        }

        if constexpr (type == TYPE_TINYINT) {
            buf_ret = _vec_buffers[i]->push_tinyint(
                    assert_cast<const vectorized::ColumnVector<vectorized::Int8>&>(*column).getData()[i]);
        }
        if constexpr (type == TYPE_SMALLINT) {
            buf_ret = _vec_buffers[i]->push_smallint(
                    assert_cast<const vectorized::ColumnVector<vectorized::Int16>&>(*column).getData()[i]);
        }
        if constexpr (type == TYPE_INT) {
            buf_ret = _vec_buffers[i]->push_int(
                    assert_cast<const vectorized::ColumnVector<vectorized::Int32>&>(*column).getData()[i]);
        }
        if constexpr (type == TYPE_BIGINT) {
            buf_ret = _vec_buffers[i]->push_bigint(
                    assert_cast<const vectorized::ColumnVector<vectorized::Int64>&>(*column).getData()[i]);
        }
        if constexpr (type == TYPE_LARGEINT) {
            // Large integers are sent in their textual form.
            char buf[48];
            int len = 48;
            char* v = LargeIntValue::to_string(
                    assert_cast<const vectorized::ColumnVector<vectorized::Int128>&>(*column).getData()[i],
                    buf, &len);
            buf_ret = _vec_buffers[i]->push_string(v, len);
        }
        if constexpr (type == TYPE_FLOAT) {
            buf_ret = _vec_buffers[i]->push_float(
                    assert_cast<const vectorized::ColumnVector<vectorized::Float32>&>(*column).getData()[i]);
        }
        if constexpr (type == TYPE_DOUBLE) {
            buf_ret = _vec_buffers[i]->push_double(
                    assert_cast<const vectorized::ColumnVector<vectorized::Float64>&>(*column).getData()[i]);
        }
        if constexpr (type == TYPE_DATETIME) {
            // The column stores the value as an Int128 whose bytes are copied
            // into a DateTimeValue.
            // NOTE(review): this assumes sizeof(DateTimeValue) >= sizeof(Int128) - confirm.
            char buf[64];
            auto time_num =
                    assert_cast<const vectorized::ColumnVector<vectorized::Int128>&>(*column).getData()[i];
            DateTimeValue time_val;
            memcpy(&time_val, &time_num, sizeof(vectorized::Int128));
            // TODO(zhaochun), this function has core risk
            char* pos = time_val.to_string(buf);
            // presumably `pos` points one past the terminating NUL, hence the -1 - TODO confirm
            buf_ret = _vec_buffers[i]->push_string(buf, pos - buf - 1);
        }

        if constexpr (type == TYPE_OBJECT) {
            // Object values are not serialized; a NULL cell is emitted instead.
            buf_ret = _vec_buffers[i]->push_null();
        }
        if constexpr (type == TYPE_VARCHAR) {
            const auto string_val = column->getDataAt(i);

            if (string_val.data == NULL) {
                if (string_val.size == 0) {
                    // 0x01 is a magic num, not useful actually, just for present ""
                    char* tmp_val = reinterpret_cast<char*>(0x01);
                    buf_ret = _vec_buffers[i]->push_string(tmp_val, string_val.size);
                } else {
                    buf_ret = _vec_buffers[i]->push_null();
                }
            } else {
                buf_ret = _vec_buffers[i]->push_string(string_val.data, string_val.size);
            }
        }
        if constexpr (type == TYPE_DECIMALV2) {
            DecimalV2Value decimal_val(
                    assert_cast<const vectorized::ColumnDecimal<vectorized::Decimal128>&>(*column).getData()[i]);
            // TODO: scale-aware formatting (to_string(output_scale) for scales in
            // (0, 30]) is disabled here; the default textual form is always used.
            std::string decimal_str = decimal_val.to_string();
            buf_ret = _vec_buffers[i]->push_string(decimal_str.c_str(), decimal_str.length());
        }

        if (0 != buf_ret) {
            return Status::InternalError("pack mysql buffer failed.");
        }
    }

    return Status::OK();
}

Status MysqlResultWriter::append_row_batch(const RowBatch* batch) {
if (_is_vec) {
auto block = batch->convert_to_vec_block();

_result_column_ids.resize(_output_vexpr_ctxs.size());
for (int i = 0; i < _output_vexpr_ctxs.size(); i++) {
const auto& vexpr_ctx = _output_vexpr_ctxs[i];
int result_column_id = -1;
vexpr_ctx->execute(&block, &result_column_id);
DCHECK(result_column_id != -1);
_result_column_ids[i] = result_column_id;
}
return append_block(block);
}

SCOPED_TIMER(_append_row_batch_timer);
if (NULL == batch || 0 == batch->num_rows()) {
return Status::OK();
Expand Down Expand Up @@ -379,150 +253,6 @@ Status MysqlResultWriter::append_row_batch(const RowBatch* batch) {
return status;
}

/// Converts a vectorized block into one TFetchDataResult and hands it to the sinker.
///
/// For every output expression, the column recorded in `_result_column_ids` is
/// encoded with `_add_one_column` (one cell per row buffer) and each row buffer
/// is then appended to the matching row of the result batch, so rows are built
/// up column by column.
///
/// @param block  block holding the already evaluated output columns.
/// @return Status::OK() for an empty block or after the batch was accepted by
///         the sinker; otherwise the first error encountered (unsupported type,
///         inconsistent writer state, buffer packing failure, or the status
///         returned by the sinker).
Status MysqlResultWriter::append_block(const vectorized::Block& block) {
    DCHECK(_output_vexpr_ctxs.size() == _result_column_ids.size());

    SCOPED_TIMER(_append_row_batch_timer);
    if (block.rows() == 0) {
        return Status::OK();
    }

    // The DCHECK above vanishes in release builds; fail cleanly instead of
    // indexing _result_column_ids out of range.
    if (_output_vexpr_ctxs.size() != _result_column_ids.size()) {
        return Status::InternalError("result column ids do not match output exprs.");
    }

    // Row j is assembled from _vec_buffers[j], so the block must not have more
    // rows than there are row buffers.
    const auto num_rows = static_cast<size_t>(block.rows());
    if (num_rows > _vec_buffers.size()) {
        return Status::InternalError("block has more rows than mysql row buffers.");
    }

    Status status;
    // convert one batch
    auto result = std::make_unique<TFetchDataResult>();
    result->result_batch.rows.resize(num_rows);

    for (size_t i = 0; status.ok() && i < _output_vexpr_ctxs.size(); ++i) {
        const auto& entry = block.getByPosition(_result_column_ids[i]);
        const auto& column_ptr = entry.column;
        const bool nullable = entry.type->isNullable();
        const auto result_type = _output_vexpr_ctxs[i]->root()->result_type();

        switch (result_type) {
        case TYPE_BOOLEAN:
        case TYPE_TINYINT:
            status = nullable ? _add_one_column<PrimitiveType::TYPE_TINYINT, true>(column_ptr)
                              : _add_one_column<PrimitiveType::TYPE_TINYINT, false>(column_ptr);
            break;
        case TYPE_SMALLINT:
            status = nullable ? _add_one_column<PrimitiveType::TYPE_SMALLINT, true>(column_ptr)
                              : _add_one_column<PrimitiveType::TYPE_SMALLINT, false>(column_ptr);
            break;
        case TYPE_INT:
            status = nullable ? _add_one_column<PrimitiveType::TYPE_INT, true>(column_ptr)
                              : _add_one_column<PrimitiveType::TYPE_INT, false>(column_ptr);
            break;
        case TYPE_BIGINT:
            status = nullable ? _add_one_column<PrimitiveType::TYPE_BIGINT, true>(column_ptr)
                              : _add_one_column<PrimitiveType::TYPE_BIGINT, false>(column_ptr);
            break;
        case TYPE_LARGEINT:
            status = nullable ? _add_one_column<PrimitiveType::TYPE_LARGEINT, true>(column_ptr)
                              : _add_one_column<PrimitiveType::TYPE_LARGEINT, false>(column_ptr);
            break;
        case TYPE_FLOAT:
            status = nullable ? _add_one_column<PrimitiveType::TYPE_FLOAT, true>(column_ptr)
                              : _add_one_column<PrimitiveType::TYPE_FLOAT, false>(column_ptr);
            break;
        case TYPE_DOUBLE:
            status = nullable ? _add_one_column<PrimitiveType::TYPE_DOUBLE, true>(column_ptr)
                              : _add_one_column<PrimitiveType::TYPE_DOUBLE, false>(column_ptr);
            break;
        case TYPE_CHAR:
        case TYPE_VARCHAR:
            status = nullable ? _add_one_column<PrimitiveType::TYPE_VARCHAR, true>(column_ptr)
                              : _add_one_column<PrimitiveType::TYPE_VARCHAR, false>(column_ptr);
            break;
        case TYPE_DECIMALV2:
            status = nullable ? _add_one_column<PrimitiveType::TYPE_DECIMALV2, true>(column_ptr)
                              : _add_one_column<PrimitiveType::TYPE_DECIMALV2, false>(column_ptr);
            break;
        case TYPE_DATE:
        case TYPE_DATETIME:
            status = nullable ? _add_one_column<PrimitiveType::TYPE_DATETIME, true>(column_ptr)
                              : _add_one_column<PrimitiveType::TYPE_DATETIME, false>(column_ptr);
            break;
        case TYPE_HLL:
        case TYPE_OBJECT:
            status = nullable ? _add_one_column<PrimitiveType::TYPE_OBJECT, true>(column_ptr)
                              : _add_one_column<PrimitiveType::TYPE_OBJECT, false>(column_ptr);
            break;
        default: {
            // Report the type of the vectorized expression that was switched on;
            // the row-based _output_expr_ctxs is not the vector iterated here and
            // must not be indexed with i.
            LOG(WARNING) << "can't convert this type to mysql type. type = " << result_type;
            return Status::InternalError("vec block pack mysql buffer failed.");
        }
        }

        if (status.ok()) {
            // Append this column's cell to every row of the batch.
            for (size_t j = 0; j < num_rows; ++j) {
                result->result_batch.rows[j].append(_vec_buffers[j]->buf(), _vec_buffers[j]->length());
            }
        } else {
            LOG(WARNING) << "convert row to mysql result failed.";
            break;
        }
    }

    if (status.ok()) {
        SCOPED_TIMER(_result_send_timer);
        // push this batch to back
        status = _sinker->add_batch(result.get());

        if (status.ok()) {
            // presumably the sinker owns the batch once add_batch succeeds, so
            // give up ownership here - TODO confirm against BufferControlBlock
            result.release();
            _written_rows += num_rows;
        } else {
            LOG(WARNING) << "append result batch to sink failed.";
        }
    }

    return status;
}

Status MysqlResultWriter::close() {
COUNTER_SET(_sent_rows_counter, _written_rows);
return Status::OK();
Expand Down
8 changes: 1 addition & 7 deletions be/src/runtime/mysql_result_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ namespace vectorized {
class MysqlResultWriter final : public ResultWriter {
public:
MysqlResultWriter(BufferControlBlock* sinker, const std::vector<ExprContext*>& output_expr_ctxs,
const std::vector<vectorized::VExprContext*>& output_vexpr_ctxs, RuntimeProfile* parent_profile);
RuntimeProfile* parent_profile);

virtual ~MysqlResultWriter();

Expand All @@ -49,23 +49,17 @@ class MysqlResultWriter final : public ResultWriter {
// append this batch to the result sink
virtual Status append_row_batch(const RowBatch* batch) override;

virtual Status append_block(const vectorized::Block& block);

virtual Status close() override;

private:
void _init_profile();
// convert one tuple row
Status _add_one_row(TupleRow* row);

template <PrimitiveType type, bool is_nullable>
Status _add_one_column(const vectorized::ColumnPtr& column_ptr);

private:
BufferControlBlock* _sinker;
const std::vector<ExprContext*>& _output_expr_ctxs;

const std::vector<vectorized::VExprContext*>& _output_vexpr_ctxs;
std::vector<int> _result_column_ids;

MysqlRowBuffer* _row_buffer;
Expand Down
Loading

0 comments on commit 06c7213

Please sign in to comment.