In [2]:
from pyspark import SparkContext

In [3]:
# Launch a local Spark context backed by a single worker thread and
# keep only ERROR-level messages in the log.
sc = SparkContext(master="local[1]")
sc.setLogLevel("ERROR")


In [4]:
# Load every file matching ./machine_events/* as an RDD of raw text lines.
me_file = sc.textFile("./machine_events/*")

In [5]:
# Field names of a machine_events record; a name's position in this list is
# used (via findCol) as the index of that field in a split line.
me_columns = ['timestamp', 'machineId', 'eventType', 'platformId', 'cpu', 'memory']
# Split every CSV line into its fields and cache the result for reuse.
me_entries = me_file.map(lambda line: line.split(",")).cache()

In [6]:
# Load every file matching ./job_events/* as an RDD of raw text lines.
je_file = sc.textFile("./job_events/*")

In [7]:
# Field names of a job_events record; a name's position in this list is
# used (via findCol) as the index of that field in a split line.
je_columns = ['timestamp', 'missing', 'jobId', 'eventType', 'username',
              'schedulingClass', 'jobname', 'logicalJobname']
# Split every CSV line into its fields and cache the result for reuse.
je_entries = je_file.map(lambda line: line.split(",")).cache()

In [8]:
# Load every file matching ./task_events/* as an RDD of raw text lines.
te_file = sc.textFile("./task_events/*")

In [19]:
# Field names of a task_events record; a name's position in this list is
# used (via findCol) as the index of that field in a split line.
te_columns = ['timestamp', 'missing', 'jobId', 'taskId', 'machineId',
              'eventType', 'username', 'schedulingClass',
              'priority', 'rrfCpu', 'rrfRam', 'rrfDiskSpace', 'diffMConstraint']
# Split every CSV line into its fields and cache the result for reuse.
te_entries = te_file.map(lambda line: line.split(",")).cache()

In [10]:
# Load every file matching ./task_usage/* as an RDD of raw text lines.
tu_file = sc.textFile("./task_usage/*")

In [11]:
# Split every CSV line into its fields and cache the result for reuse.
tu_entries = tu_file.map(lambda line: line.split(",")).cache()

In [12]:
def findCol(firstLine, name):
    """Return the position of ``name`` inside ``firstLine``, or -1 when it is absent."""
    return firstLine.index(name) if name in firstLine else -1
    
def getColumn(index,entries):
    """Project ``entries`` onto field ``index`` by mapping every record to ``record[index]``."""
    def pick(record):
        return record[index]

    return entries.map(pick)

## Analysis

### Get the ID of the job with the highest number of tasks

In [30]:
# Positions of the job and task identifiers inside a task_events record.
ind1 = findCol(te_columns, 'jobId')
ind2 = findCol(te_columns, 'taskId')

# Distinct (job, task) pairs -> number of tasks per job, largest count first.
job_tasks = (
    te_entries
    .map(lambda x: (x[ind1], x[ind2]))
    .distinct()
    .aggregateByKey(0, lambda count, _task: count + 1, lambda left, right: left + right)
    .sortBy(lambda pair: pair[1], ascending=False)
)



In [31]:
# job_tasks is sorted by task count in descending order, so the first row is the maximum.
job = job_tasks.take(1)
top_job_id = job[0][0]
top_task_count = job[0][1]
print(f'the job with the highest #tasks is {top_job_id} with {top_task_count} tasks')

the job with the highest #tasks is 6221861800 with 20010 tasks


### On average, how many tasks get killed or evicted per job?

In [309]:
ind = findCol(te_columns, 'eventType')
ind1 = findCol(te_columns, 'jobId')
ind2 = findCol(te_columns, 'taskId')
i = findCol(je_columns, 'jobId')

# number of tasks killed or evicted: distinct (job, task) pairs that received
# at least one event of type '2' or '5'
killed_or_evicted = te_entries.filter(lambda x: x[ind] in ('2', '5'))
num_of_tasks = (
    killed_or_evicted
    .map(lambda x: (x[ind1], x[ind2]))
    .distinct()
    .count()
)

# total number of jobs: distinct job ids present in job_events
total = getColumn(i, je_entries).distinct().count()
average = num_of_tasks / total



In [236]:
average

1.9308163265306122

### Do tasks with low priority have a higher probability of being evicted?

In [139]:
ind1 = findCol(te_columns, 'eventType')
ind2 = findCol(te_columns, 'priority')


def _add_pairs(x, y):
    """Component-wise sum of two 2-tuples."""
    return (x[0] + y[0], x[1] + y[1])


# Per priority: (number of events of type '1', number of events of type '2').
evicted_tasks = (
    te_entries
    .filter(lambda x: x[ind1] in ('1', '2'))
    .map(lambda x: (x[ind2], (int(x[ind1] == '1'), int(x[ind1] == '2'))))
    .aggregateByKey((0, 0), _add_pairs, _add_pairs)
)
# (priority, type-'2' events as a percentage of type-'1' events), ordered by priority.
ratio = (
    evicted_tasks
    .map(lambda x: (int(x[0]), x[1][1]*100 / x[1][0]))
    .sortBy(lambda x: x[0])
    .collect()
)


In [242]:
# One line per priority level with its eviction percentage.
for priority, percent in ratio:
    print(f'priority {priority} ratio: {percent}')

priority 0 ratio: 8.207848950612467
priority 1 ratio: 0.302185030218503
priority 2 ratio: 0.08932486641872249
priority 8 ratio: 0.0
priority 9 ratio: 0.04471635602349641
priority 10 ratio: 0.12004801920768307
priority 11 ratio: 0.0


As we can see, tasks with very low priority have a noticeably higher chance of being evicted.

### relation between scheduling class of tasks and their priority

In [18]:
ind1 = findCol(te_columns, 'jobId')
ind2 = findCol(te_columns, 'taskId')
ind3 = findCol(te_columns, 'priority')
ind4 = findCol(te_columns, 'schedulingClass')

# Distinct ((jobId, taskId), (schedulingClass, priority)) combinations.
tasks = (
    te_entries
    .map(lambda x: ((x[ind1], x[ind2]), (x[ind4], x[ind3])))
    .distinct()
)
def addLambda(x,y):
    """Increment slot ``y`` of the counter list ``x`` in place and return ``x``."""
    x[y] = x[y] + 1
    return x

def combine(x,y):
    """Return a new list with the element-wise sum of the first three entries of ``x`` and ``y``."""
    return [x[slot] + y[slot] for slot in range(3)]
    
# Each scheduling class maps to a 3-element list of task counts, one slot per
# priority band: [priority 0-3, priority 4-7, priority 8-11].
tasks = (
    tasks
    .map(lambda x: (int(x[1][0]), int(x[1][1])//4))
    .aggregateByKey([0,0,0], addLambda, combine)
    .sortByKey()
)


In [266]:
tasks.take(4)

[(0, [93500, 0, 744]),
 (1, [12592, 0, 14107]),
 (2, [23093, 0, 18780]),
 (3, [2031, 0, 26079])]

For each scheduling class there is a list of 3 elements: [#tasks with priority in 0-3, #tasks with priority in 4-7, #tasks with priority in 8-11].

### ratio of successfully completed jobs per scheduling class

In [312]:
ind1 = findCol(je_columns, 'eventType')
ind2 = findCol(je_columns, 'schedulingClass')
ind3 = findCol(je_columns, 'jobId')

# completed jobs per scheduling class: number of type-'4' events per class
jobs_completed = (
    je_entries
    .filter(lambda x: x[ind1]=='4')
    .map(lambda x: (int(x[ind2]), 1))
    .aggregateByKey(0, lambda x, y: x+1, lambda x, y: x+y)
    .collect()
)

# total jobs per scheduling class: distinct (class, jobId) pairs counted per class
jobs = (
    je_entries
    .map(lambda x: (int(x[ind2]), x[ind3]))
    .distinct()
    .aggregateByKey(0, lambda x, y: x+1, lambda x, y: x+y)
    .collect()
)

In [269]:
def convert(tup):
    """Group ``(key, value)`` pairs into a dict mapping each key to the list of its values."""
    grouped = {}
    for key, value in tup:
        if key not in grouped:
            grouped[key] = []
        grouped[key].append(value)
    return grouped

# Index both collected result lists by scheduling class; convert() stores the
# values of each key in a list, so the count is read back as dict[key][0].
jobs_compl_dict = convert(jobs_completed)
jobs_dict = convert(jobs)

In [270]:
# Scheduling classes 0..3; a class without any completed job is recorded as [0].
for i in range(4):
    completed = jobs_compl_dict.setdefault(i, [0])[0]
    ratio = completed*100 / jobs_dict[i][0]
    print(f'the ratio of successful job for scheduling class {i} is: {ratio}')

the ratio of successful job for scheduling class 0 is: 23.612622415669204
the ratio of successful job for scheduling class 1 is: 19.93846153846154
the ratio of successful job for scheduling class 2 is: 6.694855532064834
the ratio of successful job for scheduling class 3 is: 0.0


As we can see, the less latency-sensitive a job is, the higher its chance of completing.

### What is the percentage of computational power lost due to maintenance (a machine went offline and reconnected later)?

In [274]:
import heapq

In [275]:

def timeCounter(eventMap):
    """Sum the downtime of one machine from its add/remove events.

    Parameters
    ----------
    eventMap : tuple
        ``(machine_id, events)`` where ``events`` is a list of
        ``(timestamp, event_type)`` tuples; ``event_type`` 0 is an add and
        1 is a removal. The list is not modified.

    Returns
    -------
    tuple
        ``(machine_id, downtime)`` where ``downtime`` is the sum, over every
        removal that is followed by an add, of ``add_time - removal_time``.
        When several removals precede an add, only the latest one counts.
        An empty event list yields a downtime of 0.
    """
    # sorted() gives the same chronological order a heap would, without
    # consuming the caller's list.
    ordered = sorted(eventMap[1])
    # None (not False/0) marks "no pending removal", so that a removal at
    # timestamp 0 is still recognised.
    removed_at = None
    downtime = 0
    # The earliest event is skipped: it is the initial add of the machine.
    for event in ordered[1:]:
        timestamp, event_type = event[0], event[1]
        if event_type == 1:
            removed_at = timestamp
        elif event_type == 0 and removed_at is not None:
            downtime += timestamp - removed_at
            removed_at = None
    return (eventMap[0], downtime)

    

In [276]:
# Column positions inside a machine_events record.
ind1 = findCol(me_columns, 'timestamp')
ind2 = findCol(me_columns, 'machineId')
ind3 = findCol(me_columns, 'eventType')



# (machineId, (timestamp, eventType)) for every machine event, all parsed as ints.
mappedRDD = me_entries.map(lambda x: (int(x[ind2]), (int(x[ind1]), int(x[ind3]) ) )).cache()
maxTime = mappedRDD.map(lambda t: t[1][0]).max()  # latest timestamp found in any machine event

# Per machine, the smallest timestamp among its events; it is used as the
# moment the machine was added to the system.
machineAddTime = mappedRDD.combineByKey(
    lambda t: t[0], lambda x_old, t: min(x_old, t[0]), lambda x0, x1: min(x0, x1))
# Sum over all machines of (maxTime - first timestamp): the total lifetime of
# all machines, in the unit of the timestamp field (microseconds according to
# the original author's note).
totalMachineTime = machineAddTime.aggregate(
    0, lambda x_old, t: x_old + maxTime - t[1], lambda x0, x1: x0+x1)

# only keep add (0) and removal (1) events
filteredRDD = mappedRDD.filter(lambda t: (t[1][1] == 0 or t[1][1] == 1))
# for each machine collect a list of (timestamp, eventType) tuples
aggregatedRDD = filteredRDD.combineByKey(
    lambda x: [x], lambda h, x: h + [x], lambda h1, h2: h1+h2)

resRDD = aggregatedRDD.map(timeCounter)  # (machineId, downtime) per machine
# sum of all downtimes divided by the total machine lifetime
wasted_time = resRDD.map(lambda x: x[1]).sum()/totalMachineTime


In [277]:
wasted_time

0.004946197580885792

This is the ratio of the time wasted in reboots to the total lifetime of the machines.

### average number of tasks per job

In [279]:
ind1 = findCol(te_columns, 'jobId')
ind2 = findCol(te_columns, 'taskId')

# Number of distinct tasks of every job (the job id itself is dropped).
task_counts = (
    te_entries
    .map(lambda x: (x[ind1], x[ind2]))
    .distinct()
    .aggregateByKey(0, lambda x, y: x+1, lambda x, y: x+y)
    .map(lambda y: y[1])
)
# (number of jobs, total number of tasks)
countSum = task_counts.aggregate(
    (0, 0),
    lambda acc, n_tasks: (acc[0]+1, acc[1]+n_tasks),
    lambda left, right: (left[0] + right[0], left[1]+right[1]),
)

average_tasks_job = countSum[1]/countSum[0]


In [280]:
average_tasks_job

39.00429009193054

In [283]:
# Cut-off read by aggregator(): a value is counted only when it is strictly greater than this.
threshold = 0.8

def adder(d, tuple):
    """Count one more occurrence of ``tuple[1]`` in the counter dict ``d`` and return ``d``."""
    key = tuple[1]
    d[key] = d.get(key, 0) + 1
    return d

def merger(d1, d2):
    """Add the counts of ``d2`` into ``d1`` key by key and return ``d1``."""
    for key, count in d2.items():
        d1[key] = d1[key] + count if key in d1 else count
    return d1

def percentile(tuple1):
    """Return the largest count in the dict ``tuple1[1]`` divided by the sum of all its counts.

    The maximum is taken with a floor of -1, so an empty dict divides -1 by 0
    and raises ``ZeroDivisionError``.
    """
    counts = list(tuple1[1].values())
    largest = max([-1, *counts])
    return (largest / sum(counts))

def aggregator(tuple1, val):
    """Fold ``val`` into the running pair ``(seen, above)``.

    ``seen`` always grows by one; ``above`` grows by one only when ``val`` is
    strictly greater than the module-level ``threshold``.
    """
    seen, above = tuple1[0], tuple1[1]
    return (seen + 1, above + 1 if val > threshold else above)


In [284]:
ind1 = findCol(te_columns, 'jobId')
ind2 = findCol(te_columns, 'taskId')
ind3 = findCol(te_columns, 'machineId')

# (jobId, (taskId, machineId)) with ids parsed as ints and the machine id kept as text.
rdd = te_entries.map(lambda x: (int(x[ind1]), (int(x[ind2]), x[ind3])))
# Drop events without a machine id and remove duplicates.
projectedRDD = rdd.filter(lambda v: v[1][1] != '').distinct()

# Per job: {machineId: number of distinct (task, machine) pairs on that machine}.
aggregatedRDD = (
    projectedRDD
    .filter(lambda p: len(p[1]) > 1)
    .aggregateByKey({}, adder, merger)
)
# (number of jobs, number of jobs whose most-used machine share exceeds threshold)
ratio = (
    aggregatedRDD
    .map(percentile)
    .aggregate((0, 0), aggregator, lambda t1, t2: (t1[0]+t2[0], t1[1]+t2[1]))
)


In [285]:
ratio

(4879, 3065)

### Is there a relation between the amount of resource consumed by tasks and their priority?

In [299]:
from math import sqrt

In [300]:
# ((jobId, taskId), priority) for every task event; positions 2, 3 and 8 are
# jobId, taskId and priority in te_columns.
scheduleRDD = te_entries.map(lambda lines: ((int(lines[2]), int(lines[3])), int(lines[8])))

# Per (field 2, field 3) key of task_usage: the mean of fields 5 and 6 over all
# records in which both fields are non-empty.
# NOTE(review): no column list for task_usage is defined in this notebook; the
# original comments call the two values meanCPURate and ramUsage - confirm the
# field positions against the trace schema.
usageRDD = tu_entries.map(lambda lines: ((int(lines[2]), int(lines[3])), (lines[5], lines[6])))\
                     .filter(lambda x: x[1][0] != '' and x[1][1] != '')\
                     .map(lambda p: ((p[0][0], p[0][1]),(float(p[1][0]),float(p[1][1]))))\
                     .aggregateByKey((0,0,0), lambda x,y: (x[0]+1, x[1]+y[0],x[2]+y[1]),lambda x,y: (x[0]+y[0],x[1]+y[1],x[2]+y[2]))\
                     .map(lambda p: ((p[0][0],p[0][1]), (p[1][1]/p[1][0], p[1][2]/p[1][0])))
# Rows of completeRDD: ((jobId, taskId), (priority, (meanCPURate, ramUsage)))
completeRDD = scheduleRDD.join(usageRDD).cache()
# (row count, sum of priority, sum of meanCPURate, sum of ramUsage)
sumsCount = completeRDD.aggregate((0, 0, 0, 0),
                                  lambda x, y: (x[0]+1, x[1]+y[1][0],
                                                x[2]+y[1][1][0], x[3]+y[1][1][1]),
                                  lambda x, y: (x[0]+y[0], x[1]+y[1], x[2]+y[2], x[3]+y[3]))
avgPriority = sumsCount[1]/sumsCount[0]
avgmeanCPURate = sumsCount[2]/sumsCount[0]
avgRamUsage = sumsCount[3]/sumsCount[0]


# Deviation of every row from the three means: (priority, cpu, ram).
diffRDD = completeRDD.map(lambda x: (
    x[1][0] - avgPriority, x[1][1][0] - avgmeanCPURate, x[1][1][1] - avgRamUsage))
# Accumulated sums, with x = priority, y = cpu, z = ram:
# [(xi-x')(yi-y'), (xi-x')(zi-z'), (xi-x')^2, (yi-y')^2, (zi-z')^2]
pearsonComponents = diffRDD.aggregate((0., 0., 0., 0., 0.),
                                      lambda x, y: (
    x[0]+y[0]*y[1], x[1]+y[0]*y[2], x[2]+y[0]*y[0], x[3]+y[1]*y[1], x[4]+y[2]*y[2]),
    lambda ya, yb: (ya[0]+yb[0], ya[1]+yb[1], ya[2]+yb[2], ya[3]+yb[3], ya[4]+yb[4]))

# Pearson coefficients as a pair: (priority vs cpu, priority vs ram).
correlations = pearsonComponents[0]/sqrt(pearsonComponents[2]*pearsonComponents[3]), pearsonComponents[1]/sqrt(pearsonComponents[2]*pearsonComponents[4])



In [301]:
correlations

(0.24994329658364695, 0.4687627723345299)

### Can we observe correlations between peaks of high resource consumption on some machines and task eviction events?

In [304]:
# Column positions inside a task_events record. mapper_1 reads machine_id,
# time_id and event_id as module-level names.
job_id = findCol(te_columns, 'jobId')
task_id = findCol(te_columns, 'taskId')
machine_id = findCol(te_columns, 'machineId')
time_id = findCol(te_columns, 'timestamp')
event_id = findCol(te_columns, 'eventType')

def mapper_1(line):
    """Map a task_events record to ``((time_window, machine_id), event_type)``.

    The machine id is -1 when the field is empty; the time window is the
    timestamp integer-divided by 300000000. Field positions come from the
    module-level ``machine_id``, ``time_id`` and ``event_id``.
    """
    raw_machine = line[machine_id]
    machID = int(raw_machine) if raw_machine != '' else -1
    window = int(line[time_id])//300000000
    # ((time, machineID), event_type)
    return ((window, machID), int(line[event_id]))

def mapper_2(line):
    """Map a record to ``((time_window, machine_id), value)``.

    ``value`` is field 5 as a float, or -1 when that field is empty; the time
    window is field 1 integer-divided by 300000000 and the machine id is field 4.
    """
    raw_value = line[5]
    val = float(raw_value) if raw_value != '' else -1
    window = int(line[1])//300000000
    # ((start_time, machine ID), task_usage_rate)
    return ((window, int(line[4])), val)
def customSum(sum, elem):
    """Add ``elem`` to ``sum``, or return -1 when either operand is negative.

    The -1 result marks the whole sum as invalid so that it can be filtered out later.
    """
    return sum+elem if (sum>=0 and elem>=0) else -1


In [305]:

# Number of events of type 2 (EVICT) per (time window, machine); events without
# a machine id are dropped (mapper_1 encodes those as machine -1). A time
# window spans 300000000 timestamp units (300s per the original note).
evictedCountRDD = te_entries.map(mapper_1).filter(lambda x: x[1] == 2 and x[0][1] >= 0).aggregateByKey(
    0, lambda x, y: x+1, lambda x, y: x+y)

# Sum of the per-record usage values for every (time window, machine).
# customSum turns a sum into -1 as soon as one value is missing, and keys with
# such an invalid sum are removed.
machineCPUUsageRDD = tu_entries.map(mapper_2).aggregateByKey(0,customSum, customSum)\
                               .filter(lambda x: x[1] >= 0)

# (evictedCount, cpu usage sum) for every (time window, machine) present in both RDDs.
dataRDD = evictedCountRDD.join(machineCPUUsageRDD).map(
    lambda x: (x[1][0], x[1][1])).cache()

# (row count, sum of evictedCount, sum of cpu usage)
sumsCount= dataRDD.aggregate((0, 0, 0),
lambda x, y: (x[0] + 1, x[1]+y[0], x[2]+y[1]),
lambda x, y: (x[0] + y[0], x[1] + y[1], x[2]+y[2]))

avgEvicted = sumsCount[1]/sumsCount[0]
avgPeak = sumsCount[2]/sumsCount[0]

# (sum of products of deviations, sum of squared eviction deviations,
#  sum of squared usage deviations)
pearsonComponents = dataRDD.aggregate((0,0,0), lambda x,y: (x[0]+(y[0]-avgEvicted)*(y[1]-avgPeak), x[1]+(y[0]-avgEvicted)*(y[0]-avgEvicted), x[2]+(y[1]-avgPeak)*(y[1]-avgPeak)),
lambda x, y: (x[0] + y[0], x[1] + y[1], x[2]+y[2]))

# Pearson coefficient between eviction count and cpu usage.
correlations = pearsonComponents[0]/sqrt(pearsonComponents[1]*pearsonComponents[2])


In [306]:
correlations

0.060042960762372366

## DataFrames

In [93]:
# convert RDDs into dataframes
# RDD.toDF is only attached to RDDs once a SparkSession has been created, and
# this notebook only starts a SparkContext, so create (or reuse) a session on
# top of the running context before converting.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Every field is still text at this point, so all DataFrame columns are strings.
te_df = te_entries.toDF(schema=te_columns)
je_df = je_entries.toDF(schema=je_columns)

In [94]:
# Preview the first task_events rows (the name `df` is never defined in this notebook).
te_df.head(4)

[Row(timestamp='0', missing='2', jobId='3418309', taskId='0', machineId='4155527081', eventType='0', username='70s3v5qRyCO/1PCdI6fVXnrW8FU/w+5CKRSa72xgcIo=', schedulingClass='3', priority='9', rrfCpu='', rrfRam='', rrfDiskSpace='', diffMConstraint=''),
 Row(timestamp='0', missing='2', jobId='3418309', taskId='1', machineId='329150663', eventType='0', username='70s3v5qRyCO/1PCdI6fVXnrW8FU/w+5CKRSa72xgcIo=', schedulingClass='3', priority='9', rrfCpu='', rrfRam='', rrfDiskSpace='', diffMConstraint=''),
 Row(timestamp='0', missing='', jobId='3418314', taskId='0', machineId='3938719206', eventType='0', username='70s3v5qRyCO/1PCdI6fVXnrW8FU/w+5CKRSa72xgcIo=', schedulingClass='3', priority='9', rrfCpu='0.125', rrfRam='0.07446', rrfDiskSpace='0.0004244', diffMConstraint='0'),
 Row(timestamp='0', missing='', jobId='3418314', taskId='1', machineId='351618647', eventType='0', username='70s3v5qRyCO/1PCdI6fVXnrW8FU/w+5CKRSa72xgcIo=', schedulingClass='3', priority='9', rrfCpu='0.125', rrfRam='0.0744

In [101]:
from pyspark.sql.functions import col,udf

###  Get the ID of the job with the highest number of tasks

In [96]:
# Distinct (job, task) pairs counted per job; the row with the largest count comes first.
task_counts_df = te_df.select('jobId', 'taskId').distinct().groupBy('jobId').count()
task_counts_df.orderBy(col('count').desc()).head(1)

[Row(jobId='6221861800', count=20010)]

### On average, how many tasks get killed or evicted per job?

In [97]:

# number of tasks killed or evicted: distinct (job, task) pairs with an event of type '2' or '5'
is_evicted_or_killed = (te_df.eventType=='2') | (te_df.eventType=='5')
job_tasks = (
    te_df
    .filter(is_evicted_or_killed)
    .select('jobId', 'taskId')
    .distinct()
    .count()
)

# total number of jobs
total = je_df.select('jobId').distinct().count()

avg = job_tasks / total

In [98]:
avg

1.9308163265306122

### Do tasks with low priority have a higher probability of being evicted?

In [142]:

def mapper_1(eventType):
    """Return 1 when ``eventType`` is the string '2', otherwise 0."""
    return 1 if eventType=='2' else 0

def mapper_2(eventType):
    """Return 1 when ``eventType`` is the string '1', otherwise 0."""
    return 1 if eventType=='1' else 0

def mapper_3(sCount,eCount):
    """Return ``eCount`` as a percentage of ``sCount``.

    Parameters
    ----------
    sCount : number
        Denominator (the summed schedule count of a group).
    eCount : number
        Numerator (the summed eviction count of a group).

    Returns
    -------
    float or None
        ``eCount*100 / sCount``, or ``None`` when ``sCount`` is zero or missing,
        so a group without schedule events yields a null ratio instead of
        aborting the job with a ZeroDivisionError.
    """
    if not sCount:
        return None
    return eCount*100 / sCount

# Wrap the three helpers as Spark UDFs (no explicit return type is given).
udf1, udf2, udf3 = udf(mapper_1), udf(mapper_2), udf(mapper_3)

# Keep only events of type '1' or '2' and flag each row with the two indicators.
scheduled_or_evicted = te_df.filter((te_df.eventType=='1') | (te_df.eventType=='2'))
flagged = (
    scheduled_or_evicted
    .withColumn('evictedCount', udf1('eventType'))
    .withColumn('scheduledCount', udf2('eventType'))
    .withColumn("priority", col("priority").cast('int'))
    .select('priority', 'evictedCount', 'scheduledCount')
)
# Sum both indicators per priority and derive the eviction percentage.
df3 = (
    flagged
    .groupBy('priority')
    .agg({'evictedCount': 'sum', 'scheduledCount': 'sum'})
    .withColumn('ratio', udf3('sum(scheduledCount)', 'sum(evictedCount)'))
    .orderBy('priority')
)




In [141]:
df3.show()

+--------+-------------------+-----------------+-------------------+
|priority|sum(scheduledCount)|sum(evictedCount)|              ratio|
+--------+-------------------+-----------------+-------------------+
|       0|            51268.0|           4208.0|  8.207848950612467|
|       1|            17208.0|             52.0|  0.302185030218503|
|       2|            61573.0|             55.0|0.08932486641872249|
|       8|             3371.0|              0.0|                0.0|
|       9|            49199.0|             22.0|0.04471635602349641|
|      10|              833.0|              1.0|0.12004801920768307|
|      11|             6542.0|              0.0|                0.0|
+--------+-------------------+-----------------+-------------------+



So we implemented 3 of the previous analyses with DataFrames. In a DataFrame, data is organized into named columns; it is conceptually equivalent to a table in a relational database. Unlike an RDD, a DataFrame can automatically infer the schema of the dataset. For aggregation it is definitely faster than an RDD and provides an easy API for it.