<a href="https://colab.research.google.com/github/takao-takenouchi/dp_tutorial/blob/main/DP_Part2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Part2: 応用（複数Contributionの対応）

# はじめに

Part2は、Part1に続いての応用編です。特に、Part1のはじめにで記載したGoogleが提供しているExampleを参考にして作成しています。

## 「Part2:応用（複数Contributionの対応）」の学習内容

Part2では以下の内容について学びます。
*   複数contributionへの対応方法、contributionの上限を設定
*   集計に応じたsensitibityの設計、事前集計によりsensitivityを減らす方法

# Contributionとは

Part1では、一人が1レコードでありました。しかし、実際のDBは１名が複数レコードに存在するケースはよくあります。このような、あるテーブルにある１名がどのくらいのレコードに存在するかをContributionという用語で表現しています。（表現例： 山田太郎のContributionは2である。テーブル全体でContributionは高々5である）

1名のユーザのプライバシーを保護するためには、その人がいる/いないに応じてsensitibityを考える必要があるため、contributionをどう考えるかは重要です。
もし、contribuionが無限大であるとすると、sensitibityが設定できず、DPを適用できません。そこで、Contributionに上限を設けるのが一つのやり方です。

例えば、ある1名があるECサイトで買い物するのは1日で高々4回程度とするのは、ある程度リーズナブルでしょう。
そして、それ以上の回数買い物するのは外れ値として削除して集計しても良いと思います。そうすれば、sensitivityを設定でき、DPを適用できるようになります。

なお、PyDPでは、`l0_sensitivity`の値で、このContributionの最大値を設定可能です。

# 分析シナリオ

今回は、ECサイトの収益データを集計する例を想定します。

このデータは、{来客者 VisitorID、入店時間、滞在時間、曜日}で構成されます。

集計者は、ECサイトの売上分析をしたい人（外部コンサル、外部の投資家など）とします。
コンサルであれば、このECサイトの売上をあげるために分析がしたく、投資家であればこのECサイトのポテンシャルを図りたいと思うでしょう。

集計のシナリオは以下の通りです。
* 集計者は、毎月や毎週の売上の推移を見たところ、売上が順調に伸びていないことに気づきました。そこで、**ある週に限定**して、細かく分析することにしました。（一般に分析とは、分割することや、比較することです。分割の方法としては、時間の観点や、計算式の観点が考えられます）
* ある週の売上は、時間の観点では、曜日に分割できます。また、売上高の観点では「売上 ＝ 一人当たりの平均売上 x 一人当たりの平均購入回数」のように分割できます。
* よって、最新の1週間のデータについて、以下のような集計をすることにしました。
 * 集計1：曜日毎の購入者数
 * 集計2：曜日毎の売上高

これらの集計結果から、曜日毎の一人当たりの売上単価もわかりますので、分析結果から何らかの施策検討もできる可能性があります。
* 例えば、もし、平日の売上単価が低いのであれば、現在行っているxx円以上は割引キャンペーンを平日限定にして割引率を高くすることで、平日の売上増を狙うなどが考えられます。
* 他の例では、もし、曜日で売上単価が変わらず、土日だけ来店者が多いなら、平日限定で広告を打ち購入者数を増やす手も考えられます。一般的に作業が平準化できると、効率（例：配送効率）が高くなり利益（売上-コスト）の向上が狙える可能性があります。（平準化していないと臨時バイトを雇ったり、配送遅延が起こり結果として効率が悪くなったり、コストが高くなったりすることがあります）


# サンプルコード



## 準備

In [None]:
!pip install opendp --break-system-packages

In [None]:
import opendp.prelude as dp

import math
import statistics as s
import pandas as pd
from collections import defaultdict, OrderedDict
import matplotlib.pyplot as plt
import numpy as np

# OpenDPの設定を有効化
dp.enable_features('contrib')

#### 処理のための設定

In [None]:
_epsilon = 0.5

### データ取得

VisitorIdが来場者（購入者）のIDであり、Money spentが購入額、Dayは曜日を意味し1〜7の数値である。

In [None]:
_day_visits = pd.read_csv('https://raw.githubusercontent.com/OpenMined/PyDP/dev/examples/Tutorial_2-restaurant_demo/week_data.csv', sep=",")
print("data set size: ", _day_visits.shape)
_day_visits.head()

## 集計1：曜日毎の購入者数 （Contributionの最大値を設定）

前提：
* 1名がECサイトで買い物するのは1日で大人で4回程度(Contributionが4)とする
* 1名は週に最大7回購入する可能性があるとする
* epsilonを0.5とする
 (ただし、1週間の7日ごとにepsilonを消費するのでこのレポート全体ではepsilonは3.5となる)
* なお、sensitivityはcountなので1である

コードの概要：
1. データ取得
2. 関数定義: 事前処理としてContributionを超えているものを削除する関数
3. 関数定義: DPなしでのcount集計する関数
4. 関数定義: DPありでのcount集計する関数
5. 関数定義: DPあり・なしの関数を呼び出す関数
6. DPあり、DPなしの関数を呼び出して、結果表示

### 関数定義：DPノイズを入れる前に、設定を超えるデータを削除する関数

In [None]:
def bound_visits_per_week(df, limit):
    """ 引数のDataFrame dfから、設定したContributuionの最大値である limit を満たす DataFrameを返すユーティリティ関数

    DataFrameの１列目のVisitorIdで来場者（購入者）を識別し、その人が、ある日(今回は1週間のデータなので、曜日（Day列）で日が決まる)において、
    何回Contributeしているかカウントし、limitを超えてない分を含んだ新たなDataFrameを作成し、
    それを返す。
    """
    # 新たなDataFrameを作成し、その中に制限内のデータを入れていく。
    updated_df = pd.DataFrame(columns=df.columns)
    # 新たなDataFrameのindex管理のための変数
    df_idx = 0

    # visiter_idの訪問者が何回訪問したかをカウントするdicronary。
    # visiterIDと訪問回数のマッピングしておき、訪問回数の最大値のlimitを超えていないか確認する
    id_to_visited = defaultdict(int)

    # 元のDataFrameの中身を見ていき、カウントしながら、limitを超えてなければ、新たなDataFrameに追加
    for index in df.index:
        if id_to_visited[df.loc[index]["VisitorId"]] < limit:
            updated_df.loc[df_idx] = df.loc[index]
            df_idx += 1
            id_to_visited[df.loc[index]["VisitorId"]] += 1

    return updated_df

### 関数定義： DPノイズを入れずに、週毎にカウントする関数

In [None]:
def get_non_private_counts_per_day() -> dict:
    """ 差分プライバシ無しでの曜日毎の訪問者数を計算する。

    曜日(1〜7)と利用者数がマッピングされたdictionaryを返す
    """
    day_counts = dict()

    for day in _day_visits["Day"].unique():
        day_counts[day] = _day_visits[_day_visits["Day"] == day][
            "Day"
        ].count()

    return day_counts

### 関数定義：DPノイズを入れ、週毎にカウントする関数

In [None]:
COUNT_MAX_CONTRIBUTED_DAYS = 4

def get_private_counts_per_day(epsilon: float = _epsilon) -> dict:
    """指定されたepsilonを満たす差分プライバシありでの曜日毎の訪問者数を計算する。

    曜日(1〜7)と利用者数がマッピングされたdictionaryを返す
    """
    # 事前処理：訪問者が貢献した日数をCOUNT_MAX_CONTRIBUTED_DAYSに制限する。この制限を超えたDataFrameのデータは削除されている。
    day_visits = bound_visits_per_week(_day_visits, COUNT_MAX_CONTRIBUTED_DAYS)

    day_counts = dict()

    # === OpenDPのパイプライン構築 ===
    # OpenDPでは >>演算子 でデータ処理のステップを順番につなげます
    # l0_sensitivity（最大Contribution数）を考慮したカウント計算

    # Step 1: 入力空間の定義
    # カウントは整数のベクトルを扱う
    input_space_count = (
        dp.vector_domain(dp.atom_domain(T=int)),
        dp.symmetric_distance()
    )

    # Step 2: Transformation（データ変換、ノイズなし）
    # 入力空間 >> then_count() でベクトルの要素数をカウント
    count_transformation = input_space_count >> dp.t.then_count()

    # Step 3: Measurement（ノイズ追加）
    # l0_sensitivity（Contribution）を考慮したスケールパラメータを計算
    # sensitivity = l0_sensitivity * 1（countのsensitivityは1）
    # scale = sensitivity / epsilon
    scale_count = float(COUNT_MAX_CONTRIBUTED_DAYS) / epsilon
    count_measurement = count_transformation >> dp.m.then_laplace(scale=scale_count)

    # 注意: 各曜日の集計は、上記で指定したepsilonを適用する
    for day in day_visits["Day"].unique():
        # OpenDPでは毎回新しいノイズが生成される（reset不要）
        data = day_visits[day_visits["Day"] == day]["Day"].astype(np.int32).to_list()
        noisy_count = count_measurement(data)
        # 結果を整数に丸める
        day_counts[day] = int(round(noisy_count))

    return day_counts

### 関数定義： DPノイズあり・なしのそれぞれの関数を呼ぶ関数

In [None]:
def count_visits_per_day() -> tuple:
        """一日の時間ごとの訪問回数を計算し、一日とその日の訪問回数を対応させた2つの辞書を返す。

         1つ目の辞書は差分プライバシーなしのカウント計算、2つ目はPyDPライブラリを使ったプライベート計算
        """
        non_private_counts = get_non_private_counts_per_day()
        private_counts = get_private_counts_per_day()

        return non_private_counts, private_counts

### countの集計を実行
集計結果は(曜日,count値)が、1週間分（7個）並んでいる。

In [None]:
# _epsilon = 0.5
# COUNT_MAX_CONTRIBUTED_DAYS = 4

np_count_day, p_count_day = count_visits_per_day()
np_count_day = OrderedDict(sorted(np_count_day.items()))
p_count_day = OrderedDict(sorted(p_count_day.items()))

print("Visits per day:")
print("-" * 50)
print(f"{'Day':<10} {'Without DP':<15} {'With DP':<15}")
print("-" * 50)
for day in np_count_day.keys():
    print(f"{int(day):<10} {int(np_count_day[day]):<15} {int(p_count_day[day]):<15}")
print("-" * 50)

グラフを表示するとわかりますが、今回の分析の目的は、曜日毎のある程度の傾向を知ることです。多少の誤差（ノイズ）が乗っていても、ノイズなしの場合と比較して、同様な傾向が読み取れると思います。

つまり、プライバシーを保護しつつ、必要な分析ができています。分析の目的に応じたdata minimizaion原則に従っているとも言えます。

In [None]:
ax = plt.subplot(111)
X = np.arange(len(np_count_day.keys()))
width = 0.35  # the width of the bars

ax.bar(X-width/2, np_count_day.values(), width=width, color='b', align='center')
ax.bar(X+width/2, p_count_day.values(), width=width, color='g', align='center')
ax.legend(('Without DP','With DP'))
ax.set_ylabel('Count')
ax.set_xlabel('Day of week')
plt.xticks(X, np_count_day.keys())
plt.title("Vistor count throughout the week", fontsize=17)
plt.show()

### 各自で行うこと
* 最大のcontributionを変えて試す
 * 最大のContributuonを増やすと： 事前処理でデータが消えないが、ノイズが大きい
 * 最大のContributuonを減らすと： ノイズを抑えられるが、事前処理でデータが消える
* eplisonを変えて試す

## 集計2: 曜日毎の売上高 （事前集計によるSensitivityの低下）

続いて、ある週における、曜日毎の売上数を計測します。
ここでは、集計によっては、事前集計することでsensitivityを減らすことができることを学びます。

先ほどはcountでしたが、今度は曜日毎の売上集計を行うので、sumです。
countのsensitityは1ですが、sumの場合のsensitivityはPart１で説明したとおり、Uper bound - Lower boundで決まります。

簡単に考えると、それで良いのですが、sumのような処理の場合は事前集計ができる場合もあるので、コード例を見て学びましょう。

## 集計2-1:事前集計しない例

まずは、事前集計しない例を示します。

先ほどのcountの時と同様に、１日あたり最大4回購入する前提なので、最大のcontributionを4とします。

sumのsensitivityのためには、ECサイトで1人1回あたりの最大の購入金額（売上）の下限と上限を決める必要があります。ここでは、Lowerが0、Upperが50が妥当そうです。

なお、上記の条件でsensitivityを考えると、contributionが1〜4なので、Lowerが0x1=0、Upperが4x50=200なので、sensitivityは200となります。

コードの概要：
1. 関数定義: DPなしでのsum集計する関数
2. 関数定義: DPありでのsum集計する関数
3. 関数定義: DPあり・なしの関数を呼び出す関数
4. DPあり、DPなしの関数を呼び出して、結果表示

### 関数定義： DPノイズを入れずに、週毎にSumする関数

先ほどはcountでしたが、今度はsumです。

In [None]:
def get_non_private_sum_revenue() -> dict:
    """Compute the revenue per day of visits without any differential privacy.

    Return a dictionary mapping days to revenue
    """
    day_revenue = dict()

    for day in _day_visits["Day"].unique():
        day_revenue[day] = _day_visits[_day_visits["Day"] == day][
            "Money spent (euros)"
        ].sum()

    return day_revenue

### 関数定義： DPノイズを入れて、週毎にSumする関数

先ほどはcountでしたが、今度はsumです。

In [None]:
SUM_MAX_CONTRIBUTED_DAYS = 4  # Contributionの最大値
MIN_EUROS_SPENT = 0.0  # Lower bound（floatとして明示）
MAX_EUROS_SPENT_1 = 50.0  # Upper bound（floatとして明示）

def get_private_sum_revenue(epsilon: float = _epsilon) -> dict:
    """指定されたepsilonを満たす差分プライバシありでの曜日毎の売上高を計算する。

    曜日(1〜7)と売上高がマッピングされたdictionaryを返す
    """
    # 事前処理：訪問者が貢献した日数をSUM_MAX_CONTRIBUTED_DAYSに制限する
    day_visits = bound_visits_per_week(_day_visits, SUM_MAX_CONTRIBUTED_DAYS)

    day_revenue = dict()

    # === OpenDPのパイプライン構築 ===
    # OpenDPでは >>演算子 でデータ処理のステップを順番につなげます
    # l0_sensitivity（最大Contribution数）を考慮したSum計算

    # Step 1: 入力空間の定義
    # バウンド付きのベクトルドメイン（float型、MIN_EUROS_SPENT～MAX_EUROS_SPENT_1の範囲）
    input_space_sum = (
        dp.vector_domain(dp.atom_domain(bounds=(MIN_EUROS_SPENT, MAX_EUROS_SPENT_1), T=float)),
        dp.symmetric_distance()
    )

    # Step 2: Transformation（データ変換、ノイズなし）
    # 入力空間 >> then_sum() でベクトルを合計値に変換
    # 例：[10.0, 20.0, 30.0] → 60.0
    sum_transformation = input_space_sum >> dp.t.then_sum()

    # Step 3: Measurement（ノイズ追加）
    # l0_sensitivity（Contribution）を考慮したスケールパラメータを計算
    # sensitivity = l0_sensitivity * (upper_bound - lower_bound)
    # 1人が最大SUM_MAX_CONTRIBUTED_DAYS回貢献でき、1回あたり最大MAX_EUROS_SPENT_1消費するため
    # scale = sensitivity / epsilon
    sensitivity = SUM_MAX_CONTRIBUTED_DAYS * (MAX_EUROS_SPENT_1 - MIN_EUROS_SPENT)
    scale_sum = float(sensitivity) / epsilon
    sum_measurement = sum_transformation >> dp.m.then_laplace(scale=scale_sum)

    # パイプライン全体を一度に書くこともできます：
    # sum_measurement = (
    #     input_space_sum
    #     >> dp.t.then_sum()
    #     >> dp.m.then_laplace(scale=scale_sum)
    # )

    for day in day_visits["Day"].unique():
        # データを取得してクリップ（境界値内に収める）
        data = list(day_visits[day_visits["Day"] == day]["Money spent (euros)"])
        data_clipped = np.clip(data, MIN_EUROS_SPENT, MAX_EUROS_SPENT_1).tolist()

        # OpenDPでは毎回新しいノイズが生成される（reset不要）
        # パイプライン： data_clipped >> then_sum() >> then_laplace() >> 結果
        noisy_sum = sum_measurement(data_clipped)
        # 結果を整数に丸める
        day_revenue[day] = int(round(noisy_sum))

    return day_revenue

### 関数定義： DPノイズあり・なしのそれぞれの関数を呼ぶ関数

In [None]:
def sum_revenue_per_day() -> tuple:
    non_private_sum = get_non_private_sum_revenue()
    private_sum = get_private_sum_revenue()

    return non_private_sum, private_sum

### 計算を実行

In [None]:
np_sum_day, p_sum_day = sum_revenue_per_day()
np_sum_day = OrderedDict(sorted(np_sum_day.items()))
p_sum_day = OrderedDict(sorted(p_sum_day.items()))

print("Revenue per day:")
print("-" * 50)
print(f"{'Day':<10} {'Without DP':<15} {'With DP':<15}")
print("-" * 50)
for day in np_sum_day.keys():
    print(f"{int(day):<10} {int(np_sum_day[day]):<15} {int(p_sum_day[day]):<15}")
print("-" * 50)

In [None]:
ax = plt.subplot(111)
X = np.arange(len(np_sum_day.keys()))
width = 0.35  # the width of the bars
ax.bar(X-width/2, np_sum_day.values(), width=width, color='b', align='center')
ax.bar(X+width/2, p_sum_day.values(), width=width, color='g', align='center')
ax.legend(('Without DP','With DP'))
ax.set_ylabel('Revenue')
ax.set_xlabel('Day of week')
plt.xticks(X, np_sum_day.keys())
plt.title("Revenue throughout the week", fontsize=17)
plt.show()

## 集計2-2:事前集計する例 (Sensitivityを減らせる）

ここで、sensitivityを減らせる可能性がある事に気づきます。

実際のデータを見ると、ECサイトで１日に4回購入する人は、各回の購入額が少ないことに気づきます。つまり、Upperが4x50=200というのは現実的ではないということです。1日の最大合計は65ユーロであることがわかりました。

sensitivity計算のためのLowerとUpperを、ある人が１日に購入する合計額のLowerとUpperとすると、週毎の集計のUpperとLowerを減らせることができます。
そのためには事前集計をして、DPノイズを入れる前のレコードは1名のその日の合計額を出し、それに対してUpper,Lower,contributionを設定すれば良いことになります。

新たなプライバシーパラメータ：
* 1日の1名の合計購入額のLowerは 0 (先ほどと変わらず)
* 1日の1名の合計購入額のUpperは 65
* 1日の1名の合計購入額なので、１名しか入っていないので、Contributionは1

### 関数定義：事前集計をして、sumを計算しDPノイズを入れる関数

In [None]:
MAX_EUROS_SPENT_2 = 65.0  # 変更したUpper bound（floatとして明示）

def get_private_sum_revenue_with_preaggregation(
        epsilon: float = _epsilon
    ) -> dict:
    """事前集計してsensitivityを減らした差分プライバシありの曜日毎の売上高を計算する。

    曜日(1〜7)と売上高がマッピングされたdictionaryを返す
    """
    day_visits = bound_visits_per_week(_day_visits, SUM_MAX_CONTRIBUTED_DAYS)

    day_revenue = dict()

    # === OpenDPのパイプライン構築（事前集計版） ===
    # 事前集計により、1名1日あたりの合計購入額に対してDPを適用
    # これによりsensitivityを減らすことができる
    # l0_sensitivity = 1（1名は1日につき1つのレコードのみ）

    # Step 1: 入力空間の定義
    # バウンド付きのベクトルドメイン（float型、MIN_EUROS_SPENT～MAX_EUROS_SPENT_2の範囲）
    input_space_sum_preagg = (
        dp.vector_domain(dp.atom_domain(bounds=(MIN_EUROS_SPENT, MAX_EUROS_SPENT_2), T=float)),
        dp.symmetric_distance()
    )

    # Step 2: Transformation（データ変換、ノイズなし）
    # 入力空間 >> then_sum() でベクトルを合計値に変換
    sum_transformation_preagg = input_space_sum_preagg >> dp.t.then_sum()

    # Step 3: Measurement（ノイズ追加）
    # 事前集計により、l0_sensitivity = 1（1名1日につき1レコード）
    # sensitivity = 1 * (MAX_EUROS_SPENT_2 - MIN_EUROS_SPENT)
    # これは集計2-1の sensitivity = 4 * 50 = 200 よりも小さい 65 となる
    sensitivity_preagg = 1 * (MAX_EUROS_SPENT_2 - MIN_EUROS_SPENT)
    scale_sum_preagg = float(sensitivity_preagg) / epsilon
    sum_measurement_preagg = sum_transformation_preagg >> dp.m.then_laplace(scale=scale_sum_preagg)

    for day in day_visits["Day"].unique():
        # For each visitor, 曜日毎に事前の集計処理を行う
        visits_on_day = day_visits[day_visits["Day"] == day][
            ["VisitorId", "Money spent (euros)"]
        ]
        visitor_to_spending = dict()

        # 訪問者毎にその日の合計額を計算（事前集計）
        for visitor in visits_on_day["VisitorId"].unique():
            visitor_to_spending[visitor] = visits_on_day[
                visits_on_day["VisitorId"] == visitor
            ]["Money spent (euros)"].sum()

        # 訪問者毎の合計額のリストを作成
        spending = list(visitor_to_spending.values())

        # データをクリップ（境界値内に収める）
        spending_clipped = np.clip(spending, MIN_EUROS_SPENT, MAX_EUROS_SPENT_2).tolist()

        # OpenDPでは毎回新しいノイズが生成される（reset不要）
        # パイプライン： spending_clipped >> then_sum() >> then_laplace() >> 結果
        noisy_sum = sum_measurement_preagg(spending_clipped)
        # 結果を整数に丸める
        day_revenue[day] = int(round(noisy_sum))

    return day_revenue

### 関数定義： DPノイズあり・なしのそれぞれの関数を呼ぶ関数

In [None]:
def sum_revenue_per_day_with_preaggregation() -> tuple:
    non_private_sum = get_non_private_sum_revenue()
    private_sum = get_private_sum_revenue_with_preaggregation()

    return non_private_sum, private_sum

### 計算を実行

In [None]:
np_sum_day_pa, p_sum_day_pa = sum_revenue_per_day_with_preaggregation()
np_sum_day_pa = OrderedDict(sorted(np_sum_day_pa.items()))
p_sum_day_pa = OrderedDict(sorted(p_sum_day_pa.items()))

print("Revenue per day with preaggregation:")
print("-" * 50)
print(f"{'Day':<10} {'Without DP':<15} {'With DP':<15}")
print("-" * 50)
for day in np_sum_day_pa.keys():
    print(f"{int(day):<10} {int(np_sum_day_pa[day]):<15} {int(p_sum_day_pa[day]):<15}")
print("-" * 50)

In [None]:
ax = plt.subplot(111)
X = np.arange(len(np_sum_day_pa.keys()))
width = 0.35  # the width of the bars

ax.bar(X-width/2, np_sum_day_pa.values(), width=width, color='b', align='center')
ax.bar(X+width/2, p_sum_day_pa.values(), width=width, color='g', align='center')
ax.legend(('Without DP','With DP'))
ax.set_ylabel('Revenue')
ax.set_xlabel('Day of week')
plt.xticks(X, np_sum_day_pa.keys())
plt.title("Revenue with non-unique customer", fontsize=17)
plt.show()

先ほどよりもノイズが少ないように見えます

# まとめ

DPの応用として以下を学びました。
* 複数contributionへの対応方法、contributionの上限を設定
* 集計に応じたsensitibityの設計、事前集計によりsensitivityを減らす方法