Skip to content

Commit

Permalink
Issue #2099 Issue #2066 Implemented enum_virtual_ops method.
Browse files Browse the repository at this point in the history
  • Loading branch information
vogel76 committed Feb 15, 2018
1 parent b9f35f0 commit 9e1561f
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ class rocksdb_plugin final : public appbase::plugin< rocksdb_plugin >
bool find_operation_object(size_t opId, tmp_operation_object* data) const;
void find_operations_by_block(size_t blockNum,
std::function<void(const tmp_operation_object&)> processor) const;
uint32_t enum_operations_from_block_range(uint32_t blockRangeBegin, uint32_t blockRangeEnd,
std::function<void(const tmp_operation_object&)> processor) const;

private:
class impl;
Expand Down
17 changes: 17 additions & 0 deletions libraries/plugins/rocksdb/rocksdb_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class api_impl
DECLARE_API_IMPL(
(get_ops_in_block)
(get_account_history)
(enum_virtual_ops)
)

const rocksdb::rocksdb_plugin& _dataSource;
Expand Down Expand Up @@ -62,6 +63,21 @@ DEFINE_API_IMPL( api_impl, get_account_history )
return result;
}

DEFINE_API_IMPL(api_impl, enum_virtual_ops)
{
enum_virtual_ops_return result;

result.next_block_range_begin = _dataSource.enum_operations_from_block_range(args.block_range_begin,
args.block_range_end,
[&result](const tmp_operation_object& op)
{
result.ops.emplace_back(api_operation_object(op));
}
);

return result;
}

} // detail

rocksdb_api::rocksdb_api(const rocksdb::rocksdb_plugin& dataSource) : my( new detail::api_impl(dataSource) )
Expand All @@ -75,6 +91,7 @@ rocksdb_api::~rocksdb_api() {}
DEFINE_LOCKLESS_APIS( rocksdb_api,
(get_ops_in_block)
(get_account_history)
(enum_virtual_ops)
)

} } } // steem::plugins::rocksdb
21 changes: 21 additions & 0 deletions libraries/plugins/rocksdb/rocksdb_api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,22 @@ struct get_account_history_return
std::map< uint32_t, api_operation_object > history;
};

/** Allows to specify range of blocks to retrieve virtual operations for.
* \param block_range_begin - starting block number (inclusive) to search for virtual operations
* \param block_range_end - last block number (exclusive) to search for virtual operations
*/
struct enum_virtual_ops_args
{
uint32_t block_range_begin = 1;
uint32_t block_range_end = 2;
};

struct enum_virtual_ops_return
{
vector<api_operation_object> ops;
uint32_t next_block_range_begin = 0;
};

class rocksdb_api final
{
public:
Expand All @@ -116,6 +132,7 @@ class rocksdb_api final
DECLARE_API(
(get_ops_in_block)
(get_account_history)
(enum_virtual_ops)
)

private:
Expand All @@ -142,3 +159,7 @@ FC_REFLECT( steem::plugins::rocksdb::get_account_history_args,

FC_REFLECT( steem::plugins::rocksdb::get_account_history_return,
(history) )

FC_REFLECT(steem::plugins::rocksdb::enum_virtual_ops_args, (block_range_begin)(block_range_end))
FC_REFLECT(steem::plugins::rocksdb::enum_virtual_ops_return, (ops)(next_block_range_begin))

114 changes: 113 additions & 1 deletion libraries/plugins/rocksdb/rocksdb_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <boost/algorithm/string.hpp>
#include <boost/container/flat_set.hpp>

#include <limits>
#include <string>
#include <typeindex>
#include <typeinfo>
Expand Down Expand Up @@ -351,6 +352,7 @@ class rocksdb_plugin::impl final
if(status.ok())
{
ilog("RocksDB opened successfully storage at location: `${p}'.", ("p", strPath));
verifyStoreVersion(storageDb);
loadSeqIdentifiers(storageDb);
_storage.reset(storageDb);

Expand Down Expand Up @@ -380,6 +382,10 @@ class rocksdb_plugin::impl final
/// Allows to look for all operations present in given block and call `processor` for them.
void find_operations_by_block(size_t blockNum,
std::function<void(const tmp_operation_object&)> processor) const;
/// Allows to enumerate all operations registered in given block range.
uint32_t enumVirtualOperationsFromBlockRange(uint32_t blockRangeBegin,
uint32_t blockRangeEnd, std::function<void(const tmp_operation_object&)> processor) const;

void shutdownDb()
{
chain::util::disconnect_signal(_pre_apply_connection);
Expand Down Expand Up @@ -409,6 +415,35 @@ class rocksdb_plugin::impl final
void prunePotentiallyTooOldItems(account_history_info* ahInfo, const account_name_type& name,
const fc::time_point_sec& now);

void saveStoreVersion()
{
PrimitiveTypeSlice<uint32_t> majorVSlice(STORE_MAJOR_VERSION);
PrimitiveTypeSlice<uint32_t> minorVSlice(STORE_MINOR_VERSION);

auto s = _writeBuffer.Put(Slice("STORE_MAJOR_VERSION"), majorVSlice);
checkStatus(s);
s = _writeBuffer.Put(Slice("STORE_MINOR_VERSION"), minorVSlice);
checkStatus(s);
}

void verifyStoreVersion(DB* storageDb)
{
ReadOptions rOptions;

std::string buffer;
auto s = storageDb->Get(rOptions, "STORE_MAJOR_VERSION", &buffer);
checkStatus(s);
const auto major = PrimitiveTypeSlice<uint32_t>::unpackSlice(buffer);

FC_ASSERT(major == STORE_MAJOR_VERSION, "Store major version mismatch");

s = storageDb->Get(rOptions, "STORE_MINOR_VERSION", &buffer);
checkStatus(s);
const auto minor = PrimitiveTypeSlice<uint32_t>::unpackSlice(buffer);

FC_ASSERT(minor == STORE_MINOR_VERSION, "Store minor version mismatch");
}

void storeSequenceIds()
{
Slice opSeqIdName("OPERATION_SEQ_ID");
Expand Down Expand Up @@ -504,6 +539,15 @@ class rocksdb_plugin::impl final
WRITE_BUFFER_FLUSH_LIMIT = 100,
ACCOUNT_HISTORY_LENGTH_LIMIT = 30,
ACCOUNT_HISTORY_TIME_LIMIT = 30,

VIRTUAL_OP_FLAG = 0x80000000,
/** Because localtion_id_pair stores block_number paired with (VIRTUAL_OP_FLAG|operation_id),
* max allowed operation-id is max_int (instead of max_uint).
*/
MAX_OPERATION_ID = std::numeric_limits<int>::max(),

STORE_MAJOR_VERSION = 1,
STORE_MINOR_VERSION = 0,
};

/// Class attributes:
Expand Down Expand Up @@ -797,6 +841,60 @@ void rocksdb_plugin::impl::find_operations_by_block(size_t blockNum,
}
}

uint32_t rocksdb_plugin::impl::enumVirtualOperationsFromBlockRange(uint32_t blockRangeBegin,
uint32_t blockRangeEnd, std::function<void(const tmp_operation_object&)> processor) const
{
FC_ASSERT(blockRangeEnd > blockRangeBegin, "Block range must be upward");

PrimitiveTypeSlice<location_id_pair> upperBoundSlice(location_id_pair(blockRangeEnd, 0));

PrimitiveTypeSlice<location_id_pair> rangeBeginSlice(location_id_pair(blockRangeBegin, 0));

ReadOptions rOptions;
rOptions.iterate_upper_bound = &upperBoundSlice;

std::unique_ptr<::rocksdb::Iterator> it(_storage->NewIterator(rOptions, _columnHandles[2]));

uint32_t lastFoundBlock = 0;

for(it->Seek(rangeBeginSlice); it->Valid(); it->Next())
{
auto keySlice = it->key();
const auto& key = PrimitiveTypeSlice<location_id_pair>::unpackSlice(keySlice);

/// Accept only virtual operations
if(key.second & VIRTUAL_OP_FLAG)
{
auto valueSlice = it->value();
const auto& opId = PrimitiveTypeSlice<size_t>::unpackSlice(valueSlice);

tmp_operation_object op;
bool found = find_operation_object(opId, &op);
FC_ASSERT(found);

processor(op);
lastFoundBlock = op.block;
}
}

PrimitiveTypeSlice<location_id_pair> lowerBoundSlice(location_id_pair(lastFoundBlock, 0));
rOptions = ReadOptions();
rOptions.iterate_lower_bound = &lowerBoundSlice;
it.reset(_storage->NewIterator(rOptions, _columnHandles[2]));

PrimitiveTypeSlice<location_id_pair> nextRangeBeginSlice(location_id_pair(lastFoundBlock + 1, 0));
for(it->Seek(nextRangeBeginSlice); it->Valid(); it->Next())
{
auto keySlice = it->key();
const auto& key = PrimitiveTypeSlice<location_id_pair>::unpackSlice(keySlice);

if(key.second & VIRTUAL_OP_FLAG)
return key.first;
}

return 0;
}

rocksdb_plugin::impl::ColumnDefinitions rocksdb_plugin::impl::prepareColumnDefinitions(bool addDefaultColumn)
{
ColumnDefinitions columnDefs;
Expand Down Expand Up @@ -851,6 +949,7 @@ bool rocksdb_plugin::impl::createDbSchema(const bfs::path& path)
if(s.ok())
{
ilog("RockDB column definitions created successfully.");
saveStoreVersion();
/// Store initial values of Seq-IDs for held objects.
flushWriteBuffer(db);
cleanupColumnHandles();
Expand Down Expand Up @@ -901,6 +1000,8 @@ void rocksdb_plugin::impl::importOperation(uint32_t blockNum, const fc::time_poi
if(impacted.empty())
return; /// Ignore operations not impacting any account (according to original implementation)

FC_ASSERT(_operationSeqId < MAX_OPERATION_ID, "Operation id limit exceeded");

tmp_operation_object obj;
obj.id = _operationSeqId++;
obj.trx_id = txId;
Expand All @@ -926,7 +1027,11 @@ void rocksdb_plugin::impl::importOperation(uint32_t blockNum, const fc::time_poi
auto s = _writeBuffer.Put(_columnHandles[1], idSlice, Slice(serializedObj.data(), serializedObj.size()));
checkStatus(s);

PrimitiveTypeSlice<location_id_pair> blockNoIdSlice(location_id_pair(blockNum, obj.id));
uint32_t encodedId = obj.id;
if(is_virtual_operation(op))
encodedId |= VIRTUAL_OP_FLAG;

PrimitiveTypeSlice<location_id_pair> blockNoIdSlice(location_id_pair(blockNum, encodedId));
s = _writeBuffer.Put(_columnHandles[2], blockNoIdSlice, idSlice);
checkStatus(s);

Expand Down Expand Up @@ -1295,4 +1400,11 @@ void rocksdb_plugin::find_operations_by_block(size_t blockNum,
_my->find_operations_by_block(blockNum, processor);
}

uint32_t rocksdb_plugin::enum_operations_from_block_range(uint32_t blockRangeBegin, uint32_t blockRangeEnd,
std::function<void(const tmp_operation_object&)> processor) const
{
return _my->enumVirtualOperationsFromBlockRange(blockRangeBegin, blockRangeEnd, processor);
}


} } }

0 comments on commit 9e1561f

Please sign in to comment.