In [181]:
import pandas as pd
import json
import os
import sys
import glob
import graph_tool.all as gt
from datetime import datetime, date, time
import matplotlib.ticker as ticker
import matplotlib.pyplot as plt
from os import listdir
from os.path import isfile, join
import numpy as np
import math

# Directory of per-day job-statistics CSVs (file names look like MM-DD-YYYY.csv).
# Set ASHES_STATISTICS_DIR to point elsewhere instead of editing this absolute path.
statistics_dir = os.environ.get('ASHES_STATISTICS_DIR', '/home/maniaa/ashes/code/statistics/')
# join() keeps the paths correct even if the directory is given without a trailing '/'.
stat_csv = sorted(
    join(statistics_dir, f)
    for f in listdir(statistics_dir)
    if f.endswith(".csv") and isfile(join(statistics_dir, f))
)
# NOTE(review): lexicographic order on MM-DD-YYYY names is chronological only within a
# single year. The first file in that order is skipped; the reason is not shown here — confirm.
stat_csv = stat_csv[1:]

In [197]:
for index, f in enumerate(stat_csv):
    # Only the first trace file is processed for now: the loop ends with `break`.
    print(f)
    # File names look like MM-DD-YYYY.csv; the trace window starts at 07:00 that day.
    # NOTE(review): an earlier comment said 6 AM, but the code has always used 7 AM — confirm.
    month, day, year = f.split('/')[-1].split('.csv')[0].split('-')
    trace_starttime = datetime.combine(date(int(year), int(month), int(day)), time(7, 0))
    # Naive datetime, so timestamp() interprets it in the kernel's local timezone.
    trace_start_ts = datetime.timestamp(trace_starttime)

    df = pd.read_csv(f)
    # submitTime is divided by 1000 to compare against a seconds timestamp (i.e. it is in ms).
    # .copy() detaches the filtered frame so adding 'submit_ts' below does not trigger
    # SettingWithCopyWarning.
    df = df[df['submitTime'] / 1000 > trace_start_ts].copy()
    df['submit_ts'] = df['submitTime'] // 1000 - trace_start_ts
    # reset_index() (without drop) keeps the pre-sort row labels as an 'index' column.
    df = df.sort_values('submit_ts').reset_index()

    break

/home/maniaa/ashes/code/statistics/08-02-2018.csv


In [198]:
# List the columns available in the loaded trace.
print(df.columns)

Index(['index', 'state', 'submitTime', 'startTime', 'finishTime', 'queueTime',
       'runTime', 'NumMaps', 'avgMapTime', 'avgReduceTime', 'avgShuffleTime',
       'avgMergeTime', 'NumReduce', 'HDFS_INPUT_SIZE', 'HDFS_OUTPUT_SIZE',
       'MAP_CPU_USAGE_MSEC', 'REDUCE_CPU_USAGE_MSEC', 'MAP_MEM_USAGE_B',
       'REDUCE_MEM_USAGE_B', 'HIVE_RECORDS_IN', 'HIVE_RECORDS_OUT',
       'HIVE_RECORDS_INTERMEDIATE', 'SLOTS_MILLIS_MAPS',
       'SLOTS_MILLIS_REDUCES', 'TOTAL_LAUNCHED_MAPS', 'TOTAL_LAUNCHED_REDUCES',
       'DATA_LOCAL_MAPS', 'RACK_LOCAL_MAPS', 'MILLIS_MAPS', 'MILLIS_REDUCES',
       'VCORES_MILLIS_MAPS', 'VCORES_MILLIS_REDUCES', 'MB_MILLIS_MAPS',
       'MB_MILLIS_REDUCES', 'PHMAP_MEM_USAGE_B', 'PHREDUCE_MEM_USAGE_B',
       'PHPHYSICAL_MEMORY_B', 'jobid', 'job.maps', 'query', 'outputdir',
       'scratchdir', 'sessionid', 'query.id', 'local.scratchdir', 'tmpouput',
       'user.name', 'job', 'n_inputs', 'inputdir', 'workflow.node',
       'workflow.id', 'workflow.dag', 'table.nam

In [280]:
# Registry of data objects, keyed by object name (as produced by get_obj).
# Each value is a dict with 'id' (insertion order), 'name', 'size', 'reuse' (access
# count) and 'approximate' (1 when the size was estimated by approximate_inputs);
# entries may also carry 'jobs', the ids of the jobs that touched the object.
metadata = dict()

def get_obj(f, prefix='hdfs://datalake-nnha'):
    """Map an HDFS path to the logical data object it belongs to.

    Args:
        f: Path string as found in the trace's ``inputdir`` / ``tmpouput`` columns.
        prefix: Namenode URI to strip (every occurrence is removed). Defaults to the
            cluster used by this trace; pass another value for other clusters.

    Returns:
        The object name:

        * paths containing ``'-ext-'`` are cut at their last ``'-'``, so e.g.
          ``.../-ext-10000`` and ``.../-ext-10001`` both map to ``.../-ext``;
        * otherwise, paths containing ``'-mr-'`` lose their last path component;
        * any other path is returned unchanged (after prefix removal).
    """
    path = f.replace(prefix, '')
    if '-ext-' in path:
        return path.rsplit('-', 1)[0]
    if '-mr-' in path:
        return path.rsplit('/', 1)[0]
    return path
    

def match_outputs(data):
    """Register the output object written by one job (one DataFrame row).

    The job's ``tmpouput`` path is mapped to an object with ``get_obj``. If that
    object is also among the job's own input objects, it is renamed to the part
    before ``'.hive'`` plus the unqualified table name from ``table.name``
    (presumably so the output is tracked separately from the input — confirm).

    Side effects on the module-level ``metadata`` registry: the entry is created if
    missing (``approximate=0``), its ``reuse`` count is incremented, this job's id
    is appended to ``jobs``, and ``size`` is set to ``HDFS_OUTPUT_SIZE``
    (overwriting any earlier value).

    Returns:
        The row with ``outputdir`` set to the object name.
    """
    o_obj = get_obj(data['tmpouput'])
    input_objs = {get_obj(f) for f in data['inputdir'].split(',')}
    if o_obj in input_objs:
        # Last dotted component: "db.table" -> "table". Unlike indexing with [1], this
        # also works for unqualified names (no dot) and names with several dots.
        table = data['table.name'].rsplit('.', 1)[-1]
        o_obj = o_obj.split('.hive')[0] + table

    if o_obj not in metadata:
        metadata[o_obj] = {'id': len(metadata), 'size': data['HDFS_OUTPUT_SIZE'], 'reuse': 0,
                           'name': o_obj, 'approximate': 0, 'jobs': []}
    entry = metadata[o_obj]
    entry['reuse'] += 1
    # setdefault: tolerate entries created by code paths that did not add 'jobs'.
    entry.setdefault('jobs', []).append(data['jobid'])
    entry['size'] = data['HDFS_OUTPUT_SIZE']
    data['outputdir'] = o_obj
    return data

def approximate_inputs(row):
    """Register each distinct input object of a job and collapse its input list.

    Input paths in ``row['inputdir']`` (comma-separated) are mapped to objects with
    ``get_obj``. An object not yet in the module-level ``metadata`` registry is added
    with ``approximate=1`` and an estimated size: ``HDFS_INPUT_SIZE // n_inputs``
    (size per input path), times the number of this job's paths that map to the
    object. Every distinct object gets ``reuse`` incremented once and this job's id
    appended to ``jobs``.

    Returns:
        The row with ``inputdir`` replaced by the comma-joined distinct objects
        (first-seen order) and ``n_inputs`` set to the total number of input paths.
    """
    paths_per_obj = {}
    for path in row['inputdir'].split(','):
        obj = get_obj(path)
        paths_per_obj[obj] = paths_per_obj.get(obj, 0) + 1

    n_inputs = row['n_inputs']
    # Guard a zero input count so the per-path estimate cannot divide by zero.
    per_path_size = row['HDFS_INPUT_SIZE'] // n_inputs if n_inputs else 0

    for obj, n_paths in paths_per_obj.items():
        if obj not in metadata:
            metadata[obj] = {'id': len(metadata), 'name': obj, 'reuse': 0, 'approximate': 1,
                             'size': n_paths * per_path_size, 'jobs': []}
        entry = metadata[obj]
        entry['reuse'] += 1
        # setdefault: tolerate entries created by code paths that did not add 'jobs'.
        entry.setdefault('jobs', []).append(row['jobid'])

    row['inputdir'] = ','.join(paths_per_obj)
    row['n_inputs'] = sum(paths_per_obj.values())
    return row

def check_and_assign_size(data):
    """Attribute a job's whole input size to its single input object.

    Rows with an empty ``inputdir``, or with more than one comma-separated input
    path, are returned unchanged. Otherwise the object that the single path maps to
    (via ``get_obj``) is added to the module-level ``metadata`` registry with
    ``size = HDFS_INPUT_SIZE`` if it is new. Its ``reuse`` count is incremented and
    this job's id is recorded in ``jobs``, matching the other registry writers.

    Returns:
        The row; for a single-input row, ``inputdir`` is cleared and
        ``HDFS_INPUT_SIZE`` is set to 0.
    """
    if len(data['inputdir']) == 0:
        return data

    paths = data['inputdir'].split(',')
    if len(paths) > 1:
        return data

    obj = get_obj(paths[0])
    if obj not in metadata:
        metadata[obj] = {'id': len(metadata), 'size': data['HDFS_INPUT_SIZE'], 'reuse': 0,
                         'name': obj, 'approximate': 0, 'jobs': []}
    entry = metadata[obj]
    entry['reuse'] += 1
    # setdefault: tolerate entries created by code paths that did not add 'jobs'.
    entry.setdefault('jobs', []).append(data['jobid'])

    data['inputdir'] = ''
    data['HDFS_INPUT_SIZE'] = 0
    return data

def remove_observed(data):
    """Subtract the sizes of already-registered input objects from a job's input size.

    Rows with ``HDFS_INPUT_SIZE == 0`` are returned unchanged. Otherwise, every
    input path whose object (via ``get_obj``) is already in the module-level
    ``metadata`` registry is dropped from ``inputdir``. Its entry's ``reuse`` is
    incremented, this job's id is recorded in ``jobs``, and its ``size`` is
    subtracted from the job's remaining input size.

    Returns:
        The row with the remaining paths in ``inputdir`` (original order) and
        ``HDFS_INPUT_SIZE`` reduced, clamped at 0, and forced to 0 when no
        paths remain.
    """
    data_size = data['HDFS_INPUT_SIZE']
    if data_size == 0:
        return data

    kept = []
    removed_size = 0
    # Single pass partition (the old version removed matches with list.remove, O(n^2)).
    for path in data['inputdir'].split(','):
        entry = metadata.get(get_obj(path))
        if entry is None:
            kept.append(path)
            continue
        entry['reuse'] += 1
        # setdefault: tolerate entries created by code paths that did not add 'jobs'.
        entry.setdefault('jobs', []).append(data['jobid'])
        removed_size += entry['size']

    # Clamp at zero: registered sizes (possibly estimates) can exceed the job's total.
    data['HDFS_INPUT_SIZE'] = max(data_size - removed_size, 0) if kept else 0
    data['inputdir'] = ','.join(kept)
    return data

def approximate_size_assignment(data):
    """Split a job's remaining input size evenly across its input objects.

    Each comma-separated path in ``inputdir`` is mapped with ``get_obj``. Objects not
    yet in the module-level ``metadata`` registry are added with
    ``size = HDFS_INPUT_SIZE / number_of_paths``. Because that size is an even-split
    estimate, they are flagged ``approximate=1``. Every object gets ``reuse``
    incremented and this job's id recorded in ``jobs``.

    Returns:
        The row with ``inputdir`` cleared and ``HDFS_INPUT_SIZE`` set to 0.
    """
    inputs = [get_obj(f) for f in data['inputdir'].split(',')]
    # str.split always yields at least one element, so this cannot divide by zero.
    split_size = data['HDFS_INPUT_SIZE'] / len(inputs)

    for obj in inputs:
        if obj not in metadata:
            metadata[obj] = {'id': len(metadata), 'size': split_size, 'reuse': 0,
                             'name': obj, 'approximate': 1, 'jobs': []}
        entry = metadata[obj]
        entry['reuse'] += 1
        # setdefault: tolerate entries created by code paths that did not add 'jobs'.
        entry.setdefault('jobs', []).append(data['jobid'])
    data['inputdir'] = ''
    data['HDFS_INPUT_SIZE'] = 0
    return data
        

def identify_obj_size(idx, g):
    """Run the iterative size-inference passes over one group of job rows.

    Performs ``idx + 1`` rounds of ``remove_observed`` followed by
    ``check_and_assign_size`` over ``g`` (so ``idx`` must be an int). Rows whose
    ``HDFS_INPUT_SIZE`` is still non-zero afterwards are then passed to
    ``approximate_size_assignment``. The work happens through those functions'
    updates to the module-level ``metadata`` registry; nothing is returned.
    Finally prints ``idx`` and the number of registered objects.
    """
    for _round in range(idx + 1):
        g = g.apply(remove_observed, axis=1)
        g = g.apply(check_and_assign_size, axis=1)

    unresolved = g[g['HDFS_INPUT_SIZE'] != 0]
    if not unresolved.empty:
        # Only the registry side effects matter here, so the result is discarded.
        unresolved.apply(approximate_size_assignment, axis=1)

    print(idx, len(metadata))
    
# Build the object registry: first every job's output object, then its input objects.
# Both passes update the module-level `metadata` dict and rewrite columns of `df`
# (`outputdir`, then `inputdir`/`n_inputs`). Because `df` is overwritten, re-running
# this cell also requires reloading `df` (cell above), not just `metadata = {}`.
# NOTE(review): pandas < 1.1 evaluates the function passed to DataFrame.apply twice on
# the first row, which would double-count that row's side effects on `metadata`.
df = df.apply(match_outputs, axis=1)
df = df.apply(approximate_inputs, axis=1)

# Alternative size-inference pass (currently disabled), one group per input count:
# for n_inputs, group in df.groupby('n_inputs'):
#     identify_obj_size(n_inputs, group)



In [286]:
pd.set_option("display.max_colwidth", 100)
pd.set_option("display.max_rows", 40)

# One row per registered object; 'id' becomes the index, then returns as a column below.
metadata_df = pd.DataFrame(data=list(metadata.values())).set_index('id')
# Most-reused objects first.
dsr = (
    metadata_df
    .sort_values(by='reuse', ascending=False)
    .reset_index()
)
dsr

Unnamed: 0,id,size,reuse,name,approximate,jobs
0,13621,74981601,2714,/hive_external_tables/SYSTEM/dt=20180731,1,"[job_1531656020138_222164, job_1531656020138_222173, job_1531656020138_222176, job_1531656020138..."
1,13567,43231231,1746,/hive_external_tables/ASUP/dt=20180731,1,"[job_1531656020138_222152, job_1531656020138_222171, job_1531656020138_222180, job_1531656020138..."
2,13602,19695698,1733,/hive_external_tables/CLUSTER_TABLE/dt=20180731,1,"[job_1531656020138_222157, job_1531656020138_222172, job_1531656020138_222184, job_1531656020138..."
3,11950,56802497,271,/hive_external_tables/CLUSTER_MEMBER/dt=20180731,1,"[job_1531656020138_222148, job_1531656020138_222168, job_1531656020138_222178, job_1531656020138..."
4,77306,75141273,205,/hive_external_tables/SYSTEM/dt=20180801,1,"[job_1531656020138_232288, job_1531656020138_232320, job_1531656020138_232327, job_1531656020138..."
...,...,...,...,...,...,...
86143,5576,52,1,/hive_external_tables/Users_db/guestdb/ubs_shelf_asups_sd/.hive-staging_hive_2018-08-02_09-17-09...,0,[job_1531656020138_228038]
86144,5575,3,1,/tmp/hive/netappiq/0d152597-7ee6-42b2-aac7-8de1b71cef1c/hive_2018-08-02_09-17-27_658_83272760737...,0,[job_1531656020138_228037]
86145,5574,17,1,/tmp/hive/netappiq/276fcb2d-e1fb-4e61-a9d7-907c2a5d4443/hive_2018-08-02_09-17-21_525_75390598473...,0,[job_1531656020138_228036]
86146,5573,17,1,/tmp/hive/netappiq/61b2aaa9-5cd2-4c4a-aef3-4d470d25f1d1/hive_2018-08-02_09-17-18_444_66666308392...,0,[job_1531656020138_228035]


In [317]:
# Most-reused object (row 0 of dsr): size * reuse approximates the total volume read from it.
top_obj = dsr.loc[0]
print(top_obj['size'] * top_obj['reuse'])

# Spot check on one job's input size. In the saved output it equals the top object's
# size, so this job presumably reads only that object — TODO confirm why it was picked.
print(df.loc[df['jobid'] == 'job_1531656020138_222200', 'HDFS_INPUT_SIZE'])

# Jobs that touched the top object, in submission order. reuse_distance is the gap
# (in submit_ts units, presumably seconds) since the previous such job; NaN for the first.
top_obj_jobs = (
    df[df['jobid'].isin(top_obj['jobs'])]
    .sort_values('submit_ts')
    .reset_index(drop=True)
)
dt = pd.DataFrame({
    'reuse_distance': top_obj_jobs['submit_ts'].diff(),
    'submit_ts': top_obj_jobs['submit_ts'],
    'n_inputs': top_obj_jobs['n_inputs'],
    'jobid': top_obj_jobs['jobid'],
})
# Last expression so the notebook actually displays it (a mid-cell bare name does not).
dt

203500065114
52    74981601
Name: HDFS_INPUT_SIZE, dtype: int64
