Skip to content

Commit

Permalink
influxdb2
Browse files Browse the repository at this point in the history
  • Loading branch information
alexkar598 committed Jul 18, 2023
1 parent daeeedf commit 858e560
Show file tree
Hide file tree
Showing 9 changed files with 95 additions and 3 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ rayon = { version = "1.5", optional = true }
dbpnoise = { version = "0.1.2", optional = true }
pathfinding = { version = "3.0.13", optional = true }
num = { version = "0.4.0", optional = true }
concat-string = { version = "1.0.1", optional = true }

[features]
default = [
Expand Down Expand Up @@ -101,6 +102,7 @@ hash = [
"serde",
"serde_json",
]
influxdb2 = ["concat-string", "serde", "serde_json", "http"]
pathfinder = ["num", "pathfinding", "serde", "serde_json"]
redis_pubsub = ["flume", "redis", "serde", "serde_json"]
unzip = ["zip", "jobs"]
Expand Down
1 change: 1 addition & 0 deletions dmsrc/influxdb2.dm
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
#define rustg_influxdb2_publish(data, endpoint, token) RUSTG_CALL(RUST_G, "influxdb2_publish")(data, endpoint, token)
3 changes: 3 additions & 0 deletions dmsrc/time.dm
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,6 @@

/proc/rustg_unix_timestamp()
return text2num(RUSTG_CALL(RUST_G, "unix_timestamp")())

/proc/rustg_unix_timestamp_int()
return RUSTG_CALL(RUST_G, "unix_timestamp_int")()
3 changes: 3 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ pub enum Error {
#[cfg(feature = "hash")]
#[error("Unable to decode hex value.")]
HexDecode,
#[cfg(feature = "influxdb2")]
#[error("Invalid metrics format")]
InvalidMetrics,
}

impl From<Utf8Error> for Error {
Expand Down
6 changes: 3 additions & 3 deletions src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,12 @@ pub static HTTP_CLIENT: Lazy<reqwest::blocking::Client> = Lazy::new(setup_http_c
// ----------------------------------------------------------------------------
// Request construction and execution

struct RequestPrep {
pub struct RequestPrep {
req: reqwest::blocking::RequestBuilder,
output_filename: Option<String>,
}

fn construct_request(
pub fn construct_request(
method: &str,
url: &str,
body: &str,
Expand Down Expand Up @@ -130,7 +130,7 @@ fn construct_request(
})
}

fn submit_request(prep: RequestPrep) -> Result<String> {
pub fn submit_request(prep: RequestPrep) -> Result<String> {
let mut response = prep.req.send()?;

let body;
Expand Down
59 changes: 59 additions & 0 deletions src/influxdb2.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
use serde_json::Value;

use crate::error::Error;
use crate::http::{construct_request, submit_request, RequestPrep};
use crate::jobs;

byond_fn!(
fn influxdb2_publish(data, endpoint, token) {
let data = data.to_owned();
let endpoint = endpoint.to_owned();
let token = token.to_owned();
Some(jobs::start(move || {
fn handle(data: &str, endpoint: &str, token: &str) -> Result<RequestPrep, Error> {
let mut lines = vec!();

let data: Value = serde_json::from_str(data)?;
for entry in data.as_array().unwrap() {
let entry = entry.as_object().ok_or(Error::InvalidMetrics)?;

let measurement = entry.get("@measurement").ok_or(Error::InvalidMetrics)?.as_str().ok_or(Error::InvalidMetrics)?.to_owned();
let mut measurement_tags = vec!{measurement};

let tags = entry.get("@tags").ok_or(Error::InvalidMetrics)?.as_object().ok_or(Error::InvalidMetrics)?;
for (key, val) in tags {
measurement_tags.push(concat_string!(key, "=", val.as_str().ok_or(Error::InvalidMetrics)?))
};

let mut fields = vec!{};
for (key, val) in entry {
if key.starts_with('@') {
continue;
}
fields.push(concat_string!(key, "=", val.to_string()))
};

let timestamp = entry.get("@timestamp").ok_or(Error::InvalidMetrics)?.as_str().ok_or(Error::InvalidMetrics)?;
lines.push(concat_string!(measurement_tags.join(","), " ", fields.join(",") , " ", timestamp));
}

construct_request(
"post",
endpoint,
lines.join("\n").as_str(),
concat_string!("{\"Authorization\":\"Token ", token ,"\"}").as_str(),
""
)
}

let req = match handle(data.as_str(), endpoint.as_str(), token.as_str()) {
Ok(r) => r,
Err(e) => return e.to_string()
};
match submit_request(req) {
Ok(r) => r,
Err(e) => e.to_string()
}
}))
}
);
5 changes: 5 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
#![forbid(unsafe_op_in_unsafe_fn)]

#[cfg(feature = "concat-string")]
#[macro_use(concat_string)]
extern crate concat_string;
#[macro_use]
mod byond;
#[allow(dead_code)]
Expand All @@ -24,6 +27,8 @@ pub mod git;
pub mod hash;
#[cfg(feature = "http")]
pub mod http;
#[cfg(feature = "influxdb2")]
pub mod influxdb2;
#[cfg(feature = "json")]
pub mod json;
#[cfg(feature = "log")]
Expand Down
12 changes: 12 additions & 0 deletions src/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,15 @@ byond_fn!(
)
}
);

byond_fn!(
fn unix_timestamp_int() {
Some(
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs()
.to_string(),
)
}
);

0 comments on commit 858e560

Please sign in to comment.