Skip to content

Commit

Permalink
feat(clickhouse sink): Add support for basic auth (#937)
Browse files Browse the repository at this point in the history
* add clickhouse basic auth

Signed-off-by: albert <63851587@qq.com>

* Add docs

Signed-off-by: Lucio Franco <luciofranco14@gmail.com>
  • Loading branch information
amuluowin authored and LucioFranco committed Oct 4, 2019
1 parent 5c5ad89 commit d5974dc
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 4 deletions.
17 changes: 17 additions & 0 deletions .meta/sinks/clickhouse.toml
Expand Up @@ -40,3 +40,20 @@ type = "string"
examples = ["mydatabase"]
null = true
description = "The database that contains the stable that data will be inserted into."

[sinks.clickhouse.options.basic_auth]
type = "table"
null = true
description = "Options for basic authentication."

[sinks.clickhouse.options.basic_auth.options.password]
type = "string"
examples = ["password"]
null = false
description = "The basic authentication password."

[sinks.clickhouse.options.basic_auth.options.user]
type = "string"
examples = ["username"]
null = false
description = "The basic authentication user name."
17 changes: 17 additions & 0 deletions config/vector.spec.toml
Expand Up @@ -1766,6 +1766,23 @@ end
# * unit: seconds
retry_backoff_secs = 9223372036854775807

#
# Basic auth
#

[sinks.clickhouse.basic_auth]
# The basic authentication password.
#
# * required
# * no default
password = "password"

# The basic authentication user name.
#
# * required
# * no default
user = "username"

# Streams `log` and `metric` events to the console, `STDOUT` or `STDERR`.
[sinks.console]
# The component type
Expand Down
17 changes: 17 additions & 0 deletions docs/usage/configuration/sinks/clickhouse.md
Expand Up @@ -155,6 +155,23 @@ The `clickhouse` sink [batches](#buffers-and-batches) [`log`][docs.data-model.lo
# * default: 9223372036854775807
# * unit: seconds
retry_backoff_secs = 9223372036854775807

#
# Basic auth
#

[sinks.clickhouse_sink.basic_auth]
# The basic authentication password.
#
# * required
# * no default
password = "password"

# The basic authentication user name.
#
# * required
# * no default
user = "username"
```
{% endcode-tabs-item %}
{% endcode-tabs %}
Expand Down
17 changes: 17 additions & 0 deletions docs/usage/configuration/specification.md
Expand Up @@ -1786,6 +1786,23 @@ end
# * unit: seconds
retry_backoff_secs = 9223372036854775807

#
# Basic auth
#

[sinks.clickhouse.basic_auth]
# The basic authentication password.
#
# * required
# * no default
password = "password"

# The basic authentication user name.
#
# * required
# * no default
user = "username"

# Streams `log` and `metric` events to the console, `STDOUT` or `STDERR`.
[sinks.console]
# The component type
Expand Down
36 changes: 32 additions & 4 deletions src/sinks/clickhouse.rs
Expand Up @@ -9,6 +9,7 @@ use crate::{
topology::config::{DataType, SinkConfig},
};
use futures::{Future, Sink};
use headers::HeaderMapExt;
use http::StatusCode;
use http::{Method, Uri};
use hyper::{Body, Client, Request};
Expand All @@ -28,6 +29,7 @@ pub struct ClickhouseConfig {
pub batch_size: Option<usize>,
pub batch_timeout: Option<u64>,
pub compression: Option<Compression>,
pub basic_auth: Option<ClickHouseBasicAuthConfig>,

// Tower Request based configuration
pub request_in_flight_limit: Option<usize>,
Expand All @@ -42,7 +44,7 @@ pub struct ClickhouseConfig {
impl SinkConfig for ClickhouseConfig {
fn build(&self, acker: Acker) -> crate::Result<(super::RouterSink, super::Healthcheck)> {
let sink = clickhouse(self.clone(), acker)?;
let healtcheck = healthcheck(self.host.clone());
let healtcheck = healthcheck(self.host.clone(), self.basic_auth.clone());

Ok((sink, healtcheck))
}
Expand All @@ -52,6 +54,20 @@ impl SinkConfig for ClickhouseConfig {
}
}

#[derive(Deserialize, Serialize, Debug, Clone, Default)]
#[serde(deny_unknown_fields)]
pub struct ClickHouseBasicAuthConfig {
pub password: String,
pub user: String,
}

impl ClickHouseBasicAuthConfig {
fn apply(&self, header_map: &mut http::header::HeaderMap) {
let auth = headers::Authorization::basic(&self.user, &self.password);
header_map.typed_insert(auth)
}
}

fn clickhouse(config: ClickhouseConfig, acker: Acker) -> crate::Result<super::RouterSink> {
let host = config.host.clone();
let database = config.database.clone().unwrap_or("default".into());
Expand All @@ -72,6 +88,8 @@ fn clickhouse(config: ClickhouseConfig, acker: Acker) -> crate::Result<super::Ro
let retry_attempts = config.request_retry_attempts.unwrap_or(usize::max_value());
let retry_backoff_secs = config.request_retry_backoff_secs.unwrap_or(1);

let basic_auth = config.basic_auth.clone();

let policy = FixedRetryPolicy::new(
retry_attempts,
Duration::from_secs(retry_backoff_secs),
Expand All @@ -93,7 +111,13 @@ fn clickhouse(config: ClickhouseConfig, acker: Acker) -> crate::Result<super::Ro
builder.header("Content-Encoding", "gzip");
}

builder.body(body).unwrap()
let mut request = builder.body(body).unwrap();

if let Some(auth) = &basic_auth {
auth.apply(request.headers_mut());
}

request
})?;

let service = ServiceBuilder::new()
Expand All @@ -119,10 +143,14 @@ fn clickhouse(config: ClickhouseConfig, acker: Acker) -> crate::Result<super::Ro
Ok(Box::new(sink))
}

fn healthcheck(host: String) -> super::Healthcheck {
fn healthcheck(host: String, basic_auth: Option<ClickHouseBasicAuthConfig>) -> super::Healthcheck {
// TODO: check if table exists?
let uri = format!("{}/?query=SELECT%201", host);
let request = Request::get(uri).body(Body::empty()).unwrap();
let mut request = Request::get(uri).body(Body::empty()).unwrap();

if let Some(auth) = &basic_auth {
auth.apply(request.headers_mut());
}

let https = HttpsConnector::new(4).expect("TLS initialization failed");
let client = Client::builder().build(https);
Expand Down

0 comments on commit d5974dc

Please sign in to comment.