In [None]:
from typing import List, Tuple, Dict
from dataclasses import dataclass


def transpose(M: List[List[float]]) -> List[List[float]]:
    rows = len(M)
    cols = len(M[0])
    result = [[0.0] * rows for _ in range(cols)]
    for i in range(rows):
        for j in range(cols):
            result[j][i] = M[i][j]
    return result


def add_intercept(X: List[List[float]]) -> List[List[float]]:
    rows = len(X)
    cols = len(X[0])
    result = [[0.0] * (cols + 1) for _ in range(rows)]
    for i in range(rows):
        result[i][0] = 1.0
        for j in range(cols):
            result[i][j + 1] = X[i][j]
    return result


def to_column_matrix(v: List[float]) -> List[List[float]]:
    return [[val] for val in v]

def from_column_matrix(M: List[List[float]]) -> List[float]:
    assert len(M[0]) == 1, "Ожидается матрица-столбец (p×1)"
    return [row[0] for row in M]

def multiply_map_reduce(A: List[List[float]], B: List[List[float]]) -> List[List[float]]:
    rows_a = len(A)
    cols_a = len(A[0])
    cols_b = len(B[0])
    
    def map_phase() -> List[Tuple[Tuple[int, int], float]]:
        mapped = []
        for i in range(rows_a):
            for j in range(cols_b):
                for k in range(cols_a):
                    key = (i, j)
                    value = A[i][k] * B[k][j]
                    mapped.append((key, value))
        return mapped
    
    def shuffle_phase(mapped: List[Tuple[Tuple[int, int], float]]) -> Dict[Tuple[int, int], List[float]]:
        grouped: Dict[Tuple[int, int], List[float]] = {}
        for key, value in mapped:
            if key not in grouped:
                grouped[key] = []
            grouped[key].append(value)
        return grouped
    
    def reduce_phase(grouped: Dict[Tuple[int, int], List[float]]) -> List[List[float]]:
        result = [[0.0] * cols_b for _ in range(rows_a)]
        for (i, j), values in grouped.items():
            result[i][j] = sum(values)
        return result
    
    # Выполнение MapReduce pipeline
    mapped = map_phase()
    grouped = shuffle_phase(mapped)
    result = reduce_phase(grouped)
    
    return result

def solve_linear_system(A: List[List[float]], b: List[float]) -> List[float]:
    """
    Решение системы Ax = b методом Гаусса с частичным выбором ведущего элемента
    """
    n = len(A)
    assert all(len(row) == n for row in A), "A должна быть квадратной n×n"
    assert len(b) == n, "Размер b должен быть n"
    
    a = [[A[i][j] if j < n else b[i] for j in range(n + 1)] for i in range(n)]

    for col in range(n):
        pivot = col
        best = abs(a[pivot][col])
        for row in range(col + 1, n):
            v = abs(a[row][col])
            if v > best:
                best = v
                pivot = row
        
        assert best != 0.0, "Матрица вырождена или близка к сингулярной"

        if pivot != col:
            a[col], a[pivot] = a[pivot], a[col]

        lead = a[col][col]
        for j in range(col, n + 1):
            a[col][j] /= lead
        
        # Обнулить элементы ниже
        for row in range(col + 1, n):
            factor = a[row][col]
            if factor == 0.0:
                continue
            for j in range(col, n + 1):
                a[row][j] -= factor * a[col][j]
    
    # Обратный ход
    x = [0.0] * n
    for row in range(n - 1, -1, -1):
        sum_val = a[row][n]
        for j in range(row + 1, n):
            sum_val -= a[row][j] * x[j]
        x[row] = sum_val
    
    return x

@dataclass
class LinearRegressionResult:
    weights: List[float]
    intercept: bool
    
    def __eq__(self, other):
        if not isinstance(other, LinearRegressionResult):
            return False
        return self.intercept == other.intercept and self.weights == other.weights
    
    def __hash__(self):
        return hash((tuple(self.weights), self.intercept))


def linear_regression_normal_eq(X_raw: List[List[float]], y: List[float], intercept: bool = True) -> LinearRegressionResult:
    assert len(X_raw) == len(y), \
        f"Число строк X ({len(X_raw)}) должно равняться длине y ({len(y)})"
    
    X = add_intercept(X_raw) if intercept else X_raw
    
    Xt = transpose(X)
    XtX = multiply_map_reduce(Xt, X)
    Y_col = to_column_matrix(y)
    XtY_col = multiply_map_reduce(Xt, Y_col)
    XtY = from_column_matrix(XtY_col)
    
    w = solve_linear_system(XtX, XtY)
    return LinearRegressionResult(weights=w, intercept=intercept)


if __name__ == "__main__":
    X_raw = [
        [1.0, 2.0],
        [2.0, 0.0],
        [3.0, 1.0],
        [4.0, 3.0],
        [5.0, 2.0]
    ]
    
    y = [6.1, 7.0, 9.4, 12.6, 13.1]
    
    model = linear_regression_normal_eq(X_raw, y, intercept=True)
    w = model.weights
    
    print("Линейная регрессия:")
    print(f"w0 (intercept) = {w[0]:.4f}")
    print(f"w1 = {w[1]:.4f}, w2 = {w[2]:.4f}")
    
    x_test = [6.0, 1.0]
    y_hat = w[0] + w[1] * x_test[0] + w[2] * x_test[1]
    print(f"Прогноз для набора признаков: x1 = 6.0, x2 = 1.0: ŷ = {y_hat:.4f}")

Линейная регрессия:
w0 (intercept) = 3.3367
w1 = 1.7786, w2 = 0.6047
Прогноз для набора признаков: x1 = 6.0, x2 = 1.0: ŷ = 14.6130
