Skip to content

Commit

Permalink
[CP] [SCHEMA] Fix memory expansion problem while refresh full schema
Browse files Browse the repository at this point in the history
  • Loading branch information
tino247 authored and ob-robot committed Sep 12, 2023
1 parent 7e4e5ae commit 834cf35
Show file tree
Hide file tree
Showing 12 changed files with 368 additions and 195 deletions.
7 changes: 5 additions & 2 deletions src/share/schema/ob_multi_version_schema_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1812,10 +1812,13 @@ int ObMultiVersionSchemaService::broadcast_tenant_schema(
LOG_INFO("add sys table schema", KR(ret), K(tenant_id), KPC(table_schema));
}
}
ObArray<ObSimpleTableSchemaV2> simple_table_schemas;
ObArenaAllocator allocator("BroFullSchema");
ObArray<ObSimpleTableSchemaV2*> simple_table_schemas(
common::OB_MALLOC_NORMAL_BLOCK_SIZE,
common::ModulePageAllocator(allocator));
ObSchemaMgr *schema_mgr_for_cache = NULL;
const bool refresh_full_schema = true;
if (FAILEDx(convert_to_simple_schema(table_schemas, simple_table_schemas))) {
if (FAILEDx(convert_to_simple_schema(allocator, table_schemas, simple_table_schemas))) {
LOG_WARN("failed to convert", KR(ret), K(tenant_id));
} else if (OB_FAIL(schema_mgr_for_cache_map_.get_refactored(
tenant_id, schema_mgr_for_cache))) {
Expand Down
23 changes: 9 additions & 14 deletions src/share/schema/ob_schema_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1401,37 +1401,32 @@ int ObSchemaFetcher::fetch_table_schema(const ObRefreshSchemaStatus &schema_stat
int ret = OB_SUCCESS;
table_schema = NULL;

ObSimpleTableSchemaV2 *tmp_table_schema = NULL;
SchemaKey table_schema_key;
table_schema_key.tenant_id_ = schema_status.tenant_id_;
table_schema_key.table_id_ = table_id;
ObArray<SchemaKey> schema_keys;
ObArray<ObSimpleTableSchemaV2> schema_array;
ObArray<ObSimpleTableSchemaV2 *> schema_array;
if (!check_inner_stat()) {
ret = OB_INNER_STAT_ERROR;
LOG_WARN("inner stat error", K(ret));
LOG_WARN("inner stat error", KR(ret));
} else if (OB_INVALID_ID == table_id || schema_version < 0) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(table_id), K(schema_version));
LOG_WARN("invalid argument", KR(ret), K(table_id), K(schema_version));
} else if (OB_FAIL(schema_keys.push_back(table_schema_key))) {
LOG_WARN("fail to push back schema key", K(ret), K(table_id), K(schema_version));
LOG_WARN("fail to push back schema key", KR(ret), K(table_id), K(schema_version));
} else if (OB_FAIL(schema_service_->get_batch_tables(schema_status,
*sql_client_,
allocator,
schema_version,
schema_keys,
schema_array))) {
LOG_WARN("get table schema failed", K(ret), K(table_id), K(schema_version));
LOG_WARN("get table schema failed", KR(ret), K(table_id), K(schema_version));
} else if (OB_UNLIKELY(1 != schema_array.count())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected schema count", K(ret), K(table_id), K(schema_version));
} else if (OB_FAIL(ObSchemaUtils::alloc_schema(allocator, schema_array.at(0), tmp_table_schema))) {
LOG_WARN("fail to alloc new var", K(ret), K(table_id), K(schema_version));
} else if (OB_ISNULL(tmp_table_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("table schema is NULL", K(ret), K(table_id), K(schema_version));
LOG_WARN("unexpected schema count", KR(ret), K(table_id), K(schema_version));
} else {
table_schema = tmp_table_schema;
LOG_TRACE("fetch table schema succeed", K(ret), K(table_id), K(schema_version), KPC(table_schema));
table_schema = schema_array.at(0);
LOG_TRACE("fetch table schema succeed", KR(ret), K(table_id), K(schema_version), KPC(table_schema));
}
return ret;
}
Expand Down
86 changes: 51 additions & 35 deletions src/share/schema/ob_schema_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2290,7 +2290,7 @@ int ObSchemaMgr::get_tablegroup_schema(
}

int ObSchemaMgr::add_tables(
const ObIArray<ObSimpleTableSchemaV2> &table_schemas,
const ObIArray<ObSimpleTableSchemaV2 *> &table_schemas,
const bool refresh_full_schema/*= false*/)
{
int ret = OB_SUCCESS;
Expand All @@ -2306,26 +2306,37 @@ int ObSchemaMgr::add_tables(
} else {
bool desc_order = true;
if (OB_SUCC(ret) && table_schemas.count() >= 2) {
// 1. when refresh user simple table schemas, table_schemas will be sorted in desc order by sql.
// 2. when broadcast schema or refresh core/system tables or other situations, table_schemas will be sorted in asc order.
// Because table_infos_ are sorted in asc order, we should also add table in asc order to reduce performance lost.
// Normally, we consider table_schemas are in desc order in most situations.
desc_order = table_schemas.at(0).get_table_id() > table_schemas.at(1).get_table_id();
if (OB_ISNULL(table_schemas.at(0)) || OB_ISNULL(table_schemas.at(1))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("table is null", KR(ret), KP(table_schemas.at(0)), K(table_schemas.at(1)));
} else {
// 1. when refresh user simple table schemas, table_schemas will be sorted in desc order by sql.
// 2. when broadcast schema or refresh core/system tables or other situations, table_schemas will be sorted in asc order.
// Because table_infos_ are sorted in asc order, we should also add table in asc order to reduce performance lost.
// Normally, we consider table_schemas are in desc order in most situations.
desc_order = table_schemas.at(0)->get_table_id() > table_schemas.at(1)->get_table_id();
}
}

if (OB_SUCC(ret)) {
if (desc_order) {
for (int64_t i = table_schemas.count() - 1; OB_SUCC(ret) && i >= 0; i--) {
const ObSimpleTableSchemaV2 &table = table_schemas.at(i);
if (OB_FAIL(add_table(table, &cost_array))) {
LOG_WARN("add table failed", KR(ret), K(table));
const ObSimpleTableSchemaV2 *table = table_schemas.at(i);
if (OB_ISNULL(table)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("table is null", KR(ret), K(i));
} else if (OB_FAIL(add_table(*table, &cost_array))) {
LOG_WARN("add table failed", KR(ret), KPC(table));
}
} // end for
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < table_schemas.count(); i++) {
const ObSimpleTableSchemaV2 &table = table_schemas.at(i);
if (OB_FAIL(add_table(table, &cost_array))) {
LOG_WARN("add table failed", KR(ret), K(table));
const ObSimpleTableSchemaV2 *table = table_schemas.at(i);
if (OB_ISNULL(table)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("table is null", KR(ret), K(i));
} else if (OB_FAIL(add_table(*table, &cost_array))) {
LOG_WARN("add table failed", KR(ret), KPC(table));
}
} // end for
}
Expand All @@ -2338,7 +2349,7 @@ int ObSchemaMgr::add_tables(
}

int ObSchemaMgr::reserved_mem_for_tables_(
const ObIArray<ObSimpleTableSchemaV2> &table_schemas)
const ObIArray<ObSimpleTableSchemaV2*> &table_schemas)
{
int ret = OB_SUCCESS;
int64_t start_time = ObTimeUtility::current_time();
Expand All @@ -2354,37 +2365,42 @@ int ObSchemaMgr::reserved_mem_for_tables_(
const int64_t OBJECT_SIZE = sizeof(void*);
if (!check_inner_stat()) {
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret));
LOG_WARN("not init", KR(ret));
} else if (OB_FAIL(table_infos_.reserve(table_cnt))) {
LOG_WARN("fail to reserved array", KR(ret), K(table_cnt));
} else {
//(void) table_id_map_.set_sub_map_mem_size(table_cnt * OBJECT_SIZE);

for (int64_t i = 0; OB_SUCC(ret) && i < table_schemas.count(); i++) {
const ObSimpleTableSchemaV2 &table = table_schemas.at(i);
if (table.is_index_table() || table.is_materialized_view()) {
index_cnt++;
} else if (table.is_aux_vp_table()) {
vp_cnt++;
} else if (table.is_aux_lob_meta_table()) {
lob_meta_cnt++;
} else if (table.is_aux_lob_piece_table()) {
lob_piece_cnt++;
} else if (table.is_user_hidden_table()) {
hidden_table_cnt++;
const ObSimpleTableSchemaV2 *table = table_schemas.at(i);
if (OB_ISNULL(table)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("table is null", KR(ret), K(i));
} else {
other_table_cnt++;
}
if (table->is_index_table() || table->is_materialized_view()) {
index_cnt++;
} else if (table->is_aux_vp_table()) {
vp_cnt++;
} else if (table->is_aux_lob_meta_table()) {
lob_meta_cnt++;
} else if (table->is_aux_lob_piece_table()) {
lob_piece_cnt++;
} else if (table->is_user_hidden_table()) {
hidden_table_cnt++;
} else {
other_table_cnt++;
}

if ((table.is_table() || table.is_oracle_tmp_table())
&& !table.is_user_hidden_table()) {
fk_cnt += table.get_simple_foreign_key_info_array().count();
}
if ((table->is_table() || table->is_oracle_tmp_table())
&& !table->is_user_hidden_table()) {
fk_cnt += table->get_simple_foreign_key_info_array().count();
}

if ((table.is_table() || table.is_oracle_tmp_table())
&& !table.is_user_hidden_table()
&& !table.is_mysql_tmp_table()) {
cst_cnt += table.get_simple_constraint_info_array().count();
if ((table->is_table() || table->is_oracle_tmp_table())
&& !table->is_user_hidden_table()
&& !table->is_mysql_tmp_table()) {
cst_cnt += table->get_simple_constraint_info_array().count();
}
}
} // end for

Expand Down
4 changes: 2 additions & 2 deletions src/share/schema/ob_schema_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ typedef common::hash::ObPointerHashMap<ObConstraintInfoHashWrapper, ObSimpleCons
int get_tablegroup_ids_in_tenant(const uint64_t tenant_id,
common::ObIArray<uint64_t> &tablegroup_id_array);
// table
int add_tables(const common::ObIArray<ObSimpleTableSchemaV2> &table_schemas,
int add_tables(const common::ObIArray<ObSimpleTableSchemaV2 *> &table_schemas,
const bool refresh_full_schema = false);
int del_tables(const common::ObIArray<ObTenantTableId> &tables);
int add_table(const ObSimpleTableSchemaV2 &table_schema,
Expand Down Expand Up @@ -882,7 +882,7 @@ typedef common::hash::ObPointerHashMap<ObConstraintInfoHashWrapper, ObSimpleCons
int get_table_statistics(ObSchemaStatisticsInfo &schema_info) const;

int reserved_mem_for_tables_(
const common::ObIArray<share::schema::ObSimpleTableSchemaV2> &table_schemas);
const common::ObIArray<share::schema::ObSimpleTableSchemaV2*> &table_schemas);
private:
common::ObArenaAllocator local_allocator_;
common::ObIAllocator &allocator_;
Expand Down
11 changes: 6 additions & 5 deletions src/share/schema/ob_schema_retrieve_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -229,12 +229,12 @@ class ObSchemaRetrieveUtils
ObObj &out_var_obj);

//for batch table
template<typename T>
template<typename T, typename TABLE_SCHEMA>
static int retrieve_table_schema(const uint64_t tenant_id,
const bool check_deleted,
T &result,
common::ObIAllocator &allocator,
common::ObIArray<ObTableSchema *> &table_schema_array);
common::ObIArray<TABLE_SCHEMA *> &table_schema_array);

template<typename TABLE_SCHEMA, typename SCHEMA, typename T>
static int retrieve_schema(const uint64_t tenant_id,
Expand Down Expand Up @@ -359,7 +359,6 @@ class ObSchemaRetrieveUtils
RETRIEVE_SCHEMA_FUNC_DECLARE(user);
RETRIEVE_SCHEMA_FUNC_DECLARE(database);
RETRIEVE_SCHEMA_FUNC_DECLARE(tablegroup);
RETRIEVE_SCHEMA_FUNC_DECLARE(table);
RETRIEVE_SCHEMA_FUNC_DECLARE(outline);
RETRIEVE_SCHEMA_FUNC_DECLARE(db_priv);
RETRIEVE_SCHEMA_FUNC_DECLARE(table_priv);
Expand Down Expand Up @@ -444,7 +443,6 @@ class ObSchemaRetrieveUtils
FILL_SCHEMA_FUNC_DECLARE(user, ObSimpleUserSchema);
FILL_SCHEMA_FUNC_DECLARE(database, ObSimpleDatabaseSchema);
FILL_SCHEMA_FUNC_DECLARE(tablegroup, ObSimpleTablegroupSchema);
FILL_SCHEMA_FUNC_DECLARE(table, ObSimpleTableSchemaV2);
FILL_SCHEMA_FUNC_DECLARE(outline, ObSimpleOutlineSchema);
FILL_SCHEMA_FUNC_DECLARE(routine, ObSimpleRoutineSchema);
FILL_SCHEMA_FUNC_DECLARE(synonym, ObSimpleSynonymSchema);
Expand Down Expand Up @@ -503,7 +501,7 @@ class ObSchemaRetrieveUtils

template<typename T>
static int fill_sys_priv_schema(
const uint64_t tenant_id,
const uint64_t tenant_id,
T &result,
ObSysPriv &schema,
bool &is_deleted,
Expand All @@ -522,6 +520,9 @@ class ObSchemaRetrieveUtils
static int fill_trigger_id(const uint64_t tenant_id, T &result,
uint64_t &trigger_id, bool &is_deleted);
template<typename T>
static int fill_table_schema(const uint64_t tenant_id, const bool check_deleted, T &result,
ObSimpleTableSchemaV2 &table_schema, bool &is_deleted);
template<typename T>
static int fill_table_schema(const uint64_t tenant_id, const bool check_deleted, T &result,
ObTableSchema &table_schema, bool &is_deleted);
template<typename T>
Expand Down
54 changes: 30 additions & 24 deletions src/share/schema/ob_schema_retrieve_utils.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -60,26 +60,25 @@ namespace schema
*
*********************************************************************/

template<typename T>
template<typename T, typename TABLE_SCHEMA>
int ObSchemaRetrieveUtils::retrieve_table_schema(
const uint64_t tenant_id,
const bool check_deleted,
T &result,
ObIAllocator &allocator,
ObIArray<ObTableSchema *> &table_schema_array)
ObIArray<TABLE_SCHEMA *> &table_schema_array)
{
int ret = common::OB_SUCCESS;
uint64_t prev_table_id = common::OB_INVALID_ID;
ObArenaAllocator tmp_allocator(ObModIds::OB_TEMP_VARIABLES);
ObTableSchema table_schema(&tmp_allocator);
while (OB_SUCCESS == ret && common::OB_SUCCESS == (ret = result.next())) {
TABLE_SCHEMA table_schema(&tmp_allocator);
while (OB_SUCC(ret) && OB_SUCC(result.next())) {
table_schema.reset();
tmp_allocator.reuse();
bool is_deleted = false;
ObTableSchema *allocated_table_schema = NULL;
if (OB_FAIL(ret)) {
} else if (OB_FAIL(fill_table_schema(tenant_id, check_deleted, result, table_schema, is_deleted))) {
SHARE_SCHEMA_LOG(WARN, "fail to fill table schema. ", K(check_deleted), K(ret));
TABLE_SCHEMA *allocated_table_schema = NULL;
if (OB_FAIL(fill_table_schema(tenant_id, check_deleted, result, table_schema, is_deleted))) {
SHARE_SCHEMA_LOG(WARN, "fail to fill table schema", KR(ret), K(check_deleted));
} else if (table_schema.get_table_id() == prev_table_id) {
ret = common::OB_SUCCESS;
} else if (is_deleted) {
Expand All @@ -88,21 +87,21 @@ int ObSchemaRetrieveUtils::retrieve_table_schema(
"table_name", table_schema.get_table_name(),
"schema_version", table_schema.get_schema_version());
} else if (OB_FAIL(ObSchemaUtils::alloc_schema(allocator, table_schema, allocated_table_schema))) {
SHARE_SCHEMA_LOG(WARN, "alloc_table_schema failed", K(ret));
SHARE_SCHEMA_LOG(WARN, "alloc_table_schema failed", KR(ret));
} else if (OB_FAIL(table_schema_array.push_back(allocated_table_schema))) {
SHARE_SCHEMA_LOG(WARN, "failed to push back", K(ret));
SHARE_SCHEMA_LOG(WARN, "failed to push back", KR(ret));

// free table schema allocated
allocator.free(allocated_table_schema);
allocated_table_schema = NULL;
} else {
SHARE_SCHEMA_LOG(INFO, "retrieve table schema", K(table_schema), K(is_deleted), KR(ret));
SHARE_SCHEMA_LOG(INFO, "retrieve table schema", KR(ret), K(table_schema), K(is_deleted));
}
if (OB_FAIL(ret)) {
SHARE_SCHEMA_LOG(WARN, "retrieve table schema failed",
SHARE_SCHEMA_LOG(WARN, "retrieve table schema failed", KR(ret),
"table_id", table_schema.get_table_id(),
"schema_version", table_schema.get_schema_version(),
K(prev_table_id), K(is_deleted), KR(ret));
K(prev_table_id), K(is_deleted));
}
prev_table_id = table_schema.get_table_id();
}
Expand Down Expand Up @@ -452,8 +451,9 @@ int ObSubPartSchemaRetrieveHelper<TABLE_SCHEMA>::get_table(
}
if (OB_FAIL(ret)) {
} else if (OB_UNLIKELY(OB_ISNULL(table) || table->get_table_id() != table_id)) {
ret = common::OB_ERR_UNEXPECTED;
SHARE_SCHEMA_LOG(ERROR, "cannot find table", K(ret), K(table_id), KPC(table));
// may occur when upgrade system tables
ret = common::OB_ENTRY_NOT_EXIST;
SHARE_SCHEMA_LOG(WARN, "cannot find table", K(ret), K(table_id), KPC(table));
}
return ret;
}
Expand Down Expand Up @@ -556,8 +556,9 @@ int ObSchemaRetrieveHelper<TABLE_SCHEMA, SCHEMA>::get_table(
}
if (OB_FAIL(ret)) {
} else if (OB_UNLIKELY(OB_ISNULL(table) || table->get_table_id() != table_id)) {
ret = common::OB_ERR_UNEXPECTED;
SHARE_SCHEMA_LOG(ERROR, "cannot find table", K(ret), K(table_id), KPC(table));
// may occur when upgrade system tables
ret = common::OB_ENTRY_NOT_EXIST;
SHARE_SCHEMA_LOG(WARN, "cannot find table", K(ret), K(table_id), KPC(table));
}
return ret;
}
Expand Down Expand Up @@ -1088,8 +1089,11 @@ int ObSchemaRetrieveUtils::fill_temp_table_schema(const uint64_t tenant_id, T &r

template<typename T>
int ObSchemaRetrieveUtils::fill_table_schema(
const uint64_t tenant_id, const bool check_deleted, T &result,
ObTableSchema &table_schema, bool &is_deleted)
const uint64_t tenant_id,
const bool check_deleted,
T &result,
ObTableSchema &table_schema,
bool &is_deleted)
{
int ret = common::OB_SUCCESS;
table_schema.reset();
Expand Down Expand Up @@ -2789,7 +2793,6 @@ int ObSchemaRetrieveUtils::fill_sysvar_schema(const uint64_t tenant_id, T &resul
RETRIEVE_SCHEMA_FUNC_DEFINE(user);
RETRIEVE_SCHEMA_FUNC_DEFINE(database);
RETRIEVE_SCHEMA_FUNC_DEFINE(tablegroup);
RETRIEVE_SCHEMA_FUNC_DEFINE(table);
RETRIEVE_SCHEMA_FUNC_DEFINE(outline);
RETRIEVE_SCHEMA_FUNC_DEFINE(package);
RETRIEVE_SCHEMA_FUNC_DEFINE(trigger);
Expand Down Expand Up @@ -3778,11 +3781,14 @@ int ObSchemaRetrieveUtils::fill_trigger_id(const uint64_t tenant_id, T &result,
}

template<typename T>
int ObSchemaRetrieveUtils::fill_table_schema(const uint64_t tenant_id,
T &result,
ObSimpleTableSchemaV2 &table_schema,
bool &is_deleted)
int ObSchemaRetrieveUtils::fill_table_schema(
const uint64_t tenant_id,
const bool check_deleted,
T &result,
ObSimpleTableSchemaV2 &table_schema,
bool &is_deleted)
{
UNUSED(check_deleted);
int ret = common::OB_SUCCESS;
bool ignore_column_error = false;
table_schema.reset();
Expand Down

0 comments on commit 834cf35

Please sign in to comment.