Skip to content

Commit

Permalink
replace TIMEOUT errno with EAGAIN when replay medium clog
Browse files Browse the repository at this point in the history
  • Loading branch information
yangqise7en authored and ob-robot committed Aug 10, 2023
1 parent 407f8f5 commit dcd54c9
Show file tree
Hide file tree
Showing 6 changed files with 12 additions and 80 deletions.
12 changes: 5 additions & 7 deletions src/share/scheduler/ob_dag_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -876,18 +876,18 @@ int ObIDagNet::add_dag_into_dag_net(ObIDag &dag)
{
int ret = OB_SUCCESS;
void *buf = nullptr;
WEAK_BARRIER();

ObDagRecord *dag_record = nullptr;
int hash_ret = OB_SUCCESS;
ObMutexGuard guard(lock_);
WEAK_BARRIER();
const bool is_stop = is_stopped_;

if (OB_NOT_NULL(dag.get_dag_net())) {
ret = OB_INVALID_ARGUMENT;
COMMON_LOG(WARN, "dag already belongs to a dag_net", K(ret), K(dag));
} else if (is_stopped_) {
} else if (is_stop) {
ret = OB_INNER_STAT_ERROR;
LOG_WARN("dag_net is in stop state, not allowed to add dag", K(ret), K(is_stopped_));
LOG_WARN("dag_net is in stop state, not allowed to add dag", K(ret), K(is_stop));
} else if (is_cancel_) {
ret = OB_CANCELED;
LOG_WARN("dag net is cancel, do not allow to add new dag", K(ret), K(is_cancel_));
Expand Down Expand Up @@ -1000,7 +1000,6 @@ void ObIDagNet::remove_dag_record_(ObDagRecord &dag_record)
int ObIDagNet::erase_dag_from_dag_net(ObIDag &dag)
{
int ret = OB_SUCCESS;
bool found = false;
ObDagRecord *dag_record = nullptr;
ObMutexGuard guard(lock_);
WEAK_BARRIER();
Expand All @@ -1012,14 +1011,13 @@ int ObIDagNet::erase_dag_from_dag_net(ObIDag &dag)
} else if (OB_ISNULL(dag_record)) {
ret = OB_ERR_UNEXPECTED;
COMMON_LOG(WARN, "dag record should not be NULL", K(ret), KP(this), KP(&dag));
} else if (dag_record->dag_ptr_ != &dag) {
} else if (OB_UNLIKELY(dag_record->dag_ptr_ != &dag)) {
ret = OB_ERR_UNEXPECTED;
COMMON_LOG(WARN, "dag record has unexpected dag value", K(ret), KP(this), KP(&dag), KPC(dag_record));
} else if (OB_UNLIKELY(ObIDag::is_finish_status(dag_record->dag_status_))) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("dag status is invalid when erase", K(ret), KPC(dag_record));
} else {
found = true;
COMMON_LOG(DEBUG, "success to update status", K(ret), KPC(dag_record));
remove_dag_record_(*dag_record);
dag.clear_dag_net();
Expand Down
2 changes: 0 additions & 2 deletions src/storage/compaction/ob_medium_compaction_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,6 @@ int ObTabletMediumCompactionInfoRecorder::inner_replay_clog(
int ret = OB_SUCCESS;
ObArenaAllocator tmp_allocator;
ObMediumCompactionInfo replay_medium_info;
ObTabletHandle tmp_tablet_handle;
if (OB_FAIL(replay_medium_info.deserialize(tmp_allocator, buf, size, pos))) {
LOG_WARN("failed to deserialize medium compaction info", K(ret));
} else if (!replay_medium_info.cluster_id_equal()
Expand All @@ -252,7 +251,6 @@ int ObTabletMediumCompactionInfoRecorder::inner_replay_clog(
}
}

tmp_tablet_handle.reset();
return ret;
}

Expand Down
2 changes: 0 additions & 2 deletions src/storage/compaction/ob_tablet_merge_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1236,7 +1236,6 @@ int ObTabletMergeFinishTask::get_merged_sstable(ObTabletMergeCtx &ctx)
int ObTabletMergeFinishTask::add_sstable_for_merge(ObTabletMergeCtx &ctx)
{
int ret = OB_SUCCESS;
ObTablet *old_tablet = ctx.tablet_handle_.get_obj();
const ObMergeType merge_type = ctx.param_.merge_type_;

if (OB_UNLIKELY(!ctx.is_valid())) {
Expand All @@ -1256,7 +1255,6 @@ int ObTabletMergeFinishTask::add_sstable_for_merge(ObTabletMergeCtx &ctx)
is_minor_merge(ctx.param_.merge_type_)/*need_check_sstable*/,
false/*allow_duplicate_sstable*/,
ctx.param_.get_merge_type());
ObTablet *old_tablet = ctx.tablet_handle_.get_obj();
ObTabletHandle new_tablet_handle;
if (ctx.param_.tablet_id_.is_special_merge_tablet()) {
param.multi_version_start_ = 1;
Expand Down
4 changes: 4 additions & 0 deletions src/storage/ls/ob_ls_storage_clog_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ int ObMediumCompactionClogHandler::inner_replay(
} else if (OB_FAIL(handle.get_obj()->replay_medium_compaction_clog(scn, buffer, buffer_size, new_pos))) {
LOG_WARN("failed to replay medium compaction clog", K(ret), K(tablet_id), K(buffer_size), K(new_pos));
}
if (OB_TIMEOUT == ret) {
LOG_INFO("replace timeout errno", KR(ret), K(scn), K(tablet_id));
ret = OB_EAGAIN;
}
return ret;
}

Expand Down
61 changes: 3 additions & 58 deletions src/storage/ls/ob_ls_tablet_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,9 @@ int ObLSTabletService::replay(
if (OB_TABLET_NOT_EXIST == ret) {
ret = OB_SUCCESS; // TODO (bowen.gbw): unify multi data replay logic
LOG_INFO("tablet does not exist, skip", K(ret), K(replayer_executor));
} else if (OB_TIMEOUT == ret) {
LOG_INFO("replace timeout errno", KR(ret), K(replayer_executor));
ret = OB_EAGAIN;
} else {
LOG_WARN("failed to replay", K(ret), K(replayer_executor));
}
Expand Down Expand Up @@ -585,64 +588,6 @@ int ObLSTabletService::get_tablet_addr(const ObTabletMapKey &key, ObMetaDiskAddr
return ret;
}

int ObLSTabletService::replay_medium_compaction_clog(
const share::SCN &scn,
const char *buf,
const int64_t buf_size,
const int64_t pos)
{
int ret = OB_SUCCESS;
ObTabletID tablet_id;
ObTabletHandle handle;
int64_t new_pos = pos;

if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("not inited", K(ret));
} else if (OB_UNLIKELY(buf_size <= pos || pos < 0 || buf_size <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(buf_size), K(pos));
} else if (OB_FAIL(tablet_id.deserialize(buf, buf_size, new_pos))) {
LOG_WARN("fail to deserialize tablet id", K(ret), K(buf_size), K(pos), K(tablet_id));
} else if (OB_FAIL(direct_get_tablet(tablet_id, handle))) {
if (OB_TABLET_NOT_EXIST == ret) {
LOG_INFO("tablet not exist", K(ret), K(tablet_id));
ret = OB_SUCCESS;
} else {
LOG_WARN("failed to get tablet", K(ret), K(tablet_id));
}
} else if (handle.get_obj()->is_empty_shell()) {
LOG_INFO("old tablet is empty shell tablet, should skip this operation", K(ret), "old_tablet", handle.get_obj());
} else if (OB_FAIL(handle.get_obj()->replay_medium_compaction_clog(scn, buf, buf_size, new_pos))) {
LOG_WARN("update tablet storage schema fail", K(ret), K(tablet_id), K(buf_size), K(new_pos));
}
return ret;
}

int ObLSTabletService::replay_update_reserved_snapshot(
const share::SCN &scn,
const char *buf,
const int64_t buf_size,
const int64_t pos)
{
int ret = OB_SUCCESS;
int64_t new_pos = pos;

if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("not inited", K(ret));
} else if (OB_UNLIKELY(buf_size <= pos || pos < 0 || buf_size <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(buf_size), K(pos));
} else if (OB_ISNULL(ls_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ls is null", K(ret), KPC(ls_));
} else if (OB_FAIL(ls_->replay_reserved_snapshot_log(scn, buf, buf_size, new_pos))) {
LOG_WARN("replay reserved snapshot log fail", K(ret), KPC(ls_), K(buf_size), K(new_pos));
}
return ret;
}

void ObLSTabletService::report_tablet_to_rs(const common::ObTabletID &tablet_id)
{
int ret = OB_SUCCESS;
Expand Down
11 changes: 0 additions & 11 deletions src/storage/ls/ob_ls_tablet_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,17 +221,6 @@ class ObLSTabletService : public logservice::ObIReplaySubHandler,
int get_bf_optimal_prefix(int64_t &prefix);
int64_t get_tablet_count() const;

int replay_medium_compaction_clog(
const share::SCN &scn,
const char *buf,
const int64_t buf_size,
const int64_t pos);
int replay_update_reserved_snapshot(
const share::SCN &scn,
const char *buf,
const int64_t buf_size,
const int64_t pos);

// update tablet
int update_tablet_checkpoint(
const ObTabletMapKey &key,
Expand Down

0 comments on commit dcd54c9

Please sign in to comment.