diff --git a/README.adoc b/README.adoc index 2564e92..1d2fbe7 100644 --- a/README.adoc +++ b/README.adoc @@ -75,13 +75,18 @@ There are three levels of configuration: |logDir |string -|The log folder for spark applications +|The log folder for spark applications (must be created by the user) |spark.history.fs.logDirectory=logDir, spark.eventLog.enabled=true, spark.eventLog.dir=logDir; |secret |string |A secret shared between nodes and required to submit applications via spark-submit |spark.authenticate=true, spark.authenticate.secret=secret; + +|maxPortRetries +|integer +|Maximum number of retries when binding to a port before giving up. When a port is given a specific value (non-zero), each subsequent retry will increment the port used in the previous attempt by 1 before retrying. This essentially allows it to try a range of ports from the start port specified to port + maxRetries. +|spark.port.maxRetries |=== === Node type options diff --git a/crd/sparkcluster.crd.yaml b/crd/sparkcluster.crd.yaml index 3965ba1..874f3fc 100644 --- a/crd/sparkcluster.crd.yaml +++ b/crd/sparkcluster.crd.yaml @@ -135,6 +135,8 @@ spec: type: string logDir: type: string + maxPortRetries: + type: integer status: nullable: true properties: diff --git a/crd/src/lib.rs b/crd/src/lib.rs index 163f28c..402caa1 100644 --- a/crd/src/lib.rs +++ b/crd/src/lib.rs @@ -31,6 +31,7 @@ pub struct SparkClusterSpec { pub version: SparkVersion, pub secret: Option<String>, pub log_dir: Option<String>, + pub max_port_retries: Option<usize>, } impl SparkClusterSpec { @@ -173,9 +174,9 @@ impl SparkNodeType { /// Returns the container start command for a spark node /// Right now works only for images using hadoop2.7 /// # Arguments - /// * `version` - current specified SparkVersion + /// * `version` - Current specified cluster version /// - pub fn get_command(&self, version: &SparkVersion) -> String { + pub fn get_command(&self, version: &str) -> String { // TODO: remove hardcoded and adapt for versioning format!( "spark-{}-bin-hadoop2.7/sbin/start-{}.sh", @@ -263,14 +264,12 @@ impl SparkVersion { pub fn is_upgrade(&self, to: &Self) -> Result { let from_version = Version::parse(&self.to_string())?; let to_version = Version::parse(&to.to_string())?; - Ok(to_version > from_version) } pub fn is_downgrade(&self, to: &Self) -> Result { let from_version = Version::parse(&self.to_string())?; let to_version = Version::parse(&to.to_string())?; - Ok(to_version < from_version) } } diff --git a/examples/sparkcluster.example.v3.0.1.yaml b/examples/sparkcluster.example.v3.0.1.yaml index 5352ae8..fe5b189 100644 --- a/examples/sparkcluster.example.v3.0.1.yaml +++ b/examples/sparkcluster.example.v3.0.1.yaml @@ -20,3 +20,4 @@ spec: - nodeName: "mdesktop" instances: 1 version: "3.0.1" + maxPortRetries: 0 diff --git a/operator/src/config.rs b/operator/src/config.rs index 61fc496..6e3ebcf 100644 --- a/operator/src/config.rs +++ b/operator/src/config.rs @@ -1,11 +1,9 @@ -use k8s_openapi::api::core::v1::{ConfigMapVolumeSource, EnvVar, Volume, VolumeMount}; +use k8s_openapi::api::core::v1::EnvVar; use stackable_spark_crd::{ ConfigOption, SparkClusterSpec, SparkNode, SparkNodeSelector, SparkNodeType, }; use std::collections::HashMap; -const SPARK_URL_START: &str = "spark://"; - // basic for startup const SPARK_NO_DAEMONIZE: &str = "SPARK_NO_DAEMONIZE"; const SPARK_CONF_DIR: &str = "SPARK_CONF_DIR"; @@ -14,6 +12,7 @@ const SPARK_EVENT_LOG_ENABLED: &str = "spark.eventLog.enabled"; const SPARK_EVENT_LOG_DIR: &str = "spark.eventLog.dir"; const SPARK_AUTHENTICATE: &str = "spark.authenticate";
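To make the README mapping above concrete, the sketch below shows how optional cluster settings such as maxPortRetries only end up in spark-defaults.conf when they are actually set in the custom resource. This is a simplified, hypothetical rendition of what get_config_properties (further down in this diff) does, not the operator's actual code:

```rust
use std::collections::HashMap;

const SPARK_AUTHENTICATE: &str = "spark.authenticate";
const SPARK_AUTHENTICATE_SECRET: &str = "spark.authenticate.secret";
const SPARK_PORT_MAX_RETRIES: &str = "spark.port.maxRetries";

/// Simplified sketch: optional CRD fields only produce spark-defaults.conf
/// entries when they are set (only secret and max_port_retries shown here).
fn spark_defaults_sketch(
    secret: &Option<String>,
    max_port_retries: &Option<usize>,
) -> HashMap<String, String> {
    let mut config = HashMap::new();
    if let Some(secret) = secret {
        config.insert(SPARK_AUTHENTICATE.to_string(), "true".to_string());
        config.insert(SPARK_AUTHENTICATE_SECRET.to_string(), secret.clone());
    }
    if let Some(retries) = max_port_retries {
        config.insert(SPARK_PORT_MAX_RETRIES.to_string(), retries.to_string());
    }
    config
}
```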
const SPARK_AUTHENTICATE_SECRET: &str = "spark.authenticate.secret"; +const SPARK_PORT_MAX_RETRIES: &str = "spark.port.maxRetries"; // master const SPARK_MASTER_PORT_ENV: &str = "SPARK_MASTER_PORT"; const SPARK_MASTER_PORT_CONF: &str = "spark.master.port"; @@ -29,126 +28,93 @@ const SPARK_HISTORY_STORE_PATH: &str = "spark.history.store.path"; const SPARK_HISTORY_UI_PORT: &str = "spark.history.ui.port"; /// The worker start command needs to be extended with all known master nodes and ports. -/// The required URLs are in format: 'spark://:::,: /// /// # Arguments /// * `node_type` - SparkNodeType (master/worker/history-server) /// * `master` - Master SparkNode containing the required settings /// -pub fn adapt_container_command(node_type: &SparkNodeType, master: &SparkNode) -> Option { - let mut master_url: String = String::new(); +pub fn adapt_worker_command(node_type: &SparkNodeType, master: &SparkNode) -> Option { + let mut adapted_command: String = String::new(); // only for workers if node_type != &SparkNodeType::Worker { return None; } + + let master_urls = get_master_urls(master); + for url in master_urls { + if adapted_command.is_empty() { + adapted_command.push_str("spark://"); + } else { + adapted_command.push(','); + } + adapted_command.push_str(url.as_str()); + } + + Some(adapted_command) +} + +/// The master port can be configured and needs to be checked in config / env or general options. +/// Defaults to 7077 if no port is specified. +/// +/// # Arguments +/// * `master` - Master SparkNode containing the required node_name and port settings +/// +pub fn get_master_urls(master: &SparkNode) -> Vec { + let mut master_urls = vec![]; // get all available master selectors for selector in &master.selectors { // check in conf properties and env variables for port // conf properties have higher priority than env variables if let Some(conf) = &selector.config { - if let Some(master) = - search_master_port(&selector.node_name, SPARK_MASTER_PORT_CONF, conf) - { - master_url.push_str(master.as_str()); + if let Some(port) = get_master_port(SPARK_MASTER_PORT_CONF, conf) { + master_urls.push(create_master_url(&selector.node_name, &port.to_string())); continue; } } else if let Some(env) = &selector.env { - if let Some(master) = - search_master_port(&selector.node_name, SPARK_MASTER_PORT_ENV, env) - { - master_url.push_str(master.as_str()); + if let Some(port) = get_master_port(SPARK_MASTER_PORT_ENV, env) { + master_urls.push(create_master_url(&selector.node_name, &port.to_string())); continue; } } else if let Some(port) = selector.master_port { - master_url - .push_str(format!("{}{}:{},", SPARK_URL_START, selector.node_name, port).as_str()); + master_urls.push(create_master_url(&selector.node_name, &port.to_string())); continue; } // TODO: default to default value in product conf - master_url - .push_str(format!("{}{}:{},", SPARK_URL_START, selector.node_name, "7077").as_str()); + master_urls.push(create_master_url(&selector.node_name, "7077")); } - Some(master_url) + master_urls +} + +/// Create master url in format: : +/// +/// # Arguments +/// * `node_name` - Master node_name / host name +/// * `port` - Port on which the master is running +/// +fn create_master_url(node_name: &str, port: &str) -> String { + format!("{}:{}", node_name, port) } /// Search for a master port in config properties or env variables /// /// # Arguments -/// * `node_name` - Node IP / DNS address /// * `option_name` - Name of the option to look for e.g. 
"SPARK_MASTER_PORT" /// * `options` - Vec of config properties or env variables /// -fn search_master_port( - node_name: &str, - option_name: &str, - options: &[ConfigOption], -) -> Option { +fn get_master_port(option_name: &str, options: &[ConfigOption]) -> Option { for option in options { if option.name == option_name { - return Some(format!( - "{}{}:{},", - SPARK_URL_START, node_name, option.value - )); + return Some(option.value.clone()); } } None } -const CONFIG_VOLUME: &str = "config-volume"; -const EVENT_VOLUME: &str = "event-volume"; - -/// Create volume mounts for the spark config files and optional an event dir for spark logs -/// -/// # Arguments -/// * `log_dir` - Event/Log dir for SparkNodes. History Server reads these logs to offer metrics -/// -pub fn create_volume_mounts(log_dir: &Option) -> Vec { - let mut volume_mounts = vec![VolumeMount { - mount_path: "conf".to_string(), - name: CONFIG_VOLUME.to_string(), - ..VolumeMount::default() - }]; - // if log dir is provided, create another folder for logDir - if let Some(dir) = log_dir { - volume_mounts.push(VolumeMount { - mount_path: dir.clone(), - name: EVENT_VOLUME.to_string(), - ..VolumeMount::default() - }); - } - - volume_mounts -} - -/// Create a volume to store the spark config files and optional an event volume for spark logs -/// -/// # Arguments -/// * `configmap_name` - ConfigMap name where the required spark configuration files (spark-defaults.conf and spark-env.sh) are located -/// -pub fn create_volumes(configmap_name: &str) -> Vec { - let volumes = vec![ - Volume { - name: CONFIG_VOLUME.to_string(), - config_map: Some(ConfigMapVolumeSource { - name: Some(configmap_name.to_string()), - ..ConfigMapVolumeSource::default() - }), - ..Volume::default() - }, - Volume { - name: EVENT_VOLUME.to_string(), - ..Volume::default() - }, - ]; - - volumes -} - /// The SPARK_CONFIG_DIR and SPARK_NO_DAEMONIZE must be provided as env variable in the container. /// SPARK_CONFIG_DIR must be available before the start up of the nodes (master, worker, history-server) to point to our custom configuration. /// SPARK_NO_DAEMONIZE stops the node processes to be started in the background, which causes the agent to lose track of the processes. 
@@ -173,6 +139,7 @@ pub fn create_required_startup_env() -> Vec { /// 2) from node /// 3) from selector /// 4) from config properties +/// /// # Arguments /// * `spec` - SparkCluster spec for common properties /// * `selector` - SparkClusterSelector containing desired config properties @@ -194,6 +161,13 @@ pub fn get_config_properties( config.insert(SPARK_AUTHENTICATE_SECRET.to_string(), secret.to_string()); } + if let Some(max_port_retries) = &spec.max_port_retries { + config.insert( + SPARK_PORT_MAX_RETRIES.to_string(), + max_port_retries.to_string(), + ); + } + // history server config.insert( SPARK_HISTORY_FS_LOG_DIRECTORY.to_string(), @@ -226,6 +200,7 @@ pub fn get_config_properties( /// 2) from node /// 3) from selector /// 4) from environment variables +/// /// # Arguments /// * `selector` - SparkClusterSelector containing desired env variables /// @@ -270,6 +245,7 @@ pub fn get_env_variables(selector: &SparkNodeSelector) -> HashMap; struct SparkState { @@ -92,7 +84,7 @@ impl PodInformation { /// /// * `node_type` - Optional SparkNodeType (master/worker/history-server); /// - pub fn get_all_pods(&self, node_type: Option) -> Vec { + fn get_all_pods(&self, node_type: Option) -> Vec { let mut pods: Vec = vec![]; match node_type { @@ -314,9 +306,14 @@ impl SparkState { Ok(ReconcileFunctionAction::Continue) } - /// Read all cluster specific pods. Check for valid labels such as HASH (required to match a certain selector) or TYPE (which indicates master/worker/history-server). + /// Read all cluster specific pods. Check for required valid labels such as: + /// - HASH (required to match a certain selector) + /// - TYPE (which indicates master/worker/history-server) + /// - VERSION (cluster version the pod was created for) + /// /// Remove invalid pods which are lacking (or have outdated) required labels. - /// Sort incoming valid pods into corresponding maps (hash -> Vec) for later usage for each node type (master/worker/history-server). + /// Sort incoming valid pods into corresponding maps (hash -> Vec) + /// for later usage for each node type (master/worker/history-server). 
pub async fn read_existing_pod_information(&mut self) -> SparkReconcileResult { trace!( "Reading existing pod information for {}", @@ -346,9 +343,9 @@ impl SparkState { if let Some(labels) = pod.metadata.labels.clone() { // we require HASH and TYPE label to identify and sort the pods into the NodeInformation if let (Some(hash), Some(node_type), Some(version)) = ( - labels.get(HASH_LABEL), - labels.get(TYPE_LABEL), - labels.get(VERSION_LABEL), + labels.get(pod_utils::HASH_LABEL), + labels.get(pod_utils::TYPE_LABEL), + labels.get(pod_utils::VERSION_LABEL), ) { let spark_node_type = match SparkNodeType::from_str(node_type) { Ok(nt) => nt, @@ -356,7 +353,7 @@ impl SparkState { error!( "Pod [{}] has an invalid type '{}' [{}], deleting it.", Meta::name(&pod), - TYPE_LABEL, + pod_utils::TYPE_LABEL, node_type ); self.context.client.delete(&pod).await?; @@ -370,7 +367,7 @@ impl SparkState { error!( "Pod [{}] has an outdated '{}' [{}], deleting it.", Meta::name(&pod), - HASH_LABEL, + pod_utils::HASH_LABEL, hash ); self.context.client.delete(&pod).await?; @@ -386,7 +383,7 @@ impl SparkState { info!( "Pod [{}] has an outdated '{}' [{}] - required is [{}], deleting it", Meta::name(&pod), - VERSION_LABEL, + pod_utils::VERSION_LABEL, version, target_version ); @@ -410,7 +407,7 @@ impl SparkState { } else { // some labels missing error!("Pod [{}] is missing one or more required '{:?}' labels, this is illegal, deleting it.", - Meta::name(&pod), vec![HASH_LABEL, TYPE_LABEL]); + Meta::name(&pod), vec![pod_utils::HASH_LABEL, pod_utils::TYPE_LABEL, pod_utils::VERSION_LABEL]); self.context.client.delete(&pod).await?; } } else { @@ -520,41 +517,6 @@ impl SparkState { true } - /// Process the cluster status version for upgrades/downgrades. - pub async fn process_version(&mut self) -> SparkReconcileResult { - // If we reach here it means all pods must be running on target_version. - // We can now set current_version to target_version (if target_version was set) and - // target_version to None - if let Some(status) = &self.status.clone() { - if let Some(target_version) = &status.target_version { - info!( - "Finished upgrade/downgrade to [{}]. Cluster ready!", - &target_version - ); - - self.status = self.set_target_version(None).await?.status; - self.status = self - .set_current_version(Some(&target_version)) - .await? - .status; - self.status = self - .set_upgrading_condition( - &status.conditions, - &format!( - "No change required [{:?}] is still the current_version", - target_version - ), - "", - ConditionStatus::False, - ) - .await? - .status; - } - } - - Ok(ReconcileFunctionAction::Continue) - } - /// Reconcile the cluster according to provided spec. Start with master nodes and continue to worker and history-server nodes. /// Create missing pods or delete excess pods to match the spec. /// @@ -638,6 +600,75 @@ impl SparkState { Ok(ReconcileFunctionAction::Continue) } + /// In Spark standalone mode, workers are started via a script and require the master urls to connect to. + /// If masters change (added/deleted), workers need to be updated accordingly to be able + /// to fall back on other masters, if the primary master fails. + /// Therefore we always need to keep the workers updated in terms of available master urls. + /// Available master urls are hashed and stored as a label in the worker pod. If the label differs + /// from the spec, we need to replace (delete and add) the workers in a rolling fashion.
+ pub async fn check_worker_master_urls(&self) -> SparkReconcileResult { + if let Some(pod_info) = &self.pod_information { + let worker_pods = pod_info.get_all_pods(Some(SparkNodeType::Worker)); + for pod in &worker_pods { + if let Some(labels) = &pod.metadata.labels { + if let Some(label_hashed_master_urls) = + labels.get(pod_utils::MASTER_URLS_HASH_LABEL) + { + let current_hashed_master_urls = + pod_utils::get_hashed_master_urls(&self.spec.master); + if label_hashed_master_urls != &current_hashed_master_urls { + info!( + "Pod [{}] has an outdated '{}' [{}] - required is [{}], deleting it", + Meta::name(pod), + pod_utils::MASTER_URLS_HASH_LABEL, + label_hashed_master_urls, + current_hashed_master_urls, + ); + self.context.client.delete(pod).await?; + return Ok(ReconcileFunctionAction::Requeue(Duration::from_secs(10))); + } + } + } + } + } + Ok(ReconcileFunctionAction::Continue) + } + + /// Process the cluster status version for upgrades/downgrades. + pub async fn process_version(&mut self) -> SparkReconcileResult { + // If we reach here it means all pods must be running on target_version. + // We can now set current_version to target_version (if target_version was set) and + // target_version to None + if let Some(status) = &self.status.clone() { + if let Some(target_version) = &status.target_version { + info!( + "Finished upgrade/downgrade to [{}]. Cluster ready!", + &target_version + ); + + self.status = self.set_target_version(None).await?.status; + self.status = self + .set_current_version(Some(&target_version)) + .await? + .status; + self.status = self + .set_upgrading_condition( + &status.conditions, + &format!( + "No change required [{:?}] is still the current_version", + target_version + ), + "", + ConditionStatus::False, + ) + .await? + .status; + } + } + + Ok(ReconcileFunctionAction::Continue) + } + + /// Build a pod and create it /// /// # Arguments @@ -651,79 +682,18 @@ impl SparkState { node_type: &SparkNodeType, hash: &str, ) -> Result { - let pod = self.build_pod(selector, hash, node_type)?; + let pod = pod_utils::build_pod( + &self.context, + node_type, + selector.node_name.as_str(), + &self.spec.master, + hash, + &self.spec.version.to_string(), + &self.spec.log_dir, + )?; Ok(self.context.client.create(&pod).await?)
} - /// Build a pod using its selector and node_type - /// - /// # Arguments - /// * `selector` - SparkNodeSelector which contains specific pod information - /// * `node_type` - SparkNodeType (master/worker/history-server) - /// * `hash` - NodeSelector hash - /// - fn build_pod( - &self, - selector: &SparkNodeSelector, - hash: &str, - node_type: &SparkNodeType, - ) -> Result { - let (containers, volumes) = self.build_containers(node_type, hash); - - Ok(Pod { - metadata: metadata::build_metadata( - self.create_pod_name(node_type, hash), - Some(self.build_labels(node_type, hash)), - &self.context.resource, - true, - )?, - spec: Some(PodSpec { - node_name: Some(selector.node_name.clone()), - tolerations: Some(create_tolerations()), - containers, - volumes: Some(volumes), - ..PodSpec::default() - }), - ..Pod::default() - }) - } - - /// Build required pod containers - /// - /// # Arguments - /// * `node_type` - SparkNodeType (master/worker/history-server) - /// * `hash` - NodeSelector hash - /// - fn build_containers( - &self, - node_type: &SparkNodeType, - hash: &str, - ) -> (Vec, Vec) { - let image_name = format!("spark:{}", &self.spec.version); - - // adapt worker command with master url(s) - let mut command = vec![node_type.get_command(&self.spec.version)]; - - if let Some(adapted_command) = config::adapt_container_command(node_type, &self.spec.master) - { - command.push(adapted_command); - } - - let containers = vec![Container { - image: Some(image_name), - name: "spark".to_string(), - command: Some(command), - volume_mounts: Some(config::create_volume_mounts(&self.spec.log_dir)), - env: Some(config::create_required_startup_env()), - ..Container::default() - }]; - - let cm_name = self.create_config_map_name(node_type, hash); - let volumes = config::create_volumes(&cm_name); - - (containers, volumes) - } - /// Create required config maps for the cluster /// /// # Arguments @@ -746,54 +716,12 @@ impl SparkState { data.insert("spark-defaults.conf".to_string(), conf); data.insert("spark-env.sh".to_string(), env); - let cm_name = self.create_config_map_name(node_type, hash); + let cm_name = pod_utils::create_config_map_name(&self.context.name(), node_type, hash); let cm = create_config_map(&self.context.resource, &cm_name, data)?; self.context.client.apply_patch(&cm, &cm).await?; Ok(()) } - - /// Provide required labels for pods - /// - /// # Arguments - /// * `node_type` - SparkNodeType (master/worker/history-server) - /// * `hash` - NodeSelector hash - /// - fn build_labels(&self, node_type: &SparkNodeType, hash: &str) -> BTreeMap { - let mut labels = BTreeMap::new(); - labels.insert(TYPE_LABEL.to_string(), node_type.to_string()); - labels.insert(HASH_LABEL.to_string(), hash.to_string()); - labels.insert(VERSION_LABEL.to_string(), self.spec.version.to_string()); - - labels - } - - /// All pod names follow a simple pattern: --- - /// - /// # Arguments - /// * `node_type` - SparkNodeType (master/worker/history-server) - /// * `hash` - NodeSelector hash - /// - fn create_pod_name(&self, node_type: &SparkNodeType, hash: &str) -> String { - format!( - "{}-{}-{}-{}", - self.context.name(), - node_type.as_str(), - hash, - Uuid::new_v4().as_fields().0.to_string(), - ) - } - - /// All config map names follow a simple pattern: ---cm - /// That means multiple pods of one selector share one and the same config map - /// - /// # Arguments - /// * `node_type` - SparkNodeType (master/worker/history-server) - /// * `hash` - NodeSelector hash - /// - fn create_config_map_name(&self, node_type: &SparkNodeType, 
hash: &str) -> String { - format!("{}-{}-{}-cm", self.context.name(), node_type.as_str(), hash) - } } impl ReconciliationState for SparkState { @@ -816,6 +744,8 @@ impl ReconciliationState for SparkState { .await? .then(self.reconcile_cluster(&SparkNodeType::HistoryServer)) .await? + .then(self.check_worker_master_urls()) + .await? .then(self.process_version()) .await }) diff --git a/operator/src/pod_utils.rs b/operator/src/pod_utils.rs new file mode 100644 index 0000000..0ff8821 --- /dev/null +++ b/operator/src/pod_utils.rs @@ -0,0 +1,234 @@ +use crate::config; +use crate::error::Error; +use k8s_openapi::api::core::v1::{ + ConfigMapVolumeSource, Container, Pod, PodSpec, Volume, VolumeMount, +}; +use stackable_operator::krustlet::create_tolerations; +use stackable_operator::metadata; +use stackable_operator::reconcile::ReconciliationContext; +use stackable_spark_crd::{SparkCluster, SparkNode, SparkNodeType}; +use std::collections::hash_map::DefaultHasher; +use std::collections::BTreeMap; +use std::hash::{Hash, Hasher}; +use uuid::Uuid; + +/// Pod label which holds the selector hash in the pods to identify which selector they belong to +pub const HASH_LABEL: &str = "spark.stackable.tech/hash"; +/// Pod label which holds node role / type (master, worker, history-server) in the pods +pub const TYPE_LABEL: &str = "spark.stackable.tech/type"; +/// Pod label which indicates the cluster version it was created for +pub const VERSION_LABEL: &str = "spark.stackable.tech/currentVersion"; +/// Pod label which indicates the known master urls for a worker pod +pub const MASTER_URLS_HASH_LABEL: &str = "spark.stackable.tech/masterUrls"; + +/// Name of the config volume to store configmap data +const CONFIG_VOLUME: &str = "config-volume"; +/// Name of the logging / event volume for SparkNode logs required by the history server +const EVENT_VOLUME: &str = "event-volume"; + +/// Build a pod using its selector and node_type +/// +/// # Arguments +/// * `context` - Reconciliation context for cluster name and resource (metadata) +/// * `node_type` - SparkNodeType (master/worker/history-server) +/// * `node_name` - Specific node_name (host) of the pod +/// * `master_node` - SparkNode master to retrieve master urls for worker start commands +/// * `hash` - NodeSelector hash +/// * `version` - Current cluster version +/// * `log_dir` - Logging dir for all nodes to enable history server logs +/// +pub fn build_pod( + context: &ReconciliationContext<SparkCluster>, + node_type: &SparkNodeType, + node_name: &str, + master_node: &SparkNode, + hash: &str, + version: &str, + log_dir: &Option<String>, +) -> Result<Pod, Error> { + let cluster_name = &context.name(); + let (containers, volumes) = + build_containers(master_node, cluster_name, node_type, hash, version, log_dir); + + Ok(Pod { + metadata: metadata::build_metadata( + create_pod_name(cluster_name, node_type, hash), + Some(build_labels(node_type, hash, version, master_node)), + &context.resource, + true, + )?, + spec: Some(PodSpec { + node_name: Some(node_name.to_string()), + tolerations: Some(create_tolerations()), + containers, + volumes: Some(volumes), + ..PodSpec::default() + }), + ..Pod::default() + }) +} + +/// Build required pod containers +/// +/// # Arguments +/// * `master_node` - SparkNode master to retrieve master urls for worker start commands +/// * `cluster_name` - Current cluster name +/// * `node_type` - SparkNodeType (master/worker/history-server) +/// * `hash` - NodeSelector hash +/// * `version` - Current cluster version +/// * `log_dir` - Logging dir for all nodes to enable
history server logs +/// +fn build_containers( + master_node: &SparkNode, + cluster_name: &str, + node_type: &SparkNodeType, + hash: &str, + version: &str, + log_dir: &Option<String>, +) -> (Vec<Container>, Vec<Volume>) { + let image_name = format!("spark:{}", version); + + let mut command = vec![node_type.get_command(version)]; + // adapt worker command with master url(s) + if let Some(master_urls) = config::adapt_worker_command(node_type, master_node) { + command.push(master_urls); + } + + let containers = vec![Container { + image: Some(image_name), + name: "spark".to_string(), + command: Some(command), + volume_mounts: Some(create_volume_mounts(log_dir)), + env: Some(config::create_required_startup_env()), + ..Container::default() + }]; + + let cm_name = create_config_map_name(cluster_name, node_type, hash); + let volumes = create_volumes(&cm_name); + + (containers, volumes) +} + +/// Create a volume to store the spark config files and optionally an event volume for spark logs +/// +/// # Arguments +/// * `configmap_name` - ConfigMap name where the required spark configuration files (spark-defaults.conf and spark-env.sh) are located +/// +fn create_volumes(configmap_name: &str) -> Vec<Volume> { + let volumes = vec![ + Volume { + name: CONFIG_VOLUME.to_string(), + config_map: Some(ConfigMapVolumeSource { + name: Some(configmap_name.to_string()), + ..ConfigMapVolumeSource::default() + }), + ..Volume::default() + }, + Volume { + name: EVENT_VOLUME.to_string(), + ..Volume::default() + }, + ]; + + volumes +} + +/// Create volume mounts for the spark config files and optionally an event dir for spark logs +/// +/// # Arguments +/// * `log_dir` - Event/Log dir for SparkNodes. History Server reads these logs to offer metrics +/// +pub fn create_volume_mounts(log_dir: &Option<String>) -> Vec<VolumeMount> { + let mut volume_mounts = vec![VolumeMount { + mount_path: "conf".to_string(), + name: CONFIG_VOLUME.to_string(), + ..VolumeMount::default() + }]; + // if log dir is provided, create another folder for logDir + if let Some(dir) = log_dir { + volume_mounts.push(VolumeMount { + mount_path: dir.clone(), + name: EVENT_VOLUME.to_string(), + ..VolumeMount::default() + }); + } + + volume_mounts +} + +/// Provide required labels for pods. We need to keep track of which workers are +/// connected to which masters. This is accomplished by hashing known master urls +/// and comparing them to the pods. If the hash from the pod and the selector differ, +/// masters were added or removed and we therefore restart the workers. +/// +/// # Arguments +/// * `node_type` - SparkNodeType (master/worker/history-server) +/// * `hash` - NodeSelector hash +/// * `version` - Current cluster version +/// * `master_node` - SparkNode master to retrieve master urls +/// +fn build_labels( + node_type: &SparkNodeType, + hash: &str, + version: &str, + master_node: &SparkNode, +) -> BTreeMap<String, String> { + let mut labels = BTreeMap::new(); + labels.insert(TYPE_LABEL.to_string(), node_type.to_string()); + labels.insert(HASH_LABEL.to_string(), hash.to_string()); + labels.insert(VERSION_LABEL.to_string(), version.to_string()); + + if node_type == &SparkNodeType::Worker { + labels.insert( + MASTER_URLS_HASH_LABEL.to_string(), + get_hashed_master_urls(master_node), + ); + } + + labels +} + +/// Get all master urls and hash them. This is required to keep track of which workers +/// are connected to which masters. In case masters are added / deleted, this hash changes +/// and we need to restart the worker pods to keep them up to date with all known masters.
+/// +/// # Arguments +/// * `master_node` - SparkNode master to retrieve master urls for pod label hash +/// +pub fn get_hashed_master_urls(master_node: &SparkNode) -> String { + let master_urls = config::get_master_urls(master_node); + let mut hasher = DefaultHasher::new(); + for url in master_urls { + url.hash(&mut hasher); + } + hasher.finish().to_string() +} + +/// All pod names follow a simple pattern: <cluster_name>-<node_type>-<hash>-<uuid> +/// +/// # Arguments +/// * `cluster_name` - Current cluster name +/// * `node_type` - SparkNodeType (master/worker/history-server) +/// * `hash` - NodeSelector hash +/// +fn create_pod_name(cluster_name: &str, node_type: &SparkNodeType, hash: &str) -> String { + format!( + "{}-{}-{}-{}", + cluster_name, + node_type.as_str(), + hash, + Uuid::new_v4().as_fields().0.to_string(), + ) +} + +/// All config map names follow a simple pattern: <cluster_name>-<node_type>-<hash>-cm +/// That means multiple pods of one selector share one and the same config map +/// +/// # Arguments +/// * `cluster_name` - Current cluster name +/// * `node_type` - SparkNodeType (master/worker/history-server) +/// * `hash` - NodeSelector hash +/// +pub fn create_config_map_name(cluster_name: &str, node_type: &SparkNodeType, hash: &str) -> String { + format!("{}-{}-{}-cm", cluster_name, node_type.as_str(), hash) +}
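To make the label mechanism above concrete, the hashing boils down to feeding every master URL into a single DefaultHasher; the sketch below mirrors get_hashed_master_urls with made-up URLs (standard library only):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hash a list of master URLs into a single label value, mirroring
/// get_hashed_master_urls above.
fn hash_master_urls(urls: &[String]) -> String {
    let mut hasher = DefaultHasher::new();
    for url in urls {
        url.hash(&mut hasher);
    }
    hasher.finish().to_string()
}

fn main() {
    // Made-up master URLs: adding, removing or reordering one of them changes
    // the resulting label value and thereby triggers the rolling replacement
    // of the worker pods in check_worker_master_urls.
    let urls = vec!["node-1:7077".to_string(), "node-2:7077".to_string()];
    println!("{}", hash_master_urls(&urls));
}
```

Note that DefaultHasher's output is not guaranteed to be stable across Rust releases; at worst that means one extra rolling restart of the workers after the operator itself is rebuilt with a different toolchain.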