# Imports and Hyperparameters

In [None]:
! pip3 install -r requirements.txt

In [None]:
import random
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import torch.onnx
import plotly.io as pio
import copy

%load_ext autoreload
%autoreload 2

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
seed = 30
torch.manual_seed(seed)
np.random.seed(seed)
random.seed(seed)

pio.templates.default = "plotly_dark"

# DATA PARAMETERS
num_evaluation_samples = 50                                       
inf_repacement = 1000

# TRAINING PARAMETERS
training_set_proportion = 0.8                         
num_epochs = 500                                    
batch_size = 50          
learning_rate = 0.001
kl_weight = 0.0001

# MODEL PARAMETERS
latent_dims = 4

# Preprocessing

Requirements:
- one random constant
- one generate equation
- values of the instantiated equation

![](./images/dataloader.png)

In [None]:
%reload_ext autoreload
from src.preprocessing import generate_dataset, preprocessing
from src.evaluation import generate_values
import json
import equation_tree
from equation_tree.util.conversions import infix_to_prefix, prefix_to_infix
from src.preprocessing import EquationDatasetClassify
import random
from torch.utils.data import Dataset, DataLoader, TensorDataset
from equation_tree.tree import node_from_prefix

is_function = lambda x: x in ["sin", "exp"]
is_operator = lambda x : x in ["+", "-", "*"]
is_variable = lambda x : x in ["x_1", "x_1", "X"]
is_constant = lambda x : x in ["c_0"]
max_len = 0
# read json file 
classes = json.load(open('second_order_simple.json', 'r'))

# transfrom to prefix notation
classes_list = []
global classes_dict
classes_dict = {}
for cl in classes:
    try:
        eq_prefix = infix_to_prefix(cl.replace('X', 'x_1', ), is_function, is_operator)
        inf_eq = prefix_to_infix(eq_prefix, is_function, is_operator)
        classes_list.append(eq_prefix)
        classes_dict[cl] = eq_prefix
    except:
        pass

equations = []
values = []
constants = []

# generate dataset
for e, cl in enumerate(classes_list):
    for i in range(100):
        c = random.random()*10
        v = generate_values(cl, c, is_function, is_operator, is_variable, is_constant, infix=classes[e])
        if len(cl) > max_len:
             max_len = len(cl)
        if v != (None,):
            equations.append(cl)
            values.append(v)
            constants.append([c])

equations_final = []
for eq_prefix in equations:
    # try block due to complex infinity exception
    # add padding so that all equations have the same shape
    if len(eq_prefix) < max_len:
        eq_prefix = eq_prefix + ["<PAD>"] * (max_len - len(eq_prefix))
    # add equations, constants and values to their list
    equations_final.append(eq_prefix)


all_symbols = [item for sublist in equations_final for item in sublist]
unique_symbols = sorted(list(set(all_symbols)))

# obtain mapping from symbols to indices and vice versa
symb_to_idx = {symbol: idx for idx, symbol in enumerate(unique_symbols)}
idx_to_symb = {idx: symb for symb, idx in symb_to_idx.items()}

dataset = EquationDatasetClassify(equations_final, values, symb_to_idx,len(unique_symbols), constants)

classes = set(tuple(i) for i in equations_final)
classes = [list(i) for i in classes]
classes = [dataset.encode_equation(i) for i in classes]
print(len(classes), len(dataset))

train_loader, test_loader, test_size = preprocessing(
    dataset=dataset,
    batch_size=batch_size,
    training_set_proportion=training_set_proportion
)
max_len

In [None]:
%reload_ext autoreload
from src.evaluation import plot_functions

# visualize 20 first equations in the dataset
plot_functions(
    equations=equations[:20],
    constants=constants[:20],
    values=values[:20],
    is_function=is_function,
    is_operator=is_operator,
)

# Autoencoder Training

In [None]:
from src.training import training_AE_C
autoencoder_equations, train_losses, test_losses, correlations_cor, correlations_dis, correlations_dis_train, x_batches, x_hat_batches, df_results = training_AE_C(train_loader, test_loader, latent_dims, unique_symbols, num_epochs, learning_rate, test_size, max_len, classes, 1.0)
best_correlation_dis = df_results['correlation_dis']
best_correlation_cor = df_results['correlation_cor']

## VAE

In [None]:
from src.training import training_VAE_C
autoencoder_equations, train_losses, test_losses, correlations_cor, correlations_dis, x_batches, x_hat_batches, df_results = training_VAE_C(train_loader, test_loader, latent_dims, unique_symbols, num_epochs, learning_rate, test_size, kl_weight, classes, max_len, 1.0)

## Loss curves

In [None]:
%reload_ext autoreload
from src.utils import plot_losses

plot_losses(
    train_losses,
    test_losses,
    correlation_cor=correlations_cor,
    correlation_dis=correlations_dis,
    df = df_results,
)
last_correlations_cor = np.sum(correlations_cor[-10:]) / 10
last_correlations_dis = np.sum(correlations_dis[-10:]) / 10
print(f"Last 10 epochs average correlation: {last_correlations_cor}")
print(f"Last 10 epochs average correlation: {last_correlations_dis}")

# Evaluation





In [None]:
%reload_ext autoreload
from src.evaluation import evaluation_ec
from src.evaluation import get_latent_representation
from equation_tree.util.conversions import prefix_to_infix
from src.evaluation import get_interpolated_df


x_hat_batches_n = [torch.argmax(batch[0], dim=1).tolist() for batch in x_hat_batches]
# constants

for i, batch in enumerate(x_hat_batches_n):
    for j, eq in enumerate(batch):
        x_hat_batches_n[i][j] = classes[eq]

# concatenate all batches
x_hat_batches_n = [item for sublist in x_hat_batches_n for item in sublist]
x_batches_n = [item for sublist in x_batches for item in sublist[0]]
x_constants = [item for sublist in x_batches for item in sublist[1]]
# caclulate accuracy
count = 0
for rec, real in zip(x_hat_batches_n, x_batches_n):
    if rec == real.tolist():
        count += 1
    else: 
        print(f"rec: {rec}, real: {real}")

accuracy = count / (len(x_hat_batches_n))
print(f"Equation reconstruction accuracy: {accuracy * 100}%")



## Correlation


In [None]:
%reload_ext autoreload
import plotly.express as px


fig = px.imshow([[1, correlations_dis[-1] ], 
                 [correlations_dis[-1], 1]], 
                 x = ["Latent Space", "Function Correlation"], 
                 y = ["Latent Space", "Function Correlation"], 
                 color_continuous_scale='RdBu', 
                 title="Correlation Matrix")
fig.update_xaxes(side="top")
fig.show()
print(f"Correlation between Latent Space and Function correlation: {correlations_dis[-1]}")




## Visualizations

In [None]:

latent_space_representation, x_decoded, test_values, = get_latent_representation(
    model=autoencoder_equations,
    device=device,
    test_dataloader=test_loader,
    x_batches_p=x_batches_n,
    x_hat_batches_p=x_hat_batches_n,
    equation_tree_dataset=dataset,
    num_interpolations=5
)
len(latent_space_representation)

### Visualization of Latent Space
Plot the first 3 latent space dimensions

In [None]:
from plotly.offline import plot
import plotly.express as px
from equation_tree.util.conversions import prefix_to_infix

df = {
    "x": latent_space_representation[:, 0],
    "y": latent_space_representation[:, 1],
    "z": latent_space_representation[:, 2],
    "category": [prefix_to_infix(dataset.decode_equation(eq.tolist()), is_function, is_operator).replace("c_0", str(round(float(const[0]),2))) for eq, const in zip(x_decoded, x_constants)], #TODO: replace constant wit real value
}
df = pd.DataFrame(df)
fig = px.scatter_3d(
    df,
    x="x",
    y="y",
    z="z",
    color="category",
    title="Latent Space Representation",
    size_max=0.1,
)
plot(fig, filename="latent_space.html", auto_open=False, image="png")
fig.show()

In [None]:
from colour import Color
import seaborn as sns
import matplotlib.pyplot as plt
# plots the first 2 dimensions of the latent space
sns.scatterplot(df, x="x", y="y",hue='category', legend=False, palette="tab10")

### Plot interpolations

In [None]:
from src.evaluation import plot_interpolations
equation_1 = 'x_1*c_1'
equation_2 = 'x_1+c_1'
df, _ = get_interpolated_df(
    kind="classifier",
    model=autoencoder_equations,
    equation_tree_dataset=dataset,
    latent_space_representation=latent_space_representation,
    equation_1=equation_1,
    equation_2=equation_2,
    c_1=-1.0,
    c_2=1.0,
    num_interpolations=20,
    assignment=(is_function, is_operator, is_variable, is_constant),
    classes=classes,
)
if len(df.values)> 0:
    fig = plot_interpolations(df, assignment=(is_function, is_operator, is_variable, is_constant))
    fig.show()

In [None]:
%autoreload
from src.evaluation import plot_interpolations, generate_values

# plot the interpolation between two functions
for i in range(1):
    rand_idx1 = random.randint(0, len(x_batches_n))
    rand_idx2 = random.randint(0, len(x_batches_n))
    df, _ = get_interpolated_df(
        kind="classifier",
        model=autoencoder_equations,
        equation_tree_dataset=dataset,
        latent_space_representation=latent_space_representation,
        equation_1=rand_idx1,
        equation_2=rand_idx2,
        c_1=1.5,
        c_2=0.5,
        num_interpolations=20,
        assignment=(is_function, is_operator, is_variable, is_constant),
        classes=classes,
    )
    if len(df.values)> 0:
        fig = plot_interpolations(df, assignment=(is_function, is_operator, is_variable, is_constant))
        fig.show()


## Random Embedding

In [None]:
from src.evaluation import random_embedding

# evaluate how many of the random embeddings returned valid functions
random_embedding('AE_C', autoencoder_equations, dataset, latent_dims, (is_function, is_operator, is_variable, is_constant), classes=classes)

# Sytematic Evaluation
Every parameter change is evaluated 10 times.

## Analyse number of latent units

In [None]:
%reload_ext autoreload
from src.evaluation import evaluate_different_models

datasets = [dataset] * 10

kind = "VAE_C"
weight = 1.0
kl_weight = 0.0001

df_units = pd.DataFrame(columns=['latent dims', 'correlation_cor', 'correlation_dis', 'correlation_cor last 10 epochs', 'correlation_dis last 10 epochs', 'recovered equations', 'accuracy (individualt)', 'accuracy equations', 'constant MSE', 'average distance constants'])
count = 0
for d in datasets: 
    count += 1
    for units in [32, 64, 128]: 
        print(f"Dataset {count} with {units} units")
        dct = evaluate_different_models(d, batch_size, training_set_proportion, units, num_epochs, learning_rate, kind, weight, classes=classes, assignments=(is_function, is_operator, is_variable, is_constant), klweight=kl_weight)
        print(f"Correlation: {dct['correlation_dis']}, Accuracy: {dct['accuracy equations']}")
        df = pd.DataFrame(dct, index=[0])
        df_units = pd.concat([df_units, df], ignore_index=True, axis=0)
df_units.to_csv('results/latent_dims_vae_calssify_big_final.csv', index=False)

## Analyse KL weighting

In [None]:
from src.evaluation import evaluate_different_models
datasets = [dataset] * 10
kind = "VAE_C"

#df_weight = pd.DataFrame(columns=['latent dims', 'correlation_cor', 'correlation_dis', 'correlation_cor last 10 epochs', 'correlation_dis last 10 epochs', 'recovered equations', 'accuracy (individual)', 'accuracy equations', 'constant MSE', 'average distance constants', 'weight', 'kl_weight', 'test_reconstruction_loss', 'test_constant_loss', 'test_latent_correlation_loss', 'test_kl_divergence', 'correlations_dis_train'])


count = 0
for d in datasets: 
    count += 1
    for klweight in [0.00001]: 
        print(f"Dataset {count} with a weighting of {klweight} for the latent correlation loss")
        dct = evaluate_different_models(d, batch_size, training_set_proportion, latent_dims, num_epochs, learning_rate, kind, 1.0, klweight=klweight, classes=classes, assignments=(is_function, is_operator, is_variable, is_constant))
        #print(dct)
        print(f"Correlation: {dct['correlation_dis']}, Accuracy: {dct['accuracy equations']}")
        df = pd.DataFrame(dct, index=[0])
        df_weight = pd.concat([df_weight, df], ignore_index=True, axis=0)
df_weight.to_csv('results/classify_klweight_2.csv', index=False)

## Learning Rate Analysis

In [None]:
from src.evaluation import evaluate_different_models
datasets = [dataset] * 10
is_VAE = True
if is_VAE:
    df_lr = pd.DataFrame(columns=['latent dims', 'correlation_cor', 'correlation_dis', 'correlation_cor last 10 epochs', 'correlation_dis last 10 epochs', 'recovered equations', 'accuracy (individualt)', 'accuracy equations', 'constant MSE', 'average distance constants', 'kl_weight', 'test_reconstruction_loss', 'test_constant_loss', 'test_latent_correlation_loss', 'test_kl_divergence', 'correlation_dis reonstructed equations', 'learning_rate'])
else:
    df_lr = pd.DataFrame(columns=['latent dims', 'correlation_cor', 'correlation_dis', 'correlation_cor last 10 epochs', 'correlation_dis last 10 epochs', 'recovered equations', 'accuracy (individualt)', 'accuracy equations', 'constant MSE', 'average distance constants', 'learning_rate'])

kind = 'VAE_C'
count = 0
for d in datasets: 
    count += 1
    for lr in [0.00001, 0.0001, 0.001, 0.01, 0.1]: 
        print(f"Dataset {count} with {lr} learning rate")
        dct = evaluate_different_models(d, batch_size, training_set_proportion, latent_dims, num_epochs, lr, kind, 1.0, klweight=klweight, classes=classes, assignments=(is_function, is_operator, is_variable, is_constant))
        df = pd.DataFrame(dct, index=[0])
        print(f"Correlation: {dct['correlation_dis']}, Accuracy: {dct['accuracy equations']}")
        df_lr = pd.concat([df_lr, df], ignore_index=True, axis=0)
df_lr.to_csv('results/learning_rate_vae_classify.csv', index=False)

## Latent Correlation weighting

In [None]:
from src.evaluation import evaluate_different_models
datasets = [dataset] * 10
kind = "VAE_C"

df_weight = pd.DataFrame(columns=['latent dims', 'correlation_cor', 'correlation_dis', 'correlation_cor last 10 epochs', 'correlation_dis last 10 epochs', 'recovered equations', 'accuracy (individual)', 'accuracy equations', 'constant MSE', 'average distance constants', 'weight', 'kl_weight', 'test_reconstruction_loss', 'test_constant_loss', 'test_latent_correlation_loss', 'test_kl_divergence', 'correlations_dis_train'])


count = 0
for d in datasets: 
    count += 1
    for weight in [0, 0.1, 1, 10, 100, 1000]: 
        print(f"Dataset {count} with a weighting of {weight} for the latent correlation loss")
        dct = evaluate_different_models(d, batch_size, training_set_proportion, latent_dims, num_epochs, learning_rate, kind, weight, klweight=kl_weight, classes=classes, assignments=(is_function, is_operator, is_variable, is_constant))
        #print(dct)
        print(f"Correlation: {dct['correlation_dis']}, Accuracy: {dct['accuracy equations']}")
        df = pd.DataFrame(dct, index=[0])
        df_weight = pd.concat([df_weight, df], ignore_index=True, axis=0)
df_weight.to_csv('results/classify_weight_vae.csv', index=False)

# Sampling

In [None]:
import pyro
import pyro.distributions as dist
from src.evaluation import generate_values, decode_latent_classify
#target_function = "(c_0-x_1)*exp(x_1)"
target_function = "exp(x_1)*c_0"
#target_function = "x_1+exp(x_1)+c_0"
target_function = "c_0*x_1"
num_samples = 1000
target_constant = 5.0
target_dist = generate_values(target_function, target_constant, is_function, is_operator, is_variable, is_constant)
iteration_values = []

# define the probabilistic model
def probabilistic_model(data):
    latent_variables = []
    for i in range(latent_dims):
        latent_variables.append(pyro.sample(f"latent_variable_{i}", dist.Normal(0, 5)))
    variance = torch.tensor(0.1) * 50

    embedding = [latent_variables]

    equations, constants = decode_latent_classify(autoencoder_equations, dataset, embedding, classes)
    values = generate_values(equations[0], constants[0][0][0], is_function, is_operator, is_variable, is_constant)[1]
    try: 
        values = torch.tensor(values, dtype=torch.float32)
    except:
        print(values)
        print(type(values))
    
    pyro.sample(f"observed_data", dist.Normal(values, variance).to_event(1), obs=data)

## run MCMC

In [None]:

import pyro.infer
import pyro.optim
import pyro.poutine as poutine
import torch
from pyro.infer import MCMC, NUTS
pyro.clear_param_store()


observed_data = torch.tensor(target_dist[1])

nuts_kernel = NUTS(probabilistic_model)

mcmc = MCMC(nuts_kernel, num_samples=num_samples, warmup_steps=200)

mcmc.run(torch.tensor(target_dist[1]))

samples = mcmc.get_samples()
print(samples)



In [None]:
results = []
# take just the mean of the samples
for i in range(latent_dims):
    results.append(samples[f'latent_variable_{i}'].mean())

# sample from the distribution
results_sampled = []
#for i in range(latent_dims):
    #results_sampled.append(dist.Normal(samples[f'latent_variable_{i}'].mean(), samples[f'latent_variable_{i}'].std()).sample())

# random embedding
random_samples = []
for i in range(latent_dims):
    random_samples.append(dist.Normal(0, 5).sample())

# decode results
results_dec = decode_latent_classify(autoencoder_equations, dataset, [results], classes)
result_equation = results_dec[0][0]
result_constant = results_dec[1][0][0][0]

# decode random samples
random_dec = decode_latent_classify(autoencoder_equations, dataset, [random_samples], classes)
random_equation = random_dec[0][0]
random_constant = random_dec[1][0][0][0]

print(f"resulting function: {prefix_to_infix(result_equation, is_function, is_operator)} with constant {result_constant}")
print(observed_data)

v_sample = generate_values(result_equation, result_constant, is_function, is_operator, is_variable, is_constant)
v_real = generate_values(target_function, target_constant, is_function, is_operator, is_variable, is_constant)
v_random = generate_values(random_equation, random_constant, is_function, is_operator, is_variable, is_constant)

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt
def plot_sampling_functions(smpls, with_Random):
    # latent variabls values
    latent_variable_MCMC = []
    equations_MCMC = []
    constants_MCMC = []
    # generate a list of lists for the latent variables of each iteration (num_iterations, latent_dims)
    for s in range(len(smpls['latent_variable_0'])):
        embedding = []
        for i in range(latent_dims):
            embedding.append(smpls[f'latent_variable_{i}'][s].item())
        equations, constants = decode_latent_classify(autoencoder_equations, dataset, [embedding], classes)
        values = torch.tensor(generate_values(equations[0], constants[0][0][0], is_function, is_operator, is_variable, is_constant)[1], dtype=torch.float32)
        latent_variable_MCMC += values.detach().numpy().tolist()
        constants_MCMC += [constants[0][0][0]]
        equations_MCMC += [prefix_to_infix(equations[0], is_function, is_operator)]*50

    df = {
        "y": latent_variable_MCMC,
        "x": np.linspace(-1,1,50).tolist() * num_samples,
        "equation": equations_MCMC

    }

    df_compare = {
        'x': np.linspace(-1, 1, 50),
        'y_sample': v_sample[1],
        'y_real': v_real[1],
        'y_random': v_random[1]
    }   
    print(f"the smallest constant is {min(constants_MCMC)} and the largest constant is {max(constants_MCMC)}")
    data = pd.DataFrame(df)
    #g = sns.lineplot(data=data, x='x', y='y', hue='equation', fit_reg=True, legend=False, height=5, scatter_kws={'alpha':0.5, 's': 0.05})
    sns.lineplot(data=data, x='x', y='y', hue='equation')
    #sns.lineplot(data=df_compare, x='x', y='y_sample', label=f"Sampled: {prefix_to_infix(result_equation, is_function, is_operator).replace('c_0', str(round(result_constant, 2)) )}")
    sns.lineplot(data=df_compare, x='x', y='y_real', label=f"Real: {target_function.replace('c_0', str(target_constant))}")
    if with_Random:
        sns.lineplot(data=df_compare, x='x', y='y_random', label=f"Random: {prefix_to_infix(random_equation, is_function, is_operator).replace('c_0', str(round(random_constant, 2)) )}")

    plt.legend(markerscale=30)




def plot_dist(smpls):
    # Plot resulting probability distribution
    fig, ax = plt.subplots()

    for i in range(latent_dims):
        sns.histplot(smpls[f'latent_variable_{i}'], kde=True, ax=ax)

    # add legend
    ax.legend([f"latent_variable_{i}" for i in range(latent_dims)])

    ax.set_title("Posterior distribution of the latent variables")
    plt.show()


In [None]:
plot_sampling_functions(samples, False)
plot_dist(samples)