In [1]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from scipy.stats import multivariate_normal
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import PolynomialFeatures
from sklearn.pipeline import make_pipeline
from sklearn.discriminant_analysis import QuadraticDiscriminantAnalysis
from sklearn.linear_model import LogisticRegression, LogisticRegressionCV
from sklearn.model_selection import GridSearchCV

In [2]:
def read_data(filename):
    data = pd.read_csv(filename)
    #plot_data(data)
    return data

In [3]:
def plot_data(data):
    sns.pairplot(data=data, hue='z')
    plt.show()

In [4]:
def create_partition(data):
    X = data.drop(['z'], axis=1)
    z = data['z']
    partition = train_test_split(X, z, train_size=0.66)
    return partition

In [5]:
def compute_score(filename, model, param):
    data = read_data(filename)
    partition = create_partition(data)
    score = model(partition, param)
    return score

In [6]:
def compare_model(filename, model1, model2, param1, param2):
    data = read_data(filename)
    scores = []
    for _ in range(100):
        partition = create_partition(data)
        score1 = model1(partition, param1)
        score2 = model2(partition, param2)
        scores.append(score1-score2)
    return np.mean(scores)  

In [7]:
def train_gda(X_train, z_train):
    categories = list(set(z_train))
    prior = [np.mean(z_train==category) for category in categories]
    exp = [np.mean(X_train[z_train==category]) for category in categories]
    cov = [np.cov(X_train[z_train==category].T) for category in categories]
    model = {
        'categories': categories,
        'prior': prior,
        'exp': exp,
        'cov': cov
    }
    return model

In [8]:
def test(model, X_test):
    nb_class = len(model['categories'])
    pdf = [multivariate_normal.pdf(X_test, model['exp'][i], model['cov'][i], allow_singular=True) for i in range(nb_class)]
    product = [model['prior'][i]*pdf[i] for i in range(nb_class)]
    post = np.array([np.divide(product[i], sum(product), out=np.zeros_like(product[i]), where=sum(product)!=0) for i in range(nb_class)]).T
    pred = [model['categories'][list(post[i]).index(max(post[i]))] for i in range(len(X_test))]
    return pred


In [9]:
def plot_predict(pred, X_test, z_test):
    ax = sns.scatterplot(data=X_test, x='X1', y='X2', hue=pred, style=z_test)
    leg_handles = ax.get_legend_handles_labels()[0]
    ax.legend(leg_handles, ['pred1-test1', 'pred1-test2', 'pred2-test1', 'pred2-test2'], title='Legend')
    plt.show()

In [10]:
def gda(partition, param=None):
    X_train, X_test, z_train, z_test = partition
    model = train_gda(X_train, z_train)
    pred = test(model, X_test)
    score = accuracy_score(z_test, pred)
    #plot_predict(pred, X_test, z_test)
    return score

In [11]:
def compute_param(X, z, param):
    param_grid = {'C': np.geomspace(10**-1, 10**1, 2)}
    poly = PolynomialFeatures(degree=2, include_bias=False)
    cv = GridSearchCV(LogisticRegression(max_iter=1000, solver=param['solver'], penalty=param['penalty']), param_grid, cv=3, scoring="accuracy")
    pipe = make_pipeline(poly, cv)
    pipe.fit(X, z)
    cv.fit(X, z)
    param['C'] = cv.best_params_['C']
    return param

In [12]:
def pqlr(partition, param):
    X_train, X_test, z_train, z_test = partition
    if param['penalty']!='none':
        param = compute_param(X_train, z_train, param)
    poly = PolynomialFeatures(degree=2, include_bias=False)
    model = LogisticRegression(max_iter=1000, C=param['C'], solver=param['solver'], penalty=param['penalty'])
    pipe = make_pipeline(poly, model)
    pipe.fit(X_train, z_train)
    pred = pipe.predict(X_test)
    score = accuracy_score(z_test, pred)
    return score

In [13]:
param_qlr = {
   'solver': 'lbfgs',
    'penalty': 'none',
    'C': 1
}

In [14]:
compute_score('data/synth.csv', gda, None)

0.9529411764705882

In [15]:
compute_score('data/synth.csv', pqlr, param_qlr)

0.9529411764705882

In [16]:
compare_model('data/synth.csv', gda, pqlr, None, param_qlr)

0.00017647058823529792

## Partie 2

In [17]:
def train_rda(X_train, z_train, param):
    mean = param['mean']
    shrink = param['shrink']
    scale = param['scale']
    df = param['df']
    categories = list(set(z_train))
    nb_class = len(categories)
    prior = [np.mean(z_train==category) for category in categories ]
    
    n = np.array([z_train[z_train==category].count() for category in categories])
    exp_naive = np.array([np.mean(X_train[z_train==category]) for category in categories])
    exp = [(n[i]*exp_naive[i]+shrink[i]*mean[i]) / (n[i]+shrink[i]) for i in range(nb_class)] 
    
    cov_naive = np.array([np.cov(X_train[z_train==category].T) for category in categories])
    denom = n + df + nb_class + 2
    exp_dif = np.array([(exp_naive[i]-mean[i])*(exp_naive[i]-mean[i]).T for i in range(nb_class)])
    scale_inv = np.array([np.linalg.inv(scale[i]) for i in range(nb_class)])
    cov = [(n[i]*cov_naive[i] + (n[i]*shrink[i]/(n[i]+shrink[i])) * exp_dif[i] + scale_inv[i]) / denom[i] for i in range(nb_class)] 
    model = {
        'categories': categories,
        'prior': prior,
        'exp': exp,
        'cov': cov
    }
    return model

In [18]:
def rda(partition, param):
    X_train, X_test, z_train, z_test = partition
    model = train_rda(X_train, z_train, param)
    pred = test(model, X_test)
    score = accuracy_score(z_test, pred)
    return score

In [19]:
param_pqlr = {
   'solver': 'lbfgs',
    'penalty': 'l2',
    'C':1
}

param_rda = {
    'mean': np.array([np.zeros(2), np.zeros(2)]),
    'shrink': [0, 0],
    'scale': [np.eye(2), np.eye(2)],
    'df': [0, 0]
}

In [20]:
compute_score('data/synth.csv', rda, param_rda)

0.9558823529411765

In [21]:
compute_score('data/synth.csv', pqlr, param_pqlr)

0.9558823529411765

In [23]:
compare_model('data/synth.csv', rda, pqlr, param_rda, param_pqlr)

0.00038235294117646367