From 9e1561f5b21f57c041f5262336bab5a28ea86e91 Mon Sep 17 00:00:00 2001 From: Bartek Wrona Date: Thu, 15 Feb 2018 12:06:54 +0100 Subject: [PATCH] Issue #2099 Issue #2066 Implemented enum_virtual_ops method. --- .../steem/plugins/rocksdb/rocksdb_plugin.hpp | 2 + libraries/plugins/rocksdb/rocksdb_api.cpp | 17 +++ libraries/plugins/rocksdb/rocksdb_api.hpp | 21 ++++ libraries/plugins/rocksdb/rocksdb_plugin.cpp | 114 +++++++++++++++++- 4 files changed, 153 insertions(+), 1 deletion(-) diff --git a/libraries/plugins/rocksdb/include/steem/plugins/rocksdb/rocksdb_plugin.hpp b/libraries/plugins/rocksdb/include/steem/plugins/rocksdb/rocksdb_plugin.hpp index 7df1e5efe0..4886d9f8a5 100644 --- a/libraries/plugins/rocksdb/include/steem/plugins/rocksdb/rocksdb_plugin.hpp +++ b/libraries/plugins/rocksdb/include/steem/plugins/rocksdb/rocksdb_plugin.hpp @@ -39,6 +39,8 @@ class rocksdb_plugin final : public appbase::plugin< rocksdb_plugin > bool find_operation_object(size_t opId, tmp_operation_object* data) const; void find_operations_by_block(size_t blockNum, std::function processor) const; + uint32_t enum_operations_from_block_range(uint32_t blockRangeBegin, uint32_t blockRangeEnd, + std::function processor) const; private: class impl; diff --git a/libraries/plugins/rocksdb/rocksdb_api.cpp b/libraries/plugins/rocksdb/rocksdb_api.cpp index 75af2f4867..4d2db88d50 100644 --- a/libraries/plugins/rocksdb/rocksdb_api.cpp +++ b/libraries/plugins/rocksdb/rocksdb_api.cpp @@ -26,6 +26,7 @@ class api_impl DECLARE_API_IMPL( (get_ops_in_block) (get_account_history) + (enum_virtual_ops) ) const rocksdb::rocksdb_plugin& _dataSource; @@ -62,6 +63,21 @@ DEFINE_API_IMPL( api_impl, get_account_history ) return result; } +DEFINE_API_IMPL(api_impl, enum_virtual_ops) +{ + enum_virtual_ops_return result; + + result.next_block_range_begin = _dataSource.enum_operations_from_block_range(args.block_range_begin, + args.block_range_end, + [&result](const tmp_operation_object& op) + { + result.ops.emplace_back(api_operation_object(op)); + } + ); + + return result; +} + } // detail rocksdb_api::rocksdb_api(const rocksdb::rocksdb_plugin& dataSource) : my( new detail::api_impl(dataSource) ) @@ -75,6 +91,7 @@ rocksdb_api::~rocksdb_api() {} DEFINE_LOCKLESS_APIS( rocksdb_api, (get_ops_in_block) (get_account_history) + (enum_virtual_ops) ) } } } // steem::plugins::rocksdb diff --git a/libraries/plugins/rocksdb/rocksdb_api.hpp b/libraries/plugins/rocksdb/rocksdb_api.hpp index 687ddf1922..b1b6a122ed 100644 --- a/libraries/plugins/rocksdb/rocksdb_api.hpp +++ b/libraries/plugins/rocksdb/rocksdb_api.hpp @@ -107,6 +107,22 @@ struct get_account_history_return std::map< uint32_t, api_operation_object > history; }; +/** Allows to specify range of blocks to retrieve virtual operations for. + * \param block_range_begin - starting block number (inclusive) to search for virtual operations + * \param block_range_end - last block number (exclusive) to search for virtual operations + */ +struct enum_virtual_ops_args +{ + uint32_t block_range_begin = 1; + uint32_t block_range_end = 2; +}; + +struct enum_virtual_ops_return +{ + vector ops; + uint32_t next_block_range_begin = 0; +}; + class rocksdb_api final { public: @@ -116,6 +132,7 @@ class rocksdb_api final DECLARE_API( (get_ops_in_block) (get_account_history) + (enum_virtual_ops) ) private: @@ -142,3 +159,7 @@ FC_REFLECT( steem::plugins::rocksdb::get_account_history_args, FC_REFLECT( steem::plugins::rocksdb::get_account_history_return, (history) ) + +FC_REFLECT(steem::plugins::rocksdb::enum_virtual_ops_args, (block_range_begin)(block_range_end)) +FC_REFLECT(steem::plugins::rocksdb::enum_virtual_ops_return, (ops)(next_block_range_begin)) + diff --git a/libraries/plugins/rocksdb/rocksdb_plugin.cpp b/libraries/plugins/rocksdb/rocksdb_plugin.cpp index 66e557e9d8..ba21f7d023 100644 --- a/libraries/plugins/rocksdb/rocksdb_plugin.cpp +++ b/libraries/plugins/rocksdb/rocksdb_plugin.cpp @@ -22,6 +22,7 @@ #include #include +#include #include #include #include @@ -351,6 +352,7 @@ class rocksdb_plugin::impl final if(status.ok()) { ilog("RocksDB opened successfully storage at location: `${p}'.", ("p", strPath)); + verifyStoreVersion(storageDb); loadSeqIdentifiers(storageDb); _storage.reset(storageDb); @@ -380,6 +382,10 @@ class rocksdb_plugin::impl final /// Allows to look for all operations present in given block and call `processor` for them. void find_operations_by_block(size_t blockNum, std::function processor) const; + /// Allows to enumerate all operations registered in given block range. + uint32_t enumVirtualOperationsFromBlockRange(uint32_t blockRangeBegin, + uint32_t blockRangeEnd, std::function processor) const; + void shutdownDb() { chain::util::disconnect_signal(_pre_apply_connection); @@ -409,6 +415,35 @@ class rocksdb_plugin::impl final void prunePotentiallyTooOldItems(account_history_info* ahInfo, const account_name_type& name, const fc::time_point_sec& now); + void saveStoreVersion() + { + PrimitiveTypeSlice majorVSlice(STORE_MAJOR_VERSION); + PrimitiveTypeSlice minorVSlice(STORE_MINOR_VERSION); + + auto s = _writeBuffer.Put(Slice("STORE_MAJOR_VERSION"), majorVSlice); + checkStatus(s); + s = _writeBuffer.Put(Slice("STORE_MINOR_VERSION"), minorVSlice); + checkStatus(s); + } + + void verifyStoreVersion(DB* storageDb) + { + ReadOptions rOptions; + + std::string buffer; + auto s = storageDb->Get(rOptions, "STORE_MAJOR_VERSION", &buffer); + checkStatus(s); + const auto major = PrimitiveTypeSlice::unpackSlice(buffer); + + FC_ASSERT(major == STORE_MAJOR_VERSION, "Store major version mismatch"); + + s = storageDb->Get(rOptions, "STORE_MINOR_VERSION", &buffer); + checkStatus(s); + const auto minor = PrimitiveTypeSlice::unpackSlice(buffer); + + FC_ASSERT(minor == STORE_MINOR_VERSION, "Store minor version mismatch"); + } + void storeSequenceIds() { Slice opSeqIdName("OPERATION_SEQ_ID"); @@ -504,6 +539,15 @@ class rocksdb_plugin::impl final WRITE_BUFFER_FLUSH_LIMIT = 100, ACCOUNT_HISTORY_LENGTH_LIMIT = 30, ACCOUNT_HISTORY_TIME_LIMIT = 30, + + VIRTUAL_OP_FLAG = 0x80000000, + /** Because localtion_id_pair stores block_number paired with (VIRTUAL_OP_FLAG|operation_id), + * max allowed operation-id is max_int (instead of max_uint). + */ + MAX_OPERATION_ID = std::numeric_limits::max(), + + STORE_MAJOR_VERSION = 1, + STORE_MINOR_VERSION = 0, }; /// Class attributes: @@ -797,6 +841,60 @@ void rocksdb_plugin::impl::find_operations_by_block(size_t blockNum, } } +uint32_t rocksdb_plugin::impl::enumVirtualOperationsFromBlockRange(uint32_t blockRangeBegin, + uint32_t blockRangeEnd, std::function processor) const +{ + FC_ASSERT(blockRangeEnd > blockRangeBegin, "Block range must be upward"); + + PrimitiveTypeSlice upperBoundSlice(location_id_pair(blockRangeEnd, 0)); + + PrimitiveTypeSlice rangeBeginSlice(location_id_pair(blockRangeBegin, 0)); + + ReadOptions rOptions; + rOptions.iterate_upper_bound = &upperBoundSlice; + + std::unique_ptr<::rocksdb::Iterator> it(_storage->NewIterator(rOptions, _columnHandles[2])); + + uint32_t lastFoundBlock = 0; + + for(it->Seek(rangeBeginSlice); it->Valid(); it->Next()) + { + auto keySlice = it->key(); + const auto& key = PrimitiveTypeSlice::unpackSlice(keySlice); + + /// Accept only virtual operations + if(key.second & VIRTUAL_OP_FLAG) + { + auto valueSlice = it->value(); + const auto& opId = PrimitiveTypeSlice::unpackSlice(valueSlice); + + tmp_operation_object op; + bool found = find_operation_object(opId, &op); + FC_ASSERT(found); + + processor(op); + lastFoundBlock = op.block; + } + } + + PrimitiveTypeSlice lowerBoundSlice(location_id_pair(lastFoundBlock, 0)); + rOptions = ReadOptions(); + rOptions.iterate_lower_bound = &lowerBoundSlice; + it.reset(_storage->NewIterator(rOptions, _columnHandles[2])); + + PrimitiveTypeSlice nextRangeBeginSlice(location_id_pair(lastFoundBlock + 1, 0)); + for(it->Seek(nextRangeBeginSlice); it->Valid(); it->Next()) + { + auto keySlice = it->key(); + const auto& key = PrimitiveTypeSlice::unpackSlice(keySlice); + + if(key.second & VIRTUAL_OP_FLAG) + return key.first; + } + + return 0; +} + rocksdb_plugin::impl::ColumnDefinitions rocksdb_plugin::impl::prepareColumnDefinitions(bool addDefaultColumn) { ColumnDefinitions columnDefs; @@ -851,6 +949,7 @@ bool rocksdb_plugin::impl::createDbSchema(const bfs::path& path) if(s.ok()) { ilog("RockDB column definitions created successfully."); + saveStoreVersion(); /// Store initial values of Seq-IDs for held objects. flushWriteBuffer(db); cleanupColumnHandles(); @@ -901,6 +1000,8 @@ void rocksdb_plugin::impl::importOperation(uint32_t blockNum, const fc::time_poi if(impacted.empty()) return; /// Ignore operations not impacting any account (according to original implementation) + FC_ASSERT(_operationSeqId < MAX_OPERATION_ID, "Operation id limit exceeded"); + tmp_operation_object obj; obj.id = _operationSeqId++; obj.trx_id = txId; @@ -926,7 +1027,11 @@ void rocksdb_plugin::impl::importOperation(uint32_t blockNum, const fc::time_poi auto s = _writeBuffer.Put(_columnHandles[1], idSlice, Slice(serializedObj.data(), serializedObj.size())); checkStatus(s); - PrimitiveTypeSlice blockNoIdSlice(location_id_pair(blockNum, obj.id)); + uint32_t encodedId = obj.id; + if(is_virtual_operation(op)) + encodedId |= VIRTUAL_OP_FLAG; + + PrimitiveTypeSlice blockNoIdSlice(location_id_pair(blockNum, encodedId)); s = _writeBuffer.Put(_columnHandles[2], blockNoIdSlice, idSlice); checkStatus(s); @@ -1295,4 +1400,11 @@ void rocksdb_plugin::find_operations_by_block(size_t blockNum, _my->find_operations_by_block(blockNum, processor); } +uint32_t rocksdb_plugin::enum_operations_from_block_range(uint32_t blockRangeBegin, uint32_t blockRangeEnd, + std::function processor) const +{ + return _my->enumVirtualOperationsFromBlockRange(blockRangeBegin, blockRangeEnd, processor); +} + + } } }