65 changes: 13 additions & 52 deletions cscs-checks/apps/spark/spark_check.py
@@ -3,61 +3,22 @@
#
# SPDX-License-Identifier: BSD-3-Clause

import math

import reframe as rfm
import reframe.utility.sanity as sn
from reframe.core.backends import getlauncher

from hpctestlib.data_analytics.spark.compute_pi import compute_pi_check

@rfm.simple_test
class SparkCheck(rfm.RunOnlyRegressionTest):
variant = parameter(['spark', 'pyspark'])

def __init__(self):
self.descr = f'Simple calculation of pi with {self.variant}'
self.valid_systems = ['daint:gpu', 'daint:mc',
'dom:gpu', 'dom:mc']
self.valid_prog_environs = ['builtin']
self.modules = ['Spark']
self.prerun_cmds = ['start-all.sh']
self.postrun_cmds = ['stop-all.sh']
self.num_tasks = 3
self.num_tasks_per_node = 1
pi_value = sn.extractsingle(r'Pi is roughly\s+(?P<pi>\S+)',
self.stdout, 'pi', float)
self.sanity_patterns = sn.assert_lt(sn.abs(pi_value - math.pi), 0.01)
self.maintainers = ['TM', 'RS']
self.tags = {'production'}
@rfm.simple_test
class cscs_compute_pi_check(compute_pi_check):
valid_systems = ['daint:gpu', 'daint:mc', 'dom:gpu', 'dom:mc']
valid_prog_environs = ['builtin']
modules = ['Spark']
spark_prefix = '$EBROOTSPARK'
executor_memory = '15g'
maintainers = ['TM', 'RS']
tags |= {'production'}

@run_before('run')
def prepare_run(self):
if self.current_partition.fullname in ['daint:gpu', 'dom:gpu']:
num_workers = 12
exec_cores = 3
else:
num_workers = 36
exec_cores = 9

self.variables = {
'SPARK_WORKER_CORES': str(num_workers),
'SPARK_LOCAL_DIRS': '"/tmp"',
}
self.executable = 'spark-submit'
self.executable_opts = [
f'--conf spark.default.parallelism={num_workers}',
f'--conf spark.executor.cores={exec_cores}',
f'--conf spark.executor.memory=15g',
f'--master $SPARKURL'
]
if self.variant == 'spark':
self.executable_opts += [
'--class org.apache.spark.examples.SparkPi',
'$EBROOTSPARK/examples/jars/spark-examples*.jar 10000'
]
else:
self.executable_opts.append('spark_pi.py')

# The job launcher has to be changed since the `spark-submit`
# script is not used with srun.
self.job.launcher = getlauncher('local')()
def set_num_workers_and_cores(self):
self.num_workers = self.current_partition.processor.num_cores
self.exec_cores = self.num_workers // 4
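For reference, on a 36-core multicore node (the figure previously hard-coded for daint:mc) this hook yields num_workers == 36 and exec_cores == 9, and the prepare_run hook of the new library test below then assembles roughly the following options. This is a sketch under that core-count assumption, not output captured from a run:

# Resolved values on an assumed 36-core node, with executor_memory = '15g':
executable = 'spark-submit'
executable_opts = [
    '--conf spark.default.parallelism=36',   # num_workers
    '--conf spark.executor.cores=9',         # exec_cores = 36 // 4
    '--conf spark.executor.memory=15g',      # executor_memory
    '--master $SPARKURL',
]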
128 changes: 128 additions & 0 deletions hpctestlib/data_analytics/spark/compute_pi/__init__.py
@@ -0,0 +1,128 @@
# Copyright 2016-2021 Swiss National Supercomputing Centre (CSCS/ETH Zurich)
# ReFrame Project Developers. See the top-level LICENSE file for details.
#
# SPDX-License-Identifier: BSD-3-Clause

import math


import reframe as rfm
import reframe.utility.sanity as sn

from reframe.core.backends import getlauncher


@rfm.simple_test
class compute_pi_check(rfm.RunOnlyRegressionTest, pin_prefix=True):
'''Test Apache Spark by computing PI.

Apache Spark is a unified analytics engine for large-scale data
processing. It provides high-level APIs in Java, Scala, Python
and R, and an optimized engine that supports general execution
graphs. It also supports a rich set of higher-level tools including
Spark SQL for SQL and structured data processing, MLlib for machine
learning, GraphX for graph processing, and Structured Streaming for
incremental computation and stream processing (see spark.apache.org).

This test checks that Spark is functioning correctly. To do this, a
tolerance of acceptable deviation is defined. The tolerance is used to
check that the computation is executed correctly, by comparing the
computed value of pi against the value obtained from the math library.
The default assumption is that Spark is already installed on the system
under test.

'''

#: Parameter encoding the variant of the test.
#:
#: :type:`str`
#: :values: ``['spark', 'pyspark']``
variant = parameter(['spark', 'pyspark'])

#: The absolute tolerance of the computed value of PI
#:
#: :type: :class:`float`
#: :required: No
#: :default: `0.01`
tolerance = variable(float, value=0.01)

#: The Spark installation prefix path
#:
#: :type: :class:`str`
#: :required: Yes
spark_prefix = variable(str)

#: The local directories used by Spark
#:
#: :type: :class:`str`
#: :required: No
#: :default: `'/tmp'`
spark_local_dirs = variable(str, value='/tmp')

#: Amount of memory to use per executor process, following the JVM memory
#: strings convention, i.e. a number with a size unit suffix
#: ("k", "m", "g" or "t") (e.g. 512m, 2g)
#:
#: :type: :class:`str`
#: :required: Yes
executor_memory = variable(str)

#: The number of Spark workers per node
#:
#: :type: :class:`int`
#: :required: No
#: :default: `1`
num_workers = variable(int, value=1)

#: The number of cores per each Spark executor
#:
#: :type: :class:`int`
#: :required: No
#: :default: `1`
exec_cores = variable(int, value=1)

num_tasks = 3
num_tasks_per_node = 1
prerun_cmds = ['start-all.sh']
postrun_cmds = ['stop-all.sh']
executable = 'spark-submit'
executable_opts = required
tags = {'data-science', 'big-data'}

@run_after('init')
def set_description(self):
self.descr = f'Simple calculation of pi with {self.variant}'

@run_before('run')
def set_job_launcher(self):
# The job launcher has to be changed since the `spark-submit`
# script is not used with srun.
self.job.launcher = getlauncher('local')()

@run_before('run')
def prepare_run(self):
self.variables = {
'SPARK_WORKER_CORES': str(self.num_workers),
'SPARK_LOCAL_DIRS': self.spark_local_dirs,
}
self.executable_opts = [
f'--conf spark.default.parallelism={self.num_workers}',
f'--conf spark.executor.cores={self.exec_cores}',
f'--conf spark.executor.memory={self.executor_memory}',
f'--master $SPARKURL'
]
if self.variant == 'spark':
self.executable_opts += [
'--class org.apache.spark.examples.SparkPi',
f'{self.spark_prefix}/examples/jars/spark-examples*.jar 10000'
]
elif self.variant == 'pyspark':
self.executable_opts += ['spark_pi.py']

@sanity_function
def assert_pi_readout(self):
'''Assert that the obtained pi value meets the specified tolerances.'''

pi_value = sn.extractsingle(r'Pi is roughly\s+(?P<pi>\S+)',
self.stdout, 'pi', float)
return sn.assert_lt(sn.abs(pi_value - math.pi), self.tolerance)
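Since the only required variables are spark_prefix and executor_memory, reusing this library test on another system takes just a small subclass; a minimal sketch with hypothetical system, partition, and prefix names:

import reframe as rfm

from hpctestlib.data_analytics.spark.compute_pi import compute_pi_check


@rfm.simple_test
class example_compute_pi_check(compute_pi_check):
    # Hypothetical system/partition and environment names, for illustration only
    valid_systems = ['mycluster:compute']
    valid_prog_environs = ['builtin']
    modules = ['Spark']

    # Required variables of compute_pi_check
    spark_prefix = '/opt/spark'   # assumed installation prefix
    executor_memory = '2g'        # JVM-style memory string (e.g. 512m, 2g)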