Skip to content

Commit

Permalink
refactor!: use Service generic in transport connections (#76)
Browse files Browse the repository at this point in the history
  • Loading branch information
rklaehn committed May 13, 2024
2 parents df54382 + 5bfae3c commit 64ed5ef
Show file tree
Hide file tree
Showing 19 changed files with 212 additions and 215 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ concurrency:
cancel-in-progress: true

env:
MSRV: "1.63"
MSRV: "1.76"
RUST_BACKTRACE: 1
RUSTFLAGS: -Dwarnings

Expand Down
51 changes: 21 additions & 30 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@ repository = "https://github.com/n0-computer/quic-rpc"
description = "A streaming rpc system based on quic"

# Sadly this also needs to be updated in .github/workflows/ci.yml
rust-version = "1.65"
rust-version = "1.76"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
bincode = { version = "1.3.3", optional = true }
bytes = { version = "1", optional = true }
derive_more = "0.99.17"
flume = { version = "0.11", optional = true }
futures-lite = "2.3.0"
futures-sink = "0.3.30"
Expand All @@ -39,7 +40,7 @@ version = "0.4.20"
[dev-dependencies]
anyhow = "1.0.73"
async-stream = "0.3.3"
derive_more = { version = "1.0.0-beta.1", features = ["full"] }

serde = { version = "1", features = ["derive"] }
tokio = { version = "1", features = ["full"] }
quinn = { package = "iroh-quinn", version = "0.10" }
Expand All @@ -52,7 +53,7 @@ proc-macro2 = "1.0.66"
futures-buffered = "0.2.4"

[features]
hyper-transport = ["flume", "hyper", "bincode", "bytes"]
hyper-transport = ["flume", "hyper", "bincode", "bytes", "tokio-serde", "tokio-util"]
quinn-transport = ["flume", "quinn", "bincode", "tokio-serde", "tokio-util"]
flume-transport = ["flume"]
combined-transport = []
Expand Down
6 changes: 3 additions & 3 deletions examples/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ impl Fs {
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let fs = Fs;
let (server, client) = quic_rpc::transport::flume::connection(1);
let client = RpcClient::<IoService, _>::new(client);
let server = RpcServer::<IoService, _>::new(server);
let (server, client) = quic_rpc::transport::flume::connection::<IoService>(1);
let client = RpcClient::new(client);
let server = RpcServer::new(server);
let handle = tokio::task::spawn(async move {
for _ in 0..1 {
let (req, chan) = server.accept().await?;
Expand Down
2 changes: 1 addition & 1 deletion examples/macro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ create_store_dispatch!(Store, dispatch_store_request);

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let (server, client) = flume::connection::<StoreRequest, StoreResponse>(1);
let (server, client) = flume::connection::<StoreService>(1);
let server_handle = tokio::task::spawn(async move {
let target = Store;
run_server_loop(StoreService, server, target, dispatch_store_request).await
Expand Down
2 changes: 1 addition & 1 deletion examples/modularize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use app::AppService;
async fn main() -> Result<()> {
// Spawn an inmemory connection.
// Could use quic equally (all code in this example is generic over the transport)
let (server_conn, client_conn) = flume::connection::<app::Request, app::Response>(1);
let (server_conn, client_conn) = flume::connection::<AppService>(1);

// spawn the server
let handler = app::Handler::default();
Expand Down
11 changes: 5 additions & 6 deletions examples/split/client/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#![allow(unknown_lints, non_local_definitions)]
use futures::sink::SinkExt;
use futures::stream::StreamExt;
use quic_rpc::transport::quinn::QuinnConnection;
use quic_rpc::RpcClient;
use quinn::{ClientConfig, Endpoint};
use std::io;
Expand All @@ -14,12 +16,9 @@ async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let server_addr: SocketAddr = "127.0.0.1:12345".parse()?;
let endpoint = make_insecure_client_endpoint("0.0.0.0:0".parse()?)?;
let client = quic_rpc::transport::quinn::QuinnConnection::new(
endpoint,
server_addr,
"localhost".to_string(),
);
let client = RpcClient::<ComputeService, _>::new(client);
let client =
QuinnConnection::<ComputeService>::new(endpoint, server_addr, "localhost".to_string());
let client = RpcClient::new(client);
// let mut client = ComputeClient(client);

// a rpc call
Expand Down
3 changes: 2 additions & 1 deletion examples/split/server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use async_stream::stream;
use futures::stream::{Stream, StreamExt};
use quic_rpc::server::run_server_loop;
use quic_rpc::transport::quinn::QuinnServerEndpoint;
use quinn::{Endpoint, ServerConfig};
use std::net::SocketAddr;
use std::sync::Arc;
Expand Down Expand Up @@ -61,7 +62,7 @@ async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let server_addr: SocketAddr = "127.0.0.1:12345".parse()?;
let (server, _server_certs) = make_server_endpoint(server_addr)?;
let channel = quic_rpc::transport::quinn::QuinnServerEndpoint::new(server)?;
let channel = QuinnServerEndpoint::<ComputeService>::new(server)?;
let target = Compute;
run_server_loop(
ComputeService,
Expand Down
2 changes: 1 addition & 1 deletion examples/split/types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
derive_more = { version = "1.0.0-beta.1", features = ["from", "display", "try_into"] }
futures = "0.3.26"
quic-rpc = { path = "../../..", features = ["macros"] }
serde = { version = "1", features = ["derive"] }
derive_more = "0.99.17"
10 changes: 8 additions & 2 deletions examples/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ async fn main() -> anyhow::Result<()> {
}
}

let (server, client) = flume::connection::<StoreRequest, StoreResponse>(1);
let (server, client) = flume::connection::<StoreService>(1);
let client = RpcClient::<StoreService, _>::new(client);
let server = RpcServer::<StoreService, _>::new(server);
let server_handle = tokio::task::spawn(server_future(server));
Expand Down Expand Up @@ -231,7 +231,13 @@ async fn main() -> anyhow::Result<()> {
}

async fn _main_unsugared() -> anyhow::Result<()> {
let (server, client) = flume::connection::<u64, String>(1);
#[derive(Clone, Debug)]
struct Service;
impl crate::Service for Service {
type Req = u64;
type Res = String;
}
let (server, client) = flume::connection::<Service>(1);
let to_string_service = tokio::spawn(async move {
let (mut send, mut recv) = server.accept_bi().await?;
while let Some(item) = recv.next().await {
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
//! }
//!
//! // create a transport channel, here a memory channel for testing
//! let (server, client) = quic_rpc::transport::flume::connection::<PingRequest, PingResponse>(1);
//! let (server, client) = quic_rpc::transport::flume::connection::<PingService>(1);
//!
//! // client side
//! // create the rpc client given the channel and the service type
Expand Down
Loading

0 comments on commit 64ed5ef

Please sign in to comment.