# Import Statements

In [1]:
import torch
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F
from torchvision import transforms, datasets
import torchvision.models as models
from torch.utils.data import random_split
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence
from utils import GardynDataset, envDataset, tensor_toImage, initiate_dir, save_model_config, load_model_config, imshow_batch, denormalize_image_tensor, normalize_image_tensor,calculate_fid, save_metrics, images_to_video
torch.set_printoptions(precision=4, threshold=10, edgeitems=10, linewidth=100)
from scipy import linalg
from PIL import Image

import numpy as np
np.set_printoptions(precision=4, threshold=10, edgeitems=10, linewidth=100, suppress=True)

import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
import os
from tqdm.notebook import tqdm, trange
import gc

import time, datetime, pytz
# strftime-style timestamp pattern: year, month, day, hour, minute, second with no separators
str_format = '%Y%m%d%H%M%S'

# Use the GPU whenever CUDA is usable; otherwise fall back to the CPU
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print(f'Available Device: {device}')

Available Device: cuda


# Defining data paths and exploring the data

In [2]:
## User input -- edit these values to select the dataset, the trained model and the run name

# Name of this simulation run; it is used to build the output folder and video file names
trial_name = 'test1'

# Folder holding the input CSV (also the parent of the generated outputs) and the CSV file name
data_dir = './data/'
data_file = 'data.csv'

# Folder holding the trained models and the sub-folder name of the model to load
model_dir = './models/'
model_name = 'SIPGS_baseline_NormalizedProModelbeta0.90_thisone_20240518084019'

In [3]:
# Data Paths: run initiate_dir on the data and model folders
# (presumably it creates a folder when it is missing -- confirm in utils.initiate_dir)
for _required_dir in (data_dir, model_dir):
    initiate_dir(_required_dir)

## Output directories

# Per-trial folder that receives the generated frames
sim_out_dir = f'{data_dir}{trial_name}_simulated_output/'

# Second argument True: judging by the recorded cell output, an existing folder is replaced.
# Kept as the last expression so the cell still displays the returned value.
initiate_dir(sim_out_dir, True)


Directory './data/' already exists.
Directory './models/' already exists.
Directory './data/test1_simulated_output/' already exists.
Directory './data/test1_simulated_output/' replaced.


'./data/test1_simulated_output/'

# Load data

In [4]:
# Build the dataset from the CSV using the five listed parameters.
# NOTE(review): the exact meaning of `normalized` / `cumulative` lives in utils.envDataset -- confirm there.
data = envDataset(
    f'{data_dir}{data_file}',
    normalized=True,
    cumulative=True,
    params=['Temperature_C', 'Humidity_percent', 'EC', 'PH', 'WaterTemp_C'],
)

# One sample per batch, served in dataset order (no shuffling)
data_loader = torch.utils.data.DataLoader(data, shuffle=False, batch_size=1)
dataiter = iter(data_loader)  # not used by the cells visible in this notebook

# Load Model

## LSTM Feature Encoder

In [5]:
## LSTM encoder that turns a padded feature sequence into one embedding vector
class LSTM_Feature_Encoder(nn.Module):
    """Encode a batch of padded feature sequences into one fixed-size vector each.

    The sequences are packed, run through a stacked LSTM, and the final hidden
    state of the last LSTM layer is projected by a linear layer followed by tanh.

    Args:
        feature_size: number of input features per time step (LSTM input size).
        hidden_size: width of the LSTM hidden state.
        num_layers: number of stacked LSTM layers.
        output_size: width of the returned embedding.
    """

    def __init__(self, feature_size, hidden_size=128, num_layers=4, output_size=64):
        super().__init__()

        # Constructor arguments, stored under their parameter names
        self.params = dict(
            feature_size=feature_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            output_size=output_size,
        )

        self.lstm = nn.LSTM(
            input_size=feature_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
        )
        projection = nn.Linear(hidden_size, output_size)
        self.fc = nn.Sequential(projection, nn.Tanh())

    def forward(self, padded_sequences, sequence_lengths):
        """Return one embedding per sequence in the batch.

        Args:
            padded_sequences: batch-first padded tensor of feature sequences.
            sequence_lengths: true length of every sequence; the batch does not
                need to be sorted by length (enforce_sorted=False).

        Returns:
            Tensor with one row of `output_size` tanh-squashed values per sequence.
        """
        packed = pack_padded_sequence(
            padded_sequences,
            sequence_lengths,
            batch_first=True,
            enforce_sorted=False,
        )

        # Only the final hidden states are needed; per-step outputs and cell states are dropped
        _, (final_hidden, _) = self.lstm(packed)

        # final_hidden[-1] is the last layer's hidden state
        last_layer_state = final_hidden[-1]
        return self.fc(last_layer_state)

## CVAE Generator

In [6]:
class CVAE_Generator(nn.Module):
    """Conditional variational auto-encoder that generates 3-channel images.

    The encoder sees the image with the conditional vector broadcast over every
    pixel as extra channels; the decoder rebuilds an image from a latent sample
    concatenated with the same conditional vector.

    Args:
        img_shape: (channels, height, width) of the input images.
        latent_size: width of the latent vector z.
        conditional_size: width of the conditional vector.
    """

    def __init__(self, img_shape, latent_size, conditional_size):
        super().__init__()

        # Constructor arguments, stored under their parameter names
        self.params = dict(
            img_shape=img_shape,
            latent_size=latent_size,
            conditional_size=conditional_size,
        )

        self.img_shape = img_shape
        self.conditional_size = conditional_size
        # Encoder input channels = image channels + one channel per conditional entry
        self.input_size = self.img_shape[0] + self.conditional_size
        self.latent_size = latent_size

        # Four stride-2 convolutions, each followed by ReLU
        enc_widths = [self.input_size, 64, 128, 256, 512]
        enc_layers = []
        for c_in, c_out in zip(enc_widths, enc_widths[1:]):
            enc_layers.append(nn.Conv2d(c_in, c_out, kernel_size=4, stride=2, padding=1))
            enc_layers.append(nn.ReLU())
        self.encoder = nn.Sequential(*enc_layers)

        # Probe the encoder once to learn the shape/size of its output
        self.hidden_shape, self.hidden_size = self._get_hidden_dimensions()

        self.mu_fc = nn.Linear(self.hidden_size, self.latent_size)
        self.logvar_fc = nn.Linear(self.hidden_size, self.latent_size)

        # Maps [z, condition] back to a flattened encoder-sized feature map
        self.z_fc = nn.Linear(self.latent_size + self.conditional_size, self.hidden_size)

        # Four stride-2 transposed convolutions; ReLU between stages, Sigmoid on the 3-channel output
        dec_widths = [self.hidden_shape[0], 256, 128, 64, 3]
        final_stage = len(dec_widths) - 2
        dec_layers = []
        for stage, (c_in, c_out) in enumerate(zip(dec_widths, dec_widths[1:])):
            dec_layers.append(nn.ConvTranspose2d(c_in, c_out, kernel_size=4, stride=2, padding=1))
            dec_layers.append(nn.Sigmoid() if stage == final_stage else nn.ReLU())
        self.decoder = nn.Sequential(*dec_layers)

    def encode(self, x, y):
        """Return (mu, logvar) of the latent distribution for image `y` given condition `x`."""
        batch = x.size(0)
        # Broadcast the conditional vector to a constant feature map over the image grid
        cond_map = x.view(batch, self.conditional_size, 1, 1).repeat(
            1, 1, self.img_shape[1], self.img_shape[2]
        )
        stacked = torch.cat([y, cond_map], dim=1)
        features = self.encoder(stacked)
        # Flatten the encoder output for the fully connected linear layers
        flat = features.view(features.size(0), -1)

        return self.mu_fc(flat), self.logvar_fc(flat)

    def reparam(self, mu, logvar):
        """Sample z = mu + eps * std with eps drawn from a standard normal."""
        sigma = (0.5 * logvar).exp()
        noise = torch.randn_like(sigma)
        return mu + noise * sigma

    def decode(self, z, x):
        """Generate an image from latent `z` and conditional vector `x`."""
        cond = x.view(x.size(0), -1)
        joined = torch.cat([z, cond], dim=1)
        hidden = self.z_fc(joined)
        # Restore the (channels, height, width) layout the decoder expects
        hidden = hidden.view(hidden.size(0), *self.hidden_shape)
        return self.decoder(hidden)

    def forward(self, x, y):
        """Encode `y` under condition `x`, sample a latent, and return (y_hat, mu, logvar)."""
        mu, logvar = self.encode(x, y)
        y_hat = self.decode(self.reparam(mu, logvar), x)
        return y_hat, mu, logvar

    def _get_hidden_dimensions(self):
        """Run a random single-sample batch through the encoder.

        Returns:
            (hidden_shape, hidden_size): the encoder output shape without the
            batch dimension, and its flattened length.
        """
        probe = torch.randn(1, self.input_size, self.img_shape[1], self.img_shape[2])
        feature_map = self.encoder(probe)

        hidden_shape = feature_map.size()[1:]
        # Flatten to (batch_size, flattened_dimension) and read the second dimension
        hidden_size = feature_map.view(feature_map.size(0), -1).size(1)

        return hidden_shape, hidden_size

## Discriminator

In [7]:
class Discriminator(nn.Module):
    """Convolutional network that maps an image to a single score in (0, 1).

    Args:
        img_shape: (channels, height, width) of the images to be scored.
    """

    def __init__(self, img_shape):
        super().__init__()

        # Constructor argument, stored under its parameter name
        self.params = dict(img_shape=img_shape)

        self.img_shape = img_shape

        # Four stride-2 convolutions, each followed by ReLU
        widths = [self.img_shape[0], 64, 128, 256, 512]
        conv_layers = []
        for c_in, c_out in zip(widths, widths[1:]):
            conv_layers.append(nn.Conv2d(c_in, c_out, kernel_size=4, stride=2, padding=1))
            conv_layers.append(nn.ReLU())
        self.cnn = nn.Sequential(*conv_layers)

        # Probe the conv stack once to learn the shape/size of its output
        self.hidden_shape, self.hidden_size = self._get_hidden_dimensions()

        # Classifier head; the closing Sigmoid keeps the score between 0 and 1
        head = [
            nn.Linear(self.hidden_size, 512),
            nn.ReLU(),
            nn.Linear(512, 1),
            nn.Sigmoid(),
        ]
        self.fc = nn.Sequential(*head)

    def forward(self, y):
        """Score a batch of images `y`; returns one value per image."""
        features = self.cnn(y)
        flat = features.view(features.size(0), -1)
        return self.fc(flat)

    def _get_hidden_dimensions(self):
        """Run a random single-sample batch through the conv stack.

        Returns:
            (hidden_shape, hidden_size): the conv output shape without the
            batch dimension, and its flattened length.
        """
        probe = torch.randn(1, self.img_shape[0], self.img_shape[1], self.img_shape[2])
        feature_map = self.cnn(probe)

        hidden_shape = feature_map.size()[1:]
        # Flatten to (batch_size, flattened_dimension) and read the second dimension
        hidden_size = feature_map.view(feature_map.size(0), -1).size(1)

        return hidden_shape, hidden_size

## Import the model .pt files

In [8]:
# Folder of the selected trained model; every config JSON below is read from it
iteration_dir = f'{model_dir}{model_name}/'

# load_model_config returns, per network, the constructor kwargs and the path of its saved weights
feature_encoder_params, feature_encoder_pt_file = load_model_config(f'{iteration_dir}feature_encoder.json')
generator_params, generator_pt_file = load_model_config(f'{iteration_dir}generator.json')
discriminator_params, discriminator_pt_file = load_model_config(f'{iteration_dir}discriminator.json')
dicriminator_params = discriminator_params  # legacy misspelled name, kept for any cell that still uses it

# Rebuild each network from its saved constructor arguments on the selected device
feature_encoder = LSTM_Feature_Encoder(**feature_encoder_params).to(device)
generator = CVAE_Generator(**generator_params).to(device)
discriminator = Discriminator(**discriminator_params).to(device)

# NOTE(security): torch.load unpickles the file, so only load checkpoints from a trusted source.
# Newer PyTorch releases accept weights_only=True to restrict what may be unpickled.
feature_encoder.load_state_dict(torch.load(feature_encoder_pt_file, map_location=device))
generator.load_state_dict(torch.load(generator_pt_file, map_location=device))
discriminator.load_state_dict(torch.load(discriminator_pt_file, map_location=device))

print(feature_encoder)
print(generator)
print(discriminator)


LSTM_Feature_Encoder(
  (lstm): LSTM(7, 128, num_layers=2, batch_first=True)
  (fc): Sequential(
    (0): Linear(in_features=128, out_features=64, bias=True)
    (1): Tanh()
  )
)
CVAE_Generator(
  (encoder): Sequential(
    (0): Conv2d(67, 64, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
    (1): ReLU()
    (2): Conv2d(64, 128, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
    (3): ReLU()
    (4): Conv2d(128, 256, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
    (5): ReLU()
    (6): Conv2d(256, 512, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
    (7): ReLU()
  )
  (mu_fc): Linear(in_features=614400, out_features=128, bias=True)
  (logvar_fc): Linear(in_features=614400, out_features=128, bias=True)
  (z_fc): Linear(in_features=192, out_features=614400, bias=True)
  (decoder): Sequential(
    (0): ConvTranspose2d(512, 256, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
    (1): ReLU()
    (2): ConvTranspose2d(256, 128, kernel_size=(4, 4), stride=(2, 2), paddi

# Run the model

In [9]:
# All three networks are switched to evaluation mode before generating
feature_encoder.eval()
generator.eval()
discriminator.eval()

with torch.no_grad():
    # A single latent sample is drawn once and reused for every frame,
    # so only the conditioning vector changes from image to image
    z_cs = torch.randn(1, generator.latent_size).to(device)

    for frame_idx, (label, sequence_length, ts) in enumerate(tqdm(data_loader)):

        label = label.to(device)

        ## Encode the sequential feature data into a conditioning vector
        feature_embedding = feature_encoder(label, sequence_length)

        # Decode an image from the fixed latent and the current conditioning vector
        y_hat = generator.decode(z_cs, feature_embedding)

        # Gradients are disabled here, so the tensor converts to NumPy without detach()
        y_out = y_hat[0].cpu().numpy()

        # Channels-first float image -> channels-last 8-bit image for PIL
        img = np.transpose(y_out, (1, 2, 0))
        img = Image.fromarray((img*255).astype(np.uint8))
        # Frames are named by their position in the data loader: 0.jpg, 1.jpg, ...
        img.save(f'{sim_out_dir}{frame_idx}.jpg')

  0%|          | 0/11446 [00:00<?, ?it/s]

## Save Video

In [10]:
# Combine the frames saved in sim_out_dir into one video file under data_dir.
# NOTE(review): 120 is presumably the frame rate -- confirm against utils.images_to_video.
images_to_video(sim_out_dir, f'{data_dir}{trial_name}.mp4',120)

combining images to video at: ./data/test1.mp4
Video ./data/test1.mp4 created successfully!
