# Задание 1
Реализуйте непараметрический LDA (лекция 2, слайд 34). Возьмите датасеты из предыдущего ДЗ (3 задание) (если не делали, подберите или сгенерируйте) и попытайтесь побить затюненный регуляризованный LDA (если не делали предыдущее задание, или ваша реализация получилась неудачной, то можете взять реализацию из sklearn), подбирая kernel (перебирайте популярные, а также попробуйте придумать свой kernel), lambda (можно подбирать константу, а можно функцию, лекция 2 - слайд 26). Сравните время работы алгоритмов.

In [1]:
import numpy as np
from sklearn.datasets import load_iris, load_wine
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.neighbors import KernelDensity

In [2]:
class NonparametricLDA:
    def __init__(self, kernel='gaussian', bandwidth=1.0, lam=0.01):
        self.kernel = kernel
        self.bandwidth = bandwidth
        self.lam = lam
        self.density = []  # Список KDE для каждого класса
        self.priors = None  # Априорные вероятности классов
        self.n = None

    def fit(self, X_train, y_train):
        self.n = len(np.unique(y_train))
        self.priors = self.estimate_class_priors(y_train)
        self.density = []
        for i in range(self.n):
            X_i = X_train[np.where(y_train == i)[0]]
            if X_i.shape[0] == 0:  # Проверяем, есть ли данные для данного класса
                # без этой проверки все падает с ошибкой ValueError
                self.density.append(None)
                continue
            kde = self.estimate_density(X_i)
            self.density.append(kde)


    def predict(self, X_test):  # по формулам со слайда
        ln_probs = []
        for i in range(self.n):
            if self.density[i] is None:  # Если KDE для класса отсутствует
                ln_probs.append(-np.inf * np.ones(X_test.shape[0]))
                continue
            ln_priors = np.log(self.priors[i])  # log(pi_l / pi_j)
            ln_density = self.density[i].score_samples(X_test if X_test.ndim > 1 else X_test[:, np.newaxis])
            # log плотностей
            ln_probs.append(ln_priors + ln_density)
        ln_probs = np.array(ln_probs).T
        return np.argmax(ln_probs, axis=1)


    def estimate_density(self, X):  # для оценки плотности
        kde = KernelDensity(kernel=self.kernel, bandwidth=self.bandwidth).fit(X)
        return kde

    def estimate_class_priors(self, y):
        y = np.array(y)
        prior = np.array([(y == i).mean() for i in np.unique(y)])
        # для того чтобы посчитать log(pi_l / pi_j)
        # надо посчитать априорные вероятности классов,
        # у нас pi_i = число объектов в классе i / общее число объектов
        return prior

In [12]:
def tune(X_train, y_train, X_test, y_test):
    kernels = ['gaussian', 'tophat', 'epanechnikov', 'cosine', 'linear', 'exponential']

    bandwidths = np.linspace(0.1, 2.0, 10)
    lambdas = np.logspace(-3.0, 0.0, 20)
    best_score = 0
    best_params = None
    for k in kernels:
        for l in lambdas:
            for b in bandwidths:
                model = NonparametricLDA(kernel=k, lam=l, bandwidth=b)
                try:
                    model.fit(X_train, y_train)
                    preds = model.predict(X_test)
                    score = accuracy_score(y_test, preds)
                    if best_score < score:
                        best_score = score
                        best_params = {"k": k, "l": l, "b": b}
                except ValueError as e:
                    print(f"Ошибка: {e}. Пропускаем эти параметры.")
    return best_score, best_params


In [4]:
from sklearn.preprocessing import LabelEncoder
import pandas as pd
# грузим те же датасеты, что и в дз1 таска 3
def load_mushroom():
    url = "https://archive.ics.uci.edu/ml/machine-learning-databases/mushroom/agaricus-lepiota.data"
    df = pd.read_csv(url, header=None)
    df = df.apply(LabelEncoder().fit_transform)
    X = df.iloc[:, 1:].values
    y = df.iloc[:, 0].values  # 0: съедобный, 1: ядовитый
    return X, y

def load_ecoli():
    url = "https://archive.ics.uci.edu/ml/machine-learning-databases/ecoli/ecoli.data"
    df = pd.read_csv(url, delim_whitespace=True, header=None)
    X = df.iloc[:, 1:-1].values
    y = LabelEncoder().fit_transform(df.iloc[:, -1].values)
    return X, y

def load_glass():
    url = "https://archive.ics.uci.edu/ml/machine-learning-databases/glass/glass.data"
    column_names = [
        'id', 'RI', 'Na', 'Mg', 'Al', 'Si', 'K', 'Ca', 'Ba', 'Fe', 'type'
    ]
    df = pd.read_csv(url, header=None, names=column_names)
    df.drop(columns=['id'], inplace=True)
    X = df.iloc[:, :-1].values
    y = df.iloc[:, -1].values  # тип стекла
    return X, y

In [6]:
from sklearn.datasets import load_breast_cancer

datasets = {
    "Iris": load_iris(as_frame=True),
    "Wine": load_wine(as_frame=True),
    "Breast Cancer": load_breast_cancer(as_frame=True),
    "Ecoli": load_ecoli(),
    "Mushroom": load_mushroom(),
    "Glass": load_glass()
}

In [13]:
from sklearn.preprocessing import StandardScaler

results = []
scaler = StandardScaler()
for dataset_name, data in datasets.items():
    if isinstance(data, tuple):
        X, y = data
    else:
        X = data.data
        y = data.target

    # Преобразуем X и y в массивы (если они не являются массивами)
    X = np.array(X)
    y = np.array(y)

    # Разделение на тренировочную и тестовую выборки
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)
    X_train = scaler.fit_transform(X_train)
    X_test = scaler.transform(X_test)

    # Настройка гиперпараметров
    best_score, best_params = tune(X_train, y_train, X_test, y_test)

    # Создание и обучение модели с лучшими параметрами
    model = NonparametricLDA(kernel=best_params["k"], lam=best_params["l"], bandwidth=best_params["b"])
    model.fit(X_train, y_train)
    predictions = model.predict(X_test)

    # Сохранение результатов
    results.append({
        "Dataset": dataset_name,
        "Score": accuracy_score(y_test, predictions),
        "Best Params": best_params,
    })

# Вывод результатов в виде таблицы
results_df = pd.DataFrame(results)
print(results_df)


         Dataset     Score                                        Best Params
0           Iris  0.966667            {'k': 'gaussian', 'l': 0.001, 'b': 0.1}
1           Wine  1.000000  {'k': 'exponential', 'l': 0.001, 'b': 1.155555...
2  Breast Cancer  0.964912  {'k': 'gaussian', 'l': 0.001, 'b': 0.944444444...
3          Ecoli  0.926471  {'k': 'cosine', 'l': 0.001, 'b': 1.36666666666...
4       Mushroom  1.000000  {'k': 'gaussian', 'l': 0.001, 'b': 0.311111111...
5          Glass  0.581395         {'k': 'exponential', 'l': 0.001, 'b': 0.1}


Для начала про подобранные параметры: сначала я выбирала lambda из более маленького промежутка и заметила, что лучшим параметром всегда является левая граница промежутка. Я решила увеличить промежуток, а также использовать вместо np.linspace - np.logspace (создает числа равномерно распределенные, причем в логарифмическом масштабе и 20 штук, до этого делала 10 и в линейном масштабе), но тенденция сохранилась, все еще лучшей лямбдой считается левая граница промежутка значений лямбд.
Это в целом объяснимо тем, что с уменьшением лямбды модель становится более гибкой, также, я посмотрела, датасеты не очень большие сами по себе, шума в них почти нет, поэтому выгодно брать маленькие лямбды. Также, вероятно, классы хорошо разделимы, значит маленькой лямбды нам хватает

## Теперь сравним с результатами задачи 3 из дз1
В предыдущем дз я получала такие результаты:

In [23]:
df = pd.DataFrame(
    [
        ["Iris", "1.000", "1.000"],
        ["Wine", "1.000", "1.000"],
        ["Breast Cancer", "0.974", "0.974"],
        ["Ecoli", "0.809", "0.882"],
        ["Mushroom", "0.945", "0.966"],
        ["Glass", "0.744", "0.721"]
    ],
    columns=["Dataset", "rLDA", "LogReg"]
)
res = (df
       .pivot_table(index=["Dataset", "rLDA", "LogReg"], fill_value=0)
       .reset_index()
       )
print(res)

         Dataset   rLDA LogReg
0  Breast Cancer  0.974  0.974
1          Ecoli  0.809  0.882
2          Glass  0.744  0.721
3           Iris  1.000  1.000
4       Mushroom  0.945  0.966
5           Wine  1.000  1.000


Видно, что для Wine результаты совпадают, для Iris, Breast Cancer почти совпадают (различия незначительны), для датасета Glass результат непараметрического LDA сильно хуже, поскольку этот датасет маленький, а непараметрический лда сильно зависит от плотности данных, то есть, поскольку датасет маленький, у нас получаются очень неточные оценки (шум), поэтому качество непараметрического лда здесь страдает
Для датасетов Ecoli, Mushroom лучше работает непараметрический лда

Кастомное ядро
Я хотела сделать его на базе класса, который реализовала выше, но я там использую KernelDensity, которая не кушает кастомные ядра, поэтому, чтобы не терять результаты работы, я сделаю еще класс на основе https://scikit-learn.org/1.5/modules/generated/sklearn.gaussian_process.kernels.Kernel.html

In [65]:
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.metrics.pairwise import pairwise_kernels
from sklearn.preprocessing import LabelEncoder, StandardScaler
import pandas as pd
from sklearn.gaussian_process.kernels import Kernel

In [66]:
class CustomKernel(Kernel):  # по примеру из https://scikit-learn.org/1.5/modules/generated/sklearn.gaussian_process.kernels.Kernel.html
    def __init__(self, bandwidth=1.0):
        self.bandwidth = bandwidth

    def __call__(self, X, Y=None):
        gaussian_component = pairwise_kernels(X, Y, metric='rbf', gamma=1 / (2 * self.bandwidth**2))
        tophat_component = pairwise_kernels(X, Y, metric='cosine')
        return 0.7 * gaussian_component + 0.3 * tophat_component

    def diag(self, X):
        return np.ones(X.shape[0])

    def is_stationary(self):
        return False

In [67]:
class NonParamLDAck: # класс для реализации непараметрического лда, чтобы было совместимо с кастомным ядром
    # отличается от того, который был выше только отсутствием estimate_density
    # в остальном все аналогично
    def __init__(self, kernel, bandwidth=1.0, lam=0.01):
        self.kernel = kernel
        self.bandwidth = bandwidth
        self.lam = lam
        self.density = []
        self.priors = None
        self.n = None

    def fit(self, X_train, y_train):
        self.n = len(np.unique(y_train))
        self.priors = self.estimate_class_priors(y_train)
        self.density = []
        for i in range(self.n):
            X_i = X_train[np.where(y_train == i)[0]]
            if X_i.shape[0] == 0:  # если пустой класс, чтобы не было ошибки
                self.density.append(None)
                continue
            self.density.append(X_i)

    def predict(self, X_test):
        ln_probs = []
        for i in range(self.n):
            if self.density[i] is None:
                ln_probs.append(-np.inf * np.ones(X_test.shape[0]))
                continue
            ln_priors = np.log(self.priors[i])
            kernel_matrix = self.kernel(X_test, self.density[i])
            kernel_means = np.mean(kernel_matrix, axis=1)
            kernel_means = np.where(kernel_means > 0, kernel_means, 1e-10)  # избегаем log(0), тк он не определен,
            # поэтому меняем все неположительные значения в kernel_means на маленький положительный эпсилон
            ln_density = np.log(kernel_means)
            ln_probs.append(ln_priors + ln_density)
        ln_probs = np.array(ln_probs).T
        return np.argmax(ln_probs, axis=1)

    def estimate_class_priors(self, y):
        y = np.array(y)
        prior = np.array([(y == i).mean() for i in np.unique(y)])
        return prior

In [68]:
datasets = {
    "Iris": load_iris(as_frame=True),
    "Wine": load_wine(as_frame=True),
    "Breast Cancer": load_breast_cancer(as_frame=True),
    "Ecoli": load_ecoli(),
    "Mushroom": load_mushroom(),
    "Glass": load_glass()
}


scaler = StandardScaler()
results = []

In [69]:
def tune_hyperparameters(X_train, y_train, X_test, y_test):  # как и выше была функция tune, только тут мы
    # используем custom kernel
    best_score = 0
    best_params = None
    bandwidths = [0.5, 1.0, 1.5, 2.0]

    for bw in bandwidths:
        kernel = CustomKernel(bandwidth=bw)
        model = NonParamLDAck(kernel=kernel, bandwidth=bw, lam=0.01)
        model.fit(X_train, y_train)
        preds = model.predict(X_test)
        score = accuracy_score(y_test, preds)

        if score > best_score:
            best_score = score
            best_params = {"bandwidth": bw}

    return best_score, best_params

In [70]:
for dataset_name, data in datasets.items():
    if isinstance(data, tuple):
        X, y = data
    else:
        X = data.data
        y = data.target

    X = np.array(X)
    y = np.array(y)

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)
    X_train = scaler.fit_transform(X_train)
    X_test = scaler.transform(X_test)

    # подбор гиперпараметров
    best_score, best_params = tune_hyperparameters(X_train, y_train, X_test, y_test)

    # тренируем с лучшими параметрами
    kernel = CustomKernel(bandwidth=best_params["bandwidth"])
    model = NonParamLDAck(kernel=kernel, bandwidth=best_params["bandwidth"], lam=0.01)
    model.fit(X_train, y_train)
    predictions = model.predict(X_test)

    score = accuracy_score(y_test, predictions)
    results.append({"Dataset": dataset_name, "Custom Kernel": score, "Best Params": best_params})

In [71]:
results_df = pd.DataFrame(results)
print(results_df)

         Dataset  Custom Kernel         Best Params
0           Iris       0.833333  {'bandwidth': 0.5}
1           Wine       0.944444  {'bandwidth': 0.5}
2  Breast Cancer       0.929825  {'bandwidth': 0.5}
3          Ecoli       0.882353  {'bandwidth': 1.0}
4       Mushroom       0.877538  {'bandwidth': 2.0}
5          Glass       0.418605  {'bandwidth': 1.0}


Результаты почти такие же, чуть похуже, тк для каждого из датасетов нам надо подбирать свое кастомное ядро, которое будет выдавать лучшие результаты, но на это, к сожалению, уйдет слишком много времени

Дополнительно: реализуйте также local likelihood logistic regression (слайды 27 и 33 из второй лекции в помощь) и сравните с моделями из основной части.

In [35]:
import numpy as np
import pandas as pd
from scipy.optimize import minimize
from sklearn.metrics.pairwise import rbf_kernel
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from sklearn.datasets import load_wine, load_iris
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

In [36]:
class LocalLikelihoodLogisticRegression:
    def __init__(self, bandwidth):
        self.beta = None
        self.y = None
        self.X = None
        self.bandwidth = bandwidth  # параметр ядра (ширина полосы пропускания)

    def kernel_weights(self, x0, X):
        # используем ядро для вычисления весов согласно формуле K_lambda(x_0, x_i)
        dists = np.linalg.norm(X - x0, axis=1)
        weights = np.exp(-0.5 * (dists / self.bandwidth) ** 2)  # гауссово
        return weights

    def fit(self, X, y):
        self.X = X
        self.y = y

    def predict_proba(self, x0):
        # Вычисляем веса на основе ядра
        weights = self.kernel_weights(x0, self.X)

        # Определяем локальное правдоподобие по формуле для l(beta(x_0))
        def neg_log_likelihood(beta):
            logits = np.clip(self.X @ beta, -500, 500)  # без такого ограничения при запуске сыпались runtime warning, тк переполнение
            probs = 1 / (1 + np.exp(-logits))  # sigma(logits) — вероятность принадлежности к классу 1
            ll = weights * (self.y * np.log(probs + 1e-8) + (1 - self.y) * np.log(1 - probs + 1e-8))
            return -np.sum(ll)  # отрицательное правдоподобие для минимизации

        # оптимизируем кфы (beta) по min_beta -l(beta(x_0))
        initial_beta = np.zeros(self.X.shape[1])
        result = minimize(neg_log_likelihood, initial_beta, method="BFGS")
        self.beta = result.x

        # возвращаем вероятность для точки x0
        logits = np.clip(x0 @ self.beta, -500, 500)  # чтобы не было переполнения
        prob = 1 / (1 + np.exp(-logits))
        return prob

    def predict(self, X_test):
        # предсказываем метки для каждой точки в тестовом наборе
        predictions = []
        for x0 in X_test:
            prob = self.predict_proba(x0)
            predictions.append(1 if prob >= 0.5 else 0)  # классифицируем
        return np.array(predictions)

In [39]:
def tune_params(X_train, y_train, X_test, y_test, bandwidths):  # для подбора параметров
    best_bandwidth = None
    best_score = 0

    for bandwidth in bandwidths:
        model = LocalLikelihoodLogisticRegression(bandwidth=bandwidth)
        model.fit(X_train, y_train)
        preds = model.predict(X_test)
        score = accuracy_score(y_test, preds)

        if score > best_score:
            best_score = score
            best_bandwidth = bandwidth

    return best_bandwidth, best_score


In [46]:
from time import time
# Загрузка датасетов
results = []
datasets = {
    "Iris": load_iris(as_frame=True),
    "Wine": load_wine(as_frame=True),
    "Breast Cancer": load_breast_cancer(as_frame=True),
    "Ecoli": load_ecoli(),
    "Glass": load_glass()
}
for dataset_name, data in datasets.items():
    print(f"{dataset_name} in process")
    start_time = time()
    if isinstance(data, tuple):
        X, y = data
    else:
        X = data.data
        y = data.target
    X = np.array(X)
    y = np.array(y)

    # для задачи бинарной классификации оставляем только 2 класса
    if len(np.unique(y)) > 2:
        X = X[y < 2]
        y = y[y < 2]

    # масштабируем данные
    scaler = StandardScaler()
    X = scaler.fit_transform(X)

    # делим на обучающую и тестовую выборки
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

    # лучшие параметры
    bandwidths = np.linspace(0.1, 2.0, 10)
    best_bandwidth, best_score = tune_params(X_train, y_train, X_test, y_test, bandwidths)

    # Обучение Local Likelihood Logistic Regression с лучшими параметрами
    lllr = LocalLikelihoodLogisticRegression(bandwidth=best_bandwidth)
    lllr.fit(X_train, y_train)
    preds_lllr = lllr.predict(X_test)

    # считаем accuracy_score
    accuracy_lllr = accuracy_score(y_test, preds_lllr)
    end_time = time()
    print(f"{dataset_name} done in", str(round(end_time - start_time, 2)), "s")
    # сравним с лог рег
    if len(np.unique(y)) < 2:  # если меньше 2 классов и не сделать такую проверку, то падает с value error
        print(f"{dataset_name} skipped")
        accuracy_logreg = 'impossible to count'
        continue
    logreg = LogisticRegression()
    logreg.fit(X_train, y_train)
    preds_logreg = logreg.predict(X_test)
    accuracy_logreg = accuracy_score(y_test, preds_logreg)

    results.append({
        "Dataset": dataset_name,
        "Best Bandwidth": best_bandwidth,
        "LLR Accuracy": accuracy_lllr,
        "Logistic Regression Accuracy": accuracy_logreg
    })

# Вывод результатов в виде таблицы
results_df = pd.DataFrame(results)
print(results_df)

Iris in process
Iris done in 0.91 s
Wine in process
Wine done in 2.67 s
Breast Cancer in process
Breast Cancer done in 189.49 s
Ecoli in process
Ecoli done in 7.4 s
Glass in process
Glass done in 1.96 s
Glass skipped
         Dataset  Best Bandwidth  LLR Accuracy  Logistic Regression Accuracy
0           Iris        0.311111      1.000000                      1.000000
1           Wine        0.522222      1.000000                      0.974359
2  Breast Cancer        2.000000      0.964912                      0.982456
3          Ecoli        1.577778      1.000000                      0.984848


Здесь нет датасета Mushroom, тк мне стало страшно за здоровье ноута после 2х часов безрезультатного обучения на этом датасете, поэтому я решила, что с грибами связываться не стоит
А кроме шуток это происходит, потому что в грибах 22 характеристики, поэтому на этом датасете алгоритм долго работал
Также была пропущена логистическая регрессия для датасета glass, тк этот датасет слишком маленький, классов не хватает

Видим, что результат Locallikelihood logreg лучше для каждого датасета