# Measuring data lake stats
This notebook measures aggregate statistics for each data lake. Every table in each data lake is read and
profiled, the per-table metrics are aggregated per data lake, and the result is saved to disk.

Some of the statistics measured here are reported in a table of the paper.

In [1]:
cd ~/bench

/home/soda/rcappuzz/work/benchmark-join-suggestions


In [2]:
import polars as pl
from pathlib import Path
import polars.selectors as cs
from tqdm.notebook import tqdm
from joblib import Parallel, delayed

In [None]:
def table_profile(table_path, data_lake):
    """Read one parquet table and summarise its shape, column kinds and nulls.

    Parameters
    ----------
    table_path
        Location of the parquet file. Its ``.stem`` is used as the table name,
        so a ``pathlib.Path``-like object is expected.
    data_lake
        Label copied verbatim into the ``data_lake`` field of the result.

    Returns
    -------
    dict
        One record with the keys ``data_lake``, ``table``, ``num_attr``,
        ``cat_attr``, ``n_rows``, ``n_cols`` and ``avg_null``.
    """
    table = pl.read_parquet(table_path)
    row_count = len(table)

    # Columns are split into numeric ones and everything else.
    numeric_part = table.select(cs.numeric())
    other_part = table.select(~cs.numeric())

    # Mean of the per-column null counts, normalised by the number of rows.
    # A table without rows gets 0 so that the division is never attempted.
    null_fraction = (
        table.null_count().mean_horizontal().item() / row_count
        if row_count > 0
        else 0
    )

    return {
        "data_lake": data_lake,
        "table": table_path.stem,
        "num_attr": len(numeric_part.columns),
        "cat_attr": len(other_part.columns),
        "n_rows": row_count,
        "n_cols": len(table.columns),
        "avg_null": null_fraction,
    }


def get_stats(df: pl.DataFrame):
    """Collapse the per-table profiles of one data lake into a single row.

    Parameters
    ----------
    df
        Frame with one row per table and the columns ``data_lake``,
        ``n_tables``, ``n_rows``, ``n_cols``, ``num_attr``, ``cat_attr`` and
        ``avg_null``.

    Returns
    -------
    pl.DataFrame
        One row holding the data lake label, the table count, the total number
        of rows and columns, and the mean and median of every per-table metric.
    """
    # Identification and totals come first ...
    aggregations = [
        pl.col("data_lake").first(),
        pl.col("n_tables").first(),
        pl.col("n_rows").sum().alias("tot_rows"),
        pl.col("n_cols").sum().alias("tot_cols"),
    ]
    # ... followed by a mean/median pair for each per-table metric.
    for metric in ("n_cols", "n_rows", "num_attr", "cat_attr", "avg_null"):
        aggregations.append(pl.col(metric).mean().alias(f"mean_{metric}"))
        aggregations.append(pl.col(metric).median().alias(f"median_{metric}"))

    return df.select(aggregations)

In [None]:
# Data lake directories to profile, relative to the current working directory.
path_list = [
    Path(lake_dir)
    for lake_dir in (
        "data/yadl/binary_update/",
        "data/yadl/wordnet_full/",
        "data/yadl/wordnet_vldb_10/",
        "data/yadl/wordnet_vldb_50/",
        "data/open_data_us/",
    )
]

# Number of parallel jobs used to profile the tables of a data lake.
N_JOBS = 8

# List the tables of every data lake once, before any profiling starts. This
# avoids globbing each directory twice (once for the progress-bar total, once
# for the actual work) and lets a missing or empty directory fail immediately
# instead of after the other data lakes have already been processed.
# Sorting makes the processing order deterministic across runs.
lake_tables = [(path, sorted(path.glob("**/*.parquet"))) for path in path_list]
empty_lakes = [str(path) for path, table_paths in lake_tables if not table_paths]
if empty_lakes:
    raise FileNotFoundError(
        "No parquet tables found under: " + ", ".join(empty_lakes)
    )

stats = []
for path, table_paths in lake_tables:
    # One profile dict per table; the directory name labels the data lake.
    profiles = Parallel(n_jobs=N_JOBS, verbose=0)(
        delayed(table_profile)(tab, path.stem)
        for tab in tqdm(table_paths, position=0, leave=False, desc=path.stem)
    )
    df = pl.from_dicts(profiles).with_columns(pl.lit(len(profiles)).alias("n_tables"))
    stats.append(get_stats(df))

# One row of aggregated statistics per data lake.
df_stats = pl.concat(stats)

In [9]:
# Show one column per data lake and one row per statistic, rounded to two decimals.
stats_by_lake = df_stats.transpose(include_header=True, column_names="data_lake")
styled_stats = stats_by_lake.to_pandas().style.format(precision=2)
display(styled_stats)


KeyboardInterrupt



In [19]:
# Save stats on disk, with one column per data lake and one row per statistic.
stats_to_save = df_stats.transpose(include_header=True, column_names="data_lake")
stats_to_save.write_csv("stats_data_lakes.csv")