Merged
7 changes: 6 additions & 1 deletion README.adoc
@@ -75,13 +75,18 @@ There are three levels of configuration:

|logDir
|string
|The log folder for spark applications
|The log folder for spark applications (must be created by the user)
|spark.history.fs.logDirectory=logDir, spark.eventLog.enabled=true, spark.eventLog.dir=logDir;

|secret
|string
|A secret shared between nodes and required to submit applications via spark-submit
|spark.authenticate=true, spark.authenticate.secret=secret;

|maxPortRetries
|integer
|Maximum number of retries when binding to a port before giving up. When a port is given a specific (non-zero) value, each subsequent retry will increment the port used in the previous attempt by 1 before retrying. This essentially allows Spark to try a range of ports from the specified start port up to port + maxRetries.
|spark.port.maxRetries
|===
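To make the retry semantics concrete, here is a small illustrative Rust sketch (not part of the operator) of the port range Spark ends up probing for a non-zero start port:

```rust
/// Illustrative only: the ports Spark will try for a non-zero start port
/// and a given spark.port.maxRetries value.
fn candidate_ports(start_port: u16, max_retries: u16) -> Vec<u16> {
    // Spark tries start_port first, then increments by 1 per retry,
    // up to start_port + max_retries.
    (0..=max_retries).map(|offset| start_port + offset).collect()
}

fn main() {
    // With a start port of 7077 and maxPortRetries=3,
    // Spark probes 7077, 7078, 7079 and 7080 before giving up.
    assert_eq!(candidate_ports(7077, 3), vec![7077, 7078, 7079, 7080]);
}
```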

=== Node type options
2 changes: 2 additions & 0 deletions crd/sparkcluster.crd.yaml
@@ -135,6 +135,8 @@ spec:
type: string
logDir:
type: string
maxPortRetries:
type: integer
status:
nullable: true
properties:
7 changes: 3 additions & 4 deletions crd/src/lib.rs
@@ -31,6 +31,7 @@ pub struct SparkClusterSpec {
pub version: SparkVersion,
pub secret: Option<String>,
pub log_dir: Option<String>,
pub max_port_retries: Option<usize>,
}
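Since the CRD field is camelCase (`maxPortRetries`) while the Rust field is snake_case, the spec struct presumably carries a serde rename attribute. A self-contained sketch with a trimmed-down stand-in struct (serde and serde_yaml assumed as dependencies):

```rust
use serde::Deserialize;

// Trimmed-down stand-in for SparkClusterSpec; only illustrates how the
// camelCase CRD fields map onto the snake_case Rust fields.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct SpecSketch {
    log_dir: Option<String>,
    max_port_retries: Option<usize>,
}

fn main() {
    let yaml = "logDir: /tmp/spark-events\nmaxPortRetries: 0\n";
    let spec: SpecSketch = serde_yaml::from_str(yaml).unwrap();
    assert_eq!(spec.max_port_retries, Some(0));
    assert_eq!(spec.log_dir.as_deref(), Some("/tmp/spark-events"));
}
```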

impl SparkClusterSpec {
@@ -173,9 +174,9 @@ impl SparkNodeType {
/// Returns the container start command for a spark node
/// Right now works only for images using hadoop2.7
/// # Arguments
/// * `version` - current specified SparkVersion
/// * `version` - Current specified cluster version
///
pub fn get_command(&self, version: &SparkVersion) -> String {
pub fn get_command(&self, version: &str) -> String {
// TODO: remove hardcoded and adapt for versioning
format!(
"spark-{}-bin-hadoop2.7/sbin/start-{}.sh",
@@ -263,14 +264,12 @@ impl SparkVersion {
pub fn is_upgrade(&self, to: &Self) -> Result<bool, SemVerError> {
let from_version = Version::parse(&self.to_string())?;
let to_version = Version::parse(&to.to_string())?;

Ok(to_version > from_version)
}

pub fn is_downgrade(&self, to: &Self) -> Result<bool, SemVerError> {
let from_version = Version::parse(&self.to_string())?;
let to_version = Version::parse(&to.to_string())?;

Ok(to_version < from_version)
}
}
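Both helpers boil down to one semver comparison after parsing. A standalone sketch of the same logic (using the current `semver` crate, whose error type is `semver::Error` rather than the `SemVerError` used here; version strings are examples):

```rust
use semver::Version;

fn main() -> Result<(), semver::Error> {
    // Parse the two cluster versions as semantic versions, then compare.
    let from = Version::parse("2.4.7")?;
    let to = Version::parse("3.0.1")?;

    let is_upgrade = to > from; // mirrors is_upgrade
    let is_downgrade = to < from; // mirrors is_downgrade

    assert!(is_upgrade && !is_downgrade);
    Ok(())
}
```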
1 change: 1 addition & 0 deletions examples/sparkcluster.example.v3.0.1.yaml
@@ -20,3 +20,4 @@ spec:
- nodeName: "mdesktop"
instances: 1
version: "3.0.1"
maxPortRetries: 0
140 changes: 58 additions & 82 deletions operator/src/config.rs
@@ -1,11 +1,9 @@
use k8s_openapi::api::core::v1::{ConfigMapVolumeSource, EnvVar, Volume, VolumeMount};
use k8s_openapi::api::core::v1::EnvVar;
use stackable_spark_crd::{
ConfigOption, SparkClusterSpec, SparkNode, SparkNodeSelector, SparkNodeType,
};
use std::collections::HashMap;

const SPARK_URL_START: &str = "spark://";

// basic for startup
const SPARK_NO_DAEMONIZE: &str = "SPARK_NO_DAEMONIZE";
const SPARK_CONF_DIR: &str = "SPARK_CONF_DIR";
@@ -14,6 +12,7 @@ const SPARK_EVENT_LOG_ENABLED: &str = "spark.eventLog.enabled";
const SPARK_EVENT_LOG_DIR: &str = "spark.eventLog.dir";
const SPARK_AUTHENTICATE: &str = "spark.authenticate";
const SPARK_AUTHENTICATE_SECRET: &str = "spark.authenticate.secret";
const SPARK_PORT_MAX_RETRIES: &str = "spark.port.maxRetries";
// master
const SPARK_MASTER_PORT_ENV: &str = "SPARK_MASTER_PORT";
const SPARK_MASTER_PORT_CONF: &str = "spark.master.port";
@@ -29,126 +28,93 @@ const SPARK_HISTORY_STORE_PATH: &str = "spark.history.store.path";
const SPARK_HISTORY_UI_PORT: &str = "spark.history.ui.port";

/// The worker start command needs to be extended with all known master nodes and ports.
/// The required URLs are in format: 'spark://<master-node-name>:<master-port>'
/// Multiple masters are separated via ','
/// The master port can be configured and needs to be checked in config / env or general options.
/// Defaults to 7077 if no port is specified.
/// The required URLs for the starting command are in format: '<master-node-name>:<master-port>'
/// and prefixed with 'spark://'. Multiple masters are separated via ',' e.g.:
/// spark://<master-node-name-1>:<master-port-1>,<master-node-name-2>:<master-port-2>
///
/// # Arguments
/// * `node_type` - SparkNodeType (master/worker/history-server)
/// * `master` - Master SparkNode containing the required settings
///
pub fn adapt_container_command(node_type: &SparkNodeType, master: &SparkNode) -> Option<String> {
let mut master_url: String = String::new();
pub fn adapt_worker_command(node_type: &SparkNodeType, master: &SparkNode) -> Option<String> {
let mut adapted_command: String = String::new();
// only for workers
if node_type != &SparkNodeType::Worker {
return None;
}

let master_urls = get_master_urls(master);
for url in master_urls {
if adapted_command.is_empty() {
adapted_command.push_str("spark://");
} else {
adapted_command.push(',');
}
adapted_command.push_str(url.as_str());
}

Some(adapted_command)
}
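The assembly step amounts to joining the host:port pairs with commas under a single `spark://` prefix; a standalone sketch:

```rust
/// Illustrative helper: join master URLs into the worker start argument,
/// e.g. ["master-1:7077", "master-2:7078"] -> "spark://master-1:7077,master-2:7078".
fn join_master_urls(master_urls: &[String]) -> String {
    format!("spark://{}", master_urls.join(","))
}

fn main() {
    let urls = vec!["master-1:7077".to_string(), "master-2:7078".to_string()];
    assert_eq!(
        join_master_urls(&urls),
        "spark://master-1:7077,master-2:7078"
    );
}
```

Note that `join(",")` yields the same result as the loop above without the first-iteration special case.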

/// The master port can be configured and needs to be checked in config / env or general options.
/// Defaults to 7077 if no port is specified.
///
/// # Arguments
/// * `master` - Master SparkNode containing the required node_name and port settings
///
pub fn get_master_urls(master: &SparkNode) -> Vec<String> {
let mut master_urls = vec![];
// get all available master selectors
for selector in &master.selectors {
// check in conf properties and env variables for port
// conf properties have higher priority than env variables
if let Some(conf) = &selector.config {
if let Some(master) =
search_master_port(&selector.node_name, SPARK_MASTER_PORT_CONF, conf)
{
master_url.push_str(master.as_str());
if let Some(port) = get_master_port(SPARK_MASTER_PORT_CONF, conf) {
master_urls.push(create_master_url(&selector.node_name, &port.to_string()));
continue;
}
} else if let Some(env) = &selector.env {
if let Some(master) =
search_master_port(&selector.node_name, SPARK_MASTER_PORT_ENV, env)
{
master_url.push_str(master.as_str());
if let Some(port) = get_master_port(SPARK_MASTER_PORT_ENV, env) {
master_urls.push(create_master_url(&selector.node_name, &port.to_string()));
continue;
}
} else if let Some(port) = selector.master_port {
master_url
.push_str(format!("{}{}:{},", SPARK_URL_START, selector.node_name, port).as_str());
master_urls.push(create_master_url(&selector.node_name, &port.to_string()));
continue;
}

// TODO: default to default value in product conf
master_url
.push_str(format!("{}{}:{},", SPARK_URL_START, selector.node_name, "7077").as_str());
master_urls.push(create_master_url(&selector.node_name, "7077"));
}

Some(master_url)
master_urls
}
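Per selector, the port resolution follows a fixed precedence: config properties, then env variables, then the selector's `master_port`, then the 7077 default. A condensed, self-contained sketch of that order (simplified types; note the real else-if chain differs subtly in that a config section without the port entry falls through to the default, not to the env lookup):

```rust
// Simplified stand-in for the relevant selector fields.
struct SelectorSketch {
    config_port: Option<String>,
    env_port: Option<String>,
    master_port: Option<u16>,
}

/// Resolve the master port: config > env > selector port > default 7077.
fn resolve_port(selector: &SelectorSketch) -> String {
    selector
        .config_port
        .clone()
        .or_else(|| selector.env_port.clone())
        .or_else(|| selector.master_port.map(|p| p.to_string()))
        .unwrap_or_else(|| "7077".to_string())
}

fn main() {
    let selector = SelectorSketch {
        config_port: None,
        env_port: Some("7078".to_string()),
        master_port: Some(7079),
    };
    // The env variable wins over the selector port; no config entry is present.
    assert_eq!(resolve_port(&selector), "7078");
}
```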

/// Create master url in format: <node_name>:<port>
///
/// # Arguments
/// * `node_name` - Master node_name / host name
/// * `port` - Port on which the master is running
///
fn create_master_url(node_name: &str, port: &str) -> String {
format!("{}:{}", node_name, port)
}

/// Search for a master port in config properties or env variables
///
/// # Arguments
/// * `option_name` - Name of the option to look for e.g. "SPARK_MASTER_PORT"
/// * `options` - Vec of config properties or env variables
///
fn search_master_port(
node_name: &str,
option_name: &str,
options: &[ConfigOption],
) -> Option<String> {
fn get_master_port(option_name: &str, options: &[ConfigOption]) -> Option<String> {
for option in options {
if option.name == option_name {
return Some(format!(
"{}{}:{},",
SPARK_URL_START, node_name, option.value
));
return Some(option.value.clone());
}
}
None
}

const CONFIG_VOLUME: &str = "config-volume";
const EVENT_VOLUME: &str = "event-volume";

/// Create volume mounts for the spark config files and optionally an event dir for spark logs
///
/// # Arguments
/// * `log_dir` - Event/Log dir for SparkNodes. History Server reads these logs to offer metrics
///
pub fn create_volume_mounts(log_dir: &Option<String>) -> Vec<VolumeMount> {
let mut volume_mounts = vec![VolumeMount {
mount_path: "conf".to_string(),
name: CONFIG_VOLUME.to_string(),
..VolumeMount::default()
}];
// if log dir is provided, create another folder for logDir
if let Some(dir) = log_dir {
volume_mounts.push(VolumeMount {
mount_path: dir.clone(),
name: EVENT_VOLUME.to_string(),
..VolumeMount::default()
});
}

volume_mounts
}

/// Create a volume to store the spark config files and optionally an event volume for spark logs
///
/// # Arguments
/// * `configmap_name` - ConfigMap name where the required spark configuration files (spark-defaults.conf and spark-env.sh) are located
///
pub fn create_volumes(configmap_name: &str) -> Vec<Volume> {
let volumes = vec![
Volume {
name: CONFIG_VOLUME.to_string(),
config_map: Some(ConfigMapVolumeSource {
name: Some(configmap_name.to_string()),
..ConfigMapVolumeSource::default()
}),
..Volume::default()
},
Volume {
name: EVENT_VOLUME.to_string(),
..Volume::default()
},
];

volumes
}

/// SPARK_CONF_DIR and SPARK_NO_DAEMONIZE must be provided as env variables in the container.
/// SPARK_CONF_DIR must be available before the start-up of the nodes (master, worker, history-server) to point to our custom configuration.
/// SPARK_NO_DAEMONIZE prevents the node processes from being started in the background, which would cause the agent to lose track of the processes.
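The function body is collapsed in this diff; a hypothetical sketch of what it plausibly returns — the variable names match the doc comment above, the values are assumptions:

```rust
use k8s_openapi::api::core::v1::EnvVar;

/// Hypothetical sketch only: both variables must be set before the node
/// processes start; the actual values the operator uses are not shown here.
fn required_startup_env() -> Vec<EnvVar> {
    vec![
        EnvVar {
            name: "SPARK_NO_DAEMONIZE".to_string(),
            value: Some("true".to_string()), // assumption: any non-empty value
            ..EnvVar::default()
        },
        EnvVar {
            name: "SPARK_CONF_DIR".to_string(),
            value: Some("conf".to_string()), // assumed: matches the config volume mount path
            ..EnvVar::default()
        },
    ]
}
```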
@@ -173,6 +139,7 @@ pub fn create_required_startup_env() -> Vec<EnvVar> {
/// 2) from node
/// 3) from selector
/// 4) from config properties
///
/// # Arguments
/// * `spec` - SparkCluster spec for common properties
/// * `selector` - SparkClusterSelector containing desired config properties
@@ -194,6 +161,13 @@ pub fn get_config_properties(
config.insert(SPARK_AUTHENTICATE_SECRET.to_string(), secret.to_string());
}

if let Some(max_port_retries) = &spec.max_port_retries {
config.insert(
SPARK_PORT_MAX_RETRIES.to_string(),
max_port_retries.to_string(),
);
}

// history server
config.insert(
SPARK_HISTORY_FS_LOG_DIRECTORY.to_string(),
@@ -226,6 +200,7 @@ pub fn get_config_properties(
/// 2) from node
/// 3) from selector
/// 4) from environment variables
///
/// # Arguments
/// * `selector` - SparkClusterSelector containing desired env variables
///
@@ -270,6 +245,7 @@ pub fn get_env_variables(selector: &SparkNodeSelector) -> HashMap<String, String
}

/// Unroll a map into a String using a given assignment character (for writing config maps)
///
/// # Arguments
/// * `map` - Map containing option_name:option_value pairs
/// * `assignment` - Used character to assign option_value to option_name (e.g. "=", " ", ":" ...)
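The body is collapsed here; the described behavior amounts to something like this sketch:

```rust
use std::collections::HashMap;

/// Sketch of the described unrolling: one "name<assignment>value" line per
/// map entry, e.g. with "=" for spark-defaults.conf style output.
/// (HashMap iteration order is unspecified.)
fn unroll(map: &HashMap<String, String>, assignment: &str) -> String {
    let mut out = String::new();
    for (name, value) in map {
        out.push_str(&format!("{}{}{}\n", name, assignment, value));
    }
    out
}

fn main() {
    let mut map = HashMap::new();
    map.insert("spark.port.maxRetries".to_string(), "0".to_string());
    assert_eq!(unroll(&map, "="), "spark.port.maxRetries=0\n");
}
```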