In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import os
from pyspark.sql import functions as F

DATA_DIR_L =  ""
# DATA_DIR = DATA_DIR_L

DATA_DIR_M = "/home/masa/Downloads/google-cloud-sdk/our_data"
DATA_DIR = DATA_DIR_M

In [7]:
# We use spark session in order to use DataFrames
ss = SparkSession.builder \
    .appName("GoogleClusterAnalysis") \
    .master("local[2]") \
    .getOrCreate()

In [8]:
ss.sparkContext.setLogLevel("ERROR")

### 1. What is the distribution of the machines according to their CPU capacity? Can you explain (motivate) it?

https://www.researchgate.net/publication/261164671_Characterizing_Machines_and_Workloads_on_a_Google_Cluster

TODO: add comments and conclusion

In [4]:
machine_schema = StructType([
    StructField('timestamp', LongType(), True),
    StructField('machine_ID', LongType(), True),
    StructField('event_type', IntegerType(), True),
    StructField('platform_ID', StringType(), False),
    StructField('CPUs', DoubleType(), False),
    StructField('memory_capacity', DoubleType(), False),
    ])

In [5]:
df_machines = ss.read.csv(os.path.join(DATA_DIR,"machine_events/*.csv.gz"), schema=machine_schema)
df_machines.show(5)

[Stage 0:>                                                          (0 + 1) / 1]

+---------+----------+----------+--------------------+----+---------------+
|timestamp|machine_ID|event_type|         platform_ID|CPUs|memory_capacity|
+---------+----------+----------+--------------------+----+---------------+
|        0|         5|         0|HofLGzk1Or/8Ildj2...| 0.5|         0.2493|
|        0|         6|         0|HofLGzk1Or/8Ildj2...| 0.5|         0.2493|
|        0|         7|         0|HofLGzk1Or/8Ildj2...| 0.5|         0.2493|
|        0|        10|         0|HofLGzk1Or/8Ildj2...| 0.5|         0.2493|
|        0|        13|         0|HofLGzk1Or/8Ildj2...| 0.5|         0.2493|
+---------+----------+----------+--------------------+----+---------------+
only showing top 5 rows


                                                                                

In [6]:
df_machines.select('machine_ID').distinct().count()

                                                                                

12583

In [7]:
df_machines.select("event_type").distinct().count()

3

In [8]:
df_machines.filter(df_machines["event_type"] == 0).distinct().count()

21443

In [9]:
total_count_t0 = df_machines.filter(df_machines["timestamp"] == 0).distinct().count()
total_count_t0

12477

In [10]:
df_machines.select("machine_ID").filter(df_machines["timestamp"] == 0).distinct().count()

12477

In [11]:
df_machines.select("CPUs").filter(df_machines["timestamp"] == 0).distinct().show()

+----+
|CPUs|
+----+
| 1.0|
| 0.5|
|0.25|
+----+



In [12]:
df_ex1 = df_machines.select(['machine_ID', 'CPUs']).filter(df_machines["timestamp"] == 0).groupby('CPUs').count()
df_ex1.show()

+----+-----+
|CPUs|count|
+----+-----+
| 1.0|  791|
| 0.5|11563|
|0.25|  123|
+----+-----+



In [13]:
df_ex1.withColumn("percentage", F.round((F.col("count") / total_count_t0) * 100)).show()

+----+-----+----------+
|CPUs|count|percentage|
+----+-----+----------+
| 1.0|  791|       6.0|
| 0.5|11563|      93.0|
|0.25|  123|       1.0|
+----+-----+----------+



- at timestamp==0, there are 12477 machines intially started, and for them we want to  find the distribution (of those machines) acoording to their CPU capacity

### 2. What is the percentage of computational power lost due to maintenance (a machine went offline and reconnected later)? The computational power is proportional to both the CPU capacity and the unavailabil- ity period of machines.

- event_type = 1 means that a machine was removed from the cluster. Removals can occur due to
failures or maintenance
- we want to find all the times machien was removed due to maintenance and then reconnected later (event_type=0 in a time step later than for the event_type=1)

In [14]:
from pyspark.sql import Window

In [15]:
window = Window.partitionBy("machine_ID").orderBy("timestamp")

df_next = df_machines.withColumn("next_event", F.lead("event_type").over(window))
df_next = df_next.withColumn("next_time", F.lead("timestamp").over(window))

df_next.show(5)

+------------+----------+----------+--------------------+----+---------------+----------+------------+
|   timestamp|machine_ID|event_type|         platform_ID|CPUs|memory_capacity|next_event|   next_time|
+------------+----------+----------+--------------------+----+---------------+----------+------------+
|           0|         5|         0|HofLGzk1Or/8Ildj2...| 0.5|         0.2493|         1|835150655707|
|835150655707|         5|         1|HofLGzk1Or/8Ildj2...| 0.5|         0.2493|         0|836124903464|
|836124903464|         5|         0|HofLGzk1Or/8Ildj2...| 0.5|         0.2493|      NULL|        NULL|
|           0|         6|         0|HofLGzk1Or/8Ildj2...| 0.5|         0.2493|      NULL|        NULL|
|           0|         7|         0|HofLGzk1Or/8Ildj2...| 0.5|         0.2493|      NULL|        NULL|
+------------+----------+----------+--------------------+----+---------------+----------+------------+
only showing top 5 rows


- By partitioning the window by machine_id, we ensure that all events types are processed independently for **each machine**. In that way, we are sure that event pairings will refer to the same physical machine

- The lead() function returns the following event and time for the same machine. Previously, we ordered events by timestamp, and therefore, "next" refers to the next chronological event in that machine's sequence of events. In that way, we can be sure that we are identifying distinct offline intervals without overlappings

In [16]:
# this is just a check to see if time intervals are not overlapping, and if event types are matching
df_next.filter(df_next["machine_ID"] == 43).show(5)

+------------+----------+----------+--------------------+----+---------------+----------+------------+
|   timestamp|machine_ID|event_type|         platform_ID|CPUs|memory_capacity|next_event|   next_time|
+------------+----------+----------+--------------------+----+---------------+----------+------------+
|           0|        43|         0|HofLGzk1Or/8Ildj2...| 0.5|         0.2493|         2|372991930737|
|372991930737|        43|         2|HofLGzk1Or/8Ildj2...| 0.5|         0.2493|         2|373198072841|
|373198072841|        43|         2|HofLGzk1Or/8Ildj2...| 0.5|         0.2493|         2|727861243083|
|727861243083|        43|         2|HofLGzk1Or/8Ildj2...| 0.5|         0.2493|         2|736148630106|
|736148630106|        43|         2|HofLGzk1Or/8Ildj2...| 0.5|         0.2493|         2|736768939531|
+------------+----------+----------+--------------------+----+---------------+----------+------------+
only showing top 5 rows


- Now, we want to filter df_next to reflect only situations when machine went offline and was reloaded afterwards. That can be achieved by observing **event_type=1** followed by **next_event=0**

In [17]:
df_reloaded = df_next.filter((df_next["event_type"]==1) & (df_next["next_event"]==0))
df_reloaded.count()

8860

- We notice that this number matches the number of such observed cases of reloaded machines that were mentioned in paper: https://www.researchgate.net/publication/261164671_Characterizing_Machines_and_Workloads_on_a_Google_Cluster  on page 3 (Figure 3) which tells us we're on good trace

In [18]:
# Here we just extract offline time in a column named "dtime"
df_reloaded = df_reloaded.withColumn("dtime", F.col("next_time").cast("long")-F.col("timestamp").cast("long")).drop("timestamp", "next_time")
df_reloaded.show(5)

+----------+----------+--------------------+----+---------------+----------+-----------+
|machine_ID|event_type|         platform_ID|CPUs|memory_capacity|next_event|      dtime|
+----------+----------+--------------------+----+---------------+----------+-----------+
|         5|         1|HofLGzk1Or/8Ildj2...| 0.5|         0.2493|         0|  974247757|
|        10|         1|HofLGzk1Or/8Ildj2...| 0.5|         0.2493|         0|  998726348|
|        13|         1|HofLGzk1Or/8Ildj2...| 0.5|         0.2493|         0|  997280215|
|        23|         1|HofLGzk1Or/8Ildj2...| 0.5|         0.2493|         0|  120851153|
|        26|         1|HofLGzk1Or/8Ildj2...| 0.5|         0.2493|         0|88666880740|
+----------+----------+--------------------+----+---------------+----------+-----------+
only showing top 5 rows


In [19]:
# Now we want to calculate lost resources for each sample as a product of offline time and CPUs, and then we sum over all samples
df_reloaded = df_reloaded.withColumn("lost", df_reloaded["dtime"]*df_reloaded["CPUs"])
total_lost = df_reloaded.agg(F.sum("lost")).collect()[0][0]
total_lost

80273246292457.75

In [20]:
# Now, in order to find percentage of lost resources, we want to find possible total power
total_cpu = df_machines.select("machine_ID", "CPUs").agg(F.sum("CPUs")).collect()[0]["sum(CPUs)"]
trace_bounds = df_machines.agg(
    F.min("timestamp").alias("start"),
    F.max("timestamp").alias("end")
).collect()[0]
total_time = trace_bounds["end"] - trace_bounds["start"]

total_power = total_cpu * total_time

In [21]:
percentage_lost = total_lost/total_power * 100
print(f"Percentage of computational power lost due to maintenance: {percentage_lost:.4f}%")

Percentage of computational power lost due to maintenance: 0.1613%


**Here, we conclude that the total percentage of computational power lost due to mainenance is 0.16%**

In [22]:
# maybe we need it for later, if not - delete :))

# machine_att_schema = StructType([
#     StructField('timestamp', LongType(), True),
#     StructField('machine_ID', LongType(), True),
#     StructField('attribute_name:', StringType(), True),
#     StructField('attribute_value', StringType(), False),
#     StructField('attribute_deleted:', BooleanType(), False),
#     ])

# df_machine_attributes = ss.read.csv(os.path.join(DATA_DIR,"machine_attributes/*.csv.gz"), schema=machine_att_schema)
# df_machine_attributes.show(5)

### 3. Is there a class of machines, according to their CPU, that stands out with a higher maintenance rate, as compared to other classes ?

- Here, by maintenance rate, we observe how frequently machines undergo maintenance (such as offline periods). We want to see if there is a class of machines (grouped by their CPU capacity) that experiences longer or more frequent maintenance periods compared to others

In [23]:
# Here, we check if every distinct machine can take only one value of CPU 
dfff = df_machines.groupBy("machine_ID").agg(F.count_distinct("CPUs").alias("distinct_cpus"))
dfff.filter(F.col("distinct_cpus") > 1).show()
# We conclude that only one CPUs value can be taken by one machine

+----------+-------------+
|machine_ID|distinct_cpus|
+----------+-------------+
+----------+-------------+



In [24]:
# We will calculate maintenance rate as a total_offline_time per machine divided by total_time (already calculated). 
# We will use already calculated df_reloaded from previous task
df_reloaded.show(5)

+----------+----------+--------------------+----+---------------+----------+-----------+--------------+
|machine_ID|event_type|         platform_ID|CPUs|memory_capacity|next_event|      dtime|          lost|
+----------+----------+--------------------+----+---------------+----------+-----------+--------------+
|         5|         1|HofLGzk1Or/8Ildj2...| 0.5|         0.2493|         0|  974247757| 4.871238785E8|
|        10|         1|HofLGzk1Or/8Ildj2...| 0.5|         0.2493|         0|  998726348|  4.99363174E8|
|        13|         1|HofLGzk1Or/8Ildj2...| 0.5|         0.2493|         0|  997280215| 4.986401075E8|
|        23|         1|HofLGzk1Or/8Ildj2...| 0.5|         0.2493|         0|  120851153|  6.04255765E7|
|        26|         1|HofLGzk1Or/8Ildj2...| 0.5|         0.2493|         0|88666880740|4.433344037E10|
+----------+----------+--------------------+----+---------------+----------+-----------+--------------+
only showing top 5 rows


In [25]:
df_reloaded.count()

8860

In [57]:
df_offline_times = df_reloaded.groupBy("machine_ID", "CPUs").agg(F.sum("dtime").alias("total_offline_time"))
df_offline_times.show(5)

+----------+----+------------------+
|machine_ID|CPUs|total_offline_time|
+----------+----+------------------+
|         5| 0.5|         974247757|
|        10| 0.5|         998726348|
|        13| 0.5|         997280215|
|        23| 0.5|         120851153|
|        26| 0.5|       88666880740|
+----------+----+------------------+
only showing top 5 rows


In [58]:
df_offline_times = df_offline_times.withColumn(
    "maintenance_rate_percentage", 
    F.col("total_offline_time").cast("double") * 100 / total_time)
df_offline_times.show(5)

+----------+----+------------------+---------------------------+
|machine_ID|CPUs|total_offline_time|maintenance_rate_percentage|
+----------+----+------------------+---------------------------+
|         5| 0.5|         974247757|       0.038874504571689526|
|        10| 0.5|         998726348|       0.039851251082934525|
|        13| 0.5|         997280215|        0.03979354737921456|
|        23| 0.5|         120851153|       0.004822211461137036|
|        26| 0.5|       88666880740|         3.5379922980767815|
+----------+----+------------------+---------------------------+
only showing top 5 rows


In [59]:
df_offline_times.groupBy("CPUs").agg(F.avg("maintenance_rate_percentage")).show()

+----+--------------------------------+
|CPUs|avg(maintenance_rate_percentage)|
+----+--------------------------------+
| 1.0|              0.7976476440387422|
| 0.5|              1.2378912755606144|
|0.25|              1.9150004966013052|
+----+--------------------------------+



**From here, we conclude that machines with lowest CPU capacity (0.25) have the highest average maintenance rate (~1.92%)** <br>
 Therefore, the conclusion is that lower-CPU machines tend to have higher maintenance rates, which means they either undergo maintenance more frequently or for longer periods.


### 4. What is the distribution of the number of jobs/tasks per scheduling class? Comment on the results.

Tasks and jobs can be resubmitted multiple times if they fail, therefore unique tasks are identified by their key (job_id) or composite key (job_id, task_index) and filtered with ".distinct()" to ensure each task is counted only once, regardless of how many lifecycle events it generated.

In [4]:
# Define schema for job events table
job_events_schema = StructType([
    StructField("timestamp", LongType(), True),
    StructField("missing_info", StringType(), True),
    StructField("job_id", LongType(), True),
    StructField("event_type", IntegerType(), True),
    StructField("user_name", StringType(), True),
    StructField("scheduling_class", IntegerType(), True),
    StructField("job_name", StringType(), True),
    StructField("logical_job_name", StringType(), True)
])

In [9]:
df_job_events = ss.read.csv(os.path.join(DATA_DIR_L,"job_events/*.csv.gz"), schema=job_events_schema)
df_job_events.show(5)

+------------+------------+----------+----------+--------------------+----------------+--------------------+--------------------+
|   timestamp|missing_info|    job_id|event_type|           user_name|scheduling_class|            job_name|    logical_job_name|
+------------+------------+----------+----------+--------------------+----------------+--------------------+--------------------+
|772325004610|        NULL|6319958772|         1|ZpQmujQYX55FcN2RF...|               0|4iFTcCfph6IhTG3o4...|AmKr63lD9MIGXiAac...|
|772325045338|        NULL|6319984350|         0|E+9U+J1Dicd5PJklb...|               1|UaXnrc6huaDo9qNtX...|Mz+7hmVdCFVQEwwuv...|
|772325074372|        NULL|6319983180|         0|r/Al6kYJOwZITr6wi...|               2|ct6ai8SyqLEEEdgBS...|G/9E4AW9fSviXbmdF...|
|772326178083|        NULL|6319984385|         0|F2+Gv53Pxd4KDRb/U...|               0|aPxb6dFdH8wZ2FTBC...|j25eTfDZ4FFHzd7p+...|
|772327671789|        NULL|6319983180|         1|r/Al6kYJOwZITr6wi...|               2|ct6

In [10]:
# define schema for task events table
task_events_schema = StructType([
    StructField("timestamp", LongType(), True),
    StructField("missing_info", StringType(), True),
    StructField("job_id", LongType(), True),
    StructField("task_index", IntegerType(), True),
    StructField("machine_id", LongType(), True),
    StructField("event_type", IntegerType(), True),
    StructField("user_name", StringType(), True),
    StructField("scheduling_class", IntegerType(), True),
    StructField("priority", IntegerType(), True),
    StructField("req_cpu_cores", DoubleType(), True),
    StructField("req_ram", DoubleType(), True),
    StructField("req_local_disk", DoubleType(), True),
    StructField("different_machine_constraint", BooleanType(), True)
])

In [12]:
df_task_events = ss.read.csv(os.path.join(DATA_DIR_L,"task_events/*.csv.gz"), schema=task_events_schema)
df_task_events.show(5)

+------------+------------+----------+----------+----------+----------+--------------------+----------------+--------+-------------+-------+--------------+----------------------------+
|   timestamp|missing_info|    job_id|task_index|machine_id|event_type|           user_name|scheduling_class|priority|req_cpu_cores|req_ram|req_local_disk|different_machine_constraint|
+------------+------------+----------+----------+----------+----------+--------------------+----------------+--------+-------------+-------+--------------+----------------------------+
|767314028182|        NULL| 515042969|        15| 257406228|         1|/fk1fVcVxZ6iM6gHZ...|               2|       0|      0.01562|0.01553|      2.155E-4|                        NULL|
|767314057755|        NULL|6319230508|        13|   6567628|         4|r/Al6kYJOwZITr6wi...|               0|       4|      0.06873|0.00795|      3.815E-5|                        NULL|
|767314057799|        NULL|6319230508|         3|    902367|         4|r/Al

In [13]:
# Count number of unique jobs per scheduling class
# groupBy creates groups based on scheduling_class
# agg with count counts the number of job_ids in each group
jobs_per_class = df_job_events.select("job_id", "scheduling_class").distinct().groupBy("scheduling_class").agg(F.count("job_id").alias("num_of_jobs"))

jobs_per_class.show()

+----------------+-----------+
|scheduling_class|num_of_jobs|
+----------------+-----------+
|               1|       2977|
|               3|        110|
|               2|       2525|
|               0|       3158|
+----------------+-----------+



In [14]:
# Calculate total number of unique jobs in the dataset
# counts distinct job_ids across all events to get the absolute total
total_jobs = df_job_events.select("job_id").distinct().count()
total_jobs

8770

In [15]:
jobs_per_class.withColumn("percentage", F.round((F.col("num_of_jobs") / total_jobs) * 100)).show()

+----------------+-----------+----------+
|scheduling_class|num_of_jobs|percentage|
+----------------+-----------+----------+
|               1|       2977|      34.0|
|               3|        110|       1.0|
|               2|       2525|      29.0|
|               0|       3158|      36.0|
+----------------+-----------+----------+



In [26]:
# Count number of unique tasks per scheduling class
# groupBy creates groups based on scheduling_class
# agg with count counts the number of job_ids in combination with task_index in each group
tasks_per_class = df_task_events.select("job_id", "task_index", "scheduling_class").distinct().groupBy("scheduling_class").agg(F.count("*").alias("num_of_tasks"))

tasks_per_class.show()

+----------------+------------+
|scheduling_class|num_of_tasks|
+----------------+------------+
|               1|       32864|
|               3|        2590|
|               2|        9607|
|               0|      285756|
+----------------+------------+



In [18]:
# Calculate total number of unique jobs in the dataset
# counts distinct combination of job_ids and task_index across all events to get the absolute total
total_tasks = df_task_events.select("job_id", "task_index").distinct().count()
total_tasks

330817

In [19]:
tasks_per_class.withColumn("percentage", F.round((F.col("num_of_tasks") / total_tasks) * 100)).show()

+----------------+------------+----------+
|scheduling_class|num_of_tasks|percentage|
+----------------+------------+----------+
|               1|       32864|      10.0|
|               3|        2590|       1.0|
|               2|        9607|       3.0|
|               0|      285756|      86.0|
+----------------+------------+----------+



**Scheduling Class Distribution:**
- Class 0 (non-production): ~35% of jobs and 86% of task, this suggests non-production jobs tend to have many tasks per job
- Class 1: 34% of jobs, 10% of tasks
- Class 2: 29% of jobs, 3% of tasks  
- Class 3 (most latency-sensitive): Only 1% of both jobs and tasks, indicating production workloads are a small but critical portion of the cluster

**->** Non-production workloads (class 0) dominate task count and job count, suggesting they involve large-scale data processing jobs with many parallel tasks while, as the "Google documentation" notes, class 3 represents more latency-senstive tasks in production, being potentially short-lived and fast.

### 5. Would you qualify the percentage of jobs/tasks that got killed or evicted as important? 

In [20]:
# Define event type constants for readability
EVICT = 2
KILL = 5

# Count unique jobs that were killed or evicted at least once across its lifecycle
jobs_killed_evicted = (df_job_events.filter(F.col("event_type").isin([EVICT, KILL])).select("job_id").distinct().count())

# Calculate percentage
perc_jobs_killed_evicted = round(jobs_killed_evicted / total_jobs * 100)
perc_jobs_killed_evicted

46

In [21]:
# Break down by event type "KILL"
jobs_killed = (df_job_events.filter(F.col("event_type").isin([KILL])).select("job_id").count())
jobs_killed

3997

In [22]:
# Break down by event type "EVICT"
jobs_evicted = (df_job_events.filter(F.col("event_type").isin([EVICT])).select("job_id").count())
jobs_evicted

0

In [23]:
# Count unique tasks that were killed or evicted at least once across its lifecycle
tasks_killed_evicted = (df_task_events.filter(F.col("event_type").isin([EVICT, KILL])).select("task_index", "job_id").distinct().count())

# Calculate percentage
perc_tasks_killed_evicted = round(tasks_killed_evicted / total_tasks * 100)
perc_tasks_killed_evicted

43

In [24]:
# Break down by event type "KILL"
tasks_killed = (df_task_events.filter(F.col("event_type").isin([KILL])).select("task_index").count())
tasks_killed

138951

In [25]:
# Break down by event type "EVICT"
tasks_evicted = (df_task_events.filter(F.col("event_type").isin([EVICT])).select("task_index").count())
tasks_evicted

83227