Skip to content
Permalink
Browse files

[hbasestore] HBase-powered KVStore implementation (#413)

* [hbasestore] HBase-powered KVStore implementation

close #179

* [hbasestore] Refactor the hbasestore as a separate module.

1. replace storage/KeyUtils with base/NebulaKeyUtils
2. put the hbase.thrift to kvstore/hbase
3. address current all comments

* [hbasestore] Move hbase to plugins and disable the 'Partition'

NOTE: Temporary disable the hbase related unit tests hbase_*_test in
src/kvstore/plugins/hbase/test/CMakeLists.txt.
  • Loading branch information...
zhangguoqing authored and dangleptr committed Jun 26, 2019
1 parent 0530914 commit e15c06561c65f0b57d9f5294e714d7b91b287d98
@@ -7,6 +7,7 @@ _install
install_manifest.txt

src/common/base/Base.h.gch
gen-*

CMakeCache.txt
Makefile
@@ -16,7 +16,9 @@ namespace thrift {
template<class ClientType>
class ThriftClientManager final {
public:
std::shared_ptr<ClientType> client(const HostAddr& host, folly::EventBase* evb = nullptr);
std::shared_ptr<ClientType> client(const HostAddr& host,
folly::EventBase* evb = nullptr,
bool compatibility = false);

~ThriftClientManager() {
VLOG(3) << "~ThriftClientManager";
@@ -17,7 +17,7 @@ namespace thrift {

template<class ClientType>
std::shared_ptr<ClientType> ThriftClientManager<ClientType>::client(
const HostAddr& host, folly::EventBase* evb) {
const HostAddr& host, folly::EventBase* evb, bool compatibility) {
VLOG(2) << "Getting a client to "
<< network::NetworkUtils::intToIPv4(host.first)
<< ":" << host.second;
@@ -38,7 +38,7 @@ std::shared_ptr<ClientType> ThriftClientManager<ClientType>::client(
<< ipAddr << ":" << port
<< ", trying to create one";
auto channel = apache::thrift::ReconnectingRequestChannel::newChannel(
*evb, [ipAddr, port] (folly::EventBase& eb) mutable {
*evb, [compatibility, ipAddr, port] (folly::EventBase& eb) mutable {
static thread_local int connectionCount = 0;
VLOG(2) << "Connecting to " << ipAddr << ":" << port
<< " for " << ++connectionCount << " times";
@@ -48,7 +48,12 @@ std::shared_ptr<ClientType> ThriftClientManager<ClientType>::client(
socket = apache::thrift::async::TAsyncSocket::newSocket(
&eb, ipAddr, port, FLAGS_conn_timeout_ms);
});
return apache::thrift::HeaderClientChannel::newChannel(socket);
auto headerClientChannel = apache::thrift::HeaderClientChannel::newChannel(socket);
if (compatibility) {
headerClientChannel->setProtocolId(apache::thrift::protocol::T_BINARY_PROTOCOL);
headerClientChannel->setClientType(THRIFT_UNFRAMED_DEPRECATED);
}
return headerClientChannel;
});
std::shared_ptr<ClientType> client(new ClientType(std::move(channel)), [evb](auto* p) {
evb->runImmediatelyOrRunInEventBaseThreadAndWait([p] {
@@ -29,8 +29,10 @@ std::string NebulaCodecImpl::encode(std::vector<Value> values,
std::shared_ptr<const meta::SchemaProviderIf> schema) {
RowWriter writer(schema);
for (auto& value : values) {
if (value.type() == typeid(int)) {
writer << boost::any_cast<int>(value);
if (value.type() == typeid(int32_t)) {
writer << boost::any_cast<int32_t>(value);
} else if (value.type() == typeid(int64_t)) {
writer << boost::any_cast<int64_t>(value);
} else if (value.type() == typeid(std::string)) {
writer << boost::any_cast<std::string>(value);
} else if (value.type() == typeid(double)) {
@@ -119,5 +121,6 @@ NebulaCodecImpl::decode(std::string encoded,
}
return result;
}

} // namespace dataman
} // namespace nebula

This file was deleted.

@@ -124,3 +124,4 @@ add_library(
gen-cpp2/meta_types.cpp
)
add_dependencies(meta_thrift_obj common_thrift_obj tgt_fbthrift)

@@ -13,3 +13,5 @@ add_subdirectory(raftex)
add_subdirectory(wal)
add_subdirectory(test)

add_subdirectory(plugins)

@@ -46,6 +46,9 @@ inline rocksdb::Slice toSlice(const folly::StringPiece& str) {
return rocksdb::Slice(str.begin(), str.size());
}

using KVMap = std::unordered_map<std::string, std::string>;
using KVArrayIterator = std::vector<KV>::const_iterator;

} // namespace kvstore
} // namespace nebula
#endif // KVSTORE_COMMON_H_
@@ -14,11 +14,18 @@
#include "kvstore/KVIterator.h"
#include "kvstore/PartManager.h"
#include "kvstore/CompactionFilter.h"
#include "meta/SchemaManager.h"

namespace nebula {
namespace kvstore {

struct KVOptions {
// HBase thrift server address.
HostAddr hbaseServer_;

// SchemaManager instance, help the hbasestore to encode/decode data.
std::unique_ptr<meta::SchemaManager> schemaMan_{nullptr};

// Paths for data. It would be used by rocksdb engine.
// Be careful! We should ensure each "paths" has only one instance,
// otherwise it would mix up the data on disk.
@@ -155,10 +155,10 @@ ResultCode RocksEngine::get(const std::string& key, std::string* value) {


ResultCode RocksEngine::multiGet(const std::vector<std::string>& keys,
std::vector<std::string>* values) {
std::vector<std::string>* values) {
rocksdb::ReadOptions options;
std::vector<rocksdb::Slice> slices;
for (unsigned int index = 0 ; index < keys.size() ; index++) {
for (size_t index = 0; index < keys.size(); index++) {
slices.emplace_back(keys[index]);
}

@@ -0,0 +1,2 @@
add_subdirectory(hbase)

@@ -0,0 +1,35 @@
add_custom_command(
OUTPUT
gen-cpp2/THBaseService.cpp
gen-cpp2/THBaseServiceAsyncClient.cpp
gen-cpp2/THBaseService_processmap_binary.cpp
gen-cpp2/THBaseService_processmap_compact.cpp
gen-cpp2/hbase_constants.cpp
gen-cpp2/hbase_data.cpp
gen-cpp2/hbase_types.cpp
COMMAND "${CMAKE_HOME_DIRECTORY}/third-party/fbthrift/_install/bin/thrift1" "--strict" "--allow-neg-enum-vals" "--templates" "${CMAKE_HOME_DIRECTORY}/third-party/fbthrift/_install/include/thrift/templates" "--gen" "mstch_cpp2:include_prefix=\"hbase\",process_in_event_base,stack_arguments" "-o" "." "./hbase.thrift"
DEPENDS hbase.thrift
)
include_directories(AFTER ${CMAKE_HOME_DIRECTORY}/src/kvstore/plugins/hbase)

add_library(
hbase_thrift_obj OBJECT
gen-cpp2/THBaseService.cpp
gen-cpp2/THBaseServiceAsyncClient.cpp
gen-cpp2/THBaseService_processmap_binary.cpp
gen-cpp2/THBaseService_processmap_compact.cpp
gen-cpp2/hbase_constants.cpp
gen-cpp2/hbase_data.cpp
gen-cpp2/hbase_types.cpp
)
add_dependencies(hbase_thrift_obj common_thrift_obj tgt_fbthrift)

add_library(
hbasestore_obj OBJECT
HBaseStore.cpp
HBaseClient.cpp
)
add_dependencies(hbasestore_obj hbase_thrift_obj base_obj meta_client)

add_subdirectory(test)

0 comments on commit e15c065

Please sign in to comment.
You can’t perform that action at this time.