Task Events Inter-arrivals
==
(Run all the sections until the **Usage** section)
--

In [12]:
import numpy
import math
from utils import math_utils as mu
from bokeh.plotting import figure, show, output_file
from bokeh.io import output_notebook
from utils import plot_utils as pu
from utils import output_utils as ou

output_notebook()

# Load and parse the job traces from the dataset
# Traces content
# 0: timestamp
# 1: missing info
# 2: job ID
# 3: task index
# 4: machine ID
# 5: event type
# 6: username
# 7: scheduling class
# 8: priority
# 9: resource req CPU cores
# 10: resource req RAM
# 11: resource req local disk space
# 12: different machine costraint

# Load all the traces inside the directory 
t_google_traces = sc.textFile("../task_events/*.gz")

t_google_traces_RDD = t_google_traces.map(lambda line: line.split(","))\
    .map(lambda tokens: (int(tokens[0]),int(tokens[2]),int(tokens[3]),tokens[4],tokens[5],tokens[6],tokens[7],tokens[8],tokens[9],tokens[10],tokens[11]))\
    .cache()
# Traces content
# 0: timestamp
# 1: job ID
# 2: task index
# 3: machine ID
# 4: event type
# 5: username
# 6: scheduling class
# 7: priority
# 8: resource req CPU cores
# 9: resource req RAM
# 10: resource req local disk space
# 11: different machine costraint

# The 2nd element of the trace was a "missing information", so for the sake of simplicity was cut out
print t_google_traces_RDD.take(1)

[(0, 3418309, 0, u'4155527081', u'0', u'70s3v5qRyCO/1PCdI6fVXnrW8FU/w+5CKRSa72xgcIo=', u'3', u'9', u'', u'', u'')]


In [4]:
# We remove from the traces those that occurred before the beginning of the trace window (timestamp = 0)
# and those that occured after the end of the trace window (timestamp = 2^63-1)
# and we sort them in ascending order wrt the timestamps
t_filtered_google_traces_RDD = t_google_traces_RDD.map(lambda elem: (elem[0],(elem[1],elem[2],elem[3],elem[4])))\
    .filter(lambda elem: elem[0] != 0 and elem[0] != (2^63 - 1))\
    .sortByKey(1,1)

# Traces content
# 0: timestamp
# 1.0: job ID
# 1.1: task index
# 1.2: machine ID
# 1.3: event type
print t_google_traces_RDD.count() - t_filtered_google_traces_RDD.count(), "Traces were removed"
print t_filtered_google_traces_RDD.take(1)

277321 Traces were removed
[(600026913, (515042969, 1, u'372630443', u'1'))]


Main Functions
---

In [13]:
# This function takes as input 
# event_type: int, corresponding to the event type found in the traces
# init_time: int (in seconds!), it is the time from which we want to evaluate the model
# finish_time: int (in seconds!), it is the time when we want to stop the evaluation
# granularity: int (in seconds!), define the level of granularity for plotting the results of the model
# E.G. over a window of 200 seconds we may have a granularity of 10 seconds
# which means that the derived traces will be clustered, based on the timestamps, in groups following
# that granularity
# For example assume to start from time = 0 till time = 200
# cluster 1: time interval 0-10
# cluster 2: time interval 10-20
# ...
# cluster 20: time interval 180-200

def t_eval_time_window(event_type, machine_ID, job_ID, init_time, finish_time, granularity):
    init_time = init_time*10**6
    finish_time = finish_time*10**6
    granularity = granularity*10**6
    
    print t_filtered_google_traces_RDD.count()
    # First of all we apply another filter to select only the traces that corresponde to the event_type in input  
    t_eval_traces_RDD = t_filtered_google_traces_RDD.filter(lambda elem: elem[0] >= init_time and elem[0] < finish_time)
    print t_eval_traces_RDD.count()
    
    if(event_type != None):
        t_eval_traces_RDD = t_eval_traces_RDD.filter(lambda elem: elem[1][3] == unicode(event_type))
    if(machine_ID != None):
        t_eval_traces_RDD = t_eval_traces_RDD.filter(lambda elem: elem[1][2] == machine_ID)
    if(job_ID != None):
        t_eval_traces_RDD = t_eval_traces_RDD.filter(lambda elem: elem[1][0] == job_ID)
        
    t_eval_traces_RDD = t_eval_traces_RDD.map(lambda elem: elem[0])
    # Collect the RDD to get a python list
    t_eval_traces_list = t_eval_traces_RDD.collect()
    t_evaluated_means_list = []
    # This value will always contain the lowest bound for the clusterization based on the granularity
    # It's initial values is obviously the init_time
    t_lower_g = init_time
    # Define how many clusters we want to create depending on the input granularity
    t_n_cluster = (finish_time-init_time)/granularity
    
    t_interval_values_list = []
    # We iterate to creare each cluster
    for i in range(0,int(t_n_cluster+1)):
        # Define the cluster filtering the derived traces
        t_cluster_traces = [timestamp for timestamp in t_eval_traces_list if timestamp >= t_lower_g and timestamp < (t_lower_g+granularity)]
        # We then append to our list a tuple of this format
        # (cluster_lower_bound, cluster_upper_bound, mean_time_between_jobs)
        t_evaluated_means_list.append([t_lower_g, (t_lower_g+granularity), mu.mean_time_evaluation(sc, t_cluster_traces)])
        
        # interval_values_list contains all the inter-arrivals times between subsequent events among the intervals (clusters)
        t_interval_values_list.extend(mu.get_interval_values(sc, t_cluster_traces, t_lower_g, t_lower_g+granularity))
        # Increase the lower bound to reach the next cluster
        t_lower_g += granularity
    
    # Evaluate some metrics and plot them
    t_metrics_list = mu.evaluate_statistics(sc, t_interval_values_list)
    pu.plot_custom_metrics(sc, t_metrics_list, 0, "jobs", "time", "mean", "red")
    
    # Write on CSV
    #ou.write_csv(interval_values_list, event_type, init_time, finish_time, granularity)
    # Remove the NaNs and substitute them with 0s
    t_evaluated_means_list = mu.removeNans(t_evaluated_means_list)
    
    # Calculate the average time for the whole period
    t_mean_time_whole_period = mu.mean_time_evaluation(sc, t_eval_traces_RDD.collect())
    # Plot inter-irravals
    pu.plot_inter_arrivals(sc, t_evaluated_means_list, t_mean_time_whole_period)
    
# eval_day(event_type, day(int), granularity(seconds))
# Evaluate a single day (24 h)
def t_eval_day(event_type, day, granularity):
    if init_day > finish_day:
        print "Error, init day must be previous finish day"
    else if init_day > 30 or finish_day > 30:
        print "Error, exceeded month length (30 days)"
    else:
        if day != 1:
            init_time = 18000 + (day-2) * 86400
            finish_time = 18000 + (day-1) * 86400 - 1     
        else:
            init_time = 0
            finish_time = 18000

        t_eval_time_window(0,None,None,init_time,finish_time,granularity)

# eval_days(event_type, init_day(int), finish_day(int), granularity(seconds))
# Evaluate multiple days
def t_eval_days(event_type, init_day, finish_day, granularity):
    if init_day > finish_day:
        print "Error, init day must be previous finish day"
    else if init_day > 30 or finish_day > 30:
        print "Error, exceeded month length (30 days)"
    else:
        if init_day != 1:
            init_time = 18000 + (init_day-2) * 86400
            finish_time = 18000 + (finish_day-1) * 86400 - 1   
        else:
            init_time = 0
            finish_time = 18000 + (finish_day-1) * 86400 - 1

        t_eval_time_window(0,None,None,init_time,finish_time,granularity)

Usage
---

In [14]:
t_eval_time_window(4, None, None, 600, 86400, 7200)
#t_eval_days(1,5,10,7200)

6882293
3220838
event
[600046926, 600046937, 600046983, 600047059, 600047200, 600047230, 600047610, 600047705, 600818396, 600818433]
