Skip to content

Commit

Permalink
Merge pull request #660 from tursodatabase/lucio/fix-replication
Browse files Browse the repository at this point in the history
libsql: fix embedded replica snapshot handling
  • Loading branch information
LucioFranco committed Nov 21, 2023
2 parents 365d921 + 4974434 commit 1e97a62
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 22 deletions.
26 changes: 25 additions & 1 deletion libsql-replication/src/replicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,9 @@ impl<C: ReplicatorClient> Replicator<C> {
if !self.has_handshake {
self.try_perform_handshake().await?;
}
let mut stream = self.client.next_frames().await?;

let mut stream = self.start_next_frames().await?;

loop {
match stream.next().await {
Some(Ok(frame)) => {
Expand Down Expand Up @@ -204,6 +206,28 @@ impl<C: ReplicatorClient> Replicator<C> {
}
}

async fn start_next_frames(&mut self) -> Result<C::FrameStream, Error> {
loop {
match self.client.next_frames().await {
Ok(s) => return Ok(s),
Err(Error::NeedSnapshot) => {
tracing::debug!("loading snapshot");
// remove any outstanding frames in the buffer that are not part of a
// transaction: they are now part of the snapshot.
match self.load_snapshot().await {
Ok(()) => continue,
Err(Error::NoHandshake) => {
self.has_handshake = false;
self.try_perform_handshake().await?;
}
Err(e) => return Err(e),
}
}
Err(e) => return Err(e),
}
}
}

async fn load_snapshot(&mut self) -> Result<(), Error> {
self.injector.lock().clear_buffer();
let mut stream = self.client.snapshot().await?;
Expand Down
12 changes: 9 additions & 3 deletions libsql-server/src/replication/replicator_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use libsql_replication::meta::WalIndexMeta;
use libsql_replication::replicator::{map_frame_err, Error, ReplicatorClient};
use libsql_replication::rpc::replication::replication_log_client::ReplicationLogClient;
use libsql_replication::rpc::replication::{
verify_session_token, HelloRequest, LogOffset, NAMESPACE_METADATA_KEY, NO_HELLO_ERROR_MSG,
SESSION_TOKEN_KEY,
verify_session_token, HelloRequest, LogOffset, NAMESPACE_METADATA_KEY, NEED_SNAPSHOT_ERROR_MSG,
NO_HELLO_ERROR_MSG, SESSION_TOKEN_KEY,
};
use tokio::sync::watch;
use tokio_stream::{Stream, StreamExt};
Expand Down Expand Up @@ -120,7 +120,13 @@ impl ReplicatorClient for Client {
.client
.log_entries(req)
.await
.map_err(|e| Error::Client(e.into()))?
.map_err(|e| {
if e.code() == Code::FailedPrecondition && e.message() == NEED_SNAPSHOT_ERROR_MSG {
Error::NeedSnapshot
} else {
Error::Client(e.into())
}
})?
.into_inner()
.map(map_frame_err);

Expand Down
21 changes: 12 additions & 9 deletions libsql/examples/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,21 @@ async fn main() {
let db_file = tempfile::NamedTempFile::new().unwrap();
println!("Database {}", db_file.path().display());

let auth_token = std::env::var("TURSO_AUTH_TOKEN").unwrap_or_else(|_| {
println!("Using empty token since TURSO_AUTH_TOKEN was not set");
// Read the auth token from the environment; fall back to an empty token when it is not set.
// The message names the variable that is actually read (LIBSQL_AUTH_TOKEN).
let auth_token = std::env::var("LIBSQL_AUTH_TOKEN").unwrap_or_else(|_| {
    println!("Using empty token since LIBSQL_AUTH_TOKEN was not set");
    "".to_string()
});

let db = Database::open_with_remote_sync(
db_file.path().to_str().unwrap(),
"http://localhost:8080",
auth_token,
)
.await
.unwrap();
// Read the remote URL from the environment, defaulting to a local server when it is not set.
let url = std::env::var("LIBSQL_URL").unwrap_or_else(|_| {
    println!("Using http://localhost:8080 since LIBSQL_URL was not set");
    "http://localhost:8080".to_string()
});
// Rewrite only a leading `libsql://` scheme to `https://`. A blanket
// `replace("libsql", "https")` would also mangle host names or paths containing "libsql".
let url = match url.strip_prefix("libsql://") {
    Some(rest) => format!("https://{}", rest),
    None => url,
};

let db = Database::open_with_remote_sync(db_file.path().to_str().unwrap(), url, auth_token)
.await
.unwrap();
let conn = db.connect().unwrap();

let f = db.sync().await.unwrap();
Expand Down
34 changes: 25 additions & 9 deletions libsql/src/replication/remote_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@ use std::pin::Pin;

use bytes::Bytes;
use futures::StreamExt as _;
use libsql_replication::replicator::{ReplicatorClient, Error, map_frame_err};
use libsql_replication::frame::{Frame, FrameHeader, FrameNo};
use libsql_replication::meta::WalIndexMeta;
use libsql_replication::frame::{FrameNo, Frame, FrameHeader};
use libsql_replication::rpc::replication::{HelloRequest, LogOffset, verify_session_token, SESSION_TOKEN_KEY};
use libsql_replication::replicator::{map_frame_err, Error, ReplicatorClient};
use libsql_replication::rpc::replication::{
verify_session_token, HelloRequest, LogOffset, NEED_SNAPSHOT_ERROR_MSG, SESSION_TOKEN_KEY,
};
use tokio_stream::Stream;
use tonic::metadata::AsciiMetadataValue;
use tonic::Code;

/// A remote replicator client, that pulls frames over RPC
pub struct RemoteClient {
Expand Down Expand Up @@ -39,12 +42,14 @@ impl RemoteClient {
None => 0,
}
}

fn make_request<T>(&self, req: T) -> tonic::Request<T> {
let mut req = tonic::Request::new(req);
if let Some(token) = self.session_token.clone() {
// SAFETY: we always validate the token
req.metadata_mut().insert(SESSION_TOKEN_KEY, unsafe { AsciiMetadataValue::from_shared_unchecked(token) });
req.metadata_mut().insert(SESSION_TOKEN_KEY, unsafe {
AsciiMetadataValue::from_shared_unchecked(token)
});
}

req
Expand Down Expand Up @@ -88,18 +93,27 @@ impl ReplicatorClient for RemoteClient {

/// Return a stream of frames to apply to the database
async fn next_frames(&mut self) -> Result<Self::FrameStream, Error> {
let req = self.make_request(LogOffset { next_offset: self.next_offset() });
let req = self.make_request(LogOffset {
next_offset: self.next_offset(),
});
let frames = self
.remote
.replication
.batch_log_entries(req)
.await
.map_err(|e| Error::Client(e.into()))?
.map_err(|e| {
if e.code() == Code::FailedPrecondition && e.message() == NEED_SNAPSHOT_ERROR_MSG {
Error::NeedSnapshot
} else {
Error::Client(e.into())
}
})?
.into_inner()
.frames;

if let Some(f) = frames.last() {
let header: FrameHeader = bytemuck::pod_read_unaligned(&f.data[0..size_of::<FrameHeader>()]);
let header: FrameHeader =
bytemuck::pod_read_unaligned(&f.data[0..size_of::<FrameHeader>()]);
self.last_received = Some(header.frame_no);
}

Expand All @@ -115,7 +129,9 @@ impl ReplicatorClient for RemoteClient {
/// Return a snapshot for the current replication index. Called after next_frames has returned a
/// NeedSnapshot error
async fn snapshot(&mut self) -> Result<Self::FrameStream, Error> {
let req = self.make_request(LogOffset { next_offset: self.next_offset() });
let req = self.make_request(LogOffset {
next_offset: self.next_offset(),
});
let mut frames = self
.remote
.replication
Expand Down

0 comments on commit 1e97a62

Please sign in to comment.