# 在玉米 NIR 数据上的 ($\epsilon$, $\delta$)-差分隐私 PLS

作者: Ramin Nikzad-Langerodi

本笔记本包含了 Ramin Nikzad-Langerodi, Du Nguyen Duy, Mohit Kumar 和 Mathab Alghasi (2024) 发表的论文 "($\epsilon, \delta$)-Differentially Private Partial Least Squares Regression" 的配套代码。

In [1]:
# 导入
import numpy as np
import matplotlib.pyplot as plt
import scipy.io as sio
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error as rmse, r2_score
from sklearn.model_selection import GridSearchCV
from sklearn.cross_decomposition import PLSRegression
from chemotools.baseline import AirPls
from chemotools.scatter import MultiplicativeScatterCorrection
from chemotools.derivative import SavitzkyGolay

# EDPLS
from diPLSlib.models import EDPLS

# 关闭警告
import warnings
warnings.filterwarnings("ignore")

### 数据集
我们将使用来自 Eigenvector Research Inc. 的玉米 (Corn) 数据集。该数据集包含在不同红外光谱仪上记录的玉米样品的 NIR 光谱，以及水分、油、蛋白质和淀粉含量。该数据集可在 https://www.eigenvector.com/data/Corn/ 获得。

In [2]:
# 加载数据
data = sio.loadmat('../data/corn.mat')

# 光谱和参考值
X = data['m5spec'][0][0]['data']
y = data['propvals'][0][0]['data'][:, 0].reshape(-1, 1)
sy = np.std(y)

# 波长
wn = data['m5spec'][0][0][9][1][0][0]

### 准确度-隐私权衡

使用不同的 $\epsilon$ 值训练具有固定 LVs 数量的 $(\epsilon, \delta)$-差分隐私 PLS 模型。

In [None]:
# 准备工作
eps = [1, 10, 100]                             # epsilon 值
bo = 0                                         # 用于绘图的基线偏移
ncomps=10                                      # 组件数量

# 结果数组
EPS=[]

# 将数据分割为训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3)

plt.figure(figsize=(12, 3))
plt.subplot(131)

# 循环遍历 epsilon 值
for i in eps:
    
    m = EDPLS(ncomps, epsilon=i, delta=0.01)
    m.fit(X_train, y_train)

    # 计算 RMSEP
    ypred = m.predict(X_test)

    # 绘制回归系数
    plt.plot(wn, m.coef_ + bo)

    EPS.append(f'$\epsilon = {i:.0e}$')
    bo -= 5

plt.legend(EPS)
plt.xlabel('波长 (nm)')
plt.ylabel('系数')
plt.title('回归系数')


### 准确度-隐私权衡
plt.subplot(132)
# 定义重复次数
num_repetitions = 100

eps = [0.1, 1, 5, 10, 100, 250, 700]

# 初始化存储结果的数组
rmsep_results = np.zeros((num_repetitions, len(eps)))

# 重复实验 100 次
for rep in range(num_repetitions):
    
    # 将数据分割为训练集和测试集
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3)

    RMSEP = []
    for i, epsilon in enumerate(eps):
        m = EDPLS(ncomps, epsilon=epsilon, delta=0.01)
        m.fit(X_train, y_train)

        # 计算 RMSEP
        ypred = m.predict(X_test)
        rmsep = np.sqrt(rmse(y_test, ypred))/sy
        RMSEP.append(rmsep)

    # 存储本次重复的结果
    rmsep_results[rep, :] = RMSEP

# 计算均值和标准误差
rmsep_mean = np.mean(rmsep_results, axis=0)
rmsep_std = np.std(rmsep_results, axis=0) / np.sqrt(num_repetitions)

# 绘制结果
default_colors = plt.rcParams['axes.prop_cycle'].by_key()['color']
blue = default_colors[0]
orange = default_colors[1]

# 绘制 RMSEP
r = plt.errorbar(eps, rmsep_mean, yerr=rmsep_std, fmt='o-', capsize=5, label='RMSEP', color=blue, mec='k')
#r = plt.plot(eps, rmsep_mean, 'o-', label='RMSEP', color=blue, mec='k')
plt.xlabel('隐私损失 $\epsilon$')
plt.ylabel(f'RMSEP/$\sigma_y$')
plt.title('RMSEP vs. $\epsilon$')
plt.semilogx()
plt.semilogy()
plt.title('准确度-隐私权衡')


### 真实值 vs 预测值 (PLS 基线)
plt.subplot(133)

pls = PLSRegression(n_components=ncomps, scale=False)
pls.fit(X_train, y_train)
ypred = pls.predict(X_test)
plt.scatter(y_test, ypred, edgecolors='k', label='PLS/' + f'R2P =' + str(np.round(r2_score(y_test, ypred),3)))

# 计算 R2 分数
r2p = r2_score(y_test, ypred)	
rmsep = np.sqrt(rmse(y_test, ypred))

# 标注图表
plt.annotate(f'#LVs = {ncomps}', (0.05, 0.8), xycoords='axes fraction')
plt.annotate(f'R$^2$P = {r2p:.3f}', (0.05, 0.7), xycoords='axes fraction')
plt.annotate(f'RMSEP = {rmsep:.3f}', (0.05, 0.6), xycoords='axes fraction')

plt.plot([9.25, 11], [9.25, 11], 'k--')
plt.xlabel('测量值')
plt.ylabel('预测值')
plt.title('测量值 vs. 预测值')

plt.tight_layout()

plt.show()

$\epsilon$ 越小，隐私/噪声水平越高，但模型的准确度越低。右图显示了基线 PLS 模型（不带隐私保证）在测试集上的准确度。

### 给定 $\epsilon$ 和 $\delta$ 下 LVs 数量的优化

In [None]:
# 定义 epsilon 值的范围
epsilon_values = [1, 10, 100, 200, 500]
delta = 0.001

results = {}

model = EDPLS(8, epsilon=1, delta=delta)

# 定义参数网格
param_grid = {
    'A': np.arange(1, 20)
}

plt.figure(figsize=(12, 3))
plt.subplot(131)
# 循环遍历 epsilon 值
for epsilon in epsilon_values:

    # 使用当前 epsilon 初始化模型
    model = EDPLS(8, epsilon=epsilon, delta=delta)

    # 定义 GridSearchCV 对象
    gs = GridSearchCV(model, param_grid, cv=10, scoring='neg_root_mean_squared_error')

    # 拟合模型
    gs.fit(X_train, y_train)

    # 存储结果
    results[epsilon] = {
        'param_A': gs.cv_results_['param_A'],
        'mean_test_score': gs.cv_results_['mean_test_score'],
        'std_test_score': gs.cv_results_['std_test_score']
    }


# 绘制结果
for epsilon in epsilon_values:
    mean_rmse = np.sqrt(-results[epsilon]['mean_test_score'])
    std_rmse = results[epsilon]['std_test_score']
    epsilon_sci = "{:.1e}".format(epsilon)
    #plt.errorbar(results[epsilon]['param_A'], mean_rmse, yerr=std_rmse, fmt='o-', label=f'$\epsilon = {epsilon_sci}$', capsize=2.5, mec='k')
    plt.plot(results[epsilon]['param_A'], mean_rmse, 'o-', label=f'$\epsilon = {epsilon_sci}$', mec='k')

# 与 PLS 比较
param_grid = {
    'n_components': np.arange(1, 20)
}

pls = GridSearchCV(PLSRegression(scale=False), param_grid, cv=10, scoring='neg_root_mean_squared_error')
pls.fit(X_train, y_train)

# 存储结果
results_pls = {
    'param_n_components': pls.cv_results_['param_n_components'],
    'mean_test_score': pls.cv_results_['mean_test_score'],
    'std_test_score': pls.cv_results_['std_test_score']
}
mean_rmse = np.sqrt(-results_pls['mean_test_score'])
std_rmse = results_pls['std_test_score']
plt.errorbar(results_pls['param_n_components'], mean_rmse, yerr=std_rmse, fmt='x-', capsize=2.5, mec='k', label='PLS')

plt.xlabel('组件数量')
plt.ylabel('RMSECV')
plt.semilogy()
plt.title('RMSECV vs. 潜变量数量')
plt.legend(loc='upper right')

### epsilon = 200 的 EDPLS 模型
epsilon = 10
ncomps = int(np.argmin(-results[epsilon]['mean_test_score']) + 1)

m = EDPLS(ncomps, epsilon=epsilon, delta=delta)
m.fit(X_train, y_train)

# 计算 RMSEP
ypred = m.predict(X_test)
rmsep = np.sqrt(rmse(y_test, ypred))
r2p = r2_score(y_test, ypred)


# 绘制回归系数
plt.subplot(132)
plt.plot(wn, m.coef_)
plt.xlabel('波长 (nm)')
plt.ylabel('系数')
plt.title('回归系数')

# 测量值 vs. 预测值图
plt.subplot(133)
plt.scatter(y_test, ypred, edgecolors='k')

# 标注图表
plt.annotate(f'$\epsilon = {epsilon}$', (0.05, 0.9), xycoords='axes fraction')
plt.annotate(f'#LVs = {ncomps}', (0.05, 0.8), xycoords='axes fraction')
plt.annotate(f'R$^2$P = {r2p:.3f}', (0.05, 0.7), xycoords='axes fraction')
plt.annotate(f'RMSEP = {rmsep:.3f}', (0.05, 0.6), xycoords='axes fraction')

plt.plot([0, 1], [0, 1], 'k--', transform=plt.gca().transAxes)
plt.xlabel('测量值')
plt.ylabel('预测值')
plt.title('测量值 vs. 预测值')
plt.tight_layout()

由于隐私-准确度权衡度高，在原始玉米光谱上实现强隐私保证（即 $\epsilon \leq 1$）是困难的。

### 光谱预处理对隐私-准确度权衡的影响

In [None]:
# 创建预处理过的数据集
airpls = AirPls()                                                           # 基线校正
X_airpls = airpls.fit_transform(X)

msc = MultiplicativeScatterCorrection()                                     # 乘性散射校正
X_msc = msc.fit_transform(X)

sg = SavitzkyGolay(window_size=25, polynomial_order=2, derivate_order=2)    # Savitzky-Golay 二阶导数
X_sg = sg.fit_transform(X)

datasets = {'AirPLS': X_airpls, 'MSC': X_msc, 'SG': X_sg}

# 定义重复次数
num_repetitions = 10

eps = [0.1, 1, 1.5, 2, 5, 10]
delta = 0.05
ncomps = 7

plt.figure(figsize=(12, 3))
plt.subplot(131)

for dataset_name, X_transformed in datasets.items():
    # 初始化存储结果的数组
    rmsep_results = np.zeros((num_repetitions, len(eps)))

    # 重复实验 10 次
    for rep in range(num_repetitions):
        
        # 将数据分割为训练集和测试集
        X_train, X_test, y_train, y_test = train_test_split(X_transformed, y, test_size=0.3)

        RMSEP = []
        for i, epsilon in enumerate(eps):
            m = EDPLS(ncomps, epsilon=epsilon, delta=delta)
            m.fit(X_train, y_train)

            # 计算 RMSEP
            ypred = m.predict(X_test)
            rmsep = np.sqrt(rmse(y_test, ypred))
            RMSEP.append(rmsep)

        # 存储本次重复的结果
        rmsep_results[rep, :] = RMSEP

# 计算均值和标准误差
    rmsep_mean = np.mean(rmsep_results, axis=0)
    rmsep_std = np.std(rmsep_results, axis=0) / np.sqrt(num_repetitions)

    # 绘制结果
    default_colors = plt.rcParams['axes.prop_cycle'].by_key()['color']
    blue = default_colors[0]
    orange = default_colors[1]

    # 绘制 RMSEP
    r = plt.errorbar(eps, rmsep_mean, yerr=rmsep_std, fmt="o-", capsize=5, label=dataset_name, mec='k')


plt.xlabel('隐私损失 $\epsilon$')
plt.ylabel('RMSEP')
plt.title('RMSEP vs. $\epsilon$')
plt.semilogx()
plt.semilogy()
plt.title('准确度-隐私权衡')
plt.legend()

# 为所有数据集在 epsilon=1 下进行 GridSearchCV
plt.subplot(132)

epsilon=1

results = {}
model = EDPLS(8, epsilon=2, delta=delta)

# 定义参数网格
param_grid = {
    'A': np.arange(1, 15)
}

# 循环遍历数据集
for dataset_name, X_transformed in datasets.items():

    # 将数据分割为训练集和测试集
    X_train, X_test, y_train, y_test = train_test_split(X_transformed, y, test_size=0.3)

    # 使用当前 epsilon 初始化模型
    model = EDPLS(8, epsilon=epsilon, delta=delta)

    # 定义 GridSearchCV 对象
    gs = GridSearchCV(model, param_grid, cv=10, scoring='neg_root_mean_squared_error')

    # 拟合模型
    gs.fit(X_train, y_train)

    # 存储结果
    results[(dataset_name)] = {
        'param_A': gs.cv_results_['param_A'],
        'mean_test_score': gs.cv_results_['mean_test_score'],
        'std_test_score': gs.cv_results_['std_test_score']
}

# 绘制结果
for dataset_name in datasets.keys():

    mean_rmse = np.sqrt(-results[(dataset_name)]['mean_test_score'])
    std_rmse = results[(dataset_name)]['std_test_score']
    plt.plot(results[(dataset_name)]['param_A'], mean_rmse, 'o-', label=f'{dataset_name}', mec='k')

plt.xlabel('组件数量')
plt.ylabel('RMSECV')
plt.semilogy()
plt.title('RMSECV vs. 潜变量数量')
plt.legend(loc='upper right')


# 为 SG 数据集在 epsilon = 5 和最佳 LVs 数量下绘制测量值 vs 预测值
plt.subplot(133)

epsilon = 1
ncomps = int(np.argmin(-results['SG']['mean_test_score']) + 1)

m = EDPLS(ncomps, epsilon=epsilon, delta=delta)

# 将数据分割为训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(X_sg, y, test_size=0.3) 

m.fit(X_train, y_train)
ypred = m.predict(X_test)
rmsep = np.sqrt(rmse(y_test, ypred))
r2p = r2_score(y_test, ypred)

plt.scatter(y_test, ypred, edgecolors='k', label=dataset_name)

plt.plot([9.25, 11], [9.25, 11], 'k--')
plt.xlabel('测量值')
plt.ylabel('预测值')
plt.title('测量值 vs. 预测值')

# 标注图表
plt.annotate(f'$\epsilon = {epsilon}$', (0.05, 0.9), xycoords='axes fraction')
plt.annotate(f'#LVs = {ncomps}', (0.05, 0.8), xycoords='axes fraction')
plt.annotate(f'R$^2$P = {r2p:.3f}', (0.05, 0.7), xycoords='axes fraction')
plt.annotate(f'RMSEP = {rmsep:.3f}', (0.05, 0.6), xycoords='axes fraction')
plt.tight_layout()

基于 Savitzky-Golay（一阶导数）的光谱预处理显著改善了隐私-准确度权衡。模型现在在具有合理准确度的同时表现出强隐私保证（$\epsilon = 1$）。