# SparkMeasure

In [2]:
!pip3 install sparkmeasure

You should consider upgrading via the '/usr/local/bin/python3.6 -m pip install --upgrade pip' command.


In [1]:
import os

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [None]:
# Alternative session for a YARN cluster with S3 (s3a) access.
# NOTE(review): run either this cell or the local-mode cell below, not both.
# SparkSession.builder.getOrCreate() returns an already-running session, so the
# session created first wins and later builder configs are not applied.
hadoop_user = os.getenv("HADOOP_USER_NAME")

builder = (
    SparkSession.builder
    .appName("PythonSQL")
    .config("spark.hadoop.fs.s3a.s3guard.ddb.region", "us-east-1")
    .config("spark.yarn.access.hadoopFileSystems", "s3a://demo-aws-1/")
    .config("spark.executor.instances", 4)
    .config("spark.executor.cores", 4)
)

# Only set the resource-manager principal when the variable is defined.
# Otherwise an unset HADOOP_USER_NAME would be passed through as the principal value.
if hadoop_user:
    builder = builder.config("spark.hadoop.yarn.resourcemanager.principal", hadoop_user)

spark = builder.getOrCreate()

In [4]:
from pyspark.sql import SparkSession

# Build a local Spark session that fetches sparkMeasure from Maven Central.
#  - master "local[*]" runs Spark locally on all available cores;
#    switch it to YARN or Kubernetes if a cluster is available.
#  - spark.jars.packages pulls sparkMeasure 0.14 built for Scala 2.11.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Test sparkmeasure instrumentation of Python/PySpark code")
    .config("spark.jars.packages", "ch.cern.sparkmeasure:spark-measure_2.11:0.14")
    .getOrCreate()
)

In [5]:
# Display the active SparkSession via its notebook repr to inspect the session.
spark

In [6]:
# Smoke test: a trivial SQL query confirms the session can plan and run a job.
smoke_test_df = spark.sql("select 1 as id, 'Hello world!' as Greeting")
smoke_test_df.show()

+---+------------+
| id|    Greeting|
+---+------------+
|  1|Hello world!|
+---+------------+



In [7]:
# Stop the current session so the next cell can build a fresh one that loads a
# different sparkMeasure jar. Presumably this is required because getOrCreate()
# would otherwise hand back this running session with its old configuration.
spark.stop()

In [8]:
# Recreate the local session, this time loading a locally built sparkMeasure jar
# instead of the Maven package.
# The jar location defaults to the original relative file name. Set the
# SPARKMEASURE_JAR environment variable to use a jar elsewhere without editing
# this cell. A relative path is presumably resolved against the driver's working
# directory — TODO confirm for your deployment.
SPARKMEASURE_JAR = os.getenv("SPARKMEASURE_JAR", "spark-measure_2.11-0.18-SNAPSHOT.jar")

spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Test sparkmeasure instrumentation of Python/PySpark code")
    .config("spark.jars", SPARKMEASURE_JAR)
    .getOrCreate()
)

In [10]:
# Re-run the smoke-test query on the new, jar-based session.
smoke_test_df = spark.sql("select 1 as id, 'Hello world!' as Greeting")
smoke_test_df.show()

+---+------------+
| id|    Greeting|
+---+------------+
|  1|Hello world!|
+---+------------+



In [11]:
from sparkmeasure import StageMetrics

# Attach sparkMeasure's stage-metrics listener to the active Spark session.
# Later cells bracket workloads with stagemetrics.begin() / stagemetrics.end().
stagemetrics = StageMetrics(spark)

In [12]:
# Define a line/cell magic that wraps a Spark workload in stage-metrics instrumentation
from IPython.core.magic import (register_line_magic, register_cell_magic, register_line_cell_magic)

@register_line_cell_magic
def sparkmeasure(line, cell=None):
    """Run a Spark workload and print its sparkMeasure stage-metrics report.

    Use: %sparkmeasure <statement>  (line magic)
         %%sparkmeasure             (cell magic; the whole cell body is run)

    Args:
        line: text after the magic on the same line; it is the code to run
            when invoked as a line magic.
        cell: the cell body when invoked as a cell magic, otherwise None.

    The code runs with exec() rather than eval(). eval() only accepts a single
    expression, so multi-statement cells, assignments and loops raised
    SyntaxError. The code runs in the notebook's global namespace, so it sees
    `spark` and other user variables, and names it assigns stay available
    afterwards.
    """
    code = cell if cell is not None else line
    stagemetrics.begin()
    try:
        exec(code, globals())
    finally:
        # Always close the measurement window, even if the workload raises,
        # so the listener is not left in the "begun" state.
        stagemetrics.end()
    # Only reached when the workload succeeded.
    stagemetrics.print_report()

In [13]:
%%sparkmeasure
spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(100)").show()  # workload measured by the sparkmeasure magic

+---------+
| count(1)|
+---------+
|100000000|
+---------+


Scheduling mode = FIFO
Spark Context default degree of parallelism = 2
Aggregated Spark stage metrics:
numStages => 4
sum(numTasks) => 7
elapsedTime => 3141 (3 s)
sum(stageDuration) => 3088 (3 s)
sum(executorRunTime) => 5998 (6 s)
sum(executorCpuTime) => 5960 (6 s)
sum(executorDeserializeTime) => 18 (18 ms)
sum(executorDeserializeCpuTime) => 17 (17 ms)
sum(resultSerializationTime) => 6 (6 ms)
sum(jvmGCTime) => 0 (0 ms)
sum(shuffleFetchWaitTime) => 0 (0 ms)
sum(shuffleWriteTime) => 10 (10 ms)
max(resultSize) => 5340 (5.0 KB)
sum(numUpdatedBlockStatuses) => 0
sum(diskBytesSpilled) => 0 (0 Bytes)
sum(memoryBytesSpilled) => 0 (0 Bytes)
max(peakExecutionMemory) => 0
sum(recordsRead) => 2100
sum(bytesRead) => 0 (0 Bytes)
sum(recordsWritten) => 0
sum(bytesWritten) => 0 (0 Bytes)
sum(shuffleTotalBytesRead) => 150 (150 Bytes)
sum(shuffleTotalBlocksFetched) => 2
sum(shuffleLocalBlocksFetched) => 2
sum(shuffleRemoteBlocksFetched) => 0


In [14]:
# Print additional metrics from the Spark accumulables collected by stagemetrics:
# internal task metrics summed by name, plus SQL metrics grouped per accumulator id.
stagemetrics.print_accumulables()


Aggregated Spark accumulables of type internal.metric. Sum of values grouped by metric name
Name => sum(value) [group by name]

executorCpuTime => 5961 (6 s)
executorDeserializeCpuTime => 18 (18 ms)
executorDeserializeTime => 18 (18 ms)
executorRunTime => 5998 (6 s)
input.recordsRead => 2100
resultSerializationTime => 6 (6 ms)
resultSize => 12438 (12.0 KB)
shuffle.read.fetchWaitTime => 0 (0 ms)
shuffle.read.localBlocksFetched => 2
shuffle.read.localBytesRead => 150 (150 Bytes)
shuffle.read.recordsRead => 2
shuffle.read.remoteBlocksFetched => 0
shuffle.read.remoteBytesRead => 0 (0 Bytes)
shuffle.read.remoteBytesReadToDisk => 0 (0 Bytes)
shuffle.write.bytesWritten => 150 (150 Bytes)
shuffle.write.recordsWritten => 2
shuffle.write.writeTime => 10 (10 ms)

SQL Metrics and other non-internal metrics. Values grouped per accumulatorId and metric name.
Accid, Name => max(value) [group by accId, name]

   57, data size total => 29 (29 Bytes)
   58, duration total => 4 (4 ms)
   59, number of o

In [15]:
# Alternatively, bracket the Spark workload explicitly with stagemetrics
# begin()/end(), then print a summary report of the measured stages.
benchmark_query = "select count(*) from range(1000) cross join range(1000) cross join range(100)"

stagemetrics.begin()
spark.sql(benchmark_query).show()
stagemetrics.end()

stagemetrics.print_report()

+---------+
| count(1)|
+---------+
|100000000|
+---------+


Scheduling mode = FIFO
Spark Context default degree of parallelism = 2
Aggregated Spark stage metrics:
numStages => 4
sum(numTasks) => 7
elapsedTime => 3588 (4 s)
sum(stageDuration) => 3571 (4 s)
sum(executorRunTime) => 7048 (7 s)
sum(executorCpuTime) => 7039 (7 s)
sum(executorDeserializeTime) => 4 (4 ms)
sum(executorDeserializeCpuTime) => 3 (3 ms)
sum(resultSerializationTime) => 1 (1 ms)
sum(jvmGCTime) => 0 (0 ms)
sum(shuffleFetchWaitTime) => 0 (0 ms)
sum(shuffleWriteTime) => 0 (0 ms)
max(resultSize) => 5297 (5.0 KB)
sum(numUpdatedBlockStatuses) => 0
sum(diskBytesSpilled) => 0 (0 Bytes)
sum(memoryBytesSpilled) => 0 (0 Bytes)
max(peakExecutionMemory) => 0
sum(recordsRead) => 2100
sum(bytesRead) => 0 (0 Bytes)
sum(recordsWritten) => 0
sum(bytesWritten) => 0 (0 Bytes)
sum(shuffleTotalBytesRead) => 150 (150 Bytes)
sum(shuffleTotalBlocksFetched) => 2
sum(shuffleLocalBlocksFetched) => 2
sum(shuffleRemoteBlocksFetched) => 0
sum(sh

In [16]:
# Most compact form: runandmeasure() receives the namespace to run in and the
# workload as source text, and prints the stage-metrics report afterwards.
measured_code = """
spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(100)").show()
"""
stagemetrics.runandmeasure(locals(), measured_code)

+---------+
| count(1)|
+---------+
|100000000|
+---------+


Scheduling mode = FIFO
Spark Context default degree of parallelism = 2
Aggregated Spark stage metrics:
numStages => 4
sum(numTasks) => 7
elapsedTime => 3562 (4 s)
sum(stageDuration) => 3546 (4 s)
sum(executorRunTime) => 6989 (7 s)
sum(executorCpuTime) => 6982 (7 s)
sum(executorDeserializeTime) => 7 (7 ms)
sum(executorDeserializeCpuTime) => 4 (4 ms)
sum(resultSerializationTime) => 0 (0 ms)
sum(jvmGCTime) => 0 (0 ms)
sum(shuffleFetchWaitTime) => 0 (0 ms)
sum(shuffleWriteTime) => 0 (0 ms)
max(resultSize) => 5254 (5.0 KB)
sum(numUpdatedBlockStatuses) => 0
sum(diskBytesSpilled) => 0 (0 Bytes)
sum(memoryBytesSpilled) => 0 (0 Bytes)
max(peakExecutionMemory) => 0
sum(recordsRead) => 2100
sum(bytesRead) => 0 (0 Bytes)
sum(recordsWritten) => 0
sum(bytesWritten) => 0 (0 Bytes)
sum(shuffleTotalBytesRead) => 150 (150 Bytes)
sum(shuffleTotalBlocksFetched) => 2
sum(shuffleLocalBlocksFetched) => 2
sum(shuffleRemoteBlocksFetched) => 0
sum(sh

In [17]:
from sparkmeasure import TaskMetrics

# Same begin/end/report workflow as StageMetrics, but at task granularity:
# the listener records metrics for each task rather than each stage.
taskmetrics = TaskMetrics(spark)
benchmark_query = "select count(*) from range(1000) cross join range(1000) cross join range(100)"

taskmetrics.begin()
spark.sql(benchmark_query).show()
taskmetrics.end()

taskmetrics.print_report()

+---------+
| count(1)|
+---------+
|100000000|
+---------+


Scheduling mode = FIFO
Spark Contex default degree of parallelism = 2
Aggregated Spark task metrics:
numtasks => 7
elapsedTime => 4594 (5 s)
sum(duration) => 9116 (9 s)
sum(schedulerDelay) => 3
sum(executorRunTime) => 9110 (9 s)
sum(executorCpuTime) => 9080 (9 s)
sum(executorDeserializeTime) => 3 (3 ms)
sum(executorDeserializeCpuTime) => 1 (1 ms)
sum(resultSerializationTime) => 0 (0 ms)
sum(jvmGCTime) => 18 (18 ms)
sum(shuffleFetchWaitTime) => 0 (0 ms)
sum(shuffleWriteTime) => 0 (0 ms)
sum(gettingResultTime) => 0 (0 ms)
max(resultSize) => 2627 (2.0 KB)
sum(numUpdatedBlockStatuses) => 0
sum(diskBytesSpilled) => 0 (0 Bytes)
sum(memoryBytesSpilled) => 0 (0 Bytes)
max(peakExecutionMemory) => 0
sum(recordsRead) => 2100
sum(bytesRead) => 0 (0 Bytes)
sum(recordsWritten) => 0
sum(bytesWritten) => 0 (0 Bytes)
sum(shuffleTotalBytesRead) => 150 (150 Bytes)
sum(shuffleTotalBlocksFetched) => 2
sum(shuffleLocalBlocksFetched) => 2
sum(shuf

In [19]:
# Register the task metrics collected by `taskmetrics` as the temporary view
# "PerfTaskMetrics" before querying it. No visible cell created this view
# (execution count In [18] is missing), so on a fresh kernel the query would fail.
taskmetrics_df = taskmetrics.create_taskmetrics_DF("PerfTaskMetrics")
spark.sql("select * from PerfTaskMetrics").show()

+-----+--------+-------+-----+-------------+-------------+--------+--------------+----------+---------+------------+-----------+-----------------+----------+---------------+---------------+-----------------------+--------------------------+-----------------------+---------+----------+-----------------------+----------------+------------------+-------------------+-----------+---------+--------------+------------+--------------------+---------------------+-------------------------+-------------------------+--------------------------+----------------+-------------------+---------------------+
|jobId|jobGroup|stageId|index|   launchTime|   finishTime|duration|schedulerDelay|executorId|     host|taskLocality|speculative|gettingResultTime|successful|executorRunTime|executorCpuTime|executorDeserializeTime|executorDeserializeCpuTime|resultSerializationTime|jvmGCTime|resultSize|numUpdatedBlockStatuses|diskBytesSpilled|memoryBytesSpilled|peakExecutionMemory|recordsRead|bytesRead|recordsWritten|b