In [1]:
import os
import random

import numpy as np
import torch
from src.test_functions import NeuralNetworkOneLayer
from tqdm.notebook import tqdm

# Export the XLA client memory-fraction setting (".80") for this process.
# NOTE(review): presumably read by JAX/XLA code behind `src.test_functions` --
# confirm. It is assigned *after* that module has been imported, so verify the
# consumer reads it lazily rather than at import time.
os.environ["XLA_PYTHON_CLIENT_MEM_FRACTION"] = ".80"

import pickle

# Sample Data

In [21]:
# Read data
# Known sample sets, keyed by environment name. The trailing notes are the
# reward range / "Err" annotations recorded alongside each set.
SAMPLE_FILES = {
    # Reward: 6.5 ~ 9.5, Err: 0.05
    "InvertedDoublePendulum-v4": "./InvertedDoublePendulum-v4_samples200000.pkl",
    # Reward: -56 ~ 1.28  , Err: 50
    "HalfCheetah-v4": "./HalfCheetah-v4_samples2000000.pkl",
    # Reward: -3.5 ~ 3.5, Err: 0.3
    "Swimmer-v4": "./Swimmer-v4_samples2000000.pkl",
    # Reward: -1.8746137439401072 , 3.90962180717704, Err: 0.21
    "Hopper-v4": "Hopper-v4_samples2000000.pkl",
}

# Choose the environment here; the same name is reused when saving the model,
# so the sample file and the model file name can no longer drift apart.
save_name = "Swimmer-v4"
sample_file = SAMPLE_FILES[save_name]



# Deserialize the collected samples.
# NOTE: pickle.load can execute arbitrary code -- only open trusted files.
with open(sample_file, mode="rb") as sample_handle:
    sample_dict = pickle.load(sample_handle)

# Pull the three sample arrays out of the dictionary
# (the observation key is singular, the other two are plural).
observations, actions, rewards = (
    sample_dict[key] for key in ("observation", "actions", "rewards")
)

In [22]:
# Largest absolute value allowed after scaling.
# NOTE(review): presumably chosen to stay strictly inside the [-10, 10] domain
# used for the network below -- confirm.
BOUND = 9.9999


def scale_to_bounds(lb, ub, bound):
    """Return the factor that maps values in ``[lb, ub]`` into ``[-bound, bound]``.

    The factor is ``bound / max(|lb|, |ub|)``, so multiplying the data by it
    makes the largest absolute value equal to ``bound``.

    Args:
        lb: Lower bound of the data (scalar or array-like).
        ub: Upper bound of the data (scalar or array-like, broadcastable to ``lb``).
        bound: Target absolute bound after scaling.

    Returns:
        The multiplicative scale factor. Where both ``lb`` and ``ub`` are zero
        the factor is ``1.0``: all-zero data already lies within any bound, and
        this avoids a division by zero that would turn the data into NaN.
    """
    extent = np.maximum(np.abs(lb), np.abs(ub))
    # Substitute `bound` for a zero extent so the quotient becomes exactly 1.
    safe_extent = np.where(extent == 0, bound, extent)
    return bound / safe_extent

In [None]:
# Scale the observations to uniform bounds.
# The actions are only inspected here; they are NOT rescaled.
lb, ub = np.min(actions), np.max(actions)
print(f"Bounds for actions are: lb = {lb}, ub = {ub}")


# Work on a copy of the raw observations so that the data held in
# `sample_dict` stays untouched and re-running this cell always reports the
# raw bounds first and yields the same scaled result.
observations = sample_dict["observation"].copy()
lb, ub = np.min(observations), np.max(observations)
print(f"Bounds for observations are: lb = {lb}, ub = {ub}")
scale = scale_to_bounds(lb, ub, BOUND)
observations *= scale
lb, ub = np.min(observations), np.max(observations)
print(f"Bounds after scaling are: lb = {lb}, ub = {ub}")

In [24]:
# Assemble the training arrays: every input row is the action columns followed
# by the (scaled) observation columns; the target is the negated reward.
xs = np.hstack([actions, observations])
ys = np.negative(rewards)

print("Collected samples are in the shape of: ", xs.shape, ys.shape)
print(f"Rewards range: {np.min(rewards)} , {np.max(rewards)}")

Collected samples are in the shape of:  (2000000, 10) (2000000,)
Rewards range: -3.7211230267490145 , 3.4389165232368497


# Neural Network

In [25]:
# Define variables
hidden_dims = 16  # width of the single hidden layer
num_epochs = 100  # full passes over the sample set
batch_size = 1000  # samples per optimizer step
learning_rate = 0.0001  # Adam step size
use_device = "cuda:0"  # "cuda:0" or "cpu"

# Seed every random number generator used by the notebook so that weight
# initialisation and the per-epoch shuffling are reproducible across runs.
seed = 0
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)

In [26]:
# Setup NN
input_dims = xs.shape[-1]
num_samples = xs.shape[0]
# Every input dimension shares the same [-10, 10] domain.
bounds = np.array([[-10.0, 10.0]] * input_dims)


# Create a neural network with one hidden layer
nn = NeuralNetworkOneLayer(dims=input_dims, domain=bounds, hidden_dims=hidden_dims)

criteria = torch.nn.MSELoss()
optimizer = torch.optim.Adam(nn.model.parameters(), lr=learning_rate)

# Resolve the compute device.
# torch.device() only validates the device *string*; it does not raise when
# CUDA is missing, so availability has to be checked explicitly. Otherwise a
# CPU-only machine would only fail later, when the model is moved to `device`.
fallback_device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
try:
    device = torch.device(use_device)
except (RuntimeError, TypeError):
    # Malformed or non-string device specification.
    print(f"Invalid device {use_device!r}; falling back to {fallback_device}")
    device = fallback_device
else:
    if device.type == "cuda" and not torch.cuda.is_available():
        print(f"CUDA is not available; falling back to {fallback_device}")
        device = fallback_device

In [None]:
# Train the network with mini-batch Adam on the MSE between prediction and target.
nn.model.to(device)
nn.model.train()

print(f"Training the neural network for {num_epochs} epochs")
for epoch in tqdm(range(num_epochs)):
    # Draw a fresh sample order for every epoch.
    shuffled_indices = np.random.permutation(num_samples)
    xs_shuffled = xs[shuffled_indices]
    ys_shuffled = ys[shuffled_indices]

    for i in range(0, num_samples, batch_size):
        x = torch.as_tensor(
            xs_shuffled[i : i + batch_size], dtype=torch.float32, device=device
        )
        y = torch.as_tensor(
            ys_shuffled[i : i + batch_size], dtype=torch.float32, device=device
        )

        optimizer.zero_grad()
        # Give the prediction the same shape as the target. If the model
        # returns (batch, 1) while the target is (batch,), MSELoss would
        # broadcast both to (batch, batch) and silently optimise the wrong
        # objective. The reshape is a no-op when the shapes already match.
        y_pred = nn.model(x).reshape(y.shape)
        loss = criteria(y_pred, y)
        loss.backward()
        optimizer.step()
        if i == 0:
            # Report the loss of the first batch of each epoch.
            print(f"Loss at step {epoch} = {loss.item()}")

In [None]:
# Evaluate the neural network: per-batch MSE over all samples, in original order
nn.model.eval()
losses = []
with torch.no_grad():  # no gradients are needed for evaluation
    for i in range(0, num_samples, batch_size):
        x = torch.as_tensor(xs[i : i + batch_size], dtype=torch.float32, device=device)
        y = torch.as_tensor(ys[i : i + batch_size], dtype=torch.float32, device=device)

        # Give the prediction the same shape as the target. If the model
        # returns (batch, 1) while the target is (batch,), MSELoss would
        # broadcast both to (batch, batch) and report a meaningless loss.
        # The reshape is a no-op when the shapes already match.
        y_pred = nn.model(x).reshape(y.shape)
        loss = criteria(y_pred, y)
        losses.append(loss.item())

print(f"Mean loss: {np.mean(losses)}")
print(f"Max/Min loss: {np.max(losses)} , {np.min(losses)}")

In [29]:
# Save the model
print("Saving the model")
# Bundle the metadata needed to rebuild the network together with its weights.
model_info = {
    "input_dims": input_dims,
    "hidden_dims": hidden_dims,
    "test_function": save_name,
    "bounds": bounds,
    "state_dict": nn.model.state_dict(),
}
save_path = f"src/nn_models/nn_one_layer_{save_name}_{input_dims}_{hidden_dims}.pt"
# torch.save does not create missing directories, so make sure the target exists.
os.makedirs(os.path.dirname(save_path), exist_ok=True)
torch.save(model_info, save_path)

Saving the model


# Test loading model

In [2]:
model_path = "src/nn_models/nn_one_layer_InvertedDoublePendulum-v4_12_16.pt"

# Map the stored tensors onto the GPU when one is available, otherwise onto the
# CPU: torch.load raises if map_location names a CUDA device on a machine
# without CUDA.
device = "cuda:0" if torch.cuda.is_available() else "cpu"

# NOTE(review): the checkpoint is a pickled dict that also holds a NumPy array
# ("bounds"). Recent PyTorch releases default to weights_only=True, which may
# reject such files; pass weights_only=False only for trusted, locally
# produced checkpoints if loading fails.
model_info = torch.load(model_path, map_location=device)

In [3]:
# Display the loaded checkpoint dictionary (metadata entries plus the state_dict tensors).
model_info

{'input_dims': 12,
 'hidden_dims': 16,
 'test_function': 'InvertedDoublePendulum-v4',
 'bounds': array([-10.,  10., -10.,  10., -10.,  10., -10.,  10., -10.,  10., -10.,
         10., -10.,  10., -10.,  10., -10.,  10., -10.,  10., -10.,  10.,
        -10.,  10.]),
 'state_dict': OrderedDict([('0.weight',
               tensor([[ 1.4677e-01, -2.4173e-02,  2.1543e-01,  2.5001e-02, -5.1423e-02,
                        -2.0905e-03,  2.1313e-02, -2.8338e-02,  2.8718e-03,  9.0760e-02,
                        -1.2921e-01,  1.6068e-01],
                       [ 9.8252e-02,  2.1203e-01,  7.9603e-03,  2.3247e-01, -3.1687e-01,
                        -2.1037e-01,  1.0191e-01,  1.7402e-01, -5.2624e-02, -1.1091e-01,
                        -2.7850e-02, -3.3848e-02],
                       [-7.3925e-02, -6.5786e-02, -5.5284e-02, -1.7577e-01,  2.6927e-01,
                         2.0940e-01,  2.1483e-01, -1.6511e-01, -9.9904e-02, -1.1539e-01,
                        -4.2368e-02,  6.8518e-02],
      