Skip to content

Commit

Permalink
Introducing S3 store support.
Browse files Browse the repository at this point in the history
The commit includes new code that support S3 as possible implementation of IDBClient.
We plan to use it with ReadOnly replica for the replication of the data to external storage.

Introduce has(Key) method in IDBClient interface and implement it in all its derived classes

ROReplica: handoff state transfer messages to a dedicated thread to avoid process stuck in case of long blocking call to an external object store.
Re-factoring: replace various BCStateTran config params by their Config struct equivalent.

IDBclient:
- add has(Sliver key)

- ReplicaForStateTransfer owns the IStateTransfer* instance instead of kvbc::ReplicaImp
- remove RorAppState
- IAppState implemented by kvbc::ReplicaImp
- v1::DBAdapter
	- setLatestBlock()
	- setLastReachableBlock()
	- add an ability to add free keys
- ecs::S3Client -> s3::client

DBAdapter to use additional IDBClient for metadata
  • Loading branch information
groupvrg authored and Toly Kournik committed Apr 11, 2020
1 parent 688eb11 commit baba245
Show file tree
Hide file tree
Showing 46 changed files with 1,814 additions and 600 deletions.
16 changes: 16 additions & 0 deletions .conan/libs3_pkg/conanfile.py
@@ -0,0 +1,16 @@
from conans import ConanFile, CMake, tools
from conans.tools import os_info, SystemPackageTool


class LibS3Conan(ConanFile):
name = "libs3-dev"
version = "2.0-3"
def system_requirements(self):
pack_name = None
if os_info.linux_distro == "ubuntu":
pack_name = "libs3-dev"

if pack_name:
installer = SystemPackageTool()
installer.install(pack_name)

15 changes: 5 additions & 10 deletions bftengine/include/bcstatetransfer/SimpleBCStateTransfer.hpp
Expand Up @@ -69,7 +69,7 @@ class IAppState {

// returns true IFF block blockId exists
// (i.e. the block is stored in the application/storage layer).
virtual bool hasBlock(uint64_t blockId) = 0;
virtual bool hasBlock(uint64_t blockId) const = 0;

// If block blockId exists, then its content is returned via the arguments
// outBlock and outBlockSize. Returns true IFF block blockId exists.
Expand All @@ -83,25 +83,19 @@ class IAppState {
// blockId - the block number
// block - pointer to a buffer that contains the new block
// blockSize - the size of the new block
virtual bool putBlock(uint64_t blockId, char *block, uint32_t blockSize) = 0;
virtual bool putBlock(const uint64_t blockId, const char *block, const uint32_t blockSize) = 0;

// returns the maximal block number n such that all blocks 1 <= i <= n exist.
// if block 1 does not exist, returns 0.
virtual uint64_t getLastReachableBlockNum() = 0;

virtual uint64_t getLastReachableBlockNum() const = 0;
// returns the maximum block number that is currently stored in
// the application/storage layer.
virtual uint64_t getLastBlockNum() = 0;
virtual uint64_t getLastBlockNum() const = 0;

// When the state is updated by the application, getLastReachableBlockNum()
// and getLastBlockNum() should always return the same block number.
// When that state transfer module is updating the state, then these methods
// may return different block numbers.

// This method MAY be called by ST module when putBlock returns false. The call will block until the underlying
// storage becomes available (storage non availabilty is the reason for putBlock to fail). Mainly relevant for network
// storages. In regular replica which uses local DB this function should not be called.
virtual void wait() = 0;
};

struct Config {
Expand All @@ -110,6 +104,7 @@ struct Config {
uint16_t cVal = 0;
uint16_t numReplicas = 0;
bool pedanticChecks = false;
bool isReadOnly = false;

#if defined USE_COMM_PLAIN_TCP || defined USE_COMM_TLS_TCP
uint32_t maxChunkSize = 16384;
Expand Down
225 changes: 119 additions & 106 deletions bftengine/src/bcstatetransfer/BCStateTran.cpp

Large diffs are not rendered by default.

42 changes: 12 additions & 30 deletions bftengine/src/bcstatetransfer/BCStateTran.hpp
Expand Up @@ -11,8 +11,7 @@
// terms and conditions of the subcomponent's license, as noted in the LICENSE
// file.

#ifndef BFTENGINE_SRC_BCSTATETRANSFER_BCSTATETRAN_HPP_
#define BFTENGINE_SRC_BCSTATETRANSFER_BCSTATETRAN_HPP_
#pragma once

#include <set>
#include <map>
Expand All @@ -37,10 +36,11 @@
using std::set;
using std::map;
using std::string;
using concordMetrics::StatusHandle;
using concordMetrics::GaugeHandle;
using concordMetrics::CounterHandle;

namespace bftEngine {
namespace SimpleBlockchainStateTransfer {
namespace impl {
namespace bftEngine::SimpleBlockchainStateTransfer::impl {

class BCStateTran : public IStateTransfer {
public:
Expand Down Expand Up @@ -82,7 +82,11 @@ class BCStateTran : public IStateTransfer {
void handleStateTransferMessage(char* msg, uint32_t msgLen, uint16_t senderId) override;

protected:
const bool pedanticChecks_;
std::function<void(char*, uint32_t, uint16_t)> messageHandler_;
// actual handling function. can be used in context of dedicated thread
void handleStateTransferMessageImp(char* msg, uint32_t msgLen, uint16_t senderId);
// handling from other context
void handoff(char* msg, uint32_t msgLen, uint16_t senderId);

///////////////////////////////////////////////////////////////////////////
// Constants
Expand All @@ -103,21 +107,8 @@ class BCStateTran : public IStateTransfer {
///////////////////////////////////////////////////////////////////////////
// Management and general data
///////////////////////////////////////////////////////////////////////////

const Config config_;
const set<uint16_t> replicas_;
const uint16_t myId_;
const uint16_t fVal_;
const uint32_t maxBlockSize_;
const uint32_t maxChunkSize_;
const uint16_t maxNumberOfChunksInBatch_;
const uint32_t maxPendingDataFromSourceReplica_;

const uint32_t maxNumOfReservedPages_;
const uint32_t sizeOfReservedPage_;
const uint32_t refreshTimerMilli_;
const uint32_t checkpointSummariesRetransmissionTimeoutMilli_;
const uint32_t maxAcceptableMsgDelayMilli_;

const uint32_t maxVBlockSize_;
const uint32_t maxItemSize_;
const uint32_t maxNumOfChunksInAppBlock_;
Expand Down Expand Up @@ -328,11 +319,6 @@ class BCStateTran : public IStateTransfer {
std::chrono::seconds last_metrics_dump_time_;
std::chrono::seconds metrics_dump_interval_in_sec_;
concordMetrics::Component metrics_component_;

typedef concordMetrics::Component::Handle<concordMetrics::Gauge> GaugeHandle;
typedef concordMetrics::Component::Handle<concordMetrics::Status> StatusHandle;
typedef concordMetrics::Component::Handle<concordMetrics::Counter> CounterHandle;

struct Metrics {
StatusHandle fetching_state_;
StatusHandle pedantic_checks_enabled_;
Expand Down Expand Up @@ -395,8 +381,4 @@ class BCStateTran : public IStateTransfer {
mutable Metrics metrics_;
};

} // namespace impl
} // namespace SimpleBlockchainStateTransfer
} // namespace bftEngine

#endif // BFTENGINE_SRC_BCSTATETRANSFER_BCSTATETRAN_HPP_
} // namespace bftEngine::SimpleBlockchainStateTransfer::impl
2 changes: 1 addition & 1 deletion bftengine/src/bftengine/IncomingMsgsStorageImp.cpp
Expand Up @@ -146,7 +146,7 @@ void IncomingMsgsStorageImp::dispatchMessages(std::promise<void>& signalStarted)
if (msgHandlerCallback) {
msgHandlerCallback(message);
} else {
LOG_WARN(GL, "Unknown message - delete");
LOG_WARN(GL, "Delete unknown message type: " << message->type());
delete message;
}
break;
Expand Down
5 changes: 4 additions & 1 deletion bftengine/src/bftengine/ReadOnlyReplica.cpp
Expand Up @@ -47,7 +47,10 @@ void ReadOnlyReplica::start() {
lastExecutedSeqNum = ps_->getLastExecutedSeqNum();
askForCheckpointMsgTimer_ = TimersSingleton::getInstance().add(std::chrono::seconds(5), // TODO [TK] config
Timers::Timer::RECURRING,
[this](Timers::Handle) { sendAskForCheckpointMsg(); });
[this](Timers::Handle) {
if (!this->isCollectingState())
sendAskForCheckpointMsg();
});
}

void ReadOnlyReplica::stop() {
Expand Down
4 changes: 3 additions & 1 deletion bftengine/src/bftengine/ReplicaForStateTransfer.cpp
Expand Up @@ -49,8 +49,10 @@ void ReplicaForStateTransfer::start() {
}

void ReplicaForStateTransfer::stop() {
TimersSingleton::getInstance().cancel(stateTranTimer_);
// stop in reverse order
ReplicaBase::stop();
stateTransfer->stopRunning();
TimersSingleton::getInstance().cancel(stateTranTimer_);
}

template <>
Expand Down
4 changes: 2 additions & 2 deletions bftengine/src/bftengine/ReplicaForStateTransfer.hpp
Expand Up @@ -28,7 +28,7 @@ class ReplicaForStateTransfer : public IReplicaForStateTransfer, public ReplicaB
bool firstTime // TODO [TK] get rid of this
);

IStateTransfer* getStateTransfer() const { return stateTransfer; }
IStateTransfer* getStateTransfer() const { return stateTransfer.get(); }

// IReplicaForStateTransfer
void freeStateTransferMsg(char* m) override;
Expand Down Expand Up @@ -56,7 +56,7 @@ class ReplicaForStateTransfer : public IReplicaForStateTransfer, public ReplicaB
void onMessage(T*);

protected:
bftEngine::IStateTransfer* stateTransfer = nullptr;
std::unique_ptr<bftEngine::IStateTransfer> stateTransfer;
Timers::Handle stateTranTimer_;
CounterHandle metric_received_state_transfers_;
};
Expand Down
2 changes: 1 addition & 1 deletion bftengine/src/bftengine/ReplicaImp.cpp
Expand Up @@ -2839,7 +2839,7 @@ ReplicaImp::ReplicaImp(bool firstTime,
clientsManager =
new ClientsManager(config_.replicaId, clientsSet, ReplicaConfigSingleton::GetInstance().GetSizeOfReservedPage());

clientsManager->init(stateTransfer);
clientsManager->init(stateTransfer.get());

if (!firstTime || config_.debugPersistentStorageEnabled)
clientsManager->loadInfoFromReservedPages();
Expand Down
21 changes: 8 additions & 13 deletions bftengine/src/simplestatetransfer/SimpleStateTran.cpp
Expand Up @@ -93,20 +93,17 @@ class SimpleStateTran : public ISimpleInMemoryStateTransfer {
// IAppState methods
////////////////////////////////////////////////////////////////////////

bool hasBlock(uint64_t blockId) override;
bool hasBlock(uint64_t blockId) const override;

bool getBlock(uint64_t blockId, char* outBlock, uint32_t* outBlockSize) override;

bool getPrevDigestFromBlock(uint64_t blockId,
SimpleBlockchainStateTransfer::StateTransferDigest* outPrevBlockDigest) override;

bool putBlock(uint64_t blockId, char* block, uint32_t blockSize) override;
bool putBlock(const uint64_t blockId, const char* block, const uint32_t blockSize) override;

uint64_t getLastReachableBlockNum() override;

uint64_t getLastBlockNum() override;

void wait() override;
uint64_t getLastReachableBlockNum() const override;
uint64_t getLastBlockNum() const override;
};

///////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -564,7 +561,7 @@ void SimpleStateTran::onComplete(int64_t checkpointNumberOfNewState) {
lastKnownCheckpoint = checkpointNumberOfNewState;
}

bool SimpleStateTran::DummyBDState::hasBlock(uint64_t blockId) { return false; }
bool SimpleStateTran::DummyBDState::hasBlock(uint64_t blockId) const { return false; }

bool SimpleStateTran::DummyBDState::getBlock(uint64_t blockId, char* outBlock, uint32_t* outBlockSize) {
Assert(false);
Expand All @@ -577,16 +574,14 @@ bool SimpleStateTran::DummyBDState::getPrevDigestFromBlock(
return false;
}

bool SimpleStateTran::DummyBDState::putBlock(uint64_t blockId, char* block, uint32_t blockSize) {
bool SimpleStateTran::DummyBDState::putBlock(const uint64_t blockId, const char* block, const uint32_t blockSize) {
Assert(false);
return false;
}

uint64_t SimpleStateTran::DummyBDState::getLastReachableBlockNum() { return 0; }

uint64_t SimpleStateTran::DummyBDState::getLastBlockNum() { return 0; }
uint64_t SimpleStateTran::DummyBDState::getLastReachableBlockNum() const { return 0; }

void SimpleStateTran::DummyBDState::wait() {}
uint64_t SimpleStateTran::DummyBDState::getLastBlockNum() const { return 0; }

} // namespace impl
} // namespace SimpleInMemoryStateTransfer
Expand Down
11 changes: 4 additions & 7 deletions bftengine/tests/bcstatetransfer/test_app_state.hpp
Expand Up @@ -34,7 +34,7 @@ struct Block {

class TestAppState : public IAppState {
public:
bool hasBlock(uint64_t blockId) override {
bool hasBlock(uint64_t blockId) const override {
auto it = blocks_.find(blockId);
return it != blocks_.end();
}
Expand All @@ -55,7 +55,7 @@ class TestAppState : public IAppState {
return true;
};

bool putBlock(uint64_t blockId, char* block, uint32_t blockSize) override {
bool putBlock(const uint64_t blockId, const char* block, const uint32_t blockSize) override {
assert(blockId < last_block_);
Block bl;
computeBlockDigest(blockId, block, blockSize, &bl.digest);
Expand All @@ -66,11 +66,8 @@ class TestAppState : public IAppState {
}

// TODO(AJS): How does this differ from getLastBlockNum?
uint64_t getLastReachableBlockNum() override { return last_block_; };

uint64_t getLastBlockNum() override { return last_block_; };

void wait() override{};
uint64_t getLastReachableBlockNum() const override { return last_block_; }
uint64_t getLastBlockNum() const override { return last_block_; };

private:
uint64_t last_block_;
Expand Down
3 changes: 2 additions & 1 deletion conanfile.txt
Expand Up @@ -13,6 +13,7 @@ zstd/1.4.0@bincrafters/stable
rocksdb/5.7.3
hdr_histogram/0.9.12
rapidcheck/258d907da00a0855f92c963d8f76eef115531716
libs3-dev/2.0-3

[options]
OpenSSL:shared=True
Expand Down Expand Up @@ -46,4 +47,4 @@ boost:without_context=True
[imports]
bin, *.dll -> ./lib # Copies all dll files from packages bin folder to my "lib" folder
lib, *.dylib* -> ./lib # Copies all dylib files from packages lib folder to my "lib" folder
lib, *.so* -> ./lib # Copies all dylib files from packages lib folder to my "lib" folder
lib, *.so* -> ./lib # Copies all dylib files from packages lib folder to my "lib" folder
9 changes: 4 additions & 5 deletions kvbc/CMakeLists.txt
Expand Up @@ -2,9 +2,9 @@ cmake_minimum_required (VERSION 3.2)
project(libkvbc VERSION 0.1.0.0 LANGUAGES CXX)

add_library(kvbc src/ClientImp.cpp
src/ReplicaImp.cpp
src/replica_state_sync_imp.cpp
src/block_metadata.cpp
src/ReplicaImp.cpp
src/replica_state_sync_imp.cpp
src/block_metadata.cpp
src/direct_kv_db_adapter.cpp
src/merkle_tree_db_adapter.cpp
src/direct_kv_block.cpp
Expand All @@ -17,10 +17,9 @@ add_library(kvbc src/ClientImp.cpp
src/sparse_merkle/walker.cpp
)

target_link_libraries(kvbc PUBLIC threshsign util concordbft_storage corebft)
target_link_libraries(kvbc PUBLIC corebft )

target_include_directories(kvbc PUBLIC include)
target_include_directories(kvbc PUBLIC ${bftengine_SOURCE_DIR}/include)

find_package(OpenSSL REQUIRED)
target_link_libraries(kvbc PRIVATE ${OPENSSL_LIBRARIES})
Expand Down

0 comments on commit baba245

Please sign in to comment.