diff --git a/influxdb/Cargo.toml b/influxdb/Cargo.toml index f0c9bfd..9c1cb86 100644 --- a/influxdb/Cargo.toml +++ b/influxdb/Cargo.toml @@ -16,11 +16,12 @@ repository = "https://github.com/Empty2k12/influxdb-rust" travis-ci = { repository = "Empty2k12/influxdb-rust", branch = "master" } [dependencies] -chrono = { version = "0.4.11", features = ["serde"] } +chrono = { version = "0.4.11", features = ["serde"] } failure = "0.1.7" -futures = "0.3.4" influxdb_derive = { version = "0.1.0", optional = true } -reqwest = { version = "0.10.4", features = ["json"] } +isahc = "^0.9" +http = "^0.2" +url = "^2.1" serde = { version = "1.0.104", features = ["derive"], optional = true } serde_json = { version = "1.0.48", optional = true } regex = "1.3.5" @@ -32,4 +33,11 @@ default = ["use-serde"] derive = ["influxdb_derive"] [dev-dependencies] -tokio = { version = "0.2.11", features = ["macros"] } +tokio = { version = "0.2.11", features = ["rt-threaded", "macros"] } +async-std = "1.5" +futures = "*" + +[[test]] +name = "derive_integration_tests" +path = "tests/derive_integration_tests.rs" +required-features = ["derive"] diff --git a/influxdb/src/client/mod.rs b/influxdb/src/client/mod.rs index d1c224b..09d2d06 100644 --- a/influxdb/src/client/mod.rs +++ b/influxdb/src/client/mod.rs @@ -15,8 +15,9 @@ //! assert_eq!(client.database_name(), "test"); //! ``` -use futures::prelude::*; -use reqwest::{self, Client as ReqwestClient, StatusCode, Url}; +use http::StatusCode; +use isahc::prelude::*; +use url::Url; use crate::query::QueryTypes; use crate::Error; @@ -128,7 +129,7 @@ impl Client { /// /// Returns a tuple of build type and version number pub async fn ping(&self) -> Result<(String, String), Error> { - let res = reqwest::get(format!("{}/ping", self.url).as_str()) + let res = isahc::get_async(format!("{}/ping", self.url).as_str()) .await .map_err(|err| Error::ProtocolError { error: format!("{}", err), @@ -191,7 +192,7 @@ impl Client { let basic_parameters: Vec<(String, String)> = self.into(); - let client = match q.into() { + let res = match q.into() { QueryTypes::Read(_) => { let read_query = query.get(); let mut url = Url::parse_with_params( @@ -205,9 +206,9 @@ impl Client { url.query_pairs_mut().append_pair("q", &read_query); if read_query.contains("SELECT") || read_query.contains("SHOW") { - ReqwestClient::new().get(url) + isahc::get_async(url.as_str()).await } else { - ReqwestClient::new().post(url) + isahc::post_async(url.as_str().to_owned(), "").await } } QueryTypes::Write(write_query) => { @@ -222,14 +223,11 @@ impl Client { url.query_pairs_mut() .append_pair("precision", &write_query.get_precision()); - ReqwestClient::new().post(url).body(query.get()) + isahc::post_async(url.as_str().to_owned(), query.get()).await } }; - let res = client - .send() - .map_err(|err| Error::ConnectionError { error: err }) - .await?; + let mut res = res.map_err(|err| Error::ConnectionError { error: err })?; match res.status() { StatusCode::UNAUTHORIZED => return Err(Error::AuthorizationError), @@ -237,9 +235,12 @@ impl Client { _ => {} } - let s = res.text().await.map_err(|_| Error::DeserializationError { - error: "response could not be converted to UTF-8".to_string(), - })?; + let s = res + .text_async() + .await + .map_err(|_| Error::DeserializationError { + error: "response could not be converted to UTF-8".to_string(), + })?; // todo: improve error parsing without serde if s.contains("\"error\"") { diff --git a/influxdb/src/error.rs b/influxdb/src/error.rs index 2f23b36..40fdbbc 100644 --- a/influxdb/src/error.rs +++ b/influxdb/src/error.rs @@ -1,6 +1,3 @@ -//! Errors that might happen in the crate -use reqwest; - #[derive(Debug, Fail)] pub enum Error { #[fail(display = "query is invalid: {}", error)] @@ -32,9 +29,9 @@ pub enum Error { AuthorizationError, #[fail(display = "connection error: {}", error)] - /// Error happens when reqwest fails + /// Error happens when isahc fails ConnectionError { #[fail(cause)] - error: reqwest::Error, + error: isahc::Error, }, } diff --git a/influxdb/src/integrations/serde_integration.rs b/influxdb/src/integrations/serde_integration.rs index 1c941a1..1e9ce71 100644 --- a/influxdb/src/integrations/serde_integration.rs +++ b/influxdb/src/integrations/serde_integration.rs @@ -46,7 +46,9 @@ //! # } //! ``` -use reqwest::{Client as ReqwestClient, StatusCode, Url}; +use http::StatusCode; +use isahc::prelude::*; +use url::Url; use serde::{de::DeserializeOwned, Deserialize}; use serde_json; @@ -95,7 +97,7 @@ impl Client { pub async fn json_query(&self, q: ReadQuery) -> Result { let query = q.build().unwrap(); let basic_parameters: Vec<(String, String)> = self.into(); - let client = { + let url = { let read_query = query.get(); let mut url = match Url::parse_with_params( @@ -121,11 +123,10 @@ impl Client { return Err(error); } - ReqwestClient::new().get(url.as_str()) + url.as_str().to_owned() }; - let res = client - .send() + let mut res = isahc::get_async(url) .await .map_err(|err| Error::ConnectionError { error: err })?; @@ -135,17 +136,17 @@ impl Client { _ => {} } - let body = res.bytes().await.map_err(|err| Error::ProtocolError { + let body = res.text_async().await.map_err(|err| Error::ProtocolError { error: format!("{}", err), })?; // Try parsing InfluxDBs { "error": "error message here" } - if let Ok(error) = serde_json::from_slice::<_DatabaseError>(&body) { + if let Ok(error) = serde_json::from_str::<_DatabaseError>(&body) { return Err(Error::DatabaseError { error: error.error }); } // Json has another structure, let's try actually parsing it to the type we're deserializing - serde_json::from_slice::(&body).map_err(|err| { + serde_json::from_str::(&body).map_err(|err| { Error::DeserializationError { error: format!("serde error: {}", err), } diff --git a/influxdb/tests/integration_tests.rs b/influxdb/tests/integration_tests.rs index d69447e..e41b6dc 100644 --- a/influxdb/tests/integration_tests.rs +++ b/influxdb/tests/integration_tests.rs @@ -499,3 +499,17 @@ async fn test_wrong_query_errors() { "Should only build SELECT and SHOW queries." ); } + +/// INTEGRATION TEST +/// +/// Test if async-std is available when asynchronous +#[test] +fn test_async_std_runtime() { + async_std::task::block_on(async { + let test_name = "mydb"; + let client = Client::new("http://localhost:8086", test_name); + let read_query = Query::raw_read_query("SELECT * FROM \"weather\""); + let read_result = client.query(&read_query).await; + assert!(read_result.is_ok()); + }); +}