Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion libsqlx-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ bincode = "1.3.3"
bytes = { version = "1.4.0", features = ["serde"] }
clap = { version = "4.3.11", features = ["derive"] }
color-eyre = "0.6.2"
either = "1.8.1"
futures = "0.3.28"
hmac = "0.12.1"
hyper = { version = "0.14.27", features = ["h2", "server"] }
itertools = "0.11.0"
libsqlx = { version = "0.1.0", path = "../libsqlx" }
libsqlx = { version = "0.1.0", path = "../libsqlx", features = ["tokio"] }
moka = { version = "0.11.2", features = ["future"] }
parking_lot = "0.12.1"
priority-queue = "1.3.2"
Expand Down
63 changes: 41 additions & 22 deletions libsqlx-server/src/allocation/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant};

use bytes::Bytes;
use either::Either;
use libsqlx::libsql::{LibsqlDatabase, LogCompactor, LogFile, PrimaryType, ReplicaType};
use libsqlx::proxy::WriteProxyDatabase;
use libsqlx::proxy::{WriteProxyConnection, WriteProxyDatabase};
use libsqlx::result_builder::ResultBuilder;
use libsqlx::{
Database as _, DescribeResponse, Frame, FrameNo, InjectableDatabase, Injector, LogReadError,
ReplicationLogger,
Expand All @@ -27,7 +29,11 @@ use self::config::{AllocConfig, DbConfig};

pub mod config;

type ExecFn = Box<dyn FnOnce(&mut dyn libsqlx::Connection) + Send>;
type LibsqlConnection = Either<
libsqlx::libsql::LibsqlConnection<PrimaryType>,
WriteProxyConnection<libsqlx::libsql::LibsqlConnection<ReplicaType>, DummyConn>,
>;
type ExecFn = Box<dyn FnOnce(&mut LibsqlConnection) + Send>;

#[derive(Clone)]
pub struct ConnectionId {
Expand All @@ -47,10 +53,10 @@ pub struct DummyDb;
pub struct DummyConn;

impl libsqlx::Connection for DummyConn {
fn execute_program(
fn execute_program<B: ResultBuilder>(
&mut self,
_pgm: libsqlx::program::Program,
_result_builder: &mut dyn libsqlx::result_builder::ResultBuilder,
_pgm: &libsqlx::program::Program,
_result_builder: B,
) -> libsqlx::Result<()> {
todo!()
}
Expand Down Expand Up @@ -207,7 +213,12 @@ impl FrameStreamer {
if !self.buffer.is_empty() {
self.send_frames().await;
}
if self.notifier.wait_for(|fno| dbg!(*fno) >= self.next_frame_no).await.is_err() {
if self
.notifier
.wait_for(|fno| *fno >= self.next_frame_no)
.await
.is_err()
{
break;
}
}
Expand Down Expand Up @@ -244,7 +255,9 @@ impl Database {
path,
Compactor,
false,
Box::new(move |fno| { let _ = sender.send(fno); } ),
Box::new(move |fno| {
let _ = sender.send(fno);
}),
)
.unwrap();

Expand All @@ -253,7 +266,7 @@ impl Database {
replica_streams: HashMap::new(),
frame_notifier: receiver,
}
},
}
DbConfig::Replica { primary_node_id } => {
let rdb = LibsqlDatabase::new_replica(path, MAX_INJECTOR_BUFFER_CAP, ()).unwrap();
let wdb = DummyDb;
Expand Down Expand Up @@ -285,10 +298,10 @@ impl Database {
}
}

fn connect(&self) -> Box<dyn libsqlx::Connection + Send> {
fn connect(&self) -> LibsqlConnection {
match self {
Database::Primary { db, .. } => Box::new(db.connect().unwrap()),
Database::Replica { db, .. } => Box::new(db.connect().unwrap()),
Database::Primary { db, .. } => Either::Left(db.connect().unwrap()),
Database::Replica { db, .. } => Either::Right(db.connect().unwrap()),
}
}
}
Expand All @@ -315,11 +328,11 @@ pub struct ConnectionHandle {
impl ConnectionHandle {
pub async fn exec<F, R>(&self, f: F) -> crate::Result<R>
where
F: for<'a> FnOnce(&'a mut (dyn libsqlx::Connection + 'a)) -> R + Send + 'static,
F: for<'a> FnOnce(&'a mut LibsqlConnection) -> R + Send + 'static,
R: Send + 'static,
{
let (sender, ret) = oneshot::channel();
let cb = move |conn: &mut dyn libsqlx::Connection| {
let cb = move |conn: &mut LibsqlConnection| {
let res = f(conn);
let _ = sender.send(res);
};
Expand Down Expand Up @@ -371,9 +384,15 @@ impl Allocation {
Message::Handshake { .. } => unreachable!("handshake should have been caught earlier"),
Message::ReplicationHandshake { .. } => todo!(),
Message::ReplicationHandshakeResponse { .. } => todo!(),
Message::Replicate { req_no, next_frame_no } => match &mut self.database {
Database::Primary { db, replica_streams, frame_notifier } => {
dbg!(next_frame_no);
Message::Replicate {
req_no,
next_frame_no,
} => match &mut self.database {
Database::Primary {
db,
replica_streams,
frame_notifier,
} => {
let streamer = FrameStreamer {
logger: db.logger(),
database_id: DatabaseId::from_name(&self.db_name),
Expand All @@ -396,15 +415,15 @@ impl Allocation {
*old_req_no = req_no;
old_handle.abort();
}
},
}
Entry::Vacant(e) => {
let handle = tokio::spawn(streamer.run());
// For some reason, not yielding causes the task not to be spawned
tokio::task::yield_now().await;
e.insert((req_no, handle));
},
}
}
},
}
Database::Replica { .. } => todo!("not a primary!"),
},
Message::Frames(frames) => match &mut self.database {
Expand Down Expand Up @@ -459,7 +478,7 @@ impl Allocation {

struct Connection {
id: u32,
conn: Box<dyn libsqlx::Connection + Send>,
conn: LibsqlConnection,
exit: oneshot::Receiver<()>,
exec: mpsc::Receiver<ExecFn>,
}
Expand All @@ -470,7 +489,7 @@ impl Connection {
tokio::select! {
_ = &mut self.exit => break,
Some(exec) = self.exec.recv() => {
tokio::task::block_in_place(|| exec(&mut *self.conn));
tokio::task::block_in_place(|| exec(&mut self.conn));
}
}
}
Expand Down
24 changes: 13 additions & 11 deletions libsqlx-server/src/hrana/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ use super::stmt::{proto_stmt_to_query, stmt_error_from_sqld_error};
use super::{proto, ProtocolError, Version};

use color_eyre::eyre::anyhow;
use libsqlx::Connection;
use libsqlx::analysis::Statement;
use libsqlx::program::{Cond, Program, Step};
use libsqlx::query::{Params, Query};
use libsqlx::result_builder::{StepResult, StepResultsBuilder};
use tokio::sync::oneshot;

fn proto_cond_to_cond(cond: &proto::BatchCond, max_step_i: usize) -> color_eyre::Result<Cond> {
let try_convert_step = |step: i32| -> Result<usize, ProtocolError> {
Expand Down Expand Up @@ -73,15 +75,15 @@ pub async fn execute_batch(
db: &ConnectionHandle,
pgm: Program,
) -> color_eyre::Result<proto::BatchResult> {
let builder = db
let fut = db
.exec(move |conn| -> color_eyre::Result<_> {
let mut builder = HranaBatchProtoBuilder::default();
conn.execute_program(pgm, &mut builder)?;
Ok(builder)
let (builder, ret) = HranaBatchProtoBuilder::new();
conn.execute_program(&pgm, builder)?;
Ok(ret)
})
.await??;

Ok(builder.into_ret())
Ok(fut.await?)
}

pub fn proto_sequence_to_program(sql: &str) -> color_eyre::Result<Program> {
Expand Down Expand Up @@ -110,17 +112,17 @@ pub fn proto_sequence_to_program(sql: &str) -> color_eyre::Result<Program> {
}

pub async fn execute_sequence(conn: &ConnectionHandle, pgm: Program) -> color_eyre::Result<()> {
let builder = conn
let fut = conn
.exec(move |conn| -> color_eyre::Result<_> {
let mut builder = StepResultsBuilder::default();
conn.execute_program(pgm, &mut builder)?;
let (snd, rcv) = oneshot::channel();
let builder = StepResultsBuilder::new(snd);
conn.execute_program(&pgm, builder)?;

Ok(builder)
Ok(rcv)
})
.await??;

builder
.into_ret()
fut.await?
.into_iter()
.try_for_each(|result| match result {
StepResult::Ok => Ok(()),
Expand Down
Loading