Skip to content

Commit

Permalink
save work
Browse files Browse the repository at this point in the history
  • Loading branch information
youyuanwu committed May 19, 2024
1 parent 9eb9a8f commit 2f4c983
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 72 deletions.
31 changes: 18 additions & 13 deletions crates/libs/msquic/src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,35 +144,40 @@ impl From<&SBuffer> for Buffer {
}
}

// Currently we copy the buf into vec for ease of code
pub struct QBufWrap {
_inner: Box<dyn Buf>, // mem owner
_inner: Vec<Vec<u8>>, // mem owner
v: Vec<Buffer>,
}

unsafe impl Send for QBufWrap {}

impl QBufWrap {
pub fn new(mut buf: Box<dyn Buf>) -> Self {
pub fn new(buf: impl Buf) -> Self {
// make on heap so that no ptr move.
let v = Self::convert_buf(&mut buf);
Self { _inner: buf, v }
let (vcs, vbs) = Self::convert_buf(buf);
Self {
_inner: vcs,
v: vbs,
}
}

fn convert_buf(b: &mut Box<dyn Buf>) -> Vec<Buffer> {
let mut v = Vec::new();
fn convert_buf(mut b: impl Buf) -> (Vec<Vec<u8>>, Vec<Buffer>) {
let mut vcs = Vec::new();
let mut vbs = Vec::new();
// change buf to vecs
while b.has_remaining() {
// copy
let ck = b.chunk();
v.push(Buffer {
length: ck.len() as u32,
buffer: ck.as_ptr() as *mut u8,
let vc = Vec::from(ck);
vbs.push(Buffer {
length: vc.len() as u32,
buffer: vc.as_ptr() as *mut u8,
});
vcs.push(vc);
b.advance(ck.len());
}
v
// let bb =
// v.iter().map(|s|{ Buffer{ length: s.len() as u32, buffer: s.as_ptr() as *mut u8 } }).collect::<Vec<_>>();
// bb
(vcs, vbs)
}

pub fn as_buffs(&self) -> &[Buffer] {
Expand Down
2 changes: 1 addition & 1 deletion crates/libs/msquic/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub mod stream;
pub mod sync;
mod utils;

//pub mod msh3;
pub mod msh3;

// Some useful defs
pub const QUIC_STATUS_PENDING: u32 = 0x703e5;
Expand Down
68 changes: 38 additions & 30 deletions crates/libs/msquic/src/msh3/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,6 @@
// h3 wrappings for msquic

use std::{
fmt::Display,
future::Future,
pin::{self, pin},
sync::Arc,
task::Poll,
};
use std::fmt::Display;

use bytes::{Buf, BytesMut};
use c2::SEND_FLAG_NONE;
Expand All @@ -20,9 +14,12 @@ pub struct H3Error {
error_code: Option<u64>,
}

impl H3Error{
pub fn new(status: std::io::Error, ec: Option<u64>) -> Self{
Self { status, error_code: ec }
impl H3Error {
pub fn new(status: std::io::Error, ec: Option<u64>) -> Self {
Self {
status,
error_code: ec,
}
}
}

Expand All @@ -39,13 +36,13 @@ impl h3::quic::Error for H3Error {
impl std::error::Error for H3Error {}

impl Display for H3Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
todo!()
}
}

pub struct H3Conn {
inner: QConnection,
_inner: QConnection,
}

impl<B: Buf> OpenStreams<B> for H3Conn {
Expand All @@ -59,19 +56,19 @@ impl<B: Buf> OpenStreams<B> for H3Conn {

fn poll_open_bidi(
&mut self,
cx: &mut std::task::Context<'_>,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<Self::BidiStream, Self::Error>> {
todo!()
}

fn poll_open_send(
&mut self,
cx: &mut std::task::Context<'_>,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<Self::SendStream, Self::Error>> {
todo!()
}

fn close(&mut self, code: h3::error::Code, reason: &[u8]) {
fn close(&mut self, _code: h3::error::Code, _reason: &[u8]) {
todo!()
}
}
Expand All @@ -89,28 +86,28 @@ impl<B: Buf> Connection<B> for H3Conn {

fn poll_accept_recv(
&mut self,
cx: &mut std::task::Context<'_>,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<Option<Self::RecvStream>, Self::Error>> {
todo!()
}

fn poll_accept_bidi(
&mut self,
cx: &mut std::task::Context<'_>,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<Option<Self::BidiStream>, Self::Error>> {
todo!()
}

fn poll_open_bidi(
&mut self,
cx: &mut std::task::Context<'_>,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<Self::BidiStream, Self::Error>> {
todo!()
}

fn poll_open_send(
&mut self,
cx: &mut std::task::Context<'_>,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<Self::SendStream, Self::Error>> {
todo!()
}
Expand All @@ -119,20 +116,24 @@ impl<B: Buf> Connection<B> for H3Conn {
todo!()
}

fn close(&mut self, code: h3::error::Code, reason: &[u8]) {
fn close(&mut self, _code: h3::error::Code, _reason: &[u8]) {
todo!()
}
}

pub struct H3Stream {
inner: QStream,
id: h3::quic::StreamId,
//read:
shutdown: bool,
}

impl H3Stream {
fn new(s: QStream, id: StreamId) -> Self {
Self { inner: s, id }
Self {
inner: s,
id,
shutdown: false,
}
}
}

Expand All @@ -141,12 +142,11 @@ impl<B: Buf> SendStream<B> for H3Stream {

fn poll_ready(
&mut self,
_cx: &mut std::task::Context<'_>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
// always ready to send?
// convert this to open or start?
// if send is in progress?
Poll::Ready(Ok(()))
self.inner
.poll_ready_send(cx)
.map_err(|e| H3Error::new(e, None))
}

fn send_data<T: Into<h3::quic::WriteBuf<B>>>(&mut self, data: T) -> Result<(), Self::Error> {
Expand All @@ -155,11 +155,19 @@ impl<B: Buf> SendStream<B> for H3Stream {
Ok(())
}

// send shutdown signal to peer.
fn poll_finish(
&mut self,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
self.inner.poll_send(cx).map_err(|e|{H3Error::new(e, None)})
// close the stream
if !self.shutdown {
self.inner.shutdown_only();
self.shutdown = true;
}
self.inner
.poll_shutdown(cx)
.map_err(|e| H3Error::new(e, None))
}

fn reset(&mut self, _reset_code: u64) {
Expand All @@ -178,7 +186,7 @@ impl RecvStream for H3Stream {

fn poll_data(
&mut self,
cx: &mut std::task::Context<'_>,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<Option<Self::Buf>, Self::Error>> {
// let fu = self.inner.receive();
// let innner = <Receiver<T> as Future>::poll(Pin::new(&mut self.rx), _cx);
Expand All @@ -204,7 +212,7 @@ impl<B: Buf> BidiStream<B> for H3Stream {

fn split(self) -> (Self::SendStream, Self::RecvStream) {
let cp = self.inner.clone();
let id = self.id.clone();
let id = self.id;
(self, H3Stream::new(cp, id))
}
}
62 changes: 34 additions & 28 deletions crates/libs/msquic/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{
};

use crate::{
buffer::{debug_buf_to_string, debug_raw_buf_to_string, QBufWrap, QBytesMut},
buffer::{QBufWrap, QBytesMut},
conn::QConnection,
info,
sync::{QSignal, QWakableQueue, QWakableSig},
Expand Down Expand Up @@ -46,7 +46,7 @@ struct QStreamCtx {
start_sig: QWakableQueue<StartPayload>,
receive_ch: QWakableQueue<QBytesMut>,
send_sig: QWakableSig<SentPayload>,
send_shtdwn_sig: QSignal,
send_shtdwn_sig: QWakableSig<()>,
drain_sig: QSignal,
is_drained: bool,
pending_buf: Option<QBufWrap>, // because msquic copies buffers in background we need to hold the buffer temporarily
Expand All @@ -58,7 +58,7 @@ impl QStreamCtx {
start_sig: QWakableQueue::default(),
receive_ch: QWakableQueue::default(),
send_sig: QWakableSig::default(),
send_shtdwn_sig: QSignal::new(),
send_shtdwn_sig: QWakableSig::default(),
drain_sig: QSignal::new(),
is_drained: false,
pending_buf: None,
Expand All @@ -81,15 +81,15 @@ impl QStreamCtx {
fn on_receive(&mut self, buffs: &[Buffer]) {
// send to frontend
let v = QBytesMut::from_buffs(buffs);
let s = debug_buf_to_string(v.0.clone());
let original = debug_raw_buf_to_string(buffs[0]);
info!(
"debug: receive bytes: {} len:{}, original {}, len: {}",
s,
s.len(),
original,
original.len()
);
// let s = debug_buf_to_string(v.0.clone());
// let original = debug_raw_buf_to_string(buffs[0]);
// info!(
// "debug: receive bytes: {} len:{}, original {}, len: {}",
// s,
// s.len(),
// original,
// original.len()
// );
self.receive_ch.insert(v);
}
fn on_peer_send_shutdown(&mut self) {
Expand All @@ -102,9 +102,7 @@ impl QStreamCtx {
self.receive_ch.close();
}
fn on_send_shutdown_complete(&mut self) {
if self.send_shtdwn_sig.can_set() {
self.send_shtdwn_sig.set(());
}
self.send_shtdwn_sig.set(());
}
fn on_shutdown_complete(&mut self) {
// close all channels
Expand Down Expand Up @@ -269,7 +267,7 @@ impl QStream {
// // TODO: handle error
// let _ = self.inner.inner.receive_complete(len);
// }
pub fn send_only(&mut self, buffers: impl Buf + 'static, flags: SendFlags) {
pub fn send_only(&mut self, buffers: impl Buf, flags: SendFlags) {
let mut lk = self.ctx.lock().unwrap();
lk.send_sig.set_frontend_pending();
let b = QBufWrap::new(Box::new(buffers));
Expand Down Expand Up @@ -306,11 +304,7 @@ impl QStream {
}
}

pub async fn send(
&mut self,
buffers: impl Buf + 'static,
flags: SendFlags,
) -> Result<(), Error> {
pub async fn send(&mut self, buffers: impl Buf, flags: SendFlags) -> Result<(), Error> {
self.send_only(buffers, flags);
let fu = poll_fn(|cx| self.poll_send(cx));
fu.await
Expand All @@ -326,15 +320,27 @@ impl QStream {
Self::poll_send_inner(&mut lk, cx)
}

pub fn shutdown_only(&mut self) {
let mut lk = self.ctx.lock().unwrap();
lk.send_shtdwn_sig.reset();
self.inner.inner.shutdown(STREAM_SHUTDOWN_FLAG_NONE, 0);
}

pub fn poll_shutdown(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Error>> {
let mut lk = self.ctx.lock().unwrap();
let p = lk.send_shtdwn_sig.poll(cx);
match p {
Poll::Ready(_) => Poll::Ready(Ok(())),
Poll::Pending => Poll::Pending,
}
}

// send shutdown signal to peer.
// do not call this if already indicated shutdown during send.
pub async fn shutdown(&mut self) {
let rx;
{
rx = self.ctx.lock().unwrap().send_shtdwn_sig.reset();
self.inner.inner.shutdown(STREAM_SHUTDOWN_FLAG_NONE, 0);
}
rx.await;
pub async fn shutdown(&mut self) -> Result<(), Error> {
self.shutdown_only();
let fu = poll_fn(|cx| self.poll_shutdown(cx));
fu.await
}

// this is for h3 where the interface does not wait
Expand Down

0 comments on commit 2f4c983

Please sign in to comment.