Skip to content

Commit

Permalink
wip: fix build errors
Browse files Browse the repository at this point in the history
  • Loading branch information
rahulyesantharao committed Oct 21, 2021
1 parent d07d73a commit cb0754f
Show file tree
Hide file tree
Showing 8 changed files with 145 additions and 137 deletions.
4 changes: 2 additions & 2 deletions tuplex/core/include/Context.h
Expand Up @@ -51,7 +51,7 @@ namespace tuplex {
// a context is associated with a number of logical operators
// the context object does the memory management of these operators.
// a dataset is the result of applying the DAG of operations.
std::vector<LogicalOperator*> _operators;
std::vector<std::shared_ptr<LogicalOperator>> _operators;

// needed because of C++ template issues
void addPartition(DataSet* ds, Partition *partition);
Expand Down Expand Up @@ -188,7 +188,7 @@ namespace tuplex {

IBackend* backend() const { assert(_ee); return _ee.get(); }

LogicalOperator* addOperator(LogicalOperator* op);
std::shared_ptr<LogicalOperator> addOperator(const std::shared_ptr<LogicalOperator> &op);

void visualizeOperationGraph(GraphVizBuilder& builder);

Expand Down
2 changes: 1 addition & 1 deletion tuplex/core/include/DataSet.h
Expand Up @@ -292,7 +292,7 @@ namespace tuplex {

Context *getContext() const { return _context; }

LogicalOperator* getOperator() const { return _operator; }
std::shared_ptr<LogicalOperator> getOperator() const { return _operator; }

virtual bool isError() const { return false; }
virtual bool isEmpty() const;
Expand Down
9 changes: 7 additions & 2 deletions tuplex/core/include/logical/LogicalPlan.h
Expand Up @@ -26,7 +26,7 @@ namespace tuplex {
class LogicalPlan {
private:
// the action which is called
LogicalOperator* _action;
std::shared_ptr<LogicalOperator> _action;


void optimizeFilters();
Expand Down Expand Up @@ -74,13 +74,18 @@ namespace tuplex {
*/
PhysicalPlan* createPhysicalPlan(const Context& context);

LogicalOperator* getAction() const { return _action; }
std::shared_ptr<LogicalOperator> getAction() const { return _action; }

/*!
* output logical plan as PDF (graphviz)
* @param path
*/
void toPDF(const std::string& path) const;

// cereal serialization functions
template<class Archive> void serialize(Archive &ar) {
ar(_action);
}
};
}

Expand Down
18 changes: 9 additions & 9 deletions tuplex/core/src/Context.cc
Expand Up @@ -336,7 +336,7 @@ namespace tuplex {
}
}

LogicalOperator* Context::addOperator(LogicalOperator *op) {
std::shared_ptr<LogicalOperator> Context::addOperator(const std::shared_ptr<LogicalOperator> &op) {
_operators.push_back(op);
return op;
}
Expand All @@ -351,7 +351,7 @@ namespace tuplex {
assert(ds->_schema.getRowType() != python::Type::UNKNOWN);

// add new (root) node
ds->_operator = addOperator(new ParallelizeOperator(ds->_schema, ds->getPartitions(), ds->columns()));
ds->_operator = addOperator(std::shared_ptr<LogicalOperator>(new ParallelizeOperator(ds->_schema, ds->getPartitions(), ds->columns())));

// set dataset
ds->_operator->setDataSet(ds);
Expand All @@ -372,9 +372,9 @@ namespace tuplex {
DataSet *dsptr = createDataSet(schema);

dsptr->_operator = addOperator(
new FileInputOperator(pattern, this->_options, hasHeader, delimiter, quotechar, null_values, columns,
index_based_type_hints, column_based_type_hints));
auto op = ((FileInputOperator*)dsptr->_operator);
std::shared_ptr<LogicalOperator>(new FileInputOperator(pattern, this->_options, hasHeader, delimiter, quotechar, null_values, columns,
index_based_type_hints, column_based_type_hints)));
auto op = ((FileInputOperator*)dsptr->_operator.get());

// check whether files were found, else return empty dataset!
if(op->getURIs().empty()) {
Expand All @@ -384,7 +384,7 @@ namespace tuplex {
return ds;
}

auto detectedColumns = ((FileInputOperator*)dsptr->_operator)->columns();
auto detectedColumns = ((FileInputOperator*)dsptr->_operator.get())->columns();
dsptr->setColumns(detectedColumns);

// check if columns are given
Expand All @@ -406,7 +406,7 @@ namespace tuplex {
}

dsptr->setColumns(columns);
((FileInputOperator*)dsptr->_operator)->setColumns(columns);
((FileInputOperator*)dsptr->_operator.get())->setColumns(columns);
}

// set dataset to operator
Expand All @@ -430,9 +430,9 @@ namespace tuplex {
int dataSetID = getNextDataSetID();
DataSet *dsptr = createDataSet(schema);

dsptr->_operator = addOperator(new FileInputOperator(pattern, this->_options, null_values));
dsptr->_operator = addOperator(std::shared_ptr<LogicalOperator>(new FileInputOperator(pattern, this->_options, null_values)));

auto detectedColumns = ((FileInputOperator*)dsptr->_operator)->columns();
auto detectedColumns = ((FileInputOperator*)dsptr->_operator.get())->columns();
dsptr->setColumns(detectedColumns);

// set dataset to operator
Expand Down
90 changes: 45 additions & 45 deletions tuplex/core/src/DataSet.cc
Expand Up @@ -50,7 +50,7 @@ namespace tuplex {

// create a take node
assert(_context);
LogicalOperator *op = _context->addOperator(new TakeOperator(this->_operator, numElements));
auto op = _context->addOperator(std::shared_ptr<LogicalOperator>(new TakeOperator(this->_operator, numElements)));
DataSet *dsptr = _context->createDataSet(op->getOutputSchema());
dsptr->_operator = op;
op->setDataSet(dsptr);
Expand Down Expand Up @@ -110,10 +110,10 @@ namespace tuplex {

assert(_context);
assert(_operator);
LogicalOperator *op = _context->addOperator(
new FileOutputOperator(_operator, uri, udf, "csv", FileFormat::OUTFMT_CSV, outputOptions,
fileCount, shardSize, limit));
((FileOutputOperator*)op)->udf().getAnnotatedAST().allowNumericTypeUnification(_context->getOptions().AUTO_UPCAST_NUMBERS());
auto op = _context->addOperator(
std::shared_ptr<LogicalOperator>(new FileOutputOperator(_operator, uri, udf, "csv", FileFormat::OUTFMT_CSV, outputOptions,
fileCount, shardSize, limit)));
((FileOutputOperator*)op.get())->udf().getAnnotatedAST().allowNumericTypeUnification(_context->getOptions().AUTO_UPCAST_NUMBERS());

if (!op->good()) {
Logger::instance().defaultLogger().error("failed to create file output operator");
Expand All @@ -137,7 +137,7 @@ namespace tuplex {

assert(_context);
assert(this->_operator);
LogicalOperator *op = _context->addOperator(new MapOperator(this->_operator, udf, _columnNames, allowTypeUnification()));
auto op = _context->addOperator(std::shared_ptr<LogicalOperator>(new MapOperator(this->_operator, udf, _columnNames, allowTypeUnification())));

if (!op->good()) {
Logger::instance().defaultLogger().error("failed to create map operator");
Expand All @@ -146,7 +146,7 @@ namespace tuplex {

DataSet *dsptr = _context->createDataSet(op->getOutputSchema());
dsptr->_operator = op;
auto outputCols = ((MapOperator *) op)->columns();
auto outputCols = ((MapOperator *) op.get())->columns();
if (!outputCols.empty())
dsptr->setColumns(outputCols);
op->setDataSet(dsptr);
Expand All @@ -171,7 +171,7 @@ namespace tuplex {
assert(_context);
assert(this->_operator);

LogicalOperator *op = _context->addOperator(new CacheOperator(this->_operator, storeSpecialized, memoryLayout));
auto op = _context->addOperator(std::shared_ptr<LogicalOperator>(new CacheOperator(this->_operator, storeSpecialized, memoryLayout)));

if (!op->good()) {
Logger::instance().defaultLogger().error("failed to create cache operator");
Expand All @@ -193,7 +193,7 @@ namespace tuplex {
auto rs = op->compute(*this->_context); // note: this should also hold the exceptions...

// result set is computed, now make both partitions&exceptions ephemeral (@TODO: uncache mechanism)
auto cop = (CacheOperator*)op;
auto cop = (CacheOperator*)op.get();
cop->setResult(rs);

// signal check
Expand Down Expand Up @@ -224,11 +224,11 @@ namespace tuplex {
return _context->makeError("there is no column " + columnName + " to map");


LogicalOperator *op = _context->addOperator(new MapColumnOperator(this->_operator,
columnName,
columns(),
udf,
_context->getOptions().AUTO_UPCAST_NUMBERS()));
auto op = _context->addOperator(std::shared_ptr<LogicalOperator>(new MapColumnOperator(this->_operator,
columnName,
columns(),
udf,
_context->getOptions().AUTO_UPCAST_NUMBERS())));
if (!op->good()) {
Logger::instance().defaultLogger().error("failed to create mapColumn operator");
return _context->makeError("failed to add mapColumn operator to logical plan");
Expand Down Expand Up @@ -263,12 +263,12 @@ namespace tuplex {
assert(_context);
assert(this->_operator);

LogicalOperator *op = _context->addOperator(
new WithColumnOperator(this->_operator,
auto op = _context->addOperator(
std::shared_ptr<LogicalOperator>(new WithColumnOperator(this->_operator,
_columnNames,
columnName,
udf,
_context->getOptions().AUTO_UPCAST_NUMBERS()));
_context->getOptions().AUTO_UPCAST_NUMBERS())));

if (!op->good()) {
Logger::instance().defaultLogger().error("failed to create withColumn operator");
Expand All @@ -280,7 +280,7 @@ namespace tuplex {
op->setDataSet(dsptr);

// set column names
auto wop = dynamic_cast<WithColumnOperator *>(op);
auto wop = dynamic_cast<WithColumnOperator *>(op.get());
dsptr->setColumns(wop->columns());

// signal check
Expand Down Expand Up @@ -340,11 +340,11 @@ namespace tuplex {
}

ds.setColumns(sel_columns);
((MapOperator*)ds._operator)->setOutputColumns(sel_columns);
((MapOperator*)ds._operator.get())->setOutputColumns(sel_columns);
}

// rename operator
((MapOperator*)ds._operator)->setName("select");
((MapOperator*)ds._operator.get())->setName("select");

// signal check
if(check_and_forward_signals()) {
Expand Down Expand Up @@ -386,8 +386,8 @@ namespace tuplex {
ds.setColumns(columnNames);

// rename operator
((MapOperator*)ds._operator)->setName("rename");
((MapOperator*)ds._operator)->setOutputColumns(columnNames);
((MapOperator*)ds._operator.get())->setName("rename");
((MapOperator*)ds._operator.get())->setOutputColumns(columnNames);

// signal check
if(check_and_forward_signals()) {
Expand All @@ -413,7 +413,7 @@ namespace tuplex {

// check first that each column name is returned, else return error message
std::vector<std::string> missingColumns;
for (auto cn : columnNames) {
for (const auto &cn : columnNames) {
if (std::find(_columnNames.begin(), _columnNames.end(), cn) == _columnNames.end()) {
missingColumns.emplace_back(cn);
}
Expand Down Expand Up @@ -464,9 +464,9 @@ namespace tuplex {
// set columns to restricted cols
ds.setColumns(columnNames);

((MapOperator*)ds._operator)->setOutputColumns(columnNames);
((MapOperator*)ds._operator.get())->setOutputColumns(columnNames);
// rename operator
((MapOperator*)ds._operator)->setName("select");
((MapOperator*)ds._operator.get())->setName("select");

// signal check
if(check_and_forward_signals()) {
Expand All @@ -486,10 +486,10 @@ namespace tuplex {

assert(_context);
assert(this->_operator);
LogicalOperator *op = _context->addOperator(new FilterOperator(this->_operator,
auto op = _context->addOperator(std::shared_ptr<LogicalOperator>(new FilterOperator(this->_operator,
udf,
_columnNames,
_context->getOptions().AUTO_UPCAST_NUMBERS()));
_context->getOptions().AUTO_UPCAST_NUMBERS())));

if (!op->good()) {

Expand Down Expand Up @@ -528,9 +528,9 @@ namespace tuplex {

assert(_context);
assert(this->_operator);
LogicalOperator *op = _context->addOperator(new ResolveOperator(this->_operator, ec,
auto op = _context->addOperator(std::shared_ptr<LogicalOperator>(new ResolveOperator(this->_operator, ec,
udf, _columnNames,
_context->getOptions().AUTO_UPCAST_NUMBERS()));
_context->getOptions().AUTO_UPCAST_NUMBERS())));
if (!op->good()) {
Logger::instance().defaultLogger().error("failed to create resolve operator");
return _context->makeError("failed to add resolve operator to logical plan");
Expand Down Expand Up @@ -560,7 +560,7 @@ namespace tuplex {
return *this;

assert(_context && this->_operator);
LogicalOperator *op = _context->addOperator(new IgnoreOperator(this->_operator, ec));
auto op = _context->addOperator(std::shared_ptr<LogicalOperator>(new IgnoreOperator(this->_operator, ec)));

DataSet *dsptr = _context->createDataSet(op->getOutputSchema());
dsptr->_operator = op;
Expand All @@ -586,7 +586,7 @@ namespace tuplex {

assert(_context && this->_operator);

LogicalOperator *op = _context->addOperator(new AggregateOperator(this->_operator, AggregateType::AGG_UNIQUE, false));
auto op = _context->addOperator(std::shared_ptr<LogicalOperator>(new AggregateOperator(this->_operator, AggregateType::AGG_UNIQUE, false)));

DataSet *dsptr = _context->createDataSet(op->getOutputSchema());
dsptr->_operator = op;
Expand All @@ -613,12 +613,12 @@ namespace tuplex {

assert(_context && this->_operator);

LogicalOperator* op = _context->addOperator(new AggregateOperator(this->_operator, AggregateType::AGG_GENERAL,
auto op = _context->addOperator(std::shared_ptr<LogicalOperator>(new AggregateOperator(this->_operator, AggregateType::AGG_GENERAL,
_context->getOptions().AUTO_UPCAST_NUMBERS(),
aggCombine, aggUDF, aggInitial));
aggCombine, aggUDF, aggInitial)));

((AggregateOperator*)op)->aggregatorUDF().getAnnotatedAST().allowNumericTypeUnification(_context->getOptions().AUTO_UPCAST_NUMBERS());
((AggregateOperator*)op)->combinerUDF().getAnnotatedAST().allowNumericTypeUnification(_context->getOptions().AUTO_UPCAST_NUMBERS());
((AggregateOperator*)op.get())->aggregatorUDF().getAnnotatedAST().allowNumericTypeUnification(_context->getOptions().AUTO_UPCAST_NUMBERS());
((AggregateOperator*)op.get())->combinerUDF().getAnnotatedAST().allowNumericTypeUnification(_context->getOptions().AUTO_UPCAST_NUMBERS());

DataSet *dsptr = _context->createDataSet(op->getOutputSchema());
dsptr->_operator = op;
Expand Down Expand Up @@ -647,12 +647,12 @@ namespace tuplex {

assert(_context && this->_operator);

LogicalOperator* op = _context->addOperator(new AggregateOperator(this->_operator, AggregateType::AGG_BYKEY,
auto op = _context->addOperator(std::shared_ptr<LogicalOperator>(new AggregateOperator(this->_operator, AggregateType::AGG_BYKEY,
_context->getOptions().AUTO_UPCAST_NUMBERS(),
aggCombine, aggUDF, aggInitial, keyColumns));
aggCombine, aggUDF, aggInitial, keyColumns)));

((AggregateOperator*)op)->aggregatorUDF().getAnnotatedAST().allowNumericTypeUnification(_context->getOptions().AUTO_UPCAST_NUMBERS());
((AggregateOperator*)op)->combinerUDF().getAnnotatedAST().allowNumericTypeUnification(_context->getOptions().AUTO_UPCAST_NUMBERS());
((AggregateOperator*)op.get())->aggregatorUDF().getAnnotatedAST().allowNumericTypeUnification(_context->getOptions().AUTO_UPCAST_NUMBERS());
((AggregateOperator*)op.get())->combinerUDF().getAnnotatedAST().allowNumericTypeUnification(_context->getOptions().AUTO_UPCAST_NUMBERS());

DataSet *dsptr = _context->createDataSet(op->getOutputSchema());
dsptr->_operator = op;
Expand Down Expand Up @@ -724,10 +724,10 @@ namespace tuplex {
assert(_context);
assert(this->_operator);
assert(other._operator); // if this fails, probably dataset not declared via auto& ds = ...
LogicalOperator *op = _context->addOperator(
new JoinOperator(this->_operator, other._operator, leftColumn, rightColumn, JoinType::INNER,
auto op = _context->addOperator(
std::shared_ptr<LogicalOperator>(new JoinOperator(this->_operator, other._operator, leftColumn, rightColumn, JoinType::INNER,
leftPrefix.value_or(""), leftSuffix.value_or(""), rightPrefix.value_or(""),
rightSuffix.value_or("")));
rightSuffix.value_or(""))));

if (!op->good()) {

Expand Down Expand Up @@ -769,10 +769,10 @@ namespace tuplex {
assert(_context);
assert(this->_operator);
assert(other._operator); // if this fails, probably dataset not declared via auto& ds = ...
LogicalOperator *op = _context->addOperator(
new JoinOperator(this->_operator, other._operator, leftColumn, rightColumn, JoinType::LEFT,
auto op = _context->addOperator(
std::shared_ptr<LogicalOperator>(new JoinOperator(this->_operator, other._operator, leftColumn, rightColumn, JoinType::LEFT,
leftPrefix.value_or(""), leftSuffix.value_or(""), rightPrefix.value_or(""),
rightSuffix.value_or("")));
rightSuffix.value_or(""))));

if (!op->good()) {

Expand Down
3 changes: 1 addition & 2 deletions tuplex/core/src/logical/LogicalOperator.cc
Expand Up @@ -76,9 +76,8 @@ namespace tuplex {

void LogicalOperator::freeParents() {
// recurse
for(auto parent : parents()) {
for(const auto &parent : parents()) {
parent->freeParents();
delete parent;
}
_parents.clear();
}
Expand Down

0 comments on commit cb0754f

Please sign in to comment.