In [61]:
#!/usr/bin/python3
import sys
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import json
import graph_tool as gt
from graph_tool.all import *
from matplotlib.ticker import MaxNLocator
import io
import string

output = io.StringIO()


class Del:
  def __init__(self, keep=string.digits):
    self.comp = dict((ord(c),c) for c in keep)
  def __getitem__(self, k):
    return self.comp.get(k)

DD = Del()


def dump_dag(job_graph):
    job_graph.save("/local0/graph.dot", 'dot')

def get_critical_path(task_graph):
    return "1_3_2_5"

def build_graph(subtasks):
    num_subtasks_in_job = len(subtasks)
    g = gt.Graph(directed=True)
    g.add_vertex(num_subtasks_in_job)
    count_independent_tasks = 0;

    # Build the graph
    vertex_index = 0;
    vertex_map = dict()
    for row in subtasks:
        vertex_label = [int(i.translate(DD)) -1 for i in row.split('_')][0]
        vertex_map.update({vertex_label: vertex_index})
        vertex_index = vertex_index + 1;
    
    for row in subtasks:
        dependencies = [int(i.translate(DD)) -1 for i in row.split('_')]
        
        if len(dependencies) == 1:
            count_independent_tasks = count_independent_tasks + 1
            continue;
        child_task = vertex_map[dependencies[0]]
        child_vertex = g.vertex(child_task)
        for parent_task in dependencies[1:]:
            if parent_task in vertex_map:
                parent_vertex = g.vertex(vertex_map[parent_task])
                g.add_edge(parent_vertex, child_vertex)
    
    print("\t\t# of independent tasks is " + str(count_independent_tasks) + 
          ", in degree: " + str(gt.stats.vertex_average(g, "in")) +
          ", out degree: " + str(gt.stats.vertex_average(g, "out")))        
    return g;

def get_subtask_stats(jobId):
    instance_statistics['run_time'] = instance_statistics['End Time'] - instance_statistics['Start Time']

    instance_statistics_gb_task = instance_statistics.groupby(['Task'])

    tasks_runtime = instance_statistics_gb_task['End Time'].max() - instance_statistics_gb_task['Start Time'].min()
    tasks_start_delay = instance_statistics_gb_task['Start Time'].max() - instance_statistics_gb_task['Start Time'].min()
    task_max_instance = instance_statistics_gb_task['run_time'].max()
    task_min_instance = instance_statistics_gb_task['run_time'].min()

    task_statistics2 = pd.DataFrame({'min instance time': task_min_instance,
                                     'max instance time': task_max_instance,
                                     'run time': tasks_runtime,
                                     'schedule delay': tasks_start_delay})
    return task_statistics2;





In [18]:
file_name = "/local0/batch_task.csv"

# Read Batch Data, hint: Job and Type column should swap
data = pd.read_csv(file_name, names=['Name', '# Instances', 'Job', 'Type', 'Status', 'Start Time', 'End Time', 'CPU', 'Mem'])
#total_number_of_jobs = len(data.groupby(['Job']))
#print("Total number of submitted batch jobs: " + str(total_number_of_jobs))
#total_number_of_tasks = data[['Name', 'Job']].nunique()
#print("Total number of submitted task: " + str(total_number_of_jobs))

# Filter jobs with dependency, dataset has two types of batch jobs, w/wo DAG info and 
data_dag = data.loc[~data['Name'].str.startswith('task_', na=False)]

print("Jobs with Dags .......")
# group jobs by their JobId
job_groups_by_jobId = data_dag.groupby(['Job']);
print("# of jobs with Dags: " + str(len(job_groups_by_jobId)))

# Count DAGs by size
freq_number_tasks_per_dag = job_groups_by_jobId.size()
tasks_per_dag = freq_number_tasks_per_dag.to_frame().rename(columns={0: "DAG size"})
tasks_per_dag.reset_index(inplace=True)
tasks_per_dag['run_time'] = 0
tasks_per_dag['critical_path'] = 0

Jobs with Dags .......


KeyboardInterrupt: 

In [62]:
dags_groupby_size = tasks_per_dag.groupby('DAG size')
for name, group in dags_groupby_size:
    dag_size = name
    print("DAG size:" + str(dag_size) + ", Number of Jobs with this size:" + str(len(group)))

    if dag_size == 1:
        '''
        print("Process runtime of Single Jobs")

        single_job_dags = pd.merge(data_dag, group, on='Job', how='inner')
        single_job_dags['Run Time'] = single_job_dags['End Time'] - joined['Start Time'] + 1
        single_job_dags = single_job_dags.loc[single_job_dags['Run Time'] > 0];
        print(len(single_job_dags))

        print("Mininum # of instances in single jobs: " + str(min(single_job_dags['# Instances'])))
        print("Maximum # of instances in single jobs: " + str(max(single_job_dags['# Instances'])))
        print("Mininum run time of single jobs: " +str(min(single_job_dags['Run Time'])))
        print("Maximum run time of single jobs: " + str(max(single_job_dags['Run Time'])))
        print("Average run time of single jobs:" + str(single_job_dags['Run Time'].mean()) )
        print("Standard deviation of single jobs run time:" + str(single_job_dags['Run Time'].std()) )
        print("Variance of single jobs run time:" + str(single_job_dags['Run Time'].var()) )
        '''
        continue;

        
    if dag_size == 2:
        continue;
        
        
    # groupby DAGs by jobId
    tasks_ofsize_dagsize = pd.merge(data_dag, group, on='Job', how='inner')
    jobs_ofsize_dagsize = tasks_ofsize_dagsize.groupby(['Job'])

    counter = 0;
    for name, group in jobs_ofsize_dagsize:
        jobId = name;
        job_statistics = data_dag.loc[data_dag['Job'] == jobId]
        job_runtime = max(job_statistics['End Time']) - min(job_statistics['Start Time'])
        print("\tProcess Job:" + jobId + ", of size " + str(dag_size) + ', runtime is: ' + str(job_runtime))

        #subtask_statistics = get_subtask_stats(jobId)

        job_statistics['Name'] = job_statistics['Name'].map(lambda x: x.lstrip('MJR'))
        subtasks = job_statistics['Name'].values
        subtask_graph = build_graph(subtasks)
        #dump_dag(subtask_graph)
        if counter == 1000:
            break;
        counter += 1
        #dags_groupby_size['critical_path'].loc[job_ofsize_dagsize['Job'] == jobId] = get_critical_path(subtask_graph)
    break;


DAG size:1, Number of Jobs with this size:1315457
DAG size:2, Number of Jobs with this size:634561
DAG size:3, Number of Jobs with this size:404402
	Process Job:j_1000012, of size 3, runtime is: 2364
		# of independent tasks is 1, in degree: (0.6666666666666666, 0.2721655269759087), out degree: (0.6666666666666666, 0.2721655269759087)


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy


	Process Job:j_100002, of size 3, runtime is: 320
		# of independent tasks is 1, in degree: (0.6666666666666666, 0.2721655269759087), out degree: (0.6666666666666666, 0.2721655269759087)
	Process Job:j_1000034, of size 3, runtime is: 4
		# of independent tasks is 2, in degree: (0.6666666666666666, 0.5443310539518174), out degree: (0.6666666666666666, 0.2721655269759087)
	Process Job:j_1000037, of size 3, runtime is: 7
		# of independent tasks is 1, in degree: (0.6666666666666666, 0.2721655269759087), out degree: (0.6666666666666666, 0.2721655269759087)
	Process Job:j_1000049, of size 3, runtime is: 13
		# of independent tasks is 2, in degree: (0.6666666666666666, 0.5443310539518174), out degree: (0.6666666666666666, 0.2721655269759087)
	Process Job:j_1000075, of size 3, runtime is: 11
		# of independent tasks is 1, in degree: (0.6666666666666666, 0.2721655269759087), out degree: (0.6666666666666666, 0.2721655269759087)
	Process Job:j_1000095, of size 3, runtime is: 9
		# of independent

	Process Job:j_1000560, of size 3, runtime is: 4
		# of independent tasks is 1, in degree: (0.6666666666666666, 0.2721655269759087), out degree: (0.6666666666666666, 0.2721655269759087)
	Process Job:j_1000569, of size 3, runtime is: 16
		# of independent tasks is 1, in degree: (0.6666666666666666, 0.2721655269759087), out degree: (0.6666666666666666, 0.2721655269759087)
	Process Job:j_1000572, of size 3, runtime is: 25
		# of independent tasks is 1, in degree: (0.6666666666666666, 0.2721655269759087), out degree: (0.6666666666666666, 0.2721655269759087)
	Process Job:j_1000582, of size 3, runtime is: 1
		# of independent tasks is 2, in degree: (0.6666666666666666, 0.5443310539518174), out degree: (0.6666666666666666, 0.2721655269759087)
	Process Job:j_1000583, of size 3, runtime is: 328
		# of independent tasks is 1, in degree: (0.6666666666666666, 0.2721655269759087), out degree: (0.6666666666666666, 0.2721655269759087)
	Process Job:j_1000584, of size 3, runtime is: 148
		# of independ

	Process Job:j_1000965, of size 3, runtime is: 57
		# of independent tasks is 2, in degree: (0.6666666666666666, 0.5443310539518174), out degree: (0.6666666666666666, 0.2721655269759087)
	Process Job:j_1000996, of size 3, runtime is: 57
		# of independent tasks is 2, in degree: (0.6666666666666666, 0.5443310539518174), out degree: (0.6666666666666666, 0.2721655269759087)
	Process Job:j_1001021, of size 3, runtime is: 784
		# of independent tasks is 2, in degree: (0.6666666666666666, 0.5443310539518174), out degree: (0.6666666666666666, 0.2721655269759087)
	Process Job:j_1001026, of size 3, runtime is: 0
		# of independent tasks is 1, in degree: (0.6666666666666666, 0.2721655269759087), out degree: (0.6666666666666666, 0.2721655269759087)
	Process Job:j_100104, of size 3, runtime is: 197
		# of independent tasks is 1, in degree: (0.6666666666666666, 0.2721655269759087), out degree: (0.6666666666666666, 0.2721655269759087)
	Process Job:j_100106, of size 3, runtime is: 6
		# of independen

	Process Job:j_1001504, of size 3, runtime is: 5
		# of independent tasks is 1, in degree: (0.6666666666666666, 0.2721655269759087), out degree: (0.6666666666666666, 0.2721655269759087)
	Process Job:j_1001513, of size 3, runtime is: 7
		# of independent tasks is 2, in degree: (0.6666666666666666, 0.5443310539518174), out degree: (0.6666666666666666, 0.2721655269759087)
	Process Job:j_1001514, of size 3, runtime is: 14
		# of independent tasks is 1, in degree: (0.6666666666666666, 0.2721655269759087), out degree: (0.6666666666666666, 0.2721655269759087)
	Process Job:j_1001516, of size 3, runtime is: 42
		# of independent tasks is 1, in degree: (0.6666666666666666, 0.2721655269759087), out degree: (0.6666666666666666, 0.2721655269759087)
	Process Job:j_1001523, of size 3, runtime is: 2
		# of independent tasks is 1, in degree: (0.6666666666666666, 0.2721655269759087), out degree: (0.6666666666666666, 0.2721655269759087)
	Process Job:j_1001537, of size 3, runtime is: 34
		# of independent

	Process Job:j_1001945, of size 3, runtime is: 8
		# of independent tasks is 1, in degree: (0.6666666666666666, 0.2721655269759087), out degree: (0.6666666666666666, 0.2721655269759087)
	Process Job:j_1001953, of size 3, runtime is: 502
		# of independent tasks is 2, in degree: (0.6666666666666666, 0.5443310539518174), out degree: (0.6666666666666666, 0.2721655269759087)
	Process Job:j_1001958, of size 3, runtime is: 33
		# of independent tasks is 1, in degree: (0.6666666666666666, 0.2721655269759087), out degree: (0.6666666666666666, 0.2721655269759087)
	Process Job:j_1001959, of size 3, runtime is: 105
		# of independent tasks is 1, in degree: (0.6666666666666666, 0.2721655269759087), out degree: (0.6666666666666666, 0.2721655269759087)
	Process Job:j_1001961, of size 3, runtime is: 1024
		# of independent tasks is 1, in degree: (0.6666666666666666, 0.2721655269759087), out degree: (0.6666666666666666, 0.2721655269759087)
	Process Job:j_1001963, of size 3, runtime is: 192
		# of inde

	Process Job:j_1002342, of size 3, runtime is: 28
		# of independent tasks is 2, in degree: (0.6666666666666666, 0.5443310539518174), out degree: (0.6666666666666666, 0.2721655269759087)
	Process Job:j_1002357, of size 3, runtime is: 563
		# of independent tasks is 1, in degree: (0.6666666666666666, 0.2721655269759087), out degree: (0.6666666666666666, 0.2721655269759087)
	Process Job:j_1002381, of size 3, runtime is: 1
		# of independent tasks is 2, in degree: (0.6666666666666666, 0.5443310539518174), out degree: (0.6666666666666666, 0.2721655269759087)
	Process Job:j_1002384, of size 3, runtime is: 6
		# of independent tasks is 1, in degree: (0.6666666666666666, 0.2721655269759087), out degree: (0.6666666666666666, 0.2721655269759087)
	Process Job:j_1002393, of size 3, runtime is: 1
		# of independent tasks is 3, in degree: (0.0, 0.0), out degree: (0.0, 0.0)
	Process Job:j_1002397, of size 3, runtime is: 6
		# of independent tasks is 1, in degree: (0.6666666666666666, 0.2721655269759

	Process Job:j_1002766, of size 3, runtime is: 35
		# of independent tasks is 1, in degree: (0.6666666666666666, 0.2721655269759087), out degree: (0.6666666666666666, 0.2721655269759087)
	Process Job:j_1002769, of size 3, runtime is: 21
		# of independent tasks is 2, in degree: (0.6666666666666666, 0.5443310539518174), out degree: (0.6666666666666666, 0.2721655269759087)
	Process Job:j_1002775, of size 3, runtime is: 41
		# of independent tasks is 1, in degree: (0.6666666666666666, 0.2721655269759087), out degree: (0.6666666666666666, 0.2721655269759087)
	Process Job:j_1002824, of size 3, runtime is: 1
		# of independent tasks is 1, in degree: (0.6666666666666666, 0.2721655269759087), out degree: (0.6666666666666666, 0.2721655269759087)
	Process Job:j_1002829, of size 3, runtime is: 0
		# of independent tasks is 1, in degree: (0.6666666666666666, 0.2721655269759087), out degree: (0.6666666666666666, 0.2721655269759087)
	Process Job:j_1002832, of size 3, runtime is: 2
		# of independent

KeyboardInterrupt: 

In [None]:
file_name = "/local0/j_3989171.csv"

# Read Batch Data, hint: Job and Type column should swap
instance_statistics_raw = pd.read_csv(file_name, names=['instance', 'Task', 'Job', 'Type', 'Status', 'Start Time', 'End Time', 
                                     'machine_id', 'seq_no', 'total_seq_no',
                                     'cpu_avg', 'cpu_max', 'mem_avg', 'mem_max'])
instance_statistics = instance_statistics_raw[instance_statistics_raw['Status'] == 'Terminated']
instance_statistics['Task'] = instance_statistics['Task'].map(lambda x: x.lstrip('MJR'))

In [None]:
from matplotlib.ticker import MaxNLocator


minimum_time = 0

def format_func(value, tick_number):
    return int(value - minimum_time)


instance_statistics['run_time'] = instance_statistics['End Time'] - instance_statistics['Start Time']

instance_statistics_gb_task = instance_statistics.groupby(['Task'])
print(len(instance_statistics_gb_task))

tasks_runtime = instance_statistics_gb_task['End Time'].max() - instance_statistics_gb_task['Start Time'].min()
tasks_start_delay = instance_statistics_gb_task['Start Time'].max() - instance_statistics_gb_task['Start Time'].min()
task_max_instance = instance_statistics_gb_task['run_time'].max()
task_min_instance = instance_statistics_gb_task['run_time'].min()

task_statistics = pd.DataFrame({'min instance time': task_min_instance, 
                    'max instance time': task_max_instance,
                    'run time': tasks_runtime,
                    'schedule delay': tasks_start_delay})

task_statistics2 = job_withsize_203[['Name', '# Instances']].rename(columns={'Name':'Task'}).set_index('Task')

task_statistics = task_statistics.join(task_statistics2)
minimum_time = min(instance_statistics_gb_task['Start Time'].min().values) - 1

fig, ax = plt.subplots()
plt.locator_params(axis='y', nbins=6)
ax.hlines(task_statistics.index, 
           instance_statistics_gb_task['Start Time'].min().values, 
          instance_statistics_gb_task['End Time'].max().values)
ax.xaxis.set_major_formatter(plt.FuncFormatter(format_func))
ax.yaxis.set_major_locator(MaxNLocator(8))

#pyplot.locator_params(axis='x', nbins=10)

plt.tight_layout()
plt.savefig('job_schedule.pdf', format='pdf', dpi=200)
plt.savefig('job_schedule.png', format='png', dpi=200)
plt.show()

print("Minumum run time of a task in DAG:" + str(min(task_statistics['run time'])))
print("Maximum run time of a task in DAG:" + str(max(task_statistics['run time'])))

In [None]:
#single_jobs = data_dag.loc[data_dag['Job'] == single_jobs['Job']]
#print(single_jobs['Job'])
for name, group in dags_groupby_size: 
    print("DAG size:" + str(name) + ", Number of Jobs with this size:" + str(len(group)))
    
    
    
    if name == 203:
        print(group)
        job_withsize_203 = pd.merge(data_dag, group, on='Job', how='inner')
        print(len(job_withsize_203))
        job_withsize_203_runtime = max(job_withsize_203['End Time']) - min(job_withsize_203['Start Time'])
        print(job_withsize_203_runtime)
        
        num_tasks_in_job = len(job_withsize_203)
        
        g = gt.Graph(directed=True)
        g.add_vertex(num_tasks_in_job)
        
        job_withsize_203['Name'] = job_withsize_203['Name'].map(lambda x: x.lstrip('MJR'))
        tasks = job_withsize_203['Name']
        
        count_independent_tasks = 0;
        # Build the graph
        for index, row in job_withsize_203.iterrows():
            dependencies = [int(i) -1 for i in row['Name'].split('_')] 
            if len(dependencies) == 1:
                count_independent_tasks = count_independent_tasks + 1
                print(str(row['Name']) + " is an independent task, # of indepent tasks: " 
                      + str(count_independent_tasks))
                continue;
            child_task = dependencies[0]
            child_vertex = g.vertex(child_task)
            for parent_task in dependencies[1:]:
                parent_vertex = g.vertex(parent_task)
                g.add_edge(parent_vertex, child_vertex)
            
        #pos = gt.draw.fruchterman_reingold_layout(g, n_iter=1000)
        pos = gt.draw.arf_layout(g, max_iter=0)
        gt.draw.graph_draw(g,pos = pos, vertex_text=g.vertex_index, vertex_font_size=8,inline=True,
                           output_size=(1000, 1000), output="j_of_size_203_dep_graph.png")
        print("Draw a DAG")
    

In [7]:
#!/usr/bin/python3
import sys
import graph_tool as gt
from graph_tool.all import *

g = gt.Graph(directed=False)
num_vertices = 5
g.add_vertex(5)
edge_weights = g.new_edge_property('double')
g.edge_properties['weight'] = edge_weights
for i in range(0,num_vertices):
    for j in range(i+1,num_vertices):
        e = g.add_edge(i,j)
        edge_weights[e] = 15
        
g.list_properties()

weight         (edge)    (type: double)


In [3]:
import json
import re
import graph_tool as gt
from graph_tool.all import *

string = "#DAG:\n#--------------------------------------------------\n# Map Reduce Plan                                  \n#--------------------------------------------------\nMapReduce node: {scope-64\nMap Plan: {\nStore(hdfs://kariz-1:9000/tmp/temp121776661/tmp1968173734:org.apache.pig.impl.io.InterStorage) - scope-65\n{alpha: New For Each(false,false,false,false,false,false)[bag] - scope-60\n{{Project[bytearray][0] - scope-48\n{{Project[bytearray][1] - scope-50\n{{Project[bytearray][2] - scope-52\n{{Project[bytearray][3] - scope-54\n{{Project[bytearray][4] - scope-56\n{{Project[bytearray][5] - scope-58\n{{alpha: Load(/pigmix5/pigmix_users:PigStorage('\x01')) - scope-47}\n\nGlobal sort: {false}\n\n}\n\nMapReduce node: {scope-67\nMap Plan: {\na1: Local Rearrange[tuple]{chararray}(false) - scope-71\n{Constant(all) - scope-70\n{New For Each(false)[tuple] - scope-69\n{{Project[bytearray][0] - scope-68\n{{Load(hdfs://kariz-1:9000/tmp/temp121776661/tmp1968173734:org.apache.pig.impl.builtin.RandomSampleLoader('org.apache.pig.impl.io.InterStorage','100')) - scope-66}\n\nReduce Plan: {\nStore(hdfs://kariz-1:9000/tmp/temp121776661/tmp-19359954:org.apache.pig.impl.io.InterStorage) - scope-80\n{New For Each(false)[tuple] - scope-79\n{{POUserFunc(org.apache.pig.impl.builtin.FindQuantiles)[tuple] - scope-78\n{{{Project[tuple][*] - scope-77\n{{New For Each(false,false)[tuple] - scope-76\n{{{Constant(10) - scope-75\n{{{a1: POSort[bag]() - scope-62\n{{{{Project[bytearray][0] - scope-74\n{{{{Project[bag][1] - scope-73\n{{{Package(Packager)[tuple]{chararray} - scope-72}\n\nGlobal sort: {false}\n\n}\n\nMapReduce node: {scope-82\nMap Plan: {\na1: Local Rearrange[tuple]{bytearray}(false) - scope-83\n{Project[bytearray][0] - scope-61\n{Load(hdfs://kariz-1:9000/tmp/temp121776661/tmp1968173734:org.apache.pig.impl.io.InterStorage) - scope-81}\n\nReduce Plan: {\na1: Store(/pigmix5/pigmix_users_sorted:PigStorage('\x01')) - scope-63\n{New For Each(true)[tuple] - scope-86\n{{Project[bag][1] - scope-85\n{{Package(LitePackager)[tuple]{bytearray} - scope-84}\n\nGlobal sort: {true}\n\nQuantile file: {hdfs://kariz-1:9000/tmp/temp121776661/tmp-19359954}\n\n}\n\n"
ls = string.split("\n")

start_new_job = False

g = gt.Graph(directed=True)
input_dc = dict()
output_dc = dict()

for x in ls:
    if x.startswith("#"):
        continue;
    if x.startswith("MapReduce node:"):
        start_new_job = True
        v = g.add_vertex();
        print("Start new job")
    if x.find("Store") != -1:
        result = x.split('(')[1].split(')')[0]
        extra = result.split(":")[-1]
        outputs = result.replace(":" + extra, "")
        output_dc[outputs] = v;        
        print("\tStore: " + outputs)
    if x.find("Load") != -1:
        result = x.split('(')[1].split(')')[0]
        extra = result.split(":")[-1]
        inputs =  result.replace(":" + extra, "")
        input_dc[outputs] = v;
        print("\tLoad: " + inputs)
        
for key in input_dc:
    if key in output_dc:
        vi = output_dc[key]
        vo = input_dc[key]
        g.add_edge(vi, vo)
    

gt.draw.graph_draw(g,vertex_text=g.vertex_index, vertex_font_size=8,inline=True,
                           output_size=(200, 200), output="pigmix_generator_graph.png")

print("Graph done")

Start new job
	Store: hdfs://kariz-1:9000/tmp/temp121776661/tmp1968173734
	Load: /pigmix5/pigmix_users
Start new job
	Load: hdfs://kariz-1:9000/tmp/temp121776661/tmp1968173734
	Store: hdfs://kariz-1:9000/tmp/temp121776661/tmp-19359954
Start new job
	Load: hdfs://kariz-1:9000/tmp/temp121776661/tmp1968173734
	Store: /pigmix5/pigmix_users_sorted
Graph done


In [None]:
#!/usr/bin/python3
import sys
import graph_tool as gt
from graph_tool.all import *

g = gt.Graph(directed=False)
num_vertices = 5
g.add_vertex(5)
edge_weights = g.new_edge_property('double')
g.edge_properties['weight'] = edge_weights
for i in range(0,num_vertices):
    for j in range(i+1,num_vertices):
        e = g.add_edge(i,j)
        edge_weights[e] = 15
        
g.list_properties()