# Класс Matrix (из предыдущей лабораторной работы)

Создан класс разреженной матрицы Matrix.
* В конструкторе задаются размеры и структуры хранения.
* Метод AddElement добавляет ненулевой элемент в разреженную структуру.
* GetElement и ToRegularMatrix обеспечивают доступ к полному виду матрицы.
* Transpose и Dot выполняют транспонирование и умножение матриц.
* NormalizeVector нормализует вектор по L2-норме.
* Determinant вычисляет определитель через модифицированный метод Гаусса.

In [1]:
import copy

class Matrix:
    def __init__(self, rows: int, cols: int):
        self.rows = rows
        self.cols = cols
        self.data = list()
        self.columns = list()
        self.row_ptr = [0]

# добавляем ненулевое значение в нужную ячейку
    def AddElement(self,row: int,col: int, value : float):
        if (row >= self.rows or col >= self.cols): #Проверка границ строки и столбца
            assert False
        #Поддержка корректной длины row_ptr
        while(len(self.row_ptr) != (row + 2)):
            self.row_ptr.append(self.row_ptr[len(self.row_ptr) - 1])
        #Добавление ненулевых элементов
        if (value != 0):
            self.data.append(value)
            self.columns.append(col)
            self.row_ptr[row+1] += 1

    def GetElement(self,row: int,col: int):
        #Проверка границ строки и столбца
        if (row >= self.rows or col >= self.cols):
            assert False
        start = self.row_ptr[row]
        end = self.row_ptr[row +1]
        #Поиск запрашиваемого элемента
        for i in range(start,end):
            if (self.columns[i] == col):
                value = self.data[i]
                if (int(value) == value):
                    return int(value)
                return value
        #Если элемент не найден
        return 0


    def swap_rows_(self,data,col,row_ptr, row1, row2):

        row1_start, row1_end = row_ptr[row1], row_ptr[row1 + 1]
        row2_start, row2_end = row_ptr[row2], row_ptr[row2 + 1]

        row1_data = data[row1_start:row1_end]
        row1_col = col[row1_start:row1_end]
        row2_data = data[row2_start:row2_end]
        row2_col = col[row2_start:row2_end]

        new_data = []
        new_indices = []
        new_row_ptr = [0]

        # перебираем все строки
        for i in range(self.rows):
            if i == row1:
                new_data.extend(row2_data)
                new_indices.extend(row2_col)
            elif i == row2:
                new_data.extend(row1_data)
                new_indices.extend(row1_col)
            else:
                start, end = row_ptr[i], row_ptr[i + 1]
                new_data.extend(data[start:end])
                new_indices.extend(col[start:end])


            new_row_ptr.append(len(new_data))
        return new_data,new_indices,new_row_ptr


    def ToRegularMatrix(self):
        matrix = [[0] * self.cols for _ in range(self.rows)]
        for i in range(self.rows):

            start = self.row_ptr[i]
            end = self.row_ptr[i + 1]
            for j in range(start, end):
                matrix[i][self.columns[j]] = self.data[j]
        return matrix

    # транспонирование
    def Transpose(self) -> 'Matrix':
        result = Matrix(self.cols, self.rows)
        for i in range(self.rows):
            for j in range(self.row_ptr[i], self.row_ptr[i + 1]):
                col = self.columns[j]
                val = self.data[j]
                result.AddElement(col, i, val)
        return result

    #умножение матриц
    def Dot(self, other: 'Matrix') -> 'Matrix':
        if self.cols != other.rows:
            raise ValueError("Несовместимые размеры для умножения")
        result = Matrix(self.rows, other.cols)
        B = other.ToRegularMatrix()
        for i in range(self.rows):
            row_vec = [0] * self.cols
            for j in range(self.row_ptr[i], self.row_ptr[i + 1]):
                row_vec[self.columns[j]] = self.data[j]
            for col in range(other.cols):
                dot = sum(row_vec[k] * B[k][col] for k in range(self.cols))
                if abs(dot) > 1e-12:
                    result.AddElement(i, col, dot)
        return result

    # нормализация вектора
    def NormalizeVector(vec: 'Matrix') -> 'Matrix':
        norm = math.sqrt(sum(vec.GetElement(i, 0)**2 for i in range(vec.rows)))
        if norm == 0:
            return vec
        result = Matrix(vec.rows, 1)
        for i in range(vec.rows):
            result.AddElement(i, 0, vec.GetElement(i, 0) / norm)
        return result

    def Determinant(self):
        if self.rows != self.cols:
            raise ValueError("Матрица должна быть квадратной")

        n = self.rows
        diag = [0] * n  # Диагональные элементы


        # Копируем исходные данные для изменения
        tmp_data = self.data[:]
        tmp_columns = self.columns[:]
        tmp_row_ptr = self.row_ptr[:]


        sign = 1


        for i in range(n):
            # Поиск диагонального элемента
            diag_elem = None
            for idx in range(tmp_row_ptr[i], tmp_row_ptr[i + 1]):
                if tmp_columns[idx] == i:
                    diag_elem = tmp_data[idx]
                    break

            if diag_elem is None or diag_elem == 0:
                swapped = False
                for j in range(i + 1, n):
                    for idx in range(tmp_row_ptr[j], tmp_row_ptr[j + 1]):
                        if tmp_columns[idx] == i and tmp_data[idx] != 0:
                            # Меняем строки i и j
                            tmp_data,tmp_columns,tmp_row_ptr = self.swap_rows_(tmp_data,tmp_columns,tmp_row_ptr,i,j)
                            swapped = True
                            sign *= -1  # Меняем знак
                            break
                    if swapped:
                        break
                if not swapped:
                    return 0,"нет"

                for idx in range(tmp_row_ptr[i], tmp_row_ptr[i + 1]):
                    if tmp_columns[idx] == i:
                        diag_elem = tmp_data[idx]
                        break


            diag[i] = diag_elem


            for idx in range(tmp_row_ptr[i], tmp_row_ptr[i + 1]):
                tmp_data[idx] /= diag_elem

            # Вычитаем строку i из всех последующих строк
            for j in range(i + 1, n):
                factor = None
                for idx in range(tmp_row_ptr[j], tmp_row_ptr[j + 1]):
                    if tmp_columns[idx] == i:
                        factor = tmp_data[idx]
                        tmp_data[idx] = 0
                        break

                if factor is not None:
                    # Вычитаем строки с учетом коэффициента
                    for idx in range(tmp_row_ptr[i], tmp_row_ptr[i + 1]):
                        col = tmp_columns[idx]
                        value = tmp_data[idx]
                        for k in range(tmp_row_ptr[j], tmp_row_ptr[j + 1]):
                            if tmp_columns[k] == tmp_columns[idx]:
                                tmp_data[k] -= factor * value
                                break
                        else:
                            # Элемент не найден
                            start = tmp_row_ptr[j]
                            end = tmp_row_ptr[j + 1]

                            # Поиск позиции
                            insert_pos = start
                            while insert_pos < end and tmp_columns[insert_pos] < col:
                                insert_pos += 1

                            # Вставляем новый элемент
                            tmp_data.insert(insert_pos, -factor * value)
                            tmp_columns.insert(insert_pos, col)

                            # Обновляем row_ptr
                            for k in range(j + 1, len(tmp_row_ptr)):
                                tmp_row_ptr[k] += 1

        det = sign
        for i in diag:
            det *= i

        return det


---

# EASY LEVEL

---



## Реализовать метод Гаусса для решения СЛАУ

* В приведенной реализации мы сначала
копируем матрицу A и вектор b, чтобы не менять их напрямую.
* Затем выполняется прямой ход: для каждого столбца выбираем максимальный по модулю элемент в качестве опорного (для численной устойчивости и чтобы избежать деления на ноль), при необходимости меняем строки.
* Далее опорную строку масштабируем так, чтобы опорный элемент стал равен 1, и с её помощью зануляем элементы ниже в том же столбце.
* После окончания прямого хода матрица превращается в верхнюю треугольную (в M выше главной диагонали содержатся коэффициенты, ниже – нули).
* Затем выполняется обратный ход: начиная с последней строки, рассчитываем значения неизвестных с учётом уже найденных на предыдущих шагах.
* Функция возвращает список базисных векторов решения системы.

In [2]:
from typing import List

def gauss_solver(A: 'Matrix', b: 'Matrix') -> List['Matrix']:

    # Проверка размерности матриц
    if A.rows != A.cols:
        raise ValueError("Матрица A должна быть квадратной")
    if A.rows != b.rows or b.cols != 1:
        raise ValueError("Несовместимые размеры матриц A и b")

    n = A.rows

    # Преобразуем матрицы к плотному виду
    dense_A = A.ToRegularMatrix()
    dense_b = [row[0] for row in b.ToRegularMatrix()]

    # Создаем расширенную матрицу [A|b]
    aug = [row.copy() + [dense_b[i]] for i, row in enumerate(dense_A)]

    # Прямой ход метода Гаусса
    for col in range(n):
        # Выбор главного элемента (для устойчивости)
        max_row = max(range(col, n), key=lambda r: abs(aug[r][col]))
        aug[col], aug[max_row] = aug[max_row], aug[col]

        # Проверка на вырожденность
        if abs(aug[col][col]) < 1e-10:
            # Проверяем на несовместность
            if abs(aug[col][-1]) > 1e-10:
                raise ValueError("Система несовместна")
            continue

        # Нормализация текущей строки
        pivot = aug[col][col]
        aug[col] = [x / pivot for x in aug[col]]

        # Обнуление элементов ниже
        for row in range(col + 1, n):
            factor = aug[row][col]
            aug[row] = [a - factor * b for a, b in zip(aug[row], aug[col])]

    # Обратный ход
    solution = [0.0] * n
    for row in reversed(range(n)):
        # Ищем первый ненулевой элемент в строке
        pivot_col = next((c for c in range(n) if abs(aug[row][c]) > 1e-10), None)

        if pivot_col is not None:
            solution[pivot_col] = aug[row][-1]
            for c in range(pivot_col + 1, n):
                solution[pivot_col] -= aug[row][c] * solution[c]

    # Создаем объект Matrix для решения
    result = Matrix(n, 1)
    for i in range(n):
        result.AddElement(i, 0, solution[i])

    return [result]

## Реализовать функцию центрирования данных

Функция center_data центрирует данные: для каждой колонки вычисляет её среднее и вычитает его из каждого элемента, делая среднее равным нулю.

In [3]:
def center_data(X: 'Matrix') -> 'Matrix':

    # Получаем размеры матрицы
    n = X.rows
    m = X.cols

    # Создаем новую матрицу для результата
    X_centered = Matrix(n, m)

    # Вычисляем средние значения по каждому столбцу
    means = [0.0] * m
    for col in range(m):
        # Суммируем элементы столбца
        col_sum = 0.0
        for row in range(n):
            col_sum += X.GetElement(row, col)
        # Вычисляем среднее
        means[col] = col_sum / n

    # Центрируем данные
    for row in range(n):
        for col in range(m):
            # Вычитаем среднее значение столбца из каждого элемента
            centered_value = X.GetElement(row, col) - means[col]
            X_centered.AddElement(row, col, centered_value)

    return X_centered

##Вычислить матрицу ковариаций

covariance_matrix строит выборочную ковариационную матрицу центрированных данных: перемножает транспонированную матрицу на саму себя, делит на
n−1 и сохраняет симметричный разреженный результат.

In [4]:
# Реализация функции вычисления ковариационной матрицы
def covariance_matrix(X_centered: 'Matrix') -> 'Matrix':

    n, m = X_centered.rows, X_centered.cols
    result = Matrix(m, m)

    # Словарь для ускоренного доступа: col -> {row: value}
    columns_data = [dict() for _ in range(m)]
    for i in range(n):
        for j in range(X_centered.row_ptr[i], X_centered.row_ptr[i + 1]):
            col = X_centered.columns[j]
            val = X_centered.data[j]
            columns_data[col][i] = val

    for i in range(m):
        for j in range(i, m):
            total = 0.0
            # Перемножаем столбцы i и j
            common_rows = set(columns_data[i].keys()) & set(columns_data[j].keys())
            for row in common_rows:
                total += columns_data[i][row] * columns_data[j][row]
            cov = total / (n - 1)
            if abs(cov) > 1e-12:
                result.AddElement(i, j, cov)
                if i != j:
                    result.AddElement(j, i, cov)  # симметрия

    return result


---

# NORMAL LEVEL

---



## Найти собственные значения матрицы С методом бисекции

В find_eigenvalues используется полный перебор: для сетки λ от 0 до max-элемента матрицы ищется изменение знака det(C−λI), а затем корень уточняется методом бисекции до заданной точности.

In [5]:
from typing import List, Tuple

def find_eigenvalues(C: 'Matrix', tol: float = 1e-6) -> List[float]:

    def det_lambda(matrix: 'Matrix', lam: float) -> float:
        n = matrix.rows
        shifted = Matrix(n, n)
        for i in range(n):
            for j in range(n):
                val = matrix.GetElement(i, j)
                if i == j:
                    val -= lam
                if abs(val) > 1e-12:
                    shifted.AddElement(i, j, val)
        det, _ = shifted.Determinant()
        return det

    # Находим грубые границы поиска по собственным значениям (спектральную норму)
    m = C.rows
    row_sums = [sum(abs(C.GetElement(i, j)) for j in range(m)) for i in range(m)]
    lambda_min = -max(row_sums) - 1
    lambda_max = max(row_sums) + 1

    step = 0.5
    found = []
    x = lambda_min
    while x + step <= lambda_max:
        a = x
        b = x + step
        if det_lambda(C, a) * det_lambda(C, b) < 0:
            # Бисекция
            while abs(b - a) > tol:
                mid = (a + b) / 2
                f_mid = det_lambda(C, mid)
                if f_mid == 0:
                    a = b = mid
                    break
                if det_lambda(C, a) * f_mid < 0:
                    b = mid
                else:
                    a = mid
            root = (a + b) / 2
            # Исключение близких корней
            if all(abs(root - ev) > tol for ev in found):
                found.append(root)
        x += step

    return sorted(found)

## Найти собственные векторы матрицы С

In [6]:
from typing import List

def find_eigenvectors(C: 'Matrix', eigenvalues: List[float]) -> List['Matrix']:
    vectors = []
    m = C.rows

    for lam in eigenvalues:
        # Строим матрицу (C - λI)
        A = Matrix(m, m)
        for i in range(m):
            for j in range(C.row_ptr[i], C.row_ptr[i + 1]):
                col = C.columns[j]
                val = C.data[j]
                if i == col:
                    A.AddElement(i, col, val - lam)
                else:
                    A.AddElement(i, col, val)
            # если элемент на диагонали отсутствует
            if C.GetElement(i, i) == 0:
                A.AddElement(i, i, -lam)

        # Правая часть (нулевой вектор)
        b = Matrix(m, 1)

        try:
            solution = gauss_solver(A, b)[0]  # берём первый вектор из списка
            normalized = Matrix.NormalizeVector(solution)
            vectors.append(normalized)
        except ValueError:
            continue

    return vectors


## Вычислить долю объясненной дисперсии

explained_variance_ratio возвращает отношение суммы первых k собственных значений к сумме всех, показывая, какую долю информации описывают первые компоненты.

In [7]:
from typing import List

# Вычисление доли объяснённой дисперсии
def explained_variance_ratio(eigenvalues: List[float], k: int) -> float:

    if k <= 0 or k > len(eigenvalues):
        raise ValueError("Некорректное значение k")

    numerator = sum(eigenvalues[:k])
    denominator = sum(eigenvalues)
    if abs(denominator) < 1e-12:
        raise ValueError("Сумма собственных значений равна нулю")

    return numerator / denominator


---

# HARD LEVEL

---



## Реализовать полный алгоритм РСА

Функция pca объединяет все шаги: центрирование данных, построение ковариационной матрицы, поиск собственных значений и векторов, выбор k главных компонент, проекцию данных и вычисление объяснённой дисперсии.

In [8]:
from typing import Tuple

def pca(X: 'Matrix', k: int) -> Tuple['Matrix', float]:
    # 1. Центрирование данных
    X_centered = center_data(X)

    # 2. Ковариационная матрица через X^T * X
    n = X_centered.rows
    Xt = X_centered.Transpose()
    C = Xt.Dot(X_centered)  # (m×n) * (n×m) = (m×m)
    for i in range(C.rows):
        for j in range(C.row_ptr[i], C.row_ptr[i + 1]):
            C.data[j] /= (n - 1)

    # 3. Собственные значения и векторы
    eigenvals = find_eigenvalues(C)
    eigvecs = find_eigenvectors(C, eigenvals)

    # 4. Упорядочим по убыванию λ
    eig_pairs = sorted(zip(eigenvals, eigvecs), key=lambda pair: -pair[0])
    top_vectors = [Matrix.NormalizeVector(vec) for _, vec in eig_pairs[:k]]

    # 5. Формируем матрицу из собственных векторов: V_k (m×k)
    Vk = Matrix(C.cols, k)
    for j, vec in enumerate(top_vectors):
        for i in range(vec.rows):
            val = vec.GetElement(i, 0)
            if abs(val) > 1e-12:
                Vk.AddElement(i, j, val)

    # 6. Проекция данных: X_proj = X_centered * Vk (n×k)
    X_proj = X_centered.Dot(Vk)

    # 7. Доля объяснённой дисперсии
    gamma = explained_variance_ratio(eigenvals, k)

    return X_proj, gamma


##Визуализировать проекцию данных на первые две главные компоненты

plot_pca_projection создаёт точечный график проекции данных на первые две главные компоненты, подписывает оси и выводит сетку.

In [9]:
from typing import List, Tuple
from matplotlib.figure import Figure
import matplotlib.pyplot as plt

# Визуализация проекции на первые две главные компоненты
def plot_pca_projection(X_proj: 'Matrix') -> Figure:
    if X_proj.cols < 2:
        raise ValueError("Для визуализации требуется как минимум 2 главные компоненты")

    n = X_proj.rows
    x_vals = [X_proj.GetElement(i, 0) for i in range(n)]
    y_vals = [X_proj.GetElement(i, 1) for i in range(n)]

    fig, ax = plt.subplots()
    ax.scatter(x_vals, y_vals, c='blue', edgecolors='k')
    ax.set_xlabel('PC1')
    ax.set_ylabel('PC2')
    ax.set_title('PCA')
    ax.grid(True)

    return fig


## Вычислить среднеквадратическую ошибку восстановления данных

reconstruction_error считает MSE между элементами исходной и восстановленной матриц

In [10]:
from typing import List, Tuple

# Вычисление среднеквадратической ошибки восстановления данных
def reconstruction_error(X_orig: 'Matrix', X_recon: 'Matrix') -> float:

    if X_orig.rows != X_recon.rows or X_orig.cols != X_recon.cols:
        raise ValueError("Размеры матриц не совпадают")

    n, m = X_orig.rows, X_orig.cols
    total = 0.0

    for i in range(n):
        for j in range(m):
            x = X_orig.GetElement(i, j)
            y = X_recon.GetElement(i, j)
            total += (x - y) ** 2

    return total / (n * m)

---
# EXPERT LEVEL
---

##Добавить автоматический выбор числа главных компонент на основе порога объясненной дисперсии

auto_select_k выбирает минимальное число компонент, при котором кумулятивная доля объяснённой дисперсии достигает заданного порога (по умолчанию 95 %).

In [11]:
from typing import List, Tuple

# Автоматический выбор числа главных компонент по порогу объяснённой дисперсии
def auto_select_k(eigenvalues: List[float], threshold: float = 0.95) -> int:

    total = sum(eigenvalues)
    if total == 0:
        return 0

    cumulative = 0.0
    for i, val in enumerate(eigenvalues):
        cumulative += val
        if cumulative / total >= threshold:
            return i + 1

    return len(eigenvalues)

## Обработать пропущенные значения в данных

handle_missing_values заменяет пропуски (NaN) в каждом столбце на среднее значение этого столбца, сохраняя остальные элементы без изменений.

In [12]:
from typing import List, Tuple

# Обработка пропущенных значений в матрице (NaN заменяются средним по столбцу)
def handle_missing_values(X: 'Matrix') -> 'Matrix':

    import math

    n, m = X.rows, X.cols
    filled = Matrix(n, m)

    column_sums = [0.0] * m
    counts = [0] * m

    for i in range(n):
        for j in range(m):
            val = X.GetElement(i, j)
            if not isinstance(val, float) or not math.isnan(val):
                column_sums[j] += val
                counts[j] += 1

    means = [column_sums[j] / counts[j] if counts[j] > 0 else 0.0 for j in range(m)]

    for i in range(n):
        for j in range(m):
            val = X.GetElement(i, j)
            if isinstance(val, float) and math.isnan(val):
                filled.AddElement(i, j, means[j])
            elif val != 0:
                filled.AddElement(i, j, val)

    return filled


## Исследовать влиянение шума на РСА

add_noise_and_compare добавляет гауссов шум к данным и выводит сравнение объяснённой дисперсии PCA на двух главных компонентах до и после зашумления.

In [13]:
from typing import List, Tuple
import random
import math

# Исследование влияния шума на PCA
def add_noise_and_compare(X: 'Matrix', noise_level: float = 0.1):

    n, m = X.rows, X.cols

    # Вычисляем стандартное отклонение
    values = []
    for i in range(n):
        for j in range(m):
            val = X.GetElement(i, j)
            values.append(val)
    mean_val = sum(values) / len(values)
    std_val = math.sqrt(sum((v - mean_val) ** 2 for v in values) / len(values))

    # Создание копии матрицы с шумом
    X_noisy = Matrix(n, m)
    for i in range(n):
        for j in range(m):
            val = X.GetElement(i, j)
            noise = random.gauss(0, std_val * noise_level)
            noisy_val = val + noise
            if abs(noisy_val) > 1e-12:
                X_noisy.AddElement(i, j, noisy_val)

    # Применим PCA до и после
    k = 2
    X_proj_clean, var_clean = pca(X, k)
    X_proj_noisy, var_noisy = pca(X_noisy, k)

    print("PCA до добавления шума: ", round(var_clean, 4))
    print("PCA после добавления шума: ", round(var_noisy, 4))

## Применить РСА к реальному датасету

apply_pca_to_dataset загружает данные, строит разреженную матрицу, выполняет PCA и возвращает результат проекции и explained variance ratio.




In [None]:
from typing import List, Tuple
from sklearn.datasets import load_iris, load_wine, load_digits

def load_dataset(name: str) -> List[List[float]]:

    name = name.lower()
    if name == 'iris':
        data = load_iris()
    elif name == 'wine':
        data = load_wine()
    elif name == 'digits':
        data = load_digits()
    else:
        raise ValueError(f"неизвестный датасет: {name!r}.")
    return data.data.tolist()

def apply_pca_to_dataset(dataset_name: str, k: int) -> Tuple['Matrix', float]:

    # Загрузка и перевод в Matrix
    raw = load_dataset(dataset_name)
    n = len(raw); m = len(raw[0])
    X = Matrix(n, m)
    for i in range(n):
        for j in range(m):
            X.AddElement(i, j, float(raw[i][j]))

    # Обработка пропусков и центровка
    X = handle_missing_values(X)          # заполняет NaN средним по столбцу
    # вычисляем средние до центровки
    dense_orig = X.ToRegularMatrix()
    means = [ sum(dense_orig[i][j] for i in range(n)) / n for j in range(m) ]
    Xc = center_data(X)                   # центрированная матрица


    C = covariance_matrix(Xc)
    eigvals = find_eigenvalues(C)
    eigvecs = find_eigenvectors(C, eigvals)  # список Matrix

    # Выбираем топ-k по убыванию λ и собираем Vk
    idx_sorted = sorted(range(len(eigvals)), key=lambda i: eigvals[i], reverse=True)
    topk = idx_sorted[:k]
    Vk = Matrix(m, k)
    for col, eig_i in enumerate(topk):
        v = eigvecs[eig_i]
        for row in range(m):
            Vk.AddElement(row, col, v.GetElement(row, 0))

    # Проекция и восстановление
    X_proj = Xc.Dot(Vk)
    Xc_recon = X_proj.Dot(Vk.Transpose())
    dense_recon = Xc_recon.ToRegularMatrix()
    # добавляем обратно means
    recon = [ [ dense_recon[i][j] + means[j]
                for j in range(m) ]
              for i in range(n) ]

    mse = sum( (raw[i][j] - recon[i][j])**2
               for i in range(n) for j in range(m) ) / (n*m)

    return X_proj, mse
