Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
251 changes: 140 additions & 111 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion sqld/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ hyper = { version = "0.14.23", features = ["http2"] }
hyper-tungstenite = "0.10"
itertools = "0.10.5"
jsonwebtoken = "8.2.0"
memmap = "0.7.0"
mimalloc = { version = "0.1.36", default-features = false }
nix = { version = "0.26.2", features = ["fs"] }
once_cell = "1.17.0"
Expand Down
Binary file added sqld/assets/test/simple_wallog
Binary file not shown.
2 changes: 1 addition & 1 deletion sqld/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
std::env::set_var("PROTOC", protobuf_src::protoc());

let mut config = Config::new();
config.bytes([".wal_log", ".proxy.ProgramReq.namespace"]);
config.bytes([".wal_log"]);
tonic_build::configure()
.protoc_arg("--experimental_allow_proto3_optional")
.type_attribute(".proxy", "#[cfg_attr(test, derive(arbitrary::Arbitrary))]")
Expand Down
7 changes: 1 addition & 6 deletions sqld/proto/replication_log.proto
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,7 @@ message LogOffset {
message HelloRequest {}

message HelloResponse {
/// Uuid of the current generation
string generation_id = 1;
/// First frame_no in the current generation
uint64 generation_start_index = 2;
/// Uuid of the database being replicated
string database_id = 3;
string log_id = 3;
}

message Frame {
Expand Down
3 changes: 3 additions & 0 deletions sqld/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ pub enum Error {
ConflictingRestoreParameters,
#[error("failed to fork database: {0}")]
Fork(#[from] ForkError),
#[error("fatal replication error")]
FatalReplicationError,
}

trait ResponseError: std::error::Error {
Expand Down Expand Up @@ -129,6 +131,7 @@ impl IntoResponse for Error {
LoadDumpExistingDb => self.format_err(StatusCode::BAD_REQUEST),
ConflictingRestoreParameters => self.format_err(StatusCode::BAD_REQUEST),
Fork(e) => e.into_response(),
FatalReplicationError => self.format_err(StatusCode::INTERNAL_SERVER_ERROR),
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions sqld/src/namespace/fork.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use tokio::time::Duration;
use tokio_stream::StreamExt;

use crate::database::PrimaryDatabase;
use crate::replication::frame::Frame;
use crate::replication::frame::FrameBorrowed;
use crate::replication::primary::frame_stream::FrameStream;
use crate::replication::{LogReadError, ReplicationLogger};
use crate::{BLOCKING_RT, LIBSQL_PAGE_SIZE};
Expand Down Expand Up @@ -41,7 +41,7 @@ impl From<tokio::task::JoinError> for ForkError {
}
}

async fn write_frame(frame: Frame, temp_file: &mut tokio::fs::File) -> Result<()> {
async fn write_frame(frame: &FrameBorrowed, temp_file: &mut tokio::fs::File) -> Result<()> {
let page_no = frame.header().page_no;
let page_pos = (page_no - 1) as usize * LIBSQL_PAGE_SIZE as usize;
temp_file.seek(SeekFrom::Start(page_pos as u64)).await?;
Expand Down Expand Up @@ -128,7 +128,7 @@ impl ForkTask<'_> {
match res {
Ok(frame) => {
next_frame_no = next_frame_no.max(frame.header().frame_no + 1);
write_frame(frame, &mut data_file).await?;
write_frame(&frame, &mut data_file).await?;
}
Err(LogReadError::SnapshotRequired) => {
let snapshot = loop {
Expand All @@ -147,7 +147,7 @@ impl ForkTask<'_> {
for frame in iter {
let frame = frame.map_err(ForkError::LogRead)?;
next_frame_no = next_frame_no.max(frame.header().frame_no + 1);
write_frame(frame, &mut data_file).await?;
write_frame(&frame, &mut data_file).await?;
}
}
Err(LogReadError::Ahead) => {
Expand Down
11 changes: 4 additions & 7 deletions sqld/src/namespace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -561,30 +561,27 @@ impl Namespace<ReplicaDatabase> {
DatabaseConfigStore::load(&db_path).context("Could not load database config")?,
);

let mut join_set = JoinSet::new();
let replicator = Replicator::new(
db_path.clone(),
config.channel.clone(),
config.uri.clone(),
name.clone(),
&mut join_set,
reset,
)
.await?;

let applied_frame_no_receiver = replicator.current_frame_no_notifier.clone();
let applied_frame_no_receiver = replicator.current_frame_no_notifier.subscribe();
let mut join_set = JoinSet::new();
join_set.spawn(replicator.run());

let stats = make_stats(
&db_path,
&mut join_set,
config.stats_sender.clone(),
name.clone(),
replicator.current_frame_no_notifier.clone(),
applied_frame_no_receiver.clone(),
)
.await?;

join_set.spawn(replicator.run());

let connection_maker = MakeWriteProxyConn::new(
db_path.clone(),
config.extensions.clone(),
Expand Down
149 changes: 111 additions & 38 deletions sqld/src/replication/frame.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::borrow::Cow;
use std::alloc::Layout;
use std::fmt;
use std::mem::{size_of, transmute};
use std::ops::Deref;
use std::mem::size_of;
use std::ops::{Deref, DerefMut};

use bytemuck::{bytes_of, pod_read_unaligned, try_from_bytes, Pod, Zeroable};
use bytes::{Bytes, BytesMut};
use bytemuck::{bytes_of, from_bytes, Pod, Zeroable};
use bytes::Bytes;

use crate::LIBSQL_PAGE_SIZE;

Expand All @@ -28,10 +28,18 @@ pub struct FrameHeader {
}

#[derive(Clone, serde::Serialize, serde::Deserialize)]
/// The owned version of a replication frame.
/// The shared version of a replication frame.
/// Cloning this is cheap.
pub struct Frame {
data: Bytes,
inner: Bytes,
}

impl TryFrom<&[u8]> for Frame {
type Error = anyhow::Error;

fn try_from(data: &[u8]) -> Result<Self, Self::Error> {
Ok(FrameMut::try_from(data)?.into())
}
}

impl fmt::Debug for Frame {
Expand All @@ -43,64 +51,129 @@ impl fmt::Debug for Frame {
}
}

impl Frame {
/// size of a single frame
pub const SIZE: usize = size_of::<FrameHeader>() + LIBSQL_PAGE_SIZE as usize;
/// Owned version of a frame, on the heap
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm missing the purpose of the new FrameMut and FrameBorrowed. Why do we need it? Feel free to also post the answer in the commit message

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry, only got back to this PR now. So basically, FrameBorrowed is a stack data structure that contains the frame's raw data. But since it's quite big, you don't really ever get an owned version. Instead, you get a Frame and a FrameMut that are the stack-allocated equivalents. Both deref to a FrameBorrowed, with the difference between FrameMut and Frame being that Frame is a shared pointer that is cheaply clonable, and FrameMut is an exclusive reference to the Frame.

pub struct FrameMut {
inner: Box<FrameBorrowed>,
}

pub fn from_parts(header: &FrameHeader, data: &[u8]) -> Self {
assert_eq!(data.len(), LIBSQL_PAGE_SIZE as usize);
let mut buf = BytesMut::with_capacity(Self::SIZE);
buf.extend_from_slice(bytes_of(header));
buf.extend_from_slice(data);
impl TryFrom<&[u8]> for FrameMut {
type Error = anyhow::Error;

fn try_from(data: &[u8]) -> Result<Self, Self::Error> {
anyhow::ensure!(
data.len() == size_of::<FrameBorrowed>(),
"invalid frame size"
);
// frames are relatively large (~4ko), we want to avoid allocating them on the stack and
// then copying them to the heap, and instead copy them to the heap directly.
let inner = unsafe {
let layout = Layout::new::<FrameBorrowed>();
let ptr = std::alloc::alloc(layout);
ptr.copy_from(data.as_ptr(), data.len());
Box::from_raw(ptr as *mut FrameBorrowed)
};

Ok(Self { inner })
}
}

impl From<FrameMut> for Frame {
fn from(value: FrameMut) -> Self {
// transmute the FrameBorrowed into a Box<[u8; _]>. This is safe because the alignment of
// [u8] divides the alignement of FrameBorrowed
let data = unsafe {
Vec::from_raw_parts(
Box::into_raw(value.inner) as *mut u8,
size_of::<FrameBorrowed>(),
size_of::<FrameBorrowed>(),
)
};

Self {
inner: Bytes::from(data),
}
}
}

Self { data: buf.freeze() }
impl From<FrameBorrowed> for FrameMut {
fn from(inner: FrameBorrowed) -> Self {
Self {
inner: Box::new(inner),
}
}
}

pub fn try_from_bytes(data: Bytes) -> anyhow::Result<Self> {
anyhow::ensure!(data.len() == Self::SIZE, "invalid frame size");
Ok(Self { data })
impl Frame {
pub fn from_parts(header: &FrameHeader, data: &[u8]) -> Self {
FrameBorrowed::from_parts(header, data).into()
}

pub fn bytes(&self) -> Bytes {
self.data.clone()
self.inner.clone()
}
}

impl From<FrameBorrowed> for Frame {
fn from(value: FrameBorrowed) -> Self {
FrameMut::from(value).into()
}
}

/// The borrowed version of Frame
#[repr(transparent)]
#[repr(C)]
#[derive(Pod, Zeroable, Copy, Clone)]
pub struct FrameBorrowed {
data: [u8],
header: FrameHeader,
page: [u8; LIBSQL_PAGE_SIZE as usize],
}

impl FrameBorrowed {
pub fn header(&self) -> Cow<FrameHeader> {
let data = &self.data[..size_of::<FrameHeader>()];
try_from_bytes(data)
.map(Cow::Borrowed)
.unwrap_or_else(|_| Cow::Owned(pod_read_unaligned(data)))
}

/// Returns the bytes for this frame. Includes the header bytes.
pub fn as_slice(&self) -> &[u8] {
&self.data
}

pub fn from_bytes(data: &[u8]) -> &Self {
assert_eq!(data.len(), Frame::SIZE);
// SAFETY: &FrameBorrowed is equivalent to &[u8]
unsafe { transmute(data) }
bytes_of(self)
}

/// returns this frame's page data.
pub fn page(&self) -> &[u8] {
&self.data[size_of::<FrameHeader>()..]
&self.page
}

pub fn header(&self) -> &FrameHeader {
&self.header
}

pub fn header_mut(&mut self) -> &mut FrameHeader {
&mut self.header
}

pub fn from_parts(header: &FrameHeader, page: &[u8]) -> Self {
assert_eq!(page.len(), LIBSQL_PAGE_SIZE as usize);

FrameBorrowed {
header: *header,
page: page.try_into().unwrap(),
}
}
}

impl Deref for Frame {
type Target = FrameBorrowed;

fn deref(&self) -> &Self::Target {
FrameBorrowed::from_bytes(&self.data)
from_bytes(&self.inner)
}
}

impl Deref for FrameMut {
type Target = FrameBorrowed;

fn deref(&self) -> &Self::Target {
self.inner.as_ref()
}
}

impl DerefMut for FrameMut {
fn deref_mut(&mut self) -> &mut Self::Target {
self.inner.as_mut()
}
}
Loading