Skip to content

Commit

Permalink
Merge pull request #584 from tursodatabase/delegate-write-until-sync
Browse files Browse the repository at this point in the history
Replica delegate request until in sync
  • Loading branch information
MarinPostma committed Nov 7, 2023
2 parents 19dfc7e + 7af5b24 commit f436d7b
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 3 deletions.
1 change: 1 addition & 0 deletions libsql-replication/proto/replication_log.proto
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ message HelloResponse {
/// string-encoded Uuid v4 token for the current session, changes on each restart, and must be passed in subsequent requests header.string
/// If the header session token fails to match the current session token, a NO_HELLO error is returned
bytes session_token = 4;
optional uint64 current_replication_index = 5;
}

message Frame {
Expand Down
2 changes: 2 additions & 0 deletions libsql-replication/src/generated/wal_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ pub struct HelloResponse {
/// / If the header session token fails to match the current session token, a NO_HELLO error is returned
#[prost(bytes = "bytes", tag = "4")]
pub session_token: ::prost::bytes::Bytes,
#[prost(uint64, optional, tag = "5")]
pub current_replication_index: ::core::option::Option<u64>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down
2 changes: 1 addition & 1 deletion libsql-replication/src/replicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ impl<C: ReplicatorClient> Replicator<C> {
}
}

async fn try_perform_handshake(&mut self) -> Result<(), Error> {
pub async fn try_perform_handshake(&mut self) -> Result<(), Error> {
let mut error_printed = false;
for _ in 0..HANDSHAKE_MAX_RETRIES {
tracing::info!("Attempting to perform handshake with primary.");
Expand Down
32 changes: 30 additions & 2 deletions libsql-server/src/connection/write_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ pub struct MakeWriteProxyConn {
max_total_response_size: u64,
namespace: NamespaceName,
make_read_only_conn: MakeLibSqlConn<TransparentMethods>,
primary_replication_index: Option<FrameNo>,
}

impl MakeWriteProxyConn {
Expand All @@ -57,6 +58,7 @@ impl MakeWriteProxyConn {
max_response_size: u64,
max_total_response_size: u64,
namespace: NamespaceName,
primary_replication_index: Option<FrameNo>,
) -> crate::Result<Self> {
let client = ProxyClient::with_origin(channel, uri);
let make_read_only_conn = MakeLibSqlConn::new(
Expand All @@ -81,6 +83,7 @@ impl MakeWriteProxyConn {
max_total_response_size,
namespace,
make_read_only_conn,
primary_replication_index,
})
}
}
Expand All @@ -100,6 +103,7 @@ impl MakeConnection for MakeWriteProxyConn {
},
self.namespace.clone(),
self.make_read_only_conn.create().await?,
self.primary_replication_index,
)
.await?;
Ok(db)
Expand All @@ -122,6 +126,8 @@ pub struct WriteProxyConnection<R> {
namespace: NamespaceName,

remote_conn: Mutex<Option<RemoteConnection<R>>>,
/// the primary replication index when the namespace was loaded
primary_replication_index: Option<FrameNo>,
}

impl WriteProxyConnection<RpcStream> {
Expand All @@ -133,6 +139,7 @@ impl WriteProxyConnection<RpcStream> {
builder_config: QueryBuilderConfig,
namespace: NamespaceName,
read_conn: LibSqlConnection<TransparentMethods>,
primary_replication_index: Option<u64>,
) -> Result<Self> {
Ok(Self {
read_conn,
Expand All @@ -144,6 +151,7 @@ impl WriteProxyConnection<RpcStream> {
stats,
namespace,
remote_conn: Default::default(),
primary_replication_index,
})
}

Expand Down Expand Up @@ -233,6 +241,27 @@ impl WriteProxyConnection<RpcStream> {
None => Ok(()),
}
}

/// Tells whether a request must be forwarded to the primary unconditionally, given how far
/// this replica has caught up.
///
/// Returns `true` when the primary reported a replication index at the time the namespace
/// was loaded and this replica has either applied no frame at all, or has only applied
/// frames older than that index. Returns `false` in every other case, including when the
/// primary reported no index.
fn should_proxy(&self) -> bool {
    match self.primary_replication_index {
        // The primary had no data when the namespace was loaded: nothing to catch up to.
        None => false,
        // The primary has data: compare it against what has been applied locally.
        Some(primary_index) => {
            // Copy the value out so the borrow on the receiver is released right away.
            let applied = *self.applied_frame_no_receiver.borrow();
            match applied {
                // Nothing applied locally while the primary has data.
                None => true,
                // Local data predates the primary's index seen at load time.
                Some(frame_no) => frame_no < primary_index,
            }
        }
    }
}
}

struct RemoteConnection<R = Streaming<ExecResp>> {
Expand Down Expand Up @@ -405,8 +434,7 @@ impl Connection for WriteProxyConnection<RpcStream> {
) -> Result<B> {
let mut state = self.state.lock().await;

// This is a fresh namespace, and it is not replicated yet, proxy the first request.
if self.applied_frame_no_receiver.borrow().is_none() {
if self.should_proxy() {
self.execute_remote(pgm, &mut state, auth, builder).await
} else if *state == TxnStatus::Init && pgm.is_read_only() {
// set the state to invalid before doing anything, and set it to a valid state after.
Expand Down
5 changes: 5 additions & 0 deletions libsql-server/src/namespace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,10 @@ impl Namespace<ReplicaDatabase> {
)
.await?;

// force a handshake now, to retrieve the primary's current replication index
replicator.try_perform_handshake().await?;
let primary_current_replicatio_index = replicator.client_mut().primary_replication_index;

let mut join_set = JoinSet::new();
let namespace = name.clone();
join_set.spawn(async move {
Expand Down Expand Up @@ -646,6 +650,7 @@ impl Namespace<ReplicaDatabase> {
config.max_response_size,
config.max_total_response_size,
name.clone(),
primary_current_replicatio_index,
)
.await?
.throttled(
Expand Down
4 changes: 4 additions & 0 deletions libsql-server/src/replication/replicator_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ pub struct Client {
pub current_frame_no_notifier: watch::Sender<Option<FrameNo>>,
namespace: NamespaceName,
session_token: Option<Bytes>,
// the primary's current replication index, as reported by the last handshake
pub primary_replication_index: Option<FrameNo>,
}

impl Client {
Expand All @@ -43,6 +45,7 @@ impl Client {
current_frame_no_notifier,
meta,
session_token: None,
primary_replication_index: None,
})
}

Expand Down Expand Up @@ -90,6 +93,7 @@ impl ReplicatorClient for Client {
Ok(resp) => {
let hello = resp.into_inner();
verify_session_token(&hello.session_token).map_err(Error::Client)?;
self.primary_replication_index = hello.current_replication_index;
self.session_token.replace(hello.session_token.clone());
self.meta.init_from_hello(hello)?;
self.current_frame_no_notifier
Expand Down
1 change: 1 addition & 0 deletions libsql-server/src/rpc/replication_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ impl ReplicationLog for ReplicationLogService {
session_token: self.session_token.clone(),
generation_id: self.generation_id.to_string(),
generation_start_index: 0,
current_replication_index: *logger.new_frame_notifier.borrow(),
};

Ok(tonic::Response::new(response))
Expand Down

0 comments on commit f436d7b

Please sign in to comment.