Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(new sink): New sematext_metrics sink #3501

Merged
merged 31 commits into from
Oct 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
f136c3b
Initial wip for the sematext_metrics sink
StephenWakely Aug 19, 2020
f75ea78
Moved Sematext modules into a single module
StephenWakely Aug 24, 2020
1be96b1
Added documentation for the Sematext metrics sink
StephenWakely Aug 25, 2020
662cb3b
Send the Vector version with the Sematext request
StephenWakely Aug 25, 2020
2eb6cfd
Fixing issues following comments from bruceg
StephenWakely Aug 25, 2020
f514cd7
Moved Sematext modules into a single module
StephenWakely Aug 24, 2020
d13360c
Fixing issues following comments from bruceg
StephenWakely Aug 25, 2020
72fe4cc
Updated sematext events in line with comments from BruceG.
StephenWakely Aug 30, 2020
8a6e552
Changes following a merge with master
StephenWakely Aug 31, 2020
b60a9e5
Fixed input type in documentation
StephenWakely Aug 31, 2020
5e4dfde
Added relevant_when fields to docs for host and region fields.
StephenWakely Sep 8, 2020
5434b4a
Restricted the visibility of a number of functions
StephenWakely Sep 8, 2020
cffbb6b
Added noun to sematext metrics docs
StephenWakely Sep 9, 2020
05231e7
Fixed clippy warnings
StephenWakely Sep 9, 2020
4bb50d1
Fixes following a merge with master
StephenWakely Sep 10, 2020
894f8ab
Added a better healthcheck url
StephenWakely Sep 17, 2020
3c03f05
sink.run no longer takes Result
StephenWakely Sep 17, 2020
72d33fb
Renamed host parameter to endpoint
StephenWakely Sep 22, 2020
e280c08
Renamed events following similar changes to statsd events
StephenWakely Sep 22, 2020
d19ac23
Changes following a review from @JeanMertz
StephenWakely Sep 22, 2020
cb2f8c4
Changes following a merge with master
StephenWakely Oct 9, 2020
ee072eb
Add docs cue file
StephenWakely Oct 9, 2020
3cf8588
Further changes following merge with master
StephenWakely Oct 9, 2020
1900af1
Fix cue formatting
StephenWakely Oct 9, 2020
8303234
Further attempt to fix cue formatting.
StephenWakely Oct 9, 2020
274c361
Use tabs not spaces
StephenWakely Oct 9, 2020
5f5d5f9
Fixed docs cue errors
StephenWakely Oct 10, 2020
f6b249c
Fixed cue doc following merge with master
StephenWakely Oct 10, 2020
006911c
Added a short description to the docs
StephenWakely Oct 10, 2020
ce8e717
Fixed cue formatting
StephenWakely Oct 10, 2020
aba77c9
Merge remote-tracking branch 'upstream/master' into sematext_metrics
binarylogic Oct 11, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions .meta/sinks/sematext_metrics.toml.erb
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
[sinks.sematext_metrics]
title = "Sematext Metrics"
noun = "Sematext"
beta = true
common = false
delivery_guarantee = "at_least_once"
description = """\
[Sematext][urls.sematext] is a hosted monitoring platform for metrics based on \
InfluxDB. It provides powerful monitoring and management solutions to \
monitor and observe your apps in real-time.\
"""
egress_method = "batching"
features = [
"Send metrics to the Sematext monitoring service.",
"Batch data to maximize throughput.",
"Automatically retry failed requests, with backoff.",
"Buffer your data in-memory or on-disk for performance and durability.",
]
function_category = "transmit"
healthcheck = true
input_types = ["metric"]
requirements = {}
service_providers = ["Sematext"]
write_to_description = "[Sematext][urls.sematext] via the [InfluxDB API v1][urls.influxdb_http_api_v1]."

<%= render("_partials/fields/_component_options.toml", type: "sink", name: "sematext_metrics") %>

<%= render("_partials/fields/_batch_options.toml", namespace: "sinks.sematext_metrics.options", common: false, max_bytes: 10490000, max_events: nil, timeout_secs: 1) %>

<%= render(
"_partials/fields/_buffer_options.toml",
namespace: "sinks.sematext_metrics.options",
common: false
) %>

<%= render(
"_partials/fields/_request_options.toml",
namespace: "sinks.sematext_metrics.options",
common: false,
rate_limit_duration_secs: 1,
rate_limit_num: 5,
retry_initial_backoff_secs: 1,
retry_max_duration_secs: 10,
timeout_secs: 60
) %>

[sinks.sematext_metrics.options.region]
type = "string"
required = false
examples = ["us", "eu"]
relevant_when = {endpoint = ""}
description = "The region destination to send metrics to. This option is required if `endpoint` is not set."

[sinks.sematext_metrics.options.endpoint]
type = "string"
required = false
examples = ["http://127.0.0.1", "http://example.com"]
relevant_when = {region = ""}
description = "The endpoint to send metrics to. This option is required if `region` is not set."

[sinks.sematext_metrics.options.token]
type = "string"
required = true
examples = ["${SEMATEXT_TOKEN}", "some-sematext-token"]
description = "The token that will be used to write to Sematext."


4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ sinks = [
"sinks-new_relic_logs",
"sinks-papertrail",
"sinks-prometheus",
"sinks-sematext_logs",
"sinks-sematext",
"sinks-socket",
"sinks-splunk_hec",
"sinks-statsd",
Expand Down Expand Up @@ -435,7 +435,7 @@ sinks-logdna = ["bytesize"]
sinks-loki = ["bytesize"]
sinks-new_relic_logs = ["bytesize", "sinks-http"]
sinks-prometheus = []
sinks-sematext_logs = ["sinks-elasticsearch"]
sinks-sematext = ["sinks-elasticsearch", "sinks-influxdb"]
sinks-socket = []
sinks-papertrail = []
sinks-splunk_hec = ["bytesize"]
Expand Down
102 changes: 102 additions & 0 deletions docs/reference/components/sinks/sematext_metrics.cue
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package metadata

components: sinks: sematext_metrics: {
title: "Sematext Metrics"
short_description: "Batches metric events to the [Sematext monitoring][urls.sematext_monitoring] service provided by [Sematext][urls.sematext]."
long_description: "[Sematext][urls.sematext] is a hosted monitoring platform for metrics based on InfluxDB. It provides powerful monitoring and management solutions to monitor and observe your apps in real-time."

classes: {
commonly_used: false
function: "transmit"
service_providers: ["Sematext"]
egress_method: "batch"
}

features: {
batch: {
enabled: true
common: false
max_bytes: 30000000
max_events: null
timeout_secs: 1
}
buffer: enabled: true
compression: enabled: false
healthcheck: enabled: true
request: enabled: false
tls: enabled: false
}

statuses: {
delivery: "at_least_once"
development: "beta"
}

support: {
input_types: ["metric"]

platforms: {
"aarch64-unknown-linux-gnu": true
"aarch64-unknown-linux-musl": true
"x86_64-apple-darwin": true
"x86_64-pc-windows-msv": true
"x86_64-unknown-linux-gnu": true
"x86_64-unknown-linux-musl": true
}

requirements: []
warnings: []
notices: []
}

configuration: {
region: {
description: "The region destination to send metrics to. This option is required if `endpoint` is not set."
required: true
relevant_when: "`endpoint` is not set"
warnings: []
type: [string]: {
enum: {
us: "United States"
eu: "Europe"
}
examples: [ "us"]
}
}
endpoint: {
description: "The endpoint to send metrics to. This option is required if `region` is not set."
required: true
relevant_when: "`region` is not set"
warnings: []
type: string: {
examples: ["https://spm-receiver.sematext.com", "https://spm-receiver.eu.sematext.com"]
}
}
token: {
required: true
description: "The API token for the app in Sematext to send the metrics to."
warnings: []
type: string: {
examples: ["${SEMATEXT_TOKEN}", "some-sematext-token"]
}
}
}
how_it_works: {
metric_types: {
title: "Metric Types"
body: #"""
[Sematext monitoring](https://sematext.com/docs/monitoring/) accepts metrics which contain a single value.
These are the Counter and Gauge Vector metric types.

<Alert type="info">
Other metric types are not supported. The following metric types will not be sent to Sematext:

`aggregated_histogram`, `aggregated_summary`, `distribution`, `set`
</Alert>

All metrics are sent with a namespace. If no namespace is included with the metric, the metric name becomes
the namespace and the metric is named `value`.
"""#
}
}
}
1 change: 1 addition & 0 deletions docs/reference/urls.cue
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ urls: {
rustup: "https://rustup.rs"
sematext: "https://sematext.com"
sematext_es: "https://sematext.com/docs/logs/index-events-via-elasticsearch-api/"
sematext_monitoring: "https://sematext.com/docs/monitoring/"
semver: "https://semver.org/"
snappy: "https://google.github.io/snappy/"
socket: "https://en.wikipedia.org/wiki/Network_socket"
Expand Down
4 changes: 4 additions & 0 deletions src/internal_events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ mod remove_tags;
#[cfg(feature = "transforms-rename_fields")]
mod rename_fields;
mod sampler;
#[cfg(feature = "sinks-sematext")]
mod sematext_metrics;
#[cfg(any(
feature = "sources-socket",
feature = "sources-syslog",
Expand Down Expand Up @@ -158,6 +160,8 @@ pub use self::remove_tags::*;
#[cfg(feature = "transforms-rename_fields")]
pub use self::rename_fields::*;
pub use self::sampler::*;
#[cfg(feature = "sinks-sematext")]
pub use self::sematext_metrics::*;
#[cfg(any(feature = "sources-socket", feature = "sources-syslog"))]
pub(crate) use self::socket::*;
pub use self::split::*;
Expand Down
29 changes: 29 additions & 0 deletions src/internal_events/sematext_metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use super::InternalEvent;
use crate::event::metric::{MetricKind, MetricValue};
use metrics::counter;

/// Internal event reported when the `sematext_metrics` sink receives a metric
/// it treats as invalid and drops. It carries the offending metric's value and
/// kind so they can be included in the warning log.
#[derive(Debug)]
pub struct SematextMetricsInvalidMetricReceived {
/// Value of the invalid metric; written to the log in debug form.
pub value: MetricValue,
/// Kind of the invalid metric; written to the log in debug form.
pub kind: MetricKind,
}

impl InternalEvent for SematextMetricsInvalidMetricReceived {
/// Emits a warning stating that an invalid metric was received and the event
/// is being dropped, including the debug form of the metric's value and kind.
///
/// NOTE(review): `rate_limit_secs = 30` presumably throttles repeats of this
/// warning to one per 30 seconds — confirm against the logging layer.
fn emit_logs(&self) {
warn!(
message = "Invalid metric received; dropping event.",
value = ?self.value,
kind = ?self.kind,
rate_limit_secs = 30,
)
}

/// Increments the `processing_errors` counter by 1, labelled with
/// `component_kind = "sink"`, `component_type = "sematext_metrics"` and
/// `error_type = "invalid_metric"`.
fn emit_metrics(&self) {
counter!(
"processing_errors", 1,
"component_kind" => "sink",
"component_type" => "sematext_metrics",
"error_type" => "invalid_metric",
);
}
}
8 changes: 7 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,18 @@ pub type Error = Box<dyn std::error::Error + Send + Sync + 'static>;

pub type Result<T> = std::result::Result<T, Error>;

pub fn get_version() -> String {
/// Returns the package version taken from `built_info::PKG_VERSION`, with a
/// `-nightly` suffix appended when the `nightly` feature is enabled.
///
/// The result is exposed only as `impl Display` because the concrete type
/// differs between the two `cfg` branches below.
pub fn vector_version() -> impl std::fmt::Display {
// Nightly builds: an owned string of the form `<version>-nightly`.
#[cfg(feature = "nightly")]
let pkg_version = format!("{}-nightly", built_info::PKG_VERSION);

// All other builds: the version constant, unchanged.
#[cfg(not(feature = "nightly"))]
let pkg_version = built_info::PKG_VERSION;

pkg_version
}

pub fn get_version() -> String {
let pkg_version = vector_version();
let commit_hash = built_info::GIT_VERSION.and_then(|v| v.split('-').last());
let built_date = chrono::DateTime::parse_from_rfc2822(built_info::BUILT_TIME_UTC)
.unwrap()
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/influxdb/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ impl InfluxDBLogsConfig {
}

impl Value {
pub fn to_field(&self) -> Field {
fn to_field(&self) -> Field {
match self {
Value::Integer(num) => Field::Int(*num),
Value::Float(num) => Field::Float(*num),
Expand Down
14 changes: 9 additions & 5 deletions src/sinks/influxdb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use snafu::Snafu;
use std::collections::{BTreeMap, HashMap};
use tower::Service;

pub enum Field {
pub(in crate::sinks) enum Field {
/// string
String(String),
/// float
Expand All @@ -25,7 +25,7 @@ pub enum Field {
}

#[derive(Clone, Copy, Debug)]
enum ProtocolVersion {
pub(in crate::sinks) enum ProtocolVersion {
V1,
V2,
}
Expand Down Expand Up @@ -168,7 +168,7 @@ fn healthcheck(
}

// https://v2.docs.influxdata.com/v2.0/reference/syntax/line-protocol/
fn influx_line_protocol(
pub(in crate::sinks) fn influx_line_protocol(
protocol_version: ProtocolVersion,
measurement: String,
metric_type: &str,
Expand Down Expand Up @@ -274,15 +274,19 @@ fn encode_string(key: String, output: &mut String) {
}
}

fn encode_timestamp(timestamp: Option<DateTime<Utc>>) -> i64 {
/// Converts an optional UTC timestamp into nanoseconds since the Unix epoch.
///
/// When `timestamp` is `None`, the current time (`Utc::now()`) is used instead.
pub(in crate::sinks) fn encode_timestamp(timestamp: Option<DateTime<Utc>>) -> i64 {
    // Fall back to "now" only when no timestamp was supplied.
    timestamp.unwrap_or_else(Utc::now).timestamp_nanos()
}

fn encode_uri(endpoint: &str, path: &str, pairs: &[(&str, Option<String>)]) -> crate::Result<Uri> {
pub(in crate::sinks) fn encode_uri(
endpoint: &str,
path: &str,
pairs: &[(&str, Option<String>)],
) -> crate::Result<Uri> {
let mut serializer = url::form_urlencoded::Serializer::new(String::new());

for pair in pairs {
Expand Down
4 changes: 2 additions & 2 deletions src/sinks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ pub mod papertrail;
pub mod prometheus;
#[cfg(feature = "sinks-pulsar")]
pub mod pulsar;
#[cfg(feature = "sinks-sematext_logs")]
pub mod sematext_logs;
#[cfg(feature = "sinks-sematext")]
pub mod sematext;
#[cfg(feature = "sinks-socket")]
pub mod socket;
#[cfg(feature = "sinks-splunk_hec")]
Expand Down
16 changes: 4 additions & 12 deletions src/sinks/sematext_logs.rs → src/sinks/sematext/logs.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use super::Region;
use crate::{
config::{DataType, GenerateConfig, SinkConfig, SinkContext, SinkDescription},
sinks::elasticsearch::{ElasticSearchConfig, Encoding},
sinks::util::{
encoding::EncodingConfigWithDefault, BatchConfig, Compression, TowerRequestConfig,
},
sinks::{Healthcheck, VectorSink},
Event,
};
use futures01::{Future, Sink};
Expand Down Expand Up @@ -37,20 +39,10 @@ inventory::submit! {

impl GenerateConfig for SematextLogsConfig {}

#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum Region {
Us,
Eu,
}

#[async_trait::async_trait]
#[typetag::serde(name = "sematext_logs")]
impl SinkConfig for SematextLogsConfig {
async fn build(
&self,
cx: SinkContext,
) -> crate::Result<(super::VectorSink, super::Healthcheck)> {
async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
let endpoint = match (&self.endpoint, &self.region) {
(Some(host), None) => host.clone(),
(None, Some(Region::Us)) => "https://logsene-receiver.sematext.com".to_owned(),
Expand All @@ -76,7 +68,7 @@ impl SinkConfig for SematextLogsConfig {

let sink = Box::new(sink.into_futures01sink().with(map_timestamp));

Ok((super::VectorSink::Futures01Sink(sink), healthcheck))
Ok((VectorSink::Futures01Sink(sink), healthcheck))
}

fn input_type(&self) -> DataType {
Expand Down
Loading