From af38f8b3ba33affbaf4b0f6419b3d4c993941b1c Mon Sep 17 00:00:00 2001 From: Herman <43386816+DrGermanius@users.noreply.github.com> Date: Sun, 2 Jul 2023 15:45:30 +0300 Subject: [PATCH] channels: refactor the channel_select function (#18711) --- vlib/sync/channels.c.v | 108 +++++++++++++++-------------------------- 1 file changed, 40 insertions(+), 68 deletions(-) diff --git a/vlib/sync/channels.c.v b/vlib/sync/channels.c.v index 4bb9f0f1b17491..8a0c163d4e3440 100644 --- a/vlib/sync/channels.c.v +++ b/vlib/sync/channels.c.v @@ -556,35 +556,24 @@ pub fn channel_select(mut channels []&Channel, dir []Direction, mut objrefs []vo sem.init(0) for i, ch in channels { subscr[i].sem = unsafe { &sem } - if dir[i] == .push { - mut null16 := u16(0) - for !C.atomic_compare_exchange_weak_u16(&ch.write_sub_mtx, &null16, u16(1)) { - null16 = u16(0) - } - subscr[i].prev = unsafe { &ch.write_subscriber } - unsafe { - subscr[i].nxt = &Subscription(C.atomic_exchange_ptr(&voidptr(&ch.write_subscriber), - &subscr[i])) - } - if voidptr(subscr[i].nxt) != unsafe { nil } { - subscr[i].nxt.prev = unsafe { &subscr[i].nxt } - } - C.atomic_store_u16(&ch.write_sub_mtx, u16(0)) + sub_mtx, subscriber := if dir[i] == .push { + &ch.write_sub_mtx, &ch.write_subscriber } else { - mut null16 := u16(0) - for !C.atomic_compare_exchange_weak_u16(&ch.read_sub_mtx, &null16, u16(1)) { - null16 = u16(0) - } - subscr[i].prev = unsafe { &ch.read_subscriber } - unsafe { - subscr[i].nxt = &Subscription(C.atomic_exchange_ptr(&voidptr(&ch.read_subscriber), - &subscr[i])) - } - if voidptr(subscr[i].nxt) != unsafe { nil } { - subscr[i].nxt.prev = unsafe { &subscr[i].nxt } - } - C.atomic_store_u16(&ch.read_sub_mtx, u16(0)) + &ch.read_sub_mtx, &ch.read_subscriber + } + mut null16 := u16(0) + for !C.atomic_compare_exchange_weak_u16(sub_mtx, &null16, u16(1)) { + null16 = u16(0) } + subscr[i].prev = unsafe { subscriber } + unsafe { + subscr[i].nxt = &Subscription(C.atomic_exchange_ptr(&voidptr(subscriber), + &subscr[i])) + } + if voidptr(subscr[i].nxt) != unsafe { nil } { + subscr[i].nxt.prev = unsafe { &subscr[i].nxt } + } + C.atomic_store_u16(sub_mtx, u16(0)) } stopwatch := if timeout == time.infinite || timeout <= 0 { time.StopWatch{} @@ -601,22 +590,16 @@ pub fn channel_select(mut channels []&Channel, dir []Direction, mut objrefs []vo if i >= channels.len { i -= channels.len } - if dir[i] == .push { - stat := channels[i].try_push_priv(objrefs[i], true) - if stat == .success { - event_idx = i - break outer - } else if stat == .closed { - num_closed++ - } + stat := if dir[i] == .push { + channels[i].try_push_priv(objrefs[i], true) } else { - stat := channels[i].try_pop_priv(objrefs[i], true) - if stat == .success { - event_idx = i - break outer - } else if stat == .closed { - num_closed++ - } + channels[i].try_pop_priv(objrefs[i], true) + } + if stat == .success { + event_idx = i + break outer + } else if stat == .closed { + num_closed++ } } if num_closed == channels.len { @@ -637,34 +620,23 @@ pub fn channel_select(mut channels []&Channel, dir []Direction, mut objrefs []vo } // reset subscribers for i, ch in channels { - if dir[i] == .push { - mut null16 := u16(0) - for !C.atomic_compare_exchange_weak_u16(&ch.write_sub_mtx, &null16, u16(1)) { - null16 = u16(0) - } - unsafe { - *subscr[i].prev = subscr[i].nxt - } - if unsafe { subscr[i].nxt != 0 } { - subscr[i].nxt.prev = subscr[i].prev - // just in case we have missed a semaphore during restore - subscr[i].nxt.sem.post() - } - C.atomic_store_u16(&ch.write_sub_mtx, u16(0)) + sub_mtx := if dir[i] == .push { + &ch.write_sub_mtx } else { - mut null16 := u16(0) - for !C.atomic_compare_exchange_weak_u16(&ch.read_sub_mtx, &null16, u16(1)) { - null16 = u16(0) - } - unsafe { - *subscr[i].prev = subscr[i].nxt - } - if unsafe { subscr[i].nxt != 0 } { - subscr[i].nxt.prev = subscr[i].prev - subscr[i].nxt.sem.post() - } - C.atomic_store_u16(&ch.read_sub_mtx, u16(0)) + &ch.read_sub_mtx + } + mut null16 := u16(0) + for !C.atomic_compare_exchange_weak_u16(sub_mtx, &null16, u16(1)) { + null16 = u16(0) + } + unsafe { + *subscr[i].prev = subscr[i].nxt + } + if unsafe { subscr[i].nxt != 0 } { + subscr[i].nxt.prev = subscr[i].prev + subscr[i].nxt.sem.post() } + C.atomic_store_u16(sub_mtx, u16(0)) } sem.destroy() return event_idx