@parmesant parmesant commented Dec 15, 2025

Add backend for SSE

Fixes #XXXX.

Description


This PR has:

  • been tested to ensure log ingestion and log query works.
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious to an unfamiliar reader.
  • added documentation for new or modified features or behaviors.

Summary by CodeRabbit

  • New Features
    • Added Server-Sent Events (SSE) support enabling real-time message broadcasting to connected clients.
    • Implemented alert notifications system with severity levels (Info, Warn, Error).
    • Introduced control plane events for system-level communications.
    • Enabled session-based message routing with automatic stale connection cleanup via periodic health checks.

coderabbitai bot commented Dec 15, 2025

Walkthrough

The pull request introduces a new Server-Sent Events (SSE) broadcasting module for real-time event streaming, including session-based client management, message broadcasting, and connection pruning. Dependencies are updated to support the new functionality.

Changes

| Cohort / File(s) | Summary |
| --- | --- |
| Dependency Updates: Cargo.toml | Added actix-web-lab (0.24.3) dependency; upgraded tokio-stream from 0.1 to 0.1.17 |
| Module Exports: src/lib.rs | Exposed new public sse module in the library API |
| SSE Broadcasting System: src/sse/mod.rs | New module implementing Broadcaster struct with async-safe client registry (DashMap-backed), ping-based connection pruning, and public interfaces: register_sse_client() for Actix-web integration, Broadcaster::create(), new_client(), and broadcast() methods. Defines serializable event types: SSEEvent, Message (variants: AlertEvent, ControlPlaneEvent), Criticality, SSEAlertInfo, and ControlPlaneEvent. |
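
For orientation, the text framing that Server-Sent Events use on the wire can be sketched as below. This is only an illustration of the protocol format, not code from the PR, which appears to leave framing to actix-web-lab. The function names and the `alert` event name in the usage note are hypothetical.

```rust
/// Formats a named SSE event. Each line of `data` gets its own `data:` field,
/// and a blank line terminates the event.
fn sse_event_frame(event: &str, data: &str) -> String {
    let mut frame = format!("event: {event}\n");
    for line in data.lines() {
        frame.push_str("data: ");
        frame.push_str(line);
        frame.push('\n');
    }
    frame.push('\n');
    frame
}

/// Formats an SSE comment line. Clients ignore comments, which makes them
/// usable as keep-alive pings.
fn sse_comment_frame(text: &str) -> String {
    format!(": {text}\n\n")
}
```

Under these assumptions, `sse_event_frame("alert", payload)` would carry a serialized alert, while `sse_comment_frame("ping")` corresponds to the comment event the ping loop sends.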

Sequence Diagram

sequenceDiagram
    actor Client
    participant Handler as register_sse_client()
    participant Broadcaster
    participant Stream as SSE Stream
    
    Client->>Handler: HTTP request
    Handler->>Broadcaster: extract session ID from request
    Broadcaster->>Broadcaster: new_client(session)
    Broadcaster->>Stream: create SSE channel
    Stream-->>Client: SSE connection established
    
    Note over Broadcaster: Ping loop runs periodically
    Broadcaster->>Broadcaster: broadcast(message, sessions)
    Broadcaster->>Stream: send event to registered clients
    Stream-->>Client: receive event
    
    Broadcaster->>Broadcaster: ping() → prune stale connections
    alt Connection responsive
        Client-->>Broadcaster: ack
    else Connection dead
        Broadcaster->>Broadcaster: remove from registry
    end
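
The broadcast step in the diagram, where a message goes either to selected sessions or to everyone, can be sketched roughly as follows. This is a synchronous, std-only approximation: `std::sync::mpsc::Sender<String>` stands in for the async channel the PR uses, `SessionId = u128` stands in for `Ulid`, and the returned delivery count is an addition for illustration.

```rust
use std::collections::HashMap;
use std::sync::mpsc::Sender;

/// Hypothetical stand-in for the PR's `Ulid` session identifier.
type SessionId = u128;

/// Sends `msg` to every client of the selected sessions, or to all sessions
/// when `sessions` is `None`. Returns the number of successful sends.
fn broadcast(
    clients: &HashMap<SessionId, Vec<Sender<String>>>,
    msg: &str,
    sessions: Option<&[SessionId]>,
) -> usize {
    let mut delivered = 0;
    for (id, senders) in clients {
        // Skip sessions that were not requested when a filter is given.
        if let Some(wanted) = sessions {
            if !wanted.contains(id) {
                continue;
            }
        }
        for tx in senders {
            // A send fails only when the receiving side has gone away.
            if tx.send(msg.to_string()).is_ok() {
                delivered += 1;
            }
        }
    }
    delivered
}
```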

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

  • Async concurrency patterns: Arc<DashMap>, tokio channels, and concurrent client registry require careful review for thread-safety and deadlock prevention
  • Streaming implementation: SSE stream creation, event serialization, and receiver-sender channel management; verify proper cleanup on client disconnect
  • Session management: Session extraction from HTTP requests and per-session client isolation; validate Ulid handling and session lifecycle
  • Ping loop logic: Understand periodic pruning mechanism for stale connections and ensure it correctly identifies and removes unresponsive clients
  • Integration points: register_sse_client() Actix-web handler signature and request data access patterns

Poem

🐰 Broadcasting dreams in real-time streams,
A Broadcaster hops through async themes,
Clients gather 'round the SSE glow,
With pings that prune when connections go! 🌊✨

Pre-merge checks and finishing touches

❌ Failed checks (1 warning, 1 inconclusive)
| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Description check | ⚠️ Warning | The PR description largely mirrors the template with placeholder text but lacks substantive details. The goal, chosen solutions, and key changes are not filled in; all checklist items remain unchecked. | Fill in the Description section with the goal, rationale, and key changes. Check applicable checklist items and provide evidence of testing and documentation where applicable. |
| Title check | ❓ Inconclusive | The title 'feat: SSE' is vague and lacks specificity about what SSE functionality is being added. While it relates to the changeset, it does not clearly convey the main purpose or what feature is being introduced. | Use a more descriptive title such as 'feat: Add SSE broadcasting backend for real-time client notifications' to better communicate the primary change. |
✅ Passed checks (1 passed)
| Check name | Status | Explanation |
| --- | --- | --- |
| Docstring Coverage | ✅ Passed | Docstring coverage is 100.00%, which is sufficient. The required threshold is 80.00%. |
@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

🧹 Nitpick comments (1)
src/sse/mod.rs (1)

56-76: Consider cleaning up empty sessions.

When all clients for a session disconnect, the session key remains in the map with an empty Vec. This can lead to accumulation of stale session entries over time.

Apply this diff to filter out empty sessions:

         for (session, clients) in sse_inner.iter() {
             let mut ok_clients = Vec::new();
             for client in clients {
                 if client
                     .send(sse::Event::Comment("ping".into()))
                     .await
                     .is_ok()
                 {
                     ok_clients.push(client.clone())
                 }
             }
-            ok_sessions.insert(*session, ok_clients);
+            if !ok_clients.is_empty() {
+                ok_sessions.insert(*session, ok_clients);
+            }
         }
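
An in-place variant of the same idea, shown here only as a sketch: instead of building a second map, it uses `retain` on both the map and each client list. It is std-only, so `std::sync::mpsc::Sender<String>` stands in for the PR's async sender, `SessionId = u128` stands in for `Ulid`, and the ping payload is a plain string rather than `sse::Event::Comment`.

```rust
use std::collections::HashMap;
use std::sync::mpsc::Sender;

/// Hypothetical stand-in for the PR's `Ulid` session identifier.
type SessionId = u128;

/// Pings every client, keeps only senders whose receiver is still alive,
/// and drops sessions that end up with no clients.
fn prune_stale(clients: &mut HashMap<SessionId, Vec<Sender<String>>>) {
    clients.retain(|_, senders| {
        // `send` returns an error once the receiving side has been dropped.
        senders.retain(|tx| tx.send(": ping".to_string()).is_ok());
        !senders.is_empty()
    });
}
```

Whether to mutate in place or rebuild the map depends on how long the lock may be held while pinging; the PR's loop clones and rebuilds, which this sketch does not attempt to reproduce.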
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5c68c3f and 6ce3508.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (3)
  • Cargo.toml (2 hunks)
  • src/lib.rs (1 hunks)
  • src/sse/mod.rs (1 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-06-18T06:39:04.775Z
Learnt from: nikhilsinhaparseable
Repo: parseablehq/parseable PR: 1340
File: src/query/mod.rs:64-66
Timestamp: 2025-06-18T06:39:04.775Z
Learning: In src/query/mod.rs, QUERY_SESSION_STATE and QUERY_SESSION serve different architectural purposes: QUERY_SESSION_STATE is used for stats calculation and allows dynamic registration of individual parquet files from the staging path (files created every minute), while QUERY_SESSION is used for object store queries with the global schema provider. Session contexts with schema providers don't support registering individual tables/parquets, so both session objects are necessary for their respective use cases.

Applied to files:

  • src/lib.rs
🧬 Code graph analysis (1)
src/sse/mod.rs (1)
src/utils/actix.rs (1)
  • extract_session_key_from_req (51-71)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: coverage
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Build Default aarch64-apple-darwin
🔇 Additional comments (4)
src/lib.rs (1)

45-45: LGTM!

The new sse module is correctly exposed and follows the existing alphabetical ordering convention.

Cargo.toml (2)

87-87: LGTM!

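Raising the tokio-stream requirement to 0.1.17 is reasonable. Note that a bare 0.1.17 in Cargo.toml is a caret requirement (>=0.1.17, <0.2.0), so it sets a minimum version rather than pinning one; reproducible builds come from Cargo.lock.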


33-33: The actix-web-lab version 0.24.3 is the latest stable release and is compatible with actix-web 4.9.0. No changes needed.

src/sse/mod.rs (1)

20-40: LGTM!

The Broadcaster design is appropriate: Arc for shared ownership across handlers, RwLock for async read-write access, and spawning a cleanup loop on creation.

Comment on lines +87 to +91
if let Some(clients) = self.inner.write().await.clients.get_mut(session) {
    clients.push(tx);
} else {
    self.inner.write().await.clients.insert(*session, vec![tx]);
}

⚠️ Potential issue | 🟠 Major

Fix race condition with double write lock acquisition.

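The code acquires a write lock in the if let condition on line 87 and a second one in the else branch on line 90, so the lookup and the insert are not a single atomic step. If the first guard has already been dropped when the else branch runs (as under Rust 2024's if let temporary scoping), a concurrent call to new_client for the same session can insert an entry between the two acquisitions, and the insert on line 90 then overwrites it, losing that client's registration. If the guard is still alive in the else branch (as under the 2021 edition and earlier), the second write().await waits on a lock the task itself already holds.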

Use the Entry API to handle this atomically:

-        if let Some(clients) = self.inner.write().await.clients.get_mut(session) {
-            clients.push(tx);
-        } else {
-            self.inner.write().await.clients.insert(*session, vec![tx]);
-        }
+        self.inner
+            .write()
+            .await
+            .clients
+            .entry(*session)
+            .or_default()
+            .push(tx);
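
To illustrate why the single-lock Entry form keeps every registration, here is a small std-only sketch. It uses `std::sync::RwLock` and threads in place of the async lock and tasks in the PR, and `SessionId`, `Registry`, and `register_concurrently` are names invented for this example.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, RwLock};
use std::thread;

/// Hypothetical stand-in for the PR's `Ulid` session identifier.
type SessionId = u128;
type Registry = Arc<RwLock<HashMap<SessionId, Vec<Sender<String>>>>>;

/// Registers a client under one write lock, so lookup and insert are atomic.
fn new_client(registry: &Registry, session: SessionId) -> Receiver<String> {
    let (tx, rx) = channel();
    registry
        .write()
        .expect("registry lock poisoned")
        .entry(session)
        .or_default()
        .push(tx);
    rx
}

/// Registers `n` clients for the same session from `n` threads and returns
/// how many registrations are present afterwards.
fn register_concurrently(n: usize) -> usize {
    let session: SessionId = 42;
    let registry: Registry = Arc::new(RwLock::new(HashMap::new()));
    let handles: Vec<_> = (0..n)
        .map(|_| {
            let registry = Arc::clone(&registry);
            thread::spawn(move || new_client(&registry, session))
        })
        .collect();
    // Keep the receivers alive until all threads have finished.
    let _receivers: Vec<_> = handles.into_iter().map(|h| h.join().unwrap()).collect();
    let count = registry.read().unwrap()[&session].len();
    count
}
```

Because each thread holds the write lock for the whole lookup-and-push, no registration can be overwritten by another thread's insert.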
🤖 Prompt for AI Agents
In src/sse/mod.rs around lines 87-91, avoid acquiring the write lock twice by
holding a single write lock and using the HashMap Entry API to insert-or-get the
client Vec atomically: obtain let mut guard = self.inner.write().await once,
then call guard.clients.entry(key).or_insert_with(Vec::new).push(tx) (use the
appropriate key ownership/clone or dereference as needed) so concurrent
new_client calls cannot race and lose registrations.

Comment on lines +99 to +101
pub async fn broadcast(&self, msg: &str, sessions: Option<&[Ulid]>) {
    let clients = self.inner.read().await.clients.clone();
    warn!(clients=?clients);

⚠️ Potential issue | 🟡 Minor

Change log level from warn to debug or trace.

Logging all connected clients at warn level on every broadcast will spam logs during normal operation. This appears to be debug logging left over from development.

-        warn!(clients=?clients);
+        tracing::debug!(clients=?clients);

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In src/sse/mod.rs around lines 99 to 101, the code currently logs connected
clients at warn level (warn!(clients=?clients)); change this to a lower
verbosity (use debug! or trace! instead of warn!) so normal broadcasts don't
spam logs; simply replace the warn! macro with debug! (or trace! if you prefer
more verbosity) and keep the same structured field formatting (clients=?clients)
so only the log level changes.

Comment on lines +130 to +140
pub async fn register_sse_client(
    broadcaster: actix_web::web::Data<Arc<Broadcaster>>,
    req: HttpRequest,
) -> Sse<InfallibleStream<ReceiverStream<sse::Event>>> {
    let session = extract_session_key_from_req(&req).unwrap();
    let sessionid = match session {
        SessionKey::SessionId(ulid) => ulid,
        _ => unreachable!("SSE requires a valid session. Unable to register client."),
    };
    broadcaster.new_client(&sessionid).await
}

⚠️ Potential issue | 🔴 Critical

Replace panics with proper error handling.

This function will panic on unauthenticated requests (.unwrap() on line 134) or when a user authenticates via BasicAuth instead of session cookie (unreachable!() on line 137). HTTP handlers should not panic; they should return appropriate error responses.

Consider returning a Result or an Actix error response:

 pub async fn register_sse_client(
     broadcaster: actix_web::web::Data<Arc<Broadcaster>>,
     req: HttpRequest,
-) -> Sse<InfallibleStream<ReceiverStream<sse::Event>>> {
-    let session = extract_session_key_from_req(&req).unwrap();
-    let sessionid = match session {
-        SessionKey::SessionId(ulid) => ulid,
-        _ => unreachable!("SSE requires a valid session. Unable to register client."),
-    };
-    broadcaster.new_client(&sessionid).await
+) -> Result<Sse<InfallibleStream<ReceiverStream<sse::Event>>>, actix_web::Error> {
+    let session = extract_session_key_from_req(&req)?;
+    let sessionid = match session {
+        SessionKey::SessionId(ulid) => ulid,
+        SessionKey::BasicAuth { .. } => {
+            return Err(actix_web::error::ErrorBadRequest(
+                "SSE requires session-based authentication, not BasicAuth",
+            ));
+        }
+    };
+    Ok(broadcaster.new_client(&sessionid).await)
 }
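
Separated from Actix, the decision logic this suggestion asks for can be sketched as a plain function that returns an error value instead of panicking. Everything here is illustrative: `SessionKey` is a simplified mirror of the enum referenced above (the `BasicAuth` field names are assumptions), `u128` stands in for `Ulid`, and `SseAuthError` is a hypothetical type that an HTTP layer would map to status codes.

```rust
/// Hypothetical, simplified mirror of the `SessionKey` enum in the review.
#[derive(Debug)]
enum SessionKey {
    BasicAuth { username: String, password: String },
    SessionId(u128),
}

/// Reasons an SSE registration can be rejected. An HTTP layer would map
/// these to responses such as 401 or 400.
#[derive(Debug, PartialEq)]
enum SseAuthError {
    MissingCredentials,
    SessionRequired,
}

/// Extracts the session ID without panicking on unexpected input.
fn session_id_for_sse(key: Option<SessionKey>) -> Result<u128, SseAuthError> {
    match key {
        Some(SessionKey::SessionId(id)) => Ok(id),
        Some(SessionKey::BasicAuth { .. }) => Err(SseAuthError::SessionRequired),
        None => Err(SseAuthError::MissingCredentials),
    }
}
```

Matching every variant explicitly, rather than using a catch-all arm, also means the compiler flags this function if a new authentication variant is added later.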
🤖 Prompt for AI Agents
In src/sse/mod.rs around lines 130–140, replace the panic-causing unwrap() and
unreachable!() with proper error handling and change the handler to return a
Result so unauthorized/invalid requests produce HTTP errors instead of panics:
update the function signature to return
Result<Sse<InfallibleStream<ReceiverStream<sse::Event>>>, actix_web::Error> (or
Result<_, HttpResponse>), call extract_session_key_from_req and map its None/Err
to an appropriate HttpResponse::Unauthorized/BadRequest, match SessionKey and
return an Err(HttpResponse::Unauthorized().finish()) when not a SessionId, and
finally return Ok(broadcaster.new_client(&sessionid).await) so any broadcaster
errors can be propagated or converted into an HTTP 500 response.

Comment on lines +165 to +177
#[derive(Serialize, Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SSEAlertInfo {
    id: Ulid,
    state: AlertState,
    message: String,
}

#[derive(Serialize, Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ControlPlaneEvent {
    message: String,
}

⚠️ Potential issue | 🟠 Major

Private fields prevent external instantiation.

SSEAlertInfo and ControlPlaneEvent have private fields (Rust default), making them impossible to construct from outside this module. If these are intended for use by other modules (e.g., to create and broadcast alert events), either make the fields pub or provide constructor functions.

 #[derive(Serialize, Debug, Deserialize)]
 #[serde(rename_all = "camelCase")]
 pub struct SSEAlertInfo {
-    id: Ulid,
-    state: AlertState,
-    message: String,
+    pub id: Ulid,
+    pub state: AlertState,
+    pub message: String,
 }

 #[derive(Serialize, Debug, Deserialize)]
 #[serde(rename_all = "camelCase")]
 pub struct ControlPlaneEvent {
-    message: String,
+    pub message: String,
 }
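
The constructor alternative mentioned above could look roughly like this. It is a sketch with the serde derives omitted so that it compiles without external crates; the `new` and `message` methods and the `build_event` helper are hypothetical and not part of the PR.

```rust
mod sse {
    /// Simplified stand-in for the PR's `ControlPlaneEvent`.
    #[derive(Debug)]
    pub struct ControlPlaneEvent {
        // Private field: only code inside `sse` can use struct-literal syntax.
        message: String,
    }

    impl ControlPlaneEvent {
        /// Public constructor, the only way to build the event from outside.
        pub fn new(message: impl Into<String>) -> Self {
            Self { message: message.into() }
        }

        /// Read-only accessor for the message.
        pub fn message(&self) -> &str {
            &self.message
        }
    }
}

/// Code outside `sse` cannot write `ControlPlaneEvent { message: .. }`
/// because the field is private, but it can call the constructor.
fn build_event() -> sse::ControlPlaneEvent {
    sse::ControlPlaneEvent::new("node restarted")
}
```

Constructors keep the fields free to change later, while `pub` fields are less code; either choice addresses the issue raised here.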
🤖 Prompt for AI Agents
In src/sse/mod.rs around lines 165 to 177, the structs SSEAlertInfo and
ControlPlaneEvent define fields with private visibility which prevents external
modules from constructing instances; either make each field public (e.g., pub
id: Ulid, pub state: AlertState, pub message: String for SSEAlertInfo and pub
message: String for ControlPlaneEvent) or add public constructor functions
(e.g., pub fn new(...) -> Self) that accept the necessary parameters and return
the struct, and ensure serde attributes remain intact for serialization.
