feat: implement cluster health monitoring and chunk replication #1
base: main
Conversation
murka commented Jan 16, 2026
- Add new cluster module with health check and replication loops
- Track node status and chunk locations in AppState
- Implement chunk replication when under-replicated
- Add new API endpoint for registering chunk locations
- Update cluster info endpoint to include storage nodes
```rust
// Start with this gateway node as leader
let mut members = vec![NodeInfo {
    node_id: state.node_id.clone(),
    address: "127.0.0.1:8080".to_string(),
```
get_cluster hardcodes the gateway address (127.0.0.1:8080). Consider sourcing it from the actual bind/config (e.g., store in AppState) so cluster info is accurate; if this is a dev default, consider documenting why.
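A minimal, self-contained sketch of the AppState-backed approach. The public_address field, the GATEWAY_ADDR env var, and the struct definitions below are stand-ins invented for illustration, not types from this PR:

```rust
// Sketch: carry the gateway's advertised address in AppState instead of a literal.
#[derive(Clone)]
struct AppState {
    node_id: String,
    public_address: String, // filled from config or the actual bind address at startup
}

struct NodeInfo {
    node_id: String,
    address: String,
}

fn gateway_member(state: &AppState) -> NodeInfo {
    NodeInfo {
        node_id: state.node_id.clone(),
        address: state.public_address.clone(), // no hardcoded 127.0.0.1:8080
    }
}

fn main() {
    let state = AppState {
        node_id: "gateway-1".into(),
        // Keep 127.0.0.1:8080 only as a documented dev default.
        public_address: std::env::var("GATEWAY_ADDR")
            .unwrap_or_else(|_| "127.0.0.1:8080".to_string()),
    };
    let member = gateway_member(&state);
    println!("{} @ {}", member.node_id, member.address);
}
```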
```rust
) -> (StatusCode, Json<AllocateChunksResponse>) {
    let session_id = uuid::Uuid::new_v4().to_string();
    let _rf = req.replication_factor.unwrap_or(1); // Default to 1 for local demo
    let rf = req.replication_factor.unwrap_or(1) as usize;
```
Suggestion: In placement selection, handle replication_factor == 0 explicitly (reject or return empty placements) and remove the unreachable fallback—just return selected. This avoids implicit localhost behavior and clarifies intent.
```diff
+ if rf == 0 {
+     return (StatusCode::BAD_REQUEST, Json(AllocateChunksResponse {
+         session_id,
+         placements: Vec::new(),
+     }));
+ }
```
```rust
locations
    .entry(cid_hex.clone())
    .or_insert_with(Vec::new)
    .push(node_address);
```
put_chunk always appends node_address for a CID, causing duplicates. Consider only pushing when it’s not already present (consistent with register_chunk_location).
```diff
- locations
-     .entry(cid_hex.clone())
-     .or_insert_with(Vec::new)
-     .push(node_address);
+ let addrs = locations
+     .entry(cid_hex.clone())
+     .or_insert_with(Vec::new);
+ if !addrs.contains(&node_address) {
+     addrs.push(node_address);
+ }
```
```rust
let data = client.get(format!("{}/api/v1/chunks/{}", src, cid))
    .send()
    .await?
    .bytes()
    .await?;
```
replicate_chunk doesn’t validate the source response; an error (e.g., 404/500) could be read and uploaded. Consider calling .error_for_status()? on the source response before .bytes().
```diff
  let data = client.get(format!("{}/api/v1/chunks/{}", src, cid))
      .send()
      .await?
+     .error_for_status()?
      .bytes()
      .await?;
```
```rust
use crate::handlers::AppState;
use std::time::{SystemTime, UNIX_EPOCH};

pub async fn health_check_loop(state: &AppState) {
```
Holding the node_registry write lock across awaits in health_check_loop can block readers/writers and cause contention. Consider snapshotting node IDs/addresses under a read lock, drop the lock, run health checks, then re-acquire a write lock to apply updates.
```rust
.route("/chunks/{cid}/locations", get(handlers::get_chunk_locations))
.route("/chunks/{cid}/locations", post(handlers::register_chunk_location))
```
Two .route calls for /chunks/{cid}/locations will panic in axum 0.7+. Consider combining them into a single route with both methods: get(...).post(...).
```diff
- .route("/chunks/{cid}/locations", get(handlers::get_chunk_locations))
- .route("/chunks/{cid}/locations", post(handlers::register_chunk_location))
+ .route("/chunks/{cid}/locations", get(handlers::get_chunk_locations).post(handlers::register_chunk_location))
```
Pull request overview
This PR adds cluster health monitoring and chunk replication capabilities to support distributed storage across multiple nodes. The implementation tracks node health status, monitors chunk locations, and automatically replicates under-replicated chunks to maintain data availability.
Changes:
- Added new cluster module with health check and replication background loops
- Extended AppState to track node registry and chunk locations
- Enhanced chunk placement logic to distribute chunks across online storage nodes using round-robin selection
- Added API endpoint for registering chunk locations
Reviewed changes
Copilot reviewed 5 out of 6 changed files in this pull request and generated 9 comments.
Summary per file:
| File | Description |
|---|---|
| src/lib.rs | Exports new cluster module |
| src/http.rs | Adds POST endpoint for registering chunk locations |
| src/handlers.rs | Adds node registry, chunk location tracking, health check function, and updated handlers for distributed chunk placement |
| src/cluster.rs | New module implementing health check and replication loops with chunk replication logic |
| src/bin/gateway.rs | Spawns background tasks for health checks and replication |
| Cargo.toml | Adds reqwest dependency for HTTP client functionality |
```rust
pub async fn check_node_health(address: &str) -> bool {
    let client = reqwest::Client::builder()
        .timeout(std::time::Duration::from_secs(2))
        .build()
        .ok();
    if let Some(client) = client {
        client.get(format!("{}/health", address))
            .send()
            .await
            .map(|r| r.status().is_success())
            .unwrap_or(false)
    } else {
        false
    }
}
```
Copilot AI commented Jan 16, 2026
The check_node_health function in handlers.rs is duplicated in cluster.rs (lines 28-42). This code duplication should be eliminated by keeping only one implementation and importing it where needed. Consider keeping the function in cluster.rs and having handlers.rs reference it, or extract it to a shared utility module.
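One possible shape for the deduplication, shown as a sketch rather than the PR's actual layout: keep a single implementation in cluster.rs (equivalent to the handlers.rs version above) and import it where needed.

```rust
// src/cluster.rs -- single implementation of the health probe
pub async fn check_node_health(address: &str) -> bool {
    let client = match reqwest::Client::builder()
        .timeout(std::time::Duration::from_secs(2))
        .build()
    {
        Ok(c) => c,
        Err(_) => return false,
    };
    client
        .get(format!("{}/health", address))
        .send()
        .await
        .map(|r| r.status().is_success())
        .unwrap_or(false)
}

// src/handlers.rs -- drop the duplicate definition and reuse the cluster version:
// use crate::cluster::check_node_health;
```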
```rust
locations
    .entry(cid_hex.clone())
    .or_insert_with(Vec::new)
    .push(node_address);
```
Copilot AI commented Jan 16, 2026
The put_chunk handler always appends to the chunk locations without checking for duplicates. If the same chunk is uploaded multiple times with the same node_id, the node_address will be added multiple times to the locations list. Add a check similar to the one in register_chunk_location (line 625) to prevent duplicate entries.
```diff
- locations
-     .entry(cid_hex.clone())
-     .or_insert_with(Vec::new)
-     .push(node_address);
+ let entry = locations
+     .entry(cid_hex.clone())
+     .or_insert_with(Vec::new);
+ if !entry.contains(&node_address) {
+     entry.push(node_address);
+ }
```
```rust
    .collect();

// If under-replicated (less than 2 replicas online), try to replicate
if online_replicas.len() < 2 && !online_replicas.is_empty() {
```
Copilot AI commented Jan 16, 2026
The replication target of 2 replicas is hardcoded in the replication loop. This should be configurable (e.g., from the original replication_factor used during chunk allocation) to support different replication requirements across different chunks or use cases.
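A sketch of one way to thread the target through configuration; ClusterConfig and the REPLICATION_TARGET env var are hypothetical names, and carrying the per-chunk replication_factor from allocation would be the more complete fix.

```rust
// Sketch: read the replication target from the environment instead of hardcoding 2.
#[derive(Clone)]
struct ClusterConfig {
    replication_target: usize,
}

impl ClusterConfig {
    fn from_env() -> Self {
        let replication_target = std::env::var("REPLICATION_TARGET")
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(2); // falls back to the current default of 2 replicas
        ClusterConfig { replication_target }
    }
}

// Replacement for the hardcoded `online_replicas.len() < 2` check.
fn needs_replication(online_replicas: usize, cfg: &ClusterConfig) -> bool {
    online_replicas > 0 && online_replicas < cfg.replication_target
}

fn main() {
    let cfg = ClusterConfig::from_env();
    // With the default target of 2: one online replica triggers replication,
    // zero online replicas does not (there is no source to copy from).
    println!("replicate at 1 replica: {}", needs_replication(1, &cfg));
    println!("replicate at 0 replicas: {}", needs_replication(0, &cfg));
}
```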
```rust
// Round-robin selection of rf distinct nodes
let mut selected = Vec::with_capacity(rf);
for i in 0..rf {
    let node_idx = (idx + i) % online_nodes.len();
    let addr = &online_nodes[node_idx];
    if !selected.contains(addr) {
        selected.push(addr.clone());
    }
}
// If we couldn't get enough distinct nodes, just use what we have
```
Copilot AI commented Jan 16, 2026
The selected.contains(addr) check on line 324 performs a linear search for each iteration, resulting in O(n²) complexity for the selection loop. When rf is close to online_nodes.len(), this could select fewer nodes than requested. Consider using a HashSet for O(1) lookups or rotating through the list more carefully to ensure distinct node selection.
```diff
- // Round-robin selection of rf distinct nodes
- let mut selected = Vec::with_capacity(rf);
- for i in 0..rf {
-     let node_idx = (idx + i) % online_nodes.len();
-     let addr = &online_nodes[node_idx];
-     if !selected.contains(addr) {
-         selected.push(addr.clone());
-     }
- }
- // If we couldn't get enough distinct nodes, just use what we have
+ // Round-robin selection of up to rf distinct nodes
+ let max_to_select = rf.min(online_nodes.len());
+ let mut selected = Vec::with_capacity(max_to_select);
+ for i in 0..max_to_select {
+     let node_idx = (idx + i) % online_nodes.len();
+     selected.push(online_nodes[node_idx].clone());
+ }
+ // If we couldn't get any nodes (shouldn't happen unless rf is zero), fall back
```
```rust
pub async fn health_check_loop(state: &AppState) {
    let mut registry = state.node_registry.write().await;
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_millis() as u64;

    for (node_id, record) in registry.nodes.iter_mut() {
        let is_healthy = check_node_health(&record.address).await;
        let was_online = record.is_online;
        record.is_online = is_healthy;
        if is_healthy {
            record.last_seen_ms = now;
        }
        if was_online != is_healthy {
            if is_healthy {
                tracing::info!("Node {} is now ONLINE", node_id);
            } else {
                tracing::warn!("Node {} is now OFFLINE", node_id);
            }
```
Copilot AI commented Jan 16, 2026
The health_check_loop holds a write lock on the entire node registry while performing potentially slow health checks for all nodes sequentially. This blocks other operations that need to read or write the registry. Consider collecting addresses first, releasing the lock, performing health checks concurrently, then re-acquiring the lock only to update the results.
```diff
- pub async fn health_check_loop(state: &AppState) {
-     let mut registry = state.node_registry.write().await;
-     let now = SystemTime::now()
-         .duration_since(UNIX_EPOCH)
-         .unwrap()
-         .as_millis() as u64;
-     for (node_id, record) in registry.nodes.iter_mut() {
-         let is_healthy = check_node_health(&record.address).await;
-         let was_online = record.is_online;
-         record.is_online = is_healthy;
-         if is_healthy {
-             record.last_seen_ms = now;
-         }
-         if was_online != is_healthy {
-             if is_healthy {
-                 tracing::info!("Node {} is now ONLINE", node_id);
-             } else {
-                 tracing::warn!("Node {} is now OFFLINE", node_id);
-             }
+ use tokio::task::JoinSet;
+
+ pub async fn health_check_loop(state: &AppState) {
+     // Collect node IDs and their addresses under a short-lived read lock
+     let nodes: Vec<(String, String)> = {
+         let registry = state.node_registry.read().await;
+         registry
+             .nodes
+             .iter()
+             .map(|(node_id, record)| (node_id.clone(), record.address.clone()))
+             .collect()
+     };
+     // Perform health checks concurrently without holding the registry lock
+     let mut tasks: JoinSet<(String, bool)> = JoinSet::new();
+     for (node_id, address) in nodes.into_iter() {
+         tasks.spawn(async move {
+             let is_healthy = check_node_health(&address).await;
+             (node_id, is_healthy)
+         });
+     }
+     let mut results: Vec<(String, bool)> = Vec::new();
+     while let Some(join_result) = tasks.join_next().await {
+         if let Ok((node_id, is_healthy)) = join_result {
+             results.push((node_id, is_healthy));
+         }
+     }
+     // Update registry state with health check results under a write lock
+     let mut registry = state.node_registry.write().await;
+     let now = SystemTime::now()
+         .duration_since(UNIX_EPOCH)
+         .unwrap()
+         .as_millis() as u64;
+     for (node_id, is_healthy) in results.into_iter() {
+         if let Some(record) = registry.nodes.get_mut(&node_id) {
+             let was_online = record.is_online;
+             record.is_online = is_healthy;
+             if is_healthy {
+                 record.last_seen_ms = now;
+             }
+             if was_online != is_healthy {
+                 if is_healthy {
+                     tracing::info!("Node {} is now ONLINE", node_id);
+                 } else {
+                     tracing::warn!("Node {} is now OFFLINE", node_id);
+                 }
+             }
```
```rust
let node_addresses = if online_nodes.is_empty() {
    // Fall back to gateway address (single-node mode)
    vec!["http://localhost:8080".to_string()]
} else {
```
Copilot AI commented Jan 16, 2026
The hardcoded fallback address 'http://localhost:8080' appears in multiple places (lines 317, 330, 553, 555, 605). This should be extracted to a constant or derived from the application configuration to avoid inconsistencies and make the code more maintainable.
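As a sketch, the fallback could live in one place instead of being repeated at each call site; the constant name, the helper, and the GATEWAY_ADDR override below are illustrative assumptions, not part of the PR.

```rust
// Sketch: one definition of the single-node fallback address.
const DEFAULT_GATEWAY_ADDR: &str = "http://localhost:8080";

fn gateway_fallback_addr() -> String {
    // Allow overriding the dev default via configuration/environment.
    std::env::var("GATEWAY_ADDR").unwrap_or_else(|_| DEFAULT_GATEWAY_ADDR.to_string())
}

// Example call site, mirroring the placement fallback shown above.
fn placement_targets(online_nodes: &[String]) -> Vec<String> {
    if online_nodes.is_empty() {
        vec![gateway_fallback_addr()] // single-node mode
    } else {
        online_nodes.to_vec()
    }
}

fn main() {
    println!("{:?}", placement_targets(&[]));
}
```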
```rust
// Check if chunk is also stored locally
let store = state.store.read().await;
let is_stored_locally = store.list().iter().any(|cid| cid.to_hex() == cid_hex);
```
Copilot AI commented Jan 16, 2026
The get_chunk_locations function calls store.list() and iterates through all chunks to check if one is stored locally. For large stores, this is inefficient. Consider using store.get() directly with a parsed CID, or maintain a separate index for faster lookups.
```diff
- // Check if chunk is also stored locally
- let store = state.store.read().await;
- let is_stored_locally = store.list().iter().any(|cid| cid.to_hex() == cid_hex);
+ // Check if chunk is also stored locally without scanning the entire store
+ let store = state.store.read().await;
+ let is_stored_locally = if let Ok(cid) = Cid::from_hex(&cid_hex) {
+     matches!(store.get(&cid), Ok(Some(_)))
+ } else {
+     false
+ };
```
```rust
async fn replicate_chunk(cid: &str, src: &str, dest: &str) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let client = reqwest::Client::new();

    // Fetch from source
    let data = client.get(format!("{}/api/v1/chunks/{}", src, cid))
        .send()
        .await?
        .bytes()
        .await?;

    // Upload to destination
    client.put(format!("{}/api/v1/chunks/{}", dest, cid))
        .body(data)
        .send()
        .await?
        .error_for_status()?;
```
Copilot AI commented Jan 16, 2026
The replicate_chunk function creates a new HTTP client for each replication. Creating a client has overhead (connection pools, DNS resolution, etc.). Consider reusing a shared client instance across replication operations for better performance.
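A sketch of a shared client using std's OnceLock (the helper name and the 10-second timeout are assumptions); it also folds in the error_for_status check on the source response suggested earlier.

```rust
// Sketch: reuse one reqwest::Client (and its connection pool) for all replication calls.
use std::sync::OnceLock;

fn http_client() -> &'static reqwest::Client {
    static CLIENT: OnceLock<reqwest::Client> = OnceLock::new();
    CLIENT.get_or_init(|| {
        reqwest::Client::builder()
            .timeout(std::time::Duration::from_secs(10))
            .build()
            .expect("failed to build HTTP client")
    })
}

async fn replicate_chunk(cid: &str, src: &str, dest: &str)
    -> Result<(), Box<dyn std::error::Error + Send + Sync>>
{
    let client = http_client(); // shared client, no per-call setup cost

    // Fetch from source, rejecting error statuses before reading the body
    let data = client
        .get(format!("{}/api/v1/chunks/{}", src, cid))
        .send()
        .await?
        .error_for_status()?
        .bytes()
        .await?;

    // Upload to destination
    client
        .put(format!("{}/api/v1/chunks/{}", dest, cid))
        .body(data)
        .send()
        .await?
        .error_for_status()?;
    Ok(())
}
```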
```rust
let online_replicas: Vec<&String> = node_addrs.iter()
    .filter(|addr| online_nodes.contains(addr))
    .collect();
```
Copilot AI commented Jan 16, 2026
The online_nodes.contains(addr) check on line 63 performs a linear search for each address. Since this is inside a loop over all chunks and their addresses, this creates O(n*m) complexity. Convert online_nodes to a HashSet for O(1) lookups.
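A small sketch of the HashSet variant; the free function is only for illustration, the real code would build the set once per replication pass and filter inline.

```rust
// Sketch: build a HashSet once so membership checks are O(1) per address.
use std::collections::HashSet;

fn online_replicas<'a>(node_addrs: &'a [String], online_nodes: &[String]) -> Vec<&'a String> {
    let online: HashSet<&String> = online_nodes.iter().collect();
    node_addrs
        .iter()
        .filter(|addr| online.contains(addr))
        .collect()
}

fn main() {
    let online = vec!["http://n1:9000".to_string(), "http://n2:9000".to_string()];
    let addrs = vec!["http://n1:9000".to_string(), "http://n3:9000".to_string()];
    println!("{:?}", online_replicas(&addrs, &online)); // ["http://n1:9000"]
}
```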