Skip to content

Commit

Permalink
Merge pull request #661 from tursodatabase/improve-replication-error-…
Browse files Browse the repository at this point in the history
…handling

improve replication error handling
  • Loading branch information
MarinPostma committed Nov 23, 2023
2 parents a5cd846 + 499ad76 commit 2e7b88f
Show file tree
Hide file tree
Showing 9 changed files with 476 additions and 164 deletions.
487 changes: 422 additions & 65 deletions libsql-replication/src/replicator.rs

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions libsql-replication/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ pub mod replication {

pub const NO_HELLO_ERROR_MSG: &str = "NO_HELLO";
pub const NEED_SNAPSHOT_ERROR_MSG: &str = "NEED_SNAPSHOT";
/// A tonic error code to signify that a namespace doesn't exist.
pub const NAMESPACE_DOESNT_EXIST: &str = "NAMESPACE_DOESNT_EXIST";

pub const SESSION_TOKEN_KEY: &str = "x-session-token";
pub const NAMESPACE_METADATA_KEY: &str = "x-namespace-bin";
Expand Down
16 changes: 6 additions & 10 deletions libsql-server/src/namespace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ use crate::database::{Database, PrimaryDatabase, ReplicaDatabase};
use crate::error::{Error, LoadDumpError};
use crate::metrics::NAMESPACE_LOAD_LATENCY;
use crate::replication::primary::logger::{ReplicationLoggerHookCtx, REPLICATION_METHODS};
use crate::replication::replicator_client::NamespaceDoesntExist;
use crate::replication::{FrameNo, NamespacedSnapshotCallback, ReplicationLogger};
use crate::stats::Stats;
use crate::{
Expand Down Expand Up @@ -720,15 +719,12 @@ impl Namespace<ReplicaDatabase> {
use libsql_replication::replicator::Error;
loop {
match replicator.run().await {
Error::Fatal(e) => {
if let Ok(err) = e.downcast::<NamespaceDoesntExist>() {
tracing::error!("namespace {namespace} doesn't exist, destroying...");
(reset)(ResetOp::Destroy(namespace.clone()));
Err(err)?;
} else {
unreachable!("unexpected fatal replication error")
}
},
err @ Error::Fatal(_) => Err(err)?,
err @ Error::NamespaceDoesntExist => {
tracing::error!("namespace {namespace} doesn't exist, destroying...");
(reset)(ResetOp::Destroy(namespace.clone()));
Err(err)?;
}
Error::Meta(err) => {
use libsql_replication::meta::Error;
match err {
Expand Down
60 changes: 14 additions & 46 deletions libsql-server/src/replication/replicator_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,16 @@ use libsql_replication::meta::WalIndexMeta;
use libsql_replication::replicator::{map_frame_err, Error, ReplicatorClient};
use libsql_replication::rpc::replication::replication_log_client::ReplicationLogClient;
use libsql_replication::rpc::replication::{
verify_session_token, HelloRequest, LogOffset, NAMESPACE_METADATA_KEY, NEED_SNAPSHOT_ERROR_MSG,
NO_HELLO_ERROR_MSG, SESSION_TOKEN_KEY,
verify_session_token, HelloRequest, LogOffset, NAMESPACE_METADATA_KEY, SESSION_TOKEN_KEY,
};
use tokio::sync::watch;
use tokio_stream::{Stream, StreamExt};
use tonic::metadata::{AsciiMetadataValue, BinaryMetadataValue};
use tonic::transport::Channel;
use tonic::{Code, Request};
use tonic::Request;

use crate::namespace::NamespaceName;
use crate::replication::FrameNo;
use crate::rpc::NAMESPACE_DOESNT_EXIST;

pub struct Client {
client: ReplicationLogClient<Channel>,
Expand Down Expand Up @@ -78,37 +76,23 @@ impl Client {
}
}

#[derive(Debug, thiserror::Error)]
#[error("namespace doesn't exist")]
pub struct NamespaceDoesntExist;

#[async_trait::async_trait]
impl ReplicatorClient for Client {
type FrameStream = Pin<Box<dyn Stream<Item = Result<Frame, Error>> + Send + 'static>>;

async fn handshake(&mut self) -> Result<(), Error> {
tracing::info!("Attempting to perform handshake with primary.");
let req = self.make_request(HelloRequest::new());
match self.client.hello(req).await {
Ok(resp) => {
let hello = resp.into_inner();
verify_session_token(&hello.session_token).map_err(Error::Client)?;
self.primary_replication_index = hello.current_replication_index;
self.session_token.replace(hello.session_token.clone());
self.meta.init_from_hello(hello)?;
self.current_frame_no_notifier
.send_replace(self.meta.current_frame_no());

Ok(())
}
Err(e)
if e.code() == Code::FailedPrecondition
&& e.message() == NAMESPACE_DOESNT_EXIST =>
{
Err(Error::Fatal(NamespaceDoesntExist.into()))?
}
Err(e) => Err(Error::Client(e.into()))?,
}
let resp = self.client.hello(req).await?;
let hello = resp.into_inner();
verify_session_token(&hello.session_token).map_err(Error::Client)?;
self.primary_replication_index = hello.current_replication_index;
self.session_token.replace(hello.session_token.clone());
self.meta.init_from_hello(hello)?;
self.current_frame_no_notifier
.send_replace(self.meta.current_frame_no());

Ok(())
}

async fn next_frames(&mut self) -> Result<Self::FrameStream, Error> {
Expand All @@ -119,14 +103,7 @@ impl ReplicatorClient for Client {
let stream = self
.client
.log_entries(req)
.await
.map_err(|e| {
if e.code() == Code::FailedPrecondition && e.message() == NEED_SNAPSHOT_ERROR_MSG {
Error::NeedSnapshot
} else {
Error::Client(e.into())
}
})?
.await?
.into_inner()
.map(map_frame_err);

Expand All @@ -141,8 +118,7 @@ impl ReplicatorClient for Client {
let stream = self
.client
.snapshot(req)
.await
.map_err(map_status)?
.await?
.into_inner()
.map(map_frame_err);
Ok(Box::pin(stream))
Expand All @@ -162,11 +138,3 @@ impl ReplicatorClient for Client {
self.meta.current_frame_no()
}
}

fn map_status(status: tonic::Status) -> Error {
if status.code() == Code::FailedPrecondition && status.message() == NO_HELLO_ERROR_MSG {
Error::NoHandshake
} else {
Error::Client(status.into())
}
}
3 changes: 0 additions & 3 deletions libsql-server/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@ pub mod replication_log;
pub mod replication_log_proxy;
pub mod streaming_exec;

/// A tonic error code to signify that a namespace doesn't exist.
pub const NAMESPACE_DOESNT_EXIST: &str = "NAMESPACE_DOESNT_EXIST";

pub async fn run_rpc_server<A: crate::net::Accept>(
proxy_service: ProxyService,
acceptor: A,
Expand Down
3 changes: 1 addition & 2 deletions libsql-server/src/rpc/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use libsql_replication::rpc::proxy::{
describe_result, Ack, DescribeRequest, DescribeResult, Description, DisconnectMessage, ExecReq,
ExecResp, ExecuteResults, QueryResult, ResultRows, Row,
};
use libsql_replication::rpc::replication::NAMESPACE_DOESNT_EXIST;
use rusqlite::types::ValueRef;
use uuid::Uuid;

Expand All @@ -24,8 +25,6 @@ use crate::query_result_builder::{
use crate::replication::FrameNo;
use crate::rpc::streaming_exec::make_proxy_stream;

use super::NAMESPACE_DOESNT_EXIST;

pub mod rpc {
use std::sync::Arc;

Expand Down
6 changes: 3 additions & 3 deletions libsql-server/src/rpc/replication_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use futures::stream::BoxStream;
pub use libsql_replication::rpc::replication as rpc;
use libsql_replication::rpc::replication::replication_log_server::ReplicationLog;
use libsql_replication::rpc::replication::{
Frame, Frames, HelloRequest, HelloResponse, LogOffset, NEED_SNAPSHOT_ERROR_MSG,
NO_HELLO_ERROR_MSG, SESSION_TOKEN_KEY,
Frame, Frames, HelloRequest, HelloResponse, LogOffset, NAMESPACE_DOESNT_EXIST,
NEED_SNAPSHOT_ERROR_MSG, NO_HELLO_ERROR_MSG, SESSION_TOKEN_KEY,
};
use tokio_stream::StreamExt;
use tonic::transport::server::TcpConnectInfo;
Expand All @@ -22,7 +22,7 @@ use crate::replication::primary::frame_stream::FrameStream;
use crate::replication::LogReadError;
use crate::utils::services::idle_shutdown::IdleShutdownKicker;

use super::{extract_namespace, NAMESPACE_DOESNT_EXIST};
use super::extract_namespace;

pub struct ReplicationLogService {
namespaces: NamespaceStore<PrimaryNamespaceMaker>,
Expand Down
6 changes: 6 additions & 0 deletions libsql-server/tests/cluster/replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,12 @@ fn apply_partial_snapshot() {
notify.notify_waiters();

let client = Client::new();

// wait for replica to start up
while client.get("http://replica:8080/").await.is_err() {
tokio::time::sleep(Duration::from_millis(100)).await;
}

loop {
let resp = client
.get("http://replica:9090/v1/namespaces/default/stats")
Expand Down
57 changes: 22 additions & 35 deletions libsql/src/replication/remote_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@ use libsql_replication::frame::{Frame, FrameHeader, FrameNo};
use libsql_replication::meta::WalIndexMeta;
use libsql_replication::replicator::{map_frame_err, Error, ReplicatorClient};
use libsql_replication::rpc::replication::{
verify_session_token, HelloRequest, LogOffset, NEED_SNAPSHOT_ERROR_MSG, SESSION_TOKEN_KEY,
verify_session_token, HelloRequest, LogOffset, SESSION_TOKEN_KEY,
};
use tokio_stream::Stream;
use tonic::metadata::AsciiMetadataValue;
use tonic::Code;

/// A remote replicator client, that pulls frames over RPC
pub struct RemoteClient {
Expand Down Expand Up @@ -64,31 +63,27 @@ impl ReplicatorClient for RemoteClient {
async fn handshake(&mut self) -> Result<(), Error> {
tracing::info!("Attempting to perform handshake with primary.");
let req = self.make_request(HelloRequest::new());
match self.remote.replication.hello(req).await {
Ok(resp) => {
let hello = resp.into_inner();
verify_session_token(&hello.session_token).map_err(Error::Client)?;
self.session_token = Some(hello.session_token.clone());
if self.dirty {
self.meta.reset();
self.dirty = false;
}
if let Err(e) = self.meta.init_from_hello(hello) {
// set the meta as dirty. The caller should catch the error and clean the db
// file. On the next call to replicate, the db will be replicated from the new
// log.
if let libsql_replication::meta::Error::LogIncompatible = e {
self.dirty = true;
}

Err(e)?;
}
self.meta.flush().await?;

Ok(())
let resp = self.remote.replication.hello(req).await?;
let hello = resp.into_inner();
verify_session_token(&hello.session_token).map_err(Error::Client)?;
self.session_token = Some(hello.session_token.clone());
if self.dirty {
self.meta.reset();
self.dirty = false;
}
if let Err(e) = self.meta.init_from_hello(hello) {
// set the meta as dirty. The caller should catch the error and clean the db
// file. On the next call to replicate, the db will be replicated from the new
// log.
if let libsql_replication::meta::Error::LogIncompatible = e {
self.dirty = true;
}
Err(e) => Err(Error::Client(e.into()))?,

Err(e)?;
}
self.meta.flush().await?;

Ok(())
}

/// Return a stream of frames to apply to the database
Expand All @@ -100,14 +95,7 @@ impl ReplicatorClient for RemoteClient {
.remote
.replication
.batch_log_entries(req)
.await
.map_err(|e| {
if e.code() == Code::FailedPrecondition && e.message() == NEED_SNAPSHOT_ERROR_MSG {
Error::NeedSnapshot
} else {
Error::Client(e.into())
}
})?
.await?
.into_inner()
.frames;

Expand Down Expand Up @@ -136,8 +124,7 @@ impl ReplicatorClient for RemoteClient {
.remote
.replication
.snapshot(req)
.await
.map_err(|e| Error::Client(e.into()))?
.await?
.into_inner()
.map(map_frame_err)
.peekable();
Expand Down

0 comments on commit 2e7b88f

Please sign in to comment.