In [1]:
import os
import sys
from glob import glob
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from pprint import pprint
import json

import radical.utils     as ru
import radical.pilot     as rp
import radical.analytics as ra
import radical.entk as re

  from .tslib import iNaT, NaT, Timestamp, Timedelta, OutOfBoundsDatetime
  from pandas._libs import (hashtable as _hashtable,
  from pandas._libs import algos, lib
  from pandas._libs import hashing, tslib
  from pandas._libs import (lib, index as libindex, tslib as libts,
  import pandas._libs.tslibs.offsets as liboffsets
  from pandas._libs import algos as libalgos, ops as libops
  from pandas._libs.interval import (
  from pandas._libs import internals as libinternals
  import pandas._libs.sparse as splib
  import pandas._libs.window as _window
  from pandas._libs import (lib, reduction,
  from pandas._libs import algos as _algos, reshape as _reshape
  import pandas._libs.parsers as parsers
  from pandas._libs import algos, lib, writers as libwriters


In [13]:
# Input constants for the analysis cells below.
# Number of trials per configuration; trial directories are trial-1 .. trial-<trials>.
trials = 1
# Values of the 'stages-<n>' directory component that are analysed.
stages = [16, 64]
# Resource label, used as a directory component under the raw-data root.
resource = 'bw'
# Roots (relative to the notebook's working directory) of the raw session
# profiles and of the processed JSON output.
src, proc = '../raw-data/', '../proc-data/'

In [3]:
def get_adap_time(loc, sid):
    """Sum the post-exec durations of all stages in one EnTK session.

    A radical.analytics session is built from the EnTK profiles of session
    ``sid`` located in ``loc``. For every stage uid (visited in sorted order)
    the duration between the events
    'executing post-exec for stage <uid>' and
    'post-exec executed for stage <uid>' is added up.

    Returns the total as a float (0.0 when the session has no stages).
    """
    sess = ra.Session(stype='radical.entk', src=loc, sid=sid)
    # Named `stage_uids` rather than `stages` so it does not shadow the
    # notebook's global `stages` configuration list.
    stage_uids = sorted(sess.filter(etype='stage', inplace=False).list('uid'))
    # Start value 0.0 keeps the result a float even with no stages.
    return sum((sess.duration(event=[
                    {ru.EVENT: 'executing post-exec for stage %s' % uid},
                    {ru.EVENT: 'post-exec executed for stage %s' % uid}])
                for uid in stage_uids),
               0.0)

In [4]:
def get_entk_overheads(loc, sid):
    """Measure EnTK setup and teardown overheads for one session.

    Every metric is the radical.analytics ``Session.duration`` between a pair
    of EnTK profile events of session ``sid`` in ``loc``.

    Returns a dict with keys 'init_time', 'res_sub_time',
    'total_teardown_time' and 'rts_teardown_time'.
    """
    # (result key, opening event, closing event), measured in this order.
    event_pairs = [
        ('init_time',           'create amgr obj',               'init rreq submission'),
        ('res_sub_time',        'creating rreq',                 'rreq submitted'),
        ('total_teardown_time', 'start termination',             'termination done'),
        ('rts_teardown_time',   'canceling resource allocation', 'resource allocation cancelled'),
    ]
    session = ra.Session(stype='radical.entk', src=loc, sid=sid)
    overheads = {}
    for key, first, last in event_pairs:
        overheads[key] = session.duration(event=[{ru.EVENT: first},
                                                 {ru.EVENT: last}])
    return overheads

In [5]:
def get_entk_exec_time(loc, sid):
    """Duration of the EnTK tasks between the SUBMITTED and EXECUTED states.

    ``loc``/``sid`` locate the EnTK session profiles. The value is whatever
    radical.analytics reports for ``duration(state=['SUBMITTED', 'EXECUTED'])``
    on the task-filtered session.
    """
    task_states = ['SUBMITTED', 'EXECUTED']
    session = ra.Session(stype='radical.entk', src=loc, sid=sid)
    task_view = session.filter(etype='task', inplace=False)
    return task_view.duration(state=task_states)

In [6]:
def process_entk_profiles(src):
    """Extract EnTK timing metrics for one session and write them as JSON.

    ``src`` is the path of an EnTK session; its basename is used as the
    session id and its parent directory as the profile location. Three
    metrics are collected:

    * 'adap_time' -- from ``get_adap_time``;
    * 'overheads' -- dict from ``get_entk_overheads``;
    * 'exec_time' -- from ``get_entk_exec_time``.

    The JSON target is ``<proc>/<location minus its first two path
    components>/entk_data.json``.

    Returns the path that ``write_data`` reports it actually wrote, which
    may differ from the requested target.
    """
    sid = os.path.basename(src)
    loc = os.path.dirname(src)
    # Drops the first two components of `loc`; this assumes `src` begins with
    # a two-part raw-data root such as '../raw-data/'. TODO: confirm if the
    # root layout changes.
    tag = '/'.join(loc.split('/')[2:])
    proc_data = os.path.join(proc, tag) + '/entk_data.json'

    # Metrics are computed in the same order as before: adap, overheads, exec.
    data = {'adap_time': get_adap_time(loc, sid),
            'overheads': get_entk_overheads(loc, sid),
            'exec_time': get_entk_exec_time(loc, sid)}

    # write_data may rewrite the target path (see its 'rp.session' handling),
    # so report the path it returns rather than the one requested.
    return write_data(data, proc_data)

In [52]:
def write_data(data, proc_path):
    """Write ``data`` as JSON (via ``ru.write_json``) and return the path used.

    If ``proc_path`` contains the substring 'rp.session', the file's
    immediate parent directory is dropped from the path, e.g.
    ``a/b/rp.session.X/rp_data.json`` -> ``a/b/rp_data.json``. The check is
    a plain substring test on the whole path.

    Missing parent directories of the final path are created.

    Returns the (possibly rewritten) path that was written.
    """
    if 'rp.session' in proc_path:
        # os.path.join avoids producing a root-absolute '/<file>' when the
        # grandparent directory is empty (e.g. 'rp.session.X/rp_data.json').
        proc_path = os.path.join(os.path.dirname(os.path.dirname(proc_path)),
                                 os.path.basename(proc_path))
    out_dir = os.path.dirname(proc_path)
    # A bare filename has no directory part; os.makedirs('') would raise.
    if out_dir and not os.path.isdir(out_dir):
        os.makedirs(out_dir)
    ru.write_json(data, proc_path)
    return proc_path

In [51]:
# Process every EnTK session found for each (stages, trial) configuration.
# Single-argument print(...) calls produce identical output on Python 2 and 3.
# NOTE(review): the output JSON path depends only on the stage directory, so
# if one directory holds several sessions, each overwrites the previous JSON
# (sorting at least makes the winner deterministic).
print('EnTK analysis')
for n_stages in stages:
    for trial in range(1, trials + 1):
        path = os.path.join(src, resource, 'trial-%s' % trial, 'stages-%s' % n_stages)
        for session_path in sorted(glob(os.path.join(path, 're.session.*'))):
            print('Processing:  %s' % session_path)
            out_path = process_entk_profiles(session_path)
            print('Output written to  %s' % out_path)

EnTK analysis
Processing:  ../raw-data/bw/trial-1/stages-16/re.session.two.vivek.017759.0012
Output written to  ../proc-data/bw/trial-1/stages-16/entk_data.json
Processing:  ../raw-data/bw/trial-1/stages-64/re.session.two.vivek.017759.0018
Output written to  ../proc-data/bw/trial-1/stages-64/entk_data.json


In [55]:
def process_rp_profiles(src):
    """Extract RADICAL-Pilot unit timings for one session and write them as JSON.

    ``src`` is split with ``os.path.split`` into a location and a session id.
    When ``src`` ends with '/', the id is '' and the location is the session
    directory itself. Two unit metrics are computed with radical.analytics:

    * 'task_mgmt' -- unit duration between the NEW and DONE states;
    * 'exec_time' -- unit duration between the 'exec_start' and 'exec_stop'
      events.

    The JSON target is ``<proc>/<location minus its first two path
    components>/rp_data.json``.

    Returns whatever path ``write_data`` reports it wrote.
    """
    loc, sid = os.path.split(src)
    relative_loc = '/'.join(loc.split('/')[2:])
    target = os.path.join(proc, relative_loc) + '/rp_data.json'

    session = ra.Session(stype='radical.pilot', src=loc, sid=sid)
    unit_view = session.filter(etype='unit', inplace=False)
    metrics = {
        'task_mgmt': unit_view.duration(state=['NEW', 'DONE']),
        'exec_time': unit_view.duration(event=[{ru.EVENT: 'exec_start'},
                                               {ru.EVENT: 'exec_stop'}]),
    }
    return write_data(metrics, target)

In [56]:
# Process every RADICAL-Pilot session directory for each (stages, trial) configuration.
# Single-argument print(...) calls produce identical output on Python 2 and 3.
print('RP analysis')
for n_stages in stages:
    for trial in range(1, trials + 1):
        path = os.path.join(src, resource, 'trial-%s' % trial, 'stages-%s' % n_stages)
        # The trailing separator makes glob match directories only, and each
        # match keeps it. process_rp_profiles relies on that: the basename is ''
        # and the dirname is the session directory itself.
        # NOTE(review): write_data strips the rp.session directory from the
        # output path, so several sessions in one stage directory would
        # overwrite the same rp_data.json.
        for session_path in sorted(glob(os.path.join(path, 'rp.session.*', ''))):
            print('Processing:  %s' % session_path)
            out_path = process_rp_profiles(session_path)
            print('Output written to  %s' % out_path)

RP analysis
Processing:  ../raw-data/bw/trial-1/stages-16/rp.session.two.vivek.017759.0013/
Output written to  ../proc-data/bw/trial-1/stages-16/rp_data.json
Processing:  ../raw-data/bw/trial-1/stages-64/rp.session.two.vivek.017759.0019/
Output written to  ../proc-data/bw/trial-1/stages-64/rp_data.json
