Skip to content

Commit

Permalink
- ReplicaForStateTransfer owns the IStateTransfer* instance instead o…
Browse files Browse the repository at this point in the history
…f kvbc::ReplicaImp

- remove RorAppState
- IAppState implemented by kvbc::ReplicaImp
- v1::DBAdapter
	- setLatestBlock()
	- setLastReachableBlock()
	- add an ability to add free keys
- ecs::S3Client -> s3::client
  • Loading branch information
Toly Kournik committed Mar 19, 2020
1 parent 1777054 commit acd8bf8
Show file tree
Hide file tree
Showing 29 changed files with 292 additions and 747 deletions.
10 changes: 4 additions & 6 deletions bftengine/include/bcstatetransfer/SimpleBCStateTransfer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,16 +78,14 @@ class IAppState {
// blockId - the block number
// block - pointer to a buffer that contains the new block
// blockSize - the size of the new block
virtual bool putBlock(uint64_t blockId, char *block, uint32_t blockSize) = 0;
virtual bool putBlock(const uint64_t blockId, const char *block, const uint32_t blockSize) = 0;

// returns the maximal block number n such that all blocks 1 <= i <= n exist.
// if block 1 does not exist, returns 0.
virtual uint64_t getLastReachableBlockNum() = 0;
virtual void setLastReachableBlockNum(uint64_t) = 0;

virtual uint64_t getLastReachableBlockNum() const = 0;
// returns the maximum block number that is currently stored in
// the application/storage layer.
virtual uint64_t getLastBlockNum() = 0;
virtual uint64_t getLastBlockNum() const = 0;

// When the state is updated by the application, getLastReachableBlockNum()
// and getLastBlockNum() should always return the same block number.
Expand All @@ -101,7 +99,7 @@ struct Config {
uint16_t cVal = 0;
uint16_t numReplicas = 0;
bool pedanticChecks = false;
bool separateContext = false;
bool isReadOnly = false;

#if defined USE_COMM_PLAIN_TCP || defined USE_COMM_TLS_TCP
uint32_t maxChunkSize = 16384;
Expand Down
2 changes: 1 addition & 1 deletion bftengine/src/bcstatetransfer/BCStateTran.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ BCStateTran::BCStateTran(const Config &config, IAppState *const stateApi, DataSt
<< config_.maxBlockSize << " maxNumOfChunksInAppBlock_=" << maxNumOfChunksInAppBlock_
<< " maxNumOfChunksInVBlock_=" << maxNumOfChunksInVBlock_);

if (config_.separateContext) {
if (config_.isReadOnly) {
messageHandler_ = std::bind(&BCStateTran::handoff, this, _1, _2, _3);
} else {
messageHandler_ = std::bind(&BCStateTran::handleStateTransferMessageImp, this, _1, _2, _3);
Expand Down
4 changes: 3 additions & 1 deletion bftengine/src/bftengine/ReplicaForStateTransfer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@ void ReplicaForStateTransfer::start() {
}

void ReplicaForStateTransfer::stop() {
TimersSingleton::getInstance().cancel(stateTranTimer_);
// stop in reverse order
ReplicaBase::stop();
stateTransfer->stopRunning();
TimersSingleton::getInstance().cancel(stateTranTimer_);
}

template <>
Expand Down
4 changes: 2 additions & 2 deletions bftengine/src/bftengine/ReplicaForStateTransfer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class ReplicaForStateTransfer : public IReplicaForStateTransfer, public ReplicaB
bool firstTime // TODO [TK] get rid of this
);

IStateTransfer* getStateTransfer() const { return stateTransfer; }
IStateTransfer* getStateTransfer() const { return stateTransfer.get(); }

// IReplicaForStateTransfer
void freeStateTransferMsg(char* m) override;
Expand Down Expand Up @@ -56,7 +56,7 @@ class ReplicaForStateTransfer : public IReplicaForStateTransfer, public ReplicaB
void onMessage(T*);

protected:
bftEngine::IStateTransfer* stateTransfer = nullptr;
std::unique_ptr<bftEngine::IStateTransfer> stateTransfer;
Timers::Handle stateTranTimer_;
CounterHandle metric_received_state_transfers_;
};
Expand Down
2 changes: 1 addition & 1 deletion bftengine/src/bftengine/ReplicaImp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2832,7 +2832,7 @@ ReplicaImp::ReplicaImp(bool firstTime,
clientsManager =
new ClientsManager(config_.replicaId, clientsSet, ReplicaConfigSingleton::GetInstance().GetSizeOfReservedPage());

clientsManager->init(stateTransfer);
clientsManager->init(stateTransfer.get());

if (!firstTime || config_.debugPersistentStorageEnabled)
clientsManager->loadInfoFromReservedPages();
Expand Down
14 changes: 6 additions & 8 deletions bftengine/src/simplestatetransfer/SimpleStateTran.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,10 @@ class SimpleStateTran : public ISimpleInMemoryStateTransfer {
bool getPrevDigestFromBlock(uint64_t blockId,
SimpleBlockchainStateTransfer::StateTransferDigest* outPrevBlockDigest) override;

bool putBlock(uint64_t blockId, char* block, uint32_t blockSize) override;
bool putBlock(const uint64_t blockId, const char* block, const uint32_t blockSize) override;

uint64_t getLastReachableBlockNum() override;
void setLastReachableBlockNum(uint64_t blockId) override {}

uint64_t getLastBlockNum() override;
uint64_t getLastReachableBlockNum() const override;
uint64_t getLastBlockNum() const override;
};

///////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -577,14 +575,14 @@ bool SimpleStateTran::DummyBDState::getPrevDigestFromBlock(
return false;
}

bool SimpleStateTran::DummyBDState::putBlock(uint64_t blockId, char* block, uint32_t blockSize) {
bool SimpleStateTran::DummyBDState::putBlock(const uint64_t blockId, const char* block, const uint32_t blockSize) {
Assert(false);
return false;
}

uint64_t SimpleStateTran::DummyBDState::getLastReachableBlockNum() { return 0; }
uint64_t SimpleStateTran::DummyBDState::getLastReachableBlockNum() const { return 0; }

uint64_t SimpleStateTran::DummyBDState::getLastBlockNum() { return 0; }
uint64_t SimpleStateTran::DummyBDState::getLastBlockNum() const { return 0; }

} // namespace impl
} // namespace SimpleInMemoryStateTransfer
Expand Down
8 changes: 3 additions & 5 deletions bftengine/tests/bcstatetransfer/test_app_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class TestAppState : public IAppState {
return true;
};

bool putBlock(uint64_t blockId, char* block, uint32_t blockSize) override {
bool putBlock(const uint64_t blockId, const char* block, const uint32_t blockSize) override {
assert(blockId < last_block_);
Block bl;
computeBlockDigest(blockId, block, blockSize, &bl.digest);
Expand All @@ -66,10 +66,8 @@ class TestAppState : public IAppState {
}

// TODO(AJS): How does this differ from getLastBlockNum?
uint64_t getLastReachableBlockNum() override { return last_block_; }
void setLastReachableBlockNum(uint64_t blockId) override { last_block_ = blockId; }

uint64_t getLastBlockNum() override { return last_block_; };
uint64_t getLastReachableBlockNum() const override { return last_block_; }
uint64_t getLastBlockNum() const override { return last_block_; };

private:
uint64_t last_block_;
Expand Down
6 changes: 3 additions & 3 deletions kvbc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ project(libkvbc VERSION 0.1.0.0 LANGUAGES CXX)
add_library(kvbc src/ClientImp.cpp
src/ReplicaImp.cpp
src/replica_state_sync_imp.cpp
src/block_metadata.cpp
)
target_link_libraries(kvbc PUBLIC corebft threshsign util concordbft_storage)
src/block_metadata.cpp)

target_link_libraries(kvbc PUBLIC corebft threshsign util concordbft_storage object_store_lib)

target_include_directories(kvbc PUBLIC include)

Expand Down
2 changes: 0 additions & 2 deletions kvbc/include/KVBCInterfaces.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,6 @@ class IReplica {
/// need to split interfaces implementations to differrent modules
/// instead of being all implemented bt ReplicaImpl
virtual void set_command_handler(ICommandsHandler* handler) = 0;

virtual void set_app_state(std::shared_ptr<bftEngine::SimpleBlockchainStateTransfer::IAppState> appState) = 0;
};

/////////////////////////////////////////////////////////////////////////////
Expand Down
61 changes: 17 additions & 44 deletions kvbc/include/ReplicaImp.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,10 @@
namespace concord {
namespace kvbc {

class ReplicaInitException : public std::exception {
public:
explicit ReplicaInitException(const std::string &what) : msg(what){};

virtual const char *what() const noexcept override { return msg.c_str(); }

private:
std::string msg;
};

class ReplicaImp : public IReplica,
public concord::storage::blockchain::ILocalKeyValueStorageReadOnly,
public concord::storage::blockchain::IBlocksAppender,
public bftEngine::SimpleBlockchainStateTransfer::IAppState,
public std::enable_shared_from_this<ReplicaImp> {
public:
// concord::kvbc::IReplica methods
Expand All @@ -53,8 +44,6 @@ class ReplicaImp : public IReplica,

virtual void set_command_handler(ICommandsHandler *handler) override;

virtual void set_app_state(std::shared_ptr<bftEngine::SimpleBlockchainStateTransfer::IAppState> appState) override;

// concord::storage::ILocalKeyValueStorageReadOnly methods
virtual Status get(const Sliver &key, Sliver &outValue) const override;

Expand Down Expand Up @@ -91,12 +80,6 @@ class ReplicaImp : public IReplica,
concord::storage::blockchain::DBAdapter *dbAdapter,
std::shared_ptr<concordMetrics::Aggregator> aggregator);

ReplicaImp(bftEngine::ICommunication *comm,
bftEngine::ReplicaConfig &config,
concord::storage::blockchain::DBAdapter *dbAdapter,
std::shared_ptr<concordMetrics::Aggregator> aggregator,
std::shared_ptr<bftEngine::SimpleBlockchainStateTransfer::IAppState> appState);

void setReplicaStateSync(ReplicaStateSync *rss) { replicaStateSync_.reset(rss); }

~ReplicaImp() override;
Expand All @@ -110,8 +93,7 @@ class ReplicaImp : public IReplica,
Sliver key,
Sliver &outValue,
concord::storage::blockchain::BlockId &outBlock) const;
void insertBlockByStateTransfer(concord::storage::blockchain::BlockId blockId, Sliver block);
Status getBlockInternal(concord::storage::blockchain::BlockId blockId, Sliver& block) const;
Status getBlockInternal(concord::storage::blockchain::BlockId blockId, Sliver &block) const;
bool hasBlock(concord::storage::blockchain::BlockId);
concord::storage::blockchain::DBAdapter *getBcDbAdapter() const { return m_bcDbAdapter; }

Expand Down Expand Up @@ -240,29 +222,22 @@ class ReplicaImp : public IReplica,
virtual Status freeInternalIterator();
};

class BlockchainAppState : public bftEngine::SimpleBlockchainStateTransfer::IAppState {
public:
BlockchainAppState(std::shared_ptr<ReplicaImp> parent);

bool hasBlock(uint64_t blockId) const override;
bool getBlock(uint64_t blockId, char *outBlock, uint32_t *outBlockSize) override;
bool getPrevDigestFromBlock(
uint64_t blockId, bftEngine::SimpleBlockchainStateTransfer::StateTransferDigest *outPrevBlockDigest) override;
bool putBlock(uint64_t blockId, char *block, uint32_t blockSize) override;
uint64_t getLastReachableBlockNum() override;
void setLastReachableBlockNum(uint64_t blockNum) override { m_lastReachableBlock = blockNum; }
uint64_t getLastBlockNum() override;

private:
std::shared_ptr<ReplicaImp> m_ptrReplicaImpl = nullptr;
concordlogger::Logger m_logger;

// from IAppState. represents maximal block number n such that all
// blocks 1 <= i <= n exist
std::atomic<concord::storage::blockchain::BlockId> m_lastReachableBlock{0};
};
public:
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// IAppState implementation
//
bool hasBlock(uint64_t blockId) const override;
bool getBlock(uint64_t blockId, char *outBlock, uint32_t *outBlockSize) override;
bool getPrevDigestFromBlock(uint64_t blockId,
bftEngine::SimpleBlockchainStateTransfer::StateTransferDigest *) override;
bool putBlock(const uint64_t blockId, const char *block, const uint32_t blockSize) override;
uint64_t getLastReachableBlockNum() const override { return m_lastReachableBlock; }
uint64_t getLastBlockNum() const override { return m_lastBlock; }
// from IAppState. represents maximal block number n such that all
// blocks 1 <= i <= n exist
std::atomic<concord::storage::blockchain::BlockId> m_lastReachableBlock{0};
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

// DATA
private:
concordlogger::Logger logger;
RepStatus m_currentRepStatus;
Expand All @@ -275,11 +250,9 @@ class ReplicaImp : public IReplica,
bftEngine::IReplica *m_replicaPtr = nullptr;
ICommandsHandler *m_cmdHandler = nullptr;
bftEngine::IStateTransfer *m_stateTransfer = nullptr;
std::shared_ptr<bftEngine::SimpleBlockchainStateTransfer::IAppState> m_appState = nullptr;
concord::storage::DBMetadataStorage *m_metadataStorage = nullptr;
std::unique_ptr<ReplicaStateSync> replicaStateSync_;
std::shared_ptr<concordMetrics::Aggregator> aggregator_;
bftEngine::SimpleBlockchainStateTransfer::Config state_transfer_config_;
};

} // namespace kvbc
Expand Down
65 changes: 0 additions & 65 deletions kvbc/include/ror_app_state.hpp

This file was deleted.

0 comments on commit acd8bf8

Please sign in to comment.