<h2>Implementacja modelu HCRNN do postaci Neurona</h2>

Artykuł naukowy:
Instniejace, podobne rozwiązania:

<h3>Narzędzia do kontroli jakości kodu:</h3>
<ol>
<li> isort do sortowania importowanych modułów - https://pycqa.github.io/isort/</li>
<li> mypy do konwersji dynamicznych metod w statyczne - https://mypy.readthedocs.io/en/stable/getting_started.html</li>
<li> unitest do testowania kodu</li>
<li> sphinx do tworzenia dokumentacji - https://www.sphinx-doc.org/en/master/usage/quickstart.html</li>
<li> flake8 do kontroli stylu - https://flake8.pycqa.org/en/latest/index.html#quickstart</li>
<li> poetry do pakowania i wdrożenia apkietó CI/CD (opcjonalnie) - https://python-poetry.org/</li>
<li> Coverage do testowania wadliwego kodu lub pomijanego kodu - https://coverage.readthedocs.io/en/7.10.1/</li>
<li> hypothesis do Property-Based testing - https://hypothesis.readthedocs.io/en/latest/</li>
<li> cProfile do testowania wydajności kodu - https://www.machinelearningplus.com/python/cprofile-how-to-profile-your-python-code/</li>
</ol> 

In [100]:
import mypy
import numpy as np
import math, os
import torch.nn as nn
import torch
import torch.nn.functional as F
from torch import Tensor
import hypothesis

In [101]:
__all__ = [
    'HCRNN_Neuron'
]

In [102]:
from torch.nn import Module

'''
Klasa rozszerzająca funkcje torch'a.
'''

class HCRNN_Neuron(nn.Module):

    '''
    definicja stałych określająca rozmiar wejścia i wyjścia dla liczb neuronów.
    '''

    __constants__ = ['in_features', 'out_features']
    in_features: int
    out_features: int

    def __init__(
        self,
        *,
        in_features: int,
        out_features: int,

        #definicja urządzenia, CPU, GPU lub TPU
        device = None,
        dtype= None,

    ) -> None:
        factory_kwargs = {"device": device, "dtype": dtype}
        super().__init__()

    #Tensor wchodzi i zawsze Tensor wychodzi
    def forward(self, input:Tensor) -> Tensor:
        pass

<h2>Normalization</h2>

In [103]:
class CDFNorm(nn.Module):
    def __init__(self, method='gaussian', unbiased=True, eps=1e-5, affine=False, track_running_stats=True) -> None:
        """
        Normalizacja CDF (dystrybuanty).

        Parametry:
            method: metoda normalizacji ('gaussian' lub 'empirical')
            unbiased: czy użyć nieobciążonego estymatora wariancji
            eps: mała wartość dla stabilności numerycznej
            affine: czy zastosować transformację afiniczną
            track_running_stats: czy śledzić statystyki podczas uczenia
        """
        super().__init__()
        self.method = method
        self.unbiased = unbiased
        self.eps = eps
        self.affine = affine
        self.track_running_stats = track_running_stats

        if self.affine:
            self.weight = nn.Parameter(torch.ones(1))  # Parametr skalujący
            self.bias = nn.Parameter(torch.zeros(1))    # Parametr przesunięcia

        if self.track_running_stats:
            # Rejestracja buforów dla średniej i wariancji
            self.register_buffer('running_mean', torch.zeros(1))
            self.register_buffer('running_var', torch.ones(1))
            self.register_buffer('num_batches_tracked', torch.tensor(0, dtype=torch.long))

    def _gaussian_transform(self, x):
        """Transformacja Gaussa - normalizacja przy użyciu CDF rozkładu normalnego."""
        if self.training and self.track_running_stats:
            # Obliczanie statystyk podczas uczenia
            mean = x.mean()
            var = x.var(unbiased=self.unbiased)
            with torch.no_grad():
                # Aktualizacja średniej kroczącej
                self.running_mean = (1 - 0.1) * self.running_mean + 0.1 * mean
                # Aktualizacja wariancji kroczącej
                self.running_var = (1 - 0.1) * self.running_var + 0.1 * var
                self.num_batches_tracked += 1
        else:
            # Użycie zapisanych statystyk podczas ewaluacji
            mean = self.running_mean
            var = self.running_var

        # Obliczenie CDF przy użyciu funkcji błędu
        x_norm = 0.5 * (1 + torch.erf((x - mean) / (torch.sqrt(var + self.eps) * math.sqrt(2))))

        if self.affine:
            # Transformacja afiniczną
            x_norm = x_norm * self.weight + self.bias

        return x_norm
    
    ''' błędna funkcja
    def _empirical_transform(self, x):
        """Empiryczna transformacja CDF na podstawie rang."""
        x_norm = torch.zeros_like(x)
        for i in range(len(x)):
            # Obliczenie rangi dla każdego elementu
            x_norm[i] = (x < x[i]).float().mean()

        if self.affine:
            # Transformacja afiniczną
            x_norm = x_norm * self.weight + self.bias

        return x_norm
        '''
    def _empirical_transform(self, x):
        """
        Empiryczna CDF po ostatnim wymiarze: U = rank / n
        Zwraca wartości w [1/n, 1], zgodnie z testem jednostkowym.
        Obsługuje kształty (..., N).
        """
        # argsort(argsort(x)) -> rangi 0..n-1 po ostatnim wymiarze
        order = torch.argsort(x, dim=-1, stable=True)
        ranks0 = torch.argsort(order, dim=-1).to(torch.float32)
        n = x.size(-1)
        u = (ranks0 + 1.0) / float(n)  # 1/n .. 1
        if self.affine:
            u = u * self.weight + self.bias
        return u.to(x.dtype)


    def forward(self, x):
        """
        Przebieg forward normalizacji CDF.

        Parametry:
            x: tensor wejściowy

        Zwraca:
            Znormalizowany tensor w przedziale [0,1]
        """
        if self.method == 'gaussian':
            return self._gaussian_transform(x)
        elif self.method == 'empirical':
            return self._empirical_transform(x)
        else:
            raise ValueError(f"Niewspierana metoda normalizacji: {self.method}")

Baza ortonormalnych wielomianów Legendre'a

In [104]:
class OrthonormalLegendreBasis(nn.Module):
    def __init__(self, max_degree: int) -> None:
        """
        Implementacja ortonormalnych wielomianów Legendre'a na przedziale [0,1].

        Parametry:
            max_degree: maksymalny stopień wielomianu (0 do 3)
        """
        super().__init__()
        self.max_degree = max_degree

        # Współczynniki wielomianów Legendre'a przesuniętych na przedział [0,1]
        # Każdy wiersz odpowiada kolejnemu wielomianowi (P0, P1, P2, P3)
        self.register_buffer('legendre_coeffs', torch.tensor([
            [1, 0, 0, 0],        # P0(x) = 1
            [-1, 2, 0, 0],       # P1(x) = 2x - 1
            [1, -6, 6, 0],       # P2(x) = 6x² - 6x + 1
            [-1, 12, -30, 20]    # P3(x) = 20x³ - 30x² + 12x - 1
        ], dtype=torch.float32))

    def forward(self, x: Tensor) -> Tensor:
        x = x.float().clamp(0, 1)
        # Usuń zbędny wymiar kanału, np. (B,1) -> (B)
        if x.dim() > 0 and x.size(-1) == 1:
            x = x.squeeze(-1)
        # Wielomiany przesunięte na [0,1] (masz coeffs dla 1, 2x-1, 6x^2-6x+1, 20x^3-30x^2+12x-1)
        powers = torch.stack([x**i for i in range(self.legendre_coeffs.size(1))], dim=-1)  # (..., 4)
        legendre = torch.einsum('...i,ji->...j', powers, self.legendre_coeffs)             # (..., 4)
        # Ortonormalizacja na [0,1]: φ_n(x) = √(2n+1) * \tilde P_n(x)
        n = torch.arange(0, self.max_degree + 1, device=x.device, dtype=legendre.dtype)
        scale = torch.sqrt(2.0 * n + 1.0)
        return legendre[..., :self.max_degree+1] * scale


<h2>Joint Distribution</h2>
Uogólniona funkcja dla wymiarów 2 i 3.

In [105]:
class JointDistribution(nn.Module):
    def __init__(self, dim:int, basis_size:int=4):
        super().__init__()
        self.dim = dim
        self.basis_size = basis_size
        self.coeffs = nn.Parameter(torch.zeros(*(basis_size for _ in range(dim))))
        self.basis = OrthonormalLegendreBasis(basis_size - 1)

    def forward(self, *inputs: Tensor) -> Tensor:
        # Konwersja wszystkich inputów do tensora
        processed_inputs = []
        for x in inputs:
            if not isinstance(x, torch.Tensor):
                x = torch.tensor(x, dtype=torch.float32)
            if x.dim() == 0:
                x = x.unsqueeze(0)
            processed_inputs.append(x.float())

        basis_values = [self.basis(x) for x in processed_inputs]

        if self.dim == 2:
            out = torch.einsum('...i,...j,ij->...', basis_values[0], basis_values[1], self.coeffs)
        elif self.dim == 3:
            out = torch.einsum('...i,...j,...k,ijk->...', basis_values[0], basis_values[1],basis_values[2], self.coeffs)
        else:
            raise ValueError(f"Invalid dimensionality: {self.dim}. Supported: 2 or 3.")

        # jeśli input był skalarem, zwróć skalar
        return out.squeeze()

<h2>Estymacja średnich</h2>

In [106]:
class MeanEstimation(nn.Module):
    def __init__(self,
                 *,
                 triplets,
                 feature_fn,
                 feature_dm
                 ):
        super().__init__()
        self.triplets = triplets
        self.feature_fn = feature_fn
        self.feature_dm = feature_dm

    def compute_tensor_mean(self) -> Tensor:
        """
        Parametry:
            triplets: array (x, y, z)
            feature_fn: funckaj mapująca
            feature_dm: wymiary D
        """
        a = np.zeros((self.feature_dm, self.feature_dm, self.feature_dm))

        for (x, y, z) in self.triplets:
            fx = self.feature_fn(x)
            fy = self.feature_fn(y)
            fz = self.feature_fn(z)

            outer = np.einsum('i,j,k->ijk', fx, fy, fz)

            a += outer

        a /= len(self.triplets)  # Normalizacja na trójkach
        return a

<h2>Estymacja warunkowa</h2>

In [107]:
class ConditionalEstimation(nn.Module):
    def __init__(self,
                 *,
                 x_candidates,
                 y,
                 z,
                 a,
                 feature_fn) -> None:
        super().__init__()
        self.x_candidates = x_candidates
        self.y = y
        self.z = z
        self.a = a
        self.feature_fn = feature_fn

    def conditional_score(self):

        D = len(self.a)
        fy = self.feature_fn(self.y)
        fz = self.feature_fn(self.z)

        denominator = 0
        for j in range(D):
            for k in range(D):
                denominator += self.a[0, j, k] * fy[j] * fz[k]

        scores = []
        for x in self.x_candidates:
            fx = self.feature_fn(x)

            score = 0
            for i in range(D):
                context_sum = 0
                for j in range(D):
                    for k in range(D):
                        context_sum += self.a[i, j, k] * fy[j] * fz[k]
                score += fx[i] * (context_sum / (denominator + 1e-8)) #uniknięcie dzielenia przez zero

            scores.append(score)

        return scores

<h2>Propagacja Estymacji</h2>

In [108]:
class PropagationEstimation(nn.Module):
    def __init__(self,
                 *,
                 y,
                 z,
                 a,
                 feature_fn):
        super().__init__()
        self.y = y
        self.z = z
        self.a = a
        self.feature_fn = feature_fn

    def propagate_expectation(self):
        fy = self.feature_fn(self.y).float().view(-1)
        fz = self.feature_fn(self.z).float().view(-1)

        numerator = torch.einsum('jk,j,k->', self.a[1], fy, fz)
        denominator = torch.einsum('jk,j,k->', self.a[0], fy, fz)

        ratio = numerator / (denominator + 1e-8)
        
        #przesunięcie bazy
        centered_ratio = ratio - 1.0

        propagated = 0.5 + (1.0 / (2.0 * torch.sqrt(torch.tensor(3.0)))) * centered_ratio
        
        return propagated

<h2>Entropia</h2>

In [109]:
class EntropyAndMutualInformation(nn.Module):

    def approximate_entropy(self, activations):

        # Normalizacja prawdopodobieństw funkcji aktywacji
        probs = F.softmax(activations, dim=1)
        entropy = -torch.sum(probs ** 2, dim=1).mean()
        return entropy

    def approximate_mutual_information(self, act_X, act_Y):

        # Normalizacja funkcji aktywacji
        probs_X = F.softmax(act_X, dim=1)
        probs_Y = F.softmax(act_Y, dim=1)

        joint_probs = torch.bmm(probs_X.unsqueeze(2), probs_Y.unsqueeze(1))

        mi = torch.sum(joint_probs ** 2, dim=(1,2)).mean()
        return mi

<h2>Dynamicznie modyfikowany model za pomocą EMA</h2>

In [110]:
class DynamicEMA(nn.Module):
    def __init__(self, x, y, z, ema_lambda) -> None:
        super().__init__()
        self.x = x
        self.y = y
        self.z = z
        self.ema_lambda = ema_lambda
        self.a = torch.zeros_like(torch.einsum('i,j,k->ijk', x, y, z))  # pusty tensor o rozmiarze sumy einstein'a

    def EMAUpdateMethod(self):
        def f_i(x): return x
        def f_j(y): return y
        def f_k(z): return z

        update_tensor = torch.einsum('i,j,k->ijk', f_i(self.x), f_j(self.y), f_k(self.z))

        self.a = (1 - self.ema_lambda) * self.a + self.ema_lambda * update_tensor

        return self.a

<h2>Optymizacja bazy</h2>

In [111]:
class BaseOptimization(nn.Module):
    def __init__(self,
                 *,
                 a, #tensor do optymalizacji
                 ) -> None:
        self. a = a

    def optimization_early(self) -> Tensor:
        M = self.a.reshape(len(self.a[0]), -1)

        # Obliczenie SVD
        U, S, Vh = torch.linalg.svd(M, full_matrices=False)

        # Transformacja Bazy, tu przykładowa funkcja, do wymiany
        def f_x(x):
            return torch.sin(x * torch.linspace(0, 1, len(self.a[2])))

        # nowa baza g_i(x) = sum_j v_ij * f_j(x)
        def g_i(x, U):
            f = f_x(x)
            return torch.matmul(U.T, f)

        # Step 4: Transformacja Tensora
        new_a = torch.einsum('li,ljk->ijk', U.T, self.a)

        return new_a

<h2>Information bottleneck</h2>

In [112]:
class InformationBottleneck(nn.Module):
    def __init__(self, beta=1.0):
        super().__init__()
        self.beta = beta

    def forward(self, X_features, Y_features):
        """Implementuje równanie (15) z artykułu"""
        C_X = X_features @ X_features.T
        C_Y = Y_features @ Y_features.T
        return torch.trace(C_X @ C_Y)

    def bottleneck_loss(self, X_features, T_features, Y_features):
        """Implementuje równanie (10) z artykułu"""
        I_XT = self(X_features, T_features)
        I_TY = self(T_features, Y_features)
        return I_XT - self.beta * I_TY

In [114]:
# === Wszystkie testy zakończyły się sukcesem ===

import unittest
import torch
import math
import torch.nn.functional as F

# === Testy CDFNorm ===
class CDFNormTest(unittest.TestCase):
    def test_gaussian_transform(self):
        cdf_norm = CDFNorm(method='gaussian')
        input_tensor = torch.randn(100) * 2 + 5
        output = cdf_norm(input_tensor)

        self.assertTrue(torch.all(output >= 0))
        self.assertTrue(torch.all(output <= 1))
        self.assertAlmostEqual(output.mean().item(), 0.5, delta=0.1)

    def test_empirical_transform(self):
        cdf_norm = CDFNorm(method='empirical')
        input_tensor = torch.tensor([1.0, 2.0, 3.0, 4.0, 5.0])
        output = cdf_norm(input_tensor)

        expected = torch.tensor([0.2, 0.4, 0.6, 0.8, 1.0])
        self.assertTrue(torch.allclose(output, expected, atol=1e-5))

# === Testy PropagationEstimation ===
class PropagationEstimationTest(unittest.TestCase):
    def test_propagate_expectation(self):
        a = torch.zeros(2, 2, 2)
        a[0,0,0] = 1.0
        a[1,0,0] = 1.0
        def simple_feature_fn(x): return torch.tensor([1.0, 0.0])
        propagator = PropagationEstimation(y=torch.tensor(1.0), z=torch.tensor(1.0), a=a, feature_fn=simple_feature_fn)
        propagated = propagator.propagate_expectation()
        expected = 0.5
        self.assertAlmostEqual(propagated.item(), expected, delta=1e-5)

# === Testy OrthonormalLegendreBasis ===
class OrthonormalLegendreBasisTest(unittest.TestCase):
    def test_basis_functions(self):
        basis = OrthonormalLegendreBasis(max_degree=3)
        x = torch.linspace(0, 1, 5)
        output = basis(x)
        self.assertEqual(output.shape, (5, 4))

    def test_normalization(self):
        basis = OrthonormalLegendreBasis(max_degree=3)
        x = torch.linspace(0, 1, 1000)  # więcej punktów dla lepszej dokładności
        basis_values = basis(x)
        for i in range(4):
            integral = torch.trapz(basis_values[:,i] * basis_values[:,i], x)
            self.assertAlmostEqual(integral.item(), 1.0, delta=0.05)  # poprawiona tolerancja

# === Testy JointDistribution ===
class JointDistributionTest(unittest.TestCase):
    def test_2d_distribution(self):
        joint_dist = JointDistribution(dim=2)
        joint_dist.coeffs.data.fill_(0)
        joint_dist.coeffs.data[0,0] = 1.0
        x = torch.rand(10, 1)
        y = torch.rand(10, 1)
        output = joint_dist(x, y)
        self.assertEqual(output.shape, (10,))
        self.assertTrue(torch.all(output >= 0))

    def test_3d_distribution(self):
        joint_dist = JointDistribution(dim=3)
        joint_dist.coeffs.data.fill_(0)
        joint_dist.coeffs.data[0,0,0] = 1.0
        x = torch.rand(5, 1)
        y = torch.rand(5, 1)
        z = torch.rand(5, 1)
        output = joint_dist(x, y, z)
        self.assertEqual(output.shape, (5,))
        self.assertTrue(torch.all(output >= 0))

# === Testy Estimation ===
class EstimationTest(unittest.TestCase):
    def test_tensor_mean(self):
        triplets = [(torch.tensor([1.0]), torch.tensor([2.0]), torch.tensor([3.0]))] * 10
        def simple_feature_fn(x): return torch.tensor([x.item(), x.item()**2])
        estimator = MeanEstimation(triplets=triplets, feature_fn=simple_feature_fn, feature_dm=2)
        tensor_mean = estimator.compute_tensor_mean()
        self.assertEqual(tensor_mean.shape, (2, 2, 2))
        self.assertGreater(tensor_mean.sum().item(), 0)

# === Testy ConditionalEstimation ===
class ConditionalEstimationTest(unittest.TestCase):
    def test_conditional_score(self):
        a = torch.zeros(2, 2, 2)
        a[0,0,0] = 1.0
        a[1,1,1] = 2.0
        def simple_feature_fn(x): return torch.tensor([x.item(), x.item()**2])
        estimator = ConditionalEstimation(
            x_candidates=[torch.tensor(1.0), torch.tensor(2.0)],
            y=torch.tensor(1.0),
            z=torch.tensor(1.0),
            a=a,
            feature_fn=simple_feature_fn
        )
        scores = estimator.conditional_score()
        self.assertEqual(len(scores), 2)
        self.assertNotEqual(scores[0], scores[1])

# === Testy DynamicEMA ===
class DynamicEMATest(unittest.TestCase):
    def test_ema_update(self):
        x = torch.tensor([1.0])
        y = torch.tensor([1.0])
        z = torch.tensor([1.0])
        ema = DynamicEMA(x=x, y=y, z=z, ema_lambda=0.1)
        updated_a = ema.EMAUpdateMethod()
        expected = torch.tensor([[[1.0]]]) * ema.ema_lambda
        self.assertTrue(torch.allclose(updated_a, expected, atol=1e-5))

# === Testy BaseOptimization ===
class BaseOptimizationTest(unittest.TestCase):
    def test_basis_optimization(self):
        a = torch.eye(2).unsqueeze(0).repeat(2, 1, 1)
        optimizer = BaseOptimization(a=a)
        new_a = optimizer.optimization_early()
        self.assertEqual(new_a.shape, a.shape)
        self.assertFalse(torch.allclose(new_a, a, atol=1e-5))

if __name__ == "__main__":
    unittest.main(argv=[''], verbosity=2, exit=False)



test_basis_optimization (__main__.BaseOptimizationTest.test_basis_optimization) ... ok
test_empirical_transform (__main__.CDFNormTest.test_empirical_transform) ... ok
test_gaussian_transform (__main__.CDFNormTest.test_gaussian_transform) ... ok
test_conditional_score (__main__.ConditionalEstimationTest.test_conditional_score) ... ok
test_ema_update (__main__.DynamicEMATest.test_ema_update) ... ok
test_tensor_mean (__main__.EstimationTest.test_tensor_mean) ... ok
test_2d_distribution (__main__.JointDistributionTest.test_2d_distribution) ... ok
test_3d_distribution (__main__.JointDistributionTest.test_3d_distribution) ... ok
test_basis_functions (__main__.OrthonormalLegendreBasisTest.test_basis_functions) ... ok
test_normalization (__main__.OrthonormalLegendreBasisTest.test_normalization) ... ok
test_propagate_expectation (__main__.PropagationEstimationTest.test_propagate_expectation) ... ok

----------------------------------------------------------------------
Ran 11 tests in 0.015s

O

### ===============================================================
### RAPORT TESTÓW JEDNOSTKOWYCH: HCRNN v0.1
### STATUS:
### - 9/11 testów POPRAWNE
### - 1/11 testów NIEUDANE (assert failures)
### - 1/11 testów BŁĘDY WYKONANIA (exceptions)
### SZCZEGÓŁY:
### ------------------------------------------------------------------
#### [POPRAWNE]
####   BaseOptimization: test_basis_optimization
####   CDFNorm: test_gaussian_transform
####   OrthonormalLegendreBasis: test_basis_functions
###    CDFNorm: test_empirical_transform
#### [NIEUDANE]
#### 1. OrthonormalLegendreBasis: test_normalization
####    - Całka = 0.5 (oczekiwano 1.0 ±0.05)
####    - Wymagana weryfikacja:
####      * Obliczenia wag
####      * Normalizacja funkcji bazowych
#### [BŁĘDY]
### TODO:
### 1. Weryfikacja normalizacji funkcji bazowych
### ===============================================================
