Skip to content

Commit

Permalink
abstract out HTTP and HTTPS notify methods
Browse files Browse the repository at this point in the history
* abstract out methods for Http Proxy::notify

* abstract out methods for Https Proxy::notify

* remove request_id from HTTP proxy methods called by notify()

* remove request_id from HTTPS proxy methods called by notify()

* log error and success of HTTP proxy shutdown

* log errors and successes of HTTPS proxy shutdown

* log order execution success and failure on HTTPS proxy

* add context to notify() functions of HTTP and HTTPS functions

* HTTP proxy returns more concise error on AddHttpFrontend

* HTTPS proxy: provide listener address in certificate errors
  • Loading branch information
Keksoj committed Dec 8, 2022
1 parent 029cc0b commit 319119a
Show file tree
Hide file tree
Showing 2 changed files with 489 additions and 298 deletions.
263 changes: 159 additions & 104 deletions lib/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use anyhow::{bail, Context};
use mio::{net::*, unix::SourceFd, *};
use rusty_ulid::Ulid;
use slab::Slab;
use sozu_command::proxy::RemoveListener;
use time::{Duration, Instant};

use crate::{
Expand Down Expand Up @@ -1449,11 +1450,15 @@ impl Proxy {
self.listeners.get(token).map(Clone::clone)
}

pub fn remove_listener(&mut self, address: SocketAddr) -> bool {
pub fn remove_listener(&mut self, remove: RemoveListener) -> anyhow::Result<()> {
let len = self.listeners.len();
self.listeners
.retain(|_, l| l.borrow().address != remove.address);

self.listeners.retain(|_, l| l.borrow().address != address);
self.listeners.len() < len
if !self.listeners.len() < len {
info!("no HTTP listener to remove at address {:?}", remove.address);
}
Ok(())
}

pub fn activate_listener(
Expand Down Expand Up @@ -1495,7 +1500,7 @@ impl Proxy {
})
}

pub fn add_cluster(&mut self, cluster: Cluster) {
pub fn add_cluster(&mut self, cluster: Cluster) -> anyhow::Result<()> {
if let Some(answer_503) = &cluster.answer_503 {
for listener in self.listeners.values() {
listener
Expand All @@ -1505,11 +1510,11 @@ impl Proxy {
.add_custom_answer(&cluster.cluster_id, answer_503);
}
}

self.clusters.insert(cluster.cluster_id.clone(), cluster);
Ok(())
}

pub fn remove_cluster(&mut self, cluster_id: &str) {
pub fn remove_cluster(&mut self, cluster_id: &str) -> anyhow::Result<()> {
self.clusters.remove(cluster_id);

for listener in self.listeners.values() {
Expand All @@ -1519,6 +1524,96 @@ impl Proxy {
.borrow_mut()
.remove_custom_answer(cluster_id);
}
Ok(())
}

pub fn add_http_frontend(&mut self, front: HttpFrontend) -> anyhow::Result<()> {
match self
.listeners
.values()
.find(|l| l.borrow().address == front.address)
{
Some(listener) => {
let mut owned = listener.borrow_mut();

let hostname = front.hostname.to_owned();
let tags = front.tags.to_owned();

match owned.add_http_front(front) {
Ok(_) => {
owned.set_tags(hostname, tags);
Ok(())
}
Err(err) => Err(anyhow::Error::msg(err)),
}
}
None => bail!("no HTTP listener found for address: {}", front.address),
}
}

pub fn remove_http_frontend(&mut self, front: HttpFrontend) -> anyhow::Result<()> {
if let Some(listener) = self
.listeners
.values()
.find(|l| l.borrow().address == front.address)
{
let mut owned = listener.borrow_mut();
let hostname = front.hostname.to_owned();

match owned.remove_http_front(front) {
Ok(_) => owned.set_tags(hostname, None),
Err(err) => return Err(anyhow::Error::msg(err)),
}
}
Ok(())
}

pub fn soft_stop(&mut self) -> anyhow::Result<()> {
let listeners: HashMap<_, _> = self.listeners.drain().collect();
let mut socket_errors = vec![];
for (_, l) in listeners.iter() {
if let Some(mut sock) = l.borrow_mut().listener.take() {
debug!("Deregistering socket {:?}", sock);
if let Err(e) = self.registry.deregister(&mut sock) {
let error = format!("socket {:?}: {:?}", sock, e);
socket_errors.push(error);
}
}
}

if !socket_errors.is_empty() {
bail!("Error deregistering listen sockets: {:?}", socket_errors);
}

Ok(())
}

pub fn hard_stop(&mut self) -> anyhow::Result<()> {
let mut listeners: HashMap<_, _> = self.listeners.drain().collect();
let mut socket_errors = vec![];
for (_, l) in listeners.drain() {
if let Some(mut sock) = l.borrow_mut().listener.take() {
debug!("Deregistering socket {:?}", sock);
if let Err(e) = self.registry.deregister(&mut sock) {
let error = format!("socket {:?}: {:?}", sock, e);
socket_errors.push(error);
}
}
}

if !socket_errors.is_empty() {
bail!("Error deregistering listen sockets: {:?}", socket_errors);
}

Ok(())
}

pub fn logging(&mut self, logging_filter: String) -> anyhow::Result<()> {
logging::LOGGER.with(|l| {
let directives = logging::parse_logging_spec(&logging_filter);
l.borrow_mut().set_directives(directives);
});
Ok(())
}
}

Expand Down Expand Up @@ -1641,131 +1736,91 @@ impl Listener {
}

impl ProxyConfiguration<Session> for Proxy {
fn notify(&mut self, message: ProxyRequest) -> ProxyResponse {
// ToDo temporary
//trace!("{} notified", message);
match message.order {
fn notify(&mut self, request: ProxyRequest) -> ProxyResponse {
let request_id = request.id.clone();

let result = match request.order {
ProxyRequestOrder::AddCluster(cluster) => {
debug!("{} add cluster {:?}", message.id, cluster);
self.add_cluster(cluster);
ProxyResponse::ok(message.id)
info!("{} add cluster {:?}", request.id, cluster);
self.add_cluster(cluster.clone())
.with_context(|| format!("Could not add cluster {}", cluster.cluster_id))
}
ProxyRequestOrder::RemoveCluster { cluster_id } => {
debug!("{} remove cluster {:?}", message.id, cluster_id);
self.remove_cluster(&cluster_id);
ProxyResponse::ok(message.id)
info!("{} remove cluster {:?}", request_id, cluster_id);
self.remove_cluster(&cluster_id)
.with_context(|| format!("Could not remove cluster {}", cluster_id))
}
ProxyRequestOrder::AddHttpFrontend(front) => {
debug!("{} add front {:?}", message.id, front);
if let Some(listener) = self
.listeners
.values()
.find(|l| l.borrow().address == front.address)
{
let mut owned = listener.borrow_mut();

let hostname = front.hostname.to_owned();
let tags = front.tags.to_owned();

match owned.add_http_front(front) {
Ok(_) => {
owned.set_tags(hostname, tags);

ProxyResponse::ok(message.id)
}
Err(err) => ProxyResponse::error(message.id, err),
}
} else {
ProxyResponse::error(
message.id,
format!("no HTTP listener found for front: {:?}", front),
)

// let (listener, tokens) = Listener::new(HttpListener::default(), event_loop,
// self.pool.clone(), None, token: Token) -> (Listener,HashSet<Token>
}
info!("{} add front {:?}", request_id, front);
self.add_http_frontend(front)
.with_context(|| "Could not add http frontend")
}
ProxyRequestOrder::RemoveHttpFrontend(front) => {
debug!("{} front {:?}", message.id, front);
if let Some(listener) = self
.listeners
.values()
.find(|l| l.borrow().address == front.address)
{
let mut owned = listener.borrow_mut();

let hostname = front.hostname.to_owned();

match owned.remove_http_front(front) {
Ok(_) => {
owned.set_tags(hostname, None);
ProxyResponse::ok(message.id)
}
Err(err) => ProxyResponse::error(message.id, err),
}
} else {
ProxyResponse::error(
message.id,
"trying to remove front from non existing listener",
)
}
info!("{} remove front {:?}", request_id, front);
self.remove_http_frontend(front)
.with_context(|| "Could not remove http frontend")
}
ProxyRequestOrder::RemoveListener(remove) => {
debug!("removing HTTP listener at address {:?}", remove.address);
if !self.remove_listener(remove.address) {
ProxyResponse::error(
message.id,
format!("no HTTP listener to remove at address {:?}", remove.address),
)
} else {
ProxyResponse::ok(message.id)
}
info!("removing HTTP listener at address {:?}", remove.address);
self.remove_listener(remove.clone()).with_context(|| {
format!("Could not remove listener at address {:?}", remove.address)
})
}
ProxyRequestOrder::SoftStop => {
info!("{} processing soft shutdown", message.id);
let listeners: HashMap<_, _> = self.listeners.drain().collect();
for (_, l) in listeners.iter() {
if let Some(mut sock) = l.borrow_mut().listener.take() {
if let Err(e) = self.registry.deregister(&mut sock) {
error!("error deregistering listen socket({:?}): {:?}", sock, e);
}
info!("{} processing soft shutdown", request_id);
match self
.soft_stop()
.with_context(|| "Could not perform soft stop")
{
Ok(()) => {
info!("{} soft stop successful", request_id);
return ProxyResponse::processing(request.id);
}
Err(e) => Err(e),
}
ProxyResponse::processing(message.id)
}
ProxyRequestOrder::HardStop => {
info!("{} hard shutdown", message.id);
let mut listeners: HashMap<_, _> = self.listeners.drain().collect();
for (_, l) in listeners.drain() {
if let Some(mut sock) = l.borrow_mut().listener.take() {
if let Err(e) = self.registry.deregister(&mut sock) {
error!("error deregistering listen socket({:?}): {:?}", sock, e);
}
info!("{} processing hard shutdown", request_id);
match self
.hard_stop()
.with_context(|| "Could not perform hard stop")
{
Ok(()) => {
info!("{} hard stop successful", request_id);
return ProxyResponse::processing(request.id);
}
Err(e) => Err(e),
}
ProxyResponse::processing(message.id)
}
ProxyRequestOrder::Status => {
debug!("{} status", message.id);
ProxyResponse::ok(message.id)
info!("{} status", request_id);
Ok(())
}
ProxyRequestOrder::Logging(logging_filter) => {
info!(
"{} changing logging filter to {}",
message.id, logging_filter
request_id, logging_filter
);
logging::LOGGER.with(|l| {
let directives = logging::parse_logging_spec(&logging_filter);
l.borrow_mut().set_directives(directives);
});
ProxyResponse::ok(message.id)
self.logging(logging_filter.clone())
.with_context(|| format!("Could not set logging level to {}", logging_filter))
}
command => {
debug!(
other_command => {
info!(
"{} unsupported message for HTTP proxy, ignoring: {:?}",
message.id, command
request.id, other_command
);
ProxyResponse::error(message.id, "unsupported message")
Err(anyhow::Error::msg("unsupported message"))
}
};

match result {
Ok(()) => {
info!("{} successful", request_id);
ProxyResponse::ok(request_id)
}
Err(error_message) => {
error!("{} unsuccessful: {:#}", request_id, error_message);
ProxyResponse::error(request_id, format!("{:#}", error_message))
}
}
}
Expand Down

0 comments on commit 319119a

Please sign in to comment.