diff --git a/libsql-replication/src/replicator.rs b/libsql-replication/src/replicator.rs index a0b1c52b44..46051f8177 100644 --- a/libsql-replication/src/replicator.rs +++ b/libsql-replication/src/replicator.rs @@ -5,10 +5,13 @@ use parking_lot::Mutex; use tokio::task::spawn_blocking; use tokio::time::Duration; use tokio_stream::{Stream, StreamExt}; +use tonic::{Code, Status}; use crate::frame::{Frame, FrameNo}; use crate::injector::Injector; -use crate::rpc::replication::{Frame as RpcFrame, NEED_SNAPSHOT_ERROR_MSG}; +use crate::rpc::replication::{ + Frame as RpcFrame, NAMESPACE_DOESNT_EXIST, NEED_SNAPSHOT_ERROR_MSG, NO_HELLO_ERROR_MSG, +}; pub use tokio_util::either::Either; @@ -34,6 +37,23 @@ pub enum Error { Meta(#[from] super::meta::Error), #[error("Hanshake required")] NoHandshake, + #[error("Requested namespace doesn't exist")] + NamespaceDoesntExist, +} + +impl From for Error { + fn from(status: Status) -> Self { + if status.code() == Code::FailedPrecondition { + match status.message() { + NEED_SNAPSHOT_ERROR_MSG => Error::NeedSnapshot, + NO_HELLO_ERROR_MSG => Error::NoHandshake, + NAMESPACE_DOESNT_EXIST => Error::NamespaceDoesntExist, + _ => Error::Client(status.into()), + } + } else { + Error::Client(status.into()) + } + } } impl From for Error { @@ -109,11 +129,19 @@ where pub struct Replicator { client: C, injector: Arc>, - has_handshake: bool, + state: ReplicatorState, } const INJECTOR_BUFFER_CAPACITY: usize = 10; +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum ReplicatorState { + NeedFrames, + NeedHandshake, + NeedSnapshot, + Exit, +} + impl Replicator { /// Creates a repicator for the db file pointed at by `db_path` pub async fn new(client: C, db_path: PathBuf, auto_checkpoint: u32) -> Result { @@ -128,7 +156,7 @@ impl Replicator { Ok(Self { client, injector: Arc::new(Mutex::new(injector)), - has_handshake: false, + state: ReplicatorState::NeedHandshake, }) } @@ -140,9 +168,6 @@ impl Replicator { pub async fn run(&mut self) -> Error { loop { if let Err(e) = self.replicate().await { - // Replication encountered an error. We log the error, and then shut down the - // injector and propagate a potential panic from there. - self.has_handshake = false; return e; } } @@ -154,15 +179,15 @@ impl Replicator { tracing::info!("Attempting to perform handshake with primary."); match self.client.handshake().await { Ok(_) => { - self.has_handshake = true; + self.state = ReplicatorState::NeedFrames; return Ok(()); } - Err(e @ (Error::Fatal(_) | Error::Meta(_))) => return Err(e), - Err(e) if !error_printed => { + Err(Error::Client(e)) if !error_printed => { tracing::error!("error connecting to primary. retrying. error: {e}"); error_printed = true; } - _ => (), + Err(Error::Client(_)) if error_printed => (), + Err(e) => return Err(e), } tokio::time::sleep(Duration::from_secs(1)).await; } @@ -171,61 +196,56 @@ impl Replicator { } pub async fn replicate(&mut self) -> Result<(), Error> { - if !self.has_handshake { - self.try_perform_handshake().await?; + loop { + self.try_replicate_step().await?; + if self.state == ReplicatorState::Exit { + self.state = ReplicatorState::NeedFrames; + return Ok(()); + } } + } - let mut stream = self.start_next_frames().await?; + async fn try_replicate_step(&mut self) -> Result<(), Error> { + let state = self.state; + let ret = match state { + ReplicatorState::NeedHandshake => self.try_perform_handshake().await, + ReplicatorState::NeedFrames => self.try_replicate().await, + ReplicatorState::NeedSnapshot => self.load_snapshot().await, + ReplicatorState::Exit => unreachable!("trying to step replicator on exit"), + }; - loop { - match stream.next().await { - Some(Ok(frame)) => { - self.inject_frame(frame).await?; - } - Some(Err(Error::NeedSnapshot)) => { - tracing::debug!("loading snapshot"); - // remove any outstanding frames in the buffer that are not part of a - // transaction: they are now part of the snapshot. - match self.load_snapshot().await { - Ok(()) => (), - Err(Error::NoHandshake) => { - self.has_handshake = false; - self.try_perform_handshake().await?; - } - Err(e) => return Err(e), - } + self.state = match ret { + // perform normal operation state transition + Ok(()) => match state { + ReplicatorState::Exit => unreachable!(), + ReplicatorState::NeedFrames => ReplicatorState::Exit, + ReplicatorState::NeedSnapshot | ReplicatorState::NeedHandshake => { + ReplicatorState::NeedFrames } - Some(Err(Error::NoHandshake)) => { - tracing::debug!("session expired, new handshake required"); - self.has_handshake = false; - self.try_perform_handshake().await?; + }, + Err(Error::NoHandshake) => { + if state == ReplicatorState::NeedHandshake { + return Err(Error::Fatal( + "Received handshake error while performing handshake".into(), + )); } - Some(Err(e)) => return Err(e), - None => return Ok(()), + ReplicatorState::NeedHandshake } - } + Err(Error::NeedSnapshot) => ReplicatorState::NeedSnapshot, + Err(e) => return Err(e), + }; + + Ok(()) } - async fn start_next_frames(&mut self) -> Result { - loop { - match self.client.next_frames().await { - Ok(s) => return Ok(s), - Err(Error::NeedSnapshot) => { - tracing::debug!("loading snapshot"); - // remove any outstanding frames in the buffer that are not part of a - // transaction: they are now part of the snapshot. - match self.load_snapshot().await { - Ok(()) => continue, - Err(Error::NoHandshake) => { - self.has_handshake = false; - self.try_perform_handshake().await?; - } - Err(e) => return Err(e), - } - } - Err(e) => return Err(e), - } + async fn try_replicate(&mut self) -> Result<(), Error> { + let mut stream = self.client.next_frames().await?; + + while let Some(frame) = stream.next().await.transpose()? { + self.inject_frame(frame).await?; } + + Ok(()) } async fn load_snapshot(&mut self) -> Result<(), Error> { @@ -267,15 +287,352 @@ impl Replicator { } /// Helper function to convert rpc frames results to replicator frames -pub fn map_frame_err(f: Result) -> Result { - match f { - Ok(frame) => Ok(Frame::try_from(&*frame.data).map_err(|e| Error::Client(e.into()))?), - Err(err) - if err.code() == tonic::Code::FailedPrecondition - && err.message() == NEED_SNAPSHOT_ERROR_MSG => - { - Err(Error::NeedSnapshot) +pub fn map_frame_err(f: Result) -> Result { + let frame = f?; + Ok(Frame::try_from(&*frame.data).map_err(|e| Error::Client(e.into()))?) +} + +#[cfg(test)] +mod test { + use std::pin::Pin; + + use async_stream::stream; + + use super::*; + + #[tokio::test] + async fn handshake_error_namespace_doesnt_exist() { + let tmp = tempfile::NamedTempFile::new().unwrap(); + struct Client; + + #[async_trait::async_trait] + impl ReplicatorClient for Client { + type FrameStream = Pin> + Send + 'static>>; + + /// Perform handshake with remote + async fn handshake(&mut self) -> Result<(), Error> { + Err(Error::NamespaceDoesntExist) + } + /// Return a stream of frames to apply to the database + async fn next_frames(&mut self) -> Result { + unreachable!() + } + /// Return a snapshot for the current replication index. Called after next_frame has returned a + /// NeedSnapshot error + async fn snapshot(&mut self) -> Result { + unreachable!() + } + /// set the new commit frame_no + async fn commit_frame_no(&mut self, _frame_no: FrameNo) -> Result<(), Error> { + unreachable!() + } + /// Returns the currently committed replication index + fn committed_frame_no(&self) -> Option { + unreachable!() + } + } + + let mut replicator = Replicator::new(Client, tmp.path().to_path_buf(), 10000) + .await + .unwrap(); + + assert!(matches!( + replicator.try_replicate_step().await.unwrap_err(), + Error::NamespaceDoesntExist + )); + } + + #[tokio::test] + async fn no_handshake_error_in_next_frame() { + let tmp = tempfile::NamedTempFile::new().unwrap(); + struct Client; + + #[async_trait::async_trait] + impl ReplicatorClient for Client { + type FrameStream = Pin> + Send + 'static>>; + + /// Perform handshake with remote + async fn handshake(&mut self) -> Result<(), Error> { + unimplemented!() + } + /// Return a stream of frames to apply to the database + async fn next_frames(&mut self) -> Result { + Err(Error::NoHandshake) + } + /// Return a snapshot for the current replication index. Called after next_frame has returned a + /// NeedSnapshot error + async fn snapshot(&mut self) -> Result { + unreachable!() + } + /// set the new commit frame_no + async fn commit_frame_no(&mut self, _frame_no: FrameNo) -> Result<(), Error> { + unreachable!() + } + /// Returns the currently committed replication index + fn committed_frame_no(&self) -> Option { + unreachable!() + } } - Err(err) => Err(Error::Client(err.into()))?, + + let mut replicator = Replicator::new(Client, tmp.path().to_path_buf(), 10000) + .await + .unwrap(); + // we assume that we already received the handshake and the handshake is not valid anymore + replicator.state = ReplicatorState::NeedFrames; + replicator.try_replicate_step().await.unwrap(); + assert_eq!(replicator.state, ReplicatorState::NeedHandshake); + } + + #[tokio::test] + async fn stream_frame_returns_handshake_error() { + let tmp = tempfile::NamedTempFile::new().unwrap(); + struct Client; + + #[async_trait::async_trait] + impl ReplicatorClient for Client { + type FrameStream = Pin> + Send + 'static>>; + + /// Perform handshake with remote + async fn handshake(&mut self) -> Result<(), Error> { + unimplemented!() + } + /// Return a stream of frames to apply to the database + async fn next_frames(&mut self) -> Result { + Ok(Box::pin(stream! { + yield Err(Error::NoHandshake); + })) + } + /// Return a snapshot for the current replication index. Called after next_frame has returned a + /// NeedSnapshot error + async fn snapshot(&mut self) -> Result { + unreachable!() + } + /// set the new commit frame_no + async fn commit_frame_no(&mut self, _frame_no: FrameNo) -> Result<(), Error> { + unreachable!() + } + /// Returns the currently committed replication index + fn committed_frame_no(&self) -> Option { + unreachable!() + } + } + + let mut replicator = Replicator::new(Client, tmp.path().to_path_buf(), 10000) + .await + .unwrap(); + // we assume that we already received the handshake and the handshake is not valid anymore + replicator.state = ReplicatorState::NeedFrames; + replicator.try_replicate_step().await.unwrap(); + assert_eq!(replicator.state, ReplicatorState::NeedHandshake); + } + + #[tokio::test] + async fn stream_frame_returns_need_snapshot() { + let tmp = tempfile::NamedTempFile::new().unwrap(); + struct Client; + + #[async_trait::async_trait] + impl ReplicatorClient for Client { + type FrameStream = Pin> + Send + 'static>>; + + /// Perform handshake with remote + async fn handshake(&mut self) -> Result<(), Error> { + unimplemented!() + } + /// Return a stream of frames to apply to the database + async fn next_frames(&mut self) -> Result { + Ok(Box::pin(stream! { + yield Err(Error::NeedSnapshot); + })) + } + /// Return a snapshot for the current replication index. Called after next_frame has returned a + /// NeedSnapshot error + async fn snapshot(&mut self) -> Result { + unreachable!() + } + /// set the new commit frame_no + async fn commit_frame_no(&mut self, _frame_no: FrameNo) -> Result<(), Error> { + unreachable!() + } + /// Returns the currently committed replication index + fn committed_frame_no(&self) -> Option { + unreachable!() + } + } + + let mut replicator = Replicator::new(Client, tmp.path().to_path_buf(), 10000) + .await + .unwrap(); + // we assume that we already received the handshake and the handshake is not valid anymore + replicator.state = ReplicatorState::NeedFrames; + replicator.try_replicate_step().await.unwrap(); + assert_eq!(replicator.state, ReplicatorState::NeedSnapshot); + } + + #[tokio::test] + async fn next_frames_returns_need_snapshot() { + let tmp = tempfile::NamedTempFile::new().unwrap(); + struct Client; + + #[async_trait::async_trait] + impl ReplicatorClient for Client { + type FrameStream = Pin> + Send + 'static>>; + + /// Perform handshake with remote + async fn handshake(&mut self) -> Result<(), Error> { + unimplemented!() + } + /// Return a stream of frames to apply to the database + async fn next_frames(&mut self) -> Result { + Err(Error::NeedSnapshot) + } + /// Return a snapshot for the current replication index. Called after next_frame has returned a + /// NeedSnapshot error + async fn snapshot(&mut self) -> Result { + unreachable!() + } + /// set the new commit frame_no + async fn commit_frame_no(&mut self, _frame_no: FrameNo) -> Result<(), Error> { + unreachable!() + } + /// Returns the currently committed replication index + fn committed_frame_no(&self) -> Option { + unreachable!() + } + } + + let mut replicator = Replicator::new(Client, tmp.path().to_path_buf(), 10000) + .await + .unwrap(); + // we assume that we already received the handshake and the handshake is not valid anymore + replicator.state = ReplicatorState::NeedFrames; + replicator.try_replicate_step().await.unwrap(); + assert_eq!(replicator.state, ReplicatorState::NeedSnapshot); + } + + #[tokio::test] + async fn load_snapshot_returns_need_handshake() { + let tmp = tempfile::NamedTempFile::new().unwrap(); + struct Client; + + #[async_trait::async_trait] + impl ReplicatorClient for Client { + type FrameStream = Pin> + Send + 'static>>; + + /// Perform handshake with remote + async fn handshake(&mut self) -> Result<(), Error> { + unimplemented!() + } + /// Return a stream of frames to apply to the database + async fn next_frames(&mut self) -> Result { + unimplemented!() + } + /// Return a snapshot for the current replication index. Called after next_frame has returned a + /// NeedSnapshot error + async fn snapshot(&mut self) -> Result { + Err(Error::NoHandshake) + } + /// set the new commit frame_no + async fn commit_frame_no(&mut self, _frame_no: FrameNo) -> Result<(), Error> { + unreachable!() + } + /// Returns the currently committed replication index + fn committed_frame_no(&self) -> Option { + unreachable!() + } + } + + let mut replicator = Replicator::new(Client, tmp.path().to_path_buf(), 10000) + .await + .unwrap(); + replicator.state = ReplicatorState::NeedSnapshot; + replicator.try_replicate_step().await.unwrap(); + assert_eq!(replicator.state, ReplicatorState::NeedHandshake); + } + + #[tokio::test] + async fn load_snapshot_stream_returns_need_handshake() { + let tmp = tempfile::NamedTempFile::new().unwrap(); + struct Client; + + #[async_trait::async_trait] + impl ReplicatorClient for Client { + type FrameStream = Pin> + Send + 'static>>; + + /// Perform handshake with remote + async fn handshake(&mut self) -> Result<(), Error> { + unimplemented!() + } + /// Return a stream of frames to apply to the database + async fn next_frames(&mut self) -> Result { + unimplemented!() + } + /// Return a snapshot for the current replication index. Called after next_frame has returned a + /// NeedSnapshot error + async fn snapshot(&mut self) -> Result { + Ok(Box::pin(stream! { + yield Err(Error::NoHandshake) + })) + } + /// set the new commit frame_no + async fn commit_frame_no(&mut self, _frame_no: FrameNo) -> Result<(), Error> { + unreachable!() + } + /// Returns the currently committed replication index + fn committed_frame_no(&self) -> Option { + unreachable!() + } + } + + let mut replicator = Replicator::new(Client, tmp.path().to_path_buf(), 10000) + .await + .unwrap(); + // we assume that we already received the handshake and the handshake is not valid anymore + replicator.state = ReplicatorState::NeedSnapshot; + replicator.try_replicate_step().await.unwrap(); + + assert_eq!(replicator.state, ReplicatorState::NeedHandshake); + } + + #[tokio::test] + async fn receive_handshake_error_while_handshaking() { + let tmp = tempfile::NamedTempFile::new().unwrap(); + struct Client; + + #[async_trait::async_trait] + impl ReplicatorClient for Client { + type FrameStream = Pin> + Send + 'static>>; + + /// Perform handshake with remote + async fn handshake(&mut self) -> Result<(), Error> { + Err(Error::NoHandshake) + } + /// Return a stream of frames to apply to the database + async fn next_frames(&mut self) -> Result { + unimplemented!() + } + /// Return a snapshot for the current replication index. Called after next_frame has returned a + /// NeedSnapshot error + async fn snapshot(&mut self) -> Result { + unimplemented!() + } + /// set the new commit frame_no + async fn commit_frame_no(&mut self, _frame_no: FrameNo) -> Result<(), Error> { + unreachable!() + } + /// Returns the currently committed replication index + fn committed_frame_no(&self) -> Option { + unreachable!() + } + } + + let mut replicator = Replicator::new(Client, tmp.path().to_path_buf(), 10000) + .await + .unwrap(); + replicator.state = ReplicatorState::NeedHandshake; + assert!(matches!( + replicator.try_replicate_step().await.unwrap_err(), + Error::Fatal(_) + )); } } diff --git a/libsql-replication/src/rpc.rs b/libsql-replication/src/rpc.rs index ada36910b0..c2021be8a9 100644 --- a/libsql-replication/src/rpc.rs +++ b/libsql-replication/src/rpc.rs @@ -29,6 +29,8 @@ pub mod replication { pub const NO_HELLO_ERROR_MSG: &str = "NO_HELLO"; pub const NEED_SNAPSHOT_ERROR_MSG: &str = "NEED_SNAPSHOT"; + /// A tonic error code to signify that a namespace doesn't exist. + pub const NAMESPACE_DOESNT_EXIST: &str = "NAMESPACE_DOESNT_EXIST"; pub const SESSION_TOKEN_KEY: &str = "x-session-token"; pub const NAMESPACE_METADATA_KEY: &str = "x-namespace-bin"; diff --git a/libsql-server/src/namespace/mod.rs b/libsql-server/src/namespace/mod.rs index 12b9adf5c5..e47cce052d 100644 --- a/libsql-server/src/namespace/mod.rs +++ b/libsql-server/src/namespace/mod.rs @@ -36,7 +36,6 @@ use crate::database::{Database, PrimaryDatabase, ReplicaDatabase}; use crate::error::{Error, LoadDumpError}; use crate::metrics::NAMESPACE_LOAD_LATENCY; use crate::replication::primary::logger::{ReplicationLoggerHookCtx, REPLICATION_METHODS}; -use crate::replication::replicator_client::NamespaceDoesntExist; use crate::replication::{FrameNo, NamespacedSnapshotCallback, ReplicationLogger}; use crate::stats::Stats; use crate::{ @@ -720,15 +719,12 @@ impl Namespace { use libsql_replication::replicator::Error; loop { match replicator.run().await { - Error::Fatal(e) => { - if let Ok(err) = e.downcast::() { - tracing::error!("namespace {namespace} doesn't exist, destroying..."); - (reset)(ResetOp::Destroy(namespace.clone())); - Err(err)?; - } else { - unreachable!("unexpected fatal replication error") - } - }, + err @ Error::Fatal(_) => Err(err)?, + err @ Error::NamespaceDoesntExist => { + tracing::error!("namespace {namespace} doesn't exist, destroying..."); + (reset)(ResetOp::Destroy(namespace.clone())); + Err(err)?; + } Error::Meta(err) => { use libsql_replication::meta::Error; match err { diff --git a/libsql-server/src/replication/replicator_client.rs b/libsql-server/src/replication/replicator_client.rs index cba63ab2db..25e55c37e4 100644 --- a/libsql-server/src/replication/replicator_client.rs +++ b/libsql-server/src/replication/replicator_client.rs @@ -7,18 +7,16 @@ use libsql_replication::meta::WalIndexMeta; use libsql_replication::replicator::{map_frame_err, Error, ReplicatorClient}; use libsql_replication::rpc::replication::replication_log_client::ReplicationLogClient; use libsql_replication::rpc::replication::{ - verify_session_token, HelloRequest, LogOffset, NAMESPACE_METADATA_KEY, NEED_SNAPSHOT_ERROR_MSG, - NO_HELLO_ERROR_MSG, SESSION_TOKEN_KEY, + verify_session_token, HelloRequest, LogOffset, NAMESPACE_METADATA_KEY, SESSION_TOKEN_KEY, }; use tokio::sync::watch; use tokio_stream::{Stream, StreamExt}; use tonic::metadata::{AsciiMetadataValue, BinaryMetadataValue}; use tonic::transport::Channel; -use tonic::{Code, Request}; +use tonic::Request; use crate::namespace::NamespaceName; use crate::replication::FrameNo; -use crate::rpc::NAMESPACE_DOESNT_EXIST; pub struct Client { client: ReplicationLogClient, @@ -78,10 +76,6 @@ impl Client { } } -#[derive(Debug, thiserror::Error)] -#[error("namespace doesn't exist")] -pub struct NamespaceDoesntExist; - #[async_trait::async_trait] impl ReplicatorClient for Client { type FrameStream = Pin> + Send + 'static>>; @@ -89,26 +83,16 @@ impl ReplicatorClient for Client { async fn handshake(&mut self) -> Result<(), Error> { tracing::info!("Attempting to perform handshake with primary."); let req = self.make_request(HelloRequest::new()); - match self.client.hello(req).await { - Ok(resp) => { - let hello = resp.into_inner(); - verify_session_token(&hello.session_token).map_err(Error::Client)?; - self.primary_replication_index = hello.current_replication_index; - self.session_token.replace(hello.session_token.clone()); - self.meta.init_from_hello(hello)?; - self.current_frame_no_notifier - .send_replace(self.meta.current_frame_no()); - - Ok(()) - } - Err(e) - if e.code() == Code::FailedPrecondition - && e.message() == NAMESPACE_DOESNT_EXIST => - { - Err(Error::Fatal(NamespaceDoesntExist.into()))? - } - Err(e) => Err(Error::Client(e.into()))?, - } + let resp = self.client.hello(req).await?; + let hello = resp.into_inner(); + verify_session_token(&hello.session_token).map_err(Error::Client)?; + self.primary_replication_index = hello.current_replication_index; + self.session_token.replace(hello.session_token.clone()); + self.meta.init_from_hello(hello)?; + self.current_frame_no_notifier + .send_replace(self.meta.current_frame_no()); + + Ok(()) } async fn next_frames(&mut self) -> Result { @@ -119,14 +103,7 @@ impl ReplicatorClient for Client { let stream = self .client .log_entries(req) - .await - .map_err(|e| { - if e.code() == Code::FailedPrecondition && e.message() == NEED_SNAPSHOT_ERROR_MSG { - Error::NeedSnapshot - } else { - Error::Client(e.into()) - } - })? + .await? .into_inner() .map(map_frame_err); @@ -141,8 +118,7 @@ impl ReplicatorClient for Client { let stream = self .client .snapshot(req) - .await - .map_err(map_status)? + .await? .into_inner() .map(map_frame_err); Ok(Box::pin(stream)) @@ -162,11 +138,3 @@ impl ReplicatorClient for Client { self.meta.current_frame_no() } } - -fn map_status(status: tonic::Status) -> Error { - if status.code() == Code::FailedPrecondition && status.message() == NO_HELLO_ERROR_MSG { - Error::NoHandshake - } else { - Error::Client(status.into()) - } -} diff --git a/libsql-server/src/rpc/mod.rs b/libsql-server/src/rpc/mod.rs index e927b54985..434e7ac246 100644 --- a/libsql-server/src/rpc/mod.rs +++ b/libsql-server/src/rpc/mod.rs @@ -26,9 +26,6 @@ pub mod replication_log; pub mod replication_log_proxy; pub mod streaming_exec; -/// A tonic error code to signify that a namespace doesn't exist. -pub const NAMESPACE_DOESNT_EXIST: &str = "NAMESPACE_DOESNT_EXIST"; - pub async fn run_rpc_server( proxy_service: ProxyService, acceptor: A, diff --git a/libsql-server/src/rpc/proxy.rs b/libsql-server/src/rpc/proxy.rs index 002a5eb16d..a2c489b433 100644 --- a/libsql-server/src/rpc/proxy.rs +++ b/libsql-server/src/rpc/proxy.rs @@ -11,6 +11,7 @@ use libsql_replication::rpc::proxy::{ describe_result, Ack, DescribeRequest, DescribeResult, Description, DisconnectMessage, ExecReq, ExecResp, ExecuteResults, QueryResult, ResultRows, Row, }; +use libsql_replication::rpc::replication::NAMESPACE_DOESNT_EXIST; use rusqlite::types::ValueRef; use uuid::Uuid; @@ -24,8 +25,6 @@ use crate::query_result_builder::{ use crate::replication::FrameNo; use crate::rpc::streaming_exec::make_proxy_stream; -use super::NAMESPACE_DOESNT_EXIST; - pub mod rpc { use std::sync::Arc; diff --git a/libsql-server/src/rpc/replication_log.rs b/libsql-server/src/rpc/replication_log.rs index 25b22b935a..b3919a630e 100644 --- a/libsql-server/src/rpc/replication_log.rs +++ b/libsql-server/src/rpc/replication_log.rs @@ -8,8 +8,8 @@ use futures::stream::BoxStream; pub use libsql_replication::rpc::replication as rpc; use libsql_replication::rpc::replication::replication_log_server::ReplicationLog; use libsql_replication::rpc::replication::{ - Frame, Frames, HelloRequest, HelloResponse, LogOffset, NEED_SNAPSHOT_ERROR_MSG, - NO_HELLO_ERROR_MSG, SESSION_TOKEN_KEY, + Frame, Frames, HelloRequest, HelloResponse, LogOffset, NAMESPACE_DOESNT_EXIST, + NEED_SNAPSHOT_ERROR_MSG, NO_HELLO_ERROR_MSG, SESSION_TOKEN_KEY, }; use tokio_stream::StreamExt; use tonic::transport::server::TcpConnectInfo; @@ -22,7 +22,7 @@ use crate::replication::primary::frame_stream::FrameStream; use crate::replication::LogReadError; use crate::utils::services::idle_shutdown::IdleShutdownKicker; -use super::{extract_namespace, NAMESPACE_DOESNT_EXIST}; +use super::extract_namespace; pub struct ReplicationLogService { namespaces: NamespaceStore, diff --git a/libsql-server/tests/cluster/replication.rs b/libsql-server/tests/cluster/replication.rs index 4f0fd7bbbd..39c4008846 100644 --- a/libsql-server/tests/cluster/replication.rs +++ b/libsql-server/tests/cluster/replication.rs @@ -115,6 +115,12 @@ fn apply_partial_snapshot() { notify.notify_waiters(); let client = Client::new(); + + // wait for replica to start up + while client.get("http://replica:8080/").await.is_err() { + tokio::time::sleep(Duration::from_millis(100)).await; + } + loop { let resp = client .get("http://replica:9090/v1/namespaces/default/stats") diff --git a/libsql/src/replication/remote_client.rs b/libsql/src/replication/remote_client.rs index 8119de74a1..83bc46f1bf 100644 --- a/libsql/src/replication/remote_client.rs +++ b/libsql/src/replication/remote_client.rs @@ -8,11 +8,10 @@ use libsql_replication::frame::{Frame, FrameHeader, FrameNo}; use libsql_replication::meta::WalIndexMeta; use libsql_replication::replicator::{map_frame_err, Error, ReplicatorClient}; use libsql_replication::rpc::replication::{ - verify_session_token, HelloRequest, LogOffset, NEED_SNAPSHOT_ERROR_MSG, SESSION_TOKEN_KEY, + verify_session_token, HelloRequest, LogOffset, SESSION_TOKEN_KEY, }; use tokio_stream::Stream; use tonic::metadata::AsciiMetadataValue; -use tonic::Code; /// A remote replicator client, that pulls frames over RPC pub struct RemoteClient { @@ -64,31 +63,27 @@ impl ReplicatorClient for RemoteClient { async fn handshake(&mut self) -> Result<(), Error> { tracing::info!("Attempting to perform handshake with primary."); let req = self.make_request(HelloRequest::new()); - match self.remote.replication.hello(req).await { - Ok(resp) => { - let hello = resp.into_inner(); - verify_session_token(&hello.session_token).map_err(Error::Client)?; - self.session_token = Some(hello.session_token.clone()); - if self.dirty { - self.meta.reset(); - self.dirty = false; - } - if let Err(e) = self.meta.init_from_hello(hello) { - // set the meta as dirty. The caller should catch the error and clean the db - // file. On the next call to replicate, the db will be replicated from the new - // log. - if let libsql_replication::meta::Error::LogIncompatible = e { - self.dirty = true; - } - - Err(e)?; - } - self.meta.flush().await?; - - Ok(()) + let resp = self.remote.replication.hello(req).await?; + let hello = resp.into_inner(); + verify_session_token(&hello.session_token).map_err(Error::Client)?; + self.session_token = Some(hello.session_token.clone()); + if self.dirty { + self.meta.reset(); + self.dirty = false; + } + if let Err(e) = self.meta.init_from_hello(hello) { + // set the meta as dirty. The caller should catch the error and clean the db + // file. On the next call to replicate, the db will be replicated from the new + // log. + if let libsql_replication::meta::Error::LogIncompatible = e { + self.dirty = true; } - Err(e) => Err(Error::Client(e.into()))?, + + Err(e)?; } + self.meta.flush().await?; + + Ok(()) } /// Return a stream of frames to apply to the database @@ -100,14 +95,7 @@ impl ReplicatorClient for RemoteClient { .remote .replication .batch_log_entries(req) - .await - .map_err(|e| { - if e.code() == Code::FailedPrecondition && e.message() == NEED_SNAPSHOT_ERROR_MSG { - Error::NeedSnapshot - } else { - Error::Client(e.into()) - } - })? + .await? .into_inner() .frames; @@ -136,8 +124,7 @@ impl ReplicatorClient for RemoteClient { .remote .replication .snapshot(req) - .await - .map_err(|e| Error::Client(e.into()))? + .await? .into_inner() .map(map_frame_err) .peekable();