Skip to content

Commit

Permalink
wip: serialize logical op graph - needs correction
Browse files Browse the repository at this point in the history
  • Loading branch information
rahulyesantharao committed Oct 15, 2021
1 parent c4c9eaa commit 2e70656
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 30 deletions.
6 changes: 5 additions & 1 deletion tuplex/core/include/logical/AggregateOperator.h
Expand Up @@ -30,7 +30,7 @@ namespace tuplex {
public:
virtual ~AggregateOperator() override = default;

AggregateOperator(LogicalOperator* parent,
AggregateOperator(const std::shared_ptr<LogicalOperator> &parent,
const AggregateType& at,
bool allowNumericTypeUnification,
const UDF& combiner=UDF("",""),
Expand Down Expand Up @@ -138,6 +138,10 @@ namespace tuplex {
*/
std::vector<size_t> keyColsInParent() const { assert(aggType() == AggregateType::AGG_BYKEY); return _keyColsInParent; }
python::Type keyType() const { assert(aggType() == AggregateType::AGG_BYKEY); return _keyType; }

// cereal serialization functions
template<class Archive>
void serialize(Archive &archive);
private:
AggregateType _aggType;

Expand Down
29 changes: 19 additions & 10 deletions tuplex/core/include/logical/LogicalOperator.h
Expand Up @@ -13,12 +13,14 @@

#include <logical/LogicalOperatorType.h>
#include <vector>
#include <memory>
#include <physical/ResultSet.h>
#include <DataSet.h>
#include "graphviz/GraphVizBuilder.h"
#include "Schema.h"
// to avoid conflicts with Python3.7
#include "../Context.h"
#include "cereal/access.hpp"

static const size_t MAX_TYPE_SAMPLING_ROWS=100; // make this configurable? i.e. defines how much to trace...

Expand All @@ -28,20 +30,20 @@ namespace tuplex {
class DataSet;
class Context;

class LogicalOperator {
class LogicalOperator : std::enable_shared_from_this<LogicalOperator> {
private:
int buildGraph(GraphVizBuilder& builder);
int64_t _id;
std::vector<LogicalOperator*> _children;
std::vector<LogicalOperator*> _parents;
std::vector<std::shared_ptr<LogicalOperator>> _parents;
Schema _schema;
DataSet *_dataSet;
void addThisToParents() {
for(auto parent : _parents) {
if(!parent)
continue;

if(parent == this)
if(parent == shared_from_this())
throw std::runtime_error("cycle encountered! invalid for operator graph.");

parent->_children.push_back(this);
Expand All @@ -54,8 +56,8 @@ namespace tuplex {
void setSchema(const Schema& schema) { _schema = schema; }
virtual void copyMembers(const LogicalOperator* other);
public:
explicit LogicalOperator(const std::vector<LogicalOperator*>& parents) : _id(logicalOperatorIDGenerator++), _parents(parents), _dataSet(nullptr) { addThisToParents(); }
explicit LogicalOperator(LogicalOperator* parent) : _id(logicalOperatorIDGenerator++), _parents({parent}), _dataSet(nullptr) { if(!parent) throw std::runtime_error("can't have nullptr as parent"); addThisToParents(); }
explicit LogicalOperator(const std::vector<std::shared_ptr<LogicalOperator>>& parents) : _id(logicalOperatorIDGenerator++), _parents(parents), _dataSet(nullptr) { addThisToParents(); }
explicit LogicalOperator(std::shared_ptr<LogicalOperator> parent) : _id(logicalOperatorIDGenerator++), _parents({parent}), _dataSet(nullptr) { if(!parent) throw std::runtime_error("can't have nullptr as parent"); addThisToParents(); }
LogicalOperator() : _id(logicalOperatorIDGenerator++), _dataSet(nullptr) { addThisToParents(); }

virtual ~LogicalOperator();
Expand All @@ -75,18 +77,18 @@ namespace tuplex {
virtual bool good() const = 0;

std::vector<LogicalOperator*> getChildren() const { return _children; }
LogicalOperator* parent() const { if(_parents.empty())return nullptr; assert(_parents.size() == 1); return _parents.front(); }
std::vector<LogicalOperator*> parents() const { return _parents; }
std::shared_ptr<LogicalOperator> parent() const { if(_parents.empty())return nullptr; assert(_parents.size() == 1); return _parents.front(); }
std::vector<std::shared_ptr<LogicalOperator>> parents() const { return _parents; }
size_t numParents() const { return _parents.size(); }
size_t numChildren() const { return _children.size(); }


// manipulating functions for the tree (i.e. used in logical optimizer)
// Note: these functions DO NOT free memory.
void setParents(const std::vector<LogicalOperator*>& parents);
void setParents(const std::vector<std::shared_ptr<LogicalOperator>>& parents);
void setChildren(const std::vector<LogicalOperator*>& children);

void setParent(LogicalOperator* parent) { setParents({parent}); }
void setParent(std::shared_ptr<LogicalOperator> parent) { setParents({parent}); }
void setChild(LogicalOperator* child) { setChildren({child}); }

/*!
Expand All @@ -95,7 +97,7 @@ namespace tuplex {
* @param newParent
* @return true if oldParent found, false else
*/
bool replaceParent(LogicalOperator* oldParent, LogicalOperator* newParent) {
bool replaceParent(std::shared_ptr<LogicalOperator> oldParent, std::shared_ptr<LogicalOperator> newParent) {
auto it = std::find(_parents.begin(), _parents.end(), oldParent);
if(it == _parents.end())
return false;
Expand Down Expand Up @@ -214,6 +216,13 @@ namespace tuplex {
* @return python objects, acquires GIL and releases GIL
*/
virtual std::vector<PyObject*> getPythonicSample(size_t num);

// cereal serialization functions
template<class Archive>
void serialize(Archive &archive);

// template<class Archive>
// void save(Archive &archive) const;
};
}
#endif //TUPLEX_LOGICALOPERATOR_H
11 changes: 9 additions & 2 deletions tuplex/core/include/logical/LogicalPlan.h
Expand Up @@ -26,7 +26,7 @@ namespace tuplex {
class LogicalPlan {
private:
// the action which is called
LogicalOperator* _action;
std::unique_ptr<LogicalOperator> _action;


void optimizeFilters();
Expand Down Expand Up @@ -74,13 +74,20 @@ namespace tuplex {
*/
PhysicalPlan* createPhysicalPlan(const Context& context);

LogicalOperator* getAction() const { return _action; }
LogicalOperator* getAction() const { return _action.get(); }

/*!
* output logical plan as PDF (graphviz)
* @param path
*/
void toPDF(const std::string& path) const;

// cereal serialization functions
template<class Archive>
void serialize(Archive &archive);

template<class Archive>
static void load_and_construct(Archive &archive, cereal::construct<LogicalPlan> &construct);
};
}

Expand Down
4 changes: 2 additions & 2 deletions tuplex/core/src/ee/aws/AWSLambdaBackend.cc
Expand Up @@ -325,7 +325,7 @@ namespace tuplex {
// check whether scratch dir exists.
auto scratch = scratchDir(hintsFromTransformStage(tstage));
if(scratch == URI::INVALID) {
throw std::runtime_error("temporaty AWS Lambda scratch dir required to write output, please specify via tuplex.aws.scratchDir key");
throw std::runtime_error("temporary AWS Lambda scratch dir required to write output, please specify via tuplex.aws.scratchDir key");
return;
}
}
Expand Down Expand Up @@ -368,7 +368,7 @@ namespace tuplex {

// make invocation
std::stringstream ss;
ss<<"LAMDA request "<<(i+1)<<"/"<<uri_infos.size()<<" on "<<sizeToMemString(inputSize);
ss<<"LAMBDA request "<<(i+1)<<"/"<<uri_infos.size()<<" on "<<sizeToMemString(inputSize);
logger().info(ss.str());

// debug, save to protobuf!
Expand Down
9 changes: 9 additions & 0 deletions tuplex/core/src/logical/AggregateOperator.cc
Expand Up @@ -141,4 +141,13 @@ namespace tuplex {
return false;
}
}

// cereal serialization functions
/*!
 * cereal entry point for the aggregate-specific state of this operator.
 * The same function serves saving and loading; the Archive type decides the direction.
 * @param archive cereal input or output archive
 */
template<class Archive>
void AggregateOperator::serialize(Archive &archive) {
    // Archive the operator's own fields. Locals named _combiner/_aggregator/_initialValue must not be
    // declared here: they would shadow the operator state and only default-constructed throwaway
    // objects would be written/read.
    // NOTE(review): assumes _combiner, _aggregator and _initialValue are members declared in
    // AggregateOperator.h and that UDF, Row and python::Type are serializable with cereal -- confirm.
    // NOTE(review): state of the LogicalOperator base class is not archived here -- TODO confirm
    // whether it has to be chained in.
    archive(_aggType, _aggregateOutputType, _combiner, _aggregator, _initialValue, _keys, _keyColsInParent, _keyType);
}
}
18 changes: 15 additions & 3 deletions tuplex/core/src/logical/LogicalOperator.cc
Expand Up @@ -12,6 +12,8 @@
#include <logical/LogicalOperator.h>
#include <cassert>
#include <logical/LogicalPlan.h>
#include "cereal/types/memory.hpp"
#include "cereal/types/vector.hpp"

namespace tuplex {

Expand Down Expand Up @@ -76,14 +78,13 @@ namespace tuplex {

/*!
 * recursively releases the parent links of this operator and of all its ancestors.
 * Parents are stored as std::shared_ptr, so nothing is deleted explicitly here;
 * clearing the list only drops this operator's references.
 */
void LogicalOperator::freeParents() {
    // parents() returns a copy, so clearing _parents below does not invalidate the iteration
    for(const auto &parent : parents()) {
        // null entries are tolerated the same way addThisToParents() tolerates them
        if(!parent)
            continue;
        parent->freeParents();
    }
    _parents.clear();
}

void LogicalOperator::setParents(const std::vector<LogicalOperator *> &parents) {
void LogicalOperator::setParents(const std::vector<std::shared_ptr<LogicalOperator>> &parents) {
_parents.clear();
_parents = parents;
}
Expand All @@ -103,4 +104,15 @@ namespace tuplex {
python::unlockGIL();
return v;
}

// cereal serialization functions
/*!
 * cereal entry point for the state shared by all logical operators (id, parents, schema).
 * The same function serves saving and loading; the Archive type decides the direction.
 * @param archive cereal input or output archive
 */
template<class Archive>
void LogicalOperator::serialize(Archive &archive) {
    // _dataSet is a raw pointer; cereal does not archive raw pointers, so it is left out and keeps
    // the value assigned by the constructor (nullptr).
    archive(_id, _parents, _schema);
    // NOTE(review): the default constructor runs addThisToParents() while _parents is still empty,
    // so the _children lists of the loaded parents are NOT re-established by loading -- TODO.
    // NOTE(review): this template is defined in a .cc file; it presumably needs explicit
    // instantiation (or a move to the header) for the archive types in use -- confirm.
}
// template<class Archive>
// void LogicalOperator::save(Archive &archive) const {
// archive(_id, _parents, _schema, _dataSet);
// }
}
37 changes: 25 additions & 12 deletions tuplex/core/src/logical/LogicalPlan.cc
Expand Up @@ -27,25 +27,25 @@
#include <ApplyVisitor.h>
#include <logical/AggregateOperator.h>
#include <FilterBreakdownVisitor.h>
#include "cereal/types/memory.hpp"

namespace tuplex {
/*!
 * creates a logical plan from an action operator. The plan stores its own clone of the
 * operator; the passed pointer is not taken over.
 * @param action operator for which isActionable() holds, must not be nullptr
 */
LogicalPlan::LogicalPlan(LogicalOperator *action) {
    assert(action && action->isActionable());

    // clone exactly once and let the unique_ptr member own the copy
    _action = std::unique_ptr<LogicalOperator>(action->clone());
}

/*!
 * releases the parent links reachable from the action operator and then the operator itself.
 */
LogicalPlan::~LogicalPlan() {
    // _action may be empty, so guard the call
    if(_action)
        _action->freeParents();

    // _action is a std::unique_ptr: resetting it destroys the operator, no manual delete
    _action.reset();
}

PhysicalPlan* LogicalPlan::createPhysicalPlan(const Context& context) {

Timer timer;

// TODO: I don't think this is correct anymore!
// first step is to separate out the stages. As of now, only filter/map operations are supported.
// Hence, there is a single stage.
// Also, need to separate between narrow & wide stages (i.e. those with and without shuffling)
Expand Down Expand Up @@ -1068,7 +1068,7 @@ namespace tuplex {
std::vector<LogicalOperator*> v_filters;
// @TODO: maybe define a logical tree visitor class because they're so convenient
std::queue<LogicalOperator*> q; // BFS
q.push(_action);
q.push(_action.get());
while(!q.empty()) {
auto node = q.front(); q.pop();
if(node->type() == LogicalOperatorType::FILTER)
Expand Down Expand Up @@ -1106,7 +1106,7 @@ namespace tuplex {
std::vector<LogicalOperator*> v_filters;
// @TODO: maybe define a logical tree visitor class because they're so convenient
std::queue<LogicalOperator*> q; // BFS
q.push(_action);
q.push(_action.get());
while(!q.empty()) {
auto node = q.front(); q.pop();
if(node->type() == LogicalOperatorType::FILTER)
Expand Down Expand Up @@ -1255,7 +1255,7 @@ namespace tuplex {
// first step: find all join operators
std::vector<LogicalOperator*> v_joins;
std::queue<LogicalOperator*> q; // BFS
q.push(_action);
q.push(_action.get());
while(!q.empty()) {
auto node = q.front(); q.pop();
if(node->type() == LogicalOperatorType::JOIN)
Expand Down Expand Up @@ -1301,7 +1301,7 @@ namespace tuplex {
#endif

#ifndef NDEBUG
assert(verifyLogicalPlan(_action));
assert(verifyLogicalPlan(_action.get()));
#endif

if(context.getOptions().OPT_FILTER_PUSHDOWN()) {
Expand All @@ -1310,14 +1310,14 @@ namespace tuplex {
}

#ifndef NDEBUG
assert(verifyLogicalPlan(_action));
assert(verifyLogicalPlan(_action.get()));
#endif

if(context.getOptions().OPT_OPERATOR_REORDERING())
reorderDataProcessingOperators();

#ifndef NDEBUG
assert(verifyLogicalPlan(_action));
assert(verifyLogicalPlan(_action.get()));
#endif

// projectionPushdown (to csv parser etc. if possible)
Expand All @@ -1332,7 +1332,7 @@ namespace tuplex {
auto num_cols = _action->getInputSchema().getRowType().parameters().size();
for(unsigned i = 0; i < num_cols; ++i)
cols.emplace_back(i);
projectionPushdown(_action, nullptr, cols);
projectionPushdown(_action.get(), nullptr, cols);

// note: could remove identity functions...
// i.e. lambda x: x or lambda x: (x[0], x[1], ..., x[len(x) - 1]) same for def...
Expand All @@ -1343,7 +1343,7 @@ namespace tuplex {
#endif

#ifndef NDEBUG
assert(verifyLogicalPlan(_action));
assert(verifyLogicalPlan(_action.get()));
#endif
}

Expand Down Expand Up @@ -1384,7 +1384,7 @@ namespace tuplex {

void LogicalPlan::toPDF(const std::string &path) const {
GraphVizBuilder b;
recursiveLPBuilder(b, _action);
recursiveLPBuilder(b, _action.get());
b.saveToPDF(path);
}

Expand All @@ -1395,4 +1395,17 @@ namespace tuplex {
// perform a deep copy of the plan...
return new LogicalPlan(_action->clone());
}

// cereal serialization functions
template<class Archive>
void LogicalPlan::serialize(Archive &archive) {
archive(_action);
}

template<class Archive>
void LogicalPlan::load_and_construct(Archive &archive, cereal::construct<LogicalPlan> &construct) {
std::unique_ptr<LogicalOperator> action;
archive(action);
construct(action);
}
}

0 comments on commit 2e70656

Please sign in to comment.