# Quality Assurance

In this tutorial we will replicate key parts of the **accuracy** and **privacy** calculations of MOSTLY AI's Quality Assurance report. For a more extensive treatment on the topic, see also [our documentation](https://mostly.ai/synthetic-data-generator-docs/mostly-ai-help-support-tutorials-documentation), our [peer-reviewed journal paper](https://www.frontiersin.org/articles/10.3389/fdata.2021.679939/full), as well as the accompanying [benchmarking study](https://mostly.ai/blog/how-to-benchmark-synthetic-data-generators).

In [None]:
#!pip install pandas==2.0
import numpy as np
import pandas as pd
print(f"loaded NumPy {np.__version__}")
print(f"loaded Pandas {pd.__version__}")

In [None]:
repo = 'https://github.com/mostly-ai/mostly-tutorials/raw/dev/quality-assurance'
tgt = pd.read_parquet(f'{repo}/census-training.parquet')
print(f'fetched original data with {tgt.shape[0]:,} records and {tgt.shape[1]} attributes')
syn = pd.read_parquet(f'{repo}/census-synthetic.parquet')
print(f'fetched synthetic data with {syn.shape[0]:,} records and {syn.shape[1]} attributes')

In [None]:
tgt.sample(n=5)

In [None]:
syn.sample(n=5)

## Accuracy Calculation

Accuracy is measured as the distances between the lower-level (binned) marginal empirical distributions of the original vs. the synthetic dataset. We perform the calculation for all univariate and bivariate distributions, and then average across to determine simple summary statistics.

In [None]:
def bin_data(dt1, dt2, bins=10):
    dt1 = dt1.copy()
    dt2 = dt2.copy()
    # quantile binning of numerics
    num_cols = dt1.select_dtypes(include='number').columns
    cat_cols = dt1.select_dtypes(include=['object', 'category', 'string', 'bool']).columns
    for col in num_cols:
        # determine breaks based on `dt1`
        breaks = dt1[col].quantile(np.linspace(0, 1, bins+1)).unique()
        dt1[col] = pd.cut(dt1[col], bins=breaks, include_lowest=True)
        dt2_vals = pd.to_numeric(dt2[col], 'coerce')
        dt2_bins = pd.cut(dt2_vals, bins=breaks, include_lowest=True)
        dt2_bins[dt2_vals < min(breaks)] = '_other_'
        dt2_bins[dt2_vals > max(breaks)] = '_other_'
        dt2[col] = dt2_bins
    # top-C binning of categoricals
    for col in cat_cols:
        dt1[col] = dt1[col].astype('str')
        dt2[col] = dt2[col].astype('str')
        # determine top values based on `dt1`
        top_vals = dt1[col].value_counts().head(bins).index.tolist()
        dt1[col].replace(np.setdiff1d(dt1[col].unique().tolist(), top_vals), '_other_', inplace=True)
        dt2[col].replace(np.setdiff1d(dt2[col].unique().tolist(), top_vals), '_other_', inplace=True)
    return dt1, dt2

    
def calculate_accuracies(dt1_bin, dt2_bin, k=1):
    # build grid of all cross-combinations
    cols = dt1_bin.columns
    interactions = pd.DataFrame(np.array(np.meshgrid(cols, cols)).reshape(2, len(cols)**2).T)
    interactions.columns = ['col1', 'col2']
    if k == 1:
        interactions = interactions.loc[(interactions['col1']==interactions['col2'])]
    elif k == 2:
        interactions = interactions.loc[(interactions['col1']<interactions['col2'])]
    else:
        raise('k>2 not supported')

    results = []
    for idx in range(interactions.shape[0]):
        row = interactions.iloc[idx]
        val1 = dt1_bin[row.col1].astype(str) + "|" + dt1_bin[row.col2].astype(str)
        val2 = dt2_bin[row.col1].astype(str) + "|" + dt2_bin[row.col2].astype(str)
        # calculate empirical marginal distributions (=relative frequencies)
        freq1 = val1.value_counts(normalize=True, dropna=False).to_frame(name='p1')
        freq2 = val2.value_counts(normalize=True, dropna=False).to_frame(name='p2')
        freq = freq1.join(freq2, how='outer').fillna(0.0)
        # calculate Total Variation Distance between relative frequencies
        tvd = np.sum(np.abs(freq['p1'] - freq['p2'])) / 2
        # calculate Accuracy as (100% - TVD)
        acc = (1 - tvd)
        out = pd.DataFrame({
          'Column': [row.col1], 'Column 2': [row.col2],
          'TVD': [tvd], 'Accuracy': [acc],
        })
        results.append(out)

    return pd.concat(results)

In [None]:
# restrict to max 100k records
tgt = tgt.sample(frac=1).head(n=100_000)
syn = syn.sample(frac=1).head(n=100_000)
# bin data
tgt_bin, syn_bin = bin_data(tgt, syn, bins=10)

In [None]:
# calculate univariate accuracies
acc_uni = calculate_accuracies(tgt_bin, syn_bin, k=1)[['Column', 'Accuracy']]
acc_uni.head()

In [None]:
# calculate bivariate accuracies
acc_biv = calculate_accuracies(tgt_bin, syn_bin, k=2)[['Column', 'Column 2', 'Accuracy']]
acc_biv = pd.concat([acc_biv, acc_biv.rename(columns={'Column': 'Column 2', 'Column 2': 'Column'})])
acc_biv.head()

In [None]:
# calculate the average bivariate accuracy
acc_biv_avg = acc_biv.groupby('Column')['Accuracy'].mean().to_frame('Bivariate Accuracy').reset_index()
# merge to univariate and avg. bivariate accuracy to single overview table
acc = pd.merge(acc_uni.rename(columns={'Accuracy': 'Univariate Accuracy'}), acc_biv_avg, on='Column').sort_values('Univariate Accuracy', ascending=False)
# report accuracy as percentage
acc['Univariate Accuracy'] = acc['Univariate Accuracy'].apply(lambda x: f"{x:.1%}")
acc['Bivariate Accuracy'] = acc['Bivariate Accuracy'].apply(lambda x: f"{x:.1%}")
acc

In [None]:
print(f"Avg. Univariate Accuracy: {acc_uni['Accuracy'].mean():.1%}")
print(f"Avg. Bivariate Accuracy:  {acc_biv['Accuracy'].mean():.1%}")
print(f"-------------------------------")
acc_avg = (acc_uni['Accuracy'].mean() + acc_biv['Accuracy'].mean()) / 2
print(f"Avg. Overall Accuracy:    {acc_avg:.1%}")

## Accuracy Plots

In [None]:
import plotly.graph_objects as go

def plot_univariate(tgt_bin, syn_bin, col, accuracy):
    freq1 = tgt_bin[col].value_counts(normalize=True, dropna=False).to_frame('tgt')
    freq2 = syn_bin[col].value_counts(normalize=True, dropna=False).to_frame('syn')
    freq = freq1.join(freq2, how='outer').fillna(0.0).reset_index()
    freq = freq.sort_values(col)
    freq[col] = freq[col].astype(str)
    
    layout = go.Layout(
        title=dict(text=f"<b>{col}</b> <sup>{accuracy:.1%}</sup>", x=0.5, y=0.98),
        autosize=True,
        height=300,
        width=800,
        margin=dict(l=10, r=10, b=10, t=40, pad=5),
        plot_bgcolor="#eeeeee",
        hovermode="x unified",
        yaxis=dict(
            zerolinecolor="white",
            rangemode="tozero",
            tickformat=".0%",
        ),
    )
    fig = go.Figure(layout=layout)
    trn_line = go.Scatter(
        mode="lines",
        x=freq[col],
        y=freq["tgt"],
        name="target",
        line_color="#666666",
        yhoverformat=".2%",
    )
    syn_line = go.Scatter(
        mode="lines",
        x=freq[col],
        y=freq["syn"],
        name="synthetic",
        line_color="#24db96",
        yhoverformat=".2%",
        fill="tonexty",
        fillcolor="#ffeded",
    )
    fig.add_trace(trn_line)
    fig.add_trace(syn_line)
    fig.show(config= dict(displayModeBar = False))

def plot_bivariate(tgt_bin, syn_bin, col1, col2, accuracy):
    x = pd.concat([tgt_bin[col1], syn_bin[col1]]).drop_duplicates().to_frame(col1)
    y = pd.concat([tgt_bin[col2], syn_bin[col2]]).drop_duplicates().to_frame(col2)
    df = pd.merge(x, y, how="cross")
    df = pd.merge(
        df,
        pd.concat([tgt_bin[col1], tgt_bin[col2]], axis=1)
        .value_counts()
        .to_frame("target")
        .reset_index(),
        how="left",
    )
    df = pd.merge(
        df,
        pd.concat([syn_bin[col1], syn_bin[col2]], axis=1)
        .value_counts()
        .to_frame("synthetic")
        .reset_index(),
        how="left",
    )
    df = df.sort_values([col1, col2], ascending=[True, True]).reset_index(drop=True)
    df["target"] = df["target"].fillna(0.0)
    df["synthetic"] = df["synthetic"].fillna(0.0)
    # normalize values row-wise (used for visualization)
    df["target_by_row"] = df["target"] / df.groupby(col1)["target"].transform("sum")
    df["synthetic_by_row"] = df["synthetic"] / df.groupby(col1)["synthetic"].transform("sum")
    # normalize values across table (used for accuracy)
    df["target_by_all"] = df["target"] / df["target"].sum()
    df["synthetic_by_all"] = df["synthetic"] / df["synthetic"].sum()
    df["y"] = df[col1].astype("str")
    df["x"] = df[col2].astype("str")

    layout = go.Layout(
        title=dict(text=f"<b>{col1} ~ {col2}</b> <sup>{accuracy:.1%}</sup>", x=0.5, y=0.98),
        autosize=True,
        height=300,
        width=800,
        margin=dict(l=10, r=10, b=10, t=40, pad=5),
        plot_bgcolor="#eeeeee",
        showlegend=True,
        # prevent Plotly from trying to convert strings to dates
        xaxis=dict(type="category"),
        xaxis2=dict(type="category"),
        yaxis=dict(type="category"),
        yaxis2=dict(type="category"),
    )
    fig = go.Figure(layout=layout).set_subplots(
        rows=1,
        cols=2,
        horizontal_spacing=0.05,
        shared_yaxes=True,
        subplot_titles=("target", "synthetic"),
    )
    fig.update_annotations(font_size=12)
    # plot content
    hovertemplate = (
        col1[:10] + ": `%{y}`<br />" + col2[:10] + ": `%{x}`<br /><br />"
    )
    hovertemplate += "share target vs. synthetic<br />"
    hovertemplate += "row-wise: %{customdata[0]} vs. %{customdata[1]}<br />"
    hovertemplate += "absolute: %{customdata[2]} vs. %{customdata[3]}<br />"
    customdata = df[
        ["target_by_row", "synthetic_by_row", "target_by_all", "synthetic_by_all"]
    ].apply(lambda x: x.map("{:.2%}".format))
    heat1 = go.Heatmap(
        x=df["x"],
        y=df["y"],
        z=df["target_by_row"],
        name="target",
        zmin=0,
        zmax=1,
        autocolorscale=False,
        colorscale=["white", "#A7A7A7", "#7B7B7B", "#666666"],
        showscale=False,
        customdata=customdata,
        hovertemplate=hovertemplate,
    )
    heat2 = go.Heatmap(
        x=df["x"],
        y=df["y"],
        z=df["synthetic_by_row"],
        name="synthetic",
        zmin=0,
        zmax=1,
        autocolorscale=False,
        colorscale=["white", "#81EAC3", "#43E0A5", "#24DB96"],
        showscale=False,
        customdata=customdata,
        hovertemplate=hovertemplate,
    )
    fig.add_trace(heat1, row=1, col=1)
    fig.add_trace(heat2, row=1, col=2)
    fig.show(config= dict(displayModeBar = False))

### Univariate Plots

In [None]:
# plot all empirical univariate distributions, and their accuracy
for idx, row in acc_uni.sample(n=5, random_state=0).iterrows():
    plot_univariate(tgt_bin, syn_bin, row['Column'], row['Accuracy'])
    print("")

### Bivariate Plots

In [None]:
for idx, row in acc_biv.sample(n=5, random_state=0).iterrows():
    plot_bivariate(tgt_bin, syn_bin, row['Column'], row['Column 2'], row['Accuracy'])
    print("")

## Privacy Calculation

To gauge the privacy risk of the generated synthetic data, we calculate the distances between the synthetic samples and their "nearest neighbor", i.e., their most similar record, from the original dataset. We then compare these distances to the same distances calculated for the original dataset itself. We expect that the synthetic samples are not systematically any closer to the original, than the original samples are to each other.

In [None]:
from sklearn.compose import make_column_transformer
from sklearn.neighbors import NearestNeighbors
from sklearn.preprocessing import OneHotEncoder
from sklearn.impute import SimpleImputer

no_of_records = min(tgt.shape[0] // 2, syn.shape[0], 10_000)
tgt = tgt.sample(n=2 * no_of_records)
trn = tgt.head(no_of_records)
hol = tgt.tail(no_of_records)
syn = syn.sample(n=no_of_records)

string_cols = trn.select_dtypes(exclude=np.number).columns
numeric_cols = trn.select_dtypes(include=np.number).columns
transformer = make_column_transformer(
    (SimpleImputer(missing_values=np.nan, strategy='mean'), numeric_cols),
    (OneHotEncoder(), string_cols),
    remainder="passthrough",
)
transformer.fit(pd.concat([trn, hol, syn], axis=0))
trn_hot = transformer.transform(trn)
hol_hot = transformer.transform(hol)
syn_hot = transformer.transform(syn)

# calculcate distances to nearest neighbors
index = NearestNeighbors(n_neighbors=2, algorithm="brute", metric="l2", n_jobs=-1)
index.fit(trn_hot)
# k-nearest-neighbor search for both training and synthetic data, k=2 to calculate DCR + NNDR
dcrs_hol, _ = index.kneighbors(hol_hot)
dcrs_syn, _ = index.kneighbors(syn_hot)
dcrs_hol = np.square(dcrs_hol)
dcrs_syn = np.square(dcrs_syn)

In [None]:
dcr_bound = np.maximum(np.quantile(dcrs_hol[:, 0], 0.95), 1e-8)
ndcr_hol = dcrs_hol[:,0]/dcr_bound
ndcr_syn = dcrs_syn[:,0]/dcr_bound
print(f"Normalized DCR 5-th percentile original  {np.percentile(ndcr_hol, 5):.3f}")
print(f"Normalized DCR 5-th percentile synthetic {np.percentile(ndcr_syn, 5):.3f}")

In [None]:
print(f"NNDR 5-th percentile original  {np.percentile(dcrs_hol[:,0]/dcrs_hol[:,1], 5):.3f}")
print(f"NNDR 5-th percentile synthetic {np.percentile(dcrs_syn[:,0]/dcrs_syn[:,1], 5):.3f}")