Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ panic = "abort"

[patch.crates-io]
tonic = { git = "https://github.com/hyperium/tonic" }
tonic-web = { git = "https://github.com/hyperium/tonic", branch = "lucio/grpc-web-client" }
tonic-build = { git = "https://github.com/hyperium/tonic" }
1 change: 1 addition & 0 deletions crates/replication/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ regex = "1.9.0"
serde_json = "1.0.103"
serde = { version = "1.0.173", features = ["serde_derive"] }
tonic = { version = "0.9", features = ["tls", "tls-roots", "tls-webpki-roots"] }
tonic-web = "0.9"
prost = "0.11"
hyper = "0.14"
hyper-rustls = { version = "0.24", features = ["webpki-roots"] }
Expand Down
73 changes: 9 additions & 64 deletions crates/replication/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,17 @@
use std::{
pin::Pin,
sync::Arc,
task::{Context, Poll},
};

use hyper::{
client::{conn::SendRequest, HttpConnector},
Client,
};
use hyper::{client::HttpConnector, Client};
use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder};
use tokio::sync::Mutex;
use tonic::body::BoxBody;
use tonic_web::{GrpcWebCall, GrpcWebClientService};
use tower::Service;

#[derive(Debug, Clone)]
pub struct H2cChannel {
client: Client<HttpsConnector<HttpConnector>>,
h2_client: Arc<Mutex<Option<SendRequest<BoxBody>>>>,
client: GrpcWebClientService<Client<HttpsConnector<HttpConnector>, GrpcWebCall<BoxBody>>>,
}

impl H2cChannel {
Expand All @@ -29,15 +24,15 @@ impl H2cChannel {
.build();

let client = Client::builder().build(https);
let h2_client = Arc::new(Mutex::new(None));
let client = GrpcWebClientService::new(client);

Self { client, h2_client }
Self { client }
}
}

impl Service<http::Request<BoxBody>> for H2cChannel {
type Response = http::Response<hyper::Body>;
type Error = anyhow::Error;
type Response = http::Response<GrpcWebCall<hyper::Body>>;
type Error = hyper::Error;
type Future =
Pin<Box<dyn std::future::Future<Output = Result<Self::Response, Self::Error>> + Send>>;

Expand All @@ -46,57 +41,7 @@ impl Service<http::Request<BoxBody>> for H2cChannel {
}

fn call(&mut self, req: http::Request<BoxBody>) -> Self::Future {
let client = self.client.clone();
let h2_client_lock = self.h2_client.clone();

Box::pin(async move {
let mut lock = h2_client_lock.lock().await;

if let Some(client) = &mut *lock {
let res = client.send_request(req).await;

// If we get an error from the request then we should throw away
// the client so that it can recreate. Most normal operational
// errors go through the response type and the Err's returned
// from hyper are for more fatal style errors.
if let Err(e) = &res {
tracing::debug!("Client error: {}, throwing it away", e);
*lock = None;
}

res.map_err(Into::into)
} else {
let origin = req.uri();

let h2c_req = hyper::Request::builder()
.uri(origin)
.header(http::header::UPGRADE, "h2c")
.body(hyper::Body::empty())
.unwrap();

let res = client.request(h2c_req).await?;

if res.status() != http::StatusCode::SWITCHING_PROTOCOLS {
anyhow::bail!("We did not get an http2 upgrade, status: {}", res.status());
}

let upgraded_io = hyper::upgrade::on(res).await?;

tracing::debug!("Upgraded connection to h2");

let (mut h2_client, conn) = hyper::client::conn::Builder::new()
.http2_only(true)
.http2_keep_alive_interval(std::time::Duration::from_secs(10))
.http2_keep_alive_while_idle(true)
.handshake(upgraded_io)
.await?;
tokio::spawn(conn);

let fut = h2_client.send_request(req);
*lock = Some(h2_client);

fut.await.map_err(Into::into)
}
})
let fut = self.client.call(req);
Box::pin(fut)
}
}
30 changes: 15 additions & 15 deletions crates/replication/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,18 @@ pub use replica::snapshot::TempSnapshot;
use tonic::codegen::InterceptedService;
use tonic::metadata::{Ascii, MetadataValue};
use tonic::service::Interceptor;
use tonic::transport::Channel;
// use tonic::transport::Channel;

use uuid::Uuid;

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::mpsc::Sender;

// use crate::client::H2cChannel;
use crate::client::H2cChannel;
use crate::pb::HelloRequest;

type RpcClient = pb::ReplicationLogClient<InterceptedService<Channel, AuthInterceptor>>;
type RpcClient = pb::ReplicationLogClient<InterceptedService<H2cChannel, AuthInterceptor>>;

pub struct Replicator {
pub frames_sender: Sender<Frames>,
Expand Down Expand Up @@ -144,18 +144,18 @@ impl Replicator {
// TODO: Once fly fixes their proxy to correctly accept h2 we can drop
// the h2c client but for now lets keep this commented.
//
let channel = Channel::builder(
endpoint
.as_ref()
.try_into()
.context("Unable to convert endpoint into a Uri")?,
)
.http2_keep_alive_interval(std::time::Duration::from_secs(5))
.keep_alive_while_idle(true)
.tls_config(tonic::transport::ClientTlsConfig::new())?
.connect_lazy();

// let channel = H2cChannel::new();
//let channel = Channel::builder(
// endpoint
// .as_ref()
// .try_into()
// .context("Unable to convert endpoint into a Uri")?,
//)
//.http2_keep_alive_interval(std::time::Duration::from_secs(5))
//.keep_alive_while_idle(true)
//.tls_config(tonic::transport::ClientTlsConfig::new())?
//.connect_lazy();

let channel = H2cChannel::new();

let mut client = pb::ReplicationLogClient::with_origin(
InterceptedService::new(channel, AuthInterceptor(auth_token)),
Expand Down
4 changes: 2 additions & 2 deletions crates/replication/src/replica/hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,8 @@ unsafe impl WalHook for InjectorHook {
return LIBSQL_CONTINUE_REPLICATION as c_int;
}
}
_ => {
tracing::warn!("replication channel closed");
Err(e) => {
tracing::warn!("replication channel closed: {}", e);
return LIBSQL_EXIT_REPLICATION as c_int;
}
}
Expand Down