Skip to content

Commit

Permalink
Fix 4721 when fetch tablet ls tablet seq
Browse files Browse the repository at this point in the history
  • Loading branch information
Hongqin-Li authored and ob-robot committed Nov 16, 2023
1 parent d9160ad commit 7e1e373
Show file tree
Hide file tree
Showing 4 changed files with 241 additions and 29 deletions.
1 change: 1 addition & 0 deletions mittest/simple_server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ ob_unittest_observer(test_transfer_lock_info_operator storage_ha/test_transfer_l
ob_unittest_observer(test_mds_recover test_mds_recover.cpp)
ob_unittest_observer(test_keep_alive_min_start_scn test_keep_alive_min_start_scn.cpp)
ob_unittest_observer(test_ls_replica test_ls_replica.cpp)
ob_unittest_observer(test_tablet_autoinc_mgr test_tablet_autoinc_mgr.cpp)
# TODO(muwei.ym): open later
ob_ha_unittest_observer(test_transfer_handler storage_ha/test_transfer_handler.cpp)
ob_ha_unittest_observer(test_transfer_and_restart_basic storage_ha/test_transfer_and_restart_basic.cpp)
Expand Down
216 changes: 216 additions & 0 deletions mittest/simple_server/test_tablet_autoinc_mgr.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
/**
* Copyright (c) 2023 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include <gtest/gtest.h>
#include <gmock/gmock.h>

#define USING_LOG_PREFIX SHARE
#define protected public
#define private public

#include "env/ob_simple_server_restart_helper.h"
#include "env/ob_simple_cluster_test_base.h"
#include "storage_ha/test_transfer_common_fun.h"
#include "lib/ob_errno.h"
#include "rootserver/ob_tenant_transfer_service.h" // ObTenantTransferService
#include "share/transfer/ob_transfer_task_operator.h" // ObTransferTaskOperator
#include "share/location_cache/ob_location_service.h" // ObLocationService
#include "share/ob_tablet_autoincrement_service.h"
#include "storage/high_availability/ob_transfer_handler.h" //ObTransferHandler
#include "lib/utility/utility.h"
#include "storage/ls/ob_ls_tablet_service.h"
#include "storage/ls/ob_ls.h"
#include "storage/tablet/ob_tablet.h"
#include "storage/tx_storage/ob_ls_service.h"

namespace oceanbase
{
using namespace unittest;
using rootserver::ObTenantTransferService;
namespace share
{
using namespace common;

static const int64_t TOTAL_NUM = 110;
static uint64_t g_tenant_id;
static ObTransferPartList g_part_list;
static ObSEArray<ObTabletLSPair, TOTAL_NUM> g_tablet_ls_pairs;

class TestTabletAutoincMgr : public unittest::ObSimpleClusterTestBase
{
public:
TestTabletAutoincMgr() : unittest::ObSimpleClusterTestBase("test_tablet_autoinc_mgr") {}
int prepare_tablet_ls_pairs(ObMySQLProxy &sql_proxy, const char *table_name, ObIArray<ObTabletLSPair> &tablet_ls_pairs);
int prepare_part_list(ObMySQLProxy &sql_proxy, const char *table_name, ObTransferPartList &part_list);
};

int TestTabletAutoincMgr::prepare_tablet_ls_pairs(
ObMySQLProxy &sql_proxy,
const char *table_name,
ObIArray<ObTabletLSPair> &tablet_ls_pairs)
{
int ret = OB_SUCCESS;
tablet_ls_pairs.reset();
ObSqlString sql;
int64_t affected_rows = 0;
SMART_VAR(ObMySQLProxy::MySQLResult, result) {
if (OB_FAIL(sql.assign_fmt("select TABLET_ID, LS_ID from oceanbase.__all_tablet_to_ls where table_id in (select table_id from oceanbase.__all_table where table_id = (select table_id from oceanbase.__all_table where table_name = '%s') union select table_id from oceanbase.__all_table where data_table_id = (select table_id from oceanbase.__all_table where table_name = '%s')) order by TABLET_ID", table_name, table_name))) {
} else if (OB_FAIL(sql_proxy.read(result, sql.ptr()))) {
} else if (OB_ISNULL(result.get_result())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("null result", KR(ret), K(sql));
} else {
sqlclient::ObMySQLResult &res = *result.get_result();
uint64_t tablet_id = ObTabletID::INVALID_TABLET_ID;
int64_t ls_id = ObLSID::INVALID_LS_ID;
while(OB_SUCC(ret) && OB_SUCC(res.next())) {
EXTRACT_INT_FIELD_MYSQL(res, "TABLET_ID", tablet_id, uint64_t);
EXTRACT_INT_FIELD_MYSQL(res, "LS_ID", ls_id, int64_t);
if (OB_FAIL(tablet_ls_pairs.push_back(ObTabletLSPair(tablet_id, ls_id)))) {}
}
if (OB_ITER_END == ret) {
ret = OB_SUCCESS;
} else {
LOG_WARN("fail to generate data", K(sql));
}
}
}
return ret;
}

int TestTabletAutoincMgr::prepare_part_list(
ObMySQLProxy &sql_proxy,
const char *table_name,
ObTransferPartList &part_list)
{
int ret = OB_SUCCESS;
ObSqlString sql;
SMART_VAR(ObMySQLProxy::MySQLResult, result) {
if (OB_FAIL(sql.assign_fmt("select object_id from oceanbase.DBA_OBJECTS where OBJECT_NAME='%s'", table_name))) {
} else if (OB_FAIL(sql_proxy.read(result, sql.ptr()))) {
} else if (OB_ISNULL(result.get_result())) {
ret = OB_ERR_UNEXPECTED;
} else {
sqlclient::ObMySQLResult &res = *result.get_result();
uint64_t table_id = OB_INVALID_ID;
uint64_t part_id = OB_INVALID_ID;
int64_t part_count = 0;
if (OB_SUCC(ret) && OB_SUCC(res.next())) {
EXTRACT_INT_FIELD_MYSQL(res, "object_id", table_id, uint64_t);
}
while(OB_SUCC(ret)) {
part_id = OB_INVALID_ID;
ObTransferPartInfo part_info;
if (OB_SUCC(res.next())) {
++part_count;
EXTRACT_INT_FIELD_MYSQL(res, "object_id", part_id, uint64_t);
if (OB_FAIL(part_info.init(table_id, part_id))) {
} else if (OB_FAIL(part_list.push_back(part_info))) {
}
}
}
if (OB_ITER_END == ret) {
if (0 == part_count && OB_INVALID_ID != table_id && OB_INVALID_ID == part_id) {
ObTransferPartInfo part_info(table_id, 0);
(void)part_list.push_back(part_info);
}
ret = OB_SUCCESS;
} else {
LOG_WARN("fail to generate data", K(sql));
}
LOG_INFO("finish read sql", K(sql), K(part_list), K(table_id), K(part_id));
}
}
return ret;
}

TEST_F(TestTabletAutoincMgr, test_lob_tablet_autoinc_location_cache)
{
g_tenant_id = OB_INVALID_TENANT_ID;

ASSERT_EQ(OB_SUCCESS, create_tenant());
ASSERT_EQ(OB_SUCCESS, get_tenant_id(g_tenant_id));
ASSERT_TRUE(is_valid_tenant_id(g_tenant_id));
ASSERT_EQ(OB_SUCCESS, get_curr_simple_server().init_sql_proxy2());
ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2();
ObMySQLProxy &inner_sql_proxy = get_curr_observer().get_mysql_proxy();
ObSqlString sql;
int64_t affected_rows = 0;

// create table and prepare basic info
ASSERT_EQ(OB_SUCCESS, sql.assign_fmt("create table t1(c1 int, c2 longtext)"));
ASSERT_EQ(OB_SUCCESS, sql_proxy.write(sql.ptr(), affected_rows));
ASSERT_EQ(OB_SUCCESS, prepare_tablet_ls_pairs(sql_proxy, "t1", g_tablet_ls_pairs));
ASSERT_EQ(OB_SUCCESS, prepare_part_list(sql_proxy, "t1", g_part_list));
ASSERT_EQ(1, g_part_list.count());
ASSERT_EQ(3, g_tablet_ls_pairs.count());

// refresh tablet ls cache
ObLocationService *location_service = GCTX.location_service_;
ASSERT_TRUE(OB_NOT_NULL(location_service));
ObTabletLSService *tablet_ls_service = &(location_service->tablet_ls_service_);
ObLSLocationService *ls_location_service = &(location_service->ls_location_service_);
bool is_cache_hit = false;
ObArray<ObTabletLSCache> old_tablet_ls_cache;
for (int64_t i = 0; i < g_tablet_ls_pairs.count(); i++) {
const ObTabletID &tablet_id = g_tablet_ls_pairs.at(i).tablet_id_;
ObLSID ls_id;
ObTabletLSCache tablet_ls_cache;
ASSERT_EQ(OB_SUCCESS, tablet_ls_service->get(g_tenant_id, tablet_id, INT64_MAX, is_cache_hit, ls_id));
ASSERT_EQ(OB_SUCCESS, tablet_ls_service->get_from_cache_(g_tenant_id, tablet_id, tablet_ls_cache));
ASSERT_EQ(OB_SUCCESS, old_tablet_ls_cache.push_back(tablet_ls_cache));
}

// create other ls by cluster table
ASSERT_EQ(OB_SUCCESS, sql.assign_fmt("create table dup_table(c1 int) duplicate_scope = 'CLUSTER' partition by hash(c1) partitions 4"));
ASSERT_EQ(OB_SUCCESS, sql_proxy.write(sql.ptr(), affected_rows));

share::ObTenantSwitchGuard tenant_guard;
ASSERT_EQ(OB_SUCCESS, tenant_guard.switch_to(g_tenant_id));
ObTenantTransferService *tenant_transfer = MTL(ObTenantTransferService*);
ASSERT_TRUE(OB_NOT_NULL(tenant_transfer));

// transfer t1 to other ls
ObTransferTaskID task_id;
ObMySQLTransaction trans;
const ObLSID src_ls_id(1001);
const ObLSID dst_ls_id(1002);
ASSERT_EQ(OB_SUCCESS, trans.start(&inner_sql_proxy, g_tenant_id));
ASSERT_EQ(OB_SUCCESS, tenant_transfer->generate_transfer_task(trans, src_ls_id, dst_ls_id, g_part_list, ObBalanceTaskID(123), task_id));
ASSERT_EQ(OB_SUCCESS, trans.end(true));
ObTransferStatus expected_status(ObTransferStatus::COMPLETED);
ObTransferTask task;
ASSERT_EQ(OB_SUCCESS, wait_transfer_task(g_tenant_id, task_id, expected_status, true/*is_from_his*/, inner_sql_proxy, task));
ASSERT_EQ(OB_SUCCESS, task.result_);

// restore old tablet ls cache
for (int64_t i = 0; i < old_tablet_ls_cache.count(); i++) {
ASSERT_EQ(OB_SUCCESS, tablet_ls_service->update_cache_(old_tablet_ls_cache.at(i)));
}

// remove source ls and clear src ls cache
ASSERT_EQ(OB_SUCCESS, MTL(ObLSService*)->remove_ls(src_ls_id, false));
ASSERT_EQ(OB_SUCCESS, ls_location_service->erase_location_(GCONF.cluster_id, g_tenant_id, src_ls_id));

// insert lob
ASSERT_EQ(OB_SUCCESS, sql.assign_fmt("insert into t1 values (2, repeat('abcde0123456789', 1000));"));
ASSERT_EQ(OB_SUCCESS, sql_proxy.write(sql.ptr(), affected_rows));
}

} // namespace rootserver
} // namespace oceanbase
int main(int argc, char **argv)
{
oceanbase::unittest::init_log_and_gtest(argc, argv);
OB_LOGGER.set_log_level("WDIAG");
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
42 changes: 14 additions & 28 deletions src/share/ob_tablet_autoincrement_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ int ObTabletAutoincMgr::fetch_new_range(const ObTabletAutoincParam &param,
obrpc::ObSrvRpcProxy *srv_rpc_proxy = nullptr;
share::ObLocationService *location_service = nullptr;
ObAddr leader_addr;
ObLSID ls_id;
bool is_cache_hit = false;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
Expand All @@ -135,52 +134,39 @@ int ObTabletAutoincMgr::fetch_new_range(const ObTabletAutoincParam &param,
|| OB_ISNULL(location_service = GCTX.location_service_)) {
ret = OB_ERR_SYS;
LOG_WARN("root service or location_cache is null", K(ret), KP(srv_rpc_proxy), KP(location_service));
} else if (OB_FAIL(location_service->get(param.tenant_id_, tablet_id, 0/*expire_renew_time*/, is_cache_hit, ls_id))) {
LOG_WARN("fail to get log stream id", K(ret), K(tablet_id));
// try to use location cache first, if the cache is wrong, try force renew.
} else if (OB_FAIL(location_service->get_leader(GCONF.cluster_id,
param.tenant_id_,
ls_id,
false,/*force_renew*/
leader_addr))) {
LOG_WARN("get leader failed", K(ret), K(ls_id));
} else {
obrpc::ObFetchTabletSeqArg arg;
obrpc::ObFetchTabletSeqRes res;
arg.cache_size_ = MAX(cache_size_, param.auto_increment_cache_size_); // TODO(shuangcan): confirm this
arg.tenant_id_ = param.tenant_id_;
arg.tablet_id_ = tablet_id;
arg.ls_id_ = ls_id;
// arg.ls_id_ will be filled by location_service->get

bool finish = false;
for (int64_t retry_times = 0; OB_SUCC(ret) && !finish; retry_times++) {
if (OB_FAIL(srv_rpc_proxy->to(leader_addr).fetch_tablet_autoinc_seq_cache(arg, res))) {
if (OB_FAIL(location_service->get(param.tenant_id_, tablet_id, 0/*expire_renew_time*/, is_cache_hit, arg.ls_id_))) {
LOG_WARN("fail to get log stream id", K(ret), K(tablet_id));
} else if (OB_FAIL(location_service->get_leader(GCONF.cluster_id,
param.tenant_id_,
arg.ls_id_,
false,/*force_renew*/
leader_addr))) {
LOG_WARN("get leader failed", K(ret), K(arg.ls_id_));
} else if (OB_FAIL(srv_rpc_proxy->to(leader_addr).fetch_tablet_autoinc_seq_cache(arg, res))) {
LOG_WARN("fail to fetch autoinc cache for tablets", K(ret), K(retry_times), K(arg));
} else {
finish = true;
}
if (OB_FAIL(ret)) {
const bool force_refresh_leader = OB_NOT_MASTER == ret || OB_LS_NOT_EXIST == ret || OB_TABLET_NOT_EXIST == ret || OB_TENANT_NOT_IN_SERVER == ret;
(void)location_service->renew_tablet_location(param.tenant_id_, tablet_id, ret, !is_block_renew_location(ret)/*is_nonblock*/);
if (is_retryable(ret)) {
ob_usleep<common::ObWaitEventIds::STORAGE_AUTOINC_FETCH_RETRY_SLEEP>(RETRY_INTERVAL);
res.reset();
if (OB_FAIL(THIS_WORKER.check_status())) { // overwrite ret
LOG_WARN("failed to check status", K(ret));
} else {
res.reset();
ob_usleep<common::ObWaitEventIds::STORAGE_AUTOINC_FETCH_RETRY_SLEEP>(RETRY_INTERVAL);
}
}
if (OB_SUCC(ret) && force_refresh_leader) {
if (OB_FAIL(location_service->get(param.tenant_id_, tablet_id, INT64_MAX/*expire_renew_time*/, is_cache_hit, arg.ls_id_))) {
LOG_WARN("fail to get log stream id", K(ret), K(ret), K(tablet_id));
} else if (OB_FAIL(location_service->get_leader(GCONF.cluster_id,
param.tenant_id_,
arg.ls_id_,
true/*force_renew*/,
leader_addr))) {
LOG_WARN("force get leader failed", K(ret), K(ret), K(arg.ls_id_));
}
} else {
(void)location_service->renew_tablet_location(param.tenant_id_, tablet_id, ret, true/*is_nonblock*/);
}
}
}

Expand Down
11 changes: 10 additions & 1 deletion src/share/ob_tablet_autoincrement_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,16 @@ class ObTabletAutoincMgr: public common::LinkHashValue<ObTabletAutoincKey>
}
bool is_retryable(int ret)
{
return OB_NOT_MASTER == ret || OB_NOT_INIT == ret || OB_TIMEOUT == ret || OB_EAGAIN == ret || OB_LS_NOT_EXIST == ret || OB_TABLET_NOT_EXIST == ret || OB_TENANT_NOT_IN_SERVER == ret;
return OB_NOT_MASTER == ret || OB_NOT_INIT == ret || OB_TIMEOUT == ret || OB_EAGAIN == ret || OB_LS_NOT_EXIST == ret || OB_TABLET_NOT_EXIST == ret || OB_TENANT_NOT_IN_SERVER == ret || OB_LS_LOCATION_NOT_EXIST == ret;
}
bool is_block_renew_location(int ret)
{
return OB_LOCATION_LEADER_NOT_EXIST == ret || OB_LS_LOCATION_LEADER_NOT_EXIST == ret || OB_NO_READABLE_REPLICA == ret
|| OB_NOT_MASTER == ret || OB_RS_NOT_MASTER == ret || OB_RS_SHUTDOWN == ret || OB_PARTITION_NOT_EXIST == ret || OB_LOCATION_NOT_EXIST == ret
|| OB_PARTITION_IS_STOPPED == ret || OB_SERVER_IS_INIT == ret || OB_SERVER_IS_STOPPING == ret || OB_TENANT_NOT_IN_SERVER == ret
|| OB_TRANS_RPC_TIMEOUT == ret || OB_USE_DUP_FOLLOW_AFTER_DML == ret || OB_TRANS_STMT_NEED_RETRY == ret
|| OB_LS_NOT_EXIST == ret || OB_TABLET_NOT_EXIST == ret || OB_LS_LOCATION_NOT_EXIST == ret || OB_PARTITION_IS_BLOCKED == ret || OB_MAPPING_BETWEEN_TABLET_AND_LS_NOT_EXIST == ret
|| OB_GET_LOCATION_TIME_OUT == ret;
}
private:
static const int64_t PREFETCH_THRESHOLD = 4;
Expand Down

0 comments on commit 7e1e373

Please sign in to comment.