Skip to content

Commit

Permalink
[CP] [OBKV] fix ttl issue 51073384
Browse files Browse the repository at this point in the history
  • Loading branch information
shenyunlong authored and junye committed Sep 11, 2023
1 parent 7a4fcc3 commit 7594790
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 23 deletions.
43 changes: 24 additions & 19 deletions src/observer/table/ob_table_ttl_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ int ObTTLManager::scan_all_tenanat_handle_event()
iter != ttl_tenant_parts_map_.end() && OB_SUCC(ret); ++iter) {
tenant_id = iter->first;
tenant_info = iter->second;
if (tenant_info->need_check_ && OB_FAIL(check_tenants.push_back(tenant_id))) {
if (tenant_info->check_status_ == NEED_CHECK && OB_FAIL(check_tenants.push_back(tenant_id))) {
// after observer restart, need check tenant even when cancel and move state
LOG_WARN("fail to push back check tenants", K(ret));
}
Expand Down Expand Up @@ -433,7 +433,7 @@ int ObTTLManager::proc_rs_cmd(uint64_t tenant_id, uint64_t task_id,
} else if (OB_ISNULL(tenant_info = get_tenant_info(tenant_id, create_if_not_exists))) {
ret = OB_ENTRY_NOT_EXIST;
LOG_WARN("fail to get ttl tenant info", K(tenant_id), K(create_if_not_exists));
} else if (tenant_info->need_check_) {
} else if (tenant_info->check_status_ == NEED_CHECK) {
ret = OB_EAGAIN;
LOG_INFO("tenant info need check, please resend message later", KPC(tenant_info), K(expected_state));
} else if (OB_UNLIKELY(tenant_info->task_id_ != OB_INVALID_ID && tenant_info->task_id_ != task_id)) {
Expand All @@ -448,7 +448,7 @@ int ObTTLManager::proc_rs_cmd(uint64_t tenant_id, uint64_t task_id,
tenant_info->task_id_ = task_id;
tenant_info->is_usr_trigger_ = is_usr_trigger;
tenant_info->state_ = expected_state;
tenant_info->need_check_ = true;
tenant_info->check_status_ = NEED_CHECK;
tenant_info->is_dirty_ = true;
if (OB_TTL_TASK_MOVING == expected_state) {
// after restart, rs send moving means all tasks was finished or canceled
Expand All @@ -475,14 +475,14 @@ int ObTTLManager::proc_rs_cmd(uint64_t tenant_id, uint64_t task_id,
return ret;
}

void ObTTLManager::mark_tenant_need_check(uint64_t tenant_id)
void ObTTLManager::mark_tenant_need_check(uint64_t tenant_id, bool report_error)
{
int ret = OB_SUCCESS;
ObTTLTenantInfo* tenant_info = NULL;
if (common::ObTTLUtil::check_can_process_tenant_tasks(tenant_id)) {
common::ObSpinLockGuard guard(lock_);
if (OB_NOT_NULL(tenant_info = get_tenant_info(tenant_id, false))) {
tenant_info->need_check_ = true;

ObTenanCheckStatusOp check_stauts_op(NEED_CHECK);
if (OB_FAIL(ttl_tenant_parts_map_.atomic_refactored(tenant_id, check_stauts_op))) {
if (ret != OB_HASH_NOT_EXIST && report_error) {
LOG_WARN("fail to change check tenant status", KR(ret), K(tenant_id));
}
}

Expand All @@ -492,8 +492,6 @@ void ObTTLManager::mark_tenant_need_check(uint64_t tenant_id)
void ObTTLManager::on_leader_active(const ObPartitionKey& pkey)
{
int ret = OB_SUCCESS;
ObTTLPara para;
bool can_ttl = false;
uint64_t tenant_id = pkey.get_tenant_id();
if (!is_init_) {
ret = OB_NOT_INIT;
Expand All @@ -502,12 +500,9 @@ void ObTTLManager::on_leader_active(const ObPartitionKey& pkey)
// do nothing
} else if(OB_SYS_TENANT_ID == tenant_id) {
// do nothing
} else if (!common::ObTTLUtil::check_can_process_tenant_tasks(tenant_id)) {
//do nothing
} else if (OB_FAIL(check_partition_can_gen_ttl(pkey, para, can_ttl))) {
LOG_WARN("fail to check partition can ttl", K(ret), K(pkey), K(para));
} else if (can_ttl) {
mark_tenant_need_check(tenant_id);
} else {
// not report error to ensure leader active
mark_tenant_need_check(tenant_id, false);
}
}

Expand Down Expand Up @@ -622,14 +617,15 @@ int ObTTLManager::generate_one_partition_task(ObTTLTaskInfo& task_info, ObTTLPar
return ret;
}


void ObTTLManager::mark_tenant_checked(uint64_t tenant_id)
{
common::ObSpinLockGuard guard(lock_);
ObTTLTenantInfo* tenant_info = get_tenant_info(tenant_id, false);
if (OB_ISNULL(tenant_info)) {
LOG_WARN("fail to get ttl tenant info", K(tenant_id));
} else {
tenant_info->need_check_ = false;
tenant_info->check_status_ = NO_NEED_CHECK;
}
}

Expand Down Expand Up @@ -1517,7 +1513,7 @@ int ObTTLManager::check_and_reset_droped_tenant()
LOG_WARN("fail to check tenant exists", K(ret), K(tenant_id));
} else if (tenant_not_exist) {
tenant_info->is_dirty_ = false; // no need to scan and sync sys table
tenant_info->need_check_ = false; // no need to check partitions
tenant_info->check_status_ = NO_NEED_CHECK; // no need to check partitions
tenant_info->rsp_time_ == OB_INVALID_ID; // no need response
tenant_info->state_ = OB_TTL_TASK_INVALID;
tenant_info->is_droped_ = true;
Expand Down Expand Up @@ -1710,3 +1706,12 @@ int ObTTLManager::refresh_partition_task(ObTTLTaskCtx &ttl_task, bool refresh_st

return ret;
}

void ObTTLManager::ObTenanCheckStatusOp::operator()(hash::HashMapPair<int64_t, ObTTLTenantInfo*> &entry)
{
ObTTLTenantInfo *tenant_info = entry.second;
// not report error here to ensure leader active
if (OB_NOT_NULL(tenant_info)) {
tenant_info->check_status_ = target_status_;
}
}
28 changes: 24 additions & 4 deletions src/observer/table/ob_table_ttl_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ class ObTTLManager
typedef common::hash::ObHashMap<ObPartitionKey, ObTTLTaskCtx*> PartTasksMap;
typedef PartTasksMap::iterator ttl_parts_iterator;

enum ObTenantCheckStatus
{
NO_NEED_CHECK = 0,
NEED_CHECK = 1
};

struct ObTTLTenantInfo
{
public:
Expand All @@ -102,7 +108,7 @@ class ObTTLManager
tenant_id_(OB_INVALID_ID),
task_id_(OB_INVALID_ID),
is_usr_trigger_(false),
need_check_(false),
check_status_(NO_NEED_CHECK),
is_dirty_(false),
ttl_continue_(true),
cmd_type_(obrpc::ObTTLRequestArg::TTL_INVALID_TYPE),
Expand All @@ -119,7 +125,7 @@ class ObTTLManager
TO_STRING_KV(K_(tenant_id),
K_(task_id),
K_(is_usr_trigger),
K_(need_check),
K_(check_status),
K_(is_dirty),
K_(ttl_continue),
K_(cmd_type),
Expand All @@ -134,7 +140,7 @@ class ObTTLManager
uint64_t tenant_id_;
uint64_t task_id_;
bool is_usr_trigger_;
bool need_check_; /*need scan partition & check*/
ObTenantCheckStatus check_status_; /*need scan partition & check*/
bool is_dirty_; /*need check the current ctx task*/
bool ttl_continue_;
obrpc::ObTTLRequestArg::TTLRequestType cmd_type_;
Expand All @@ -144,6 +150,20 @@ class ObTTLManager
bool is_finished_; // all delete task is finished (or canceled)
};

// atomic tenant check status modification callback
class ObTenanCheckStatusOp
{
public:
void operator()(hash::HashMapPair<int64_t, ObTTLTenantInfo*> &entry);

explicit ObTenanCheckStatusOp(ObTenantCheckStatus target_status)
: target_status_(target_status)
{}

private:
ObTenantCheckStatus target_status_;
};

typedef common::hash::ObHashMap<int64_t, ObTTLTenantInfo*> TenantPartsMap;
typedef TenantPartsMap::iterator ttl_tenants_iterator;

Expand All @@ -162,7 +182,7 @@ class ObTTLManager
int check_partition_can_gen_ttl(const ObPartitionKey& pkey,
ObTTLPara &para, bool& can_ttl);
int check_and_do_rsp(uint64_t tenant_id);
void mark_tenant_need_check(uint64_t tenant_id);
void mark_tenant_need_check(uint64_t tenant_id, bool report_error = true);
void mark_tenant_rsp(uint64_t tenant_id, int64_t rsp_time);
virtual int generate_ttl_dag(ObTTLTaskInfo& task_info, ObTTLPara& para);
int response_ttl_cmd(const uint64_t& tenant_id, const uint64_t& task_id,
Expand Down

0 comments on commit 7594790

Please sign in to comment.