In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

import json

from utilities.jsonRW import *
from utilities.plots import *

In [37]:
def procesar_json(algorithm, shape):
    json_file_path = get_json_file_path(algorithm, shape)

    # Load and parse the JSON data
    with open(json_file_path, "r") as json_file:
        agent_data = json.load(json_file)

    families_dict = {}
    count = 0

    # Access the data for each agent entry
    for entry in agent_data.get("agents_data", []):
        count += 1

        id = entry["id"]
        algorithm = entry["algorithm"]
        shape = entry["shape"]
        episodes = entry["episodes"]
        max_steps = entry["max_steps"]

        starting_position = tuple(entry["starting_position"])

        string_policy_grid = np.array(entry["string_policy_grid"])
        value_grid = np.array(entry["value_grid"])
        policy_grid = np.array(entry["policy_grid"])


        sy_values = []
        sx_values = []
        a_values = []

        for y in range(len(policy_grid)):
            for x in range(len(policy_grid[0])):
                #En sy y sx tengo las coordenadas de cada estado
                sy_values.append(float(y))
                sx_values.append(float(x))
                a_values.append(float(policy_grid[y][x]))

        """#vamos a printear el resultado de los arrays
        for i in range(len(sy_values)):
            print("[", sy_values[i], ",",sx_values[i], "] --> ", a_values[i])
        
        break"""

        # Si la familia ya existe en el diccionario, simplemente agrega los valores
        if starting_position in families_dict:
            """            families_dict[starting_position]["sy_values"].extend(sy_values)
            families_dict[starting_position]["sx_values"].extend(sx_values)
            families_dict[starting_position]["a_values"].extend(a_values)"""
            families_dict[starting_position]["sy_values"].append(sy_values)
            families_dict[starting_position]["sx_values"].append(sx_values)
            families_dict[starting_position]["a_values"].append(a_values)
        
        else:
            # Si la familia no existe, crea una nueva entrada en el diccionario
            families_dict[starting_position] = {
                "sy_values": [sy_values],
                "sx_values": [sx_values],
                "a_values": [a_values]
            }

    return families_dict, count


algorithms = ["DQN", "DDQN", "Q-Learning"]
shapes = ["5x5", "14x14"]

for shape in shapes:
    for algorithm in algorithms:
        #train_inputs, train_labels = procesar_json(json_path)
        families_dict, count = procesar_json(algorithm, shape)

        totalY, totalX, totalA = 0, 0, 0
        #print the length of each key
        for key in families_dict:
            totalY += len(families_dict[key]["sy_values"])
            totalX += len(families_dict[key]["sx_values"])
            totalA += len(families_dict[key]["a_values"])
            #print(key)
            #print(len(return_dict[key]["sy_values"]))
            #print(len(return_dict[key]["sx_values"]))
            #print(len(return_dict[key]["a_values"]))

        if totalY == totalX and totalX == totalA and totalA == count:
            print(shape, "---", algorithm, ": ", count)
        else:
            print(shape, "---", algorithm, ": ERROR")
            
#print(return_dict[(7,0)]["sy_values"])

# Ahora, train_inputs y train_labels contienen los datos en formato de tensores para entrenar tus modelos.

5x5 --- DQN :  64
5x5 --- DDQN :  65
5x5 --- Q-Learning :  180
14x14 --- DQN :  66
14x14 --- DDQN :  38
14x14 --- Q-Learning :  50


In [None]:
# Define tu arquitectura de red neuronal
class PolicyNetwork(nn.Module):
    def __init__(self, input_size, output_size):
        super(PolicyNetwork, self).__init__()
        self.fc = nn.Linear(input_size, output_size)

    def forward(self, x):
        return torch.softmax(self.fc(x), dim=-1)

# Función para entrenar un modelo por familia de trayectorias
def train_model(data_loader, input_size, output_size, num_epochs=10, lr=0.001):
    model = PolicyNetwork(input_size, output_size)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)

    for epoch in range(num_epochs):
        for inputs, labels in data_loader:
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

    return model

# Supongamos que tienes datos de entrenamiento en forma de torch tensors 'train_inputs' y 'train_labels'
# Crea DataLoader para cada familia de trayectorias
# train_inputs y train_labels deben contener tus datos de entrenamiento para cada familia

# Ejemplo:
train_inputs_family1, train_labels_family1 = torch.Tensor(...), torch.LongTensor(...)
train_dataset_family1 = TensorDataset(train_inputs_family1, train_labels_family1)
train_loader_family1 = DataLoader(train_dataset_family1, batch_size=64, shuffle=True)

# Entrena un modelo para la familia de trayectorias 1
input_size = ...  # Define el tamaño de entrada según tus datos
output_size = ...  # Define el tamaño de salida según tus datos
model_family1 = train_model(train_loader_family1, input_size, output_size)

In [None]:
# Función para combinar modelos individuales en un modelo global
def combine_models(models):
    class CombinedModel(nn.Module):
        def __init__(self, models):
            super(CombinedModel, self).__init__()
            self.models = nn.ModuleList(models)

        def forward(self, x):
            outputs = [model(x) for model in self.models]
            return torch.mean(torch.stack(outputs), dim=0)

    return CombinedModel(models)

# Combina los modelos individuales en un modelo global
global_model = combine_models([model_family1, model_family2, ...])  # Agrega todos los modelos necesarios

In [None]:
# Función para capturar la incertidumbre utilizando Monte Carlo
def capture_uncertainty(model, state, num_samples=100):
    outputs = torch.zeros((num_samples, state.size(0), model.fc.out_features))

    for i in range(num_samples):
        outputs[i, :, :] = model(state)

    uncertainty = torch.var(outputs, dim=0).mean(dim=-1)
    return uncertainty

# Supongamos que tienes un estado 'sample_state' para el cual deseas capturar la incertidumbre
sample_state = torch.Tensor(...)  # Reemplaza ... con tu estado

# Captura la incertidumbre para el estado dado utilizando Monte Carlo
uncertainty = capture_uncertainty(global_model, sample_state)
print("Uncertainty:", uncertainty)