Skip to content

Commit

Permalink
Merge pull request #7366 from neondatabase/proxy-hotfix
Browse files Browse the repository at this point in the history
Release proxy (2024-04-11 hotfix)
  • Loading branch information
conradludgate authored Apr 12, 2024
2 parents 3fa17e9 + 95a184e commit 943b1bc
Showing 1 changed file with 32 additions and 4 deletions.
36 changes: 32 additions & 4 deletions proxy/src/serverless/conn_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::{
use tokio::time::Instant;
use tokio_postgres::tls::NoTlsStream;
use tokio_postgres::{AsyncMessage, ReadyForQueryStatus, Socket};
use tokio_util::sync::CancellationToken;

use crate::console::messages::{ColdStartInfo, MetricsAuxInfo};
use crate::metrics::{ENDPOINT_POOLS, GC_LATENCY, NUM_OPEN_CLIENTS_IN_HTTP_POOL};
Expand Down Expand Up @@ -469,15 +470,32 @@ pub fn poll_client<C: ClientInnerExt>(

let db_user = conn_info.db_and_user();
let idle = global_pool.get_idle_timeout();
let cancel = CancellationToken::new();
let cancelled = cancel.clone().cancelled_owned();

tokio::spawn(
async move {
let _conn_gauge = conn_gauge;
let mut idle_timeout = pin!(tokio::time::sleep(idle));
let mut cancelled = pin!(cancelled);

poll_fn(move |cx| {
if matches!(rx.has_changed(), Ok(true)) {
session_id = *rx.borrow_and_update();
info!(%session_id, "changed session");
idle_timeout.as_mut().reset(Instant::now() + idle);
if cancelled.as_mut().poll(cx).is_ready() {
info!("connection dropped");
return Poll::Ready(())
}

match rx.has_changed() {
Ok(true) => {
session_id = *rx.borrow_and_update();
info!(%session_id, "changed session");
idle_timeout.as_mut().reset(Instant::now() + idle);
}
Err(_) => {
info!("connection dropped");
return Poll::Ready(())
}
_ => {}
}

// 5 minute idle connection timeout
Expand Down Expand Up @@ -532,6 +550,7 @@ pub fn poll_client<C: ClientInnerExt>(
let inner = ClientInner {
inner: client,
session: tx,
cancel,
aux,
conn_id,
};
Expand All @@ -541,10 +560,18 @@ pub fn poll_client<C: ClientInnerExt>(
struct ClientInner<C: ClientInnerExt> {
inner: C,
session: tokio::sync::watch::Sender<uuid::Uuid>,
cancel: CancellationToken,
aux: MetricsAuxInfo,
conn_id: uuid::Uuid,
}

impl<C: ClientInnerExt> Drop for ClientInner<C> {
fn drop(&mut self) {
// on client drop, tell the conn to shut down
self.cancel.cancel();
}
}

pub trait ClientInnerExt: Sync + Send + 'static {
fn is_closed(&self) -> bool;
fn get_process_id(&self) -> i32;
Expand Down Expand Up @@ -697,6 +724,7 @@ mod tests {
ClientInner {
inner: client,
session: tokio::sync::watch::Sender::new(uuid::Uuid::new_v4()),
cancel: CancellationToken::new(),
aux: MetricsAuxInfo {
endpoint_id: (&EndpointId::from("endpoint")).into(),
project_id: (&ProjectId::from("project")).into(),
Expand Down

1 comment on commit 943b1bc

@github-actions
Copy link

@github-actions github-actions bot commented on 943b1bc Apr 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2778 tests run: 2660 passed, 0 failed, 118 skipped (full report)


Flaky tests (3)

Postgres 16

  • test_statvfs_pressure_usage: debug
  • test_vm_bit_clear_on_heap_lock: debug

Postgres 14

Code coverage* (full report)

  • functions: 28.1% (6431 of 22870 functions)
  • lines: 46.8% (45155 of 96418 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
943b1bc at 2024-04-12T10:41:50.508Z :recycle:

Please sign in to comment.