diff --git a/src/libstore/local-store.cc b/src/libstore/local-store.cc index 925ac25bfc5..dac7a50c4e0 100644 --- a/src/libstore/local-store.cc +++ b/src/libstore/local-store.cc @@ -1036,11 +1036,13 @@ void LocalStore::addToStore(const ValidPathInfo & info, Source & source, StorePath LocalStore::addToStoreFromDump(Source & dump, const string & name, FileIngestionMethod method, HashType hashAlgo, RepairFlag repair) { - return addToStoreCommon(name, method, hashAlgo, repair, [&](auto & sink) { + return addToStoreCommon(name, method, hashAlgo, repair, [&](auto & sink, size_t & wanted) { while (1) { - uint8_t buf[1]; - auto n = dump.read(buf, 1); + constexpr size_t bufSize = 1024; + uint8_t buf[bufSize]; + auto n = dump.read(buf, std::min(wanted, bufSize)); sink(buf, n); + // when control is yielded back to us wanted will be updated. } }); } @@ -1051,7 +1053,7 @@ StorePath LocalStore::addToStore(const string & name, const Path & _srcPath, { Path srcPath(absPath(_srcPath)); - return addToStoreCommon(name, method, hashAlgo, repair, [&](auto & sink) { + return addToStoreCommon(name, method, hashAlgo, repair, [&](auto & sink, size_t & _) { if (method == FileIngestionMethod::Recursive) dumpPath(srcPath, sink, filter); else @@ -1062,7 +1064,7 @@ StorePath LocalStore::addToStore(const string & name, const Path & _srcPath, StorePath LocalStore::addToStoreCommon( const string & name, FileIngestionMethod method, HashType hashAlgo, RepairFlag repair, - std::function demux) + std::function demux) { /* For computing the NAR hash. */ auto sha256Sink = std::make_unique(htSHA256); @@ -1083,7 +1085,7 @@ StorePath LocalStore::addToStoreCommon( bool inMemory = true; std::string nar; - auto source = sinkToSource([&](Sink & sink) { + auto source = sinkToSource([&](Sink & sink, size_t & wanted) { LambdaSink sink2([&](const unsigned char * buf, size_t len) { (*sha256Sink)(buf, len); if (hashSink) (*hashSink)(buf, len); @@ -1101,7 +1103,7 @@ StorePath LocalStore::addToStoreCommon( if (!inMemory) sink(buf, len); }); - demux(sink2); + demux(sink2, wanted); }); std::unique_ptr delTempDir; diff --git a/src/libstore/local-store.hh b/src/libstore/local-store.hh index 215731f87d4..ae23004c474 100644 --- a/src/libstore/local-store.hh +++ b/src/libstore/local-store.hh @@ -292,7 +292,7 @@ private: StorePath addToStoreCommon( const string & name, FileIngestionMethod method, HashType hashAlgo, RepairFlag repair, - std::function demux); + std::function demux); Path getRealStoreDir() override { return realStoreDir; } diff --git a/src/libutil/serialise.cc b/src/libutil/serialise.cc index c8b71188fe0..141e9e9767f 100644 --- a/src/libutil/serialise.cc +++ b/src/libutil/serialise.cc @@ -165,35 +165,43 @@ size_t StringSource::read(unsigned char * data, size_t len) #endif std::unique_ptr sinkToSource( - std::function fun, + std::function fun, std::function eof) { struct SinkToSource : Source { - typedef boost::coroutines2::coroutine coro_t; + typedef boost::coroutines2::coroutine> coro_t; - std::function fun; + std::function fun; std::function eof; std::optional coro; bool started = false; - SinkToSource(std::function fun, std::function eof) + /* It would be nicer to have the co-routines have both args and a + return value, but unfortunately that was removed from Boost's + implementation for some reason, so we use some extra state instead. + */ + size_t wanted = 0; + + SinkToSource(std::function fun, std::function eof) : fun(fun), eof(eof) { } - std::string cur; + std::basic_string cur; size_t pos = 0; size_t read(unsigned char * data, size_t len) override { - if (!coro) + wanted = len < cur.size() ? 0 : len - cur.size(); + if (!coro) { coro = coro_t::pull_type([&](coro_t::push_type & yield) { - LambdaSink sink([&](const unsigned char * data, size_t len) { - if (len) yield(std::string((const char *) data, len)); + LambdaSink sink([&](const uint8_t * data, size_t len) { + if (len) yield(std::basic_string { data, len }); }); - fun(sink); + fun(sink, wanted); }); + } if (!*coro) { eof(); abort(); } @@ -203,11 +211,10 @@ std::unique_ptr sinkToSource( pos = 0; } - auto n = std::min(cur.size() - pos, len); - memcpy(data, (unsigned char *) cur.data() + pos, n); - pos += n; + auto numCopied = cur.copy(data, len, pos); + pos += numCopied; - return n; + return numCopied; } }; diff --git a/src/libutil/serialise.hh b/src/libutil/serialise.hh index 8386a499124..6cb9d1bf506 100644 --- a/src/libutil/serialise.hh +++ b/src/libutil/serialise.hh @@ -260,11 +260,20 @@ struct LambdaSource : Source /* Convert a function that feeds data into a Sink into a Source. The Source executes the function as a coroutine. */ std::unique_ptr sinkToSource( - std::function fun, + std::function fun, std::function eof = []() { throw EndOfFile("coroutine has finished"); }); +static inline std::unique_ptr sinkToSource( + std::function fun, + std::function eof = []() { + throw EndOfFile("coroutine has finished"); + }) +{ + return sinkToSource([fun](Sink & s, size_t & _) { fun(s); }, eof); +} + void writePadding(size_t len, Sink & sink); void writeString(const unsigned char * buf, size_t len, Sink & sink);