# Семинар: K-Nearest Neighbors (KNN) своими руками

Сегодня мы разберем алгоритм KNN "под капотом". Мы напишем его с нуля сначала для текстов, а затем для задачи оттока клиентов банка.

In [None]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt

## Часть 1. KNN для классификации текстов (Jaccard Distance)

У нас есть игрушечный датасет отзывов о ресторанах.

In [None]:
# Игрушечный датасет
dataset = [
    {'text': 'Это отличный ресторан, всегда туда буду ходить. Очень вкусная еда', 'label': 'good'},
    {'text': 'Это замечательный ресторан, хочу еще туда придти. Все было очень вкусно', 'label': 'good'},
    {'text': 'Это классный ресторан, просто шикарный, буду туда еще ходить. Все было очень здорово', 'label': 'good'},
    {'text': 'Это великолепный ресторан, приду туда еще. Еда пальчики оближешь', 'label': 'good'},
    {'text': 'Шикарное место, приду еще, заведение огонь', 'label': 'good'},

    {'text': 'Заведение помойка, больше туда ни ногой, отвратительное обслуживание и блюда мерзкие', 'label': 'bad'},
    {'text': 'Место дно, нигода больше туда не пойду, было невкусно и долго готовили', 'label': 'bad'},
    {'text': 'Больше не приду, еда не понравилась, готовят долго, отстой', 'label': 'bad'},
    {'text': 'Официант нахамил и плюнул мне в тарелку, чтобы я еще туда зашел, никогда в жизни', 'label': 'bad'},
    {'text': 'Просто отвратительно, мне не понравилось', 'label': 'bad'},
]

### Задание 1: Препроцессинг и Jaccard Metric

Для текстов мы будем использовать расстояние Жаккара. 
Напомним формулу: $J(A, B) = 1 - \frac{|A \cap B|}{|A \cup B|}$, где A и B — множества слов в текстах.

**Ваша задача:** дописать функцию `jaccard_dist`.

In [None]:
def preprocess(s):
    # Простая предобработка: нижний регистр и удаление знаков препинания
    return s.lower().replace(',', '').replace('.', '')

def jaccard_dist(s1, s2):
    # 1. Преобразуем строки в множества слов
    set1 = set(preprocess(s1).split())
    set2 = set(preprocess(s2).split())
    
    # 2. Вычисляем пересечение и объединение
    # ВАШ КОД ЗДЕСЬ
    intersection = ...
    union = ...
    
    # 3. Считаем коэффициент Жаккара (IoU) и возвращаем дистанцию (1 - IoU)
    # ВАШ КОД ЗДЕСЬ
    iou = ...
    dist = ...
    
    return dist

In [None]:
# Проверка
t1 = "привет как дела"
t2 = "привет как настроение"
# Пересечение: {привет, как} (2 слова)
# Объединение: {привет, как, дела, настроение} (4 слова)
# IoU = 0.5, Dist = 0.5
assert jaccard_dist(t1, t2) == 0.5
print("Функция работает корректно!")

### Задание 2: Реализация класса KNN

Реализуйте метод `predict`. Алгоритм:
1. Пройтись по всем объектам в `self.X`.
2. Посчитать расстояние от входящего `x` до каждого объекта из `self.X`.
3. Отсортировать объекты по расстоянию (от меньшего к большему).
4. Взять топ-K объектов.
5. Вернуть самый частый класс среди этих топ-K.

In [None]:
from collections import Counter

class TextKNN:
    def __init__(self, k=3):
        self.k = k
    
    def fit(self, X, y):
        # KNN — ленивый алгоритм, он просто запоминает данные
        self.X = X
        self.y = y
        
    def predict(self, x_text):
        distances = []
        
        # ВАШ КОД ЗДЕСЬ
        # Цикл по всем примерам из обучения
        for x_train, y_train in zip(self.X, self.y):
            # Посчитайте дистанцию и сохраните кортеж (dist, label)
            pass 
        
        # Сортировка по дистанции
        distances.sort(key=lambda x: x[0])
        
        # Берем топ K соседей
        neighbors = ...
        
        # Получаем метки классов соседей
        neighbor_labels = [n[1] for n in neighbors]
        
        # Находим самый популярный класс (Mode)
        most_common_label = Counter(neighbor_labels).most_common(1)[0][0]
        return most_common_label

In [None]:
# Проверяем
X_text = [x['text'] for x in dataset]
y_text = [x['label'] for x in dataset]

knn_text = TextKNN(k=3)
knn_text.fit(X_text, y_text)

test_review = "Было невкусно, больше не приду"
print(f"Отзыв: '{test_review}' -> Класс: {knn_text.predict(test_review)}")

## Часть 2. Работа с данными Churn (Отток клиентов)

Теперь переходим к числовым данным. 
Загружаем датасет о клиентах банка.

In [None]:
df = pd.read_csv('https://raw.githubusercontent.com/obulygin/content/main/bank_churn/Churn.csv')
df.head()

### Подготовка данных (Preprocessing)

KNN работает с дистанциями. Чтобы считать дистанции:
1. Данные должны быть числовыми (категории кодируем).
2. Данные должны быть одного масштаба (нормализация/стандартизация). Иначе зарплата в 100,000 будет вносить больший вклад в расстояние, чем возраст 40.

In [None]:
# 1. Удаляем ненужные столбцы (ID не несут смысла для модели)
df = df.drop(['RowNumber', 'CustomerId', 'Surname'], axis=1)

# 2. Чистим пропуски. Взглянем, есть ли они
print(df.isnull().sum())

In [None]:
# Удаляем строки с пропусками
df = df.dropna()

# 3. Кодируем категории.
# Gender: Female/Male -> 0/1
# Geography: France/Spain/Germany -> One Hot Encoding

df = pd.get_dummies(df, columns=['Geography', 'Gender'], drop_first=True)

print(df.head())

# Разделяем на X и y
X = df.drop('Exited', axis=1).values # Конвертируем в numpy array для удобства
y = df['Exited'].values

# Разбиваем на train/test
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# 4. Масштабируем данные!
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

## Часть 3. KNN для табличных данных

Текстовая метрика (Жаккар) здесь не подойдет. Нам нужна Евклидова метрика (обычное расстояние между точками).

$$ d(p, q) = \sqrt{\sum_{i=1}^n (q_i - p_i)^2} $$

### Задание 3: Реализуйте Евклидову метрику и обновите класс

In [None]:
def euclidean_dist(a, b):
    # a и b — это numpy массивы (векторы строк датафрейма)
    # ВАШ КОД ЗДЕСЬ: корень из суммы квадратов разностей
    
    diff = ...
    sq_diff = ...
    sum_sq = ...
    return ...

# Класс KNN для числовых данных
class NumKNN:
    def __init__(self, k=5):
        self.k = k
    
    def fit(self, X, y):
        self.X = X
        self.y = y
    
    def predict(self, X_input):
        # В sklearn метод predict принимает сразу много объектов (матрицу)
        predictions = []
        
        for x in X_input:
            # Это тот самый цикл, который нужно написать самому
            # ВАШ КОД ЗДЕСЬ:
            # 1. Посчитать расстояния от x до всех точек в self.X используя euclidean_dist
            
            distances = []
            # Подсказка: можно циклом, а можно векторизировано, если умеете
            for i, x_train in enumerate(self.X):
                d = euclidean_dist(x, x_train)
                distances.append((d, self.y[i]))
            
            # 2. Сортировка и выбор топ-K
            distances.sort(key=lambda item: item[0])
            top_k = distances[:self.k]
            
            # 3. Голосование (majority vote)
            labels = [item[1] for item in top_k]
            prediction = Counter(labels).most_common(1)[0][0]
            
            predictions.append(prediction)
            
        return np.array(predictions)

In [None]:
# Обучаем и предсказываем
# ВНИМАНИЕ: KNN работает долго на больших данных. Возьмем подвыборку теста для скорости, если хотите
my_knn = NumKNN(k=5)
my_knn.fit(X_train_scaled, y_train)

# Сделаем предсказание только на первых 100 объектах теста, чтобы не ждать долго
y_pred = my_knn.predict(X_test_scaled[:100])
y_true = y_test[:100]

print("Предсказания готовы")

## Часть 4. Метрики качества своими руками

Не используйте `sklearn.metrics`! Напишем всё сами.

- **Accuracy**: Доля правильных ответов.
- **Precision (Точность)**: Доля реально положительных среди всех предсказанных положительными ($TP / (TP + FP)$).
- **Recall (Полнота)**: Доля предсказанных положительных среди всех реально положительных ($TP / (TP + FN)$).
- **F1-score**: Гармоническое среднее ($2 * (P * R) / (P + R)$).

In [None]:
def get_metrics(y_true, y_pred):
    # Вычисляем TP, FP, TN, FN
    # y_true и y_pred - это массивы 0 и 1
    
    TP = 0 # True Positive: PRED=1, TRUE=1
    TN = 0 # True Negative: PRED=0, TRUE=0
    FP = 0 # False Positive: PRED=1, TRUE=0
    FN = 0 # False Negative: PRED=0, TRUE=1
    
    # ВАШ КОД ЗДЕСЬ (пройдитесь циклом и заполните счетчики)
    for yt, yp in zip(y_true, y_pred):
        if ...:
             pass 
    
    # Считаем метрики
    accuracy = (TP + TN) / (TP + TN + FP + FN)
    
    # Добавьте защиту от деления на 0
    precision = TP / (TP + FP) if (TP + FP) > 0 else 0
    recall = ...
    f1 = ...
    
    print(f"Accuracy: {accuracy:.2f}")
    print(f"Precision: {precision:.2f}")
    print(f"Recall: {recall:.2f}")
    print(f"F1: {f1:.2f}")

get_metrics(y_true, y_pred)

## Часть 5. Взвешенный KNN (Weighted KNN)

В обычном KNN голос далекого соседа учитывается так же, как голос близкого. Это неправильно.
Давайте взвесим голоса: чем меньше дистанция, тем больше вес.

Формула веса: $w = \frac{1}{distance + \epsilon}$ (эпсилон, чтобы не делить на 0).

### Задание 4: Реализуйте взвешенное голосование

Модифицируйте только метод `predict`.

In [None]:
class WeightedKNN(NumKNN):
    def predict(self, X_input):
        predictions = []
        EPS = 1e-5
        
        for x in X_input:
            distances = []
            for i, x_train in enumerate(self.X):
                d = euclidean_dist(x, x_train)
                distances.append((d, self.y[i]))
            
            distances.sort(key=lambda item: item[0])
            top_k = distances[:self.k]
            
            # ВЗВЕШЕННОЕ ГОЛОСОВАНИЕ
            # Создадим словарь очков: {0: score, 1: score}
            class_scores = {0: 0.0, 1: 0.0}
            
            for dist, label in top_k:
                # ВАШ КОД ЗДЕСЬ
                # Посчитать вес и прибавить к соответствующему классу
                weight = ...
                class_scores[label] += weight
            
            # Выбираем класс с максимальной суммой весов
            predicted_label = max(class_scores, key=class_scores.get)
            predictions.append(predicted_label)
            
        return np.array(predictions)

In [None]:
w_knn = WeightedKNN(k=5)
w_knn.fit(X_train_scaled, y_train)

y_pred_weighted = w_knn.predict(X_test_scaled[:100])

print("Метрики для взвешенного KNN:")
get_metrics(y_true, y_pred_weighted)