Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 42 additions & 8 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use macp_runtime::storage::{
cleanup_temp_files, migrate_if_needed, FileBackend, MemoryBackend, StorageBackend,
};
use server::MacpServer;
use std::io;
use std::path::PathBuf;
use std::sync::Arc;
use tonic::transport::{Identity, Server, ServerTlsConfig};
Expand All @@ -31,6 +32,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let memory_only = std::env::var("MACP_MEMORY_ONLY").ok().as_deref() == Some("1");
let data_dir =
PathBuf::from(std::env::var("MACP_DATA_DIR").unwrap_or_else(|_| ".macp-data".into()));
let strict_recovery = std::env::var("MACP_STRICT_RECOVERY").ok().as_deref() == Some("1");

let storage: Arc<dyn StorageBackend> = if memory_only {
Arc::new(MemoryBackend)
Expand All @@ -50,30 +52,51 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Enumerate session directories and replay from logs
let sessions_dir = data_dir.join("sessions");
let mut recovered = 0usize;
if sessions_dir.exists() {
for entry in std::fs::read_dir(&sessions_dir)? {
let entry = entry?;
if !entry.file_type()?.is_dir() {
if tokio::fs::metadata(&sessions_dir).await.is_ok() {
let mut entries = tokio::fs::read_dir(&sessions_dir).await?;
while let Some(entry) = entries.next_entry().await? {
if !entry.file_type().await?.is_dir() {
continue;
}
let session_id = entry.file_name().to_string_lossy().to_string();
let log_entries = storage.load_log(&session_id).await?;
let log_entries = match storage.load_log(&session_id).await {
Ok(entries) => entries,
Err(e) if strict_recovery => {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("strict recovery: failed to load log for {session_id}: {e}"),
)
.into());
}
Err(e) => {
tracing::warn!(
session_id = %session_id,
error = %e,
"failed to load session log; skipping"
);
continue;
}
};
if log_entries.is_empty() {
continue;
}

match replay_session(&session_id, &log_entries, &mode_registry) {
Ok(session) => {
// Best-effort snapshot update
if let Err(e) = storage.save_session(&session).await {
if strict_recovery {
return Err(io::Error::other(format!(
"strict recovery: failed to persist recovered session {session_id}: {e}"
))
.into());
}
tracing::warn!(
session_id = %session_id,
error = %e,
"failed to persist recovered session"
);
}

// Populate in-memory log store
log_store.create_session_log(&session_id).await;
for log_entry in &log_entries {
log_store.append(&session_id, log_entry.clone()).await;
Expand All @@ -82,6 +105,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
registry.insert_recovered_session(session_id, session).await;
recovered += 1;
}
Err(e) if strict_recovery => {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("strict recovery: failed to replay session {session_id}: {e}"),
)
.into());
}
Err(e) => {
tracing::warn!(
session_id = %session_id,
Expand All @@ -93,7 +123,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
}
if recovered > 0 {
tracing::info!(count = recovered, "replayed sessions from log");
tracing::info!(
count = recovered,
strict_recovery,
"replayed sessions from log"
);
}
}

Expand Down
Loading
Loading