Skip to content

Commit

Permalink
Stabilaze windows polling feature
Browse files Browse the repository at this point in the history
  • Loading branch information
zhiburt committed Mar 3, 2024
1 parent 88539a3 commit c48d7fe
Showing 1 changed file with 18 additions and 59 deletions.
77 changes: 18 additions & 59 deletions src/interact/session.rs
Expand Up @@ -334,7 +334,7 @@ where
}

#[cfg(all(windows, feature = "polling", not(feature = "async")))]
impl<I, O> InteractSession<&mut Session, I, O>
impl<I, O, C> InteractSession<crate::session::OsSession, I, O, C>
where
I: Read + Clone + Send + 'static,
O: Write,
Expand All @@ -344,16 +344,8 @@ where
/// See [`Session::interact`].
///
/// [`Session::interact`]: crate::session::Session::interact
pub fn spawn<C, IF, OF, IA, OA, WA, OPS>(&mut self, mut ops: OPS) -> Result<bool, Error>
where
OPS: BorrowMut<InteractOptions<C, IF, OF, IA, OA, WA>>,
IF: FnMut(&[u8]) -> Result<Cow<'_, [u8]>, Error>,
OF: FnMut(&[u8]) -> Result<Cow<'_, [u8]>, Error>,
IA: FnMut(Context<'_, Session, I, O, C>) -> Result<bool, Error>,
OA: FnMut(Context<'_, Session, I, O, C>) -> Result<bool, Error>,
WA: FnMut(Context<'_, Session, I, O, C>) -> Result<bool, Error>,
{
interact_polling_on_thread(self, ops.borrow_mut())
pub fn spawn(&mut self) -> Result<bool, Error> {
interact_polling_on_thread(self)
}
}

Expand Down Expand Up @@ -618,38 +610,32 @@ where
}

#[cfg(all(windows, not(feature = "async"), feature = "polling"))]
fn interact_polling_on_thread<O, I, C, IF, OF, IA, OA, WA>(
interact: &mut InteractSession<&mut Session, I, O>,
opts: &mut InteractOptions<C, IF, OF, IA, OA, WA>,
fn interact_polling_on_thread<I, O, C>(
s: &mut InteractSession<crate::session::OsSession, I, O, C>,
) -> Result<bool, Error>
where
I: Read + Clone + Send + 'static,
O: Write,
IF: FnMut(&[u8]) -> Result<Cow<'_, [u8]>, Error>,
OF: FnMut(&[u8]) -> Result<Cow<'_, [u8]>, Error>,
IA: FnMut(Context<'_, Session, I, O, C>) -> Result<bool, Error>,
OA: FnMut(Context<'_, Session, I, O, C>) -> Result<bool, Error>,
WA: FnMut(Context<'_, Session, I, O, C>) -> Result<bool, Error>,
{
use crate::{
error::to_io_error,
waiter::{Recv, Wait2},
};

// Create a poller and register interest in readability on the socket.
let stream = interact
let stream = s
.session
.get_stream()
.try_clone()
.map_err(to_io_error(""))?;
let mut poller = Wait2::new(interact.input.clone(), stream);
let mut poller = Wait2::new(s.input.clone(), stream);

loop {
// In case where proceses exits we are trying to
// fill buffer to run callbacks if there was something in.
//
// We ignore errors because there might be errors like EOCHILD etc.
if interact.session.is_alive()? {
if s.session.is_alive()? {
return Ok(false);
}

Expand All @@ -663,30 +649,21 @@ where
let n = if eof { 0 } else { 1 };
let buf = &buf[..n];

let buf = call_filter(opts.input_filter.as_mut(), buf)?;

let exit = call_action(
opts.input_action.as_mut(),
interact.session,
&mut interact.input,
&mut interact.output,
&mut opts.state,
&buf,
eof,
)?;
let buf = call_filter(s.opts.input_filter.as_mut(), buf)?;

let exit = run_action_input(s, &buf, eof)?;
if eof || exit {
return Ok(true);
}

// todo: replace all of these by 1 by 1 write
let escape_char_pos = buf.iter().position(|c| *c == interact.escape_character);
let escape_char_pos = buf.iter().position(|c| *c == s.escape_character);
match escape_char_pos {
Some(pos) => {
interact.session.write_all(&buf[..pos])?;
s.session.write_all(&buf[..pos])?;
return Ok(true);
}
None => interact.session.write_all(&buf[..])?,
None => s.session.write_all(&buf[..])?,
}
}
Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {}
Expand All @@ -699,39 +676,21 @@ where
let n = if eof { 0 } else { 1 };
let buf = &buf[..n];

let buf = call_filter(opts.output_filter.as_mut(), buf)?;

let exit = call_action(
opts.output_action.as_mut(),
interact.session,
&mut interact.input,
&mut interact.output,
&mut opts.state,
&buf,
eof,
)?;
let buf = call_filter(s.opts.output_filter.as_mut(), buf)?;

let exit = run_action_output(s, &buf, eof)?;
if eof || exit {
return Ok(true);
}

interact.output.write_all(&buf)?;
interact.output.flush()?;
s.output.write_all(&buf)?;
s.output.flush()?;
}
Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {}
Err(err) => return Err(err.into()),
},
Recv::Timeout => {
let exit = call_action(
opts.idle_action.as_mut(),
interact.session,
&mut interact.input,
&mut interact.output,
&mut opts.state,
&[],
false,
)?;

let exit = run_action_idle(s, &[], false)?;
if exit {
return Ok(true);
}
Expand Down

0 comments on commit c48d7fe

Please sign in to comment.