In [1]:
spark

In [2]:
#/FileStore/tables/o42yj8mx1493971989213/job_events.csv
jobEventsDF = sqlContext.read.format("csv").load("/FileStore/tables/o42yj8mx1493971989213/job_events.csv")

In [3]:
jobEventsDF = jobEventsDF.drop('_c1', '_c4', '_c6', '_c7').filter("_c0 > 600 * 1000000").filter("_c0 < 9223372036854775807")

In [4]:
display(jobEventsDF)

#_c0 timestamp
#_c2 jobID
#_c3 eventType
#_c5 schedulingClass

In [5]:
jobEventsDF.count()

In [6]:
majorJobEventsDF = jobEventsDF.filter("_c3 == 1 or _c3 == 3 or _c3 == 4 or _c3 == 5")

In [7]:
display(majorJobEventsDF)

# 1 Schedule
# 3 Fail
# 4 Finish
# 5 Kill

In [8]:
majorJobEventsClassDF = majorJobEventsDF.groupBy(['_c5', '_c3']).count().withColumnRenamed('_c3', 'Event Type').withColumnRenamed('_c5', 'Scheduling Class')

In [9]:
display(majorJobEventsClassDF)

In [10]:
majorJobEventsClassPDDF = majorJobEventsClassDF.toPandas()

In [11]:
majorJobEventsClassPDDF = majorJobEventsClassPDDF.pivot(index='Scheduling Class', columns='Event Type', values='count')

In [12]:
majorJobEventsClassFig = majorJobEventsClassPDDF.plot.bar()
majorJobEventsClassFig.set_ylabel('Number of Job Events')
display(majorJobEventsClassFig.figure)

In [13]:
from pyspark.sql.functions import ceil
majorJobEventsDayDF = majorJobEventsDF.withColumn('day', ceil((majorJobEventsDF._c0 / 1000000 - 600) / 86400)).drop('_c0', '_c2', '_c5').groupBy(['day', '_c3']).count().withColumnRenamed('_c3', 'Event Type')

In [14]:
display(majorJobEventsDayDF)

In [15]:
majorJobEventsDayPDDF = majorJobEventsDayDF.toPandas()
majorJobEventsDayPDDF = majorJobEventsDayPDDF.pivot(index='day', columns='Event Type', values='count')

In [16]:
majorJobEventsDayFig = majorJobEventsDayPDDF.plot.area(stacked=False)
majorJobEventsDayFig.set_xlabel('Day')
majorJobEventsDayFig.set_ylabel('Number of Job Events')
display(majorJobEventsDayFig.figure)

In [17]:
jobTimeDF = jobEventsDF.withColumn('timestamp', (jobEventsDF._c0 / 1000000 - 600) / 60).drop('_c0')

In [18]:
display(jobTimeDF)

In [19]:
from pyspark.sql.functions import min
from pyspark.sql.functions import max
# 0 Submit
# 1 Schedule
# 4 Finish
class0TimeDF = jobTimeDF.filter("_c3 == 0").groupBy(['_c5', '_c3', '_c2']).agg(min(jobTimeDF.timestamp)).drop('_c3').withColumnRenamed('min(timestamp)', 'submittime')
class1TimeDF = jobTimeDF.filter("_c3 == 1").groupBy(['_c5', '_c3', '_c2']).agg(max(jobTimeDF.timestamp)).drop('_c5', '_c3').withColumnRenamed('max(timestamp)', 'scheduletime')
class4TimeDF = jobTimeDF.filter("_c3 == 4").groupBy(['_c5', '_c3', '_c2']).agg(max(jobTimeDF.timestamp)).drop('_c5', '_c3').withColumnRenamed('max(timestamp)', 'finishtime')

# jobTimeDF = jobTimeDF.groupBy(['_c5', '_c3', '_c2']).agg(functions.min(jobTimeDF.timestamp), functions.max(jobTimeDF.timestamp))

In [20]:
display(class0TimeDF)

In [21]:
jobDurationDF = class0TimeDF.join(class1TimeDF, '_c2', 'outer').join(class4TimeDF, '_c2', 'outer').na.fill(0)

In [22]:
display(jobDurationDF)

In [23]:
from pyspark.sql.functions import ceil

jobExecDF = jobDurationDF.filter("submittime != 0 and scheduletime != 0 and finishtime != 0").withColumn('totaltime', ceil(jobDurationDF.finishtime - jobDurationDF.submittime)).withColumn('runtime', ceil(jobDurationDF.finishtime - jobDurationDF.scheduletime)).drop('_c2', 'submittime', 'scheduletime', 'finishtime')
jobWaitDF = jobExecDF.withColumn('waittime', jobExecDF.totaltime - jobExecDF.runtime).drop('totaltime', 'runtime')

In [24]:
display(jobExecDF)

In [25]:
display(jobWaitDF)

In [26]:
jobExecPDDF = jobExecDF.groupBy(['_c5', 'runtime']).count().withColumnRenamed('_c5', 'Scheduling Class').toPandas()

In [27]:
jobExecPDDF = jobExecPDDF.pivot(index='runtime', columns='Scheduling Class', values='count')

In [28]:
# jobExecPDDF = jobExecPDDF.cumsum()
jobExecFig = jobExecPDDF.plot(logx=True)
jobExecFig.set_xlabel('Runtime (min)')
jobExecFig.set_ylabel('Number of Jobs')
display(jobExecFig.figure)

In [29]:
jobExecPDDF.describe()

In [30]:
jobExecShortPDDF = jobExecDF.filter("runtime <= 300").groupBy(['_c5', 'runtime']).count().withColumnRenamed('_c5', 'Scheduling Class').toPandas()

In [31]:
jobExecShortPDDF = jobExecShortPDDF.pivot(index='runtime', columns='Scheduling Class', values='count')
jobExecShortFig = jobExecShortPDDF.plot(logy=True)
jobExecShortFig.set_xlabel('Runtime (min)')
jobExecShortFig.set_ylabel('Number of Jobs')
display(jobExecShortFig.figure)

In [32]:
jobWaitPDDF = jobWaitDF.groupBy(['_c5', 'waittime']).count().withColumnRenamed('_c5', 'Scheduling Class').toPandas()
jobWaitPDDF = jobWaitPDDF.pivot(index='waittime', columns='Scheduling Class', values='count')

In [33]:
jobWaitFig = jobWaitPDDF.plot(logx=True, logy=True)
jobWaitFig.set_xlabel('Wait Time (min)')
jobWaitFig.set_ylabel('Number of Jobs')
display(jobWaitFig.figure)

In [34]:
jobWaitPDDF.describe()