Skip to content

Commit

Permalink
save work
Browse files Browse the repository at this point in the history
  • Loading branch information
youyuanwu committed Jun 1, 2024
1 parent d27aa0e commit 310da4d
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 33 deletions.
42 changes: 26 additions & 16 deletions crates/libs/msquic/src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ use crate::{
info,
reg::QRegistration,
stream::QStream,
sync::{QQueue, QReceiver, QResetChannel, QSignal},
sync::{QQueue, QReceiver, QResetChannel, QWakableSig},
};
use std::{ffi::c_void, fmt::Debug, io::Error, sync::Mutex};
use std::{ffi::c_void, fmt::Debug, future::poll_fn, io::Error, sync::Mutex, task::Poll};

use c2::{
Configuration, Connection, ConnectionEvent, Handle, SendResumptionFlags,
Expand Down Expand Up @@ -51,7 +51,7 @@ enum ConnStatus {
struct QConnectionCtx {
_api: QApi,
strm_ch: QQueue<QStream>,
shtdwn_sig: QSignal,
shtdwn_sig: QWakableSig<()>,
//state: Mutex<State>,
conn_ch: QResetChannel<ConnStatus>, // handle connect success or transport close
proceed_rx: Option<QReceiver<ConnStatus>>, // used for server wait conn
Expand Down Expand Up @@ -135,7 +135,7 @@ impl QConnectionCtx {
Self {
_api: api.clone(),
strm_ch: QQueue::new(),
shtdwn_sig: QSignal::new(),
shtdwn_sig: QWakableSig::default(),
conn_ch: QResetChannel::new(),
proceed_rx: None,
}
Expand Down Expand Up @@ -165,9 +165,7 @@ impl QConnectionCtx {
}
fn on_shutdown_complete(&mut self) {
self.strm_ch.close(0);
if self.shtdwn_sig.can_set() {
self.shtdwn_sig.set(());
}
self.shtdwn_sig.set(());
}
fn on_peer_stream_started(&mut self, h: Handle) {
let s = QStream::attach(self._api.clone(), h);
Expand Down Expand Up @@ -269,16 +267,28 @@ impl QConnection {
}
}

pub async fn shutdown(&mut self) {
let rx;
pub fn shutdown_only(&self, ec: u64) {
self.inner.inner.shutdown(CONNECTION_SHUTDOWN_FLAG_NONE, ec); // ec
}

pub fn poll_shutdown(&mut self, cx: &mut std::task::Context<'_>) -> Poll<()> {
let mut lk = self.ctx.lock().unwrap();
let p = lk.shtdwn_sig.poll(cx);
match p {
Poll::Ready(_) => Poll::Ready(()),
Poll::Pending => Poll::Pending,
}
}

pub async fn shutdown(&mut self, ec: u64) {
{
rx = self.ctx.lock().unwrap().shtdwn_sig.reset();
let mut lk = self.ctx.lock().unwrap();
if !lk.shtdwn_sig.is_frontend_pending() {
lk.shtdwn_sig.set_frontend_pending();
self.shutdown_only(ec)
}
}
info!("conn invoke shutdown");
// callback maybe sync
self.inner.inner.shutdown(CONNECTION_SHUTDOWN_FLAG_NONE, 0); // ec
info!("conn wait for shutdown evnet");
rx.await;
info!("conn wait for shutdown evnet end");
let fu = poll_fn(|cx| self.poll_shutdown(cx));
fu.await
}
}
4 changes: 2 additions & 2 deletions crates/libs/msquic/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ mod tests {
});
}
info!("server conn shutdown");
conn.shutdown().await;
conn.shutdown(0).await;
info!("server conn shutdown end");
});
}
Expand Down Expand Up @@ -235,7 +235,7 @@ mod tests {
info!("client stream drain");
st.drain().await;
info!("client conn shutdown");
conn.shutdown().await;
conn.shutdown(0).await;
// shutdown server
sht_tx.send(()).unwrap();
});
Expand Down
19 changes: 9 additions & 10 deletions crates/libs/msquic/src/msh3/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// h3 wrappings for msquic

use std::fmt::Display;
use std::{fmt::Display, task::Poll};

use bytes::{Buf, BytesMut};
use c2::SEND_FLAG_NONE;
Expand Down Expand Up @@ -116,8 +116,8 @@ impl<B: Buf> Connection<B> for H3Conn {
todo!()
}

fn close(&mut self, _code: h3::error::Code, _reason: &[u8]) {
todo!()
fn close(&mut self, code: h3::error::Code, _reason: &[u8]) {
self._inner.shutdown_only(code.value())
}
}

Expand Down Expand Up @@ -184,16 +184,15 @@ impl RecvStream for H3Stream {

type Error = H3Error;

// currently error is not propagated.
fn poll_data(
&mut self,
_cx: &mut std::task::Context<'_>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<Option<Self::Buf>, Self::Error>> {
// let fu = self.inner.receive();
// let innner = <Receiver<T> as Future>::poll(Pin::new(&mut self.rx), _cx);
//Pin::new(&mut fu).poll(cx);
// let mut pinned_fut = pin!(fu);
// pinned_fut.poll(cx);
todo!()
match self.inner.poll_receive(cx) {
std::task::Poll::Ready(br) => Poll::Ready(Ok(br)),
std::task::Poll::Pending => Poll::Pending,
}
}

fn stop_sending(&mut self, error_code: u64) {
Expand Down
11 changes: 6 additions & 5 deletions crates/libs/msquic/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::{
utils::SBox,
QApi,
};
use bytes::Buf;
use bytes::{Buf, BytesMut};
use c2::{
Buffer, Handle, SendFlags, Stream, StreamEvent, StreamOpenFlags, StreamStartFlags,
STREAM_EVENT_PEER_RECEIVE_ABORTED, STREAM_EVENT_PEER_SEND_ABORTED,
Expand Down Expand Up @@ -242,23 +242,24 @@ impl QStream {
fu.await
}

// todo: propagate error
pub fn poll_receive(
&mut self,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<impl Buf, Error>> {
) -> std::task::Poll<Option<BytesMut>> {
let p = self.ctx.lock().unwrap().receive_ch.poll(cx);
match p {
Poll::Ready(op) => match op {
Some(b) => Poll::Ready(Ok(b.0)),
None => Poll::Ready(Err(Error::from(ErrorKind::BrokenPipe))),
Some(b) => Poll::Ready(Some(b.0)),
None => Poll::Ready(None),
},
Poll::Pending => Poll::Pending,
}
}

// receive into this buff
// return num of bytes wrote.
pub async fn receive(&mut self) -> Result<impl Buf, Error> {
pub async fn receive(&mut self) -> Option<BytesMut> {
let fu = poll_fn(|cx| self.poll_receive(cx));
fu.await
}
Expand Down

0 comments on commit 310da4d

Please sign in to comment.