# A short demo of Tumult Analytics

This notebook gives a quick tour of Tumult Analytics, a differential privacy framework built on Spark :-)

First, we import what we'll need for this demo, and load some data in Spark.

In [None]:
import os
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from math import floor
from pyspark import SparkFiles
from pyspark.sql import SparkSession
from tmlt.analytics.binning_spec import BinningSpec
from tmlt.analytics.constraints import MaxGroupsPerID, MaxRowsPerGroupPerID, MaxRowsPerID
from tmlt.analytics.keyset import KeySet
from tmlt.analytics.privacy_budget import PureDPBudget, RhoZCDPBudget
from tmlt.analytics.protected_change import AddOneRow, AddRowsWithID
from tmlt.analytics.query_builder import QueryBuilder
from tmlt.analytics.session import Session, ColumnType

# silence some Spark warnings
import warnings
warnings.simplefilter('ignore', UserWarning)
warnings.simplefilter('ignore', ResourceWarning)
warnings.simplefilter('ignore', FutureWarning)
spark = SparkSession.builder.config('spark.driver.memory', '4g').getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

In [None]:
# download the data
spark.sparkContext.addFile(
    "https://tumult-public.s3.amazonaws.com/library-members.csv"
)
members_df = spark.read.csv(
    SparkFiles.get("library-members.csv"), header=True, inferSchema=True
)

This dataset lists the members of a fictional public library. Let's take a look.

In [None]:
display(members_df.limit(10).toPandas())

## How many members are in our public library?

First, we initialize a Tumult Analytics **Session**.

In [None]:
session = Session.from_dataframe(
    source_id="members",
    dataframe=members_df,                   # our data
    protected_change=AddOneRow(),           # what we protect in our data
    privacy_budget=PureDPBudget(epsilon=1), # what privacy budget we protect it with
)

Second, we compute the simplest statistic there is: how many people are in our data?

We do this in two steps:
- **building** the query;
- and **evaluating** our query on our Session.

In [None]:
total_count_query = QueryBuilder("members").count()

total_count = session.evaluate(total_count_query, PureDPBudget(0.4))
total_count.show()
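
The result is a noisy count, not the exact one. For intuition only, here is a minimal sketch of the textbook Laplace mechanism for a count. It is not a description of how Tumult Analytics computes its answer; the helper names and the `true_count` value are made up for illustration.

```python
import random

def laplace_noise(scale: float, rng: random.Random) -> float:
    """Sample Laplace(0, scale) as the difference of two exponential draws."""
    return rng.expovariate(1 / scale) - rng.expovariate(1 / scale)

def noisy_count(true_count: int, epsilon: float, rng: random.Random) -> float:
    # Adding or removing one row changes a count by at most 1 (sensitivity 1),
    # so the textbook Laplace mechanism uses scale = 1 / epsilon.
    return true_count + laplace_noise(1 / epsilon, rng)

rng = random.Random(0)  # fixed seed so the sketch is reproducible
true_count = 50_000     # hypothetical value, not the real size of the dataset
samples = [noisy_count(true_count, epsilon=0.4, rng=rng) for _ in range(20_000)]
average = sum(samples) / len(samples)
print(f"one noisy answer: {samples[0]:.1f}, average of many: {average:.1f}")
```

Each individual answer is off by a few units, but the noise is centered on the true value. A smaller epsilon means a larger noise scale.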

That was easy! How much budget do we have left?

In [None]:
session.remaining_privacy_budget
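
Under pure differential privacy, the epsilons of successive queries add up, so spending 0.4 out of a total of 1 should leave 0.6. The toy accountant below sketches that bookkeeping; `PureDPAccountant` is a hypothetical class written for this example, not part of Tumult Analytics.

```python
class PureDPAccountant:
    """Toy accountant: under pure DP, the epsilons of successive queries add up."""

    def __init__(self, total_epsilon: float) -> None:
        self.total = total_epsilon
        self.spent = 0.0

    def spend(self, epsilon: float) -> None:
        # Refuse any query that would exceed the total budget.
        if self.spent + epsilon > self.total + 1e-12:
            raise ValueError("not enough privacy budget left")
        self.spent += epsilon

    @property
    def remaining(self) -> float:
        return self.total - self.spent

accountant = PureDPAccountant(total_epsilon=1.0)
accountant.spend(0.4)  # same amount as the count query above
print(round(accountant.remaining, 6))
```

Asking this accountant for more than what remains raises an error instead of running the query, which is the behavior we want from any budget tracker.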

## Which authors are most popular?

Now for something a little more complicated: finding which authors are most popular.

Let's download two more datasets: 
- `checkouts_df` lists the books borrowed by library members.
- `books_df` lists all the books that the library holds.

In [None]:
spark.sparkContext.addFile(
    "https://tumult-public.s3.amazonaws.com/checkout-logs.csv"
)
spark.sparkContext.addFile(
    "https://tumult-public.s3.amazonaws.com/library_books.csv"
)
checkouts_df = spark.read.csv(
    SparkFiles.get("checkout-logs.csv"), header=True, inferSchema=True
)
books_df = spark.read.csv(
    SparkFiles.get("library_books.csv"), header=True, inferSchema=True
)

Let's take a look at the checkouts dataset.

In [None]:
display(checkouts_df.limit(5).toPandas())

Let's load it in a Session, using **privacy IDs** to protect all the contributions of each library member.

In [None]:
session = Session.from_dataframe(
    source_id="checkouts",
    dataframe=checkouts_df,                   
    protected_change=AddRowsWithID("member_id"), # protecting all the contributions of any single member
    privacy_budget=PureDPBudget(epsilon=1),
)

And we can now see who the most popular authors are, using a group-by query with **public group-by keys**.

In [None]:
# public group-by keys
keyset = KeySet.from_dataframe(
    books_df.select("author"),
)
count_query = (
    QueryBuilder("checkouts")
    .enforce(MaxRowsPerID(20))
    .groupby(keyset)
    .count()
)
author_counts = session.evaluate(count_query, PureDPBudget(1))
top_five_authors = author_counts.sort("count", ascending=False).limit(5)
display(top_five_authors.toPandas())
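
The `MaxRowsPerID(20)` constraint bounds how many rows each member can contribute, which in turn bounds how much a single member can change the counts. The sketch below illustrates the idea on toy data by keeping the first rows seen for each ID. The function name and the data are hypothetical, and Tumult Analytics may choose which rows to keep differently.

```python
from collections import defaultdict

def truncate_rows_per_id(rows, id_column, max_rows):
    """Keep at most `max_rows` rows for each value of `id_column`."""
    kept, seen = [], defaultdict(int)
    for row in rows:
        if seen[row[id_column]] < max_rows:
            seen[row[id_column]] += 1
            kept.append(row)
    return kept

# Hypothetical toy data: member 1 has 5 checkouts, member 2 has 1.
rows = [{"member_id": 1, "author": f"Author {i}"} for i in range(5)]
rows.append({"member_id": 2, "author": "Author 0"})

truncated = truncate_rows_per_id(rows, "member_id", max_rows=3)
print(len(truncated))  # 3 rows kept for member 1, plus 1 row for member 2
```

A lower bound drops more data from heavy readers but needs less noise; a higher bound keeps more data but needs more noise.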

## What is the median age of readers of each literary genre?

The `checkouts_df` table has data about book checkouts, while `members_df` has demographic information about members. 

To find correlations between literary genres and the age of the people who read them, we need to load **multiple tables** in our Session. We will also use **zero-concentrated differential privacy** as our privacy notion, to get a better privacy/accuracy trade-off.

In [None]:
session_budget = RhoZCDPBudget(rho=0.5) # zero-concentrated differential privacy budget
id_space = "members_id_space"           # indicating that both tables share an identifier
session = (
    Session.Builder()
    .with_privacy_budget(session_budget)
    .with_id_space(id_space)
    .with_private_dataframe(
        "checkouts",
        checkouts_df,
        protected_change=AddRowsWithID( # protecting arbitrarily many rows with the same identifier…
            id_column="member_id",
            id_space=id_space,
        ),
    )
    .with_private_dataframe(
        "members",
        members_df,
        protected_change=AddRowsWithID( # … in both tables.
            id_column="id",
            id_space=id_space,
        ),
    )
    .build()
)
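
To get a feel for what `rho=0.5` means, the sketch below uses the standard conversion from zero-concentrated DP to approximate DP: a rho-zCDP guarantee implies (rho + 2·sqrt(rho·ln(1/delta)), delta)-DP. The function name and the choice of delta are ours, picked only for illustration. The sketch also checks that the two queries run later in this notebook (rho = 0.1 and rho = 0.4) fit in the Session budget, since rho values add up under composition.

```python
import math

def zcdp_to_approx_dp_epsilon(rho: float, delta: float) -> float:
    """Standard bound: rho-zCDP implies (rho + 2*sqrt(rho*ln(1/delta)), delta)-DP."""
    return rho + 2 * math.sqrt(rho * math.log(1 / delta))

session_rho = 0.5
planned_queries = {"median age by genre": 0.1, "genre by age group": 0.4}

# Under zCDP, the rho values of successive queries simply add up.
total_spent = sum(planned_queries.values())
epsilon = zcdp_to_approx_dp_epsilon(session_rho, delta=1e-6)  # delta is an arbitrary example
print(f"total rho spent: {total_spent:.2f} out of {session_rho}")
print(f"equivalent epsilon at delta=1e-6: {epsilon:.2f}")
```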

Each book can have *multiple* genres: we will expand the `"checkouts"` table to put each genre on its own row, using a **flat map**, and create a **view** with the result.

In [None]:
expand_genres = (
    QueryBuilder("checkouts").flat_map(
        lambda row: [{"genre": genre} for genre in row["genres"].split(",")],
        {"genre": ColumnType.VARCHAR},
        augment=True,
    )
)

session.create_view(expand_genres, "checkouts_single_genre", cache=True)
print(f"Private dataframes: {session.private_sources}")
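
To see what the flat map does to each row, here is a plain-Python sketch of the same transformation on two made-up rows. `flat_map_augment` is a hypothetical helper that mimics the effect of `augment=True`: each output row keeps the input columns and gains the new `genre` column.

```python
def flat_map_augment(rows, f):
    """Apply f to each row; emit one output row per result, keeping the input columns."""
    return [{**row, **new_columns} for row in rows for new_columns in f(row)]

# Hypothetical rows shaped like the checkouts table (only two columns shown).
checkouts = [
    {"member_id": 1, "genres": "Mystery/thriller/crime,History"},
    {"member_id": 2, "genres": "Romance"},
]
expanded = flat_map_augment(
    checkouts,
    lambda row: [{"genre": genre} for genre in row["genres"].split(",")],
)
for row in expanded:
    print(row["member_id"], row["genre"])
```

The first checkout lists two genres, so it becomes two rows; the second stays a single row.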

Then, we **join** the table we just created with the `"members"` table.

In [None]:
private_join = (
    QueryBuilder("members")
    .rename({"id": "member_id"})
    .join_private("checkouts_single_genre")
)
session.create_view(private_join, "checkouts_joined", cache=True)
print(f"Private dataframes: {session.private_sources}")

Finally, we can select a few genres, and compute the **median** age of readers of each genre.

In [None]:
genres = [
    "Mystery/thriller/crime",
    "History",
    "Romance",
    "Fantasy",
    "Classics/Literature",
    "Children",
]
median_age_by_genre_query = (
    QueryBuilder("checkouts_joined")
    .enforce(MaxRowsPerID(10))
    .groupby(KeySet.from_dict({"genre": genres}))
    .median("age", low=0, high=100)
)
median_age_by_genre = session.evaluate(median_age_by_genre_query, RhoZCDPBudget(rho=0.1)).toPandas()

In [None]:
# sort once, so that the bars and their tick labels use the same order
median_age_sorted = median_age_by_genre.sort_values("age_median")
ax = sns.barplot(
    x="genre",
    y="age_median",
    data=median_age_sorted,
    color="#1f77b4"
)
ax.set(xlabel="Genre", ylabel="Median age", title="Median age by genre")
ax.set_xticklabels(
    median_age_sorted["genre"], rotation=45, horizontalalignment="right"
)
plt.show()

## How does reading behavior vary with age?

The information about median age is somewhat coarse. Let's get a clearer picture, by looking at what genres people read most depending on their age.

To do so, we will first **bin** our data into age ranges.

In [None]:
# define age groups, with bin edges at [0, 20, 40, ..., 100]
age_binspec = BinningSpec(bin_edges=[20 * i for i in range(0, 6)])
binned_age_genre_keys = KeySet.from_dict({
    "binned_age": age_binspec.bins(),
    "genre": genres,
})
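
As a rough picture of what binning does, the sketch below maps an age to a bin label in plain Python. It assumes bins that are closed on the right, with the first bin also including its lower edge. The label format and the error for out-of-range ages are choices made for this sketch, so check `age_binspec.bins()` for the labels actually used.

```python
from bisect import bisect_left

bin_edges = [20 * i for i in range(0, 6)]  # [0, 20, 40, 60, 80, 100]

def bin_label(age: int) -> str:
    """Return the label of the bin containing `age`, assuming right-closed bins."""
    if not bin_edges[0] <= age <= bin_edges[-1]:
        raise ValueError(f"age {age} is outside the binning range")
    # bisect_left finds the first edge >= age; the lowest edge belongs to the first bin.
    upper = max(bisect_left(bin_edges, age), 1)
    opening = "[" if upper == 1 else "("
    return f"{opening}{bin_edges[upper - 1]}, {bin_edges[upper]}]"

for age in (0, 20, 21, 100):
    print(age, "->", bin_label(age))
```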

And we can now run our query, using **advanced truncation** to optimize the differential privacy mechanism.

In [None]:
genre_by_age_query = (
    QueryBuilder("checkouts_joined")
    .bin_column("age", age_binspec, name="binned_age") # bin the data
    .enforce(MaxGroupsPerID("binned_age", 1))          # each person has a single age
    .enforce(MaxGroupsPerID("genre", 6))               # we are only considering 6 genres in our analysis
    .enforce(MaxRowsPerGroupPerID("genre", 1))         # we only want to count each person once per genre…
    .enforce(MaxRowsPerGroupPerID("binned_age", 6))    # … so each person will appear in at most 6 rows 
    .groupby(binned_age_genre_keys)
    .count()
)
genre_by_age = session.evaluate(genre_by_age_query, RhoZCDPBudget(rho=0.4)).toPandas()

In [None]:
# convert binned_age to categorical
genre_by_age["binned_age"] = pd.Categorical(genre_by_age["binned_age"], age_binspec.bins())
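# sum only the "count" column: also summing the string-valued "genre" column
# would create a "genre" column that collides with the join below
age_counts = (
    genre_by_age.groupby("binned_age")["count"].sum().rename("age_count")
)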
# compute percentage of each genre in each age group, replace negative values with 0
genre_by_age_pct = genre_by_age.join(age_counts, on="binned_age")
genre_by_age_pct["pct"] = genre_by_age_pct["count"] / genre_by_age_pct["age_count"] * 100
genre_by_age_pct["pct"] = genre_by_age_pct["pct"].clip(lower=0)

ax = sns.barplot(
    x="binned_age",
    y="pct",
    order=age_binspec.bins(),
    hue="genre",
    data=genre_by_age_pct,
)
ax.set(xlabel="Age Group", ylabel="Genre (%)", title="Reading Preferences by Age")
sns.move_legend(ax, "upper left", bbox_to_anchor=(1, 1), ncol=1, title="Genre")