Skip to content

Commit

Permalink
switch from reqwest to hyper
Browse files Browse the repository at this point in the history
  • Loading branch information
MarinPostma committed Sep 11, 2023
1 parent 0aaa30e commit 6c32c00
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 75 deletions.
11 changes: 8 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,24 @@ bytes = { version = "1.4.0", optional = true }
anyhow = "1.0.69"
hyper = { version = "0.14.27", optional = true, default-features = false }
hyper-rustls = { version = "0.24.1", optional = true, features = ["http2"] }
hrana-client = { version = "0.3", optional = true }
hrana-client-proto = { version = "0.2" }
# hrana-client = { version = "0.3", optional = true }
hrana-client = { git = "https://github.com/libsql/hrana-client-rs.git", rev = "1cb37f1", optional = true }
# hrana-client-proto = { version = "0.2" }
hrana-client-proto = { git = "https://github.com/libsql/hrana-client-rs.git", rev = "1cb37f1" }
futures-util = { version = "0.3.21", optional = true }
serde = "1.0.159"
tracing = "0.1.37"
futures = "0.3.28"
fallible-iterator = "0.2.0"
libsql = { version = "0.1.6", optional = true }
tower = { version = "0.4.13", features = ["make"] }
tokio = { version = "1", default-features = false, optional = true }

[features]
default = ["local_backend", "hrana_backend", "reqwest_backend", "mapping_names_to_values_in_rows"]
workers_backend = ["worker", "futures-util"]
reqwest_backend = ["hyper", "hyper-rustls"]
reqwest_backend = ["hyper_backend"]
hyper_backend = ["hyper", "hyper-rustls", "tokio"]
local_backend = ["libsql"]
spin_backend = ["spin-sdk", "http", "bytes"]
hrana_backend = ["hrana-client"]
Expand Down
93 changes: 57 additions & 36 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
//! [Client] is the main structure to interact with the database.
use anyhow::Result;
use hyper::{client::HttpConnector, Uri};
use hyper::client::connect::Connection as HyperConnection;
use tokio::io::{AsyncRead, AsyncWrite};
use tower::{make::MakeConnection, Service};

use crate::{proto, BatchResult, ResultSet, Statement, SyncTransaction, Transaction};

Expand All @@ -9,15 +13,15 @@ static TRANSACTION_IDS: std::sync::atomic::AtomicU64 = std::sync::atomic::Atomic
/// It's a convenience struct which allows implementing connect()
/// with backends being passed as env parameters.
#[derive(Debug)]
pub enum Client {
pub enum Client<C = HttpConnector> {
#[cfg(feature = "local_backend")]
Local(crate::local::Client),
#[cfg(any(
feature = "reqwest_backend",
feature = "workers_backend",
feature = "spin_backend"
))]
Http(crate::http::Client),
Http(crate::http::Client<C>),
#[cfg(feature = "hrana_backend")]
Hrana(crate::hrana::Client),
Default,
Expand All @@ -29,7 +33,7 @@ pub struct SyncClient {
inner: Client,
}

unsafe impl Send for Client {}
unsafe impl<C: Send> Send for Client<C> {}

impl Client {
/// Executes a batch of independent SQL statements.
Expand Down Expand Up @@ -267,38 +271,17 @@ impl Client {
}
}

impl Client {
/// Creates an in-memory database
///
/// # Examples
///
/// ```
/// # async fn f() {
/// # use libsql_client::Config;
/// let db = libsql_client::Client::in_memory().unwrap();
/// # }
/// ```
#[cfg(feature = "local_backend")]
pub fn in_memory() -> anyhow::Result<Client> {
Ok(Client::Local(crate::local::Client::in_memory()?))
}

/// Establishes a database client based on [Config] struct
///
/// # Examples
///
/// ```
/// # async fn f() {
/// # use libsql_client::Config;
/// let config = Config {
/// url: url::Url::parse("file:////tmp/example.db").unwrap(),
/// auth_token: None
/// };
/// let db = libsql_client::Client::from_config(config).await.unwrap();
/// # }
/// ```
#[allow(unreachable_patterns)]
pub async fn from_config<'a>(mut config: Config) -> anyhow::Result<Client> {
impl<C> Client<C>
where
C: Service<Uri> + Send + Clone + Sync + 'static,
C::Response: HyperConnection + AsyncRead + AsyncWrite + Send + Unpin + 'static,
C::Future: Send + 'static,
C::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
{
pub async fn from_config_with_connector(mut config: Config, connector: C) -> anyhow::Result<Client<C>>
where
C: MakeConnection<Uri>,
{
config.url = if config.url.scheme() == "libsql" {
// We cannot use url::Url::set_scheme() because it prevents changing the scheme to http...
// Safe to unwrap, because we know that the scheme is libsql
Expand All @@ -318,7 +301,7 @@ impl Client {
},
#[cfg(feature = "reqwest_backend")]
"http" | "https" => {
let inner = crate::http::InnerClient::Reqwest(crate::hyper::HttpClient::new());
let inner = crate::http::InnerClient::Reqwest(crate::hyper::HttpClient::with_connector(connector));
Client::Http(crate::http::Client::from_config(inner, config)?)
},
#[cfg(feature = "workers_backend")]
Expand All @@ -335,6 +318,44 @@ impl Client {
})
}

}

impl Client {
/// Creates an in-memory database
///
/// # Examples
///
/// ```
/// # async fn f() {
/// # use libsql_client::Config;
/// let db = libsql_client::Client::in_memory().unwrap();
/// # }
/// ```
#[cfg(feature = "local_backend")]
pub fn in_memory() -> anyhow::Result<Client> {
Ok(Client::Local(crate::local::Client::in_memory()?))
}

/// Establishes a database client based on [Config] struct
///
/// # Examples
///
/// ```
/// # async fn f() {
/// # use libsql_client::Config;
/// let config = Config {
/// url: url::Url::parse("file:////tmp/example.db").unwrap(),
/// auth_token: None
/// };
/// let db = libsql_client::Client::from_config(config).await.unwrap();
/// # }
/// ```
#[allow(unreachable_patterns)]
pub async fn from_config(config: Config) -> anyhow::Result<Client> {
let connector = HttpConnector::new();
Self::from_config_with_connector(config, connector).await
}

/// Establishes a database client based on environment variables
///
/// # Env
Expand Down
43 changes: 33 additions & 10 deletions src/hrana.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
use crate::client::Config;
use anyhow::Result;
use hyper::Uri;
use hyper::client::HttpConnector;
use tokio::io::AsyncRead;
use tokio::io::AsyncWrite;
use tower::Service;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::RwLock;
Expand All @@ -8,13 +13,14 @@ use crate::{utils, BatchResult, ResultSet, Statement};

/// Database client. This is the main structure used to
/// communicate with the database.
pub struct Client {
pub struct Client<C = HttpConnector> {
url: String,
token: Option<String>,

client: hrana_client::Client,
client_future: hrana_client::ConnFut,
streams_for_transactions: RwLock<HashMap<u64, Arc<hrana_client::Stream>>>,
connector: C,
}

impl std::fmt::Debug for Client {
Expand All @@ -26,35 +32,52 @@ impl std::fmt::Debug for Client {
}
}

impl Client {
/// Creates a database client with JWT authentication.
///
/// # Arguments
/// * `url` - URL of the database endpoint
/// * `token` - auth token
pub async fn new(url: impl Into<String>, token: impl Into<String>) -> Result<Self> {
impl<C> Client<C>
where
C: Service<Uri> + Send + Clone + Sync + 'static,
C::Response: hyper::client::connect::Connection + AsyncRead + AsyncWrite + Send + Unpin + 'static,
C::Future: Send + 'static,
C::Error: std::error::Error + Sync + Send + 'static,
{
/// Same as `new`, but uses `connector` to create connections.
pub async fn new_with_connector(url: impl Into<String>, token: impl Into<String>, connector: C) -> Result<Self>

{
let token = token.into();
let token = if token.is_empty() { None } else { Some(token) };
let url = url.into();

let (client, client_future) = hrana_client::Client::connect(&url, token.clone()).await?;
let (client, client_future) = hrana_client::Client::with_connector(&url, token.clone(), connector.clone()).await?;

Ok(Self {
url,
token,
client,
client_future,
streams_for_transactions: RwLock::new(HashMap::new()),
connector,
})
}

pub async fn reconnect(&mut self) -> Result<()> {
let (client, client_future) =
hrana_client::Client::connect(&self.url, self.token.clone()).await?;
hrana_client::Client::with_connector(&self.url, self.token.clone(), self.connector.clone()).await?;
self.client = client;
self.client_future = client_future;
Ok(())
}
}

impl Client {
/// Creates a database client with JWT authentication.
///
/// # Arguments
/// * `url` - URL of the database endpoint
/// * `token` - auth token
pub async fn new(url: impl Into<String>, token: impl Into<String>) -> Result<Self> {
let connector = HttpConnector::new();
Self::new_with_connector(url, token, connector).await
}

/// Creates a database client, given a `Url`
///
Expand Down
27 changes: 21 additions & 6 deletions src/http.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use crate::client::Config;
use crate::hyper::HttpsConnector;
use anyhow::Result;
use hyper::Uri;
use hyper::client::connect::Connection as HyperConnection;
use tokio::io::{AsyncRead, AsyncWrite};
use tower::Service;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

Expand All @@ -17,15 +20,15 @@ struct Cookie {
/// Generic HTTP client. Needs a helper function that actually sends
/// the request.
#[derive(Clone, Debug)]
pub struct Client<C = HttpsConnector> {
pub struct Client<C> {
inner: InnerClient<C>,
cookies: Arc<RwLock<HashMap<u64, Cookie>>>,
url_for_queries: String,
auth: String,
}

#[derive(Clone, Debug)]
pub enum InnerClient<C = HttpsConnector> {
pub enum InnerClient<C> {
#[cfg(feature = "reqwest_backend")]
Reqwest(crate::hyper::HttpClient<C>),
#[cfg(feature = "workers_backend")]
Expand All @@ -35,7 +38,13 @@ pub enum InnerClient<C = HttpsConnector> {
Default,
}

impl InnerClient {
impl<C> InnerClient<C>
where
C: Service<Uri> + Send + Clone + Sync + 'static,
C::Response: HyperConnection + AsyncRead + AsyncWrite + Send + Unpin + 'static,
C::Future: Send + 'static,
C::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
{
pub async fn send(
&self,
url: String,
Expand Down Expand Up @@ -87,7 +96,7 @@ impl<C> Client<C> {
))
}

pub fn from_env(inner: InnerClient) -> anyhow::Result<Client> {
pub fn from_env(inner: InnerClient<C>) -> anyhow::Result<Self> {
let url = std::env::var("LIBSQL_CLIENT_URL").map_err(|_| {
anyhow::anyhow!("LIBSQL_CLIENT_URL variable should point to your sqld database")
})?;
Expand All @@ -97,7 +106,13 @@ impl<C> Client<C> {
}
}

impl Client {
impl<C> Client<C>
where
C: Service<Uri> + Send + Clone + Sync + 'static,
C::Response: HyperConnection + AsyncRead + AsyncWrite + Send + Unpin + 'static,
C::Future: Send + 'static,
C::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
{
fn into_hrana(stmt: Statement) -> crate::proto::Stmt {
let mut hrana_stmt = crate::proto::Stmt::new(stmt.sql, true);
for param in stmt.args {
Expand Down
48 changes: 28 additions & 20 deletions src/hyper.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,16 @@
use anyhow::Result;
use hyper::{client::HttpConnector, Request, Body, StatusCode, body::{to_bytes, HttpBody}};
use hyper::{Request, Body, StatusCode, Uri, };
use hyper::body::{to_bytes, HttpBody};
use hyper::client::{HttpConnector, connect::Connection};
use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder};
use tokio::io::{AsyncRead, AsyncWrite};
use tower::Service;

use crate::proto::pipeline;

pub(crate) type HttpsConnector<H = HttpConnector> = hyper_rustls::HttpsConnector<H>;

#[derive(Clone, Debug)]
pub struct HttpClient<C = HttpsConnector> {
inner: hyper::client::Client<C>,
}

impl HttpClient {
pub fn new() -> Self {
let http_connector = HttpConnector::new();
let https_connector = hyper_rustls::HttpsConnectorBuilder::new()
.with_native_roots()
.https_or_http()
.enable_http2()
.wrap_connector(http_connector);
Self::with_connector(https_connector)
}
pub struct HttpClient<C = HttpConnector> {
inner: hyper::client::Client<HttpsConnector<C>>,
}

pub async fn to_text<T>(body: T) -> anyhow::Result<String>
Expand All @@ -31,15 +22,32 @@ where
Ok(String::from_utf8(bytes.to_vec())?)
}

impl HttpClient {
pub fn new() -> Self {
let connector = HttpConnector::new();
Self::with_connector(connector)
}
}

impl<C> HttpClient<C>
where
C: hyper::client::connect::Connect + Clone + Send + Sync + 'static,
C: Service<Uri> + Send + Clone + Sync + 'static,
C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin + 'static,
C::Future: Send + 'static,
C::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
{
/// Creates an HttpClient using the provided connector.
pub fn with_connector(connector: C) -> Self {
let inner = hyper::client::Client::builder().build(connector);
let connector = HttpsConnectorBuilder::new()
.with_native_roots()
.https_or_http()
.enable_http2()
.wrap_connector(connector);

Self { inner }
let builder = hyper::client::Client::builder();
let inner = builder.build(connector);

Self { inner }
}

pub async fn send(
Expand Down

0 comments on commit 6c32c00

Please sign in to comment.