diff --git a/Cargo.toml b/Cargo.toml index 30aefa4..07b15b3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,19 +22,24 @@ bytes = { version = "1.4.0", optional = true } anyhow = "1.0.69" hyper = { version = "0.14.27", optional = true, default-features = false } hyper-rustls = { version = "0.24.1", optional = true, features = ["http2"] } -hrana-client = { version = "0.3", optional = true } -hrana-client-proto = { version = "0.2" } +# hrana-client = { version = "0.3", optional = true } +hrana-client = { git = "https://github.com/libsql/hrana-client-rs.git", rev = "1cb37f1", optional = true } +# hrana-client-proto = { version = "0.2" } +hrana-client-proto = { git = "https://github.com/libsql/hrana-client-rs.git", rev = "1cb37f1" } futures-util = { version = "0.3.21", optional = true } serde = "1.0.159" tracing = "0.1.37" futures = "0.3.28" fallible-iterator = "0.2.0" libsql = { version = "0.1.6", optional = true } +tower = { version = "0.4.13", features = ["make"] } +tokio = { version = "1", default-features = false, optional = true } [features] default = ["local_backend", "hrana_backend", "reqwest_backend", "mapping_names_to_values_in_rows"] workers_backend = ["worker", "futures-util"] -reqwest_backend = ["hyper", "hyper-rustls"] +reqwest_backend = ["hyper_backend"] +hyper_backend = ["hyper", "hyper-rustls", "tokio"] local_backend = ["libsql"] spin_backend = ["spin-sdk", "http", "bytes"] hrana_backend = ["hrana-client"] diff --git a/src/client.rs b/src/client.rs index 86fa83e..f846e10 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,5 +1,9 @@ //! [Client] is the main structure to interact with the database. use anyhow::Result; +use hyper::{client::HttpConnector, Uri}; +use hyper::client::connect::Connection as HyperConnection; +use tokio::io::{AsyncRead, AsyncWrite}; +use tower::{make::MakeConnection, Service}; use crate::{proto, BatchResult, ResultSet, Statement, SyncTransaction, Transaction}; @@ -9,7 +13,7 @@ static TRANSACTION_IDS: std::sync::atomic::AtomicU64 = std::sync::atomic::Atomic /// It's a convenience struct which allows implementing connect() /// with backends being passed as env parameters. #[derive(Debug)] -pub enum Client { +pub enum Client { #[cfg(feature = "local_backend")] Local(crate::local::Client), #[cfg(any( @@ -17,7 +21,7 @@ pub enum Client { feature = "workers_backend", feature = "spin_backend" ))] - Http(crate::http::Client), + Http(crate::http::Client), #[cfg(feature = "hrana_backend")] Hrana(crate::hrana::Client), Default, @@ -29,7 +33,7 @@ pub struct SyncClient { inner: Client, } -unsafe impl Send for Client {} +unsafe impl Send for Client {} impl Client { /// Executes a batch of independent SQL statements. @@ -267,38 +271,17 @@ impl Client { } } -impl Client { - /// Creates an in-memory database - /// - /// # Examples - /// - /// ``` - /// # async fn f() { - /// # use libsql_client::Config; - /// let db = libsql_client::Client::in_memory().unwrap(); - /// # } - /// ``` - #[cfg(feature = "local_backend")] - pub fn in_memory() -> anyhow::Result { - Ok(Client::Local(crate::local::Client::in_memory()?)) - } - - /// Establishes a database client based on [Config] struct - /// - /// # Examples - /// - /// ``` - /// # async fn f() { - /// # use libsql_client::Config; - /// let config = Config { - /// url: url::Url::parse("file:////tmp/example.db").unwrap(), - /// auth_token: None - /// }; - /// let db = libsql_client::Client::from_config(config).await.unwrap(); - /// # } - /// ``` - #[allow(unreachable_patterns)] - pub async fn from_config<'a>(mut config: Config) -> anyhow::Result { +impl Client +where + C: Service + Send + Clone + Sync + 'static, + C::Response: HyperConnection + AsyncRead + AsyncWrite + Send + Unpin + 'static, + C::Future: Send + 'static, + C::Error: Into>, +{ + pub async fn from_config_with_connector(mut config: Config, connector: C) -> anyhow::Result> + where + C: MakeConnection, + { config.url = if config.url.scheme() == "libsql" { // We cannot use url::Url::set_scheme() because it prevents changing the scheme to http... // Safe to unwrap, because we know that the scheme is libsql @@ -318,7 +301,7 @@ impl Client { }, #[cfg(feature = "reqwest_backend")] "http" | "https" => { - let inner = crate::http::InnerClient::Reqwest(crate::hyper::HttpClient::new()); + let inner = crate::http::InnerClient::Reqwest(crate::hyper::HttpClient::with_connector(connector)); Client::Http(crate::http::Client::from_config(inner, config)?) }, #[cfg(feature = "workers_backend")] @@ -335,6 +318,44 @@ impl Client { }) } +} + +impl Client { + /// Creates an in-memory database + /// + /// # Examples + /// + /// ``` + /// # async fn f() { + /// # use libsql_client::Config; + /// let db = libsql_client::Client::in_memory().unwrap(); + /// # } + /// ``` + #[cfg(feature = "local_backend")] + pub fn in_memory() -> anyhow::Result { + Ok(Client::Local(crate::local::Client::in_memory()?)) + } + + /// Establishes a database client based on [Config] struct + /// + /// # Examples + /// + /// ``` + /// # async fn f() { + /// # use libsql_client::Config; + /// let config = Config { + /// url: url::Url::parse("file:////tmp/example.db").unwrap(), + /// auth_token: None + /// }; + /// let db = libsql_client::Client::from_config(config).await.unwrap(); + /// # } + /// ``` + #[allow(unreachable_patterns)] + pub async fn from_config(config: Config) -> anyhow::Result { + let connector = HttpConnector::new(); + Self::from_config_with_connector(config, connector).await + } + /// Establishes a database client based on environment variables /// /// # Env diff --git a/src/hrana.rs b/src/hrana.rs index 11f3169..0aad30f 100644 --- a/src/hrana.rs +++ b/src/hrana.rs @@ -1,5 +1,10 @@ use crate::client::Config; use anyhow::Result; +use hyper::Uri; +use hyper::client::HttpConnector; +use tokio::io::AsyncRead; +use tokio::io::AsyncWrite; +use tower::Service; use std::collections::HashMap; use std::sync::Arc; use std::sync::RwLock; @@ -8,13 +13,14 @@ use crate::{utils, BatchResult, ResultSet, Statement}; /// Database client. This is the main structure used to /// communicate with the database. -pub struct Client { +pub struct Client { url: String, token: Option, client: hrana_client::Client, client_future: hrana_client::ConnFut, streams_for_transactions: RwLock>>, + connector: C, } impl std::fmt::Debug for Client { @@ -26,18 +32,22 @@ impl std::fmt::Debug for Client { } } -impl Client { - /// Creates a database client with JWT authentication. - /// - /// # Arguments - /// * `url` - URL of the database endpoint - /// * `token` - auth token - pub async fn new(url: impl Into, token: impl Into) -> Result { +impl Client +where + C: Service + Send + Clone + Sync + 'static, + C::Response: hyper::client::connect::Connection + AsyncRead + AsyncWrite + Send + Unpin + 'static, + C::Future: Send + 'static, + C::Error: std::error::Error + Sync + Send + 'static, +{ + /// Same as `new`, but uses `connector` to create connections. + pub async fn new_with_connector(url: impl Into, token: impl Into, connector: C) -> Result + + { let token = token.into(); let token = if token.is_empty() { None } else { Some(token) }; let url = url.into(); - let (client, client_future) = hrana_client::Client::connect(&url, token.clone()).await?; + let (client, client_future) = hrana_client::Client::with_connector(&url, token.clone(), connector.clone()).await?; Ok(Self { url, @@ -45,16 +55,29 @@ impl Client { client, client_future, streams_for_transactions: RwLock::new(HashMap::new()), + connector, }) } pub async fn reconnect(&mut self) -> Result<()> { let (client, client_future) = - hrana_client::Client::connect(&self.url, self.token.clone()).await?; + hrana_client::Client::with_connector(&self.url, self.token.clone(), self.connector.clone()).await?; self.client = client; self.client_future = client_future; Ok(()) } +} + +impl Client { + /// Creates a database client with JWT authentication. + /// + /// # Arguments + /// * `url` - URL of the database endpoint + /// * `token` - auth token + pub async fn new(url: impl Into, token: impl Into) -> Result { + let connector = HttpConnector::new(); + Self::new_with_connector(url, token, connector).await + } /// Creates a database client, given a `Url` /// diff --git a/src/http.rs b/src/http.rs index efa2abf..5c8821a 100644 --- a/src/http.rs +++ b/src/http.rs @@ -1,6 +1,9 @@ use crate::client::Config; -use crate::hyper::HttpsConnector; use anyhow::Result; +use hyper::Uri; +use hyper::client::connect::Connection as HyperConnection; +use tokio::io::{AsyncRead, AsyncWrite}; +use tower::Service; use std::collections::HashMap; use std::sync::{Arc, RwLock}; @@ -17,7 +20,7 @@ struct Cookie { /// Generic HTTP client. Needs a helper function that actually sends /// the request. #[derive(Clone, Debug)] -pub struct Client { +pub struct Client { inner: InnerClient, cookies: Arc>>, url_for_queries: String, @@ -25,7 +28,7 @@ pub struct Client { } #[derive(Clone, Debug)] -pub enum InnerClient { +pub enum InnerClient { #[cfg(feature = "reqwest_backend")] Reqwest(crate::hyper::HttpClient), #[cfg(feature = "workers_backend")] @@ -35,7 +38,13 @@ pub enum InnerClient { Default, } -impl InnerClient { +impl InnerClient +where + C: Service + Send + Clone + Sync + 'static, + C::Response: HyperConnection + AsyncRead + AsyncWrite + Send + Unpin + 'static, + C::Future: Send + 'static, + C::Error: Into>, +{ pub async fn send( &self, url: String, @@ -87,7 +96,7 @@ impl Client { )) } - pub fn from_env(inner: InnerClient) -> anyhow::Result { + pub fn from_env(inner: InnerClient) -> anyhow::Result { let url = std::env::var("LIBSQL_CLIENT_URL").map_err(|_| { anyhow::anyhow!("LIBSQL_CLIENT_URL variable should point to your sqld database") })?; @@ -97,7 +106,13 @@ impl Client { } } -impl Client { +impl Client +where + C: Service + Send + Clone + Sync + 'static, + C::Response: HyperConnection + AsyncRead + AsyncWrite + Send + Unpin + 'static, + C::Future: Send + 'static, + C::Error: Into>, +{ fn into_hrana(stmt: Statement) -> crate::proto::Stmt { let mut hrana_stmt = crate::proto::Stmt::new(stmt.sql, true); for param in stmt.args { diff --git a/src/hyper.rs b/src/hyper.rs index f7ec0a3..ee0fdf5 100644 --- a/src/hyper.rs +++ b/src/hyper.rs @@ -1,25 +1,16 @@ use anyhow::Result; -use hyper::{client::HttpConnector, Request, Body, StatusCode, body::{to_bytes, HttpBody}}; +use hyper::{Request, Body, StatusCode, Uri, }; +use hyper::body::{to_bytes, HttpBody}; +use hyper::client::{HttpConnector, connect::Connection}; +use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder}; +use tokio::io::{AsyncRead, AsyncWrite}; +use tower::Service; use crate::proto::pipeline; -pub(crate) type HttpsConnector = hyper_rustls::HttpsConnector; - #[derive(Clone, Debug)] -pub struct HttpClient { - inner: hyper::client::Client, -} - -impl HttpClient { - pub fn new() -> Self { - let http_connector = HttpConnector::new(); - let https_connector = hyper_rustls::HttpsConnectorBuilder::new() - .with_native_roots() - .https_or_http() - .enable_http2() - .wrap_connector(http_connector); - Self::with_connector(https_connector) - } +pub struct HttpClient { + inner: hyper::client::Client>, } pub async fn to_text(body: T) -> anyhow::Result @@ -31,15 +22,32 @@ where Ok(String::from_utf8(bytes.to_vec())?) } +impl HttpClient { + pub fn new() -> Self { + let connector = HttpConnector::new(); + Self::with_connector(connector) + } +} + impl HttpClient where - C: hyper::client::connect::Connect + Clone + Send + Sync + 'static, + C: Service + Send + Clone + Sync + 'static, + C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin + 'static, + C::Future: Send + 'static, + C::Error: Into>, { + /// Creates an HttpClient using the provided connector. pub fn with_connector(connector: C) -> Self { - let inner = hyper::client::Client::builder().build(connector); + let connector = HttpsConnectorBuilder::new() + .with_native_roots() + .https_or_http() + .enable_http2() + .wrap_connector(connector); - Self { inner } + let builder = hyper::client::Client::builder(); + let inner = builder.build(connector); + Self { inner } } pub async fn send(