From 831665c2d434fd5f89a6f03b1544b4c78f7bdc16 Mon Sep 17 00:00:00 2001 From: Theofilos Manitaras Date: Thu, 7 Oct 2021 17:09:43 +0200 Subject: [PATCH 1/8] Dummy CI reproducer for Spark Check --- cscs-checks/apps/spark/spark_check.py | 1 + 1 file changed, 1 insertion(+) diff --git a/cscs-checks/apps/spark/spark_check.py b/cscs-checks/apps/spark/spark_check.py index 3a89e5483a..791b739b5f 100644 --- a/cscs-checks/apps/spark/spark_check.py +++ b/cscs-checks/apps/spark/spark_check.py @@ -9,6 +9,7 @@ import reframe.utility.sanity as sn from reframe.core.backends import getlauncher +# Dummy comment to trigger CI @rfm.simple_test class SparkCheck(rfm.RunOnlyRegressionTest): From d11fc6f8268473a4a351c6b2f2225983652655c4 Mon Sep 17 00:00:00 2001 From: Theofilos Manitaras Date: Thu, 7 Oct 2021 18:00:28 +0200 Subject: [PATCH 2/8] Create base test class and modernize tests --- cscs-checks/apps/spark/spark_check.py | 49 +++++++------------- hpctestlib/apps/spark/compute_pi/__init__.py | 48 +++++++++++++++++++ 2 files changed, 64 insertions(+), 33 deletions(-) create mode 100644 hpctestlib/apps/spark/compute_pi/__init__.py diff --git a/cscs-checks/apps/spark/spark_check.py b/cscs-checks/apps/spark/spark_check.py index 791b739b5f..97e716f3c2 100644 --- a/cscs-checks/apps/spark/spark_check.py +++ b/cscs-checks/apps/spark/spark_check.py @@ -3,48 +3,29 @@ # # SPDX-License-Identifier: BSD-3-Clause -import math - import reframe as rfm -import reframe.utility.sanity as sn -from reframe.core.backends import getlauncher -# Dummy comment to trigger CI +from reframe.core.backends import getlauncher +from hpctestlib.apps.spark.compute_pi import ComputePi @rfm.simple_test -class SparkCheck(rfm.RunOnlyRegressionTest): - variant = parameter(['spark', 'pyspark']) - - def __init__(self): - self.descr = f'Simple calculation of pi with {self.variant}' - self.valid_systems = ['daint:gpu', 'daint:mc', - 'dom:gpu', 'dom:mc'] - self.valid_prog_environs = ['builtin'] - self.modules = ['Spark'] - self.prerun_cmds = ['start-all.sh'] - self.postrun_cmds = ['stop-all.sh'] - self.num_tasks = 3 - self.num_tasks_per_node = 1 - pi_value = sn.extractsingle(r'Pi is roughly\s+(?P\S+)', - self.stdout, 'pi', float) - self.sanity_patterns = sn.assert_lt(sn.abs(pi_value - math.pi), 0.01) - self.maintainers = ['TM', 'RS'] - self.tags = {'production'} +class SparkCheck(ComputePi): + valid_systems = ['daint:gpu', 'daint:mc', 'dom:gpu', 'dom:mc'] + valid_prog_environs = ['builtin'] + modules = ['Spark'] + num_tasks = 3 + num_tasks_per_node = 1 + maintainers = ['TM', 'RS'] + tags = {'production'} @run_before('run') def prepare_run(self): - if self.current_partition.fullname in ['daint:gpu', 'dom:gpu']: - num_workers = 12 - exec_cores = 3 - else: - num_workers = 36 - exec_cores = 9 - + num_workers = self.current_partition.processor.num_cores + exec_cores = num_workers // 4 self.variables = { 'SPARK_WORKER_CORES': str(num_workers), 'SPARK_LOCAL_DIRS': '"/tmp"', } - self.executable = 'spark-submit' self.executable_opts = [ f'--conf spark.default.parallelism={num_workers}', f'--conf spark.executor.cores={exec_cores}', @@ -56,9 +37,11 @@ def prepare_run(self): '--class org.apache.spark.examples.SparkPi', '$EBROOTSPARK/examples/jars/spark-examples*.jar 10000' ] - else: - self.executable_opts.append('spark_pi.py') + elif self.variant == 'pyspark': + self.executable_opts += ['spark_pi.py'] + @run_before('run') + def set_job_launcher(self): # The job launcher has to be changed since the `spark-submit` # script is not used with srun. self.job.launcher = getlauncher('local')() diff --git a/hpctestlib/apps/spark/compute_pi/__init__.py b/hpctestlib/apps/spark/compute_pi/__init__.py new file mode 100644 index 0000000000..323bee711d --- /dev/null +++ b/hpctestlib/apps/spark/compute_pi/__init__.py @@ -0,0 +1,48 @@ +# Copyright 2016-2021 Swiss National Supercomputing Centre (CSCS/ETH Zurich) +# ReFrame Project Developers. See the top-level LICENSE file for details. +# +# SPDX-License-Identifier: BSD-3-Clause + +import math + +import reframe as rfm +import reframe.utility.sanity as sn + + +class ComputePi(rfm.RunOnlyRegressionTest, pin_prefix=True): + '''Base class for the Spark Test. + + Apache Spark is a unified analytics engine for large-scale data + processing. It provides high-level APIs in Java, Scala, Python + and R, and an optimized engine that supports general execution + graphs. It also supports a rich set of higher-level tools including + Spark SQL for SQL and structured data processing, MLlib for machine + learning, GraphX for graph processing, and Structured Streaming for + incremental computation and stream processing (see spark.apache.org). + + The present abstract run-only class checks the Spark perfomance. + To do this, it is necessary to define the tolerance of acceptable + deviation. The tolerance is used to check if the task executed correctly, + comparing the value of pi calculated to the one obtained from the math + library. The default assumption is that Spark is already installed on the + system under test. + ''' + + variant = parameter(['spark', 'pyspark']) + tolerance = variable(float, value=0.01) + prerun_cmds = ['start-all.sh'] + postrun_cmds = ['stop-all.sh'] + executable = 'spark-submit' + executable_opts = required + + @run_after('init') + def set_description(self): + self.mydescr = f'Simple calculation of pi with {self.variant}' + + @sanity_function + def assert_pi_readout(self): + '''Assert that the obtained pi value meets the specified tolerances.''' + + pi_value = sn.extractsingle(r'Pi is roughly\s+(?P\S+)', + self.stdout, 'pi', float) + return sn.assert_lt(sn.abs(pi_value - math.pi), self.tolerance) From 32463a940dd306c6820587a23d8a141f186ef77e Mon Sep 17 00:00:00 2001 From: Theofilos Manitaras Date: Fri, 8 Oct 2021 09:53:35 +0200 Subject: [PATCH 3/8] Move the pyspark src to the base test --- .../spark => hpctestlib/apps/spark/compute_pi}/src/spark_pi.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename {cscs-checks/apps/spark => hpctestlib/apps/spark/compute_pi}/src/spark_pi.py (100%) diff --git a/cscs-checks/apps/spark/src/spark_pi.py b/hpctestlib/apps/spark/compute_pi/src/spark_pi.py similarity index 100% rename from cscs-checks/apps/spark/src/spark_pi.py rename to hpctestlib/apps/spark/compute_pi/src/spark_pi.py From 16e481881be05ceffb561016ef4c841b02488512 Mon Sep 17 00:00:00 2001 From: Theofilos Manitaras Date: Fri, 8 Oct 2021 14:59:56 +0200 Subject: [PATCH 4/8] Document class variables --- hpctestlib/apps/spark/compute_pi/__init__.py | 26 ++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/hpctestlib/apps/spark/compute_pi/__init__.py b/hpctestlib/apps/spark/compute_pi/__init__.py index 323bee711d..014a996566 100644 --- a/hpctestlib/apps/spark/compute_pi/__init__.py +++ b/hpctestlib/apps/spark/compute_pi/__init__.py @@ -28,11 +28,37 @@ class ComputePi(rfm.RunOnlyRegressionTest, pin_prefix=True): system under test. ''' + #: Parameter encoding the variant of the test. + #: + #: :type:`str` + #: :values: ``['spark', 'pyspark']`` variant = parameter(['spark', 'pyspark']) + + #: The absolute tolerance of the computed value of PI + #: + #: :type: :class:`float` + #: :required: No + #: :default: `0.01` tolerance = variable(float, value=0.01) + + #: See :attr:`~reframe.core.pipeline.RegressionTest.prerun_cmds`. + #: + #: :required: No prerun_cmds = ['start-all.sh'] + + #: See :attr:`~reframe.core.pipeline.RegressionTest.prerun_cmds`. + #: + #: :required: No postrun_cmds = ['stop-all.sh'] + + #: See :attr:`~reframe.core.pipeline.RegressionTest.executable`. + #: + #: :required: No executable = 'spark-submit' + + #: See :attr:`~reframe.core.pipeline.RegressionTest.executable_opts`. + #: + #: :required: Yes executable_opts = required @run_after('init') From f78c7a417e345c51c5c9ae177c730d9094b83489 Mon Sep 17 00:00:00 2001 From: Theofilos Manitaras Date: Mon, 11 Oct 2021 18:13:28 +0200 Subject: [PATCH 5/8] Try adding more configuration options to Spark --- cscs-checks/apps/spark/spark_check.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cscs-checks/apps/spark/spark_check.py b/cscs-checks/apps/spark/spark_check.py index 97e716f3c2..7d0ffd01ea 100644 --- a/cscs-checks/apps/spark/spark_check.py +++ b/cscs-checks/apps/spark/spark_check.py @@ -30,6 +30,10 @@ def prepare_run(self): f'--conf spark.default.parallelism={num_workers}', f'--conf spark.executor.cores={exec_cores}', f'--conf spark.executor.memory=15g', + f'--conf spark.rdd.compress=false', + f'--conf spark.shuffle.compress=false', + f'--conf spark.broadcast.compress=false', + f'--conf spark.hadoop.dfs.replication=1', f'--master $SPARKURL' ] if self.variant == 'spark': From 7bcc9bf8e1f202f571d50d5603150cb784883b19 Mon Sep 17 00:00:00 2001 From: Theofilos Manitaras Date: Tue, 2 Nov 2021 16:33:31 +0100 Subject: [PATCH 6/8] Generalize the Spark tests and add variables --- cscs-checks/apps/spark/spark_check.py | 43 ++-------- hpctestlib/apps/spark/compute_pi/__init__.py | 85 ++++++++++++++++---- 2 files changed, 77 insertions(+), 51 deletions(-) diff --git a/cscs-checks/apps/spark/spark_check.py b/cscs-checks/apps/spark/spark_check.py index 7d0ffd01ea..07acad7c38 100644 --- a/cscs-checks/apps/spark/spark_check.py +++ b/cscs-checks/apps/spark/spark_check.py @@ -6,46 +6,19 @@ import reframe as rfm from reframe.core.backends import getlauncher -from hpctestlib.apps.spark.compute_pi import ComputePi +from hpctestlib.apps.spark.compute_pi import compute_pi @rfm.simple_test -class SparkCheck(ComputePi): +class cscs_compute_pi_check(compute_pi): valid_systems = ['daint:gpu', 'daint:mc', 'dom:gpu', 'dom:mc'] valid_prog_environs = ['builtin'] modules = ['Spark'] - num_tasks = 3 - num_tasks_per_node = 1 + spark_prefix = '$EBROOTSPARK' + executor_memory = '15g' maintainers = ['TM', 'RS'] - tags = {'production'} + tags |= {'production'} @run_before('run') - def prepare_run(self): - num_workers = self.current_partition.processor.num_cores - exec_cores = num_workers // 4 - self.variables = { - 'SPARK_WORKER_CORES': str(num_workers), - 'SPARK_LOCAL_DIRS': '"/tmp"', - } - self.executable_opts = [ - f'--conf spark.default.parallelism={num_workers}', - f'--conf spark.executor.cores={exec_cores}', - f'--conf spark.executor.memory=15g', - f'--conf spark.rdd.compress=false', - f'--conf spark.shuffle.compress=false', - f'--conf spark.broadcast.compress=false', - f'--conf spark.hadoop.dfs.replication=1', - f'--master $SPARKURL' - ] - if self.variant == 'spark': - self.executable_opts += [ - '--class org.apache.spark.examples.SparkPi', - '$EBROOTSPARK/examples/jars/spark-examples*.jar 10000' - ] - elif self.variant == 'pyspark': - self.executable_opts += ['spark_pi.py'] - - @run_before('run') - def set_job_launcher(self): - # The job launcher has to be changed since the `spark-submit` - # script is not used with srun. - self.job.launcher = getlauncher('local')() + def set_num_workers_and_cores(self): + self.num_workers = self.current_partition.processor.num_cores + self.exec_cores = self.num_workers // 4 diff --git a/hpctestlib/apps/spark/compute_pi/__init__.py b/hpctestlib/apps/spark/compute_pi/__init__.py index 014a996566..ff53ad734e 100644 --- a/hpctestlib/apps/spark/compute_pi/__init__.py +++ b/hpctestlib/apps/spark/compute_pi/__init__.py @@ -5,12 +5,16 @@ import math + import reframe as rfm import reframe.utility.sanity as sn +from reframe.core.backends import getlauncher + -class ComputePi(rfm.RunOnlyRegressionTest, pin_prefix=True): - '''Base class for the Spark Test. +@rfm.simple_test +class compute_pi(rfm.RunOnlyRegressionTest, pin_prefix=True): + '''Test Apache Spark by computing PI. Apache Spark is a unified analytics engine for large-scale data processing. It provides high-level APIs in Java, Scala, Python @@ -20,12 +24,12 @@ class ComputePi(rfm.RunOnlyRegressionTest, pin_prefix=True): learning, GraphX for graph processing, and Structured Streaming for incremental computation and stream processing (see spark.apache.org). - The present abstract run-only class checks the Spark perfomance. + The present class check that Spark is functioning correctly. To do this, it is necessary to define the tolerance of acceptable - deviation. The tolerance is used to check if the task executed correctly, - comparing the value of pi calculated to the one obtained from the math - library. The default assumption is that Spark is already installed on the - system under test. + deviation. The tolerance is used to check that the computations is + executed correctly, by comparing the value of PI calculated to the one + obtained from the math library. The default assumption is that Spark is + already installed on the system under test. ''' #: Parameter encoding the variant of the test. @@ -41,30 +45,79 @@ class ComputePi(rfm.RunOnlyRegressionTest, pin_prefix=True): #: :default: `0.01` tolerance = variable(float, value=0.01) - #: See :attr:`~reframe.core.pipeline.RegressionTest.prerun_cmds`. + #: The Spark installation prefix path #: - #: :required: No - prerun_cmds = ['start-all.sh'] + #: :type: :class:`str` + #: :required: Yes + spark_prefix = variable(str) - #: See :attr:`~reframe.core.pipeline.RegressionTest.prerun_cmds`. + #: The local directories used by Spark #: + #: :type: :class:`str` #: :required: No - postrun_cmds = ['stop-all.sh'] + #: :default: `'/tmp'` + spark_local_dirs = variable(str, value='/tmp') + + #: Amount of memory to use per executor process, following the JVM memory + #: strings convention, i.e a number with a size unit suffix + #: ("k", "m", "g" or "t") (e.g. 512m, 2g) + #: + #: :type: :class:`str` + #: :required: Yes + executor_memory = variable(str) - #: See :attr:`~reframe.core.pipeline.RegressionTest.executable`. + #: The number of Spark workers per node #: + #: :type: :class:`int` #: :required: No - executable = 'spark-submit' + #: :default: `1` + num_workers = variable(int, value=1) - #: See :attr:`~reframe.core.pipeline.RegressionTest.executable_opts`. + #: The number of cores per each Spark executor #: - #: :required: Yes + #: :type: :class:`int` + #: :required: No + #: :default: `1` + exec_cores = variable(int, value=1) + + num_tasks = 3 + num_tasks_per_node = 1 + prerun_cmds = ['start-all.sh'] + postrun_cmds = ['stop-all.sh'] + executable = 'spark-submit' executable_opts = required + tags = {'data-science', 'big-data'} @run_after('init') def set_description(self): self.mydescr = f'Simple calculation of pi with {self.variant}' + @run_before('run') + def set_job_launcher(self): + # The job launcher has to be changed since the `spark-submit` + # script is not used with srun. + self.job.launcher = getlauncher('local')() + + @run_before('run') + def prepare_run(self): + self.variables = { + 'SPARK_WORKER_CORES': str(self.num_workers), + 'SPARK_LOCAL_DIRS': self.spark_local_dirs, + } + self.executable_opts = [ + f'--conf spark.default.parallelism={self.num_workers}', + f'--conf spark.executor.cores={self.exec_cores}', + f'--conf spark.executor.memory={self.executor_memory}', + f'--master $SPARKURL' + ] + if self.variant == 'spark': + self.executable_opts += [ + f'--class org.apache.spark.examples.SparkPi', + f'{self.spark_prefix}/examples/jars/spark-examples*.jar 10000' + ] + elif self.variant == 'pyspark': + self.executable_opts += ['spark_pi.py'] + @sanity_function def assert_pi_readout(self): '''Assert that the obtained pi value meets the specified tolerances.''' From 443226ffdf4000e9efa1649abea9dbfbd087ea2c Mon Sep 17 00:00:00 2001 From: Theofilos Manitaras Date: Tue, 2 Nov 2021 16:42:12 +0100 Subject: [PATCH 7/8] Remove unused imports --- cscs-checks/apps/spark/spark_check.py | 1 - 1 file changed, 1 deletion(-) diff --git a/cscs-checks/apps/spark/spark_check.py b/cscs-checks/apps/spark/spark_check.py index 07acad7c38..668642ca0a 100644 --- a/cscs-checks/apps/spark/spark_check.py +++ b/cscs-checks/apps/spark/spark_check.py @@ -5,7 +5,6 @@ import reframe as rfm -from reframe.core.backends import getlauncher from hpctestlib.apps.spark.compute_pi import compute_pi @rfm.simple_test From a91a7780df9585867edbb3b3f8d1b696f1501731 Mon Sep 17 00:00:00 2001 From: Vasileios Karakasis Date: Mon, 8 Nov 2021 23:54:45 +0100 Subject: [PATCH 8/8] Minor style enhancements and fixes Also moved Spark library test under a new folder. --- cscs-checks/apps/spark/spark_check.py | 5 +++-- .../spark/compute_pi/__init__.py | 15 ++++++++------- .../spark/compute_pi/src/spark_pi.py | 0 3 files changed, 11 insertions(+), 9 deletions(-) rename hpctestlib/{apps => data_analytics}/spark/compute_pi/__init__.py (89%) rename hpctestlib/{apps => data_analytics}/spark/compute_pi/src/spark_pi.py (100%) diff --git a/cscs-checks/apps/spark/spark_check.py b/cscs-checks/apps/spark/spark_check.py index 668642ca0a..5b59176b25 100644 --- a/cscs-checks/apps/spark/spark_check.py +++ b/cscs-checks/apps/spark/spark_check.py @@ -5,10 +5,11 @@ import reframe as rfm -from hpctestlib.apps.spark.compute_pi import compute_pi +from hpctestlib.data_analytics.spark.compute_pi import compute_pi_check + @rfm.simple_test -class cscs_compute_pi_check(compute_pi): +class cscs_compute_pi_check(compute_pi_check): valid_systems = ['daint:gpu', 'daint:mc', 'dom:gpu', 'dom:mc'] valid_prog_environs = ['builtin'] modules = ['Spark'] diff --git a/hpctestlib/apps/spark/compute_pi/__init__.py b/hpctestlib/data_analytics/spark/compute_pi/__init__.py similarity index 89% rename from hpctestlib/apps/spark/compute_pi/__init__.py rename to hpctestlib/data_analytics/spark/compute_pi/__init__.py index ff53ad734e..be6d7c839f 100644 --- a/hpctestlib/apps/spark/compute_pi/__init__.py +++ b/hpctestlib/data_analytics/spark/compute_pi/__init__.py @@ -13,7 +13,7 @@ @rfm.simple_test -class compute_pi(rfm.RunOnlyRegressionTest, pin_prefix=True): +class compute_pi_check(rfm.RunOnlyRegressionTest, pin_prefix=True): '''Test Apache Spark by computing PI. Apache Spark is a unified analytics engine for large-scale data @@ -24,12 +24,13 @@ class compute_pi(rfm.RunOnlyRegressionTest, pin_prefix=True): learning, GraphX for graph processing, and Structured Streaming for incremental computation and stream processing (see spark.apache.org). - The present class check that Spark is functioning correctly. - To do this, it is necessary to define the tolerance of acceptable - deviation. The tolerance is used to check that the computations is - executed correctly, by comparing the value of PI calculated to the one - obtained from the math library. The default assumption is that Spark is - already installed on the system under test. + This test checks that Spark is functioning correctly. To do this, it is + necessary to define the tolerance of acceptable deviation. The tolerance + is used to check that the computations are executed correctly, by + comparing the value of pi calculated to the one obtained from the math + library. The default assumption is that Spark is already installed on the + system under test. + ''' #: Parameter encoding the variant of the test. diff --git a/hpctestlib/apps/spark/compute_pi/src/spark_pi.py b/hpctestlib/data_analytics/spark/compute_pi/src/spark_pi.py similarity index 100% rename from hpctestlib/apps/spark/compute_pi/src/spark_pi.py rename to hpctestlib/data_analytics/spark/compute_pi/src/spark_pi.py