In [None]:
import os
import json
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import transforms, models
from torch.nn.utils.rnn import pad_sequence
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
from PIL import Image
import random


In [None]:

# Load the .npy file to inspect its contents
trajectories_data = np.load('trajectories_3.npy', allow_pickle=True)
print(type(trajectories_data))
print(trajectories_data.shape)
print(trajectories_data)

In [None]:

# Step 1: Define the Resnet_Visual_Encoder function
class ResNet18VisualEncoder(nn.Module):
    def __init__(self):
        super(ResNet18VisualEncoder, self).__init__()
        self.resnet18 = models.resnet18(pretrained=False)
        # Replace the global max pooling with softmax pooling
        self.resnet18.avgpool = nn.AdaptiveAvgPool2d(1)
        self.softmax_pooling = nn.Softmax(dim=1)
        # Remove the final fully connected layer
        self.resnet18 = nn.Sequential(*list(self.resnet18.children())[:-1])

    def forward(self, x):
        x = self.resnet18(x)
        x = self.softmax_pooling(x)
        return x

def load_images(folder):
    images = []
    transform = transforms.Compose([transforms.Resize((100, 100)), transforms.ToTensor()])
    for filename in os.listdir(folder):
        if filename.endswith('.png'):
            img = Image.open(os.path.join(folder, filename)).convert('RGB')
            img = transform(img)
            images.append(img)
    return torch.stack(images)

def Resnet_Visual_Encoder():
    encoder = ResNet18VisualEncoder()
    images = load_images('./maze_maps3')
    latent_embeddings = encoder(images)
    return latent_embeddings


In [None]:

# Step 2: Define the Pathl function
def Pathl(data):
    trajectories = data['trajectories']
    lengths = [len(traj) for traj in trajectories]
    return lengths


In [None]:

# Step 3: Define the adding_Gaussian_noise function
def cosine_noise_scheduler(T, beta_start=0.0001, beta_end=0.02):
    return np.cos((np.linspace(0, T, T) + 0.008) / (1.008) * np.pi * 0.5) ** 2 * (beta_end - beta_start) + beta_start

def adding_Gaussian_noise(lengths, trajectories):
    noisy_trajectories = []
    T = 1000  # Number of diffusion steps
    beta_schedule = cosine_noise_scheduler(T)
    for i, length in enumerate(lengths):
        trajectory = trajectories[i]
        noise = np.random.normal(0, 1, (length, 2))
        noisy_trajectory = trajectory + noise * beta_schedule[:length, None]
        noisy_trajectories.append(noisy_trajectory)
    return noisy_trajectories


In [None]:
# Step 4: Define the noise_prediction_network function
class NoisePredictionNetwork(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(NoisePredictionNetwork, self).__init__()
        self.fc1 = nn.Linear(input_dim, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, output_dim)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

def train_noise_prediction_network(latent_embeddings, start_goal, noisy_trajectories, actual_noises, epochs=2, lr=0.1):
    model = NoisePredictionNetwork(input_dim=latent_embeddings.shape[1] + 4, output_dim=2)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    criterion = nn.MSELoss()

    for epoch in range(epochs):
        for i in range(len(noisy_trajectories)):
            # Flatten latent_embeddings to match the dimensions of start_goal
            latent_embedding = latent_embeddings[i].view(latent_embeddings[i].size(0), -1).squeeze()
            inputs = torch.cat((latent_embedding, start_goal[i].flatten()), dim=-1)
            predictions = model(inputs)
            loss = criterion(predictions, actual_noises[i])
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        print(f"Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}")

    return model


In [None]:
# Step 5: Define the DiPPeR model
class DiPPeR(nn.Module):
    def __init__(self, noise_prediction_network):
        super(DiPPeR, self).__init__()
        self.noise_prediction_network = noise_prediction_network

    def forward(self, O, noisy_trajectory, path_length):
        denoised_trajectory = noisy_trajectory
        for t in reversed(range(path_length)):
            noise_pred = self.noise_prediction_network(O)
            denoised_trajectory = denoised_trajectory - noise_pred
        return denoised_trajectory



In [None]:

# Load data and create model
latent_embeddings = Resnet_Visual_Encoder()
trajectories_data = np.load('trajectories_3.npy', allow_pickle=True)

print("Type of loaded data: ", type(trajectories_data))
print("Shape of loaded data: ", trajectories_data.shape)

# Assuming the data is a list of dictionaries
trajectories = [data['trajectory'] for data in trajectories_data]
start_goal = [torch.tensor([data['start'], data['goal']]) for data in trajectories_data]
lengths = Pathl({'trajectories': trajectories})

# Adding Gaussian noise to the trajectories
noisy_trajectories = adding_Gaussian_noise(lengths, trajectories)

# Ensure both lists have the same length
min_length = min(len(noisy_trajectories), len(trajectories))
noisy_trajectories = noisy_trajectories[:min_length]
trajectories = trajectories[:min_length]

# Convert lists to tensors and pad/truncate to ensure same dimensions
padded_noisy_trajectories = pad_sequence([torch.tensor(traj) for traj in noisy_trajectories], batch_first=True)
padded_trajectories = pad_sequence([torch.tensor(traj) for traj in trajectories], batch_first=True)

# Convert lists to tensors
noisy_trajectories_tensor = padded_noisy_trajectories
trajectories_tensor = padded_trajectories

# Perform element-wise subtraction to get actual_noises
actual_noises = noisy_trajectories_tensor - trajectories_tensor

# Convert start_goal to a tensor
start_goal = torch.stack(start_goal)

print("Actual noises: ", actual_noises)
print("Start and Goal: ", start_goal)

# Train the noise prediction network
noise_prediction_model = train_noise_prediction_network(latent_embeddings, start_goal, noisy_trajectories_tensor, actual_noises)

# Create and use the DiPPeR model
dipper_model = DiPPeR(noise_prediction_model)
final_trajectories = [dipper_model(latent_embeddings[i], noisy_trajectories_tensor[i], lengths[i]) for i in range(len(noisy_trajectories_tensor))]


In [1]:
import os
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import transforms, models
from torch.nn.utils.rnn import pad_sequence
from PIL import Image

# Load the .npy file to inspect its contents
trajectories_data = np.load('trajectories_3.npy', allow_pickle=True)
print("Loaded trajectories_data shape:", trajectories_data.shape)

# Step 1: Define the ResNet18VisualEncoder class
class ResNet18VisualEncoder(nn.Module):
    def __init__(self):
        super(ResNet18VisualEncoder, self).__init__()
        self.resnet18 = models.resnet18(pretrained=False)
        # Replace the global max pooling with softmax pooling
        self.resnet18.avgpool = nn.AdaptiveAvgPool2d(1)
        self.softmax_pooling = nn.Softmax(dim=1)
        # Remove the final fully connected layer
        self.resnet18 = nn.Sequential(*list(self.resnet18.children())[:-1])

    def forward(self, x):
        x = self.resnet18(x)
        x = self.softmax_pooling(x)
        return x

def load_images(folder):
    images = []
    transform = transforms.Compose([transforms.Resize((100, 100)), transforms.ToTensor()])
    for filename in os.listdir(folder):
        if filename.endswith('.png'):
            img = Image.open(os.path.join(folder, filename)).convert('RGB')
            img = transform(img)
            images.append(img)
    return torch.stack(images)

def Resnet_Visual_Encoder():
    encoder = ResNet18VisualEncoder()
    images = load_images('./maze_maps3')
    latent_embeddings = encoder(images)
    return latent_embeddings

# Step 2: Define the Pathl function
def Pathl(data):
    trajectories = data['trajectories']
    lengths = [len(traj) for traj in trajectories]
    return lengths

# Step 3: Define the adding_Gaussian_noise function
def cosine_noise_scheduler(T, beta_start=0.0001, beta_end=0.02):
    return np.cos((np.linspace(0, T, T) + 0.008) / (1.008) * np.pi * 0.5) ** 2 * (beta_end - beta_start) + beta_start

def adding_Gaussian_noise(lengths, trajectories):
    noisy_trajectories = []
    T = 1000  # Number of diffusion steps
    beta_schedule = cosine_noise_scheduler(T)
    for i, length in enumerate(lengths):
        trajectory = trajectories[i]
        noise = np.random.normal(0, 1, (length, 2))
        noisy_trajectory = trajectory + noise * beta_schedule[:length, None]
        noisy_trajectories.append(noisy_trajectory)
    return noisy_trajectories

# Step 4: Define the noise_prediction_network function
import torch
import torch.nn as nn
import torch.optim as optim

class NoisePredictionNetwork(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(NoisePredictionNetwork, self).__init__()
        # Adjust the input dimensions to match the concatenated input
        self.fc1 = nn.Linear(input_dim, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, output_dim)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x


def train_noise_prediction_network(latent_embeddings, start_goal, noisy_trajectories, actual_noises, epochs=2, lr=0.01):
    model = NoisePredictionNetwork(input_dim=latent_embeddings.shape[1] + 4, output_dim=2)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    criterion = nn.MSELoss()

    for epoch in range(epochs):
        for i in range(len(noisy_trajectories)):
            # Flatten latent_embeddings to match the dimensions of start_goal
            latent_embedding = latent_embeddings[i].view(-1)  # Flatten to 1D tensor
            start_goal_flat = start_goal[i].view(-1)  # Flatten start_goal to 1D tensor
            inputs = torch.cat((latent_embedding, start_goal_flat), dim=0).float()  # Convert to float

            # Make inputs compatible with the model
            inputs = inputs.unsqueeze(0)  # Add batch dimension

            predictions = model(inputs)

            # Ensure predictions and actual_noises have the same shape for loss computation
            actual_noise = actual_noises[i].view(predictions.shape).float()  # Reshape to match predictions
            print(f"Epoch [{epoch+1}/{epochs}], Step [{i+1}/{len(noisy_trajectories)}], Predictions shape: {predictions.shape}, Actual noise shape: {actual_noise.shape}")

            loss = criterion(predictions, actual_noise)  # Convert to float if needed

            optimizer.zero_grad()
            loss.backward()  # No need to retain the graph
            optimizer.step()

        print(f"Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}")

    return model

# Assuming your data is already defined and properly shaped
latent_embeddings = torch.rand(5, 512, 1, 1)  # Example latent embeddings
start_goal = torch.rand(5, 2, 2)  # Example start and goal positions
noisy_trajectories_tensor = torch.rand(5, 191, 2)  # Example noisy trajectories
actual_noises = torch.rand(5, 191, 2)  # Example actual noises

# Ensure the number of elements matches across tensors
min_length = min(len(latent_embeddings), len(start_goal), len(noisy_trajectories_tensor), len(actual_noises))
latent_embeddings = latent_embeddings[:min_length]
start_goal = start_goal[:min_length]
noisy_trajectories_tensor = noisy_trajectories_tensor[:min_length]
actual_noises = actual_noises[:min_length]

print(f"latent_embeddings shape: {latent_embeddings.shape}")
print(f"start_goal shape: {start_goal.shape}")
print(f"noisy_trajectories_tensor shape: {noisy_trajectories_tensor.shape}")
print(f"actual_noises shape: {actual_noises.shape}")

assert latent_embeddings.shape[0] == start_goal.shape[0] == noisy_trajectories_tensor.shape[0] == actual_noises.shape[0], "Batch size mismatch!"

# Train the noise prediction network
noise_prediction_model = train_noise_prediction_network(latent_embeddings, start_goal, noisy_trajectories_tensor, actual_noises)


# Step 5: Define the DiPPeR model
class DiPPeR(nn.Module):
    def __init__(self, noise_prediction_network):
        super(DiPPeR, self).__init__()
        self.noise_prediction_network = noise_prediction_network

    def forward(self, O, noisy_trajectory, path_length):
        denoised_trajectory = noisy_trajectory
        for t in reversed(range(path_length)):
            noise_pred = self.noise_prediction_network(O)
            denoised_trajectory = denoised_trajectory - noise_pred
        return denoised_trajectory

# Load data and create model
latent_embeddings = Resnet_Visual_Encoder().float()
trajectories_data = np.load('trajectories_3.npy', allow_pickle=True)

print("Type of loaded data: ", type(trajectories_data))
print("Shape of loaded data: ", trajectories_data.shape)

# Assuming the data is a list of dictionaries
trajectories = [data['trajectory'] for data in trajectories_data]
start_goal = [torch.tensor([data['start'], data['goal']]).float() for data in trajectories_data]
lengths = Pathl({'trajectories': trajectories})

# Adding Gaussian noise to the trajectories
noisy_trajectories = adding_Gaussian_noise(lengths, trajectories)

# Ensure both lists have the same length
min_length = min(len(noisy_trajectories), len(trajectories), latent_embeddings.shape[0])
noisy_trajectories = noisy_trajectories[:min_length]
trajectories = trajectories[:min_length]

# Convert lists to tensors and pad/truncate to ensure same dimensions
padded_noisy_trajectories = pad_sequence([torch.tensor(traj) for traj in noisy_trajectories], batch_first=True).float()
padded_trajectories = pad_sequence([torch.tensor(traj) for traj in trajectories], batch_first=True).float()

# Convert lists to tensors
noisy_trajectories_tensor = padded_noisy_trajectories[:min_length]
trajectories_tensor = padded_trajectories[:min_length]

# Perform element-wise subtraction to get actual_noises
actual_noises = noisy_trajectories_tensor - trajectories_tensor

# Convert start_goal to a tensor and ensure correct batch size
start_goal = torch.stack(start_goal)[:min_length]

# Check shapes before training
print(f"latent_embeddings shape: {latent_embeddings[:min_length].shape}")
print(f"start_goal shape: {start_goal.shape}")
print(f"noisy_trajectories_tensor shape: {noisy_trajectories_tensor.shape}")
print(f"actual_noises shape: {actual_noises.shape}")

# Ensure the number of elements matches across tensors
assert latent_embeddings[:min_length].shape[0] == start_goal.shape[0] == noisy_trajectories_tensor.shape[0] == actual_noises.shape[0], "Batch size mismatch!"

# Train the noise prediction network
noise_prediction_model = train_noise_prediction_network(latent_embeddings[:min_length], start_goal, noisy_trajectories_tensor, actual_noises)

# Create and use the DiPPeR model
dipper_model = DiPPeR(noise_prediction_model)
final_trajectories = [dipper_model(latent_embeddings[i], noisy_trajectories_tensor[i], lengths[i]) for i in range(len(noisy_trajectories_tensor))]


  warn(


Loaded trajectories_data shape: (10,)


NameError: name 'latent_embeddings' is not defined