Skip to content

Commit

Permalink
Update untwine to match the current hobu/untwine main branch
Browse files Browse the repository at this point in the history
Added a minor update to the untwine_to_qgis.bash script as it was
getting confused about the "untwine" subfolders and copying sources
one level deeper as supposed to
  • Loading branch information
wonder-sk committed Dec 16, 2020
1 parent 42b33db commit d76ec1c
Show file tree
Hide file tree
Showing 12 changed files with 105 additions and 44 deletions.
11 changes: 10 additions & 1 deletion external/untwine/bu/Processor.cpp
Expand Up @@ -307,12 +307,21 @@ Processor::writeOctantCompressed(const OctantInfo& o, Index& index, IndexIter po
} }
} }
flush: flush:
flushCompressed(table, view, o); try
{
flushCompressed(table, view, o);
}
catch (pdal::pdal_error& err)
{
fatal(err.what());
}

m_manager.logOctant(o.key(), count); m_manager.logOctant(o.key(), count);
return pos; return pos;
} }




// Copy data from the source file to the point view.
void Processor::appendCompressed(pdal::PointViewPtr view, const DimInfoList& dims, void Processor::appendCompressed(pdal::PointViewPtr view, const DimInfoList& dims,
const FileInfo& fi, IndexIter begin, IndexIter end) const FileInfo& fi, IndexIter begin, IndexIter end)
{ {
Expand Down
13 changes: 4 additions & 9 deletions external/untwine/epf/BufferCache.cpp
Expand Up @@ -21,9 +21,11 @@ namespace epf
{ {


// If we have a buffer in the cache, return it. Otherwise create a new one and return that. // If we have a buffer in the cache, return it. Otherwise create a new one and return that.
DataVecPtr BufferCache::fetch() // If nonblock is true and there are no available buffers, return null.
DataVecPtr BufferCache::fetch(std::unique_lock<std::mutex>& lock, bool nonblock)
{ {
std::unique_lock<std::mutex> lock(m_mutex); if (nonblock && m_buffers.empty() && m_count >= MaxBuffers)
return nullptr;


m_cv.wait(lock, [this](){ return m_buffers.size() || m_count < MaxBuffers; }); m_cv.wait(lock, [this](){ return m_buffers.size() || m_count < MaxBuffers; });
if (m_buffers.size()) if (m_buffers.size())
Expand All @@ -42,17 +44,10 @@ DataVecPtr BufferCache::fetch()
// Put a buffer back in the cache. // Put a buffer back in the cache.
void BufferCache::replace(DataVecPtr&& buf) void BufferCache::replace(DataVecPtr&& buf)
{ {
std::unique_lock<std::mutex> lock(m_mutex);

//ABELL - Fix this.
// buf->resize(BufSize);
m_buffers.push_back(std::move(buf)); m_buffers.push_back(std::move(buf));


if (m_count == MaxBuffers) if (m_count == MaxBuffers)
{
lock.unlock();
m_cv.notify_one(); m_cv.notify_one();
}
} }


} // namespace epf } // namespace epf
Expand Down
3 changes: 1 addition & 2 deletions external/untwine/epf/BufferCache.hpp
Expand Up @@ -31,12 +31,11 @@ class BufferCache
BufferCache() : m_count(0) BufferCache() : m_count(0)
{} {}


DataVecPtr fetch(); DataVecPtr fetch(std::unique_lock<std::mutex>& lock, bool nonblock);
void replace(DataVecPtr&& buf); void replace(DataVecPtr&& buf);


private: private:
std::deque<DataVecPtr> m_buffers; std::deque<DataVecPtr> m_buffers;
std::mutex m_mutex;
std::condition_variable m_cv; std::condition_variable m_cv;
int m_count; int m_count;
}; };
Expand Down
41 changes: 32 additions & 9 deletions external/untwine/epf/Cell.cpp
Expand Up @@ -21,21 +21,26 @@ namespace epf


void Cell::initialize() void Cell::initialize()
{ {
m_buf = m_writer->bufferCache().fetch(); m_buf = m_writer->fetchBuffer();

// If we couldn't fetch a buffer, flush all the the buffers for this processor and
// try again, but block.
if (!m_buf)
{
m_flush(this);
m_buf = m_writer->fetchBufferBlocking();
}
m_pos = m_buf->data(); m_pos = m_buf->data();


m_endPos = m_pos + m_pointSize * (BufSize / m_pointSize); m_endPos = m_pos + m_pointSize * (BufSize / m_pointSize);
} }


// NOTE - After write(), the cell is invalid and must be initialized or destroyed.
void Cell::write() void Cell::write()
{ {
// Resize the buffer so the writer knows how much to write.
size_t size = m_pos - m_buf->data(); size_t size = m_pos - m_buf->data();
if (size) if (size)
// {
// m_buf->resize(size);
m_writer->enqueue(m_key, std::move(m_buf), size); m_writer->enqueue(m_key, std::move(m_buf), size);
// }
} }


void Cell::advance() void Cell::advance()
Expand All @@ -61,17 +66,35 @@ Cell *CellMgr::get(const VoxelKey& key)
auto it = m_cells.find(key); auto it = m_cells.find(key);
if (it == m_cells.end()) if (it == m_cells.end())
{ {
std::unique_ptr<Cell> cell(new Cell(key, m_pointSize, m_writer)); Cell::FlushFunc f = [this](Cell *exclude)
{
flush(exclude);
};
std::unique_ptr<Cell> cell(new Cell(key, m_pointSize, m_writer, f));
it = m_cells.insert( {key, std::move(cell)} ).first; it = m_cells.insert( {key, std::move(cell)} ).first;
} }
Cell& c = *(it->second); Cell& c = *(it->second);
return &c; return &c;
} }


void CellMgr::flush() // Eliminate all the cells and their associated data buffers except the `exclude`
// cell.
void CellMgr::flush(Cell *exclude)
{ {
for (auto& cp : m_cells) CellMap::iterator it = m_cells.end();
cp.second->write(); if (exclude)
it = m_cells.find(exclude->key());

// If there was no exclude cell or it isn't in our list, just clear the cells.
// Otherwise, save the exclude cell, clear the list, and reinsert.
if (it == m_cells.end())
m_cells.clear();
else
{
std::unique_ptr<Cell> c = std::move(it->second);
m_cells.clear();
m_cells.insert({c->key(), std::move(c)});
}
} }


} // namespace epf } // namespace epf
Expand Down
23 changes: 17 additions & 6 deletions external/untwine/epf/Cell.hpp
Expand Up @@ -15,7 +15,8 @@


#include <cstdint> #include <cstdint>
#include <cstddef> #include <cstddef>
#include <map> #include <functional>
#include <unordered_map>
#include <memory> #include <memory>


#include "EpfTypes.hpp" #include "EpfTypes.hpp"
Expand All @@ -35,12 +36,18 @@ class Writer;
class Cell class Cell
{ {
public: public:
Cell(const VoxelKey& key, int pointSize, Writer *writer) : using FlushFunc = std::function<void(Cell *)>;
m_key(key), m_pointSize(pointSize), m_writer(writer)
Cell(const VoxelKey& key, int pointSize, Writer *writer, FlushFunc flush) :
m_key(key), m_pointSize(pointSize), m_writer(writer), m_flush(flush)
{ {
assert(pointSize < BufSize); assert(pointSize < BufSize);
initialize(); initialize();
} }
~Cell()
{
write();
}


void initialize(); void initialize();
Point point() Point point()
Expand All @@ -49,7 +56,6 @@ class Cell
{ return m_key; } { return m_key; }
void copyPoint(Point& b) void copyPoint(Point& b)
{ std::copy(b.data(), b.data() + m_pointSize, m_pos); } { std::copy(b.data(), b.data() + m_pointSize, m_pos); }
void write();
void advance(); void advance();


private: private:
Expand All @@ -59,19 +65,24 @@ class Cell
Writer *m_writer; Writer *m_writer;
uint8_t *m_pos; uint8_t *m_pos;
uint8_t *m_endPos; uint8_t *m_endPos;
FlushFunc m_flush;

void write();
}; };


class CellMgr class CellMgr
{ {
public: public:
CellMgr(int pointSize, Writer *writer); CellMgr(int pointSize, Writer *writer);

Cell *get(const VoxelKey& key); Cell *get(const VoxelKey& key);
void flush(); void flush(Cell *exclude);


private: private:
using CellMap = std::unordered_map<VoxelKey, std::unique_ptr<Cell>>;
int m_pointSize; int m_pointSize;
Writer *m_writer; Writer *m_writer;
std::map<VoxelKey, std::unique_ptr<Cell>> m_cells; CellMap m_cells;
}; };




Expand Down
6 changes: 3 additions & 3 deletions external/untwine/epf/Epf.cpp
Expand Up @@ -61,7 +61,7 @@ void writeMetadata(const std::string& tempDir, const Grid& grid,


/// Epf /// Epf


Epf::Epf() : m_pool(8) Epf::Epf() : m_pool(NumFileProcessors)
{} {}




Expand Down Expand Up @@ -120,8 +120,8 @@ void Epf::run(const Options& options, ProgressWriter& progress)
} }
} }


// Make a writer with 4 threads. // Make a writer with NumWriters threads.
m_writer.reset(new Writer(options.tempDir, 4, layout->pointSize())); m_writer.reset(new Writer(options.tempDir, NumWriters, layout->pointSize()));


// Sort file infos so the largest files come first. This helps to make sure we don't delay // Sort file infos so the largest files come first. This helps to make sure we don't delay
// processing big files that take the longest (use threads more efficiently). // processing big files that take the longest (use threads more efficiently).
Expand Down
2 changes: 2 additions & 0 deletions external/untwine/epf/EpfTypes.hpp
Expand Up @@ -36,6 +36,8 @@ using Totals = std::unordered_map<VoxelKey, size_t>;
constexpr int MaxPointsPerNode = 100000; constexpr int MaxPointsPerNode = 100000;
constexpr int BufSize = 4096 * 10; constexpr int BufSize = 4096 * 10;
constexpr int MaxBuffers = 1000; constexpr int MaxBuffers = 1000;
constexpr int NumWriters = 4;
constexpr int NumFileProcessors = 8;


struct FileInfo struct FileInfo
{ {
Expand Down
15 changes: 10 additions & 5 deletions external/untwine/epf/FileProcessor.cpp
Expand Up @@ -85,12 +85,17 @@ void FileProcessor::run()


pdal::FixedPointTable t(1000); pdal::FixedPointTable t(1000);


f.prepare(t); try
f.execute(t); {
m_progress.update(count % CountIncrement); f.prepare(t);
f.execute(t);
}
catch (const pdal::pdal_error& err)
{
fatal(err.what());
}


// Flush any data remaining in the cells. m_progress.update(count % CountIncrement);
m_cellMgr.flush();
} }


} // namespace epf } // namespace epf
Expand Down
1 change: 0 additions & 1 deletion external/untwine/epf/Reprocessor.cpp
Expand Up @@ -66,7 +66,6 @@ void Reprocessor::run()
cell->advance(); cell->advance();
pos += m_pointSize; pos += m_pointSize;
} }
m_mgr.flush();
pdal::FileUtils::unmapFile(ctx); pdal::FileUtils::unmapFile(ctx);
pdal::FileUtils::deleteFile(m_filename); pdal::FileUtils::deleteFile(m_filename);
} }
Expand Down
20 changes: 19 additions & 1 deletion external/untwine/epf/Writer.cpp
Expand Up @@ -58,6 +58,24 @@ Totals Writer::totals(size_t minSize)
return t; return t;
} }


DataVecPtr Writer::fetchBuffer()
{
std::unique_lock<std::mutex> lock(m_mutex);

// If there are fewer items in the queue than we have FileProcessors, we may choose not
// to block and return a nullptr, expecting that the caller will flush outstanding cells.
return m_bufferCache.fetch(lock, m_queue.size() < NumFileProcessors);
}


DataVecPtr Writer::fetchBufferBlocking()
{
std::unique_lock<std::mutex> lock(m_mutex);

return m_bufferCache.fetch(lock, false);
}


void Writer::enqueue(const VoxelKey& key, DataVecPtr data, size_t dataSize) void Writer::enqueue(const VoxelKey& key, DataVecPtr data, size_t dataSize)
{ {
{ {
Expand Down Expand Up @@ -123,9 +141,9 @@ void Writer::run()
out.close(); out.close();
if (!out) if (!out)
fatal("Failure writing to '" + path(wd.key) + "'."); fatal("Failure writing to '" + path(wd.key) + "'.");
m_bufferCache.replace(std::move(wd.data));


std::lock_guard<std::mutex> lock(m_mutex); std::lock_guard<std::mutex> lock(m_mutex);
m_bufferCache.replace(std::move(wd.data));
m_active.remove(wd.key); m_active.remove(wd.key);
} }
} }
Expand Down
4 changes: 2 additions & 2 deletions external/untwine/epf/Writer.hpp
Expand Up @@ -43,11 +43,11 @@ class Writer


void enqueue(const VoxelKey& key, DataVecPtr data, size_t dataSize); void enqueue(const VoxelKey& key, DataVecPtr data, size_t dataSize);
void stop(); void stop();
BufferCache& bufferCache()
{ return m_bufferCache; }
const Totals& totals() const Totals& totals()
{ return m_totals; } { return m_totals; }
Totals totals(size_t minSize); Totals totals(size_t minSize);
DataVecPtr fetchBuffer();
DataVecPtr fetchBufferBlocking();


private: private:
std::string path(const VoxelKey& key); std::string path(const VoxelKey& key);
Expand Down
10 changes: 5 additions & 5 deletions external/untwine_to_qgis.bash
@@ -1,12 +1,12 @@
#!/usr/bin/env bash #!/usr/bin/env bash


if [ "$#" -ne 1 ] ; then if [ "$#" -ne 1 ] ; then
echo "untwine_to_qgis: untwine directory argument required" echo "untwine_to_qgis: untwine directory argument required"
exit 1 exit 1
fi fi


UNTWINE_QGIS_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )" EXTERNAL_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
UNTWINE_QGIS_DIR=$UNTWINE_QGIS_DIR/untwine UNTWINE_QGIS_DIR=$EXTERNAL_DIR/untwine


UNTWINE_DIR=$1 UNTWINE_DIR=$1
if [ ! -d "$UNTWINE_DIR/untwine" ] ; then if [ ! -d "$UNTWINE_DIR/untwine" ] ; then
Expand All @@ -20,7 +20,7 @@ echo "untwine_to_qgis: Remove old version"
rm -rf $UNTWINE_QGIS_DIR/* rm -rf $UNTWINE_QGIS_DIR/*


echo "untwine_to_qgis: Copy new version" echo "untwine_to_qgis: Copy new version"
rsync -r $UNTWINE_DIR $UNTWINE_QGIS_DIR --exclude="CMakeLists.txt*" --exclude="cmake/" --exclude="README.md" --exclude=".git" --exclude=".gitignore" rsync -r $UNTWINE_DIR/ $UNTWINE_QGIS_DIR/ --exclude="CMakeLists.txt*" --exclude="cmake/" --exclude="README.md" --exclude=".git" --exclude=".gitignore"


echo "untwine_to_qgis: Done" echo "untwine_to_qgis: Done"
cd $PWD cd $PWD

0 comments on commit d76ec1c

Please sign in to comment.