In [1]:
%run init.ipynb

Perhaps you already have a cluster running?
Hosting the HTTP server on port 36973 instead


In [2]:
import dask
import dask.dataframe as dd
import functools
import glob
import json
import logging
import math
import os
import psutil
import socket
from anytree import PostOrderIter
from concurrent.futures import ThreadPoolExecutor, as_completed
from dask.distributed import Client, LocalCluster, get_task_stream, progress, wait
from distributed.diagnostics.plugin import SchedulerPlugin
from dask_jobqueue import LSFCluster
from time import perf_counter, sleep
from tqdm.auto import tqdm
from typing import Union

In [3]:
# from vani.utils.logger import ElapsedTimeLogger, create_logger, format_log
# filter_group_index='tmid'
# wait_until_workers_alive(filter_group_index, vn.n_workers_per_node / 2)
# #wait_workers = vn.clients[filter_group_index].submit()
# #wait_workers.result()

In [5]:
def draw_graph(future):
    """Render the layer-level dependency structure of a dask object with graphviz.

    Keys in ``future.__dask_graph__().dependencies`` are reduced to the text
    before their first ``"-"``; every dependency then contributes one edge
    ``source -> destination`` between those shortened names.  Parallel edges
    are collapsed into a single graphviz edge labelled with how many times the
    pair occurred.

    Parameters
    ----------
    future : object exposing ``__dask_graph__()`` whose result has a
        ``dependencies`` mapping of string key -> iterable of string keys.

    Returns
    -------
    graphviz.Digraph
    """
    dependency_map = future.__dask_graph__().dependencies
    edge_list = []
    multiplicity = {}  # (source, destination) -> number of occurrences
    for target_key, upstream in dependency_map.items():
        upstream_keys = list(upstream)
        target = target_key.split("-")[0]
        for upstream_key in upstream_keys:
            pair = (upstream_key.split("-")[0], target)
            multiplicity[pair] = multiplicity.get(pair, 0) + 1
            edge_list.append(pair)

    import networkx as nx
    import graphviz

    # networkx is only used to obtain the de-duplicated node set.
    dag = nx.DiGraph()
    dag.add_edges_from(edge_list)

    dot = graphviz.Digraph()
    for node_name in dag.nodes:
        dot.node(node_name)
    for (origin, target), occurrences in multiplicity.items():
        dot.edge(origin, target, label=str(occurrences))
    return dot

In [6]:
# Separator placed between the task kind and its identifier in dask key names.
delimiter = '-'
def merge(x, y):
    """Combine two per-category metric dictionaries into one.

    Both inputs carry the categories ``'read'``, ``'write'`` and
    ``'metadata'``.  ``uniq_ranks`` and ``uniq_filenames`` are combined with
    ``numpy.union1d``; every other field is combined with ``+``.  The
    ``'metadata'`` category has no ``total_io_size`` / ``bw_sum`` fields.

    Returns a new dictionary; neither input is modified.
    """
    import numpy as np

    io_fields = ('uniq_ranks', 'agg_dur', 'total_io_size', 'uniq_filenames', 'bw_sum', 'ops')
    metadata_fields = ('uniq_ranks', 'agg_dur', 'uniq_filenames', 'ops')
    unioned = ('uniq_ranks', 'uniq_filenames')
    layout = (('read', io_fields), ('write', io_fields), ('metadata', metadata_fields))

    combined = {}
    for category, fields in layout:
        left, right = x[category], y[category]
        section = {}
        for field in fields:
            if field in unioned:
                section[field] = np.union1d(left[field], right[field])
            else:
                section[field] = left[field] + right[field]
        combined[category] = section
    return combined

def filter(target_ddf, filter_group_index: str):
    """Aggregate I/O records per ``(file_id, io_cat)`` group.

    NOTE: this function's name shadows the builtin ``filter`` for every cell
    that runs after its definition; the name is kept because other cells refer
    to it.

    Parameters
    ----------
    target_ddf : dask or pandas DataFrame
        Must expose the columns ``file_id``, ``io_cat``, ``duration``,
        ``size``, ``bandwidth``, ``index``, ``proc_id`` and ``filename``.
        A lazy (dask) frame is computed before returning; an in-memory
        (pandas) frame is aggregated directly.
    filter_group_index : str
        Accepted for call compatibility; not used by the aggregation.

    Returns
    -------
    pandas.DataFrame
        Indexed by ``(file_id, io_cat)`` with columns ``duration`` (sum),
        ``size`` (sum), ``bandwidth`` (sum), ``index`` (row count),
        ``proc_id`` (number of distinct values) and ``filename`` (minimum).
    dict
        Only when ``target_ddf`` has no rows: ``{'read', 'write',
        'metadata'}`` each mapped to its own zeroed metrics dictionary.
        Callers have to cope with both return shapes.
    """
    import pandas as pd

    def _empty_category():
        # A fresh dictionary (and fresh lists) per category, so a caller that
        # mutates one category never changes the other two.
        return {
            'uniq_ranks': [],
            'agg_dur': 0.0,
            'total_io_size': 0,
            'uniq_filenames': [],
            'bw_sum': 0.0,
            'ops': 0,
        }

    if len(target_ddf.index) == 0:
        return {
            'read': _empty_category(),
            'write': _empty_category(),
            'metadata': _empty_category(),
        }

    def f(x):
        # Collapse one (file_id, io_cat) group into a single row.
        d = {}
        d['duration'] = x['duration'].sum()
        d['size'] = x['size'].sum()
        d['bandwidth'] = x['bandwidth'].sum()
        d['index'] = x['index'].count()
        d['proc_id'] = x['proc_id'].nunique()
        d['filename'] = x['filename'].min()
        return pd.Series(d, index=['duration', 'size', 'bandwidth', 'index', 'proc_id', 'filename'])

    aggregated_values = target_ddf.groupby(['file_id', 'io_cat']).apply(f)

    # Drop the local reference to the input before materialising the result.
    del target_ddf

    # Only lazy collections have ``compute``; a pandas result is already final.
    if hasattr(aggregated_values, 'compute'):
        return aggregated_values.compute()
    return aggregated_values
def cal_len(x):
    """Replace the unique-value collections in a metrics dictionary by their sizes.

    ``uniq_ranks`` and ``uniq_filenames`` become ``len(...)`` of the stored
    collection; all other fields are copied unchanged.  The ``'metadata'``
    category only carries ``uniq_ranks``, ``agg_dur``, ``uniq_filenames`` and
    ``ops``.  A new dictionary is returned.
    """
    io_fields = ('uniq_ranks', 'agg_dur', 'total_io_size', 'uniq_filenames', 'bw_sum', 'ops')
    metadata_fields = ('uniq_ranks', 'agg_dur', 'uniq_filenames', 'ops')
    counted = ('uniq_ranks', 'uniq_filenames')

    summary = {}
    for category, fields in (('read', io_fields), ('write', io_fields), ('metadata', metadata_fields)):
        source = x[category]
        summary[category] = {
            field: (len(source[field]) if field in counted else source[field])
            for field in fields
        }
    return summary

from dask.graph_manipulation import bind
def compute_min_max(log_dir: str, filter_group_index: str, depth):
    """Split the recorded value range of one index column into ``2 ** depth`` steps.

    Reads ``{log_dir}/global.json`` and takes the first two entries stored
    under ``filter_group_index`` as the lower and upper bound.

    Parameters
    ----------
    log_dir : directory containing ``global.json``.
    filter_group_index : key to look up inside ``global.json``.
    depth : exponent; the range is divided into ``2 ** depth`` pieces.

    Returns
    -------
    (interval, time_range)
        ``interval`` is the step size (rounded up) and ``time_range`` is
        ``range(min_val, max_val, interval)``.  When both bounds are equal the
        step is forced to 1 so that an empty range is returned instead of
        ``range()`` raising ``ValueError`` for a zero step.
    """
    # Only the JSON parsing needs the file handle; close it before computing.
    with open(f"{log_dir}/global.json") as file:
        global_metrics = json.load(file)

    bounds = global_metrics[filter_group_index]
    min_val, max_val = bounds[0], bounds[1]
    next_tasks = 2 ** depth
    interval = math.ceil((max_val - min_val) * 1.0 / next_tasks)
    if interval == 0:
        # min_val == max_val: range() rejects a zero step.
        interval = 1
    time_range = range(min_val, max_val, interval)
    return interval, time_range

def splice_ddf(ddf, file_id):
    """Select the rows labelled ``file_id`` and return them as a computed frame.

    The label is wrapped in a list for the ``.loc`` lookup, the index is moved
    back into the columns with ``reset_index()``, and ``compute()`` is called
    on the result.
    """
    selected_rows = ddf.loc[[file_id]]
    with_index_column = selected_rows.reset_index()
    return with_index_column.compute()
def splice_ddf_list(ddf, list_vals):
    """Slice ``ddf`` by label between the first and (last - 1) entries of ``list_vals``.

    The slice start is ``list_vals[0]`` (0 when the list is empty) and the
    slice stop is ``list_vals[-1] - 1`` (0 when the list has fewer than two
    entries).  The result of ``.loc[start:stop].reset_index()`` is returned
    without calling ``compute()``.
    """
    count = len(list_vals)
    start = list_vals[0] if count > 0 else 0
    stop = list_vals[-1] - 1 if count > 1 else 0
    return ddf.loc[start:stop].reset_index()
def filter_map(ddf, file_id, filter_group_index):
    """Create the two chained delayed tasks that produce metrics for one file id.

    The first task applies ``splice_ddf`` to ``(ddf, file_id)`` under the key
    ``splice_ddf<delimiter><file_id>``; the second applies ``filter`` to that
    task's result and ``filter_group_index`` under the key
    ``filter<delimiter><file_id>``.  The second task is returned.
    """
    splice_key = f"splice_ddf{delimiter}{file_id}"
    filter_key = f"filter{delimiter}{file_id}"
    spliced = delayed_func(splice_ddf, splice_key, [ddf, file_id])
    return delayed_func(filter, filter_key, [spliced, filter_group_index])
def merge_map(index, depth,task_1, task_2):
    """Create a delayed ``merge`` of two child tasks.

    ``task_1`` and ``task_2`` are ``[node_id, task]`` pairs.  The node ids and
    ``depth`` only form the dask key
    ``merge<delimiter><depth>_<id1>_<id2>``; the tasks themselves are the
    arguments handed to ``merge``.  ``index`` is not used.
    """
    left_id, left_task = task_1[0], task_1[1]
    right_id, right_task = task_2[0], task_2[1]
    key = f"merge{delimiter}{depth}_{left_id}_{right_id}"
    return delayed_func(merge, key, [left_task, right_task])
def compute_metrics(file_id_list, ddf, filter_group_index: str, wait_persist_delayed):
    """Build a binary reduction tree of delayed tasks over ``file_id_list``.

    The deepest level holds one ``filter_map`` task per file id.  Each level
    above pairs neighbouring tasks of the level below with ``merge_map``; when
    the level below has an odd number of tasks, its last task is carried up
    unchanged.  After a level has been consumed, each of its tasks is wrapped
    in a delayed ``cal_len`` call.  Task construction is fanned out over a
    thread pool; nothing is computed here.

    Parameters
    ----------
    file_id_list : sized iterable of file ids (one leaf task each).
    ddf : object passed through to ``filter_map`` as the data source.
    filter_group_index : passed through to ``filter_map``.
    wait_persist_delayed : accepted for call compatibility; not used.

    Returns
    -------
    list of lists
        ``result[d]`` holds the delayed tasks of depth ``d`` (0 is the root
        level).  For an empty ``file_id_list`` the result is ``[[]]``.
    """
    # Imported here so the function does not depend on some other cell having
    # bound the name ``concurrent`` first.
    import concurrent.futures
    from tqdm.notebook import tqdm

    if len(file_id_list) == 0:
        # math.log(0) is undefined; there is nothing to build.
        return [[]]

    MAX_DEPTH = math.ceil(math.log(len(file_id_list), 2))
    NUM_THREADS = 128
    print(len(file_id_list), MAX_DEPTH)
    all_tasks = [0] * (MAX_DEPTH + 1)

    def _collect(index_futures, depth, slots):
        # Store each finished future's result at the slot recorded for it.
        for future in concurrent.futures.as_completed(index_futures):
            index = index_futures[future]
            print(f"{index} depth {depth}           ", end='\r')
            slots[index] = future.result()

    # The context manager shuts the pool down even if a submission fails.
    with concurrent.futures.ThreadPoolExecutor(NUM_THREADS) as executor:
        for i in tqdm(range(MAX_DEPTH, -1, -1)):
            print(f"\ndepth {i}")
            if i == MAX_DEPTH:
                tasks = [0] * len(file_id_list)
                print(f"submitting {i}")
                # filter_map takes exactly (ddf, file_id, filter_group_index).
                index_futures = {
                    executor.submit(filter_map, ddf, file_id, filter_group_index): index
                    for index, file_id in enumerate(file_id_list)
                }
                print(f"waiting {i}")
                _collect(index_futures, i, tasks)
                print(f"done {i}")
            else:
                children = all_tasks[i + 1]
                has_leftover = len(children) % 2 == 1
                paired = len(children) - 1 if has_leftover else len(children)

                tasks = [0] * (paired // 2)
                index_futures = {
                    executor.submit(
                        merge_map, index, i,
                        [node_id, children[node_id]],
                        [node_id + 1, children[node_id + 1]],
                    ): index
                    for index, node_id in enumerate(range(0, paired, 2))
                }
                _collect(index_futures, i, tasks)
                if has_leftover:
                    # The unpaired last child moves up a level unchanged.
                    print(f"{paired // 2} depth {i}           ", end='\r')
                    tasks.append(children[-1])

                # The merges above already reference the raw child tasks, so
                # the stored level can now be replaced by its cal_len wrappers.
                index_futures = {
                    executor.submit(delayed_func, cal_len, f"cal_len{delimiter}{i+1}_{t}", [child]): t
                    for t, child in enumerate(children)
                }
                _collect(index_futures, i, children)
            all_tasks[i] = tasks

    if len(all_tasks[0]) == 2:
        all_tasks[0] = [delayed_func(merge, f"merge{delimiter}{0}_{0}_{1}", [all_tasks[0][0], all_tasks[0][1]])]
    for t, root_task in enumerate(all_tasks[0]):
        all_tasks[0][t] = delayed_func(cal_len, f"cal_len{delimiter}0_{t}", [root_task])
    return all_tasks

In [7]:
# Ask the cluster for twice the per-node worker count.
# NOTE(review): `n_workers_per_node` and `cluster` are not defined anywhere in
# this notebook; presumably they are created by `%run init.ipynb` - confirm.
total_workers = n_workers_per_node*2
cluster.scale(total_workers)



In [8]:
%%time
import dask
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
def delayed_func(func, name, args, nout=1):
    """Wrap ``func(*args)`` in ``dask.delayed`` under an explicit task key.

    Parameters
    ----------
    func : callable to defer.
    name : value passed as ``dask_key_name`` for the created task.
    args : sequence of positional arguments for ``func``.
    nout : forwarded to ``dask.delayed`` (default 1).

    Returns the object produced by calling the delayed wrapper.
    """
    lazy_callable = dask.delayed(func, nout=nout)
    return lazy_callable(*args, dask_key_name=name)
def wait_delayed(ddf):
    """Call ``wait(ddf)`` and return the constant ``1`` once it returns.

    The value returned by ``wait`` is discarded; the constant only serves as a
    small result for tasks that take this one as an argument.
    """
    wait(ddf)
    return 1
def dummy(x):
    """Identity function: hand back the argument unchanged."""
    return x
import numpy as np
class NpEncoder(json.JSONEncoder):
    """JSON encoder that converts numpy scalars and arrays to plain Python values."""

    def default(self, obj):
        """Convert numpy integers, floats and arrays; defer everything else to the base class."""
        conversions = (
            (np.integer, int),
            (np.floating, float),
            (np.ndarray, lambda array: array.tolist()),
        )
        for numpy_type, convert in conversions:
            if isinstance(obj, numpy_type):
                return convert(obj)
        return super().default(obj)
def group_filter_map(ddf, file_id_list, filter_group_index):
    """Return one ``filter_map`` task per entry of ``file_id_list``, in input order."""
    return [filter_map(ddf, file_id, filter_group_index) for file_id in file_id_list]
def cal_range_filenames(file_id_list):
    """Sort the given file ids, store them as JSON and return the sorted list.

    The list is written to ``{log_dir}/file.json`` (``log_dir`` is the
    notebook-level variable) using ``NpEncoder`` for serialisation.
    """
    ordered_ids = sorted(file_id_list)
    with open(f"{log_dir}/file.json", "w") as file:
        json.dump(ordered_ids, file, cls=NpEncoder)
    return ordered_ids
#def transform(ddf):
#    import dask.array as da
#     mask =  (2**32 -1) << 32
#     ddf['dir_1'] = ddf.index.apply(lambda val, mask: val & mask, axis=1, meta=ddf, args=(mask))
#     mask =  (2**16 -1) << 48
#     ddf['dir_2'] = ddf.index.apply(lambda val, mask: val & mask, axis=1, meta=ddf, args=(mask))
#     mask =  (2**8 -1) << 56
#     ddf['dir_3'] = ddf.index.apply(lambda val, mask: val & mask, axis=1, meta=ddf, args=(mask))
#     return ddf
import numpy as np
# ---------------------------------------------------------------------------
# Build the list of delayed steps (`delayed_list`) for one index column.
# Every step is created through `delayed_func`, i.e. wrapped in dask.delayed;
# this section only constructs the tasks.
# ---------------------------------------------------------------------------
MAX_DEPTH = 10
log_dir = "/p/gpfs1/iopp/recorder_app_logs/genome_pegasus/nodes-32/_parquet"
filter_group_index='file_id'
delayed_list =[]
# The spelling of this folder name is part of the on-disk path; changing it
# would make the existence check below look at a different directory.
PARTITION_FOLDER="partitoned"
# The presence of `_common_metadata` is used as the marker that the indexed,
# repartitioned copy of the dataset has already been written.
is_partitioned = os.path.exists(f"{log_dir}/{PARTITION_FOLDER}/{filter_group_index}/_common_metadata")
json_exists = os.path.exists(f"{log_dir}/file.json")
if not is_partitioned:
    # First run: read the raw parquet files, index them by
    # `filter_group_index`, persist, repartition and save the indexed copy.
    # NOTE(review): `interval` and `time_range` are not used again in this cell.
    interval, time_range = compute_min_max(log_dir, filter_group_index, MAX_DEPTH)
    read_delayed = delayed_func(lambda log_dir: dd.read_parquet(f"{log_dir}/*.parquet", index=False), f"read_raw_parquet_{filter_group_index}", [log_dir])
    delayed_list.append(read_delayed)
    index_delayed = delayed_func(lambda filter_group_index, ddf: ddf.set_index([filter_group_index]), f"index_delayed_{filter_group_index}", [filter_group_index, read_delayed])
    delayed_list.append(index_delayed)
    persist_delayed = delayed_func(lambda ddf: ddf.persist(), f"persist_{filter_group_index}" , [index_delayed])
    delayed_list.append(persist_delayed)
    #transform_delayed = delayed_func(transform, f"transform_{filter_group_index}" , [persist_delayed])
    #delayed_list.append(transform_delayed)
    wait_persist_delayed = delayed_func(wait_delayed, f"wait_delayed_{filter_group_index}", [persist_delayed])
    delayed_list.append(wait_persist_delayed)
    # `wait_persist_delayed` is passed as the ignored extra argument so these
    # steps depend on the wait step.
    partition_delayed = delayed_func(lambda ddf, ignore: ddf.repartition(partition_size="128MB"), f"partitioned_{filter_group_index}", [persist_delayed, wait_persist_delayed])
    delayed_list.append(partition_delayed)
    save_parquet_delayed = delayed_func(lambda ddf, filter_group_index, ignore: dd.to_parquet(ddf, f"{log_dir}/{PARTITION_FOLDER}/{filter_group_index}"), f"save_{filter_group_index}", [partition_delayed, filter_group_index,wait_persist_delayed])
    delayed_list.append(save_parquet_delayed)
else:
    # Later runs: read the already indexed copy and persist it.
    read_delayed = delayed_func(lambda log_dir: dd.read_parquet(f"{log_dir}/{PARTITION_FOLDER}/{filter_group_index}/*.parquet", calculate_divisions=True, index=[filter_group_index]), f"read_indexed_parquet_{filter_group_index}", [log_dir])
    delayed_list.append(read_delayed)
    persist_delayed = delayed_func(lambda ddf: ddf.persist(), f"persist_{filter_group_index}" , [read_delayed])
    delayed_list.append(persist_delayed)
    wait_persist_delayed = delayed_func(wait_delayed, f"wait_delayed_{filter_group_index}", [persist_delayed])
    delayed_list.append(wait_persist_delayed)
    
    pass
    if not json_exists:
        # No cached id list yet: add steps that collect the unique index
        # values and write them to file.json via cal_range_filenames.
        unique_filenames = delayed_func(lambda ddf, ignore: ddf.index.unique().compute(), f"groupby_{filter_group_index}" , [persist_delayed, wait_persist_delayed])
        delayed_list.append(unique_filenames)
        dir1_ids = delayed_func(cal_range_filenames, f"dir1_ids_{filter_group_index}", [unique_filenames])
        delayed_list.append(dir1_ids)
    else:
        # NOTE(review): `files` is assigned only on this path (indexed copy
        # and file.json both exist), yet later cells read `files`
        # unconditionally - confirm the other paths are handled by a re-run.
        with open(f"{log_dir}/file.json") as file:
            files = json.load(file)
        
#         print(len(files))
# #         dir1_mask =  (2**32 -1) << (64 - 32)
# #         dir2_mask =  (2**16 -1) << (64 - 16)
# #         dir3_mask =  (2**8 -1) << (64 - 8)
# #         dir1_ids = set()
# #         dir2_ids = set()
# #         dir3_ids = set()
# #         for file in files:
# #             dir1_ids.add(file & dir1_mask)
# #             dir2_ids.add(file & dir2_mask)
# #             dir3_ids.add(file & dir3_mask)
#         num_files_per_partition = 2**12
#         file_range = range(0, len(files),num_files_per_partition)
#         partition_index= 0
#         num_partitions = len(file_range)
#         partitions_delayed = [0]*num_partitions
#         NUM_THREADS = 128
#         def compute_metrics_list(index, file_id_list, ddf, filter_group_index):
#             target_ddf = ddf.loc[file_id_list].reset_index()
#             #target_ddf = delayed_func(splice_ddf, f"splice_ddf{delimiter}{file_id}", [persist_delayed, file_id])
#             return delayed_func(filter, f"filter{delimiter}{index}", [target_ddf, filter_group_index])
        
# #         executor = concurrent.futures.ThreadPoolExecutor(NUM_THREADS)
# #         index_futures = {executor.submit(group_filter_map, persist_delayed, files[index:index+num_files_per_partition-1], filter_group_index): i for i,index in enumerate(file_range)}
# #         print(f"waiting")
        
# #         for future in concurrent.futures.as_completed(index_futures):
# #             index = index_futures[future]
# #             print(f"partition {partition_index} of {num_partitions} done          ", end = '\r')
# #             partitions_delayed[index] = dask.delayed(dummy)( future.result(),dask_key_name=f"partition_{partition_index}_{filter_group_index}")
# #             partition_index = partition_index + 1
# #         print(f"done")
# #         delayed_list.extend(partitions_delayed)
        
#         #executor = concurrent.futures.ThreadPoolExecutor(NUM_THREADS)
#         print("Sumitting Futures")
#         index_futures = [client.submit(compute_metrics_list, i, files[index:index+num_files_per_partition-1], persist_delayed, filter_group_index) for i,index in enumerate(file_range)]
# #         for future in as_completed(index_futures):
# #             index = index_futures[future]
# #             partitions_delayed[index] = dask.delayed(dummy)(future.result(),dask_key_name=f"partition_{partition_index}_{filter_group_index}")
# #             partition_index = partition_index + 1
# #             print(f"Finished partition {partition_index} of {len(file_range)}           ", end='\r')
# #         delayed_list.extend(partitions_delayed)
#         # for index in file_range:
#         #    file_id_list = files[index:index+num_files_per_partition-1]
#         #    tasks = []
#         #    for file_id in file_id_list:
#         #        target_ddf = delayed_func(splice_ddf, f"splice_ddf{delimiter}{file_id}", [persist_delayed, file_id])
#         #        tasks.append(delayed_func(filter, f"filter{delimiter}{file_id}", [target_ddf, filter_group_index]))
#         #    delayed_list.append(dask.delayed(dummy)(tasks,dask_key_name=f"partition_{partition_index}_{filter_group_index}"))
#         #    partition_index = partition_index + 1
#         #    print(f"Finished partition {partition_index} of {len(file_range)}           ", end='\r')
            
            
#         print("Create sequence") # took 10 mins
#         file_sequence = [[persist_delayed, file_id] for file_id in files]
#         num_files_per_partition = 2**12
#         paritions = math.ceil(len(files)/num_files_per_partition)
#         import dask.bag as db
#         print(f"Create bag {num_files_per_partition} {paritions}")
#         file_bag = db.from_sequence(file_sequence, partition_size=1).map(get_file_metrics)
#         print("Done bag")
        #dir1_sequence = [(ddf, dir1_id) for dir1_id in dir1_ids]
        #delayed_list.extend(file_bag.to_delayed())
        
        #         import dask.bag as db
        #         interval = 2**10
        #         b = db.from_sequence(range(0, len(files[:interval]), interval))
        #         b = b.map(lambda x: compute_metrics(files[x, x + interval - 1], persist_delayed, filter_group_index, wait_persist_delayed))
        #         delayed_list.extend(b)
        #compute_metrics_delayed = compute_metrics(files,persist_delayed, filter_group_index, wait_persist_delayed)
        #print(len(compute_metrics_delayed))
        #iter_com = list(enumerate(compute_metrics_delayed))
        #iter_com.reverse()
        #for index, compute_metric_item in iter_com:
        #    delayed_list.append(dask.delayed(dummy)(compute_metric_item,dask_key_name=f"depth_{index}_{filter_group_index}"))
        
        #delayed_list.append(compute_metrics_delayed)
#compute_metrics_delayed.reverse()
#for index, compute_metric_item in enumerate(compute_metrics_delayed):
#    delayed_list.append(dask.delayed(dummy)(compute_metric_item,dask_key_name=f"depth_{len(compute_metrics_delayed) - index - 1}_{filter_group_index}"))
# Number of delayed steps assembled above.
print(len(delayed_list))
#compute_metrics_delayed = delayed_func(compute_metrics, "compute_metrics", [wait_persist_delayed, filter_group_index, compute_min_max_delayed])
# NOTE(review): this wraps the list in dask.delayed and then *calls* the
# resulting object with only a key name; confirm that calling it is intended
# rather than just naming the wrapped list.  Later cells rebind `total` to an
# integer count.
total = dask.delayed(delayed_list)(dask_key_name=f"{filter_group_index}_all")

3
CPU times: user 5.96 s, sys: 336 ms, total: 6.3 s
Wall time: 6.38 s


In [9]:
%%time
# Split the file id list into `total_workers` roughly equal chunks (rounded
# up); `file_range` yields the start offset of each chunk.
# NOTE(review): with an empty `files` list the chunk size is 0 and range()
# raises ValueError for a zero step.
num_files_per_partition = math.ceil(len(files)*1.0/total_workers)
file_range = range(0, len(files),num_files_per_partition)
def cal_metrics_file(ddf, index, file_lists, log_dir, ignore):
    """Aggregate the rows of the given file ids and store the result as parquet.

    Parameters
    ----------
    ddf : lazy frame; ``ddf.loc[file_lists].compute()`` must yield a pandas
        DataFrame with the columns ``index``, ``io_cat``, ``duration``,
        ``size``, ``bandwidth``, ``proc_id`` and ``filename``.
    index : chunk number, used only for the output file name.
    file_lists : labels to select with ``.loc``.
    log_dir : base directory; output goes to
        ``{log_dir}/metrics/file_id/{index}.parquet``.
    ignore : unused; lets callers add a task dependency.

    Returns
    -------
    str
        Path of the written parquet file.  Its columns are the flattened
        ``<column>_<aggregation>`` names (the two group keys become
        ``index_`` and ``io_cat_``).
    """
    filename = f"{log_dir}/metrics/file_id/{index}.parquet"
    # to_parquet does not create missing parent directories.
    os.makedirs(os.path.dirname(filename), exist_ok=True)

    target_ddf = ddf.loc[file_lists].compute()
    # Aggregations are named by string; passing the builtins sum/min/max is
    # deprecated by pandas in favour of the string names.
    aggregate = target_ddf.groupby(['index', 'io_cat']).agg({
        'duration': 'sum',
        'size': 'sum',
        'bandwidth': 'sum',
        'index': 'count',
        'proc_id': ['min', 'max'],
        'filename': 'min',
    })
    aggregate = aggregate.reset_index()
    # Flatten the two-level column labels so parquet gets plain string names.
    aggregate.columns = ['_'.join(col) for col in aggregate.columns.values]
    aggregate.to_parquet(filename)
    return filename
# Submit one `cal_metrics_file` task per chunk of file ids and keep the
# returned futures.
futures = []
#
for index, file_index in enumerate(file_range):
    print(f"processing {index} of {len(file_range)}", end='\r')
    # Plain Python slice of the id list; it is embedded in the task as an
    # argument (the captured output below shows dask warning about large
    # objects in the graph).
    seleceted_files = files[file_index:file_index+num_files_per_partition]
    #seleceted_files_future = client.scatter(seleceted_files) doesnt help as list of futures is larger as list of file ids :D
    #print(len(seleceted_files), index)
    # `wait_persist_delayed` is passed as the unused `ignore` argument so the
    # task depends on the wait step.
    cal_metrics= dask.delayed(cal_metrics_file)(persist_delayed, index, seleceted_files,log_dir, wait_persist_delayed, dask_key_name=f"cal_metrics_{index}_{filter_group_index}")
    # sync=False: get a future back instead of blocking for the result.
    futures.append(client.compute(cal_metrics, sync=False))
    #cal_metrics.append(delayed_func(cal_metrics,  , []))

processing 0 of 64

  ('persist_file_id', 0, [-9223371986035283781, -922 ... layed_file_id')
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and 
keep data on workers

    future = client.submit(func, big_data)    # bad

    big_future = client.scatter(big_data)     # good
    future = client.submit(func, big_future)  # good


CPU times: user 5min 38s, sys: 4.11 s, total: 5min 42s
Wall time: 5min 42s


In [10]:
#futures = 

In [12]:
futures[:5]

[<Future: finished, type: str, key: cal_metrics_0_file_id>,
 <Future: finished, type: str, key: cal_metrics_1_file_id>,
 <Future: finished, type: str, key: cal_metrics_2_file_id>,
 <Future: finished, type: str, key: cal_metrics_3_file_id>,
 <Future: finished, type: str, key: cal_metrics_4_file_id>]

In [13]:
%%time
import time
from dask.distributed import as_completed
start_time = time.time()
metrics = []
for future in as_completed(futures):
    end_time = time.time() - start_time
    #filename = future.result()
    metrics.append(future)
    print(f"Completed {len(metrics)} of {len(futures)} in {end_time/60}", end='\r')

Completed 63 of 64 in 0.00028105179468790696

KeyboardInterrupt: 

In [14]:
len(metrics)

63

'/p/gpfs1/iopp/recorder_app_logs/genome_pegasus/nodes-32/_parquet/metrics/file_id/63.parquet'

In [None]:
# Print the exception of every failed future and cancel it.
for future in futures:
    # Compare for equality: `status in "error"` is a substring test and would
    # also match any status that happens to be a fragment of "error".
    if future.status == "error":
        print(f"printing exception {future.key}")
        print(future.exception())
        future.cancel()

In [None]:
%%time
# Progress bar on a single-machine scheduler
from dask.distributed import performance_report
from dask.distributed import as_completed
futures = client.compute(delayed_list, sync=False)

In [None]:
# %%time
# import pandas as pd
# import dask.dataframe as dd

# def chunk(s):
#     '''
#     The function applied to the
#     individual partition (map)
#     '''    
#     return s.apply(lambda x: list(set(x)))


# def agg(s):
#     '''
#     The function whic will aggrgate 
#     the result from all the partitions(reduce)
#     '''
#     s = s._selected_obj    
#     return s.groupby(level=list(range(s.index.nlevels))).sum()


# def finalize(s):
#     '''
#     The optional functional that will be 
#     applied to the result of the agg_tu functions
#     '''
#     return s.apply(lambda x: len(set(x)))


# tunique = dd.Aggregation('tunique', chunk, agg,finalize)

In [None]:
import pandas as pd


In [None]:
# Delayed step: aggregate the persisted frame per (file_id, io_cat).
# `wait_persist_delayed` is passed as the unused `ignore` argument so the step
# depends on the wait step.
file_delayed = delayed_func(lambda ddf, ignore: ddf.reset_index().groupby(['file_id','io_cat']).agg({'duration':sum, 
                                                                       'size':sum, 
                                                                       'bandwidth':sum, 
                                                                       'index':'count', 
                                                                       'proc_id':[min,max], 
                                                                       'filename':min}), f"calculate_metrics_{filter_group_index}", [persist_delayed, wait_persist_delayed])
delayed_list.append(file_delayed)

# Delayed step: write the aggregated frame to parquet.  The frame produced by
# the step above is handed straight to dd.to_parquet; `from_delayed` is a
# dask.dataframe module-level function, not a method of the frame.
# NOTE(review): the `[min,max]` aggregation yields two-level column labels;
# confirm the parquet writer accepts them or flatten them first.
save_metrics_delayed = delayed_func(lambda ddf, filter_group_index, ignore: dd.to_parquet(ddf, f"{log_dir}/metric/{filter_group_index}"), f"save_metric_{filter_group_index}", [file_delayed, filter_group_index,wait_persist_delayed])
delayed_list.append(save_metrics_delayed)

In [None]:
delayed_list

In [None]:
%%time
metrics = val.result()

In [None]:
metrics

In [None]:
with open("genome_metrics_all.json", "w") as f:
    json.dump(metrics, f, cls=NpEncoder)

In [None]:
%%time
val = wait(index_futures, return_when="ALL_COMPLETED")

In [None]:
for future in val.done:
    if future.status in ["error"]:
        print(f"printing exception {future.key}")
        print(future.exception())

In [None]:
#%%time
#delayed_part_futures = [client.compute(future.result(), sync=False) for future in val.done]

In [None]:
def submit_partition_future(future):
    """Run ``dask.compute`` on ``future`` and return what it returns.

    dask is imported inside the function body rather than taken from the
    notebook namespace.
    """
    from dask import compute
    return compute(future)

In [None]:
fs = [client.submit(submit_partition_future, future) for future in val.done]

In [None]:
from dask.distributed import as_completed
import time
partition_index = 0
metrics = [0]*total
start_time = time.time()
for f in as_completed(fs):
    metrics[partition_index] = f.result()
    end_time = time.time() - start_time
    print(f"finished {partition_index} of {total} {end_time/60} ", end='\r')
    partition_index = partition_index + 1
    with open("genome_metrics_all.json", "w") as f:
        json.dump(metrics, f, cls=NpEncoder)

In [None]:
metrics

In [None]:
partition_index = 0

In [None]:
%%time
import time
total = len(val.done)
metrics = [0]*total
start_time = time.time()
future_list =list(val.done)
from dask.distributed import as_completed
parition_count = 1
parition_range = range(partition_index, total, parition_count)
for future_index in parition_range:
    my_futures = future_list[future_index:future_index + parition_count]
    fs = [client.submit(submit_partition_future, future) for future in my_futures]
    for f in as_completed(fs):
        end_time = time.time() - start_time
        #f = client.submit(submit_partition_future, future)
        metrics[partition_index] = f.result()
        print(f"finished {partition_index} of {total} {end_time/60} {len(metrics[partition_index])}", end='\r')
        partition_index = partition_index + 1
        with open("genome_metrics_all.json", "w") as f:
            json.dump(metrics, f, cls=NpEncoder)

In [None]:
metrics

In [None]:
for future in future_list:
    future.cancel()

In [None]:
with open("genome_metrics_all.json", "w") as f:
    json.dump(metrics, f, cls=NpEncoder)

In [None]:
%%time
# Block until every entry of `delayed_part_futures` has completed, printing a
# running count and the elapsed minutes on a single, overwritten line.
# NOTE(review): in the visible cells `delayed_part_futures` is only assigned in
# commented-out code — confirm where it is expected to come from.
import time
from dask.distributed import as_completed
start_time = time.time()
finished = 0
total = len(delayed_part_futures)
for future in as_completed(delayed_part_futures):
    finished = finished + 1
    end_time = time.time() - start_time
    print(f"finished {finished} of {total} {end_time/60}", end='\r')
print("\ndone")
    

In [None]:
#%%time
#delayed_part_futures = await delayed_part_futures_async

In [None]:
#delayed_part_futures

In [None]:
# Keep the first entry of `delayed_part_futures` for inspection in the next cell.
first_partition_delayed  = delayed_part_futures[0]

In [None]:
len(first_partition_delayed)

In [None]:
len(delayed_part_futures)

In [None]:
# Hand `delayed_part_futures` to client.compute with sync=False and keep what it
# returns (iterated with as_completed in a later cell).
partition_futures = client.compute(delayed_part_futures, sync=False)

In [None]:
len(partition_futures)

In [None]:
partition_futures

In [None]:
%%time
# Block until every entry of `partition_futures` has completed, printing a
# running count and the elapsed minutes on a single, overwritten line.
import time
from dask.distributed import as_completed
start_time = time.time()
finished = 0
total = len(partition_futures)
for future in as_completed(partition_futures):
    finished = finished + 1
    end_time = time.time() - start_time
    print(f"finished {finished} of {total} {end_time/60}", end='\r')
print("\ndone")
    

In [None]:
# Collect the results of futures whose status is "finished", keyed by the second
# "-"-separated piece of each future's key. Futures in any other state are skipped.
metrics = {}
for future in futures:
    if future.status in ["finished"]:
        metrics[future.key.split("-")[1]] = future.result()
#file_bag.count()

In [None]:
metrics

In [None]:
# Write the current `metrics` to genome_metrics_4096.json, serialising with NpEncoder.
with open("genome_metrics_4096.json", "w") as f:
    json.dump(metrics, f, cls=NpEncoder)

In [None]:
%%time
# Convert `file_bag` with to_delayed() and keep the result.
file_metrics_delayed = file_bag.to_delayed()

In [None]:
# NOTE(review): `delayed_list` is not assigned in any visible cell; the cell above
# stores file_bag.to_delayed() in `file_metrics_delayed` — confirm which name is meant.
len(delayed_list)

In [None]:
# Lazily take 2048 elements from `file_bag` (compute=False defers the work).
# NOTE(review): the variable name says 1024 but the call takes 2048 — confirm.
bag_1024 = file_bag.take(2048, compute=False)

In [None]:
# Hand the whole `file_bag` to client.compute with sync=False and keep what it returns.
futures = client.compute(file_bag, sync=False)

In [None]:
# Compute `bag_1024` and store the result in `metrics`.
metrics = bag_1024.compute()

In [None]:
metrics

In [None]:
delayed_list[:4]

In [None]:
# Render the task graph of the fifth entry of `delayed_list` with draw_graph.
dot = draw_graph(delayed_list[4])

In [None]:
dot

In [None]:
# Keep the first 3 + 1024 = 1027 entries of `delayed_list`.
selected = delayed_list[:(3+1024)]

In [None]:
%%time
# Hand `selected` to client.compute with sync=False and keep the returned futures.
# No progress bar is created in this cell, and `performance_report` is imported
# here but not used in it.
from dask.distributed import performance_report
from dask.distributed import as_completed
futures = client.compute(selected, sync=False)

In [None]:
selected[:4]

In [None]:
client

In [None]:
%%time
# As each future in `futures` completes, print its key, its status and the
# minutes elapsed since this cell started.
import time
from dask.distributed import as_completed
start_time = time.time()
for future in as_completed(futures):
    end_time = time.time() - start_time
    print(f"{future.key} {future.status} {end_time/60}")

In [None]:
# Print the key and exception of every future whose status is "error".
for future in futures:
    if future.status in ["error"]:
        print(f"printing exception {future.key}")
        print(future.exception())

In [None]:
# Print the key and current status of every future.
for future in futures:
    print(f"{future.key} {future.status}")

In [None]:
# Cancel every future whose status is anything other than "finished".
for future in futures:
    if future.status not in ["finished"]:
        print(f"canceling {future.key}")
        future.cancel()

In [None]:
#dirs = futures[4].result()

In [None]:
#dirs

In [None]:
#metrics[0][44][0]

In [None]:
# Collect the results of `futures` from position 4 onwards, in list order.
# Each .result() call blocks until that future is done.
metrics = []
for i in futures[4:]:
    metrics.append(i.result())

In [None]:
len(metrics)

In [None]:
# Reverse `metrics` in place. Re-running this cell flips the order back again.
metrics.reverse()

In [None]:
len(metrics[9])

In [None]:

# Write only the first entry of `metrics` to genome_metrics_full.json,
# serialising with NpEncoder.
with open("genome_metrics_full.json", "w") as f:
    json.dump(metrics[0], f, cls=NpEncoder)

In [None]:
%%time
# Hand `delayed_list` from position 5 onwards to client.compute with sync=False.
# `ProgressBar` is imported here but not used in this cell.
from dask.diagnostics import ProgressBar
futures_2 = client.compute(delayed_list[5:], sync=False)

In [None]:
futures_2

In [None]:
# Result of the last future in `futures_2` (blocks until it is available).
futures_2[-1].result()

In [None]:
# Cancel every future whose key contains the substring "calculate".
for future in futures:
    if "calculate" in future.key:
        print(f"canceling {future.key}")
        future.cancel()
        

In [None]:
# Cancel every future in `futures`, whatever its state.
for future in futures:
    future.cancel()

In [None]:
# Cancel every future whose status is "error".
for future in futures:
    if future.status  in ["error"]:
        print(f"canceling {future.key}")
        future.cancel()

In [None]:
# Fetch the result of the fifth future only if it is already done, so this cell
# never blocks; otherwise just report that it is pending.
if futures[4].done():
    metrics = futures[4].result()
else:
    print("future is pending")

In [None]:
metrics

In [None]:
import numpy as np
PARTITION_FOLDER = "partitioned_hari"
def indexed_ddf(log_dir: str, filter_group_index: str):
    """Load the parquet files under ``log_dir`` indexed by one column.

    Reads ``{log_dir}/*.parquet`` with ``index=False``, sets
    ``filter_group_index`` as the index and persists the result.

    Args:
        log_dir: Directory containing the ``*.parquet`` files.
        filter_group_index: Name of the column to use as the index.

    Returns:
        The persisted, indexed Dask dataframe.
    """
    parquet_glob = f"{log_dir}/*.parquet"
    indexed = dd.read_parquet(parquet_glob, index=False).set_index([filter_group_index])
    return indexed.persist()

def compute_metrics(ddf, filter_group_index: str, interval):
    """Build and run a pairwise reduction of per-range metrics over ``ddf``.

    The closed index range ``[min_val, max_val]`` taken from ``interval`` is cut
    into at most ``2 ** depth`` contiguous ranges. Each range is sliced out of
    ``ddf`` with ``.loc`` and passed to ``filter`` as a delayed leaf task.
    Neighbouring tasks are then combined two at a time with ``merge``, level by
    level (an unpaired last task is carried up unchanged), and every task at
    every level is finally wrapped in ``cal_len``.

    ``filter``, ``merge`` and ``cal_len`` are resolved from the surrounding
    namespace. NOTE(review): ``filter`` is presumably a notebook-defined
    function shadowing the builtin — confirm.

    Args:
        ddf: Dataframe that supports ``.loc[start:stop]`` slicing on its index.
        filter_group_index: Forwarded to ``filter`` for every leaf range.
        interval: ``(min_val, max_val)`` pair of integer index bounds.

    Returns:
        The value of ``dask.compute(all_tasks)``, where ``all_tasks`` is a list
        of ``depth + 1`` task lists: position ``depth`` holds the leaf tasks and
        position 0 the most-merged level.

    Raises:
        ValueError: If ``max_val`` is smaller than ``min_val``.
        TypeError: If either bound is not an integer.
    """
    import operator

    min_val, max_val = interval
    # operator.index turns NumPy integers into exact Python ints, so the span
    # below cannot overflow a fixed-width type, while still rejecting
    # non-integers (which range() would not accept either).
    min_val = operator.index(min_val)
    max_val = operator.index(max_val)
    if max_val < min_val:
        raise ValueError(f"invalid interval: max_val {max_val} < min_val {min_val}")

    depth = 10
    max_leaves = 2 ** depth
    # Exact integer ceiling division: float division is inexact for spans above
    # 2**53. The span counts both endpoints so that rows at max_val land in the
    # last range, and it is never zero, so the range step is always valid.
    span = max_val - min_val + 1
    step = -(-span // max_leaves)
    leaf_starts = range(min_val, max_val + 1, step)

    all_tasks = [0] * (depth + 1)
    for level in range(depth, -1, -1):
        if level == depth:
            tasks = []
            for start in leaf_starts:
                stop = start + step - 1  # .loc label slices include the stop label
                target_ddf = ddf.loc[start:stop]
                tasks.append(dask.delayed(filter)(target_ddf, filter_group_index))
        else:
            children = all_tasks[level + 1]
            n_paired = len(children) - len(children) % 2
            tasks = [
                dask.delayed(merge)(children[t], children[t + 1])
                for t in range(0, n_paired, 2)
            ]
            if len(children) % 2 == 1:
                # Odd one out: carried up unmerged (before it is wrapped below).
                tasks.append(children[-1])
            # TODO why are we calling len on everything?
            for t, child in enumerate(children):
                children[t] = dask.delayed(cal_len)(child)
        all_tasks[level] = tasks
    for t, root in enumerate(all_tasks[0]):
        all_tasks[0][t] = dask.delayed(cal_len)(root)
    metrics = dask.compute(all_tasks)
    return metrics

In [None]:
# json_index = 'tmid'
# futures = []
# args = [log_dir, json_index, filter_group_index]
# interval_ft = vn.clients[filter_group_index].submit(compute_min_max, *args, key=f'{filter_group_index}_compute_min_max')
# futures.append(interval_ft)
# args = [log_dir, filter_group_index]
# indexed_ft = vn.clients[filter_group_index].submit(indexed_ddf, *args, key=f'{filter_group_index}_indexed_ddf')
# futures.append(indexed_ft)
# args = [indexed_ft, filter_group_index, interval_ft]
# metrics_ft = vn.clients[filter_group_index].submit(compute_metrics, *args, key=f'{filter_group_index}_compute_metrics')
# futures.append(metrics_ft)

In [None]:
# from dask.distributed import as_completed
# from tqdm.auto import tqdm
# metrics_map = {}
# for future in tqdm(as_completed(futures)):
#     if "compute_metrics" in future.key:
#         metrics_map[future.key] = future.result(timeout=(2*60*60))
#     print(f'{future.key} completed')

In [None]:
# metrics_map

In [None]:
# For every filter group index, submit three tasks to that group's client:
# compute_min_max, indexed_ddf, and compute_metrics. compute_metrics receives the
# other two futures as arguments, so it depends on their results.
# All submitted futures are collected in `futures`; each has an explicit key of
# the form "<filter_group_index>_<function name>".
json_keys = dict(tmid='tmid', proc_id='proc_id', file_id='file_id')
futures = []
for filter_group_index in vn.filter_group_indices:
    json_index = json_keys[filter_group_index]
    args = [log_dir, json_index, filter_group_index]
    interval_ft = vn.clients[filter_group_index].submit(compute_min_max, *args, key=f'{filter_group_index}_compute_min_max')
    futures.append(interval_ft)
    args = [log_dir, filter_group_index]
    indexed_ft = vn.clients[filter_group_index].submit(indexed_ddf, *args, key=f'{filter_group_index}_indexed_ddf')
    futures.append(indexed_ft)
    args = [indexed_ft, filter_group_index, interval_ft]
    metrics_ft = vn.clients[filter_group_index].submit(compute_metrics, *args, key=f'{filter_group_index}_compute_metrics')
    futures.append(metrics_ft)


In [None]:
%%time
from dask.distributed import as_completed
from tqdm.auto import tqdm

# Wait for the submitted futures in completion order. Only results of futures
# whose key contains "compute_metrics" are kept, keyed by the future key; each
# result fetch is limited to two hours.
metrics_map = {}
# total is passed explicitly so the progress bar can show a total and an ETA.
for future in tqdm(as_completed(futures), total=len(futures)):
    if "compute_metrics" in future.key:
        metrics_map[future.key] = future.result(timeout=(2*60*60))
    print(f'{future.key} completed')

In [None]:
# min_val, max_val = metrics_map["tmid_compute_min_max"]
# depth = 10
# next_tasks = 2 ** depth
# interval = math.ceil((max_val - min_val) * 1.0 / next_tasks)
# time_range = range(min_val, max_val, interval)
# for start in time_range:
#     print(start, start + interval - 1)

In [None]:
def merge_result(x, y):
    """Combine two metric dictionaries field by field using ``+``.

    Both inputs must contain the categories ``'read'``, ``'write'`` and
    ``'metadata'``. For each field listed below, the output holds
    ``x[category][field] + y[category][field]``. Keys not listed are ignored.

    Args:
        x: First metrics dictionary.
        y: Second metrics dictionary.

    Returns:
        A new dictionary with the same categories and fields; inputs are not
        modified.

    Raises:
        KeyError: If a listed category or field is missing from ``x`` or ``y``.
    """
    io_fields = ('uniq_ranks', 'agg_dur', 'total_io_size', 'uniq_filenames', 'bw_sum', 'ops')
    fields_by_category = {
        'read': io_fields,
        'write': io_fields,
        # metadata carries no size or bandwidth fields
        'metadata': ('uniq_ranks', 'agg_dur', 'uniq_filenames', 'ops'),
    }
    return {
        category: {field: x[category][field] + y[category][field] for field in fields}
        for category, fields in fields_by_category.items()
    }

In [None]:
metrics_map['tmid_compute_metrics'][0][0]

In [None]:
merge_result(metrics_map['proc_id_compute_metrics'][0][0][0],metrics_map['proc_id_compute_metrics'][0][0][1])

In [None]:
metrics_map['file_id_compute_metrics'][0][0]

In [None]:
metrics_map

In [None]:
# NOTE(review): the collection loop in this notebook only stores keys containing
# "compute_metrics" in metrics_map, so this lookup looks like it would raise
# KeyError unless metrics_map was populated some other way — confirm.
metrics_map['file_id_indexed_ddf'].compute()

In [None]:
metrics_map['tmid_compute_metrics'][0][10]

In [None]:
metrics_map['proc_id_compute_metrics'][0][0]

In [None]:
metrics_map['file_id_compute_metrics'][0][0][0]

In [None]:
# Start a LocalCluster with 16 workers and connect a Client to it.
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster
local_client = Client(LocalCluster(n_workers=16))

In [None]:
local_client

In [None]:
# Lazily read every parquet file under `log_dir` without setting an index.
# NOTE(review): `log_dir` is a hard-coded absolute path, so this cell only works
# where that path exists.
log_dir = "/p/gpfs1/iopp/recorder_app_logs/genome_pegasus/nodes-32/_parquet"
ddf = dd.read_parquet(f"{log_dir}/*.parquet", index=False)

In [None]:
# Compute min and max of the tmid, file_id and proc_id columns in one dd.compute
# call; `values` holds them in the order (tmid min, tmid max, file_id min,
# file_id max, proc_id min, proc_id max).
values = dd.compute(ddf['tmid'].min(), ddf['tmid'].max(), ddf['file_id'].min(), ddf['file_id'].max(), ddf['proc_id'].min(), ddf['proc_id'].max())

In [None]:
values

In [None]:
# Preview the index ranges obtained by cutting [min_val, max_val] into
# `next_tasks` contiguous pieces: prints the range object, then one
# "start stop" line per piece (stop is inclusive).
min_val, max_val = -9223371986035283781, 9223369538921024184

next_tasks = 2 ** 10
# Exact integer ceiling division over the closed range. Float division cannot
# represent this ~2**64 span exactly and yields a step that is one too small,
# which gives 1025 pieces instead of 1024.
interval = -(-(max_val - min_val + 1) // next_tasks)
# The end bound is max_val + 1 so that max_val itself falls inside the last piece.
time_range = range(min_val, max_val + 1, interval)
print(time_range)
for start in time_range:
    print(start, start + interval -1)

In [None]:
!pip install python-intervals