Skip to content

Commit

Permalink
[FEAT MERGE]
Browse files Browse the repository at this point in the history
Co-authored-by: BinChenn <binchenn.bc@gmail.com>
Co-authored-by: taoshuning <616811991@qq.com>
  • Loading branch information
3 people authored and ob-robot committed Nov 28, 2023
1 parent 58dc315 commit fc7670e
Show file tree
Hide file tree
Showing 38 changed files with 609 additions and 132 deletions.
32 changes: 31 additions & 1 deletion mittest/logservice/env/ob_simple_log_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "share/ob_io_device_helper.h"
#include "share/ob_thread_mgr.h"
#include "logservice/palf/palf_options.h"
#include "share/rpc/ob_batch_processor.h"

namespace oceanbase
{
Expand Down Expand Up @@ -278,6 +279,8 @@ int ObSimpleLogServer::init_network_(const common::ObAddr &addr, const bool is_b
int ret = OB_SUCCESS;
ObNetOptions opts;
opts.rpc_io_cnt_ = 10;
opts.high_prio_rpc_io_cnt_ = 10;
opts.batch_rpc_io_cnt_ = 10;
opts.tcp_user_timeout_ = 10 * 1000 * 1000; // 10s
addr_ = addr;
obrpc::ObRpcNetHandler::CLUSTER_ID = 1;
Expand All @@ -293,6 +296,18 @@ int ObSimpleLogServer::init_network_(const common::ObAddr &addr, const bool is_b
SERVER_LOG(ERROR, "net_ listen failed", K(ret));
} else if (is_bootstrap && OB_FAIL(srv_proxy_.init(transport_))) {
SERVER_LOG(ERROR, "init srv_proxy_ failed");
} else if (is_bootstrap && OB_FAIL(net_.add_high_prio_rpc_listen(addr_.get_port(), handler_, high_prio_rpc_transport_))) {
SERVER_LOG(ERROR, "net_ listen failed", K(ret));
} else if (is_bootstrap && OB_FAIL(net_.batch_rpc_net_register(handler_, batch_rpc_transport_))) {
SERVER_LOG(ERROR, "batch_rpc_ init failed", K(ret));
} else if (FALSE_IT(batch_rpc_transport_->set_bucket_count(10))) {
} else if (is_bootstrap && OB_FAIL(batch_rpc_.init(batch_rpc_transport_, high_prio_rpc_transport_, addr))) {
SERVER_LOG(ERROR, "batch_rpc_ init failed", K(ret));
// } else if (is_bootstrap && OB_FAIL(TG_SET_RUNNABLE_AND_START(lib::TGDefIDs::BRPC, batch_rpc_))) {
} else if (is_bootstrap && OB_FAIL(TG_CREATE_TENANT(lib::TGDefIDs::BRPC, batch_rpc_tg_id_))) {
SERVER_LOG(ERROR, "batch_rpc_ init failed", K(ret));
} else if (is_bootstrap && OB_FAIL(TG_SET_RUNNABLE_AND_START(batch_rpc_tg_id_, batch_rpc_))) {
SERVER_LOG(ERROR, "batch_rpc_ start failed", K(ret));
} else {
deliver_.node_id_ = node_id_;
SERVER_LOG(INFO, "init_network success", K(ret), K(addr_), K(node_id_), K(opts));
Expand Down Expand Up @@ -395,7 +410,7 @@ int ObSimpleLogServer::init_log_service_()
net_keepalive_ = MTL_NEW(MockNetKeepAliveAdapter, "SimpleLog");

if (OB_FAIL(net_keepalive_->init(&deliver_))) {
} else if (OB_FAIL(log_service_.init(opts, clog_dir.c_str(), addr_, allocator_, transport_, &ls_service_,
} else if (OB_FAIL(log_service_.init(opts, clog_dir.c_str(), addr_, allocator_, transport_, &batch_rpc_, &ls_service_,
&location_service_, &reporter_, &log_block_pool_, &sql_proxy_, net_keepalive_))) {
SERVER_LOG(ERROR, "init_log_service_ fail", K(ret));
} else if (OB_FAIL(log_block_pool_.create_tenant(opts.disk_options_.log_disk_usage_limit_size_))) {
Expand Down Expand Up @@ -442,6 +457,11 @@ int ObSimpleLogServer::simple_close(const bool is_shutdown = false)
guard.click("destroy_palf_env");

if (is_shutdown) {
TG_STOP(batch_rpc_tg_id_);
TG_WAIT(batch_rpc_tg_id_);
TG_DESTROY(batch_rpc_tg_id_);
batch_rpc_tg_id_ = -1;

net_.rpc_shutdown();
net_.stop();
net_.wait();
Expand Down Expand Up @@ -734,6 +754,12 @@ int ObLogDeliver::handle_req_(rpc::ObRequest &req)
return OB_SUCCESS;
}
switch (pkt.get_pcode()) {
#define BATCH_RPC_PROCESS() \
ObBatchP p;\
p.init(); \
p.set_ob_request(req);\
p.run();\
break;
#define PROCESS(processer) \
processer p;\
p.init();\
Expand Down Expand Up @@ -833,6 +859,10 @@ int ObLogDeliver::handle_req_(rpc::ObRequest &req)
modify_pkt.set_tenant_id(node_id_);
PROCESS(LogNotifyFetchLogReqP)
}
case obrpc::OB_BATCH: {
modify_pkt.set_tenant_id(node_id_);
BATCH_RPC_PROCESS()
}
default:
SERVER_LOG(ERROR, "invalid req type", K(pkt.get_pcode()));
break;
Expand Down
8 changes: 7 additions & 1 deletion mittest/logservice/env/ob_simple_log_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,9 @@ class ObSimpleLogServer : public ObISimpleLogServer
public:
ObSimpleLogServer()
: handler_(deliver_),
transport_(NULL)
transport_(NULL),
batch_rpc_transport_(NULL),
high_prio_rpc_transport_(NULL)
{
}
~ObSimpleLogServer()
Expand Down Expand Up @@ -405,6 +407,8 @@ class ObSimpleLogServer : public ObISimpleLogServer
logservice::ObLogService log_service_;
ObTenantMutilAllocator *allocator_;
rpc::frame::ObReqTransport *transport_;
rpc::frame::ObReqTransport *batch_rpc_transport_;
rpc::frame::ObReqTransport *high_prio_rpc_transport_;
ObLSService ls_service_;
ObLocationService location_service_;
MockMetaReporter reporter_;
Expand All @@ -421,6 +425,8 @@ class ObSimpleLogServer : public ObISimpleLogServer
// 内部表中记录日志盘规格
palf::PalfDiskOptions inner_table_disk_opts_;
ObLooper looper_;
obrpc::ObBatchRpc batch_rpc_;
int batch_rpc_tg_id_;
};

} // end unittest
Expand Down
2 changes: 1 addition & 1 deletion mittest/logservice/test_ob_simple_log_basic_func.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ bool ObSimpleLogClusterTestBase::need_add_arb_server_ = false;
TEST_F(TestObSimpleLogClusterBasicFunc, submit_log)
{
SET_CASE_LOG_FILE(TEST_NAME, "submit_log");
//OB_LOGGER.set_log_level("TRACE");
OB_LOGGER.set_log_level("TRACE");
const int64_t id = ATOMIC_AAF(&palf_id_, 1);
const int64_t create_ts = 100;
share::SCN create_scn;
Expand Down
11 changes: 7 additions & 4 deletions src/logservice/ob_log_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "share/ob_unit_getter.h"
#include "share/rc/ob_tenant_base.h"
#include "share/rc/ob_tenant_module_init_ctx.h"
#include "share/rpc/ob_batch_rpc.h"
#include "storage/tx_storage/ob_ls_map.h"
#include "storage/tx_storage/ob_ls_service.h"
#include "observer/ob_srv_network_frame.h"
Expand Down Expand Up @@ -99,6 +100,7 @@ int ObLogService::mtl_init(ObLogService* &logservice)
self,
alloc_mgr,
net_frame->get_req_transport(),
GCTX.batch_rpc_,
MTL(ObLSService*),
location_service,
reporter,
Expand Down Expand Up @@ -230,6 +232,7 @@ int ObLogService::init(const PalfOptions &options,
const common::ObAddr &self,
common::ObILogAllocator *alloc_mgr,
rpc::frame::ObReqTransport *transport,
obrpc::ObBatchRpc *batch_rpc,
ObLSService *ls_service,
ObLocationService *location_service,
observer::ObIMetaReport *reporter,
Expand All @@ -246,14 +249,14 @@ int ObLogService::init(const PalfOptions &options,
ret = OB_INIT_TWICE;
CLOG_LOG(WARN, "ObLogService init twice", K(ret));
} else if (false == options.is_valid() || OB_ISNULL(base_dir) || OB_UNLIKELY(!self.is_valid())
|| OB_ISNULL(alloc_mgr) || OB_ISNULL(transport) || OB_ISNULL(ls_service)
|| OB_ISNULL(alloc_mgr) || OB_ISNULL(transport) || OB_ISNULL(batch_rpc) || OB_ISNULL(ls_service)
|| OB_ISNULL(location_service) || OB_ISNULL(reporter) || OB_ISNULL(log_block_pool)
|| OB_ISNULL(sql_proxy) || OB_ISNULL(net_keepalive_adapter)) {
ret = OB_INVALID_ARGUMENT;
CLOG_LOG(WARN, "invalid arguments", K(ret), K(options), KP(base_dir), K(self),
KP(alloc_mgr), KP(transport), KP(ls_service), KP(location_service), KP(reporter),
KP(alloc_mgr), KP(transport), KP(batch_rpc), KP(ls_service), KP(location_service), KP(reporter),
KP(log_block_pool), KP(sql_proxy), KP(net_keepalive_adapter));
} else if (OB_FAIL(PalfEnv::create_palf_env(options, base_dir, self, transport,
} else if (OB_FAIL(PalfEnv::create_palf_env(options, base_dir, self, transport, batch_rpc,
alloc_mgr, log_block_pool, &monitor_, palf_env_))) {
CLOG_LOG(WARN, "failed to create_palf_env", K(base_dir), K(ret));
} else if (OB_ISNULL(palf_env_)) {
Expand Down Expand Up @@ -287,7 +290,7 @@ int ObLogService::init(const PalfOptions &options,
net_keepalive_adapter_ = net_keepalive_adapter;
self_ = self;
is_inited_ = true;
FLOG_INFO("ObLogService init success", K(ret), K(base_dir), K(self), KP(transport),
FLOG_INFO("ObLogService init success", K(ret), K(base_dir), K(self), KP(transport), KP(batch_rpc),
KP(ls_service), K(tenant_id));
}

Expand Down
5 changes: 5 additions & 0 deletions src/logservice/ob_log_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ namespace frame
class ObReqTransport;
}
}
namespace obrpc
{
class ObBatchRpc;
}

namespace share
{
Expand Down Expand Up @@ -98,6 +102,7 @@ class ObLogService
const common::ObAddr &self,
common::ObILogAllocator *alloc_mgr,
rpc::frame::ObReqTransport *transport,
obrpc::ObBatchRpc *batch_rpc,
storage::ObLSService *ls_service,
share::ObLocationService *location_service,
observer::ObIMetaReport *reporter,
Expand Down
7 changes: 7 additions & 0 deletions src/logservice/palf/log_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,13 @@ constexpr int LOG_WRITE_FLAG = O_RDWR | O_DIRECT | O_SYNC;
constexpr mode_t FILE_OPEN_MODE = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
// =========== Disk io end ====================


// =========== BatchRPC start ==================
// NOTE: ORDER AND VALUE ARE VITAL, DO NOT CHANGE
constexpr int64_t LOG_BATCH_PUSH_LOG_REQ = 1;
constexpr int64_t LOG_BATCH_PUSH_LOG_RESP = 2;
// =========== BatchRPC end ==================

const int64_t OB_INVALID_CONFIG_CHANGE_LOCK_OWNER = -1;

enum ObReplicaState {
Expand Down
5 changes: 3 additions & 2 deletions src/logservice/palf/log_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -957,13 +957,14 @@ int LogEngine::submit_push_log_req(const common::ObAddr &server,

int LogEngine::submit_push_log_resp(const ObAddr &server,
const int64_t &msg_proposal_id,
const LSN &lsn)
const LSN &lsn,
const bool is_batch)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
} else {
ret = log_net_service_.submit_push_log_resp(server, msg_proposal_id, lsn);
ret = log_net_service_.submit_push_log_resp(server, msg_proposal_id, lsn, is_batch);
PALF_LOG(TRACE, "submit_push_log_resp success", K(ret), K(server));
}
return ret;
Expand Down
42 changes: 22 additions & 20 deletions src/logservice/palf/log_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,29 +201,29 @@ class LogEngine
const int64_t &prev_log_proposal_id,
const LSN &prev_lsn,
const LSN &curr_lsn,
const LogWriteBuf &write_buf)
const LogWriteBuf &write_buf,
const bool need_batch_rpc)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
PALF_LOG(ERROR, "LogEngine not init", K(ret), KPC(this));
} else if (OB_FAIL(log_net_service_.submit_push_log_req(member_list,
push_log_type,
msg_proposal_id,
prev_log_proposal_id,
prev_lsn,
curr_lsn,
write_buf))) {
// PALF_LOG(ERROR,
// "LogNetService submit_group_entry_to_memberlist failed",
// K(ret),
// KPC(this),
// K(member_list),
// K(prev_log_proposal_id),
// K(prev_lsn),
// K(prev_log_proposal_id),
// K(curr_lsn),
// K(write_buf));
} else if (!need_batch_rpc
&& OB_FAIL(log_net_service_.submit_push_log_req(member_list,
push_log_type,
msg_proposal_id,
prev_log_proposal_id,
prev_lsn,
curr_lsn,
write_buf))) {
} else if (need_batch_rpc
&& OB_FAIL(log_net_service_.submit_batch_push_log_req(member_list,
push_log_type,
msg_proposal_id,
prev_log_proposal_id,
prev_lsn,
curr_lsn,
write_buf))) {
} else {
PALF_LOG(TRACE,
"submit_group_entry_to_memberlist success",
Expand All @@ -233,7 +233,8 @@ class LogEngine
K(msg_proposal_id),
K(prev_log_proposal_id),
K(prev_lsn),
K(curr_lsn));
K(curr_lsn),
K(need_batch_rpc));
}
return ret;
}
Expand All @@ -251,7 +252,8 @@ class LogEngine
// @param[in] lsn: the offset of log
virtual int submit_push_log_resp(const common::ObAddr &server,
const int64_t &msg_proposal_id,
const LSN &lsn);
const LSN &lsn,
const bool is_batch);

template <class List>
int submit_prepare_meta_req_(const List &member_list, const int64_t &log_proposal_id)
Expand Down
9 changes: 6 additions & 3 deletions src/logservice/palf/log_net_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ int LogNetService::submit_committed_info_req(
int LogNetService::submit_push_log_resp(
const ObAddr &server,
const int64_t &msg_proposal_id,
const LSN &lsn)
const LSN &lsn,
const bool is_batch)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
Expand All @@ -129,9 +130,11 @@ int LogNetService::submit_push_log_resp(
ret = OB_INVALID_ARGUMENT;
PALF_LOG(ERROR, "Invalid argument!!!", K(ret), K(palf_id_),
K(server), K(msg_proposal_id), K(lsn));
} else if (is_batch) {
LogBatchPushResp push_log_resp(msg_proposal_id, lsn);
ret = post_request_to_server_(server, push_log_resp);
} else {
LogPushResp push_log_resp(msg_proposal_id,
lsn);
LogPushResp push_log_resp(msg_proposal_id, lsn);
ret = post_request_to_server_(server, push_log_resp);
}
return ret;
Expand Down
30 changes: 29 additions & 1 deletion src/logservice/palf/log_net_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,33 @@ class LogNetService
return ret;
}

template<class List>
int submit_batch_push_log_req(
const List &member_list,
const PushLogType &push_log_type,
const int64_t &msg_proposal_id,
const int64_t &prev_log_proposal_id,
const LSN &prev_lsn,
const LSN &curr_lsn,
const LogWriteBuf &write_buf)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
PALF_LOG(ERROR, "LogNetService has not inited!!!", K(ret));
} else {
LogBatchPushReq push_log_req(push_log_type,
msg_proposal_id,
prev_log_proposal_id,
prev_lsn,
curr_lsn,
write_buf);
ret = post_request_to_member_list_(member_list, push_log_req);
PALF_LOG(TRACE, "post_request_to_member_list_ success", K(member_list), K(push_log_req));
}
return ret;
}

int submit_push_log_req(
const ObAddr &server,
const PushLogType &push_log_type,
Expand All @@ -86,7 +113,8 @@ class LogNetService
int submit_push_log_resp(
const common::ObAddr &server,
const int64_t &msg_proposal_id,
const LSN &lsn);
const LSN &lsn,
const bool is_batch);

template<class List>
int submit_prepare_meta_req(
Expand Down

0 comments on commit fc7670e

Please sign in to comment.