Skip to content

Commit

Permalink
feat: __lbheartbeat__ will return 500 if the connection pool is exh…
Browse files Browse the repository at this point in the history
…austed (#997)

* feat: `__lbheartbeat__` will return 500 if the connection pool is exhausted.

Closes #996
  • Loading branch information
jrconlin committed Feb 3, 2021
1 parent 8cb5b60 commit e72573a
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 5 deletions.
62 changes: 61 additions & 1 deletion src/server/test.rs
Expand Up @@ -76,7 +76,10 @@ async fn get_test_state(settings: &Settings) -> ServerState {
metrics: Box::new(metrics),
port: settings.port,
quota_enabled: settings.enable_quota,
deadman: Arc::new(RwLock::new(Deadman::default())),
deadman: Arc::new(RwLock::new(Deadman {
max_size: settings.database_pool_max_size,
..Default::default()
})),
}
}

Expand Down Expand Up @@ -695,3 +698,60 @@ async fn overquota() {
let resp = app.call(req).await.unwrap();
assert!(resp.response().status().is_success());
}

#[actix_rt::test]
async fn lbheartbeat_check() {
use actix_web::web::Buf;

let mut settings = get_test_settings();
settings.database_pool_max_size = Some(10);

let mut app = init_app!(settings).await;

// Test all is well.
let lb_req = create_request(http::Method::GET, "/__lbheartbeat__", None, None).to_request();
let sresp = app.call(lb_req).await.unwrap();
let status = sresp.status();
// dbg!(status, test::read_body(sresp).await);
assert!(status.is_success());

// Exhaust the connections.
let mut headers: HashMap<&str, String> = HashMap::new();
headers.insert("TEST_CONNECTIONS", "10".to_owned());
headers.insert("TEST_IDLES", "0".to_owned());
let req = create_request(
http::Method::GET,
"/__lbheartbeat__",
Some(headers.clone()),
None,
)
.to_request();
let sresp = app.call(req).await.unwrap();
let status = sresp.status();
// dbg!(status, test::read_body(sresp).await);
assert!(status == StatusCode::INTERNAL_SERVER_ERROR);

// check duration for exhausted connections
std::thread::sleep(std::time::Duration::from_secs(1));
let req =
create_request(http::Method::GET, "/__lbheartbeat__", Some(headers), None).to_request();
let sresp = app.call(req).await.unwrap();
let status = sresp.status();
let body = test::read_body(sresp).await;
let resp: HashMap<String, serde_json::value::Value> =
serde_json::de::from_str(std::str::from_utf8(body.bytes()).unwrap()).unwrap();
// dbg!(status, body, &resp);
assert!(status == StatusCode::INTERNAL_SERVER_ERROR);
assert!(resp.get("duration_ms").unwrap().as_u64().unwrap() > 1000);

// check recovery
let mut headers: HashMap<&str, String> = HashMap::new();
headers.insert("TEST_CONNECTIONS", "5".to_owned());
headers.insert("TEST_IDLES", "5".to_owned());
let req =
create_request(http::Method::GET, "/__lbheartbeat__", Some(headers), None).to_request();
let sresp = app.call(req).await.unwrap();
let status = sresp.status();
// dbg!(status, test::read_body(sresp).await);
assert!(status == StatusCode::OK);
}
39 changes: 35 additions & 4 deletions src/web/handlers.rs
@@ -1,7 +1,9 @@
//! API Handlers
use std::collections::HashMap;

use actix_web::{http::StatusCode, web::Data, Error, HttpRequest, HttpResponse};
use actix_web::{
dev::HttpResponseBuilder, http::StatusCode, web::Data, Error, HttpRequest, HttpResponse,
};
use serde::Serialize;
use serde_json::{json, Value};

Expand Down Expand Up @@ -553,15 +555,44 @@ pub async fn lbheartbeat(req: HttpRequest) -> Result<HttpResponse, Error> {

let deadarc = state.deadman.clone();
let mut deadman = *deadarc.read().await;
let db_state = state.db_pool.clone().state();
let db_state = if cfg!(test) {
use crate::db::results::PoolState;
use actix_web::http::header::HeaderValue;
use std::str::FromStr;

let test_pool = PoolState {
connections: u32::from_str(
req.headers()
.get("TEST_CONNECTIONS")
.unwrap_or(&HeaderValue::from_static("0"))
.to_str()
.unwrap_or("0"),
)
.unwrap_or_default(),
idle_connections: u32::from_str(
req.headers()
.get("TEST_IDLES")
.unwrap_or(&HeaderValue::from_static("0"))
.to_str()
.unwrap_or("0"),
)
.unwrap_or_default(),
};
// dbg!(&test_pool, deadman.max_size);
test_pool
} else {
state.db_pool.clone().state()
};

let active = db_state.connections - db_state.idle_connections;
let mut status_code = StatusCode::OK;

if let Some(max_size) = deadman.max_size {
if active >= max_size && db_state.idle_connections == 0 {
if deadman.previous_count > 0 {
if deadman.clock_start.is_none() {
deadman.clock_start = Some(time::Instant::now());
}
status_code = StatusCode::INTERNAL_SERVER_ERROR;
} else if deadman.clock_start.is_some() {
deadman.clock_start = None
}
Expand All @@ -583,7 +614,7 @@ pub async fn lbheartbeat(req: HttpRequest) -> Result<HttpResponse, Error> {
};
}

Ok(HttpResponse::Ok().json(json!(resp)))
Ok(HttpResponseBuilder::new(status_code).json(json!(resp)))
}

// try returning an API error
Expand Down

0 comments on commit e72573a

Please sign in to comment.