Skip to content

Commit

Permalink
Run rustfmt and clippy on everything
Browse files Browse the repository at this point in the history
Also add to github actions
  • Loading branch information
carlsverre committed Jan 6, 2024
1 parent 110562d commit 99404b8
Show file tree
Hide file tree
Showing 40 changed files with 609 additions and 595 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/actions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ jobs:
version: "8"
run_install: true
# build, test, and package sqlsync
- name: Lint & Fmt
run: just lint
- name: Build all
run: just build
- name: Unit tests
Expand Down
5 changes: 2 additions & 3 deletions .rustfmt.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
# https://rust-lang.github.io/rustfmt
overflow_delimited_expr = true
max_width = 80
struct_lit_width = 80
max_width = 100
struct_lit_width = 40
21 changes: 9 additions & 12 deletions demo/cloudflare-backend/src/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ impl Coordinator {

console_log!("creating new document with id {}", id);

let mut storage = MemoryJournal::open(id)
.map_err(|e| Error::RustError(e.to_string()))?;
let mut storage = MemoryJournal::open(id).map_err(|e| Error::RustError(e.to_string()))?;

// load the persistence layer
let persistence = Persistence::init(state.storage()).await?;
Expand All @@ -55,7 +54,11 @@ impl Coordinator {

Ok((
Self { accept_queue: accept_queue_tx },
CoordinatorTask { accept_queue: accept_queue_rx, persistence, doc },
CoordinatorTask {
accept_queue: accept_queue_rx,
persistence,
doc,
},
))
}

Expand Down Expand Up @@ -189,10 +192,7 @@ impl Client {
(Self { protocol, writer }, reader)
}

async fn start_replication(
&mut self,
doc: &Document,
) -> anyhow::Result<()> {
async fn start_replication(&mut self, doc: &Document) -> anyhow::Result<()> {
let msg = self.protocol.start(doc);
self.send_msg(msg).await
}
Expand Down Expand Up @@ -223,12 +223,9 @@ impl Client {
match msg {
Ok(Message::Bytes(bytes)) => {
let mut cursor = Cursor::new(bytes);
let msg: ReplicationMsg =
bincode::deserialize_from(&mut cursor)?;
let msg: ReplicationMsg = bincode::deserialize_from(&mut cursor)?;
console_log!("received message {:?}", msg);
if let Some(resp) =
self.protocol.handle(doc, msg, &mut cursor)?
{
if let Some(resp) = self.protocol.handle(doc, msg, &mut cursor)? {
self.send_msg(resp).await?;
}
Ok(())
Expand Down
52 changes: 17 additions & 35 deletions demo/cloudflare-backend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ impl DurableObject for DocumentCoordinator {

async fn fetch(&mut self, req: Request) -> Result<Response> {
// check that the Upgrade header is set and == "websocket"
let is_upgrade_req =
req.headers().get("Upgrade")?.unwrap_or("".into()) == "websocket";
let is_upgrade_req = req.headers().get("Upgrade")?.unwrap_or("".into()) == "websocket";
if !is_upgrade_req {
return Response::error("Bad Request", 400);
}
Expand All @@ -37,11 +36,10 @@ impl DurableObject for DocumentCoordinator {
if self.coordinator.is_none() {
// retrieve the reducer digest from the request url
let url = req.url()?;
let reducer_digest =
match url.query_pairs().find(|(k, _)| k == "reducer") {
Some((_, v)) => v,
None => return Response::error("Bad Request", 400),
};
let reducer_digest = match url.query_pairs().find(|(k, _)| k == "reducer") {
Some((_, v)) => v,
None => return Response::error("Bad Request", 400),
};
let bucket = self.env.bucket(REDUCER_BUCKET)?;
let object = bucket
.get(format!("{}.wasm", reducer_digest))
Expand All @@ -51,27 +49,19 @@ impl DurableObject for DocumentCoordinator {
Some(object) => {
object
.body()
.ok_or_else(|| {
Error::RustError(
"reducer not found in bucket".to_string(),
)
})?
.ok_or_else(|| Error::RustError("reducer not found in bucket".to_string()))?
.bytes()
.await?
}
None => {
return Response::error(
format!(
"reducer {} not found in bucket",
reducer_digest
),
format!("reducer {} not found in bucket", reducer_digest),
404,
)
}
};

let (coordinator, task) =
Coordinator::init(&self.state, reducer_bytes).await?;
let (coordinator, task) = Coordinator::init(&self.state, reducer_bytes).await?;
spawn_local(task.into_task());
self.coordinator = Some(coordinator);
}
Expand Down Expand Up @@ -106,11 +96,10 @@ async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
// upload a reducer to the bucket
let bucket = ctx.bucket(REDUCER_BUCKET)?;

let data_len: u64 =
match req.headers().get("Content-Length")?.map(|s| s.parse()) {
Some(Ok(len)) => len,
_ => return Response::error("Bad Request", 400),
};
let data_len: u64 = match req.headers().get("Content-Length")?.map(|s| s.parse()) {
Some(Ok(len)) => len,
_ => return Response::error("Bad Request", 400),
};
if data_len > 10 * 1024 * 1024 {
return Response::error("Payload Too Large", 413);
}
Expand All @@ -130,10 +119,8 @@ async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
.subtle();

// sha256 sum the data and convert to bs58
let digest = JsFuture::from(
subtle.digest_with_str_and_buffer_source("SHA-256", &data)?,
)
.await?;
let digest =
JsFuture::from(subtle.digest_with_str_and_buffer_source("SHA-256", &data)?).await?;

// convert digest to base58
let digest = bs58::encode(Uint8Array::new(&digest).to_vec())
Expand Down Expand Up @@ -164,8 +151,7 @@ async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
if let Some(name) = ctx.param("name") {
let namespace = ctx.durable_object(DURABLE_OBJECT_NAME)?;
// until SQLSync is stable, named doc resolution will periodically break when we increment this counter
let id =
namespace.id_from_name(&format!("sqlsync-1-{}", name))?;
let id = namespace.id_from_name(&format!("sqlsync-1-{}", name))?;
let id = object_id_to_journal_id(id)?;
Response::ok(id.to_base58())
} else {
Expand All @@ -176,15 +162,11 @@ async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
if let Some(id) = ctx.param("id") {
console_log!("forwarding request to document with id: {}", id);
let namespace = ctx.durable_object(DURABLE_OBJECT_NAME)?;
let id = JournalId::from_base58(&id)
.map_err(|e| Error::RustError(e.to_string()))?;
let id = JournalId::from_base58(id).map_err(|e| Error::RustError(e.to_string()))?;
let id = match namespace.id_from_string(&id.to_hex()) {
Ok(id) => id,
Err(e) => {
return Response::error(
format!("Invalid Durable Object ID: {}", e),
400,
)
return Response::error(format!("Invalid Durable Object ID: {}", e), 400)
}
};
let stub = id.get_stub()?;
Expand Down
5 changes: 0 additions & 5 deletions examples/reducer-guestbook/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,6 @@ version.workspace = true
[lib]
crate-type = ["cdylib"]

[profile.release]
lto = true
strip = "debuginfo"
codegen-units = 1

[dependencies]
sqlsync-reducer = "0.2"
serde = { version = "1.0", features = ["derive"] }
Expand Down
4 changes: 4 additions & 0 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ SQLSYNC_PROD_URL := "https://sqlsync.orbitinghail.workers.dev"
default:
@just --choose

lint:
cargo clippy --all-targets --all-features -- -D warnings
cargo fmt --check

unit-test:
cargo test

Expand Down
3 changes: 1 addition & 2 deletions lib/sqlsync-reducer/examples/guest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ async fn reducer(mutation: Vec<u8>) -> Result<(), ReducerError> {
log::info!("running query and execute at the same time");

let x: Option<i64> = None;
let query_future =
query!("SELECT * FROM foo WHERE bar = ?", "baz", 1, 1.23, x);
let query_future = query!("SELECT * FROM foo WHERE bar = ?", "baz", 1, 1.23, x);
let exec_future = execute!("SELECT * FROM foo WHERE bar = ?", "baz");

let (result, result2) = futures::join!(query_future, exec_future);
Expand Down
24 changes: 9 additions & 15 deletions lib/sqlsync-reducer/examples/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ fn main() -> anyhow::Result<()> {
.init()?;

// build guest.wasm using: `cargo build --target wasm32-unknown-unknown --example guest`
let wasm_bytes = include_bytes!(
"../../../target/wasm32-unknown-unknown/debug/examples/guest.wasm"
);
let wasm_bytes =
include_bytes!("../../../target/wasm32-unknown-unknown/debug/examples/guest.wasm");

let engine = Engine::default();
let module = Module::new(&engine, &wasm_bytes[..])?;
Expand All @@ -33,12 +32,11 @@ fn main() -> anyhow::Result<()> {
register_log_handler(&mut linker)?;

let mut store = Store::new(&engine, WasmFFI::uninitialized());
let instance =
linker.instantiate(&mut store, &module)?.start(&mut store)?;
let instance = linker.instantiate(&mut store, &module)?.start(&mut store)?;

// initialize the FFI
let ffi = WasmFFI::initialized(&store, &instance)?;
(*store.data_mut()) = ffi.clone();
(*store.data_mut()) = ffi;

// initialize the reducer
ffi.init_reducer(&mut store)?;
Expand Down Expand Up @@ -70,20 +68,16 @@ fn main() -> anyhow::Result<()> {
if sql == "FAIL" {
let ptr = ffi.encode(
&mut store,
&Err::<ExecResponse, _>(
ErrorResponse::SqliteError {
code: 1,
message: "error".to_string(),
},
),
&Err::<ExecResponse, _>(ErrorResponse::SqliteError {
code: 1,
message: "error".to_string(),
}),
)?;
responses.insert(id, ptr);
} else {
let ptr = ffi.encode(
&mut store,
&Ok::<_, ErrorResponse>(ExecResponse {
changes: 1,
}),
&Ok::<_, ErrorResponse>(ExecResponse { changes: 1 }),
)?;
responses.insert(id, ptr);
}
Expand Down
44 changes: 32 additions & 12 deletions lib/sqlsync-reducer/src/guest_ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,20 @@ pub fn fbm() -> &'static mut FFIBufManager {
static ONCE: Once = Once::new();
unsafe {
ONCE.call_once(|| {
let singleton = FFIBufManager::new();
let singleton = FFIBufManager::default();
SINGLETON.write(singleton);
});
SINGLETON.assume_init_mut()
}
}

#[derive(Default)]
pub struct FFIBufManager {
// map from pointer to buffer to length of buffer
bufs: BTreeMap<FFIBufPtr, FFIBufLen>,
}

impl FFIBufManager {
pub fn new() -> Self {
Self {
bufs: BTreeMap::new(),
}
}

pub fn alloc(&mut self, len: FFIBufLen) -> FFIBufPtr {
let mut buf = Vec::with_capacity(len as usize);
let ptr = buf.as_mut_ptr();
Expand All @@ -40,7 +35,11 @@ impl FFIBufManager {
ptr
}

pub fn dealloc(&mut self, ptr: FFIBufPtr) {
/// frees the memory pointed to by ptr
///
/// # Safety
/// The pointer must have been allocated by FFIBufManager::alloc.
pub unsafe fn dealloc(&mut self, ptr: FFIBufPtr) {
self.consume(ptr);
// immediately drops the vec, freeing the memory
}
Expand All @@ -49,9 +48,13 @@ impl FFIBufManager {
*self.bufs.get(&ptr).unwrap()
}

pub fn consume(&mut self, ptr: FFIBufPtr) -> FFIBuf {
/// consumes the buffer pointed to by ptr and returns a Vec<u8> with the same contents.
///
/// # Safety
/// The pointer must have been allocated by FFIBufManager::alloc.
pub unsafe fn consume(&mut self, ptr: FFIBufPtr) -> FFIBuf {
let len = self.bufs.remove(&ptr).unwrap();
unsafe { Vec::from_raw_parts(ptr, len as usize, len as usize) }
Vec::from_raw_parts(ptr, len as usize, len as usize)
}

pub fn encode<T: Serialize>(&mut self, data: &T) -> Result<FFIBufPtr, bincode::Error> {
Expand All @@ -62,7 +65,20 @@ impl FFIBufManager {
Ok(ptr)
}

pub fn decode<T: DeserializeOwned>(&mut self, ptr: FFIBufPtr) -> Result<T, bincode::Error> {
/// decode will consume the raw memory pointed to by ptr and return a deserialized object.
/// After calling decode, manually deallocating the ptr is no longer needed.
///
/// # Errors
///
/// This function will return an error if deserialization fails. If this
/// happens the memory pointed to by the ptr will also be dropped.
///
/// # Safety
/// The pointer must have been allocated by FFIBufManager::alloc.
pub unsafe fn decode<T: DeserializeOwned>(
&mut self,
ptr: FFIBufPtr,
) -> Result<T, bincode::Error> {
let buf = self.consume(ptr);
bincode::deserialize(&buf)
}
Expand All @@ -73,8 +89,12 @@ pub fn ffi_buf_allocate(length: FFIBufLen) -> FFIBufPtr {
fbm().alloc(length)
}

/// ffi_buf_deallocate will immediately drop the buffer pointed to by the pointer, freeing the memory
///
/// # Safety
/// The pointer must have been allocated by ffi_buf_allocate or FFIBufManager::alloc.
#[no_mangle]
pub fn ffi_buf_deallocate(ptr: FFIBufPtr) {
pub unsafe fn ffi_buf_deallocate(ptr: FFIBufPtr) {
fbm().dealloc(ptr)
}

Expand Down
Loading

0 comments on commit 99404b8

Please sign in to comment.