# WMF Demo

This is the notebook for the WMF demo of Tumult Analytics.

## Installation instructions

First, follow the [installation instructions for Tumult Analytics](https://docs.tmlt.dev/analytics/latest/howto-guides/installation.html).

Then, install two packages used to display the graphs in this notebook:

```
pip install matplotlib
pip install seaborn
```

In [None]:
from pyspark import SparkFiles
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, concat, expr
from pyspark.sql.functions import abs as pyabs

from tmlt.analytics import (
    AddRowsWithID,
    KeySet,
    MaxGroupsPerID,
    MaxRowsPerGroupPerID,
    MaxRowsPerID,
    PureDPBudget,
    QueryBuilder,
    RhoZCDPBudget,
    Session,
)
from tmlt.core.utils.exact_number import ExactNumber


#### Data

In [None]:
import sympy as sp
def epsilon_delta_to_rho_conversion(epsilon, delta):
    exact_epsilon = ExactNumber.from_float(epsilon, round_up=True).expr
    exact_delta = ExactNumber.from_float(delta, round_up=True).expr
    rho = float(-2 * sp.sqrt(-sp.log(exact_delta)) * sp.sqrt(exact_epsilon - sp.log(exact_delta))
        - 2 * sp.log(exact_delta)
        + exact_epsilon)
    return rho

In [None]:
spark = SparkSession.builder.getOrCreate()

In [None]:
spark.sparkContext.addFile(
    "https://gitlab.com/tumult-labs/demo-data/-/raw/main/demos/wmf/v1/wmf_countries.csv"
)
spark.sparkContext.addFile(
    "https://gitlab.com/tumult-labs/demo-data/-/raw/main/demos/wmf/v1/wmf_input_250_k.csv"
)
spark.sparkContext.addFile(
    "https://gitlab.com/tumult-labs/demo-data/-/raw/main/demos/wmf/v1/wmf_public_page_views.csv"
)

In [None]:
wmf_df = spark.read.csv(
    SparkFiles.get("wmf_input_250_k.csv"),
    header=True,
    inferSchema=True
)

grouping_column = concat(col("project"), col("pageid"), col("country"))
wmf_df = wmf_df.withColumn("grouping_column", grouping_column)
wmf_df.show()


#### Ground truth calculations

In [None]:
ground_truth_df = wmf_df.groupBy("project", "pageid", "country").agg(
    {"actor_signature": "count"}
).withColumnRenamed("count(actor_signature)", "true_counts").orderBy("true_counts", ascending=False).alias("ground_truth")

ground_truth_df.show()


#### Error calculations

In [None]:

from pyspark.sql.functions import when, isnull, abs, round, lit, col, expr, greatest

def calculate_median_relative_error(ground_truth_df, dp_df, join_columns, true_values, dp_values):
  """Calculates the median relative error between two columns in two joined DataFrames.

  Args:
    ground_truth_df (DataFrame): The first DataFrame to join.
    dp_df (DataFrame): The second DataFrame to join.
    join_columns (List[str]): The columns to join the DataFrames on.
    true_values (str): The column in the first DataFrame to calculate the relative error for.
    dp_values (str): The column in the second DataFrame to calculate the relative error for.
  """

  combined_df = ground_truth_df.join(dp_df, on=join_columns, how="right")
  magic_percentile = expr('percentile_approx(relerror * 100, 0.5)')
  mape = combined_df.withColumn("abserror", col(true_values) - col(dp_values)).withColumn("relerror", abs(col("abserror")/greatest(col(true_values), lit(1)))).agg(magic_percentile.alias("median_perc_error")).collect()[0]['median_perc_error']
  return map

In [None]:
def spurious_rate(ground_truth_df, dp_df, join_columns, true_values, dp_values):
  """Calculates spurious rate between two columns in two joined DataFrames.

  Args:
    ground_truth_df (DataFrame): The first DataFrame to join.
    dp_df (DataFrame): The second DataFrame to join.
    join_columns (List[str]): The columns to join the DataFrames on.
    true_values (str): The column in the first DataFrame to calculate the spurious rate for.
    dp_values (str): The column in the second DataFrame to calculate the spurious rate for.
  """
  combined_df = ground_truth_df.join(dp_df, join_columns, "outer")
  total_published = dp_df.count()
  if total_published == 0:
    return float("nan")
  total_non_spurious_published = combined_df.dropna(
    subset=[true_values, dp_values]
  ).count()
  total_spurious_published = total_published - total_non_spurious_published
  return (total_spurious_published / total_published)*100

In [None]:
def calculate_total_published_ratio(dp_df):
  """Calculates the total published ratio compared to hypothetical past release.

  Args:
    dp_df (DataFrame): Differentially private results DataFrame.
  """
  return dp_df.count() / 10


#### Differential Privacy using Tumult Analytics


#### First run using pureDP

In [None]:
EPSILON = 5

# Levers (tunable)
KEYSET_THRESHOLD = 10
OUTPUT_THRESHOLD = 20
ROWS_PER_ID = 10

#  We will generate keysets that only include project x pageid combinations that are above a certain threshold.
public_page_views_df = (
    spark.read.csv(
        SparkFiles.get("wmf_public_page_views.csv"),
        header=True,
        inferSchema=True
    )
    .where(col("page_views") >= KEYSET_THRESHOLD)
    .select("project", "pageid")
)
countries_df = spark.read.csv(
    SparkFiles.get("wmf_countries.csv"),
    header=True,
    inferSchema=True
)
keyset = KeySet.from_dataframe(public_page_views_df) *  KeySet.from_dataframe(countries_df)

grouping_column = concat(col("project"), col("pageid"), col("country"))
new_keyset_df = keyset.dataframe().withColumn("grouping_column", grouping_column)
keyset = KeySet.from_dataframe(new_keyset_df)

keyset.dataframe().show()


puredp_sess = Session.from_dataframe(
    privacy_budget=PureDPBudget(EPSILON),
    source_id="wmf_synthetic_data",
    dataframe=wmf_df,
    protected_change=AddRowsWithID("actor_signature"),
)

query = (
    QueryBuilder("wmf_synthetic_data")
    .enforce(MaxRowsPerID(ROWS_PER_ID))
    .groupby(keyset)
    .count()
)

dp_df = puredp_sess.evaluate(query, PureDPBudget(EPSILON))
dp_df.show()

In [None]:
filtered_dp_df = dp_df.where(f"count >= {OUTPUT_THRESHOLD}")
mre = calculate_median_relative_error(ground_truth_df, filtered_dp_df, ["project", "pageid", "country"], "true_counts", "count")
sr = spurious_rate(ground_truth_df, filtered_dp_df, ["project", "pageid", "country"], "true_counts", "count")
total_published_ratio = calculate_total_published_ratio(filtered_dp_df)
print("median_relative_error",  mre, "%")
print("spurious_rate", sr, "%")
print("total_published_ratio", total_published_ratio)


#### Optimizations


#### Differentially Private answer using Tumult Analytics ZCDP

In [None]:
EPSILON = 5
DELTA = 1e-5
ROWS_PER_GROUP_PER_ID = 1

# Levers (tunable)
KEYSET_THRESHOLD = 10
OUTPUT_THRESHOLD = 20
GROUPS_PER_ID = 10
RHO = epsilon_delta_to_rho_conversion(EPSILON, DELTA)

#  We will generate keysets that only include project x pageid combinations that are above a certain threshold.
public_page_views_df = (
    spark.read.csv(
        SparkFiles.get("wmf_public_page_views.csv"),
        header=True,
        inferSchema=True
    )
    .where(col("page_views") >= KEYSET_THRESHOLD)
    .select("project", "pageid")
)
countries_df = spark.read.csv(
    SparkFiles.get("wmf_countries.csv"),
    header=True,
    inferSchema=True
)
keyset = KeySet.from_dataframe(public_page_views_df) *  KeySet.from_dataframe(countries_df)

grouping_column = concat(col("project"), col("pageid"), col("country"))
new_keyset_df = keyset.dataframe().withColumn("grouping_column", grouping_column)
keyset = KeySet.from_dataframe(new_keyset_df)

keyset.dataframe().show()

zcdp_sess = Session.from_dataframe(
    privacy_budget=RhoZCDPBudget(RHO),
    source_id="wmf_synthetic_data",
    dataframe=wmf_df,
    protected_change=AddRowsWithID("actor_signature"),
)

query = (
    QueryBuilder("wmf_synthetic_data")
    .enforce(MaxGroupsPerID("grouping_column", GROUPS_PER_ID)) #20
    .enforce(MaxRowsPerGroupPerID("grouping_column", ROWS_PER_GROUP_PER_ID))
    .groupby(keyset)
    .count()
)
dp_df = zcdp_sess.evaluate(query, RhoZCDPBudget(RHO))
dp_df.show()

In [None]:
filtered_dp_df = dp_df.where(f"count >= {OUTPUT_THRESHOLD}")
mre = calculate_median_relative_error(ground_truth_df, filtered_dp_df, ["project", "pageid", "country"], "true_counts", "count")
sr = spurious_rate(ground_truth_df, filtered_dp_df, ["project", "pageid", "country"], "true_counts", "count")
total_published_ratio = calculate_total_published_ratio(filtered_dp_df)
print("median_relative_error",  mre, "%")
print("spurious_rate", sr, "%")
print("total_published_ratio", total_published_ratio)

#### Optimization - tuning parameters

In [None]:
import pandas as pd

import sympy as sp
from tmlt.core.utils.parameters import calculate_noise_scale
from tmlt.core.measures import RhoZCDP

keyset_thresholds = [10]
groups_per_ids = [10]

def grid_search(epsilon_deltas, output_thresholds):
    zcdp_sess = Session.from_dataframe(
        privacy_budget=RhoZCDPBudget(float("inf")),
        source_id="wmf_synthetic_data",
        dataframe=wmf_df,
        protected_change=AddRowsWithID("actor_signature"),
    )

    results = []

    for epsilon, delta in epsilon_deltas:
        rho = epsilon_delta_to_rho_conversion(epsilon, delta)

        for keyset_threshold in keyset_thresholds:
            public_page_views_df = (
                spark.read.csv(
                    SparkFiles.get("wmf_public_page_views.csv"),
                    header=True,
                    inferSchema=True
                )
                .where(col("page_views") >= KEYSET_THRESHOLD)
                .select("project", "pageid")
            )
            countries_df = spark.read.csv(
                SparkFiles.get("wmf_countries.csv"),
                header=True,
                inferSchema=True
            )
            keyset = KeySet.from_dataframe(public_page_views_df) *  KeySet.from_dataframe(countries_df)

            grouping_column = concat(col("project"), col("pageid"), col("country"))
            new_keyset_df = keyset.dataframe().withColumn("grouping_column", grouping_column)
            keyset = KeySet.from_dataframe(new_keyset_df)

            for groups_per_id in groups_per_ids:
                query = (
                    QueryBuilder("wmf_synthetic_data")
                    .enforce(MaxGroupsPerID("grouping_column", groups_per_id))
                    .enforce(MaxRowsPerGroupPerID("grouping_column", ROWS_PER_GROUP_PER_ID))
                    .groupby(keyset)
                    .count()
                )
                dp_df = zcdp_sess.evaluate(query, RhoZCDPBudget(rho))

                for output_threshold in output_thresholds:
                    filtered_dp_df = dp_df.where(f"count >= {output_threshold}")
                    mre = calculate_median_relative_error(ground_truth_df, filtered_dp_df, ["project", "pageid", "country"], "true_counts", "count")
                    sr = spurious_rate(ground_truth_df, filtered_dp_df, ["project", "pageid", "country"], "true_counts", "count")
                    total_published_ratio = calculate_total_published_ratio(filtered_dp_df)
                    results.append({
                        "epsilon": epsilon,
                        "rho": rho,
                        "keyset_threshold": keyset_threshold,
                        "output_threshold": output_threshold,
                        "groups_per_id": groups_per_id,
                        "median_relative_error": mre,
                        "spurious_rate": sr,
                        "total_published_ratio": total_published_ratio,

                    })
    return results

In [None]:
# Tuning epsilon and output_threshold
epsilon_deltas = [(5, 1e-5), (4, 1e-5), (3, 1e-5), (2, 1e-5), (1, 1e-5) ]
output_thresholds = range(5, 85, 5)

results_sdf = grid_search(epsilon_deltas, output_thresholds)
display(results_sdf)

In [None]:
import pandas as pd

spark.sparkContext.addFile(
    "https://gitlab.com/tumult-labs/demo-data/-/raw/main/demos/wmf/v1/grid_report_2_new.csv"
)
results_sdf = spark.read.csv(
    SparkFiles.get("grid_report_2_new.csv"),
    header=True,
    inferSchema=True,
)

display(results_sdf.toPandas())

In [None]:
import numpy as np
import seaborn as sns
import matplotlib.colors as mcolors
import matplotlib.pyplot as plt
import pandas as pd

results_df = pd.DataFrame(results_sdf.toPandas())

metrics = ['median_relative_error', 'spurious_rate', 'total_published_ratio']
titles = ['Median Relative Error (%) with Max Groups Contributed = 10', 'Spurious Rate (%) with Max Groups Contributed = 10', 'Total Published Ratio with Max Groups Contributed = 10']
sns.set(font_scale=1.5)
sns.set_style("ticks")
sns.set_context("paper")

fig, axes = plt.subplots(1, 3, figsize=(18, 6))
for i, (metric, title) in enumerate(zip(metrics, titles)):
    ax = axes[i]
    heatmap_df = results_df.pivot(index="output_threshold", columns="epsilon", values=metric)
    heatmap_df = heatmap_df.fillna(1)
    heatmap_df = heatmap_df.round(2)
    heatmap_df = heatmap_df.iloc[::-1]
    if metric not in ["spurious_rate"]:
        norm = mcolors.LogNorm()  # Logarithmic scale
    else:
        norm = None
    sns.heatmap(heatmap_df, annot=True, fmt=".2f", cmap="Reds_r", ax=ax, norm=norm, cbar=False)
    ax.set_xlabel("Epsilon")
    ax.set_ylabel("Output Threshold")
    ax.set_title(title)
    # ax.invert_yaxis()
plt.tight_layout()
plt.show()


#### Differentially Private answer using Tumult Analytics ZCDP and finalized param values

In [None]:
EPSILON = 2
DELTA = 1e-5
ROWS_PER_GROUP_PER_ID = 1

# Levers (tunable)
KEYSET_THRESHOLD = 10
OUTPUT_THRESHOLD = 40
GROUPS_PER_ID = 10
RHO = epsilon_delta_to_rho_conversion(EPSILON, DELTA)

#  We will generate keysets that only include project x pageid combinations that are above a certain threshold.
public_page_views_df = (
    spark.read.csv(
        SparkFiles.get("wmf_public_page_views.csv"),
        header=True,
        inferSchema=True
    )
    .where(col("page_views") >= KEYSET_THRESHOLD)
    .select("project", "pageid")
)
countries_df = spark.read.csv(
    SparkFiles.get("wmf_countries.csv"),
    header=True,
    inferSchema=True
)
keyset = KeySet.from_dataframe(public_page_views_df) *  KeySet.from_dataframe(countries_df)

grouping_column = concat(col("project"), col("pageid"), col("country"))
new_keyset_df = keyset.dataframe().withColumn("grouping_column", grouping_column)
keyset = KeySet.from_dataframe(new_keyset_df)

keyset.dataframe().show()

zcdp_sess = Session.from_dataframe(
    privacy_budget=RhoZCDPBudget(RHO),
    source_id="wmf_synthetic_data",
    dataframe=wmf_df,
    protected_change=AddRowsWithID("actor_signature"),
)

query = (
    QueryBuilder("wmf_synthetic_data")
    .enforce(MaxGroupsPerID("grouping_column", GROUPS_PER_ID))
    .enforce(MaxRowsPerGroupPerID("grouping_column", ROWS_PER_GROUP_PER_ID))
    .groupby(keyset)
    .count()
)

dp_df = zcdp_sess.evaluate(query, RhoZCDPBudget(RHO))
dp_df.show()

In [None]:
filtered_dp_df = dp_df.where(f"count >= {OUTPUT_THRESHOLD}")
mre = calculate_median_relative_error(ground_truth_df, filtered_dp_df, ["project", "pageid", "country"], "true_counts", "count")
sr = spurious_rate(ground_truth_df, filtered_dp_df, ["project", "pageid", "country"], "true_counts", "count")
total_published_ratio = calculate_total_published_ratio(filtered_dp_df)
print("median_relative_error",  mre, "%")
print("spurious_rate", sr, "%")
print("total_published_ratio", total_published_ratio)

# License
SPDX-License-Identifier: Apache-2.0

Copyright Tumult Labs 2025