Importar as bibliotecas necessárias

In [1]:
import timeit # Para medir o tempo de execução
import optuna # Para otimização de hiperparâmetros
start_time = timeit.default_timer() # Marcar o início do tempo
import torch
import torchvision.transforms as transforms
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset
import torch.optim as optim
import numpy as np
from torchvision.utils import save_image
import matplotlib.pyplot as plt
# Definir o dispositivo (cuda se disponível, caso contrário CPU)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)


cuda


#### O próximo passo é carregar os dados de entrada. Os dados utilizados são valores de momento transversal dos píons e dos káons, partículas filha do méson D0. Inicialmente, os dados se encontram em um .txt, e devem ser transformados em um tensor no PyTorch. Além disso, os dados são normalizados para aumentar a estabilidade do treino e divididos em set de treino, validação e teste.

In [10]:

def normalize(data, idx):  ## Por que não utilizar torch.nn.functional.normalize?

    data = data[idx]
    mean = data.mean(dim=0)
    std = data.std(dim=0)

    local_holder = {}
    local_holder['mean'] = mean
    local_holder['std'] = std
    norm_holder.append(local_holder)

    return ((data - mean)/std)

def denormalize(data, idx):
    data = data[idx]
    mean = norm_holder[idx]['mean']
    std = norm_holder[idx]['std']

    return (data * std + mean)

In [11]:
# Carregar os dados de entrada
data = np.loadtxt("data_D0_daughtersPx_Py_Pz_E_trk1_trk2.txt")
#data = np.loadtxt("../Skims/data_D0_daughtersPx_Py_Pz_E_trk1_trk2.txt")
data = np.transpose(data) # Transposição para obter 1 dim por variável

# Converter os dados para um tensor PyTorch
tensor_data = torch.tensor(data, dtype=torch.float32)
tensor_data = torch.transpose(tensor_data, 0, 1)
nVar = tensor_data.shape[1]

print(f"Shape do tensor: {tensor_data.shape}")
print(f"Dados do crus: {tensor_data}")
print(f"Número de variáveis de entrada: {nVar}")

# Normalização dos dados
norm_holder = []
for i in range(nVar):
  tensor_data[i] = normalize(tensor_data, i)
print(f"Dados normalizados: {tensor_data}")

# Divisão dos dados em treino, validação e teste
nTotal = tensor_data.size(0)
nTrain = int(0.6 * nTotal)
nVal = int(0.2 * nTotal)
nTest = nTotal - nTrain - nVal
print(f"nTrain: {nTrain}, nVal: {nVal}, nTest: {nTest}")

x_train = tensor_data[:nTrain]
x_val = tensor_data[nTrain:nTrain + nVal]
x_test = tensor_data[nTrain + nVal:]

print(f"x_train: {x_train.size()}, x_val: {x_val.size()}, x_test: {x_test.size()}")

# Configurar DataLoaders para treino, validação e teste
batch_size = 32
trainloader = DataLoader(TensorDataset(x_train), shuffle=True, batch_size=batch_size)
valloader = DataLoader(TensorDataset(x_val), shuffle=False, batch_size=batch_size)
testloader = DataLoader(TensorDataset(x_test), shuffle=False, batch_size=batch_size)

Shape do tensor: torch.Size([92269, 8])
Dados do crus: tensor([[-4.9596, -2.1312, -0.1158,  ..., -0.2362,  5.0173,  2.2076],
        [-1.7995, -1.5433, -0.9338,  ..., -2.2095,  2.7447,  2.8100],
        [-1.5595, -1.6420,  0.8464,  ..., -0.7395,  1.9602,  2.0901],
        ...,
        [ 0.1853,  0.1886, -1.1690,  ...,  0.2351,  1.7225,  1.2063],
        [-0.2826, -0.2887,  1.0728,  ..., -0.1565,  1.3217,  1.1463],
        [ 0.1407,  0.1842,  1.6238,  ..., -0.4333,  1.8862,  2.4643]])
Número de variáveis de entrada: 8
Dados normalizados: tensor([[-1.7112, -0.7436, -0.0542,  ..., -0.0953,  1.7018,  0.7407],
        [-0.6685, -0.5428, -0.2437,  ..., -0.8698,  1.5615,  1.5935],
        [-1.2872, -1.3431,  0.3436,  ..., -0.7314,  1.0985,  1.1866],
        ...,
        [ 0.1853,  0.1886, -1.1690,  ...,  0.2351,  1.7225,  1.2063],
        [-0.2826, -0.2887,  1.0728,  ..., -0.1565,  1.3217,  1.1463],
        [ 0.1407,  0.1842,  1.6238,  ..., -0.4333,  1.8862,  2.4643]])
nTrain: 55361, nVal: 18

In [12]:
def mass(x):
    vec_px = x[0] + x[1]
    vec_py = x[2] + x[3]
    vec_pz = x[4] + x[5]
    E = x[6] + x[7]

    norm_pt = torch.sqrt(vec_px**2 + vec_py**2 + vec_pz**2)
    m = torch.sqrt(E**2 - norm_pt**2)

    return m

Segue a construção do VAE. Aqui, ele está construído de maneira genérica, sem tamanho das camadas e quantidade de camadas (isso será otimizado posteriormente).

Além disso, definimos a loss function como a divergência KL (diferença entre a distribuição aprendida e a normal padrão) somada ao erro de reconstrução (soma dos Mean Squared Error (MSE) de X comparado com a reconstrução de X (X^))

In [13]:
class VAE(nn.Module):
    def __init__(self, nVar, h_dims, z_dim, activation, beta):
        super(VAE, self).__init__()
        self.activation = activation

        # Encoder
        encoder_layers = []
        input_dim = nVar
        for h_dim in h_dims:
            encoder_layers.append(nn.Linear(input_dim, h_dim))
            encoder_layers.append(self.activation)
            input_dim = h_dim
        self.encoder_layers = nn.Sequential(*encoder_layers)
        self.fc21 = nn.Linear(h_dims[-1], z_dim)
        self.fc22 = nn.Linear(h_dims[-1], z_dim)

        # Decoder
        decoder_layers = []
        input_dim = z_dim
        for h_dim in reversed(h_dims):
            decoder_layers.append(nn.Linear(input_dim, h_dim))
            decoder_layers.append(self.activation)
            input_dim = h_dim
        self.decoder_layers = nn.Sequential(*decoder_layers)
        self.fc3 = nn.Linear(h_dims[0], nVar)

    def encoder(self, x):
        h = self.encoder_layers(x)
        return self.fc21(h), self.fc22(h)

    def sampling(self, mu, log_var):
        std = torch.exp(0.5 * log_var)
        eps = torch.randn_like(std)
        return mu + eps * std

    def decoder(self, z):
        h = self.decoder_layers(z)
        return self.fc3(h)

    def forward(self, x):
        mu, log_var = self.encoder(x)
        z = self.sampling(mu, log_var)
        return self.decoder(z), mu, log_var

def loss_function(recon_x, x, mu, log_var, beta):
    BCE = nn.MSELoss(reduction='sum')(recon_x, x) + nn.MSELoss(reduction='sum')(mass(recon_x), mass(x))
    KLD = -0.5 * torch.sum(1 + log_var - mu.pow(2) - log_var.exp())
    return beta * BCE + (1 - beta) * KLD

Declaração do treinamento do modelo. Ele é feito em epochs, sendo que cada epoch calcula a perda, realiza retropropagação e atualiza os parâmetros do modelo. A perda média de treinamento para a época é então retornada.

Durante o treinamento, a validação é realizada a cada época, e a técnica de early stopping é usada para interromper o treinamento se a perda de validação não melhorar após um número determinado de épocas.

In [14]:
def mass_verifier(data, recon):
    vec_px = recon[0] + recon[1]
    vec_py = recon[2] + recon[3]
    vec_pz = recon[4] + recon[5]
    E = recon[6] + recon[7]

    term = E**2 - vec_px**2 - vec_py**2 - vec_pz**2
    verif = (term > 0)
    return data[:,verif], recon[:,verif]

In [15]:
def train(model, trainloader, optimizer, beta):
    model.train()
    train_loss = 0
    len_verif = 0
    for data in trainloader:
        data = data[0].to(device)
        optimizer.zero_grad()
        recon_batch, mu, log_var = model(data)
        denorm_data, denorm_recon_batch = torch.transpose(data, 1, 0), torch.transpose(recon_batch, 1, 0)
        for i in range(nVar):
            denorm_data[i] = denormalize(denorm_data, i)
            denorm_recon_batch[i] = denormalize(denorm_recon_batch, i)
        verified_denorm_data, verified_denorm_recon_batch = mass_verifier(denorm_data, denorm_recon_batch)
        torch.autograd.set_detect_anomaly(True)
        loss = loss_function(verified_denorm_recon_batch, verified_denorm_data, mu, log_var, beta)
        loss.backward()
        train_loss += loss.item()
        len_verif = verified_denorm_recon_batch.size()[1]
        optimizer.step()
    return train_loss / len_verif

def validate(model, valloader, beta):
    model.eval()
    val_loss = 0
    len_verif = 0
    with torch.no_grad():
        for data in valloader:
            data = data[0].to(device)
            recon_batch, mu, log_var = model(data)
            denorm_data, denorm_recon_batch = torch.transpose(data, 1, 0), torch.transpose(recon_batch, 1, 0)
            for i in range(nVar):
                denorm_data[i] = denormalize(denorm_data, i)
                denorm_recon_batch[i] = denormalize(denorm_recon_batch, i)
            verified_denorm_data, verified_denorm_recon_batch = mass_verifier(denorm_data, denorm_recon_batch)
            torch.autograd.set_detect_anomaly(True)
            loss = loss_function(verified_denorm_recon_batch, verified_denorm_data, mu, log_var, beta)
            val_loss += loss.item()
            len_verif = denorm_recon_batch.size()[1]
    return val_loss / len_verif

class EarlyStopping:
    def __init__(self, patience=10, min_delta=0):
        self.patience = patience
        self.min_delta = min_delta
        self.best_loss = None
        self.counter = 0

    def __call__(self, val_loss):
        if self.best_loss is None:
            self.best_loss = val_loss
            return False
        elif val_loss < self.best_loss - self.min_delta:
            self.best_loss = val_loss
            self.counter = 0
            return False
        elif self.counter >= self.patience:
            return True
        self.counter += 1
        return False

Em seguida, é definida a função objetivo para a otimização de hiperparâmetros usando Optuna. A função objective sugere valores para os seguintes hiperparâmetros: número de camadas ocultas (n_layers), as dimensões dessas camadas (h_dims), a dimensão do espaço latente (z_dim), a função de ativação (activation_name), o tamanho do batch (batch_size), a taxa de aprendizado (lr) e o otimizador (optimizer_name). Com esses hiperparâmetros, o modelo de VAE é instanciado e configurado. O DataLoader é usado para carregar os dados de treinamento e validação com o tamanho de batch especificado. Dependendo do nome da função de ativação sugerida, a função correspondente é selecionada (ReLU, Tanh ou LeakyReLU). De forma semelhante, o otimizador é configurado de acordo com a sugestão (Adam ou RMSprop).

O modelo é então treinado por um número definido de epochs, durante as quais a função train executa o treinamento e a função validate calcula a perda de validação. O método trial.report é usado para reportar a perda de validação para cada época, e trial.should_prune verifica se a tentativa deve ser interrompida antecipadamente se os resultados não estiverem melhorando, aplicando a técnica de pruning.

In [16]:
def objective(trial):
    n_layers = trial.suggest_int('n_layers', 2, 5)
    h_dims = [trial.suggest_int(f'h_dim_{i}', 32, 256) for i in range(n_layers)]
    z_dim = trial.suggest_int('z_dim', 2, 32)
    activation_name = trial.suggest_categorical('activation', ['relu', 'tanh', 'leaky_relu'])
    batch_size = trial.suggest_categorical('batch_size', [3000])
    lr = trial.suggest_float('lr', 1e-5, 1e-2, log=True)
    optimizer_name = trial.suggest_categorical('optimizer', ['Adam', 'RMSprop'])
    beta = trial.suggest_float('beta', 0.8, 1.0)


    if activation_name == 'relu':
        activation = nn.ReLU()
    elif activation_name == 'tanh':
        activation = nn.Tanh()
    else:
        activation = nn.LeakyReLU()

    trainloader = DataLoader(TensorDataset(x_train), shuffle=True, batch_size=batch_size)
    valloader = DataLoader(TensorDataset(x_val), shuffle=False, batch_size=batch_size)

    model = VAE(nVar, h_dims, z_dim, activation, beta).to(device)

    if optimizer_name == 'Adam':
        optimizer = optim.Adam(model.parameters(), lr=lr)
    elif optimizer_name == 'RMSprop':
        optimizer = optim.RMSprop(model.parameters(), lr=lr)

    early_stopping = EarlyStopping(patience=10, min_delta=0.001)

    for epoch in range(30): #era 30
        train_loss = train(model, trainloader, optimizer, beta)
        val_loss = validate(model, valloader, beta)
        trial.report(val_loss, epoch)

        if trial.should_prune():
            raise optuna.exceptions.TrialPruned()

        if early_stopping(val_loss):
            break

    return val_loss

Criar e executar o estudo de otimização de hiperparâmetros. O Optuna realiza a busca pelos melhores hiperparâmetros através de várias tentativas de treinamento. Após completar as tentativas, o melhor trial é recuperado. Os valores e parâmetros desse melhor ensaio são então impressos, mostrando o valor da função objetivo (a perda de validação) e os hiperparâmetros correspondentes que produziram o melhor desempenho do modelo.

Aqui, é inputado o número de trials desejado.

In [18]:
study = optuna.create_study(direction='minimize')
study.optimize(objective, n_trials=30) # VOLTAR PARA 30!!!

print('Best trial:')
trial = study.best_trial
print(' Value: ', trial.value)
print(' Params: ')
for key, value in trial.params.items():
    print(f'    {key}: {value}')

[I 2025-03-20 17:05:17,216] A new study created in memory with name: no-name-a08c6698-7986-4398-928a-bd83181215f8
  File "C:\Users\vinic\AppData\Local\Programs\Python\Python313\Lib\runpy.py", line 198, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "C:\Users\vinic\AppData\Local\Programs\Python\Python313\Lib\runpy.py", line 88, in _run_code
    exec(code, run_globals)
  File "C:\Users\vinic\AppData\Local\Programs\Python\Python313\Lib\site-packages\ipykernel_launcher.py", line 18, in <module>
    app.launch_new_instance()
  File "C:\Users\vinic\AppData\Local\Programs\Python\Python313\Lib\site-packages\traitlets\config\application.py", line 1075, in launch_instance
    app.start()
  File "C:\Users\vinic\AppData\Local\Programs\Python\Python313\Lib\site-packages\ipykernel\kernelapp.py", line 739, in start
    self.io_loop.start()
  File "C:\Users\vinic\AppData\Local\Programs\Python\Python313\Lib\site-packages\tornado\platform\asyncio.py", line 205, in start
   

RuntimeError: one of the variables needed for gradient computation has been modified by an inplace operation: [torch.cuda.FloatTensor [3000, 8]] is at version 8; expected version 0 instead. Hint: the backtrace further above shows the operation that failed to compute its gradient. The variable in question was changed in there or anywhere later. Good luck!

Os melhores hiperparâmetros encontrados pelo estudo Optuna são recuperados e usados para configurar o modelo VAE. Os melhores parâmetros são extraídos do trial vencedor e utilizados para definir a função de ativação (ReLU, Tanh ou Leaky ReLU), o tamanho do batch e a arquitetura do modelo. O DataLoader é configurado novamente com o melhor tamanho de batch para os dados de treino e validação. Um novo modelo VAE é criado usando os melhores hiperparâmetros. O otimizador apropriado (Adam ou RMSprop) é configurado com a melhor taxa de aprendizado encontrada.

In [None]:
best_params = trial.params
if best_params['activation'] == 'relu':
    activation = nn.ReLU()
elif best_params['activation'] == 'tanh':
    activation = nn.Tanh()
else:
    activation = nn.LeakyReLU()

batch_size = best_params['batch_size']
trainloader = DataLoader(TensorDataset(x_train), shuffle=True, batch_size=batch_size)
valloader = DataLoader(TensorDataset(x_val), shuffle=False, batch_size=batch_size)

model = VAE(nVar, [best_params[f'h_dim_{i}'] for i in range(best_params['n_layers'])], best_params['z_dim'], activation, best_params['beta']).to(device)

if best_params['optimizer'] == 'Adam':
    optimizer = optim.Adam(model.parameters(), lr=best_params['lr'])
elif best_params['optimizer'] == 'RMSprop':
    optimizer = optim.RMSprop(model.parameters(), lr=best_params['lr'])

Treinamento final com os melhores hiperparâmetros

In [None]:
# Treinamento final com os melhores hiperparâmetros
early_stopping = EarlyStopping(patience=20, min_delta=0.001)
train_losses, val_losses = [], []
for epoch in range(30):
    print(f"Training... (Epoch {epoch+1}/30)")
    train_loss = train(model, trainloader, optimizer, best_params['beta'])
    val_loss = validate(model, valloader, best_params['beta'])
    train_losses.append(train_loss)
    val_losses.append(val_loss)

    if early_stopping(val_loss):
        break

print('Final Train Loss:', train_loss)
print('Final Validation Loss:', val_loss)

O modelo é avaliado no conjunto de teste para calcular a perda final de teste

In [None]:
# Teste final
model.eval()
test_loss = 0
with torch.no_grad():
    for data in testloader:
        data = data[0].to(device)
        recon_batch, mu, log_var = model(data)
        loss = loss_function(recon_batch, data, mu, log_var, best_params['beta'])
        test_loss += loss.item()
test_loss /= len(testloader.dataset)
print('Test Loss:', test_loss)

Abaixo, se iniciam os plots. Primeiramente, temos o erro de reconstrução e KLD sendo minimizados no treinamento

In [None]:
plt.figure(figsize=(10, 5))
plt.plot(train_losses, label='Train Loss')
plt.plot(val_losses, label='Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.title('Train and Validation Losses')
plt.legend()
plt.show()

Amostra do VAE tentando reproduzir o dataset de treino (dados normalizados)

In [None]:
model.eval()
with torch.no_grad():
    recon_train, _, _ = model(x_train.to(device))

plt.figure(figsize=(10, 5))
plt.hist(x_train.cpu().numpy().flatten(), bins=80, range=[-3, 3], histtype='step', color='blue', label='Train Data')
plt.hist(recon_train.cpu().numpy().flatten(), bins=80, range=[-3, 3], histtype='step', color='green', label='Reconstructed Train Data')
plt.title('Train Data vs Reconstructed Train Data')
plt.xlabel('Value')
plt.ylabel('Frequency')
plt.legend()
plt.show()

Amostra do VAE tentando reproduzir o dataset de teste (dados normalizados)

In [None]:
with torch.no_grad():
    recon_test, _, _ = model(x_test.to(device))

plt.figure(figsize=(10, 5))
plt.hist(x_test.cpu().numpy().flatten(), bins=80, range=[-3, 3], histtype='step', color='red', label='Test Data')
plt.hist(recon_test.cpu().numpy().flatten(), bins=80, range=[-3, 3], histtype='step', color='green', label='Reconstructed Test Data')
plt.title('Test Data vs Reconstructed Test Data')
plt.xlabel('Value')
plt.ylabel('Frequency')
plt.legend()
plt.show()

Amostra do VAE tentando gerar novos dados que seguem a distribuição do input (dados normalizados)

In [None]:
with torch.no_grad():
    z = torch.randn(tensor_data.size(0), best_params['z_dim']).to(device)  # Gerar amostras com o mesmo tamanho dos dados de entrada
    generated_data = model.decoder(z)

plt.figure(figsize=(10, 5))
plt.hist(tensor_data.numpy().flatten(), bins=80, range=[-3, 3], histtype='step', color='blue', label='Input Data')
plt.hist(generated_data.cpu().numpy().flatten(), bins=80, range=[-3, 3], histtype='step', color='red', label='Generated Data')
plt.title('Input Data vs Generated Data')
plt.xlabel('Value')
plt.ylabel('Frequency')
plt.legend()
plt.show()

In [None]:
import scipy.stats as stats
# Teste de Kolmogorov-Smirnov
train_recon_ks = stats.ks_2samp(x_train.numpy().flatten(), recon_train.numpy().flatten())
test_recon_ks = stats.ks_2samp(x_test.numpy().flatten(), recon_test.numpy().flatten())
input_generated_ks = stats.ks_2samp(tensor_data.numpy().flatten(), generated_data.numpy().flatten())

print(f"KS test for Train vs. Reconstructed Train: statistic={train_recon_ks.statistic}, p-value={train_recon_ks.pvalue}")
print(f"KS test for Test vs. Reconstructed Test: statistic={test_recon_ks.statistic}, p-value={test_recon_ks.pvalue}")
print(f"KS test for Input vs. Generated: statistic={input_generated_ks.statistic}, p-value={input_generated_ks.pvalue}")

Abaixo, iremos desnormalizar os dados para gerar plots com a distribuição original, retomando a problemática da distribuição dos momentos tranversais dos píons e dos kaons.

In [None]:
# Desnormalizar os dados de treino, validação e teste
x_train_denorm = torch.clone(x_train.cpu())
x_test_denorm = torch.clone(x_test.cpu())
for i in range(nVar):
    x_train_denorm[i] = denormalize(x_train_denorm, i)
    x_test_denorm[i] = denormalize(x_test_denorm, i)

# Desnormalizar os dados reconstruídos e gerados
with torch.no_grad():
    # Reconstrução dos dados de treino
    recon_train, _, _ = model(x_train.to(device))
    for i in range(nVar):
        recon_train[i] = denormalize(recon_train.cpu(), i)

    # Reconstrução dos dados de teste
    recon_test, _, _ = model(x_test.to(device))
    for i in range(nVar):
        recon_test[i] = denormalize(recon_test.cpu(), i)

    # Geração de novos dados
    z = torch.randn(tensor_data.size(0), best_params['z_dim']).to(device)
    generated_data = model.decoder(z)
    for i in range(nVar):
        generated_data[i] = denormalize(generated_data.cpu(), i)

# Desnormalizar os dados de entrada (linha azul)
tensor_data_denorm = torch.clone(tensor_data)
for i in range(nVar):
    tensor_data_denorm[i] = denormalize(tensor_data, i)

Amostra do VAE tentando reproduzir o dataset de treino (dados desnormalizados)

In [None]:
plt.figure(figsize=(10, 5))
plt.hist(x_train_denorm.numpy().flatten(), bins=80, range=[0, 4], histtype='step', color='blue', label='Train Data')
plt.hist(recon_train.numpy().flatten(), bins=80, range=[0, 4], histtype='step', color='green', label='Reconstructed Train Data')
plt.title('Train Data vs Reconstructed Train Data')
plt.xlabel('Value')
plt.ylabel('Frequency')
plt.legend()
plt.show()

Amostra do VAE tentando reproduzir o dataset de teste (dados desnormalizados)

In [None]:
plt.figure(figsize=(10, 5))
plt.hist(x_test_denorm.numpy().flatten(), bins=80, range=[0, 4], histtype='step', color='red', label='Test Data')
plt.hist(recon_test.numpy().flatten(), bins=80, range=[0, 4], histtype='step', color='green', label='Reconstructed Test Data')
plt.title('Test Data vs Reconstructed Test Data')
plt.xlabel('Value')
plt.ylabel('Frequency')
plt.legend()
plt.show()

Amostra do VAE tentando gerar novos dados que seguem a mesma distribuição do input (dados desnormalizados)

In [None]:
# PLOTS FUNCTION

def subplot_trk(data, generated, trk):
    if trk == "trk1":
        id_list = [0, 2, 4, 6]
        name_list = ["Px1", "Py1", "Pz1", "E1"]
    if trk == "trk2":
        id_list = [1, 3, 5, 7]
        name_list = ["Px2", "Py2", "Pz2", "E2"]
    if trk == "both":
        px = data[(0,1), :].flatten()
        py = data[(2,3), :].flatten()
        pz = data[(4,5), :].flatten()
        E = data[(6,7), :].flatten()
        data = np.zeros([4, len(px)])
        data[0][:] = px
        data[1][:] = py
        data[2][:] = pz
        data[3][:] = E
        px = generated[(0,1), :].flatten()
        py = generated[(2,3), :].flatten()
        pz = generated[(4,5), :].flatten()
        E = generated[(6,7), :].flatten()
        generated = np.zeros([4, len(px)])
        generated[0][:] = px
        generated[1][:] = py
        generated[2][:] = pz
        generated[3][:] = E
        id_list = [0, 1, 2, 3]
        name_list = ["Px1+Px2", "Py1+Py2", "Pz1+Pz2", "E1+E2"]
    range_list = [[-4, 4], [-4, 4], [-4, 4], [0, 4]]

    plt.figure(figsize=(10,10))

    for i in range(len(id_list)):
        plt.subplot(2, 2, i+1)
        plt.hist(data[id_list[i]].flatten(), bins=80, range=range_list[i], histtype='step', color='blue', label='Input Data')
        plt.hist(generated[id_list[i]].flatten(), bins=80, range=range_list[i], histtype='step', color='red', label='Generated Data')
        plt.title(f'Input Data vs Generated Data ({name_list[i]})')
        plt.xlabel('Value')
        plt.ylabel('Frequency')
        plt.legend()

    return plt.show()

In [None]:
def test_plot(data, generated):
    generated_px = generated[0] + generated[1]
    generated_py = generated[2] + generated[3]
    generated_pz = generated[4] + generated[5]
    generated_E = generated[6] + generated[7]
    generated_term = generated_E**2 - generated_px**2 - generated_py**2 - generated_pz**2
    my_linspace = np.linspace(-10, 10, int(92269))
    
    plt.figure(figsize=(20, 10))
    n_list = [5, 30, 150, 1000, 10000, 92269]
    plt.title("E^2 - p^2")
    for i in range(6):
        plt.subplot(2, 3, i+1)
        plt.scatter(my_linspace[0:n_list[i]], generated_term[0:n_list[i]], color='red')
        plt.plot(my_linspace[0:n_list[i]], np.zeros(92269)[0:n_list[i]], color='black')
    return plt.show()

In [None]:
if type(tensor_data_denorm) != type(np.array([])):
    tensor_data_denorm = tensor_data_denorm.numpy()
if tensor_data_denorm.shape == tensor_data.numpy().shape:
    tensor_data_denorm = np.transpose(tensor_data_denorm)

if type(generated_data) != type(np.array([])):
    generated_data = generated_data.numpy()
if generated_data.shape == tensor_data.numpy().shape:
    generated_data = np.transpose(generated_data)

plt.figure(figsize=(10, 5))
plt.hist(tensor_data_denorm.flatten(), bins=80, range=[-4, 4], histtype='step', color='blue', label='Input Data')
plt.hist(generated_data.flatten(), bins=80, range=[-4, 4], histtype='step', color='red', label='Generated Data')
plt.title('Input Data vs Generated Data (All vars, trk1+trk2)')
plt.xlabel('Value')
plt.ylabel('Frequency')
plt.legend()
plt.show()

subplot_trk(tensor_data_denorm, generated_data, "trk1")
subplot_trk(tensor_data_denorm, generated_data, "trk2")
subplot_trk(tensor_data_denorm, generated_data, "both")
test_plot(tensor_data_denorm, generated_data)

Abaixo, é rodado um teste de Kolmogorov-Smirnov para analisar a proximidade dos dois hitogramas em cada gráfico (apenas para as distribuições desnormalizadas). Uma das métricas de interesse é o p-valor.

P-Valor Significativo: um p-valor abaixo de 0.05 é frequentemente considerado significativo, o que indica que a hipótese nula (de que as distribuições são iguais) pode ser rejeitada.

P-Valor Não Significativo: Um p-valor acima de 0.05 geralmente sugere que não há evidência suficiente para rejeitar a hipótese nula. Valores como 0.10, 0.15, ou mais altos são interpretados como indicando uma falta de diferença significativa entre as distribuições.

P-Valor > 0.05: Indica que não há uma diferença significativa entre as distribuições comparadas. Esse é um bom sinal, sugerindo que as distribuições dos dados gerados ou reconstruídos são semelhantes aos dados reais.

P-Valor > 0.10 ou 0.15: Indica uma forte semelhança entre as distribuições, o que é ótimo.

In [None]:
import scipy.stats as stats
# Teste de Kolmogorov-Smirnov
train_recon_ks = stats.ks_2samp(x_train_denorm.flatten(), recon_train.flatten())
test_recon_ks = stats.ks_2samp(x_test_denorm.flatten(), recon_test.flatten())
input_generated_ks = stats.ks_2samp(tensor_data_denorm.flatten(), generated_data.flatten())

print(f"KS test for Train vs. Reconstructed Train: statistic={train_recon_ks.statistic}, p-value={train_recon_ks.pvalue}")
print(f"KS test for Test vs. Reconstructed Test: statistic={test_recon_ks.statistic}, p-value={test_recon_ks.pvalue}")
print(f"KS test for Input vs. Generated: statistic={input_generated_ks.statistic}, p-value={input_generated_ks.pvalue}")