In [1]:
import sys
from pyspark import SparkContext

# start spark with 4 worker threads (since 4 cores available)
sc = SparkContext("local[4]")
sc.setLogLevel("ERROR")

# read the input files into an RDD[String] and split each line into an array of items
# 1 file of machine events (~ 3 MB)
# first 200 files of job events (~ 128 MB)
# first 7 files of task events (~ 137 MB)
# first file of task usage (~ 364 MB)
machine_events_file = sc.textFile("data/machine_events/part-00000-of-00001.csv").map(lambda x : x.split(','))
job_events_file = sc.textFile("data/job_events/part-00[0-1][0-9][0-9]-of-00500.csv").map(lambda x : x.split(','))
task_events_file = sc.textFile("data/task_events/part-0000[0-6]-of-00500.csv").map(lambda x : x.split(','))
task_usage_file = sc.textFile("data/task_usage/part-00001-of-00500.csv").map(lambda x : x.split(','))

# keep the RDD in memory
machine_events_file.cache()
job_events_file.cache()
task_events_file.cache()
task_usage_file.cache()

# show the number of observations (lines)
# print(machine_events_file.count())
# print(job_events_file.count())
# print(task_events_file.count())


PythonRDD[11] at RDD at PythonRDD.scala:53

## Question 1: What is the distribution of the machines according to their CPU capacity?

In [2]:
# prepare indexes (machine ID and CPU) according to "schema.csv" file
i_mID = 1
i_CPU = 4

# create (machine_ID, CPU) unique pairs
mID_CPU = machine_events_file.map(lambda x: (x[i_mID], x[i_CPU])).distinct()

# TODO: create graph (or other illustr)


## Question 2: What is the percentage of computational power lost due to maintenance (a machine went offline and reconnected later)?

TODO: Challenges: Thinking in key-value paradigm. How result should look like? Tried (time, % lost), but % always 0 or 100. Couldn't convert to float, because some CPU not given.

Conclusion: More memory capacity loss than cpu power.

In [3]:
## formula: (lost CPU power / total CPU power) * 100 

# get total CPU power
col_CPU = machine_events_file.map(lambda x: x[4])  # get only CPU column
col_CPU = col_CPU.filter(lambda x: len(x) > 0)     # filter rows with no CPU data 
col_CPU = col_CPU.map(lambda x: float(x))          # convert CPU to float
total_CPU = col_CPU.sum()                          # sum up column

# get total lost CPU power (event_type == 1)
col_CPU_lost = machine_events_file.map(lambda x: (x[4], x[2])).filter(lambda x: x[1] == '1')
col_CPU_lost = col_CPU_lost.filter(lambda x: len(x[0]) > 0).map(lambda x: float(x[0]))
total_lost_CPU = col_CPU_lost.sum()

# result
print("Percentage of computational power lost due to maintenance:", 
      round((total_lost_CPU / total_CPU) * 100, 2), "%")

# same for % of memory capacity lost
total_mem = machine_events_file.map(lambda x: x[5]).filter(lambda x: len(x) > 0).map(lambda x: float(x)).sum()
total_lost_mem = machine_events_file.map(lambda x: (x[5], x[2])).filter(lambda x: x[1] == '1' and len(x[0]) > 0).map(lambda x: float(x[0])).sum()

print("Percentage of memory capacity lost due to maintenance:", 
      round((total_lost_mem / total_mem) * 100, 2), "%")

Percentage of computational power lost due to maintenance: 23.99 %
Percentage of memory capacity lost due to maintenance: 24.47 %


## Question 3: What is the distribution of the number jobs/tasks per scheduling class?

In [4]:
## Goal:
# (sch_class, nb_jobs) pairs
# (sch_class, nb_tasks) pairs

# index of scheduling class is 5 at job_events_file and 7 at task_events_file
# create (sch_class, 1) tuple
# and reduce by sch_class, summing up values
jobs_per_class =   job_events_file.map(lambda x: (x[5], 1)).reduceByKey(lambda x,y: x+y)
tasks_per_class = task_events_file.map(lambda x: (x[7], 1)).reduceByKey(lambda x,y: x+y)

# show results
print(jobs_per_class.collect())
print(tasks_per_class.collect())

# TODO: bar chart

[('1', 257814), ('0', 305793), ('2', 211509), ('3', 7705)]
[('1', 155637), ('0', 805914), ('3', 60468), ('2', 218972)]


## Question 4: What is the percentage of jobs/tasks that got killed or evicted depending on the scheduling class?

In [5]:
## Goal:
# (sch_class, %_jobs_evicted, %_jobs_killed)
# (sch_class, %_tasks_evicted, %_tasks_killed)

# for event_type, 2 means EVICT, 5 means KILL
# event_type index is 3 at job_events and 5 at task_events

# create (sch_class, 1) pairs, for jobs and tasks, for event_type 2 and 5
jobs_evict = job_events_file.filter(lambda x: x[3] == '2').map(lambda x: (x[5], 1))
jobs_kill  = job_events_file.filter(lambda x: x[3] == '5').map(lambda x: (x[5], 1))
tasks_evict = task_events_file.filter(lambda x: x[5] == '2').map(lambda x: (x[7], 1))
tasks_kill  = task_events_file.filter(lambda x: x[5] == '5').map(lambda x: (x[7], 1))

# reduce by sch_class, summing up values, and tranform to dictionary
jobs_evict_per_class   = jobs_evict.reduceByKey(lambda x,y: x+y).collectAsMap()
jobs_kill_per_class     = jobs_kill.reduceByKey(lambda x,y: x+y).collectAsMap()
tasks_evict_per_class = tasks_evict.reduceByKey(lambda x,y: x+y).collectAsMap()
tasks_kill_per_class   = tasks_kill.reduceByKey(lambda x,y: x+y).collectAsMap()

# total nb of events per class
job_events_per_class = job_events_file.map(lambda x: (x[5], 1)).reduceByKey(lambda x,y: x+y)
task_events_per_class = task_events_file.map(lambda x: (x[7], 1)).reduceByKey(lambda x,y: x+y)

# (sch_class, %_evict) and (sch_class, %_kill)
job_percent_evict_per_class = job_events_per_class.map(lambda x: (x[0], (jobs_evict_per_class[x[0]] / x[1])*100 
                                                                          if x[0] in jobs_evict_per_class
                                                                          else 0.0))
job_percent_kill_per_class  = job_events_per_class.map(lambda x: (x[0], (jobs_kill_per_class[x[0]] / x[1])*100 
                                                                          if x[0] in jobs_kill_per_class
                                                                          else 0.0))
task_percent_evict_per_class = task_events_per_class.map(lambda x: (x[0], (tasks_evict_per_class[x[0]] / x[1])*100 
                                                                          if x[0] in tasks_evict_per_class
                                                                          else 0.0))
task_percent_kill_per_class  = task_events_per_class.map(lambda x: (x[0], (tasks_kill_per_class[x[0]] / x[1])*100 
                                                                          if x[0] in tasks_kill_per_class
                                                                          else 0.0))

# transform to dictionary, need for merging
job_percent_kill_per_class = job_percent_kill_per_class.collectAsMap()
task_percent_kill_per_class = task_percent_kill_per_class.collectAsMap()

# (sch_class, %_jobs_evicted, %_jobs_killed)
res_job = job_percent_evict_per_class.map(lambda x: (x[0], x[1], job_percent_kill_per_class[x[0]]))
# (sch_class, %_tasks_evicted, %_tasks_killed)
res_task = task_percent_evict_per_class.map(lambda x: (x[0], x[1], task_percent_kill_per_class[x[0]]))

# print results
for f in sorted(res_job.collect()):
    print(f)
for f in sorted(res_task.collect()):
    print(f)
    
# TODO: round(x, 4), double bar chart

('0', 0.0032701860408838658, 17.661620769605584)
('1', 0.0015515061245704267, 6.230460719743691)
('2', 0.0, 14.667933752228038)
('3', 0.0, 24.87994808565866)
('0', 4.828058576969751, 5.942569554567857)
('1', 5.226906198397553, 1.477155175183279)
('2', 2.8410025026030725, 16.00798275578613)
('3', 0.3274459218098829, 2.39630879142687)


## Question 5: Do tasks with low priority have a higher probability of being evicted?

In [6]:
## Goal:
# (task_priority, %_evicted) pairs
# task_priority index is 8

# create (task_priority, 1) pairs, and reduce by task_priority, summing up values
total_by_priority = task_events_file.map(lambda x: (x[8], 1)).reduceByKey(lambda x,y: x+y)

# create (task_priority, 1) pairs, where event == EVICT, and reduce by task_priority, summing up values
nb_evicted_by_priority = task_events_file.filter(lambda x: x[5] == '2').map(lambda x: (x[8], 1)).reduceByKey(lambda x,y: x+y)

# convert to dictionary
nb_evicted_by_priority = nb_evicted_by_priority.collectAsMap()

res = total_by_priority.map(lambda x: (x[0], round((nb_evicted_by_priority[x[0]] / x[1])*100, 2) 
                                               if x[0] in nb_evicted_by_priority 
                                               else 0.0))

# show result
res.sortBy(lambda x: int(x[0])).collect()

# answer: tasks with low priority have a higher probability of being evicted

# TODO: illustrate

[('0', 11.12),
 ('1', 1.96),
 ('2', 0.07),
 ('8', 0.06),
 ('9', 0.15),
 ('10', 0.1),
 ('11', 0.0)]

## Question 6: In general, do tasks from the same job run on the same machine?

In [7]:
## Goal: 
# tuple of 2 components: job_ID, 
#                        highest % of tasks from that job_ID running on same machine
# if 2nd component is >= 50% in at least half of cases (job_IDs), then answer is yes

# job ID index is 2, machine ID is 4

# create (job_ID, list_of_machines) pair
job_machine = task_events_file.map(lambda x: (x[2], x[4])).groupByKey().mapValues(list)

# create (job_ID, highest % of tasks from that job_ID running on same machine) pair
highest_percentage_per_job = job_machine.map(lambda x: (x[0], 
                                                        (x[1].count(max(set(x[1]), key=x[1].count))
                                                        / len(x[1])) * 100))

# keep the RDD in memory, because computation is very costly
job_machine.cache()
highest_percentage_per_job.cache()

# total nb of jobs
total_per_job = highest_percentage_per_job.count()
# nb of jobs where % of tasks running on same machine is >= 50
half_or_more_per_job = highest_percentage_per_job.filter(lambda x: x[1] >= 50.0).count()

# result
print("In", round((half_or_more_per_job / total_per_job) * 100, 2), 
      "% of jobs, at least 50 % of tasks run on the same machine")
print("We can conclude that in general, tasks from the same job run on the same machine")


In 74.09 % of jobs, at least 50 % of tasks run on the same machine
We can conclude that in general, tasks from the same job run on the same machine


## Question 7: Are there tasks that consume significantly less resources than what they requested?

In [8]:
## Goal:
# True if there exists a task that didn't consume at least 50 % of requested ressource (CPU, memory or disk space)

# task_event columns:
#  job ID (2), 
#  task index (3), 
#  machine ID (4), 
#  memory request (10), 
#  disk space request (11), 
#  CPU request (9)

# task_usage columns:
#  job ID (2), 
#  task index (3), 
#  machine ID (4), 
#  maximum memory usage (10), 
#  local disk space usage (12), 
#  sampled CPU usage (19)


# create ((job_ID, task_index, machine_ID), 
#         (memory, disk_space, CPU)) nested tuple
#
# where (job_ID, task_index, machine_ID) is key
#       (memory, disk_space, CPU) is value
#
# and filter rows where no info about resource
requested_pow = task_events_file.map(lambda x: ((x[ 2], x[ 3], x[4]), 
                                                (x[10], x[11], x[9]))
                                    ).filter(lambda x: len(x[1][0]) > 0 and 
                                                       len(x[1][1]) > 0 and 
                                                       len(x[1][2]) > 0
                                            ).distinct()

used_pow = task_usage_file.map(lambda x: ((x[ 2], x[ 3], x[ 4]),
                                          (x[10], x[12], x[19]))
                              ).filter(lambda x: len(x[1][0]) > 0 and
                                                 len(x[1][1]) > 0 and
                                                 len(x[1][2]) > 0
                                      ).distinct()

# keep rdd in memory
requested_pow.cache()
used_pow.cache()

# show nb of observations
print(requested_pow.count())
print(used_pow.count())



762662
2033047


In [9]:
# create ((job_ID, task_index, machine_ID), 
#         (memory_requested, disk_space_requested, CPU_requested),
#         (memory_used, disk_space_used, CPU_used)) nested tuple
# requested_pow_dict = requested_pow.collectAsMap()
# requested_pow_dict_keys = list(requested_pow.collectAsMap().keys())

# task_consumption = used_pow.filter(lambda x: x[0] in requested_pow_dict_keys)
# task_consumption.cache()
# task_consumption.count()

In [11]:

print(requested_pow.keys().distinct().count())
print(used_pow.keys().distinct().count())

762662
726051

2033047
152340


726051
152340


## Question 8: How often does it happen that the resources of a machine are over-committed?

In [8]:
# Goal: 
# get tuple:
#  (time_t, 
#   machine_ID
#   [list_of_requested_ressources_at_time_t_on_machine_ID] as list1,
#   [list_of_used_ressources_at_time_t_on_machine_ID] as list2)
# then result is the % of list1 == list2
# i.e. result = (tuple.filter(list1 == list2).count() / tuple.count()) * 100

# Step 1
# task_event 
# get (time (0), machine ID (4), CPU request (9) , memory request (10), disk space request (11))
# filter 4 event types (index 5) that don't allow tasks to appear in task_usage:
# SUBMIT, FINISH, KILL and UPDATE_PENDING
task_event_data = task_events_file.filter(lambda x: x[5] != '0' and x[5] != '4' and x[5] != '5' and x[5] != '7')
task_event_data = task_event_data.map(lambda x: (x[0], x[4], x[9], x[10], x[11])).distinct()

# Step 2
# task_usage 
# get:
# (start time (0), machine ID (4), sampled CPU usage (19), maximum memory usage (10), local disk space usage (12))
task_usage_data = task_usage_file.map(lambda x: (x[0], x[4], x[19], x[10], x[12])).distinct()

# Step 3
# we have:
#  requested: (t, ID, CPU, RAM, disk)
#  used:      (t, ID, CPU, RAM, disk)
# we need:
#  requested: ((t, ID), list of requested (CPU, RAM, disk) at t on ID)
#  used:      ((t, ID), list of used (CPU, RAM, disk) at t on ID)
requested = task_event_data.map(lambda x: ((x[0], x[1]), (x[2], x[3], x[4]))).groupByKey().mapValues(list)
used = task_usage_data.map(lambda x: ((x[0], x[1]), (x[2], x[3], x[4]))).groupByKey().mapValues(list)

# keep rdd in memory
requested.cache()
used.cache()

In [28]:
# Step 4
# get:
#  ( (time_t, 
#     machine_ID),
#    ([list_of_requested_ressources_at_time_t_on_machine_ID],
#     [list_of_used_ressources_at_t_on_machine_ID]))


# Step 5
# show % of overcommitting for the first result.count() observations



0