Skip to content

Commit

Permalink
[CP] [PARALLEL_DDL] Add non-partitioned table's tablet placement cache
Browse files Browse the repository at this point in the history
  • Loading branch information
tino247 authored and ob-robot committed Oct 10, 2023
1 parent 4f83054 commit b8bf17d
Show file tree
Hide file tree
Showing 12 changed files with 715 additions and 21 deletions.
1 change: 1 addition & 0 deletions src/rootserver/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ ob_set_subtarget(ob_rootserver parallel_ddl
parallel_ddl/ob_create_table_helper.cpp
parallel_ddl/ob_create_view_helper.cpp
parallel_ddl/ob_index_name_checker.cpp
parallel_ddl/ob_tablet_balance_allocator.cpp
)

ob_set_subtarget(ob_rootserver freeze
Expand Down
4 changes: 3 additions & 1 deletion src/rootserver/balance/ob_balance_group_define.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ using namespace share::schema;
namespace rootserver
{

const char* ObBalanceGroup::NON_PART_BG_NAME = "NON_PART_TABLE";

int ObBalanceGroup::init_by_tablegroup(const ObSimpleTablegroupSchema &tg,
const int64_t max_part_level,
const int64_t part_group_index/* = 0*/)
Expand Down Expand Up @@ -85,7 +87,7 @@ int ObBalanceGroup::init_by_table(const ObSimpleTableSchemaV2 &table_schema,
LOG_WARN("table is in tablegroup, should init balance group by tablegroup", KR(ret), K(table_schema));
} else if (PARTITION_LEVEL_ZERO == part_level) {
// All tenant's non-partition table is a balance group
if (OB_FAIL(bg_name_str.append_fmt("NON_PART_TABLE"))) {
if (OB_FAIL(bg_name_str.append_fmt("%s", NON_PART_BG_NAME))) {
LOG_WARN("fail to append fmt", KR(ret), K(table_schema));
} else {
id_ = ObBalanceGroupID(0, 0);
Expand Down
3 changes: 2 additions & 1 deletion src/rootserver/balance/ob_balance_group_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ class ObBalanceGroup
}

TO_STRING_KV(K_(id), K_(name));

public:
const static char* NON_PART_BG_NAME;
private:
ObBalanceGroupID id_;
ObBalanceGroupName name_;
Expand Down
139 changes: 131 additions & 8 deletions src/rootserver/ob_balance_group_ls_stat_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
#include "share/ls/ob_ls_table_operator.h" // ObLSTableOperator
#include "share/location_cache/ob_location_service.h" // ObLocationService
#include "share/ob_rpc_struct.h" // ObCreateDupLSArg & ObCreateDupLSResult
#include "rootserver/ob_root_service.h"
#include "rootserver/parallel_ddl/ob_tablet_balance_allocator.h"

namespace oceanbase
{
Expand Down Expand Up @@ -338,6 +340,46 @@ int ObBalanceGroupLSStatOperator::insert_update_balance_group_ls_stat(
return ret;
}

int ObBalanceGroupLSStatOperator::inc_balance_group_ls_stat(
const int64_t timeout,
common::ObISQLClient &sql_client,
const uint64_t tenant_id,
const ObBalanceGroupLSStat &ls_stat)
{
int ret = OB_SUCCESS;
common::ObTimeoutCtx timeout_ctx;
if (OB_UNLIKELY(!inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not init", KR(ret));
} else if (OB_UNLIKELY(
timeout <= 0
|| OB_INVALID_TENANT_ID == tenant_id
|| !ls_stat.get_balance_group_id().is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(ls_stat));
} else if (OB_FAIL(ObShareUtil::set_default_timeout_ctx(
timeout_ctx, timeout))) {
LOG_WARN("fail to set timeout", KR(ret), K(timeout));
} else {
const uint64_t sql_tenant_id = gen_meta_tenant_id(tenant_id);
common::ObSqlString inc_sql;
int64_t affected_rows = 0;
if (OB_FAIL(generate_inc_sql_(ls_stat, inc_sql))) {
LOG_WARN("fail to generate inc sql", KR(ret),
K(tenant_id), K(ls_stat), K(inc_sql));
} else if (OB_FAIL(sql_client.write(
sql_tenant_id, inc_sql.ptr(), affected_rows))) {
LOG_WARN("fail to insert update", KR(ret),
K(tenant_id), K(inc_sql));
} else if (OB_UNLIKELY(affected_rows > 2)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected affected rows", KR(ret),
K(tenant_id), K(inc_sql), K(affected_rows));
}
}
return ret;
}

int ObBalanceGroupLSStatOperator::delete_balance_group_ls_stat(
const int64_t timeout,
common::ObISQLClient &sql_client,
Expand All @@ -357,6 +399,46 @@ int ObBalanceGroupLSStatOperator::delete_balance_group_ls_stat(
return ret;
}

int ObBalanceGroupLSStatOperator::generate_inc_sql_(
const ObBalanceGroupLSStat &bg_ls_stat,
common::ObSqlString &sql_string)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not init", KR(ret));
} else if (OB_UNLIKELY(!bg_ls_stat.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(bg_ls_stat));
} else if (OB_FAIL(sql_string.append_fmt(
"INSERT INTO %s ("
"tenant_id, "
"balance_group_id_high, "
"balance_group_id_low, "
"ls_id, "
"tablet_group_count, "
"balance_group_name)"
" VALUES ("
"%ld, %ld, %ld, %ld, %ld, '%s') "
"ON DUPLICATE KEY UPDATE "
"tablet_group_count = tablet_group_count + %ld, "
"balance_group_name = '%s'",
OB_ALL_BALANCE_GROUP_LS_STAT_TNAME,
bg_ls_stat.get_tenant_id(),
bg_ls_stat.get_balance_group_id().id_high_,
bg_ls_stat.get_balance_group_id().id_low_,
bg_ls_stat.get_ls_id().id(),
bg_ls_stat.get_tablet_group_count(),
to_cstring(ObHexEscapeSqlStr(bg_ls_stat.get_balance_group_name().str())),
bg_ls_stat.get_tablet_group_count(),
to_cstring(ObHexEscapeSqlStr(bg_ls_stat.get_balance_group_name().str()))))) {
LOG_WARN("fail to append fmt", KR(ret), K(bg_ls_stat));
} else {
LOG_INFO("balance group ls inc sql", K(sql_string));
}
return ret;
}

int ObBalanceGroupLSStatOperator::generate_insert_update_sql(
const ObBalanceGroupLSStat &bg_ls_stat,
common::ObSqlString &sql_string)
Expand Down Expand Up @@ -402,15 +484,17 @@ int ObBalanceGroupLSStatOperator::generate_insert_update_sql(
ObNewTableTabletAllocator::ObNewTableTabletAllocator(
const uint64_t tenant_id,
share::schema::ObSchemaGetterGuard &schema_guard,
common::ObMySQLProxy *sql_proxy)
common::ObMySQLProxy *sql_proxy,
const bool use_parallel_ddl /*= false*/)
: tenant_id_(tenant_id),
schema_guard_(schema_guard),
sql_proxy_(sql_proxy),
bg_ls_stat_operator_(),
status_(MyStatus::INVALID),
ls_id_array_(),
inited_(false),
is_add_partition_(false)
is_add_partition_(false),
use_parallel_ddl_(use_parallel_ddl)
{
}

Expand Down Expand Up @@ -492,10 +576,10 @@ int ObNewTableTabletAllocator::prepare(
// If ls status is not normal or is blocking tablet in, choose new ls for tablet creating.
if (OB_FAIL(ret)) {
} else if (is_related_table(table_schema.get_table_type(), table_schema.get_index_type())) {
// skip lock ls
// skip lock ls
} else if (OB_FAIL(check_and_replace_ls_(trans, table_schema.get_tenant_id()))) {
LOG_WARN("lock user ls failed", KR(ret),
"tenant_id", table_schema.get_tenant_id(), K_(ls_id_array));
LOG_WARN("lock user ls failed", KR(ret),
"tenant_id", table_schema.get_tenant_id(), K_(ls_id_array));
}
}

Expand Down Expand Up @@ -637,7 +721,7 @@ int ObNewTableTabletAllocator::get_available_ls(
if (ls_attr.ls_is_normal()
&& SYS_LS != ls_attr.get_ls_id()
&& !ls_attr.get_ls_flag().is_block_tablet_in()
&& !ls_attr.get_ls_flag().is_duplicate_ls()) {
&& !ls_attr.get_ls_flag().is_duplicate_ls()) {
if (OB_FAIL(ls_id_array.push_back(ls_attr.get_ls_id()))) {
LOG_WARN("fail to push back", KR(ret), K(ls_attr), K(ls_id_array));
}
Expand Down Expand Up @@ -987,6 +1071,39 @@ int ObNewTableTabletAllocator::alloc_tablet_for_non_partitioned_balance_group(
return ret;
}

int ObNewTableTabletAllocator::alloc_tablet_for_non_partitioned_balance_group_by_cache_(
const share::schema::ObTableSchema &table_schema)
{
int ret = OB_SUCCESS;
LOG_INFO("alloc tablet for non partitioned balance group by cache",
"tenant_id", table_schema.get_tenant_id(),
"table_id", table_schema.get_table_id());
common::ObArray<share::ObLSID> ls_id_array;
share::ObLSID ls_id;
if (OB_UNLIKELY(!inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("ObNewTableTabletAllocator not init", KR(ret));
} else if (OB_UNLIKELY(PARTITION_LEVEL_ZERO != table_schema.get_part_level())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret),
"part_num", table_schema.get_all_part_num(),
"part_level", table_schema.get_part_level(),
K(table_schema));
} else if (OB_FAIL(get_available_ls(ls_id_array))) {
LOG_WARN("fail to get available ls", KR(ret));
} else if (OB_ISNULL(GCTX.root_service_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("rootservice is null", KR(ret));
} else if (OB_FAIL(GCTX.root_service_->get_ddl_service()
.get_non_partitioned_tablet_allocator()
.alloc_tablet(tenant_id_, ls_id_array, ls_id))) {
LOG_WARN("fail to alloc tablet by cache", KR(ret), K_(tenant_id));
} else if (OB_FAIL(ls_id_array_.push_back(ls_id))) {
LOG_WARN("fail to push back ls id", KR(ret), K_(tenant_id), K(ls_id));
}
return ret;
}

int ObNewTableTabletAllocator::alloc_tablet_for_partitioned_balance_group(
const share::schema::ObTableSchema &table_schema)
{
Expand Down Expand Up @@ -1036,8 +1153,14 @@ int ObNewTableTabletAllocator::alloc_tablet_by_count_balance(
}
}
} else if (PARTITION_LEVEL_ZERO == table_schema.get_part_level()) {
if (OB_FAIL(alloc_tablet_for_non_partitioned_balance_group(table_schema))) {
LOG_WARN("fail to alloc tablet by non partitioned balance group", KR(ret));
if (!use_parallel_ddl_) {
if (OB_FAIL(alloc_tablet_for_non_partitioned_balance_group(table_schema))) {
LOG_WARN("fail to alloc tablet by non partitioned balance group", KR(ret));
}
} else {
if (OB_FAIL(alloc_tablet_for_non_partitioned_balance_group_by_cache_(table_schema))) {
LOG_WARN("fail to alloc tablet by non partitioned balance group by cache", KR(ret));
}
}
} else {
if (OB_FAIL(alloc_tablet_for_partitioned_balance_group(table_schema))) {
Expand Down
14 changes: 13 additions & 1 deletion src/rootserver/ob_balance_group_ls_stat_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ class ObBalanceGroupLSStatOperator
const uint64_t tenant_id,
const ObBalanceGroupID &balance_group_id,
const common::ObIArray<ObBalanceGroupLSStat> &balance_group_ls_stat_array);
int inc_balance_group_ls_stat(
const int64_t timeout_abs,
common::ObISQLClient &sql_client,
const uint64_t tenant_id,
const ObBalanceGroupLSStat &ls_stat);
int delete_balance_group_ls_stat(
const int64_t timeout,
common::ObISQLClient &sql_client,
Expand All @@ -138,6 +143,9 @@ class ObBalanceGroupLSStatOperator
int generate_insert_update_sql(
const ObBalanceGroupLSStat &bg_ls_stat,
common::ObSqlString &sql_string);
int generate_inc_sql_(
const ObBalanceGroupLSStat &bg_ls_stat,
common::ObSqlString &sql_string);
private:
bool inited_;
common::ObMySQLProxy *sql_proxy_;
Expand All @@ -151,7 +159,8 @@ class ObNewTableTabletAllocator
ObNewTableTabletAllocator(
const uint64_t tenant_id,
share::schema::ObSchemaGetterGuard &schema_guard,
common::ObMySQLProxy *sql_proxy);
common::ObMySQLProxy *sql_proxy,
const bool use_parallel_ddl = false);
virtual ~ObNewTableTabletAllocator();
public:
int init();
Expand Down Expand Up @@ -197,6 +206,8 @@ class ObNewTableTabletAllocator
const share::schema::ObTableSchema &table_schema);
int alloc_tablet_for_non_partitioned_balance_group(
const share::schema::ObTableSchema &table_schema);
int alloc_tablet_for_non_partitioned_balance_group_by_cache_(
const share::schema::ObTableSchema &table_schema);
int alloc_tablet_for_partitioned_balance_group(
const share::schema::ObTableSchema &table_schema);
int alloc_tablet_for_one_level_partitioned_balance_group(
Expand Down Expand Up @@ -262,6 +273,7 @@ class ObNewTableTabletAllocator
bool inited_;
bool is_add_partition_;
static int64_t alloc_tablet_ls_offset_;
bool use_parallel_ddl_;
};

}//end namespace rootserver
Expand Down
38 changes: 30 additions & 8 deletions src/rootserver/ob_ddl_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@ ObDDLService::ObDDLService()
unit_mgr_(NULL),
snapshot_mgr_(NULL),
ddl_lock_(),
index_name_checker_()
index_name_checker_(),
non_partitioned_tablet_allocator_()
{
}

Expand All @@ -197,6 +198,8 @@ int ObDDLService::init(obrpc::ObSrvRpcProxy &rpc_proxy,
LOG_WARN("init twice", KR(ret));
} else if (OB_FAIL(index_name_checker_.init(sql_proxy))) {
LOG_WARN("fail to init index name checker", KR(ret));
} else if (OB_FAIL(non_partitioned_tablet_allocator_.init(sql_proxy))) {
LOG_WARN("fail to init non partitioned tablet allocator", KR(ret));
} else {
rpc_proxy_ = &rpc_proxy;
common_rpc_ = &common_rpc;
Expand Down Expand Up @@ -24996,6 +24999,13 @@ int ObDDLService::drop_tenant(const ObDropTenantArg &arg)
LOG_WARN("delete_recycle_object failed", KR(ret), KPC(tenant_schema));
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(reset_parallel_cache(meta_tenant_id))) {
LOG_WARN("fail to reset parallel cache", KR(ret), K(meta_tenant_id));
} else if (OB_FAIL(reset_parallel_cache(user_tenant_id))) {
LOG_WARN("fail to reset parallel cache", KR(ret), K(user_tenant_id));
}
}
} else {// put tenant into recyclebin
ObTenantSchema new_tenant_schema;
ObSqlString new_tenant_name;
Expand Down Expand Up @@ -32897,6 +32907,22 @@ ObDDLSQLTransaction::~ObDDLSQLTransaction()
}
}

int ObDDLService::reset_parallel_cache(const uint64_t tenant_id)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(index_name_checker_.reset_cache(tenant_id))) {
ret = OB_FAIL(ret) ? ret : tmp_ret;
LOG_ERROR("reset cache failed", KR(tmp_ret), KR(ret), K(tenant_id));
}

if (OB_TMP_FAIL(non_partitioned_tablet_allocator_.reset_cache(tenant_id))) {
ret = OB_FAIL(ret) ? ret : tmp_ret;
LOG_ERROR("reset cache failed", KR(tmp_ret), KR(ret), K(tenant_id));
}
return ret;
}

/*
* @description:
* start transaction for DDL, lock and check schema has refreshed
Expand Down Expand Up @@ -33011,13 +33037,9 @@ int ObDDLSQLTransaction::end(const bool commit)
if (OB_ISNULL(GCTX.root_service_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("root_service is null", KR(ret));
} else {
ObIndexNameChecker &checker = GCTX.root_service_
->get_ddl_service()
.get_index_name_checker();
if (OB_FAIL(checker.reset_cache(tenant_id_))) {
LOG_ERROR("reset cache failed", KR(ret), K(tenant_id_));
}
} else if (OB_FAIL(GCTX.root_service_->get_ddl_service()
.reset_parallel_cache(tenant_id_))) {
LOG_WARN("fail to reset parallel cache", KR(ret), K_(tenant_id));
}
}

Expand Down
7 changes: 7 additions & 0 deletions src/rootserver/ob_ddl_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "common/ob_common_utility.h"
#include "share/config/ob_config.h" // ObConfigPairs
#include "rootserver/parallel_ddl/ob_index_name_checker.h"
#include "rootserver/parallel_ddl/ob_tablet_balance_allocator.h"

namespace oceanbase
{
Expand Down Expand Up @@ -125,6 +126,10 @@ class ObDDLService
ObSnapshotInfoManager &get_snapshot_mgr() { return *snapshot_mgr_; }
share::ObLSTableOperator &get_lst_operator() { return *lst_operator_; }
share::schema::ObIndexNameChecker &get_index_name_checker() { return index_name_checker_; }
share::schema::ObNonPartitionedTableTabletAllocator &get_non_partitioned_tablet_allocator()
{
return non_partitioned_tablet_allocator_;
}

// create_index_table will fill table_id and frozen_version to table_schema
virtual int create_index_table(const obrpc::ObCreateIndexArg &arg,
Expand Down Expand Up @@ -1121,6 +1126,7 @@ int check_table_udt_id_is_exist(share::schema::ObSchemaGetterGuard &schema_guard
share::schema::ObSchemaGetterGuard &schema_guard,
share::schema::ObTableSchema &schema);

int reset_parallel_cache(const uint64_t tenant_id);
private:
enum PartitionBornMethod : int64_t
{
Expand Down Expand Up @@ -2571,6 +2577,7 @@ int check_table_udt_id_is_exist(share::schema::ObSchemaGetterGuard &schema_guard

// for paralled ddl to cache oracle's index name map
share::schema::ObIndexNameChecker index_name_checker_;
share::schema::ObNonPartitionedTableTabletAllocator non_partitioned_tablet_allocator_;
private:
DISALLOW_COPY_AND_ASSIGN(ObDDLService);
};
Expand Down

0 comments on commit b8bf17d

Please sign in to comment.