Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aggregation #405

Closed
wants to merge 11 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
[submodule "deps/3rd/libevent"]
path = deps/3rd/libevent
url = https://github.com/libevent/libevent
url = https://gitee.com/sunkang315629/libevent
[submodule "deps/3rd/jsoncpp"]
path = deps/3rd/jsoncpp
url = https://github.com/open-source-parsers/jsoncpp
url = https://gitee.com/wangxiaoshuo_1_15133227828/jsoncpp
[submodule "deps/3rd/googletest"]
path = deps/3rd/googletest
url = https://github.com/google/googletest
url = https://gitee.com/wangxiaoshuo_1_15133227828/googletest
[submodule "deps/3rd/benchmark"]
path = deps/3rd/benchmark
url = https://github.com/google/benchmark
url = https://gitee.com/ws-mirror-github/google.benchmark
3 changes: 2 additions & 1 deletion .vscode/tasks.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
{
"label": "gen_parser",
"type": "shell",
"command": "cd ${workspaceFolder}/src/observer/sql/parser && bash gen_parser.sh"
"command": "cd ${workspaceFolder}/src/observer/sql/parser && bash gen_parser.sh",
"problemMatcher": []
}
]
}
4 changes: 3 additions & 1 deletion deps/common/lang/comparator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ int compare_int(void *arg1, void *arg2)
return 0;
}
}

int compare_date(void *arg1, void *arg2){
return compare_int(arg1,arg2);
}
int compare_float(void *arg1, void *arg2)
{
float v1 = *(float *)arg1;
Expand Down
1 change: 1 addition & 0 deletions deps/common/lang/comparator.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@ namespace common {
int compare_int(void *arg1, void *arg2);
int compare_float(void *arg1, void *arg2);
int compare_string(void *arg1, int arg1_max_length, void *arg2, int arg2_max_length);
int compare_date(void *arg1, void *arg2);

} // namespace common
6 changes: 4 additions & 2 deletions src/observer/sql/executor/execute_stage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,12 @@ RC ExecuteStage::handle_request_with_physical_operator(SQLStageEvent *sql_event)
bool with_table_name = select_stmt->tables().size() > 1;

for (const Field &field : select_stmt->query_fields()) {
const AggrOp aggr = field.aggregation();
if (with_table_name) {
schema.append_cell(field.table_name(), field.field_name());
//const AggrOp aggr = field.aggregation();
schema.append_cell(field.table_name(), field.field_name(), aggr);
} else {
schema.append_cell(field.field_name());
schema.append_cell(field.field_name(), aggr);
}
}
} break;
Expand Down
6 changes: 4 additions & 2 deletions src/observer/sql/expr/tuple.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,10 @@ class TupleSchema
{
public:
void append_cell(const TupleCellSpec &cell) { cells_.push_back(cell); }
void append_cell(const char *table, const char *field, const AggrOp aggr = AggrOp::AGGR_NONE){append_cell(TupleCellSpec(table, field, nullptr, aggr));}
void append_cell(const char *table, const char *field) { append_cell(TupleCellSpec(table, field)); }
void append_cell(const char *alias) { append_cell(TupleCellSpec(alias)); }
void append_cell(const char *alias, const AggrOp aggr = AggrOp::AGGR_NONE) { append_cell(TupleCellSpec(alias, aggr)); }

int cell_num() const { return static_cast<int>(cells_.size()); }

const TupleCellSpec &cell_at(int i) const { return cells_[i]; }
Expand Down Expand Up @@ -361,4 +363,4 @@ class JoinedTuple : public Tuple
private:
Tuple *left_ = nullptr;
Tuple *right_ = nullptr;
};
};
70 changes: 70 additions & 0 deletions src/observer/sql/expr/tuple_cell.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,73 @@ TupleCellSpec::TupleCellSpec(const char *alias)
alias_ = alias;
}
}

TupleCellSpec::TupleCellSpec(const char *table_name, const char * field_name, const char *alias, const AggrOp aggr)
{
if(table_name) {
table_name_ = table_name;
}
if(field_name) {
field_name_ = field_name;
}
if(aggr){
aggr_ = aggr;
}
if(alias) {
alias_ = alias;
} else {
if(table_name_.empty()){
alias_ = field_name_;
} else {
alias_ = table_name_ + "." + field_name_;
}

if(aggr_ == AggrOp::AGGR_COUNT_ALL) {
alias_ = "COUNT(*)";
} else if (aggr_ != AggrOp::AGGR_NONE) {
std::string aggr_repr;
aggr_to_string(aggr, aggr_repr);
alias_ = aggr_repr + "(" + alias_ + ")";
}
}
}

TupleCellSpec::TupleCellSpec(const char *alias, const AggrOp aggr = AggrOp::AGGR_NONE)
{
if (aggr){
aggr_ = aggr;
}
if (alias){
alias_ = alias;
if (aggr==AggrOp::AGGR_COUNT_ALL) {
alias_ = "COUNT(*)";
} else if (aggr_ != AggrOp::AGGR_NONE) {
std::string aggr_repr;
aggr_to_string(aggr, aggr_repr);
alias_ = aggr_repr + "(" + alias_ + ")";
}
}
}

void TupleCellSpec::aggr_to_string (const AggrOp aggr,std::string& aggr_expr)
{
switch (aggr){
case AggrOp::AGGR_SUM :
aggr_expr = "SUM";
break;
case AggrOp::AGGR_AVG :
aggr_expr = "AVG";
break;
case AggrOp::AGGR_MAX :
aggr_expr = "MAX";
break;
case AggrOp::AGGR_MIN :
aggr_expr = "MIN";
break;
case AggrOp::AGGR_COUNT :
aggr_expr = "COUNT";
break;
default:
break;
}
}
8 changes: 5 additions & 3 deletions src/observer/sql/expr/tuple_cell.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ class TupleCellSpec
public:
TupleCellSpec(const char *table_name, const char *field_name, const char *alias = nullptr);
TupleCellSpec(const char *alias);

TupleCellSpec(const char *table_name, const char * field_name, const char *alias, const AggrOp aggr);
TupleCellSpec(const char *alias, const AggrOp aggr);
const char *table_name() const { return table_name_.c_str(); }
const char *field_name() const { return field_name_.c_str(); }
const char *alias() const { return alias_.c_str(); }

void aggr_to_string (const AggrOp aggr,std::string& aggr_expr);
private:
std::string table_name_;
std::string field_name_;
AggrOp aggr_;
std::string alias_;
};
};
3 changes: 3 additions & 0 deletions src/observer/sql/operator/aggregate_logical_operator.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#include "sql/operator/aggregate_logical_operator.h"
#include <vector>
AggregateLogicalOperator::AggregateLogicalOperator(const std::vector<Field> &field):fields_(field){};
23 changes: 23 additions & 0 deletions src/observer/sql/operator/aggregate_logical_operator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#pragma once

#include <memory>
#include <vector>
#include "sql/expr/expression.h"
#include "sql/operator/logical_operator.h"
#include "storage/field/field.h"

class AggregateLogicalOperator:public LogicalOperator{
public:
AggregateLogicalOperator(const std::vector<Field> &field);
virtual ~AggregateLogicalOperator()=default;

LogicalOperatorType type()const override{
return LogicalOperatorType::AGGREGATE;
}
const std::vector<Field> &fields()const{
return fields_;
}
private:
std::vector<Field> fields_;
}
;
130 changes: 130 additions & 0 deletions src/observer/sql/operator/aggregate_physical_operator.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
#include "sql/operator/aggregate_physical_operator.h"
#include "common/log/log.h"
#include "storage/table/table.h"
#include "storage/trx/trx.h"
//select sum(id) from test //确定了要选择的列
void AggregatePhysicalOperator::add_aggregation(const AggrOp aggregation){
aggregations_.push_back(aggregation);
}

RC AggregatePhysicalOperator::open(Trx *trx)
{
if (children_.empty()) {
return RC::SUCCESS;
}

std::unique_ptr<PhysicalOperator> &child = children_[0];
RC rc = child->open(trx);
if (rc != RC::SUCCESS) {
LOG_WARN("failed to open child operator: %s", strrc(rc));
return rc;
}

//trx_ = trx;

return RC::SUCCESS;
}

RC AggregatePhysicalOperator::next()
{
// already aggregated
if (result_tuple_.cell_num() > 0){
return RC::RECORD_EOF;
}

RC rc = RC::SUCCESS;
PhysicalOperator *oper = children_[0].get();

std::vector<Value> result_cells;
int tuples_num=0;
while (RC::SUCCESS == (rc = oper->next())) {
//get tuple
Tuple *tuple = oper->current_tuple();
tuples_num++;

//do aggregate
for (int cell_idx = 0; cell_idx < (int)aggregations_.size(); cell_idx++) {//aggregations_是横着的?yes!
const AggrOp aggregation = aggregations_[cell_idx];

Value cell;
AttrType attr_type = AttrType::INTS;
switch (aggregation)
{
case AggrOp::AGGR_AVG:
case AggrOp::AGGR_SUM:
rc = tuple->cell_at(cell_idx, cell);
attr_type = cell.attr_type();
if(attr_type == AttrType::INTS or attr_type == AttrType::FLOATS) {
//result_cells.at(cell_idx)=new Value();
if(static_cast<int>(result_cells.size())!=(int)aggregations_.size()){
result_cells.push_back(cell);
}else{
result_cells[cell_idx].set_float(result_cells[cell_idx].get_float() + cell.get_float());
}
}

break;
case AggrOp::AGGR_MIN:
rc = tuple->cell_at(cell_idx, cell);
attr_type = cell.attr_type();
if(static_cast<int>(result_cells.size())!=(int)aggregations_.size()){
result_cells.push_back(cell);
}
else if(result_cells[cell_idx].compare(cell)==1){////

result_cells[cell_idx].set_value(cell);
}
break;

case AggrOp::AGGR_MAX:
rc = tuple->cell_at(cell_idx, cell);
attr_type = cell.attr_type();
if(static_cast<int>(result_cells.size())!=(int)aggregations_.size()){
result_cells.push_back(cell);
}
else if(result_cells[cell_idx].compare(cell)==-1){////

result_cells[cell_idx].set_value(cell);
}
break;
case AggrOp::AGGR_COUNT_ALL:
case AggrOp::AGGR_COUNT:
rc = tuple->cell_at(cell_idx, cell);
if(static_cast<int>(result_cells.size())!=(int)aggregations_.size()){
result_cells.push_back(cell);
}
break;


default:
return RC::UNIMPLENMENT;
}
}
}

for (int cell_idx = 0; cell_idx < (int)aggregations_.size(); cell_idx++){
if(aggregations_[cell_idx]==AggrOp::AGGR_AVG){
result_cells[cell_idx].set_float(result_cells[cell_idx].get_float()/tuples_num);
}
if(aggregations_[cell_idx]== AggrOp::AGGR_COUNT || aggregations_[cell_idx] == AggrOp::AGGR_COUNT_ALL){
result_cells[cell_idx].set_float((float)tuples_num);
}
}

if (rc == RC::RECORD_EOF){
rc = RC::SUCCESS;
}


result_tuple_.set_cells(result_cells);

return rc;
}

RC AggregatePhysicalOperator::close()
{
if (!children_.empty()) {
children_[0]->close();
}
return RC::SUCCESS;
}
37 changes: 37 additions & 0 deletions src/observer/sql/operator/aggregate_physical_operator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#pragma once

#include "sql/operator/physical_operator.h"
#include "sql/parser/parse.h"
#include "sql/expr/tuple.h"
#include <vector>
/**
* @brief 物理算子,聚合
* @ingroup PhysicalOperator
*/

class AggregatePhysicalOperator :public PhysicalOperator{
public:
AggregatePhysicalOperator(){}

virtual ~AggregatePhysicalOperator()=default;

void add_aggregation(const AggrOp aggregation);

PhysicalOperatorType type()const{
return PhysicalOperatorType::AGGREGATE;
}

RC open(Trx *trx)override;
RC next()override;
RC close()override;

Tuple *current_tuple()override{
//return nullptr;
return &result_tuple_;
};

private:
std::vector<AggrOp> aggregations_;
ValueListTuple result_tuple_;

};
7 changes: 7 additions & 0 deletions src/observer/sql/operator/delete_physical_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ See the Mulan PSL v2 for more details. */
#include "storage/table/table.h"
#include "storage/trx/trx.h"


/*
private:
Table *table_ = nullptr;
Trx *trx_ = nullptr;
};
*/
RC DeletePhysicalOperator::open(Trx *trx)
{
if (children_.empty()) {
Expand Down
3 changes: 3 additions & 0 deletions src/observer/sql/operator/logical_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ enum class LogicalOperatorType
INSERT, ///< 插入
DELETE, ///< 删除,删除可能会有子查询
EXPLAIN, ///< 查看执行计划
UPDATE, ///< 更新
////////////////////////////
AGGREGATE,
};

/**
Expand Down
Loading