In [1]:
import glob
import os
import numpy as np
import pandas as pd
from tqdm.auto import tqdm
from matplotlib import rcParams
import matplotlib.pyplot as plt

# Which HDFS report to analyse: subject area and snapshot date.
REPORT_SUBJ = "user"
REPORT_DATE = "2022-02-17" # YEAR-MM-DD

HADOOP_PATH = f"cms/{REPORT_SUBJ}/store"

# Decade-spaced file-size bin edges (in bytes), each paired with its axis label.
# The lowest edge is 0.1 rather than 0 so it can sit on the log-scaled x axes
# used below; it is still labelled "0B".
_SIZE_EDGES = [
    ("0B", 0.1), ("1B", 1), ("10B", 10), ("100B", 1e2),
    ("1KB", 1e3), ("10KB", 1e4), ("100KB", 1e5),
    ("1MB", 1e6), ("10MB", 1e7), ("100MB", 1e8),
    ("1GB", 1e9), ("10GB", 1e10), ("100GB", 1e11),
    ("1TB", 1e12), ("10TB", 1e13),
]
BINS = [edge for label, edge in _SIZE_EDGES]
LABELS = [label for label, edge in _SIZE_EDGES]
# Label -> bin edge, e.g. BYTES_LOOKUP["1GB"] == 1e9.
BYTES_LOOKUP = dict(_SIZE_EDGES)

# Matplotlib defaults applied to every figure in this notebook.
# (Previously "axes.labelsize" was assigned twice with the same value; the
# duplicate has been dropped.)
rcParams.update({
    # legend
    "legend.fontsize": 11,
    "legend.labelspacing": 0.2,
    "legend.framealpha": 0.65,
    # hatching
    "hatch.linewidth": 0.5,
    # axes
    "axes.xmargin": 0.0,  # rootlike, no extra padding within x axis
    "axes.labelsize": "x-large",
    "axes.titlesize": "large",
    "axes.formatter.use_mathtext": True,
    "axes.formatter.limits": [-5, 4],
    # ticks
    "xtick.labelsize": "large",
    "ytick.labelsize": "large",
    # figure layout / behaviour
    "figure.subplot.hspace": 0.1,
    "figure.subplot.wspace": 0.1,
    "figure.subplot.right": 0.96,
    "figure.max_open_warning": 0,
    "figure.dpi": 100,
})

In [None]:
reports_file = f"../hadoop_reports/{REPORT_SUBJ}_reports/{REPORT_DATE}/all_user_files.txt.gz"
chunk_dir = f"../hadoop_reports/{REPORT_SUBJ}_reports/{REPORT_DATE}/parquet_chunks"

# Column names for the space-separated output of `hdfs dfs -ls -R /cms/store/...`.
# The last field is the full file path; it is stored as "ext" and reduced to
# the file extension by clean_chunk().
RAW_COLUMNS = ["junk1", "junk2", "junk3", "user", "bytes", "date", "junk4", "ext"]


def clean_chunk(df_chunk):
    """Turn one raw listing chunk into a compact user/bytes/date/ext frame.

    Parameters
    ----------
    df_chunk : pandas.DataFrame
        Raw chunk from ``pd.read_csv(..., header=None)``; assumed to have
        exactly the eight fields named in RAW_COLUMNS.

    Returns
    -------
    pandas.DataFrame
        A new frame (the input is not modified) where rows with
        ``bytes == 0`` (directories) are dropped, "ext" holds the text after
        the last "." of the file's basename (or the string "None" when the
        basename has no dot), "date" is parsed to datetime, and "user"/"ext"
        are categoricals.
    """
    chunk = df_chunk.set_axis(RAW_COLUMNS, axis=1)[["user", "bytes", "date", "ext"]]
    # Drop files w/ bytes == 0 (these are directories)
    chunk = chunk[chunk["bytes"] > 0].reset_index(drop=True)

    # Grab the file extensions specifically. The previous version assigned via
    # chained indexing (df[mask]["ext"] = ..., a silent no-op on a copy) and
    # used str.contains("."), whose regex "." matches any character, so "ext"
    # was left as the full path. Taking the basename first also stops dots in
    # directory names from leaking into the extension.
    basename = chunk["ext"].str.rsplit("/", n=1).str[-1]
    has_dot = basename.str.contains(".", regex=False, na=False)
    chunk["ext"] = basename.str.rsplit(".", n=1).str[-1].where(has_dot, "None")

    # Translate string dates to datetime objects and remaining string columns
    # to categoricals (more space efficient)
    chunk["date"] = pd.to_datetime(chunk["date"])
    chunk["ext"] = chunk["ext"].astype("category")
    chunk["user"] = chunk["user"].astype("category")
    return chunk


os.makedirs(chunk_dir, exist_ok=True)
# Conversion is skipped whenever any chunk already exists. Delete chunk_dir to
# rebuild after an interrupted run, or after changing clean_chunk(): chunks
# written earlier keep whatever "ext" values they were saved with.
if len(glob.glob(f"{chunk_dir}/*.parquet")) == 0:
    # sep=r"\s+" / on_bad_lines="skip" replace delim_whitespace=True and
    # error_bad_lines=False/warn_bad_lines=False (the latter two were removed
    # in pandas 2.0; delim_whitespace is deprecated since 2.2). The context
    # manager closes the underlying gzip file when done.
    with pd.read_csv(
        reports_file,
        header=None,
        sep=r"\s+",
        chunksize=200000,
        on_bad_lines="skip",
    ) as df_chunks:
        for i, df_chunk in tqdm(enumerate(df_chunks)):
            # Save each cleaned chunk to its own parquet file
            clean_chunk(df_chunk).to_parquet(f"{chunk_dir}/chunk_{i:04d}.parquet")

# Read every parquet chunk in chunk_dir and concat them, preserving categoricals
df = pd.read_parquet(chunk_dir)
# Make all extensions lowercase
df["ext"] = df.ext.str.lower().astype("category")

In [None]:
# Number of files and total bytes per file extension.
bytes_by_ext = df.groupby("ext")["bytes"]
all_counts = bytes_by_ext.count()
all_sizes = bytes_by_ext.sum()

In [None]:
# Table of the four extensions holding the most bytes, with their file counts,
# followed by an "everything else" row and the grand total.
top_by_size = all_sizes.nlargest(4)

sizes = list(top_by_size.values)
extensions = list(top_by_size.index.values)
counts = [all_counts[ext] for ext in extensions]

grand_total_bytes = np.sum(df.bytes)
grand_total_files = np.sum(all_counts)

print("Ext".ljust(10), "Size".rjust(11), "Count".rjust(10))
print("---------------------------------")
for ext, size, count in zip(extensions, sizes, counts):
    print(
        ("." + ext).ljust(10),
        "{:.3f} TB".format(size / 1e12).rjust(11),
        "{:10d}".format(count),
    )
print("---------------------------------")
print(
    "OTHER".ljust(10),
    "{:.3f} TB".format((grand_total_bytes - np.sum(sizes)) / 1e12).rjust(11),
    "{:10d}".format(grand_total_files - np.sum(counts)),
)
print(
    "TOTAL".ljust(10),
    "{:.3f} TB".format(grand_total_bytes / 1e12).rjust(11),
    "{:10d}".format(grand_total_files),
)

In [None]:
# Table of the six most numerous extensions, with their total sizes,
# followed by an "everything else" row and the grand total.
top_by_count = all_counts.nlargest(6)

extensions = list(top_by_count.index.values)
counts = list(top_by_count.values)
sizes = [all_sizes[ext] for ext in extensions]

grand_total_files = np.sum(all_counts)
grand_total_bytes = np.sum(df.bytes)

print("Ext".ljust(10), "Count".rjust(10), "Size".rjust(11))
print("---------------------------------")
for ext, count, size in zip(extensions, counts, sizes):
    print(
        ("." + ext).ljust(10),
        "{:10d}".format(count),
        "{:.3f} TB".format(size / 1e12).rjust(11),
    )
print("---------------------------------")
print(
    "OTHER".ljust(10),
    "{:10d}".format(grand_total_files - np.sum(counts)),
    "{:.3f} TB".format((grand_total_bytes - np.sum(sizes)) / 1e12).rjust(11),
)
print(
    "TOTAL".ljust(10),
    "{:10d}".format(grand_total_files),
    "{:.3f} TB".format(grand_total_bytes / 1e12).rjust(11),
)

In [None]:
fig, axes = plt.subplots(figsize=(12, 9))

# File-size distribution of every file, one bar per decade of bytes.
counts, bin_edges, _ = axes.hist(df.bytes, bins=BINS)
axes.set_xscale("log")
axes.set_xticks(BINS)
axes.set_xticklabels(LABELS)

axes.set_ylabel("Count", size=20)
axes.set_xlabel("File Size", size=20)

total_tb = np.sum(df.bytes) / 1e12
axes.set_title("{0}/.../* [{1:.1f} TB]".format(HADOOP_PATH, total_tb), size=20)

# Print each bar's count just above it; x = 3 * left edge lands near the
# middle of a decade-wide bin on the log axis.
for x_pos, n_files in zip(bin_edges[:-1] * 3, counts.astype(int)):
    axes.text(x_pos, n_files + 2e3, "{:,d}".format(n_files), horizontalalignment='center')

In [None]:
fig, axes = plt.subplots(figsize=(12, 9))
axes.tick_params(labelsize=14)

# Stack the six most numerous extensions, with everything else as "other".
file_types = list((df.groupby("ext").bytes.count()).nlargest(6).index.values)
per_type_bytes = [df[df.ext == e].bytes for e in file_types]
per_type_bytes.append(df[~df.ext.isin(file_types)].bytes)
counts, bin_edges, _ = axes.hist(
    per_type_bytes,
    stacked=True,
    bins=BINS,
    label=file_types + ["other"],
)
axes.legend(fontsize=14)
axes.set_xscale("log")
axes.set_xticks(BINS)
axes.set_xticklabels(LABELS)

axes.set_ylabel(r"Count", size=20)
axes.set_xlabel("File Size", size=20)

axes.set_title("{0} [{1:.1f} TB]".format(HADOOP_PATH, np.sum(df.bytes) / 1e12), size=20)

# With stacked=True the last returned array is the top of the stack, i.e. the
# number of files of any type in each size bucket.
total_counts = counts[-1].astype(int)
total = np.sum(total_counts)
print("Bucket".ljust(8), "Count".rjust(7), "Count%".rjust(8))
print("-------------------------")
for upper_label, count in zip(LABELS[1:], total_counts):
    print(
        "< {}".format(upper_label).ljust(8),
        "{:7d}".format(count),
        "{:.2f}%".format(count / total * 100).rjust(8),
    )
print("-------------------------")
print("TOTAL".ljust(8), "{:7d}".format(total), "100.00%".rjust(8))

# Annotate each stack with its total count.
for x_pos, count in zip(bin_edges[:-1] * 3, total_counts):
    axes.text(x_pos, count + 2e3, "{:,d}".format(count), horizontalalignment='center')

In [None]:
fig, axes = plt.subplots(figsize=(12, 9))
axes.tick_params(labelsize=14)

USER = "jguiang"

# Select the user's files once; the previous version re-scanned the full frame
# for every extension, for the "other" slice and again for the title.
user_df = df[df.user == USER]
# Fail clearly rather than with an obscure matplotlib/enumerate error below.
assert len(user_df) > 0, f"no files found for user {USER!r}"

# observed=True ranks only extensions this user actually has. With the default
# (observed=False) every category of the global "ext" column is kept with a
# count of 0, so a user with fewer than six extension types got zero-count
# extensions in the stack and legend.
file_types = list(user_df.groupby("ext", observed=True).bytes.count().nlargest(6).index.values)
counts, bin_edges, _ = axes.hist(
    [user_df[user_df.ext == e].bytes for e in file_types] + [user_df[~user_df.ext.isin(file_types)].bytes],
    stacked=True,
    bins=BINS,
    label=file_types + ["other"],
)
axes.legend(fontsize=14)
axes.set_xscale("log")
axes.set_xticks(BINS)
axes.set_xticklabels(LABELS)

axes.set_ylabel(r"Count", size=20)
axes.set_xlabel("File Size", size=20)

# Total bytes of the user's files in TB (labelled "Before Repl." in the title).
before_rep = np.sum(user_df.bytes) / 1e12
axes.set_title(f"{HADOOP_PATH}/{USER} [{before_rep:.1f} TB Before Repl.]", size=20)

# With stacked=True the last returned array is the top of the stack, i.e. the
# user's file count per size bucket.
total_counts = counts[-1].astype(int)
total = np.sum(total_counts)
print("Bucket".ljust(8), "Count".rjust(7), end=" ")
print("Count%".rjust(8))
print("-------------------------")
for i, count in enumerate(total_counts):
    print(
        "< {}".format(LABELS[i+1]).ljust(8),
        "{:7d}".format(count),
        end=" "
    )
    print("{:.2f}%".format(count/total*100).rjust(8))
print("-------------------------")
print("TOTAL".ljust(8), "{:7d}".format(total), end=" ")
print("100.00%".rjust(8))

# Annotate each stack with its total count (no vertical offset: per-user
# counts are small compared with the 2e3 offset used for the full store).
label_locs = bin_edges[:-1]*3
for i, count in enumerate(total_counts):
    axes.text(label_locs[i], count, "{:,d}".format(count), horizontalalignment='center')

In [None]:
fig, axes = plt.subplots(figsize=(12, 9))

counts, bin_edges, _ = axes.hist(df[df.ext == "root"].bytes, bins=BINS);
axes.set_xscale("log");
axes.set_xticks(BINS);
axes.set_xticklabels(LABELS)

axes.set_ylabel("Count", size=20);
axes.set_xlabel("File Size", size=20);

plt.title("{0}/.../*.root [{1:.1f} TB]".format(HADOOP_PATH, np.sum(df[df.ext == "root"].bytes)/1e12), size=20);

label_locs = bin_edges[:-1]*3
for i, count in enumerate(counts.astype(int)):
    plt.text(label_locs[i], count+2e3, "{:,d}".format(count), horizontalalignment='center')

In [None]:
fig, axes = plt.subplots(figsize=(12, 9))

counts, bin_edges, _ = axes.hist(df[df.ext != "root"].bytes, bins=BINS);
axes.set_xscale("log");
axes.set_xticks(BINS);
axes.set_xticklabels(LABELS)

axes.set_ylabel("Count", size=20);
axes.set_xlabel("File Size", size=20);

plt.title("{0}/.../*{{!.root}} [{1:.1f} TB]".format(HADOOP_PATH, np.sum(df[df.ext != "root"].bytes)/1e12), size=20);

label_locs = bin_edges[:-1]*3
for i, count in enumerate(counts.astype(int)):
    plt.text(label_locs[i], count+1e3, "{:,d}".format(count), horizontalalignment='center')

In [None]:
# Top ten extensions (excluding .root) by total TB held in files larger than 1 GB.
# observed=True drops extensions with no such files; with the categorical
# default they appeared as zero-valued rows whenever fewer than ten qualified.
large_non_root = df[(df.ext != "root") & (df.bytes > BYTES_LOOKUP["1GB"])]
(large_non_root.groupby("ext", observed=True)["bytes"].sum()/1e12).nlargest(10)

In [None]:
# Decade edges from 1 B to 10 TB: 1, 10, 100 times each of B/KB/MB/GB, then
# 1 and 10 TB (the trailing 100 TB edge is dropped). Note: the bucketing
# below uses BINS, not this list.
bins = [mult * scale for scale in (1e0, 1e3, 1e6, 1e9, 1e12) for mult in (1, 10, 100)][:-1]

def num_to_human(num, rjust=5):
    """Format a byte count with a decimal (power-of-1000) unit, truncating.

    Parameters
    ----------
    num : int or float
        Number of bytes (expected non-negative).
    rjust : int, default 5
        Width to right-justify the result to; 0 means no padding.

    Returns
    -------
    str
        e.g. ``num_to_human(1500) == "  1KB"``; values below 1 truncate to
        "0B". Values of 1000 TB and above are expressed in PB (previously the
        raw number was returned, unformatted and unpadded).
    """
    units = ["B", "KB", "MB", "GB", "TB"]
    for power, unit in enumerate(units):
        if num < 1000**(power + 1):
            return f"{int(num/1000**power)}{unit}".rjust(rjust)
    # Beyond the TB range: keep returning a string rather than a bare number.
    return f"{int(num/1000**len(units))}PB".rjust(rjust)

def interval_to_human(interval):
    """Label a size bucket by its upper edge, e.g. Interval(1e3, 1e4) -> "<10KB".

    Parameters
    ----------
    interval : pandas.Interval
        A bucket produced by ``pd.cut``; only ``interval.right`` is used
        (the unused left-edge conversion has been removed).

    Returns
    -------
    str
        ``"<"`` followed by the unpadded ``num_to_human`` form of the upper edge.
    """
    right = num_to_human(interval.right, rjust=0)
    return f"<{right}"


# Bucket every file by size. right=False makes the bins left-closed, so the
# "<1KB" bucket holds [100B, 1KB). Note: this adds a "bucket" column to `df`.
df["bucket"] = pd.cut(df["bytes"], BINS, right=False)
# observed=False keeps empty buckets as zero rows (made explicit: pandas >= 2.1
# warns that the default for categorical groupers is changing).
df_bucketed = (
    df[df.ext != "root"]
    .groupby("bucket", observed=False)["bytes"]
    .agg(["count", "sum"])
    .reset_index()
)
df_bucketed["size"] = df_bucketed["sum"]/1e9 # to GB
df_bucketed = df_bucketed.drop(columns=["sum"])

df_bucketed["count_pct"] = 100.*df_bucketed["count"]/df_bucketed["count"].sum()
df_bucketed["count_pct_cumul"] = df_bucketed["count_pct"].cumsum()

df_bucketed["size_pct"] = 100.*df_bucketed["size"]/df_bucketed["size"].sum()
df_bucketed["size_pct_cumul"] = df_bucketed["size_pct"].cumsum()

df_bucketed["bucket"] = df_bucketed["bucket"].map(interval_to_human)

# Add a total row by concatenating it to the bottom. Only the numeric columns
# are summed: "bucket" is still categorical here (the interval -> label map is
# one-to-one), and DataFrame.sum() raises TypeError on categorical columns in
# pandas >= 2.0 (pandas 1.x silently dropped them). Cumulative columns are
# pinned to 100.
total = (
    df_bucketed.drop(columns=["bucket"])
    .sum()
    .to_frame().T
    .assign(
        bucket="total",
        size_pct_cumul=100.,
        count_pct_cumul=100.,
    )
)
df_bucketed = pd.concat([df_bucketed, total], axis=0).reset_index(drop=True)

# more compact names
df_bucketed = df_bucketed.rename(columns={
    "count_pct": "count%",
    "count_pct_cumul": "c-count%",
    "size_pct": "size%",
    "size_pct_cumul": "c-size%",
})

# order the columns
df_bucketed = df_bucketed[["bucket", "count", "count%", "c-count%", "size", "size%", "c-size%"]]

# Render the table (last expression of the cell is what gets displayed).
df_bucketed.style.format({
    "count": "{:,.0f}".format,
    "count%": "{:.2f}%".format,
    "c-count%": "{:.2f}%".format,

    "size": "{:,.2f} GB".format,
    "size%": "{:.2f}%".format,
    "c-size%": "{:.2f}%".format,
})