diff --git a/cscs-checks/apps/spark/spark_check.py b/cscs-checks/apps/spark/spark_check.py
index 3a89e5483a..5b59176b25 100644
--- a/cscs-checks/apps/spark/spark_check.py
+++ b/cscs-checks/apps/spark/spark_check.py
@@ -3,61 +3,22 @@
 #
 # SPDX-License-Identifier: BSD-3-Clause
 
-import math
-
 import reframe as rfm
-import reframe.utility.sanity as sn
-from reframe.core.backends import getlauncher
+from hpctestlib.data_analytics.spark.compute_pi import compute_pi_check
 
 
-@rfm.simple_test
-class SparkCheck(rfm.RunOnlyRegressionTest):
-    variant = parameter(['spark', 'pyspark'])
-
-    def __init__(self):
-        self.descr = f'Simple calculation of pi with {self.variant}'
-        self.valid_systems = ['daint:gpu', 'daint:mc',
-                              'dom:gpu', 'dom:mc']
-        self.valid_prog_environs = ['builtin']
-        self.modules = ['Spark']
-        self.prerun_cmds = ['start-all.sh']
-        self.postrun_cmds = ['stop-all.sh']
-        self.num_tasks = 3
-        self.num_tasks_per_node = 1
-        pi_value = sn.extractsingle(r'Pi is roughly\s+(?P<pi>\S+)',
-                                    self.stdout, 'pi', float)
-        self.sanity_patterns = sn.assert_lt(sn.abs(pi_value - math.pi), 0.01)
-        self.maintainers = ['TM', 'RS']
-        self.tags = {'production'}
+@rfm.simple_test
+class cscs_compute_pi_check(compute_pi_check):
+    valid_systems = ['daint:gpu', 'daint:mc', 'dom:gpu', 'dom:mc']
+    valid_prog_environs = ['builtin']
+    modules = ['Spark']
+    spark_prefix = '$EBROOTSPARK'
+    executor_memory = '15g'
+    maintainers = ['TM', 'RS']
+    tags |= {'production'}
 
     @run_before('run')
-    def prepare_run(self):
-        if self.current_partition.fullname in ['daint:gpu', 'dom:gpu']:
-            num_workers = 12
-            exec_cores = 3
-        else:
-            num_workers = 36
-            exec_cores = 9
-
-        self.variables = {
-            'SPARK_WORKER_CORES': str(num_workers),
-            'SPARK_LOCAL_DIRS': '"/tmp"',
-        }
-        self.executable = 'spark-submit'
-        self.executable_opts = [
-            f'--conf spark.default.parallelism={num_workers}',
-            f'--conf spark.executor.cores={exec_cores}',
-            f'--conf spark.executor.memory=15g',
-            f'--master $SPARKURL'
-        ]
-        if self.variant == 'spark':
-            self.executable_opts += [
-                '--class org.apache.spark.examples.SparkPi',
-                '$EBROOTSPARK/examples/jars/spark-examples*.jar 10000'
-            ]
-        else:
-            self.executable_opts.append('spark_pi.py')
-
-        # The job launcher has to be changed since the `spark-submit`
-        # script is not used with srun.
-        self.job.launcher = getlauncher('local')()
+    def set_num_workers_and_cores(self):
+        self.num_workers = self.current_partition.processor.num_cores
+        self.exec_cores = self.num_workers // 4
diff --git a/hpctestlib/data_analytics/spark/compute_pi/__init__.py b/hpctestlib/data_analytics/spark/compute_pi/__init__.py
new file mode 100644
index 0000000000..be6d7c839f
--- /dev/null
+++ b/hpctestlib/data_analytics/spark/compute_pi/__init__.py
@@ -0,0 +1,128 @@
+# Copyright 2016-2021 Swiss National Supercomputing Centre (CSCS/ETH Zurich)
+# ReFrame Project Developers. See the top-level LICENSE file for details.
+#
+# SPDX-License-Identifier: BSD-3-Clause
+
+import math
+
+import reframe as rfm
+import reframe.utility.sanity as sn
+from reframe.core.backends import getlauncher
+
+
+@rfm.simple_test
+class compute_pi_check(rfm.RunOnlyRegressionTest, pin_prefix=True):
+    '''Test Apache Spark by computing pi.
+
+    Apache Spark is a unified analytics engine for large-scale data
+    processing. It provides high-level APIs in Java, Scala, Python
+    and R, and an optimized engine that supports general execution
+    graphs. It also supports a rich set of higher-level tools, including
+    Spark SQL for SQL and structured data processing, MLlib for machine
+    learning, GraphX for graph processing, and Structured Streaming for
+    incremental computation and stream processing (see spark.apache.org).
+
+    This test checks that Spark is functioning correctly: it computes an
+    approximation of pi and compares it against the value from Python's
+    math library, accepting deviations up to the configured tolerance.
+    The test assumes that Spark is already installed on the system under
+    test.
+    '''
+
+    #: Parameter encoding the variant of the test.
+    #:
+    #: :type: :class:`str`
+    #: :values: ``['spark', 'pyspark']``
+    variant = parameter(['spark', 'pyspark'])
+
+    #: The absolute tolerance on the computed value of pi.
+    #:
+    #: :type: :class:`float`
+    #: :required: No
+    #: :default: ``0.01``
+    tolerance = variable(float, value=0.01)
+
+    #: The Spark installation prefix path.
+    #:
+    #: :type: :class:`str`
+    #: :required: Yes
+    spark_prefix = variable(str)
+
+    #: The local directories used by Spark.
+    #:
+    #: :type: :class:`str`
+    #: :required: No
+    #: :default: ``'/tmp'``
+    spark_local_dirs = variable(str, value='/tmp')
+
+    #: Amount of memory to use per executor process, following the JVM
+    #: memory strings convention, i.e. a number with a size-unit suffix
+    #: ("k", "m", "g" or "t"), e.g. 512m or 2g.
+    #:
+    #: :type: :class:`str`
+    #: :required: Yes
+    executor_memory = variable(str)
+
+    #: The number of Spark workers per node.
+    #:
+    #: :type: :class:`int`
+    #: :required: No
+    #: :default: ``1``
+    num_workers = variable(int, value=1)
+
+    #: The number of cores per Spark executor.
+    #:
+    #: :type: :class:`int`
+    #: :required: No
+    #: :default: ``1``
+    exec_cores = variable(int, value=1)
+
+    num_tasks = 3
+    num_tasks_per_node = 1
+    prerun_cmds = ['start-all.sh']
+    postrun_cmds = ['stop-all.sh']
+    executable = 'spark-submit'
+    executable_opts = required
+    tags = {'data-science', 'big-data'}
+
+    @run_after('init')
+    def set_description(self):
+        self.descr = f'Simple calculation of pi with {self.variant}'
+
+    @run_before('run')
+    def set_job_launcher(self):
+        # The job launcher has to be changed, since the `spark-submit`
+        # script is not launched through srun.
+        self.job.launcher = getlauncher('local')()
+
+    @run_before('run')
+    def prepare_run(self):
+        self.variables = {
+            'SPARK_WORKER_CORES': str(self.num_workers),
+            'SPARK_LOCAL_DIRS': self.spark_local_dirs,
+        }
+        self.executable_opts = [
+            f'--conf spark.default.parallelism={self.num_workers}',
+            f'--conf spark.executor.cores={self.exec_cores}',
+            f'--conf spark.executor.memory={self.executor_memory}',
+            '--master $SPARKURL'
+        ]
+        if self.variant == 'spark':
+            self.executable_opts += [
+                '--class org.apache.spark.examples.SparkPi',
+                f'{self.spark_prefix}/examples/jars/spark-examples*.jar 10000'
+            ]
+        elif self.variant == 'pyspark':
+            self.executable_opts += ['spark_pi.py']
+
+    @sanity_function
+    def assert_pi_readout(self):
+        '''Assert that the obtained pi value is within the specified
+        tolerance.'''
+        pi_value = sn.extractsingle(r'Pi is roughly\s+(?P<pi>\S+)',
+                                    self.stdout, 'pi', float)
+        return sn.assert_lt(sn.abs(pi_value - math.pi), self.tolerance)
diff --git a/cscs-checks/apps/spark/src/spark_pi.py b/hpctestlib/data_analytics/spark/compute_pi/src/spark_pi.py
similarity index 100%
rename from cscs-checks/apps/spark/src/spark_pi.py
rename to hpctestlib/data_analytics/spark/compute_pi/src/spark_pi.py
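Note for reviewers: with this change the library class carries all of the test logic, so a site-level check reduces to configuration, as the rewritten cscs_compute_pi_check above shows. For illustration only, a minimal sketch of how a third-party site could reuse the new class; the system, environment and module names below are hypothetical placeholders, and only spark_prefix and executor_memory are required (everything else falls back to the documented defaults):

    import reframe as rfm

    from hpctestlib.data_analytics.spark.compute_pi import compute_pi_check


    @rfm.simple_test
    class site_compute_pi_check(compute_pi_check):
        valid_systems = ['mysite:compute']   # hypothetical partition name
        valid_prog_environs = ['builtin']
        modules = ['Spark']                  # hypothetical module name
        spark_prefix = '$SPARK_HOME'         # required: Spark install prefix
        executor_memory = '2g'               # required: JVM memory string
        tolerance = 0.01                     # optional: this is the default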
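Since compute_pi_check itself is decorated with @rfm.simple_test, it should also be runnable straight from the library tree, provided the two required variables are injected at instantiation time, e.g. via ReFrame's -S/--setvar option (a sketch, assuming a ReFrame version that supports -S and a SPARK_HOME environment variable pointing at the installation):

    reframe -c hpctestlib/data_analytics/spark/compute_pi \
        -S spark_prefix="$SPARK_HOME" -S executor_memory=1g -r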