In [2]:
!pip install pandas

Collecting pandas
  Downloading pandas-2.3.3-cp310-cp310-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl (12.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.8/12.8 MB[0m [31m8.5 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
Collecting numpy>=1.22.4
  Downloading numpy-2.2.6-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (16.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m16.8/16.8 MB[0m [31m10.5 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hCollecting pytz>=2020.1
  Downloading pytz-2025.2-py2.py3-none-any.whl (509 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m509.2/509.2 KB[0m [31m16.9 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: pytz, numpy, pandas
Successfully installed numpy-2.2.6 pandas-2.3.3 pytz-2025.2
[0m

In [16]:
DATA_DIR = "data/clusterdata-2011-2/"

In [3]:
import pandas as pd

schema_path = "./data/clusterdata-2011-2/schema.csv"

df = pd.read_csv(schema_path)
df.head()

Unnamed: 0,file pattern,field number,content,format,mandatory
0,job_events/part-?????-of-?????.csv.gz,1,time,INTEGER,YES
1,job_events/part-?????-of-?????.csv.gz,2,missing info,INTEGER,NO
2,job_events/part-?????-of-?????.csv.gz,3,job ID,INTEGER,YES
3,job_events/part-?????-of-?????.csv.gz,4,event type,INTEGER,YES
4,job_events/part-?????-of-?????.csv.gz,5,user,STRING_HASH,NO


In [4]:
df

Unnamed: 0,file pattern,field number,content,format,mandatory
0,job_events/part-?????-of-?????.csv.gz,1,time,INTEGER,YES
1,job_events/part-?????-of-?????.csv.gz,2,missing info,INTEGER,NO
2,job_events/part-?????-of-?????.csv.gz,3,job ID,INTEGER,YES
3,job_events/part-?????-of-?????.csv.gz,4,event type,INTEGER,YES
4,job_events/part-?????-of-?????.csv.gz,5,user,STRING_HASH,NO
5,job_events/part-?????-of-?????.csv.gz,6,scheduling class,INTEGER,NO
6,job_events/part-?????-of-?????.csv.gz,7,job name,STRING_HASH,NO
7,job_events/part-?????-of-?????.csv.gz,8,logical job name,STRING_HASH,NO
8,task_events/part-?????-of-?????.csv.gz,1,time,INTEGER,YES
9,task_events/part-?????-of-?????.csv.gz,2,missing info,INTEGER,NO


In [10]:
# parse schema to dict
schema_dict = {}

for _, row in df.iterrows():
    file_pattern = row["file pattern"].split("/")[0]
    if file_pattern not in schema_dict.keys():
        schema_dict[file_pattern] = {}
        schema_dict[file_pattern]["col_content"] = []
        schema_dict[file_pattern]["col_format"] = []
        schema_dict[file_pattern]["col_mandatory"] = []
    schema_dict[file_pattern]["col_content"].append(row["content"])
    schema_dict[file_pattern]["col_format"].append(row["format"])
    schema_dict[file_pattern]["col_mandatory"].append(1 if row["mandatory"] == "YES" else 0)

print(schema_dict)

{'job_events': {'col_content': ['time', 'missing info', 'job ID', 'event type', 'user', 'scheduling class', 'job name', 'logical job name'], 'col_format': ['INTEGER', 'INTEGER', 'INTEGER', 'INTEGER', 'STRING_HASH', 'INTEGER', 'STRING_HASH', 'STRING_HASH'], 'col_mandatory': [1, 0, 1, 1, 0, 0, 0, 0]}, 'task_events': {'col_content': ['time', 'missing info', 'job ID', 'task index', 'machine ID', 'event type', 'user', 'scheduling class', 'priority', 'CPU request', 'memory request', 'disk space request', 'different machines restriction'], 'col_format': ['INTEGER', 'INTEGER', 'INTEGER', 'INTEGER', 'INTEGER', 'INTEGER', 'STRING_HASH', 'INTEGER', 'INTEGER', 'FLOAT', 'FLOAT', 'FLOAT', 'BOOLEAN'], 'col_mandatory': [1, 0, 1, 1, 0, 1, 0, 0, 1, 0, 0, 0, 0]}, 'machine_events': {'col_content': ['time', 'machine ID', 'event type', 'platform ID', 'CPUs', 'Memory'], 'col_format': ['INTEGER', 'INTEGER', 'INTEGER', 'STRING_HASH', 'FLOAT', 'FLOAT'], 'col_mandatory': [1, 1, 1, 0, 0, 0]}, 'machine_attributes'

### Init PySpark Session

In [11]:
import pyspark
print(pyspark.__version__)

4.1.0


In [12]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("GoogleClusterAnalysis") \
    .master("local[*]") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .getOrCreate()

print("Spark version:", spark.version)

Spark version: 4.1.0


In [17]:
from pyspark.sql.functions import col

machines = spark.read.csv(DATA_DIR + "machine_events/part-00000-of-00001.csv", header=False, inferSchema=True).toDF(*schema_dict["machine_events"]["col_content"])

num_machines = machines.select("machine ID").distinct().count()
print("Number of unique machines:", num_machines)

machines.groupBy("event type").count().show()

machines.select("CPUs","Memory").describe().show()

Number of unique machines: 12583
+----------+-----+
|event type|count|
+----------+-----+
|         1| 8957|
|         2| 7380|
|         0|21443|
+----------+-----+

+-------+-------------------+-------------------+
|summary|               CPUs|             Memory|
+-------+-------------------+-------------------+
|  count|              37748|              37748|
|   mean| 0.5260676062307937| 0.4767365169015281|
| stddev|0.12201608180202861|0.19316256886695604|
|    min|               0.25|            0.03085|
|    max|                1.0|                1.0|
+-------+-------------------+-------------------+



In [19]:
jobs = spark.read.csv(DATA_DIR + "job_events/part-00160-of-00500.csv", header=False, inferSchema=True).toDF(*schema_dict["job_events"]["col_content"])

jobs.groupBy("event type").count().show()
jobs.groupBy("scheduling class").count().show()


+----------+-----+
|event type|count|
+----------+-----+
|         1| 1082|
|         3|   22|
|         5|  351|
|         4|  596|
|         0| 1083|
+----------+-----+

+----------------+-----+
|scheduling class|count|
+----------------+-----+
|               1| 1295|
|               3|   11|
|               2|  647|
|               0| 1181|
+----------------+-----+



In [20]:
tasks = spark.read.csv(
    DATA_DIR + "task_events/part-00160-of-00500.csv",
    header=False,
    inferSchema=True
).toDF(*schema_dict["task_events"]["col_content"])

tasks.groupBy("priority").count().show()
tasks.orderBy(col("CPU request").desc()).show(5)
tasks.orderBy(col("memory request").desc()).show(5)
tasks.filter(col("event type") == 2).count()


+--------+-------+
|priority|  count|
+--------+-------+
|       1|  19371|
|       6|  10731|
|       9| 118735|
|       4|  93102|
|       8|   1080|
|       7|     12|
|      10|     57|
|      11|     22|
|       2|    460|
|       0|1757628|
|       5|      3|
+--------+-------+

+------------+------------+----------+----------+----------+----------+--------------------+----------------+--------+-----------+--------------+------------------+------------------------------+
|        time|missing info|    job ID|task index|machine ID|event type|                user|scheduling class|priority|CPU request|memory request|disk space request|different machines restriction|
+------------+------------+----------+----------+----------+----------+--------------------+----------------+--------+-----------+--------------+------------------+------------------------------+
|803007333703|        NULL|6211896520|         9|   5780326|         5|5ANFyIGysAA6XzeFu...|               3|       8|     0.4

10774

In [21]:
task_usage = spark.read.csv(
    DATA_DIR + "task_usage/part-00160-of-00500.csv",
    header=False,
    inferSchema=True
).toDF(*schema_dict["task_usage"]["col_content"])

task_usage.select("CPU rate","canonical memory usage").describe().show()

df = tasks.join(
    task_usage,
    on=["job ID","task index"],
    how="inner"
)

df.select("CPU request","CPU rate","memory request","canonical memory usage").show(5)


                                                                                

+-------+--------------------+----------------------+
|summary|            CPU rate|canonical memory usage|
+-------+--------------------+----------------------+
|  count|             4160733|               4160733|
|   mean|0.010607340661414165|  0.010778003832842373|
| stddev| 0.02755477387651928|  0.027621564165047467|
|    min|                 0.0|                   0.0|
|    max|              0.8398|                0.7686|
+-------+--------------------+----------------------+





+-----------+--------+--------------+----------------------+
|CPU request|CPU rate|memory request|canonical memory usage|
+-----------+--------+--------------+----------------------+
|     0.0625|     0.0|       0.01746|                   0.0|
|     0.0625|     0.0|       0.01746|                   0.0|
|     0.0625|     0.0|       0.01746|                   0.0|
|     0.0625|     0.0|       0.01746|                   0.0|
|     0.0625|     0.0|       0.01746|                   0.0|
+-----------+--------+--------------+----------------------+
only showing top 5 rows


                                                                                

In [24]:
from pyspark.sql.functions import col, avg

machine_usage = task_usage.join(
    tasks.select("job ID","task index","machine ID").withColumnRenamed("machine ID", "task_machine_ID"),
    on=["job ID","task index"],
    how="inner"
)

machine_stats = machine_usage.groupBy("task_machine_ID").agg(
    avg("CPU rate").alias("avg_cpu"),
    avg("canonical memory usage").alias("avg_mem")
).orderBy(col("avg_cpu").desc())

machine_stats.show(10)




+---------------+-------------------+--------------------+
|task_machine_ID|            avg_cpu|             avg_mem|
+---------------+-------------------+--------------------+
|      337885666| 0.1473642857142857|0.008761857142857144|
|        2851293|0.12471428571428571|0.009054428571428573|
|        3829048|           0.100545|0.006861499999999...|
|       17503849|            0.09786|          0.00661425|
|        1301865|0.09378500000000001|           0.0068905|
|        5771528|0.09146411764705882|0.027104705882352944|
|     4302097368|0.08480599999999999|           0.0088564|
|       32084206|           0.082245|          0.00662775|
|      337872643|0.08220440000000001|0.004828000000000001|
|       32098170|          0.0798276|           0.0083602|
+---------------+-------------------+--------------------+
only showing top 10 rows


                                                                                