In [53]:
import copy
import math
import matplotlib.pyplot as plt
import os
import re
import subprocess
import sys
import pandas as pd
import time

from ast import literal_eval
from matplotlib.ticker import FuncFormatter
from IPython.display import display, clear_output

In [31]:
# pull in our gquery module and import the config
sys.path.append('../../scripts')
os.environ['GUFI_CONFIG']='/home/jbent/GUFI/configs/jbent_home'
#os.environ['GUFI_DEBUG']='TRUE'
import gufi_config as gq
import gufi_common as gc

config = open(os.environ['GUFI_CONFIG'], 'r')
config=gq.Server(config)

nthreads=config.config['Threads']
indexroot=config.config['IndexRoot']
print(f"Using {nthreads} threads on {indexroot}")

Using 224 threads on /mnt/nvme3n1/jbent/jbent_home


In [83]:
def run_gufi_query(indexroot, nthreads, create_int, insert_int, create_agg, insert_agg, select_agg, Verbose=False):
    cmd = 'gufi_query'
    command = [
        cmd,
        "-B", "4096",
        "-I", create_int,
        "-S", insert_int,
        "-K", create_agg,
        "-J", insert_agg,
        "-G", select_agg,
        "-n", nthreads,
        indexroot
    ]
    if Verbose:
        gc.print_query(command)
    
    start_time = time.time()
    completed_process = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    end_time = time.time()
    
    if completed_process.returncode != 0:
        print(f"An error occurred:\n{completed_process.stderr.decode('utf-8')}")
        return None
    else:
        output = completed_process.stdout.decode('utf-8')
        output = output.replace('\u001A', ' ') # some weird character in the output for some reason

        if Verbose:
            print(f"Output:\n{output}")
            print(f"Elapsed time: {(end_time - start_time)/60:.2f} minutes")
        return output

In [106]:
# helper functions for converts the bins queries from sequel into human readable without any gaps
def detect_and_fill_missing_bins(series):
    lines = series.split('\n')
    new_lines = []
    last_upper = None

    for line in lines:
        match = re.match(r'\[(\d+),(\d+)\)', line)
        if match:
            lower, upper = map(int, match.groups())

            if last_upper is not None and lower != last_upper:
                missing_lower = last_upper
                missing_upper = lower
                new_line = f"[{missing_lower},{missing_upper}) 0"
                new_lines.append(new_line)

            new_lines.append(line)
            last_upper = upper

    return "\n".join(new_lines)


def human_readable_size(size, base, decimal_places=1):
    if size == 0:
        return "0"

    if base == 1024:
        units = ['', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi']
    else:
        units = ['', 'K', 'M', 'G', 'T', 'P', 'E', 'Z']

    exponent = int(math.log(size, base))
    unit = units[exponent]
    size /= (base ** exponent)

    if size.is_integer():
        return f"{int(size)}{unit}"
    else:
        return f"{size:.{decimal_places}f}{unit}"
    
def human_readable_bins(bins,base):
    output = ''
    for line in bins.split('\n'):
        readable = re.sub(
            r'\[(\d+),(\d+)\)',
            lambda m: f"[{human_readable_size(int(m.group(1)), base=base)},{human_readable_size(int(m.group(2)), base=base)})",
            line
        )
        output += f"{readable}\n"
    return output

print(human_readable_size(8192, base=1024))  # Outputs "8Ki"
print(human_readable_size(134217728, base=1024))  # Outputs "128Mi"
print(human_readable_size(1000000, base=1000))  # Outputs "1M"
print(human_readable_bins('[1024,4096) 700\n[4096,8192) 400',base=1024))
test_bins = """
[65536,131072) 285
[262144,524288) 129
[524288,1048576) 55
"""
print(test_bins)
print(detect_and_fill_missing_bins(test_bins))

8Ki
128Mi
1M
[1Ki,4Ki) 700
[4Ki,8Ki) 400


[65536,131072) 285
[262144,524288) 129
[524288,1048576) 55

[65536,131072) 285
[131072,262144) 0
[262144,524288) 129
[524288,1048576) 55


In [107]:
# function to query and return bins (based on logic in get_stats)
def get_bins(indexroot,field,entry_type,base,conversion,Verbose=True):
    cmd   = 'gufi_query'
    tab   = 'vrsummary'
    itab  = 'intermediate'
    atab  = 'aggregate'
    if entry_type == 'd':
        tab = 'vrsummary'
    elif entry_type == 'f':
        tab = 'vrpentries'
    else:
        print(f'ERROR: Unknown entry_type {entry_type}')
        return None
    
    output = run_gufi_query(
        indexroot  = indexroot, 
        nthreads   = f"{nthreads}",
        create_int = f"CREATE TABLE {itab} (exponent INTEGER)",
        insert_int = f"INSERT INTO {itab} SELECT CASE WHEN {field} == 0 THEN -1 ELSE CAST(FLOOR(LOG({base}, {field})) AS INTEGER) END FROM {tab} WHERE (type == '{entry_type}')",
        create_agg = f"CREATE TABLE {atab} (exponent INTEGER, count INTEGER)",
        insert_agg = f"INSERT INTO {atab} SELECT exponent, count(exponent) FROM {itab} GROUP by exponent",
        select_agg = f"SELECT CASE WHEN exponent == -1 THEN '[0,1)' ELSE '[' || CAST(pow({base}, exponent) AS INTEGER) || ',' || CAST(pow({base}, exponent+1) AS INTEGER) || ')' END, SUM(count) FROM {atab} GROUP BY exponent ORDER BY exponent ASC",
        Verbose    = Verbose
    )
    #print(output)
    output = detect_and_fill_missing_bins(output)
    output = human_readable_bins(output,conversion)
    return(output)

In [113]:
output = get_bins(indexroot,field='totfiles',entry_type='d',base=10,conversion=1000,Verbose=False)
print(output)
output = get_bins(indexroot,field='size',entry_type='f',base=4,conversion=1024,Verbose=False)
print(output)

[0,1)790
[1,10)2356
[10,100)401
[100,1K)7

[0,1)2226
[1,4)207
[4,16)337
[16,64)279
[64,256)1159
[256,1Ki)2272
[1Ki,4Ki)4499
[4Ki,16Ki)3821
[16Ki,64Ki)1658
[64Ki,256Ki)393
[256Ki,1Mi)184
[1Mi,4Mi)63
[4Mi,16Mi)43
[16Mi,64Mi)6
[64Mi,256Mi)1



In [115]:
# a function to pull the predefined file size bins using the native gufi function
# note that this notebook is no longer using this function.
# also, we should be able to update it because totzero has been added in the following commit:
# https://github.com/mar-file-system/GUFI/commit/ed622831a0289876fd0b620c12a27db9043a152a
def get_predefined_bins(indexroot,Verbose=True):
    cmd = 'gufi_query'
    tab = 'vrsummary'
    itab = 'intermediate'
    atab = 'aggregate'
    fields = ('totltk', 'totmtk', 'totltm', 'totmtm', 'totmtg', 'totmtt', 'totzero')
    
    # int and agg use identical syntax for create so use helper function
    def create_table_str(table):
        create_str = ','.join([f"{field} INT64" for field in fields])
        return f"CREATE TABLE {table}({create_str});"
    
    # int and agg use identical syntax for insert so use helper function
    def insert_str(source,dest):
        select_str = ','.join([f"{field}" for field in fields])
        return f"INSERT INTO {dest} SELECT {select_str} from {source};"
    
    # create the string to select all the sums from the agg table
    agg_sum_str = ','.join([f"sum({field})" for field in fields])
    agg_select_str = f"SELECT {agg_sum_str} from {atab};"

    # now that we have all the strings constructed, we can run the command
    output = run_gufi_query(
        indexroot  = indexroot, 
        nthreads   = nthreads,
        create_int = create_table_str(itab),
        insert_int = insert_str(tab,itab),
        create_agg = create_table_str(atab),
        insert_agg = insert_str(itab,atab),
        select_agg = agg_select_str,
        Verbose    = Verbose
    )
    
    # convert the output into a dict corresponding to our fields
    out_dict = dict(zip(fields, map(int, output.split())))
    return out_dict

In [116]:
output = get_predefined_bins(indexroot,Verbose=True)
print(output)

AttributeError: 'int' object has no attribute 'startswith'

In [None]:
def fill_bins(all_bins, indexroot, Verbose=True):
    if indexroot in all_bins:
        print(f"Cowardly refusing to overwrite an existing entry for {indexroot}")
    else:
        bins = get_predefined_bins(indexroot, Verbose)
        all_bins[indexroot] = bins

In [None]:
if 'all_bins' not in locals():
    all_bins = {}

indexroots = ["/mnt/nvme3n1/jbent/jbent_home/", 
              "/mnt/nvme1n1/jbent/scr4/", 
              "/mnt/nvme1n1/jbent/yellprojs/", 
              "/mnt/nvme1n1/jbent/ttscratch/", 
              "/mnt/nvme3n1/jbent/yellusers", 
              "/mnt/nvme1n1/jbent/anony"]

for ir in indexroots:
    fill_bins(all_bins,ir,Verbose=True)

In [None]:
# the predefined bins don't have zero-byte files in them. So let's compute that separately. Bummer we have to
# do another pass. Oh well.
def count_empty_files(indexroot, Verbose=True):
    zfiles = run_gufi_query(
        indexroot  = indexroot,
        nthreads   = nthreads,
        create_int = "CREATE TABLE intermediate(quantity INT64);",
        insert_int = "INSERT INTO intermediate SELECT count(*) FROM vrpentries where size=0;",
        create_agg = "CREATE TABLE aggregate(quantity INT64);",
        insert_agg = "INSERT INTO aggregate SELECT sum(quantity) FROM intermediate;",
        select_agg = "SELECT sum(quantity) FROM aggregate;",
        Verbose    = Verbose
    )
    return int(zfiles)

In [None]:
def add_empty_files(all_bins, indexroot, Verbose=True):
    zkey = 'totzero'
    if zkey not in all_bins[indexroot]:
        zfiles = count_empty_files(indexroot, Verbose)
        all_bins[indexroot][zkey] = zfiles
    else:
        print(f"Cowardly refusing to overwrite an existing {zkey} entry for {indexroot}")

for ir in indexroots:
    add_empty_files(all_bins,ir,Verbose=True)

In [None]:
#del(all_bins['/mnt/nvme3n1/jbent/jbent_home/'])

In [None]:
def convert_to_dataframe(bins):
    ub = 'Upper_Bound'
    nf = 'Num_Files'
    infinity = '\u221E'
    data = { ub : [], nf : []}
    tot = bins['totltk'] + bins['totmtk']
    data[ub].append('[0]')
    data[nf].append(bins['totzero'])
    data[ub].append('(0, 1K]')
    data[nf].append(bins['totltk'] - bins['totzero'])
    data[ub].append('(1K, 1M]')
    data[nf].append(bins['totmtk']-bins['totmtm'])
    data[ub].append('(1M, 1G]')
    data[nf].append(bins['totmtm'] - bins['totmtg'])
    data[ub].append('(1G, 1T]')
    data[nf].append(bins['totmtg'] - bins['totmtt'])
    data[ub].append(f'(1T, {infinity})')
    data[nf].append(bins['totmtt'])
    if tot != sum(data[nf]):
        print("Data error? Tot files of %d != tot files of %d" % (tot, sum(data[df])))
    return data

In [None]:
print(r'$\infty$')

In [None]:
dataframes = {}
for ir,bins in all_bins.items():
    dataframes[ir] = convert_to_dataframe(bins)

In [None]:
dataframes

In [None]:
def frames_to_plt(dataframes,logscale=False):
    plt.figure(figsize=(15, 8))
    plt.rcParams['font.size'] = 16


    # Plot the data as a line graph
    markers = ['o', 'x', 's', 'D', '^', 'v']
    for idx, (indexroot, df) in enumerate(dataframes.items()):
        shortname = indexroot.rstrip('/').split("/")[-1]
        plt.plot(df['Upper_Bound'], df['Num_Files'], marker=markers[idx], label=shortname)

    # y-tics are ugly, change them to be in values of millions
    def millions(x, pos):
        return f'{int(x/1e6)}'
    ax = plt.gca()
    formatter = FuncFormatter(millions)
    ax.yaxis.set_major_formatter(formatter)

    # Add labels and title
    plt.xlabel('File Size')
    plt.ylabel('Millions of Files')
    plt.title('Distribution of File Sizes')

    # avoid transparent background
    for item in (ax.figure, ax):
        item.set_facecolor('white')

    # Add grid and legend
    plt.grid(True)
    plt.legend()
    
    if logscale:
        # then add logscale and show it again
        ax.set_yscale('log')
        ax.set_title(ax.get_title() + " (Log Scale y)")
        ax.set_ylabel(ax.get_ylabel() + " (Log Scale 10)")
        
    return plt

plt = frames_to_plt(dataframes)
plt.show()
plt2 = frames_to_plt(dataframes,logscale=True)
plt2.show()