enhancement(logdna sink): Support template syntax in hostname and tags field #4884

Merged · 6 commits · merged on Nov 6, 2020
Changes from 5 commits
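
For context, this change lets hostname and each entry in tags be rendered per event with Vector's template syntax instead of being fixed strings. A minimal config sketch of what that enables; the sink and source names and the second tag are illustrative, not taken from this PR:

[sinks.my_logdna_sink]            # sink name is illustrative
  type = "logdna"
  inputs = ["my_source"]          # illustrative source name
  api_key = "mylogtoken"
  # Both fields may now contain template syntax, resolved per event:
  hostname = "{{ hostname }}"
  tags = ["{{ env }}", "a-static-tag"]

Events whose templates render to different values are grouped into separate batches, each flushed as its own request, which is what the PartitionHttpSink introduced below provides.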
138 changes: 90 additions & 48 deletions src/sinks/logdna.rs
@@ -4,9 +4,11 @@ use crate::{
http::{Auth, HttpClient},
sinks::util::{
encoding::{EncodingConfigWithDefault, EncodingConfiguration},
http::{BatchedHttpSink, HttpSink},
BatchConfig, BatchSettings, BoxedRawValue, JsonArrayBuffer, TowerRequestConfig, UriSerde,
http::{HttpSink, PartitionHttpSink},
BatchConfig, BatchSettings, BoxedRawValue, JsonArrayBuffer, PartitionBuffer,
PartitionInnerBuffer, TowerRequestConfig, UriSerde,
},
template::Template,
};
use futures::FutureExt;
use futures01::Sink;
@@ -28,10 +30,10 @@ pub struct LogdnaConfig {
#[serde(alias = "host")]
endpoint: Option<UriSerde>,

hostname: String,
hostname: Template,
mac: Option<String>,
ip: Option<String>,
tags: Option<Vec<String>>,
tags: Option<Vec<Template>>,

#[serde(
skip_serializing_if = "crate::serde::skip_serializing_if_default",
@@ -85,9 +87,9 @@ impl SinkConfig for LogdnaConfig {
.parse_config(self.batch)?;
let client = HttpClient::new(None)?;

let sink = BatchedHttpSink::new(
let sink = PartitionHttpSink::new(
self.clone(),
JsonArrayBuffer::new(batch_settings.size),
PartitionBuffer::new(JsonArrayBuffer::new(batch_settings.size)),
request_settings,
batch_settings.timeout,
client.clone(),
@@ -112,12 +114,29 @@ }
}
}

#[derive(Hash, Eq, PartialEq, Clone)]
pub struct PartitionKey {
hostname: String,
tags: Option<Vec<String>>,
}

#[async_trait::async_trait]
impl HttpSink for LogdnaConfig {
type Input = serde_json::Value;
type Output = Vec<BoxedRawValue>;
type Input = PartitionInnerBuffer<serde_json::Value, PartitionKey>;
type Output = PartitionInnerBuffer<Vec<BoxedRawValue>, PartitionKey>;

fn encode_event(&self, mut event: Event) -> Option<Self::Input> {
let key = self
.render_key(&event)
.map_err(|missing| {
error!(
message = "Error rendering template.",
?missing,
rate_limit_secs = 30
);
})
.ok()?;

self.encoding.apply_rules(&mut event);
let mut log = event.into_log();

@@ -164,18 +183,19 @@ impl HttpSink for LogdnaConfig {
map.insert("meta".into(), json!(&log));
}

Some(map.into())
Some(PartitionInnerBuffer::new(map.into(), key))
}

async fn build_request(&self, events: Self::Output) -> crate::Result<http::Request<Vec<u8>>> {
async fn build_request(&self, output: Self::Output) -> crate::Result<http::Request<Vec<u8>>> {
let (events, key) = output.into_parts();
let mut query = url::form_urlencoded::Serializer::new(String::new());

let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("Time can't drift behind the epoch!")
.as_millis();

query.append_pair("hostname", &self.hostname);
query.append_pair("hostname", &key.hostname);
query.append_pair("now", &format!("{}", now));

if let Some(mac) = &self.mac {
@@ -186,7 +206,7 @@ impl HttpSink for LogdnaConfig {
query.append_pair("ip", ip);
}

if let Some(tags) = &self.tags {
if let Some(tags) = &key.tags {
let tags = tags.join(",");
query.append_pair("tags", &tags);
}
@@ -227,6 +247,22 @@ impl LogdnaConfig {
uri.parse::<http::Uri>()
.expect("This should be a valid uri")
}

fn render_key(&self, event: &Event) -> Result<PartitionKey, Vec<String>> {
let hostname = self.hostname.render_string(&event)?;
let tags = self
.tags
.as_ref()
.map(|tags| -> Result<Option<Vec<String>>, Vec<String>> {
let mut vec = Vec::with_capacity(tags.len());
for tag in tags {
vec.push(tag.render_string(event)?);
}
Ok(Some(vec))
})
.unwrap_or(Ok(None))?;
Ok(PartitionKey { hostname, tags })
}
}

async fn healthcheck(config: LogdnaConfig, mut client: HttpClient) -> crate::Result<()> {
@@ -288,13 +324,13 @@ mod tests {
let mut event4 = Event::from("hello world");
event4.as_mut_log().insert("env", "staging");

let event1_out = config.encode_event(event1).unwrap();
let event1_out = config.encode_event(event1).unwrap().into_parts().0;
let event1_out = event1_out.as_object().unwrap();
let event2_out = config.encode_event(event2).unwrap();
let event2_out = config.encode_event(event2).unwrap().into_parts().0;
let event2_out = event2_out.as_object().unwrap();
let event3_out = config.encode_event(event3).unwrap();
let event3_out = config.encode_event(event3).unwrap().into_parts().0;
let event3_out = event3_out.as_object().unwrap();
let event4_out = config.encode_event(event4).unwrap();
let event4_out = config.encode_event(event4).unwrap().into_parts().0;
let event4_out = event4_out.as_object().unwrap();

assert_eq!(event1_out.get("app").unwrap(), &json!("notvector"));
@@ -313,7 +349,7 @@ mod tests {
api_key = "mylogtoken"
ip = "127.0.0.1"
mac = "some-mac-addr"
hostname = "vector"
hostname = "{{ hostname }}"
tags = ["test","maybeanothertest"]
"#,
)
@@ -335,55 +371,61 @@

let lines = random_lines(100).take(10).collect::<Vec<_>>();
let mut events = Vec::new();
let hosts = ["host0", "host1"];

let mut partitions = vec![Vec::new(), Vec::new()];
// Create 10 events where the first one contains custom
// fields that are not just `message`.
for (i, line) in lines.iter().enumerate() {
let event = if i == 0 {
let mut event = Event::from(line.as_str());
event.as_mut_log().insert("key1", "value1");
event
} else {
Event::from(line.as_str())
};
let mut event = Event::from(line.as_str());
let p = i % 2;
event.as_mut_log().insert("hostname", hosts[p]);

events.push(event);
partitions[p].push(line);
events.push(event.clone());
}

sink.run(stream::iter(events)).await.unwrap();

let output = rx.next().await.unwrap();
for _ in 0..partitions.len() {
let output = rx.next().await.unwrap();

let request = &output.0;
let body: serde_json::Value = serde_json::from_slice(&output.1[..]).unwrap();
let request = &output.0;
let body: serde_json::Value = serde_json::from_slice(&output.1[..]).unwrap();

let query = request.uri.query().unwrap();
assert!(query.contains("hostname=vector"));
assert!(query.contains("ip=127.0.0.1"));
assert!(query.contains("mac=some-mac-addr"));
assert!(query.contains("tags=test%2Cmaybeanothertest"));
let query = request.uri.query().unwrap();

let output = body
.as_object()
.unwrap()
.get("lines")
.unwrap()
.as_array()
.unwrap();
let (p, host) = hosts
.iter()
.enumerate()
.find(|(_, host)| query.contains(&format!("hostname={}", host)))
.expect("invalid hostname");
let lines = &partitions[p];

assert!(query.contains("ip=127.0.0.1"));
assert!(query.contains("mac=some-mac-addr"));
assert!(query.contains("tags=test%2Cmaybeanothertest"));

let output = body
.as_object()
.unwrap()
.get("lines")
.unwrap()
.as_array()
.unwrap();

for (i, line) in output.iter().enumerate() {
// All lines are json objects
let line = line.as_object().unwrap();
for (i, line) in output.iter().enumerate() {
// All lines are json objects
let line = line.as_object().unwrap();

assert_eq!(line.get("app").unwrap(), &json!("vector"));
assert_eq!(line.get("env").unwrap(), &json!("production"));
assert_eq!(line.get("line").unwrap(), &json!(lines[i]));
assert_eq!(line.get("app").unwrap(), &json!("vector"));
assert_eq!(line.get("env").unwrap(), &json!("production"));
assert_eq!(line.get("line").unwrap(), &json!(lines[i]));

if i == 0 {
assert_eq!(
line.get("meta").unwrap(),
&json!({
"key1": "value1"
"hostname": host,
})
);
}
127 changes: 126 additions & 1 deletion src/sinks/util/http.rs
@@ -1,6 +1,6 @@
use super::{
retries::{RetryAction, RetryLogic},
sink, Batch, TowerBatchedSink, TowerRequestSettings,
sink, Batch, Partition, TowerBatchedSink, TowerPartitionSink, TowerRequestSettings,
};
use crate::{buffers::Acker, event::Event, http::HttpClient};
use bytes::{Buf, Bytes};
@@ -11,6 +11,7 @@ use hyper::body::{self, Body};
use std::{
fmt,
future::Future,
hash::Hash,
sync::Arc,
task::{Context, Poll},
time::Duration,
@@ -26,6 +27,130 @@ pub trait HttpSink: Send + Sync + 'static {
async fn build_request(&self, events: Self::Output) -> crate::Result<http::Request<Vec<u8>>>;
}

pub struct PartitionHttpSink<T, B, K, L = HttpRetryLogic>
where
B: Batch,
B::Output: Clone + Send + 'static,
B::Input: Partition<K>,
K: Hash + Eq + Clone + Send + 'static,
L: RetryLogic<Response = http::Response<Bytes>> + Send + 'static,
T: HttpSink<Input = B::Input, Output = B::Output>,
{
sink: Arc<T>,
inner: TowerPartitionSink<
HttpBatchService<BoxFuture<'static, crate::Result<hyper::Request<Vec<u8>>>>, B::Output>,
B,
L,
K,
B::Output,
>,
slot: Option<B::Input>,
}

impl<T, B, K> PartitionHttpSink<T, B, K, HttpRetryLogic>
where
B: Batch,
B::Output: Clone + Send + 'static,
B::Input: Partition<K>,
K: Hash + Eq + Clone + Send + 'static,
T: HttpSink<Input = B::Input, Output = B::Output>,
{
pub fn new(
sink: T,
batch: B,
request_settings: TowerRequestSettings,
batch_timeout: Duration,
client: HttpClient,
acker: Acker,
) -> Self {
Self::with_retry_logic(
sink,
batch,
HttpRetryLogic,
request_settings,
batch_timeout,
client,
acker,
)
}
}

impl<T, B, K, L> PartitionHttpSink<T, B, K, L>
where
B: Batch,
B::Output: Clone + Send + 'static,
B::Input: Partition<K>,
K: Hash + Eq + Clone + Send + 'static,
L: RetryLogic<Response = http::Response<Bytes>, Error = hyper::Error> + Send + 'static,
T: HttpSink<Input = B::Input, Output = B::Output>,
{
pub fn with_retry_logic(
sink: T,
batch: B,
logic: L,
request_settings: TowerRequestSettings,
batch_timeout: Duration,
client: HttpClient,
acker: Acker,
) -> Self {
let sink = Arc::new(sink);

let sink1 = Arc::clone(&sink);
let request_builder =
move |b| -> BoxFuture<'static, crate::Result<http::Request<Vec<u8>>>> {
let sink = Arc::clone(&sink1);
Box::pin(async move { sink.build_request(b).await })
};

let svc = HttpBatchService::new(client, request_builder);
let inner = request_settings.partition_sink(logic, svc, batch, batch_timeout, acker);

Self {
sink,
inner,
slot: None,
}
}
}

impl<T, B, K, L> Sink for PartitionHttpSink<T, B, K, L>
where
B: Batch,
B::Output: Clone + Send + 'static,
B::Input: Partition<K>,
K: Hash + Eq + Clone + Send + 'static,
T: HttpSink<Input = B::Input, Output = B::Output>,
L: RetryLogic<Response = http::Response<Bytes>> + Send + 'static,
{
type SinkItem = crate::Event;
type SinkError = crate::Error;

fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
if self.slot.is_some() && self.poll_complete()?.is_not_ready() {
return Ok(AsyncSink::NotReady(item));
}
assert!(self.slot.is_none(), "poll_complete did not clear slot");

if let Some(item) = self.sink.encode_event(item) {
self.slot = Some(item);
self.poll_complete()?;
}

Ok(AsyncSink::Ready)
}

fn poll_complete(&mut self) -> Poll01<(), Self::SinkError> {
if let Some(item) = self.slot.take() {
if let AsyncSink::NotReady(item) = self.inner.start_send(item)? {
self.slot = Some(item);
return Ok(Async::NotReady);
}
}

self.inner.poll_complete()
}
}

/// Provides a simple wrapper around internal tower and
/// batching sinks for http.
///
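
Beyond the diff itself, the overall shape of the change: encode_event now attaches a partition key (the rendered hostname plus rendered tags) to each event, and PartitionHttpSink buffers events per key so that every HTTP request carries only lines that share one hostname/tags query string. A simplified, standalone sketch of that grouping step, using only the standard library; these are not Vector's actual types, just an illustration of the idea:

use std::collections::HashMap;

// Illustrative partition key: the values that must be constant within one request.
#[derive(Hash, Eq, PartialEq, Clone, Debug)]
struct PartitionKey {
    hostname: String,
    tags: Option<Vec<String>>,
}

// Group encoded lines by their rendered key; each group becomes one batch,
// i.e. one HTTP request with a single hostname/tags query string.
fn partition(lines: Vec<(PartitionKey, String)>) -> HashMap<PartitionKey, Vec<String>> {
    let mut batches: HashMap<PartitionKey, Vec<String>> = HashMap::new();
    for (key, line) in lines {
        batches.entry(key).or_default().push(line);
    }
    batches
}

fn main() {
    let key = |host: &str| PartitionKey {
        hostname: host.to_string(),
        tags: Some(vec!["test".to_string()]),
    };
    let lines = vec![
        (key("host0"), "hello 0".to_string()),
        (key("host1"), "hello 1".to_string()),
        (key("host0"), "hello 2".to_string()),
    ];
    // Two batches: one per distinct rendered hostname.
    assert_eq!(partition(lines).len(), 2);
}

In the real sink the per-key batches are additionally bounded by BatchSettings (size and timeout) before being flushed.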