diff --git a/doc/CMakeLists.txt b/doc/CMakeLists.txt index 7dd049050..f6fa33eaa 100644 --- a/doc/CMakeLists.txt +++ b/doc/CMakeLists.txt @@ -54,6 +54,9 @@ configure_file(sorting_internal.dox.in sorting_internal.dox @ONLY) configure_file(queue.dox.in queue.dox @ONLY) +file(READ code/serialization.inl DOCCODE_SERIALIZATION) +configure_file(serialization.dox.in serialization.dox @ONLY) + else (DOXYGEN) message(STATUS "Doxygen not found, API documentation cannot be generated.") endif(DOXYGEN) diff --git a/doc/Doxyfile.in b/doc/Doxyfile.in index c61cac5bc..10cf2f9e9 100644 --- a/doc/Doxyfile.in +++ b/doc/Doxyfile.in @@ -682,6 +682,7 @@ INPUT += @tpie_BINARY_DIR@/doc/progress.dox INPUT += @tpie_BINARY_DIR@/doc/fractiondb.dox INPUT += @tpie_BINARY_DIR@/doc/queue.dox INPUT += @tpie_SOURCE_DIR@/doc/pipelining.dox +INPUT += @tpie_BINARY_DIR@/doc/serialization.dox # This tag can be used to specify the character encoding of the source files # that doxygen parses. Internally doxygen uses the UTF-8 encoding, which is diff --git a/doc/code/code.cpp b/doc/code/code.cpp index b147c2548..9d70fe5ed 100644 --- a/doc/code/code.cpp +++ b/doc/code/code.cpp @@ -4,6 +4,9 @@ #include #include #include +#include +#include +#include #include #include #include @@ -71,6 +74,10 @@ namespace _j { #include "progress3.inl" } +namespace _k { +#include "serialization.inl" +} + int main() { return 0; } diff --git a/doc/code/serialization.inl b/doc/code/serialization.inl new file mode 100644 index 000000000..f20e73018 --- /dev/null +++ b/doc/code/serialization.inl @@ -0,0 +1,76 @@ +void write_lines(std::istream & is, std::string filename) { + std::string line; + tpie::serialization_writer wr; + wr.open(filename); + while (std::getline(is, line)) { + wr.serialize(line); + } + wr.close(); +} + +void reverse_lines(std::string filename) { + tpie::temp_file f; + { + tpie::serialization_reader rd; + rd.open(filename); + tpie::serialization_reverse_writer wr; + wr.open(f); + while (rd.can_read()) { + 
std::string line; + rd.unserialize(line); + wr.serialize(line); + } + wr.close(); + rd.close(); + } + { + tpie::serialization_reverse_reader rd; + rd.open(f); + tpie::serialization_writer wr; + wr.open(filename); + while (rd.can_read()) { + std::string line; + rd.unserialize(line); + wr.serialize(line); + } + wr.close(); + rd.close(); + } +} + +void read_lines(std::ostream & os, std::string filename) { + tpie::serialization_reader rd; + rd.open(filename); + while (rd.can_read()) { + std::string line; + rd.unserialize(line); + os << line << '\n'; + } + rd.close(); +} + +void sort_lines(std::string filename) { + tpie::serialization_sort sorter; + sorter.set_available_memory(50*1024*1024); + sorter.begin(); + { + tpie::serialization_reader rd; + rd.open(filename); + while (rd.can_read()) { + std::string line; + rd.unserialize(line); + sorter.push(line); + } + rd.close(); + } + sorter.end(); + sorter.merge_runs(); + { + tpie::serialization_writer wr; + wr.open(filename); + while (sorter.can_pull()) { + wr.serialize(sorter.pull()); + } + wr.close(); + } +} diff --git a/doc/serialization.dox.in b/doc/serialization.dox.in new file mode 100644 index 000000000..435b9af64 --- /dev/null +++ b/doc/serialization.dox.in @@ -0,0 +1,101 @@ +/** +\page serialization Serialization streams + +\section sec_serintro Motivation + +If you want to read and write text strings with TPIE \c file_streams, +the interface requires a fixed string size. +In some cases this may be unreasonable: +space is wasted on strings that are smaller than the given size limit, +and it may be impossible to give a fixed upper bound on the length of the +strings a program has to operate on. + +For this, TPIE provides a serialization framework with a +distinct set of stream readers and writers that support, +in essence, variable-length item types, such as strings and arrays. 
+With the library support for reversing and sorting such serialization streams, +it becomes reasonably easy to implement external memory algorithms operating on +variable-length items. + +The goal of TPIE serialization is \em not to be portable across machines, +nor is it to provide type-checking on the serialized input. +We do not track endianness or integer widths, +so reading serialized streams that were written +on a different platform is not, in general, supported. +Indeed, the motivation for TPIE serialization is to support +temporary streams of variable-width items in external memory; +it is not intended as a persistent store or as a data transfer format. + +TPIE serialization has built-in support for plain old data, +also known as POD types. +This built-in POD support excludes pointer types, however. +POD types are serialized and unserialized by their in-memory representation. +This is intended to be fast, not safe or portable. + +The framework also supports certain library types out of the box, +such as \c std::vector, \c std::string and plain old arrays of +serializable data. + +\section sec_serusage Usage + +The interface and its usage are straightforward. +See the included test program \c lines, the bulk of which is reproduced below. + +\code +@DOCCODE_SERIALIZATION@ +\endcode + +\section sec_ser_user User-supplied serializable types + +For types other than those supported natively by TPIE serialization, +the user can provide implementations of the \c serialize and \c unserialize +procedures. 
+For example, we can implement simple serialization/unserialization of a point type: + +\code +namespace userland { + +struct point2 { + double x; + double y; +}; + +template +void serialize(Dst & d, const point2 & pt) { + using tpie::serialize; + serialize(d, pt.x); + serialize(d, pt.y); +} + +template +void unserialize(Src & s, point2 & pt) { + using tpie::unserialize; + unserialize(s, pt.x); + unserialize(s, pt.y); +} + +} // namespace userland +\endcode + +For a more complicated example, +consider how we might serialize and unserialize a \c std::vector. + +\code +template +void serialize(D & dst, const std::vector & v) { + using tpie::serialize; + serialize(dst, v.size()); + serialize(dst, v.begin(), v.end()); +} + +template +void unserialize(S & src, std::vector & v) { + typename std::vector::size_type s; + using tpie::unserialize; + unserialize(src, s); + v.resize(s); + unserialize(src, v.begin(), v.end()); +} +\endcode + +*/ diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index bca0738e9..c8054f7ed 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -38,6 +38,14 @@ add_executable(tpiecat cat.cpp) set_target_properties(tpiecat PROPERTIES FOLDER tpie/test) target_link_libraries(tpiecat tpie) +add_executable(sort sort.cpp) +set_target_properties(sort PROPERTIES FOLDER tpie/test) +target_link_libraries(sort tpie) + +add_executable(lines lines.cpp) +set_target_properties(lines PROPERTIES FOLDER tpie/test) +target_link_libraries(lines tpie) + add_executable(test_parallel_sort_threshold test_parallel_sort_threshold.cpp) set_target_properties(test_parallel_sort_threshold PROPERTIES FOLDER tpie/test) target_link_libraries(test_parallel_sort_threshold tpie) diff --git a/test/lines.cpp b/test/lines.cpp new file mode 100644 index 000000000..30918f6d9 --- /dev/null +++ b/test/lines.cpp @@ -0,0 +1,46 @@ +// -*- mode: c++; tab-width: 4; indent-tabs-mode: t; c-file-style: "stroustrup"; -*- +// vi:set ts=4 sts=4 sw=4 noet cino=(0 : +// Copyright 2013, The 
TPIE development team +// +// This file is part of TPIE. +// +// TPIE is free software: you can redistribute it and/or modify it under +// the terms of the GNU Lesser General Public License as published by the +// Free Software Foundation, either version 3 of the License, or (at your +// option) any later version. +// +// TPIE is distributed in the hope that it will be useful, but WITHOUT ANY +// WARRANTY; without even the implied warranty of MERCHANTABILITY or +// FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public +// License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with TPIE. If not, see + +#include + +#include +#include +#include + +#include "../doc/code/serialization.inl" + +/* Dispatches on argv[1] (read, write, reverse or sort) and applies it to the file named by argv[2]; prints usage and returns 1 otherwise. */ int main(int argc, char ** argv) { + std::string arg = (argc < 3) ? "" : argv[1]; + std::string filename = (argc < 3) ? "" : argv[2]; /* argv[2] is only valid when argc >= 3; with fewer arguments arg is empty and the usage branch below is taken */ + tpie::tpie_init(); + if (arg == "read") { + read_lines(std::cout, filename); + } else if (arg == "write") { + write_lines(std::cin, filename); + } else if (arg == "reverse") { + reverse_lines(filename); + } else if (arg == "sort") { + sort_lines(filename); + } else { + std::cerr << "Usage: " << argv[0] << " \n"; + return 1; + } + tpie::tpie_finish(); + return 0; +} diff --git a/test/sort.cpp b/test/sort.cpp new file mode 100644 index 000000000..d661a4b38 --- /dev/null +++ b/test/sort.cpp @@ -0,0 +1,80 @@ +// -*- mode: c++; tab-width: 4; indent-tabs-mode: t; c-file-style: "stroustrup"; -*- +// vi:set ts=4 sts=4 sw=4 noet cino=(0 : +// Copyright 2013, The TPIE development team +// +// This file is part of TPIE. +// +// TPIE is free software: you can redistribute it and/or modify it under +// the terms of the GNU Lesser General Public License as published by the +// Free Software Foundation, either version 3 of the License, or (at your +// option) any later version. 
+// +// TPIE is distributed in the hope that it will be useful, but WITHOUT ANY +// WARRANTY; without even the implied warranty of MERCHANTABILITY or +// FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public +// License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with TPIE. If not, see + +#include + +#include +#include + +namespace tp = tpie::pipelining; + +template +class line_reader_type : public tp::node { + dest_t dest; + +public: + line_reader_type(const dest_t & dest) + : dest(dest) + { + this->add_push_destination(dest); + } + + void go() { + std::string line; + while (std::getline(std::cin, line)) { + dest.push(line); + } + } +}; + +tp::pipe_begin > +line_reader() { + return tp::factory_0(); +} + +class line_writer_type : public tp::node { +public: + typedef std::string item_type; + + line_writer_type() { + } + + void push(const std::string & line) { + std::cout << line << '\n'; + } +}; + +tp::pipe_end > +line_writer() { + return tp::termfactory_0(); +} + +int main() { + tpie::tpie_init(); + const tpie::memory_size_type memory = 100*1024*1024; + tpie::get_memory_manager().set_limit(memory); + { + tp::pipeline p = line_reader() | tp::serialization_pipesort() | line_writer(); + p.plot(std::clog); + p(); + } + tpie::log_info() << "Temp file usage: " << tpie::get_temp_file_usage() << std::endl; + tpie::tpie_finish(); + return 0; +} diff --git a/test/speed_regression/CMakeLists.txt b/test/speed_regression/CMakeLists.txt index 8e6b83bd9..1ce878499 100644 --- a/test/speed_regression/CMakeLists.txt +++ b/test/speed_regression/CMakeLists.txt @@ -33,3 +33,19 @@ set_target_properties(pipeline_speed_test PROPERTIES FOLDER tpie/test) add_executable(pipelining_sort_test pipelining_sort_test.cpp) target_link_libraries(pipelining_sort_test tpie) set_target_properties(pipelining_sort_test PROPERTIES FOLDER tpie/test) + +add_executable(serialization_speed serialization.cpp ${SPEED_DEPS}) 
+target_link_libraries(serialization_speed tpie) +set_target_properties(serialization_speed PROPERTIES FOLDER tpie/test) + +add_executable(numbergen numbergen.cpp ${SPEED_DEPS}) +target_link_libraries(numbergen tpie) +set_target_properties(numbergen PROPERTIES FOLDER tpie/test) + +add_executable(fractile_tpie fractile_tpie.cpp ${SPEED_DEPS}) +target_link_libraries(fractile_tpie tpie) +set_target_properties(fractile_tpie PROPERTIES FOLDER tpie/test) + +add_executable(fractile_serialization fractile_serialization.cpp ${SPEED_DEPS}) +target_link_libraries(fractile_serialization tpie) +set_target_properties(fractile_serialization PROPERTIES FOLDER tpie/test) diff --git a/test/speed_regression/fractile.h b/test/speed_regression/fractile.h new file mode 100644 index 000000000..f822b6403 --- /dev/null +++ b/test/speed_regression/fractile.h @@ -0,0 +1,111 @@ +// -*- mode: c++; tab-width: 4; indent-tabs-mode: t; c-file-style: "stroustrup"; -*- +// vi:set ts=4 sts=4 sw=4 noet : +// Copyright 2013, The TPIE development team +// +// This file is part of TPIE. +// +// TPIE is free software: you can redistribute it and/or modify it under +// the terms of the GNU Lesser General Public License as published by the +// Free Software Foundation, either version 3 of the License, or (at your +// option) any later version. +// +// TPIE is distributed in the hope that it will be useful, but WITHOUT ANY +// WARRANTY; without even the implied warranty of MERCHANTABILITY or +// FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public +// License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with TPIE. 
If not, see + +#include + +#include +#include + +struct parameters { + int argi; + int argc; + const char ** argv; + size_t fractiles; + std::string filename; + + void parse_args(); + void parse_filename(); +}; + +typedef double item_type; + +template +N parse_num(std::string arg) { + std::stringstream ss(arg); + N res; + ss >> res; + return res; +} + +std::string next_arg(int & argi, int argc, const char ** argv) { + ++argi; + if (argi == argc) throw std::invalid_argument("Missing value for option."); + return argv[argi]; +} + +void parameters::parse_args() { + while (argi < argc) { + std::string param = argv[argi]; + + if (param == "-n") + fractiles = parse_num(next_arg(argi, argc, argv)); + else + break; + + ++argi; + } +} + +void parameters::parse_filename() { + if (argi == argc) throw std::invalid_argument("Missing filename."); + filename = argv[argi++]; +} + +template +void go(parameters & params, Impl & impl) { + impl.open(params.filename); + size_t nextFrac = 0; + size_t nextGoal = 0; + tpie::stream_size_type n = impl.size(); + for (tpie::stream_size_type i = 0; i < n; ++i) { + item_type x = impl.read(); + if (i >= nextGoal) { + std::cout << x << std::endl; + ++nextFrac; + nextGoal = (n-1) * nextFrac / params.fractiles; + } + } + impl.close(); +} + +template +int fractile_main(int argc, const char ** argv) { + parameters params; + + params.argi = 1; + params.argc = argc; + params.argv = argv; + params.fractiles = 4; + + params.parse_args(); + params.parse_filename(); + params.parse_args(); + + tpie::tpie_init(tpie::ALL & ~tpie::DEFAULT_LOGGING); + tpie::get_memory_manager().set_limit(200*1024*1024); + { + tpie::stderr_log_target tgt(tpie::LOG_DEBUG); + tpie::get_log().add_target(&tgt); + Impl impl; + go(params, impl); + tpie::get_log().remove_target(&tgt); + } + tpie::tpie_finish(tpie::ALL & ~tpie::DEFAULT_LOGGING); + return 0; +} diff --git a/test/speed_regression/fractile_serialization.cpp b/test/speed_regression/fractile_serialization.cpp new file mode 
100644 index 000000000..73fb9bd29 --- /dev/null +++ b/test/speed_regression/fractile_serialization.cpp @@ -0,0 +1,65 @@ +// -*- mode: c++; tab-width: 4; indent-tabs-mode: t; c-file-style: "stroustrup"; -*- +// vi:set ts=4 sts=4 sw=4 noet : +// Copyright 2013, The TPIE development team +// +// This file is part of TPIE. +// +// TPIE is free software: you can redistribute it and/or modify it under +// the terms of the GNU Lesser General Public License as published by the +// Free Software Foundation, either version 3 of the License, or (at your +// option) any later version. +// +// TPIE is distributed in the hope that it will be useful, but WITHOUT ANY +// WARRANTY; without even the implied warranty of MERCHANTABILITY or +// FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public +// License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with TPIE. If not, see + +#include +#include +#include "fractile.h" + +class fractile_serialization { + tpie::serialization_sort > sorter; + tpie::stream_size_type items; + +public: + fractile_serialization() + : sorter() + { + tpie::memory_size_type mem = + tpie::get_memory_manager().available() + - tpie::serialization_reader::memory_usage(); + sorter.set_available_memory(mem); + } + + void open(std::string path) { + tpie::serialization_reader input; + input.open(path); + sorter.begin(); + items = 0; + item_type x; + while (input.can_read()) { + input.unserialize(x); + sorter.push(x); + ++items; + } + input.close(); + sorter.end(); + sorter.merge_runs(); + } + void close() { + } + item_type read() { + return sorter.pull(); + } + tpie::stream_size_type size() { + return items; + } +}; + +int main(int argc, const char ** argv) { + return fractile_main(argc, argv); +} diff --git a/test/speed_regression/fractile_tpie.cpp b/test/speed_regression/fractile_tpie.cpp new file mode 100644 index 000000000..c0266f848 --- /dev/null +++ 
b/test/speed_regression/fractile_tpie.cpp @@ -0,0 +1,53 @@ +// -*- mode: c++; tab-width: 4; indent-tabs-mode: t; c-file-style: "stroustrup"; -*- +// vi:set ts=4 sts=4 sw=4 noet : +// Copyright 2013, The TPIE development team +// +// This file is part of TPIE. +// +// TPIE is free software: you can redistribute it and/or modify it under +// the terms of the GNU Lesser General Public License as published by the +// Free Software Foundation, either version 3 of the License, or (at your +// option) any later version. +// +// TPIE is distributed in the hope that it will be useful, but WITHOUT ANY +// WARRANTY; without even the implied warranty of MERCHANTABILITY or +// FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public +// License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with TPIE. If not, see + +#include +#include +#include "fractile.h" + +class fractile_file_stream { + tpie::file_stream sorted; + +public: + void open(std::string path) { + tpie::file_stream fs; + fs.open(path, tpie::access_read); + std::string sortedPath = path + ".sorted"; + boost::filesystem::remove(sortedPath); + sorted.open(sortedPath); + sorted.truncate(0); + tpie::sort(fs, sorted); + fs.close(); + sorted.close(); + sorted.open(sortedPath, tpie::access_read); + } + void close() { + sorted.close(); + } + item_type read() { + return sorted.read(); + } + tpie::stream_size_type size() { + return sorted.size(); + } +}; + +int main(int argc, const char ** argv) { + return fractile_main(argc, argv); +} diff --git a/test/speed_regression/numbergen.cpp b/test/speed_regression/numbergen.cpp new file mode 100644 index 000000000..c954ed55e --- /dev/null +++ b/test/speed_regression/numbergen.cpp @@ -0,0 +1,125 @@ +// -*- mode: c++; tab-width: 4; indent-tabs-mode: t; c-file-style: "stroustrup"; -*- +// vi:set ts=4 sts=4 sw=4 noet : +// Copyright 2013, The TPIE development team +// +// This file is part of TPIE. 
+// +// TPIE is free software: you can redistribute it and/or modify it under +// the terms of the GNU Lesser General Public License as published by the +// Free Software Foundation, either version 3 of the License, or (at your +// option) any later version. +// +// TPIE is distributed in the hope that it will be useful, but WITHOUT ANY +// WARRANTY; without even the implied warranty of MERCHANTABILITY or +// FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public +// License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with TPIE. If not, see + +#include +#include + +#include + +#include +#include +#include + +struct parameters { + int argi; + int argc; + const char ** argv; + uint64_t seed; + double megabytes; + bool gen_tpie; + bool gen_serialization; + + void parse_args(); + std::string stream_name(std::string suffix) { + std::stringstream ss; + ss << "numbers." << seed << "." << suffix; + return ss.str(); + } +}; + +typedef double item_type; + +uint64_t seed_from_time() { + using namespace boost::posix_time; + ptime t(microsec_clock::local_time()); + return static_cast(t.time_of_day().ticks()); +} + +template +N parse_num(std::string arg) { + std::stringstream ss(arg); + N res; + ss >> res; + return res; +} + +uint64_t parse_seed(std::string arg) { return parse_num(arg); } + +std::string next_arg(int & argi, int argc, const char ** argv) { + ++argi; + if (argi == argc) throw std::invalid_argument("Missing value for option."); + return argv[argi]; +} + +void parameters::parse_args() { + while (argi < argc) { + std::string arg(argv[argi]); + + if (arg == "--seed") + seed = parse_seed(next_arg(argi, argc, argv)); + else if (arg == "--mb") + megabytes = parse_num(next_arg(argi, argc, argv)); + else + break; + + ++argi; + } +} + +void go(parameters & params) { + tpie::file_stream fs; + tpie::serialization_writer ss; + if (params.gen_tpie) fs.open(params.stream_name("tpie")); + if 
(params.gen_serialization) ss.open(params.stream_name("ser")); + tpie::stream_size_type numbers = + static_cast(params.megabytes + * 1024 * 1024 / sizeof(item_type)); + boost::mt19937 rng(params.seed); + boost::uniform_01<> basedist; + boost::variate_generator > + rng01(rng, basedist); + boost::exponential_distribution<> dist(2); + for (tpie::stream_size_type i = 0; i < numbers; ++i) { + item_type x = dist(rng01); + if (params.gen_tpie) fs.write(x); + if (params.gen_serialization) ss.serialize(x); + } + if (params.gen_tpie) fs.close(); + if (params.gen_serialization) ss.close(); +} + +int main(int argc, const char ** argv) { + parameters params; + + params.argi = 1; + params.argc = argc; + params.argv = argv; + params.seed = seed_from_time(); + params.megabytes = 1024; + params.gen_tpie = true; + params.gen_serialization = true; + params.seed = seed_from_time(); + params.megabytes = 1024; + + params.parse_args(); + tpie::tpie_init(); + go(params); + tpie::tpie_finish(); + return 0; +} diff --git a/test/speed_regression/serialization.cpp b/test/speed_regression/serialization.cpp new file mode 100644 index 000000000..1edfefc8e --- /dev/null +++ b/test/speed_regression/serialization.cpp @@ -0,0 +1,342 @@ +// -*- mode: c++; tab-width: 4; indent-tabs-mode: t; c-file-style: "stroustrup"; -*- +// vi:set ts=4 sts=4 sw=4 noet : +// Copyright 2013, The TPIE development team +// +// This file is part of TPIE. +// +// TPIE is free software: you can redistribute it and/or modify it under +// the terms of the GNU Lesser General Public License as published by the +// Free Software Foundation, either version 3 of the License, or (at your +// option) any later version. +// +// TPIE is distributed in the hope that it will be useful, but WITHOUT ANY +// WARRANTY; without even the implied warranty of MERCHANTABILITY or +// FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public +// License for more details. 
+// +// You should have received a copy of the GNU Lesser General Public License +// along with TPIE. If not, see + +#include +#include +#include +#include +#include +#include +#include +#include "stat.h" +#include "testtime.h" + +struct item { + uint32_t a; + uint32_t b; + uint32_t c; + uint32_t d; + + item & operator=(uint32_t v) { + a = v; + return *this; + } + + item & operator=(uint64_t v) { + a = static_cast(v); + return *this; + } + + std::pair repr() const { + const uint32_t mask = 0x55555555u; + return std::make_pair(a & mask, a & ~mask); + } + + bool operator<(const item & other) const { + return repr() < other.repr(); + } +}; + +template +void serialize(D & dst, const item & it) { + dst.write(reinterpret_cast(&it), sizeof(it)); +} + +template +void unserialize(S & src, item & it) { + src.read(reinterpret_cast(&it), sizeof(it)); +} + +struct parameters { + size_t mb; + size_t times; + size_t memory; +}; + +template +class speed_tester { + child_t & self() { return *static_cast(this); } +public: + size_t items; + + void go(parameters params) { + items = params.mb*1024*1024 / sizeof(item); + + tpie::sysinfo info; + std::cout << info; + info.printinfo("MB", params.mb); + info.printinfo("Samples", params.times); + info.printinfo("sizeof(item)", sizeof(item)); + info.printinfo("items", items); + self().init(); + std::vector name; + name.push_back("Write"); + name.push_back("Read"); + tpie::test::stat st(name); + for (size_t i = 0; i < params.times; ++i) { + tpie::temp_file temp; + tpie::test::test_realtime_t t1; + tpie::test::test_realtime_t t2; + tpie::test::getTestRealtime(t1); + self().write(temp.path()); + tpie::test::getTestRealtime(t2); + st(tpie::test::testRealtimeDiff(t1, t2)); + + tpie::test::getTestRealtime(t1); + self().read(temp.path()); + tpie::test::getTestRealtime(t2); + st(tpie::test::testRealtimeDiff(t1, t2)); + } + } +}; + +struct forward_base { + static const char * dir() { return "forward"; } + static uint32_t expect(size_t i, size_t 
/*items*/) { return static_cast(i); } +}; + +struct backward_base { + static const char * dir() { return "reverse"; } + static uint32_t expect(size_t i, size_t items) { return items-1-static_cast(i); } +}; + +struct serialization_forward : public forward_base { + typedef tpie::serialization_writer writer; + typedef tpie::serialization_reader reader; +}; + +struct serialization_backward : public backward_base { + typedef tpie::serialization_reverse_writer writer; + typedef tpie::serialization_reverse_reader reader; +}; + +struct stream_forward : public forward_base { + static item read(tpie::file_stream & stream) { return stream.read(); } + static void seek(tpie::file_stream & /*stream*/) {} +}; + +struct stream_backward : public backward_base { + static item read(tpie::file_stream & stream) { return stream.read_back(); } + static void seek(tpie::file_stream & stream) { stream.seek(0, tpie::file_stream::end); } +}; + +template +class serialization_speed_tester : public speed_tester > { +public: + using speed_tester >::items; + + void init() { + std::cout << "Testing serialization " << Traits::dir() << " stream" << std::endl; + } + + void write(std::string path) { + typename Traits::writer wr; + wr.open(path); + item it; + for (size_t j = 0; j < items; ++j) { + it = j; + wr.serialize(it); + } + wr.close(); + } + + void read(std::string path) { + typename Traits::reader rd; + rd.open(path); + item it; + for (size_t j = 0; j < items; ++j) { + rd.unserialize(it); + if (it.a != Traits::expect(j, items)) std::cout << "Wrong value read" << std::endl; + } + rd.close(); + } +}; + +typedef serialization_speed_tester serialization_forward_speed_tester; +typedef serialization_speed_tester serialization_backward_speed_tester; + +template +class stream_speed_tester : public speed_tester > { +public: + using speed_tester >::items; + + void init() { + std::cout << "Testing " << Traits::dir() << " file_stream" << std::endl; + } + + void write(std::string path) { + tpie::file_stream 
wr; + wr.open(path, tpie::access_write); + item it; + for (size_t j = 0; j < items; ++j) { + it.a = j; + wr.write(it); + } + wr.close(); + } + + void read(std::string path) { + tpie::file_stream rd; + rd.open(path, tpie::access_read); + item it; + Traits::seek(rd); + for (size_t j = 0; j < items; ++j) { + it = Traits::read(rd); + if (it.a != Traits::expect(j, items)) std::cout << "Wrong value read" << std::endl; + } + rd.close(); + } +}; + +void usage(char ** argv) { + std::cout << "Usage: " << argv[0] << " [--mb ] [--times ] " << std::endl; +} + +size_t number(std::string arg) { + std::stringstream ss(arg); + size_t res; + ss >> res; + return res; +} + +template +class sort_tester { + typedef typename Algorithm::item_type item_type; + Algorithm a; + tpie::stream_size_type items; + +public: + sort_tester(parameters params) + : a(params) + , items(params.mb * (1024 * 1024 / sizeof(item_type))) + { + } + + void go() { + a.begin(items); + item_type x; + for (tpie::stream_size_type i = 0; i < items; ++i) { + x = (i + 91493)*104729; + a.push(x); + } + a.end(); + x = a.pull(); + for (tpie::stream_size_type i = 1; i < items; ++i) { + item_type y = a.pull(); + if (y < x) { + std::cout << "Not sorted" << std::endl; + return; + } + x = y; + } + } +}; + +class serialization_sorter { +public: + typedef item item_type; + +private: + tpie::serialization_sort > sorter; + +public: + serialization_sorter(parameters params) + : sorter(params.memory) + { + } + + void begin(tpie::stream_size_type /*items*/) { + sorter.begin(); + } + + void push(item i) { + sorter.push(i); + } + + void end() { + sorter.end(); + } + + item pull() { + return sorter.pull(); + } +}; + +class tpie_sorter { +public: + typedef item item_type; + +private: + tpie::file_stream data; + tpie::temp_file f; + +public: + tpie_sorter(parameters /*params*/) { + } + + void begin(tpie::stream_size_type /*items*/) { + data.open(f); + } + + void push(item i) { + data.write(i); + } + + void end() { + tpie::sort(data, data); 
+ data.seek(0); + } + + item pull() { + return data.read(); + } +}; + +/* Parses the optional --mb and --times options, then runs the benchmark named by the first remaining argument; prints usage when the name is not recognized. */ int main(int argc, char ** argv) { + tpie::tpie_init(); + parameters params; + params.mb = 20480; + params.times = 5; + params.memory = 200*1024*1024; + std::string type; + for (int i = 1; i < argc; ++i) { + std::string arg = argv[i]; + if (arg == "--mb" && i < argc - 1) { /* only consume a value that exists; a trailing option with no value falls through to the else branch and ends in usage() */ + ++i; + params.mb = number(argv[i]); + } else if (arg == "--times" && i < argc - 1) { + ++i; + params.times = number(argv[i]); + } else { + type = arg; + break; + } + } + tpie::get_memory_manager().set_limit(params.memory + + tpie::get_memory_manager().used()); + if (type == "serialization_forward") serialization_forward_speed_tester().go(params); + else if (type == "serialization_backward") serialization_backward_speed_tester().go(params); + else if (type == "stream_forward") stream_speed_tester().go(params); + else if (type == "stream_backward") stream_speed_tester().go(params); + else if (type == "serialization_sort") sort_tester(params).go(); + else if (type == "tpie_sort") sort_tester(params).go(); + else usage(argv); + tpie::tpie_finish(); + return 0; +} diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt index 311c75f47..719d8433e 100644 --- a/test/unit/CMakeLists.txt +++ b/test/unit/CMakeLists.txt @@ -34,14 +34,16 @@ add_unittest(internal_stack basic memory) add_unittest(internal_vector basic memory) add_unittest(job repeat) add_unittest(memory basic) -add_unittest(merge_sort external_report internal_report one_run_external_report small_final_fanout evacuate_before_merge evacuate_before_report) +add_unittest(merge_sort empty_input internal_report internal_report_after_resize one_run_external_report external_report small_final_fanout evacuate_before_merge evacuate_before_report sort_upper_bound) add_unittest(packed_array basic1 basic2 basic4) add_unittest(parallel_sort basic1 basic2 general equal_elements bad_case) -add_unittest(serialization unsafe safe) +add_unittest(serialization unsafe safe serialization2 stream stream_reopen) 
+add_unittest(serialization_sort empty_input internal_report internal_report_after_resize one_run_external_report external_report small_final_fanout evacuate_before_merge evacuate_before_report) add_unittest(stats simple) add_unittest(stream basic array odd truncate extend backwards array_file odd_file truncate_file extend_file backwards_file user_data user_data_file) add_unittest(stream_exception basic) add_unittest(pipelining vector filestream fspull fsaltpush merge reverse sort sorttrivial operators uniq memory fork merger_memory fetch_forward virtual_ref virtual virtual_cref_item_type prepare end_time pull_iterator push_iterator parallel parallel_ordered parallel_multiple parallel_own_buffer parallel_push_in_end node_map join) +add_unittest(pipelining_serialization basic reverse sort) add_fulltest(ami_stream stress) add_fulltest(disjoint_set large large_cycle very_large medium ovelflow) diff --git a/test/unit/merge_sort.h b/test/unit/merge_sort.h new file mode 100644 index 000000000..5c8bc8654 --- /dev/null +++ b/test/unit/merge_sort.h @@ -0,0 +1,195 @@ +// -*- mode: c++; tab-width: 4; indent-tabs-mode: t; c-file-style: "stroustrup"; -*- +// vi:set ts=4 sts=4 sw=4 noet cino+=(0 : +// Copyright 2013, The TPIE development team +// +// This file is part of TPIE. +// +// TPIE is free software: you can redistribute it and/or modify it under +// the terms of the GNU Lesser General Public License as published by the +// Free Software Foundation, either version 3 of the License, or (at your +// option) any later version. +// +// TPIE is distributed in the hope that it will be useful, but WITHOUT ANY +// WARRANTY; without even the implied warranty of MERCHANTABILITY or +// FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public +// License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with TPIE. 
If not, see + +#ifndef TPIE_TEST_MERGE_SORT_H +#define TPIE_TEST_MERGE_SORT_H + +struct relative_memory_usage { + inline relative_memory_usage(memory_size_type extraMemory) + : m_startMemory(actual_used()) + , m_threshold(0) + , m_extraMemory(extraMemory) + { + } + + inline memory_size_type used() { + return actual_used() - m_startMemory; + } + + inline void set_threshold(memory_size_type threshold) { + m_threshold = threshold; + get_memory_manager().set_limit(m_startMemory + m_threshold + m_extraMemory); + } + + inline bool below(memory_size_type threshold) { + if (used() > threshold) { + report_overusage(threshold); + return false; + } + return true; + } + + inline bool below() { + return below(m_threshold); + } + + void report_overusage(memory_size_type threshold) { + log_error() << "Memory usage " << used() << " exceeds threshold " << threshold << std::endl; + } + +private: + inline static memory_size_type actual_used() { + return get_memory_manager().used(); + } + + memory_size_type m_startMemory; + memory_size_type m_threshold; + memory_size_type m_extraMemory; +}; + +template +class sort_tester { + typedef typename Traits::test_t test_t; + typedef typename Traits::sorter sorter; + +static bool sort_test(memory_size_type m1, + memory_size_type m2, + memory_size_type m3, + double mb_data, + memory_size_type extraMemory = 0, + bool evacuateBeforeMerge = false, + bool evacuateBeforeReport = false) +{ + m1 *= 1024*1024; + m2 *= 1024*1024; + m3 *= 1024*1024; + extraMemory *= 1024*1024; + stream_size_type items = static_cast(mb_data*1024/sizeof(test_t)*1024); + log_debug() << "sort_test with " << items << " items\n"; + boost::rand48 rng; + relative_memory_usage m(extraMemory); + sorter s; + s.set_available_memory(m1, m2, m3); + + log_debug() << "Begin phase 1" << std::endl; + m.set_threshold(m1); + s.begin(); + if (!m.below()) return false; + for (stream_size_type i = 0; i < items; ++i) { + s.push(rng()); + if (!m.below()) return false; + } + s.end(); + if 
(!m.below()) return false; + + if (evacuateBeforeMerge) { + s.evacuate(); + log_debug() << "MEMORY USED: " << m.used() << '/' << s.evacuated_memory_usage() << std::endl; + if (!m.below(s.evacuated_memory_usage())) return false; + } + + log_debug() << "Begin phase 2" << std::endl; + m.set_threshold(m2); + if (!m.below()) return false; + Traits::merge_runs(s); + if (!m.below()) return false; + + if (evacuateBeforeReport) { + s.evacuate(); + log_debug() << "MEMORY USED: " << m.used() << '/' << s.evacuated_memory_usage() << std::endl; + if (!m.below(s.evacuated_memory_usage())) return false; + } + + log_debug() << "Begin phase 3" << std::endl; + m.set_threshold(m3); + if (!m.below()) return false; + test_t prev = std::numeric_limits::min(); + memory_size_type itemsRead = 0; + while (s.can_pull()) { + if (!m.below()) return false; + test_t read = s.pull(); + if (!m.below()) return false; + if (read < prev) { + log_error() << "Out of order" << std::endl; + return false; + } + prev = read; + ++itemsRead; + } + if (itemsRead != items) { + log_error() << "Read the wrong number of items. 
Got " << itemsRead << ", expected " << items << std::endl; + return false; + } + + log_debug() << "MEMORY USED: " << m.used() << '/' << s.evacuated_memory_usage() << std::endl; + if (!m.below(s.evacuated_memory_usage())) return false; + + return true; +} + +static bool empty_input_test() { + return sort_test(100,100,100,0); +} + +static bool internal_report_test() { + return sort_test(100,100,100,40); +} + +static bool internal_report_after_resize_test() { + return sort_test(100,80,30,20, 100); +} + +static bool one_run_external_report_test() { + return sort_test(100,7,7,7.1); +} + +static bool external_report_test() { + return sort_test(20,20,20,50); +} + +static bool small_final_fanout_test(double mb) { + return sort_test(3,12,7,mb); +} + +static bool evacuate_before_merge_test() { + return sort_test(20,20,20,8, 0, true, false); +} + +static bool evacuate_before_report_test() { + return sort_test(20,20,20,50, 0, false, true); +} + +public: + +static tests & add_all(tests & t) { + return t + .test(empty_input_test, "empty_input") + .test(internal_report_test, "internal_report") + .test(internal_report_after_resize_test, "internal_report_after_resize") + .test(one_run_external_report_test, "one_run_external_report") + .test(external_report_test, "external_report") + .test(small_final_fanout_test, "small_final_fanout", "mb", 8.0) + .test(evacuate_before_merge_test, "evacuate_before_merge") + .test(evacuate_before_report_test, "evacuate_before_report") + ; +} + +}; + +#endif // TPIE_TEST_MERGE_SORT_H diff --git a/test/unit/test_merge_sort.cpp b/test/unit/test_merge_sort.cpp index 2f01aa838..6af332304 100644 --- a/test/unit/test_merge_sort.cpp +++ b/test/unit/test_merge_sort.cpp @@ -24,54 +24,25 @@ #include using namespace tpie; -using namespace tpie::pipelining; -typedef uint64_t test_t; +#include "merge_sort.h" -struct relative_memory_usage { - inline relative_memory_usage(memory_size_type extraMemory) - : m_startMemory(actual_used()) - , m_threshold(0) - , 
m_extraMemory(extraMemory) - { - } - - inline memory_size_type used() { - return actual_used() - m_startMemory; - } - - inline void set_threshold(memory_size_type threshold) { - m_threshold = threshold; - get_memory_manager().set_limit(m_startMemory + m_threshold + m_extraMemory); - } - - inline bool below(memory_size_type threshold) { - if (used() > threshold) { - report_overusage(threshold); - return false; - } - return true; - } - - inline bool below() { - return below(m_threshold); - } +class use_merge_sort { +public: + typedef uint64_t test_t; + typedef merge_sorter sorter; - void report_overusage(memory_size_type threshold) { - log_error() << "Memory usage " << used() << " exceeds threshold " << threshold << std::endl; + static void merge_runs(sorter & s) { + dummy_progress_indicator pi; + s.calc(pi); } - -private: - inline static memory_size_type actual_used() { - return get_memory_manager().used(); - } - - memory_size_type m_startMemory; - memory_size_type m_threshold; - memory_size_type m_extraMemory; }; bool sort_upper_bound_test() { + typedef use_merge_sort Traits; + typedef Traits::sorter sorter; + typedef Traits::test_t test_t; + memory_size_type m1 = 100 *1024*1024; memory_size_type m2 = 20 *1024*1024; memory_size_type m3 = 20 *1024*1024; @@ -83,7 +54,7 @@ bool sort_upper_bound_test() { stream_size_type io = get_bytes_written(); relative_memory_usage m(0); - merge_sorter s; + sorter s; s.set_available_memory(m1, m2, m3); s.set_items(dataUpperBound / sizeof(test_t)); @@ -93,126 +64,16 @@ bool sort_upper_bound_test() { s.push(i); } s.end(); - dummy_progress_indicator pi; - s.calc(pi); + Traits::merge_runs(s); while (s.can_pull()) s.pull(); return io == get_bytes_written(); } -bool sort_test(memory_size_type m1, - memory_size_type m2, - memory_size_type m3, - double mb_data, - memory_size_type extraMemory = 0, - bool evacuateBeforeMerge = false, - bool evacuateBeforeReport = false) -{ - m1 *= 1024*1024; - m2 *= 1024*1024; - m3 *= 1024*1024; - extraMemory 
*= 1024*1024; - stream_size_type items = static_cast(mb_data*1024/sizeof(test_t)*1024); - log_debug() << "sort_test with " << items << " items\n"; - boost::rand48 rng; - relative_memory_usage m(extraMemory); - merge_sorter s; - s.set_available_memory(m1, m2, m3); - - log_debug() << "Begin phase 1" << std::endl; - m.set_threshold(m1); - s.begin(); - if (!m.below()) return false; - for (stream_size_type i = 0; i < items; ++i) { - s.push(rng()); - if (!m.below()) return false; - } - s.end(); - if (!m.below()) return false; - - if (evacuateBeforeMerge) { - s.evacuate(); - log_debug() << "MEMORY USED: " << m.used() << '/' << s.evacuated_memory_usage() << std::endl; - if (!m.below(s.evacuated_memory_usage())) return false; - } - - log_debug() << "Begin phase 2" << std::endl; - m.set_threshold(m2); - if (!m.below()) return false; - dummy_progress_indicator pi; - s.calc(pi); - if (!m.below()) return false; - - if (evacuateBeforeReport) { - s.evacuate(); - log_debug() << "MEMORY USED: " << m.used() << '/' << s.evacuated_memory_usage() << std::endl; - if (!m.below(s.evacuated_memory_usage())) return false; - } - - log_debug() << "Begin phase 3" << std::endl; - m.set_threshold(m3); - if (!m.below()) return false; - test_t prev = std::numeric_limits::min(); - memory_size_type itemsRead = 0; - while (s.can_pull()) { - if (!m.below()) return false; - test_t read = s.pull(); - if (!m.below()) return false; - if (read < prev) { - log_error() << "Out of order" << std::endl; - return false; - } - prev = read; - ++itemsRead; - } - if (itemsRead != items) { - log_error() << "Read the wrong number of items. 
Got " << itemsRead << ", expected " << items << std::endl; - return false; - } - - log_debug() << "MEMORY USED: " << m.used() << '/' << s.evacuated_memory_usage() << std::endl; - if (!m.below(s.evacuated_memory_usage())) return false; - - return true; -} - -bool internal_report_test() { - return sort_test(100,100,100,50); -} - -bool internal_report_after_resize_test() { - return sort_test(100,80,80,50, 100); -} - -bool one_run_external_report_test() { - return sort_test(100,7,7,7); -} - -bool external_report_test() { - return sort_test(20,20,20,50); -} - -bool small_final_fanout_test(double mb) { - return sort_test(3,10,7,mb); -} - -bool evacuate_before_merge_test() { - return sort_test(20,20,20,10, 0, true, false); -} - -bool evacuate_before_report_test() { - return sort_test(20,20,20,50, 0, false, true); -} - int main(int argc, char ** argv) { - return tests(argc, argv) - .test(internal_report_test, "internal_report") - .test(internal_report_after_resize_test, "internal_report_after_resize") - .test(one_run_external_report_test, "one_run_external_report") - .test(external_report_test, "external_report") - .test(small_final_fanout_test, "small_final_fanout", "mb", 8.5) - .test(evacuate_before_merge_test, "evacuate_before_merge") - .test(evacuate_before_report_test, "evacuate_before_report") + tests t(argc, argv); + return + sort_tester::add_all(t) .test(sort_upper_bound_test, "sort_upper_bound") ; } diff --git a/test/unit/test_pipelining_serialization.cpp b/test/unit/test_pipelining_serialization.cpp new file mode 100644 index 000000000..a24e28d9d --- /dev/null +++ b/test/unit/test_pipelining_serialization.cpp @@ -0,0 +1,204 @@ +// -*- mode: c++; tab-width: 4; indent-tabs-mode: t; c-file-style: "stroustrup"; -*- +// vi:set ts=4 sts=4 sw=4 noet cino+=(0 : +// Copyright 2013 The TPIE development team +// +// This file is part of TPIE. 
+// +// TPIE is free software: you can redistribute it and/or modify it under +// the terms of the GNU Lesser General Public License as published by the +// Free Software Foundation, either version 3 of the License, or (at your +// option) any later version. +// +// TPIE is distributed in the hope that it will be useful, but WITHOUT ANY +// WARRANTY; without even the implied warranty of MERCHANTABILITY or +// FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public +// License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with TPIE. If not, see + +#include "common.h" +#include +#include + +using namespace tpie; +using namespace tpie::pipelining; + +void populate_test_data(std::vector & test) { + char c = '!'; + size_t ante = 0; + size_t prev = 1; + size_t size = 0; + for (size_t i = 0; i < 31; ++i) { + size_t cur = ante+prev; + ante = prev; + prev = cur; + + test.push_back(std::string(cur, c++)); + + size += cur; + } +} + +bool basic_test() { + tpie::temp_file f_in; + tpie::temp_file f_out; + std::vector testData; + populate_test_data(testData); + { + serialization_writer wr; + wr.open(f_in.path()); + for (size_t i = 0; i < testData.size(); ++i) { + wr.serialize(testData[i]); + } + wr.close(); + } + { + serialization_reader rd; + rd.open(f_in.path()); + serialization_writer wr; + wr.open(f_out.path()); + pipeline p = serialization_input(rd) | serialization_output(wr); + p.plot(log_info()); + p(); + wr.close(); + } + { + serialization_reader rd; + rd.open(f_out.path()); + for (size_t i = 0; i < testData.size(); ++i) { + if (!rd.can_read()) { + log_error() << "Could not read item" << std::endl; + return false; + } + std::string d; + rd.unserialize(d); + if (d != testData[i]) { + log_error() << "Wrong item read" << std::endl; + return false; + } + } + } + return true; +} + +bool reverse_test() { + tpie::temp_file f_in; + tpie::temp_file f_out; + std::vector testData; + 
populate_test_data(testData); + { + serialization_writer wr; + wr.open(f_in.path()); + for (size_t i = testData.size(); i--;) { + wr.serialize(testData[i]); + } + wr.close(); + } + { + serialization_reader rd; + rd.open(f_in.path()); + serialization_writer wr; + wr.open(f_out.path()); + pipeline p = serialization_input(rd) | serialization_reverser() | serialization_output(wr); + p.plot(log_info()); + p(); + wr.close(); + } + { + serialization_reader rd; + rd.open(f_out.path()); + for (size_t i = 0; i < testData.size(); ++i) { + if (!rd.can_read()) { + log_error() << "Could not read item" << std::endl; + return false; + } + std::string d; + rd.unserialize(d); + if (d != testData[i]) { + log_error() << "Wrong item read" << std::endl; + return false; + } + } + } + return true; +} + +template +class random_strings_type : public node { + dest_t dest; + stream_size_type n; + +public: + random_strings_type(const dest_t & dest, stream_size_type n) + : dest(dest) + , n(n) + { + add_push_destination(dest); + set_name("Random strings"); + } + + virtual void go() override { + using namespace boost::posix_time; + long seed = boost::posix_time::microsec_clock::local_time().time_of_day().fractional_seconds(); + boost::mt19937 rng(seed); + for (stream_size_type i = 0; i < n; ++i) { + size_t length = rng() % 10; + std::string s(length, '\0'); + for (size_t j = 0; j < length; ++j) s[j] = 'a' + (rng() % 26); + dest.push(s); + } + } +}; + +pipe_begin > +random_strings(stream_size_type n) { + return factory_1(n); +} + +class sort_verifier_type : public node { + bool & res; + std::string prev; + +public: + typedef std::string item_type; + + sort_verifier_type(bool & res) + : res(res) + { + set_name("Sort verifier"); + res = true; + } + + void push(const std::string & s) { + if (s < prev) { + res = false; + log_error() << "Got an out of order string" << std::endl; + } + prev = s; + } +}; + +pipe_end > +sort_verifier(bool & res) { + return termfactory_1(res); +} + +bool 
sort_test(stream_size_type n) { + bool result = false; + pipeline p = + random_strings(n) + | serialization_pipesort() + | sort_verifier(result) + ; + p(); + return result; +} + +int main(int argc, char ** argv) { + return tpie::tests(argc, argv) + .test(basic_test, "basic") + .test(reverse_test, "reverse") + .test(sort_test, "sort", "n", static_cast(1000)) + ; +} diff --git a/test/unit/test_serialization.cpp b/test/unit/test_serialization.cpp index 55c284c7a..a9ad65829 100644 --- a/test/unit/test_serialization.cpp +++ b/test/unit/test_serialization.cpp @@ -1,6 +1,6 @@ // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; c-file-style: "stroustrup"; -*- // vi:set ts=4 sts=4 sw=4 noet : -// Copyright 2008, The TPIE development team +// Copyright 2013, The TPIE development team // // This file is part of TPIE. // @@ -19,6 +19,8 @@ #include "common.h" #include +#include +#include #include #include #include @@ -27,8 +29,85 @@ using namespace tpie; using namespace std; +struct write_container { + ostream & o; + write_container(ostream & o): o(o) {} + void write(const char * x, size_t t) { + log_info() << "Write " << t << std::endl; + o.write(x, t); + } +}; + +struct read_container { + istream & o; + read_container(istream & o): o(o) {} + void read(char * x, size_t t) { + log_info() << "Read " << t << std::endl; + o.read(x, t); + } +}; + +struct serializable_dummy { + static const char msg[]; +}; + +const char serializable_dummy::msg[] = "Hello, yes, this is dog!"; + +template +void serialize(D & dst, const serializable_dummy &) { + dst.write(serializable_dummy::msg, sizeof(serializable_dummy::msg)); +} + +template +void unserialize(S & src, serializable_dummy &) { + std::string s(sizeof(serializable_dummy::msg), '\0'); + src.read(&s[0], s.size()); + if (!std::equal(s.begin(), s.end(), serializable_dummy::msg)) { + throw tpie::exception("Did not serialize the dummy"); + } +} + +bool testSer2() { + std::stringstream ss; + std::vector v; + v.push_back(88); + 
v.push_back(74); + + write_container wc(ss); + serialize(wc, (int)454); + serialize(wc, (float)4.5); + serialize(wc, true); + serialize(wc, v); + serialize(wc, std::string("Abekat")); + serialize(wc, serializable_dummy()); + + int a; + float b; + bool c; + std::vector d; + std::string e; + serializable_dummy f; + + read_container rc(ss); + unserialize(rc, a); + unserialize(rc, b); + unserialize(rc, c); + unserialize(rc, d); + unserialize(rc, e); + unserialize(rc, f); + std::cout << a << " " << b << " " << c << " " << d[0] << " " << d[1] << " " << e << std::endl; + if (a != 454) return false; + if (b != 4.5) return false; + if (c != true) return false; + std::cout << "Here" << std::endl; + if (d != v) return false; + std::cout << "Here" << std::endl; + if (e != "Abekat") return false; + return true; +} + + bool testSer(bool safe) { - boost::filesystem::remove("temp.ser"); std::stringstream ss; std::vector v; v.push_back(88); @@ -71,8 +150,204 @@ bool testSer(bool safe) { bool safe_test() { return testSer(true); } bool unsafe_test() { return testSer(false); } +bool stream_test() { + bool result = true; + + memory_size_type N = 2000; + tpie::array numbers(N); + for (memory_size_type i = 0; i < N; ++i) { + numbers[i] = i; + } + + temp_file f; + { + serialization_writer ss; + ss.open(f.path()); + for (memory_size_type i = 0; i < N; ++i) { + ss.serialize(&numbers[0], &numbers[i]); + } + ss.serialize(serializable_dummy()); + ss.close(); + } + { + serialization_reader ss; + ss.open(f.path()); + for (memory_size_type i = 0; i < N; ++i) { + if (!ss.can_read()) { + log_error() << "Expected can_read()" << std::endl; + result = false; + } + for (memory_size_type j = 0; j < N; ++j) { + numbers[j] = N; + } + ss.unserialize(&numbers[0], &numbers[i]); + for (memory_size_type j = 0; j < N; ++j) { + if (j < i && numbers[j] != j) { + log_error() << "Incorrect deserialization #" << i << " in position " << j << std::endl; + result = false; + } + if (j >= i && numbers[j] != N) { + 
log_error() << "Deserialization #" << i << " changed an array index " << j << " out of bounds" << std::endl; + result = false; + } + numbers[j] = N; + } + } + if (!ss.can_read()) { + log_error() << "Expected can_read()" << std::endl; + result = false; + } + serializable_dummy d; + ss.unserialize(d); + if (ss.can_read()) { + log_error() << "Expected !can_read()" << std::endl; + result = false; + } + ss.close(); + } + + return result; +} + +bool stream_reopen_test() { + temp_file f; + const int src = 42; + int dest; + { + serialization_writer wr; + wr.open(f); + wr.serialize(src); + wr.close(); + } + { + serialization_reader rd; + rd.open(f); + if (!rd.can_read()) { + log_error() << "Expected can_read()" << std::endl; + return false; + } + rd.unserialize(dest); + if (rd.can_read()) { + log_error() << "Expected !can_read()" << std::endl; + return false; + } + rd.close(); + rd.open(f); + if (!rd.can_read()) { + log_error() << "Expected can_read()" << std::endl; + return false; + } + rd.close(); + } + return true; +} + +bool stream_reverse_test() { + bool result = true; + + memory_size_type N = 2000; + tpie::array numbers(N); + for (memory_size_type i = 0; i < N; ++i) { + numbers[i] = i; + } + + temp_file f; + { + serialization_reverse_writer ss; + ss.open(f.path()); + ss.serialize(serializable_dummy()); + for (memory_size_type i = 0; i < N; ++i) { + ss.serialize(&numbers[0], &numbers[i]); + } + ss.serialize(serializable_dummy()); + ss.serialize(serializable_dummy()); + ss.close(); + } + { + serialization_reverse_reader ss; + serializable_dummy d; + ss.open(f.path()); + ss.unserialize(d); + ss.unserialize(d); + for (memory_size_type i = N; i--;) { + if (!ss.can_read()) { + log_error() << "Expected can_read()" << std::endl; + result = false; + } + for (memory_size_type j = 0; j < N; ++j) { + numbers[j] = N; + } + ss.unserialize(&numbers[0], &numbers[i]); + for (memory_size_type j = 0; j < N; ++j) { + if (j < i && numbers[j] != j) { + log_error() << "Incorrect 
deserialization #" << i << " in position " << j << std::endl; + log_error() << "Got " << numbers[j] << ", expected " << j << std::endl; + result = false; + } + if (j >= i && numbers[j] != N) { + log_error() << "Deserialization #" << i << " changed an array index " << j << " out of bounds" << std::endl; + result = false; + } + numbers[j] = N; + } + } + if (!ss.can_read()) { + log_error() << "Expected can_read()" << std::endl; + result = false; + } + ss.unserialize(d); + if (ss.can_read()) { + log_error() << "Expected !can_read()" << std::endl; + result = false; + } + ss.close(); + } + + return result; +} + +bool stream_temp_test() { + stream_size_type tmpUsage1, tmpUsage2, tmpUsage3, tmpUsage4; + tmpUsage1 = get_temp_file_usage(); + { + tpie::temp_file f; + { + serialization_writer wr; + wr.open(f); + tmpUsage2 = get_temp_file_usage(); + wr.serialize(1); + wr.serialize(2); + wr.serialize(3); + wr.close(); + } + tmpUsage3 = get_temp_file_usage(); + { + serialization_reader rd; + int i; + rd.open(f); + rd.unserialize(i); + rd.close(); + } + } + tmpUsage4 = get_temp_file_usage(); + log_info() + << "Usage before open: " << tmpUsage1 << '\n' + << "Usage after open: " << tmpUsage2 << '\n' + << "Usage after close: " << tmpUsage3 << '\n' + << "Usage after destruct: " << tmpUsage4 << std::endl; + return tmpUsage1 == tmpUsage2 + && tmpUsage1 == tmpUsage4 + && tmpUsage3 > tmpUsage2; +} + int main(int argc, char ** argv) { return tpie::tests(argc, argv) .test(safe_test, "safe") - .test(unsafe_test, "unsafe"); + .test(unsafe_test, "unsafe") + .test(testSer2, "serialization2") + .test(stream_test, "stream") + .test(stream_reopen_test, "stream_reopen") + .test(stream_reverse_test, "stream_reverse") + .test(stream_temp_test, "stream_temp") + ; } diff --git a/test/unit/test_serialization_sort.cpp b/test/unit/test_serialization_sort.cpp new file mode 100644 index 000000000..14900e3bd --- /dev/null +++ b/test/unit/test_serialization_sort.cpp @@ -0,0 +1,45 @@ +// -*- mode: c++; 
tab-width: 4; indent-tabs-mode: t; c-file-style: "stroustrup"; -*- +// vi:set ts=4 sts=4 sw=4 noet cino+=(0 : +// Copyright 2013, The TPIE development team +// +// This file is part of TPIE. +// +// TPIE is free software: you can redistribute it and/or modify it under +// the terms of the GNU Lesser General Public License as published by the +// Free Software Foundation, either version 3 of the License, or (at your +// option) any later version. +// +// TPIE is distributed in the hope that it will be useful, but WITHOUT ANY +// WARRANTY; without even the implied warranty of MERCHANTABILITY or +// FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public +// License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with TPIE. If not, see + +#include "common.h" +#include +#include +#include +#include + +using namespace tpie; + +#include "merge_sort.h" + +class use_serialization_sort { +public: + typedef uint64_t test_t; + typedef serialization_sort > sorter; + + static void merge_runs(sorter & s) { + s.merge_runs(); + } +}; + +int main(int argc, char ** argv) { + tests t(argc, argv); + return + sort_tester::add_all(t) + ; +} diff --git a/tpie/CMakeLists.txt b/tpie/CMakeLists.txt index 46c4d2e2e..117b068f0 100644 --- a/tpie/CMakeLists.txt +++ b/tpie/CMakeLists.txt @@ -58,6 +58,9 @@ set (HEADERS progress_indicator_terminal.h queue.h serialization.h + serialization2.h + serialization_stream.h + serialization_sort.h sort.h sort_deprecated.h sort_manager.h @@ -102,6 +105,7 @@ set (SOURCES prime.cpp progress_indicator_base.cpp progress_indicator_subindicator.cpp + serialization_stream.cpp tempname.cpp tpie.cpp tpie_log.cpp diff --git a/tpie/is_simple_iterator.h b/tpie/is_simple_iterator.h new file mode 100644 index 000000000..d4f7983c2 --- /dev/null +++ b/tpie/is_simple_iterator.h @@ -0,0 +1,54 @@ +// -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 
'innamespace 0)); -*- +// vi:set ts=4 sts=4 sw=4 noet : +// Copyright 2013, The TPIE development team +// +// This file is part of TPIE. +// +// TPIE is free software: you can redistribute it and/or modify it under +// the terms of the GNU Lesser General Public License as published by the +// Free Software Foundation, either version 3 of the License, or (at your +// option) any later version. +// +// TPIE is distributed in the hope that it will be useful, but WITHOUT ANY +// WARRANTY; without even the implied warranty of MERCHANTABILITY or +// FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public +// License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with TPIE. If not, see + +#include +#include + +namespace tpie { + +template struct is_simple_iterator_enable_if { }; +template <> struct is_simple_iterator_enable_if {typedef void type;}; + +/** + * \brief Checks if an iterator is simple + * + * An iterator is simple if it is a random access iterator + * and all elements in a range from a to b + * are located from &(*a) to &(*b) + */ +template +class is_simple_iterator { +private: + template + static char magic(typename std::vector::iterator*); + template + static char magic(typename std::vector::const_iterator*); + template + static char magic(typename std::basic_string::iterator*); + template + static char magic(typename std::basic_string::const_iterator*); + template + static char magic(TT *, typename is_simple_iterator_enable_if::type *_=0); + template + static long magic(...); +public: + static bool const value=sizeof(magic((T*)0))==sizeof(char); +}; + +} //namespace tpie diff --git a/tpie/pipelining.h b/tpie/pipelining.h index abfb6fe55..69a5576a6 100644 --- a/tpie/pipelining.h +++ b/tpie/pipelining.h @@ -45,7 +45,9 @@ #include #include #include +#include #include +#include #include #include #include diff --git a/tpie/pipelining/CMakeLists.txt b/tpie/pipelining/CMakeLists.txt index 
5b448ee05..340fa2e86 100644 --- a/tpie/pipelining/CMakeLists.txt +++ b/tpie/pipelining/CMakeLists.txt @@ -26,6 +26,7 @@ set (PIPE_HEADERS pipeline.h reverse.h sort.h + serialization_sort.h std_glue.h stdio.h tokens.h diff --git a/tpie/pipelining/merge_sorter.h b/tpie/pipelining/merge_sorter.h index 8ca6d34c8..e39891a70 100644 --- a/tpie/pipelining/merge_sorter.h +++ b/tpie/pipelining/merge_sorter.h @@ -154,7 +154,13 @@ class merge_sorter { tp_assert(m_state == stRunFormation, "Wrong phase"); sort_current_run(); - if (m_finishedRuns == 0 && m_currentRunItems.size() <= p.internalReportThreshold) { + if (m_itemCount == 0) { + tp_assert(m_currentRunItemCount == 0, "m_itemCount == 0, but m_currentRunItemCount != 0"); + m_reportInternal = true; + m_itemsPulled = 0; + m_currentRunItems.resize(0); + log_debug() << "Got no items. Internal reporting mode." << std::endl; + } else if (m_finishedRuns == 0 && m_currentRunItems.size() <= p.internalReportThreshold) { // Our current buffer fits within the memory requirements of phase 2. m_reportInternal = true; m_itemsPulled = 0; diff --git a/tpie/pipelining/serialization.h b/tpie/pipelining/serialization.h new file mode 100644 index 000000000..fe30ecebf --- /dev/null +++ b/tpie/pipelining/serialization.h @@ -0,0 +1,221 @@ +// -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*- +// vi:set ts=4 sts=4 sw=4 noet : +// Copyright 2013, The TPIE development team +// +// This file is part of TPIE. +// +// TPIE is free software: you can redistribute it and/or modify it under +// the terms of the GNU Lesser General Public License as published by the +// Free Software Foundation, either version 3 of the License, or (at your +// option) any later version. +// +// TPIE is distributed in the hope that it will be useful, but WITHOUT ANY +// WARRANTY; without even the implied warranty of MERCHANTABILITY or +// FITNESS FOR A PARTICULAR PURPOSE. 
See the GNU Lesser General Public +// License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with TPIE. If not, see + +/////////////////////////////////////////////////////////////////////////////// +/// \file pipelining/serialization.h Serialization stream glue. +/////////////////////////////////////////////////////////////////////////////// + +#ifndef TPIE_PIPELINING_SERIALIZATION_H +#define TPIE_PIPELINING_SERIALIZATION_H + +#include +#include +#include +#include + +namespace tpie { + +namespace pipelining { + +namespace serialization_bits { + +template +class input_t : public node { + dest_t dest; + serialization_reader * rd; + +public: + typedef typename dest_t::item_type item_type; + + input_t(const dest_t & dest, serialization_reader * rd) + : dest(dest) + , rd(rd) + { + set_name("Serialization reader"); + add_push_destination(dest); + set_minimum_memory(rd->memory_usage()); + } + + virtual void begin() override { + set_steps(rd->size()); + } + + virtual void go() override { + item_type x; + stream_size_type bytesRead = 0; + while (rd->can_read()) { + rd->unserialize(x); + dest.push(x); + + stream_size_type bytesRead2 = rd->offset(); + step(bytesRead2 - bytesRead); + bytesRead = bytesRead2; + } + } +}; + +typedef factory_1 input_factory; + + +template +class output_t : public node { + serialization_writer * wr; + +public: + typedef T item_type; + + output_t(serialization_writer * wr) + : wr(wr) + { + set_name("Serialization writer"); + set_minimum_memory(wr->memory_usage()); + } + + void push(const T & x) { + wr->serialize(x); + } +}; + +template +struct output_factory { + typedef termfactory_1, serialization_writer *> type; +}; + +} // namespace serialization_bits + +pipe_begin +inline serialization_input(serialization_reader & rd) { + return serialization_bits::input_factory(&rd); +} + +template +pipe_end::type> +serialization_output(serialization_writer & wr) { + return typename 
serialization_bits::output_factory::type(&wr); +} + +namespace serialization_bits { + +template class rev_input_t; + +template +class rev_output_t : public node { + friend class rev_input_t >; + + dest_t dest; + tpie::temp_file * m_stack; + + serialization_reverse_reader rd; + +public: + typedef typename dest_t::item_type item_type; + + rev_output_t(const dest_t & dest) + : dest(dest) + , m_stack(0) + { + this->set_name("Serialization reverse reader"); + this->add_push_destination(dest); + } + + virtual void begin() override { + if (m_stack == 0) + throw tpie::exception("No one created my stack"); + + rd.open(m_stack->path()); + this->set_steps(rd.size()); + } + + virtual void go() override { + item_type x; + stream_size_type bytesRead = 0; + while (rd.can_read()) { + rd.unserialize(x); + dest.push(x); + + stream_size_type bytesRead2 = rd.offset(); + step(bytesRead2 - bytesRead); + bytesRead = bytesRead2; + } + } + + virtual void end() override { + delete m_stack; + } +}; + +typedef factory_0 rev_output_factory; + +template +class rev_input_t; + +template +class rev_input_t > : public node { + typedef rev_output_t dest_t; + dest_t dest; + + serialization_reverse_writer wr; + stream_size_type items; + +public: + typedef typename dest_t::item_type item_type; + + rev_input_t(const dest_t & dest) + : dest(dest) + , wr() + , items(0) + { + this->set_name("Serialization reverse writer"); + this->dest.add_dependency(*this); + } + + virtual void begin() override { + dest.m_stack = new tpie::temp_file(); + wr.open(dest.m_stack->path()); + } + + void push(const item_type & x) { + wr.serialize(x); + ++items; + } + + virtual void end() override { + wr.close(); + this->forward("items", items); + } +}; + +typedef factory_0 rev_input_factory; + +typedef bits::pair_factory reverse_factory; + +} // namespace serialization_bits + +pipe_middle +inline serialization_reverser() { + serialization_bits::rev_input_factory i; + serialization_bits::rev_output_factory o; + return 
serialization_bits::reverse_factory(i, o); +} + +} // namespace pipelining + +} // namespace tpie + +#endif // TPIE_PIPELINING_SERIALIZATION_H diff --git a/tpie/pipelining/serialization_sort.h b/tpie/pipelining/serialization_sort.h new file mode 100644 index 000000000..759910f57 --- /dev/null +++ b/tpie/pipelining/serialization_sort.h @@ -0,0 +1,536 @@ +// -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*- +// vi:set ts=4 sts=4 sw=4 noet : +// Copyright 2013, The TPIE development team +// +// This file is part of TPIE. +// +// TPIE is free software: you can redistribute it and/or modify it under +// the terms of the GNU Lesser General Public License as published by the +// Free Software Foundation, either version 3 of the License, or (at your +// option) any later version. +// +// TPIE is distributed in the hope that it will be useful, but WITHOUT ANY +// WARRANTY; without even the implied warranty of MERCHANTABILITY or +// FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public +// License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with TPIE. If not, see + +#ifndef TPIE_PIPELINING_SERIALIZATION_SORT_H +#define TPIE_PIPELINING_SERIALIZATION_SORT_H + +#include +#include +#include +#include +#include + +namespace tpie { + +namespace pipelining { + +namespace serialization_bits { + +template +class sorter_traits { +public: + typedef T item_type; + typedef pred_t pred_type; + typedef serialization_sort sorter_t; + typedef boost::shared_ptr sorterptr; +}; + +template +class sort_calc_t; + +template +class sort_input_t; + +template +class sort_output_base : public node { + typedef typename Traits::pred_type pred_type; +public: + /** Type of items sorted. */ + typedef typename Traits::item_type item_type; + /** Type of the merge sort implementation used. 
*/ + typedef typename Traits::sorter_t sorter_t; + /** Smart pointer to sorter_t. */ + typedef typename Traits::sorterptr sorterptr; + + sorterptr get_sorter() const { + return m_sorter; + } + + void set_calc_node(node & calc) { + add_dependency(calc); + } + +protected: + sort_output_base(pred_type pred) + : m_sorter(new sorter_t(sizeof(item_type), pred)) + { + } + + sorterptr m_sorter; +}; + +/////////////////////////////////////////////////////////////////////////////// +/// \brief Pipe sorter pull output node. +/////////////////////////////////////////////////////////////////////////////// +template +class sort_pull_output_t : public sort_output_base { +public: + typedef typename Traits::item_type item_type; + typedef typename Traits::pred_type pred_type; + typedef typename Traits::sorter_t sorter_t; + typedef typename Traits::sorterptr sorterptr; + + sort_pull_output_t(pred_type pred) + : sort_output_base(pred) + { + this->set_minimum_memory(sorter_t::minimum_memory_phase_3()); + this->set_name("Write sorted output", PRIORITY_INSIGNIFICANT); + this->set_memory_fraction(1.0); + } + + virtual void begin() override { + node::begin(); + this->set_steps(this->m_sorter->item_count()); + this->forward("items", static_cast(this->m_sorter->item_count())); + } + + inline bool can_pull() const { + return this->m_sorter->can_pull(); + } + + inline item_type pull() { + this->step(); + return this->m_sorter->pull(); + } + + // Despite this go() implementation, a sort_pull_output_t CANNOT be used as + // an initiator node. Normally, it is a type error to have a phase without + // an initiator, but with a passive_sorter you can circumvent this + // mechanism. Thus we customize the error message printed (but throw the + // same type of exception.) + virtual void go() override { + log_warning() << "Passive sorter used without an initiator in the final merge and output phase.\n" + << "Define an initiator and pair it up with the pipe from passive_sorter::output()." 
<< std::endl; + throw not_initiator_node(); + } + +protected: + virtual void set_available_memory(memory_size_type availableMemory) override { + node::set_available_memory(availableMemory); + this->m_sorter->set_phase_3_memory(availableMemory); + } +}; + +/////////////////////////////////////////////////////////////////////////////// +/// \brief Pipe sorter push output node. +/////////////////////////////////////////////////////////////////////////////// +template +class sort_output_t : public sort_output_base { + typedef typename Traits::pred_type pred_type; +public: + typedef typename Traits::item_type item_type; + typedef sort_output_base p_t; + typedef typename Traits::sorter_t sorter_t; + typedef typename Traits::sorterptr sorterptr; + + sort_output_t(const dest_t & dest, pred_type pred) + : p_t(pred) + , dest(dest) + { + this->add_push_destination(dest); + this->set_minimum_memory(sorter_t::minimum_memory_phase_3()); + this->set_name("Write sorted output", PRIORITY_INSIGNIFICANT); + this->set_memory_fraction(1.0); + } + + virtual void begin() override { + node::begin(); + this->set_steps(this->m_sorter->item_count()); + this->forward("items", static_cast(this->m_sorter->item_count())); + } + + virtual void go() override { + while (this->m_sorter->can_pull()) { + dest.push(this->m_sorter->pull()); + this->step(); + } + } + +protected: + virtual void set_available_memory(memory_size_type availableMemory) override { + node::set_available_memory(availableMemory); + this->m_sorter->set_phase_3_memory(availableMemory); + } + +private: + dest_t dest; +}; + +/////////////////////////////////////////////////////////////////////////////// +/// \brief Pipe sorter middle node. 
+/// \tparam T The type of items sorted +/// \tparam pred_t The less-than predicate +/////////////////////////////////////////////////////////////////////////////// +template +class sort_calc_t : public node { +public: + typedef typename Traits::item_type item_type; + typedef typename Traits::sorter_t sorter_t; + typedef typename Traits::sorterptr sorterptr; + + typedef sort_output_base Output; + + sort_calc_t(const sort_calc_t & other) + : node(other) + , m_sorter(other.m_sorter) + , dest(other.dest) + { + } + + template + sort_calc_t(dest_t dest) + : dest(new dest_t(dest)) + { + m_sorter = this->dest->get_sorter(); + this->dest->set_calc_node(*this); + init(); + } + + sort_calc_t(sorterptr sorter) + : m_sorter(sorter) + { + init(); + } + + void init() { + set_minimum_memory(sorter_t::minimum_memory_phase_2()); + set_name("Perform merge heap", PRIORITY_SIGNIFICANT); + set_memory_fraction(1.0); + } + + virtual void begin() override { + node::begin(); + set_steps(1000); + } + + virtual void go() override { + progress_indicator_base * pi = proxy_progress_indicator(); + log_debug() << "TODO: Progress information during merging." << std::endl; + m_sorter->merge_runs(); + pi->init(1); + pi->step(); + pi->done(); + } + + virtual bool can_evacuate() override { + return true; + } + + virtual void evacuate() override { + m_sorter->evacuate(); + } + + sorterptr get_sorter() const { + return m_sorter; + } + + void set_input_node(node & input) { + add_dependency(input); + } + +protected: + virtual void set_available_memory(memory_size_type availableMemory) override { + node::set_available_memory(availableMemory); + m_sorter->set_phase_2_memory(availableMemory); + } + +private: + sorterptr m_sorter; + boost::shared_ptr dest; +}; + +/////////////////////////////////////////////////////////////////////////////// +/// \brief Pipe sorter input node. 
+/// \tparam T The type of items sorted +/// \tparam pred_t The less-than predicate +/////////////////////////////////////////////////////////////////////////////// +template +class sort_input_t : public node { + typedef typename Traits::pred_type pred_type; +public: + typedef typename Traits::item_type item_type; + typedef typename Traits::sorter_t sorter_t; + typedef typename Traits::sorterptr sorterptr; + + sort_input_t(sort_calc_t dest) + : m_sorter(dest.get_sorter()) + , dest(dest) + { + this->dest.set_input_node(*this); + set_minimum_memory(sorter_t::minimum_memory_phase_1()); + set_name("Form input runs", PRIORITY_SIGNIFICANT); + set_memory_fraction(1.0); + } + + virtual void begin() override { + node::begin(); + m_sorter->begin(); + } + + void push(const item_type & item) { + m_sorter->push(item); + } + + virtual void end() override { + node::end(); + m_sorter->end(); + } + + virtual bool can_evacuate() override { + return true; + } + + virtual void evacuate() override { + m_sorter->evacuate(); + } + +protected: + virtual void set_available_memory(memory_size_type availableMemory) override { + node::set_available_memory(availableMemory); + m_sorter->set_phase_1_memory(availableMemory); + } + +private: + sorterptr m_sorter; + sort_calc_t dest; +}; + +template +class sort_factory_base : public factory_base { + const child_t & self() const { return *static_cast(this); } +public: + template + struct generated { + private: + /** Type of items sorted. 
*/ + typedef typename dest_t::item_type item_type; + public: + typedef typename child_t::template predicate::type pred_type; + typedef sorter_traits Traits; + typedef sort_input_t type; + }; + + template + typename generated::type construct(const dest_t & dest) const { + typedef typename dest_t::item_type item_type; + typedef typename generated::pred_type pred_type; + typedef typename generated::Traits Traits; + + sort_output_t output(dest, self().template get_pred()); + this->init_sub_node(output); + sort_calc_t calc(output); + this->init_sub_node(calc); + sort_input_t input(calc); + this->init_sub_node(input); + + return input; + } +}; + +/////////////////////////////////////////////////////////////////////////////// +/// \brief Sort factory using std::less as comparator. +/////////////////////////////////////////////////////////////////////////////// +class default_pred_sort_factory : public sort_factory_base { +public: + template + class predicate { + public: + typedef std::less type; + }; + + template + std::less get_pred() const { + return std::less(); + } +}; + +/////////////////////////////////////////////////////////////////////////////// +/// \brief Sort factory using the given predicate as comparator. +/////////////////////////////////////////////////////////////////////////////// +template +class sort_factory : public sort_factory_base > { +public: + template + class predicate { + public: + typedef pred_t type; + }; + + sort_factory(const pred_t & p) + : pred(p) + { + } + + template + pred_t get_pred() const { + return pred; + } + +private: + pred_t pred; +}; + +} // namespace serialization_bits + +/////////////////////////////////////////////////////////////////////////////// +/// \brief Pipelining sorter using std::less. 
+/////////////////////////////////////////////////////////////////////////////// +inline pipe_middle +serialization_pipesort() { + typedef serialization_bits::default_pred_sort_factory fact; + return pipe_middle(fact()).name("Sort"); +} + +/////////////////////////////////////////////////////////////////////////////// +/// \brief Pipelining sorter using the given predicate. +/////////////////////////////////////////////////////////////////////////////// +template +pipe_middle > +serialization_pipesort(const pred_t & p) { + typedef serialization_bits::sort_factory fact; + return pipe_middle(fact(p)).name("Sort"); +} + +template > +class serialization_passive_sorter; + +namespace serialization_bits { + +/////////////////////////////////////////////////////////////////////////////// +/// \brief Factory for the passive sorter input node. +/////////////////////////////////////////////////////////////////////////////// +template +class passive_sorter_factory : public factory_base { +public: + typedef sort_pull_output_t output_t; + typedef sort_calc_t calc_t; + typedef sort_input_t input_t; + typedef input_t generated_type; + typedef typename Traits::sorter_t sorter_t; + typedef typename Traits::sorterptr sorterptr; + + passive_sorter_factory(output_t & output) + : output(&output) + { + } + + generated_type construct() const { + calc_t calc(output->get_sorter()); + output->set_calc_node(calc); + this->init_node(calc); + input_t input(calc); + this->init_node(input); + return input; + } + +private: + output_t * output; +}; + +/////////////////////////////////////////////////////////////////////////////// +/// \brief Factory for the passive sorter output node. 
+/////////////////////////////////////////////////////////////////////////////// +template +class passive_sorter_factory_2 : public factory_base { +public: + typedef sort_pull_output_t output_t; + typedef output_t generated_type; + + passive_sorter_factory_2(const passive_sorter & sorter) + : m_sorter(sorter) + { + } + + inline generated_type construct() const; + +private: + const passive_sorter & m_sorter; +}; + +} // namespace serialization_bits + +/////////////////////////////////////////////////////////////////////////////// +/// \brief Pipelined sorter with push input and pull output. +/// Get the input pipe with \c input() and the output pullpipe with \c output(). +/// input() must not be called after output(). +/// \tparam T The type of item to sort +/// \tparam pred_t The predicate (e.g. std::less) indicating the predicate +/// on which to order an item before another. +/////////////////////////////////////////////////////////////////////////////// +template +class serialization_passive_sorter { + typedef serialization_bits::sorter_traits Traits; +public: + /** Type of items sorted. */ + typedef T item_type; + /** Type of the merge sort implementation used. */ + typedef typename Traits::sorter_t sorter_t; + /** Smart pointer to sorter_t. */ + typedef typename Traits::sorterptr sorterptr; + /** Type of pipe sorter output. */ + typedef serialization_bits::sort_pull_output_t output_t; + + serialization_passive_sorter(pred_t pred = pred_t()) + : m_sorter(new sorter_t()) + , pred(pred) + , m_output(pred) + { + } + + /////////////////////////////////////////////////////////////////////////// + /// \brief Get the input push node. + /////////////////////////////////////////////////////////////////////////// + pipe_end > input() { + return serialization_bits::passive_sorter_factory(m_output); + } + + /////////////////////////////////////////////////////////////////////////// + /// \brief Get the output pull node. 
+ /////////////////////////////////////////////////////////////////////////// + pullpipe_begin > output() { + return serialization_bits::passive_sorter_factory_2(*this); + } + +private: + sorterptr m_sorter; + pred_t pred; + output_t m_output; + serialization_passive_sorter(const serialization_passive_sorter &); + serialization_passive_sorter & operator=(const serialization_passive_sorter &); + + friend class serialization_bits::passive_sorter_factory_2; +}; + +namespace serialization_bits { + +template +typename passive_sorter_factory_2::generated_type +passive_sorter_factory_2::construct() const { + generated_type res = m_sorter.m_output; + init_node(res); + return res; +} + +} // namespace serialization_bits + +} // namespace pipelining + +} // namespace tpie + +#endif // TPIE_PIPELINING_SERIALIZATION_SORT_H diff --git a/tpie/serialization2.h b/tpie/serialization2.h new file mode 100644 index 000000000..e9de202ac --- /dev/null +++ b/tpie/serialization2.h @@ -0,0 +1,268 @@ +// -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*- +// vi:set ts=4 sts=4 sw=4 noet : +// Copyright 2010, The TPIE development team +// +// This file is part of TPIE. +// +// TPIE is free software: you can redistribute it and/or modify it under +// the terms of the GNU Lesser General Public License as published by the +// Free Software Foundation, either version 3 of the License, or (at your +// option) any later version. +// +// TPIE is distributed in the hope that it will be useful, but WITHOUT ANY +// WARRANTY; without even the implied warranty of MERCHANTABILITY or +// FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public +// License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with TPIE. 
If not, see + +#ifndef TPIE_SERIALIZATION2_H +#define TPIE_SERIALIZATION2_H + +/////////////////////////////////////////////////////////////////////////////// +/// \file tpie/serialization2.h Binary serialization and unserialization. +/// +/// This serialization framework is based on generic writers that have a write +/// method accepting a source buffer and a byte size as parameters, and generic +/// readers that have a read method accepting a destination buffer and a byte +/// size as parameters. +/////////////////////////////////////////////////////////////////////////////// + +#include +#include +#include +#include +#include +#include +#include + +namespace tpie { + +#ifdef DOXYGEN + +/////////////////////////////////////////////////////////////////////////////// +// The following two declarations are for documentation purposes only. +/////////////////////////////////////////////////////////////////////////////// + +/////////////////////////////////////////////////////////////////////////////// +/// \brief Sample tpie::serialize prototype. +/// +/// To enable serialization of your own type, overload tpie::serialize. +/// This docstring is an example for a type named foo, but it is for exposition +/// purposes only. +/// +/// The implementation of tpie::serialize(dst, v) shall call dst.write(src, n) +/// a number of times. Each time, src is a const pointer to a byte buffer of +/// size n (bytes) that represents a piece of the serialized object. +/// +/// A common idiom for polymorphic and/or variable-sized objects is to first +/// serialize a constant-size tag or length and then serialize the variably +/// sized payload. For this purpose, you may want to use +/// tpie::serialize(dst, a, b) to serialize all elements in the range [a, b). 
+/////////////////////////////////////////////////////////////////////////////// +template +void serialize(D & dst, const foo & v); + +/////////////////////////////////////////////////////////////////////////////// +/// \brief Sample tpie::unserialize prototype. +/// +/// To enable unserialization of your own type, overload tpie::unserialize. +/// This docstring is an example for a type named foo, but it is for exposition +/// purposes only. +/// +/// The implementation of tpie::unserialize(src, v) shall call src.read(dst, n) +/// a number of times. Each time, dst is a pointer to a byte buffer that can +/// hold at least n bytes, where n is the number of bytes to be read. +/// +/// See also tpie::serialize. +/////////////////////////////////////////////////////////////////////////////// +template +void unserialize(S & src, foo & v); + +#endif // DOXYGEN +/////////////////////////////////////////////////////////////////////////////// +// Library implementations of tpie::serialize and tpie::unserialize. +/////////////////////////////////////////////////////////////////////////////// + +template struct is_trivially_serializable_enable_if { }; +template <> struct is_trivially_serializable_enable_if {typedef void type;}; + +template +struct is_trivially_serializable { +private: + template + static char magic(TT *, typename is_simple_iterator_enable_if::type *_=0); + + template + static long magic(...); +public: + static bool const value= + boost::is_pod::value || sizeof(magic((T*)0))==sizeof(char); +}; + +/////////////////////////////////////////////////////////////////////////////// +/// \brief tpie::serialize for POD/array types. 
+/////////////////////////////////////////////////////////////////////////////// +template +void serialize(D & dst, const T & v, + typename boost::enable_if >::type * = 0, + typename boost::disable_if >::type * = 0) { + dst.write((const char *)&v, sizeof(T)); +} + +/////////////////////////////////////////////////////////////////////////////// +/// \brief tpie::unserialize for POD/array types. +/////////////////////////////////////////////////////////////////////////////// +template +void unserialize(S & src, T & v, + typename boost::enable_if >::type * = 0, + typename boost::disable_if >::type * = 0) { + src.read((char *)&v, sizeof(T)); +} + +namespace bits { + +/////////////////////////////////////////////////////////////////////////////// +/// \brief Helper to facilitate fast serialization of trivially copyable +/// arrays. +/////////////////////////////////////////////////////////////////////////////// +template ::value, + bool is_pod=boost::is_pod::value_type>::value, + bool is_pointer=boost::is_pointer::value_type>::value> +struct array_encode_magic { + void operator()(D & dst, T start, T end) { + using tpie::serialize; + for (T i=start; i != end; ++i) serialize(dst, *i); + } +}; + +template +struct array_encode_magic { + void operator()(D & d, T start, T end) { + const char * from = reinterpret_cast(&*start); + const char * to = reinterpret_cast(&*end); + d.write(from, to-from); + } +}; + +/////////////////////////////////////////////////////////////////////////////// +/// \brief Helper to facilitate fast unserialization of trivially copyable +/// arrays. 
+/////////////////////////////////////////////////////////////////////////////// +template ::value, + bool is_pod=boost::is_pod::value_type>::value, + bool is_pointer=boost::is_pointer::value_type>::value> +struct array_decode_magic { + void operator()(D & dst, T start, T end) { + using tpie::unserialize; + for (T i=start; i != end; ++i) unserialize(dst, *i); + } +}; + +template +struct array_decode_magic { + void operator()(D & d, T start, T end) { + char * from = reinterpret_cast(&*start); + char * to = reinterpret_cast(&*end); + d.read(from, to-from); + } +}; + +/////////////////////////////////////////////////////////////////////////////// +/// \brief Helper to count the serialized size of objects. +/////////////////////////////////////////////////////////////////////////////// +struct counter { + size_t size; + counter(): size(0) {} + void write(const void *, size_t s) {size += s;} +}; + +} // namespace bits + +/////////////////////////////////////////////////////////////////////////////// +/// \brief Serialize an array of serializables. +/// +/// This uses direct memory copying for POD typed arrays, and tpie::serialize +/// for proper objects. +/////////////////////////////////////////////////////////////////////////////// +template +void serialize(D & dst, T start, T end) { + bits::array_encode_magic magic; + magic(dst, start, end); +} + +/////////////////////////////////////////////////////////////////////////////// +/// \brief Unserialize an array of serializables. +/// +/// This uses direct memory copying for POD typed arrays, and tpie::unserialize +/// for proper objects. +/////////////////////////////////////////////////////////////////////////////// +template +void unserialize(D & dst, T start, T end) { + bits::array_decode_magic magic; + magic(dst, start, end); +} + +/////////////////////////////////////////////////////////////////////////////// +/// \brief tpie::serialize for std::vectors of serializable items. 
+/////////////////////////////////////////////////////////////////////////////// +template +void serialize(D & dst, const std::vector & v) { + using tpie::serialize; + serialize(dst, v.size()); + serialize(dst, v.begin(), v.end()); +} + +/////////////////////////////////////////////////////////////////////////////// +/// \brief tpie::unserialize for std::vectors of items that can be unserialized. +/// +/// Reads the element count first, resizes v to that count, and then +/// unserializes the elements in order into v. +/////////////////////////////////////////////////////////////////////////////// +template +void unserialize(S & src, std::vector & v) { + typename std::vector::size_type s; + using tpie::unserialize; + unserialize(src, s); + v.resize(s); + unserialize(src, v.begin(), v.end()); +} + +/////////////////////////////////////////////////////////////////////////////// +/// \brief tpie::serialize for std::basic_strings of serializable items, +/// including std::strings. +/// +/// Writes v.size() first, followed by the characters in the range +/// [v.begin(), v.end()). +/////////////////////////////////////////////////////////////////////////////// +template +void serialize(D & dst, const std::basic_string & v) { + using tpie::serialize; + serialize(dst, v.size()); + serialize(dst, v.begin(), v.end()); +} + +/////////////////////////////////////////////////////////////////////////////// +/// \brief tpie::unserialize for std::basic_strings of items that can be +/// unserialized, including std::strings. +/// +/// Reads the size first, resizes v to that size, and then unserializes the +/// characters in order into v. +/////////////////////////////////////////////////////////////////////////////// +template +void unserialize(S & src, std::basic_string & v) { + typename std::basic_string::size_type s; + using tpie::unserialize; + unserialize(src, s); + v.resize(s); + unserialize(src, v.begin(), v.end()); +} + +/////////////////////////////////////////////////////////////////////////////// +/// \brief Given a serializable, serialize it and measure its serialized size. 
+/////////////////////////////////////////////////////////////////////////////// +template +size_t serialized_size(const T & v) { + using tpie::serialize; + bits::counter c; + serialize(c, v); + return c.size; +} + +} // namespace tpie + +#endif // TPIE_SERIALIZATION2_H diff --git a/tpie/serialization_sort.h b/tpie/serialization_sort.h new file mode 100644 index 000000000..f6b6bbba6 --- /dev/null +++ b/tpie/serialization_sort.h @@ -0,0 +1,890 @@ +// -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*- +// vi:set ts=4 sts=4 sw=4 noet : +// Copyright 2013, The TPIE development team +// +// This file is part of TPIE. +// +// TPIE is free software: you can redistribute it and/or modify it under +// the terms of the GNU Lesser General Public License as published by the +// Free Software Foundation, either version 3 of the License, or (at your +// option) any later version. +// +// TPIE is distributed in the hope that it will be useful, but WITHOUT ANY +// WARRANTY; without even the implied warranty of MERCHANTABILITY or +// FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public +// License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with TPIE. If not, see + +#ifndef TPIE_SERIALIZATION_SORT_H +#define TPIE_SERIALIZATION_SORT_H + +#include +#include + +#include +#include +#include +#include +#include +#include + +#include +#include + +namespace tpie { + +namespace serialization_bits { + +struct sort_parameters { + /** Memory available while forming sorted runs. */ + memory_size_type memoryPhase1; + /** Memory available while merging runs. */ + memory_size_type memoryPhase2; + /** Memory available during output phase. */ + memory_size_type memoryPhase3; + /** Minimum size of serialized items. */ + memory_size_type minimumItemSize; + /** Directory in which temporary files are stored. 
*/ + std::string tempDir; + + void dump(std::ostream & out) const { + out << "Serialization merge sort parameters\n" + << "Phase 1 memory: " << memoryPhase1 << '\n' + << "Phase 2 memory: " << memoryPhase2 << '\n' + << "Phase 3 memory: " << memoryPhase3 << '\n' + << "Minimum item size: " << minimumItemSize << '\n' + << "Temporary directory: " << tempDir << '\n'; + } +}; + +template +class internal_sort { + array m_buffer; + memory_size_type m_items; + memory_size_type m_serializedSize; + memory_size_type m_memAvail; + + memory_size_type m_largestItem; + + pred_t m_pred; + + bool m_full; + +public: + internal_sort(pred_t pred = pred_t()) + : m_items(0) + , m_serializedSize(0) + , m_largestItem(sizeof(T)) + , m_pred(pred) + , m_full(false) + { + } + + void begin(memory_size_type memAvail) { + m_buffer.resize(memAvail / sizeof(T) / 2); + m_items = m_serializedSize = 0; + m_largestItem = sizeof(T); + m_full = false; + m_memAvail = memAvail; + } + + /////////////////////////////////////////////////////////////////////////// + /// \brief True if all items up to and including this one fits in buffer. + /// + /// Once push() returns false, it will keep returning false until + /// the sequence is sorted, read out, and the buffer has been cleared. + /////////////////////////////////////////////////////////////////////////// + bool push(const T & item) { + if (m_full) return false; + + if (m_items == m_buffer.size()) { + m_full = true; + return false; + } + + memory_size_type serSize = serialized_size(item); + + if (serSize > sizeof(T)) { + // amount of memory this item needs for its extra stuff (stuff not in the buffer). + memory_size_type serializedExtra = serSize - sizeof(T); + + // amount of memory not used for the buffer and not used for extra stuff already. 
+ memory_size_type memRemainingExtra = m_memAvail - memory_usage(); + + if (serializedExtra > memRemainingExtra) { + m_full = true; + return false; + } + + if (serSize > m_largestItem) + m_largestItem = serSize; + } + + m_serializedSize += serSize; + + m_buffer[m_items++] = item; + + return true; + } + + memory_size_type get_largest_item_size() { + return m_largestItem; + } + + /////////////////////////////////////////////////////////////////////////// + /// \brief Get the serialized size of the items written. + /// + /// This is exactly the size the current run will use when serialized to + /// disk. + /////////////////////////////////////////////////////////////////////////// + memory_size_type current_serialized_size() { + return m_serializedSize; + } + + /////////////////////////////////////////////////////////////////////////// + /// \brief Compute current memory usage. + /// + /// This includes the item buffer array as well as the extra serialized + /// size of the items already written to the buffer. + /// This assumes that items use as much primary memory as their serialized + /// size. If this assumption does not hold, the memory usage reported may + /// be useless. Nevertheless, this is the memory usage we use in our + /// calculations. 
+ /////////////////////////////////////////////////////////////////////////// + memory_size_type memory_usage() { + return m_buffer.size() * sizeof(T) + + (m_serializedSize - m_items * sizeof(T)); + } + + bool can_shrink_buffer() { + return current_serialized_size() <= get_memory_manager().available(); + } + + void shrink_buffer() { + array newBuffer(array_view(begin(), end())); + m_buffer.swap(newBuffer); + } + + void sort() { + parallel_sort(m_buffer.get(), m_buffer.get() + m_items, m_pred); + } + + const T * begin() const { + return m_buffer.get(); + } + + const T * end() const { + return m_buffer.get() + m_items; + } + + /////////////////////////////////////////////////////////////////////////// + /// \brief Deallocate buffer and call reset(). + /////////////////////////////////////////////////////////////////////////// + void free() { + m_buffer.resize(0); + reset(); + } + + /////////////////////////////////////////////////////////////////////////// + /// \brief Reset sorter, but keep the remembered largest item size and + /// buffer size. + /////////////////////////////////////////////////////////////////////////// + void reset() { + m_items = m_serializedSize = 0; + m_full = false; + } +}; + +/////////////////////////////////////////////////////////////////////////////// +/// \brief File handling for merge sort. +/// +/// This class abstracts away the details of numbering run files; tracking the +/// number of runs in each merge level; informing the TPIE stats framework of +/// the temporary size; deleting run files after use. +/// +/// The important part of the state is the tuple consisting of +/// (a, b, c) := (fileOffset, nextLevelFileOffset, nextFileOffset). +/// `a` is the first file in the level currently being merged; +/// `b` is the first file in the level being merged into; +/// `c` is the next file to write output to. +/// +/// ## Transition system +/// +/// We let remainingRuns := b - a, and nextLevelRuns := c - b. 
+/// +/// The tuple (remainingRuns, nextLevelRuns) has the following transitions: +/// On open_new_writer(): (x, y) -> (x, 1+y), +/// On open_readers(fanout): (fanout+x, y) -> (fanout+x, y), +/// On open_readers(fanout): (0, fanout+y) -> (fanout+y, 0), +/// On close_readers_and_delete(): (fanout+x, y) -> (x, y). +/// +/// ## Merge sorter usage +/// +/// During run formation (the first phase of merge sort), we repeatedly call +/// open_new_writer() and close_writer() to write out runs to disk. +/// +/// After run formation, we call open_readers(fanout) to advance into the first +/// level of the merge heap (so one can think of run formation as a "zeroth +/// level" in the merge heap). +/// +/// As a slight optimization, when remaining_runs() == 1, one may call +/// move_last_reader_to_next_level() to move the remaining run into the next +/// merge level without scanning through and copying the single remaining run. +/// +/// See serialization_sort::merge_runs() for the logic involving +/// next_level_runs() and remaining_runs() in a loop. +/////////////////////////////////////////////////////////////////////////////// +template +class file_handler { + // Physical index of the run file with logical index 0. + size_t m_fileOffset; + // Physical index of the first run file in the next merge level. 
+ size_t m_nextLevelFileOffset; + // Physical index of the next run file to write + size_t m_nextFileOffset; + + bool m_writerOpen; + size_t m_readersOpen; + + serialization_writer m_writer; + stream_size_type m_currentWriterByteSize; + + array m_readers; + + std::string m_tempDir; + + std::string run_file(size_t physicalIndex) { + if (m_tempDir.size() == 0) throw exception("run_file: temp dir is the empty string"); + std::stringstream ss; + ss << m_tempDir << '/' << physicalIndex << ".tpie"; + return ss.str(); + } + +public: + file_handler() + : m_fileOffset(0) + , m_nextLevelFileOffset(0) + , m_nextFileOffset(0) + + , m_writerOpen(false) + , m_readersOpen(0) + + , m_writer() + , m_currentWriterByteSize(0) + { + } + + ~file_handler() { + reset(); + } + + void set_temp_dir(const std::string & tempDir) { + if (m_nextFileOffset != 0) + throw exception("set_temp_dir: trying to change path after files already open"); + m_tempDir = tempDir; + } + + void open_new_writer() { + if (m_writerOpen) throw exception("open_new_writer: Writer already open"); + m_writer.open(run_file(m_nextFileOffset++)); + m_currentWriterByteSize = m_writer.file_size(); + m_writerOpen = true; + } + + void write(const T & v) { + if (!m_writerOpen) throw exception("write: No writer open"); + m_writer.serialize(v); + } + + void close_writer() { + if (!m_writerOpen) throw exception("close_writer: No writer open"); + m_writer.close(); + stream_size_type sz = m_writer.file_size(); + increase_usage(m_nextFileOffset-1, static_cast(sz)); + m_writerOpen = false; + } + + size_t remaining_runs() { + return m_nextLevelFileOffset - m_fileOffset; + } + + size_t next_level_runs() { + return m_nextFileOffset - m_nextLevelFileOffset; + } + + bool readers_open() { + return m_readersOpen > 0; + } + + void open_readers(size_t fanout) { + if (m_readersOpen != 0) throw exception("open_readers: readers already open"); + if (fanout == 0) throw exception("open_readers: fanout == 0"); + if (remaining_runs() == 0) { + if 
(m_writerOpen) throw exception("Writer open while moving to next merge level"); + m_nextLevelFileOffset = m_nextFileOffset; + } + if (fanout > remaining_runs()) throw exception("open_readers: fanout out of bounds"); + + if (m_readers.size() < fanout) m_readers.resize(fanout); + for (size_t i = 0; i < fanout; ++i) { + m_readers[i].open(run_file(m_fileOffset + i)); + } + m_readersOpen = fanout; + } + + // Check whether the open reader with index idx has more items. + // Valid indices are [0, m_readersOpen); idx == m_readersOpen is already out of range. + bool can_read(size_t idx) { + if (m_readersOpen == 0) throw exception("can_read: no readers open"); + if (idx >= m_readersOpen) throw exception("can_read: index out of bounds"); + return m_readers[idx].can_read(); + } + + // Unserialize and return the next item from the open reader with index idx. + // Valid indices are [0, m_readersOpen). + T read(size_t idx) { + if (m_readersOpen == 0) throw exception("read: no readers open"); + if (idx >= m_readersOpen) throw exception("read: index out of bounds"); + T res; + m_readers[idx].unserialize(res); + return res; + } + + void close_readers_and_delete() { + if (m_readersOpen == 0) throw exception("close_readers_and_delete: no readers open"); + + for (size_t i = 0; i < m_readersOpen; ++i) { + decrease_usage(m_fileOffset + i, m_readers[i].file_size()); + m_readers[i].close(); + boost::filesystem::remove(run_file(m_fileOffset + i)); + } + m_fileOffset += m_readersOpen; + m_readersOpen = 0; + } + + void move_last_reader_to_next_level() { + if (remaining_runs() != 1) + throw exception("move_last_reader_to_next_level: remaining_runs != 1"); + m_nextLevelFileOffset = m_fileOffset; + } + + void reset() { + if (m_readersOpen > 0) { + log_debug() << "reset: Close readers" << std::endl; + close_readers_and_delete(); + } + m_readers.resize(0); + if (m_writerOpen) { + log_debug() << "reset: Close writer" << std::endl; + close_writer(); + } + log_debug() << "Remove " << m_fileOffset << " through " << m_nextFileOffset << std::endl; + for (size_t i = m_fileOffset; i < m_nextFileOffset; ++i) { + std::string runFile = run_file(i); + serialization_reader rd; + rd.open(runFile); + decrease_usage(i, rd.file_size()); + rd.close(); + boost::filesystem::remove(runFile); 
+ } + m_fileOffset = m_nextLevelFileOffset = m_nextFileOffset = 0; + } + +private: + void increase_usage(size_t idx, stream_size_type sz) { + log_debug() << "+ " << idx << ' ' << sz << std::endl; + increment_temp_file_usage(static_cast(sz)); + } + + void decrease_usage(size_t idx, stream_size_type sz) { + log_debug() << "- " << idx << ' ' << sz << std::endl; + increment_temp_file_usage(-static_cast(sz)); + } +}; + +template +class merger { + class mergepred_t { + pred_t m_pred; + + public: + typedef std::pair item_type; + + mergepred_t(const pred_t & pred) : m_pred(pred) {} + + // Used with std::priority_queue, so invert the original relation. + bool operator()(const item_type & a, const item_type & b) const { + return m_pred(b.first, a.first); + } + }; + + typedef typename mergepred_t::item_type item_type; + + file_handler & files; + pred_t pred; + std::vector rd; + typedef std::priority_queue, mergepred_t> priority_queue_type; + priority_queue_type pq; + +public: + merger(file_handler & files, const pred_t & pred) + : files(files) + , pred(pred) + , pq(mergepred_t(pred)) + { + } + + // Assume files.open_readers(fanout) has just been called + void init(size_t fanout) { + rd.resize(fanout); + for (size_t i = 0; i < fanout; ++i) + push_from(i); + } + + bool empty() const { + return pq.empty(); + } + + const T & top() const { + return pq.top().first; + } + + void pop() { + size_t idx = pq.top().second; + pq.pop(); + push_from(idx); + } + + // files.close_readers_and_delete() should be called after this + void free() { + { + priority_queue_type tmp(pred); + std::swap(pq, tmp); + } + rd.resize(0); + } + +private: + void push_from(size_t idx) { + if (files.can_read(idx)) { + pq.push(std::make_pair(files.read(idx), idx)); + } + } +}; + +} // namespace serialization_bits + +template > +class serialization_sort { +public: + typedef boost::shared_ptr ptr; + +private: + enum sorter_state { state_initial, state_1, state_2, state_3 }; + + sorter_state m_state; + 
serialization_bits::internal_sort m_sorter; + serialization_bits::sort_parameters m_params; + bool m_parametersSet; + serialization_bits::file_handler m_files; + serialization_bits::merger m_merger; + + stream_size_type m_items; + bool m_reportInternal; + const T * m_nextInternalItem; + +public: + serialization_sort(memory_size_type minimumItemSize = sizeof(T), pred_t pred = pred_t()) + : m_state(state_initial) + , m_sorter(pred) + , m_parametersSet(false) + , m_files() + , m_merger(m_files, pred) + , m_items(0) + , m_reportInternal(false) + , m_nextInternalItem(0) + { + m_params.memoryPhase1 = 0; + m_params.memoryPhase2 = 0; + m_params.memoryPhase3 = 0; + m_params.minimumItemSize = minimumItemSize; + } + +private: + // set_phase_?_memory helper + inline void maybe_calculate_parameters() { + if (m_state != state_initial) + throw tpie::exception("Bad state in maybe_calculate_parameters"); + if (m_params.memoryPhase1 > 0 && + m_params.memoryPhase2 > 0 && + m_params.memoryPhase3 > 0) + + calculate_parameters(); + } + +public: + void set_phase_1_memory(memory_size_type m1) { + m_params.memoryPhase1 = m1; + maybe_calculate_parameters(); + } + + void set_phase_2_memory(memory_size_type m2) { + m_params.memoryPhase2 = m2; + maybe_calculate_parameters(); + } + + void set_phase_3_memory(memory_size_type m3) { + m_params.memoryPhase3 = m3; + maybe_calculate_parameters(); + } + + void set_available_memory(memory_size_type m) { + set_phase_1_memory(m); + set_phase_2_memory(m); + set_phase_3_memory(m); + } + + void set_available_memory(memory_size_type m1, memory_size_type m2, memory_size_type m3) { + set_phase_1_memory(m1); + set_phase_2_memory(m2); + set_phase_3_memory(m3); + } + + static memory_size_type minimum_memory_phase_1() { + return serialization_writer::memory_usage()*2; + } + + static memory_size_type minimum_memory_phase_2() { + return serialization_writer::memory_usage() + + 2*serialization_reader::memory_usage(); + } + + static memory_size_type 
minimum_memory_phase_3() { + return 2*serialization_reader::memory_usage(); + } + +private: + void calculate_parameters() { + if (m_state != state_initial) + throw tpie::exception("Bad state in calculate_parameters"); + + memory_size_type memAvail1 = m_params.memoryPhase1; + if (memAvail1 <= serialization_writer::memory_usage()) { + log_error() << "Not enough memory for run formation; have " << memAvail1 + << " bytes but " << serialization_writer::memory_usage() + << " is required for writing a run." << std::endl; + throw exception("Not enough memory for run formation"); + } + + memory_size_type memAvail2 = m_params.memoryPhase2; + + // We have to keep a writer open no matter what. + if (memAvail2 <= serialization_writer::memory_usage()) { + log_error() << "Not enough memory for merging. " + << "mem avail = " << memAvail2 + << ", writer usage = " << serialization_writer::memory_usage() + << std::endl; + throw exception("Not enough memory for merging."); + } + + memory_size_type memAvail3 = m_params.memoryPhase3; + + // Check the phase 3 budget itself (not phase 2 again): memForMerge below is + // min(memAvail2, memAvail3), and subtracting the writer usage from it + // must not wrap around in unsigned arithmetic. + if (memAvail3 <= serialization_writer::memory_usage()) { + log_error() << "Not enough memory for outputting. " + << "mem avail = " << memAvail3 + << ", writer usage = " << serialization_writer::memory_usage() + << std::endl; + throw exception("Not enough memory for outputting."); + } + + memory_size_type memForMerge = std::min(memAvail2, memAvail3); + + // We do not yet know the serialized size of the largest item, + // so this calculation has to be redone. + // Instead, we assume that all items have minimum size. + + // We have to keep a writer open no matter what. + memory_size_type fanoutMemory = memForMerge - serialization_writer::memory_usage(); + + // This is a lower bound on the memory used per fanout. + memory_size_type perFanout = m_params.minimumItemSize + serialization_reader::memory_usage(); + + // Floored division to compute the largest possible fanout. 
+ memory_size_type fanout = fanoutMemory / perFanout; + if (fanout < 2) { + log_error() << "Not enough memory for merging, even when minimum item size is assumed. " + << "mem avail = " << memForMerge + << ", fanout memory = " << fanoutMemory + << ", per fanout >= " << perFanout + << std::endl; + throw exception("Not enough memory for merging."); + } + + m_params.tempDir = tempname::tpie_dir_name(); + m_files.set_temp_dir(m_params.tempDir); + + log_info() << "Calculated serialization_sort parameters.\n"; + m_params.dump(log_info()); + log_info() << std::flush; + + m_parametersSet = true; + } + +public: + void begin() { + if (!m_parametersSet) + throw tpie::exception("Parameters not set in serialization_sorter"); + if (m_state != state_initial) + throw tpie::exception("Bad state in begin"); + m_state = state_1; + + log_info() << "Before begin; mem usage = " + << get_memory_manager().used() << std::endl; + m_sorter.begin(m_params.memoryPhase1 - serialization_writer::memory_usage()); + log_info() << "After internal sorter begin; mem usage = " + << get_memory_manager().used() << std::endl; + boost::filesystem::create_directory(m_params.tempDir); + } + + void push(const T & item) { + if (m_state != state_1) + throw tpie::exception("Bad state in push"); + + ++m_items; + + if (m_sorter.push(item)) return; + end_run(); + if (!m_sorter.push(item)) { + throw exception("Couldn't fit a single item in buffer"); + } + } + + void end() { + if (m_state != state_1) + throw tpie::exception("Bad state in end"); + + memory_size_type internalThreshold = + std::min(m_params.memoryPhase2, m_params.memoryPhase3); + + log_debug() << "m_sorter.memory_usage == " << m_sorter.memory_usage() << '\n' + << "internalThreshold == " << internalThreshold << std::endl; + + if (m_items == 0) { + m_reportInternal = true; + m_nextInternalItem = 0; + m_sorter.free(); + log_debug() << "Got no items. Internal reporting mode." 
<< std::endl; + } else if (m_files.next_level_runs() == 0 + && m_sorter.memory_usage() + <= internalThreshold) { + + m_sorter.sort(); + m_reportInternal = true; + m_nextInternalItem = m_sorter.begin(); + log_debug() << "Got " << m_sorter.current_serialized_size() + << " bytes of items. Internal reporting mode." << std::endl; + } else if (m_files.next_level_runs() == 0 + && m_sorter.current_serialized_size() <= internalThreshold + && m_sorter.can_shrink_buffer()) { + + m_sorter.sort(); + m_sorter.shrink_buffer(); + m_reportInternal = true; + m_nextInternalItem = m_sorter.begin(); + log_debug() << "Got " << m_sorter.current_serialized_size() + << " bytes of items. Internal reporting mode after shrinking buffer." << std::endl; + + } else { + + end_run(); + log_debug() << "Got " << m_files.next_level_runs() << " runs. " + << "External reporting mode." << std::endl; + m_sorter.free(); + m_reportInternal = false; + } + + log_info() << "After internal sorter end; mem usage = " + << get_memory_manager().used() << std::endl; + + m_state = state_2; + } + + stream_size_type item_count() { + return m_items; + } + + void evacuate() { + switch (m_state) { + case state_initial: + throw tpie::exception("Cannot evacuate in state initial"); + case state_1: + throw tpie::exception("Cannot evacuate in state 1"); + case state_2: + case state_3: + if (m_reportInternal) { + end_run(); + m_sorter.free(); + m_reportInternal = false; + log_debug() << "Evacuate out of internal reporting mode." << std::endl; + } else { + log_debug() << "Evacuate in external reporting mode - noop." << std::endl; + } + break; + } + } + + memory_size_type evacuated_memory_usage() const { + return 0; + } + + // Only valid in state_2, i.e. after end() has completed. + void merge_runs() { + if (m_state != state_2) + throw tpie::exception("Bad state in merge_runs"); + + if (m_reportInternal) { + log_debug() << "merge_runs: internal reporting; doing nothing." 
<< std::endl; + m_state = state_3; + return; + } + + memory_size_type largestItem = m_sorter.get_largest_item_size(); + if (largestItem == 0) { + log_warning() << "Largest item is 0 bytes; doing nothing." << std::endl; + m_state = state_3; + return; + } + + if (m_params.memoryPhase2 <= serialization_writer::memory_usage()) + throw exception("Not enough memory for merging."); + + // Perform almost the same computation as in calculate_parameters. + // Only change the item size to largestItem rather than minimumItemSize. + memory_size_type fanoutMemory = m_params.memoryPhase2 - serialization_writer::memory_usage(); + memory_size_type perFanout = largestItem + serialization_reader::memory_usage(); + memory_size_type fanout = fanoutMemory / perFanout; + + if (fanout < 2) { + log_error() << "Not enough memory for merging. " + << "mem avail = " << m_params.memoryPhase2 + << ", fanout memory = " << fanoutMemory + << ", per fanout = " << perFanout + << std::endl; + throw exception("Not enough memory for merging."); + } + + memory_size_type finalFanoutMemory = m_params.memoryPhase3; + memory_size_type finalFanout = + std::min(fanout, + finalFanoutMemory / perFanout); + + if (finalFanout < 2) { + log_error() << "Not enough memory for merging (final fanout < 2). 
" + << "mem avail = " << m_params.memoryPhase3 + << ", final fanout memory = " << finalFanoutMemory + << ", per fanout = " << perFanout + << std::endl; + throw exception("Not enough memory for merging."); + } + + log_debug() << "Calculated merge phase parameters for serialization sort.\n" + << "Fanout: " << fanout << '\n' + << "Final fanout: " << finalFanout << '\n' + ; + + while (m_files.next_level_runs() > finalFanout) { + if (m_files.remaining_runs() != 0) + throw exception("m_files.remaining_runs() != 0"); + log_debug() << "Runs in current level: " << m_files.next_level_runs() << '\n'; + for (size_t remainingRuns = m_files.next_level_runs(); remainingRuns > 0;) { + size_t f = std::min(fanout, remainingRuns); + merge_runs(f); + remainingRuns -= f; + if (remainingRuns != m_files.remaining_runs()) + throw exception("remainingRuns != m_files.remaining_runs()"); + } + } + + m_state = state_3; + } + +private: + void end_run() { + m_sorter.sort(); + if (m_sorter.begin() == m_sorter.end()) return; + m_files.open_new_writer(); + for (const T * item = m_sorter.begin(); item != m_sorter.end(); ++item) { + m_files.write(*item); + } + m_files.close_writer(); + m_sorter.reset(); + } + + void initialize_merger(size_t fanout) { + if (fanout == 0) throw exception("initialize_merger: fanout == 0"); + m_files.open_readers(fanout); + m_merger.init(fanout); + } + + void free_merger_and_files() { + m_merger.free(); + m_files.close_readers_and_delete(); + } + + void merge_runs(size_t fanout) { + if (fanout == 0) throw exception("merge_runs: fanout == 0"); + + if (fanout == 1 && m_files.remaining_runs() == 1) { + m_files.move_last_reader_to_next_level(); + return; + } + + initialize_merger(fanout); + m_files.open_new_writer(); + while (!m_merger.empty()) { + m_files.write(m_merger.top()); + m_merger.pop(); + } + free_merger_and_files(); + m_files.close_writer(); + } + +public: + T pull() { + if (!can_pull()) + throw exception("pull: !can_pull"); + + if (m_reportInternal) { + T item = 
*m_nextInternalItem++; + if (m_nextInternalItem == m_sorter.end()) { + m_sorter.free(); + m_nextInternalItem = 0; + } + return item; + } + + if (!m_files.readers_open()) { + if (m_files.next_level_runs() == 0) + throw exception("pull: next_level_runs == 0"); + initialize_merger(m_files.next_level_runs()); + } + + T item = m_merger.top(); + m_merger.pop(); + + if (m_merger.empty()) { + free_merger_and_files(); + m_files.reset(); + } + + return item; + } + + bool can_pull() { + if (m_reportInternal) return m_nextInternalItem != 0; + if (!m_files.readers_open()) return m_files.next_level_runs() > 0; + return !m_merger.empty(); + } +}; + +} + +#endif // TPIE_SERIALIZATION_SORT_H diff --git a/tpie/serialization_stream.cpp b/tpie/serialization_stream.cpp new file mode 100644 index 000000000..078fc64f7 --- /dev/null +++ b/tpie/serialization_stream.cpp @@ -0,0 +1,395 @@ +// -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*- +// vi:set ts=4 sts=4 sw=4 noet : +// Copyright 2013, The TPIE development team +// +// This file is part of TPIE. +// +// TPIE is free software: you can redistribute it and/or modify it under +// the terms of the GNU Lesser General Public License as published by the +// Free Software Foundation, either version 3 of the License, or (at your +// option) any later version. +// +// TPIE is distributed in the hope that it will be useful, but WITHOUT ANY +// WARRANTY; without even the implied warranty of MERCHANTABILITY or +// FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public +// License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with TPIE. 
If not, see + +#include +#include + +/////////////////////////////////////////////////////////////////////////////// +// serialization_header {{{ + +namespace tpie { + +namespace bits { + +/////////////////////////////////////////////////////////////////////////////// +/// \class Stream accessor for serialization streams. +/// +/// This class handles the stream header of a given file accessor. +/////////////////////////////////////////////////////////////////////////////// +class serialization_header { +public: + static memory_size_type header_size() { + memory_size_type sz = sizeof(stream_header_t); + memory_size_type align = 4096; + return (sz + (align-1))/align * align; + } + + serialization_header(file_accessor::raw_file_accessor & file) + : m_headerPtr(new stream_header_t()) + , m_header(*m_headerPtr) + , m_fileAccessor(file) + { + m_header.magic = stream_header_t::magicConst; + m_header.version = stream_header_t::versionConst; + m_header.size = 0; + m_header.cleanClose = 0; + } + + void read() { + m_fileAccessor.seek_i(0); + m_fileAccessor.read_i(&m_header, sizeof(m_header)); + } + + void write(bool cleanClose) { + m_header.cleanClose = cleanClose; + + tpie::array headerArea(header_size()); + std::fill(headerArea.begin(), headerArea.end(), '\x42'); + char * headerData = reinterpret_cast(&m_header); + std::copy(headerData, sizeof(m_header) + headerData, + headerArea.begin()); + + m_fileAccessor.seek_i(0); + m_fileAccessor.write_i(&headerArea[0], headerArea.size()); + } + + void verify() { + if (m_header.magic != m_header.magicConst) + throw stream_exception("Bad header magic"); + if (m_header.version < m_header.versionConst) + throw stream_exception("Stream version too old"); + if (m_header.version > m_header.versionConst) + throw stream_exception("Stream version too new"); + if (m_header.cleanClose != 1) + throw stream_exception("Stream was not closed properly"); + if (m_header.reverse != 0 && m_header.reverse != 1) + throw stream_exception("Reverse flag is 
not a boolean"); + } + + stream_size_type get_size() { + return m_header.size; + } + + void set_size(stream_size_type size) { + m_header.size = size; + } + + bool get_clean_close() { + return m_header.cleanClose; + } + + bool get_reverse() { + return m_header.reverse; + } + + void set_reverse(bool reverse) { + m_header.reverse = reverse; + } + +private: +#pragma pack(push, 1) + struct stream_header_t { + static const uint64_t magicConst = 0xfa340f49edbada67ll; + static const uint64_t versionConst = 1; + + uint64_t magic; + uint64_t version; + uint64_t size; + // bool has funny semantics with regards to equality. we want to reject + // invalid bool values (>1), but that is not easy to express with a C++ + // bool variable. + char cleanClose; + char reverse; + }; +#pragma pack(pop) + + std::auto_ptr m_headerPtr; + stream_header_t & m_header; + + file_accessor::raw_file_accessor & m_fileAccessor; +}; + +} // namespace bits + +} // namespace tpie + +// }}} +/////////////////////////////////////////////////////////////////////////////// + +namespace { + class open_guard { + bool & open_flag; + tpie::file_accessor::raw_file_accessor & fa; + bool committed; + public: + open_guard(bool & open_flag, tpie::file_accessor::raw_file_accessor & fa) + : open_flag(open_flag) + , fa(fa) + , committed(false) + { + open_flag = true; + } + + void commit() { + committed = true; + } + + ~open_guard() { + if (!committed) { + open_flag = false; + fa.close_i(); + } + } + }; +} // unnamed namespace + +namespace tpie { + +namespace bits { + +serialization_writer_base::serialization_writer_base() + : m_blocksWritten(0) + , m_size(0) + , m_open(false) + , m_tempFile(0) +{ +} + +void serialization_writer_base::open_inner(std::string path, bool reverse) { + close(reverse); + m_fileAccessor.set_cache_hint(access_sequential); + m_fileAccessor.open_wo(path); + open_guard guard(m_open, m_fileAccessor); + m_blocksWritten = 0; + m_size = 0; + + bits::serialization_header header(m_fileAccessor); + 
header.set_reverse(reverse); + header.write(false); + guard.commit(); +} + +void serialization_writer_base::open(std::string path, bool reverse) { + m_tempFile = 0; + open_inner(path, reverse); +} + +void serialization_writer_base::open(temp_file & tempFile, bool reverse) { + m_tempFile = &tempFile; + open_inner(tempFile.path(), reverse); +} + +void serialization_writer_base::write_block(const char * const s, const memory_size_type n) { + assert(n <= block_size()); + stream_size_type offset = m_blocksWritten * block_size(); + m_fileAccessor.seek_i(bits::serialization_header::header_size() + offset); + m_fileAccessor.write_i(s, n); + ++m_blocksWritten; + m_size = offset + n; + if (m_tempFile) + m_tempFile->update_recorded_size(m_size); +} + +void serialization_writer_base::close(bool reverse) { + if (!m_open) return; + bits::serialization_header header(m_fileAccessor); + header.set_size(m_size); + header.set_reverse(reverse); + header.write(true); + m_fileAccessor.close_i(); + m_open = false; + m_tempFile = 0; +} + +stream_size_type serialization_writer_base::file_size() { + return serialization_header::header_size() + m_size; +} + +} // namespace bits + +void serialization_writer::write_block() { + p_t::write_block(m_block.get(), m_index); + m_index = 0; +} + +void serialization_writer::open(std::string path) { + p_t::open(path, false); + m_block.resize(block_size()); + m_index = 0; +} + +void serialization_writer::open(temp_file & tempFile) { + p_t::open(tempFile, false); + m_block.resize(block_size()); + m_index = 0; +} + +void serialization_writer::close() { + if (m_index > 0) write_block(); + m_block.resize(0); + m_index = 0; + p_t::close(false); +} + +void serialization_reverse_writer::write_block() { + // See note about m_index and its semantics. 
+ std::reverse(m_block.get(), m_block.get() + block_size()); + p_t::write_block(m_block.get(), m_index); + m_index = 0; +} + +void serialization_reverse_writer::open(std::string path) { + p_t::open(path, true); + m_block.resize(block_size()); + m_index = 0; +} + +void serialization_reverse_writer::open(temp_file & tempFile) { + p_t::open(tempFile, true); + m_block.resize(block_size()); + m_index = 0; +} + +void serialization_reverse_writer::close() { + if (m_index > 0) write_block(); + m_block.resize(0); + m_index = 0; + p_t::close(true); +} + +namespace bits { + +serialization_reader_base::serialization_reader_base() + : m_open(false) + , m_size(0) + , m_index(0) + , m_blockSize(0) +{ +} + +void serialization_reader_base::open(std::string path, bool reverse) { + close(); + m_fileAccessor.set_cache_hint(reverse ? access_normal : access_sequential); + m_fileAccessor.open_ro(path); + open_guard guard(m_open, m_fileAccessor); + m_block.resize(block_size()); + m_index = 0; + m_blockSize = 0; + + bits::serialization_header header(m_fileAccessor); + header.read(); + header.verify(); + m_size = header.get_size(); + if (reverse && !header.get_reverse()) + throw stream_exception("Opened a non-reverse stream for reverse reading"); + if (!reverse && header.get_reverse()) + throw stream_exception("Opened a reverse stream for non-reverse reading"); + guard.commit(); +} + +void serialization_reader_base::read_block(const stream_size_type blk) { + stream_size_type from = blk * block_size(); + stream_size_type to = std::min(from + block_size(), m_size); + if (to <= from) throw end_of_stream_exception(); + m_index = 0; + m_blockSize = to-from; + m_fileAccessor.seek_i(bits::serialization_header::header_size() + + from); + m_fileAccessor.read_i(m_block.get(), m_blockSize); +} + +void serialization_reader_base::close() { + if (!m_open) return; + m_fileAccessor.close_i(); + m_open = false; + m_block.resize(0); +} + +stream_size_type serialization_reader_base::file_size() { + return 
serialization_header::header_size() + m_size; +} + +stream_size_type serialization_reader_base::size() { + return m_size; +} + +} // namespace bits + +void serialization_reader::next_block() /*override*/ { + if (m_blockSize == 0) { + m_blockNumber = 0; + } else { + ++m_blockNumber; + } + read_block(m_blockNumber); +} + +serialization_reader::serialization_reader() + : m_blockNumber(0) +{ +} + +void serialization_reader::open(std::string path) { + p_t::open(path, false); + m_blockNumber = 0; +} + +void serialization_reader::open(temp_file & tempFile) { + open(tempFile.path()); +} + +stream_size_type serialization_reader::offset() { + if (m_blockSize == 0) + return 0; + + return m_blockNumber * block_size() + m_index; +} + +void serialization_reverse_reader::next_block() /*override*/ { + if (m_blockNumber == 0) + throw end_of_stream_exception(); + --m_blockNumber; + read_block(m_blockNumber); + std::reverse(m_block.begin(), m_block.begin() + m_blockSize); +} + +serialization_reverse_reader::serialization_reverse_reader() + : m_blockNumber(0) +{ +} + +void serialization_reverse_reader::open(std::string path) { + p_t::open(path, true); + m_blockNumber = (m_size + (block_size() - 1)) / block_size(); +} + +void serialization_reverse_reader::open(temp_file & tempFile) { + open(tempFile.path()); +} + +stream_size_type serialization_reverse_reader::offset() { + if (m_blockSize == 0) + return size(); + + // size of blocks not read at all + stream_size_type remainingBlocks = m_blockNumber * block_size(); + return size() - remainingBlocks - m_blockSize + m_index; +} + +} // namespace tpie diff --git a/tpie/serialization_stream.h b/tpie/serialization_stream.h new file mode 100644 index 000000000..db12f73b4 --- /dev/null +++ b/tpie/serialization_stream.h @@ -0,0 +1,402 @@ +// -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*- +// vi:set ts=4 sts=4 sw=4 noet : +// Copyright 2013, The TPIE development team 
+// +// This file is part of TPIE. +// +// TPIE is free software: you can redistribute it and/or modify it under +// the terms of the GNU Lesser General Public License as published by the +// Free Software Foundation, either version 3 of the License, or (at your +// option) any later version. +// +// TPIE is distributed in the hope that it will be useful, but WITHOUT ANY +// WARRANTY; without even the implied warranty of MERCHANTABILITY or +// FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public +// License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with TPIE. If not, see + +#ifndef TPIE_SERIALIZATION_STREAM_H +#define TPIE_SERIALIZATION_STREAM_H + +/////////////////////////////////////////////////////////////////////////////// +/// \file serialization_stream.h Stream of serializable items. +/////////////////////////////////////////////////////////////////////////////// + +#include +#include +#include +#include +#include +#include + +namespace tpie { + +namespace bits { + +class serialization_writer_base { +public: + static stream_size_type block_size() { + return 2*1024*1024; + } + +private: + file_accessor::raw_file_accessor m_fileAccessor; + stream_size_type m_blocksWritten; + stream_size_type m_size; + bool m_open; + + temp_file * m_tempFile; + +protected: + serialization_writer_base(); + + void open(std::string path, bool reverse); + void open(temp_file & tempFile, bool reverse); + +private: + void open_inner(std::string path, bool reverse); + +protected: + /////////////////////////////////////////////////////////////////////////// + /// \brief Write n bytes from memory area s to next block in stream. + /// + /// n must be less or equal to block_size(). + /// + /// \param s Memory area with data to write. + /// \param n Number of bytes to write. 
+ /////////////////////////////////////////////////////////////////////////// + void write_block(const char * const s, const memory_size_type n); + + void close(bool reverse); + +public: + static memory_size_type memory_usage() { return block_size(); } + + stream_size_type file_size(); +}; + +} // namespace bits + +class serialization_writer : public bits::serialization_writer_base { +private: + typedef bits::serialization_writer_base p_t; + + tpie::array m_block; + memory_size_type m_index; + + void write_block(); + + class serializer { + serialization_writer & wr; + + public: + serializer(serialization_writer & wr) : wr(wr) {} + + void write(const char * const s, const memory_size_type n) { + const char * i = s; + memory_size_type written = 0; + while (written != n) { + if (wr.m_index >= wr.block_size()) wr.write_block(); + + memory_size_type remaining = n - written; + memory_size_type blockRemaining = wr.block_size() - wr.m_index; + + memory_size_type writeSize = std::min(remaining, blockRemaining); + + std::copy(i, i + writeSize, &wr.m_block[wr.m_index]); + i += writeSize; + written += writeSize; + wr.m_index += writeSize; + } + } + }; + + friend class serializer; + +public: + void open(std::string path); + void open(temp_file & tempFile); + + void close(); + + /////////////////////////////////////////////////////////////////////////// + /// \brief Serialize a serializable item and write it to the stream. + /// + /// The code stream.serialize(v) just calls serialize(stream, v) via ADL. + /////////////////////////////////////////////////////////////////////////// + template + void serialize(const T & v) { + using tpie::serialize; + serializer s(*this); + serialize(s, v); + } + + /////////////////////////////////////////////////////////////////////////// + /// \brief Serialize a sequence of serializable items and write them to + /// the stream. + /// + /// The code stream.serialize(a, b) just calls serialize(stream, a, b) via + /// ADL. 
+ /////////////////////////////////////////////////////////////////////////// + template + void serialize(IT a, IT b) { + using tpie::serialize; + serializer s(*this); + serialize(s, a, b); + } +}; + +class serialization_reverse_writer : public bits::serialization_writer_base { + typedef bits::serialization_writer_base p_t; + + tpie::array m_block; + /** Special m_index semantics: + * In m_block, the indices [block_size() - m_index, block_size()) + * contain items that should be reversed before writing out. + * After std::reversing all of m_block, the index range to write out becomes + * [0, m_index). */ + memory_size_type m_index; + std::vector m_serializationBuffer; + + void write_block(); + + class serializer { + serialization_reverse_writer & wr; + + public: + serializer(serialization_reverse_writer & wr) : wr(wr) {} + + void write(const char * const s, const memory_size_type n) { + std::vector & data = wr.m_serializationBuffer; + memory_size_type offs = data.size(); + data.resize(data.size() + n); + std::copy(s, s + n, &data[offs]); + } + + ~serializer() { + std::vector & data = wr.m_serializationBuffer; + const memory_size_type n = data.size(); + const char * const s = &data[0]; + if (wr.m_index + n <= wr.block_size()) { + std::copy(s, s + n, &wr.m_block[block_size() - wr.m_index - n]); + wr.m_index += n; + } else { + const char * i = s + n; + memory_size_type written = 0; + while (written != n) { + if (wr.m_index >= wr.block_size()) wr.write_block(); + + memory_size_type remaining = n - written; + memory_size_type blockRemaining = wr.block_size() - wr.m_index; + + memory_size_type writeSize = std::min(remaining, blockRemaining); + + std::copy(i - writeSize, i, &wr.m_block[block_size() - wr.m_index - writeSize]); + i -= writeSize; + written += writeSize; + wr.m_index += writeSize; + } + } + data.resize(0); + } + }; + + friend class serializer; + +public: + void open(std::string path); + void open(temp_file & tempFile); + + void close(); + + 
/////////////////////////////////////////////////////////////////////////// + /// \brief Serialize a serializable item and write it to the stream. + /// + /// The code stream.serialize(v) just calls serialize(stream, v) via ADL. + /////////////////////////////////////////////////////////////////////////// + template + void serialize(const T & v) { + using tpie::serialize; + serializer s(*this); + serialize(s, v); + } + + /////////////////////////////////////////////////////////////////////////// + /// \brief Serialize a sequence of serializable items and write them to + /// the stream. + /// + /// The code stream.serialize(a, b) just calls serialize(stream, a, b) via + /// ADL. + /////////////////////////////////////////////////////////////////////////// + template + void serialize(IT a, IT b) { + using tpie::serialize; + serializer s(*this); + serialize(s, a, b); + } +}; + +namespace bits { + +class serialization_reader_base { +public: + static stream_size_type block_size() { + return serialization_writer_base::block_size(); + } + +private: + file_accessor::raw_file_accessor m_fileAccessor; + bool m_open; + +protected: + tpie::array m_block; + stream_size_type m_size; + memory_size_type m_index; + memory_size_type m_blockSize; + + serialization_reader_base(); + + void open(std::string path, bool reverse); + + void read_block(const stream_size_type blk); + + // Check if EOF is reached, call read_block(blk) to reset m_index/m_blockSize. + virtual void next_block() = 0; + +public: + void close(); + + /////////////////////////////////////////////////////////////////////////// + /// \brief Read n bytes from stream into buffer starting at s. + /// + /// \param s Buffer to contain the read data. + /// \param n Number of bytes to read. 
+ /////////////////////////////////////////////////////////////////////////// + void read(char * const s, const memory_size_type n) { + // TODO: inline some of this + char * i = s; + memory_size_type written = 0; + while (written != n) { + if (m_index >= m_blockSize) { + // virtual invocation + next_block(); + } + + memory_size_type remaining = n - written; + memory_size_type blockRemaining = m_blockSize - m_index; + + memory_size_type readSize = std::min(remaining, blockRemaining); + + i = std::copy(m_block.get() + m_index, + m_block.get() + (m_index + readSize), + i); + + written += readSize; + m_index += readSize; + } + } + + /////////////////////////////////////////////////////////////////////////// + /// \brief Unserialize an unserializable item from the stream. + /// + /// An item of the given type must exist at the current point in the + /// stream. + /// + /// The code stream.unserialize(v) just calls unserialize(stream, v) via + /// ADL. + /////////////////////////////////////////////////////////////////////////// + template + void unserialize(T & v) { + using tpie::unserialize; + unserialize(*this, v); + } + + /////////////////////////////////////////////////////////////////////////// + /// \brief Unserialize a sequence of unserializable items from the stream. + /// + /// A sequence of the given item type must exist at the current point in + /// the stream. + /// + /// The code stream.unserialize(a, b) just calls unserialize(stream, a, b) + /// via ADL. + /////////////////////////////////////////////////////////////////////////// + template + void unserialize(IT a, IT b) { + using tpie::unserialize; + unserialize(*this, a, b); + } + + static memory_size_type memory_usage() { return block_size(); } + + /////////////////////////////////////////////////////////////////////////// + /// \brief Size of file in bytes, including the header. 
+ /////////////////////////////////////////////////////////////////////////// + stream_size_type file_size(); + + /////////////////////////////////////////////////////////////////////////// + /// \brief Size of file in bytes, not including the header. + /// + /// For progress reporting. + /////////////////////////////////////////////////////////////////////////// + stream_size_type size(); +}; + +} // namespace bits + +class serialization_reader : public bits::serialization_reader_base { + typedef bits::serialization_reader_base p_t; + stream_size_type m_blockNumber; + +protected: + virtual void next_block() /*override*/; + +public: + serialization_reader(); + + void open(std::string path); + void open(temp_file & tempFile); + + bool can_read() { + if (m_index < m_blockSize) return true; + return m_blockNumber * block_size() + m_index < m_size; + } + + /////////////////////////////////////////////////////////////////////////// + /// \brief Number of bytes read, not including the header. + /// + /// For progress reporting. + /////////////////////////////////////////////////////////////////////////// + stream_size_type offset(); +}; + +class serialization_reverse_reader : public bits::serialization_reader_base { + typedef bits::serialization_reader_base p_t; + stream_size_type m_blockNumber; + +protected: + virtual void next_block() /*override*/; + +public: + serialization_reverse_reader(); + + void open(std::string path); + void open(temp_file & tempFile); + + bool can_read() { + if (m_index < m_blockSize) return true; + return m_blockNumber > 0; + } + + /////////////////////////////////////////////////////////////////////////// + /// \brief Number of bytes read, not including the header. + /// + /// For progress reporting. + /////////////////////////////////////////////////////////////////////////// + stream_size_type offset(); +}; + +} + +#endif // TPIE_SERIALIZATION_STREAM_H