From 15250aa332dc19dfbe41f48d55edc12365233d7d Mon Sep 17 00:00:00 2001 From: Ayaz Abbas Date: Tue, 22 Oct 2024 18:13:13 +0100 Subject: [PATCH] add http health endpoint to websocket-server --- Cargo.lock | 101 +++++++++++++++++++++++++++++++--- Cargo.toml | 5 +- example.websocket.config.toml | 3 + src/bin/websocket_server.rs | 81 +++++++++++++++++++++++++++ 4 files changed, 182 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 194825f..e13726b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -438,6 +438,12 @@ dependencies = [ "syn 2.0.79", ] +[[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" + [[package]] name = "atty" version = "0.2.14" @@ -1685,6 +1691,25 @@ dependencies = [ "tracing", ] +[[package]] +name = "h2" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "524e8ac6999421f49a846c2d4411f337e53497d8ec55d67753beffa43c5d9205" +dependencies = [ + "atomic-waker", + "bytes", + "fnv", + "futures-core", + "futures-sink", + "http 1.1.0", + "indexmap 2.6.0", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "hash32" version = "0.2.1" @@ -1820,6 +1845,29 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "http-body" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" +dependencies = [ + "bytes", + "http 1.1.0", +] + +[[package]] +name = "http-body-util" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f" +dependencies = [ + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "pin-project-lite", +] + [[package]] name = "httparse" version = "1.9.5" @@ -1848,9 +1896,9 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2", + "h2 0.3.26", "http 0.2.12", - "http-body", + "http-body 0.4.6", "httparse", "httpdate", "itoa", @@ -1862,6 +1910,27 @@ dependencies = [ "want", ] +[[package]] +name = "hyper" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbbff0a806a4728c99295b254c8838933b5b082d75e3cb70c8dab21fdfbcfa9a" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "h2 0.4.6", + "http 1.1.0", + "http-body 1.0.1", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "smallvec", + "tokio", + "want", +] + [[package]] name = "hyper-rustls" version = "0.24.2" @@ -1870,12 +1939,27 @@ checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" dependencies = [ "futures-util", "http 0.2.12", - "hyper", + "hyper 0.14.31", "rustls 0.21.12", "tokio", "tokio-rustls 0.24.1", ] +[[package]] +name = "hyper-util" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41296eb09f183ac68eec06e03cdbea2e759633d4067b2f6552fc2e009bcad08b" +dependencies = [ + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "hyper 1.5.0", + "pin-project-lite", + "tokio", +] + [[package]] name = "iana-time-zone" version = "0.1.61" @@ -2784,7 +2868,7 @@ dependencies = [ [[package]] name = "pyth-stream" -version = "0.1.2" +version = "0.1.3" dependencies = [ "anyhow", "async-nats", @@ -2795,6 +2879,9 @@ dependencies = [ "envy", "futures", "hex", + "http-body-util", + "hyper 1.5.0", + "hyper-util", "pyth-sdk-solana", "serde", "serde_json", @@ -3031,10 +3118,10 @@ dependencies = [ "encoding_rs", "futures-core", "futures-util", - "h2", + "h2 0.3.26", "http 0.2.12", - "http-body", - "hyper", + "http-body 0.4.6", + "hyper 0.14.31", "hyper-rustls", "ipnet", "js-sys", diff --git a/Cargo.toml b/Cargo.toml index 7a50708..dbe72ee 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pyth-stream" -version = "0.1.2" +version = "0.1.3" edition = "2021" [lib] @@ -25,6 +25,9 @@ config = "0.14" envy = "0.4" futures = "0.3" hex = "0.4.3" +hyper = { version = "1.5.0", features = ["full"] } +hyper-util = { version = "0.1.9", features = ["tokio"] } +http-body-util = "0.1.0" pyth-sdk-solana = "0.10.2" serde = { version = "1.0.210", features = ["derive"] } serde_json = "1.0.128" diff --git a/example.websocket.config.toml b/example.websocket.config.toml index aded413..5776d91 100644 --- a/example.websocket.config.toml +++ b/example.websocket.config.toml @@ -3,3 +3,6 @@ url = "nats://localhost:4222" [websocket] addr = "0.0.0.0:8080" + +[healthcheck] +addr = "0.0.0.0:8081" diff --git a/src/bin/websocket_server.rs b/src/bin/websocket_server.rs index 28f6d7b..09f9f97 100644 --- a/src/bin/websocket_server.rs +++ b/src/bin/websocket_server.rs @@ -3,12 +3,19 @@ use async_nats::jetstream::{self, consumer}; use clap::Parser; use config::Config; use futures::{SinkExt, StreamExt}; +use http_body_util::Full; +use hyper::body::Bytes; +use hyper::server::conn::http1; +use hyper::{Request, Response}; +use hyper_util::rt::TokioIo; use pyth_stream::utils::setup_jetstream; use serde::{Deserialize, Serialize}; use std::clone::Clone; use std::collections::{HashMap, HashSet}; +use std::net::SocketAddr; use std::panic; use std::path::PathBuf; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::{mpsc, Mutex}; @@ -37,6 +44,7 @@ struct ServerResponse { struct AppConfig { nats: NatsConfig, websocket: WebSocketConfig, + healthcheck: HealthCheckConfig, } #[derive(Debug, Deserialize, Clone)] @@ -49,6 +57,11 @@ struct WebSocketConfig { addr: String, } +#[derive(Debug, Deserialize, Clone)] +struct HealthCheckConfig { + addr: String, +} + #[derive(Parser, Debug, Deserialize)] #[command(author, version, about, long_about = None)] struct Args { @@ -58,6 +71,9 @@ struct Args { type Clients = Arc, mpsc::UnboundedSender)>>>; +static NATS_CONNECTED: AtomicBool = AtomicBool::new(false); +static WS_LISTENER_ACTIVE: AtomicBool = AtomicBool::new(false); + #[tokio::main] async fn main() -> Result<()> { fmt().with_env_filter(EnvFilter::from_default_env()).init(); @@ -76,8 +92,13 @@ async fn main() -> Result<()> { let clients: Clients = Arc::new(Mutex::new(HashMap::new())); let server = TcpListener::bind(&config.websocket.addr).await?; + WS_LISTENER_ACTIVE.store(true, Ordering::SeqCst); info!("WebSocket server listening on: {}", config.websocket.addr); + // Start the health check server + let health_addr: SocketAddr = config.healthcheck.addr.parse()?; + tokio::spawn(start_health_check_server(health_addr)); + let nats_config = config.nats.clone(); let clients_clone = clients.clone(); tokio::spawn(async move { @@ -331,12 +352,72 @@ fn base58_to_hex(base58: &str) -> Result { } async fn connect_and_handle_nats(config: &NatsConfig, clients: Clients) -> Result<()> { + NATS_CONNECTED.store(false, Ordering::SeqCst); let nats_client = async_nats::connect(&config.url).await?; let jetstream = setup_jetstream(&nats_client).await?; + NATS_CONNECTED.store(true, Ordering::SeqCst); info!("Connected to NATS server"); handle_nats_messages(jetstream, clients).await?; + NATS_CONNECTED.store(false, Ordering::SeqCst); Ok(()) } +async fn start_health_check_server(addr: SocketAddr) { + let listener = TcpListener::bind(addr).await.unwrap(); + info!("Health check server listening on http://{}", addr); + + loop { + let (stream, _) = listener.accept().await.unwrap(); + tokio::spawn(handle_health_check_connection(stream)); + } +} + +async fn handle_health_check_connection(stream: TcpStream) { + let io = TokioIo::new(stream); + + if let Err(err) = http1::Builder::new() + .serve_connection(io, hyper::service::service_fn(health_check)) + .await + { + error!("Error serving connection: {:?}", err); + } +} + +async fn health_check( + req: Request, +) -> Result>, hyper::Error> { + if req.uri().path() != "/health" { + return Ok(Response::builder() + .status(hyper::StatusCode::NOT_FOUND) + .body(Full::new(Bytes::from("Not Found"))) + .unwrap()); + } + + let nats_status = if NATS_CONNECTED.load(Ordering::SeqCst) { + "Connected" + } else { + "Disconnected" + }; + + let ws_status = if WS_LISTENER_ACTIVE.load(Ordering::SeqCst) { + "Active" + } else { + "Inactive" + }; + + let body = format!("NATS: {}\nWebSocket Listener: {}", nats_status, ws_status); + let status = if nats_status == "Connected" && ws_status == "Active" { + hyper::StatusCode::OK + } else { + hyper::StatusCode::SERVICE_UNAVAILABLE + }; + + let response = Response::builder() + .status(status) + .body(Full::new(Bytes::from(body))) + .unwrap(); + + Ok(response) +}