From 61ab99368e581d291e30a1f80a186c4eed5ded39 Mon Sep 17 00:00:00 2001 From: Bartek Wrona Date: Wed, 7 Feb 2018 22:23:05 +0100 Subject: [PATCH] Issue #2066 Redesigned data structures to make possible implementation of AH data prunning. Reimplemented get_account_history API method accordingly. --- libraries/plugins/rocksdb/rocksdb_api.hpp | 38 +- libraries/plugins/rocksdb/rocksdb_plugin.cpp | 379 +++++++++++++------ 2 files changed, 266 insertions(+), 151 deletions(-) diff --git a/libraries/plugins/rocksdb/rocksdb_api.hpp b/libraries/plugins/rocksdb/rocksdb_api.hpp index 9bcd5fabee..687ddf1922 100644 --- a/libraries/plugins/rocksdb/rocksdb_api.hpp +++ b/libraries/plugins/rocksdb/rocksdb_api.hpp @@ -20,35 +20,23 @@ namespace detail {class api_impl;} typedef std::vector serialize_buffer_t; -/** Dedicated definition is needed because of conflict of BIP allocator - * against usage of this class as temporary object. - * The conflict appears in original associated_ops container type definition, - * which in BIP version needs an allocator during constructor call. +/** Represents an AH entry in mapped to account name. + * Holds additional informations, which are needed to simplify pruning process. + * All operations specific to given account, are next mapped to ID of given object. */ -class tmp_account_history_object +class account_history_info { public: - typedef std::vector operation_container; - - steem::protocol::account_name_type account; + uint32_t id = 0; + uint32_t oldestEntryId = 0; + uint32_t newestEntryId = 0; + /// Timestamp of oldest operation, just to quickly decide if start detail prune checking at all. + time_point_sec oldestEntryTimestamp; - const operation_container& get_ops() const + uint32_t getAssociatedOpCount() const { - return associated_ops; + return newestEntryId - oldestEntryId + 1; } - - void store_operation_id(size_t opId) - { - associated_ops.push_back(opId); - } - -private: - /** Holds IDs of operations somehow involving given accout. - * List head points to oldest operation, tail to newest one. - * Container order is determined by operation processing. - */ - operation_container associated_ops; - template friend struct fc::reflector; }; /** Dedicated definition is needed because of conflict of BIP allocator @@ -59,7 +47,7 @@ class tmp_account_history_object class tmp_operation_object { public: - size_t id = 0; + uint32_t id = 0; chain::transaction_id_type trx_id; uint32_t block = 0; @@ -138,7 +126,7 @@ class rocksdb_api final FC_REFLECT( steem::plugins::rocksdb::tmp_operation_object, (id)(trx_id)(block)(trx_in_block)(timestamp)(serialized_op) ) -FC_REFLECT( steem::plugins::rocksdb::tmp_account_history_object, (account)(associated_ops) ) +FC_REFLECT( steem::plugins::rocksdb::account_history_info, (id)(oldestEntryId)(newestEntryId)(oldestEntryTimestamp)) FC_REFLECT( steem::plugins::rocksdb::api_operation_object, (trx_id)(block)(trx_in_block)(op_in_trx)(virtual_op)(timestamp)(op) ) diff --git a/libraries/plugins/rocksdb/rocksdb_plugin.cpp b/libraries/plugins/rocksdb/rocksdb_plugin.cpp index 3d151f24c0..c400507326 100644 --- a/libraries/plugins/rocksdb/rocksdb_plugin.cpp +++ b/libraries/plugins/rocksdb/rocksdb_plugin.cpp @@ -49,12 +49,14 @@ using steem::utilities::benchmark_dumper; using ::rocksdb::DB; using ::rocksdb::Options; +using ::rocksdb::PinnableSlice; using ::rocksdb::ReadOptions; using ::rocksdb::Slice; using ::rocksdb::Comparator; using ::rocksdb::ColumnFamilyDescriptor; using ::rocksdb::ColumnFamilyOptions; using ::rocksdb::ColumnFamilyHandle; +using ::rocksdb::WriteBatch; namespace { @@ -181,54 +183,20 @@ class PrimitiveTypeComparatorImpl final : public AComparator } }; -class account_name_id_ComparatorImpl final : public AComparator -{ -public: - virtual int Compare(const Slice& a, const Slice& b) const override - { - if(a.size() != sizeof(account_name_storage_id_pair) || b.size() != sizeof(account_name_storage_id_pair)) - return a.compare(b); - - const auto& id1 = retrieveKey(a); - const auto& id2 = retrieveKey(b); - - if(id1 < id2) - return -1; - - if(id1 > id2) - return 1; - - return 0; - } +typedef PrimitiveTypeComparatorImpl by_id_ComparatorImpl; - virtual bool Equal(const Slice& a, const Slice& b) const override - { - if(a.size() != sizeof(account_name_storage_id_pair) || b.size() != sizeof(account_name_storage_id_pair)) - return a == b; +typedef PrimitiveTypeComparatorImpl by_account_name_ComparatorImpl; - const auto& id1 = retrieveKey(a); - const auto& id2 = retrieveKey(b); - return id1 == id2; - } - -private: - const account_name_storage_id_pair& retrieveKey(const Slice& slice) const - { - assert(sizeof(account_name_storage_id_pair) == slice.size()); - const char* rawData = slice.data(); - const account_name_storage_id_pair* data = reinterpret_cast(rawData); - return *data; - } -}; - -typedef PrimitiveTypeComparatorImpl by_id_ComparatorImpl; /** Location index is nonunique. Since RocksDB supports only unique indexes, it must be extended * by some unique part (ie ID). * */ -typedef std::pair location_id_pair; +typedef std::pair location_id_pair; typedef PrimitiveTypeComparatorImpl by_location_ComparatorImpl; +/// Compares account_history_info::id and tmp_operation_object::id pair +typedef PrimitiveTypeComparatorImpl> by_ah_info_operation_ComparatorImpl; + const Comparator* by_id_Comparator() { static by_id_ComparatorImpl c; @@ -241,9 +209,15 @@ const Comparator* by_location_Comparator() return &c; } -const Comparator* by_account_name_storage_id_pair_Comparator() +const Comparator* by_account_name_Comparator() { - static account_name_id_ComparatorImpl c; + static by_account_name_ComparatorImpl c; + return &c; +} + +const Comparator* by_ah_info_operation_Comparator() +{ + static by_ah_info_operation_ComparatorImpl c; return &c; } @@ -275,6 +249,56 @@ class operation_name_provider mutable std::string _name; }; +class CachableWriteBatch : public WriteBatch +{ +public: + CachableWriteBatch(const std::unique_ptr& storage, const std::vector& columnHandles) : + _storage(storage), _columnHandles(columnHandles) {} + + bool getAHInfo(const account_name_type& name, account_history_info* ahInfo) const + { + auto fi = _ahInfoCache.find(name); + if(fi != _ahInfoCache.end()) + { + *ahInfo = fi->second; + return true; + } + + PrimitiveTypeSlice key(name.data); + PinnableSlice buffer; + auto s = _storage->Get(ReadOptions(), _columnHandles[3], key, &buffer); + if(s.ok()) + { + load(*ahInfo, buffer.data(), buffer.size()); + return true; + } + + FC_ASSERT(s.IsNotFound()); + return false; + } + + void putAHInfo(const account_name_type& name, const account_history_info& ahInfo) + { + _ahInfoCache[name] = ahInfo; + auto serializeBuf = dump(ahInfo); + PrimitiveTypeSlice nameSlice(name.data); + auto s = Put(_columnHandles[3], nameSlice, Slice(serializeBuf.data(), serializeBuf.size())); + checkStatus(s); + } + + void Clear() + { + _ahInfoCache.clear(); + WriteBatch::Clear(); + } + +private: + const std::unique_ptr& _storage; + const std::vector& _columnHandles; + std::map _ahInfoCache; +}; + + } /// anonymous class rocksdb_plugin::impl final @@ -283,7 +307,8 @@ class rocksdb_plugin::impl final impl(const rocksdb_plugin& mainPlugin, const bpo::variables_map& options) : _mainDb(appbase::app().get_plugin().db()), - _mainPlugin(mainPlugin) + _mainPlugin(mainPlugin), + _writeBuffer(_storage, _columnHandles) { collectOptions(options); _pre_apply_connection = _mainDb.pre_apply_operation.connect( [&]( const operation_notification& note ){ on_operation(note); } ); @@ -332,7 +357,7 @@ class rocksdb_plugin::impl final void find_account_history_data(const account_name_type& name, uint64_t start, uint32_t limit, std::function processor) const; - bool find_operation_object(size_t opId, tmp_operation_object* data) const; + bool find_operation_object(size_t opId, tmp_operation_object* op) const; /// Allows to look for all operations present in given block and call `processor` for them. void find_operations_by_block(size_t blockNum, std::function processor) const; @@ -375,8 +400,8 @@ class rocksdb_plugin::impl final uint32_t txInBlock, const operation& op, uint16_t opInTx); void buildAccountHistoryRecord(const account_name_type& name, const tmp_operation_object& obj, const operation& op, const fc::time_point_sec& blockTime); - bool prunePotentiallyTooOldItems(::rocksdb::WBWIIterator* dataItr, const account_name_type& name, - size_t lastFoundNo, const fc::time_point_sec& now); + void prunePotentiallyTooOldItems(account_history_info* ahInfo, const account_name_type& name, + const fc::time_point_sec& now); void storeSequenceIds() { @@ -453,8 +478,9 @@ class rocksdb_plugin::impl final enum { - WRITE_BUFFER_FLUSH_LIMIT = 100, - ACCOUNT_HISTORY_LENGTH_LIMIT = 30 + WRITE_BUFFER_FLUSH_LIMIT = 100, + ACCOUNT_HISTORY_LENGTH_LIMIT = 30, + ACCOUNT_HISTORY_TIME_LIMIT = 30, }; /// Class attributes: @@ -466,7 +492,7 @@ class rocksdb_plugin::impl final std::unique_ptr _api; std::unique_ptr _storage; std::vector _columnHandles; - ::rocksdb::WriteBatchWithIndex _writeBuffer; + CachableWriteBatch _writeBuffer; boost::signals2::connection _pre_apply_connection; /// Helper member to be able to detect another incomming tx and increment tx-counter. transaction_id_type _lastTx; @@ -651,68 +677,68 @@ void rocksdb_plugin::impl::storeOpFilteringParameters(const std::vector processor) const { - std::unique_ptr<::rocksdb::Iterator> it(_storage->NewIterator(ReadOptions(), _columnHandles[3])); - bool backwardSearch = start == (uint64_t)-1; - account_name_storage_id_pair nameIdPair(name.data, backwardSearch ? (size_t)-1 : 0); - PrimitiveTypeSlice key(nameIdPair); + ReadOptions rOptions; + PrimitiveTypeSlice nameSlice(name.data); + PinnableSlice buffer; + auto s = _storage->Get(rOptions, _columnHandles[3], nameSlice, &buffer); - if(backwardSearch) - it->SeekForPrev(key); - else - it->Seek(key); - - if(it->Valid() == false) + if(s.IsNotFound()) return; - if(backwardSearch) - { - unsigned int count = 0; - for(; it->Valid() && it->key().starts_with(nameSlice); it->Prev()) - ++count; + checkStatus(s); - size_t entries = count - 1; - - for(it->SeekForPrev(key); it->Valid() && it->key().starts_with(nameSlice); it->Prev(), --entries, --limit) - { - auto valueSlice = it->value(); - const auto& opId = PrimitiveTypeSlice::unpackSlice(valueSlice); - tmp_operation_object oObj; - bool found = find_operation_object(opId, &oObj); - FC_ASSERT(found, "Missing operation?"); - processor(entries, oObj); - - if(limit == 0) - break; - } + account_history_info ahInfo; + load(ahInfo, buffer.data(), buffer.size()); + + PrimitiveTypeSlice> lowerBoundSlice(std::make_pair(ahInfo.id, ahInfo.oldestEntryId)); + PrimitiveTypeSlice> upperBoundSlice(std::make_pair(ahInfo.id, ahInfo.newestEntryId+1)); + + rOptions.iterate_lower_bound = &lowerBoundSlice; + rOptions.iterate_upper_bound = &upperBoundSlice; + PrimitiveTypeSlice> key(std::make_pair(ahInfo.id, start)); + PrimitiveTypeSlice ahIdSlice(ahInfo.id); + + std::unique_ptr<::rocksdb::Iterator> it(_storage->NewIterator(rOptions, _columnHandles[4])); + + it->SeekForPrev(key); + + if(it->Valid() == false) return; - } - size_t toSkip = start - limit + 1; - size_t entries = 0; + auto keySlice = it->key(); + auto keyValue = PrimitiveTypeSlice>::unpackSlice(keySlice); + + auto lowerBound = keyValue.second > limit ? keyValue.second - limit : 0; - for(; it->Valid() && it->key().starts_with(nameSlice); it->Next()) + for(; it->Valid(); it->Prev()) { - if(entries > start) + auto keySlice = it->key(); + if(keySlice.starts_with(ahIdSlice) == false) break; - if(++entries < toSkip) - continue; + keyValue = PrimitiveTypeSlice>::unpackSlice(keySlice); auto valueSlice = it->value(); - const auto& opId = PrimitiveTypeSlice::unpackSlice(valueSlice); + const auto& opId = PrimitiveTypeSlice::unpackSlice(valueSlice); tmp_operation_object oObj; bool found = find_operation_object(opId, &oObj); FC_ASSERT(found, "Missing operation?"); - processor(entries-1, oObj); + +// ilog("AH-info-id: ${a}, Entry: ${e}, OperationId: ${oid}", ("a", keyValue.first)("e", keyValue.second)("oid", oObj.id)); + + processor(keyValue.second, oObj); + + if(keyValue.second <= lowerBound) + break; } } bool rocksdb_plugin::impl::find_operation_object(size_t opId, tmp_operation_object* op) const { std::string data; - PrimitiveTypeSlice idSlice(opId); + PrimitiveTypeSlice idSlice(opId); ::rocksdb::Status s = _storage->Get(ReadOptions(), _columnHandles[1], idSlice, &data); if(s.ok()) @@ -760,9 +786,13 @@ rocksdb_plugin::impl::ColumnDefinitions rocksdb_plugin::impl::prepareColumnDefin auto& byLocationColumn = columnDefs.back(); byLocationColumn.options.comparator = by_location_Comparator(); - columnDefs.emplace_back("account_history_object_by_name_id", ColumnFamilyOptions()); + columnDefs.emplace_back("account_history_info_by_name", ColumnFamilyOptions()); auto& byAccountNameColumn = columnDefs.back(); - byAccountNameColumn.options.comparator = by_account_name_storage_id_pair_Comparator(); + byAccountNameColumn.options.comparator = by_account_name_Comparator(); + + columnDefs.emplace_back("ah_info_operation_by_ids", ColumnFamilyOptions()); + auto& byAHInfoColumn = columnDefs.back(); + byAHInfoColumn.options.comparator = by_ah_info_operation_Comparator(); return columnDefs; } @@ -867,11 +897,11 @@ void rocksdb_plugin::impl::importOperation(uint32_t blockNum, const fc::time_poi fc::raw::pack(ds, obj); } - PrimitiveTypeSlice idSlice(_operationSeqId); + PrimitiveTypeSlice idSlice(obj.id); auto s = _writeBuffer.Put(_columnHandles[1], idSlice, Slice(serializedObj.data(), serializedObj.size())); checkStatus(s); - PrimitiveTypeSlice blockNoIdSlice(location_id_pair(blockNum, _operationSeqId)); + PrimitiveTypeSlice blockNoIdSlice(location_id_pair(blockNum, obj.id)); s = _writeBuffer.Put(_columnHandles[2], blockNoIdSlice, idSlice); checkStatus(s); @@ -894,48 +924,145 @@ void rocksdb_plugin::impl::importOperation(uint32_t blockNum, const fc::time_poi void rocksdb_plugin::impl::buildAccountHistoryRecord(const account_name_type& name, const tmp_operation_object& obj, const operation& op, const fc::time_point_sec& blockTime) { - std::unique_ptr<::rocksdb::WBWIIterator> latestEntryItr(_writeBuffer.NewIterator(_columnHandles[3])); - account_name_storage_id_pair maxIdPair(name.data, std::numeric_limits::max()); - PrimitiveTypeSlice maxIdSlice(maxIdPair); - latestEntryItr->SeekForPrev(maxIdSlice); - size_t lastNo = 0; - if(latestEntryItr->Valid()) - { - /// Some last ID has been found. - auto e = latestEntryItr->Entry(); - auto foundEntry = PrimitiveTypeSlice::unpackSlice(e.key); - lastNo = foundEntry.second; - // account_name_type foundAccount; - // foundAccount.data = foundEntry.first; - // std::string strAccount = foundAccount; - // ilog("Found last entry no: ${e} for account: `${n}'", ("e", lastNo)("n", strAccount)); - } + std::string strName = name; - auto nextSeqId = lastNo + 1; - account_name_storage_id_pair nameIdPair(name.data, nextSeqId); - PrimitiveTypeSlice nameIdPairSlice(nameIdPair); - PrimitiveTypeSlice idSlice(obj.id); - - if(_prune && lastNo >= ACCOUNT_HISTORY_LENGTH_LIMIT) + ReadOptions rOptions; + //rOptions.tailing = true; + + PrimitiveTypeSlice nameSlice(name.data); + + account_history_info ahInfo; + bool found = _writeBuffer.getAHInfo(name, &ahInfo); + + if(found) { - if(prunePotentiallyTooOldItems(latestEntryItr.get(), name, lastNo, blockTime)) - { - auto s = _writeBuffer.Put(_columnHandles[3], nameIdPairSlice, idSlice); - checkStatus(s); - } + auto count = ahInfo.getAssociatedOpCount(); + + if(_prune && count > ACCOUNT_HISTORY_LENGTH_LIMIT && + ((blockTime - ahInfo.oldestEntryTimestamp) > fc::days(ACCOUNT_HISTORY_TIME_LIMIT))) + { + prunePotentiallyTooOldItems(&ahInfo, name, blockTime); + } + + auto nextEntryId = ++ahInfo.newestEntryId; + _writeBuffer.putAHInfo(name, ahInfo); + + PrimitiveTypeSlice> ahInfoOpSlice(std::make_pair(ahInfo.id, nextEntryId)); + PrimitiveTypeSlice valueSlice(obj.id); + auto s = _writeBuffer.Put(_columnHandles[4], ahInfoOpSlice, valueSlice); + checkStatus(s); + + // if(strName == "blocktrades") + // { + // ilog("Block: ${b}: Storing another AH entry ${e} for account: `${a}' (${ahID}). Operation block: ${ob}, Operation id: ${oid}", + // ("b", _mainDb.head_block_num()) + // ("e", nextEntryId) + // ("a", strName) + // ("ahID", ahInfo.id) + // ("ob", obj.block) + // ("oid", obj.id) + // ); + // } + } else { - auto s = _writeBuffer.Put(_columnHandles[3], nameIdPairSlice, idSlice); + /// New entry must be created - there is first operation recorded. + ahInfo.id = _accountHistorySeqId++; + ahInfo.newestEntryId = ahInfo.oldestEntryId = 0; + ahInfo.oldestEntryTimestamp = obj.timestamp; + + _writeBuffer.putAHInfo(name, ahInfo); + + PrimitiveTypeSlice> ahInfoOpSlice(std::make_pair(ahInfo.id, 0)); + PrimitiveTypeSlice valueSlice(obj.id); + auto s = _writeBuffer.Put(_columnHandles[4], ahInfoOpSlice, valueSlice); checkStatus(s); + + // if(strName == "blocktrades") + // { + // blocktrades_id = ahInfo.id; + // ilog("Block: ${b}: Storing FIRST AH entry ${e} for account: `${a}' (${ahID}). Operation block: ${ob}, OPeration id: ${oid}", + // ("b", _mainDb.head_block_num()) + // ("e", 0) + // ("a", strName) + // ("ahID", ahInfo.id) + // ("ob", obj.block) + // ("oid", obj.id) + // ); + // } + } } -bool rocksdb_plugin::impl::prunePotentiallyTooOldItems(::rocksdb::WBWIIterator* dataItr, const account_name_type& name, - size_t lastFoundNo, const fc::time_point_sec& now) +void rocksdb_plugin::impl::prunePotentiallyTooOldItems(account_history_info* ahInfo, const account_name_type& name, + const fc::time_point_sec& now) { - FC_TODO("NOT IMPLEMENTED YET"); - return true; + std::string strName = name; + + auto ageLimit = fc::days(ACCOUNT_HISTORY_TIME_LIMIT); + + PrimitiveTypeSlice> oldestEntrySlice( + std::make_pair(ahInfo->id, ahInfo->oldestEntryId)); + auto lookupUpperBound = std::make_pair(ahInfo->id + 1, + ahInfo->newestEntryId - ACCOUNT_HISTORY_LENGTH_LIMIT + 1); + + PrimitiveTypeSlice> newestEntrySlice(lookupUpperBound); + + ReadOptions rOptions; + //rOptions.tailing = true; + rOptions.iterate_lower_bound = &oldestEntrySlice; + rOptions.iterate_upper_bound = &newestEntrySlice; + + auto s = _writeBuffer.SingleDelete(_columnHandles[4], oldestEntrySlice); + checkStatus(s); + + std::unique_ptr<::rocksdb::Iterator> dataItr(_storage->NewIterator(rOptions, _columnHandles[4])); + + /** To clean outdated records we have to iterate over all AH records having subsequent number greater than limit + * and additionally verify date of operation, to clean up only these exceeding a date limit. + * So just operations having a list position > ACCOUNT_HISTORY_LENGTH_LIMIT and older that ACCOUNT_HISTORY_TIME_LIMIT + * shall be removed. + */ + dataItr->Seek(oldestEntrySlice); + + /// Boundaries of keys to be removed + //uint32_t leftBoundary = ahInfo->oldestEntryId; + uint32_t rightBoundary = ahInfo->oldestEntryId; + + for(; dataItr->Valid(); dataItr->Next()) + { + auto key = dataItr->key(); + auto foundEntry = PrimitiveTypeSlice>::unpackSlice(key); + + if(foundEntry.first != ahInfo->id || foundEntry.second >= lookupUpperBound.second) + break; + + auto value = dataItr->value(); + + auto pointedOpId = PrimitiveTypeSlice::unpackSlice(value); + tmp_operation_object op; + find_operation_object(pointedOpId, &op); + + auto age = now - op.timestamp; + + if(age > ageLimit) + { + rightBoundary = foundEntry.second; + PrimitiveTypeSlice> rightBoundarySlice( + std::make_pair(ahInfo->id, rightBoundary)); + s = _writeBuffer.SingleDelete(_columnHandles[4], rightBoundarySlice); + checkStatus(s); + } + else + { + ahInfo->oldestEntryId = foundEntry.second; + ahInfo->oldestEntryTimestamp = op.timestamp; + FC_ASSERT(ahInfo->oldestEntryId <= ahInfo->newestEntryId); + + break; + } + } } void rocksdb_plugin::impl::importData(unsigned int blockLimit) @@ -1091,9 +1218,9 @@ void rocksdb_plugin::find_account_history_data(const account_name_type& name, ui _my->find_account_history_data(name, start, limit, processor); } -bool rocksdb_plugin::find_operation_object(size_t opId, tmp_operation_object* data) const +bool rocksdb_plugin::find_operation_object(size_t opId, tmp_operation_object* op) const { - return _my->find_operation_object(opId, data); + return _my->find_operation_object(opId, op); } void rocksdb_plugin::find_operations_by_block(size_t blockNum,