diff --git a/cscs-checks/apps/spark/spark_check.py b/cscs-checks/apps/spark/spark_check.py
index 3a89e5483a..97e716f3c2 100644
--- a/cscs-checks/apps/spark/spark_check.py
+++ b/cscs-checks/apps/spark/spark_check.py
@@ -3,47 +3,29 @@
 #
 # SPDX-License-Identifier: BSD-3-Clause
 
-import math
-
 import reframe as rfm
-import reframe.utility.sanity as sn
-from reframe.core.backends import getlauncher
+from reframe.core.backends import getlauncher
+from hpctestlib.apps.spark.compute_pi import ComputePi
 
 
 @rfm.simple_test
-class SparkCheck(rfm.RunOnlyRegressionTest):
-    variant = parameter(['spark', 'pyspark'])
-
-    def __init__(self):
-        self.descr = f'Simple calculation of pi with {self.variant}'
-        self.valid_systems = ['daint:gpu', 'daint:mc',
-                              'dom:gpu', 'dom:mc']
-        self.valid_prog_environs = ['builtin']
-        self.modules = ['Spark']
-        self.prerun_cmds = ['start-all.sh']
-        self.postrun_cmds = ['stop-all.sh']
-        self.num_tasks = 3
-        self.num_tasks_per_node = 1
-        pi_value = sn.extractsingle(r'Pi is roughly\s+(?P<pi>\S+)',
-                                    self.stdout, 'pi', float)
-        self.sanity_patterns = sn.assert_lt(sn.abs(pi_value - math.pi), 0.01)
-        self.maintainers = ['TM', 'RS']
-        self.tags = {'production'}
+class SparkCheck(ComputePi):
+    valid_systems = ['daint:gpu', 'daint:mc', 'dom:gpu', 'dom:mc']
+    valid_prog_environs = ['builtin']
+    modules = ['Spark']
+    num_tasks = 3
+    num_tasks_per_node = 1
+    maintainers = ['TM', 'RS']
+    tags = {'production'}
 
     @run_before('run')
     def prepare_run(self):
-        if self.current_partition.fullname in ['daint:gpu', 'dom:gpu']:
-            num_workers = 12
-            exec_cores = 3
-        else:
-            num_workers = 36
-            exec_cores = 9
-
+        num_workers = self.current_partition.processor.num_cores
+        exec_cores = num_workers // 4
         self.variables = {
             'SPARK_WORKER_CORES': str(num_workers),
             'SPARK_LOCAL_DIRS': '"/tmp"',
         }
-        self.executable = 'spark-submit'
         self.executable_opts = [
             f'--conf spark.default.parallelism={num_workers}',
             f'--conf spark.executor.cores={exec_cores}',
@@ -55,9 +37,11 @@ def prepare_run(self):
                 '--class org.apache.spark.examples.SparkPi',
                 '$EBROOTSPARK/examples/jars/spark-examples*.jar 10000'
             ]
-        else:
-            self.executable_opts.append('spark_pi.py')
+        elif self.variant == 'pyspark':
+            self.executable_opts += ['spark_pi.py']
 
+    @run_before('run')
+    def set_job_launcher(self):
         # The job launcher has to be changed since the `spark-submit`
         # script is not used with srun.
         self.job.launcher = getlauncher('local')()
diff --git a/hpctestlib/apps/spark/compute_pi/__init__.py b/hpctestlib/apps/spark/compute_pi/__init__.py
new file mode 100644
index 0000000000..323bee711d
--- /dev/null
+++ b/hpctestlib/apps/spark/compute_pi/__init__.py
@@ -0,0 +1,48 @@
+# Copyright 2016-2021 Swiss National Supercomputing Centre (CSCS/ETH Zurich)
+# ReFrame Project Developers. See the top-level LICENSE file for details.
+#
+# SPDX-License-Identifier: BSD-3-Clause
+
+import math
+
+import reframe as rfm
+import reframe.utility.sanity as sn
+
+
+class ComputePi(rfm.RunOnlyRegressionTest, pin_prefix=True):
+    '''Base class for the Spark test.
+
+    Apache Spark is a unified analytics engine for large-scale data
+    processing. It provides high-level APIs in Java, Scala, Python
+    and R, and an optimized engine that supports general execution
+    graphs. It also supports a rich set of higher-level tools including
+    Spark SQL for SQL and structured data processing, MLlib for machine
+    learning, GraphX for graph processing, and Structured Streaming for
+    incremental computation and stream processing (see spark.apache.org).
+
+    The present abstract run-only class checks that Spark correctly
+    computes an approximation of pi. To this end, a tolerance of
+    acceptable deviation must be defined: it is used to verify that the
+    task executed correctly, by comparing the computed value of pi
+    against the one obtained from the math library. The default
+    assumption is that Spark is already installed on the system under test.
+    '''
+
+    variant = parameter(['spark', 'pyspark'])
+    tolerance = variable(float, value=0.01)
+    prerun_cmds = ['start-all.sh']
+    postrun_cmds = ['stop-all.sh']
+    executable = 'spark-submit'
+    executable_opts = required
+
+    @run_after('init')
+    def set_description(self):
+        self.descr = f'Simple calculation of pi with {self.variant}'
+
+    @sanity_function
+    def assert_pi_readout(self):
+        '''Assert that the obtained pi value is within the specified tolerance.'''
+
+        pi_value = sn.extractsingle(r'Pi is roughly\s+(?P<pi>\S+)',
+                                    self.stdout, 'pi', float)
+        return sn.assert_lt(sn.abs(pi_value - math.pi), self.tolerance)
diff --git a/cscs-checks/apps/spark/src/spark_pi.py b/hpctestlib/apps/spark/compute_pi/src/spark_pi.py
similarity index 100%
rename from cscs-checks/apps/spark/src/spark_pi.py
rename to hpctestlib/apps/spark/compute_pi/src/spark_pi.py
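Note for reviewers: with ComputePi now living in hpctestlib, a third-party site can derive its own check by supplying only site-specific settings. Below is a minimal sketch of such a derived test. The system name `mysystem:compute` and the test class name are hypothetical, and the `$EBROOTSPARK` jar path assumes Spark is provided as an EasyBuild module, as in the CSCS check above; the sanity check, tolerance and the start-all.sh/stop-all.sh hooks all come from the base class.

    import reframe as rfm
    from reframe.core.backends import getlauncher
    from hpctestlib.apps.spark.compute_pi import ComputePi


    @rfm.simple_test
    class MySparkCheck(ComputePi):
        # Site-specific resources; everything else is inherited from
        # ComputePi. `mysystem:compute` is a placeholder partition name.
        valid_systems = ['mysystem:compute']
        valid_prog_environs = ['builtin']
        modules = ['Spark']
        num_tasks = 2
        num_tasks_per_node = 1

        @run_before('run')
        def set_executable_opts(self):
            # `executable_opts` is declared `required` in ComputePi, so
            # every derived test must set it before the run phase.
            if self.variant == 'spark':
                self.executable_opts = [
                    '--class org.apache.spark.examples.SparkPi',
                    '$EBROOTSPARK/examples/jars/spark-examples*.jar 10000'
                ]
            else:
                # `spark_pi.py` resolves against the library's src/
                # directory because the base class pins its prefix.
                self.executable_opts = ['spark_pi.py']

            # spark-submit manages its own process placement, so it must
            # not be wrapped in the parallel launcher (e.g. srun).
            self.job.launcher = getlauncher('local')()

Such a file would be picked up with the usual `reframe -c <path-to-test> -r` invocation against the site's own configuration.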