Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(clickhouse sink): Add support for basic auth #937

Merged
merged 6 commits into from Oct 4, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 13 additions & 0 deletions docs/usage/configuration/sinks/clickhouse.md
Expand Up @@ -155,6 +155,19 @@ The `clickhouse` sink [batches](#buffers-and-batches) [`log`][docs.data-model.lo
# * default: 9223372036854775807
# * unit: seconds
retry_backoff_secs = 9223372036854775807

[sinks.clickhouse_sink.basic_auth]
# The basic authentication password.
#
# * required
# * no default
password = "password"

# The basic authentication user name.
#
# * required
# * no default
user = "username"
```
{% endcode-tabs-item %}
{% endcode-tabs %}
Expand Down
37 changes: 33 additions & 4 deletions src/sinks/clickhouse.rs
Expand Up @@ -9,6 +9,7 @@ use crate::{
topology::config::{DataType, SinkConfig},
};
use futures::{Future, Sink};
use headers::HeaderMapExt;
use http::StatusCode;
use http::{Method, Uri};
use hyper::{Body, Client, Request};
Expand Down Expand Up @@ -36,13 +37,15 @@ pub struct ClickhouseConfig {
pub request_rate_limit_num: Option<u64>,
pub request_retry_attempts: Option<usize>,
pub request_retry_backoff_secs: Option<u64>,

pub basic_auth: Option<ClickHouseBasicAuthConfig>,
LucioFranco marked this conversation as resolved.
Show resolved Hide resolved
}

#[typetag::serde(name = "clickhouse")]
impl SinkConfig for ClickhouseConfig {
fn build(&self, acker: Acker) -> crate::Result<(super::RouterSink, super::Healthcheck)> {
let sink = clickhouse(self.clone(), acker)?;
let healtcheck = healthcheck(self.host.clone());
let healtcheck = healthcheck(self.host.clone(), self.basic_auth.clone());

Ok((sink, healtcheck))
}
Expand All @@ -52,6 +55,20 @@ impl SinkConfig for ClickhouseConfig {
}
}

#[derive(Deserialize, Serialize, Debug, Clone, Default)]
#[serde(deny_unknown_fields)]
pub struct ClickHouseBasicAuthConfig {
pub password: String,
pub user: String,
}

impl ClickHouseBasicAuthConfig {
fn apply(&self, header_map: &mut http::header::HeaderMap) {
let auth = headers::Authorization::basic(&self.user, &self.password);
header_map.typed_insert(auth)
}
}

fn clickhouse(config: ClickhouseConfig, acker: Acker) -> crate::Result<super::RouterSink> {
let host = config.host.clone();
let database = config.database.clone().unwrap_or("default".into());
Expand All @@ -72,6 +89,8 @@ fn clickhouse(config: ClickhouseConfig, acker: Acker) -> crate::Result<super::Ro
let retry_attempts = config.request_retry_attempts.unwrap_or(usize::max_value());
let retry_backoff_secs = config.request_retry_backoff_secs.unwrap_or(1);

let basic_auth = config.basic_auth.clone();

let policy = FixedRetryPolicy::new(
retry_attempts,
Duration::from_secs(retry_backoff_secs),
Expand All @@ -93,7 +112,13 @@ fn clickhouse(config: ClickhouseConfig, acker: Acker) -> crate::Result<super::Ro
builder.header("Content-Encoding", "gzip");
}

builder.body(body).unwrap()
let mut request = builder.body(body).unwrap();

if let Some(auth) = &basic_auth {
auth.apply(request.headers_mut());
}

request
});

let service = ServiceBuilder::new()
Expand All @@ -119,10 +144,14 @@ fn clickhouse(config: ClickhouseConfig, acker: Acker) -> crate::Result<super::Ro
Ok(Box::new(sink))
}

fn healthcheck(host: String) -> super::Healthcheck {
fn healthcheck(host: String, basic_auth: Option<ClickHouseBasicAuthConfig>) -> super::Healthcheck {
// TODO: check if table exists?
let uri = format!("{}/?query=SELECT%201", host);
let request = Request::get(uri).body(Body::empty()).unwrap();
let mut request = Request::get(uri).body(Body::empty()).unwrap();

if let Some(auth) = &basic_auth {
auth.apply(request.headers_mut());
}

let https = HttpsConnector::new(4).expect("TLS initialization failed");
let client = Client::builder().build(https);
Expand Down