Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 94 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pyth-stream"
version = "0.1.2"
version = "0.1.3"
edition = "2021"

[lib]
Expand All @@ -25,6 +25,9 @@ config = "0.14"
envy = "0.4"
futures = "0.3"
hex = "0.4.3"
hyper = { version = "1.5.0", features = ["full"] }
hyper-util = { version = "0.1.9", features = ["tokio"] }
http-body-util = "0.1.0"
pyth-sdk-solana = "0.10.2"
serde = { version = "1.0.210", features = ["derive"] }
serde_json = "1.0.128"
Expand Down
3 changes: 3 additions & 0 deletions example.websocket.config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,6 @@ url = "nats://localhost:4222"

[websocket]
addr = "0.0.0.0:8080"

[healthcheck]
addr = "0.0.0.0:8081"
81 changes: 81 additions & 0 deletions src/bin/websocket_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,19 @@ use async_nats::jetstream::{self, consumer};
use clap::Parser;
use config::Config;
use futures::{SinkExt, StreamExt};
use http_body_util::Full;
use hyper::body::Bytes;
use hyper::server::conn::http1;
use hyper::{Request, Response};
use hyper_util::rt::TokioIo;
use pyth_stream::utils::setup_jetstream;
use serde::{Deserialize, Serialize};
use std::clone::Clone;
use std::collections::{HashMap, HashSet};
use std::net::SocketAddr;
use std::panic;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::{mpsc, Mutex};
Expand Down Expand Up @@ -37,6 +44,7 @@ struct ServerResponse {
struct AppConfig {
nats: NatsConfig,
websocket: WebSocketConfig,
healthcheck: HealthCheckConfig,
}

#[derive(Debug, Deserialize, Clone)]
Expand All @@ -49,6 +57,11 @@ struct WebSocketConfig {
addr: String,
}

#[derive(Debug, Deserialize, Clone)]
struct HealthCheckConfig {
addr: String,
}

#[derive(Parser, Debug, Deserialize)]
#[command(author, version, about, long_about = None)]
struct Args {
Expand All @@ -58,6 +71,9 @@ struct Args {

type Clients = Arc<Mutex<HashMap<String, (HashSet<String>, mpsc::UnboundedSender<Message>)>>>;

static NATS_CONNECTED: AtomicBool = AtomicBool::new(false);
static WS_LISTENER_ACTIVE: AtomicBool = AtomicBool::new(false);

#[tokio::main]
async fn main() -> Result<()> {
fmt().with_env_filter(EnvFilter::from_default_env()).init();
Expand All @@ -76,8 +92,13 @@ async fn main() -> Result<()> {
let clients: Clients = Arc::new(Mutex::new(HashMap::new()));

let server = TcpListener::bind(&config.websocket.addr).await?;
WS_LISTENER_ACTIVE.store(true, Ordering::SeqCst);
info!("WebSocket server listening on: {}", config.websocket.addr);

// Start the health check server
let health_addr: SocketAddr = config.healthcheck.addr.parse()?;
tokio::spawn(start_health_check_server(health_addr));

let nats_config = config.nats.clone();
let clients_clone = clients.clone();
tokio::spawn(async move {
Expand Down Expand Up @@ -331,12 +352,72 @@ fn base58_to_hex(base58: &str) -> Result<String, anyhow::Error> {
}

async fn connect_and_handle_nats(config: &NatsConfig, clients: Clients) -> Result<()> {
NATS_CONNECTED.store(false, Ordering::SeqCst);
let nats_client = async_nats::connect(&config.url).await?;
let jetstream = setup_jetstream(&nats_client).await?;

NATS_CONNECTED.store(true, Ordering::SeqCst);
info!("Connected to NATS server");

handle_nats_messages(jetstream, clients).await?;

NATS_CONNECTED.store(false, Ordering::SeqCst);
Ok(())
}
async fn start_health_check_server(addr: SocketAddr) {
let listener = TcpListener::bind(addr).await.unwrap();
info!("Health check server listening on http://{}", addr);

loop {
let (stream, _) = listener.accept().await.unwrap();
tokio::spawn(handle_health_check_connection(stream));
}
}

async fn handle_health_check_connection(stream: TcpStream) {
let io = TokioIo::new(stream);

if let Err(err) = http1::Builder::new()
.serve_connection(io, hyper::service::service_fn(health_check))
.await
{
error!("Error serving connection: {:?}", err);
}
}

async fn health_check(
req: Request<hyper::body::Incoming>,
) -> Result<Response<Full<Bytes>>, hyper::Error> {
if req.uri().path() != "/health" {
return Ok(Response::builder()
.status(hyper::StatusCode::NOT_FOUND)
.body(Full::new(Bytes::from("Not Found")))
.unwrap());
}

let nats_status = if NATS_CONNECTED.load(Ordering::SeqCst) {
"Connected"
} else {
"Disconnected"
};

let ws_status = if WS_LISTENER_ACTIVE.load(Ordering::SeqCst) {
"Active"
} else {
"Inactive"
};

let body = format!("NATS: {}\nWebSocket Listener: {}", nats_status, ws_status);
let status = if nats_status == "Connected" && ws_status == "Active" {
hyper::StatusCode::OK
} else {
hyper::StatusCode::SERVICE_UNAVAILABLE
};

let response = Response::builder()
.status(status)
.body(Full::new(Bytes::from(body)))
.unwrap();

Ok(response)
}