Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 16 additions & 32 deletions cscs-checks/apps/spark/spark_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,47 +3,29 @@
#
# SPDX-License-Identifier: BSD-3-Clause

import math

import reframe as rfm
import reframe.utility.sanity as sn
from reframe.core.backends import getlauncher

from reframe.core.backends import getlauncher
from hpctestlib.apps.spark.compute_pi import ComputePi

@rfm.simple_test
class SparkCheck(rfm.RunOnlyRegressionTest):
variant = parameter(['spark', 'pyspark'])

def __init__(self):
self.descr = f'Simple calculation of pi with {self.variant}'
self.valid_systems = ['daint:gpu', 'daint:mc',
'dom:gpu', 'dom:mc']
self.valid_prog_environs = ['builtin']
self.modules = ['Spark']
self.prerun_cmds = ['start-all.sh']
self.postrun_cmds = ['stop-all.sh']
self.num_tasks = 3
self.num_tasks_per_node = 1
pi_value = sn.extractsingle(r'Pi is roughly\s+(?P<pi>\S+)',
self.stdout, 'pi', float)
self.sanity_patterns = sn.assert_lt(sn.abs(pi_value - math.pi), 0.01)
self.maintainers = ['TM', 'RS']
self.tags = {'production'}
class SparkCheck(ComputePi):
valid_systems = ['daint:gpu', 'daint:mc', 'dom:gpu', 'dom:mc']
valid_prog_environs = ['builtin']
modules = ['Spark']
num_tasks = 3
num_tasks_per_node = 1
maintainers = ['TM', 'RS']
tags = {'production'}

@run_before('run')
def prepare_run(self):
if self.current_partition.fullname in ['daint:gpu', 'dom:gpu']:
num_workers = 12
exec_cores = 3
else:
num_workers = 36
exec_cores = 9

num_workers = self.current_partition.processor.num_cores
exec_cores = num_workers // 4
self.variables = {
'SPARK_WORKER_CORES': str(num_workers),
'SPARK_LOCAL_DIRS': '"/tmp"',
}
self.executable = 'spark-submit'
self.executable_opts = [
f'--conf spark.default.parallelism={num_workers}',
f'--conf spark.executor.cores={exec_cores}',
Expand All @@ -55,9 +37,11 @@ def prepare_run(self):
'--class org.apache.spark.examples.SparkPi',
'$EBROOTSPARK/examples/jars/spark-examples*.jar 10000'
]
else:
self.executable_opts.append('spark_pi.py')
elif self.variant == 'pyspark':
self.executable_opts += ['spark_pi.py']

@run_before('run')
def set_job_launcher(self):
    # `spark-submit` orchestrates the work distribution itself, so
    # the executable must not be wrapped in a parallel launcher such
    # as srun; force the plain local launcher for this job instead.
    local_launcher_cls = getlauncher('local')
    self.job.launcher = local_launcher_cls()
48 changes: 48 additions & 0 deletions hpctestlib/apps/spark/compute_pi/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Copyright 2016-2021 Swiss National Supercomputing Centre (CSCS/ETH Zurich)
# ReFrame Project Developers. See the top-level LICENSE file for details.
#
# SPDX-License-Identifier: BSD-3-Clause

import math

import reframe as rfm
import reframe.utility.sanity as sn


class ComputePi(rfm.RunOnlyRegressionTest, pin_prefix=True):
    '''Base class for the Spark compute-pi check.

    Apache Spark is a unified analytics engine for large-scale data
    processing. It provides high-level APIs in Java, Scala, Python
    and R, and an optimized engine that supports general execution
    graphs. It also supports a rich set of higher-level tools including
    Spark SQL for SQL and structured data processing, MLlib for machine
    learning, GraphX for graph processing, and Structured Streaming for
    incremental computation and stream processing (see spark.apache.org).

    The present abstract run-only class checks the Spark performance.
    To do this, it is necessary to define the tolerance of acceptable
    deviation. The tolerance is used to check if the task executed
    correctly, comparing the value of pi calculated to the one obtained
    from the math library. The default assumption is that Spark is
    already installed on the system under test.
    '''

    #: Which example implementation to run: the Scala-based ``spark``
    #: one or the Python-based ``pyspark`` one.
    variant = parameter(['spark', 'pyspark'])

    #: Maximum acceptable absolute deviation of the computed value
    #: from ``math.pi``.
    tolerance = variable(float, value=0.01)

    # Bring the Spark standalone cluster up before the run and tear it
    # down afterwards.
    prerun_cmds = ['start-all.sh']
    postrun_cmds = ['stop-all.sh']
    executable = 'spark-submit'
    # Derived tests must supply the submit options (and the job to run).
    executable_opts = required

    @run_after('init')
    def set_description(self):
        # Bug fix: the description must go to ``descr`` — the ReFrame
        # description attribute; the original assigned ``mydescr``,
        # which ReFrame ignores, so the description was never shown.
        self.descr = f'Simple calculation of pi with {self.variant}'

    @sanity_function
    def assert_pi_readout(self):
        '''Assert that the obtained pi value meets the specified tolerance.'''
        pi_value = sn.extractsingle(r'Pi is roughly\s+(?P<pi>\S+)',
                                    self.stdout, 'pi', float)
        return sn.assert_lt(sn.abs(pi_value - math.pi), self.tolerance)