Skip to content

Commit

Permalink
ScmSocket::set_blocking updates its blocking field, takes &mut self
Browse files Browse the repository at this point in the history
rewrite notify_deactivate_listener to accomodate ScmSocket::set_blocking
signature
  • Loading branch information
Keksoj authored and FlorentinDUBOIS committed Dec 1, 2022
1 parent 33b3e5f commit 719b637
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 103 deletions.
10 changes: 7 additions & 3 deletions command/src/scm_socket.rs
Expand Up @@ -20,7 +20,7 @@ pub const MAX_BYTES_OUT: usize = 4096;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ScmSocket {
pub fd: RawFd,
pub blocking: bool, // TODO this is never used anywhere. Use it by updating / checking it
pub blocking: bool,
}

impl ScmSocket {
Expand All @@ -43,14 +43,18 @@ impl ScmSocket {
}

/// Use the standard library (unsafe) to set the socket to blocking / unblocking
pub fn set_blocking(&self, blocking: bool) -> anyhow::Result<()> {
pub fn set_blocking(&mut self, blocking: bool) -> anyhow::Result<()> {
if self.blocking == blocking {
return Ok(());
}
unsafe {
let stream = StdUnixStream::from_raw_fd(self.fd);
stream
.set_nonblocking(!blocking)
.with_context(|| "could not change blocking status for stream")?;
let _dropped_fd = stream.into_raw_fd();
}
self.blocking = blocking;
Ok(())
}

Expand Down Expand Up @@ -261,7 +265,7 @@ mod tests {
let scm_socket = ScmSocket::new(raw_file_descriptor);
assert!(scm_socket.is_ok());

let scm_socket = scm_socket.unwrap();
let mut scm_socket = scm_socket.unwrap();

assert!(scm_socket.set_blocking(true).is_ok());
assert!(scm_socket.set_blocking(false).is_ok());
Expand Down
204 changes: 104 additions & 100 deletions lib/src/server.rs
Expand Up @@ -1264,148 +1264,152 @@ impl Server {

match deactivate.proxy {
ListenerType::HTTP => {
match self
let (token, mut listener) = match self
.http
.borrow_mut()
.give_back_listener(deactivate.address)
{
Some((token, mut listener)) => {
if let Err(e) = self.poll.registry().deregister(&mut listener) {
error!(
"error deregistering HTTP listen socket({:?}): {:?}",
deactivate, e
);
}
let mut sessions = self.sessions.borrow_mut();
if sessions.slab.contains(token.0) {
sessions.slab.remove(token.0);
info!("removed listen token {:?}", token);
}

if deactivate.to_scm {
self.unblock_scm_socket();
let listeners = Listeners {
http: vec![(deactivate.address, listener.as_raw_fd())],
tls: vec![],
tcp: vec![],
};
info!("sending HTTP listener: {:?}", listeners);
let res = self.scm.send_listeners(&listeners);

self.block_scm_socket();

info!("sent HTTP listener: {:?}", res);
}
ProxyResponse::ok(req_id)
}
Some((token, listener)) => (token, listener),
None => {
error!(
"Couldn't deactivate HTTP listener at address {:?}",
deactivate.address
);
ProxyResponse::error(
return ProxyResponse::error(
req_id,
format!(
"cannot deactivate HTTP listener at address {:?}",
deactivate.address
),
)
);
}
};

if let Err(e) = self.poll.registry().deregister(&mut listener) {
error!(
"error deregistering HTTP listen socket({:?}): {:?}",
deactivate, e
);
}

{
let mut sessions = self.sessions.borrow_mut();
if sessions.slab.contains(token.0) {
sessions.slab.remove(token.0);
info!("removed listen token {:?}", token);
}
}

if deactivate.to_scm {
self.unblock_scm_socket();
let listeners = Listeners {
http: vec![(deactivate.address, listener.as_raw_fd())],
tls: vec![],
tcp: vec![],
};
info!("sending HTTP listener: {:?}", listeners);
let res = self.scm.send_listeners(&listeners);

self.block_scm_socket();

info!("sent HTTP listener: {:?}", res);
}
ProxyResponse::ok(req_id)
}
ListenerType::HTTPS => {
match self
let (token, mut listener) = match self
.https
.borrow_mut()
.give_back_listener(deactivate.address)
{
Some((token, mut listener)) => {
if let Err(e) = self.poll.registry().deregister(&mut listener) {
error!(
"error deregistering HTTPS listen socket({:?}): {:?}",
deactivate, e
);
}
if self.sessions.borrow().slab.contains(token.0) {
self.sessions.borrow_mut().slab.remove(token.0);
info!("removed listen token {:?}", token);
}
Some((token, listener)) => (token, listener),

if deactivate.to_scm {
self.scm.set_blocking(false);
let listeners = Listeners {
http: vec![],
tls: vec![(deactivate.address, listener.as_raw_fd())],
tcp: vec![],
};
info!("sending HTTPS listener: {:?}", listeners);
let res = self.scm.send_listeners(&listeners);

self.scm.set_blocking(true);

info!("sent HTTPS listener: {:?}", res);
}
ProxyResponse::ok(req_id)
}
None => {
error!(
"Couldn't deactivate HTTPS listener at address {:?}",
deactivate.address
);
ProxyResponse::error(
return ProxyResponse::error(
req_id,
format!(
"cannot deactivate HTTPS listener at address {:?}",
deactivate.address
),
)
);
}
};
if let Err(e) = self.poll.registry().deregister(&mut listener) {
error!(
"error deregistering HTTPS listen socket({:?}): {:?}",
deactivate, e
);
}
if self.sessions.borrow().slab.contains(token.0) {
self.sessions.borrow_mut().slab.remove(token.0);
info!("removed listen token {:?}", token);
}

if deactivate.to_scm {
self.unblock_scm_socket();
let listeners = Listeners {
http: vec![],
tls: vec![(deactivate.address, listener.as_raw_fd())],
tcp: vec![],
};
info!("sending HTTPS listener: {:?}", listeners);
let res = self.scm.send_listeners(&listeners);

self.block_scm_socket();

info!("sent HTTPS listener: {:?}", res);
}
ProxyResponse::ok(req_id)
}
ListenerType::TCP => {
match self.tcp.borrow_mut().give_back_listener(deactivate.address) {
Some((token, mut listener)) => {
if let Err(e) = self.poll.registry().deregister(&mut listener) {
let (token, mut listener) =
match self.tcp.borrow_mut().give_back_listener(deactivate.address) {
Some((token, listener)) => (token, listener),
None => {
error!(
"error deregistering TCP listen socket({:?}): {:?}",
deactivate, e
"Couldn't deactivate TCP listener at address {:?}",
deactivate.address
);
return ProxyResponse::error(
req_id,
format!(
"cannot deactivate TCP listener at address {:?}",
deactivate.address
),
);
}
if self.sessions.borrow().slab.contains(token.0) {
self.sessions.borrow_mut().slab.remove(token.0);
info!("removed listen token {:?}", token);
}
};

if deactivate.to_scm {
self.unblock_scm_socket();
let listeners = Listeners {
http: vec![],
tls: vec![],
tcp: vec![(deactivate.address, listener.as_raw_fd())],
};
info!("sending TCP listener: {:?}", listeners);
let res = self.scm.send_listeners(&listeners);
if let Err(e) = self.poll.registry().deregister(&mut listener) {
error!(
"error deregistering TCP listen socket({:?}): {:?}",
deactivate, e
);
}
if self.sessions.borrow().slab.contains(token.0) {
self.sessions.borrow_mut().slab.remove(token.0);
info!("removed listen token {:?}", token);
}

self.block_scm_socket();
if deactivate.to_scm {
self.unblock_scm_socket();
let listeners = Listeners {
http: vec![],
tls: vec![],
tcp: vec![(deactivate.address, listener.as_raw_fd())],
};
info!("sending TCP listener: {:?}", listeners);
let res = self.scm.send_listeners(&listeners);

info!("sent TCP listener: {:?}", res);
}
ProxyResponse::ok(req_id)
}
None => {
error!(
"Couldn't deactivate TCP listener at address {:?}",
deactivate.address
);
ProxyResponse::error(
req_id,
format!(
"cannot deactivate TCP listener at address {:?}",
deactivate.address
),
)
}
self.block_scm_socket();

info!("sent TCP listener: {:?}", res);
}
ProxyResponse::ok(req_id)
}
}
}
Expand Down Expand Up @@ -1464,13 +1468,13 @@ impl Server {
info!("sent default listeners: {:?}", res);
}

fn block_scm_socket(&self) {
fn block_scm_socket(&mut self) {
if let Err(e) = self.scm.set_blocking(true) {
error!("Could not block scm socket: {}", e);
}
}

fn unblock_scm_socket(&self) {
fn unblock_scm_socket(&mut self) {
if let Err(e) = self.scm.set_blocking(false) {
error!("Could not unblock scm socket: {}", e);
}
Expand Down

0 comments on commit 719b637

Please sign in to comment.