From 5d071c548acd3d09813f074f6cb9db396361a287 Mon Sep 17 00:00:00 2001 From: Youyuan Wu Date: Wed, 8 May 2024 21:19:10 -0700 Subject: [PATCH] refactor listner context and locking --- crates/libs/msquic/src/listener.rs | 183 ++++++----------------------- 1 file changed, 38 insertions(+), 145 deletions(-) diff --git a/crates/libs/msquic/src/listener.rs b/crates/libs/msquic/src/listener.rs index 0e963c5..7af4e44 100644 --- a/crates/libs/msquic/src/listener.rs +++ b/crates/libs/msquic/src/listener.rs @@ -17,22 +17,17 @@ use crate::{config::QConfiguration, reg::QRegistration, utils::SBox, QApi}; pub struct QListener { _api: QApi, inner: SBox, - ctx: Box, + ctx: Box>, } struct QListenerCtx { _api: QApi, ch: QQueue, config: Arc>, - state: Mutex, // cannot use tokio mutex because the callback maybe invoked sync and block tokio thread + state: State, // for validation only sig_stop: QSignal, } -// #[derive(Debug)] -// enum Payload { -// Conn(QConnection), -// } - #[derive(PartialEq, Debug)] enum State { Idle, @@ -49,31 +44,29 @@ extern "C" fn listener_handler( event: &ListenerEvent, ) -> u32 { assert!(!context.is_null()); - let ctx = unsafe { (context as *mut QListenerCtx).as_mut().unwrap() }; + let ctx = unsafe { (context as *mut Mutex).as_mut().unwrap() }; + #[allow(clippy::mut_mutex_lock)] + // clippy asks us to borrow because it does not know we are using unsafe + let mut ctx = ctx.lock().unwrap(); let status = 0; - // take the state to sync with frontend. - let mut state = ctx.state.lock().unwrap(); match event.event_type { LISTENER_EVENT_NEW_CONNECTION => { let raw = unsafe { event.payload.new_connection }; let h = raw.connection as Handle; - let c = QConnection::attach(ctx._api.clone(), h, &ctx.config.inner); let info = unsafe { raw.info.as_ref().unwrap() }; info!( "[{:?}] LISTENER_EVENT_NEW_CONNECTION conn=[{:?}] info={:?}", listener, h, info ); - ctx.ch.insert(c); + assert_eq!(ctx.state, State::Started); + ctx.on_new_connection(h); } QUIC_LISTENER_EVENT_STOP_COMPLETE => { info!("[{:?}] QUIC_LISTENER_EVENT_STOP_COMPLETE", listener); - assert_eq!(*state, State::StopRequested); - *state = State::Stopped; + assert_eq!(ctx.state, State::StopRequested); + ctx.state = State::Stopped; // stop the acceptor - ctx.ch.close(0); - // stop the stop action - info!("[{:?}] QUIC_LISTENER_EVENT_STOP_COMPLETE stop_tx", listener); - ctx.sig_stop.set(()); + ctx.on_stop_complete(); } _ => { unreachable!() @@ -86,17 +79,17 @@ extern "C" fn listener_handler( impl QListener { // server open listener pub fn open(registration: &QRegistration, configuration: &QConfiguration) -> Self { - let context = Box::new(QListenerCtx { + let context = Box::new(Mutex::new(QListenerCtx { _api: registration.api.clone(), ch: QQueue::new(), config: configuration.inner.clone(), - state: Mutex::new(State::Idle), + state: State::Idle, sig_stop: QResetChannel::new(), - }); + })); let l = Listener::new( ®istration.inner.inner, listener_handler, - (&*context) as *const QListenerCtx as *const c_void, + (&*context) as *const Mutex as *const c_void, ); Self { _api: registration.api.clone(), @@ -106,22 +99,23 @@ impl QListener { } pub async fn start(&self, alpn: &[Buffer], local_address: &Addr) { - let mut state = self.ctx.state.lock().unwrap(); - assert_eq!(*state, State::Idle); - *state = State::Started; + { + let mut lk = self.ctx.lock().unwrap(); + assert_eq!(lk.state, State::Idle); + lk.state = State::Started; + } self.inner.inner.start(alpn, local_address) } pub async fn accept(&mut self) -> Option { let rx; { - let state = self.ctx.state.lock().unwrap(); - if *state == State::Stopped { + let mut lk = self.ctx.lock().unwrap(); + if lk.state == State::Stopped { return None; } - // must be started - assert_ne!(*state, State::Idle); - rx = self.ctx.ch.pop(); + assert_ne!(lk.state, State::Idle); + rx = lk.ch.pop(); } let res = rx.await; match res { @@ -133,10 +127,10 @@ impl QListener { pub async fn stop(&mut self) { let rx; { - let mut state = self.ctx.state.lock().unwrap(); - assert_eq!(*state, State::Started); - *state = State::StopRequested; - rx = self.ctx.sig_stop.reset(); + let mut lk = self.ctx.lock().unwrap(); + assert_eq!(lk.state, State::Started); + lk.state = State::StopRequested; + rx = lk.sig_stop.reset(); } // callback may be invoked in the same thread. info!("listner stop requested."); @@ -148,115 +142,14 @@ impl QListener { } } -// Experimental code - -// extern "C" fn listener_handler2( -// listener: Handle, -// context: *mut c_void, -// event: &ListenerEvent, -// ) -> u32 { -// assert!(!context.is_null()); -// let ctx = unsafe { (context as *mut QListnerCtx2).as_mut().unwrap() }; -// let mut status = 0; -// match event.event_type { -// LISTENER_EVENT_NEW_CONNECTION => { -// let raw = unsafe { event.payload.new_connection }; -// let h = raw.connection as Handle; -// let info = unsafe { raw.info.as_ref().unwrap() }; -// info!( -// "[{:?}] LISTENER_EVENT_NEW_CONNECTION conn=[{:?}] info={:?}", -// listener, h, info -// ); -// if let Some(cb) = ctx.on_new_connection.as_mut() { -// status = cb.invoke(h); -// } -// } -// QUIC_LISTENER_EVENT_STOP_COMPLETE => { -// info!("[{:?}] QUIC_LISTENER_EVENT_STOP_COMPLETE", listener); -// if let Some(cb) = ctx.on_stop_complete.take() { -// status = cb.invoke(); -// } -// } -// _ => { -// unreachable!() -// } -// } -// status -// } - -// // callback type for new connection -// pub trait OnNewConnectionCallback { -// fn invoke(&mut self, h: Handle) -> u32; -// } - -// pub trait OnStopCompleteCallback { -// fn invoke(self) -> u32; -// } - -// struct QListnerCtx2 { -// on_new_connection: Option, -// on_stop_complete: Option, -// } - -// pub struct QListenerBase { -// _api: QApi, -// inner: SBox, -// ctx: Box>, -// } - -// impl QListenerBase { -// // server open listener -// pub fn open_inner( -// registration: &QRegistration, -// on_new_connection: Option, -// on_stop_complete: Option, -// ) -> Self { -// let context = Box::new(QListnerCtx2 { -// on_new_connection, -// on_stop_complete, -// }); -// let l = Listener::new( -// ®istration.inner.inner, -// listener_handler2::, -// (&*context) as *const QListnerCtx2 as *const c_void, -// ); -// Self { -// _api: registration.api.clone(), -// inner: SBox { inner: l }, -// ctx: context, -// } -// } -// } - -// pub struct NewConnCallback {} - -// impl OnNewConnectionCallback for NewConnCallback { -// fn invoke(&mut self, _h: Handle) -> u32 { -// todo!() -// } -// } - -// pub struct StopCompleteCallback {} - -// impl OnStopCompleteCallback for StopCompleteCallback { -// fn invoke(self) -> u32 { -// todo!() -// } -// } - -// pub struct QListener2 { -// inner: QListenerBase, -// } +impl QListenerCtx { + fn on_new_connection(&mut self, conn: Handle) { + let c = QConnection::attach(self._api.clone(), conn, &self.config.inner); + self.ch.insert(c); + } -// impl QListener2 { -// pub fn open(registration: &QRegistration, configuration: &QConfiguration) -> Self { -// let on_new_connection = NewConnCallback {}; -// let on_stop_complete = StopCompleteCallback {}; -// let inner = QListenerBase::open_inner( -// registration, -// Some(on_new_connection), -// Some(on_stop_complete), -// ); -// Self { inner } -// } -// } + fn on_stop_complete(&mut self) { + self.ch.close(0); + self.sig_stop.set(()); + } +}