Skip to content
Merged
7 changes: 6 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,18 @@ All notable changes to this project will be documented in this file.

### Changed

- Fixed a port reference in the role services ([#102])
- Shut down gracefully ([#101]).

### Added

- Added the discovery ConfigMap creation ([#102])

[#101]: https://github.com/stackabletech/druid-operator/pull/101
[#102]: https://github.com/stackabletech/druid-operator/pull/102

## [0.2.0] - 2021-12-23


### Changed

- Migrated to StatefulSet rather than direct Pod management ([#59]).
Expand Down
14 changes: 11 additions & 3 deletions docs/modules/ROOT/pages/usage.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ Then a cluster can be deployed using the example below. Make sure you have *exac
apiVersion: druid.stackable.tech/v1alpha1
kind: DruidCluster
metadata:
name: simple
name: simple-druid
spec:
version: 0.22.0
zookeeperReference: simple-zk
Expand Down Expand Up @@ -79,7 +79,7 @@ spec:
config: {}
replicas: 1

The Router is hosting the web UI, a `NodePort` service is created by the operator to access the web UI. Connect to the `simple-router` `NodePort` service and follow the https://druid.apache.org/docs/latest/tutorials/index.html#step-4-load-data[druid documentation] on how to load and query sample data.
The Router is hosting the web UI, a `NodePort` service is created by the operator to access the web UI. Connect to the `simple-druid-router` `NodePort` service and follow the https://druid.apache.org/docs/latest/tutorials/index.html#step-4-load-data[druid documentation] on how to load and query sample data.

== Using S3

Expand All @@ -106,4 +106,12 @@ This allows to ingest data from accessible buckets already. To configure a bucke
deepStorage:
storageType: s3
bucket: druid-deepstorage
baseKey: storage # the base key is the prefix to be used; optional
baseKey: storage # the base key is the prefix to be used; optional

== Connecting to Druid from other Services

The operator creates a `ConfigMap` with the name of the cluster which contains connection information. Following our example above (the name of the cluster is `simple-druid`) a `ConfigMap` with the name `simple-druid` will be created containing 3 keys:

- `DRUID_ROUTER` with the format `<host>:<port>`, which points to the router processes HTTP endpoint. Here you can connect to the web UI, or use REST endpoints such as `/druid/v2/sql/` to query data. https://druid.apache.org/docs/latest/querying/sql.html#http-post[More information in the Druid Docs].
- `DRUID_AVATICA_JDBC` contains a JDBC connect string which can be used together with the https://calcite.apache.org/avatica/downloads/[Avatica JDBC Driver] to connect to Druid and query data. https://druid.apache.org/docs/latest/querying/sql.html#jdbc[More information in the Druid Docs].
- `DRUID_SQALCHEMY` contains a connection string used to connect to Druid with SQAlchemy, in - for example - Apache Superset.
2 changes: 1 addition & 1 deletion examples/derby/druidcluster.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apiVersion: druid.stackable.tech/v1alpha1
kind: DruidCluster
metadata:
name: simple
name: derby-druid
spec:
version: 0.22.0
zookeeperReference:
Expand Down
2 changes: 1 addition & 1 deletion examples/psql-s3/druidcluster.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apiVersion: druid.stackable.tech/v1alpha1
kind: DruidCluster
metadata:
name: simple
name: psqls3-druid
spec:
version: 0.22.0
zookeeperReference:
Expand Down
2 changes: 1 addition & 1 deletion examples/psql/druidcluster.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apiVersion: druid.stackable.tech/v1alpha1
kind: DruidCluster
metadata:
name: simple
name: psql-druid
spec:
version: 0.22.0
zookeeperReference:
Expand Down
120 changes: 116 additions & 4 deletions rust/crd/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use serde::{Deserialize, Serialize};
use stackable_operator::kube::CustomResource;
use stackable_operator::product_config_utils::{ConfigError, Configuration};
use stackable_operator::role_utils::Role;
use stackable_operator::schemars::{self, JsonSchema};
use stackable_operator::{
kube::CustomResource,
product_config_utils::{ConfigError, Configuration},
role_utils::Role,
schemars::{self, JsonSchema},
};
use std::collections::BTreeMap;
use std::str::FromStr;
use strum_macros::Display;
Expand Down Expand Up @@ -145,6 +147,7 @@ impl DruidRole {
}

impl DruidCluster {
/// The spec for the given Role
pub fn get_role(&self, role: &DruidRole) -> &Role<DruidConfig> {
match role {
DruidRole::Coordinator => &self.spec.coordinators,
Expand All @@ -154,6 +157,20 @@ impl DruidCluster {
DruidRole::Router => &self.spec.routers,
}
}

/// The name of the role-level load-balanced Kubernetes `Service`
pub fn role_service_name(&self, role: &DruidRole) -> Option<String> {
Some(format!("{}-{}", self.metadata.name.clone()?, role))
}

/// The fully-qualified domain name of the role-level load-balanced Kubernetes `Service`
pub fn role_service_fqdn(&self, role: &DruidRole) -> Option<String> {
Some(format!(
"{}.{}.svc.cluster.local",
self.role_service_name(role)?,
self.metadata.namespace.as_ref()?
))
}
}

#[derive(Clone, Debug, Default, Deserialize, JsonSchema, Serialize)]
Expand Down Expand Up @@ -361,3 +378,98 @@ fn build_string_list(strings: &[String]) -> String {
let comma_list = quoted_strings.join(", ");
format!("[{}]", comma_list)
}

#[cfg(test)]
mod tests {
use super::*;
use stackable_operator::role_utils::CommonConfiguration;
use stackable_operator::role_utils::RoleGroup;
use std::array::IntoIter;
use std::collections::HashMap;

#[test]
fn test_service_name_generation() {
let mut cluster = DruidCluster::new(
"testcluster",
DruidClusterSpec {
stopped: None,
version: "".to_string(),
brokers: Role {
config: CommonConfiguration {
config: DruidConfig {},
config_overrides: Default::default(),
env_overrides: Default::default(),
cli_overrides: Default::default(),
},
role_groups: Default::default(),
},
coordinators: Role {
config: CommonConfiguration {
config: DruidConfig {},
config_overrides: Default::default(),
env_overrides: Default::default(),
cli_overrides: Default::default(),
},
role_groups: Default::default(),
},
historicals: Role {
config: CommonConfiguration {
config: DruidConfig {},
config_overrides: Default::default(),
env_overrides: Default::default(),
cli_overrides: Default::default(),
},
role_groups: Default::default(),
},
middle_managers: Role {
config: CommonConfiguration {
config: DruidConfig {},
config_overrides: Default::default(),
env_overrides: Default::default(),
cli_overrides: Default::default(),
},
role_groups: Default::default(),
},
routers: Role {
config: CommonConfiguration {
config: DruidConfig {},
config_overrides: Default::default(),
env_overrides: Default::default(),
cli_overrides: Default::default(),
},
role_groups: HashMap::<_, _>::from_iter(IntoIter::new([(
"default".to_string(),
RoleGroup {
config: CommonConfiguration {
config: DruidConfig {},
config_overrides: Default::default(),
env_overrides: Default::default(),
cli_overrides: Default::default(),
},
replicas: Some(1),
selector: None,
},
)])),
},
metadata_storage_database: Default::default(),
deep_storage: Default::default(),
s3: None,
zookeeper_reference: Default::default(),
},
);

cluster.metadata.namespace = Some("default".to_string());

assert_eq!(cluster.metadata.name, Some("testcluster".to_string()));

assert_eq!(
cluster.role_service_name(&DruidRole::Router),
Some("testcluster-router".to_string())
);

assert_eq!(
cluster.role_service_fqdn(&DruidRole::Router),
Some("testcluster-router.default.svc.cluster.local".to_string())
)
}
}
81 changes: 81 additions & 0 deletions rust/operator-binary/src/discovery.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
//! Discovery for Druid. We make Druid discoverable by putting a connection string to the router service
//! inside a config map. We only provide a connection string to the router service, since it serves as
//! a gateway to the cluster for client queries.

use snafu::{OptionExt, ResultExt, Snafu};
use stackable_druid_crd::{DruidCluster, DruidRole, APP_NAME};
use stackable_operator::{
builder::{ConfigMapBuilder, ObjectMetaBuilder},
k8s_openapi::api::core::v1::ConfigMap,
kube::{runtime::reflector::ObjectRef, Resource, ResourceExt},
};

use crate::druid_controller::druid_version;

#[derive(Snafu, Debug)]
pub enum Error {
#[snafu(display("object {} is missing metadata to build owner reference", druid))]
ObjectMissingMetadataForOwnerRef {
source: stackable_operator::error::Error,
druid: ObjectRef<DruidCluster>,
},
#[snafu(display("failed to get service FQDN"))]
NoServiceFqdn,
#[snafu(display("failed to build ConfigMap"))]
BuildConfigMap {
source: stackable_operator::error::Error,
},
}

/// Builds discovery [`ConfigMap`]s for connecting to a [`DruidCluster`]
pub async fn build_discovery_configmaps(
owner: &impl Resource<DynamicType = ()>,
druid: &DruidCluster,
) -> Result<Vec<ConfigMap>, Error> {
let name = owner.name();
Ok(vec![build_discovery_configmap(&name, owner, druid)?])
}

/// Build a discovery [`ConfigMap`] containing information about how to connect to a certain [`DruidCluster`]
fn build_discovery_configmap(
name: &str,
owner: &impl Resource<DynamicType = ()>,
druid: &DruidCluster,
) -> Result<ConfigMap, Error> {
let router_host = format!(
"{}:{}",
druid
.role_service_fqdn(&DruidRole::Router)
.with_context(|| NoServiceFqdn)?,
DruidRole::Router.get_http_port()
);
let sqlalchemy_conn_str = format!("druid://{}/druid/v2/sql", router_host);
let avatica_conn_str = format!(
"jdbc:avatica:remote:url=http://{}/druid/v2/sql/avatica/",
router_host
);

ConfigMapBuilder::new()
.metadata(
ObjectMetaBuilder::new()
.name_and_namespace(druid)
.name(name)
.ownerreference_from_resource(owner, None, Some(true))
.with_context(|| ObjectMissingMetadataForOwnerRef {
druid: ObjectRef::from_obj(druid),
})?
.with_recommended_labels(
druid,
APP_NAME,
druid_version(druid).unwrap_or("unknown"),
&DruidRole::Router.to_string(),
"discovery",
)
.build(),
)
.add_data("DRUID_ROUTER", router_host)
.add_data("DRUID_SQLALCHEMY", sqlalchemy_conn_str)
.add_data("DRUID_AVATICA_JDBC", avatica_conn_str)
.build()
.context(BuildConfigMap)
}
26 changes: 23 additions & 3 deletions rust/operator-binary/src/druid_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ use std::{
time::Duration,
};

use crate::config::{get_jvm_config, get_log4j_config, get_runtime_properties};
use crate::{
config::{get_jvm_config, get_log4j_config, get_runtime_properties},
discovery::{self, build_discovery_configmaps},
};
use snafu::{OptionExt, ResultExt, Snafu};
use stackable_druid_crd::{
DeepStorageType, DruidCluster, DruidRole, APP_NAME, CONTAINER_HTTP_PORT,
Expand Down Expand Up @@ -114,6 +117,12 @@ pub enum Error {
PropertiesWriteError {
source: stackable_operator::product_config::writer::PropertiesWriterError,
},
#[snafu(display("failed to build discovery ConfigMap"))]
BuildDiscoveryConfig { source: discovery::Error },
#[snafu(display("failed to apply discovery ConfigMap"))]
ApplyDiscoveryConfig {
source: stackable_operator::error::Error,
},
}
type Result<T, E = Error> = std::result::Result<T, E>;

Expand Down Expand Up @@ -210,6 +219,17 @@ pub async fn reconcile_druid(druid: DruidCluster, ctx: Context<Ctx>) -> Result<R
}
}

// discovery
for discovery_cm in build_discovery_configmaps(&druid, &druid)
.await
.context(BuildDiscoveryConfig)?
{
client
.apply_patch(FIELD_MANAGER_SCOPE, &discovery_cm, &discovery_cm)
.await
.context(ApplyDiscoveryConfig)?;
}

Ok(ReconcilerAction {
requeue_after: None,
})
Expand All @@ -235,12 +255,12 @@ pub fn build_role_service(role_name: &str, druid: &DruidCluster) -> Result<Servi
.build(),
spec: Some(ServiceSpec {
ports: Some(vec![ServicePort {
name: Some("plaintext".to_string()),
name: Some(CONTAINER_HTTP_PORT.to_string()),
port: DruidRole::from_str(role_name)
.unwrap()
.get_http_port()
.into(),
target_port: Some(IntOrString::String("plaintext".to_string())),
target_port: Some(IntOrString::String(CONTAINER_HTTP_PORT.to_string())),
protocol: Some("TCP".to_string()),
..ServicePort::default()
}]),
Expand Down
1 change: 1 addition & 0 deletions rust/operator-binary/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod config;
mod discovery;
mod druid_controller;

use futures::StreamExt;
Expand Down