Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions influxdb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ repository = "https://github.com/Empty2k12/influxdb-rust"
travis-ci = { repository = "Empty2k12/influxdb-rust", branch = "master" }

[dependencies]
chrono = { version = "0.4.11", features = ["serde"] }
chrono = { version = "0.4.11", features = ["serde"] }
failure = "0.1.7"
futures = "0.3.4"
influxdb_derive = { version = "0.1.0", optional = true }
reqwest = { version = "0.10.4", features = ["json"] }
isahc = "^0.9"
http = "^0.2"
url = "^2.1"
serde = { version = "1.0.104", features = ["derive"], optional = true }
serde_json = { version = "1.0.48", optional = true }
regex = "1.3.5"
Expand All @@ -32,4 +33,11 @@ default = ["use-serde"]
derive = ["influxdb_derive"]

[dev-dependencies]
tokio = { version = "0.2.11", features = ["macros"] }
tokio = { version = "0.2.11", features = ["rt-threaded", "macros"] }
async-std = "1.5"
futures = "*"

[[test]]
name = "derive_integration_tests"
path = "tests/derive_integration_tests.rs"
required-features = ["derive"]
29 changes: 15 additions & 14 deletions influxdb/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
//! assert_eq!(client.database_name(), "test");
//! ```

use futures::prelude::*;
use reqwest::{self, Client as ReqwestClient, StatusCode, Url};
use http::StatusCode;
use isahc::prelude::*;
use url::Url;

use crate::query::QueryTypes;
use crate::Error;
Expand Down Expand Up @@ -128,7 +129,7 @@ impl Client {
///
/// Returns a tuple of build type and version number
pub async fn ping(&self) -> Result<(String, String), Error> {
let res = reqwest::get(format!("{}/ping", self.url).as_str())
let res = isahc::get_async(format!("{}/ping", self.url).as_str())
.await
.map_err(|err| Error::ProtocolError {
error: format!("{}", err),
Expand Down Expand Up @@ -191,7 +192,7 @@ impl Client {

let basic_parameters: Vec<(String, String)> = self.into();

let client = match q.into() {
let res = match q.into() {
QueryTypes::Read(_) => {
let read_query = query.get();
let mut url = Url::parse_with_params(
Expand All @@ -205,9 +206,9 @@ impl Client {
url.query_pairs_mut().append_pair("q", &read_query);

if read_query.contains("SELECT") || read_query.contains("SHOW") {
ReqwestClient::new().get(url)
isahc::get_async(url.as_str()).await
} else {
ReqwestClient::new().post(url)
isahc::post_async(url.as_str().to_owned(), "").await
}
}
QueryTypes::Write(write_query) => {
Expand All @@ -222,24 +223,24 @@ impl Client {
url.query_pairs_mut()
.append_pair("precision", &write_query.get_precision());

ReqwestClient::new().post(url).body(query.get())
isahc::post_async(url.as_str().to_owned(), query.get()).await
}
};

let res = client
.send()
.map_err(|err| Error::ConnectionError { error: err })
.await?;
let mut res = res.map_err(|err| Error::ConnectionError { error: err })?;

match res.status() {
StatusCode::UNAUTHORIZED => return Err(Error::AuthorizationError),
StatusCode::FORBIDDEN => return Err(Error::AuthenticationError),
_ => {}
}

let s = res.text().await.map_err(|_| Error::DeserializationError {
error: "response could not be converted to UTF-8".to_string(),
})?;
let s = res
.text_async()
.await
.map_err(|_| Error::DeserializationError {
error: "response could not be converted to UTF-8".to_string(),
})?;

// todo: improve error parsing without serde
if s.contains("\"error\"") {
Expand Down
7 changes: 2 additions & 5 deletions influxdb/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
//! Errors that might happen in the crate
use reqwest;

#[derive(Debug, Fail)]
pub enum Error {
#[fail(display = "query is invalid: {}", error)]
Expand Down Expand Up @@ -32,9 +29,9 @@ pub enum Error {
AuthorizationError,

#[fail(display = "connection error: {}", error)]
/// Error happens when reqwest fails
/// Error happens when isahc fails
ConnectionError {
#[fail(cause)]
error: reqwest::Error,
error: isahc::Error,
},
}
17 changes: 9 additions & 8 deletions influxdb/src/integrations/serde_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@
//! # }
//! ```

use reqwest::{Client as ReqwestClient, StatusCode, Url};
use http::StatusCode;
use isahc::prelude::*;
use url::Url;

use serde::{de::DeserializeOwned, Deserialize};
use serde_json;
Expand Down Expand Up @@ -95,7 +97,7 @@ impl Client {
pub async fn json_query(&self, q: ReadQuery) -> Result<DatabaseQueryResult, Error> {
let query = q.build().unwrap();
let basic_parameters: Vec<(String, String)> = self.into();
let client = {
let url = {
let read_query = query.get();

let mut url = match Url::parse_with_params(
Expand All @@ -121,11 +123,10 @@ impl Client {
return Err(error);
}

ReqwestClient::new().get(url.as_str())
url.as_str().to_owned()
};

let res = client
.send()
let mut res = isahc::get_async(url)
.await
.map_err(|err| Error::ConnectionError { error: err })?;

Expand All @@ -135,17 +136,17 @@ impl Client {
_ => {}
}

let body = res.bytes().await.map_err(|err| Error::ProtocolError {
let body = res.text_async().await.map_err(|err| Error::ProtocolError {
error: format!("{}", err),
})?;

// Try parsing InfluxDBs { "error": "error message here" }
if let Ok(error) = serde_json::from_slice::<_DatabaseError>(&body) {
if let Ok(error) = serde_json::from_str::<_DatabaseError>(&body) {
return Err(Error::DatabaseError { error: error.error });
}

// Json has another structure, let's try actually parsing it to the type we're deserializing
serde_json::from_slice::<DatabaseQueryResult>(&body).map_err(|err| {
serde_json::from_str::<DatabaseQueryResult>(&body).map_err(|err| {
Error::DeserializationError {
error: format!("serde error: {}", err),
}
Expand Down
14 changes: 14 additions & 0 deletions influxdb/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -499,3 +499,17 @@ async fn test_wrong_query_errors() {
"Should only build SELECT and SHOW queries."
);
}

/// INTEGRATION TEST
///
/// Test if async-std is available when asynchronous
#[test]
fn test_async_std_runtime() {
async_std::task::block_on(async {
let test_name = "mydb";
let client = Client::new("http://localhost:8086", test_name);
let read_query = Query::raw_read_query("SELECT * FROM \"weather\"");
let read_result = client.query(&read_query).await;
assert!(read_result.is_ok());
});
}