Skip to content

Commit

Permalink
DBAdapter to use additional IDBClient for metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
Toly Kournik committed Apr 1, 2020
1 parent d39a80b commit 3aee377
Show file tree
Hide file tree
Showing 10 changed files with 227 additions and 267 deletions.
86 changes: 43 additions & 43 deletions bftengine/src/bcstatetransfer/BCStateTran.cpp
Expand Up @@ -268,60 +268,60 @@ void BCStateTran::init(uint64_t maxNumOfRequiredStoredCheckpoints,
uint32_t numberOfRequiredReservedPages,
uint32_t sizeOfReservedPage) {
try {
Assert(!running_);
Assert(replicaForStateTransfer_ == nullptr);
Assert(sizeOfReservedPage == config_.sizeOfReservedPage);
Assert(!running_);
Assert(replicaForStateTransfer_ == nullptr);
Assert(sizeOfReservedPage == config_.sizeOfReservedPage);

maxNumOfStoredCheckpoints_ = maxNumOfRequiredStoredCheckpoints;
numberOfReservedPages_ = numberOfRequiredReservedPages;
metrics_.number_of_reserved_pages_.Get().Set(numberOfReservedPages_);
metrics_.size_of_reserved_page_.Get().Set(sizeOfReservedPage);
maxNumOfStoredCheckpoints_ = maxNumOfRequiredStoredCheckpoints;
numberOfReservedPages_ = numberOfRequiredReservedPages;
metrics_.number_of_reserved_pages_.Get().Set(numberOfReservedPages_);
metrics_.size_of_reserved_page_.Get().Set(sizeOfReservedPage);

memset(buffer_, 0, maxItemSize_);
memset(buffer_, 0, maxItemSize_);

LOG_INFO(STLogger,
"Init BCStateTran object:"
<< " maxNumOfStoredCheckpoints_=" << maxNumOfStoredCheckpoints_ << " numberOfReservedPages_="
<< numberOfReservedPages_ << " config_.sizeOfReservedPage_=" << config_.sizeOfReservedPage);
LOG_INFO(STLogger,
"Init BCStateTran object:"
<< " maxNumOfStoredCheckpoints_=" << maxNumOfStoredCheckpoints_ << " numberOfReservedPages_="
<< numberOfReservedPages_ << " config_.sizeOfReservedPage_=" << config_.sizeOfReservedPage);

if (psd_->initialized()) {
LOG_INFO(STLogger, "BCStateTran::init - loading existing data from storage");
if (psd_->initialized()) {
LOG_INFO(STLogger, "BCStateTran::init - loading existing data from storage");

checkConsistency(config_.pedanticChecks);
checkConsistency(config_.pedanticChecks);

FetchingState fs = getFetchingState();
LOG_INFO(STLogger, "starting state is " << stateName(fs));
FetchingState fs = getFetchingState();
LOG_INFO(STLogger, "starting state is " << stateName(fs));

if (fs == FetchingState::GettingMissingBlocks || fs == FetchingState::GettingMissingResPages) {
SetAllReplicasAsPreferred();
}
loadMetrics();
} else {
LOG_INFO(STLogger, "BCStateTran::init - initializing a new object");
if (fs == FetchingState::GettingMissingBlocks || fs == FetchingState::GettingMissingResPages) {
SetAllReplicasAsPreferred();
}
loadMetrics();
} else {
LOG_INFO(STLogger, "BCStateTran::init - initializing a new object");

AssertAND(maxNumOfRequiredStoredCheckpoints >= 2,
maxNumOfRequiredStoredCheckpoints <= kMaxNumOfStoredCheckpoints);

AssertAND(numberOfRequiredReservedPages >= 2, numberOfRequiredReservedPages <= config_.maxNumOfReservedPages);
DataStoreTransaction::Guard g(psd_->beginTransaction());
g.txn()->setReplicas(replicas_);
g.txn()->setMyReplicaId(config_.myReplicaId);
g.txn()->setFVal(config_.fVal);
g.txn()->setMaxNumOfStoredCheckpoints(maxNumOfRequiredStoredCheckpoints);
g.txn()->setNumberOfReservedPages(numberOfRequiredReservedPages);
g.txn()->setLastStoredCheckpoint(0);
g.txn()->setFirstStoredCheckpoint(0);
// TODO(TK) - check max transaction size
for (uint32_t i = 0; i < numberOfReservedPages_; i++) // reset all pages
g.txn()->setPendingResPage(i, buffer_, config_.sizeOfReservedPage);

g.txn()->setIsFetchingState(false);
g.txn()->setFirstRequiredBlock(0);
g.txn()->setLastRequiredBlock(0);
g.txn()->setAsInitialized();

Assert(getFetchingState() == FetchingState::NotFetching);
}
AssertAND(numberOfRequiredReservedPages >= 2, numberOfRequiredReservedPages <= config_.maxNumOfReservedPages);
DataStoreTransaction::Guard g(psd_->beginTransaction());
g.txn()->setReplicas(replicas_);
g.txn()->setMyReplicaId(config_.myReplicaId);
g.txn()->setFVal(config_.fVal);
g.txn()->setMaxNumOfStoredCheckpoints(maxNumOfRequiredStoredCheckpoints);
g.txn()->setNumberOfReservedPages(numberOfRequiredReservedPages);
g.txn()->setLastStoredCheckpoint(0);
g.txn()->setFirstStoredCheckpoint(0);
// TODO(TK) - check max transaction size
for (uint32_t i = 0; i < numberOfReservedPages_; i++) // reset all pages
g.txn()->setPendingResPage(i, buffer_, config_.sizeOfReservedPage);

g.txn()->setIsFetchingState(false);
g.txn()->setFirstRequiredBlock(0);
g.txn()->setLastRequiredBlock(0);
g.txn()->setAsInitialized();

Assert(getFetchingState() == FetchingState::NotFetching);
}
} catch (const std::exception &e) {
LOG_FATAL(STLogger, e.what());
exit(1);
Expand Down
85 changes: 33 additions & 52 deletions kvbc/include/ReplicaImp.h
Expand Up @@ -25,41 +25,42 @@

namespace concord::kvbc {

class ReplicaImp: public IReplica,
public ILocalKeyValueStorageReadOnly,
public IBlocksAppender,
public bftEngine::SimpleBlockchainStateTransfer::IAppState {
class ReplicaImp : public IReplica,
public ILocalKeyValueStorageReadOnly,
public IBlocksAppender,
public bftEngine::SimpleBlockchainStateTransfer::IAppState {
public:
// concord::kvbc::IReplica methods
virtual Status start() override;
virtual Status stop() override;

virtual RepStatus getReplicaStatus() const override;

virtual const ILocalKeyValueStorageReadOnly &getReadOnlyStorage() override;

virtual Status addBlockToIdleReplica(const concord::storage::SetOfKeyValuePairs &updates) override;

virtual void set_command_handler(ICommandsHandler *handler) override;

// concord::storage::ILocalKeyValueStorageReadOnly methods
virtual Status get(const Sliver &key, Sliver &outValue) const override;

virtual Status get(BlockId readVersion, const Sliver &key, Sliver &outValue, BlockId &outBlock) const override;

virtual BlockId getLastBlock() const override;

virtual Status getBlockData(BlockId blockId, concord::storage::SetOfKeyValuePairs &outBlockData) const override;

virtual Status mayHaveConflictBetween(const Sliver &key,
BlockId fromBlock,
BlockId toBlock,
bool &outRes) const override;
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// IReplica implementation
Status start() override;
Status stop() override;
bool isRunning() const override { return (m_currentRepStatus == RepStatus::Running); }
RepStatus getReplicaStatus() const override;
const ILocalKeyValueStorageReadOnly &getReadOnlyStorage() override;
Status addBlockToIdleReplica(const concord::storage::SetOfKeyValuePairs &updates) override;
void set_command_handler(ICommandsHandler *handler) override;

// concord::storage::IBlocksAppender
virtual Status addBlock(const concord::storage::SetOfKeyValuePairs &updates, BlockId &outBlockId) override;
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// ILocalKeyValueStorageReadOnly implementation
Status get(const Sliver &key, Sliver &outValue) const override;
Status get(BlockId readVersion, const Sliver &key, Sliver &outValue, BlockId &outBlock) const override;
BlockId getLastBlock() const override { return getLastBlockNum(); }
Status getBlockData(BlockId blockId, concord::storage::SetOfKeyValuePairs &outBlockData) const override;
Status mayHaveConflictBetween(const Sliver &key, BlockId fromBlock, BlockId toBlock, bool &outRes) const override;
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// IBlocksAppender implementation
Status addBlock(const concord::storage::SetOfKeyValuePairs &updates, BlockId &outBlockId) override;

bool isRunning() const override { return (m_currentRepStatus == RepStatus::Running); }
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// IAppState implementation
bool hasBlock(BlockId blockId) const override;
bool getBlock(uint64_t blockId, char *outBlock, uint32_t *outBlockSize) override;
bool getPrevDigestFromBlock(uint64_t blockId,
bftEngine::SimpleBlockchainStateTransfer::StateTransferDigest *) override;
bool putBlock(const uint64_t blockId, const char *block, const uint32_t blockSize) override;
uint64_t getLastReachableBlockNum() const override { return m_bcDbAdapter->getLastReachableBlockId(); }
uint64_t getLastBlockNum() const override { return m_bcDbAdapter->getLatestBlockId(); }
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

ReplicaImp(bftEngine::ICommunication *comm,
bftEngine::ReplicaConfig &config,
Expand All @@ -71,13 +72,10 @@ class ReplicaImp: public IReplica,
~ReplicaImp() override;

protected:
// METHODS

Status addBlockInternal(const concord::storage::SetOfKeyValuePairs &updates, BlockId &outBlockId);
Status getInternal(BlockId readVersion, Key key, Sliver &outValue, BlockId &outBlock) const;
void insertBlockInternal(BlockId blockId, Sliver block);
RawBlock getBlockInternal(BlockId blockId) const;
IDbAdapter *getBcDbAdapter() const { return m_bcDbAdapter; }

private:
void createReplicaAndSyncState();
Expand Down Expand Up @@ -130,29 +128,12 @@ class ReplicaImp: public IReplica,
bool &outRes) const override;
};

public:
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// IAppState implementation
//
bool hasBlock(BlockId blockId) const override;
bool getBlock(uint64_t blockId, char *outBlock, uint32_t *outBlockSize) override;
bool getPrevDigestFromBlock(uint64_t blockId,
bftEngine::SimpleBlockchainStateTransfer::StateTransferDigest *) override;
bool putBlock(const uint64_t blockId, const char *block, const uint32_t blockSize) override;
uint64_t getLastReachableBlockNum() const override { return m_lastReachableBlock; }
uint64_t getLastBlockNum() const override { return m_lastBlock; }
// from IAppState. represents maximal block number n such that all
// blocks 1 <= i <= n exist
std::atomic<BlockId> m_lastReachableBlock{0};
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

private:
concordlogger::Logger logger;
RepStatus m_currentRepStatus;
StorageWrapperForIdleMode m_InternalStorageWrapperForIdleMode;

IDbAdapter *m_bcDbAdapter = nullptr;
BlockId m_lastBlock = 0;
bftEngine::ICommunication *m_ptrComm = nullptr;
bftEngine::ReplicaConfig m_replicaConfig;
bftEngine::IReplica *m_replicaPtr = nullptr;
Expand Down
5 changes: 4 additions & 1 deletion kvbc/include/db_adapter.h
Expand Up @@ -37,8 +37,11 @@ class IDbAdapter {
// Delete a block from the database
virtual void deleteBlock(const BlockId& blockId) = 0;

// Checks whether block exists
virtual bool hasBlock(const BlockId& blockId) const = 0;

// Used to retrieve the latest block.
virtual BlockId getLastestBlockId() const = 0;
virtual BlockId getLatestBlockId() const = 0;

// Used to retrieve the last reachable block.
// From ST perspective, this is maximal block number N such that all blocks
Expand Down
24 changes: 14 additions & 10 deletions kvbc/include/direct_kv_db_adapter.h
Expand Up @@ -56,11 +56,13 @@ class DBKeyManipulator : public DBKeyManipulatorBase {
static Key extractKeyFromMetadataKey(const Key &_composedKey);
static bool isKeyContainBlockId(const Key &_composedKey);
static KeyValuePair composedToSimple(KeyValuePair _p);
static std::string extractFreeKey(const char *_key_data, size_t _key_length);
};

class DBAdapter : public IDbAdapter, public DBAdapterBase {
class DBAdapter : public IDbAdapter, protected DBAdapterBase {
public:
DBAdapter(std::shared_ptr<storage::IDBClient>,
DBAdapter(std::shared_ptr<storage::IDBClient> dataStore,
std::shared_ptr<storage::IDBClient> metadataStore = nullptr,
std::unique_ptr<IDataKeyGenerator> keyGen = std::make_unique<KeyGenerator>());

// Adds a block from a set of key/value pairs and a block ID. Includes:
Expand All @@ -81,23 +83,25 @@ class DBAdapter : public IDbAdapter, public DBAdapterBase {

void deleteBlock(const BlockId &) override;

BlockId getLtestBlockId() const override;
BlockId getLastReachableBlockId() const override;
bool hasBlock(const BlockId &blockId) const override;

BlockId findLatestBlock() const;
BlockId findLastReachableBlock() const;
void setLastReachableBlockNum(BlockId blockId);
void setLatestBlock(BlockId blockId);
BlockId getLatestBlockId() const override;
BlockId getLastReachableBlockId() const override;

//TODO add to IDbInterface
bool hasBlock(BlockId blockId) const;
std::shared_ptr<storage::IDBClient> getDb() const override { return DBAdapterBase::getDb(); }

protected:
BlockId mdtGetLatestBlockId() const;
BlockId mdtGetLastReachableBlockId() const;
void mdtSetLastReachableBlockNum(const BlockId &blockId);
void mdtSetLatestBlock(const BlockId &blockId);

concordUtils::Status getByKey(const concordUtils::Sliver &key, concordUtils::Sliver &val) const;
concordUtils::Status addBlockAndUpdateMultiKey(const SetOfKeyValuePairs &_kvMap,
const BlockId &_block,
const concordUtils::Sliver &_blockRaw);

std::shared_ptr<storage::IDBClient> mdt_db_; // if initialized used to store metadata
std::unique_ptr<IDataKeyGenerator> keyGen_;
};

Expand Down

0 comments on commit 3aee377

Please sign in to comment.