# Benchmark exploration

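This notebook contains exploratory queries over the data from the following test runs, performed between 2020-08-24 and 2020-08-25. The `n_data=512` trials also present in `../data` (from 2020-08-25) were run with different settings, including `n1-standard-4` machines and a row `scale` factor, and are not among the commands listed below.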

```bash
BUCKET=gs://prio-processor-benchmark  N_DATA=32 N_ROWS=2500 MACHINE_TYPE=n1-standard-16 NUM_WORKERS=0 ./scripts/test-cli-integration-dataproc
BUCKET=gs://prio-processor-benchmark  N_DATA=32 N_ROWS=5000 MACHINE_TYPE=n1-standard-16 NUM_WORKERS=0 ./scripts/test-cli-integration-dataproc
BUCKET=gs://prio-processor-benchmark  N_DATA=32 N_ROWS=7500 MACHINE_TYPE=n1-standard-16 NUM_WORKERS=0 ./scripts/test-cli-integration-dataproc
BUCKET=gs://prio-processor-benchmark  N_DATA=32 N_ROWS=10000 MACHINE_TYPE=n1-standard-16 NUM_WORKERS=0 ./scripts/test-cli-integration-dataproc

BUCKET=gs://prio-processor-benchmark  N_DATA=64 N_ROWS=2500 MACHINE_TYPE=n1-standard-16 NUM_WORKERS=0 ./scripts/test-cli-integration-dataproc
BUCKET=gs://prio-processor-benchmark  N_DATA=64 N_ROWS=5000 MACHINE_TYPE=n1-standard-16 NUM_WORKERS=0 ./scripts/test-cli-integration-dataproc
BUCKET=gs://prio-processor-benchmark  N_DATA=64 N_ROWS=7500 MACHINE_TYPE=n1-standard-16 NUM_WORKERS=0 ./scripts/test-cli-integration-dataproc
BUCKET=gs://prio-processor-benchmark  N_DATA=64 N_ROWS=10000 MACHINE_TYPE=n1-standard-16 NUM_WORKERS=0 ./scripts/test-cli-integration-dataproc

BUCKET=gs://prio-processor-benchmark  N_DATA=128 N_ROWS=2500 MACHINE_TYPE=n1-standard-16 NUM_WORKERS=0 ./scripts/test-cli-integration-dataproc
BUCKET=gs://prio-processor-benchmark  N_DATA=128 N_ROWS=5000 MACHINE_TYPE=n1-standard-16 NUM_WORKERS=0 ./scripts/test-cli-integration-dataproc
BUCKET=gs://prio-processor-benchmark  N_DATA=128 N_ROWS=7500 MACHINE_TYPE=n1-standard-16 NUM_WORKERS=0 ./scripts/test-cli-integration-dataproc
BUCKET=gs://prio-processor-benchmark  N_DATA=128 N_ROWS=10000 MACHINE_TYPE=n1-standard-16 NUM_WORKERS=0 ./scripts/test-cli-integration-dataproc
```

In [1]:
# Spark DataFrame API (`F` for column functions) and window specs used for ranking later on.
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

# Reuse the active session if there is one, otherwise start a new one.
spark = SparkSession.builder.getOrCreate()

## List of trials

In [2]:
# Each directory under ../data holds the artifacts of one benchmark trial; the
# directory name appears to encode n_data, total rows, machine type, worker count
# and a timestamp, and is used as the trial's `cluster_id` in later cells.
! ls ../data

test-128-10000-n1-standard-16-0-20200824003830
test-128-2500-n1-standard-16-0-20200824004122
test-128-5000-n1-standard-16-0-20200824004027
test-128-7500-n1-standard-16-0-20200824003959
test-32-10000-n1-standard-16-0-20200824000750396484
test-32-2500-n1-standard-16-0-20200823234848898312
test-32-5000-n1-standard-16-0-20200824000733009240
test-32-7500-n1-standard-16-0-20200824000741901558
test-512-10000-n1-standard-16-0-20200825231625
test-512-100000-n1-standard-4-0-20200825223401
test-512-1000000-n1-standard-4-0-20200825222931
test-512-1000000-n1-standard-4-0-20200825223019
test-64-10000-n1-standard-16-0-20200824001934367765
test-64-2500-n1-standard-16-0-20200824001916185249
test-64-5000-n1-standard-16-0-20200824001922568161
test-64-7500-n1-standard-16-0-20200824001928540460


## Artifacts from a single trial

In [3]:
# Layout of a single trial: generated client data per server, the run config,
# and the collected logs (Dataproc listings, Spark event logs, YARN logs).
! tree ../data/test-128-10000-n1-standard-16-0-20200824003830

../data/test-128-10000-n1-standard-16-0-20200824003830
├── client
│   ├── _SUCCESS
│   ├── server_id=a
│   │   └── part-00000-faab898b-0423-4e16-813b-560ac71d754d.c000.json
│   └── server_id=b
│       └── part-00000-faab898b-0423-4e16-813b-560ac71d754d.c000.json
├── config.json
├── logs
│   ├── bucket-listing.txt
│   ├── dataproc-clusters-describe.json
│   ├── dataproc-jobs-list.json
│   ├── spark-job-history
│   │   ├── application_1598254988496_0002
│   │   ├── application_1598254988496_0003
│   │   ├── application_1598254988496_0004
│   │   ├── application_1598254988496_0005
│   │   ├── application_1598254988496_0006
│   │   ├── application_1598254988496_0007
│   │   ├── application_1598254988496_0008
│   │   ├── application_1598254988496_0009
│   │   └── application_1598254988496_0010
│   └── yarn-logs
│       └── root
│           └── logs-tfile

## Configuration across trials

In [4]:
@F.udf("string")
def parse_config_path(filename):
    """Return the name of the trial directory that contains a config.json file.

    For ``file:/.../data/<cluster_id>/config.json`` this yields ``<cluster_id>``,
    which is the key used to join the config onto the Spark job history.
    """
    # Only the last two path segments matter, so split from the right.
    return filename.rsplit("/", 2)[-2]

config = spark.read.json(
    "../data/*/config.json", multiLine=True
).withColumn("cluster_id", parse_config_path(F.input_file_name()))
config.show()

+--------+--------------+------+------+-----------+-----+--------------------+
|batch_id|  machine_type|n_data|n_rows|num_workers|scale|          cluster_id|
+--------+--------------+------+------+-----------+-----+--------------------+
|    test| n1-standard-4|   512| 10000|          0|  100|test-512-1000000-...|
|    test| n1-standard-4|   512| 10000|          0|  100|test-512-1000000-...|
|    test|n1-standard-16|   128| 10000|          0|    1|test-128-10000-n1...|
|    test| n1-standard-4|   512| 10000|          0|   10|test-512-100000-n...|
|    test|n1-standard-16|   512| 10000|          0|    1|test-512-10000-n1...|
|    test|n1-standard-16|   128|  5000|          0|    1|test-128-5000-n1-...|
|    test|n1-standard-16|   128|  2500|          0|    1|test-128-2500-n1-...|
|    test|n1-standard-16|    32| 10000|          0|    1|test-32-10000-n1-...|
|    test|n1-standard-16|   128|  7500|          0|    1|test-128-7500-n1-...|
|    test|n1-standard-16|    64| 10000|          0| 

## Dataproc job listings

In [5]:
# Dataproc job descriptions saved alongside each trial's logs
# (multi-line JSON, hence multiLine=True).
job_list = spark.read.json(
    "../data/*/*/dataproc-jobs-list.json",
    multiLine=True,
)
job_list.printSchema()
job_list.show(n=2, truncate=80, vertical=True)

root
 |-- done: boolean (nullable = true)
 |-- driverControlFilesUri: string (nullable = true)
 |-- driverOutputResourceUri: string (nullable = true)
 |-- jobUuid: string (nullable = true)
 |-- placement: struct (nullable = true)
 |    |-- clusterName: string (nullable = true)
 |    |-- clusterUuid: string (nullable = true)
 |-- pysparkJob: struct (nullable = true)
 |    |-- args: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- mainPythonFileUri: string (nullable = true)
 |    |-- pythonFileUris: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- reference: struct (nullable = true)
 |    |-- jobId: string (nullable = true)
 |    |-- projectId: string (nullable = true)
 |-- status: struct (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- stateStartTime: timestamp (nullable = true)
 |-- statusHistory: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- details: str

## Bucket listings

In [6]:
# Read each listing line verbatim into a single `data` column; the fields in the
# listing are space-aligned, so the tab separator leaves each line whole.
bucket_listing = spark.read.csv(
    "../data/*/*/bucket-listing.txt",
    schema="data string",
    sep="\t",
)
bucket_listing.show(truncate=80)

+--------------------------------------------------------------------------------+
|                                                                            data|
+--------------------------------------------------------------------------------+
|         0  2020-08-24T07:44:49Z  gs://prio-processor-benchmark/data/working/...|
|       237  2020-08-24T07:38:36Z  gs://prio-processor-benchmark/data/working/...|
|      1784  2020-08-24T07:38:36Z  gs://prio-processor-benchmark/data/working/...|
|     91112  2020-08-24T07:38:37Z  gs://prio-processor-benchmark/data/working/...|
|        65  2020-08-24T07:38:37Z  gs://prio-processor-benchmark/data/working/...|
|      1383  2020-08-24T07:38:37Z  gs://prio-processor-benchmark/data/working/...|
|         0  2020-08-24T07:44:49Z  gs://prio-processor-benchmark/data/working/...|
|         0  2020-08-24T07:45:20Z  gs://prio-processor-benchmark/data/working/...|
|         0  2020-08-24T07:45:17Z  gs://prio-processor-benchmark/data/working/...|
|  6

## Parsing Spark Job History

In [7]:
# Spark event logs: one JSON event per line. Drop the bulky plan/environment
# payloads that are irrelevant to task timing, and keep the source path so each
# event can be attributed to its trial and application.
job_history = (
    spark.read.json("../data/*/logs/spark-job-history/*")
    .drop(
        "sparkPlanInfo",
        "Hadoop Properties",
        "Spark Properties",
        "Properties",
        "Classpath Entries",
        "System Properties",
    )
    .withColumn("filename", F.input_file_name())
)
job_history.printSchema()

root
 |-- App ID: string (nullable = true)
 |-- App Name: string (nullable = true)
 |-- Block Manager ID: struct (nullable = true)
 |    |-- Executor ID: string (nullable = true)
 |    |-- Host: string (nullable = true)
 |    |-- Port: long (nullable = true)
 |-- Completion Time: long (nullable = true)
 |-- Event: string (nullable = true)
 |-- Executor ID: string (nullable = true)
 |-- Executor Info: struct (nullable = true)
 |    |-- Attributes: struct (nullable = true)
 |    |    |-- CLUSTER_ID: string (nullable = true)
 |    |    |-- CONTAINER_ID: string (nullable = true)
 |    |    |-- HTTP_SCHEME: string (nullable = true)
 |    |    |-- LOG_FILES: string (nullable = true)
 |    |    |-- NM_HOST: string (nullable = true)
 |    |    |-- NM_HTTP_ADDRESS: string (nullable = true)
 |    |    |-- NM_HTTP_PORT: string (nullable = true)
 |    |    |-- NM_PORT: string (nullable = true)
 |    |    |-- USER: string (nullable = true)
 |    |-- Host: string (nullable = true)
 |    |-- Log Urls

In [8]:
# Event logs live at <data>/<cluster_id>/logs/spark-job-history/<application_name>,
# so both grouping keys can be recovered from the file path alone.

@F.udf("struct<cluster_id: string, application_name: string>")
def parse_benchmark_path(filename):
    """Extract the trial (cluster) id and Spark application name from an event-log path.

    Example input:
    file:/.../data/test-32-2500-n1-standard-16-0-20200823234848898312/logs/spark-job-history/application_1598251821633_0002
    """
    segments = filename.split("/")
    cluster_id, application_name = segments[-4], segments[-1]
    return {"cluster_id": cluster_id, "application_name": application_name}

job_history.select(parse_benchmark_path("filename")).distinct().show(truncate=False, n=5)

+------------------------------------------------------------------------------------+
|parse_benchmark_path(filename)                                                      |
+------------------------------------------------------------------------------------+
|[test-32-7500-n1-standard-16-0-20200824000741901558, application_1598252956150_0007]|
|[test-32-7500-n1-standard-16-0-20200824000741901558, application_1598252956150_0008]|
|[test-128-10000-n1-standard-16-0-20200824003830, application_1598254988496_0003]    |
|[test-64-2500-n1-standard-16-0-20200824001916185249, application_1598253651488_0005]|
|[test-32-7500-n1-standard-16-0-20200824000741901558, application_1598252956150_0002]|
+------------------------------------------------------------------------------------+
only showing top 5 rows



In [9]:
# Flatten the per-task metrics carried by SparkListenerTaskEnd events into one
# row per task. The divisors convert to seconds: run and deserialize times are
# reported in milliseconds, CPU times in nanoseconds.
# https://spark.apache.org/docs/latest/monitoring.html#executor-task-metrics
task_metrics = (
    job_history
    .where(F.col("Event") == "SparkListenerTaskEnd")
    .withColumn("job_info", parse_benchmark_path("filename"))
    .withColumn("task_info", F.col("Task Info"))
    .withColumn("task_metrics", F.col("Task Metrics"))
    # expose the nested metric fields (e.g. "Executor Run Time") as top-level columns
    .select("*", "task_metrics.*")
    .select(
        "job_info.*",
        F.col("Task End Reason").getField("Reason").alias("reason"),
        F.col("Stage ID").alias("stage_id"),
        F.col("Stage Attempt ID").alias("stage_attempt_id"),
        F.col("Task Type").alias("task_type"),
        F.col("task_info").getField("Task ID").alias("task_id"),
        F.col("task_info").getField("Executor ID").alias("executor_id"),
        (F.col("Executor Run Time") / 1000).alias("run_seconds"),
        (F.col("Executor CPU Time") / 10**9).alias("cpu_seconds"),
        (F.col("Executor Deserialize Time") / 1000).alias("deser_run_seconds"),
        (F.col("Executor Deserialize CPU Time") / 10**9).alias("deser_cpu_seconds"),
        F.col("Result Size").alias("result_size"),
    )
    .orderBy("cluster_id", "application_name", "task_id", "executor_id")
)

# Spot-check a single application from a single trial. Column predicates are
# used instead of concatenating SQL string fragments, where adjacent literals
# silently lose the whitespace needed between clauses.
single_perf = task_metrics.where(
    (F.col("cluster_id") == "test-32-7500-n1-standard-16-0-20200824000741901558")
    & (F.col("application_name") == "application_1598252956150_0003")
).drop("cluster_id", "application_name")

single_perf.show()
single_perf.count()

+-------+--------+----------------+----------+-------+-----------+-----------+-----------+-----------------+-----------------+-----------+
| reason|stage_id|stage_attempt_id| task_type|task_id|executor_id|run_seconds|cpu_seconds|deser_run_seconds|deser_cpu_seconds|result_size|
+-------+--------+----------------+----------+-------+-----------+-----------+-----------+-----------------+-----------------+-----------+
|Success|       0|               0|ResultTask|      0|          2|      2.204|1.072734103|            0.843|      0.340503745|       1966|
|Success|       0|               0|ResultTask|      1|          2|      2.284|1.065204953|            0.842|      0.561023979|       1966|
|Success|       0|               0|ResultTask|      2|          2|      2.283|0.825324974|            0.843|      0.416646733|       1966|
|Success|       0|               0|ResultTask|      3|          2|      2.003| 0.57048707|            0.842|      0.356277139|       1966|
|Success|       1|         

8

In [10]:
# How often did tasks fail (FetchFailed), and on which stage attempt?
for grouping in (["reason"], ["stage_attempt_id"]):
    task_metrics.groupBy(*grouping).count().show()

attempt_and_reason = ["stage_attempt_id", "reason"]
task_metrics.groupBy(*attempt_and_reason).count().orderBy(*attempt_and_reason).show()

+-----------+-----+
|     reason|count|
+-----------+-----+
|FetchFailed|  131|
|    Success| 8890|
+-----------+-----+

+----------------+-----+
|stage_attempt_id|count|
+----------------+-----+
|               0| 5689|
|               1| 2301|
|               3|    5|
|               2| 1026|
+----------------+-----+

+----------------+-----------+-----+
|stage_attempt_id|     reason|count|
+----------------+-----------+-----+
|               0|FetchFailed|  108|
|               0|    Success| 5581|
|               1|FetchFailed|   23|
|               1|    Success| 2278|
|               2|    Success| 1026|
|               3|    Success|    5|
+----------------+-----------+-----+



In [11]:
@F.udf("string")
def sequence_id(application_name):
    """Return the trailing sequence suffix of a YARN application name.

    e.g. ``application_1598254988496_0003`` -> ``"0003"``. Application ids differ
    between trials, so this analysis assumes the suffix identifies the same
    pipeline step across trials.
    """
    return application_name.split("_")[-1]

# Also available per task: deser_run_seconds, deser_cpu_seconds
metrics = ["run_seconds", "cpu_seconds", "result_size"]

# Sum the metrics of successful tasks per (application step, stage, trial).
task_aggregation = (
    task_metrics
    .withColumn("sequence_id", sequence_id("application_name"))
    .where("reason = 'Success'")
    .groupBy("sequence_id", "stage_id", "cluster_id")
    .agg(*[F.round(F.sum(metric), 1).alias(metric) for metric in metrics],
         F.countDistinct("task_id", "executor_id").alias("task_count"))
    .orderBy("sequence_id", "stage_id", "cluster_id")
)
task_aggregation.cache()

# For each sequence keep only the stage with the highest run time summed over
# all trials, since this is the stage of interest for scaling.
largest_stage = (
    task_aggregation
    .groupBy("sequence_id", "stage_id")
    .agg(F.sum("run_seconds").alias("run_seconds"))
    .withColumn(
        "_rank",
        F.row_number().over(
            Window.partitionBy("sequence_id")
            # stage_id breaks exact ties so the chosen stage is deterministic
            .orderBy(F.desc("run_seconds"), F.asc("stage_id"))
        )
    )
    .where("_rank=1")
    .drop("_rank")
    # round for display only; the ranking above uses the exact sums
    .withColumn("run_seconds", F.round("run_seconds", 1))
    .orderBy("sequence_id")
)
largest_stage.show()

# The config's `n_rows` does not include its `scale` multiplier: scaled trials
# are named by n_rows * scale (test-512-1000000-... has n_rows=10000, scale=100).
# Use the effective row count so scaled trials are not merged with unscaled
# ones in the pivots downstream; a missing scale is treated as 1.
trial_size = config.select(
    "cluster_id",
    "n_data",
    (F.col("n_rows") * F.coalesce(F.col("scale"), F.lit(1))).alias("n_rows"),
)

task_aggregation_results = (
    task_aggregation
    .join(trial_size, on="cluster_id")
    # inner join: keep only the selected (sequence, stage) pairs; a right join
    # could only add null-padded rows that pollute the pivots
    .join(largest_stage.drop("run_seconds"), on=["sequence_id", "stage_id"])
    .orderBy("sequence_id", "stage_id", "n_rows", "n_data")
)

task_aggregation_results.show()

+-----------+--------+------------------+
|sequence_id|stage_id|       run_seconds|
+-----------+--------+------------------+
|       0002|       3|            1898.0|
|       0003|       1|             767.6|
|       0004|       1|402.09999999999997|
|       0005|       5|             765.4|
|       0006|       5|             398.0|
|       0007|       6| 985.8000000000001|
|       0008|       6|             501.0|
|       0009|       3|              43.6|
|       0010|       3| 42.89999999999999|
+-----------+--------+------------------+

+-----------+--------+--------------------+-----------+-----------+-----------+----------+------+------+
|sequence_id|stage_id|          cluster_id|run_seconds|cpu_seconds|result_size|task_count|n_data|n_rows|
+-----------+--------+--------------------+-----------+-----------+-----------+----------+------+------+
|       0002|       3|test-32-2500-n1-s...|       87.0|        4.6|     870853|       219|    32|  2500|
|       0002|       3|test-64-250

In [12]:
# Sequence numbers <= 2 are excluded from the per-server view. From 0003 on,
# odd sequence numbers are labelled server "a" and even ones server "b", and
# each consecutive (a, b) pair is renumbered 0, 1, 2, ... as a step index.
# NOTE(review): this a/b parity convention should be confirmed against the
# job submission order in scripts/test-cli-integration-dataproc.
with_server_id = (
    task_aggregation_results
    .withColumn("sequence_id", F.col("sequence_id").cast("int"))
    .filter(F.col("sequence_id") > 2)
    .withColumn(
        "server_id",
        F.when(F.col("sequence_id") % 2 == 1, F.lit("a")).otherwise(F.lit("b")),
    )
    .withColumn("sequence_id", ((F.col("sequence_id") - 3) / 2).cast("int"))
    .select("server_id", "sequence_id", "n_data", "n_rows", "run_seconds")
)
with_server_id.show(n=5)

+---------+-----------+------+------+-----------+
|server_id|sequence_id|n_data|n_rows|run_seconds|
+---------+-----------+------+------+-----------+
|        a|          0|    32|  2500|       12.5|
|        a|          0|    64|  2500|       17.0|
|        a|          0|   128|  2500|       37.5|
|        a|          0|    32|  5000|       20.7|
|        a|          0|    64|  5000|       38.0|
+---------+-----------+------+------+-----------+
only showing top 5 rows



In [13]:
def _pivot_run_seconds(row_key, column_key):
    """Pivot `with_server_id` into a server/step/`row_key` x `column_key` table.

    Each cell holds the minimum `run_seconds` among trials sharing those keys.
    """
    keys = ["server_id", "sequence_id", row_key]
    return (
        with_server_id
        .groupBy(*keys)
        .pivot(column_key)
        .min("run_seconds")
        .orderBy(*keys)
    )

task_by_n_data = _pivot_run_seconds("n_data", "n_rows")
task_by_rows = _pivot_run_seconds("n_rows", "n_data")

task_by_n_data.show()
task_by_rows.show()

+---------+-----------+------+----+-----+-----+-----+
|server_id|sequence_id|n_data|2500| 5000| 7500|10000|
+---------+-----------+------+----+-----+-----+-----+
|        a|          0|    32|12.5| 20.7| 32.8| 43.1|
|        a|          0|    64|17.0| 38.0| 54.7| 74.4|
|        a|          0|   128|37.5| 74.7|141.0|221.2|
|        a|          1|    32|12.5| 20.8| 30.5| 39.5|
|        a|          1|    64|17.4| 34.9| 54.1| 74.0|
|        a|          1|   128|38.9| 76.3|142.0|224.5|
|        a|          2|    32|19.0| 24.9| 32.4| 37.0|
|        a|          2|    64|23.9| 36.3| 47.4|132.7|
|        a|          2|   128|36.0|131.9|215.5|248.8|
|        a|          3|    32| 3.6|  3.4|  3.6|  3.6|
|        a|          3|    64| 3.7|  3.6|  3.6|  3.7|
|        a|          3|   128| 3.6|  3.7|  3.6|  3.9|
|        b|          0|    32| 8.6| 12.6| 18.9| 22.2|
|        b|          0|    64|13.5| 23.7| 32.6| 41.3|
|        b|          0|   128|25.0| 44.1| 67.8| 91.8|
|        b|          1|    3

In [14]:
# Export both pivots for use outside the notebook.
# NOTE(review): the filenames say "cpu-time", but the exported values are
# `run_seconds` (Spark "Executor Run Time", elapsed task time), not
# `cpu_seconds`; consider renaming once downstream consumers are checked.
df_n = task_by_n_data.toPandas()
df_row = task_by_rows.toPandas()

exports = {
    "2020-08-25-cpu-time-by-n-data.csv": df_n,
    "2020-08-25-cpu-time-by-n-rows.csv": df_row,
}
for csv_path, frame in exports.items():
    frame.to_csv(csv_path, index=False)

# Appendix

In [15]:
stage_info = (
    job_history.where(F.col("Event") == "SparkListenerStageCompleted")
    .withColumn("job_info", parse_benchmark_path("filename"))
    .select(
        "job_info.*",
        F.col("Stage Info").getField("Submission Time").alias("submission_time"),
        F.col("Stage Info").getField("Completion Time").alias("completion_time"),
        F.col("Stage Info").getField("Stage ID").alias("task_id"),
        F.col("Stage Info").getField("Stage Attempt ID").alias("stage_attempt_id"),
        F.col("Stage Info").getField("Stage Name").alias("stage_name"),
        F.col("Stage Info").getField("Number of Tasks").alias("number_of_tasks"),
    )
)
stage_info.show(vertical=True, n=5, truncate=80)

-RECORD 0--------------------------------------------------------------------------------------------
 cluster_id       | test-32-2500-n1-standard-16-0-20200823234848898312                               
 application_name | application_1598251821633_0002                                                   
 submission_time  | 1598251912108                                                                    
 completion_time  | 1598251915217                                                                    
 task_id          | 0                                                                                
 stage_attempt_id | 0                                                                                
 stage_name       | first at /tmp/df2001cf035740a49970f1deb9035caf/prio_processor.egg/prio_proces... 
 number_of_tasks  | 2                                                                                
-RECORD 1-------------------------------------------------------------------------