<a href="https://colab.research.google.com/github/mz-zarei/Trajectory_Analysis/blob/main/TrajCNNGAN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Import Libraries - Select Device**

In [107]:
import torch
from torchvision import transforms, datasets
import torch.nn as nn
from torch import optim as optim
import torch.utils.data as data_utils
import numpy as np 
from sklearn.model_selection import train_test_split
import torch.nn.functional as F

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
BS_size = 50
print(device)

cuda


# **Load Data**

In [108]:
# Load preprocessed dataset
preprocessed_data = np.load("/content/drive/MyDrive/Trajectory_Analysis/Interaction/preprocessed_data_rot_360.npy")
# Seperate trajectories to x(first 10 frames) and y(last 10 frames)
x_data = preprocessed_data[:,:,:,:10]
y_data = preprocessed_data[:,:2,:,10:]
# Split train/validation/test sets
X_train, X_test, y_train, y_test = train_test_split(x_data, y_data, test_size=0.25, random_state=42)
X_val, X_test, y_val, y_test = train_test_split(X_test, y_test, test_size=0.5, random_state=42)
# Train data loader
train_data = data_utils.TensorDataset(torch.Tensor(X_train).float(), torch.Tensor(y_train).float())
train_loader = data_utils.DataLoader(train_data, batch_size=BS_size, shuffle=True)
# Test data loader
test_data = data_utils.TensorDataset(torch.Tensor(X_test).float(), torch.Tensor(y_test).float())
test_loader = data_utils.DataLoader(test_data, batch_size=BS_size, shuffle=True)
# Validation data loader
val_data = data_utils.TensorDataset(torch.Tensor(X_val).float(), torch.Tensor(y_val).float())
val_loader = data_utils.DataLoader(val_data, batch_size=BS_size, shuffle=True)


# **Define Generator and Discriminator**

In [98]:
class GeneratorModel(nn.Module):
    def __init__(self, start_filter_num):
        '''
        Initializer function
        args: Training arguments
        infer: Training or test time (true if test time)
        '''
        super(GeneratorModel, self).__init__()
        self.start_filter_num = start_filter_num 

        self.Vconv1 = self.v_conv_layer_set(4, self.start_filter_num)
        self.Vconv2 = self.v_conv_layer_set(self.start_filter_num, self.start_filter_num*2)
        self.Vconv3 = self.v_conv_layer_set(self.start_filter_num*2, self.start_filter_num*4)
        self.Vconv4 = self.v_conv_layer_set(self.start_filter_num*4, self.start_filter_num*8)
        self.Vconv5 = self.v_conv_layer_set(self.start_filter_num*8, self.start_filter_num*16)
        self.Vconv6 = self.v_conv_layer_set(self.start_filter_num*16, self.start_filter_num*32)

        self.Hconv1 = self.h_conv_layer_set(4, self.start_filter_num)
        self.Hconv2 = self.h_conv_layer_set(self.start_filter_num, self.start_filter_num*2)
        self.Hconv3 = self.h_conv_layer_set(self.start_filter_num*2, self.start_filter_num*4)
        self.Hconv4 = self.h_conv_layer_set(self.start_filter_num*4, self.start_filter_num*8)
        self.Hconv5 = self.h_conv_layer_set(self.start_filter_num*8, self.start_filter_num*16)
        self.Hconv6 = self.h_conv_layer_set(self.start_filter_num*16, self.start_filter_num*32)

        
        
        self.fc1 = nn.Linear(self.start_filter_num*32*2, 2)

    def v_conv_layer_set(self, in_c, out_c):
        conv_layer = nn.Sequential(
        nn.Conv2d(in_c, out_c, kernel_size=(3, 1), padding=(1,0)),
        nn.BatchNorm2d(out_c),
        nn.LeakyReLU()
        )
        return conv_layer

    def h_conv_layer_set(self, in_c, out_c):
        conv_layer = nn.Sequential(
        nn.Conv2d(in_c, out_c, kernel_size=(1, 3), padding=(0,1)),
        nn.BatchNorm2d(out_c),
        nn.LeakyReLU()
        )
        return conv_layer


    def forward(self, z, x):
        """
        z: uniform noise [0,1] with shape of (batch,1,36,10)
        x: input with shape of (batch,3,36,10)
        """
        input = torch.cat([z,x], 1)                # -> batch_size * (1+3=4) * 36 * 10

        Voutput = self.Vconv1(input)        
        Voutput = self.Vconv2(Voutput)  
        Voutput = self.Vconv3(Voutput)  
        Voutput = self.Vconv4(Voutput)  
        Voutput = self.Vconv5(Voutput)  
        Voutput = self.Vconv6(Voutput)  

        Houtput = self.Hconv1(input)        
        Houtput = self.Hconv2(Houtput)  
        Houtput = self.Hconv3(Houtput)  
        Houtput = self.Hconv4(Houtput) 
        Houtput = self.Hconv5(Houtput)  
        Houtput = self.Hconv6(Houtput)  

        output = torch.cat([Voutput,Houtput], 1)   # -> BS * C * H(36) * W(10)
        output = torch.transpose(output,1,3)       # -> BS * W(10) * H(36) * C
        output = F.leaky_relu(self.fc1(output))    # -> BS * W(10) * H(36) * 2 
        output = torch.transpose(output,1,3)       # -> BS * 2 * H(36) * W(10) same as y_train[0].shape
        return output.to(device)

class DiscriminatorModel(nn.Module):
    def __init__(self, start_filter_num, fc_layer_out_size):
        super(DiscriminatorModel, self).__init__()
                
        self.start_filter_num = start_filter_num 
        self.fc_layer_out_size = fc_layer_out_size

        self.Vconv1 = self.v_conv_layer_set(4, self.start_filter_num)
        self.Vconv2 = self.v_conv_layer_set(self.start_filter_num, self.start_filter_num*2)
        self.Vconv3 = self.v_conv_layer_set(self.start_filter_num*2, self.start_filter_num*4)
        self.Vconv4 = self.v_conv_layer_set(self.start_filter_num*4, self.start_filter_num*8)
        self.Vconv5 = self.v_conv_layer_set(self.start_filter_num*8, self.start_filter_num*16)
        self.Vconv6 = self.v_conv_layer_set(self.start_filter_num*16, self.start_filter_num*32)

        self.Hconv1 = self.h_conv_layer_set(4, self.start_filter_num)
        self.Hconv2 = self.h_conv_layer_set(self.start_filter_num, self.start_filter_num*2)
        self.Hconv3 = self.h_conv_layer_set(self.start_filter_num*2, self.start_filter_num*4)
        self.Hconv4 = self.h_conv_layer_set(self.start_filter_num*4, self.start_filter_num*8)
        self.Hconv5 = self.h_conv_layer_set(self.start_filter_num*8, self.start_filter_num*16)
        self.Hconv6 = self.h_conv_layer_set(self.start_filter_num*16, self.start_filter_num*32)

        
        
        self.fc1 = nn.Linear(self.start_filter_num*32*2, self.fc_layer_out_size)
        self.fc2 = nn.Linear(self.fc_layer_out_size*36*10, 1)

    def v_conv_layer_set(self, in_c, out_c):
        conv_layer = nn.Sequential(
        nn.Conv2d(in_c, out_c, kernel_size=(3, 1), padding=(1,0)),
        nn.BatchNorm2d(out_c),
        nn.LeakyReLU()
        )
        return conv_layer

    def h_conv_layer_set(self, in_c, out_c):
        conv_layer = nn.Sequential(
        nn.Conv2d(in_c, out_c, kernel_size=(1, 3), padding=(0,1)),
        nn.BatchNorm2d(out_c),
        nn.LeakyReLU()
        )
        return conv_layer


    def forward(self, z, x):
        """
        z: uniform noise [0,1] with shape of (batch,1,36,10)
        x: input with shape of (batch,3,36,10)
        """
        input = torch.cat([z,x], 1)                # -> batch_size * (1+3=4) * 36 * 10

        Voutput = self.Vconv1(input)        
        Voutput = self.Vconv2(Voutput)  
        Voutput = self.Vconv3(Voutput)  
        Voutput = self.Vconv4(Voutput)  
        Voutput = self.Vconv5(Voutput)  
        Voutput = self.Vconv6(Voutput)  

        Houtput = self.Hconv1(input)        
        Houtput = self.Hconv2(Houtput)  
        Houtput = self.Hconv3(Houtput)  
        Houtput = self.Hconv4(Houtput) 
        Houtput = self.Hconv5(Houtput)  
        Houtput = self.Hconv6(Houtput)  

        output = torch.cat([Voutput,Houtput], 1)   # -> BS * (32x2xstart_filter_num) * H(36) * W(10)
        output = torch.transpose(output,1,3)       # -> BS * W(10) * H(36) * (32x2xstart_filter_num)
        output = F.leaky_relu(self.fc1(output))    # -> BS * W(10) * H(36) * fc_layer_out_size 
        output = torch.flatten(output)
        output = torch.sigmoid(self.fc2(output))
        return output.to(device)


In [106]:
# Test the output size for each model
generator = GeneratorModel(16)
x = torch.randn(1,3, 36, 10) # BS x C x H x W
z = torch.randn(1,1, 36, 10)

output = generator(z, x)
print("G output shape: ", output.shape)
G_total_params = sum(p.numel() for p in generator.parameters())
print("G number of parameters: ", G_total_params)



discriminator = DiscriminatorModel(16, 512)
x = torch.randn(1,3, 36, 10)
y = torch.randn(1,1, 36, 10)

output = discriminator(y, x)
print("D output shape: ",output.shape)
D_total_params = sum(p.numel() for p in discriminator.parameters())
print("G number of parameters: ", D_total_params)


G output shape:  torch.Size([1, 2, 36, 10])
G number of parameters:  1056034
D output shape:  torch.Size([1])
G number of parameters:  1763105


Train TC-GAN

In [94]:
output

tensor([0.4746], device='cuda:0', grad_fn=<ToCopyBackward0>)