diff --git a/tuplex/core/include/DataSet.h b/tuplex/core/include/DataSet.h index 899032723..1b11c1f75 100644 --- a/tuplex/core/include/DataSet.h +++ b/tuplex/core/include/DataSet.h @@ -125,10 +125,10 @@ namespace tuplex { /*! * action that displays tuples as nicely formatted table - * @param numRows how many rows to print, i.e. top numRows are printed.xs + * @param numRows how many rows to print, i.e. top numRows are printed.xs, -1 means print all rows * @param os ostream where to print table to */ - virtual void show(const int64_t numRows = -1, std::ostream &os = std::cout); + virtual void show(int64_t numRows = -1, std::ostream &os = std::cout); // named dataset management functions /*! @@ -252,22 +252,49 @@ namespace tuplex { * @param memoryLayout * @return */ - virtual DataSet& cache(const Schema::MemoryLayout& memoryLayout, bool storeSpecialized); - DataSet& cache(bool storeSpecialized=true) { return cache(Schema::MemoryLayout::ROW, storeSpecialized); } + virtual DataSet &cache(const Schema::MemoryLayout &memoryLayout, bool storeSpecialized); + + DataSet &cache(bool storeSpecialized = true) { return cache(Schema::MemoryLayout::ROW, storeSpecialized); } /*! * helper setter without checks, to update internal column names. */ void setColumns(const std::vector &columnNames) { _columnNames = columnNames; } - // these are actions that cause execution + /*! + * Execute the pipeline and return all outputs + * @param os the logging output + * @return the output of the execution + */ virtual std::shared_ptr collect(std::ostream &os = std::cout); - virtual std::shared_ptr take(int64_t numElements, std::ostream &os = std::cout); + /*! + * Execute the pipeline and take a subset of the output from the top and bottom rows. + * If both top and bottom rows limit exist, then the top and bottom rows will be concatenated. + * In the case where topLimit + bottomLimit exceeds the output size, all rows will be taken. + * To take all rows, pass in either topLimit=size_t::max(), bottomLimit=size_t::max(), or both. + * @param topLimit number of top rows to take. size_t::max() means taking all rows + * @param bottomLimit number of bottom rows to take. size_t::max() means taking all rows + * @param os the logging output + * @return result of the execution, trim to the size of top and bottom limit. + */ + virtual std::shared_ptr take(size_t topLimit, size_t bottomLimit, std::ostream &os = std::cout); + /*! + * Execute the pipeline and return all outputs as vector + * @param os the logging output + * @return the output of the execution in vector + */ virtual std::vector collectAsVector(std::ostream &os = std::cout); - virtual std::vector takeAsVector(int64_t numElements, std::ostream &os = std::cout); + /*! + * Execute the pipeline and take a subset of the output from the top rows, return as vector + * In the case where numElements exceeds the output size, all rows will be taken. + * @param numElements number of top rows to take. size_t::max() means taking all rows + * @param os the logging output + * @return result of the execution in vector, trim to the size of numElements + */ + virtual std::vector takeAsVector(size_t numElements, std::ostream &os = std::cout); /*! * saves dataset to file. There are multiple options to control the behavior diff --git a/tuplex/core/include/EmptyDataset.h b/tuplex/core/include/EmptyDataset.h index b3c1ed7af..585b70881 100644 --- a/tuplex/core/include/EmptyDataset.h +++ b/tuplex/core/include/EmptyDataset.h @@ -67,18 +67,20 @@ namespace tuplex { virtual DataSet& aggregateByKey(const UDF& aggCombine, const UDF& aggUDF, const Row& aggInitial, const std::vector &keyColumns) override { return *this; } //virtual void show(const int64_t numRows=-1, std::ostream& os=std::cout) override; - virtual std::shared_ptr collect(std::ostream& os) override; + virtual std::shared_ptr collect(std::ostream &os) override; // take / collect will print out the error only - virtual std::shared_ptr take(int64_t numElements, std::ostream& os) override; + virtual std::shared_ptr take(size_t topLimit, size_t bottomLimit, std::ostream &os) override; //virtual void show(const int64_t numRows=-1, std::ostream& os=std::cout) override; - virtual std::vector collectAsVector(std::ostream& os) override; + virtual std::vector collectAsVector(std::ostream &os) override; - // take / collect will print out the error only - virtual std::vector takeAsVector(int64_t numElements, std::ostream& os) override; + /*! + * take / collect will print out the error only, return empty rows + */ + virtual std::vector takeAsVector(size_t numElements, std::ostream &os) override; - DataSet& cache(const Schema::MemoryLayout& memoryLayout, bool storeSpecialized) override { + DataSet &cache(const Schema::MemoryLayout &memoryLayout, bool storeSpecialized) override { return *this; } }; diff --git a/tuplex/core/include/ErrorDataSet.h b/tuplex/core/include/ErrorDataSet.h index 2f46d8638..cf283ebd1 100644 --- a/tuplex/core/include/ErrorDataSet.h +++ b/tuplex/core/include/ErrorDataSet.h @@ -90,13 +90,13 @@ namespace tuplex { std::shared_ptr collect(std::ostream& os) override; // take / collect will print out the error only - std::shared_ptr take(int64_t numElements, std::ostream& os) override; + std::shared_ptr take(size_t topLimit, size_t bottomLimit, std::ostream& os) override; //virtual void show(const int64_t numRows=-1, std::ostream& os=std::cout) override; std::vector collectAsVector(std::ostream& os) override; // take / collect will print out the error only - std::vector takeAsVector(int64_t numElements, std::ostream& os) override; + std::vector takeAsVector(size_t numElements, std::ostream& os) override; }; } diff --git a/tuplex/core/include/Executor.h b/tuplex/core/include/Executor.h index 0bca412be..b6b7edac1 100644 --- a/tuplex/core/include/Executor.h +++ b/tuplex/core/include/Executor.h @@ -48,8 +48,9 @@ namespace tuplex { ExecutorTaskQueueType _queue; std::mutex _completedTasksMutex; std::vector _completedTasks; - std::atomic_int _numPendingTasks; - std::atomic_int _numCompletedTasks; + std::atomic_int _numPendingTasks{}; + std::atomic_int _numCompletedTasks{}; + public: WorkQueue(); diff --git a/tuplex/core/include/LocalEngine.h b/tuplex/core/include/LocalEngine.h index 66ed3a1e8..740a40b4d 100644 --- a/tuplex/core/include/LocalEngine.h +++ b/tuplex/core/include/LocalEngine.h @@ -16,7 +16,28 @@ #include #include "RESTInterface.h" + namespace tuplex { + struct ExecutorConfig { + size_t _size; // size in bytes that each executor should have + size_t _blockSize; // size of individual blocks used (can be used for coarse or fine grained parallelism) + size_t _runTimeMemory; + size_t _runTimeMemoryDefaultBlockSize; + URI _cache_path; + + bool operator==(const ExecutorConfig &rhs) const { + return _size == rhs._size && + _blockSize == rhs._blockSize && + _runTimeMemory == rhs._runTimeMemory && + _runTimeMemoryDefaultBlockSize == rhs._runTimeMemoryDefaultBlockSize && + _cache_path == rhs._cache_path; + } + + bool operator!=(const ExecutorConfig &rhs) const { + return !(rhs == *this); + } + }; + /*! * local execution engine. Provides local executors for a context * THIS IS NOT THREADSAFE. Should be only accessed by driver thread. @@ -25,16 +46,18 @@ namespace tuplex { private: // non-detached executor that serves as the driver - std::unique_ptr _driver; + std::shared_ptr _driver; + ExecutorConfig _driver_cfg; std::vector> _executors; - std::map _refCounts; //! reference counts for each executor + std::map _refCounts; //! reference counts for each executor + + LocalEngine(const LocalEngine &); - LocalEngine(const LocalEngine&); - void operator = (const LocalEngine&); + void operator=(const LocalEngine &); // The local task queue - WorkQueue _queue; + WorkQueue _queue; protected: LocalEngine(); @@ -63,25 +86,25 @@ namespace tuplex { * @param cache_path directory where subfolders will be created for all executors to be started * @return array of executor references */ - std::vector getExecutors(const size_t num, - const size_t size, - const size_t blockSize, - const size_t runTimeMemory, - const size_t runTimeMemoryDefaultBlockSize, - const URI& cache_path); + std::vector getExecutors(const size_t num, + const size_t size, + const size_t blockSize, + const size_t runTimeMemory, + const size_t runTimeMemoryDefaultBlockSize, + const URI &cache_path); /*! * releases executors (invoked by context) * @param executors * @param ctx */ - void freeExecutors(const std::vector& executors, const Context* ctx=nullptr); + void freeExecutors(const std::vector &executors, const Context *ctx = nullptr); - Executor* getDriver(const size_t size, - const size_t blockSize, - const size_t runTimeMemory, - const size_t runTimeMemoryDefaultBlockSize, - const URI& cache_path); + std::shared_ptr getDriver(const size_t size, + const size_t blockSize, + const size_t runTimeMemory, + const size_t runTimeMemoryDefaultBlockSize, + const URI &cache_path); void release(); @@ -89,7 +112,7 @@ namespace tuplex { * retrieves the global work queue for local executors * @return */ - WorkQueue& getQueue() { return _queue; } + WorkQueue &getQueue() { return _queue; } }; } #endif //TUPLEX_LOCALENGINE_H \ No newline at end of file diff --git a/tuplex/core/include/Partition.h b/tuplex/core/include/Partition.h index 6e3e1a912..85818700e 100644 --- a/tuplex/core/include/Partition.h +++ b/tuplex/core/include/Partition.h @@ -158,7 +158,7 @@ namespace tuplex { * return how much capacity is left, i.e. how many bytes can be actually written * @return */ - size_t capacity() { return _size - sizeof(int64_t); } + size_t capacity() const { return _size - sizeof(int64_t); } uniqueid_t uuid() const { return _uuid; } diff --git a/tuplex/core/include/PartitionUtils.h b/tuplex/core/include/PartitionUtils.h new file mode 100644 index 000000000..d247edcfc --- /dev/null +++ b/tuplex/core/include/PartitionUtils.h @@ -0,0 +1,46 @@ +//--------------------------------------------------------------------------------------------------------------------// +// // +// Tuplex: Blazing Fast Python Data Science // +// // +// // +// (c) 2017 - 2021, Tuplex team // +// Created by March Boonyapaluk first on 4/19/2021 // +// License: Apache 2.0 // +//--------------------------------------------------------------------------------------------------------------------// + +#ifndef TUPLEX_PARTITIONUTILS_H +#define TUPLEX_PARTITIONUTILS_H + +#include +#include +#include + +namespace tuplex { + /*! + * Trim list of partitions so that it includes up to the first n rows and the last m rows + * if n + m > number of rows in input partitions, the partitions will remain unchanged + * @param partitions [in,out] the list of partitions to trim + * @param topLimit n, the number of top rows to include + * @param bottomLimit m, the number of bottom rows to include + * @param tstage pointer to transform stage, might be used to generate new partition + * @param exec pointer to executor, might be used to allocate new partition + */ + void trimPartitionsToLimit(std::vector &partitions, size_t topLimit, size_t bottomLimit, + TransformStage *tstage, Executor *exec); + + /*! + * Create a newly allocated partition with the same data as the specified partition, but with the first n rows removed + * @param p_in the input partition + * @param numToSkip number of rows to remove from the new partition + * @param tstage pointer to transform stage, used to generate new partition + * @param exec pointer to executor, used to allocate new partition + * @return the new partition + */ + Partition *newPartitionWithSkipRows(Partition *p_in, + size_t numToSkip, + TransformStage *tstage, + Executor *exec); + +} + +#endif //TUPLEX_PARTITIONUTILS_H diff --git a/tuplex/core/include/ee/IBackend.h b/tuplex/core/include/ee/IBackend.h index e7a80e5bb..1a543df8f 100644 --- a/tuplex/core/include/ee/IBackend.h +++ b/tuplex/core/include/ee/IBackend.h @@ -29,19 +29,22 @@ namespace tuplex { class IBackend { public: IBackend() = delete; - IBackend(const IBackend& other) = delete; - IBackend(const Context& context) : _context(context) {} + + IBackend(const IBackend &other) = delete; + + IBackend(const Context &context) : _context(context) {} // driver, i.e. where to store local data. - virtual Executor* driver() = 0; - virtual void execute(PhysicalStage* stage) = 0; + virtual Executor *driver() = 0; + + virtual void execute(PhysicalStage *stage) = 0; virtual ~IBackend() {} // virtual destructor needed b.c. of smart pointers - virtual const Context& context() const { return _context; } + virtual const Context &context() const { return _context; } private: - const Context& _context; + const Context &_context; }; inline std::unordered_map, size_t> merge_ecounts(std::unordered_map, size_t> lhs, diff --git a/tuplex/core/include/ee/local/LocalBackend.h b/tuplex/core/include/ee/local/LocalBackend.h index 58ae5a96c..4396186db 100644 --- a/tuplex/core/include/ee/local/LocalBackend.h +++ b/tuplex/core/include/ee/local/LocalBackend.h @@ -40,14 +40,15 @@ namespace tuplex { * constructor for convenience * @param context */ - explicit LocalBackend(const Context& context); + explicit LocalBackend(const Context &context); - Executor* driver() override; // for local execution + Executor *driver() override; // for local execution + + void execute(PhysicalStage *stage) override; - void execute(PhysicalStage* stage) override; private: - Executor *_driver; //! driver from local backend... - std::vector _executors; //! drivers to be used + std::shared_ptr _driver; //! driver from local backend... + std::vector _executors; //! drivers to be used std::unique_ptr _compiler; HistoryServerConnection _historyConn; diff --git a/tuplex/core/include/logical/TakeOperator.h b/tuplex/core/include/logical/TakeOperator.h index 8d0d6dcab..f3841236b 100644 --- a/tuplex/core/include/logical/TakeOperator.h +++ b/tuplex/core/include/logical/TakeOperator.h @@ -17,15 +17,16 @@ namespace tuplex { class TakeOperator : public LogicalOperator { private: - int64_t _limit; + size_t _topLimit; + size_t _bottomLimit; public: LogicalOperator *clone() override; public: - TakeOperator(LogicalOperator *parent, const int64_t numElements); + TakeOperator(LogicalOperator *parent, size_t topLimit, size_t bottomLimit); std::string name() override { - if(_limit < 0 || std::numeric_limits::max() == _limit) + if(_topLimit == std::numeric_limits::max() || _bottomLimit == std::numeric_limits::max()) return "collect"; return "take"; } @@ -37,8 +38,9 @@ namespace tuplex { bool good() const override; - int64_t limit() { return _limit; } + size_t topLimit() const { return _topLimit; } + size_t bottomLimit() const { return _bottomLimit; } std::vector getSample(const size_t num) const override; diff --git a/tuplex/core/include/physical/StageBuilder.h b/tuplex/core/include/physical/StageBuilder.h index 63b94bd57..1c322b9a6 100644 --- a/tuplex/core/include/physical/StageBuilder.h +++ b/tuplex/core/include/physical/StageBuilder.h @@ -76,8 +76,9 @@ namespace tuplex { void addFileInput(FileInputOperator* csvop); void addFileOutput(FileOutputOperator* fop); - inline void setOutputLimit(size_t limit) { - _outputLimit = limit; + inline void setOutputLimit(size_t topLimit, size_t bottomLimit = 0) { + _outputTopLimit = topLimit; + _outputBottomLimit = bottomLimit; } TransformStage* build(PhysicalPlan* plan, IBackend* backend); @@ -134,7 +135,8 @@ namespace tuplex { FileFormat _outputFileFormat; int64_t _outputNodeID; int64_t _inputNodeID; - size_t _outputLimit; + size_t _outputTopLimit; + size_t _outputBottomLimit; LogicalOperator* _inputNode; std::vector _columnsToRead; @@ -157,7 +159,7 @@ namespace tuplex { int64_t outputDataSetID() const; inline bool hasOutputLimit() const { - return _outputLimit < std::numeric_limits::max(); + return _outputTopLimit != std::numeric_limits::max() && _outputBottomLimit != std::numeric_limits::max(); } inline char csvOutputDelimiter() const { diff --git a/tuplex/core/include/physical/TransformStage.h b/tuplex/core/include/physical/TransformStage.h index 7488d29e9..76aeb45a5 100644 --- a/tuplex/core/include/physical/TransformStage.h +++ b/tuplex/core/include/physical/TransformStage.h @@ -133,17 +133,20 @@ namespace tuplex { /*! * sets maximum number of rows this pipeline will produce - * @param outputLimit + * @param topLimit number of top rows to produce, 0 means none, and size_t::max means everything + * @param bottomLimit number of bottom rows to produce, 0 means none, and size_t::max means everything */ - void setOutputLimit(size_t outputLimit) { - _outputLimit = outputLimit; + inline void setOutputLimit(size_t topLimit, size_t bottomLimit) { + _outputTopLimit = topLimit; + _outputBottomLimit = bottomLimit; // @TODO: move this logic to physical plan! // pushdown limit //pushDownOutputLimit(); } - size_t outputLimit() const { return _outputLimit; } + size_t outputTopLimit() const { return _outputTopLimit; } + size_t outputBottomLimit() const { return _outputBottomLimit; } size_t inputLimit() const { return _inputLimit; } /*! @@ -414,6 +417,10 @@ namespace tuplex { */ void setDataAggregationMode(const AggregateType& t) { _aggMode = t; } + // default case: both _outputTopLimit and _outputBottomLimit is zero = take everything + bool hasOutputLimit() const { + return _outputTopLimit != std::numeric_limits::max() && _outputBottomLimit != std::numeric_limits::max(); + } private: /*! * creates a new TransformStage with generated code @@ -466,11 +473,14 @@ namespace tuplex { std::vector _inputPartitions; //! memory input partitions for this task. size_t _inputLimit; //! limit number of input rows (inf per default) - size_t _outputLimit; //! output limit, set e.g. by take, to_csv etc. (inf per default) + + size_t _outputTopLimit; //! output limit, set e.g. by take, to_csv etc. (inf per default) + size_t _outputBottomLimit; //! output limit, set e.g. by take, to_csv etc. (0 per default) std::vector _generalPartitions; //! general case input partitions std::vector _fallbackPartitions; //! fallback case input partitions std::vector _partitionGroups; //! groups partitions together for correct row indices + std::shared_ptr _rs; //! result set URI _outputURI; //! the output uri for file mode of this stage @@ -499,10 +509,6 @@ namespace tuplex { // for hash output, the key and bucket type python::Type _hashOutputKeyType; python::Type _hashOutputBucketType; - - bool hasOutputLimit() const { - return _outputLimit < std::numeric_limits::max(); - } }; } #endif //TUPLEX_TRANSFORMSTAGE_H \ No newline at end of file diff --git a/tuplex/core/include/physical/TransformTask.h b/tuplex/core/include/physical/TransformTask.h index 3eb8013dd..bdb35ec3d 100644 --- a/tuplex/core/include/physical/TransformTask.h +++ b/tuplex/core/include/physical/TransformTask.h @@ -180,15 +180,32 @@ namespace tuplex { void setOutputPrefix(const char* buf, size_t bufSize); // extra prefix to write first to output. void sinkOutputToHashTable(HashTableFormat fmt, int64_t outputDataSetID); + HashTableSink hashTableSink() const { return _htable; } // needs to be freed manually! - void setOutputLimit(size_t limit) { _outLimit = limit; resetOutputLimitCounter(); } - void setOutputSkip(size_t numRowsToSkip) { _outSkipRows = numRowsToSkip; } + void setOutputTopLimit(size_t limit) { + _outTopLimit = limit; + } + + void setOutputBottomLimit(size_t limit) { + _outBottomLimit = limit; + } + + /*! + * Set the maximum task order number that the current stage execute and reset the row counter. + * This is used to detect and stop the execution when we have reached the rows limit + * @param maxOrder maximum task order number in the pipeline, infinity means disregarding the bottomLimit short circuit + */ + static void setMaxOrderAndResetLimits(size_t maxOrder = std::numeric_limits::max()); + void execute() override; bool hasFileSink() const { return _outputFilePath != URI::INVALID; } + bool hasFileSource() const { return _inputFilePath != URI::INVALID; } + bool hasMemorySink() const { return _outputSchema != Schema::UNKNOWN; } + bool hasMemorySource() const { return !_inputPartitions.empty(); } bool hasHashTableSink() const { return _htableFormat != HashTableFormat::UNKNOWN; } HashTableFormat hashTableFormat() const { return _htableFormat; } @@ -207,8 +224,6 @@ namespace tuplex { static codegen::i64_hash_row_f writeInt64HashTableAggregateCallback(); static codegen::write_row_f aggCombineCallback(); - static void resetOutputLimitCounter(); - // most be public because of C++ issues -.- int64_t writeRowToMemory(uint8_t* buf, int64_t bufSize); int64_t writeRowToFile(uint8_t* buf, int64_t bufSize); @@ -252,7 +267,9 @@ namespace tuplex { double wallTime() const override { return _wallTime; } size_t output_rows_written() const { return _numOutputRowsWritten; } - size_t output_limit() const { return _outLimit; } + size_t output_top_limit() const { return _outTopLimit; } + size_t output_bottom_limit() const { return _outBottomLimit; } + private: void resetSinks(); void resetSources(); @@ -279,8 +296,8 @@ namespace tuplex { Buffer _outPrefix; std::unordered_map _outOptions; - size_t _outLimit; // limits how many rows to write at max - size_t _outSkipRows; // how many rows at start to skip + size_t _outTopLimit; // limits how many rows to write at max + size_t _outBottomLimit; // limits how many last rows to write at max // memory source variables std::vector _inputPartitions; @@ -311,12 +328,26 @@ namespace tuplex { inline int64_t contextID() const { return _contextID; } inline void unlockAllMemorySinks() { // output partition existing? if so unlock - _output.unlock(); - _exceptions.unlock(); + _output.unlock(); + _exceptions.unlock(); } + /*! + * check whether the stage reached both top and bottom limit, to use this one must call + * setMaxOrderAndResetLimits before execution and set both top and bottom limit + * @return true if limit is reached + */ + bool limitReached() const; + + /*! + * Update the global stage limit counter, should only be called once, at the end of task + */ + void updateLimits(); + void processMemorySourceWithExp(); + void processMemorySource(); + void processFileSource(); // exceptions diff --git a/tuplex/core/src/DataSet.cc b/tuplex/core/src/DataSet.cc index a33925d7f..0196ec97c 100644 --- a/tuplex/core/src/DataSet.cc +++ b/tuplex/core/src/DataSet.cc @@ -38,21 +38,17 @@ namespace tuplex { } std::shared_ptr DataSet::collect(std::ostream &os) { - return take(-1, os); + return take(std::numeric_limits::max(), 0, os); } - std::shared_ptr DataSet::take(int64_t numElements, std::ostream &os) { + std::shared_ptr DataSet::take(size_t topLimit, size_t bottomLimit, std::ostream &os) { // error dataset? if (isError()) throw std::runtime_error("is error dataset!"); - // negative numbers mean get all elements! - if (numElements < 0) - numElements = std::numeric_limits::max(); - // create a take node assert(_context); - LogicalOperator *op = _context->addOperator(new TakeOperator(this->_operator, numElements)); + LogicalOperator *op = _context->addOperator(new TakeOperator(this->_operator, topLimit, bottomLimit)); DataSet *dsptr = _context->createDataSet(op->getOutputSchema()); dsptr->_operator = op; op->setDataSet(dsptr); @@ -66,18 +62,14 @@ namespace tuplex { // collect functions std::vector DataSet::collectAsVector(std::ostream &os) { - return takeAsVector(-1, os); + return takeAsVector(std::numeric_limits::max(), os); } - // -1 means to retrieve all elements - std::vector DataSet::takeAsVector(int64_t numElements, std::ostream &os) { - auto rs = take(numElements, os); + std::vector DataSet::takeAsVector(size_t numElements, std::ostream &os) { + auto rs = take(numElements, 0, os); Timer timer; #warning "limiting should make this hack irrelevant..." - if (numElements < 0) - numElements = std::numeric_limits::max(); - // std::vector v; // while (rs->hasNextRow() && v.size() < numElements) { // v.push_back(rs->getNextRow()); @@ -734,10 +726,14 @@ namespace tuplex { } - void DataSet::show(const int64_t numRows, std::ostream &os) { + void DataSet::show(int64_t numRows, std::ostream &os) { assert(_context); // get rows + if (numRows < 0) { + numRows = std::numeric_limits::max(); + } + auto rows = takeAsVector(numRows, os); if (rows.empty()) { return; diff --git a/tuplex/core/src/EmptyDataset.cc b/tuplex/core/src/EmptyDataset.cc index 984fa904f..3664a591a 100644 --- a/tuplex/core/src/EmptyDataset.cc +++ b/tuplex/core/src/EmptyDataset.cc @@ -11,16 +11,16 @@ #include namespace tuplex { - std::shared_ptr EmptyDataset::take(int64_t numElements, std::ostream &os) { + std::shared_ptr EmptyDataset::take(size_t topLimit, size_t bottomLimit, std::ostream &os) { return std::make_shared(); } - std::vector EmptyDataset::takeAsVector(int64_t numElements, std::ostream &os) { + std::vector EmptyDataset::takeAsVector(size_t numElements, std::ostream &os) { return std::vector{}; } std::shared_ptr EmptyDataset::collect(std::ostream &os) { - return take(0, os); + return take(0, 0, os); } std::vector EmptyDataset::collectAsVector(std::ostream &os) { diff --git a/tuplex/core/src/ErrorDataSet.cc b/tuplex/core/src/ErrorDataSet.cc index 57c03ffba..c87999e5f 100644 --- a/tuplex/core/src/ErrorDataSet.cc +++ b/tuplex/core/src/ErrorDataSet.cc @@ -12,7 +12,7 @@ namespace tuplex { - std::vector ErrorDataSet::takeAsVector(int64_t numElements, std::ostream &os) { + std::vector ErrorDataSet::takeAsVector(size_t numElements, std::ostream &os) { // return empty vector and print err message Logger::instance().logger("core").error(this->_error); @@ -23,7 +23,7 @@ namespace tuplex { return takeAsVector(0, os); } - std::shared_ptr ErrorDataSet::take(int64_t numElements, std::ostream &os) { + std::shared_ptr ErrorDataSet::take(size_t topLimit, size_t bottomLimit, std::ostream &os) { // return empty vector and print err message Logger::instance().logger("core").error(this->_error); @@ -31,7 +31,7 @@ namespace tuplex { } std::shared_ptr ErrorDataSet::collect(std::ostream &os) { - return take(0, os); + return take(0, 0, os); } void diff --git a/tuplex/core/src/Executor.cc b/tuplex/core/src/Executor.cc index 845b78e6a..618b01345 100644 --- a/tuplex/core/src/Executor.cc +++ b/tuplex/core/src/Executor.cc @@ -32,8 +32,8 @@ namespace tuplex { std::vector WorkQueue::popCompletedTasks() { TRACE_LOCK("workQueue"); - std::lock_guard lock(_completedTasksMutex); + std::lock_guard lock(_completedTasksMutex); // move leads to circular dependency in gcc and thus a bug on travis-ci. Therefore, just // use the below hack to fool the compiler into actually copying the vectors // // move to reset completed tasks and return array @@ -78,59 +78,42 @@ namespace tuplex { bool WorkQueue::workTask(Executor& executor, bool nonBlocking) { IExecutorTask *task = nullptr; - if(nonBlocking) { - // @Todo: This should be put into a function "work" on the workQueue... - // dequeue from general working queue - if(_queue.try_dequeue(task)) { - if(!task) - return false; - - task->setOwner(&executor); - task->setThreadNumber(executor.threadNumber()); // redundant? - //executor.logger().info("started task..."); - // process task - task->execute(); - // save which thread executed this task - task->setID(std::this_thread::get_id()); - - _numPendingTasks.fetch_add(-1, std::memory_order_release); - - // add task to done list - TRACE_LOCK("completedTasks"); - _completedTasksMutex.lock(); - _completedTasks.push_back(std::move(task)); - _completedTasksMutex.unlock(); - _numCompletedTasks.fetch_add(1, std::memory_order_release); - TRACE_UNLOCK("completedTasks"); - return true; + // dequeue from general working queue + // Note: is this TODO: outdated? + // @Todo: This should be put into a function "work" on the workQueue... + if (nonBlocking) { + if(!_queue.try_dequeue(task)) { + return false; } } else { _queue.wait_dequeue(task); + } - if(!task) - return false; + if(!task) { + return false; + } - task->setOwner(&executor); - task->setThreadNumber(executor.threadNumber()); // redundant? + task->setOwner(&executor); + task->setThreadNumber(executor.threadNumber()); // redundant? - // process task - task->execute(); - // save which thread executed this task - task->setID(std::this_thread::get_id()); + // executor.logger().info("started task..."); + // process task + task->execute(); + // save which thread executed this task + task->setID(std::this_thread::get_id()); - // add task to done list - TRACE_LOCK("completedTasks"); - _completedTasksMutex.lock(); - _completedTasks.push_back(std::move(task)); - _completedTasksMutex.unlock(); - _numCompletedTasks.fetch_add(1, std::memory_order_release); - TRACE_UNLOCK("completedTasks"); + // add task to done list + TRACE_LOCK("completedTasks"); + _completedTasksMutex.lock(); + _completedTasks.push_back(std::move(task)); + _completedTasksMutex.unlock(); + _numCompletedTasks.fetch_add(1, std::memory_order_release); + TRACE_UNLOCK("completedTasks"); - _numPendingTasks.fetch_add(-1, std::memory_order_release); - return true; - } - return false; + _numPendingTasks.fetch_add(-1, std::memory_order_release); + + return true; } void WorkQueue::workUntilAllTasksFinished(tuplex::Executor &executor, bool flushPeriodicallyToPython) { diff --git a/tuplex/core/src/LocalEngine.cc b/tuplex/core/src/LocalEngine.cc index 02c060a90..91892d44d 100644 --- a/tuplex/core/src/LocalEngine.cc +++ b/tuplex/core/src/LocalEngine.cc @@ -98,7 +98,8 @@ namespace tuplex { exec->processQueue(true); std::stringstream ss; - ss<<"started local executor "<name()<<" ("<name() << " (" << sizeToMemString(size) << ", " + << sizeToMemString(blockSize) << " default partition size)"; logger.info(ss.str()); } @@ -107,23 +108,42 @@ namespace tuplex { return execs; } - Executor* LocalEngine::getDriver(const size_t size, const size_t blockSize, const size_t runTimeMemory, - const size_t runTimeMemoryDefaultBlockSize, const tuplex::URI &cache_path) { - // lazy start driver - if(!_driver) { + std::shared_ptr + LocalEngine::getDriver(const size_t size, const size_t blockSize, const size_t runTimeMemory, + const size_t runTimeMemoryDefaultBlockSize, const tuplex::URI &cache_path) { + ExecutorConfig new_cfg = ExecutorConfig{ + ._size = size, + ._blockSize = blockSize, + ._runTimeMemory = runTimeMemory, + ._runTimeMemoryDefaultBlockSize = runTimeMemoryDefaultBlockSize, + ._cache_path = cache_path + }; + + if (!_driver || _driver_cfg != new_cfg) { + if (_driver) { + Logger::instance().logger("local execution engine").info( + "driver already exist, starting new driver with updated config"); + } + + // lazy start driver URI uri = URI(cache_path.toString() + "/" + "driver"); - _driver = std::make_unique(size, blockSize, runTimeMemory, runTimeMemoryDefaultBlockSize, uri, "driver"); + _driver = std::make_shared(size, blockSize, runTimeMemory, runTimeMemoryDefaultBlockSize, uri, + "driver"); + _driver_cfg = new_cfg; // driver always has thread number 0! + // Note: this could be a potential issue if the config change and the old driver is still running + // due to external reference. Then there could be two executors with the same number _driver->setThreadNumber(0); std::stringstream ss; - ss<<"started driver ("< & executors, const Context* ctx) { diff --git a/tuplex/core/src/PartitionUtils.cc b/tuplex/core/src/PartitionUtils.cc new file mode 100644 index 000000000..349f41048 --- /dev/null +++ b/tuplex/core/src/PartitionUtils.cc @@ -0,0 +1,154 @@ +//--------------------------------------------------------------------------------------------------------------------// +// // +// Tuplex: Blazing Fast Python Data Science // +// // +// // +// (c) 2017 - 2021, Tuplex team // +// Created by March Boonyapaluk first on 4/19/2021 // +// License: Apache 2.0 // +//--------------------------------------------------------------------------------------------------------------------// + +#include "PartitionUtils.h" + +namespace tuplex { + + void trimPartitionsToLimit(std::vector &partitions, + size_t topLimit, + size_t bottomLimit, + TransformStage *tstage, + Executor *exec) { + std::vector limitedPartitions, limitedTailPartitions; + + // check top output limit, adjust partitions if necessary + size_t numTopOutputRows = 0; + Partition *lastTopPart = nullptr; + size_t clippedTop = 0; + for (auto partition: partitions) { + numTopOutputRows += partition->getNumRows(); + lastTopPart = partition; + if (numTopOutputRows >= topLimit) { + // clip last partition & leave loop + clippedTop = topLimit - (numTopOutputRows - partition->getNumRows()); + assert(clippedTop <= partition->getNumRows()); + break; + } else if (partition == partitions.back()) { + // last partition, mark full row, but don't put to output set yet to avoid double put + clippedTop = partition->getNumRows(); + break; + } else { + // put full partition to output set + limitedPartitions.push_back(partition); + } + } + + // check the bottom output limit, adjust partitions if necessary + size_t numBottomOutputRows = 0; + size_t clippedBottom = 0; + for (auto it = partitions.rbegin(); it != partitions.rend(); it++) { + auto partition = *it; + numBottomOutputRows += partition->getNumRows(); + + if (partition == lastTopPart) { + // the bottom and the top partitions are overlapping + clippedBottom = bottomLimit - (numBottomOutputRows - partition->getNumRows()); + if (clippedTop + clippedBottom >= partition->getNumRows()) { + // if top and bottom range intersect, use full partitions + clippedTop = partition->getNumRows(); + clippedBottom = 0; + } + break; + } else if (numBottomOutputRows >= bottomLimit) { + // clip last partition & leave loop + auto clipped = bottomLimit - (numBottomOutputRows - partition->getNumRows()); + assert(clipped <= partition->getNumRows()); + if (clipped > 0) { + Partition *newPart = newPartitionWithSkipRows(partition, partition->getNumRows() - clipped, tstage, + exec); + assert(newPart->getNumRows() == clipped); + limitedTailPartitions.push_back(newPart); + } + partition->invalidate(); + break; + } else { + // put full partition to output set + limitedTailPartitions.push_back(partition); + } + } + + // push the middle partition + if (lastTopPart != nullptr && (clippedTop > 0 || clippedBottom > 0)) { + assert(clippedTop + clippedBottom <= lastTopPart->getNumRows()); + + // split into two partitions with both top and bottom are in the same partition + Partition *lastBottomPart = nullptr; + + if (clippedBottom != 0) { + lastBottomPart = newPartitionWithSkipRows(lastTopPart, lastTopPart->getNumRows() - clippedBottom, + tstage, exec); + } + + if (clippedTop != 0) { + lastTopPart->setNumRows(clippedTop); + limitedPartitions.push_back(lastTopPart); + } else { + lastTopPart->invalidate(); + } + + if (lastBottomPart != nullptr) { + limitedPartitions.push_back(lastBottomPart); + } + } + + if (partitions.size() != limitedPartitions.size() + limitedTailPartitions.size()) { + // partition is changed, we need to change the partition grouping too + std::vector oldGrouping = tstage->partitionGroups(); + std::vector newGrouping; + size_t new_normal_num = limitedPartitions.size() + limitedTailPartitions.size(); + // remove all normal partition, put new one at the front + newGrouping.emplace_back(new_normal_num, 0, 0, 0, 0, 0); + for (auto gp: oldGrouping) { + gp.numNormalPartitions = 0; + newGrouping.push_back(gp); + } + + tstage->setPartitionGroups(newGrouping); + } + + // merge the head and tail partitions + partitions.clear(); + partitions.insert(partitions.end(), limitedPartitions.begin(), limitedPartitions.end()); + partitions.insert(partitions.end(), limitedTailPartitions.rbegin(), limitedTailPartitions.rend()); + } + + Partition *newPartitionWithSkipRows(Partition *p_in, size_t numToSkip, TransformStage *tstage, Executor *exec) { + auto ptr = p_in->lockRaw(); + auto num_rows = *((int64_t *) ptr); + assert(numToSkip < num_rows); + + ptr += sizeof(int64_t); + size_t numBytesToSkip = 0; + + Deserializer ds(tstage->outputSchema()); + for (unsigned i = 0; i < numToSkip; ++i) { + Row r = Row::fromMemory(ds, ptr, p_in->capacity() - numBytesToSkip); + ptr += r.serializedLength(); + numBytesToSkip += r.serializedLength(); + } + + Partition *p_out = exec->allocWritablePartition(p_in->size() - numBytesToSkip + sizeof(int64_t), + tstage->outputSchema(), tstage->outputDataSetID(), + tstage->context().id()); + assert(p_out->capacity() >= p_in->size() - numBytesToSkip); + + auto ptr_out = p_out->lockWriteRaw(); + *((int64_t *) ptr_out) = p_in->getNumRows() - numToSkip; + ptr_out += sizeof(int64_t); + memcpy((void *) ptr_out, ptr, p_in->size() - numBytesToSkip); + p_out->setNumRows(p_in->getNumRows() - numToSkip); + p_out->unlock(); + + p_in->unlock(); + + return p_out; + } +} // namespace tuplex \ No newline at end of file diff --git a/tuplex/core/src/ee/local/LocalBackend.cc b/tuplex/core/src/ee/local/LocalBackend.cc index 39f4bc8c0..85ce66383 100644 --- a/tuplex/core/src/ee/local/LocalBackend.cc +++ b/tuplex/core/src/ee/local/LocalBackend.cc @@ -27,7 +27,7 @@ #include #include #include -#include +#include "PartitionUtils.h" namespace tuplex { @@ -116,14 +116,14 @@ namespace tuplex { } Executor *LocalBackend::driver() { - assert(_driver); - return _driver; + assert(_driver); + return _driver.get(); } void LocalBackend::execute(tuplex::PhysicalStage *stage) { assert(stage); - if(!stage) + if (!stage) return; // history server connection should be established @@ -550,7 +550,8 @@ namespace tuplex { task->sinkExceptionsToMemory(inputSchema); task->setStageID(tstage->getID()); - task->setOutputLimit(tstage->outputLimit()); + task->setOutputTopLimit(tstage->outputTopLimit()); + task->setOutputBottomLimit(tstage->outputBottomLimit()); // add to tasks tasks.emplace_back(std::move(task)); } else { @@ -584,7 +585,8 @@ namespace tuplex { } task->sinkExceptionsToMemory(inputSchema); task->setStageID(tstage->getID()); - task->setOutputLimit(tstage->outputLimit()); + task->setOutputTopLimit(tstage->outputTopLimit()); + task->setOutputBottomLimit(tstage->outputBottomLimit()); // add to tasks tasks.emplace_back(std::move(task)); num_parts++; @@ -621,7 +623,8 @@ namespace tuplex { } task->sinkExceptionsToMemory(inputSchema); task->setStageID(tstage->getID()); - task->setOutputLimit(tstage->outputLimit()); + task->setOutputTopLimit(tstage->outputTopLimit()); + task->setOutputBottomLimit(tstage->outputBottomLimit()); // add to tasks tasks.emplace_back(std::move(task)); @@ -648,6 +651,7 @@ namespace tuplex { // --> issue for each memory partition a transform task and put it into local workqueue assert(tstage->inputMode() == EndPointMode::MEMORY); + // restrict after input limit size_t numInputRows = 0; auto inputPartitions = tstage->inputPartitions(); @@ -703,7 +707,8 @@ namespace tuplex { } task->sinkExceptionsToMemory(tstage->inputSchema()); task->setStageID(tstage->getID()); - task->setOutputLimit(tstage->outputLimit()); + task->setOutputTopLimit(tstage->outputTopLimit()); + task->setOutputBottomLimit(tstage->outputBottomLimit()); tasks.emplace_back(std::move(task)); // input limit exhausted? break! @@ -712,6 +717,42 @@ namespace tuplex { } } + // assign the order for all tasks + for(size_t i = 0; i < tasks.size(); ++i) { + tasks[i]->setOrder(i); + } + + TransformTask::setMaxOrderAndResetLimits(tasks.size() - 1); + + if (tstage->hasOutputLimit()) { + // There are 3 possible cases here: + // 1. both top and bottom limit + // 2. only top limit + // 3. only bottom limit + if (tstage->outputTopLimit() > 0 && tstage->outputBottomLimit() > 0) { + // case 1: do task striping for output limit on both ends + // We are executing in the striping order instead of ascending or descending order + // This is an optimization in the case where we have small limits to avoid executing all partitions + vector newTasks; + for(size_t i = 0; i < tasks.size() - i; i++) { + const size_t rev_i = tasks.size() - 1 - i; + newTasks.push_back(tasks[i]); + if (i < rev_i) { + newTasks.push_back(tasks[rev_i]); + } + } + assert(tasks.size() == newTasks.size()); + tasks.swap(newTasks); + } else if (tstage->outputBottomLimit() > 0) { + // case 3: bottom limit only, just reverse the task order + // We are executing the last partitions first, since we don't need the top rows. + // Thus speeding up the execution time + std::reverse(tasks.begin(), tasks.end()); + } + // case 3: if top limit only, do nothing since the order is already good + // (the tasks is generated in ascending order) + } + return tasks; } @@ -770,7 +811,6 @@ namespace tuplex { } void LocalBackend::executeTransformStage(tuplex::TransformStage *tstage) { - Timer stageTimer; Timer timer; // for detailed measurements. @@ -788,8 +828,14 @@ namespace tuplex { // special case: skip stage, i.e. empty code and mem2mem if(tstage->code().empty() && !tstage->fileInputMode() && !tstage->fileOutputMode()) { - tstage->setMemoryResult(tstage->inputPartitions(), tstage->generalPartitions(), tstage->fallbackPartitions(), + auto output_par = tstage->inputPartitions(); + if (tstage->hasOutputLimit()) { + trimPartitionsToLimit(output_par, tstage->outputTopLimit(), tstage->outputBottomLimit(), tstage, + _driver.get()); + } + tstage->setMemoryResult(output_par, tstage->generalPartitions(), tstage->fallbackPartitions(), tstage->partitionGroups()); + // skip stage Logger::instance().defaultLogger().info("[Transform Stage] skipped stage " + std::to_string(tstage->number()) + " because there is nothing todo here."); return; @@ -870,6 +916,13 @@ namespace tuplex { } auto tasks = createLoadAndTransformToMemoryTasks(tstage, _options, syms); + + { + std::stringstream ss; + ss<<"[Transform Stage] Stage "<number()<<" starting "<hasOutputLimit()) { + // the function expect the output to be sorted in ascending order (guaranteed by sortTasks()) + trimPartitionsToLimit(normalPartitions, tstage->outputTopLimit(), tstage->outputBottomLimit(), tstage, + _driver.get()); + } + tstage->setMemoryResult(normalPartitions, generalPartitions, fallbackPartitions, partitionGroups, exceptionCounts); break; } @@ -1468,9 +1527,9 @@ namespace tuplex { logger().debug("task without order found, please fix in code."); } #endif - // add all tasks to queue for(auto& task : tasks) wq.addTask(task); + // clear tasks.clear(); @@ -2098,7 +2157,7 @@ namespace tuplex { // now simply go over the partitions and write the full buffers out // check all the params from TrafoStage - size_t limit = tstage->outputLimit(); + size_t limit = tstage->outputTopLimit(); size_t splitSize = tstage->splitSize(); size_t numOutputFiles = tstage->numOutputFiles(); URI uri = tstage->outputURI(); @@ -2223,4 +2282,5 @@ namespace tuplex { Logger::instance().defaultLogger().info("writing output took " + std::to_string(timer.time()) + "s"); tstage->setFileResult(ecounts); } + } // namespace tuplex \ No newline at end of file diff --git a/tuplex/core/src/logical/TakeOperator.cc b/tuplex/core/src/logical/TakeOperator.cc index aa7c49668..49a4452b4 100644 --- a/tuplex/core/src/logical/TakeOperator.cc +++ b/tuplex/core/src/logical/TakeOperator.cc @@ -12,13 +12,13 @@ #include namespace tuplex { - TakeOperator::TakeOperator(LogicalOperator *parent, const int64_t numElements) : LogicalOperator::LogicalOperator(parent), _limit(numElements) { + TakeOperator::TakeOperator(LogicalOperator *parent, size_t topLimit, size_t bottomLimit) : LogicalOperator::LogicalOperator(parent), _topLimit(topLimit), _bottomLimit(bottomLimit) { // take schema from parent node setSchema(this->parent()->getOutputSchema()); } bool TakeOperator::good() const { - return _limit >= -1; + return _topLimit >= 0 && _bottomLimit >= 0; } std::vector TakeOperator::getSample(const size_t num) const { @@ -33,7 +33,7 @@ namespace tuplex { LogicalOperator *TakeOperator::clone() { // create clone of this operator - auto copy = new TakeOperator(parent()->clone(), _limit); + auto copy = new TakeOperator(parent()->clone(), _topLimit, _bottomLimit); copy->setDataSet(getDataSet()); // weak ptr to old dataset... copy->copyMembers(this); diff --git a/tuplex/core/src/physical/PhysicalPlan.cc b/tuplex/core/src/physical/PhysicalPlan.cc index 8aa49a5fb..db595a0ba 100644 --- a/tuplex/core/src/physical/PhysicalPlan.cc +++ b/tuplex/core/src/physical/PhysicalPlan.cc @@ -210,7 +210,8 @@ namespace tuplex { if(ops.back()->isActionable()) { if(ops.back()->type() == LogicalOperatorType::FILEOUTPUT) outputMode = EndPointMode::FILE; - else if(ops.back()->type() == LogicalOperatorType::TAKE || ops.back()->type() == LogicalOperatorType::CACHE) { + else if(ops.back()->type() == LogicalOperatorType::TAKE || + ops.back()->type() == LogicalOperatorType::CACHE) { // memory? outputMode = EndPointMode::MEMORY; } else @@ -241,7 +242,7 @@ namespace tuplex { // user wants to merge exceptions in order. bool updateInputExceptions = hasFilter && hasInputExceptions && _context.getOptions().OPT_MERGE_EXCEPTIONS_INORDER(); - // create trafostage via builder pattern + // create transform stage via builder pattern auto builder = codegen::StageBuilder(_num_stages++, isRootStage, _context.getOptions().UNDEFINED_BEHAVIOR_FOR_OPERATORS(), @@ -383,7 +384,7 @@ namespace tuplex { // set limit if output node has a limit (currently only TakeOperator) if(outputNode->type() == LogicalOperatorType::TAKE) { auto top = static_cast(outputNode); - builder.setOutputLimit(top->limit()); + builder.setOutputLimit(top->topLimit(), top->bottomLimit()); } // @TODO: add slowPip builder to this process... diff --git a/tuplex/core/src/physical/ResultSet.cc b/tuplex/core/src/physical/ResultSet.cc index 5086a1e58..cb9373335 100644 --- a/tuplex/core/src/physical/ResultSet.cc +++ b/tuplex/core/src/physical/ResultSet.cc @@ -207,18 +207,22 @@ namespace tuplex { _currentNormalPartitions.push_back(p); Deserializer ds(_schema); - for(int i = 0; i < limit;) { + for (size_t i = 0; i < limit;) { // all exhausted - if(_currentNormalPartitions.empty()) + if (_currentNormalPartitions.empty()) break; // get number of rows in first partition Partition *first = _currentNormalPartitions.front(); auto num_rows = first->getNumRows(); + + assert(num_rows >= _curNormalRowCounter); + assert(limit >= i); + // how many left to retrieve? auto num_to_retrieve_from_partition = std::min(limit - i, num_rows - _curNormalRowCounter); - if(num_to_retrieve_from_partition <= 0) + if (num_to_retrieve_from_partition <= 0) break; // make sure partition schema matches stored schema @@ -465,7 +469,7 @@ namespace tuplex { count += partition->getNumRows(); for (const auto& partition : _remainingFallbackPartitions) count += partition->getNumRows(); - return count; + return std::min(count, _maxRows); } void ResultSet::removeFirstGeneralPartition() { diff --git a/tuplex/core/src/physical/StageBuilder.cc b/tuplex/core/src/physical/StageBuilder.cc index 11ca64ac7..c3a55d605 100644 --- a/tuplex/core/src/physical/StageBuilder.cc +++ b/tuplex/core/src/physical/StageBuilder.cc @@ -50,7 +50,7 @@ namespace tuplex { : _stageNumber(stage_number), _isRootStage(rootStage), _allowUndefinedBehavior(allowUndefinedBehavior), _generateParser(generateParser), _normalCaseThreshold(normalCaseThreshold), _sharedObjectPropagation(sharedObjectPropagation), _nullValueOptimization(nullValueOptimization), _updateInputExceptions(updateInputExceptions), - _inputNode(nullptr), _outputLimit(std::numeric_limits::max()) { + _inputNode(nullptr), _outputTopLimit(std::numeric_limits::max()), _outputBottomLimit(0) { } void StageBuilder::generatePythonCode() { @@ -457,7 +457,8 @@ namespace tuplex { break; } case LogicalOperatorType::TAKE: { - opt_ops.push_back(new TakeOperator(lastParent, dynamic_cast(node)->limit())); + auto takeOp = dynamic_cast(node); + opt_ops.push_back(new TakeOperator(lastParent, takeOp->topLimit(), takeOp->bottomLimit())); opt_ops.back()->setID(node->getID()); break; } @@ -1431,7 +1432,8 @@ namespace tuplex { // no limit operator yet... // get limit - stage->_outputLimit = _outputLimit; + stage->_outputTopLimit = _outputTopLimit; + stage->_outputBottomLimit = _outputBottomLimit; // copy input/output configurations stage->_fileInputParameters = _fileInputParameters; diff --git a/tuplex/core/src/physical/TransformStage.cc b/tuplex/core/src/physical/TransformStage.cc index ff99bf4a7..bbae53a5f 100644 --- a/tuplex/core/src/physical/TransformStage.cc +++ b/tuplex/core/src/physical/TransformStage.cc @@ -48,7 +48,8 @@ namespace tuplex { int64_t number, bool allowUndefinedBehavior) : PhysicalStage::PhysicalStage(plan, backend, number), _inputLimit(std::numeric_limits::max()), - _outputLimit(std::numeric_limits::max()), + _outputTopLimit(std::numeric_limits::max()), + _outputBottomLimit(0), _aggMode(AggregateType::AGG_NONE) { // TODO: is this code out of date? + is allowUndefinedBehavior needed here? @@ -128,7 +129,6 @@ namespace tuplex { if (normalPartitions.empty() && generalPartitions.empty() && fallbackPartitions.empty()) _rs = emptyResultSet(); else { - std::vector limitedPartitions; auto schema = Schema::UNKNOWN; if(!normalPartitions.empty()) { @@ -136,27 +136,17 @@ namespace tuplex { for (auto partition : normalPartitions) { assert(schema == partition->schema()); } + } - // check output limit, adjust partitions if necessary - size_t numOutputRows = 0; - for (auto partition : normalPartitions) { - numOutputRows += partition->getNumRows(); - if (numOutputRows >= outputLimit()) { - // clip last partition & leave loop - auto clipped = outputLimit() - (numOutputRows - partition->getNumRows()); - assert(clipped <= partition->getNumRows()); - partition->setNumRows(clipped); - if (clipped > 0) - limitedPartitions.push_back(partition); - break; - } else { - // put full partition to output set - limitedPartitions.push_back(partition); - } - } + size_t maxRows; + if (hasOutputLimit()) { + maxRows = outputTopLimit() + outputBottomLimit(); + } else { + maxRows = std::numeric_limits::max(); } - _rs = std::make_shared(schema, limitedPartitions, generalPartitions, fallbackPartitions, partitionGroups, outputLimit()); + // put ALL partitions to result set + _rs = std::make_shared(schema, normalPartitions, generalPartitions, fallbackPartitions, partitionGroups, maxRows); } } diff --git a/tuplex/core/src/physical/TransformTask.cc b/tuplex/core/src/physical/TransformTask.cc index ecac0dd74..0ae2ddf90 100644 --- a/tuplex/core/src/physical/TransformTask.cc +++ b/tuplex/core/src/physical/TransformTask.cc @@ -18,11 +18,22 @@ #include namespace tuplex { - // atomic var to count output rows! - static std::atomic_int64_t g_totalOutputRows; - - void TransformTask::resetOutputLimitCounter() { - g_totalOutputRows = 0; + // this is a logic to stop the execution once it has reached the topLimit and bottomLimit + // here, we assume that task order starts with zero and count up by 1, e.g. 0, 1, 2, ..., n + // To implement limit, we maintain a mapping from the task order to the number of rows done in that task + // (rows done are either 0 or #output rows after processing) + // we can then find out how many top rows are done by looking at g_rowsDone[0], g_rowsDone[1], ... + // until we reach some segment that's 0 + // likewise, we can find the bottom rows done by looking at g_rowsDone[g_maxOrder], g_rowsDone[g_maxOrder - 1], ... + + // mapping from order number -> row count if the task is finished + static std::mutex g_rowsDoneMutex; + static std::unordered_map g_rowsDone; + static std::atomic_size_t g_maxOrder; + + void TransformTask::setMaxOrderAndResetLimits(size_t maxOrder) { + g_rowsDone.clear(); + g_maxOrder = maxOrder; } } @@ -40,22 +51,9 @@ extern "C" { } static int64_t limited_w2mCallback(tuplex::TransformTask* task, uint8_t* buf, int64_t bufSize) { - // i.e. check here how many output rows, if already limit reached - jump to goto! - if(tuplex::g_totalOutputRows >= task->output_limit()) { - return tuplex::ecToI64(tuplex::ExceptionCode::OUTPUT_LIMIT_REACHED); - } - assert(task); assert(dynamic_cast(task)); - auto rc = task->writeRowToMemory(buf, bufSize); - if(0 == rc) - tuplex::g_totalOutputRows++; - - // i.e. check here how many output rows, if already limit reached - jump to goto! - if(tuplex::g_totalOutputRows >= task->output_limit()) { - return tuplex::ecToI64(tuplex::ExceptionCode::OUTPUT_LIMIT_REACHED); - } - return rc; + return task->writeRowToMemory(buf, bufSize); } static int64_t limited_w2fCallback(tuplex::TransformTask* task, uint8_t* buf, int64_t bufSize) { @@ -513,8 +511,8 @@ namespace tuplex { _outputFilePath = URI::INVALID; _outFile.reset(nullptr); _outPrefix.reset(); - _outLimit = std::numeric_limits::max(); // write all rows - _outSkipRows = 0; // skip no rows + _outTopLimit = std::numeric_limits::max(); // write all rows + _outBottomLimit = 0; // reset memory sink _output.reset(); @@ -646,6 +644,47 @@ namespace tuplex { #endif } + bool TransformTask::limitReached() const { + size_t numTopCompleted = 0; + size_t numBottomCompleted = 0; + bool isTopLimitReached = false; + bool isBottomLimitReached = false; + + tuplex::g_rowsDoneMutex.lock(); + if (_outTopLimit == 0) { + isTopLimitReached = true; + } else { + for (size_t i = 0; tuplex::g_rowsDone.count(i) != 0; i++) { + numTopCompleted += tuplex::g_rowsDone[i]; + if (numTopCompleted >= _outTopLimit) { + isTopLimitReached = true; + break; + } + } + } + + if (_outBottomLimit == 0) { + isBottomLimitReached = true; + } else { + for (size_t i = tuplex::g_maxOrder; tuplex::g_rowsDone.count(i) != 0; i--) { + numBottomCompleted += tuplex::g_rowsDone[i]; + if (numBottomCompleted >= _outBottomLimit) { + isBottomLimitReached = true; + break; + } + } + } + tuplex::g_rowsDoneMutex.unlock(); + + return isTopLimitReached && isBottomLimitReached; + } + + void TransformTask::updateLimits() { + tuplex::g_rowsDoneMutex.lock(); + tuplex::g_rowsDone[getOrder(0)] += getNumOutputRows(); + tuplex::g_rowsDoneMutex.unlock(); + } + void TransformTask::processMemorySource() { assert(!_inputPartitions.empty()); assert(_functor); @@ -659,6 +698,11 @@ namespace tuplex { // go over all input partitions. for(const auto &inputPartition : _inputPartitions) { + if (limitReached()) { + // skip the execution, enough is done + break; + } + // lock ptr, extract number of rows ==> store them // lock raw & call functor! int64_t inSize = inputPartition->size(); @@ -680,6 +724,8 @@ namespace tuplex { // delete partition if desired... if(_invalidateSourceAfterUse) inputPartition->invalidate(); + + updateLimits(); } #ifndef NDEBUG @@ -716,7 +762,7 @@ namespace tuplex { // skip rows? limit rows?? - if(_numOutputRowsWritten >= _outSkipRows && _numOutputRowsWritten < (_outLimit - _outSkipRows)) { + if(_numOutputRowsWritten < _outTopLimit) { if(_outFile->write(buf, bufSize) != VirtualFileSystemStatus::VFS_OK) return ecToI32(ExceptionCode::IOERROR); } diff --git a/tuplex/python/CMakeLists.txt b/tuplex/python/CMakeLists.txt index 6ea09725c..33a8d5a9e 100644 --- a/tuplex/python/CMakeLists.txt +++ b/tuplex/python/CMakeLists.txt @@ -130,6 +130,7 @@ FILE(COPY ${CMAKE_CURRENT_SOURCE_DIR}/tuplex/utils/__init__.py ${CMAKE_CURRENT_SOURCE_DIR}/tuplex/utils/tracebacks.py ${CMAKE_CURRENT_SOURCE_DIR}/tuplex/utils/version.py ${CMAKE_CURRENT_SOURCE_DIR}/tuplex/utils/globs.py + ${CMAKE_CURRENT_SOURCE_DIR}/tuplex/utils/table_format.py DESTINATION ${PYTHON_DIST_DIR}/tuplex/utils) FILE(COPY ${CMAKE_CURRENT_SOURCE_DIR}/tests/test_tuples.py diff --git a/tuplex/python/include/PythonDataSet.h b/tuplex/python/include/PythonDataSet.h index 169712493..025454a1e 100644 --- a/tuplex/python/include/PythonDataSet.h +++ b/tuplex/python/include/PythonDataSet.h @@ -77,7 +77,7 @@ namespace tuplex { PythonDataSet resolve(const int64_t exceptionCode, const std::string& lambda_code, const std::string& pickled_code, const py::object& closure=py::object()); py::object collect(); - py::object take(const int64_t numRows); + py::object take(const int64_t topLimit, const int64_t bottomLimit); void show(const int64_t numRows=-1); // DataFrame like operations diff --git a/tuplex/python/src/PythonDataSet.cc b/tuplex/python/src/PythonDataSet.cc index c6c69e76b..3946bb7b5 100644 --- a/tuplex/python/src/PythonDataSet.cc +++ b/tuplex/python/src/PythonDataSet.cc @@ -107,7 +107,7 @@ namespace tuplex { } } - py::object PythonDataSet::take(const int64_t numRows) { + py::object PythonDataSet::take(const int64_t topLimit, const int64_t bottomLimit) { // make sure a dataset is wrapped assert(this->_dataset); @@ -129,8 +129,23 @@ namespace tuplex { std::shared_ptr rs; std::string err_message = ""; + + size_t castedTopLimit = 0; + if (topLimit < 0) { + castedTopLimit = std::numeric_limits::max(); + } else { + castedTopLimit = topLimit; + } + + size_t castedBottomLimit = 0; + if (bottomLimit < 0) { + castedBottomLimit = std::numeric_limits::max(); + } else { + castedBottomLimit = bottomLimit; + } + try { - rs = _dataset->take(numRows, ss); + rs = _dataset->take(castedTopLimit, castedBottomLimit, ss); if(!rs) throw std::runtime_error("invalid result set"); // if there are more than 1 million (100k in debug mode) elements print message... @@ -162,7 +177,7 @@ namespace tuplex { // new version, directly interact with the interpreter Timer timer; // build python list object from resultset - auto listObj = resultSetToCPython(rs.get(), numRows); + auto listObj = resultSetToCPython(rs.get(), rs->rowCount()); Logger::instance().logger("python").info("Data transfer back to python took " + std::to_string(timer.time()) + " seconds"); // Logger::instance().flushAll(); diff --git a/tuplex/python/tuplex/dataset.py b/tuplex/python/tuplex/dataset.py index fecd257ee..a43bdf4a9 100644 --- a/tuplex/python/tuplex/dataset.py +++ b/tuplex/python/tuplex/dataset.py @@ -19,6 +19,7 @@ from tuplex.utils.framework import UDFCodeExtractionError from tuplex.utils.source_vault import SourceVault from .exceptions import classToExceptionCode +import tuplex.utils.table_format as table_format # signed 64bit limit max_rows = 9223372036854775807 @@ -28,6 +29,12 @@ class DataSet: def __init__(self): self._dataSet = None + def _repr_html_(self): + return self.showHTMLPreview() + + def __repr__(self): + return self.showStrPreview() + def unique(self): """ removes duplicates from Dataset (out-of-order). Equivalent to a DISTINCT clause in a SQL-statement. Returns: @@ -109,22 +116,49 @@ def collect(self): assert self._dataSet is not None, 'internal API error, datasets must be created via context objects' return self._dataSet.collect() - def take(self, nrows=5): - """ action that generates a physical plan, processes data and collects the top results then as list of tuples. + def take(self, limitTop=5, limitBottom=0): + """ action that generates a physical plan, processes data and collects the top and bottom results + then as list of tuples. Args: - nrows (int): number of rows to collect. Per default ``5``. + limitTop (int): number of top rows to collect. Per default ``5``. + limitBottom (int): number of bottom rows to collect. Per default ``0``. Returns: (list): A list of tuples """ + assert limitTop is None or isinstance(limitTop, int), 'num rows must be an integer or None' + assert limitBottom is None or isinstance(limitBottom, int), 'num bottom last must be an integer or None' - assert isinstance(nrows, int), 'num rows must be an integer' - assert nrows > 0, 'please specify a number greater than zero' + if limitTop is None or limitTop < 0: + limitTop = -1 + + if limitBottom is None or limitBottom < 0: + limitBottom = -1 assert self._dataSet is not None, 'internal API error, datasets must be created via context objects' - return self._dataSet.take(nrows) + return self._dataSet.take(limitTop, limitBottom) + + def head(self, nrows): + """ action that generates a physical plan, processes data and collects the top results then as list of tuples. + + Args: + nrows (int): number of rows to collect. + Returns: + (list): A list of tuples + """ + return self.take(nrows, 0) + + def tail(self, nrows): + """ action that generates a physical plan, processes data and collects the bottom results then as list of tuples. + + Args: + nrows (int): number of rows to collect. + Returns: + (list): A list of tuples + """ + return self.take(0, nrows) def show(self, nrows=None): """ action that generates a physical plan, processes data and prints results as nicely formatted @@ -142,6 +176,180 @@ def show(self, nrows=None): self._dataSet.show(nrows) + def showHTMLPreview(self, topLimit=5, bottomLimit=5): + """ action that generates a physical plan, processes data and return a subset of results as nicely formatted + HTML table to stdout. + + Args: + topLimit (int): number of top rows to collect. If ``None`` all rows will be collected + bottomLimit (int): number of bottom rows to collect. If ``None`` all rows will be collected + + Returns: + string: an HTML table showing a preview of the data + """ + HTML_TEMPLATE = ( + "
\n" + "\n" + "\n" + " \n" + " \n" + "{}" + " \n" + " \n" + " \n" + "{}" + " \n" + "
\n" + "
") + + assert self._dataSet is not None, 'internal API error, datasets must be created via context objects' + + rows = self.take(topLimit, bottomLimit) + + if len(rows) == 0: + return HTML_TEMPLATE.format("\n", "\n", 0) + + assert topLimit == -1 or bottomLimit == -1 or len(rows) <= topLimit + bottomLimit + + headers_str = "" + body = "" + num_columns = None + + # construct tables + if len(rows) < topLimit + bottomLimit: + # the data is small so we get everything (no need to render ...) + i = 0 + for r in rows: + if i == 0: + # we set num columns based on the first row + num_columns = len(r) if isinstance(r, list) or isinstance(r, tuple) else 1 + body += table_format.getHTMLRow(i, r) + i += 1 + else: + # some data is not processed because of limiting + i = 0 + for r in rows: + if i >= topLimit: + break + if i == 0: + # we set num columns based on the first row + num_columns = len(r) if isinstance(r, list) or isinstance(r, tuple) else 1 + + body += table_format.getHTMLRow(i, r) + i += 1 + + # add the ... + body += " \n" + body += " ...\n" + for _ in range(num_columns): + body += " ...\n" + body += " \n" + + for j in range(i, len(rows)): + body += table_format.getHTMLRow(len(rows) - j, rows[j]) + + assert num_columns is not None + + # construct headers + column_names = self._dataSet.columns() + headers_str += " \n" + if len(column_names) > 0: + assert (num_columns == len(column_names)) + for c_name in column_names: + headers_str += " {}\n".format(c_name) + else: + # default to generic name if column name doesn't exist + for i in range(num_columns): + headers_str += " Column {}\n".format(i) + + return HTML_TEMPLATE.format(headers_str, body) + + def showStrPreview(self, topLimit=5, bottomLimit=5): + """ action that generates a physical plan, processes data and return a subset of results as nicely formatted + ASCII table to stdout. + + Args: + topLimit (int): number of top rows to collect. If ``None`` all rows will be collected + bottomLimit (int): number of bottom rows to collect. If ``None`` all rows will be collected + + Returns: + string: an HTML table showing a preview of the data + """ + assert self._dataSet is not None, 'internal API error, datasets must be created via context objects' + + rows = self.take(topLimit, bottomLimit) + + if len(rows) == 0: + return ( + "---\n" + "| |\n" + "---\n" + "0 columns\n") + + assert topLimit == -1 or bottomLimit == -1 or len(rows) <= topLimit + bottomLimit + + str_table = [] + num_columns = None + + # construct tables + if len(rows) < topLimit + bottomLimit: + # the data is small so we get everything (no need to render ...) + i = 0 + for r in rows: + if i == 0: + # we set num columns based on the first row + num_columns = len(r) if isinstance(r, list) or isinstance(r, tuple) else 1 + str_table.append(table_format.getStrTableRow(i, r)) + i += 1 + else: + # some data is not processed because of limiting + i = 0 + for r in rows: + if i >= topLimit: + break + if i == 0: + # we set num columns based on the first row + num_columns = len(r) if isinstance(r, list) or isinstance(r, tuple) else 1 + + str_table.append(table_format.getStrTableRow(i, r)) + i += 1 + + # add the ... + str_table.append(["..."] * (num_columns + 1)) + + for j in range(i, len(rows)): + str_table.append(table_format.getStrTableRow(len(rows) - j, rows[j])) + + assert num_columns is not None + + # construct headers + column_names = self._dataSet.columns() + headers_list = [""] + if len(column_names) > 0: + assert (num_columns == len(column_names)) + for c_name in column_names: + headers_list.append("{}".format(c_name)) + else: + # default to generic name if column name doesn't exist + for i in range(num_columns): + headers_list.append("Column {}".format(i)) + + str_table = [headers_list] + str_table + + return table_format.generateStrTable(num_columns + 1, str_table) + def resolve(self, eclass, ftor): """ Adds a resolver operator to the pipeline. The signature of ftor needs to be identical to the one of the preceding operator. diff --git a/tuplex/python/tuplex/utils/table_format.py b/tuplex/python/tuplex/utils/table_format.py new file mode 100644 index 000000000..7bc8dd9d9 --- /dev/null +++ b/tuplex/python/tuplex/utils/table_format.py @@ -0,0 +1,78 @@ +#!/usr/bin/env python3 +# ----------------------------------------------------------------------------------------------------------------------# +# # +# Tuplex: Blazing Fast Python Data Science # +# # +# # +# (c) 2017 - 2021, Tuplex team # +# Created by March Boonyapaluk first on 4/19/2022 # +# License: Apache 2.0 # +# ----------------------------------------------------------------------------------------------------------------------# + +def getHTMLRow(ind, row): + """ + Given a row, converts all the contents to an HTML row and return + :param ind: the index of that row + :param row: a row output from dataset + :return: an HTML row, representative of the row + """ + row_str = "" + row_str += " \n" + row_str += " {}\n".format(ind) + if isinstance(row, list) or isinstance(row, tuple): + for col in row: + row_str += " {}\n".format(col) + else: + row_str += " {}\n".format(row) + row_str += " \n" + return row_str + + +def getStrTableRow(ind, row): + """ + Given a row, converts all the contents to string and return + :param ind: the index of that row + :param row: a row output from dataset + :return: a list of string, representative of the row + """ + row_str_list = ["{}".format(ind)] + if isinstance(row, list) or isinstance(row, tuple): + for col in row: + row_str_list.append("{}".format(col)) + else: + row_str_list.append("{}".format(row)) + return row_str_list + + +def _getLineDivider(col_width): + out = "" + for w in col_width: + out += "+" + ("-" * (w + 2)) + out += "+\n" + + return out + +def generateStrTable(numCols, strTable): + """ + Given a 2-dimensional list of strings, print a nicely formatted table of the contents in the list + :param numCols: number of columns in the table + :param strTable: 2-dimensional list of strings, as list of list + :return: a nicely formatted table in string + """ + max_col_width = [0] * numCols + + for r in strTable: + for i in range(0, len(r)): + assert (isinstance(r[i], str)) + if len(r[i]) > max_col_width[i]: + max_col_width[i] = len(r[i]) + + output_str = "" + + for r in strTable: + output_str += _getLineDivider(max_col_width) + for i in range(0, len(r)): + output_str += "| {:<{width}} ".format(r[i], width=max_col_width[i]) + output_str += "|\n" + + return output_str diff --git a/tuplex/test/CMakeLists.txt b/tuplex/test/CMakeLists.txt index 3f3721780..f76d2f0af 100755 --- a/tuplex/test/CMakeLists.txt +++ b/tuplex/test/CMakeLists.txt @@ -19,9 +19,9 @@ if (Python3_FOUND) # check that cloudpickle is installed via import set(cmd -c "import cloudpickle") execute_process(COMMAND ${Python3_EXECUTABLE} ${cmd} RESULT_VARIABLE ret) - if(NOT "${ret}" STREQUAL "0") + if (NOT "${ret}" STREQUAL "0") message(FATAL_ERROR "Could not find cloudpickle module, please install via pip3 install cloudpickle.") - endif() + endif () # check that numpy is installed too for testing purposes... set(cmd -c "import numpy") diff --git a/tuplex/test/core/ContextBasics.cc b/tuplex/test/core/ContextBasics.cc index fdbdd8d50..e85107b40 100644 --- a/tuplex/test/core/ContextBasics.cc +++ b/tuplex/test/core/ContextBasics.cc @@ -136,4 +136,56 @@ TEST_F(ContextBasicsTest, JSON) { auto str = ContextOptions::defaults().asJSON(); EXPECT_GT(str.length(), 2); +} + +TEST_F(ContextBasicsTest, twoContextTest) { + using namespace tuplex; + + python::initInterpreter(); + python::unlockGIL(); + + ContextOptions co = testOptions(); + co.set("tuplex.partitionSize", "100B"); + co.set("tuplex.executorMemory", "1MB"); + co.set("tuplex.scratchDir", scratchDir + "/context1"); + + // second context with different executor config, should cause the driver to split up + ContextOptions co2 = testOptions(); + co.set("tuplex.partitionSize", "100B"); + co2.set("tuplex.executorMemory", "2MB"); + co2.set("tuplex.scratchDir", scratchDir + "/context2"); + + Context c1(co); + Context c2(co2); + Row row1(Tuple(0), Tuple("hello")); + Row row2(Tuple(1), Tuple("this")); + Row row3(Tuple(2), Tuple("is")); + Row row4(Tuple(3), Tuple("a")); + Row row5(Tuple(4), Tuple("test")); + + for (int t = 0; t < 10; t++) { + auto ds1 = c1.parallelize({row1, row2, row3, row4, row5}) + .map(UDF("lambda x: x[1][0]")); // new code: string index operator! first to raise an exception! + + auto ds2 = c2.parallelize({row1, row2, row3, row4, row5}) + .map(UDF("lambda x: x[1][0]")); // new code: string index operator! first to raise an exception! + + auto v1 = ds1.collectAsVector(); + auto v2 = ds2.collectAsVector(); + + std::vector ref{"hello", "this", "is", "a", "test"}; + + EXPECT_EQ(v1.size(), 5); + for (int i = 0; i < 5; i++) { + EXPECT_EQ(v1[i].getString(0), ref[i]); + } + + EXPECT_EQ(v2.size(), 5); + for (int i = 0; i < 5; i++) { + EXPECT_EQ(v2[i].getString(0), ref[i]); + } + } + + python::lockGIL(); + python::closeInterpreter(); } \ No newline at end of file diff --git a/tuplex/test/core/ResultSetTest.cc b/tuplex/test/core/ResultSetTest.cc index 4aedd0649..334c4fd9b 100644 --- a/tuplex/test/core/ResultSetTest.cc +++ b/tuplex/test/core/ResultSetTest.cc @@ -14,7 +14,7 @@ class ResultSetTest : public PyTest { protected: - tuplex::Executor *driver; + std::shared_ptr driver; tuplex::ContextOptions options; public: // init function @@ -45,7 +45,8 @@ class ResultSetTest : public PyTest { EXPECT_EQ(r.getRowType(), first_type); // now write via partition writer - tuplex::PartitionWriter pw(driver, Schema(Schema::MemoryLayout::ROW, first_type), 0, 0, options.PARTITION_SIZE()); + tuplex::PartitionWriter pw(driver.get(), Schema(Schema::MemoryLayout::ROW, first_type), 0, 0, + options.PARTITION_SIZE()); for(const auto& r : rows) pw.writeRow(r); return pw.getOutputPartitions(); diff --git a/tuplex/test/core/TakeTest.cc b/tuplex/test/core/TakeTest.cc new file mode 100644 index 000000000..2c7a1e067 --- /dev/null +++ b/tuplex/test/core/TakeTest.cc @@ -0,0 +1,387 @@ +//--------------------------------------------------------------------------------------------------------------------// +// // +// Tuplex: Blazing Fast Python Data Science // +// // +// // +// (c) 2017 - 2021, Tuplex team // +// Created by March Boonyapaluk first on 4/19/2021 // +// License: Apache 2.0 // +//--------------------------------------------------------------------------------------------------------------------// + +#include + +#include +#include "TestUtils.h" + +using namespace tuplex; +using namespace std; + +class TakeTest : public PyTest { +}; + + +struct TakeTestConfig { + size_t data_size; + size_t top_limit; + size_t bottom_limit; + string partition_sizes; +}; + +/** + * Generate a predefine list of test scenarios composing of different data size and limit values + */ +vector generateTakeTestCfgs() { + std::vector testCfgs; + + // generate exhaustive test for small values + const std::vector small_test_size{1, 10}; + const std::vector small_limit_values{0, 1, 5, 11}; + for (auto data_size: small_test_size) { + for (auto top_limit: small_limit_values) { + for (auto bottom_limit: small_limit_values) { + testCfgs.push_back({data_size, top_limit, bottom_limit, "256B"}); + } + } + } + + // add pre-defined bigger cases + testCfgs.push_back({1000, 600, 0, "256B"}); + testCfgs.push_back({1000, 600, 600, "256B"}); + testCfgs.push_back({1000, 11, 600, "512KB"}); + + testCfgs.push_back({10001, 600, 1001, "256B"}); + testCfgs.push_back({10001, 600, 1001, "512KB"}); + testCfgs.push_back({10001, 600, 1001, "1MB"}); + + testCfgs.push_back({10001, 5000, 4950, "256B"}); + testCfgs.push_back({10001, 5000, 4950, "512KB"}); + testCfgs.push_back({10001, 5000, 4950, "1MB"}); + + return testCfgs; +} + +/** + * partition test into different partition sizes to avoid reinitializing the same context multiple times + */ +map> splitCfgsByPartitionSize(const std::vector &testCfgs) { + map> mp; + for (const auto &cfg: testCfgs) { + mp[cfg.partition_sizes].push_back(cfg); + } + return mp; +} + +/** + * Randomly generate a vector of rows for testing + * @param N the size of vector + * @return a vector of size N, containing the random data + */ +vector generateTestData(size_t N, uint64_t seed) { + mt19937 gen(seed); //Standard mersenne_twister_engine seeded with rd() + uniform_int_distribution<> distrib(1, 100000000); + + vector data; + data.reserve(N); + + for (int i = 0; i < N; i++) { + data.emplace_back(distrib(gen), distrib(gen), distrib(gen)); + } + + return data; +} + +vector generateReferenceData(const vector &input, size_t topLimit, size_t bottomLimit) { + vector output; + for (size_t i = 0; i < topLimit && i < input.size(); i++) { + output.push_back(input[i]); + } + size_t start_bottom = input.size() >= bottomLimit ? input.size() - bottomLimit : 0; + start_bottom = max(topLimit, start_bottom); + + for (size_t i = start_bottom; i < input.size(); i++) { + output.push_back(input[i]); + } + + return output; +} + +TEST_F(TakeTest, takeTopTest) { + auto opt = testOptions(); + Context context(opt); + + auto rs = context.parallelize( + {Row(1), Row(2), Row(3), Row(4), Row(5), Row(6)}).take(1, 0); + + ASSERT_EQ(rs->rowCount(), 1); + auto v = rs->getRows(1); + + EXPECT_EQ(v[0].getInt(0), 1); + + auto rs2 = context.parallelize( + {Row(1), Row(2), Row(3), Row(4), Row(5), Row(6)}).take(3, 0); + + ASSERT_EQ(rs2->rowCount(), 3); + auto v2 = rs2->getRows(3); + + EXPECT_EQ(v2[0].getInt(0), 1); + EXPECT_EQ(v2[1].getInt(0), 2); + EXPECT_EQ(v2[2].getInt(0), 3); + + auto rs3 = context.parallelize( + {Row("hello"), Row("world"), Row("! :)"), Row("world"), Row("hello"), Row("!"), Row("! :)"), + Row("!")}).take(5, 0); + + ASSERT_EQ(rs3->rowCount(), 5); + auto v3 = rs3->getRows(5); + + EXPECT_EQ(v3[0].getString(0), "hello"); + EXPECT_EQ(v3[1].getString(0), "world"); + EXPECT_EQ(v3[2].getString(0), "! :)"); + EXPECT_EQ(v3[3].getString(0), "world"); + EXPECT_EQ(v3[4].getString(0), "hello"); + +} + +TEST_F(TakeTest, takeBottomTest) { + auto opt = testOptions(); + Context context(opt); + + auto rs = context.parallelize( + {Row(1), Row(2), Row(3), Row(4), Row(5), Row(6)}).take(0, 1); + + ASSERT_EQ(rs->rowCount(), 1); + auto v = rs->getRows(1); + + EXPECT_EQ(v[0].getInt(0), 6); + + auto rs2 = context.parallelize( + {Row(1), Row(2), Row(3), Row(4), Row(5), Row(6)}).take(0, 3); + + ASSERT_EQ(rs2->rowCount(), 3); + auto v2 = rs2->getRows(3); + + EXPECT_EQ(v2[0].getInt(0), 4); + EXPECT_EQ(v2[1].getInt(0), 5); + EXPECT_EQ(v2[2].getInt(0), 6); + + auto rs3 = context.parallelize( + {Row("hello"), Row("world"), Row("! :)"), Row("world"), Row("hello"), Row("!"), Row("! :)"), + Row("!")}).take(0, 5); + + ASSERT_EQ(rs3->rowCount(), 5); + auto v3 = rs3->getRows(5); + + EXPECT_EQ(v3[0].getString(0), "world"); + EXPECT_EQ(v3[1].getString(0), "hello"); + EXPECT_EQ(v3[2].getString(0), "!"); + EXPECT_EQ(v3[3].getString(0), "! :)"); + EXPECT_EQ(v3[4].getString(0), "!"); + +} + +TEST_F(TakeTest, takeBothTest) { + auto opt = testOptions(); + Context context(opt); + + auto rs = context.parallelize( + {Row(1), Row(2), Row(3), Row(4), Row(5), Row(6)}).take(1, 1); + + ASSERT_EQ(rs->rowCount(), 2); + auto v = rs->getRows(2); + + EXPECT_EQ(v[0].getInt(0), 1); + EXPECT_EQ(v[1].getInt(0), 6); + + auto rs2 = context.parallelize( + {Row(1), Row(2), Row(3), Row(4), Row(5), Row(6)}).take(2, 1); + + ASSERT_EQ(rs2->rowCount(), 3); + auto v2 = rs2->getRows(3); + + EXPECT_EQ(v2[0].getInt(0), 1); + EXPECT_EQ(v2[1].getInt(0), 2); + EXPECT_EQ(v2[2].getInt(0), 6); + + auto rs3 = context.parallelize( + {Row("hello"), Row("world"), Row("! :)"), Row("world"), Row("hello"), Row("!"), Row("! :)"), + Row("!")}).take(2, 3); + + ASSERT_EQ(rs3->rowCount(), 5); + auto v3 = rs3->getRows(5); + + EXPECT_EQ(v3[0].getString(0), "hello"); + EXPECT_EQ(v3[1].getString(0), "world"); + EXPECT_EQ(v3[2].getString(0), "!"); + EXPECT_EQ(v3[3].getString(0), "! :)"); + EXPECT_EQ(v3[4].getString(0), "!"); +} + +TEST_F(TakeTest, takeBigTest) { + mt19937 data_seed_gen(4242); + + auto testCfgs = generateTakeTestCfgs(); + auto partitionedCfgs = splitCfgsByPartitionSize(testCfgs); + + for (const auto &cfg_pair: partitionedCfgs) { + auto opt = testOptions(); + opt.set("tuplex.partitionSize", cfg_pair.first); + Context context(opt); + + for (const auto &cfg: cfg_pair.second) { + std::cout << "testing with partition size:" << cfg.partition_sizes << " data size:" + << cfg.data_size << " top:" << cfg.top_limit << " bottom:" << cfg.bottom_limit << std::endl; + + auto data = generateTestData(cfg.data_size, data_seed_gen()); + auto ref_data = generateReferenceData(data, cfg.top_limit, cfg.bottom_limit); + + auto res = context.parallelize(data).take(cfg.top_limit, cfg.bottom_limit); + ASSERT_EQ(ref_data.size(), res->rowCount()); + for (Row &r: ref_data) { + Row res_row = res->getNextRow(); + if (!(res_row == r)) { + ASSERT_EQ(res_row, r); + } + } + } + } +} + +vector generateMapFilterReferenceData(const vector &input, size_t topLimit, size_t bottomLimit) { + if (input.empty()) { + return {}; + } + + assert(input[0].getNumColumns() == 3); + vector intermediate; + for (const Row &r: input) { + int64_t new_a = r.getInt(0) + r.getInt(1); + + if (new_a % 2 == 0) { + intermediate.emplace_back(new_a, r.getInt(2)); + } + } + + return generateReferenceData(intermediate, topLimit, bottomLimit); +} + +TEST_F(TakeTest, takeMapFilterTest) { + mt19937 data_seed_gen(56120); + + auto testCfgs = generateTakeTestCfgs(); + auto partitionedCfgs = splitCfgsByPartitionSize(testCfgs); + + UDF map_udf("lambda a, b, c: ((a + b), c)"); + UDF filter_udf("lambda a, b: a % 2 == 0"); + + for (const auto &cfg_pair: partitionedCfgs) { + auto opt = testOptions(); + opt.set("tuplex.partitionSize", cfg_pair.first); + Context context(opt); + + for (const auto &cfg: cfg_pair.second) { + std::cout << "testing with partition size:" << cfg.partition_sizes << " data size:" + << cfg.data_size << " top:" << cfg.top_limit << " bottom:" << cfg.bottom_limit << std::endl; + + auto data = generateTestData(cfg.data_size, data_seed_gen()); + auto ref_data = generateMapFilterReferenceData(data, cfg.top_limit, cfg.bottom_limit); + + auto ds = context.parallelize(data).map(map_udf).filter(filter_udf); + auto res = ds.take(cfg.top_limit, cfg.bottom_limit); + ASSERT_EQ(ref_data.size(), res->rowCount()); + for (Row &r: ref_data) { + Row res_row = res->getNextRow(); + if (!(res_row == r)) { + ASSERT_EQ(res_row, r); + } + } + } + } +} + +TEST_F(TakeTest, collectIdentityTest) { + mt19937 data_seed_gen(123454); + + const std::vector test_size{1, 10, 1000, 10001}; + const std::vector partition_sizes{"256B", "512KB", "1MB"}; + + for (auto &part_size: partition_sizes) { + auto opt = testOptions(); + opt.set("tuplex.partitionSize", part_size); + Context context(opt); + + for (auto data_size: test_size) { + auto data = generateTestData(data_size, data_seed_gen()); + auto res = context.parallelize(data).collect(); + ASSERT_EQ(data.size(), res->rowCount()); + for (Row &r: data) { + Row res_row = res->getNextRow(); + if (!(res_row == r)) { + ASSERT_EQ(res_row, r); + } + } + } + } +} + +TEST_F(TakeTest, fileInputTest) { + const std::vector test_size{1, 1001, 10001}; + const std::vector limit_values{0, 1, 600, 5000}; + const std::vector partition_sizes{"256B", "1MB"}; + std::vector> expected_outputs; + + if (!boost::filesystem::exists(scratchDir)) { + boost::filesystem::create_directory(scratchDir); + } + + std::vector fileInputNames; + for (unsigned long N: test_size) { + std::vector ref_output; + // write temp file + auto fName = fmt::format("{}/{}-{}.csv", scratchDir, testName, N); + + FILE *fp = fopen(fName.c_str(), "w"); + ASSERT_TRUE(fp); + fprintf(fp, "colA,colStr,colB\n"); + for (int i = 0; i < N; ++i) { + fprintf(fp, "%d,\"hello%d\",%d\n", i, (i * 3) % 7, i % 15); + ref_output.emplace_back(i, fmt::format("hello{}", (i * 3) % 7), (i % 15) * (i % 15)); + } + fclose(fp); + + expected_outputs.push_back(std::move(ref_output)); + fileInputNames.push_back(fName); + } + + ASSERT_TRUE(expected_outputs.size() == test_size.size()); + ASSERT_TRUE(fileInputNames.size() == test_size.size()); + + for (auto &part_size: partition_sizes) { + auto opt = microTestOptions(); + opt.set("tuplex.partitionSize", part_size); + Context context(opt); + + for (int t = 0; t < test_size.size(); t++) { + const size_t data_size = test_size[t]; + + for (auto top_limit: limit_values) { + for (auto bottom_limit: limit_values) { + std::cout << "file testing with partition size:" << part_size << " data size:" + << data_size << " top:" << top_limit << " bottom:" << bottom_limit << std::endl; + + auto ref_output = generateReferenceData(expected_outputs[t], top_limit, bottom_limit); + auto res = context.csv(fileInputNames[t]) + .mapColumn("colB", UDF("lambda x: x * x")) + .take(top_limit, bottom_limit); + + ASSERT_EQ(ref_output.size(), res->rowCount()); + for (Row &r: ref_output) { + Row res_row = res->getNextRow(); + ASSERT_EQ(res_row.getInt(0), r.getInt(0)); + ASSERT_EQ(res_row.getString(1), r.getString(1)); + ASSERT_EQ(res_row.getInt(2), r.getInt(2)); + } + } + } + } + } +} \ No newline at end of file diff --git a/tuplex/test/wrappers/WrapperTest.cc b/tuplex/test/wrappers/WrapperTest.cc index d3b602ca8..32ca2e6a8 100644 --- a/tuplex/test/wrappers/WrapperTest.cc +++ b/tuplex/test/wrappers/WrapperTest.cc @@ -67,7 +67,7 @@ TEST_F(WrapperTest, LambdaBackend) { // PyTuple_SET_ITEM(tupleObj1, 1, python::PyString_FromString("a")); // PyList_SetItem(listObj, 0, tupleObj1); - + // { // need to keep curly braces (for weird memory errors) // auto list = py::reinterpret_borrow(listObj); // // add parallelize-map-collect @@ -126,7 +126,7 @@ TEST_F(WrapperTest, MathIsInf) { // list object contains all rows in test (in this test, only one row) PyObject *listObj = PyList_New(1); - + // initialize listObj // note that using runAndGet on each individual value, and then setting // them as an element of the list is buggy (doesn't always return the right value) @@ -2631,7 +2631,7 @@ TEST_F(WrapperTest, PartitionRelease) { cols_to_select = PyList_New(1); PyList_SET_ITEM(cols_to_select, 0, python::PyString_FromString("Incident Zip")); - ctx2.csv(service_path,py::none(), true, false, "", "\"", + ctx2.csv(service_path, py::none(), true, false, "", "\"", py::none(), py::reinterpret_steal(type_dict)) .mapColumn("Incident Zip", fix_zip_codes_c, "") .selectColumns(py::reinterpret_steal(cols_to_select)) @@ -2855,6 +2855,53 @@ TEST_F(WrapperTest, DoubleCollect) { EXPECT_EQ(PyList_Size(resObj), 2); } } +TEST_F(WrapperTest, ResultWithLimitMerge) { + using namespace tuplex; + + auto ctx_opts = "{\"webui.enable\": false," + " \"driverMemory\": \"8MB\"," + " \"partitionSize\": \"256KB\"," + "\"executorCount\": 0," + "\"tuplex.scratchDir\": \"file://" + scratchDir + "\"," + "\"resolveWithInterpreterOnly\": true}"; + + PythonContext c("c", "", ctx_opts); + + PyObject *listObj = PyList_New(4); + PyObject *tupleObj1 = PyTuple_New(2); + PyTuple_SET_ITEM(tupleObj1, 0, PyLong_FromLong(1)); + PyTuple_SET_ITEM(tupleObj1, 1, python::PyString_FromString("a")); + + PyObject *tupleObj2 = PyTuple_New(2); + PyTuple_SET_ITEM(tupleObj2, 0, PyLong_FromLong(2)); + PyTuple_SET_ITEM(tupleObj2, 1, python::PyString_FromString("b")); + + + PyObject *tupleObj3 = PyTuple_New(2); + PyTuple_SET_ITEM(tupleObj3, 0, PyLong_FromLong(3)); + PyTuple_SET_ITEM(tupleObj3, 1, PyLong_FromLong(42)); + + + PyObject *tupleObj4 = PyTuple_New(2); + PyTuple_SET_ITEM(tupleObj4, 0, PyLong_FromLong(4)); + PyTuple_SET_ITEM(tupleObj4, 1, python::PyString_FromString("d")); + + PyList_SetItem(listObj, 0, tupleObj1); + PyList_SetItem(listObj, 1, tupleObj2); + PyList_SetItem(listObj, 2, tupleObj3); + PyList_SetItem(listObj, 3, tupleObj4); + + { + auto list = py::reinterpret_borrow(listObj); + auto res = c.parallelize(list).filter("lambda a, b: a > 1", "").take(1, 0); + auto resObj = res.ptr(); + + ASSERT_TRUE(PyList_Check(resObj)); + ASSERT_EQ(PyList_GET_SIZE(resObj), 1); + + PyObject_Print(resObj, stdout, 0); + } +} //// debug any python module... ///** Takes a path and adds it to sys.paths by calling PyRun_SimpleString. diff --git a/tuplex/utils/include/mt/ITask.h b/tuplex/utils/include/mt/ITask.h index 8434896a7..6c85d2d36 100644 --- a/tuplex/utils/include/mt/ITask.h +++ b/tuplex/utils/include/mt/ITask.h @@ -53,9 +53,10 @@ namespace tuplex { void setOrder(size_t order) { _orderNumbers = std::vector{order}; } -// size_t getOrder(const size_t nth = 0) const { -// return _orderNumbers[nth]; -// } + size_t getOrder(size_t nth) const { + return _orderNumbers[nth]; + } + std::vector getOrder() const { return _orderNumbers; } void setOrder(const std::vector& order) {