[FEATURE] Better HTML preview of datasets #116

Draft: wants to merge 63 commits into base: master
1f1d7bb
Modify dataset
KorlaMarch Jan 22, 2022
0b1e767
Add in takeLast operator
KorlaMarch Jan 28, 2022
eafb76d
(wip) add reverse limit in partition
KorlaMarch Jan 28, 2022
cb47a4d
Remove row count
KorlaMarch Feb 11, 2022
d879bcd
refactor TakeOperator
KorlaMarch Feb 16, 2022
a6f31dd
Add unit tests
KorlaMarch Feb 16, 2022
89cee2e
add bottom limit to transform stage (wip)
KorlaMarch Feb 16, 2022
07b87fd
more physical stage update (wip)
KorlaMarch Feb 25, 2022
b2beb88
Quick push
KorlaMarch Mar 4, 2022
3e1d243
Rework LocalBackend and TransformTask to support top and bottom limit
KorlaMarch Mar 9, 2022
3bf283f
Address Review Comments
KorlaMarch Mar 25, 2022
fb90aef
Address Review Comments (2)
KorlaMarch Mar 25, 2022
cb40313
Debugging Tests
KorlaMarch Mar 25, 2022
517f2fc
Change definition of take all
KorlaMarch Apr 7, 2022
c33fc23
Random take test with some debugging
KorlaMarch Apr 7, 2022
38d9ca9
Polish the python interface
KorlaMarch Apr 8, 2022
1f5ff59
Address PR comments
KorlaMarch Apr 8, 2022
ac4c600
Add two more testcases
KorlaMarch Apr 8, 2022
56131a7
Address PR feedbacks
KorlaMarch Apr 15, 2022
2005458
Add file testcases
KorlaMarch Apr 15, 2022
41b04a7
Python Dataset Debug
KorlaMarch Apr 20, 2022
fc751f1
Remove showHTMLPreview from Dataset in C++
KorlaMarch Apr 20, 2022
6b5c692
Separate out partition utils
KorlaMarch Apr 20, 2022
a072e40
Fix Azure pipeline failing
KorlaMarch Apr 20, 2022
5a1a342
Modify dataset
KorlaMarch Jan 22, 2022
b68b4a1
Add in takeLast operator
KorlaMarch Jan 28, 2022
02b51aa
(wip) add reverse limit in partition
KorlaMarch Jan 28, 2022
a721e0f
Remove row count
KorlaMarch Feb 11, 2022
6955392
refactor TakeOperator
KorlaMarch Feb 16, 2022
7fa6b17
Add unit tests
KorlaMarch Feb 16, 2022
c78a637
add bottom limit to transform stage (wip)
KorlaMarch Feb 16, 2022
5628d27
more physical stage update (wip)
KorlaMarch Feb 25, 2022
a506d88
Rework LocalBackend and TransformTask to support top and bottom limit
KorlaMarch Mar 9, 2022
26ed614
Address Review Comments
KorlaMarch Mar 25, 2022
2cdd269
Debugging Tests
KorlaMarch Mar 25, 2022
c203de4
Change definition of take all
KorlaMarch Apr 7, 2022
664cd14
Random take test with some debugging
KorlaMarch Apr 7, 2022
5048a9b
Polish the python interface
KorlaMarch Apr 8, 2022
3658d84
Address PR comments
KorlaMarch Apr 8, 2022
72b6580
Add file testcases
KorlaMarch Apr 15, 2022
172d6b5
Python Dataset Debug
KorlaMarch Apr 20, 2022
a2d4178
Remove showHTMLPreview from Dataset in C++
KorlaMarch Apr 20, 2022
993937d
Separate out partition utils
KorlaMarch Apr 20, 2022
6f528f8
Fix Azure pipeline failing
KorlaMarch Apr 20, 2022
4a328cc
Merge branch 'jupyter_html' of github.com:KorlaMarch/tuplex into jupy…
LeonhardFS Apr 20, 2022
816567f
Minor Debug in Python lib
KorlaMarch Apr 20, 2022
4b2e2af
Remove column counts
KorlaMarch Apr 22, 2022
01e12ba
Merge branch 'tuplex:master' into jupyter_html
KorlaMarch Apr 27, 2022
a935f1e
Fix CI not running core tests
KorlaMarch May 11, 2022
359ffed
Speed up tests
KorlaMarch May 13, 2022
9fc4382
Merge branch 'master' into jupyter_html
KorlaMarch May 18, 2022
d36e4b2
Disable the limit merge test
KorlaMarch May 18, 2022
2ed45d5
Fix the wrapper test
KorlaMarch May 18, 2022
588382a
Merge branch 'jupyter_html' of github.com:KorlaMarch/tuplex into jupy…
LeonhardFS May 20, 2022
28a9064
Merge branch 'master' of github.com:tuplex/tuplex into jupyter_html
LeonhardFS May 20, 2022
0530819
Fix Typo (and rerun CI)
KorlaMarch May 20, 2022
2099d7a
Add logging after load and transform task
KorlaMarch May 20, 2022
809b87d
Fix missing completed work issue
KorlaMarch May 20, 2022
1088bc3
Merge branch 'master' into jupyter_html
KorlaMarch May 20, 2022
bfb56a3
Resolve merge conflict
KorlaMarch May 20, 2022
71e0fe5
Resolve merge conflict
KorlaMarch May 27, 2022
ee5ff30
update partition grouping when trim partitions
KorlaMarch May 27, 2022
6a1f052
merging in march's branch
LeonhardFS Jul 12, 2022
41 changes: 34 additions & 7 deletions tuplex/core/include/DataSet.h
@@ -125,10 +125,10 @@ namespace tuplex {

/*!
* action that displays tuples as nicely formatted table
* @param numRows how many rows to print, i.e. top numRows are printed.xs
* @param numRows how many rows to print, i.e. top numRows are printed.xs, -1 means print all rows
* @param os ostream where to print table to
*/
virtual void show(const int64_t numRows = -1, std::ostream &os = std::cout);
virtual void show(int64_t numRows = -1, std::ostream &os = std::cout);

// named dataset management functions
/*!
@@ -252,22 +252,49 @@ namespace tuplex {
* @param memoryLayout
* @return
*/
virtual DataSet& cache(const Schema::MemoryLayout& memoryLayout, bool storeSpecialized);
DataSet& cache(bool storeSpecialized=true) { return cache(Schema::MemoryLayout::ROW, storeSpecialized); }
virtual DataSet &cache(const Schema::MemoryLayout &memoryLayout, bool storeSpecialized);

DataSet &cache(bool storeSpecialized = true) { return cache(Schema::MemoryLayout::ROW, storeSpecialized); }

/*!
* helper setter without checks, to update internal column names.
*/
void setColumns(const std::vector<std::string> &columnNames) { _columnNames = columnNames; }

// these are actions that cause execution
/*!
* Execute the pipeline and return all outputs
* @param os the logging output
* @return the output of the execution
*/
virtual std::shared_ptr<ResultSet> collect(std::ostream &os = std::cout);

virtual std::shared_ptr<ResultSet> take(int64_t numElements, std::ostream &os = std::cout);
/*!
* Execute the pipeline and take a subset of the output from the top and bottom rows.
* If both top and bottom rows limit exist, then the top and bottom rows will be concatenated.
* In the case where topLimit + bottomLimit exceeds the output size, all rows will be taken.
* To take all rows, pass in either topLimit=size_t::max(), bottomLimit=size_t::max(), or both.
* @param topLimit number of top rows to take. size_t::max() means taking all rows
* @param bottomLimit number of bottom rows to take. size_t::max() means taking all rows
* @param os the logging output
* @return result of the execution, trim to the size of top and bottom limit.
*/
virtual std::shared_ptr<ResultSet> take(size_t topLimit, size_t bottomLimit, std::ostream &os = std::cout);

/*!
* Execute the pipeline and return all outputs as vector
* @param os the logging output
* @return the output of the execution in vector
*/
virtual std::vector<Row> collectAsVector(std::ostream &os = std::cout);

virtual std::vector<Row> takeAsVector(int64_t numElements, std::ostream &os = std::cout);
/*!
* Execute the pipeline and take a subset of the output from the top rows, return as vector
* In the case where numElements exceeds the output size, all rows will be taken.
* @param numElements number of top rows to take. size_t::max() means taking all rows
* @param os the logging output
* @return result of the execution in vector, trim to the size of numElements
*/
virtual std::vector<Row> takeAsVector(size_t numElements, std::ostream &os = std::cout);

/*!
* saves dataset to file. There are multiple options to control the behavior
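The documented semantics of the new `take(topLimit, bottomLimit)` can be illustrated on a plain in-memory vector. This is only a sketch of the behavior described in the doc comment, not the Tuplex implementation; `takeTopBottom` is a hypothetical helper introduced here for illustration.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical helper (not part of Tuplex): models the documented behavior of
// take(topLimit, bottomLimit) on a vector of rows.
template <typename Row>
std::vector<Row> takeTopBottom(const std::vector<Row>& rows,
                               std::size_t topLimit,
                               std::size_t bottomLimit) {
    const std::size_t n = rows.size();
    // Avoid computing topLimit + bottomLimit directly: it would overflow when
    // either limit is std::numeric_limits<size_t>::max() (the "take all" value).
    if (topLimit >= n || bottomLimit >= n - topLimit)
        return rows; // the limits cover every row, so take all rows

    // Otherwise concatenate the first topLimit rows and the last bottomLimit rows.
    std::vector<Row> out(rows.begin(), rows.begin() + topLimit);
    out.insert(out.end(), rows.end() - bottomLimit, rows.end());
    return out;
}
```

Comparing `bottomLimit` against `n - topLimit` rather than summing the two limits keeps the check safe for the maximum `size_t` sentinel.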
14 changes: 8 additions & 6 deletions tuplex/core/include/EmptyDataset.h
@@ -67,18 +67,20 @@ namespace tuplex {
virtual DataSet& aggregateByKey(const UDF& aggCombine, const UDF& aggUDF, const Row& aggInitial, const std::vector<std::string> &keyColumns) override { return *this; }

//virtual void show(const int64_t numRows=-1, std::ostream& os=std::cout) override;
virtual std::shared_ptr<ResultSet> collect(std::ostream& os) override;
virtual std::shared_ptr<ResultSet> collect(std::ostream &os) override;

// take / collect will print out the error only
virtual std::shared_ptr<ResultSet> take(int64_t numElements, std::ostream& os) override;
virtual std::shared_ptr<ResultSet> take(size_t topLimit, size_t bottomLimit, std::ostream &os) override;

//virtual void show(const int64_t numRows=-1, std::ostream& os=std::cout) override;
virtual std::vector<Row> collectAsVector(std::ostream& os) override;
virtual std::vector<Row> collectAsVector(std::ostream &os) override;

// take / collect will print out the error only
virtual std::vector<Row> takeAsVector(int64_t numElements, std::ostream& os) override;
/*!
* take / collect will print out the error only, return empty rows
*/
virtual std::vector<Row> takeAsVector(size_t numElements, std::ostream &os) override;

DataSet& cache(const Schema::MemoryLayout& memoryLayout, bool storeSpecialized) override {
DataSet &cache(const Schema::MemoryLayout &memoryLayout, bool storeSpecialized) override {
return *this;
}
};
4 changes: 2 additions & 2 deletions tuplex/core/include/ErrorDataSet.h
@@ -90,13 +90,13 @@ namespace tuplex {
std::shared_ptr<ResultSet> collect(std::ostream& os) override;

// take / collect will print out the error only
std::shared_ptr<ResultSet> take(int64_t numElements, std::ostream& os) override;
std::shared_ptr<ResultSet> take(size_t topLimit, size_t bottomLimit, std::ostream& os) override;

//virtual void show(const int64_t numRows=-1, std::ostream& os=std::cout) override;
std::vector<Row> collectAsVector(std::ostream& os) override;

// take / collect will print out the error only
std::vector<Row> takeAsVector(int64_t numElements, std::ostream& os) override;
std::vector<Row> takeAsVector(size_t numElements, std::ostream& os) override;
};
}

5 changes: 3 additions & 2 deletions tuplex/core/include/Executor.h
@@ -48,8 +48,9 @@ namespace tuplex {
ExecutorTaskQueueType _queue;
std::mutex _completedTasksMutex;
std::vector<IExecutorTask*> _completedTasks;
std::atomic_int _numPendingTasks;
std::atomic_int _numCompletedTasks;
std::atomic_int _numPendingTasks{};
std::atomic_int _numCompletedTasks{};

public:

WorkQueue();
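The `{}` added to the two `std::atomic_int` members value-initializes them. Before C++20, a default-initialized `std::atomic_int` holds an indeterminate value, so a counter declared without an initializer may not start at zero. A minimal sketch of the pattern follows; `Counters` and `finishOne` are hypothetical names used only for illustration, not Tuplex code.

```cpp
#include <atomic>

// Hypothetical stand-in for the two WorkQueue counters.
struct Counters {
    std::atomic_int pending{};   // value-initialized, starts at 0
    std::atomic_int completed{}; // value-initialized, starts at 0
};

// Moves one task from "pending" to "completed" and returns the new
// completed count.
inline int finishOne(Counters& c) {
    c.pending.fetch_sub(1);
    return c.completed.fetch_add(1) + 1;
}
```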
59 changes: 41 additions & 18 deletions tuplex/core/include/LocalEngine.h
@@ -16,7 +16,28 @@
#include <TSingleton.h>
#include "RESTInterface.h"


namespace tuplex {
struct ExecutorConfig {
size_t _size; // size in bytes that each executor should have
size_t _blockSize; // size of individual blocks used (can be used for coarse or fine grained parallelism)
size_t _runTimeMemory;
size_t _runTimeMemoryDefaultBlockSize;
URI _cache_path;

bool operator==(const ExecutorConfig &rhs) const {
return _size == rhs._size &&
_blockSize == rhs._blockSize &&
_runTimeMemory == rhs._runTimeMemory &&
_runTimeMemoryDefaultBlockSize == rhs._runTimeMemoryDefaultBlockSize &&
_cache_path == rhs._cache_path;
}

bool operator!=(const ExecutorConfig &rhs) const {
return !(rhs == *this);
}
};

/*!
* local execution engine. Provides local executors for a context
* THIS IS NOT THREADSAFE. Should be only accessed by driver thread.
@@ -25,16 +46,18 @@ namespace tuplex {

private:
// non-detached executor that serves as the driver
std::unique_ptr<Executor> _driver;
std::shared_ptr<Executor> _driver;
ExecutorConfig _driver_cfg;

std::vector<std::unique_ptr<Executor>> _executors;
std::map<Executor*, size_t> _refCounts; //! reference counts for each executor
std::map<Executor *, size_t> _refCounts; //! reference counts for each executor

LocalEngine(const LocalEngine &);

LocalEngine(const LocalEngine&);
void operator = (const LocalEngine&);
void operator=(const LocalEngine &);

// The local task queue
WorkQueue _queue;
WorkQueue _queue;

protected:
LocalEngine();
@@ -63,33 +86,33 @@ namespace tuplex {
* @param cache_path directory where subfolders will be created for all executors to be started
* @return array of executor references
*/
std::vector<Executor*> getExecutors(const size_t num,
const size_t size,
const size_t blockSize,
const size_t runTimeMemory,
const size_t runTimeMemoryDefaultBlockSize,
const URI& cache_path);
std::vector<Executor *> getExecutors(const size_t num,
const size_t size,
const size_t blockSize,
const size_t runTimeMemory,
const size_t runTimeMemoryDefaultBlockSize,
const URI &cache_path);

/*!
* releases executors (invoked by context)
* @param executors
* @param ctx
*/
void freeExecutors(const std::vector<Executor*>& executors, const Context* ctx=nullptr);
void freeExecutors(const std::vector<Executor *> &executors, const Context *ctx = nullptr);

Executor* getDriver(const size_t size,
const size_t blockSize,
const size_t runTimeMemory,
const size_t runTimeMemoryDefaultBlockSize,
const URI& cache_path);
std::shared_ptr<Executor> getDriver(const size_t size,
const size_t blockSize,
const size_t runTimeMemory,
const size_t runTimeMemoryDefaultBlockSize,
const URI &cache_path);

void release();

/*!
* retrieves the global work queue for local executors
* @return
*/
WorkQueue& getQueue() { return _queue; }
WorkQueue &getQueue() { return _queue; }
};
}
#endif //TUPLEX_LOCALENGINE_H
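The header adds `ExecutorConfig` with equality operators, stores a `_driver_cfg` next to `_driver`, and switches the driver to a `std::shared_ptr`. The diff does not show how `getDriver` uses them, so the following is only one plausible reading: reuse the existing driver while the requested configuration is unchanged, and allocate a new one otherwise. All names here (`DriverConfig`, `Driver`, `DriverCache`) are hypothetical simplifications, not the Tuplex types.

```cpp
#include <cstddef>
#include <memory>
#include <string>

// Hypothetical, simplified stand-in for ExecutorConfig.
struct DriverConfig {
    std::size_t size;
    std::size_t blockSize;
    std::string cachePath;

    bool operator==(const DriverConfig& rhs) const {
        return size == rhs.size && blockSize == rhs.blockSize &&
               cachePath == rhs.cachePath;
    }
    bool operator!=(const DriverConfig& rhs) const { return !(*this == rhs); }
};

// Hypothetical stand-in for Executor.
struct Driver {
    DriverConfig cfg;
};

// Assumed behavior: hand out the cached driver when the config matches,
// otherwise replace it. Earlier callers keep their driver alive because
// they hold a shared_ptr to it.
class DriverCache {
    std::shared_ptr<Driver> _driver;
    DriverConfig _cfg{};

public:
    std::shared_ptr<Driver> get(const DriverConfig& cfg) {
        if (!_driver || _cfg != cfg) {
            _driver = std::make_shared<Driver>(Driver{cfg});
            _cfg = cfg;
        }
        return _driver;
    }
};
```

Under this assumption, shared ownership is what makes replacement safe: a backend that obtained the old driver still holds a valid object after the cache swaps in a new one.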
2 changes: 1 addition & 1 deletion tuplex/core/include/Partition.h
@@ -158,7 +158,7 @@ namespace tuplex {
* return how much capacity is left, i.e. how many bytes can be actually written
* @return
*/
size_t capacity() { return _size - sizeof(int64_t); }
size_t capacity() const { return _size - sizeof(int64_t); }

uniqueid_t uuid() const { return _uuid; }

46 changes: 46 additions & 0 deletions tuplex/core/include/PartitionUtils.h
@@ -0,0 +1,46 @@
//--------------------------------------------------------------------------------------------------------------------//
// //
// Tuplex: Blazing Fast Python Data Science //
// //
// //
// (c) 2017 - 2021, Tuplex team //
// Created by March Boonyapaluk first on 4/19/2021 //
// License: Apache 2.0 //
//--------------------------------------------------------------------------------------------------------------------//

#ifndef TUPLEX_PARTITIONUTILS_H
#define TUPLEX_PARTITIONUTILS_H

#include <vector>
#include <physical/TransformStage.h>
#include <Executor.h>

namespace tuplex {
/*!
* Trim list of partitions so that it includes up to the first n rows and the last m rows
* if n + m > number of rows in input partitions, the partitions will remain unchanged
* @param partitions [in,out] the list of partitions to trim
* @param topLimit n, the number of top rows to include
* @param bottomLimit m, the number of bottom rows to include
* @param tstage pointer to transform stage, might be used to generate new partition
* @param exec pointer to executor, might be used to allocate new partition
*/
void trimPartitionsToLimit(std::vector<Partition *> &partitions, size_t topLimit, size_t bottomLimit,
TransformStage *tstage, Executor *exec);

/*!
* Create a newly allocated partition with the same data as the specified partition, but with the first n rows removed
* @param p_in the input partition
* @param numToSkip number of rows to remove from the new partition
* @param tstage pointer to transform stage, used to generate new partition
* @param exec pointer to executor, used to allocate new partition
* @return the new partition
*/
Partition *newPartitionWithSkipRows(Partition *p_in,
size_t numToSkip,
TransformStage *tstage,
Executor *exec);

}

#endif //TUPLEX_PARTITIONUTILS_H
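The contract documented for `trimPartitionsToLimit` (keep the first n and last m rows across a list of partitions, and leave the list untouched when the limits cover everything) can be sketched with plain vectors. This is an illustration under assumptions, not the implementation in the PR: `Rows` stands in for a `Partition`, `trimToLimit` is a hypothetical name, and dropping partitions that end up empty is an assumption of this sketch.

```cpp
#include <cstddef>
#include <utility>
#include <vector>

using Rows = std::vector<int>; // hypothetical stand-in for a Partition's rows

// Trims `parts` in place so that only the first topLimit and the last
// bottomLimit rows (counted across all partitions) remain.
void trimToLimit(std::vector<Rows>& parts,
                 std::size_t topLimit,
                 std::size_t bottomLimit) {
    std::size_t total = 0;
    for (const auto& p : parts) total += p.size();

    // Limits cover every row: leave the partitions unchanged.
    // Written without topLimit + bottomLimit to avoid size_t overflow.
    if (topLimit >= total || bottomLimit >= total - topLimit) return;

    std::vector<Rows> out;
    std::size_t seen = 0; // global index of the first row of the current partition
    for (const auto& p : parts) {
        Rows kept;
        for (std::size_t i = 0; i < p.size(); ++i) {
            const std::size_t g = seen + i; // global row index
            if (g < topLimit || g >= total - bottomLimit) kept.push_back(p[i]);
        }
        seen += p.size();
        if (!kept.empty()) out.push_back(std::move(kept)); // assumption: drop empties
    }
    parts = std::move(out);
}
```

The real function also receives a `TransformStage` and an `Executor` because trimming rows from the front of a partition may require allocating a new one (see `newPartitionWithSkipRows`); the sketch sidesteps that by copying rows.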
15 changes: 9 additions & 6 deletions tuplex/core/include/ee/IBackend.h
@@ -29,19 +29,22 @@ namespace tuplex {
class IBackend {
public:
IBackend() = delete;
IBackend(const IBackend& other) = delete;
IBackend(const Context& context) : _context(context) {}

IBackend(const IBackend &other) = delete;

IBackend(const Context &context) : _context(context) {}

// driver, i.e. where to store local data.
virtual Executor* driver() = 0;
virtual void execute(PhysicalStage* stage) = 0;
virtual Executor *driver() = 0;

virtual void execute(PhysicalStage *stage) = 0;

virtual ~IBackend() {} // virtual destructor needed b.c. of smart pointers

virtual const Context& context() const { return _context; }
virtual const Context &context() const { return _context; }

private:
const Context& _context;
const Context &_context;
};

inline std::unordered_map<std::tuple<int64_t, ExceptionCode>, size_t> merge_ecounts(std::unordered_map<std::tuple<int64_t, ExceptionCode>, size_t> lhs,
11 changes: 6 additions & 5 deletions tuplex/core/include/ee/local/LocalBackend.h
@@ -40,14 +40,15 @@ namespace tuplex {
* constructor for convenience
* @param context
*/
explicit LocalBackend(const Context& context);
explicit LocalBackend(const Context &context);

Executor* driver() override; // for local execution
Executor *driver() override; // for local execution

void execute(PhysicalStage *stage) override;

void execute(PhysicalStage* stage) override;
private:
Executor *_driver; //! driver from local backend...
std::vector<Executor*> _executors; //! drivers to be used
std::shared_ptr<Executor> _driver; //! driver from local backend...
std::vector<Executor *> _executors; //! drivers to be used
std::unique_ptr<JITCompiler> _compiler;

HistoryServerConnection _historyConn;
10 changes: 6 additions & 4 deletions tuplex/core/include/logical/TakeOperator.h
@@ -17,15 +17,16 @@
namespace tuplex {
class TakeOperator : public LogicalOperator {
private:
int64_t _limit;
size_t _topLimit;
size_t _bottomLimit;
public:
LogicalOperator *clone() override;

public:
TakeOperator(LogicalOperator *parent, const int64_t numElements);
TakeOperator(LogicalOperator *parent, size_t topLimit, size_t bottomLimit);

std::string name() override {
if(_limit < 0 || std::numeric_limits<int64_t>::max() == _limit)
if(_topLimit == std::numeric_limits<size_t>::max() || _bottomLimit == std::numeric_limits<size_t>::max())
return "collect";
return "take";
}
@@ -37,8 +38,9 @@ namespace tuplex {

bool good() const override;

int64_t limit() { return _limit; }
size_t topLimit() const { return _topLimit; }

size_t bottomLimit() const { return _bottomLimit; }

std::vector<Row> getSample(const size_t num) const override;

10 changes: 6 additions & 4 deletions tuplex/core/include/physical/StageBuilder.h
@@ -76,8 +76,9 @@ namespace tuplex {
void addFileInput(FileInputOperator* csvop);
void addFileOutput(FileOutputOperator* fop);

inline void setOutputLimit(size_t limit) {
_outputLimit = limit;
inline void setOutputLimit(size_t topLimit, size_t bottomLimit = 0) {
_outputTopLimit = topLimit;
_outputBottomLimit = bottomLimit;
}

TransformStage* build(PhysicalPlan* plan, IBackend* backend);
@@ -134,7 +135,8 @@ namespace tuplex {
FileFormat _outputFileFormat;
int64_t _outputNodeID;
int64_t _inputNodeID;
size_t _outputLimit;
size_t _outputTopLimit;
size_t _outputBottomLimit;

LogicalOperator* _inputNode;
std::vector<bool> _columnsToRead;
@@ -157,7 +159,7 @@ namespace tuplex {
int64_t outputDataSetID() const;

inline bool hasOutputLimit() const {
return _outputLimit < std::numeric_limits<size_t>::max();
return _outputTopLimit != std::numeric_limits<size_t>::max() && _outputBottomLimit != std::numeric_limits<size_t>::max();
}

inline char csvOutputDelimiter() const {
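Both `TakeOperator::name()` and `StageBuilder::hasOutputLimit()` rely on the same convention: a limit equal to the maximum `size_t` means "take all rows". The sketch below restates that convention in isolation to show that the two conditions in the diff are logical complements of each other. `kTakeAll`, `Limits`, `takesAll`, and `operatorName` are hypothetical names introduced for illustration.

```cpp
#include <cstddef>
#include <limits>
#include <string>

// Sentinel meaning "no limit on this side", as used in the diff.
constexpr std::size_t kTakeAll = std::numeric_limits<std::size_t>::max();

// Hypothetical pair of limits.
struct Limits {
    std::size_t top;
    std::size_t bottom;
};

// Mirrors the condition in TakeOperator::name(): either sentinel means all rows.
inline bool takesAll(const Limits& l) {
    return l.top == kTakeAll || l.bottom == kTakeAll;
}

// Mirrors StageBuilder::hasOutputLimit(): by De Morgan, "neither limit is the
// sentinel" is exactly the negation of takesAll().
inline bool hasOutputLimit(const Limits& l) {
    return l.top != kTakeAll && l.bottom != kTakeAll;
}

inline std::string operatorName(const Limits& l) {
    return takesAll(l) ? "collect" : "take";
}
```

Note that `setOutputLimit(topLimit)` defaults `bottomLimit` to 0, so a call with only a top limit still counts as limited under this convention.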