Skip to content

Commit

Permalink
remove if let statements from server::run and some session logic
Browse files Browse the repository at this point in the history
for an easier introduction of custom tags on frontends
  • Loading branch information
Keksoj committed Jul 20, 2022
1 parent 15bd0fd commit a4e7dec
Show file tree
Hide file tree
Showing 5 changed files with 418 additions and 392 deletions.
196 changes: 103 additions & 93 deletions lib/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,108 +158,118 @@ impl Session {
pub fn upgrade(&mut self) -> bool {
debug!("HTTP::upgrade");
let protocol = unwrap_msg!(self.protocol.take());
if let State::Http(mut http) = protocol {
debug!("switching to pipe");
let front_token = self.frontend_token;
let back_token = unwrap_msg!(http.back_token());
let ws_context = http.websocket_context();

let front_buf = match http.front_buf {
Some(buf) => buf.buffer,
None => {
if let Some(p) = self.pool.upgrade() {
if let Some(buf) = p.borrow_mut().checkout() {
buf
} else {
return false;
}
} else {
return false;
}
}
};

let back_buf = match http.back_buf {
Some(buf) => buf.buffer,
None => {
if let Some(p) = self.pool.upgrade() {
if let Some(buf) = p.borrow_mut().checkout() {
buf
} else {
return false;
}
} else {
return false;
match protocol {
State::Http(mut http) => {
debug!("switching to pipe");
let front_token = self.frontend_token;
let back_token = unwrap_msg!(http.back_token());
let ws_context = http.websocket_context();

let front_buf = match http.front_buf {
Some(buf) => buf.buffer,
None => {
let pool = match self.pool.upgrade() {
Some(p) => p,
None => return false,
};

let buffer = match pool.borrow_mut().checkout() {
Some(buf) => buf,
None => return false,
};
buffer
}
}
};
};

gauge_add!("protocol.http", -1);
gauge_add!("protocol.ws", 1);
gauge_add!("http.active_requests", -1);
gauge_add!("websocket.active_requests", 1);
let mut pipe = Pipe::new(
http.frontend,
front_token,
http.request_id,
http.cluster_id,
http.backend_id,
Some(ws_context),
Some(unwrap_msg!(http.backend)),
front_buf,
back_buf,
http.session_address,
Protocol::HTTP,
);
let back_buf = match http.back_buf {
Some(buf) => buf.buffer,
None => {
let pool = match self.pool.upgrade() {
Some(p) => p,
None => return false,
};

let buffer = match pool.borrow_mut().checkout() {
Some(buf) => buf,
None => return false,
};
buffer
}
};

pipe.front_readiness.event = http.front_readiness.event;
pipe.back_readiness.event = http.back_readiness.event;
http.front_timeout
.set_duration(self.frontend_timeout_duration);
http.back_timeout
.set_duration(self.backend_timeout_duration);
pipe.front_timeout = Some(http.front_timeout);
pipe.back_timeout = Some(http.back_timeout);
pipe.set_back_token(back_token);
//pipe.set_cluster_id(self.cluster_id.clone());

self.protocol = Some(State::WebSocket(pipe));
true
} else if let State::Expect(expect) = protocol {
debug!("switching to HTTP");
if let Some((Some(public_address), Some(client_address))) = expect
.addresses
.as_ref()
.map(|add| (add.destination(), add.source()))
{
let readiness = expect.readiness;
let mut http = Http::new(
expect.frontend,
expect.frontend_token,
expect.request_id,
self.pool.clone(),
public_address,
Some(client_address),
self.sticky_name.clone(),
gauge_add!("protocol.http", -1);
gauge_add!("protocol.ws", 1);
gauge_add!("http.active_requests", -1);
gauge_add!("websocket.active_requests", 1);
let mut pipe = Pipe::new(
http.frontend,
front_token,
http.request_id,
http.cluster_id,
http.backend_id,
Some(ws_context),
Some(unwrap_msg!(http.backend)),
front_buf,
back_buf,
http.session_address,
Protocol::HTTP,
self.answers.clone(),
self.front_timeout.take(),
self.frontend_timeout_duration,
self.backend_timeout_duration,
);
http.front_readiness.event = readiness.event;

gauge_add!("protocol.proxy.expect", -1);
gauge_add!("protocol.http", 1);
self.protocol = Some(State::Http(http));
pipe.front_readiness.event = http.front_readiness.event;
pipe.back_readiness.event = http.back_readiness.event;
http.front_timeout
.set_duration(self.frontend_timeout_duration);
http.back_timeout
.set_duration(self.backend_timeout_duration);
pipe.front_timeout = Some(http.front_timeout);
pipe.back_timeout = Some(http.back_timeout);
pipe.set_back_token(back_token);
//pipe.set_cluster_id(self.cluster_id.clone());

self.protocol = Some(State::WebSocket(pipe));
true
}
State::Expect(expect) => {
debug!("switching to HTTP");
match expect
.addresses
.as_ref()
.map(|add| (add.destination(), add.source()))
{
Some((Some(public_address), Some(client_address))) => {
let readiness = expect.readiness;
let mut http = Http::new(
expect.frontend,
expect.frontend_token,
expect.request_id,
self.pool.clone(),
public_address,
Some(client_address),
self.sticky_name.clone(),
Protocol::HTTP,
self.answers.clone(),
self.front_timeout.take(),
self.frontend_timeout_duration,
self.backend_timeout_duration,
);
http.front_readiness.event = readiness.event;

gauge_add!("protocol.proxy.expect", -1);
gauge_add!("protocol.http", 1);
self.protocol = Some(State::Http(http));
true
}
_ => {
self.protocol = Some(State::Expect(expect));
false
}
}
}
_ => {
self.protocol = Some(protocol);
true
} else {
self.protocol = Some(State::Expect(expect));
false
}
} else {
self.protocol = Some(protocol);
true
}
}

Expand Down
108 changes: 54 additions & 54 deletions lib/src/https_rustls/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,65 +369,65 @@ impl ProxyConfiguration<Session> for Proxy {
wait_time: Duration,
proxy: Rc<RefCell<Self>>,
) -> Result<(), AcceptError> {
if let Some(listener) = self.listeners.get(&Token(token.0)) {
if let Err(e) = frontend_sock.set_nodelay(true) {
error!(
"error setting nodelay on front socket({:?}): {:?}",
frontend_sock, e
);
}

let mut s = self.sessions.borrow_mut();
let entry = s.slab.vacant_entry();
let session_token = Token(entry.key());
let listener = match self.listeners.get(&Token(token.0)) {
Some(l) => l,
None => return Err(AcceptError::IoError), //FIXME
};

if let Err(e) = self.registry.register(
&mut frontend_sock,
session_token,
Interest::READABLE | Interest::WRITABLE,
) {
error!(
"error registering fron socket({:?}): {:?}",
frontend_sock, e
);
}
if let Err(e) = frontend_sock.set_nodelay(true) {
error!(
"error setting nodelay on front socket({:?}): {:?}",
frontend_sock, e
);
}

let session = match ServerConnection::new(listener.ssl_config.clone()) {
Ok(session) => session,
Err(e) => {
error!("failed to create server session: {:?}", e);
return Err(AcceptError::IoError);
}
};
let c = Session::new(
session,
frontend_sock,
session_token,
Rc::downgrade(&self.pool),
proxy,
listener
.config
.public_address
.unwrap_or(listener.config.address),
listener.config.expect_proxy,
listener.config.sticky_name.clone(),
listener.answers.clone(),
Token(token.0),
wait_time,
Duration::seconds(listener.config.front_timeout as i64),
Duration::seconds(listener.config.back_timeout as i64),
Duration::seconds(listener.config.request_timeout as i64),
let mut s = self.sessions.borrow_mut();
let entry = s.slab.vacant_entry();
let session_token = Token(entry.key());

if let Err(e) = self.registry.register(
&mut frontend_sock,
session_token,
Interest::READABLE | Interest::WRITABLE,
) {
error!(
"error registering fron socket({:?}): {:?}",
frontend_sock, e
);
}

let session = match ServerConnection::new(listener.ssl_config.clone()) {
Ok(session) => session,
Err(e) => {
error!("failed to create server session: {:?}", e);
return Err(AcceptError::IoError);
}
};
let c = Session::new(
session,
frontend_sock,
session_token,
Rc::downgrade(&self.pool),
proxy,
listener
.config
.public_address
.unwrap_or(listener.config.address),
listener.config.expect_proxy,
listener.config.sticky_name.clone(),
listener.answers.clone(),
Token(token.0),
wait_time,
Duration::seconds(listener.config.front_timeout as i64),
Duration::seconds(listener.config.back_timeout as i64),
Duration::seconds(listener.config.request_timeout as i64),
);

let session = Rc::new(RefCell::new(c));
entry.insert(session);
let session = Rc::new(RefCell::new(c));
entry.insert(session);

s.incr();
Ok(())
} else {
//FIXME
Err(AcceptError::IoError)
}
s.incr();
Ok(())
}

fn notify(&mut self, message: ProxyRequest) -> ProxyResponse {
Expand Down

0 comments on commit a4e7dec

Please sign in to comment.