In [None]:
import numpy as np
import pandas as pd
import keras
from keras import optimizers
from keras.callbacks import EarlyStopping
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.neighbors import NearestNeighbors
from sklearn.metrics import mean_squared_error, r2_score
import matplotlib.pyplot as plt

# === Utils Functions ===
def dm2comp(dm):
    '''Extract weights and vectors from a density matrix.'''
    return dm[:, :, 0], dm[:, :, 1:]

def pure2dm(psi):
    '''Construct a pure-state density matrix from feature vectors.'''
    ones = keras.ops.ones_like(psi[:, 0:1])
    dm = keras.ops.concatenate((ones[:, np.newaxis, :], psi[:, np.newaxis, :]), axis=2)
    return dm

def dm_rbf_loglik(x, dm, sigma):
    '''Log-likelihood under an RBF-kernel density matrix.'''
    d = keras.ops.shape(x)[-1]
    w, v = dm2comp(dm)
    dist = keras.ops.sum((x[:, np.newaxis, :] - v) ** 2, axis=-1)
    ll = keras.ops.log(
        keras.ops.einsum('...i,...i->...', w,
                         keras.ops.exp(-dist / (2 * sigma ** 2)) ** 2)
        + 1e-12
    )
    coeff = d * keras.ops.log(sigma + 1e-12) + d * np.log(np.pi) / 2
    return ll - coeff

def dm_rbf_expectation(dm):
    '''Expectation under an RBF-kernel density matrix.'''
    w, v = dm2comp(dm)
    return keras.ops.einsum('...i,...ij->...j', w, v)

def dm_rbf_variance(dm, sigma):
    '''Variance (trace) under an RBF-kernel density matrix.'''
    sigma_adj = sigma / keras.ops.sqrt(2)
    w, v = dm2comp(dm)
    d = keras.ops.shape(v)[-1]
    squared_norms = keras.ops.sum(v ** 2, axis=-1)
    weighted_sq = keras.ops.einsum('...i,...i->...', w, squared_norms)
    means = keras.ops.einsum('...i,...ij->...j', w, v)
    sq_means = keras.ops.sum(means ** 2, axis=-1)
    between = weighted_sq - sq_means
    return between + d * (sigma_adj ** 2)

def gauss_entropy_lb(d, sigma):
    '''Lower bound on entropy of a Gaussian mixture.'''
    return (d / 2.0) * (1.0 + keras.ops.log(2.0 * np.pi * (sigma ** 2)))

def l1_loss(vals):
    '''L1 regularization over normalized activations or components.'''
    b_size = keras.ops.cast(keras.ops.shape(vals)[0], dtype=keras.float32)
    vals_norm = keras.utils.normalize(vals, order=2, axis=1)
    return keras.ops.sum(keras.ops.abs(vals_norm)) / b_size



In [None]:
# === Kernel Layers ===
class RBFKernelLayer(keras.layers.Layer):
    def __init__(self, sigma, dim, trainable=True, min_sigma=1e-3, **kwargs):
        super().__init__(**kwargs)
        self.sigma = self.add_weight(
            shape=(), initializer=keras.initializers.Constant(value=sigma), trainable=trainable
        )
        self.dim = dim
        self.min_sigma = min_sigma

    def call(self, A, B):
        shape_A = keras.ops.shape(A)
        A_norm = keras.ops.sum(A**2, axis=-1)[..., np.newaxis]
        B_norm = keras.ops.sum(B**2, axis=-1)[np.newaxis, np.newaxis, :]
        A_flat = keras.ops.reshape(A, [-1, shape_A[2]])
        AB = keras.ops.matmul(A_flat, keras.ops.transpose(B))
        AB = keras.ops.reshape(AB, [shape_A[0], shape_A[1], keras.ops.shape(B)[0]])
        dist2 = keras.ops.clip(A_norm + B_norm - 2.*AB, 0., np.inf)
        self.sigma.assign(keras.ops.clip(self.sigma, self.min_sigma, np.inf))
        return keras.ops.exp(-dist2 / (2.*self.sigma**2))

    def log_weight(self):
        sigma_c = keras.ops.clip(self.sigma, self.min_sigma, np.inf)
        return -self.dim * keras.ops.log(sigma_c + 1e-12) - self.dim * np.log(4 * np.pi)

class CosineKernelLayer(keras.layers.Layer):
    def __init__(self, **kwargs):
        '''
        Builds a layer that calculates the cosine kernel between two set of vectors
        '''
        super().__init__(**kwargs)
        self.eps = 1e-6

    def call(self, A, B):
        '''
        Input:
            A: tensor of shape (bs, n, d)
            B: tensor of shape (m, d)
        Result:
            K: tensor of shape (bs, n, m)
        '''
        A = keras.utils.normalize(A, order=2, axis=-1)
        B = keras.utils.normalize(B, order=2, axis=-1)
        K = keras.ops.einsum("...nd,md->...nm", A, B)
        return K

    def log_weight(self):
        return 0

class KDMLayer(keras.layers.Layer):
    def __init__(self, kernel, dim_x, dim_y, x_train=True, y_train=True, w_train=True,
                 n_comp=0, l1_x=0., l1_y=0., l1_act=0., generative=0., **kwargs):
        super().__init__(**kwargs)
        self.kernel, self.dim_x, self.dim_y = kernel, dim_x, dim_y
        self.x_train, self.y_train, self.w_train = x_train, y_train, w_train
        self.n_comp, self.l1_x, self.l1_y, self.l1_act, self.generative = (
            n_comp, l1_x, l1_y, l1_act, generative
        )
        self.c_x = self.add_weight(
            shape=(n_comp, dim_x), initializer=keras.initializers.random_normal(),
            trainable=x_train, name="c_x"
        )
        self.c_y = self.add_weight(
            shape=(n_comp, dim_y),
            initializer=keras.initializers.Constant(np.sqrt(1./dim_y)),
            trainable=y_train, name="c_y"
        )
        self.c_w = self.add_weight(
            shape=(n_comp,),
            initializer=keras.initializers.constant(1./n_comp),
            trainable=w_train, name="c_w"
        )
        self.eps = 1e-12

    def call(self, inputs):
        if self.l1_x:
            self.add_loss(self.l1_x * l1_loss(self.c_x))
        if self.l1_y:
            self.add_loss(self.l1_y * l1_loss(self.c_y))
        cw = keras.ops.abs(self.c_w)
        cw_sum = keras.ops.clip(keras.ops.sum(cw), self.eps, np.inf)
        self.c_w.assign(cw / cw_sum)

        in_w = inputs[..., 0]
        in_v = inputs[..., 1:]
        out_vw = self.kernel(in_v, self.c_x)
        out_w = cw[np.newaxis, np.newaxis, :] * keras.ops.square(out_vw)

        if self.generative:
            proj = keras.ops.einsum('...i,...ij->...', in_w, out_w)
            logp = keras.ops.log(proj + self.eps) + self.kernel.log_weight()
            self.add_loss(-self.generative * keras.ops.mean(logp))

        out_w = keras.ops.maximum(out_w, self.eps)
        out_w = out_w / keras.ops.sum(out_w, axis=2, keepdims=True)
        out_w = keras.ops.einsum('...i,...ij->...j', in_w, out_w)

        if self.l1_act:
            self.add_loss(self.l1_act * l1_loss(out_w))

        out_w = out_w[..., np.newaxis]
        b = keras.ops.shape(out_w)[0]
        out_y = keras.ops.broadcast_to(self.c_y[np.newaxis, ...], [b, self.n_comp, self.dim_y])
        return keras.ops.concatenate((out_w, out_y), axis=2)

    def get_config(self):
        base = super().get_config()
        return {**base, 'dim_x': self.dim_x, 'dim_y': self.dim_y, 'n_comp': self.n_comp}

class KDMRegressModelRBF(keras.Model):
    def __init__(self, encoded_size, dim_y, encoder, n_comp,
                 sigma_x=0.1, min_sigma_x=1e-3,
                 sigma_y=0.1, min_sigma_y=1e-3,
                 x_train=True, y_train=True, w_train=True,
                 generative=0., entropy_reg_x=0.,
                 sigma_x_trainable=True, sigma_y_trainable=True,
                 **kwargs):
        super().__init__(**kwargs)
        self.dim_y, self.encoded_size, self.n_comp = dim_y, encoded_size, n_comp
        self.encoder, self.entropy_reg_x = encoder, entropy_reg_x
        if generative > 0:
            self.encoder.trainable = False

        self.kernel = RBFKernelLayer(sigma_x, encoded_size,
                                     trainable=sigma_x_trainable,
                                     min_sigma=min_sigma_x)
        self.kdm = KDMLayer(self.kernel, encoded_size, dim_y,
                             x_train=x_train, y_train=y_train, w_train=w_train,
                             n_comp=n_comp, generative=generative)
        self.sigma_y = self.add_weight(
            shape=(), initializer=keras.initializers.constant(sigma_y),
            trainable=sigma_y_trainable, name="sigma_y"
        )
        self.min_sigma_y = min_sigma_y

    def call(self, inputs):
        rho_x = pure2dm(self.encoder(inputs))
        rho_y = self.kdm(rho_x)
        self.sigma_y.assign(keras.ops.clip(self.sigma_y, self.min_sigma_y, np.inf))
        return rho_y

    def predict_reg(self, inputs, **kwargs):
        rho_y = self.predict(inputs, **kwargs)
        y_exp = keras.ops.convert_to_numpy(dm_rbf_expectation(rho_y))
        y_var = keras.ops.convert_to_numpy(dm_rbf_variance(rho_y, self.sigma_y))
        return y_exp, y_var

    def get_sigmas(self):
        return keras.ops.convert_to_numpy(self.kernel.sigma), keras.ops.convert_to_numpy(self.sigma_y)

    def loglik(self, y_true, y_pred):
        return -keras.ops.mean(dm_rbf_loglik(y_true, y_pred, self.sigma_y))

    def init_components(self, samples_x, samples_y, init_sigma=False, sigma_mult=1):
        enc = self.encoder.predict(samples_x)
        if init_sigma:
            np_enc = keras.ops.convert_to_numpy(enc)
            d, _ = NearestNeighbors(n_neighbors=3).fit(np_enc).kneighbors(np_enc)
            self.kernel.sigma.assign(np.mean(d[:, 2]) * sigma_mult)
        self.kdm.c_x.assign(enc)
        self.kdm.c_y.assign(samples_y)
        self.kdm.c_w.assign(keras.ops.ones((self.n_comp,)) / self.n_comp)

class KDMRegressModelCos(keras.Model):
    def __init__(self, encoded_size, dim_y, encoder, n_comp,
                 sigma_y=0.1, min_sigma_y=1e-3,
                 x_train=True, y_train=True, w_train=True,
                 generative=0., entropy_reg_x=0.,
                 sigma_y_trainable=True,
                 **kwargs):
        super().__init__(**kwargs)
        self.dim_y, self.encoded_size, self.n_comp = dim_y, encoded_size, n_comp
        self.encoder, self.entropy_reg_x = encoder, entropy_reg_x
        if generative > 0:
            self.encoder.trainable = False

        self.kernel = CosineKernelLayer()
        self.kdm = KDMLayer(self.kernel, encoded_size, dim_y,
                             x_train=x_train, y_train=y_train, w_train=w_train,
                             n_comp=n_comp, generative=generative)
        self.sigma_y = self.add_weight(
            shape=(), initializer=keras.initializers.constant(sigma_y),
            trainable=sigma_y_trainable, name="sigma_y"
        )
        self.min_sigma_y = min_sigma_y

    def call(self, inputs):
        rho_x = pure2dm(self.encoder(inputs))
        rho_y = self.kdm(rho_x)
        self.sigma_y.assign(keras.ops.clip(self.sigma_y, self.min_sigma_y, np.inf))
        return rho_y

    def predict_reg(self, inputs, **kwargs):
        rho_y = self.predict(inputs, **kwargs)
        y_exp = keras.ops.convert_to_numpy(dm_rbf_expectation(rho_y))
        y_var = keras.ops.convert_to_numpy(dm_rbf_variance(rho_y, self.sigma_y))
        return y_exp, y_var

    def get_sigmas(self):
        return keras.ops.convert_to_numpy(self.kernel.sigma), keras.ops.convert_to_numpy(self.sigma_y)

    def loglik(self, y_true, y_pred):
        return -keras.ops.mean(dm_rbf_loglik(y_true, y_pred, self.sigma_y))

    def init_components(self, samples_x, samples_y, sigma_mult=1):
        enc = self.encoder.predict(samples_x)
        self.kdm.c_x.assign(enc)
        self.kdm.c_y.assign(samples_y)
        self.kdm.c_w.assign(keras.ops.ones((self.n_comp,)) / self.n_comp)

In [None]:
from sklearn.preprocessing import StandardScaler, MinMaxScaler, Normalizer
from sklearn.model_selection import train_test_split

data = pd.read_csv('/content/housing.txt', header = None, sep = ' ')
X = data.to_numpy()
y = X[:,-1].reshape(-1, 1)
X = X[:,0:-1]

# Split data into 70% training and 30% testing
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

print(f"Training set size: {X_train.shape}")
print(f"Test set size: {X_test.shape}")

X_train = np.asarray(X_train)
X_test = np.asarray(X_test)

scaler_x = StandardScaler()
scaler_x.fit(X_train)
X_train = scaler_x.transform(X_train)
X_test = scaler_x.transform(X_test)

scaler = MinMaxScaler(feature_range=(0.2, np.pi/2-0.2))
scaler.fit(y_train)
y_train = scaler.transform(y_train)
#y_test = scaler.transform(y_test))
y_train = np.asarray(y_train).reshape(-1, 1)
y_test = np.asarray(y_test).reshape(-1)

print(f"Training features size: {X_train.shape}")
print(f"Training labels size: {y_train.shape}")

FileNotFoundError: [Errno 2] No such file or directory: '/content/housing.txt'

In [None]:
from itertools import product
import pandas as pd
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

# Define los valores que quieres probar
param_grid = {
    'n_comp': [32],
    'sigma_x': [0.1, 0.5],
    'min_sigma_x': [1e-4],
    'sigma_y': [0.1],
    'min_sigma_y': [0.1, 0.5],
    'learning_rate': [1e-5, 5e-5, 1e-4, 5e-4, 1e-3, 5e-3, 1e-2, 1e-1]
}

# Crear combinaciones de todos los parámetros
param_combinations = list(product(*param_grid.values()))
param_names = list(param_grid.keys())


In [None]:
results = []

for combo in param_combinations:
    params = dict(zip(param_names, combo))
    print(f"Evaluando combinación: {params}")

    encoder = keras.Sequential([keras.layers.Identity()])
    model = KDMRegressModelRBF(
        encoded_size=X_train.shape[1],
        dim_y=y_train.shape[1],
        encoder=encoder,
        n_comp=params['n_comp'],
        sigma_x=params['sigma_x'],
        min_sigma_x=params['min_sigma_x'],
        sigma_y=params['sigma_y'],
        min_sigma_y=params['min_sigma_y'],
        generative=1.0,
        sigma_x_trainable=True,
        sigma_y_trainable=False
    )

    model.compile(
        optimizer=optimizers.Adam(learning_rate=params['learning_rate']),
        loss=model.loglik
    )

    early_stop = EarlyStopping(monitor='val_loss', patience=50, restore_best_weights=True)
    idx = np.random.RandomState(42).randint(X_train.shape[0], size=params['n_comp'])
    model.init_components(X_train[idx], y_train[idx], init_sigma=True)

    model.fit(
        X_train, y_train,
        validation_split=0.2,
        epochs=200,
        batch_size=32,
        callbacks=[early_stop],
        verbose=0
    )

    # Evaluar sobre el conjunto de prueba
    y_pred, y_var = model.predict_reg(X_test)
    y_pred = scaler.inverse_transform(y_pred)

    mae = mean_absolute_error(y_test, y_pred)
    mse = mean_squared_error(y_test, y_pred)
    r2 = r2_score(y_test, y_pred)

    # Guardar resultados
    result = {**params, 'MAE': mae, 'MSE': mse, 'R2': r2}
    results.append(result)


In [None]:
results_df = pd.DataFrame(results)
# Ordenar por MAE o cualquier otra métrica
results_df.sort_values(by='MAE').head()


In [None]:
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense, Dropout, Activation

param_grid = {
    'n_comp': [32,50,75,100,150],
#    'sigma_x': [0.1,0.5,1.0],
#    'min_sigma_x': [0.001, 0.01],
    'sigma_y': [0.1,0.5,1.0],
    'min_sigma_y': [0.01, 0.1],
    'learning_rate': [1e-5, 5e-5, 1e-4, 5e-4, 1e-3, 5e-3, 1e-2, 1e-1]
#    'sigma_y_trainable': [True, False],
#    'generative': [0.0, 1.0],
#    'kernel': ['rbf', 'cosine']
}
MAE = 100
# Crear combinaciones de todos los parámetros
param_combinations = list(product(*param_grid.values()))
param_names = list(param_grid.keys())

encoding_dim = 16

results = []

for combo in param_combinations:
    params = dict(zip(param_names, combo))
    print(f"Evaluando combinación: {params}")

    encoder = encoder = Sequential([
    Dense(X_train.shape[1], activation='relu'),
    Dropout(0.1),
    Dense(encoding_dim),
    Activation('tanh')
])
    model = KDMRegressModelCos(
        encoded_size=encoding_dim,
        dim_y=y_train.shape[1],
        encoder=encoder,
        n_comp=params['n_comp'],
        sigma_y=params['sigma_y'],
        min_sigma_y=params['min_sigma_y'],
        generative=1.0,
        sigma_y_trainable=True
    )

    model.compile(
        optimizer=optimizers.Adam(learning_rate=params['learning_rate']),
        loss=model.loglik
    )

    early_stop = EarlyStopping(monitor='val_loss', patience=50, restore_best_weights=True)
    idx = np.random.RandomState(42).randint(X_train.shape[0], size=params['n_comp'])
    #x_init_encoded = encoder(X_train[idx], training=False)
    #model.init_components(x_init_encoded, y_train[idx])
    model.init_components(X_train[idx], y_train[idx])


    history = model.fit(
        X_train, y_train,
        validation_split=0.2,
        epochs=200,
        batch_size=32,
        callbacks=[early_stop],
        verbose=0
    )

    # Evaluar sobre el conjunto de prueba
    y_pred, y_var = model.predict_reg(X_test)
    y_pred = scaler.inverse_transform(y_pred)

    mae = mean_absolute_error(y_test, y_pred)
    mse = mean_squared_error(y_test, y_pred)
    r2 = r2_score(y_test, y_pred)
    if mae < MAE:
      history_best = history.history
      MAE = mae
    # Guardar resultados
    result = {**params, 'MAE': mae, 'MSE': mse, 'R2': r2}
    results.append(result)

In [None]:
# Plotting the learning curves
plt.figure(figsize=(10, 6))
plt.plot(history_best['loss'], label='Training Loss')
plt.plot(history_best['val_loss'], label='Validation Loss')
plt.title('Learning Curves')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.grid(True)
plt.show()

In [None]:
results_df = pd.DataFrame(results)
# Ordenar por MAE o cualquier otra métrica
results_df.sort_values(by='MAE').head()