Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ by InfluxData for making metrics reporting easy for distributed services - see t

This library does not provide querying or other InfluxDB client-library features. This is meant to be lightweight and simple for services to report metrics.

Telegraf-rust supports all socket connection types, such as UDS(unix domain socket):
- TCP(`tcp://`)
- UDP(`udp://`)
- UDS Stream(`unix://`)
- UDS Datagram(`unixgram://`)

# Install

Add it to your Cargo.toml:
Expand All @@ -26,7 +32,7 @@ Using this library assumes you have a socket listener setup in your Telegraf con
service_address = "tcp://localhost:8094"
```

All usage will start by creating a socket connection via a `Client`. This supports multiple connection protocols - which one you use will be determined by how your Telegraf `input.socket_listener` configuration is setup. Telegraf-rust supports both `TCP` and `UDP` socket connections.
All usage will start by creating a socket connection via a `Client`. This supports multiple connection protocols - which one you use will be determined by how your Telegraf `input.socket_listener` configuration is setup.

Once a client is setup there are multiple different ways to write points:

Expand Down Expand Up @@ -109,7 +115,7 @@ Any attribute that will be the value of a field must implement the `IntoFieldDat

```rust
pub trait IntoFieldData {
fn into_field_data(&self) -> FieldData;
fn field_data(&self) -> FieldData;
}
```

Expand Down
161 changes: 81 additions & 80 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,16 +104,17 @@
pub mod macros;
pub mod protocol;

use std::fmt;
use std::io;
use std::io::{Write, Error};
use std::net::SocketAddr;
use std::net::UdpSocket;
use std::{
fmt,
io::{self, Error, Write},
net::{Shutdown, SocketAddr, TcpStream, UdpSocket},
os::unix::net::{UnixDatagram, UnixStream},
};

use url::Url;
use std::net::{Shutdown, TcpStream};

use protocol::*;
pub use protocol::{IntoFieldData, FieldData};
pub use protocol::{FieldData, IntoFieldData};
pub use telegraf_derive::*;

/// Common result type. Only meaningful response is
Expand Down Expand Up @@ -157,7 +158,7 @@ pub enum TelegrafError {
/// Error with internal socket connection.
ConnectionError(String),
/// Error when a bad protocol is created.
BadProtocol(String)
BadProtocol(String),
}

/// A single influx metric. Handles conversion from Rust types
Expand All @@ -172,28 +173,21 @@ pub enum TelegrafError {
pub struct Point {
pub measurement: String,
pub tags: Vec<Tag>,
pub fields: Vec<Field>
pub fields: Vec<Field>,
}

/// Connection client used to handle socket connection management
/// and writing.
pub struct Client {
conn: Connector
conn: Connector,
}

/// Different types of connections that the library supports.
enum Connector {
TCP(TcPConnection),
UDP(UdPConnection),
}

/// TCP socket connection container.
struct TcPConnection {
conn: TcpStream
}

struct UdPConnection {
conn: UdpSocket
Tcp(TcpStream),
Udp(UdpSocket),
Unix(UnixStream),
Unixgram(UnixDatagram),
}

impl Point {
Expand All @@ -203,11 +197,16 @@ impl Point {
tags: Vec<(String, String)>,
fields: Vec<(String, Box<dyn IntoFieldData>)>,
) -> Self {
let t = tags.into_iter()
.map(|(n,v)| Tag { name: n, value: v })
let t = tags
.into_iter()
.map(|(n, v)| Tag { name: n, value: v })
.collect();
let f = fields.into_iter()
.map(|(n,v)| Field { name: n, value: v.into_field_data() })
let f = fields
.into_iter()
.map(|(n, v)| Field {
name: n,
value: v.field_data(),
})
.collect();
Self {
measurement,
Expand All @@ -217,16 +216,8 @@ impl Point {
}

fn to_lp(&self) -> LineProtocol {
let tag_attrs: Vec<Attr> = self.tags
.to_owned()
.into_iter()
.map(Attr::Tag)
.collect();
let field_attrs: Vec<Attr> = self.fields
.to_owned()
.into_iter()
.map(Attr::Field)
.collect();
let tag_attrs: Vec<Attr> = self.tags.iter().cloned().map(Attr::Tag).collect();
let field_attrs: Vec<Attr> = self.fields.iter().cloned().map(Attr::Field).collect();
let tag_str = if tag_attrs.is_empty() {
None
} else {
Expand All @@ -249,9 +240,9 @@ impl Client {
/// to the established connection.
pub fn write_point(&mut self, pt: &Point) -> TelegrafResult {
if pt.fields.is_empty() {
return Err(
TelegrafError::BadProtocol("points must have at least 1 field".to_owned())
);
return Err(TelegrafError::BadProtocol(
"points must have at least 1 field".to_owned(),
));
}

let lp = pt.to_lp();
Expand All @@ -264,12 +255,13 @@ impl Client {
/// you want to ensure all points have the exact same timestamp.
pub fn write_points(&mut self, pts: &[Point]) -> TelegrafResult {
if pts.iter().any(|p| p.fields.is_empty()) {
return Err(
TelegrafError::BadProtocol("points must have at least 1 field".to_owned())
);
return Err(TelegrafError::BadProtocol(
"points must have at least 1 field".to_owned(),
));
}

let lp = pts.iter()
let lp = pts
.iter()
.map(|p| p.to_lp().to_str().to_owned())
.collect::<Vec<String>>()
.join("");
Expand All @@ -289,65 +281,76 @@ impl Client {
}

/// Writes byte array to internal outgoing socket.
fn write_to_conn(&mut self, data: &[u8]) -> TelegrafResult {
pub fn write_to_conn(&mut self, data: &[u8]) -> TelegrafResult {
self.conn.write(data).map(|_| Ok(()))?
}
}

impl Connector {
pub fn close(&self) -> io::Result<()> {
fn close(&self) -> io::Result<()> {
use Connector::*;
match self {
Self::TCP(c) => c.close(),
// UdP socket doesnt have a graceful close.
Self::UDP(_) => Ok(()),
Tcp(c) => c.shutdown(Shutdown::Both),
Unix(c) => c.shutdown(Shutdown::Both),
Unixgram(c) => c.shutdown(Shutdown::Both),
// Udp socket doesnt have a graceful close.
Udp(_) => Ok(()),
}
}

fn write(&mut self, buf: &[u8]) -> io::Result<()> {
let r = match self {
Self::TCP(ref mut c) => {
c.conn.write(buf)
}
Self::UDP(c) => {
c.conn.send(buf)
}
Self::Tcp(c) => c.write(buf),
Self::Udp(c) => c.send(buf),
Self::Unix(c) => c.write(buf),
Self::Unixgram(c) => c.send(buf),
};
r.map(|_| Ok(()))?
}

fn new(url: &str) -> Result<Self, TelegrafError> {
match Url::parse(url) {
Ok(u) => {
let host = u.host_str().t_unwrap("invalid URL host")?;
let port = u.port().t_unwrap("invalid URL port")?;
let scheme = u.scheme();
match scheme {
"tcp" => {
let conn = TcpStream::connect(format!("{}:{}", host, port))?;
Ok(Connector::TCP(TcPConnection { conn }))
},
let addr = u.socket_addrs(|| None)?;
let conn = TcpStream::connect(&*addr)?;
Ok(Connector::Tcp(conn))
}
"udp" => {
let socket = UdpSocket::bind(&[SocketAddr::from(([0, 0, 0, 0,], 0))][..])?;
let addr = u.socket_addrs(|| None)?;
socket.connect(&*addr)?;
socket.set_nonblocking(true)?;
Ok(Connector::UDP(UdPConnection { conn: socket }))
},
"unix" => Err(TelegrafError::BadProtocol("unix not supported yet".to_owned())),
_ => Err(TelegrafError::BadProtocol(format!("unknown connection protocol {}", scheme)))
let conn = UdpSocket::bind(&[SocketAddr::from(([0, 0, 0, 0], 0))][..])?;
conn.connect(&*addr)?;
conn.set_nonblocking(true)?;
Ok(Connector::Udp(conn))
}
"unix" => {
let path = u.path();
let conn = UnixStream::connect(path)?;
Ok(Connector::Unix(conn))
}
"unixgram" => {
let path = u.path();
let conn = UnixDatagram::unbound()?;
conn.connect(path)?;
conn.set_nonblocking(true)?;
Ok(Connector::Unixgram(conn))
}
_ => Err(TelegrafError::BadProtocol(format!(
"unknown connection protocol {}",
scheme
))),
}
},
Err(_) => Err(TelegrafError::BadProtocol(format!("invalid connection URL {}", url)))
}
Err(_) => Err(TelegrafError::BadProtocol(format!(
"invalid connection URL {}",
url
))),
}
}
}

impl TcPConnection {
pub fn close(&self) -> io::Result<()> {
self.conn.shutdown(Shutdown::Both)
}
}

impl fmt::Display for TelegrafError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Expand All @@ -370,7 +373,7 @@ trait TelegrafUnwrap<T> {

impl<T> TelegrafUnwrap<T> for Option<T> {
fn t_unwrap(self, msg: &str) -> Result<T, TelegrafError> {
self.ok_or(TelegrafError::ConnectionError(msg.to_owned()))
self.ok_or_else(|| TelegrafError::ConnectionError(msg.to_owned()))
}
}

Expand All @@ -382,14 +385,12 @@ mod tests {
fn can_create_point_lp() {
let p = Point::new(
String::from("Foo"),
vec![
("t1".to_owned(), "v".to_owned())
],
vec![("t1".to_owned(), "v".to_owned())],
vec![
("f1".to_owned(), Box::new(10)),
("f2".to_owned(), Box::new(10.3)),
("f3".to_owned(), Box::new("b"))
]
("f3".to_owned(), Box::new("b")),
],
);

let lp = p.to_lp();
Expand All @@ -404,7 +405,7 @@ mod tests {
vec![
("f1".to_owned(), Box::new(10)),
("f2".to_owned(), Box::new(10.3)),
]
],
);
let lp = p.to_lp();
assert_eq!(lp.to_str(), "Foo f1=10i,f2=10.3\n");
Expand Down
10 changes: 3 additions & 7 deletions src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,14 @@ mod tests {

#[test]
fn can_create_with_tag() {
let p = point!("test", ("t", "v") ("t2", "v2"), ("f", "v"));
let p = point!("test", ("t", "v")("t2", "v2"), ("f", "v"));
let exp = Point::new(
"test".to_string(),
vec![
("t".to_string(), "v".to_string()),
("t2".to_string(), "v2".to_string()),
],
vec![
("f".to_string(), Box::new("v")),
],
vec![("f".to_string(), Box::new("v"))],
);
assert_eq!(p, exp);
}
Expand All @@ -93,9 +91,7 @@ mod tests {
let exp = Point::new(
"test".to_string(),
Vec::new(),
vec![
("f".to_string(), Box::new("v")),
],
vec![("f".to_string(), Box::new("v"))],
);
assert_eq!(p, exp);
}
Expand Down
Loading