Skip to content

Commit

Permalink
Issue #2066 Implemented handling of reindexStart, reindexStop signals.
Browse files Browse the repository at this point in the history
  • Loading branch information
vogel76 authored and mvandeberg committed Mar 20, 2018
1 parent c0eb8c4 commit de873db
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 47 deletions.
4 changes: 2 additions & 2 deletions libraries/chain/include/steem/chain/database.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -464,9 +464,9 @@ namespace steem { namespace chain {
typedef void on_reindex_start_t();
typedef void on_reindex_done_t(bool,uint32_t);

void on_reindex_start_connect(on_reindex_start_t functor)
void on_reindex_start_connect(std::function<on_reindex_start_t> functor)
{ _on_reindex_start.connect(functor); }
void on_reindex_done_connect(on_reindex_done_t functor)
void on_reindex_done_connect(std::function<on_reindex_done_t> functor)
{ _on_reindex_done.connect(functor); }

protected:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ class rocksdb_plugin final : public appbase::plugin< rocksdb_plugin >
class impl;

std::unique_ptr<impl> _my;
bfs::path _dbPath;
uint32_t _blockLimit = 0;
bool _doImmediateImport = false;
};
Expand Down
154 changes: 110 additions & 44 deletions libraries/plugins/rocksdb/rocksdb_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -304,14 +304,23 @@ class CachableWriteBatch : public WriteBatch
class rocksdb_plugin::impl final
{
public:

impl(const rocksdb_plugin& mainPlugin, const bpo::variables_map& options) :
impl(const rocksdb_plugin& mainPlugin, const bpo::variables_map& options, const bfs::path& storagePath) :
_mainDb(appbase::app().get_plugin<steem::plugins::chain::chain_plugin>().db()),
_mainPlugin(mainPlugin),
_storagePath(storagePath),
_writeBuffer(_storage, _columnHandles)
{
collectOptions(options);
_pre_apply_connection = _mainDb.pre_apply_operation.connect( [&]( const operation_notification& note ){ on_operation(note); } );

_mainDb.on_reindex_start_connect([&]() -> void
{
onReindexStart();
});

_mainDb.on_reindex_done_connect([&](bool success, uint32_t finalBlock) -> void
{
onReindexStop(finalBlock);
});
}

~impl()
Expand All @@ -324,14 +333,14 @@ class rocksdb_plugin::impl final
_api = std::make_unique<rocksdb_api>(_mainPlugin);
}

void openDb(const bfs::path& path)
void openDb()
{
createDbSchema(path);
createDbSchema(_storagePath);

auto columnDefs = prepareColumnDefinitions(true);

DB* storageDb = nullptr;
auto strPath = path.string();
auto strPath = _storagePath.string();
Options options;
/// Optimize RocksDB. This is the easiest way to get RocksDB to perform well
options.IncreaseParallelism();
Expand All @@ -344,6 +353,12 @@ class rocksdb_plugin::impl final
ilog("RocksDB opened successfully storage at location: `${p}'.", ("p", strPath));
loadSeqIdentifiers(storageDb);
_storage.reset(storageDb);

_pre_apply_connection = _mainDb.pre_apply_operation.connect(
[&]( const operation_notification& note )
{
on_operation(note);
});
}
else
{
Expand All @@ -352,6 +367,10 @@ class rocksdb_plugin::impl final
}
}

void printReport(uint32_t blockNo, const char* detailText) const;
void onReindexStart();
void onReindexStop(uint32_t finalBlock);

/// Allows to start immediate data import (outside replay process).
void importData(unsigned int blockLimit);

Expand All @@ -364,20 +383,7 @@ class rocksdb_plugin::impl final
void shutdownDb()
{
chain::util::disconnect_signal(_pre_apply_connection);
FC_TODO("Flush should be done inside onReindexDone handler")
if(_storage != nullptr)
{
/// If there are still not yet saved changes let's do it now.
if(_collectedOps != 0)
flushWriteBuffer();

::rocksdb::FlushOptions fOptions;
for(const auto& cf : _columnHandles)
{
auto s = _storage->Flush(fOptions, cf);
checkStatus(s);
}
}
flushStorage();
cleanupColumnHandles();
_storage.reset();
}
Expand Down Expand Up @@ -451,6 +457,23 @@ class rocksdb_plugin::impl final
_collectedOps = 0;
}

void flushStorage()
{
if(_storage == nullptr)
return;

/// If there are still not yet saved changes let's do it now.
if(_collectedOps != 0)
flushWriteBuffer();

::rocksdb::FlushOptions fOptions;
for(const auto& cf : _columnHandles)
{
auto s = _storage->Flush(fOptions, cf);
checkStatus(s);
}
}

void on_operation(const operation_notification& opNote);

void collectOptions(const bpo::variables_map& options);
Expand Down Expand Up @@ -489,6 +512,7 @@ class rocksdb_plugin::impl final

chain::database& _mainDb;
const rocksdb_plugin& _mainPlugin;
bfs::path _storagePath;
std::unique_ptr<rocksdb_api> _api;
std::unique_ptr<DB> _storage;
std::vector<ColumnFamilyHandle*> _columnHandles;
Expand All @@ -511,9 +535,10 @@ class rocksdb_plugin::impl final
unsigned int _collectedOps = 0;
/** Limit which value depends on block data source:
* - if blocks come from network, there is no need for delaying write, becasue they appear quite rare (limit == 1)
* - if reindex process or direct import has been spawned, this massive operation can need reduction of direct writes (limit == WRITE_BUFFER_FLUSH_LIMIT).
* - if reindex process or direct import has been spawned, this massive operation can need reduction of direct
writes (limit == WRITE_BUFFER_FLUSH_LIMIT).
*/
unsigned int _collectedOpsWriteLimit = WRITE_BUFFER_FLUSH_LIMIT;
unsigned int _collectedOpsWriteLimit = 1;

account_name_range_index _tracked_accounts;
flat_set<std::string> _op_list;
Expand Down Expand Up @@ -1065,6 +1090,55 @@ void rocksdb_plugin::impl::prunePotentiallyTooOldItems(account_history_info* ahI
}
}

void rocksdb_plugin::impl::onReindexStart()
{
ilog("Received onReindexStart request, attempting to clean database storage.");

shutdownDb();
std::string strPath = _storagePath.string();

auto s = ::rocksdb::DestroyDB(strPath, ::rocksdb::Options());
checkStatus(s);

openDb();

ilog("Setting write limit to massive level");

_collectedOpsWriteLimit = WRITE_BUFFER_FLUSH_LIMIT;

_lastTx = transaction_id_type();
_txNo = 0;
_totalOps = 0;
_excludedOps = 0;

ilog("onReindexStart request completed successfully.");
}

void rocksdb_plugin::impl::onReindexStop(uint32_t finalBlock)
{
ilog("Reindex completed up to block: ${b}. Setting back write limit to non-massive level.",
("b", finalBlock));

flushStorage();
_collectedOpsWriteLimit = 1;

printReport(finalBlock, "RocksDB data reindex finished. ");
}

void rocksdb_plugin::impl::printReport(uint32_t blockNo, const char* detailText) const
{
ilog("${t}Processed blocks: ${n}, containing: ${tx} transactions and ${op} operations.\n"
"${ep} operations have been filtered out due to configured options.\n"
"${ea} accounts have been filtered out due to configured options.",
("t", detailText)
("n", blockNo)
("tx", _txNo)
("op", _totalOps)
("ep", _excludedOps)
("ea", _excludedAccountCount)
);
}

void rocksdb_plugin::impl::importData(unsigned int blockLimit)
{
if(_storage == nullptr)
Expand Down Expand Up @@ -1118,16 +1192,8 @@ void rocksdb_plugin::impl::importData(unsigned int blockLimit)
("ct", measure.cpu_ms)
("cm", measure.current_mem)
("pm", measure.peak_mem) );

ilog("RocksDb data import finished. Processed blocks: ${n}, containing: ${tx} transactions and ${op} operations.\n"
"${ep} operations have been filtered out due to configured options.\n"
"${ea} accounts have been filtered out due to configured options.",
("n", blockNo)
("tx", _txNo)
("op", _totalOps)
("ep", _excludedOps)
("ea", _excludedAccountCount)
);

printReport(blockNo, "RocksDB data import finished. ");
}

void rocksdb_plugin::impl::on_operation(const operation_notification& n)
Expand Down Expand Up @@ -1174,26 +1240,26 @@ void rocksdb_plugin::set_program_options(

void rocksdb_plugin::plugin_initialize(const boost::program_options::variables_map& options)
{
_my = std::make_unique<impl>(*this, options);

if(options.count("rocksdb-path"))
_dbPath = options.at("rocksdb-path").as<bfs::path>();

if(options.count("rocksdb-stop-import-at-block"))
_blockLimit = options.at("rocksdb-stop-import-at-block").as<uint32_t>();

_doImmediateImport = options.at("rocksdb-immediate-import").as<bool>();

if(_dbPath.is_absolute())
{
_my->openDb(_dbPath);
}
else
bfs::path dbPath;

if(options.count("rocksdb-path"))
dbPath = options.at("rocksdb-path").as<bfs::path>();

if(dbPath.is_absolute() == false)
{
auto basePath = appbase::app().get_plugin<steem::plugins::chain::chain_plugin>().state_storage_dir();
auto actualPath = basePath / _dbPath;
_my->openDb(actualPath);
auto actualPath = basePath / dbPath;
dbPath = actualPath;
}

_my = std::make_unique<impl>(*this, options, dbPath);

_my->openDb();
}

void rocksdb_plugin::plugin_startup()
Expand Down

0 comments on commit de873db

Please sign in to comment.