Skip to content

Commit

Permalink
Merge branch 'rickard/thr-prgr-use/OTP-10116' into maint
Browse files Browse the repository at this point in the history
* rickard/thr-prgr-use/OTP-10116:
  Fix faulty use of thread progress in handle_aux_work()
  • Loading branch information
rickard-green committed Jun 20, 2012
2 parents c6b0348 + 2e5c1db commit ba8fb1d
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 33 deletions.
2 changes: 1 addition & 1 deletion erts/emulator/beam/erl_alloc_util.c
Expand Up @@ -949,7 +949,7 @@ ddq_check_incoming(ErtsAllctrDDQueue_t *ddq)
ERTS_THR_MEMORY_BARRIER;
else {
ddq->head.next.unref_end = (ErtsAllctrDDBlock_t *) ilast;
ddq->head.next.thr_progress = erts_thr_progress_later();
ddq->head.next.thr_progress = erts_thr_progress_later(NULL);
erts_atomic32_set_relb(&ddq->tail.data.um_refc_ix,
um_refc_ix);
ddq->head.next.um_refc_ix = um_refc_ix == 0 ? 1 : 0;
Expand Down
52 changes: 37 additions & 15 deletions erts/emulator/beam/erl_process.c
Expand Up @@ -899,13 +899,13 @@ unset_aux_work_flags(ErtsSchedulerSleepInfo *ssi, erts_aint32_t flgs)
#ifdef ERTS_SMP

static ERTS_INLINE void
thr_prgr_current_reset(ErtsAuxWorkData *awdp)
haw_thr_prgr_current_reset(ErtsAuxWorkData *awdp)
{
awdp->current_thr_prgr = ERTS_THR_PRGR_INVALID;
}

static ERTS_INLINE ErtsThrPrgrVal
thr_prgr_current(ErtsAuxWorkData *awdp)
haw_thr_prgr_current(ErtsAuxWorkData *awdp)
{
ErtsThrPrgrVal current = awdp->current_thr_prgr;
if (current == ERTS_THR_PRGR_INVALID) {
Expand All @@ -915,6 +915,21 @@ thr_prgr_current(ErtsAuxWorkData *awdp)
return current;
}

static ERTS_INLINE void
haw_thr_prgr_current_check_progress(ErtsAuxWorkData *awdp)
{
ErtsThrPrgrVal current = awdp->current_thr_prgr;
if (current != ERTS_THR_PRGR_INVALID
&& !erts_thr_progress_equal(current, erts_thr_progress_current())) {
/*
* We have used a previouly read current value that isn't the
* latest; need to poke ourselfs in order to guarantee no loss
* of wakeups.
*/
erts_sched_poke(awdp->ssi);
}
}

#endif

typedef struct erts_misc_aux_work_t_ erts_misc_aux_work_t;
Expand Down Expand Up @@ -1013,7 +1028,7 @@ static ERTS_INLINE erts_aint32_t
handle_misc_aux_work_thr_prgr(ErtsAuxWorkData *awdp,
erts_aint32_t aux_work)
{
if (!erts_thr_progress_has_reached_this(thr_prgr_current(awdp),
if (!erts_thr_progress_has_reached_this(haw_thr_prgr_current(awdp),
awdp->misc.thr_prgr))
return aux_work & ~ERTS_SSI_AUX_WORK_MISC_THR_PRGR;

Expand Down Expand Up @@ -1118,7 +1133,7 @@ handle_async_ready_clean(ErtsAuxWorkData *awdp,

#ifdef ERTS_SMP
if (awdp->async_ready.need_thr_prgr
&& !erts_thr_progress_has_reached_this(thr_prgr_current(awdp),
&& !erts_thr_progress_has_reached_this(haw_thr_prgr_current(awdp),
awdp->async_ready.thr_prgr)) {
return aux_work & ~ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN;
}
Expand Down Expand Up @@ -1199,7 +1214,7 @@ handle_delayed_dealloc(ErtsAuxWorkData *awdp, erts_aint32_t aux_work)

if (need_thr_progress) {
if (wakeup == ERTS_THR_PRGR_INVALID)
wakeup = erts_thr_progress_later_than(thr_prgr_current(awdp));
wakeup = erts_thr_progress_later(awdp->esdp);
awdp->dd.thr_prgr = wakeup;
set_aux_work_flags(ssi, ERTS_SSI_AUX_WORK_DD_THR_PRGR);
awdp->dd.thr_prgr = wakeup;
Expand All @@ -1220,7 +1235,7 @@ handle_delayed_dealloc_thr_prgr(ErtsAuxWorkData *awdp, erts_aint32_t aux_work)
int need_thr_progress;
int more_work;
ErtsThrPrgrVal wakeup = ERTS_THR_PRGR_INVALID;
ErtsThrPrgrVal current = thr_prgr_current(awdp);
ErtsThrPrgrVal current = haw_thr_prgr_current(awdp);

if (!erts_thr_progress_has_reached_this(current, awdp->dd.thr_prgr))
return aux_work & ~ERTS_SSI_AUX_WORK_DD_THR_PRGR;
Expand All @@ -1242,7 +1257,7 @@ handle_delayed_dealloc_thr_prgr(ErtsAuxWorkData *awdp, erts_aint32_t aux_work)

if (need_thr_progress) {
if (wakeup == ERTS_THR_PRGR_INVALID)
wakeup = erts_thr_progress_later_than(current);
wakeup = erts_thr_progress_later(awdp->esdp);
awdp->dd.thr_prgr = wakeup;
erts_thr_progress_wakeup(awdp->esdp, wakeup);
}
Expand Down Expand Up @@ -1431,7 +1446,7 @@ handle_setup_aux_work_timer(ErtsAuxWorkData *awdp, erts_aint32_t aux_work)
}

static erts_aint32_t
handle_aux_work(ErtsAuxWorkData *awdp, erts_aint32_t orig_aux_work)
handle_aux_work(ErtsAuxWorkData *awdp, erts_aint32_t orig_aux_work, int waiting)
{
#undef HANDLE_AUX_WORK
#define HANDLE_AUX_WORK(FLG, HNDLR) \
Expand All @@ -1449,7 +1464,7 @@ handle_aux_work(ErtsAuxWorkData *awdp, erts_aint32_t orig_aux_work)
erts_aint32_t ignore = 0;

#ifdef ERTS_SMP
thr_prgr_current_reset(awdp);
haw_thr_prgr_current_reset(awdp);
#endif

ERTS_DBG_CHK_AUX_WORK_VAL(aux_work);
Expand Down Expand Up @@ -1515,6 +1530,11 @@ handle_aux_work(ErtsAuxWorkData *awdp, erts_aint32_t orig_aux_work)

ERTS_DBG_CHK_AUX_WORK_VAL(aux_work);

#ifdef ERTS_SMP
if (waiting && !aux_work)
haw_thr_prgr_current_check_progress(awdp);
#endif

return aux_work;

#undef HANDLE_AUX_WORK
Expand Down Expand Up @@ -1975,7 +1995,7 @@ aux_thread(void *unused)
if (aux_work) {
if (!thr_prgr_active)
erts_thr_progress_active(NULL, thr_prgr_active = 1);
aux_work = handle_aux_work(awdp, aux_work);
aux_work = handle_aux_work(awdp, aux_work, 1);
if (aux_work && erts_thr_progress_update(NULL))
erts_thr_progress_leader_update(NULL);
}
Expand Down Expand Up @@ -2054,7 +2074,7 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq)
erts_thr_progress_active(esdp, thr_prgr_active = 1);
sched_wall_time_change(esdp, 1);
}
aux_work = handle_aux_work(&esdp->aux_work_data, aux_work);
aux_work = handle_aux_work(&esdp->aux_work_data, aux_work, 1);
if (aux_work && erts_thr_progress_update(esdp))
erts_thr_progress_leader_update(esdp);
}
Expand Down Expand Up @@ -2158,7 +2178,7 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq)
if (!thr_prgr_active)
erts_thr_progress_active(esdp, thr_prgr_active = 1);
#endif
aux_work = handle_aux_work(&esdp->aux_work_data, aux_work);
aux_work = handle_aux_work(&esdp->aux_work_data, aux_work, 1);
#ifdef ERTS_SMP
if (aux_work && erts_thr_progress_update(esdp))
erts_thr_progress_leader_update(esdp);
Expand Down Expand Up @@ -4418,7 +4438,9 @@ suspend_scheduler(ErtsSchedulerData *esdp)
erts_thr_progress_active(esdp, thr_prgr_active = 1);
sched_wall_time_change(esdp, 1);
}
aux_work = handle_aux_work(&esdp->aux_work_data, aux_work);
aux_work = handle_aux_work(&esdp->aux_work_data,
aux_work,
1);
if (aux_work && erts_thr_progress_update(esdp))
erts_thr_progress_leader_update(esdp);
}
Expand Down Expand Up @@ -6660,7 +6682,7 @@ Process *schedule(Process *p, int calls)
if (leader_update)
erts_thr_progress_leader_update(esdp);
if (aux_work)
handle_aux_work(&esdp->aux_work_data, aux_work);
handle_aux_work(&esdp->aux_work_data, aux_work, 0);
erts_smp_runq_lock(rq);
}
}
Expand All @@ -6673,7 +6695,7 @@ Process *schedule(Process *p, int calls)
erts_aint32_t aux_work;
aux_work = erts_atomic32_read_acqb(&esdp->ssi->aux_work);
if (aux_work)
handle_aux_work(&esdp->aux_work_data, aux_work);
handle_aux_work(&esdp->aux_work_data, aux_work, 0);
}
#endif /* ERTS_SMP */

Expand Down
2 changes: 1 addition & 1 deletion erts/emulator/beam/erl_sched_spec_pre_alloc.c
Expand Up @@ -227,7 +227,7 @@ fetch_remote(erts_sspa_chunk_header_t *chdr, int max)
ERTS_THR_MEMORY_BARRIER;
else {
chdr->head.next.unref_end = (erts_sspa_blk_t *) ilast;
chdr->head.next.thr_progress = erts_thr_progress_later();
chdr->head.next.thr_progress = erts_thr_progress_later(NULL);
erts_atomic32_set_relb(&chdr->tail.data.um_refc_ix,
um_refc_ix);
chdr->head.next.um_refc_ix = um_refc_ix == 0 ? 1 : 0;
Expand Down
8 changes: 4 additions & 4 deletions erts/emulator/beam/erl_thr_progress.c
Expand Up @@ -891,16 +891,16 @@ has_reached_wakeup(ErtsThrPrgrVal wakeup)
ErtsThrPrgrVal limit;
/*
* erts_thr_progress_later() returns values which are
* equal to 'current + 2'. That is, users should never
* get a hold of values larger than that.
* equal to 'current + 2', or 'current + 3'. That is, users
* should never get a hold of values larger than that.
*
* That is, valid values are values less than 'current + 3'.
* That is, valid values are values less than 'current + 4'.
*
* Values larger than this won't work with the wakeup
* algorithm.
*/

limit = current + 3;
limit = current + 4;
if (limit == ERTS_THR_PRGR_VAL_WAITING)
limit = 0;
else if (limit < current) /* Wrapped */
Expand Down
36 changes: 25 additions & 11 deletions erts/emulator/beam/erl_thr_progress.h
Expand Up @@ -139,11 +139,12 @@ ERTS_GLB_INLINE ErtsThrPrgrVal erts_thr_prgr_read_mb__(ERTS_THR_PRGR_ATOMIC *atm

ERTS_GLB_INLINE int erts_thr_progress_is_managed_thread(void);
ERTS_GLB_INLINE ErtsThrPrgrVal erts_thr_progress_current_to_later__(ErtsThrPrgrVal val);
ERTS_GLB_INLINE ErtsThrPrgrVal erts_thr_progress_later_than(ErtsThrPrgrVal val);
ERTS_GLB_INLINE ErtsThrPrgrVal erts_thr_progress_later(void);
ERTS_GLB_INLINE ErtsThrPrgrVal erts_thr_progress_later(ErtsSchedulerData *);
ERTS_GLB_INLINE ErtsThrPrgrVal erts_thr_progress_current(void);
ERTS_GLB_INLINE int erts_thr_progress_has_passed__(ErtsThrPrgrVal val1, ErtsThrPrgrVal val2);
ERTS_GLB_INLINE int erts_thr_progress_has_reached_this(ErtsThrPrgrVal this, ErtsThrPrgrVal val);
ERTS_GLB_INLINE int erts_thr_progress_equal(ErtsThrPrgrVal val1,
ErtsThrPrgrVal val2);
ERTS_GLB_INLINE int erts_thr_progress_cmp(ErtsThrPrgrVal val1, ErtsThrPrgrVal val2);
ERTS_GLB_INLINE int erts_thr_progress_has_reached(ErtsThrPrgrVal val);

Expand Down Expand Up @@ -230,16 +231,23 @@ erts_thr_progress_current_to_later__(ErtsThrPrgrVal val)
}

ERTS_GLB_INLINE ErtsThrPrgrVal
erts_thr_progress_later_than(ErtsThrPrgrVal val)
erts_thr_progress_later(ErtsSchedulerData *esdp)
{
ERTS_THR_MEMORY_BARRIER;
return erts_thr_progress_current_to_later__(val);
}

ERTS_GLB_INLINE ErtsThrPrgrVal
erts_thr_progress_later(void)
{
ErtsThrPrgrVal val = erts_thr_prgr_read_mb__(&erts_thr_prgr__.current);
ErtsThrPrgrData *tpd;
ErtsThrPrgrVal val;
if (esdp) {
tpd = &esdp->thr_progress_data;
managed_thread:
val = tpd->previous.local;
ERTS_THR_MEMORY_BARRIER;
}
else {
tpd = erts_tsd_get(erts_thr_prgr_data_key__);
if (tpd && tpd->is_managed)
goto managed_thread;
val = erts_thr_prgr_read_mb__(&erts_thr_prgr__.current);
}
ASSERT(val != ERTS_THR_PRGR_VAL_WAITING);
return erts_thr_progress_current_to_later__(val);
}

Expand Down Expand Up @@ -278,6 +286,12 @@ erts_thr_progress_has_reached_this(ErtsThrPrgrVal this, ErtsThrPrgrVal val)
return erts_thr_progress_has_passed__(this, val);
}

ERTS_GLB_INLINE int
erts_thr_progress_equal(ErtsThrPrgrVal val1, ErtsThrPrgrVal val2)
{
return val1 == val2 && val1 != ERTS_THR_PRGR_INVALID;
}

ERTS_GLB_INLINE int
erts_thr_progress_cmp(ErtsThrPrgrVal val1, ErtsThrPrgrVal val2)
{
Expand Down
2 changes: 1 addition & 1 deletion erts/emulator/beam/erl_thr_queue.c
Expand Up @@ -422,7 +422,7 @@ clean(ErtsThrQ_t *q, int max_ops, int do_notify)
else {
q->head.next.unref_end = (ErtsThrQElement_t *) ilast;
#ifdef ERTS_SMP
q->head.next.thr_progress = erts_thr_progress_later();
q->head.next.thr_progress = erts_thr_progress_later(NULL);
#endif
erts_atomic32_set_relb(&q->tail.data.um_refc_ix,
um_refc_ix);
Expand Down

0 comments on commit ba8fb1d

Please sign in to comment.