Skip to content

Commit 438313f

Browse files
author
zhengshuxin
committed
Fixed bugs in fiber mutex and fiber cond.
1 parent 29f65f8 commit 438313f

File tree

9 files changed

+161
-56
lines changed

9 files changed

+161
-56
lines changed

c/include/fiber/fiber_mutex.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ extern "C" {
1010
typedef struct ACL_FIBER_MUTEX ACL_FIBER_MUTEX;
1111

1212
#define FIBER_MUTEX_F_LOCK_TRY (1 << 0)
13-
#define FIBER_MUTEX_F_SWITCH_FIRST (1 << 1)
13+
#define FIBER_MUTEX_F_LOCK_ONCE (1 << 1)
14+
#define FIBER_MUTEX_F_CHECK_DEADLOCK (1 << 2)
1415

1516
FIBER_API ACL_FIBER_MUTEX *acl_fiber_mutex_create(unsigned flag);
1617
FIBER_API void acl_fiber_mutex_free(ACL_FIBER_MUTEX *mutex);

c/src/common/mbox.c

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,11 @@ void mbox_free(MBOX *mbox, void (*free_fn)(void*))
105105
}
106106

107107
int mbox_send(MBOX *mbox, void *msg)
108+
{
109+
return mbox_send2(mbox, mbox->out, msg);
110+
}
111+
112+
int mbox_send2(MBOX *mbox, socket_t out, void *msg)
108113
{
109114
int ret;
110115
long long n = 1;
@@ -134,9 +139,9 @@ int mbox_send(MBOX *mbox, void *msg)
134139
mbox->nsend++;
135140

136141
#if defined(_WIN32) || defined(_WIN64)
137-
ret = (int) acl_fiber_send(mbox->out, (const char*) &n, (int) sizeof(n), 0);
142+
ret = (int) acl_fiber_send(out, (const char*) &n, (int) sizeof(n), 0);
138143
#else
139-
ret = (int) acl_fiber_write(mbox->out, &n, sizeof(n));
144+
ret = (int) acl_fiber_write(out, &n, sizeof(n));
140145
#endif
141146

142147
#if !defined(HAS_EVENTFD)
@@ -147,7 +152,7 @@ int mbox_send(MBOX *mbox, void *msg)
147152

148153
if (ret == -1) {
149154
msg_error("%s(%d), %s: mbox write %d error %s", __FILE__,
150-
__LINE__, __FUNCTION__, mbox->out, last_serror());
155+
__LINE__, __FUNCTION__, (int) out, last_serror());
151156
return -1;
152157
}
153158

c/src/common/mbox.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ void mbox_free(MBOX *mbox, void (*free_fn)(void*));
2727
* @return {int} 发送成功返回 0,否则返回 -1
2828
*/
2929
int mbox_send(MBOX *mbox, void *msg);
30+
int mbox_send2(MBOX *mbox, socket_t out, void *msg);
3031

3132
/**
3233
* 从消息队列中读取消息

c/src/event.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ struct FILE_EVENT {
130130
#define STATUS_WRITEWAIT (unsigned) (1 << 6) // Wait for Writable
131131
#define STATUS_CLOSING (unsigned) (1 << 7) // In closing status
132132
#define STATUS_CLOSED (unsigned) (1 << 8) // In closed status
133+
#define STATUS_BUFFED (unsigned) (1 << 9)
133134

134135
#define SET_CONNECTING(x) ((x)->status |= STATUS_CONNECTING)
135136
#define SET_READABLE(x) ((x)->status |= STATUS_READABLE)

c/src/fiber_io.c

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ void fiber_timer_add(ACL_FIBER *fb, size_t milliseconds)
178178
long long now = event_get_stamp(ev);
179179
TIMER_CACHE_NODE *timer;
180180

181-
fb->when = now + (ssize_t) milliseconds;
181+
fb->when = now + (long long) milliseconds;
182182
ring_detach(&fb->me); // Detach the previous binding.
183183
timer_cache_add(__thread_fiber->ev_timer, fb->when, &fb->me);
184184

@@ -849,6 +849,7 @@ FILE_EVENT *fiber_file_cache_get(socket_t fd)
849849
fe = file_event_alloc(fd);
850850
} else {
851851
file_event_init(fe, fd);
852+
fe->status &= ~STATUS_BUFFED;
852853
}
853854

854855
#ifdef HAS_IO_URING
@@ -862,11 +863,22 @@ FILE_EVENT *fiber_file_cache_get(socket_t fd)
862863

863864
void fiber_file_cache_put(FILE_EVENT *fe)
864865
{
865-
fiber_file_del(fe, fe->fd);
866-
fe->fd = INVALID_SOCKET;
866+
// If the fe is being refered by more than one fibers, don't put it
867+
// back to cache or free it, because the fe is still aliving now.
868+
if (fe->refer > 1) {
869+
return;
870+
}
871+
872+
if (fe->fd != INVALID_SOCKET) {
873+
fiber_file_del(fe, fe->fd);
874+
fe->fd = INVALID_SOCKET;
875+
}
867876

868877
if (array_size(__thread_fiber->cache) < __thread_fiber->cache_max) {
869-
array_push_back(__thread_fiber->cache, fe);
878+
if (!(fe->status & STATUS_BUFFED)) {
879+
array_push_back(__thread_fiber->cache, fe);
880+
fe->status |= STATUS_BUFFED;
881+
}
870882
} else {
871883
file_event_unrefer(fe);
872884
}

c/src/sync/fiber_cond.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ static int fiber_cond_timedwait(ACL_FIBER_COND *cond, ACL_FIBER_MUTEX *mutex,
144144

145145
WAITER_INC(ev);
146146

147-
// Hang the current fiber and will wakeup if the timer arrives or
147+
// Hang the current fiber and will wake up if the timer arrives or
148148
// be awakened by the other fiber or thread.
149149
acl_fiber_switch();
150150

c/src/sync/fiber_mutex.c

Lines changed: 49 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -373,20 +373,34 @@ void acl_fiber_mutex_stats_show(const ACL_FIBER_MUTEX_STATS *stats)
373373

374374
/****************************************************************************/
375375

376+
#if 1
377+
#define LOCK(m) while(pthread_mutex_trylock(&(m)->lock) != 0) {}
378+
#else
379+
#define LOCK(m) do { \
380+
pthread_mutex_lock(&(m)->lock); \
381+
} while (0)
382+
#endif
383+
384+
#define UNLOCK(m) do { \
385+
pthread_mutex_unlock(&(m)->lock); \
386+
} while (0)
387+
388+
// Just for checking deadlock.
376389
static void thread_waiter_add(ACL_FIBER_MUTEX *mutex, unsigned long tid)
377390
{
378391
THREAD_WAITER *waiter = (THREAD_WAITER*) mem_malloc(sizeof(THREAD_WAITER));
379392
waiter->tid = tid;
380-
pthread_mutex_lock(&mutex->lock);
393+
LOCK(mutex);
381394
array_append(mutex->waiting_threads, waiter);
382-
pthread_mutex_unlock(&mutex->lock);
395+
UNLOCK(mutex);
383396
}
384397

398+
// Just for checking deadlock.
385399
static void thread_waiter_remove(ACL_FIBER_MUTEX *mutex, unsigned long tid)
386400
{
387401
ITER iter;
388402

389-
pthread_mutex_lock(&mutex->lock);
403+
LOCK(mutex);
390404
foreach(iter, mutex->waiting_threads) {
391405
THREAD_WAITER *waiter = (THREAD_WAITER*) iter.data;
392406
if (waiter->tid == tid) {
@@ -395,7 +409,7 @@ static void thread_waiter_remove(ACL_FIBER_MUTEX *mutex, unsigned long tid)
395409
break;
396410
}
397411
}
398-
pthread_mutex_unlock(&mutex->lock);
412+
UNLOCK(mutex);
399413
}
400414

401415
static void free_locks_onexit(void)
@@ -427,6 +441,12 @@ ACL_FIBER_MUTEX *acl_fiber_mutex_create(unsigned flags)
427441
mutex = (ACL_FIBER_MUTEX*) mem_calloc(1, sizeof(ACL_FIBER_MUTEX));
428442
ring_init(&mutex->me);
429443

444+
if (flags & FIBER_MUTEX_F_LOCK_ONCE) {
445+
flags &= ~FIBER_MUTEX_F_LOCK_TRY;
446+
} else {
447+
flags |= FIBER_MUTEX_F_LOCK_TRY;
448+
}
449+
430450
mutex->flags = flags;
431451

432452
mutex->waiters = array_create(5, ARRAY_F_UNORDER);
@@ -456,21 +476,23 @@ void acl_fiber_mutex_free(ACL_FIBER_MUTEX *mutex)
456476

457477
static int fiber_mutex_lock_once(ACL_FIBER_MUTEX *mutex)
458478
{
459-
int wakeup = 0, pos;
479+
int pos;
460480
EVENT *ev;
461481
ACL_FIBER *fiber;
462482

463483
while (1) {
464-
pthread_mutex_lock(&mutex->lock);
484+
LOCK(mutex);
465485
if (pthread_mutex_trylock(&mutex->thread_lock) == 0) {
466-
pthread_mutex_unlock(&mutex->lock);
486+
UNLOCK(mutex);
467487
return 0;
468488
}
469489

470490
// For the independent thread, only lock the thread mutex.
471491
if (!var_hook_sys_api) {
472-
pthread_mutex_unlock(&mutex->lock);
473-
thread_waiter_add(mutex, thread_self());
492+
UNLOCK(mutex);
493+
if (mutex->flags & FIBER_MUTEX_F_CHECK_DEADLOCK) {
494+
thread_waiter_add(mutex, thread_self());
495+
}
474496
return pthread_mutex_lock(&mutex->thread_lock);
475497
}
476498

@@ -481,11 +503,11 @@ static int fiber_mutex_lock_once(ACL_FIBER_MUTEX *mutex)
481503

482504
if (pthread_mutex_trylock(&mutex->thread_lock) == 0) {
483505
array_delete(mutex->waiters, pos, NULL);
484-
pthread_mutex_unlock(&mutex->lock);
506+
UNLOCK(mutex);
485507
return 0;
486508
}
487509

488-
pthread_mutex_unlock(&mutex->lock);
510+
UNLOCK(mutex);
489511

490512
fiber->wstatus |= FIBER_WAIT_MUTEX;
491513

@@ -495,19 +517,14 @@ static int fiber_mutex_lock_once(ACL_FIBER_MUTEX *mutex)
495517
WAITER_DEC(ev);
496518

497519
fiber->wstatus &= ~FIBER_WAIT_MUTEX;
498-
499-
if (++wakeup > 5) {
500-
wakeup = 0;
501-
acl_fiber_delay(100);
502-
}
503520
}
504521
}
505522

506523
static int fiber_mutex_lock_try(ACL_FIBER_MUTEX *mutex)
507524
{
508-
int wakeup = 0, pos;
525+
int pos;
509526
EVENT *ev;
510-
ACL_FIBER *fiber = acl_fiber_running();
527+
ACL_FIBER *fiber;
511528

512529
while (1) {
513530
if (pthread_mutex_trylock(&mutex->thread_lock) == 0) {
@@ -516,22 +533,25 @@ static int fiber_mutex_lock_try(ACL_FIBER_MUTEX *mutex)
516533

517534
// For the independent thread, only lock the thread mutex.
518535
if (!var_hook_sys_api) {
519-
thread_waiter_add(mutex, thread_self());
536+
if (mutex->flags & FIBER_MUTEX_F_CHECK_DEADLOCK) {
537+
thread_waiter_add(mutex, thread_self());
538+
}
520539
return pthread_mutex_lock(&mutex->thread_lock);
521540
}
522541

542+
fiber = acl_fiber_running();
523543
fiber->sync = sync_waiter_get();
524544

525-
pthread_mutex_lock(&mutex->lock);
545+
LOCK(mutex);
526546
pos = array_append(mutex->waiters, fiber);
527547

528548
if (pthread_mutex_trylock(&mutex->thread_lock) == 0) {
529549
array_delete(mutex->waiters, pos, NULL);
530-
pthread_mutex_unlock(&mutex->lock);
550+
UNLOCK(mutex);
531551
return 0;
532552
}
533553

534-
pthread_mutex_unlock(&mutex->lock);
554+
UNLOCK(mutex);
535555

536556
fiber->wstatus |= FIBER_WAIT_MUTEX;
537557

@@ -541,28 +561,24 @@ static int fiber_mutex_lock_try(ACL_FIBER_MUTEX *mutex)
541561
WAITER_DEC(ev);
542562

543563
fiber->wstatus &= ~FIBER_WAIT_MUTEX;
544-
545-
if (++wakeup > 5) {
546-
wakeup = 0;
547-
acl_fiber_delay(100);
548-
}
549564
}
550565
}
551566

552567
int acl_fiber_mutex_lock(ACL_FIBER_MUTEX *mutex)
553568
{
554569
int ret;
555570

556-
if (mutex->flags & FIBER_MUTEX_F_LOCK_TRY) {
557-
ret = fiber_mutex_lock_try(mutex);
558-
} else {
571+
if (mutex->flags & FIBER_MUTEX_F_LOCK_ONCE) {
559572
ret = fiber_mutex_lock_once(mutex);
573+
} else {
574+
ret = fiber_mutex_lock_try(mutex);
560575
}
576+
561577
if (ret == 0) {
562578
unsigned id = acl_fiber_self(); // 0 will return in no fiber mode.
563579
long me = id == 0 ? -thread_self() : (long) id;
564580
mutex->owner = me;
565-
if (me < 0) {
581+
if (me < 0 && (mutex->flags & FIBER_MUTEX_F_CHECK_DEADLOCK)) {
566582
thread_waiter_remove(mutex, thread_self());
567583
}
568584

@@ -588,7 +604,7 @@ int acl_fiber_mutex_unlock(ACL_FIBER_MUTEX *mutex)
588604
ACL_FIBER *fiber;
589605
int ret;
590606

591-
pthread_mutex_lock(&mutex->lock);
607+
LOCK(mutex);
592608
fiber = (ACL_FIBER*) array_pop_front(mutex->waiters);
593609

594610
// Just a sanity check!
@@ -605,7 +621,7 @@ int acl_fiber_mutex_unlock(ACL_FIBER_MUTEX *mutex)
605621

606622
// Unlock the internal prive lock must behind the public thread_lock,
607623
// or else the waiter maybe be skipped added in lock waiting process.
608-
pthread_mutex_unlock(&mutex->lock);
624+
UNLOCK(mutex);
609625

610626
if (ret != 0) {
611627
return ret;

0 commit comments

Comments
 (0)