In [None]:
import pandas as pd
import numpy as np
import os
import matplotlib.pyplot as plt
import seaborn as sns
from torch.utils.data.sampler import WeightedRandomSampler


In [None]:
# Load the extracted findings table that the rest of the notebook cleans and re-weights.
df = pd.read_csv("./data/extracted_info.csv")

In [None]:
# Preview the first rows of the freshly loaded table.
df.head()

In [None]:
def fill_nans_birads(row):
    """Replace a one-sided missing BI-RADS value with -1.

    If exactly one of ``row.right_birads`` / ``row.left_birads`` is NaN, that
    entry is overwritten with -1 on ``row`` itself. Rows where both sides are
    missing, or both are present, are left untouched.

    Returns the same ``row`` object that was passed in.
    """
    right_missing = pd.isna(row.right_birads)
    left_missing = pd.isna(row.left_birads)

    # Patch only when exactly one side is missing.
    if right_missing != left_missing:
        if right_missing:
            row.right_birads = -1
        else:
            row.left_birads = -1
    return row

In [None]:
def fill_nans_calcifications(row):
    """Recode -1 microcalcification flags as 0 when both sides are recorded.

    A row where either ``right_microcal`` or ``left_microcal`` is NaN is
    returned unchanged. Otherwise every side equal to -1 is overwritten with 0
    on ``row`` itself, and ``row`` is returned.
    """
    # Nothing to do unless both sides carry a value.
    if pd.isna(row.right_microcal) or pd.isna(row.left_microcal):
        return row

    for side in ("right_microcal", "left_microcal"):
        if getattr(row, side) == -1:
            setattr(row, side, 0)
    return row

In [None]:
def fill_nans_masses(row):
    """Recode -1 mass flags as 0 when both sides are recorded.

    A row where either ``right_mass`` or ``left_mass`` is NaN is returned
    unchanged. Otherwise every side equal to -1 is overwritten with 0 on
    ``row`` itself, and ``row`` is returned.
    """
    # Nothing to do unless both sides carry a value.
    if pd.isna(row.right_mass) or pd.isna(row.left_mass):
        return row

    for side in ("right_mass", "left_mass"):
        if getattr(row, side) == -1:
            setattr(row, side, 0)
    return row

In [None]:
def any_mass(row):
    """Return 1 if either side reports a mass, 0 if neither does.

    Returns ``np.nan`` when ``right_mass`` or ``left_mass`` is missing, so the
    row can be dropped later.
    """
    # An incomplete record cannot be labelled.
    if pd.isna(row.right_mass) or pd.isna(row.left_mass):
        return np.nan
    return 1 if (row.right_mass == 1 or row.left_mass == 1) else 0

In [None]:
def any_calcification(row):
    """Return 1 if either side reports microcalcifications, 0 if neither does.

    Returns ``np.nan`` when ``right_microcal`` or ``left_microcal`` is missing,
    so the row can be dropped later.
    """
    # An incomplete record cannot be labelled.
    if pd.isna(row.right_microcal) or pd.isna(row.left_microcal):
        return np.nan
    return 1 if (row.right_microcal == 1 or row.left_microcal == 1) else 0

In [None]:
def max_birads(row):
    """Return the larger of the two per-side BI-RADS values.

    If one side is missing the other side's value is returned; if both are
    missing the result is ``np.nan``.
    """
    left, right = row.left_birads, row.right_birads
    left_missing, right_missing = pd.isna(left), pd.isna(right)

    if left_missing and right_missing:
        return np.nan
    if left_missing:
        return right
    if right_missing:
        return left
    return max(left, right)

In [None]:
# Upper-case the composition labels so values that differ only in letter case compare equal.
df.composition = df.composition.str.upper()

In [None]:
# Row-wise clean-up passes: each helper receives one row and returns it (possibly modified).
df = df.apply(fill_nans_birads, axis=1)
df = df.apply(fill_nans_calcifications, axis=1)
df = df.apply(fill_nans_masses, axis=1)

In [None]:
# Derive one exam-level label per row from the left/right columns.
df["max_birads"] = df.apply(max_birads, axis=1)
df["any_mass"] = df.apply(any_mass, axis=1)
df["any_calcification"] = df.apply(any_calcification, axis=1)

# All four label columns are treated as categorical from here on.
df.composition = df.composition.astype("category")
df.max_birads = df.max_birads.astype("category")
df.any_mass = df.any_mass.astype("category")
df.any_calcification = df.any_calcification.astype("category")

In [None]:
# Keep only rows in which all four label columns are present.
df = df.dropna(how="any", subset=["max_birads", "any_calcification", "composition", "any_mass"])

In [None]:
# Number of rows remaining after dropping incomplete records.
len(df)

In [None]:
def plot_heatmap(df, x_col, y_col, title):
    """Plot the joint counts of two columns as an annotated heatmap.

    Parameters
    ----------
    df : pandas.DataFrame
        Table containing both columns.
    x_col : str
        Column whose values become the heatmap rows (labelled on the y axis).
    y_col : str
        Column whose values become the heatmap columns (labelled on the x axis).
    title : str
        Figure title.
    """
    df_grouped = df.groupby([x_col, y_col]).size().reset_index(name='counts')
    # Combinations that never occur would be NaN after the pivot; show them as 0.
    df_pivot = df_grouped.pivot(index=x_col, columns=y_col, values='counts').fillna(0)
    plt.figure(figsize=(6, 6))
    sns.heatmap(df_pivot, annot=True, fmt=",", cmap="YlGnBu")
    plt.title(title)
    # Pivot columns run along the x axis and the pivot index along the y axis.
    plt.xlabel(y_col)
    plt.ylabel(x_col)
    plt.show()


# Joint counts of mass vs calcification before any re-sampling.
# The title now names the columns that are actually plotted.
plot_heatmap(df, "any_mass", "any_calcification", "2D Histogram of Mass vs Calcification")

In [None]:
def make_nd_pmf(df, cols: list[str]):
    """
    Make an n-dimensional PMF from the specified columns in the dataframe.

    Parameters
    ----------
    df : pandas.DataFrame
        Table containing every column named in ``cols``.
    cols : list[str]
        Columns whose joint distribution is estimated.

    Returns
    -------
    dict
        Maps each combination of values to its relative frequency. Keys are
        scalars when ``cols`` has one entry and tuples otherwise. For
        categorical columns, combinations of categories that never occur are
        included with probability 0.
    """
    # count occurrences of each combination; observed=False is spelled out so
    # that unobserved categorical combinations are kept as zero counts
    # regardless of the pandas version's default for `observed`
    counts = df.groupby(cols, observed=False).size()
    # normalize to get probabilities
    pmf = counts / counts.sum()
    # if only one column, return scalar-key dict
    if len(cols) == 1:
        return pmf.to_dict()
    # else return tuple-key dict
    return {tuple(idx): prob for idx, prob in pmf.items()}

In [None]:
def _normalize_weights(weights: dict) -> dict:
    """Scale the values of ``weights`` so they sum to 1 (empty input stays empty)."""
    # Compute the denominator once instead of once per key.
    norm = sum(weights.values())
    return {k: w / norm for k, w in weights.items()}


def pmf_to_weight(
    pmf: dict[tuple, float],
    smoothing_factor: float = 0.00,
    eps: float = 1e-6,
    ignore_zero: bool = True
) -> dict[tuple, float]:
    """
    Convert a PMF to a dictionary of inverse-probability weights.

    Steps:
      1. each kept key gets the raw weight ``total / (p + eps)``, where
         ``total`` is the sum of all probabilities in ``pmf``;
      2. the weights are normalized to sum to 1;
      3. ``smoothing_factor`` is added to every weight;
      4. the weights are normalized to sum to 1 again.

    Parameters
    ----------
    pmf : dict
        Maps each key to its probability.
    smoothing_factor : float
        Constant added to every normalized weight before the final
        normalization; larger values flatten the weights.
    eps : float
        Added to each probability before inversion to avoid division by zero.
    ignore_zero : bool
        If True, keys whose probability is not strictly positive are omitted
        from the result (they are not present with weight 0).

    Returns
    -------
    dict
        Maps each kept key to its weight; empty when no key is kept.
    """
    total = sum(pmf.values())

    # Filter before inverting: a zero probability must never reach the
    # division, otherwise eps=0 would raise even though ignore_zero=True.
    kept = {k: v for k, v in pmf.items() if not ignore_zero or v > 0}

    weights = {k: total / (v + eps) for k, v in kept.items()}
    weights = _normalize_weights(weights)
    weights = {k: w + smoothing_factor for k, w in weights.items()}
    return _normalize_weights(weights)

In [None]:
def plot_2d_pmf(pmf: dict[tuple, float], title: str, captions: list[str] = None):
    """
    Draw a heatmap of a dictionary keyed by 2-tuples.

    The first tuple element selects the heatmap row and the second the column;
    each cell is annotated with its value to two decimals. ``captions``, when
    given, supplies ``[row_label, column_label]`` for the axes; otherwise the
    axes are labelled 'x' (rows) and 'y' (columns).
    """
    # one row per key, with the tuple key kept in an 'index' column
    table = pd.DataFrame.from_dict(pmf, orient='index', columns=['prob']).reset_index()
    # split the tuple key into its two coordinates
    table[['x', 'y']] = pd.DataFrame(table['index'].tolist(), index=table.index)
    table = table.drop(columns=['index'])

    # rows = first coordinate, columns = second coordinate
    grid = table.pivot(index='x', columns='y', values='prob')

    if captions is None:
        row_label, col_label = 'x', 'y'
    else:
        col_label = captions[1]
        row_label = captions[0]

    plt.figure(figsize=(8, 6))
    sns.heatmap(grid, annot=True, fmt=".2f", cmap="YlGnBu")
    plt.title(title)
    plt.xlabel(col_label)
    plt.ylabel(row_label)
    plt.show()


In [None]:
# Joint PMF over (any_mass, any_calcification).
pmf = make_nd_pmf(df, ["any_mass", "any_calcification"])

In [None]:
# Visualise the joint PMF of mass and calcification presence
# (the title previously said "Composition", which is not one of the plotted variables).
plot_2d_pmf(pmf, "2D PMF of Mass and Calcification", captions=["Mass", "Calcification"])

In [None]:
# Inverse-probability weights for the 2-D PMF, with no smoothing applied.
weights = pmf_to_weight(pmf, smoothing_factor=0.00)

In [None]:
# Same heatmap layout as the PMF plot, now showing the derived weights.
plot_2d_pmf(weights, "2D Weights of Mass and Calcification", captions=["Mass", "Calcification"])

In [None]:
# Joint PMF over all four label columns.
overall_pmf = make_nd_pmf(df, ["any_mass", "any_calcification", "max_birads", "composition"])

In [None]:
# Inspect the 4-D PMF dictionary.
overall_pmf

In [None]:
# Inverse-probability weights for the 4-D PMF, using pmf_to_weight's default arguments.
overall_weights = pmf_to_weight(overall_pmf)

In [None]:
# Inspect the 4-D weight dictionary.
overall_weights

In [None]:
def apply_weights(row, weights_dict):
    """
    Look up the weight for one row.

    The lookup key is the tuple
    ``(any_mass, any_calcification, max_birads, composition)`` read from
    ``row``. Returns ``weights_dict[key]`` when the key is present and 0.0
    otherwise.
    """
    key_columns = ("any_mass", "any_calcification", "max_birads", "composition")
    key = tuple(row[column] for column in key_columns)
    # Combinations without an entry get zero weight.
    return weights_dict.get(key, 0.0)

In [None]:
# Attach to every row the weight of its (mass, calcification, BI-RADS, composition) combination.
df["weight"] = df.apply(lambda row: apply_weights(row, overall_weights), axis=1)

In [None]:
# Sampler that draws len(df) indices with replacement, weighted by each row's "weight" value.
# NOTE(review): no generator/seed is supplied, so the draw is presumably not reproducible across runs — confirm.
sampler = WeightedRandomSampler(
    weights=df["weight"].values,
    num_samples=len(df),
    replacement=True,
)

In [None]:
# Materialise one full pass of the sampler as a list of indices (doubles as a smoke test).
sample = list(sampler)

In [None]:
# Select the drawn rows by position; rows may repeat because sampling is with replacement.
df_sample = df.iloc[sample]

In [None]:
# Joint counts of max_birads vs composition in the re-sampled rows.
plot_heatmap(df_sample, "max_birads", "composition", "2D Histogram of max_birads vs composition (sampled)")

In [None]:
# Joint counts of mass vs calcification in the re-sampled rows.
plot_heatmap(df_sample, "any_mass", "any_calcification", "2D Histogram of Mass vs Calcification (Sampled)")

In [None]:
# Load the complete table that will receive the per-row weights.
complete_df = pd.read_csv("./data/complete-mammo02.csv")

In [None]:
# Sort both tables by id and renumber their indices from 0.
df = df.sort_values(by="id").reset_index(drop=True)
complete_df = complete_df.sort_values(by="id").reset_index(drop=True)

In [None]:
# Compare the row counts of the filtered table and the complete table.
len(df), len(complete_df)

In [None]:
# Left-join the "weight" column from df onto complete_df using "id" as the key.
# Rows of complete_df whose id has no match in df end up with a missing weight.
complete_df = complete_df.merge(df[["id", "weight"]], on="id", how="left")