## Module adverse - analyse de qualité de l'anonymisation

### Import

In [None]:
# Import modules

# Classic modules
import matplotlib.pyplot as plt
import pandas as pd
from sklearn.decomposition import PCA
from sklearn.preprocessing import OneHotEncoder

#from anonympy.pandas import dfAnonymizer

from pycanon import anonymity, report

# Functions
from utils.exploration import explo, clean, drop
from utils.correlation import categorical_comparison, p_vals_correction, numerical_correlation
from utils.tools import col_set
from utils.outliers import identify_outliers, explore_outliers, identify_num_outliers, cluster
from utils.stats import categorical_loss, numerical_loss, plot_info_loss, target_loss
from utils.inference import Infer_Model, compare_models

from utils.ano_correc import all_local_aggregation, get_diversities, less_diverse_groups, get_l

In [None]:
# Columns to study: only these are read from both CSV files.
cols = []

# Locations of the original dataset and of its anonymised counterpart.
path_to_original = ""
path_to_ano = ""

In [None]:
# Load the original (non-anonymised) dataset, keeping only the studied columns.
#
# Optional pandas arguments, to be added to the call below as needed:
#   encoding="utf-8", sep=",", lineterminator="\n", header=0
#   nrows=100000   -> useful to try this notebook out on a large dataset
cols_df = pd.read_csv(path_to_original, usecols=cols)

In [None]:
# Load the anonymised dataset, restricted to the same columns as the original.
# NOTE(review): read with pandas defaults; if non-default options (sep, encoding,
# nrows, ...) are enabled when loading the original dataset, mirror them here.
ano_df = pd.read_csv(
    path_to_ano,
    usecols=cols,
)

## User variables

In [None]:
# Values to fill in before running the analysis.
target = ""     # column handed as `target` to the attack model and to target_loss
QI = []         # columns handed to pycanon, the PCA view and local aggregation (presumably the quasi-identifiers)
SA = []         # columns handed to pycanon's report next to QI (presumably the sensitive attributes)
num_cols = []   # numerical columns
cat_cols = []   # categorical columns

# Row count later used as `k` for the local aggregation.
# NOTE(review): the factor 0.001 is 0.1% of the rows, not 1% as the name suggests,
# and the result is 0 for datasets under 1000 rows - confirm this is intended.
n_1_perc = int(len(cols_df) * 0.001)

### Approche adverse

In [None]:
# Work on an independent copy of the anonymised data, so that in-place operations
# applied to `temp` further down do not silently modify `ano_df` as well.
temp = ano_df.copy()

# Hold out 5% of the original rows as a validation set and remove them from
# `cols_df`. The fixed seed makes the split reproducible across kernel restarts.
# NOTE: re-running this cell without reloading `cols_df` removes a further 5%.
val_set = cols_df.sample(frac=0.05, random_state=42)
cols_df = cols_df.drop(index=val_set.index)

In [None]:
# k-anonymity of `temp` with respect to the QI columns, as computed by pycanon.
k = anonymity.k_anonymity(temp, QI)
print(f"According to the anonymity pycanon module, the k-anonymity is {k}")

In [None]:
# Print the anonymity report:
# The index of `temp` is first replaced in place by a fresh 0..n-1 range (the old
# index is discarded) - presumably pycanon expects a default index; TODO confirm.
# This mutates `temp` itself, and therefore any other name bound to the same frame.
temp.reset_index(inplace=True, drop=True)
# pycanon prints its report for `temp` given the QI and SA column lists.
report.print_report(temp, QI, SA)

# Analyze results

#### Non individualisation :

In [None]:
# Identify outliers: start from pandas' summary statistics of `temp`
# (describe() only covers numeric columns by default).

temp.describe()

Identification des outliers en termes de données catégorielles

In [None]:
# Project the one-hot encoded QI columns onto their first two principal components
# and plot the result, so that isolated points can be spotted visually.
# OneHotEncoder and PCA are imported in the notebook's import cell.

enc = OneHotEncoder()
# .toarray() turns the encoder's sparse output into a dense array before the PCA.
X = enc.fit_transform(temp[QI]).toarray()

pca = PCA(n_components=2)

# Dedicated name: `res` is used further down for the inference results table.
pca_proj = pd.DataFrame(pca.fit_transform(X), columns=['PC1', 'PC2'])

plt.scatter(pca_proj['PC1'], pca_proj['PC2'])
plt.xlabel('PC1')
plt.ylabel('PC2')
plt.title('PCA projection of the one-hot encoded QI columns')
plt.show()

Identification des outliers en termes de données numériques

In [None]:
# TODO: call identify_num_outliers (imported from utils.outliers) to flag outliers in the numerical columns

Identification des doubles outliers

In [None]:
# Cluster analysis of `temp` with 100 clusters, using the project's cluster() helper.
cluster(
    temp,
    cat_cols=cat_cols,
    n_clusters=100,
)


#### Non inférence :

Premier scénario d'attaque : prédire un attribut d'un individu

In [None]:
# Compare attribute inference with every column of `temp` taken in turn as the
# prediction target. For each column, one model is trained on the anonymised data
# (`temp`) and one on the original data (`cols_df`); compare_models then evaluates
# both on the held-out validation rows (`val_set`).

inference_rows = []

# The loop uses its own variable instead of the notebook-level `target`:
# assigning to `target` here would overwrite the user-defined target that the
# later attack-scenario and target_loss cells rely on.
for column in temp.columns:
    loop_target = str(column)

    model = Infer_Model(temp, cat_cols=cat_cols, num_cols=num_cols, target=loop_target)
    before_model = Infer_Model(cols_df, cat_cols=cat_cols, num_cols=num_cols, target=loop_target)
    val_model = Infer_Model(val_set, cat_cols=cat_cols, num_cols=num_cols, target=loop_target)

    model.prep_data()
    before_model.prep_data()
    val_model.prep_data()

    # Give the anonymised and validation frames exactly the columns of the
    # original-data frame; columns they lack are created and filled with 0.
    model.df = model.df.align(before_model.df, join='right', axis=1, fill_value=0)[0]
    val_model.df = val_model.df.align(before_model.df, join='right', axis=1, fill_value=0)[0]

    x_train, x_test, y_train, y_test = model.split()
    pred = model.train_model(x_train, x_test, y_train, y_test)
    x_train, x_test, y_train, y_test = before_model.split()
    pred_original = before_model.train_model(x_train, x_test, y_train, y_test)

    # Feature columns of the validation frame = every column except the target.
    new_cols = list(val_model.df.columns)
    new_cols.remove(loop_target)
    _, model_score, delta = compare_models(
        pred,
        pred_original,
        val_model.df[new_cols],
        val_model.df[loop_target],
        print=False,
    )
    inference_rows.append([loop_target, model_score, delta])

# Build the results table once instead of growing a DataFrame row by row.
res = pd.DataFrame(
    inference_rows,
    columns=["Target", "Score de performance de la prédiction", "Delta in score"],
)


In [None]:
# Stacked bar chart of the inference results table, one bar per target column.
res.plot(
    kind="bar",
    x="Target",
    stacked=True,
    color=["tomato", "lightseagreen"],
    figsize=(7, 5),
)

In [None]:
# Project helper called with the original data (minus the validation rows), the
# anonymised data and the categorical columns - presumably plots the information
# loss per categorical column; see utils.stats for the exact behaviour.
plot_info_loss(cols_df, temp, cat_cols)

Deuxième scénario d'attaque : réentraîner un modèle en connaissant un certain nombre de lignes

In [None]:
# Second scenario: the attacker knows a subset of the original rows and trains on it.
# 30% of the rows go to the attacker (`attacker_set`); the remaining 70% are kept
# in `attack_df` for evaluation. The fixed seed makes the split reproducible.
attacker_set = cols_df[cols].sample(frac=0.3, random_state=42)

# drop() returns a new frame, so `cols_df` is left untouched.
attack_df = cols_df[cols].drop(index=attacker_set.index)

In [None]:
# Train the attacker's model on the rows the attacker is assumed to know.
# NOTE(review): features come from `cat_cols` here, whereas the evaluation cell
# builds its frames from `QI` - confirm both lists are meant to match.
attack_model = Infer_Model(
    attacker_set,
    cat_cols=cat_cols,
    num_cols=[],
    target=target,
)
attack_model.prep_data()

train_x, test_x, train_y, test_y = attack_model.split()
attack_pred = attack_model.train_model(train_x, test_x, train_y, test_y)

In [None]:
# Score the attacker's model on the held-back rows, first after local aggregation
# (anonymised version) and then unchanged, to see whether the attack now
# under-performs on anonymised data.
attacker_ano = all_local_aggregation(
    attack_df.copy(),
    k=n_1_perc,
    variables=QI,
    method='regroup_with_smallest',
)

# Two scores are printed, in this order: anonymised rows, then original rows.
for eval_df in (attacker_ano, attack_df):
    val_model = Infer_Model(eval_df, cat_cols=QI, num_cols=[], target=target)
    val_model.prep_data()

    # Give the evaluation frame exactly the columns the attack model was trained
    # on; columns it lacks are created and filled with 0.
    val_model.df = val_model.df.align(
        attack_model.df, join='right', axis=1, fill_value=0
    )[0]

    # Features = every column except the target.
    feature_cols = list(val_model.df.columns)
    feature_cols.remove(target)

    score = attack_pred.score(val_model.df[feature_cols], val_model.df[target])
    print(score)

#### Qualité de la donnée statistique

In [None]:
# Information loss - categorical data
# NOTE(review): only the last expression of a cell is rendered, so anything
# categorical_loss returns is discarded unless it prints or plots by itself.
categorical_loss(cols_df, temp, QI)

# Loss on the target column. This call receives `ano_df` whereas the other loss
# helpers receive `temp` - presumably interchangeable here; TODO confirm.
target_loss(cols_df, ano_df, target)

In [None]:
# Information loss - numerical data
# Compares the original data (minus the validation rows) with the anonymised data
# over `num_cols`, using the project's numerical_loss helper.
numerical_loss(cols_df, temp, num_cols)