Skip to content

Commit

Permalink
Issue #2066 Prerequisite changes for prune option support. Slightly c…
Browse files Browse the repository at this point in the history
…hanged data structures to allow counting AH records for given account (similary to original implementation).
  • Loading branch information
vogel76 committed Feb 4, 2018
1 parent 9dad4d2 commit de1c13e
Showing 1 changed file with 61 additions and 12 deletions.
73 changes: 61 additions & 12 deletions libraries/plugins/rocksdb/rocksdb_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,18 @@

#include <appbase/application.hpp>

#include "rocksdb/db.h"
#include "rocksdb/slice.h"
#include "rocksdb/options.h"
#include <rocksdb/db.h>
#include <rocksdb/options.h>
#include <rocksdb/slice.h>
#include <rocksdb/utilities/write_batch_with_index.h>

#include <boost/type.hpp>
#include <boost/algorithm/string.hpp>
#include <boost/container/flat_set.hpp>

#include <string>
#include <typeindex>
#include <typeinfo>

namespace bpo = boost::program_options;

Expand Down Expand Up @@ -371,6 +373,10 @@ class rocksdb_plugin::impl final

void importOperation(uint32_t blockNum, const fc::time_point_sec& blockTime, const transaction_id_type& txId,
uint32_t txInBlock, const operation& op, uint16_t opInTx);
void buildAccountHistoryRecord(const account_name_type& name, const tmp_operation_object& obj, const operation& op,
const fc::time_point_sec& blockTime);
bool prunePotentiallyTooOldItems(::rocksdb::WBWIIterator* dataItr, const account_name_type& name,
size_t lastFoundNo, const fc::time_point_sec& now);

void storeSequenceIds()
{
Expand Down Expand Up @@ -414,7 +420,7 @@ class rocksdb_plugin::impl final
storage = _storage.get();

::rocksdb::WriteOptions wOptions;
auto s = storage->Write(wOptions, &_writeBuffer);
auto s = storage->Write(wOptions, _writeBuffer.GetWriteBatch());
checkStatus(s);
_writeBuffer.Clear();
_collectedOps = 0;
Expand Down Expand Up @@ -447,7 +453,8 @@ class rocksdb_plugin::impl final

enum
{
WRITE_BUFFER_FLUSH_LIMIT = 100
WRITE_BUFFER_FLUSH_LIMIT = 100,
ACCOUNT_HISTORY_LENGTH_LIMIT = 30
};

/// Class attributes:
Expand All @@ -459,7 +466,7 @@ class rocksdb_plugin::impl final
std::unique_ptr<rocksdb_api> _api;
std::unique_ptr<DB> _storage;
std::vector<ColumnFamilyHandle*> _columnHandles;
::rocksdb::WriteBatch _writeBuffer;
::rocksdb::WriteBatchWithIndex _writeBuffer;
boost::signals2::connection _pre_apply_connection;
/// Helper member to be able to detect another incomming tx and increment tx-counter.
transaction_id_type _lastTx;
Expand Down Expand Up @@ -871,12 +878,7 @@ void rocksdb_plugin::impl::importOperation(uint32_t blockNum, const fc::time_poi
if(isTrackedOperation(op))
{
for(const auto& name : impacted)
{
account_name_storage_id_pair nameIdPair(name.data, _accountHistorySeqId++);
PrimitiveTypeSlice<account_name_storage_id_pair> nameIdPairSlice(nameIdPair);
s = _writeBuffer.Put(_columnHandles[3], nameIdPairSlice, idSlice);
checkStatus(s);
}
buildAccountHistoryRecord(name, obj, op, blockTime);
}
else
{
Expand All @@ -889,6 +891,53 @@ void rocksdb_plugin::impl::importOperation(uint32_t blockNum, const fc::time_poi
++_totalOps;
}

void rocksdb_plugin::impl::buildAccountHistoryRecord(const account_name_type& name, const tmp_operation_object& obj,
const operation& op, const fc::time_point_sec& blockTime)
{
std::unique_ptr<::rocksdb::WBWIIterator> latestEntryItr(_writeBuffer.NewIterator(_columnHandles[3]));
account_name_storage_id_pair maxIdPair(name.data, std::numeric_limits<size_t>::max());
PrimitiveTypeSlice<account_name_storage_id_pair> maxIdSlice(maxIdPair);
latestEntryItr->SeekForPrev(maxIdSlice);
size_t lastNo = 0;
if(latestEntryItr->Valid())
{
/// Some last ID has been found.
auto e = latestEntryItr->Entry();
auto foundEntry = PrimitiveTypeSlice<account_name_storage_id_pair>::unpackSlice(e.key);
lastNo = foundEntry.second;
// account_name_type foundAccount;
// foundAccount.data = foundEntry.first;
// std::string strAccount = foundAccount;
// ilog("Found last entry no: ${e} for account: `${n}'", ("e", lastNo)("n", strAccount));
}

auto nextSeqId = lastNo + 1;
account_name_storage_id_pair nameIdPair(name.data, nextSeqId);
PrimitiveTypeSlice<account_name_storage_id_pair> nameIdPairSlice(nameIdPair);
PrimitiveTypeSlice<size_t> idSlice(obj.id);

if(_prune && lastNo >= ACCOUNT_HISTORY_LENGTH_LIMIT)
{
if(prunePotentiallyTooOldItems(latestEntryItr.get(), name, lastNo, blockTime))
{
auto s = _writeBuffer.Put(_columnHandles[3], nameIdPairSlice, idSlice);
checkStatus(s);
}
}
else
{
auto s = _writeBuffer.Put(_columnHandles[3], nameIdPairSlice, idSlice);
checkStatus(s);
}
}

bool rocksdb_plugin::impl::prunePotentiallyTooOldItems(::rocksdb::WBWIIterator* dataItr, const account_name_type& name,
size_t lastFoundNo, const fc::time_point_sec& now)
{
FC_TODO("NOT IMPLEMENTED YET");
return true;
}

void rocksdb_plugin::impl::importData(unsigned int blockLimit)
{
if(_storage == nullptr)
Expand Down

0 comments on commit de1c13e

Please sign in to comment.