<h2>Implementacja modelu HCRNN do postaci Neurona</h2>

Artykuł naukowy:
Instniejace, podobne rozwiązania:

In [1]:
import numpy as np
import math, os
import torch.nn as nn
import torch
import torch.nn.functional as F
from torch import Tensor

In [2]:
__all__ = [
    'HCRNN_Neuron'
]

In [3]:
from torch.nn import Module

'''
Klasa rozszerzająca funkcje torch'a.
'''

class HCRNN_Neuron(nn.Module):

    '''
    definicja stałych określająca rozmiar wejścia i wyjścia dla liczb neuronów.
    '''

    __constants__ = ['in_features', 'out_features']
    in_features: int
    out_features: int

    def __init__(
        self,
        *,
        in_features: int,
        out_features: int,

        #definicja urządzenia, CPU, GPU lub TPU
        device = None,
        dtype= None,

    ) -> None:
        factory_kwargs = {"device": device, "dtype": dtype}
        super().__init__()

    #Tensor wchodzi i zawsze Tensor wychodzi
    def forward(self, input:Tensor) -> Tensor:
        pass

<h2>Normalization</h2>

In [4]:
class CDFNorm(nn.Module):
    def __init__(self, method='gaussian', unbiased=True, eps=1e-5, affine=False, track_running_stats=True):
        """
        Normalizacja CDF (dystrybuanty).

        Parametry:
            method: metoda normalizacji ('gaussian' lub 'empirical')
            unbiased: czy użyć nieobciążonego estymatora wariancji
            eps: mała wartość dla stabilności numerycznej
            affine: czy zastosować transformację afiniczną
            track_running_stats: czy śledzić statystyki podczas uczenia
        """
        super().__init__()
        self.method = method
        self.unbiased = unbiased
        self.eps = eps
        self.affine = affine
        self.track_running_stats = track_running_stats

        if self.affine:
            self.weight = nn.Parameter(torch.ones(1))  # Parametr skalujący
            self.bias = nn.Parameter(torch.zeros(1))    # Parametr przesunięcia

        if self.track_running_stats:
            # Rejestracja buforów dla średniej i wariancji
            self.register_buffer('running_mean', torch.zeros(1))
            self.register_buffer('running_var', torch.ones(1))
            self.register_buffer('num_batches_tracked', torch.tensor(0, dtype=torch.long))

    def _gaussian_transform(self, x):
        """Transformacja Gaussa - normalizacja przy użyciu CDF rozkładu normalnego."""
        if self.training and self.track_running_stats:
            # Obliczanie statystyk podczas uczenia
            mean = x.mean()
            var = x.var(unbiased=self.unbiased)
            with torch.no_grad():
                # Aktualizacja średniej kroczącej
                self.running_mean = (1 - 0.1) * self.running_mean + 0.1 * mean
                # Aktualizacja wariancji kroczącej
                self.running_var = (1 - 0.1) * self.running_var + 0.1 * var
                self.num_batches_tracked += 1
        else:
            # Użycie zapisanych statystyk podczas ewaluacji
            mean = self.running_mean
            var = self.running_var

        # Obliczenie CDF przy użyciu funkcji błędu
        x_norm = 0.5 * (1 + torch.erf((x - mean) / (torch.sqrt(var + self.eps) * math.sqrt(2))))

        if self.affine:
            # Transformacja afiniczną
            x_norm = x_norm * self.weight + self.bias

        return x_norm

    def _empirical_transform(self, x):
        """Empiryczna transformacja CDF na podstawie rang."""
        x_norm = torch.zeros_like(x)
        for i in range(len(x)):
            # Obliczenie rangi dla każdego elementu
            x_norm[i] = (x < x[i]).float().mean()

        if self.affine:
            # Transformacja afiniczną
            x_norm = x_norm * self.weight + self.bias

        return x_norm

    def forward(self, x):
        """
        Przebieg forward normalizacji CDF.

        Parametry:
            x: tensor wejściowy

        Zwraca:
            Znormalizowany tensor w przedziale [0,1]
        """
        if self.method == 'gaussian':
            return self._gaussian_transform(x)
        elif self.method == 'empirical':
            return self._empirical_transform(x)
        else:
            raise ValueError(f"Niewspierana metoda normalizacji: {self.method}")

Baza ortonormalnych wielomianów Legendre'a

In [5]:
class OrthonormalLegendreBasis(nn.Module):
    def __init__(self, max_degree: int):
        """
        Implementacja ortonormalnych wielomianów Legendre'a na przedziale [0,1].

        Parametry:
            max_degree: maksymalny stopień wielomianu (0 do 3)
        """
        super().__init__()
        self.max_degree = max_degree

        # Współczynniki wielomianów Legendre'a przesuniętych na przedział [0,1]
        # Każdy wiersz odpowiada kolejnemu wielomianowi (P0, P1, P2, P3)
        self.register_buffer('legendre_coeffs', torch.tensor([
            [1, 0, 0, 0],        # P0(x) = 1
            [-1, 2, 0, 0],       # P1(x) = 2x - 1
            [1, -6, 6, 0],       # P2(x) = 6x² - 6x + 1
            [-1, 12, -30, 20]    # P3(x) = 20x³ - 30x² + 12x - 1
        ], dtype=torch.float32))

    def forward(self, x: Tensor) -> Tensor:
        """
        Oblicza wartości wielomianów Legendre'a dla zadanego wejścia.

        Parametry:
            x: tensor wejściowy w przedziale [0,1]

        Zwraca:
            Tensor z wartościami wielomianów znormalizowanych do normy L2
        """
        # Zabezpieczenie przed wartościami spoza przedziału [0,1]
        x = x.float().clamp(0, 1)

        # Obliczenie potęg x: [x^0, x^1, x^2, x^3]
        powers = torch.stack([x**i for i in range(4)], dim=-1)

        # Obliczenie wartości wielomianów poprzez iloczyn współczynników i potęg
        legendre = torch.einsum('...i,ji->...j', powers, self.legendre_coeffs)

        # Normalizacja do normy L2 i wybór odpowiednich stopni wielomianów
        return legendre[..., :self.max_degree+1] / math.sqrt(2.0)


<h2>Joint Distribution</h2>
Uogólniona funkcja dla wymiarów 2 i 3.

In [6]:
class JointDistribution(nn.Module):
    def __init__(self, dim, basis_size=4):
        """
        Implementacja rozkładu łącznego przy użyciu wielomianów Legendre'a.

        Parametry:
            dim: wymiarowość rozkładu (2 lub 3)
            basis_size: rozmiar bazy funkcji ortogonalnych
        """
        super().__init__()
        self.dim = dim
        self.basis_size = basis_size

        # Tensor współczynników rozkładu łącznego
        self.coeffs = nn.Parameter(torch.zeros(*(basis_size for _ in range(dim))))

        # Baza wielomianów Legendre'a
        self.basis = OrthonormalLegendreBasis(basis_size - 1)

    def forward(self, *inputs: Tensor) -> Tensor:
        """
        Oblicza wartość rozkładu łącznego dla zadanych wejść.

        Parametry:
            *inputs: tensory wejściowe (2 lub 3 w zależności od dim)

        Zwraca:
            Wartość rozkładu łącznego dla zadanych punktów
        """
        # Obliczenie wartości funkcji bazowych dla każdego wejścia
        basis_values = [self.basis(x).squeeze() for x in inputs]  # Usunięcie nadmiarowych wymiarów

        if self.dim == 2:
            # Rozkład 2D: suma po i,j (coeffs[i,j] * basis_i(x) * basis_j(y))
            return torch.einsum('i,j,ij->', *basis_values, self.coeffs).expand(inputs[0].shape[0])
        elif self.dim == 3:
            # Rozkład 3D: suma po i,j,k (coeffs[i,j,k] * basis_i(x) * basis_j(y) * basis_k(z))
            return torch.einsum('i,j,k,ijk->', *basis_values, self.coeffs).expand(inputs[0].shape[0])
        else:
            raise ValueError(f"Niewspierana wymiarowość: {self.dim}")

<h2>Estymacja średnich</h2>

In [7]:
class Estimation(nn.Module):
    def __init__(self,
                 *,
                 triplets,
                 feature_fn,
                 feature_dm
                 ):
        super().__init__()
        self.triplets = triplets,
        self.feature_fn = feature_fn,
        self.feature_dm - feature_dm

    def compute_tensor_mean(self) -> Tensor:
        """
        Parametry:
            triplets: array (x, y, z)
            feature_fn: funckaj mapująca
            feature_dim: wymiary D
        """
        a = np.zeros((self.feature_dim, self.feature_dim, self.feature_dim))

        for (x, y, z) in self.triplets:
            fx = self.feature_fn(x)
            fy = self.feature_fn(y)
            fz = self.feature_fn(z)

            outer = np.einsum(fx, fy, fz)

            a += outer

        a /= len(self.triplets)  # Normalizacja na trójkach
        return a

<h2>Estymacja warunkowa</h2>

In [8]:
class ConditionalEstimation(nn.Module):
    def __init__(self,
                 *,
                 x_candidates,
                 y,
                 z,
                 a,
                 feature_fn) -> None:
        super().__init__()
        self.x_candidates = x_candidates,
        self.y = y,
        self.z = z,
        self.a = a,
        self.feature_fn - feature_fn

    def conditional_score(self):

        D = self.a.shape[0]
        fy = self.feature_fn(self.y)
        fz = self.feature_fn(self.z)

        denominator = 0
        for j in range(D):
            for k in range(D):
                denominator += self.fa[0, j, k] * fy[j] * fz[k]

        scores = []
        for x in self.x_candidates:
            fx = self.feature_fn(x)

            score = 0
            for i in range(D):
                context_sum = 0
                for j in range(D):
                    for k in range(D):
                        context_sum += self.a[i, j, k] * fy[j] * fz[k]
                score += fx[i] * (context_sum / (denominator + 1e-8)) #uniknięcie dzielenia przez zero

            scores.append(score)

        return scores

<h2>Propagacja 1</h2>

In [9]:
class PropagationEstimation(nn.Module):
    def __init__(self,
                 *,
                 y,
                 z,
                 a,
                 feature_fn):
        super().__init__()
        self.y = y,
        self.z = z,
        self.a = a,
        self.feature_fn = feature_fn

    def propagate_expectation(self):

        fy = self.feature_fn(self.y)
        fz = self.feature_fn(self.z)
        D = fy.shape[0]

        numerator = 0.0
        denominator = 0.0
        for j in range(D):
            for k in range(D):
                numerator += self.a[1, j, k] * fy[j] * fz[k]
                denominator += self.a[0, j, k] * fy[j] * fz[k]

        propagated = 0.5 + (1 / (2 * np.sqrt(3))) * (numerator / (denominator + 1e-8))
        return propagated

<h2>Entropia</h2>

In [11]:
class EntropyAndMutualInformation(nn.Module):

    def approximate_entropy(self, activations):

        # Normalizacja prawdopodobieństw funkcji aktywacji
        probs = F.softmax(activations, dim=1)
        entropy = -torch.sum(probs ** 2, dim=1).mean()
        return entropy

    def approximate_mutual_information(self, act_X, act_Y):

        # Normalizacja funkcji aktywacji
        probs_X = F.softmax(act_X, dim=1)
        probs_Y = F.softmax(act_Y, dim=1)

        joint_probs = torch.bmm(probs_X.unsqueeze(2), probs_Y.unsqueeze(1))

        mi = torch.sum(joint_probs ** 2, dim=(1,2)).mean()
        return mi

<h2>Dynamicznie modyfikowany model za pomocą EMA</h2>

In [12]:
class DynamicEMA(nn.Module):
    def __init__(self, x, y, z, ema_lambda) -> None:
        self.x = x,
        self.y = y,
        self.z = z,
        self.ema_lambda = ema_lambda

    def EMAUpdateMethod(self):
        def f_i(x): return x
        def f_j(y): return y
        def f_k(z): return z

        update_tensor = torch.einsum('i,j,k->ijk', f_i(self.x), f_j(self.y), f_k(self.z))

        # EMA updating values
        a = (1 - self.ema_lambda) * a + self.ema_lambda * update_tensor

        return a

<h2>Optymizacja bazy</h2>

In [13]:
class BaseOptimization(nn.Module):
    def __init__(self,
                 *,
                 a, #tensor do optymalizacji
                 ) -> None:
        self. a = a

    def optimization_early(self) -> Tensor:
        M = self.a.reshape(len(self.a[0]), -1)

        # Obliczenie SVD
        U, S, Vh = torch.linalg.svd(M, full_matrices=False)

        # Transformacja Bazy, tu przykładowa funkcja, do wymiany
        def f_x(x):
            return torch.sin(x * torch.linspace(0, 1, len(self.a[2])))

        # nowa baza g_i(x) = sum_j v_ij * f_j(x)
        def g_i(x, U):
            f = f_x(x)
            return torch.matmul(U.T, f)

        # Step 4: Transformacja Tensora
        new_a = torch.einsum('li,ljk->ijk', U.T, self.a)

        return new_a

Information bottleneck

In [14]:
class InformationBottleneck(nn.Module):
    def __init__(self, beta=1.0):
        super().__init__()
        self.beta = beta

    def forward(self, X_features, Y_features):
        """Implementuje równanie (15) z artykułu"""
        C_X = X_features @ X_features.T
        C_Y = Y_features @ Y_features.T
        return torch.trace(C_X @ C_Y)

    def bottleneck_loss(self, X_features, T_features, Y_features):
        """Implementuje równanie (10) z artykułu"""
        I_XT = self(X_features, T_features)
        I_TY = self(T_features, Y_features)
        return I_XT - self.beta * I_TY