diff --git a/.conan/libs3_pkg/conanfile.py b/.conan/libs3_pkg/conanfile.py new file mode 100644 index 0000000000..7c6e3bf36b --- /dev/null +++ b/.conan/libs3_pkg/conanfile.py @@ -0,0 +1,16 @@ +from conans import ConanFile, CMake, tools +from conans.tools import os_info, SystemPackageTool + + +class LibS3Conan(ConanFile): + name = "libs3-dev" + version = "2.0-3" + def system_requirements(self): + pack_name = None + if os_info.linux_distro == "ubuntu": + pack_name = "libs3-dev" + + if pack_name: + installer = SystemPackageTool() + installer.install(pack_name) + diff --git a/bftengine/include/bcstatetransfer/SimpleBCStateTransfer.hpp b/bftengine/include/bcstatetransfer/SimpleBCStateTransfer.hpp index b8b8e9e270..8f0aa6e772 100644 --- a/bftengine/include/bcstatetransfer/SimpleBCStateTransfer.hpp +++ b/bftengine/include/bcstatetransfer/SimpleBCStateTransfer.hpp @@ -69,7 +69,7 @@ class IAppState { // returns true IFF block blockId exists // (i.e. the block is stored in the application/storage layer). - virtual bool hasBlock(uint64_t blockId) = 0; + virtual bool hasBlock(uint64_t blockId) const = 0; // If block blockId exists, then its content is returned via the arguments // outBlock and outBlockSize. Returns true IFF block blockId exists. @@ -83,25 +83,19 @@ class IAppState { // blockId - the block number // block - pointer to a buffer that contains the new block // blockSize - the size of the new block - virtual bool putBlock(uint64_t blockId, char *block, uint32_t blockSize) = 0; + virtual bool putBlock(const uint64_t blockId, const char *block, const uint32_t blockSize) = 0; // returns the maximal block number n such that all blocks 1 <= i <= n exist. // if block 1 does not exist, returns 0. - virtual uint64_t getLastReachableBlockNum() = 0; - + virtual uint64_t getLastReachableBlockNum() const = 0; // returns the maximum block number that is currently stored in // the application/storage layer. - virtual uint64_t getLastBlockNum() = 0; + virtual uint64_t getLastBlockNum() const = 0; // When the state is updated by the application, getLastReachableBlockNum() // and getLastBlockNum() should always return the same block number. // When that state transfer module is updating the state, then these methods // may return different block numbers. - - // This method MAY be called by ST module when putBlock returns false. The call will block until the underlying - // storage becomes available (storage non availabilty is the reason for putBlock to fail). Mainly relevant for network - // storages. In regular replica which uses local DB this function should not be called. - virtual void wait() = 0; }; struct Config { @@ -110,6 +104,7 @@ struct Config { uint16_t cVal = 0; uint16_t numReplicas = 0; bool pedanticChecks = false; + bool isReadOnly = false; #if defined USE_COMM_PLAIN_TCP || defined USE_COMM_TLS_TCP uint32_t maxChunkSize = 16384; diff --git a/bftengine/src/bcstatetransfer/BCStateTran.cpp b/bftengine/src/bcstatetransfer/BCStateTran.cpp index 3dac0db4a9..c3e288d38c 100644 --- a/bftengine/src/bcstatetransfer/BCStateTran.cpp +++ b/bftengine/src/bcstatetransfer/BCStateTran.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include "assertUtils.hpp" #include "BCStateTran.hpp" @@ -27,6 +28,8 @@ #include "DBDataStore.hpp" #include "storage/db_interface.h" #include "memorydb/client.h" +#include "Handoff.hpp" + // TODO(GG): for debugging - remove // #define DEBUG_SEND_CHECKPOINTS_IN_REVERSE_ORDER (1) @@ -36,6 +39,7 @@ using std::chrono::duration_cast; using std::chrono::milliseconds; using std::chrono::time_point; using std::chrono::system_clock; +using namespace std::placeholders; namespace bftEngine { namespace SimpleBlockchainStateTransfer { @@ -132,33 +136,23 @@ static set generateSetOfReplicas(const int16_t numberOfReplicas) { } BCStateTran::BCStateTran(const Config &config, IAppState *const stateApi, DataStore *ds) - : pedanticChecks_{config.pedanticChecks}, - as_{stateApi}, - psd_(ds), - replicas_{generateSetOfReplicas(config.numReplicas)}, - myId_{config.myReplicaId}, - fVal_{config.fVal}, - maxBlockSize_{config.maxBlockSize}, - maxChunkSize_{config.maxChunkSize}, - maxNumberOfChunksInBatch_{config.maxNumberOfChunksInBatch}, - maxPendingDataFromSourceReplica_{config.maxPendingDataFromSourceReplica}, - maxNumOfReservedPages_{config.maxNumOfReservedPages}, - sizeOfReservedPage_{config.sizeOfReservedPage}, - refreshTimerMilli_{config.refreshTimerMilli}, - checkpointSummariesRetransmissionTimeoutMilli_{config.checkpointSummariesRetransmissionTimeoutMilli}, - maxAcceptableMsgDelayMilli_{config.maxAcceptableMsgDelayMilli}, - maxVBlockSize_{calcMaxVBlockSize(config.maxNumOfReservedPages, sizeOfReservedPage_)}, - maxItemSize_{calcMaxItemSize(config.maxBlockSize, config.maxNumOfReservedPages, config.sizeOfReservedPage)}, + : as_{stateApi}, + psd_{ds}, + config_{config}, + replicas_{generateSetOfReplicas(config_.numReplicas)}, + maxVBlockSize_{calcMaxVBlockSize(config_.maxNumOfReservedPages, config_.sizeOfReservedPage)}, + maxItemSize_{calcMaxItemSize(config_.maxBlockSize, config_.maxNumOfReservedPages, config_.sizeOfReservedPage)}, maxNumOfChunksInAppBlock_{ - calcMaxNumOfChunksInBlock(maxItemSize_, config.maxBlockSize, config.maxChunkSize, false)}, - maxNumOfChunksInVBlock_{calcMaxNumOfChunksInBlock(maxItemSize_, config.maxBlockSize, config.maxChunkSize, true)}, + calcMaxNumOfChunksInBlock(maxItemSize_, config_.maxBlockSize, config_.maxChunkSize, false)}, + maxNumOfChunksInVBlock_{ + calcMaxNumOfChunksInBlock(maxItemSize_, config_.maxBlockSize, config_.maxChunkSize, true)}, maxNumOfStoredCheckpoints_{0}, numberOfReservedPages_{0}, randomGen_{randomDevice_()}, sourceSelector_{SourceSelector( - allOtherReplicas(), config.fetchRetransmissionTimeoutMilli, config.sourceReplicaReplacementTimeoutMilli)}, + allOtherReplicas(), config_.fetchRetransmissionTimeoutMilli, config_.sourceReplicaReplacementTimeoutMilli)}, last_metrics_dump_time_(0), - metrics_dump_interval_in_sec_{config.metricsDumpIntervalSeconds}, + metrics_dump_interval_in_sec_{config_.metricsDumpIntervalSeconds}, metrics_component_{ concordMetrics::Component("bc_state_transfer", std::make_shared())}, @@ -166,14 +160,14 @@ BCStateTran::BCStateTran(const Config &config, IAppState *const stateApi, DataSt // same order as defined in the header file. metrics_{ metrics_component_.RegisterStatus("fetching_state", stateName(FetchingState::NotFetching)), - metrics_component_.RegisterStatus("pedantic_checks_enabled", pedanticChecks_ ? "true" : "false"), + metrics_component_.RegisterStatus("pedantic_checks_enabled", config_.pedanticChecks ? "true" : "false"), metrics_component_.RegisterStatus("preferred_replicas", ""), metrics_component_.RegisterGauge("current_source_replica", NO_REPLICA), metrics_component_.RegisterGauge("checkpoint_being_fetched", 0), metrics_component_.RegisterGauge("last_stored_checkpoint", 0), metrics_component_.RegisterGauge("number_of_reserved_pages", 0), - metrics_component_.RegisterGauge("size_of_reserved_page", sizeOfReservedPage_), + metrics_component_.RegisterGauge("size_of_reserved_page", config_.sizeOfReservedPage), metrics_component_.RegisterGauge("last_msg_seq_num", lastMsgSeqNum_), metrics_component_.RegisterGauge("next_required_block_", nextRequiredBlock_), metrics_component_.RegisterGauge("num_pending_item_data_msgs_", pendingItemDataMsgs.size()), @@ -221,9 +215,9 @@ BCStateTran::BCStateTran(const Config &config, IAppState *const stateApi, DataSt metrics_component_.RegisterCounter("on_transferring_complete"), } { Assert(stateApi != nullptr); - Assert(replicas_.size() >= 3U * fVal_ + 1U); - Assert(replicas_.count(myId_) == 1); - Assert(maxNumOfReservedPages_ >= 2); + Assert(replicas_.size() >= 3U * config_.fVal + 1U); + Assert(replicas_.count(config_.myReplicaId) == 1); + Assert(config_.maxNumOfReservedPages >= 2); // Register metrics component with the default aggregator. metrics_component_.Register(); @@ -232,17 +226,25 @@ BCStateTran::BCStateTran(const Config &config, IAppState *const stateApi, DataSt buffer_ = reinterpret_cast(std::malloc(maxItemSize_)); LOG_INFO(STLogger, "Creating BCStateTran object:" - << " myId_=" << myId_ << " fVal_=" << fVal_ << " maxVBlockSize_=" << maxVBlockSize_ - << " maxChunkSize_=" << maxChunkSize_ << " maxNumberOfChunksInBatch_=" << maxNumberOfChunksInBatch_ - << " maxPendingDataFromSourceReplica_=" << maxPendingDataFromSourceReplica_ - << " maxNumOfReservedPages_=" << maxNumOfReservedPages_ << " sizeOfReservedPage_=" << sizeOfReservedPage_ - << " refreshTimerMilli_=" << refreshTimerMilli_ - << " checkpointSummariesRetransmissionTimeoutMilli_=" << checkpointSummariesRetransmissionTimeoutMilli_ - << " maxAcceptableMsgDelayMilli_=" << maxAcceptableMsgDelayMilli_ - << " sourceReplicaReplacementTimeoutMilli_=" << config.sourceReplicaReplacementTimeoutMilli - << " fetchRetransmissionTimeoutMilli_=" << config.fetchRetransmissionTimeoutMilli - << " maxBlockSize_=" << maxBlockSize_ << " maxNumOfChunksInAppBlock_=" << maxNumOfChunksInAppBlock_ + << " myId_=" << config_.myReplicaId << " fVal_=" << config_.fVal << " maxVBlockSize_=" << maxVBlockSize_ + << " maxChunkSize_=" << config_.maxChunkSize + << " maxNumberOfChunksInBatch_=" << config_.maxNumberOfChunksInBatch + << " maxPendingDataFromSourceReplica_=" << config_.maxPendingDataFromSourceReplica + << " maxNumOfReservedPages_=" << config_.maxNumOfReservedPages << "config_.sizeOfReservedPage_=" + << config_.sizeOfReservedPage << " refreshTimerMilli_=" << config_.refreshTimerMilli + << " checkpointSummariesRetransmissionTimeoutMilli_=" + << config_.checkpointSummariesRetransmissionTimeoutMilli + << " maxAcceptableMsgDelayMilli_=" << config_.maxAcceptableMsgDelayMilli + << " sourceReplicaReplacementTimeoutMilli_=" << config_.sourceReplicaReplacementTimeoutMilli + << " fetchRetransmissionTimeoutMilli_=" << config_.fetchRetransmissionTimeoutMilli << " maxBlockSize_=" + << config_.maxBlockSize << " maxNumOfChunksInAppBlock_=" << maxNumOfChunksInAppBlock_ << " maxNumOfChunksInVBlock_=" << maxNumOfChunksInVBlock_); + + if (config_.isReadOnly) { + messageHandler_ = std::bind(&BCStateTran::handoff, this, _1, _2, _3); + } else { + messageHandler_ = std::bind(&BCStateTran::handleStateTransferMessageImp, this, _1, _2, _3); + } } BCStateTran::~BCStateTran() { @@ -274,24 +276,24 @@ void BCStateTran::init(uint64_t maxNumOfRequiredStoredCheckpoints, try { Assert(!running_); Assert(replicaForStateTransfer_ == nullptr); - Assert(sizeOfReservedPage == sizeOfReservedPage_); + Assert(sizeOfReservedPage == config_.sizeOfReservedPage); maxNumOfStoredCheckpoints_ = maxNumOfRequiredStoredCheckpoints; numberOfReservedPages_ = numberOfRequiredReservedPages; metrics_.number_of_reserved_pages_.Get().Set(numberOfReservedPages_); - metrics_.size_of_reserved_page_.Get().Set(sizeOfReservedPage_); + metrics_.size_of_reserved_page_.Get().Set(sizeOfReservedPage); memset(buffer_, 0, maxItemSize_); LOG_INFO(STLogger, "Init BCStateTran object:" << " maxNumOfStoredCheckpoints_=" << maxNumOfStoredCheckpoints_ << " numberOfReservedPages_=" - << numberOfReservedPages_ << " sizeOfReservedPage_=" << sizeOfReservedPage_); + << numberOfReservedPages_ << " config_.sizeOfReservedPage_=" << config_.sizeOfReservedPage); if (psd_->initialized()) { LOG_INFO(STLogger, "BCStateTran::init - loading existing data from storage"); - checkConsistency(pedanticChecks_); + checkConsistency(config_.pedanticChecks); FetchingState fs = getFetchingState(); LOG_INFO(STLogger, "starting state is " << stateName(fs)); @@ -306,18 +308,18 @@ void BCStateTran::init(uint64_t maxNumOfRequiredStoredCheckpoints, AssertAND(maxNumOfRequiredStoredCheckpoints >= 2, maxNumOfRequiredStoredCheckpoints <= kMaxNumOfStoredCheckpoints); - AssertAND(numberOfRequiredReservedPages >= 2, numberOfRequiredReservedPages <= maxNumOfReservedPages_); + AssertAND(numberOfRequiredReservedPages >= 2, numberOfRequiredReservedPages <= config_.maxNumOfReservedPages); DataStoreTransaction::Guard g(psd_->beginTransaction()); g.txn()->setReplicas(replicas_); - g.txn()->setMyReplicaId(myId_); - g.txn()->setFVal(fVal_); + g.txn()->setMyReplicaId(config_.myReplicaId); + g.txn()->setFVal(config_.fVal); g.txn()->setMaxNumOfStoredCheckpoints(maxNumOfRequiredStoredCheckpoints); g.txn()->setNumberOfReservedPages(numberOfRequiredReservedPages); g.txn()->setLastStoredCheckpoint(0); g.txn()->setFirstStoredCheckpoint(0); // TODO(TK) - check max transaction size for (uint32_t i = 0; i < numberOfReservedPages_; i++) // reset all pages - g.txn()->setPendingResPage(i, buffer_, sizeOfReservedPage_); + g.txn()->setPendingResPage(i, buffer_, config_.sizeOfReservedPage); g.txn()->setIsFetchingState(false); g.txn()->setFirstRequiredBlock(0); @@ -343,7 +345,7 @@ void BCStateTran::startRunning(IReplicaForStateTransfer *r) { running_ = true; replicaForStateTransfer_ = r; - replicaForStateTransfer_->changeStateTransferTimerPeriod(refreshTimerMilli_); + replicaForStateTransfer_->changeStateTransferTimerPeriod(config_.refreshTimerMilli); } void BCStateTran::stopRunning() { @@ -432,12 +434,12 @@ STDigest BCStateTran::checkpointReservedPages(uint64_t checkpointNumber, DataSto for (uint32_t p : pages) { STDigest d; - txn->getPendingResPage(p, buffer_, sizeOfReservedPage_); - computeDigestOfPage(p, checkpointNumber, buffer_, sizeOfReservedPage_, d); + txn->getPendingResPage(p, buffer_, config_.sizeOfReservedPage); + computeDigestOfPage(p, checkpointNumber, buffer_, config_.sizeOfReservedPage, d); txn->associatePendingResPageWithCheckpoint(p, checkpointNumber, d); } - memset(buffer_, 0, sizeOfReservedPage_); + memset(buffer_, 0, config_.sizeOfReservedPage); Assert(txn->numOfAllPendingResPage() == 0); DataStore::ResPagesDescriptor *allPagesDesc = txn->getResPagesDescriptor(checkpointNumber); Assert(allPagesDesc->numOfPages == numberOfReservedPages_); @@ -538,13 +540,13 @@ bool BCStateTran::isCollectingState() const { return isFetching(); } uint32_t BCStateTran::numberOfReservedPages() const { return static_cast(numberOfReservedPages_); } -uint32_t BCStateTran::sizeOfReservedPage() const { return sizeOfReservedPage_; } +uint32_t BCStateTran::sizeOfReservedPage() const { return config_.sizeOfReservedPage; } bool BCStateTran::loadReservedPage(uint32_t reservedPageId, uint32_t copyLength, char *outReservedPage) const { LOG_DEBUG(STLogger, "BCStateTran::loadReservedPage - reservedPageId=" << reservedPageId); Assert(reservedPageId < numberOfReservedPages_); - Assert(copyLength <= sizeOfReservedPage_); + Assert(copyLength <= config_.sizeOfReservedPage); metrics_.load_reserved_page_.Get().Inc(); @@ -571,7 +573,7 @@ void BCStateTran::saveReservedPage(uint32_t reservedPageId, uint32_t copyLength, Assert(!isFetching()); Assert(reservedPageId < numberOfReservedPages_); - Assert(copyLength <= sizeOfReservedPage_); + Assert(copyLength <= config_.sizeOfReservedPage); metrics_.save_reserved_page_.Get().Inc(); @@ -589,8 +591,8 @@ void BCStateTran::zeroReservedPage(uint32_t reservedPageId) { Assert(reservedPageId < numberOfReservedPages_); metrics_.zero_reserved_page_.Get().Inc(); - memset(buffer_, 0, sizeOfReservedPage_); - psd_->setPendingResPage(reservedPageId, buffer_, sizeOfReservedPage_); + memset(buffer_, 0, config_.sizeOfReservedPage); + psd_->setPendingResPage(reservedPageId, buffer_, config_.sizeOfReservedPage); } void BCStateTran::startCollectingState() { @@ -626,7 +628,7 @@ void BCStateTran::onTimer() { auto currTime = getMonotonicTimeMilli(); FetchingState fs = getFetchingState(); if (fs == FetchingState::GettingCheckpointSummaries) { - if ((currTime - lastTimeSentAskForCheckpointSummariesMsg) > checkpointSummariesRetransmissionTimeoutMilli_) { + if ((currTime - lastTimeSentAskForCheckpointSummariesMsg) > config_.checkpointSummariesRetransmissionTimeoutMilli) { if (++retransmissionNumberOfAskForCheckpointSummariesMsg > kResetCount_AskForCheckpointSummaries) clearInfoAboutGettingCheckpointSummary(); @@ -639,7 +641,7 @@ void BCStateTran::onTimer() { void BCStateTran::handleStateTransferMessage(char *msg, uint32_t msgLen, uint16_t senderId) { Assert(running_); - if (msgLen < sizeof(BCStateTranBaseMsg) || senderId == myId_ || replicas_.count(senderId) == 0) { + if (msgLen < sizeof(BCStateTranBaseMsg) || senderId == config_.myReplicaId || replicas_.count(senderId) == 0) { // TODO(GG): report about illegal message metrics_.received_illegal_msg_.Get().Inc(); @@ -647,7 +649,16 @@ void BCStateTran::handleStateTransferMessage(char *msg, uint32_t msgLen, uint16_ replicaForStateTransfer_->freeStateTransferMsg(msg); return; } + messageHandler_(msg, msgLen, senderId); +} + +void BCStateTran::handoff(char *msg, uint32_t msgLen, uint16_t senderId) { + static concord::util::Handoff handoff_; + handoff_.push(std::bind(&BCStateTran::handleStateTransferMessageImp, this, msg, msgLen, senderId)); +} +// this function can be executed in context of another thread. +void BCStateTran::handleStateTransferMessageImp(char *msg, uint32_t msgLen, uint16_t senderId) { BCStateTranBaseMsg *msgHeader = reinterpret_cast(msg); LOG_DEBUG(STLogger, "BCStateTran::handleStateTransferMessage - new message with type=" << msgHeader->type); @@ -791,7 +802,7 @@ bool BCStateTran::checkValidityAndSaveMsgSeqNum(uint16_t replicaId, uint64_t msg const uint64_t milliNow = std::chrono::duration_cast(now.time_since_epoch()).count(); uint64_t diffMilli = ((milliMsgTime > milliNow) ? (milliMsgTime - milliNow) : (milliNow - milliMsgTime)); - if (diffMilli > maxAcceptableMsgDelayMilli_) { + if (diffMilli > config_.maxAcceptableMsgDelayMilli) { LOG_WARN(STLogger, "BCStateTran::checkValidityAndSaveMsgSeqNum - msgSeqNum " << msgSeqNum << " was rejected because diffMilli=" << diffMilli); @@ -856,7 +867,7 @@ BCStateTran::FetchingState BCStateTran::getFetchingState() const { void BCStateTran::sendToAllOtherReplicas(char *msg, uint32_t msgSize) { for (int16_t r : replicas_) { - if (r == myId_) continue; + if (r == config_.myReplicaId) continue; replicaForStateTransfer_->sendStateTransferMessage(msg, msgSize, r); } } @@ -1042,7 +1053,8 @@ bool BCStateTran::onMessage(const CheckpointSummaryMsg *m, uint32_t msgLen, uint CheckpointSummaryMsgCert *cert = nullptr; if (p == summariesCerts.end()) { - cert = new CheckpointSummaryMsgCert(replicaForStateTransfer_, replicas_.size(), fVal_, fVal_ + 1, myId_); + cert = new CheckpointSummaryMsgCert( + replicaForStateTransfer_, replicas_.size(), config_.fVal, config_.fVal + 1, config_.myReplicaId); summariesCerts[m->checkpointNum] = cert; } else { cert = p->second; @@ -1075,7 +1087,7 @@ bool BCStateTran::onMessage(const CheckpointSummaryMsg *m, uint32_t msgLen, uint metrics_.preferred_replicas_.Get().Set(sourceSelector_.preferredReplicasToString()); - Assert(sourceSelector_.numberOfPreferredReplicas() >= fVal_ + 1); + Assert(sourceSelector_.numberOfPreferredReplicas() >= config_.fVal + 1); // set new checkpoint DataStore::CheckpointDesc newCheckpoint; @@ -1165,10 +1177,10 @@ bool BCStateTran::onMessage(const FetchBlocksMsg *m, uint32_t msgLen, uint16_t r bool tmp = as_->getBlock(nextBlock, buffer_, &sizeOfNextBlock); AssertAND(tmp, sizeOfNextBlock > 0); - uint32_t sizeOfLastChunk = maxChunkSize_; - uint32_t numOfChunksInNextBlock = sizeOfNextBlock / maxChunkSize_; - if (sizeOfNextBlock % maxChunkSize_ != 0) { - sizeOfLastChunk = sizeOfNextBlock % maxChunkSize_; + uint32_t sizeOfLastChunk = config_.maxChunkSize; + uint32_t numOfChunksInNextBlock = sizeOfNextBlock / config_.maxChunkSize; + if (sizeOfNextBlock % config_.maxChunkSize != 0) { + sizeOfLastChunk = sizeOfNextBlock % config_.maxChunkSize; numOfChunksInNextBlock++; } @@ -1184,11 +1196,11 @@ bool BCStateTran::onMessage(const FetchBlocksMsg *m, uint32_t msgLen, uint16_t r // send chunks uint16_t numOfSentChunks = 0; while (true) { - uint32_t chunkSize = (nextChunk < numOfChunksInNextBlock) ? maxChunkSize_ : sizeOfLastChunk; + uint32_t chunkSize = (nextChunk < numOfChunksInNextBlock) ? config_.maxChunkSize : sizeOfLastChunk; Assert(chunkSize > 0); - char *pRawChunk = buffer_ + (nextChunk - 1) * maxChunkSize_; + char *pRawChunk = buffer_ + (nextChunk - 1) * config_.maxChunkSize; ItemDataMsg *outMsg = ItemDataMsg::alloc(chunkSize); // TODO(GG): improve outMsg->requestMsgSeqNum = m->msgSeqNum; @@ -1211,7 +1223,7 @@ bool BCStateTran::onMessage(const FetchBlocksMsg *m, uint32_t msgLen, uint16_t r numOfSentChunks++; // if we've already sent enough chunks - if (numOfSentChunks >= maxNumberOfChunksInBatch_) { + if (numOfSentChunks >= config_.maxNumberOfChunksInBatch) { break; } // if we still have chunks in block @@ -1229,10 +1241,10 @@ bool BCStateTran::onMessage(const FetchBlocksMsg *m, uint32_t msgLen, uint16_t r bool tmp2 = as_->getBlock(nextBlock, buffer_, &sizeOfNextBlock); AssertAND(tmp2, sizeOfNextBlock > 0); - sizeOfLastChunk = maxChunkSize_; - numOfChunksInNextBlock = sizeOfNextBlock / maxChunkSize_; - if (sizeOfNextBlock % maxChunkSize_ != 0) { - sizeOfLastChunk = sizeOfNextBlock % maxChunkSize_; + sizeOfLastChunk = config_.maxChunkSize; + numOfChunksInNextBlock = sizeOfNextBlock / config_.maxChunkSize; + if (sizeOfNextBlock % config_.maxChunkSize != 0) { + sizeOfLastChunk = sizeOfNextBlock % config_.maxChunkSize; numOfChunksInNextBlock++; } nextChunk = 1; @@ -1300,16 +1312,16 @@ bool BCStateTran::onMessage(const FetchResPagesMsg *m, uint32_t msgLen, uint16_t Assert(cacheOfVirtualBlockForResPages.size() <= kMaxVBlocksInCache); } - uint32_t vblockSize = getSizeOfVirtualBlock(vblock, sizeOfReservedPage_); + uint32_t vblockSize = getSizeOfVirtualBlock(vblock, config_.sizeOfReservedPage); Assert(vblockSize > sizeof(HeaderOfVirtualBlock)); - Assert(checkStructureOfVirtualBlock(vblock, vblockSize, sizeOfReservedPage_)); + Assert(checkStructureOfVirtualBlock(vblock, vblockSize, config_.sizeOfReservedPage)); // compute information about next chunk - uint32_t sizeOfLastChunk = maxChunkSize_; - uint32_t numOfChunksInVBlock = vblockSize / maxChunkSize_; - if (vblockSize % maxChunkSize_ != 0) { - sizeOfLastChunk = vblockSize % maxChunkSize_; + uint32_t sizeOfLastChunk = config_.maxChunkSize; + uint32_t numOfChunksInVBlock = vblockSize / config_.maxChunkSize; + if (vblockSize % config_.maxChunkSize != 0) { + sizeOfLastChunk = vblockSize % config_.maxChunkSize; numOfChunksInVBlock++; } @@ -1326,10 +1338,10 @@ bool BCStateTran::onMessage(const FetchResPagesMsg *m, uint32_t msgLen, uint16_t // send chunks uint16_t numOfSentChunks = 0; while (true) { - uint32_t chunkSize = (nextChunk < numOfChunksInVBlock) ? maxChunkSize_ : sizeOfLastChunk; + uint32_t chunkSize = (nextChunk < numOfChunksInVBlock) ? config_.maxChunkSize : sizeOfLastChunk; Assert(chunkSize > 0); - char *pRawChunk = vblock + (nextChunk - 1) * maxChunkSize_; + char *pRawChunk = vblock + (nextChunk - 1) * config_.maxChunkSize; ItemDataMsg *outMsg = ItemDataMsg::alloc(chunkSize); outMsg->requestMsgSeqNum = m->msgSeqNum; @@ -1352,7 +1364,7 @@ bool BCStateTran::onMessage(const FetchResPagesMsg *m, uint32_t msgLen, uint16_t numOfSentChunks++; // if we've already sent enough chunks - if (numOfSentChunks >= maxNumberOfChunksInBatch_) { + if (numOfSentChunks >= config_.maxNumberOfChunksInBatch) { break; } // if we still have chunks in block @@ -1441,17 +1453,17 @@ bool BCStateTran::onMessage(const ItemDataMsg *m, uint32_t msgLen, uint16_t repl // if msg is not relevant if (sourceSelector_.currentReplica() != replicaId || m->requestMsgSeqNum != lastMsgSeqNum_ || m->blockNumber > lastRequiredBlock || m->blockNumber < firstRequiredBlock || - (m->blockNumber + maxNumberOfChunksInBatch_ + 1 < lastRequiredBlock) || - m->dataSize + totalSizeOfPendingItemDataMsgs > maxPendingDataFromSourceReplica_) { + (m->blockNumber + config_.maxNumberOfChunksInBatch + 1 < lastRequiredBlock) || + m->dataSize + totalSizeOfPendingItemDataMsgs > config_.maxPendingDataFromSourceReplica) { LOG_WARN(STLogger, "BCStateTran::onMessage(ItemDataMsg) - msg is irrelevant: state=GettingMissingBlocks" << ", replicaId=" << replicaId << ", currentReplica=" << sourceSelector_.currentReplica() << ", m->requestMsgSeqNum=" << m->requestMsgSeqNum << ", lastMsgSeqNum_=" << lastMsgSeqNum_ << ", m->blockNumber=" << m->blockNumber << ", firstRequiredBlock=" << firstRequiredBlock << ", lastRequiredBlock=" << lastRequiredBlock - << ", maxNumberOfChunksInBatch_=" << maxNumberOfChunksInBatch_ << ", dataSize=" << m->dataSize + << ", maxNumberOfChunksInBatch_=" << config_.maxNumberOfChunksInBatch << ", dataSize=" << m->dataSize << ", totalSizeOfPendingItemDataMsgs=" << totalSizeOfPendingItemDataMsgs - << ", maxPendingDataFromSourceReplica_=" << maxPendingDataFromSourceReplica_); + << ", maxPendingDataFromSourceReplica_=" << config_.maxPendingDataFromSourceReplica); metrics_.irrelevant_item_data_msg_.Get().Inc(); return false; } @@ -1462,7 +1474,7 @@ bool BCStateTran::onMessage(const ItemDataMsg *m, uint32_t msgLen, uint16_t repl // if msg is not relevant if (sourceSelector_.currentReplica() != replicaId || m->requestMsgSeqNum != lastMsgSeqNum_ || m->blockNumber != ID_OF_VBLOCK_RES_PAGES || - m->dataSize + totalSizeOfPendingItemDataMsgs > maxPendingDataFromSourceReplica_) { + m->dataSize + totalSizeOfPendingItemDataMsgs > config_.maxPendingDataFromSourceReplica) { LOG_WARN(STLogger, "BCStateTran::onMessage(ItemDataMsg) - msg is irrelevant: state=" << stateName(fs) << ", replicaId=" << replicaId @@ -1470,7 +1482,7 @@ bool BCStateTran::onMessage(const ItemDataMsg *m, uint32_t msgLen, uint16_t repl << ", m->requestMsgSeqNum=" << m->requestMsgSeqNum << ", lastMsgSeqNum_=" << lastMsgSeqNum_ << ", blockNumMatches=" << (m->blockNumber == ID_OF_VBLOCK_RES_PAGES) << ", dataSize=" << m->dataSize << ", totalSizeOfPendingItemDataMsgs=" << totalSizeOfPendingItemDataMsgs - << ", maxPendingDataFromSourceReplica_=" << maxPendingDataFromSourceReplica_); + << ", maxPendingDataFromSourceReplica_=" << config_.maxPendingDataFromSourceReplica); metrics_.irrelevant_item_data_msg_.Get().Inc(); return false; } @@ -1549,7 +1561,7 @@ char *BCStateTran::createVBlock(const DescOfVBlockForResPages &desc) { const uint32_t numberOfUpdatedPages = updatedPages.size(); // allocate and fill block - const uint32_t elementSize = sizeof(ElementOfVirtualBlock) + sizeOfReservedPage_ - 1; + const uint32_t elementSize = sizeof(ElementOfVirtualBlock) + config_.sizeOfReservedPage - 1; const uint32_t size = sizeof(HeaderOfVirtualBlock) + numberOfUpdatedPages * elementSize; char *rawVBlock = reinterpret_cast(std::malloc(size)); @@ -1558,7 +1570,7 @@ char *BCStateTran::createVBlock(const DescOfVBlockForResPages &desc) { header->numberOfUpdatedPages = numberOfUpdatedPages; if (numberOfUpdatedPages == 0) { - Assert(checkStructureOfVirtualBlock(rawVBlock, size, sizeOfReservedPage_)); + Assert(checkStructureOfVirtualBlock(rawVBlock, size, config_.sizeOfReservedPage)); LOG_DEBUG(STLogger, "New vblock contains " << 0 << " updated pages , its size is " << size); return rawVBlock; } @@ -1571,7 +1583,8 @@ char *BCStateTran::createVBlock(const DescOfVBlockForResPages &desc) { uint64_t actualPageCheckpoint = 0; STDigest pageDigest; - psd_->getResPage(pageId, desc.checkpointNum, &actualPageCheckpoint, &pageDigest, buffer_, sizeOfReservedPage_); + psd_->getResPage( + pageId, desc.checkpointNum, &actualPageCheckpoint, &pageDigest, buffer_, config_.sizeOfReservedPage); Assert(actualPageCheckpoint <= desc.checkpointNum); Assert(actualPageCheckpoint > desc.lastCheckpointKnownToRequester); Assert(!pageDigest.isZero()); @@ -1580,13 +1593,13 @@ char *BCStateTran::createVBlock(const DescOfVBlockForResPages &desc) { currElement->pageId = pageId; currElement->checkpointNumber = actualPageCheckpoint; currElement->pageDigest = pageDigest; - memcpy(currElement->page, buffer_, sizeOfReservedPage_); - memset(buffer_, 0, sizeOfReservedPage_); + memcpy(currElement->page, buffer_, config_.sizeOfReservedPage); + memset(buffer_, 0, config_.sizeOfReservedPage); idx++; } Assert(idx == numberOfUpdatedPages); - AssertOR(!pedanticChecks_, checkStructureOfVirtualBlock(rawVBlock, size, sizeOfReservedPage_)); + AssertOR(!config_.pedanticChecks, checkStructureOfVirtualBlock(rawVBlock, size, config_.sizeOfReservedPage)); LOG_DEBUG(STLogger, "New vblock contains " << numberOfUpdatedPages << " updated pages , its size is " << size); return rawVBlock; @@ -1656,7 +1669,7 @@ bool BCStateTran::getNextFullBlock(uint64_t requiredBlock, bool isVBLock) { Assert(requiredBlock >= 1); - const uint32_t maxSize = (isVBLock ? maxVBlockSize_ : maxBlockSize_); + const uint32_t maxSize = (isVBLock ? maxVBlockSize_ : config_.maxBlockSize); clearPendingItemsData(requiredBlock + 1); outBadDataDetected = false; @@ -1766,7 +1779,7 @@ bool BCStateTran::checkVirtualBlockOfResPages(const STDigest &expectedDigestOfRe char *vblock, uint32_t vblockSize) const { LOG_DEBUG(STLogger, "BCStateTran::checkVirtualBlockOfResPages"); - if (!checkStructureOfVirtualBlock(vblock, vblockSize, sizeOfReservedPage_)) { + if (!checkStructureOfVirtualBlock(vblock, vblockSize, config_.sizeOfReservedPage)) { LOG_WARN(STLogger, "vblock has illegal structure"); return false; } @@ -1791,7 +1804,7 @@ bool BCStateTran::checkVirtualBlockOfResPages(const STDigest &expectedDigestOfRe if (numberOfUpdatedPages > 0) { uint32_t nextUpdateIndex = 0; - ElementOfVirtualBlock *nextUpdate = getVirtualElement(0, sizeOfReservedPage_, vblock); + ElementOfVirtualBlock *nextUpdate = getVirtualElement(0, config_.sizeOfReservedPage, vblock); for (uint32_t i = 0; i < numberOfReservedPages_; i++) { Assert(pagesDesc->d[i].pageId == i); @@ -1802,7 +1815,7 @@ bool BCStateTran::checkVirtualBlockOfResPages(const STDigest &expectedDigestOfRe pagesDesc->d[i].pageDigest = nextUpdate->pageDigest; nextUpdateIndex++; if (nextUpdateIndex < numberOfUpdatedPages) { - nextUpdate = getVirtualElement(nextUpdateIndex, sizeOfReservedPage_, vblock); + nextUpdate = getVirtualElement(nextUpdateIndex, config_.sizeOfReservedPage, vblock); } else { break; } @@ -1824,13 +1837,13 @@ bool BCStateTran::checkVirtualBlockOfResPages(const STDigest &expectedDigestOfRe // check digests of new pages for (uint32_t i = 0; i < numberOfUpdatedPages; i++) { - ElementOfVirtualBlock *e = getVirtualElement(i, sizeOfReservedPage_, vblock); + ElementOfVirtualBlock *e = getVirtualElement(i, config_.sizeOfReservedPage, vblock); // verified in checkStructureOfVirtualBlock Assert(e->checkpointNumber > 0); STDigest pageDigest; - computeDigestOfPage(e->pageId, e->checkpointNumber, e->page, sizeOfReservedPage_, pageDigest); + computeDigestOfPage(e->pageId, e->checkpointNumber, e->page, config_.sizeOfReservedPage, pageDigest); if (pageDigest != e->pageDigest) { LOG_WARN(STLogger, @@ -1848,7 +1861,7 @@ bool BCStateTran::checkVirtualBlockOfResPages(const STDigest &expectedDigestOfRe set BCStateTran::allOtherReplicas() { set others = replicas_; - others.erase(myId_); + others.erase(config_.myReplicaId); return others; } @@ -1879,7 +1892,7 @@ void BCStateTran::processData() { const FetchingState fs = getFetchingState(); AssertOR(fs == FetchingState::GettingMissingBlocks, fs == FetchingState::GettingMissingResPages); Assert(sourceSelector_.hasPreferredReplicas()); - Assert(totalSizeOfPendingItemDataMsgs <= maxPendingDataFromSourceReplica_); + Assert(totalSizeOfPendingItemDataMsgs <= config_.maxPendingDataFromSourceReplica); const bool isGettingBlocks = (fs == FetchingState::GettingMissingBlocks); @@ -2023,7 +2036,7 @@ void BCStateTran::processData() { // set the updated pages uint32_t numOfUpdates = getNumberOfElements(buffer_); for (uint32_t i = 0; i < numOfUpdates; i++) { - ElementOfVirtualBlock *e = getVirtualElement(i, sizeOfReservedPage_, buffer_); + ElementOfVirtualBlock *e = getVirtualElement(i, config_.sizeOfReservedPage, buffer_); g.txn()->setResPage(e->pageId, e->checkpointNumber, e->pageDigest, e->page); LOG_DEBUG(STLogger, "update page " << e->pageId); } @@ -2081,7 +2094,7 @@ void BCStateTran::processData() { metrics_.last_stored_checkpoint_.Get().Set(cp.checkpointNum); metrics_.checkpoint_being_fetched_.Get().Set(0); - checkConsistency(pedanticChecks_); + checkConsistency(config_.pedanticChecks); // Completion LOG_DEBUG(STLogger, "Calling onTransferringComplete for checkpoint " << cp.checkpointNum); @@ -2122,8 +2135,8 @@ void BCStateTran::checkConsistency(bool checkAllBlocks) { // check configuration Assert(replicas_ == psd_->getReplicas()); - Assert(myId_ == psd_->getMyReplicaId()); - Assert(fVal_ == psd_->getFVal()); + Assert(config_.myReplicaId == psd_->getMyReplicaId()); + Assert(config_.fVal == psd_->getFVal()); Assert(maxNumOfStoredCheckpoints_ == psd_->getMaxNumOfStoredCheckpoints()); Assert(numberOfReservedPages_ == psd_->getNumberOfReservedPages()); @@ -2245,16 +2258,16 @@ void BCStateTran::checkConsistency(bool checkAllBlocks) { Assert(allPagesDesc->d[j].relevantCheckpoint > 0); uint64_t actualCheckpoint = 0; - psd_->getResPage(j, i, &actualCheckpoint, buffer_, sizeOfReservedPage_); + psd_->getResPage(j, i, &actualCheckpoint, buffer_, config_.sizeOfReservedPage); Assert(allPagesDesc->d[j].relevantCheckpoint == actualCheckpoint); { STDigest d3; - computeDigestOfPage(j, actualCheckpoint, buffer_, sizeOfReservedPage_, d3); + computeDigestOfPage(j, actualCheckpoint, buffer_, config_.sizeOfReservedPage, d3); Assert(d3 == allPagesDesc->d[j].pageDigest); } } - memset(buffer_, 0, sizeOfReservedPage_); + memset(buffer_, 0, config_.sizeOfReservedPage); psd_->free(allPagesDesc); } } diff --git a/bftengine/src/bcstatetransfer/BCStateTran.hpp b/bftengine/src/bcstatetransfer/BCStateTran.hpp index 0abd6ddb60..c9097ca053 100644 --- a/bftengine/src/bcstatetransfer/BCStateTran.hpp +++ b/bftengine/src/bcstatetransfer/BCStateTran.hpp @@ -11,8 +11,7 @@ // terms and conditions of the subcomponent's license, as noted in the LICENSE // file. -#ifndef BFTENGINE_SRC_BCSTATETRANSFER_BCSTATETRAN_HPP_ -#define BFTENGINE_SRC_BCSTATETRANSFER_BCSTATETRAN_HPP_ +#pragma once #include #include @@ -37,10 +36,11 @@ using std::set; using std::map; using std::string; +using concordMetrics::StatusHandle; +using concordMetrics::GaugeHandle; +using concordMetrics::CounterHandle; -namespace bftEngine { -namespace SimpleBlockchainStateTransfer { -namespace impl { +namespace bftEngine::SimpleBlockchainStateTransfer::impl { class BCStateTran : public IStateTransfer { public: @@ -82,7 +82,11 @@ class BCStateTran : public IStateTransfer { void handleStateTransferMessage(char* msg, uint32_t msgLen, uint16_t senderId) override; protected: - const bool pedanticChecks_; + std::function messageHandler_; + // actual handling function. can be used in context of dedicated thread + void handleStateTransferMessageImp(char* msg, uint32_t msgLen, uint16_t senderId); + // handling from other context + void handoff(char* msg, uint32_t msgLen, uint16_t senderId); /////////////////////////////////////////////////////////////////////////// // Constants @@ -103,21 +107,8 @@ class BCStateTran : public IStateTransfer { /////////////////////////////////////////////////////////////////////////// // Management and general data /////////////////////////////////////////////////////////////////////////// - + const Config config_; const set replicas_; - const uint16_t myId_; - const uint16_t fVal_; - const uint32_t maxBlockSize_; - const uint32_t maxChunkSize_; - const uint16_t maxNumberOfChunksInBatch_; - const uint32_t maxPendingDataFromSourceReplica_; - - const uint32_t maxNumOfReservedPages_; - const uint32_t sizeOfReservedPage_; - const uint32_t refreshTimerMilli_; - const uint32_t checkpointSummariesRetransmissionTimeoutMilli_; - const uint32_t maxAcceptableMsgDelayMilli_; - const uint32_t maxVBlockSize_; const uint32_t maxItemSize_; const uint32_t maxNumOfChunksInAppBlock_; @@ -328,11 +319,6 @@ class BCStateTran : public IStateTransfer { std::chrono::seconds last_metrics_dump_time_; std::chrono::seconds metrics_dump_interval_in_sec_; concordMetrics::Component metrics_component_; - - typedef concordMetrics::Component::Handle GaugeHandle; - typedef concordMetrics::Component::Handle StatusHandle; - typedef concordMetrics::Component::Handle CounterHandle; - struct Metrics { StatusHandle fetching_state_; StatusHandle pedantic_checks_enabled_; @@ -395,8 +381,4 @@ class BCStateTran : public IStateTransfer { mutable Metrics metrics_; }; -} // namespace impl -} // namespace SimpleBlockchainStateTransfer -} // namespace bftEngine - -#endif // BFTENGINE_SRC_BCSTATETRANSFER_BCSTATETRAN_HPP_ +} // namespace bftEngine::SimpleBlockchainStateTransfer::impl diff --git a/bftengine/src/bftengine/IncomingMsgsStorageImp.cpp b/bftengine/src/bftengine/IncomingMsgsStorageImp.cpp index 1fede6339d..5802701514 100644 --- a/bftengine/src/bftengine/IncomingMsgsStorageImp.cpp +++ b/bftengine/src/bftengine/IncomingMsgsStorageImp.cpp @@ -146,7 +146,7 @@ void IncomingMsgsStorageImp::dispatchMessages(std::promise& signalStarted) if (msgHandlerCallback) { msgHandlerCallback(message); } else { - LOG_WARN(GL, "Unknown message - delete"); + LOG_WARN(GL, "Delete unknown message type: " << message->type()); delete message; } break; diff --git a/bftengine/src/bftengine/ReadOnlyReplica.cpp b/bftengine/src/bftengine/ReadOnlyReplica.cpp index 191f3580d2..adfca8124a 100644 --- a/bftengine/src/bftengine/ReadOnlyReplica.cpp +++ b/bftengine/src/bftengine/ReadOnlyReplica.cpp @@ -47,7 +47,10 @@ void ReadOnlyReplica::start() { lastExecutedSeqNum = ps_->getLastExecutedSeqNum(); askForCheckpointMsgTimer_ = TimersSingleton::getInstance().add(std::chrono::seconds(5), // TODO [TK] config Timers::Timer::RECURRING, - [this](Timers::Handle) { sendAskForCheckpointMsg(); }); + [this](Timers::Handle) { + if (!this->isCollectingState()) + sendAskForCheckpointMsg(); + }); } void ReadOnlyReplica::stop() { diff --git a/bftengine/src/bftengine/ReplicaForStateTransfer.cpp b/bftengine/src/bftengine/ReplicaForStateTransfer.cpp index 27c52a5853..d999948621 100644 --- a/bftengine/src/bftengine/ReplicaForStateTransfer.cpp +++ b/bftengine/src/bftengine/ReplicaForStateTransfer.cpp @@ -49,8 +49,10 @@ void ReplicaForStateTransfer::start() { } void ReplicaForStateTransfer::stop() { - TimersSingleton::getInstance().cancel(stateTranTimer_); + // stop in reverse order ReplicaBase::stop(); + stateTransfer->stopRunning(); + TimersSingleton::getInstance().cancel(stateTranTimer_); } template <> diff --git a/bftengine/src/bftengine/ReplicaForStateTransfer.hpp b/bftengine/src/bftengine/ReplicaForStateTransfer.hpp index bf8c4d1785..efca0103a3 100644 --- a/bftengine/src/bftengine/ReplicaForStateTransfer.hpp +++ b/bftengine/src/bftengine/ReplicaForStateTransfer.hpp @@ -28,7 +28,7 @@ class ReplicaForStateTransfer : public IReplicaForStateTransfer, public ReplicaB bool firstTime // TODO [TK] get rid of this ); - IStateTransfer* getStateTransfer() const { return stateTransfer; } + IStateTransfer* getStateTransfer() const { return stateTransfer.get(); } // IReplicaForStateTransfer void freeStateTransferMsg(char* m) override; @@ -56,7 +56,7 @@ class ReplicaForStateTransfer : public IReplicaForStateTransfer, public ReplicaB void onMessage(T*); protected: - bftEngine::IStateTransfer* stateTransfer = nullptr; + std::unique_ptr stateTransfer; Timers::Handle stateTranTimer_; CounterHandle metric_received_state_transfers_; }; diff --git a/bftengine/src/bftengine/ReplicaImp.cpp b/bftengine/src/bftengine/ReplicaImp.cpp index 57d1bbaef0..0aafae01c4 100644 --- a/bftengine/src/bftengine/ReplicaImp.cpp +++ b/bftengine/src/bftengine/ReplicaImp.cpp @@ -2839,7 +2839,7 @@ ReplicaImp::ReplicaImp(bool firstTime, clientsManager = new ClientsManager(config_.replicaId, clientsSet, ReplicaConfigSingleton::GetInstance().GetSizeOfReservedPage()); - clientsManager->init(stateTransfer); + clientsManager->init(stateTransfer.get()); if (!firstTime || config_.debugPersistentStorageEnabled) clientsManager->loadInfoFromReservedPages(); diff --git a/bftengine/src/simplestatetransfer/SimpleStateTran.cpp b/bftengine/src/simplestatetransfer/SimpleStateTran.cpp index 7222efe02e..ba629db496 100644 --- a/bftengine/src/simplestatetransfer/SimpleStateTran.cpp +++ b/bftengine/src/simplestatetransfer/SimpleStateTran.cpp @@ -93,20 +93,17 @@ class SimpleStateTran : public ISimpleInMemoryStateTransfer { // IAppState methods //////////////////////////////////////////////////////////////////////// - bool hasBlock(uint64_t blockId) override; + bool hasBlock(uint64_t blockId) const override; bool getBlock(uint64_t blockId, char* outBlock, uint32_t* outBlockSize) override; bool getPrevDigestFromBlock(uint64_t blockId, SimpleBlockchainStateTransfer::StateTransferDigest* outPrevBlockDigest) override; - bool putBlock(uint64_t blockId, char* block, uint32_t blockSize) override; + bool putBlock(const uint64_t blockId, const char* block, const uint32_t blockSize) override; - uint64_t getLastReachableBlockNum() override; - - uint64_t getLastBlockNum() override; - - void wait() override; + uint64_t getLastReachableBlockNum() const override; + uint64_t getLastBlockNum() const override; }; /////////////////////////////////////////////////////////////////////////// @@ -564,7 +561,7 @@ void SimpleStateTran::onComplete(int64_t checkpointNumberOfNewState) { lastKnownCheckpoint = checkpointNumberOfNewState; } -bool SimpleStateTran::DummyBDState::hasBlock(uint64_t blockId) { return false; } +bool SimpleStateTran::DummyBDState::hasBlock(uint64_t blockId) const { return false; } bool SimpleStateTran::DummyBDState::getBlock(uint64_t blockId, char* outBlock, uint32_t* outBlockSize) { Assert(false); @@ -577,16 +574,14 @@ bool SimpleStateTran::DummyBDState::getPrevDigestFromBlock( return false; } -bool SimpleStateTran::DummyBDState::putBlock(uint64_t blockId, char* block, uint32_t blockSize) { +bool SimpleStateTran::DummyBDState::putBlock(const uint64_t blockId, const char* block, const uint32_t blockSize) { Assert(false); return false; } -uint64_t SimpleStateTran::DummyBDState::getLastReachableBlockNum() { return 0; } - -uint64_t SimpleStateTran::DummyBDState::getLastBlockNum() { return 0; } +uint64_t SimpleStateTran::DummyBDState::getLastReachableBlockNum() const { return 0; } -void SimpleStateTran::DummyBDState::wait() {} +uint64_t SimpleStateTran::DummyBDState::getLastBlockNum() const { return 0; } } // namespace impl } // namespace SimpleInMemoryStateTransfer diff --git a/bftengine/tests/bcstatetransfer/test_app_state.hpp b/bftengine/tests/bcstatetransfer/test_app_state.hpp index 3e8a34f54a..6c9d96a8fd 100644 --- a/bftengine/tests/bcstatetransfer/test_app_state.hpp +++ b/bftengine/tests/bcstatetransfer/test_app_state.hpp @@ -34,7 +34,7 @@ struct Block { class TestAppState : public IAppState { public: - bool hasBlock(uint64_t blockId) override { + bool hasBlock(uint64_t blockId) const override { auto it = blocks_.find(blockId); return it != blocks_.end(); } @@ -55,7 +55,7 @@ class TestAppState : public IAppState { return true; }; - bool putBlock(uint64_t blockId, char* block, uint32_t blockSize) override { + bool putBlock(const uint64_t blockId, const char* block, const uint32_t blockSize) override { assert(blockId < last_block_); Block bl; computeBlockDigest(blockId, block, blockSize, &bl.digest); @@ -66,11 +66,8 @@ class TestAppState : public IAppState { } // TODO(AJS): How does this differ from getLastBlockNum? - uint64_t getLastReachableBlockNum() override { return last_block_; }; - - uint64_t getLastBlockNum() override { return last_block_; }; - - void wait() override{}; + uint64_t getLastReachableBlockNum() const override { return last_block_; } + uint64_t getLastBlockNum() const override { return last_block_; }; private: uint64_t last_block_; diff --git a/conanfile.txt b/conanfile.txt index 0b3e9749bc..3b739ae097 100644 --- a/conanfile.txt +++ b/conanfile.txt @@ -13,6 +13,7 @@ zstd/1.4.0@bincrafters/stable rocksdb/5.7.3 hdr_histogram/0.9.12 rapidcheck/258d907da00a0855f92c963d8f76eef115531716 +libs3-dev/2.0-3 [options] OpenSSL:shared=True @@ -46,4 +47,4 @@ boost:without_context=True [imports] bin, *.dll -> ./lib # Copies all dll files from packages bin folder to my "lib" folder lib, *.dylib* -> ./lib # Copies all dylib files from packages lib folder to my "lib" folder -lib, *.so* -> ./lib # Copies all dylib files from packages lib folder to my "lib" folder +lib, *.so* -> ./lib # Copies all dylib files from packages lib folder to my "lib" folder \ No newline at end of file diff --git a/kvbc/CMakeLists.txt b/kvbc/CMakeLists.txt index bdae2e858d..196c35da12 100644 --- a/kvbc/CMakeLists.txt +++ b/kvbc/CMakeLists.txt @@ -2,9 +2,9 @@ cmake_minimum_required (VERSION 3.2) project(libkvbc VERSION 0.1.0.0 LANGUAGES CXX) add_library(kvbc src/ClientImp.cpp - src/ReplicaImp.cpp - src/replica_state_sync_imp.cpp - src/block_metadata.cpp + src/ReplicaImp.cpp + src/replica_state_sync_imp.cpp + src/block_metadata.cpp src/direct_kv_db_adapter.cpp src/merkle_tree_db_adapter.cpp src/direct_kv_block.cpp @@ -17,10 +17,9 @@ add_library(kvbc src/ClientImp.cpp src/sparse_merkle/walker.cpp ) -target_link_libraries(kvbc PUBLIC threshsign util concordbft_storage corebft) +target_link_libraries(kvbc PUBLIC corebft ) target_include_directories(kvbc PUBLIC include) -target_include_directories(kvbc PUBLIC ${bftengine_SOURCE_DIR}/include) find_package(OpenSSL REQUIRED) target_link_libraries(kvbc PRIVATE ${OPENSSL_LIBRARIES}) diff --git a/kvbc/include/ReplicaImp.h b/kvbc/include/ReplicaImp.h index b6abaf2c3e..af2cdf99ce 100644 --- a/kvbc/include/ReplicaImp.h +++ b/kvbc/include/ReplicaImp.h @@ -25,52 +25,47 @@ namespace concord::kvbc { -class ReplicaInitException : public std::exception { +class ReplicaImp : public IReplica, + public ILocalKeyValueStorageReadOnly, + public IBlocksAppender, + public bftEngine::SimpleBlockchainStateTransfer::IAppState { public: - explicit ReplicaInitException(const std::string &what) : msg(what){}; - - virtual const char *what() const noexcept override { return msg.c_str(); } - - private: - std::string msg; -}; - -class ReplicaImp : public IReplica, public ILocalKeyValueStorageReadOnly, public IBlocksAppender { - public: - // concord::kvbc::IReplica methods - virtual Status start() override; - virtual Status stop() override; - - virtual RepStatus getReplicaStatus() const override; - - virtual const ILocalKeyValueStorageReadOnly &getReadOnlyStorage() override; - - virtual Status addBlockToIdleReplica(const concord::storage::SetOfKeyValuePairs &updates) override; - - virtual void set_command_handler(ICommandsHandler *handler) override; - - // concord::storage::ILocalKeyValueStorageReadOnly methods - virtual Status get(const Sliver &key, Sliver &outValue) const override; - - virtual Status get(BlockId readVersion, const Sliver &key, Sliver &outValue, BlockId &outBlock) const override; - - virtual BlockId getLastBlock() const override; - - virtual Status getBlockData(BlockId blockId, concord::storage::SetOfKeyValuePairs &outBlockData) const override; - - virtual Status mayHaveConflictBetween(const Sliver &key, - BlockId fromBlock, - BlockId toBlock, - bool &outRes) const override; - - // concord::storage::IBlocksAppender - virtual Status addBlock(const concord::storage::SetOfKeyValuePairs &updates, BlockId &outBlockId) override; - + ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // IReplica implementation + Status start() override; + Status stop() override; bool isRunning() const override { return (m_currentRepStatus == RepStatus::Running); } + RepStatus getReplicaStatus() const override; + const ILocalKeyValueStorageReadOnly &getReadOnlyStorage() override; + Status addBlockToIdleReplica(const concord::storage::SetOfKeyValuePairs &updates) override; + void set_command_handler(ICommandsHandler *handler) override; + + ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // ILocalKeyValueStorageReadOnly implementation + Status get(const Sliver &key, Sliver &outValue) const override; + Status get(BlockId readVersion, const Sliver &key, Sliver &outValue, BlockId &outBlock) const override; + BlockId getLastBlock() const override { return getLastBlockNum(); } + Status getBlockData(BlockId blockId, concord::storage::SetOfKeyValuePairs &outBlockData) const override; + Status mayHaveConflictBetween(const Sliver &key, BlockId fromBlock, BlockId toBlock, bool &outRes) const override; + ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // IBlocksAppender implementation + Status addBlock(const concord::storage::SetOfKeyValuePairs &updates, BlockId &outBlockId) override; + + ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // IAppState implementation + bool hasBlock(BlockId blockId) const override; + bool getBlock(uint64_t blockId, char *outBlock, uint32_t *outBlockSize) override; + bool getPrevDigestFromBlock(uint64_t blockId, + bftEngine::SimpleBlockchainStateTransfer::StateTransferDigest *) override; + bool putBlock(const uint64_t blockId, const char *block, const uint32_t blockSize) override; + uint64_t getLastReachableBlockNum() const override { return m_bcDbAdapter->getLastReachableBlockId(); } + uint64_t getLastBlockNum() const override { return m_bcDbAdapter->getLatestBlockId(); } + ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ReplicaImp(bftEngine::ICommunication *comm, bftEngine::ReplicaConfig &config, - DBAdapter *dbAdapter, + std::unique_ptr dbAdapter, + std::shared_ptr mdt_dbclient, std::shared_ptr aggregator); void setReplicaStateSync(ReplicaStateSync *rss) { replicaStateSync_.reset(rss); } @@ -78,13 +73,10 @@ class ReplicaImp : public IReplica, public ILocalKeyValueStorageReadOnly, public ~ReplicaImp() override; protected: - // METHODS - Status addBlockInternal(const concord::storage::SetOfKeyValuePairs &updates, BlockId &outBlockId); Status getInternal(BlockId readVersion, Key key, Sliver &outValue, BlockId &outBlock) const; void insertBlockInternal(BlockId blockId, Sliver block); RawBlock getBlockInternal(BlockId blockId) const; - IDbAdapter *getBcDbAdapter() const { return m_bcDbAdapter; } private: friend class StorageWrapperForIdleMode; @@ -118,65 +110,16 @@ class ReplicaImp : public IReplica, public ILocalKeyValueStorageReadOnly, public } }; - // TODO(GG): do we want synchronization here ? - class StorageWrapperForIdleMode : public ILocalKeyValueStorageReadOnly { - private: - const ReplicaImp *rep; - - public: - StorageWrapperForIdleMode(const ReplicaImp *r); - - virtual Status get(const Sliver &key, Sliver &outValue) const override; - - virtual Status get(BlockId readVersion, const Sliver &key, Sliver &outValue, BlockId &outBlock) const override; - virtual BlockId getLastBlock() const override; - - virtual Status getBlockData(BlockId blockId, concord::storage::SetOfKeyValuePairs &outBlockData) const override; - - virtual Status mayHaveConflictBetween(const Sliver &key, - BlockId fromBlock, - BlockId toBlock, - bool &outRes) const override; - }; - - class BlockchainAppState : public bftEngine::SimpleBlockchainStateTransfer::IAppState { - public: - BlockchainAppState(ReplicaImp *const parent); - - virtual bool hasBlock(uint64_t blockId) override; - virtual bool getBlock(uint64_t blockId, char *outBlock, uint32_t *outBlockSize) override; - virtual bool getPrevDigestFromBlock( - uint64_t blockId, bftEngine::SimpleBlockchainStateTransfer::StateTransferDigest *outPrevBlockDigest) override; - virtual bool putBlock(uint64_t blockId, char *block, uint32_t blockSize) override; - virtual uint64_t getLastReachableBlockNum() override; - virtual uint64_t getLastBlockNum() override; - virtual void wait() override; - - private: - ReplicaImp *const m_ptrReplicaImpl = nullptr; - concordlogger::Logger m_logger; - - // from IAppState. represents maximal block number n such that all - // blocks 1 <= i <= n exist - std::atomic m_lastReachableBlock{0}; - - friend class ReplicaImp; - }; - - // DATA private: concordlogger::Logger logger; RepStatus m_currentRepStatus; - StorageWrapperForIdleMode m_InternalStorageWrapperForIdleMode; - IDbAdapter *m_bcDbAdapter = nullptr; - BlockId m_lastBlock = 0; + std::unique_ptr m_bcDbAdapter; bftEngine::ICommunication *m_ptrComm = nullptr; bftEngine::ReplicaConfig m_replicaConfig; bftEngine::IReplica *m_replicaPtr = nullptr; ICommandsHandler *m_cmdHandler = nullptr; bftEngine::IStateTransfer *m_stateTransfer = nullptr; - std::unique_ptr m_appState; concord::storage::DBMetadataStorage *m_metadataStorage = nullptr; std::unique_ptr replicaStateSync_; std::shared_ptr aggregator_; diff --git a/kvbc/include/db_adapter.h b/kvbc/include/db_adapter.h index b512e435eb..5aba791fdf 100644 --- a/kvbc/include/db_adapter.h +++ b/kvbc/include/db_adapter.h @@ -38,6 +38,9 @@ class IDbAdapter { // Delete a block from the database virtual void deleteBlock(const BlockId& blockId) = 0; + // Checks whether block exists + virtual bool hasBlock(const BlockId& blockId) const = 0; + // Used to retrieve the latest block. virtual BlockId getLatestBlockId() const = 0; diff --git a/kvbc/include/db_interfaces.h b/kvbc/include/db_interfaces.h index a4c286035b..f8aa7f61b1 100644 --- a/kvbc/include/db_interfaces.h +++ b/kvbc/include/db_interfaces.h @@ -36,15 +36,4 @@ class IBlocksAppender { virtual ~IBlocksAppender() = default; }; -/** Defines interface for blockchain data keys generation - * - */ -class IDataKeyGenerator { - public: - virtual concordUtils::Sliver blockKey(const BlockId&) const = 0; - virtual concordUtils::Sliver dataKey(const Key&, const BlockId&) const = 0; - - virtual ~IDataKeyGenerator() = default; -}; - } // namespace concord::kvbc diff --git a/kvbc/include/direct_kv_db_adapter.h b/kvbc/include/direct_kv_db_adapter.h index 7eadffe76a..7a5afc83b3 100644 --- a/kvbc/include/direct_kv_db_adapter.h +++ b/kvbc/include/direct_kv_db_adapter.h @@ -5,7 +5,6 @@ #pragma once -#include "db_interfaces.h" #include "kv_types.hpp" #include "Logger.hpp" #include "storage/db_interface.h" @@ -17,6 +16,11 @@ namespace concord::kvbc { inline namespace v1DirectKeyValue { using concord::storage::detail::EDBKeyType; +/** Key comparator for sorting keys in database. + * Used with rocksdb when there's no natural lexicographical key comparison. + * + * @deprecated should be removed after switching to merkle-style keys serialization. + */ class DBKeyComparator : public concord::storage::IDBClient::IKeyComparator { public: int composedKeyComparison(const char *_a_data, size_t _a_length, const char *_b_data, size_t _b_length) override; @@ -26,22 +30,75 @@ class DBKeyComparator : public concord::storage::IDBClient::IKeyComparator { } }; -class DBKeyManipulatorBase : public storage::DBKeyManipulatorBase { +/** Defines interface for blockchain data keys generation. + * + */ +class IDataKeyGenerator { + public: + virtual Key blockKey(const BlockId &) const = 0; + virtual Key dataKey(const Key &, const BlockId &) const = 0; + virtual Key mdtKey(const Key &) const = 0; + + virtual ~IDataKeyGenerator() = default; +}; + +/** Default Key Generator + * Used with rocksdb + */ +class RocksKeyGenerator : public IDataKeyGenerator, storage::DBKeyGeneratorBase { + public: + Key blockKey(const BlockId &) const override; + Key dataKey(const Key &, const BlockId &) const override; + Key mdtKey(const Key &key) const override { return key; } + protected: static concordUtils::Sliver genDbKey(EDBKeyType, const Key &, BlockId); static concordlogger::Logger &logger() { - static concordlogger::Logger logger_ = concordlogger::Log::getLogger("concord.kvbc.DBKeyManipulator"); + static concordlogger::Logger logger_ = concordlogger::Log::getLogger("concord.kvbc.RocksKeyGenerator"); return logger_; } }; -class KeyGenerator : public IDataKeyGenerator, public DBKeyManipulatorBase { +/** Key generator for S3 storage + * As S3 has textual hierarchical URI like structure the generated keys are as follows: + * prefix/block_id/raw_block + * prefix/block_id/key1 + * ...................... + * prefix/block_id/keyN + * + * metadata have the following form: + * prefix/metadata/key + * + * keys are transformed to hexadecimal strings. + * + * Where prefix is some arbitrary string. + * Typically it should be a blockchain id. + * For tests it is usually a timestamp. + */ +class S3KeyGenerator : public IDataKeyGenerator { public: - concordUtils::Sliver blockKey(const BlockId &) const override; - concordUtils::Sliver dataKey(const Key &, const BlockId &) const override; + S3KeyGenerator(const std::string &prefix = "") : prefix_(prefix + std::string("/")) {} + Key blockKey(const BlockId &) const override; + Key dataKey(const Key &, const BlockId &) const override; + Key mdtKey(const Key &key) const override; + + protected: + static std::string string2hex(const std::string &s); + static std::string hex2string(const std::string &); + static concordlogger::Logger &logger() { + static concordlogger::Logger logger_ = concordlogger::Log::getLogger("concord.kvbc.S3KeyGenerator"); + return logger_; + } + + std::string prefix_; }; -class DBKeyManipulator : public DBKeyManipulatorBase { +/** Key Manipulator for extracting info from database keys + * Used for rocksdb when there's no natural lexicographical key comparison. + * + * @deprecated should be removed after switching to merkle-style keys serialization. + */ +class DBKeyManipulator { public: static BlockId extractBlockIdFromKey(const Key &_key); static BlockId extractBlockIdFromKey(const char *_key_data, size_t _key_length); @@ -49,17 +106,26 @@ class DBKeyManipulator : public DBKeyManipulatorBase { static EDBKeyType extractTypeFromKey(const char *_key_data); static storage::ObjectId extractObjectIdFromKey(const Key &_key); static storage::ObjectId extractObjectIdFromKey(const char *_key_data, size_t _key_length); - static concordUtils::Sliver extractKeyFromKeyComposedWithBlockId(const Key &_composedKey); + static Key extractKeyFromKeyComposedWithBlockId(const Key &_composedKey); static int compareKeyPartOfComposedKey(const char *a_data, size_t a_length, const char *b_data, size_t b_length); - static concordUtils::Sliver extractKeyFromMetadataKey(const Key &_composedKey); + static Key extractKeyFromMetadataKey(const Key &_composedKey); static bool isKeyContainBlockId(const Key &_composedKey); static KeyValuePair composedToSimple(KeyValuePair _p); + static std::string extractFreeKey(const char *_key_data, size_t _key_length); + static concordlogger::Logger &logger() { + static concordlogger::Logger logger_ = concordlogger::Log::getLogger("concord.kvbc.DBKeyManipulator"); + return logger_; + } }; +/** DBadapter for managing key/value blockchain atop key/value store in form of simple blocks + * + */ class DBAdapter : public IDbAdapter { public: - DBAdapter(std::shared_ptr, - std::unique_ptr keyGen = std::make_unique()); + DBAdapter(std::shared_ptr dataStore, + std::unique_ptr keyGen = std::make_unique(), + bool use_mdt = false); // Adds a block from a set of key/value pairs and a block ID. Includes: // - adding the key/value pairs in separate keys @@ -79,8 +145,10 @@ class DBAdapter : public IDbAdapter { void deleteBlock(const BlockId &) override; - BlockId getLatestBlockId() const override; - BlockId getLastReachableBlockId() const override; + bool hasBlock(const BlockId &blockId) const override; + + BlockId getLatestBlockId() const override { return lastBlockId_; } + BlockId getLastReachableBlockId() const override { return lastReachableBlockId_; } // Returns the block data in the form of a set of key/value pairs. SetOfKeyValuePairs getBlockData(const RawBlock &rawBlock) const override; @@ -90,14 +158,28 @@ class DBAdapter : public IDbAdapter { std::shared_ptr getDb() const override { return db_; } - private: + protected: + void setLastReachableBlockNum(const BlockId &blockId); + void setLatestBlock(const BlockId &blockId); + + BlockId fetchLatestBlockId() const; + BlockId fetchLastReachableBlockId() const; + + // The following methods are used to store metadata parameters + BlockId mdtGetLatestBlockId() const; + BlockId mdtGetLastReachableBlockId() const; + concordUtils::Status mdtGet(const concordUtils::Sliver &key, concordUtils::Sliver &val) const; + concordUtils::Status mdtPut(const concordUtils::Sliver &key, const concordUtils::Sliver &val); + concordUtils::Status addBlockAndUpdateMultiKey(const SetOfKeyValuePairs &_kvMap, const BlockId &_block, const concordUtils::Sliver &_blockRaw); - concordlogger::Logger logger_; std::shared_ptr db_; std::unique_ptr keyGen_; + bool mdt_ = false; // whether we explicitly store blockchain metadata + BlockId lastBlockId_ = 0; + BlockId lastReachableBlockId_ = 0; }; } // namespace v1DirectKeyValue diff --git a/kvbc/include/merkle_tree_db_adapter.h b/kvbc/include/merkle_tree_db_adapter.h index 5d8c524dec..78ab9a5916 100644 --- a/kvbc/include/merkle_tree_db_adapter.h +++ b/kvbc/include/merkle_tree_db_adapter.h @@ -120,6 +120,9 @@ class DBAdapter : public IDbAdapter { std::shared_ptr getDb() const override { return db_; } + // TODO [TK] implement + bool hasBlock(const BlockId &blockId) const override { return false; } + // Returns the current state hash from the internal merkle tree implementation. const sparse_merkle::Hash &getStateHash() const { return smTree_.get_root_hash(); } diff --git a/kvbc/src/ReplicaImp.cpp b/kvbc/src/ReplicaImp.cpp index 713ad41255..7ee234a7f2 100644 --- a/kvbc/src/ReplicaImp.cpp +++ b/kvbc/src/ReplicaImp.cpp @@ -2,13 +2,11 @@ // // KV Blockchain replica implementation. -#include #include #include #include #include #include - #include "ReplicaImp.h" #include #include @@ -19,7 +17,6 @@ #include "hex_tools.h" #include "replica_state_sync.h" #include "sliver.hpp" -#include "db_interfaces.h" #include "block.h" #include "bftengine/DbMetadataStorage.hpp" @@ -33,8 +30,7 @@ using concord::storage::DBMetadataStorage; using concord::storage::MetadataKeyManipulator; namespace block = concord::kvbc::block; -namespace concord { -namespace kvbc { +namespace concord::kvbc { /** * Opens the database and creates the replica thread. Replica state moves to @@ -48,7 +44,6 @@ Status ReplicaImp::start() { } m_currentRepStatus = RepStatus::Starting; - m_metadataStorage = new DBMetadataStorage(m_bcDbAdapter->getDb().get(), MetadataKeyManipulator::generateMetadataKey); if (m_replicaConfig.isReadOnly) { LOG_INFO(logger, "ReadOnly mode"); @@ -74,13 +69,11 @@ void ReplicaImp::createReplicaAndSyncState() { &m_replicaConfig, m_cmdHandler, m_stateTransfer, m_ptrComm, m_metadataStorage); if (!isNewStorage && !m_stateTransfer->isCollectingState()) { uint64_t removedBlocksNum = replicaStateSync_->execute( - logger, *m_bcDbAdapter, m_appState->m_lastReachableBlock, m_replicaPtr->getLastExecutedSequenceNum()); - m_lastBlock -= removedBlocksNum; - m_appState->m_lastReachableBlock -= removedBlocksNum; + logger, *m_bcDbAdapter, getLastReachableBlockNum(), m_replicaPtr->getLastExecutedSequenceNum()); LOG_INFO(logger, "createReplicaAndSyncState: removedBlocksNum = " - << removedBlocksNum << ", new m_lastBlock = " << m_lastBlock - << ", new m_lastReachableBlock = " << m_appState->m_lastReachableBlock); + << removedBlocksNum << ", new m_lastBlock = " << getLastBlockNum() + << ", new m_lastReachableBlock = " << getLastReachableBlockNum()); } } @@ -96,7 +89,7 @@ Status ReplicaImp::stop() { ReplicaImp::RepStatus ReplicaImp::getReplicaStatus() const { return m_currentRepStatus; } -const ILocalKeyValueStorageReadOnly &ReplicaImp::getReadOnlyStorage() { return m_InternalStorageWrapperForIdleMode; } +const ILocalKeyValueStorageReadOnly &ReplicaImp::getReadOnlyStorage() { return *this; } Status ReplicaImp::addBlockToIdleReplica(const SetOfKeyValuePairs &updates) { if (getReplicaStatus() != IReplica::RepStatus::Idle) { @@ -112,7 +105,7 @@ Status ReplicaImp::get(const Sliver &key, Sliver &outValue) const { // the replica's internal thread) BlockId dummy; - return getInternal(m_lastBlock, key, outValue, dummy); + return getInternal(getLastBlockNum(), key, outValue, dummy); } Status ReplicaImp::get(BlockId readVersion, const Sliver &key, Sliver &outValue, BlockId &outBlock) const { @@ -122,8 +115,6 @@ Status ReplicaImp::get(BlockId readVersion, const Sliver &key, Sliver &outValue, return getInternal(readVersion, key, outValue, outBlock); } -BlockId ReplicaImp::getLastBlock() const { return m_lastBlock; } - Status ReplicaImp::getBlockData(BlockId blockId, SetOfKeyValuePairs &outBlockData) const { // TODO(GG): check legality of operation (the method should be invoked from // the replica's internal thread) @@ -169,30 +160,28 @@ void ReplicaImp::set_command_handler(ICommandsHandler *handler) { m_cmdHandler = ReplicaImp::ReplicaImp(ICommunication *comm, bftEngine::ReplicaConfig &replicaConfig, - DBAdapter *dbAdapter, + std::unique_ptr dbAdapter, + std::shared_ptr mdt_dbclient, std::shared_ptr aggregator) : logger(concordlogger::Log::getLogger("skvbc.replicaImp")), m_currentRepStatus(RepStatus::Idle), - m_InternalStorageWrapperForIdleMode(this), - m_bcDbAdapter(dbAdapter), - m_lastBlock(dbAdapter->getLatestBlockId()), + m_bcDbAdapter(std::move(dbAdapter)), m_ptrComm(comm), m_replicaConfig(replicaConfig), - m_appState(new BlockchainAppState(this)), aggregator_(aggregator) { bftEngine::SimpleBlockchainStateTransfer::Config state_transfer_config; - state_transfer_config.myReplicaId = m_replicaConfig.replicaId; state_transfer_config.cVal = m_replicaConfig.cVal; state_transfer_config.fVal = m_replicaConfig.fVal; state_transfer_config.numReplicas = m_replicaConfig.numReplicas + m_replicaConfig.numRoReplicas; state_transfer_config.metricsDumpIntervalSeconds = std::chrono::seconds(m_replicaConfig.metricsDumpIntervalSeconds); + state_transfer_config.isReadOnly = replicaConfig.isReadOnly; if (replicaConfig.maxNumOfReservedPages > 0) state_transfer_config.maxNumOfReservedPages = replicaConfig.maxNumOfReservedPages; if (replicaConfig.sizeOfReservedPage > 0) state_transfer_config.sizeOfReservedPage = replicaConfig.sizeOfReservedPage; - - m_stateTransfer = bftEngine::SimpleBlockchainStateTransfer::create( - state_transfer_config, m_appState.get(), m_bcDbAdapter->getDb(), aggregator); + m_stateTransfer = + bftEngine::SimpleBlockchainStateTransfer::create(state_transfer_config, this, mdt_dbclient, aggregator_); + m_metadataStorage = new DBMetadataStorage(mdt_dbclient.get(), MetadataKeyManipulator::generateMetadataKey); } ReplicaImp::~ReplicaImp() { @@ -202,25 +191,11 @@ ReplicaImp::~ReplicaImp() { } delete m_replicaPtr; } - - if (m_stateTransfer) { - if (m_stateTransfer->isRunning()) { - m_stateTransfer->stopRunning(); - } - delete m_stateTransfer; - } } Status ReplicaImp::addBlockInternal(const SetOfKeyValuePairs &updates, BlockId &outBlockId) { - m_lastBlock++; - m_appState->m_lastReachableBlock++; - - BlockId block = m_lastBlock; - SetOfKeyValuePairs updatesInNewBlock; - - LOG_DEBUG(logger, "block:" << block << " updates: " << updates.size()); - outBlockId = m_bcDbAdapter->addBlock(updates); + return Status::OK(); } @@ -236,22 +211,18 @@ Status ReplicaImp::getInternal(BlockId readVersion, Key key, Sliver &outValue, B } } -void ReplicaImp::insertBlockInternal(BlockId blockId, Sliver block) { - if (blockId > m_lastBlock) { - m_lastBlock = blockId; - } - // when ST runs, blocks arrive in batches in reverse order. we need to keep - // track on the "Gap" and to close it. Only when it is closed, the last - // reachable block becomes the same as the last block - if (blockId == m_appState->m_lastReachableBlock + 1) { - m_appState->m_lastReachableBlock = m_lastBlock; - } - try { - RawBlock existingBlock = m_bcDbAdapter->getRawBlock(blockId); +/* + * This method can't return false by current insertBlockInternal impl. + * It is used only by State Transfer to synchronize state between replicas. + */ +bool ReplicaImp::putBlock(const uint64_t blockId, const char *block_data, const uint32_t blockSize) { + Sliver block = Sliver::copy(block_data, blockSize); + + if (m_bcDbAdapter->hasBlock(blockId)) { // if we already have a block with the same ID + RawBlock existingBlock = m_bcDbAdapter->getRawBlock(blockId); if (existingBlock.length() != block.length() || memcmp(existingBlock.data(), block.data(), block.length())) { // the replica is corrupted ! - // TODO(GG): what do we want to do now ? LOG_ERROR(logger, "found block " << blockId << ", size in db is " << existingBlock.length() << ", inserted is " << block.length() << ", data in db " << existingBlock << ", data inserted " << block); @@ -260,148 +231,47 @@ void ReplicaImp::insertBlockInternal(BlockId blockId, Sliver block) { << (memcmp(existingBlock.data(), block.data(), block.length()))); m_bcDbAdapter->deleteBlock(blockId); - - // TODO(GG): how do we want to handle this - restart replica? - // exit(1); - return; + throw std::runtime_error(__PRETTY_FUNCTION__ + std::string("data corrupted blockId: ") + std::to_string(blockId)); } - } catch (const NotFoundException &e) { - m_bcDbAdapter->addRawBlock(block, blockId); // TODO [TK] will be replaced after integration of hasBlock - } -} - -RawBlock ReplicaImp::getBlockInternal(BlockId blockId) const { - assert(blockId <= m_lastBlock); - return m_bcDbAdapter->getRawBlock(blockId); -} - -ReplicaImp::StorageWrapperForIdleMode::StorageWrapperForIdleMode(const ReplicaImp *r) : rep(r) {} - -Status ReplicaImp::StorageWrapperForIdleMode::get(const Sliver &key, Sliver &outValue) const { - if (rep->getReplicaStatus() != IReplica::RepStatus::Idle) { - return Status::IllegalOperation(""); } - return rep->get(key, outValue); -} - -Status ReplicaImp::StorageWrapperForIdleMode::get(BlockId readVersion, - const Sliver &key, - Sliver &outValue, - BlockId &outBlock) const { - if (rep->getReplicaStatus() != IReplica::RepStatus::Idle) { - return Status::IllegalOperation(""); - } - - return rep->get(readVersion, key, outValue, outBlock); -} - -BlockId ReplicaImp::StorageWrapperForIdleMode::getLastBlock() const { return rep->getLastBlock(); } - -Status ReplicaImp::StorageWrapperForIdleMode::getBlockData(BlockId blockId, SetOfKeyValuePairs &outBlockData) const { - if (rep->getReplicaStatus() != IReplica::RepStatus::Idle) { - return Status::IllegalOperation(""); - } + m_bcDbAdapter->addRawBlock(block, blockId); - try { - Sliver block = rep->getBlockInternal(blockId); - outBlockData = rep->getBcDbAdapter()->getBlockData(block); - } catch (const NotFoundException &e) { - return Status::NotFound("todo"); - } - - return Status::OK(); + return true; } -Status ReplicaImp::StorageWrapperForIdleMode::mayHaveConflictBetween(const Sliver &key, - BlockId fromBlock, - BlockId toBlock, - bool &outRes) const { - outRes = true; - - Sliver dummy; - BlockId block = 0; - Status s = rep->getInternal(toBlock, key, dummy, block); - - if (s.isOK() && block < fromBlock) { - outRes = false; - } - - return s; +RawBlock ReplicaImp::getBlockInternal(BlockId blockId) const { + assert(blockId <= getLastBlockNum()); + return m_bcDbAdapter->getRawBlock(blockId); } -/* - * These functions are used by the ST module to interact with the KVB - */ -ReplicaImp::BlockchainAppState::BlockchainAppState(ReplicaImp *const parent) - : m_ptrReplicaImpl{parent}, - m_logger{concordlogger::Log::getLogger("blockchainappstate")}, - m_lastReachableBlock{parent->getBcDbAdapter()->getLastReachableBlockId()} {} - /* * This method assumes that *outBlock is big enough to hold block content * The caller is the owner of the memory */ -bool ReplicaImp::BlockchainAppState::getBlock(uint64_t blockId, char *outBlock, uint32_t *outBlockSize) { - Sliver res = m_ptrReplicaImpl->getBlockInternal(blockId); - if (0 == res.length()) { - // in normal state it should not happen. If it happened - the data is - // corrupted - LOG_FATAL(m_logger, "Block not found, ID: " << blockId); - exit(1); - } - - *outBlockSize = res.length(); - memcpy(outBlock, res.data(), res.length()); +bool ReplicaImp::getBlock(uint64_t blockId, char *outBlock, uint32_t *outBlockSize) { + RawBlock block = getBlockInternal(blockId); + *outBlockSize = block.length(); + memcpy(outBlock, block.data(), block.length()); return true; } -bool ReplicaImp::BlockchainAppState::hasBlock(uint64_t blockId) { - try { - RawBlock block = m_ptrReplicaImpl->getBlockInternal(blockId); - return true; - } catch (const NotFoundException &e) { - return false; // TODO [TK] use dbadapter::has after implemented - } -} +bool ReplicaImp::hasBlock(BlockId blockId) const { return m_bcDbAdapter->hasBlock(blockId); } -bool ReplicaImp::BlockchainAppState::getPrevDigestFromBlock(uint64_t blockId, StateTransferDigest *outPrevBlockDigest) { +bool ReplicaImp::getPrevDigestFromBlock(BlockId blockId, StateTransferDigest *outPrevBlockDigest) { assert(blockId > 0); try { - RawBlock result = m_ptrReplicaImpl->m_bcDbAdapter->getRawBlock(blockId); - auto parentDigest = m_ptrReplicaImpl->m_bcDbAdapter->getParentDigest(result); + RawBlock result = getBlockInternal(blockId); + auto parentDigest = m_bcDbAdapter->getParentDigest(result); assert(outPrevBlockDigest); static_assert(parentDigest.size() == BLOCK_DIGEST_SIZE); static_assert(sizeof(StateTransferDigest) == BLOCK_DIGEST_SIZE); memcpy(outPrevBlockDigest, parentDigest.data(), BLOCK_DIGEST_SIZE); return true; } catch (const NotFoundException &e) { - LOG_FATAL(m_logger, "Block not found for parent digest, ID: " << blockId); + LOG_FATAL(logger, "Block not found for parent digest, ID: " << blockId); throw; } } -/* - * This method cant return false by current insertBlockInternal impl. - * It is used only by State Transfer to synchronize state between replicas. - */ -bool ReplicaImp::BlockchainAppState::putBlock(uint64_t blockId, char *block, uint32_t blockSize) { - char *tmpBlockPtr = new char[blockSize]; - memcpy(tmpBlockPtr, block, blockSize); - Sliver s(tmpBlockPtr, blockSize); - - m_ptrReplicaImpl->insertBlockInternal(blockId, s); - return true; -} - -uint64_t ReplicaImp::BlockchainAppState::getLastReachableBlockNum() { - LOG_INFO(m_logger, "m_lastReachableBlock=" << m_lastReachableBlock); - return m_lastReachableBlock; -} - -uint64_t ReplicaImp::BlockchainAppState::getLastBlockNum() { return m_ptrReplicaImpl->m_lastBlock; } - -void ReplicaImp::BlockchainAppState::wait() { return; } - -} // namespace kvbc -} // namespace concord +} // namespace concord::kvbc diff --git a/kvbc/src/direct_kv_db_adapter.cpp b/kvbc/src/direct_kv_db_adapter.cpp index 69de645e0d..7f232f3922 100644 --- a/kvbc/src/direct_kv_db_adapter.cpp +++ b/kvbc/src/direct_kv_db_adapter.cpp @@ -17,11 +17,11 @@ #include "status.hpp" #include "kv_types.hpp" #include "block.h" -#include "db_interfaces.h" #include "db_adapter.h" #include "direct_kv_block.h" #include "bcstatetransfer/SimpleBCStateTransfer.hpp" #include "hex_tools.h" +#include "string.hpp" #include @@ -30,6 +30,8 @@ #include #include #include +#include +#include using concordlogger::Logger; using concordUtils::Status; @@ -48,7 +50,7 @@ inline namespace v1DirectKeyValue { * incorporated into the composite database key. * @return Sliver object of the generated composite database key. */ -Sliver KeyGenerator::blockKey(const BlockId &blockId) const { +Key RocksKeyGenerator::blockKey(const BlockId &blockId) const { return genDbKey(EDBKeyType::E_DB_KEY_TYPE_BLOCK, Sliver(), blockId); } @@ -60,10 +62,37 @@ Sliver KeyGenerator::blockKey(const BlockId &blockId) const { * into the composite database Key. * @return Sliver object of the generated composite database key. */ -Sliver KeyGenerator::dataKey(const Key &key, const BlockId &blockId) const { +Key RocksKeyGenerator::dataKey(const Key &key, const BlockId &blockId) const { return genDbKey(EDBKeyType::E_DB_KEY_TYPE_KEY, key, blockId); } +Key S3KeyGenerator::blockKey(const BlockId &blockId) const { + LOG_DEBUG(logger(), prefix_ + std::to_string(blockId) + std::string("/raw_block")); + return prefix_ + std::to_string(blockId) + std::string("/raw_block"); +} + +Key S3KeyGenerator::dataKey(const Key &key, const BlockId &blockId) const { + LOG_DEBUG(logger(), prefix_ + std::to_string(blockId) + std::string("/") + string2hex(key.toString())); + return prefix_ + std::to_string(blockId) + std::string("/") + string2hex(key.toString()); +} + +Key S3KeyGenerator::mdtKey(const Key &key) const { + LOG_DEBUG(logger(), prefix_ + std::string("metadata/") + key.toString()); + return prefix_ + std::string("metadata/") + key.toString(); +} + +std::string S3KeyGenerator::string2hex(const std::string &s) { + std::ostringstream oss; + for (size_t i = 0; i < s.length(); i++) oss << std::hex << std::setw(2) << std::setfill('0') << (uint)s[i]; + return oss.str(); +} +std::string S3KeyGenerator::hex2string(const std::string &s) { + std::string result; + result.reserve(s.length() / 2); + for (size_t i = 0; i < s.length(); i += 2) result.push_back(std::stoi(s.substr(i, 2).c_str(), NULL, 16)); + return result; +} + /* * If key a is smaller than key b, return a negative number; if larger, return a * positive number; if equal, return zero. @@ -140,7 +169,7 @@ int DBKeyComparator::composedKeyComparison(const char *_a_data, * @param _blockId BlockId object. * @return Sliver object of the generated composite database key. */ -Sliver DBKeyManipulatorBase::genDbKey(EDBKeyType _type, const Key &_key, BlockId _blockId) { +Sliver RocksKeyGenerator::genDbKey(EDBKeyType _type, const Key &_key, BlockId _blockId) { size_t sz = sizeof(EDBKeyType) + sizeof(BlockId) + _key.length(); char *out = new char[sz]; size_t offset = 0; @@ -296,7 +325,7 @@ int DBKeyManipulator::compareKeyPartOfComposedKey(const char *a_data, return result; } -Sliver DBKeyManipulator::extractKeyFromMetadataKey(const Key &_composedKey) { +Key DBKeyManipulator::extractKeyFromMetadataKey(const Key &_composedKey) { size_t sz = _composedKey.length() - sizeof(EDBKeyType); Sliver out = Sliver(_composedKey, sizeof(EDBKeyType), sz); LOG_TRACE(logger(), "Got metadata key " << out << " from composed key " << _composedKey); @@ -328,12 +357,13 @@ KeyValuePair DBKeyManipulator::composedToSimple(KeyValuePair _p) { return KeyValuePair(key, _p.second); } -DBAdapter::DBAdapter(std::shared_ptr db, std::unique_ptr keyGen) +DBAdapter::DBAdapter(std::shared_ptr db, std::unique_ptr keyGen, bool use_mdt) : logger_{concordlogger::Log::getLogger("concord.kvbc.v1DirectKeyValue.DBAdapter")}, - db_{db}, - keyGen_{std::move(keyGen)} { - db_->init(false); -} + db_(db), + keyGen_{std::move(keyGen)}, + mdt_{use_mdt}, + lastBlockId_{fetchLatestBlockId()}, + lastReachableBlockId_{fetchLastReachableBlockId()} {} BlockId DBAdapter::addBlock(const SetOfKeyValuePairs &kv) { BlockId blockId = getLastReachableBlockId() + 1; @@ -349,6 +379,10 @@ BlockId DBAdapter::addBlock(const SetOfKeyValuePairs &kv) { const auto block = block::detail::create(kv, outKv, blockDigest); if (Status s = addBlockAndUpdateMultiKey(outKv, blockId, block); !s.isOK()) throw std::runtime_error(__PRETTY_FUNCTION__ + std::string(": failed: ") + s.toString()); + + setLatestBlock(blockId); + setLastReachableBlockNum(blockId); + return blockId; } @@ -360,6 +394,14 @@ void DBAdapter::addRawBlock(const RawBlock &block, const BlockId &blockId) { if (Status s = addBlockAndUpdateMultiKey(keys, blockId, block); !s.isOK()) throw std::runtime_error(__PRETTY_FUNCTION__ + std::string(": failed: blockId: ") + std::to_string(blockId) + std::string(" reason: ") + s.toString()); + + // when ST runs, blocks arrive in batches in reverse order. we need to keep + // track on the "Gap" and to close it. Only when it is closed, the last + // reachable block becomes the same as the last block + BlockId lastReachableBlock = getLastReachableBlockId(); + if (blockId > getLatestBlockId()) setLatestBlock(blockId); + + if (blockId == lastReachableBlock + 1) setLastReachableBlockNum(getLatestBlockId()); } Status DBAdapter::addBlockAndUpdateMultiKey(const SetOfKeyValuePairs &_kvMap, @@ -396,10 +438,12 @@ void DBAdapter::deleteBlock(const BlockId &blockId) { keysVec.push_back(keyGen_->blockKey(blockId)); - if (Status s = db_->multiDel(keysVec); !s.isOK()) { - LOG_FATAL(logger_, "Failed to delete block id: " << blockId); - exit(1); - } + if (Status s = db_->multiDel(keysVec); !s.isOK()) + throw std::runtime_error(__PRETTY_FUNCTION__ + std::string(": failed: blockId: ") + std::to_string(blockId) + + std::string(" reason: ") + s.toString()); + Assert(hasBlock(blockId - 1)); + setLatestBlock(blockId - 1); + setLastReachableBlockNum(blockId - 1); } catch (const NotFoundException &e) { } } @@ -455,6 +499,10 @@ RawBlock DBAdapter::getRawBlock(const BlockId &blockId) const { return blockRaw; } +bool DBAdapter::hasBlock(const BlockId &blockId) const { + if (Status s = db_->has(keyGen_->blockKey(blockId)); s.isNotFound()) return false; + return true; +} // TODO(SG): Add status checks with getStatus() on iterator. // TODO(JGC): unserstand difference between .second and .data() @@ -465,7 +513,8 @@ RawBlock DBAdapter::getRawBlock(const BlockId &blockId) const { * * @return Block ID of the latest block. */ -BlockId DBAdapter::getLatestBlockId() const { +BlockId DBAdapter::fetchLatestBlockId() const { + if (mdt_) return mdtGetLatestBlockId(); // Note: RocksDB stores keys in a sorted fashion as per the logic provided in // a custom comparator (for our case, refer to the `composedKeyComparison` // method above). In short, keys of type 'block' are stored first followed by @@ -500,10 +549,12 @@ BlockId DBAdapter::getLatestBlockId() const { * * @return Block ID of the last reachable block. */ -BlockId DBAdapter::getLastReachableBlockId() const { - storage::IDBClient::IDBClientIterator *iter = db_->getIterator(); +BlockId DBAdapter::fetchLastReachableBlockId() const { + if (mdt_) return mdtGetLastReachableBlockId(); BlockId lastReachableId = 0; + storage::IDBClient::IDBClientIterator *iter = db_->getIterator(); + Sliver blockKey = keyGen_->blockKey(1); KeyValuePair kvp = iter->seekAtLeast(blockKey); if (kvp.first.length() == 0) { @@ -525,6 +576,53 @@ BlockId DBAdapter::getLastReachableBlockId() const { return lastReachableId; } +BlockId DBAdapter::mdtGetLatestBlockId() const { + static Key lastBlockIdKey = keyGen_->mdtKey(std::string("last-block-id")); + Sliver val; + if (Status s = mdtGet(lastBlockIdKey, val); s.isOK()) + return concord::util::to(val.toString()); + else + return 0; +} + +BlockId DBAdapter::mdtGetLastReachableBlockId() const { + static Key lastReachableBlockIdKey = keyGen_->mdtKey(std::string("last-reachable-block-id")); + Sliver val; + if (Status s = mdtGet(lastReachableBlockIdKey, val); s.isOK()) + return concord::util::to(val.toString()); + else + return 0; +} + +void DBAdapter::setLastReachableBlockNum(const BlockId &blockId) { + lastReachableBlockId_ = blockId; + if (mdt_) { + static Key lastReachableBlockIdKey = keyGen_->mdtKey(std::string("last-reachable-block-id")); + mdtPut(lastReachableBlockIdKey, std::to_string(blockId)); + } +} + +void DBAdapter::setLatestBlock(const BlockId &blockId) { + lastBlockId_ = blockId; + if (mdt_) { + static Key lastBlockIdKey = keyGen_->mdtKey(std::string("last-block-id")); + mdtPut(lastBlockIdKey, std::to_string(blockId)); + } +} +Status DBAdapter::mdtPut(const concordUtils::Sliver &key, const concordUtils::Sliver &val) { + if (Status s = db_->put(key, val); s.isOK()) + return s; + else + throw std::runtime_error("failed to put key: " + key.toString() + std::string(" reason: ") + s.toString()); +} + +Status DBAdapter::mdtGet(const concordUtils::Sliver &key, concordUtils::Sliver &val) const { + if (Status s = db_->get(key, val); s.isOK() || s.isNotFound()) + return s; + else + throw std::runtime_error("failed to get key: " + key.toString() + std::string(" reason: ") + s.toString()); +} + SetOfKeyValuePairs DBAdapter::getBlockData(const RawBlock &rawBlock) const { return block::detail::getData(rawBlock); } BlockDigest DBAdapter::getParentDigest(const RawBlock &rawBlock) const { diff --git a/kvbc/test/CMakeLists.txt b/kvbc/test/CMakeLists.txt index d9529ef65f..4465deedfd 100644 --- a/kvbc/test/CMakeLists.txt +++ b/kvbc/test/CMakeLists.txt @@ -14,10 +14,14 @@ endif() add_executable(order_test order_test.cpp $) add_test(order_test order_test) -target_include_directories(order_test PUBLIC ${libkvbc_SOURCE_DIR}/include) -target_link_libraries(order_test GTest::Main util) +target_link_libraries(order_test GTest::Main kvbc util) target_compile_options(order_test PUBLIC -Wno-sign-compare) +add_executable(kvbc_dbadapter_test kvbc_dbadapter_test.cpp $) +add_test(kvbc_dbadapter_test kvbc_dbadapter_test) +target_link_libraries(kvbc_dbadapter_test GTest::Main kvbc util) + + add_executable(sparse_merkle_storage_db_adapter_unit_test sparse_merkle_storage/db_adapter_unit_test.cpp $) add_test(sparse_merkle_storage_db_adapter_unit_test sparse_merkle_storage_db_adapter_unit_test) @@ -31,32 +35,32 @@ target_link_libraries(sparse_merkle_storage_db_adapter_unit_test PUBLIC ) if(RAPIDCHECK_FOUND) - add_executable(sparse_merkle_storage_db_adapter_property_test - sparse_merkle_storage/db_adapter_property_test.cpp $) - add_test(sparse_merkle_storage_db_adapter_property_test sparse_merkle_storage_db_adapter_property_test) - target_include_directories(sparse_merkle_storage_db_adapter_property_test PRIVATE ${RAPIDCHECK_INCLUDE_DIRS}) - target_link_libraries(sparse_merkle_storage_db_adapter_property_test PUBLIC - GTest::Main - GTest::GTest - ${RAPIDCHECK_LIBRARIES} +add_executable(sparse_merkle_storage_db_adapter_property_test + sparse_merkle_storage/db_adapter_property_test.cpp $) +add_test(sparse_merkle_storage_db_adapter_property_test sparse_merkle_storage_db_adapter_property_test) +target_include_directories(sparse_merkle_storage_db_adapter_property_test PRIVATE ${RAPIDCHECK_INCLUDE_DIRS}) +target_link_libraries(sparse_merkle_storage_db_adapter_property_test PUBLIC + GTest::Main + GTest::GTest + ${RAPIDCHECK_LIBRARIES} util corebft kvbc stdc++fs ) - add_executable(sparse_merkle_internal_node_property_test sparse_merkle/internal_node_property_tests.cpp - $) - add_test(sparse_merkle_internal_node_property_test sparse_merkle_internal_node_property_test) - target_include_directories(sparse_merkle_internal_node_property_test PRIVATE ${RAPIDCHECK_INCLUDE_DIRS}) - target_link_libraries(sparse_merkle_internal_node_property_test PUBLIC - GTest::Main - GTest::GTest - ${RAPIDCHECK_LIBRARIES} - util - kvbc - OpenSSL::Crypto - ) +add_executable(sparse_merkle_internal_node_property_test sparse_merkle/internal_node_property_tests.cpp +$) +add_test(sparse_merkle_internal_node_property_test sparse_merkle_internal_node_property_test) +target_include_directories(sparse_merkle_internal_node_property_test PRIVATE ${RAPIDCHECK_INCLUDE_DIRS}) +target_link_libraries(sparse_merkle_internal_node_property_test PUBLIC + GTest::Main + GTest::GTest + ${RAPIDCHECK_LIBRARIES} + util + kvbc + OpenSSL::Crypto +) endif () add_executable(sparse_merkle_base_types_test sparse_merkle/base_types_test.cpp @@ -94,14 +98,14 @@ target_link_libraries(sparse_merkle_tree_test PUBLIC ) if (BUILD_ROCKSDB_STORAGE) - add_executable(multiIO_test multiIO_test.cpp $) - add_test(multiIO_test multiIO_test) +add_executable(multiIO_test multiIO_test.cpp $) +add_test(multiIO_test multiIO_test) - target_link_libraries(multiIO_test PUBLIC - GTest::Main - GTest::GTest +target_link_libraries(multiIO_test PUBLIC + GTest::Main + GTest::GTest util - kvbc + kvbc corebft - ) +) endif(BUILD_ROCKSDB_STORAGE) diff --git a/kvbc/test/kvbc_dbadapter_test.cpp b/kvbc/test/kvbc_dbadapter_test.cpp new file mode 100644 index 0000000000..7fef0f8fd6 --- /dev/null +++ b/kvbc/test/kvbc_dbadapter_test.cpp @@ -0,0 +1,72 @@ +// Copyright 2019 VMware, all rights reserved +/** + * Test multi* functions for RocksDBClient class. + */ + +#include "Logger.hpp" +#include "gtest/gtest.h" +#include "kv_types.hpp" +#include "db_adapter.h" + +#ifdef USE_ROCKSDB +#include "rocksdb/client.h" +#include "rocksdb/key_comparator.h" +#endif + +using namespace std; + +using concordUtils::Status; +using concordUtils::Sliver; +using concord::kvbc::KeysVector; +using concord::kvbc::KeyValuePair; +using concord::kvbc::SetOfKeyValuePairs; +using concord::kvbc::BlockId; +// using concord::storage::rocksdb::Client; +// using concord::storage::rocksdb::KeyComparator; +using concord::storage::ITransaction; +using concord::kvbc::DBKeyManipulator; +using concord::kvbc::DBKeyComparator; + +namespace { + +std::unique_ptr dbClient; + +class kvbc_dbadapter_test : public ::testing::Test { + protected: + void SetUp() override { + // keyGen_.reset(new concord::kvbc::KeyGenerator); + // comparator_ = new KeyComparator(new DBKeyComparator()); + // dbClient.reset(new Client(dbPath_, comparator_)); + // dbClient->init(); + } + + void TearDown() override { + // dbClient.reset(); + // delete comparator_; + // string cmd = string("rm -rf ") + dbPath_; + // if (system(cmd.c_str())) { + // ASSERT_TRUE(false); + // } + } + + std::unique_ptr keyGen_; + const string dbPath_ = "./rocksdb_test"; + // KeyComparator *comparator_; +}; + +TEST_F(kvbc_dbadapter_test, basic) { + // auto config = get_dbadapter_configuration(); + // IDbAdapter* dbAdapter = new concord::kvbc::DBAdapter(std::shared_ptr(std::get<0>(config)), + // std::shared_ptr(std::get<1>(config)), + // std::unique_ptr(std::get<2>(config))); + // dbAdapter->getDb(); +} + +} // end namespace + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + + int res = RUN_ALL_TESTS(); + return res; +} diff --git a/kvbc/test/multiIO_test.cpp b/kvbc/test/multiIO_test.cpp index 37e1cdcba8..3e3684f831 100644 --- a/kvbc/test/multiIO_test.cpp +++ b/kvbc/test/multiIO_test.cpp @@ -64,7 +64,7 @@ void verifyMultiDel(KeysVector &keys) { class multiIO_test : public ::testing::Test { protected: void SetUp() override { - keyGen_.reset(new concord::kvbc::KeyGenerator); + keyGen_.reset(new concord::kvbc::RocksKeyGenerator); comparator_ = new KeyComparator(new DBKeyComparator()); dbClient.reset(new Client(dbPath_, comparator_)); dbClient->init(); diff --git a/logging/src/Logger.cpp b/logging/src/Logger.cpp index a876f13658..3ab3a2ab3d 100644 --- a/logging/src/Logger.cpp +++ b/logging/src/Logger.cpp @@ -26,8 +26,7 @@ concordlogger::MDC::~MDC() { logger_.removeMdc(key_); } concordlogger::Logger initLogger() { log4cplus::SharedAppenderPtr ca_ptr = log4cplus::SharedAppenderPtr(new log4cplus::ConsoleAppender(false, true)); - ca_ptr->setLayout( - std::auto_ptr(new log4cplus::PatternLayout("[Node %X{rid}] [%t] %%%X%% %-5p|%c||%M|%m|%n "))); + ca_ptr->setLayout(std::auto_ptr(new log4cplus::PatternLayout("Node %X{rid}|%t|%-5p|%c|%M|%m%n"))); log4cplus::Logger::getRoot().addAppender(ca_ptr); log4cplus::Logger::getRoot().setLogLevel(log4cplus::INFO_LOG_LEVEL); return log4cplus::Logger::getInstance(LOG4CPLUS_TEXT(DEFAULT_LOGGER_NAME)); diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt index f538c0b8df..aad27fb4b4 100644 --- a/storage/CMakeLists.txt +++ b/storage/CMakeLists.txt @@ -1,9 +1,14 @@ add_library(concordbft_storage STATIC src/memorydb_client.cpp - src/key_manipulator.cpp -) + src/key_manipulator.cpp) target_include_directories(concordbft_storage PUBLIC include) +if (USE_S3_OBJECT_STORE) +target_compile_definitions(concordbft_storage PUBLIC USE_S3_OBJECT_STORE=1) +target_sources(concordbft_storage PRIVATE src/s3/client.cpp) +target_link_libraries(concordbft_storage PRIVATE s3) +endif(USE_S3_OBJECT_STORE) + if (BUILD_ROCKSDB_STORAGE) #TODO [TK] find_package find_library(ROCKSDB rocksdb) diff --git a/storage/include/memorydb/client.h b/storage/include/memorydb/client.h index 7fc7fff4ba..838f7091fd 100644 --- a/storage/include/memorydb/client.h +++ b/storage/include/memorydb/client.h @@ -36,15 +36,15 @@ class ClientIterator : public concord::storage::IDBClient::IDBClientIterator { virtual ~ClientIterator() {} // Inherited via IDBClientIterator - virtual KeyValuePair first() override; - virtual KeyValuePair last() override; - virtual KeyValuePair seekAtLeast(const Sliver &_searchKey) override; - virtual KeyValuePair seekAtMost(const Sliver &_searchKey) override; - virtual KeyValuePair previous() override; - virtual KeyValuePair next() override; - virtual KeyValuePair getCurrent() override; - virtual bool isEnd() override; - virtual concordUtils::Status getStatus() override; + KeyValuePair first() override; + KeyValuePair last() override; + KeyValuePair seekAtLeast(const Sliver &_searchKey) override; + KeyValuePair seekAtMost(const Sliver &_searchKey) override; + KeyValuePair previous() override; + KeyValuePair next() override; + KeyValuePair getCurrent() override; + bool isEnd() override; + Status getStatus() override; private: concordlogger::Logger logger; @@ -70,9 +70,10 @@ class Client : public IDBClient { comp_(comp), map_([this](const Sliver &a, const Sliver &b) { return comp_(a, b); }) {} - virtual void init(bool readOnly) override; - virtual Status get(const Sliver &_key, OUT Sliver &_outValue) const override; - Status get(const Sliver &_key, OUT char *&buf, uint32_t bufSize, OUT uint32_t &_size) const override; + void init(bool readOnly) override; + concordUtils::Status get(const Sliver &_key, OUT Sliver &_outValue) const override; + concordUtils::Status get(const Sliver &_key, OUT char *&buf, uint32_t bufSize, OUT uint32_t &_size) const override; + concordUtils::Status has(const Sliver &_key) const override; virtual IDBClientIterator *getIterator() const override; virtual concordUtils::Status freeIterator(IDBClientIterator *_iter) const override; virtual concordUtils::Status put(const Sliver &_key, const Sliver &_value) override; diff --git a/storage/include/object_store/object_store_client.hpp b/storage/include/object_store/object_store_client.hpp new file mode 100644 index 0000000000..38286143c1 --- /dev/null +++ b/storage/include/object_store/object_store_client.hpp @@ -0,0 +1,59 @@ +// Copyright 2018 VMware, all rights reserved + +#pragma once + +#include +#include "storage/db_interface.h" + +namespace concord::storage { + +using namespace concordUtils; + +/** + * @brief This class implements Abstract object store client assuming S3 + * protocol + * + */ +class ObjectStoreClient : public IDBClient { + public: + ObjectStoreClient(ObjectStoreClient&) = delete; + ObjectStoreClient() = delete; + ~ObjectStoreClient() = default; + + ObjectStoreClient(IDBClient* impl) : pImpl_(impl) {} + + void init(bool readOnly) { return pImpl_->init(readOnly); } + + Status get(const Sliver& _key, OUT Sliver& _outValue) const { return pImpl_->get(_key, _outValue); } + + Status get(const Sliver& _key, OUT char*& buf, uint32_t bufSize, OUT uint32_t& _size) const { + return pImpl_->get(_key, buf, bufSize, _size); + } + + Status put(const Sliver& _key, const Sliver& _value) { return pImpl_->put(_key, _value); } + + Status del(const Sliver& _key) { return pImpl_->del(_key); } + + Status multiGet(const KeysVector& _keysVec, OUT ValuesVector& _valuesVec) { + return pImpl_->multiGet(_keysVec, _valuesVec); + } + + Status multiPut(const SetOfKeyValuePairs& _keyValueMap) { return pImpl_->multiPut(_keyValueMap); } + + Status multiDel(const KeysVector& _keysVec) { return pImpl_->multiDel(_keysVec); } + + bool isNew() { return pImpl_->isNew(); } + + IDBClient::IDBClientIterator* getIterator() const { return pImpl_->getIterator(); } + + Status freeIterator(IDBClientIterator* _iter) const { return pImpl_->freeIterator(_iter); } + + ITransaction* beginTransaction() { return pImpl_->beginTransaction(); } + + Status has(const Sliver& key) const { return pImpl_->has(key); } + + protected: + std::shared_ptr pImpl_ = nullptr; +}; + +} // namespace concord::storage diff --git a/storage/include/rocksdb/client.h b/storage/include/rocksdb/client.h index b3491e3ee9..162f1ccb43 100644 --- a/storage/include/rocksdb/client.h +++ b/storage/include/rocksdb/client.h @@ -77,6 +77,7 @@ class Client : public concord::storage::IDBClient { char*& buf, uint32_t bufSize, uint32_t& _realSize) const override; + concordUtils::Status has(const Sliver& _key) const override; IDBClientIterator* getIterator() const override; concordUtils::Status freeIterator(IDBClientIterator* _iter) const override; concordUtils::Status put(const concordUtils::Sliver& _key, const concordUtils::Sliver& _value) override; diff --git a/storage/include/s3/client.hpp b/storage/include/s3/client.hpp new file mode 100644 index 0000000000..16098d761f --- /dev/null +++ b/storage/include/s3/client.hpp @@ -0,0 +1,291 @@ +// Concord +// +// Copyright (c) 2020 VMware, Inc. All Rights Reserved. +// +// This product is licensed to you under the Apache 2.0 license (the +// "License"). You may not use this product except in compliance with the +// Apache 2.0 License. +// +// This product may include a number of subcomponents with separate copyright +// notices and license terms. Your use of these subcomponents is subject to the +// terms and conditions of the subcomponent's license, as noted in the LICENSE +// file. +// +// This convenience header combines different block implementations. + +#include +#include +#include +#include "Logger.hpp" +#include "assertUtils.hpp" +#include "storage/db_interface.h" + +namespace concord::storage::s3 { + +struct StoreConfig { + std::string bucketName; // assuming pre-configured, no need to create + std::string url; // from the customer + std::string protocol; // currently tested with HTTP + std::string secretKey; // from the customer + std::string accessKey; // from the customer + std::uint32_t maxWaitTime; // in milliseconds +}; + +/** + * @brief Internal implementation of ECS EMC S3 client based on libs3 + * Most of the method are irrelevant for the object store and throw exception if + * called. We assume that only ObjectStoreAppState class will use this class + * + */ +class Client : public concord::storage::IDBClient { + public: + /** + * Transaction class. Is used also internally to implement multiPut() + */ + class Transaction : public ITransaction { + public: + Transaction(Client* client) : ITransaction(nextId()), client_{client} {} + void commit() override { + for (auto&& pair : multiput_) + if (concordUtils::Status s = client_->put(pair.first, pair.second); !s.isOK()) + throw std::runtime_error("S3 client error: commit failed txn id[" + getIdStr() + std::string("], reason: ") + + s.toString()); + } + void rollback() override { multiput_.clear(); } + void put(const concordUtils::Sliver& key, const concordUtils::Sliver& value) override { multiput_[key] = value; } + std::string get(const concordUtils::Sliver& key) override { return multiput_[key].toString(); } + void del(const concordUtils::Sliver& key) override { multiput_.erase(key); } + + protected: + Client* client_; + SetOfKeyValuePairs multiput_; + ID nextId() { + static ID id_ = 0; + return ++id_; + } + }; + + Client(const StoreConfig& config) : config_{config} { LOG_INFO(logger_, "S3 client created"); } + + ~Client() { + /* Destroy LibS3 */ + S3_deinitialize(); + init_ = false; + LOG_INFO(logger_, "libs3 deinit"); + } + + /** + * @brief used only from the test! name should be immutable string literal + * + * @param name + */ + void set_bucket_name(std::string name) { config_.bucketName = name; } + + /** + * @brief Initializing underlying libs3. The S3_initialize function must be + * called exactly once and only from 1 thread + * + * @param readOnly + */ + void init(bool readOnly) override; + /** + * @brief Get object from the store. + * + * @param _key key to be retrieved + * @param _outValue returned object + * @return OK if success, otherwise GeneralError with underlying error status + */ + concordUtils::Status get(const concordUtils::Sliver& _key, OUT concordUtils::Sliver& _outValue) const override { + LOG_DEBUG(logger_, _key.toString()); + using namespace std::placeholders; + concordUtils::Status res = concordUtils::Status::OK(); + std::function f = + std::bind(&Client::get_internal, this, _1, _2); + do_with_retry(res, f, _key, _outValue); + return res; + } + + concordUtils::Status get(const concordUtils::Sliver& _key, + OUT char*& buf, + uint32_t bufSize, + OUT uint32_t& _size) const override { + concordUtils::Sliver res; + if (Status s = get(_key, res); !s.isOK()) return s; + size_t len = std::min(res.length(), (size_t)bufSize); + memcpy(buf, res.data(), len); + _size = len; + return concordUtils::Status::OK(); + } + + /** + * @brief Put object to the store. + * + * @param _key object's key + * @param _value object + * @return OK if success, otherwise GeneralError with underlying error status + */ + concordUtils::Status put(const concordUtils::Sliver& _key, const concordUtils::Sliver& _value) override { + using namespace std::placeholders; + std::function f = + std::bind(&Client::put_internal, this, _1, _2); + concordUtils::Status res = concordUtils::Status::OK(); + do_with_retry(res, f, _key, _value); + return res; + } + + concordUtils::Status test_bucket() { + using namespace std::placeholders; + std::function f = std::bind(&Client::test_bucket_internal, this); + concordUtils::Status res = concordUtils::Status::OK(); + + do_with_retry(res, f); + return res; + } + + concordUtils::Status has(const concordUtils::Sliver& key) const override { + using namespace std::placeholders; + std::function f = std::bind(&Client::object_exists_internal, this, _1); + concordUtils::Status res = concordUtils::Status::OK(); + do_with_retry(res, f, key); + return res; + } + + concordUtils::Status del(const concordUtils::Sliver& key) override; + + concordUtils::Status multiGet(const KeysVector& _keysVec, OUT ValuesVector& _valuesVec) override { + Assert(_keysVec.size() == _valuesVec.size()); + + for (KeysVector::size_type i = 0; i < _keysVec.size(); ++i) + if (Status s = get(_keysVec[i], _valuesVec[i]); !s.isOK()) return s; + + return concordUtils::Status::OK(); + } + + concordUtils::Status multiPut(const SetOfKeyValuePairs& _keyValueMap) override { + ITransaction::Guard g(beginTransaction()); + for (auto&& pair : _keyValueMap) g.txn()->put(pair.first, pair.second); + + return concordUtils::Status::OK(); + } + + concordUtils::Status multiDel(const KeysVector& _keysVec) override { + for (auto&& key : _keysVec) + if (Status s = del(key); !s.isOK()) return s; + return concordUtils::Status::OK(); + } + + bool isNew() override { throw std::logic_error("isNew() Not implemented for ECS S3 object store"); } + + IDBClient::IDBClientIterator* getIterator() const override { + Assert("getIterator() Not implemented for ECS S3 object store" && false); + throw std::logic_error("getIterator() Not implemented for ECS S3 object store"); + } + + concordUtils::Status freeIterator(IDBClientIterator* _iter) const override { + throw std::logic_error("freeIterator() Not implemented for ECS S3 object store"); + } + + ITransaction* beginTransaction() override { return new Transaction(this); } + + ///////////////////////// protected ///////////////////////////// + protected: + // retry forever, increasing the waiting timeout until it reaches the defined maximum + template + void do_with_retry(Status& r, F&& f, Args&&... args) const { + uint16_t delay = initialDelay_; + do { + r = std::forward(f)(std::forward(args)...); + if (!r.isGeneralError()) break; + if (delay < config_.maxWaitTime) delay *= delayFactor_; + LOG_INFO(logger_, "retrying " << typeid(f).name() << " after delay: " << delay); + std::this_thread::sleep_for(std::chrono::milliseconds(delay)); + } while (!r.isOK() || !r.isNotFound()); + } + + concordUtils::Status get_internal(const concordUtils::Sliver& _key, OUT concordUtils::Sliver& _outValue) const; + + concordUtils::Status put_internal(const concordUtils::Sliver& _key, const concordUtils::Sliver& _value); + + concordUtils::Status object_exists_internal(const concordUtils::Sliver& key) const; + + concordUtils::Status test_bucket_internal(); + + struct ResponseData { + S3Status status = S3Status::S3StatusOK; + std::string errorMessage; + }; + + /** + * @brief Represents intermidiate data for get operation and is passed to + * the lambda callback. + * + */ + struct GetObjectResponseData : public ResponseData { + GetObjectResponseData(size_t linitialLength) : data(new char[linitialLength]), dataLength(linitialLength) {} + + void CheckAndResize(int& nextReadLength) { + if (readLength + nextReadLength <= dataLength) return; + + dataLength *= 2; + char* newData = new char[dataLength]; + memcpy(newData, data, readLength); + delete[] data; + data = newData; + } + + void Write(const char* _data, int& dataSize) { + CheckAndResize(dataSize); + memcpy(data + readLength, _data, dataSize); + readLength += dataSize; + } + + ~GetObjectResponseData() { delete[] data; } + + char* data = nullptr; + size_t dataLength = 0; + size_t readLength = 0; + }; + + /** + * @brief Represents intermidiate data for put operation and is passed to + * the lambda callback. + * + */ + struct PutObjectResponseData : public ResponseData { + PutObjectResponseData(const char* _data, size_t&& _dataLength) : data(_data), dataLength(_dataLength) {} + + const char* data; + size_t dataLength; + size_t putCount = 0; + }; + + static void responseCompleteCallback(S3Status status, const S3ErrorDetails* error, void* callbackData) { + ResponseData* cb = nullptr; + if (callbackData) { + cb = static_cast(callbackData); + cb->status = status; + } + if (error) { + if (error->message) { + if (cb) cb->errorMessage = std::string(error->message); + } + } + } + + static S3Status propertiesCallback(const S3ResponseProperties* properties, void* callbackData) { + return S3Status::S3StatusOK; + } + + S3ResponseHandler responseHandler = {NULL, &responseCompleteCallback}; + StoreConfig config_; + S3BucketContext context_; + bool init_ = false; + const uint32_t kInitialGetBufferSize_ = 25000; + std::mutex initLock_; + concordlogger::Logger logger_ = concordlogger::Log::getLogger("concord.storage.s3"); + + uint16_t initialDelay_ = 100; + const double delayFactor_ = 1.5; +}; + +} // namespace concord::storage::s3 diff --git a/storage/include/storage/db_interface.h b/storage/include/storage/db_interface.h index 9c313d7ac6..bd1dbb466b 100644 --- a/storage/include/storage/db_interface.h +++ b/storage/include/storage/db_interface.h @@ -57,6 +57,7 @@ class IDBClient { virtual void init(bool readOnly = false) = 0; virtual Status get(const Sliver& _key, OUT Sliver& _outValue) const = 0; virtual Status get(const Sliver& _key, OUT char*& buf, uint32_t bufSize, OUT uint32_t& _size) const = 0; + virtual Status has(const Sliver& _key) const = 0; virtual Status put(const Sliver& _key, const Sliver& _value) = 0; virtual Status del(const Sliver& _key) = 0; virtual Status multiGet(const KeysVector& _keysVec, OUT ValuesVector& _valuesVec) = 0; diff --git a/storage/include/storage/db_types.h b/storage/include/storage/db_types.h index 8ca1119ece..779e143cf8 100644 --- a/storage/include/storage/db_types.h +++ b/storage/include/storage/db_types.h @@ -22,7 +22,7 @@ inline namespace v1DirectKeyValue { namespace detail { enum class EDBKeyType : std::uint8_t { - E_DB_KEY_TYPE_FIRST = 1, + E_DB_KEY_TYPE_FIRST = 65, E_DB_KEY_TYPE_BLOCK = E_DB_KEY_TYPE_FIRST, E_DB_KEY_TYPE_KEY, E_DB_KEY_TYPE_BFT_METADATA_KEY, diff --git a/storage/include/storage/key_manipulator.hpp b/storage/include/storage/key_manipulator.hpp index ebea839708..f150480dd4 100644 --- a/storage/include/storage/key_manipulator.hpp +++ b/storage/include/storage/key_manipulator.hpp @@ -12,29 +12,29 @@ namespace concord::storage { inline namespace v1DirectKeyValue { -class DBKeyManipulatorBase { +class DBKeyGeneratorBase { protected: - static bool copyToAndAdvance(char *_buf, size_t *_offset, size_t _maxOffset, char *_src, size_t _srcSize); - static concordlogger::Logger &logger() { + static bool copyToAndAdvance(char* buf, size_t* offset, size_t _maxOffset, const char* _src, const size_t& _srcSize); + static concordlogger::Logger& logger() { static concordlogger::Logger logger_ = concordlogger::Log::getLogger("concord.storage.blockchain.DBKeyManipulator"); return logger_; } }; -class MetadataKeyManipulator : public DBKeyManipulatorBase { +class MetadataKeyManipulator : public DBKeyGeneratorBase { public: static concordUtils::Sliver generateMetadataKey(ObjectId objectId); }; -class STKeyManipulator : public DBKeyManipulatorBase { +class STKeyManipulator : public DBKeyGeneratorBase { public: static concordUtils::Sliver generateStateTransferKey(ObjectId objectId); static concordUtils::Sliver generateSTPendingPageKey(uint32_t pageid); static concordUtils::Sliver generateSTCheckpointDescriptorKey(uint64_t chkpt); static concordUtils::Sliver generateSTReservedPageStaticKey(uint32_t pageid, uint64_t chkpt); static concordUtils::Sliver generateSTReservedPageDynamicKey(uint32_t pageid, uint64_t chkpt); - static uint64_t extractCheckPointFromKey(const char *_key_data, size_t _key_length); - static std::pair extractPageIdAndCheckpointFromKey(const char *_key_data, size_t _key_length); + static uint64_t extractCheckPointFromKey(const char* _key_data, size_t _key_length); + static std::pair extractPageIdAndCheckpointFromKey(const char* _key_data, size_t _key_length); private: static Sliver generateReservedPageKey(detail::EDBKeyType, uint32_t pageid, uint64_t chkpt); diff --git a/storage/src/key_manipulator.cpp b/storage/src/key_manipulator.cpp index 16e1a52f19..23dbf89adb 100644 --- a/storage/src/key_manipulator.cpp +++ b/storage/src/key_manipulator.cpp @@ -111,8 +111,8 @@ Sliver STKeyManipulator::generateReservedPageKey(EDBKeyType keyType, uint32_t pa return Sliver(keyBuf, keySize); } -bool DBKeyManipulatorBase::copyToAndAdvance( - char *_buf, size_t *_offset, size_t _maxOffset, char *_src, size_t _srcSize) { +bool DBKeyGeneratorBase::copyToAndAdvance( + char *_buf, size_t *_offset, size_t _maxOffset, const char *_src, const size_t &_srcSize) { if (!_buf && !_offset && !_src) assert(false); if (*_offset >= _maxOffset && _srcSize > 0) assert(false); diff --git a/storage/src/memorydb_client.cpp b/storage/src/memorydb_client.cpp index efcdf28b32..fc499f67d1 100644 --- a/storage/src/memorydb_client.cpp +++ b/storage/src/memorydb_client.cpp @@ -59,6 +59,11 @@ Status Client::get(const Sliver &_key, OUT char *&buf, uint32_t bufSize, OUT uin return status; } +Status Client::has(const Sliver &_key) const { + Sliver dummy_out; + return get(_key, dummy_out); +} + /** * @brief Returns reference to a new object of IDBClientIterator. * diff --git a/storage/src/rocksdb_client.cpp b/storage/src/rocksdb_client.cpp index 331863b000..2b05535e72 100644 --- a/storage/src/rocksdb_client.cpp +++ b/storage/src/rocksdb_client.cpp @@ -178,6 +178,11 @@ Status Client::get(const Sliver &_key, OUT char *&buf, uint32_t bufSize, OUT uin return Status::OK(); } +Status Client::has(const Sliver &_key) const { + Sliver dummy_out; + return get(_key, dummy_out); +} + /** * @brief Returns a RocksDBClientIterator object. * diff --git a/storage/src/s3/client.cpp b/storage/src/s3/client.cpp new file mode 100644 index 0000000000..87a1ee6377 --- /dev/null +++ b/storage/src/s3/client.cpp @@ -0,0 +1,207 @@ +// Concord +// +// Copyright (c) 2020 VMware, Inc. All Rights Reserved. +// +// This product is licensed to you under the Apache 2.0 license (the +// "License"). You may not use this product except in compliance with the +// Apache 2.0 License. +// +// This product may include a number of subcomponents with separate copyright +// notices and license terms. Your use of these subcomponents is subject to the +// terms and conditions of the subcomponent's license, as noted in the LICENSE +// file. +// +// This convenience header combines different block implementations. + +#include "s3/client.hpp" +#include +#include +#include +#include +#include +#include "Logger.hpp" +#include "assertUtils.hpp" +#include "storage/db_interface.h" + +namespace concord::storage::s3 { + +using namespace std; +using namespace concordUtils; + +/** + * @brief Initializing underlying libs3. The S3_initialize function must be + * called exactly once and only from 1 thread + * + * @param readOnly + */ +void Client::init(bool readOnly) { + std::lock_guard g(initLock_); + Assert(!init_); // we may return here instead of firing assert failure, but + // probably we want to catch bugs? + context_.hostName = config_.url.c_str(); + context_.protocol = (config_.protocol == "HTTP" ? S3ProtocolHTTP : S3ProtocolHTTPS); + context_.uriStyle = S3UriStylePath; + /* In ECS terms, this is your object user */ + context_.accessKeyId = config_.accessKey.c_str(); + context_.secretAccessKey = config_.secretKey.c_str(); + /* The name of a bucket to use */ + context_.bucketName = config_.bucketName.c_str(); + + S3Status st = S3_initialize(NULL, S3_INIT_ALL, NULL); + if (S3Status::S3StatusOK != st) { + LOG_FATAL(logger_, "libs3 init failed, status: " + to_string(st)); + Assert("libs3 init failed" && false); + } + LOG_INFO(logger_, "libs3 initialized"); + init_ = true; + log4cplus::Logger::getInstance("concord.storage.s3").setLogLevel(log4cplus::DEBUG_LOG_LEVEL); +} + +Status Client::del(const Sliver& key) { + assert(init_); + ResponseData rData; + S3ResponseHandler rHandler; + rHandler.completeCallback = responseCompleteCallback; + rHandler.propertiesCallback = propertiesCallback; + S3_delete_object(&context_, string(key.data()).c_str(), NULL, &rHandler, &rData); + LOG_DEBUG(logger_, + "del key: " << string(key.data()) << ", status: " << rData.status << ", msg: " << rData.errorMessage); + if (rData.status == S3Status::S3StatusOK) + return Status::OK(); + else { + LOG_ERROR(logger_, + "del key: " << string(key.data()) << ", status: " << rData.status << ", msg: " << rData.errorMessage); + if (rData.status == S3Status::S3StatusHttpErrorNotFound || rData.status == S3Status::S3StatusErrorNoSuchBucket || + rData.status == S3Status::S3StatusErrorNoSuchKey) + return Status::NotFound("Status: " + to_string(rData.status) + "msg: " + rData.errorMessage); + + return Status::GeneralError("Status: " + to_string(rData.status) + "msg: " + rData.errorMessage); + } +} + +Status Client::get_internal(const Sliver& _key, OUT Sliver& _outValue) const { + Assert(init_); + LOG_INFO(logger_, "get key: " << _key.toString()); + GetObjectResponseData cbData(kInitialGetBufferSize_); + S3GetObjectHandler getObjectHandler; + getObjectHandler.responseHandler = responseHandler; + + // libs3 uses multiple calls to this callaback to append chunks of data from + // the stream + auto f = [](int buf_len, const char* buf, void* cb) -> S3Status { + GetObjectResponseData* cbData = static_cast(cb); + Assert(cbData->data != nullptr); + cbData->Write(buf, buf_len); + return S3Status::S3StatusOK; + }; + getObjectHandler.getObjectDataCallback = f; + S3_get_object(&context_, _key.data(), NULL, 0, 0, NULL, &getObjectHandler, &cbData); + if (cbData.status == S3Status::S3StatusOK) { + _outValue = Sliver::copy(reinterpret_cast(cbData.data), cbData.readLength); + return Status::OK(); + } else { + LOG_INFO(logger_, "get status: " << S3_get_status_name(cbData.status)); + if (cbData.status == S3Status::S3StatusHttpErrorNotFound || cbData.status == S3Status::S3StatusErrorNoSuchBucket || + cbData.status == S3Status::S3StatusErrorNoSuchKey) + return Status::NotFound("Status: " + std::string(S3_get_status_name(cbData.status)) + + "msg: " + cbData.errorMessage); + + return Status::GeneralError("Status: " + std::string(S3_get_status_name(cbData.status)) + + "msg: " + cbData.errorMessage); + } +} + +Status Client::put_internal(const Sliver& _key, const Sliver& _value) { + Assert(init_); + PutObjectResponseData cbData(_value.data(), _value.length()); + S3PutObjectHandler putObjectHandler; + putObjectHandler.responseHandler = responseHandler; + + // libs3 uses multiple calls to this callaback to append chunks of data to + // the stream + auto f = [](int buf_len, char* buf, void* cb) { + PutObjectResponseData* cbData = static_cast(cb); + int toSend = 0; + if (cbData->dataLength) { + toSend = std::min(cbData->dataLength, (size_t)buf_len); + std::memcpy(buf, cbData->data + cbData->putCount, toSend); + } + cbData->putCount += toSend; + cbData->dataLength -= toSend; + return toSend; + }; + putObjectHandler.putObjectDataCallback = f; + string s = string(_key.data()); + S3_put_object(&context_, string(_key.data()).c_str(), _value.length(), NULL, NULL, &putObjectHandler, &cbData); + if (cbData.status == S3Status::S3StatusOK) + return Status::OK(); + else { + LOG_ERROR(logger_, "put status: " << cbData.status); + if (cbData.status == S3Status::S3StatusHttpErrorNotFound || cbData.status == S3Status::S3StatusErrorNoSuchBucket) + return Status::NotFound("Status: " + to_string(cbData.status) + "msg: " + cbData.errorMessage); + + return Status::GeneralError("Status: " + to_string(cbData.status) + "msg: " + cbData.errorMessage); + } +} + +Status Client::object_exists_internal(const Sliver& key) const { + Assert(init_); + ResponseData rData; + S3ResponseHandler rHandler; + rHandler.completeCallback = responseCompleteCallback; + rHandler.propertiesCallback = propertiesCallback; + + S3_head_object(&context_, string(key.data()).c_str(), NULL, &rHandler, &rData); + LOG_DEBUG( + logger_, + "object_exist key: " << string(key.data()) << ", status: " << rData.status << ", msg: " << rData.errorMessage); + if (rData.status == S3Status::S3StatusOK) + return Status::OK(); + else { + LOG_ERROR( + logger_, + "object_exist key: " << string(key.data()) << ", status: " << rData.status << ", msg: " << rData.errorMessage); + if (rData.status == S3Status::S3StatusHttpErrorNotFound || rData.status == S3Status::S3StatusErrorNoSuchBucket || + rData.status == S3Status::S3StatusErrorNoSuchKey) + return Status::NotFound("Status: " + to_string(rData.status) + "msg: " + rData.errorMessage); + + return Status::GeneralError("Status: " + to_string(rData.status) + "msg: " + rData.errorMessage); + } +} + +Status Client::test_bucket_internal() { + Assert(init_); + ResponseData rData; + S3ResponseHandler rHandler; + rHandler.completeCallback = responseCompleteCallback; + rHandler.propertiesCallback = propertiesCallback; + char location[100]; + + S3_test_bucket(context_.protocol, + context_.uriStyle, + context_.accessKeyId, + context_.secretAccessKey, + context_.hostName, + context_.bucketName, + 100, + location, + NULL, + &rHandler, + &rData); + LOG_DEBUG( + logger_, + "test_bucket bucket: " << context_.bucketName << ", status: " << rData.status << ", msg: " << rData.errorMessage); + if (rData.status == S3Status::S3StatusOK) + return Status::OK(); + else { + LOG_ERROR(logger_, + "test_bucket bucket: " << context_.bucketName << ", status: " << rData.status + << ", msg: " << rData.errorMessage); + if (rData.status == S3Status::S3StatusHttpErrorNotFound || rData.status == S3Status::S3StatusErrorNoSuchBucket) + return Status::NotFound("Status: " + to_string(rData.status) + "msg: " + rData.errorMessage); + + return Status::GeneralError("Status: " + to_string(rData.status) + "msg: " + rData.errorMessage); + } +} + +} // namespace concord::storage::s3 diff --git a/storage/test/ecs_s3_test.cpp b/storage/test/ecs_s3_test.cpp new file mode 100644 index 0000000000..8717338565 --- /dev/null +++ b/storage/test/ecs_s3_test.cpp @@ -0,0 +1,370 @@ +// Copyright 2019 VMware, all rights reserved +// +// Unit tests for ECS S3 object store replication +// IMPORTANT: the test assume that the S3 storage is up. + +#include +#include "gtest/gtest.h" +#include "object_store/object_store_client.hpp" +#include "s3/client.hpp" + +using namespace concord::storage; + +namespace concord::storage::test { + +/** + * This class is used to tell the ObjectStore client impl to simulate network failure for predefined + * ammount of time. Also it can be used for controlling deletion of objects (useful for tests) + */ +class ObjectStoreBehavior { + private: + bool allowNetworkFailure_ = false; + std::chrono::time_point startTimeForNetworkFailure_; + bool doNetworkFailure_ = false; + uint64_t doNetworkFailureDurMilli_ = 0; + + public: + ObjectStoreBehavior(bool canSimulateNetFailure) : allowNetworkFailure_{canSimulateNetFailure} {} + // set network failure flag and duration + void set_do_net_failure(bool value, uint64_t durationMilli) { + doNetworkFailure_ = value; + doNetworkFailureDurMilli_ = durationMilli; + startTimeForNetworkFailure_ = std::chrono::steady_clock::now(); + } + bool get_do_net_failure() const { return doNetworkFailure_; } + // used to assert whether its possible to simulate network failure + bool can_simulate_net_failure() const { return allowNetworkFailure_; } + // returns whether the network failure period has been elapsed + bool has_finished() const { + std::chrono::time_point time = std::chrono::steady_clock::now(); + auto diff = std::chrono::duration_cast(time - startTimeForNetworkFailure_).count(); + return diff > 0 && (uint64_t)diff > doNetworkFailureDurMilli_; + } +}; +/** + * Ecs S3 test client + */ +class EcsS3TestClient : public concord::storage::s3::Client { + public: + EcsS3TestClient(const S3_StoreConfig &config, const ObjectStoreBehavior &b) + : concord::storage::s3::Client(config), bh_{b} {} + + Status del(const Sliver &key) override { + if (should_net_fail()) return Status::GeneralError("Network failure simulated"); + return concord::storage::s3::Client::del(key); + } + + Status get_internal(const Sliver &_key, OUT Sliver &_outValue) const override { + if (should_net_fail()) return Status::GeneralError("Network failure simulated"); + return concord::storage::s3::Client::get_internal(_key, _outValue); + } + + Status put_internal(const Sliver &_key, const Sliver &_value) override { + if (this->should_net_fail()) return Status::GeneralError("Network failure simulated"); + return concord::storage::s3::Client::put_internal(_key, _value); + } + + Status object_exists_internal(const Sliver &key) const override { + if (should_net_fail()) return Status::GeneralError("Network failure simulated"); + return concord::storage::s3::Client::object_exists_internal(key); + } + Status test_bucket_internal() override { + if (should_net_fail()) return Status::GeneralError("Network failure simulated"); + return concord::storage::s3::Client::test_bucket_internal(); + } + + protected: + bool should_net_fail() const { + bool b1 = bh_.can_simulate_net_failure(); + if (!b1) return false; + bool b3 = bh_.get_do_net_failure(); + if (!b3) return false; + bool b2 = bh_.has_finished(); + if (b2) return false; + return true; + } + + protected: + ObjectStoreBehavior bh_; +}; +/** + * object store test client + */ +class ObjectStoreTestClient : public concord::storage::ObjectStoreClient { + public: + ObjectStoreTestClient(IDBClient *impl) : concord::storage::ObjectStoreClient(impl) {} + Status test_bucket() { return dynamic_pointer_cast(pImpl_)->test_bucket(); } + + protected: +}; + +/** + * Ecs test fixture + */ +class EcsS3Test : public ::testing::Test { + protected: + EcsS3Test() { + config.bucketName = "blockchain-dev-asx"; + config.accessKey = "blockchain-dev"; + config.protocol = "HTTP"; + config.url = "10.70.30.244:9020"; + config.secretKey = "Rz0mdbUNGJBxqdzprn5XGSXPr2AfkgcQsYS4y698"; + config.maxWaitTime = 10000; + } + + void SetClient(ObjectStoreBehavior b = ObjectStoreBehavior(false)) { + client.reset(new ObjectStoreTestClient(new EcsS3TestClient(config, b))); + client->init(); + } + + void SetBucketName(std::string name) { config.bucketName = name; } + + S3_StoreConfig config; + std::shared_ptr client = nullptr; +}; + +/** + * @brief put and get regular string object + * + */ +TEST_F(EcsS3Test, PutGetStringObject) { + SetClient(); + + Sliver key("unit_test_key"); + Sliver value("unit_test_object"); + Status s = client->put(key, value); + + ASSERT_EQ(s, Status::OK()); + Sliver rvalue; + s = client->get(key, rvalue); + ASSERT_TRUE(s.isOK()); + ASSERT_EQ(value, rvalue); +} + +/** + * @brief put and get several regular string objects in batch + * + */ +TEST_F(EcsS3Test, multiPut) { + SetClient(); + SetOfKeyValuePairs pairs; + for (int i = 0; i < 10; ++i) pairs["multiPutKey" + std::to_string(i)] = "multiPutValue" + std::to_string(i); + + ASSERT_EQ(client->multiPut(pairs), Status::OK()); + + KeysVector keys; + for (int i = 0; i < 10; ++i) keys.push_back("multiPutKey" + std::to_string(i)); + + ValuesVector values(10); + ASSERT_EQ(client->multiGet(keys, values), Status::OK()); + for (int i = 0; i < 10; ++i) + ASSERT_STRCASEEQ(values[i].toString().c_str(), std::string("multiPutValue" + std::to_string(i)).c_str()); + + ASSERT_EQ(client->multiDel(keys), Status::OK()); +} + +/** + * @brief put and get regular string object with retry + * + */ +TEST_F(EcsS3Test, PutGetStringObjectRetryOK) { + ObjectStoreBehavior b(true); + b.set_do_net_failure(true, config.maxWaitTime / 2); + SetClient(b); + + Sliver key("unit_test_key"); + Sliver value("unit_test_object"); + + Status s = client->put(key, value); + + ASSERT_EQ(s, Status::OK()); + Sliver rvalue; + s = client->get(key, rvalue); + ASSERT_TRUE(s.isOK()); + ASSERT_EQ(value, rvalue); +} + +/** + * @brief try to put object into non existing bucket + * + */ +TEST_F(EcsS3Test, PutObjectBucketNotExists) { + SetBucketName("non_existing_bucket"); + SetClient(); + + Sliver key("unit_test_key"); + Sliver value("unit_test_object"); + Status s = client->put(key, value); + ASSERT_TRUE(s.isNotFound()); +} + +/** + * @brief try to get object from non existing bucket + * + */ +TEST_F(EcsS3Test, GetObjectBucketNotExists) { + SetBucketName("non_existing_bucket"); + SetClient(); + + Sliver key("unit_test_key"); + Sliver v; + Status s = client->get(key, v); + ASSERT_TRUE(s.isNotFound()); + ASSERT_EQ(v.length(), 0); +} + +TEST_F(EcsS3Test, GetObjectNotExists) { + SetClient(); + + Sliver key("unit_test_key12345"); + Sliver value; + Status s = client->get(key, value); + ASSERT_TRUE(s.isNotFound()); + ASSERT_EQ(value.length(), 0); +} + +/** + * @brief put and get binary object + * + */ +TEST_F(EcsS3Test, PutGetBinaryObject) { + SetClient(); + + uint8_t *_key = new uint8_t[30]; + for (int i = 0; i < 30; i++) _key[i] = i + 1; + + uint8_t *_value = new uint8_t[250]; + for (int i = 0; i < 250; i++) _value[i] = i; + + Sliver key((const char *)_key, 30); + Sliver wvalue((const char *)_value, 250); + Status s = client->put(key, wvalue); + ASSERT_TRUE(s.isOK()); + Sliver rvalue; + s = client->get(key, rvalue); + ASSERT_TRUE(s.isOK()); + ASSERT_EQ(wvalue, rvalue); +} + +/** + * @brief Put and get large binary object when 25000 is maximum preallocated + * memory for sending the data + * + */ +TEST_F(EcsS3Test, PutGetLargeBinaryObject) { + SetClient(); + + uint8_t *_key = new uint8_t[30]; + for (int i = 0; i < 30; i++) _key[i] = i + 1; + + uint8_t *_value = new uint8_t[25000]; + for (int i = 0; i < 25000; i++) _value[i] = 'a' + 0; + + Sliver key((const char *)_key, 30); + Sliver wvalue((const char *)_value, 25000); + Status s = client->put(key, wvalue); + ASSERT_TRUE(s.isOK()); + Sliver rvalue; + s = client->get(key, rvalue); + ASSERT_TRUE(s.isOK()); + ASSERT_EQ(wvalue, rvalue); +} + +/** + * @brief Put and get large binary object when 25000 is maximum preallocated + * memory for sending the data expecting the buffer to grow exactly 4 times + * + */ +TEST_F(EcsS3Test, PutGetLargeBinaryObjectWithResize) { + SetClient(); + + uint8_t *_key = new uint8_t[30]; + for (int i = 0; i < 30; i++) _key[i] = i + 1; + + uint8_t *_value = new uint8_t[100000]; + for (int i = 0; i < 100000; i++) _value[i] = 'a' + 0; + + Sliver key((const char *)_key, 30); + Sliver wvalue((const char *)_value, 100000); + Status s = client->put(key, wvalue); + ASSERT_TRUE(s.isOK()); + Sliver rvalue; + s = client->get(key, rvalue); + ASSERT_TRUE(s.isOK()); + ASSERT_TRUE(wvalue == rvalue); +} + +/** + * @brief Check retrieving object headers functionality + * + */ +TEST_F(EcsS3Test, ObjectExists) { + SetClient(); + + uint8_t *_key = new uint8_t[30]; + for (int i = 0; i < 30; i++) _key[i] = i + 1; + + uint8_t *_value = new uint8_t[100000]; + for (int i = 0; i < 100000; i++) _value[i] = 'a' + 0; + + Sliver key((const char *)_key, 30); + Sliver wvalue((const char *)_value, 100000); + Status s = client->put(key, wvalue); + ASSERT_TRUE(s.isOK()); + s = client->has(key); + ASSERT_TRUE(s.isOK()); +} + +/** + * @brief Check retrieving object headers functionality + * + */ +TEST_F(EcsS3Test, ObjectNotExists) { + SetClient(); + + uint8_t *_key = new uint8_t[30]; + for (int i = 0; i < 30; i++) _key[i] = i + 1; + + uint8_t *_value = new uint8_t[100000]; + for (int i = 0; i < 100000; i++) _value[i] = 'a' + 0; + + Sliver key((const char *)_key, 30); + Sliver wvalue((const char *)_value, 100000); + Status s = client->put(key, wvalue); + ASSERT_TRUE(s.isOK()); + Sliver key2("non_existing_key"); + s = client->has(key2); + ASSERT_TRUE(s.isNotFound()); +} + +/** + * @brief Check retrieving object headers functionality + * + */ +TEST_F(EcsS3Test, TestBucketExists) { + SetClient(); + Status s = client->test_bucket(); + ASSERT_TRUE(s.isOK()); +} + +TEST_F(EcsS3Test, TestBucketExistsRetryOK) { + ObjectStoreBehavior b{true}; + b.set_do_net_failure(true, config.maxWaitTime / 2); + SetClient(b); + Status s = client->test_bucket(); + ASSERT_EQ(s, Status::OK()); +} + +TEST_F(EcsS3Test, TestBucketNotExists) { + SetBucketName("non_existing_bucket"); + SetClient(); + Status s = client->test_bucket(); + ASSERT_TRUE(s.isNotFound()); +} + +} // namespace concord::storage::test +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + auto exitCode = RUN_ALL_TESTS(); + return exitCode; +} diff --git a/tests/simpleKVBC/TesterReplica/CMakeLists.txt b/tests/simpleKVBC/TesterReplica/CMakeLists.txt index dcbb6d9f8a..b22844ec29 100644 --- a/tests/simpleKVBC/TesterReplica/CMakeLists.txt +++ b/tests/simpleKVBC/TesterReplica/CMakeLists.txt @@ -15,6 +15,10 @@ if(${USE_COMM_TLS_TCP}) target_compile_definitions(skvbc_replica PUBLIC USE_COMM_TLS_TCP) endif() +if(BUILD_ROCKSDB_STORAGE) + target_compile_definitions(skvbc_replica PUBLIC "USE_ROCKSDB=1") +endif() + target_link_libraries(skvbc_replica PUBLIC kvbc corebft threshsign util test_config_lib $) target_include_directories(skvbc_replica PUBLIC ..) diff --git a/tests/simpleKVBC/TesterReplica/internalCommandsHandler.hpp b/tests/simpleKVBC/TesterReplica/internalCommandsHandler.hpp index 6da7c3e92b..d1053cfdd1 100644 --- a/tests/simpleKVBC/TesterReplica/internalCommandsHandler.hpp +++ b/tests/simpleKVBC/TesterReplica/internalCommandsHandler.hpp @@ -18,8 +18,9 @@ #include "sliver.hpp" #include "simpleKVBTestsBuilder.hpp" #include "db_interfaces.h" -#include "KVBCInterfaces.h" #include "block_metadata.hpp" +#include "KVBCInterfaces.h" +#include class InternalCommandsHandler : public concord::kvbc::ICommandsHandler { public: diff --git a/tests/simpleKVBC/TesterReplica/main.cpp b/tests/simpleKVBC/TesterReplica/main.cpp index f9d2a8ac60..72b7912482 100644 --- a/tests/simpleKVBC/TesterReplica/main.cpp +++ b/tests/simpleKVBC/TesterReplica/main.cpp @@ -13,60 +13,59 @@ #include "setup.hpp" #include "ReplicaImp.h" -#include "memorydb/client.h" #include "internalCommandsHandler.hpp" -#include "commonKVBTests.hpp" #include "replica_state_sync_imp.hpp" #include "block_metadata.hpp" +#include "SimpleBCStateTransfer.hpp" + #ifdef USE_ROCKSDB #include "rocksdb/client.h" #include "rocksdb/key_comparator.h" #endif + #include -using namespace concord::kvbc; +namespace concord::kvbc::test { -int main(int argc, char** argv) { - auto setup = concord::kvbc::TestSetup::ParseArgs(argc, argv); +void run_replica(int argc, char** argv) { + auto setup = TestSetup::ParseArgs(argc, argv); auto logger = setup->GetLogger(); MDC_PUT(GL, "rid", std::to_string(setup->GetReplicaConfig().replicaId)); - auto* db_key_comparator = new concord::kvbc::DBKeyComparator(); - std::shared_ptr db; - if (setup->UsePersistentStorage()) { -#ifdef USE_ROCKSDB - auto* comparator = new concord::storage::rocksdb::KeyComparator(db_key_comparator); - std::stringstream dbPath; - dbPath << BasicRandomTests::DB_FILE_PREFIX << setup->GetReplicaConfig().replicaId; - db.reset(new concord::storage::rocksdb::Client(dbPath.str(), comparator)); -#else - // Abort if we haven't built rocksdb storage - LOG_ERROR( - logger, - "Must build with -DBUILD_ROCKSDB_STORAGE=TRUE cmake option in order to test with persistent storage enabled"); - exit(-1); -#endif - } else { - // Use in-memory storage - auto comparator = concord::storage::memorydb::KeyComparator(db_key_comparator); - db.reset(new concord::storage::memorydb::Client(comparator)); + auto config = setup->get_db_configuration(); + std::shared_ptr mdtDbClient = std::get<0>(config); + IDbAdapter* dbAdapter = std::get<1>(config); + std::shared_ptr replica = std::make_shared(setup->GetCommunication(), + setup->GetReplicaConfig(), + std::unique_ptr(dbAdapter), + mdtDbClient, + setup->GetMetricsServer().GetAggregator()); + + auto* blockMetadata = new BlockMetadata(*replica); + + if (!setup->GetReplicaConfig().isReadOnly) + replica->setReplicaStateSync(new ReplicaStateSyncImp(blockMetadata)); + else { + log4cplus::Logger::getInstance("concord.storage.s3").setLogLevel(log4cplus::TRACE_LOG_LEVEL); + log4cplus::Logger::getInstance("concord.kvbc.v1DirectKeyValue.DBAdapter").setLogLevel(log4cplus::TRACE_LOG_LEVEL); + log4cplus::Logger::getInstance("state-transfer").setLogLevel(log4cplus::TRACE_LOG_LEVEL); + // log4cplus::Logger::getInstance("DBDataStore").setLogLevel(log4cplus::TRACE_LOG_LEVEL); } - auto* dbAdapter = new concord::kvbc::DBAdapter(db); - auto* replica = new ReplicaImp( - setup->GetCommunication(), setup->GetReplicaConfig(), dbAdapter, setup->GetMetricsServer().GetAggregator()); - auto* blockMetadata = new concord::kvbc::BlockMetadata(*replica); - replica->setReplicaStateSync(new ReplicaStateSyncImp(blockMetadata)); + InternalCommandsHandler* cmdHandler = + new InternalCommandsHandler(replica.get(), replica.get(), blockMetadata, logger); + replica->set_command_handler(cmdHandler); + replica->start(); // Start metrics server after creation of the replica so that we ensure // registration of metrics from the replica with the aggregator and don't // return empty metrics from the metrics server. setup->GetMetricsServer().Start(); - - // TODO [TK] to we need it for ROR? - InternalCommandsHandler cmdHandler(replica, replica, blockMetadata, logger); - replica->set_command_handler(&cmdHandler); - replica->start(); - while (replica->isRunning()) std::this_thread::sleep_for(std::chrono::seconds(1)); } +} // namespace concord::kvbc::test + +int main(int argc, char** argv) { + concord::kvbc::test::run_replica(argc, argv); + return 0; +} diff --git a/tests/simpleKVBC/TesterReplica/setup.cpp b/tests/simpleKVBC/TesterReplica/setup.cpp index 175a7f2a4d..6e49fbf908 100644 --- a/tests/simpleKVBC/TesterReplica/setup.cpp +++ b/tests/simpleKVBC/TesterReplica/setup.cpp @@ -16,16 +16,31 @@ #include #include #include +#include +#include #include "setup.hpp" #include "CommFactory.hpp" #include "config/test_comm_config.hpp" -#include "config/test_parameters.hpp" +#include "commonKVBTests.hpp" +#include "db_adapter.h" -using namespace std; +#include "memorydb/client.h" -namespace concord { -namespace kvbc { +#ifdef USE_ROCKSDB +#include "rocksdb/client.h" +#include "rocksdb/key_comparator.h" +#endif + +#ifdef USE_S3_OBJECT_STORE +#include "object_store/object_store_client.hpp" +#include "s3/client.hpp" +using concord::storage::ObjectStoreClient; +#endif + +using namespace concord::storage; + +namespace concord::kvbc { std::unique_ptr TestSetup::ParseArgs(int argc, char** argv) { ReplicaParams rp; @@ -39,10 +54,10 @@ std::unique_ptr TestSetup::ParseArgs(int argc, char** argv) { } char argTempBuffer[PATH_MAX + 10]; - string idStr; + std::string idStr; int o = 0; - while ((o = getopt(argc, argv, "r:i:k:n:s:v:a:p")) != EOF) { + while ((o = getopt(argc, argv, "r:i:k:n:s:v:a:po:")) != EOF) { switch (o) { case 'i': { strncpy(argTempBuffer, optarg, sizeof(argTempBuffer) - 1); @@ -92,9 +107,10 @@ std::unique_ptr TestSetup::ParseArgs(int argc, char** argv) { } break; // We can only toggle persistence on or off. It defaults to InMemory // unless -p flag is provided. - case 'p': + case 'p': { rp.persistencyMode = PersistencyMode::RocksDB; - break; + + } break; default: break; @@ -144,5 +160,51 @@ std::unique_ptr TestSetup::ParseArgs(int argc, char** argv) { replicaConfig, std::move(comm), logger, metrics_port, rp.persistencyMode == PersistencyMode::RocksDB}); } -} // namespace kvbc -} // namespace concord +std::tuple, IDbAdapter*> TestSetup::get_inmem_db_configuration() { + auto comparator = concord::storage::memorydb::KeyComparator(new DBKeyComparator()); + std::shared_ptr db = std::make_shared(comparator); + return std::make_tuple(db, new DBAdapter{db}); +} +/** Get replica db configuration + * @return storage::IDBClient instance for metadata storage + * @return kvbc::IDBAdapter instance + */ +std::tuple, IDbAdapter*> TestSetup::get_db_configuration() { +#ifndef USE_ROCKSDB + return get_inmem_db_configuration(); +#else + if (!UsePersistentStorage()) get_inmem_db_configuration(); + + auto* comparator = new concord::storage::rocksdb::KeyComparator(new DBKeyComparator()); + std::stringstream dbPath; + dbPath << BasicRandomTests::DB_FILE_PREFIX << GetReplicaConfig().replicaId; + std::shared_ptr db(new concord::storage::rocksdb::Client(dbPath.str(), comparator)); + db->init(); + IDbAdapter* dbAdapter = nullptr; + +#ifdef USE_S3_OBJECT_STORE // TODO [TK] config + if (GetReplicaConfig().isReadOnly) { + concord::storage::s3::StoreConfig config; + config.bucketName = "blockchain-dev-asx"; + config.accessKey = "blockchain-dev"; + config.protocol = "HTTP"; + config.url = "10.70.30.244:9020"; + config.secretKey = "Rz0mdbUNGJBxqdzprn5XGSXPr2AfkgcQsYS4y698"; + std::string prefix = std::to_string(std::chrono::high_resolution_clock::now().time_since_epoch().count()); + + std::shared_ptr s3client(new ObjectStoreClient(new s3::Client(config))); + s3client->init(); + dbAdapter = new DBAdapter(s3client, std::unique_ptr(new S3KeyGenerator(prefix)), true); + } else { + dbAdapter = new DBAdapter(db); + } +#else + dbAdapter = new DBAdapter(db); +#endif + + return std::make_tuple(db, dbAdapter); + +#endif +} + +} // namespace concord::kvbc diff --git a/tests/simpleKVBC/TesterReplica/setup.hpp b/tests/simpleKVBC/TesterReplica/setup.hpp index 666000280b..0b47e8d7bc 100644 --- a/tests/simpleKVBC/TesterReplica/setup.hpp +++ b/tests/simpleKVBC/TesterReplica/setup.hpp @@ -14,19 +14,26 @@ #pragma once #include - +#include #include "ReplicaConfig.hpp" #include "ICommunication.hpp" #include "Logger.hpp" #include "MetricsServer.hpp" +#include "config/test_parameters.hpp" + +namespace concord::storage { +class IDBClient; +} -namespace concord { -namespace kvbc { +namespace concord::kvbc { +class IDbAdapter; class TestSetup { public: static std::unique_ptr ParseArgs(int argc, char** argv); + std::tuple, IDbAdapter*> get_db_configuration(); + bftEngine::ReplicaConfig& GetReplicaConfig() { return replica_config_; } bftEngine::ICommunication* GetCommunication() const { return communication_.get(); } concordMetrics::Server& GetMetricsServer() { return metrics_server_; } @@ -46,6 +53,7 @@ class TestSetup { use_persistent_storage_(use_persistent_storage) {} TestSetup() = delete; + std::tuple, IDbAdapter*> get_inmem_db_configuration(); bftEngine::ReplicaConfig replica_config_; std::unique_ptr communication_; concordlogger::Logger logger_; @@ -53,5 +61,4 @@ class TestSetup { bool use_persistent_storage_; }; -} // namespace kvbc -} // namespace concord +} // namespace concord::kvbc diff --git a/tests/simpleKVBC/scripts/readOnlyReplicaTest.sh b/tests/simpleKVBC/scripts/readOnlyReplicaTest.sh old mode 100644 new mode 100755 index 945e9018fc..cde01dd5aa --- a/tests/simpleKVBC/scripts/readOnlyReplicaTest.sh +++ b/tests/simpleKVBC/scripts/readOnlyReplicaTest.sh @@ -16,7 +16,10 @@ cleanup ../TesterReplica/skvbc_replica -k ro_config_ -i 1 -p & ../TesterReplica/skvbc_replica -k ro_config_ -i 2 -p & ../TesterReplica/skvbc_replica -k ro_config_ -i 3 -p & -#../TesterReplica/skvbc_replica -k ro_config_ -i 4 -p & +../TesterReplica/skvbc_replica -k ro_config_ -i 4 -p -o 1 & + +echo "Sleeping for 5 seconds" +sleep 5 time ../TesterClient/skvbc_client -f 1 -c 0 -p 1800 -i 5 diff --git a/util/CMakeLists.txt b/util/CMakeLists.txt index efcf17d051..137d12ce8d 100644 --- a/util/CMakeLists.txt +++ b/util/CMakeLists.txt @@ -10,6 +10,10 @@ add_library(util STATIC src/sliver.cpp src/hex_tools.cpp) +if(BUILD_ROCKSDB_STORAGE) + target_compile_definitions(util PUBLIC "USE_ROCKSDB=1") +endif() + target_link_libraries(util PUBLIC Threads::Threads) target_include_directories(util PUBLIC include) diff --git a/util/include/Handoff.hpp b/util/include/Handoff.hpp new file mode 100644 index 0000000000..d81edb7c0b --- /dev/null +++ b/util/include/Handoff.hpp @@ -0,0 +1,97 @@ +// Concord +// +// Copyright (c) 2018 VMware, Inc. All Rights Reserved. +// +// This product is licensed to you under the Apache 2.0 license (the "License"). You may not use this product except in +// compliance with the Apache 2.0 License. +// +// This product may include a number of subcomponents with separate copyright notices and license terms. Your use of +// these subcomponents is subject to the terms and conditions of the subcomponent's license, as noted in the LICENSE +// file. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include "Logger.hpp" + +namespace concord::util { +/** + * Simple handing off to a dedicated thread functionality. + * Implemented by passing a function with a trivial signature void(void). + * Parameters are bound to an arbitrary function by a caller. + */ +class Handoff { + typedef std::lock_guard guard; + typedef std::function func_type; + + public: + Handoff() { + thread_ = std::thread([this] { + try { + for (;;) pop()(); + } catch (ThreadCanceledException& e) { + LOG_DEBUG(getLogger(), "thread stopped " << std::this_thread::get_id()); + } catch (const std::exception& e) { + LOG_ERROR(getLogger(), "exception: " << e.what()); + // TODO [TK] should we allow different behavior for exception handling? + } + }); + } + ~Handoff() { + stopped_ = true; + queue_cond_.notify_one(); + auto tid = thread_.get_id(); + thread_.join(); + LOG_DEBUG(getLogger(), "thread joined " << tid); + } + + void push(func_type f) { + { + guard g(queue_lock_); + task_queue_.push(f); + } + queue_cond_.notify_one(); + } + + protected: + class ThreadCanceledException : public std::runtime_error { + public: + ThreadCanceledException() : std::runtime_error("thread cancelled") {} + const char* what() const noexcept override { return std::runtime_error::what(); } + }; + + func_type pop() { + while (true) { + std::unique_lock ul(queue_lock_); + queue_cond_.wait(ul, [this] { return !(task_queue_.empty() && !stopped_); }); + LOG_TRACE(getLogger(), "notified stopped_: " << stopped_ << " queue size: " << task_queue_.size()); + + if (!stopped_ || (stopped_ && !task_queue_.empty())) { + func_type f = task_queue_.front(); + task_queue_.pop(); + return f; + } + throw ThreadCanceledException(); + } + } + + static concordlogger::Logger getLogger() { + static concordlogger::Logger logger_ = concordlogger::Log::getLogger("concord.util.handoff"); + return logger_; + } + + protected: + std::queue task_queue_; + std::mutex queue_lock_; + std::condition_variable queue_cond_; + std::atomic_bool stopped_; + std::thread thread_; +}; + +} // namespace concord::util diff --git a/util/test/multithreading.cpp b/util/test/multithreading.cpp index aafc388774..c98aabf049 100644 --- a/util/test/multithreading.cpp +++ b/util/test/multithreading.cpp @@ -15,6 +15,7 @@ #include "gtest/gtest.h" #include "SimpleAutoResetEvent.hpp" #include "SimpleThreadPool.hpp" +#include "Handoff.hpp" #include #include #include @@ -25,11 +26,11 @@ namespace mt { /** * Fixture for testing SimpleThreadPool */ -class ThreadPoolFixture : public testing::Test { +class SimpleThreadPoolFixture : public testing::Test { protected: class TestJob : public util::SimpleThreadPool::Job { public: - TestJob(ThreadPoolFixture* f, uint32_t sleep_ms = 10) : fixture_(f), sleep_ms_(sleep_ms) {} + TestJob(SimpleThreadPoolFixture* f, uint32_t sleep_ms = 10) : fixture_(f), sleep_ms_(sleep_ms) {} void execute() { fixture_->result++; std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms_)); // don't exit immediately @@ -37,11 +38,11 @@ class ThreadPoolFixture : public testing::Test { void release() { delete this; } virtual ~TestJob() {} - ThreadPoolFixture* fixture_; + SimpleThreadPoolFixture* fixture_; uint32_t sleep_ms_; }; - ThreadPoolFixture() : result(0) {} + SimpleThreadPoolFixture() : result(0) {} // Sets up the test fixture. virtual void SetUp() { @@ -61,20 +62,20 @@ class ThreadPoolFixture : public testing::Test { std::atomic_int result; }; -TEST_F(ThreadPoolFixture, ThreadPoolStartStopNoJobs) { pool_.stop(); } +TEST_F(SimpleThreadPoolFixture, ThreadPoolStartStopNoJobs) { pool_.stop(); } -TEST_F(ThreadPoolFixture, ThreadPoolStartStopWithJobsNoExecute) { +TEST_F(SimpleThreadPoolFixture, ThreadPoolStartStopWithJobsNoExecute) { for (int i = 0; i < 100; ++i) pool_.add(new TestJob(this)); pool_.stop(false); } -TEST_F(ThreadPoolFixture, ThreadPoolStartStopWithJobsExecute) { +TEST_F(SimpleThreadPoolFixture, ThreadPoolStartStopWithJobsExecute) { for (int i = 0; i < 100; ++i) pool_.add(new TestJob(this)); pool_.stop(true); EXPECT_EQ(result, 100); } -TEST_F(ThreadPoolFixture, ThreadPoolMultipleProducers) { +TEST_F(SimpleThreadPoolFixture, ThreadPoolMultipleProducers) { std::array producers; for (int i = 0; i < 10; ++i) { producers[i] = std::thread([this] { @@ -86,5 +87,30 @@ TEST_F(ThreadPoolFixture, ThreadPoolMultipleProducers) { EXPECT_EQ(result, 1000); } +/** + * Fixture for testing Handoff + */ +using namespace std::placeholders; +class HandoffFixture : public testing::Test { + protected: + // Sets up the test fixture. + virtual void SetUp() { + ASSERT_GT(std::thread::hardware_concurrency(), 1); + handoff_ = new concord::util::Handoff(); + log4cplus::Logger::getInstance(LOG4CPLUS_TEXT("concord.util.handoff")).setLogLevel(log4cplus::DEBUG_LOG_LEVEL); + } + + concord::util::Handoff* handoff_; + std::atomic_int result = 0; +}; + +TEST_F(HandoffFixture, Basic) { + auto g = std::bind([this](int i) { this->result += i; }, _1); + auto f = std::bind(&concord::util::Handoff::push, handoff_, _1); + for (int i = 1; i <= 100; ++i) f(std::bind(g, i)); + delete handoff_; + EXPECT_EQ(result, 5050); +} + } // namespace mt } // namespace test