Skip to content

Commit

Permalink
refactor listner context and locking
Browse files Browse the repository at this point in the history
  • Loading branch information
youyuanwu committed May 9, 2024
1 parent 5f71447 commit 5d071c5
Showing 1 changed file with 38 additions and 145 deletions.
183 changes: 38 additions & 145 deletions crates/libs/msquic/src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,17 @@ use crate::{config::QConfiguration, reg::QRegistration, utils::SBox, QApi};
pub struct QListener {
_api: QApi,
inner: SBox<Listener>,
ctx: Box<QListenerCtx>,
ctx: Box<Mutex<QListenerCtx>>,
}

struct QListenerCtx {
_api: QApi,
ch: QQueue<QConnection>,
config: Arc<SBox<Configuration>>,
state: Mutex<State>, // cannot use tokio mutex because the callback maybe invoked sync and block tokio thread
state: State, // for validation only
sig_stop: QSignal,
}

// #[derive(Debug)]
// enum Payload {
// Conn(QConnection),
// }

#[derive(PartialEq, Debug)]
enum State {
Idle,
Expand All @@ -49,31 +44,29 @@ extern "C" fn listener_handler(
event: &ListenerEvent,
) -> u32 {
assert!(!context.is_null());
let ctx = unsafe { (context as *mut QListenerCtx).as_mut().unwrap() };
let ctx = unsafe { (context as *mut Mutex<QListenerCtx>).as_mut().unwrap() };
#[allow(clippy::mut_mutex_lock)]
// clippy asks us to borrow because it does not know we are using unsafe
let mut ctx = ctx.lock().unwrap();
let status = 0;
// take the state to sync with frontend.
let mut state = ctx.state.lock().unwrap();
match event.event_type {
LISTENER_EVENT_NEW_CONNECTION => {
let raw = unsafe { event.payload.new_connection };
let h = raw.connection as Handle;
let c = QConnection::attach(ctx._api.clone(), h, &ctx.config.inner);
let info = unsafe { raw.info.as_ref().unwrap() };
info!(
"[{:?}] LISTENER_EVENT_NEW_CONNECTION conn=[{:?}] info={:?}",
listener, h, info
);
ctx.ch.insert(c);
assert_eq!(ctx.state, State::Started);
ctx.on_new_connection(h);
}
QUIC_LISTENER_EVENT_STOP_COMPLETE => {
info!("[{:?}] QUIC_LISTENER_EVENT_STOP_COMPLETE", listener);
assert_eq!(*state, State::StopRequested);
*state = State::Stopped;
assert_eq!(ctx.state, State::StopRequested);
ctx.state = State::Stopped;
// stop the acceptor
ctx.ch.close(0);
// stop the stop action
info!("[{:?}] QUIC_LISTENER_EVENT_STOP_COMPLETE stop_tx", listener);
ctx.sig_stop.set(());
ctx.on_stop_complete();
}
_ => {
unreachable!()
Expand All @@ -86,17 +79,17 @@ extern "C" fn listener_handler(
impl QListener {
// server open listener
pub fn open(registration: &QRegistration, configuration: &QConfiguration) -> Self {
let context = Box::new(QListenerCtx {
let context = Box::new(Mutex::new(QListenerCtx {
_api: registration.api.clone(),
ch: QQueue::new(),
config: configuration.inner.clone(),
state: Mutex::new(State::Idle),
state: State::Idle,
sig_stop: QResetChannel::new(),
});
}));
let l = Listener::new(
&registration.inner.inner,
listener_handler,
(&*context) as *const QListenerCtx as *const c_void,
(&*context) as *const Mutex<QListenerCtx> as *const c_void,
);
Self {
_api: registration.api.clone(),
Expand All @@ -106,22 +99,23 @@ impl QListener {
}

pub async fn start(&self, alpn: &[Buffer], local_address: &Addr) {
let mut state = self.ctx.state.lock().unwrap();
assert_eq!(*state, State::Idle);
*state = State::Started;
{
let mut lk = self.ctx.lock().unwrap();
assert_eq!(lk.state, State::Idle);
lk.state = State::Started;
}
self.inner.inner.start(alpn, local_address)
}

pub async fn accept(&mut self) -> Option<QConnection> {
let rx;
{
let state = self.ctx.state.lock().unwrap();
if *state == State::Stopped {
let mut lk = self.ctx.lock().unwrap();
if lk.state == State::Stopped {
return None;
}
// must be started
assert_ne!(*state, State::Idle);
rx = self.ctx.ch.pop();
assert_ne!(lk.state, State::Idle);
rx = lk.ch.pop();
}
let res = rx.await;
match res {
Expand All @@ -133,10 +127,10 @@ impl QListener {
pub async fn stop(&mut self) {
let rx;
{
let mut state = self.ctx.state.lock().unwrap();
assert_eq!(*state, State::Started);
*state = State::StopRequested;
rx = self.ctx.sig_stop.reset();
let mut lk = self.ctx.lock().unwrap();
assert_eq!(lk.state, State::Started);
lk.state = State::StopRequested;
rx = lk.sig_stop.reset();
}
// callback may be invoked in the same thread.
info!("listner stop requested.");
Expand All @@ -148,115 +142,14 @@ impl QListener {
}
}

// Experimental code

// extern "C" fn listener_handler2<T1: OnNewConnectionCallback, T2: OnStopCompleteCallback>(
// listener: Handle,
// context: *mut c_void,
// event: &ListenerEvent,
// ) -> u32 {
// assert!(!context.is_null());
// let ctx = unsafe { (context as *mut QListnerCtx2<T1, T2>).as_mut().unwrap() };
// let mut status = 0;
// match event.event_type {
// LISTENER_EVENT_NEW_CONNECTION => {
// let raw = unsafe { event.payload.new_connection };
// let h = raw.connection as Handle;
// let info = unsafe { raw.info.as_ref().unwrap() };
// info!(
// "[{:?}] LISTENER_EVENT_NEW_CONNECTION conn=[{:?}] info={:?}",
// listener, h, info
// );
// if let Some(cb) = ctx.on_new_connection.as_mut() {
// status = cb.invoke(h);
// }
// }
// QUIC_LISTENER_EVENT_STOP_COMPLETE => {
// info!("[{:?}] QUIC_LISTENER_EVENT_STOP_COMPLETE", listener);
// if let Some(cb) = ctx.on_stop_complete.take() {
// status = cb.invoke();
// }
// }
// _ => {
// unreachable!()
// }
// }
// status
// }

// // callback type for new connection
// pub trait OnNewConnectionCallback {
// fn invoke(&mut self, h: Handle) -> u32;
// }

// pub trait OnStopCompleteCallback {
// fn invoke(self) -> u32;
// }

// struct QListnerCtx2<T1: OnNewConnectionCallback, T2: OnStopCompleteCallback> {
// on_new_connection: Option<T1>,
// on_stop_complete: Option<T2>,
// }

// pub struct QListenerBase<T1: OnNewConnectionCallback, T2: OnStopCompleteCallback> {
// _api: QApi,
// inner: SBox<Listener>,
// ctx: Box<QListnerCtx2<T1, T2>>,
// }

// impl<T1: OnNewConnectionCallback, T2: OnStopCompleteCallback> QListenerBase<T1, T2> {
// // server open listener
// pub fn open_inner(
// registration: &QRegistration,
// on_new_connection: Option<T1>,
// on_stop_complete: Option<T2>,
// ) -> Self {
// let context = Box::new(QListnerCtx2 {
// on_new_connection,
// on_stop_complete,
// });
// let l = Listener::new(
// &registration.inner.inner,
// listener_handler2::<T1, T2>,
// (&*context) as *const QListnerCtx2<T1, T2> as *const c_void,
// );
// Self {
// _api: registration.api.clone(),
// inner: SBox { inner: l },
// ctx: context,
// }
// }
// }

// pub struct NewConnCallback {}

// impl OnNewConnectionCallback for NewConnCallback {
// fn invoke(&mut self, _h: Handle) -> u32 {
// todo!()
// }
// }

// pub struct StopCompleteCallback {}

// impl OnStopCompleteCallback for StopCompleteCallback {
// fn invoke(self) -> u32 {
// todo!()
// }
// }

// pub struct QListener2 {
// inner: QListenerBase<NewConnCallback, StopCompleteCallback>,
// }
impl QListenerCtx {
fn on_new_connection(&mut self, conn: Handle) {
let c = QConnection::attach(self._api.clone(), conn, &self.config.inner);
self.ch.insert(c);
}

// impl QListener2 {
// pub fn open(registration: &QRegistration, configuration: &QConfiguration) -> Self {
// let on_new_connection = NewConnCallback {};
// let on_stop_complete = StopCompleteCallback {};
// let inner = QListenerBase::open_inner(
// registration,
// Some(on_new_connection),
// Some(on_stop_complete),
// );
// Self { inner }
// }
// }
fn on_stop_complete(&mut self) {
self.ch.close(0);
self.sig_stop.set(());
}
}

0 comments on commit 5d071c5

Please sign in to comment.