# **Importing the required libraries**

In [2]:
import os
import shutil
import cv2
import math
import random
import numpy as np
import datetime as dt
import tensorflow 
import keras
from collections import deque
import matplotlib.pyplot as plt
plt.style.use("seaborn")

%matplotlib inline
 
from sklearn.model_selection import train_test_split
 


# **Visualize the Data**

In [3]:
from IPython.display import HTML
from base64 import b64encode

# To Show a Video in Notebook
def Play_Video(filepath):
    html = ''
    video = open(filepath,'rb').read()
    src = 'data:video/mp4;base64,' + b64encode(video).decode()
    html += '<video width=640 muted controls autoplay loop><source src="%s" type="video/mp4"></video>' % src 
    return HTML(html)

In [4]:
# Classes Directories
NonViolnceVideos_Dir = "../input/real-life-violence-situations-dataset/Real Life Violence Dataset/NonViolence/"
ViolnceVideos_Dir = "../input/real-life-violence-situations-dataset/Real Life Violence Dataset/Violence/"

# Retrieve the list of all the video files present in the Class Directory.
NonViolence_files_names_list = os.listdir(NonViolnceVideos_Dir)
Violence_files_names_list = os.listdir(ViolnceVideos_Dir)

# Randomly select a video file from the Classes Directory.
Random_NonViolence_Video = random.choice(NonViolence_files_names_list)
Random_Violence_Video = random.choice(Violence_files_names_list)

**Play Random Non Violence Video**

In [5]:
Play_Video(f"{NonViolnceVideos_Dir}/{Random_NonViolence_Video}")

**Play Random Violence Video**

In [6]:
Play_Video(f"{ViolnceVideos_Dir}/{Random_Violence_Video}")

# **Extracting Frames**

In [7]:
# Specify the height and width to which each video frame will be resized in our dataset.
IMAGE_HEIGHT , IMAGE_WIDTH = 64, 64
 
# Specify the number of frames of a video that will be fed to the model as one sequence.
SEQUENCE_LENGTH = 16
 

DATASET_DIR = "../input/real-life-violence-situations-dataset/Real Life Violence Dataset/"
 
CLASSES_LIST = ["NonViolence","Violence"]

In [8]:
def frames_extraction(video_path):
 
    frames_list = []
    
    # Read the Video File
    video_reader = cv2.VideoCapture(video_path)
 
    # Get the total number of frames in the video.
    video_frames_count = int(video_reader.get(cv2.CAP_PROP_FRAME_COUNT))
 
    # Calculate the the interval after which frames will be added to the list.
    skip_frames_window = max(int(video_frames_count/SEQUENCE_LENGTH), 1)
 
    # Iterate through the Video Frames.
    for frame_counter in range(SEQUENCE_LENGTH):
 
        # Set the current frame position of the video.
        video_reader.set(cv2.CAP_PROP_POS_FRAMES, frame_counter * skip_frames_window)
 
        # Reading the frame from the video. 
        success, frame = video_reader.read() 
 
        if not success:
            break
 
        # Resize the Frame to fixed height and width.
        resized_frame = cv2.resize(frame, (IMAGE_HEIGHT, IMAGE_WIDTH))
        
        # Normalize the resized frame
        normalized_frame = resized_frame / 255
        
        # Append the normalized frame into the frames list
        frames_list.append(normalized_frame)
    
 
    video_reader.release()
 
    return frames_list

# **Creating the Data**

# for 3d cnn create data

In [9]:
def create_dataset():
 
    features = []
    labels = []
    video_files_paths = []
    
    # Iterating through all the classes.
    for class_index, class_name in enumerate(CLASSES_LIST):
        
        print(f'Extracting Data of Class: {class_name}')
        
        # Get the list of video files present in the specific class name directory.
        files_list = os.listdir(os.path.join(DATASET_DIR, class_name))
        
        # Iterate through all the files present in the files list.
        for file_name in files_list:
            
            # Get the complete video path.
            video_file_path = os.path.join(DATASET_DIR, class_name, file_name)
 
            # Extract the frames of the video file.
            frames = frames_extraction(video_file_path)
 
            # Check if the extracted frames are equal to the SEQUENCE_LENGTH specified.
            # So ignore the vides having frames less than the SEQUENCE_LENGTH.
            if len(frames) == SEQUENCE_LENGTH:
 
                # Append the data to their repective lists.
                features.append(frames)
                labels.append(class_index)
                video_files_paths.append(video_file_path)
    
    features = np.asarray(features)                                # 10-16-64-64-3
    features = np.moveaxis(np.asarray(features), -1, 1)              #for 10,3,16,64,64 for 3d cnn
    #features = np.moveaxis(features, -1, 2)              #for 10,16,3,64,64 for 2dcnn+lstm
    labels = np.array(labels)  

    return features, labels, video_files_paths

# for (mlp-mixer/2dcnn) + lstm create data

In [None]:
def create_dataset():
 
    features = []
    labels = []
    video_files_paths = []
    
    # Iterating through all the classes.
    for class_index, class_name in enumerate(CLASSES_LIST):
        
        print(f'Extracting Data of Class: {class_name}')
        
        # Get the list of video files present in the specific class name directory.
        files_list = os.listdir(os.path.join(DATASET_DIR, class_name))
        
        # Iterate through all the files present in the files list.
        for file_name in files_list:
            
            # Get the complete video path.
            video_file_path = os.path.join(DATASET_DIR, class_name, file_name)
 
            # Extract the frames of the video file.
            frames = frames_extraction(video_file_path)
 
            # Check if the extracted frames are equal to the SEQUENCE_LENGTH specified.
            # So ignore the vides having frames less than the SEQUENCE_LENGTH.
            if len(frames) == SEQUENCE_LENGTH:
 
                # Append the data to their repective lists.
                features.append(frames)
                labels.append(class_index)
                video_files_paths.append(video_file_path)
    
    features = np.asarray(features)                                # 10-16-64-64-3
    #features = np.moveaxis(np.asarray(features), -1, 1)              #for 10,3,16,64,64 for 3d cnn
    features = np.moveaxis(features, -1, 2)              #for 10,16,3,64,64 for 2dcnn+lstm
    labels = np.array(labels)  

    return features, labels, video_files_paths

In [10]:
# Create the dataset.
features, labels, video_files_paths = create_dataset()

Extracting Data of Class: NonViolence


[h264 @ 0x5635ed1ac680] mb_type 104 in P slice too large at 98 31
[h264 @ 0x5635ed1ac680] error while decoding MB 98 31
[h264 @ 0x5635ed1ac680] mb_type 104 in P slice too large at 98 31
[h264 @ 0x5635ed1ac680] error while decoding MB 98 31
[h264 @ 0x5635ed1ac680] mb_type 104 in P slice too large at 98 31
[h264 @ 0x5635ed1ac680] error while decoding MB 98 31


Extracting Data of Class: Violence


In [11]:
# Saving the extracted data
np.save("features.npy",features)
np.save("labels.npy",labels)
np.save("video_files_paths.npy",video_files_paths)

In [12]:
features, labels, video_files_paths = np.load("features.npy") , np.load("labels.npy") ,  np.load("video_files_paths.npy")

# **Encoding and Splitting Training-Testing Sets**

In [13]:
# Split the Data into Train ( 90% ) and Test Set ( 10% ).
X_train, X_test, y_train, y_test = train_test_split(features, labels, test_size = 0.1,
                                                                            shuffle = True, random_state = 42)

In [14]:
print(X_train.shape,y_train.shape )
print(X_test.shape, y_test.shape)

(1800, 3, 16, 64, 64) (1800,)
(200, 3, 16, 64, 64) (200,)


In [15]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import os
import torch
import torchvision
import torch.nn as nn # All neural network modules, nn.Linear, nn.Conv2d, BatchNorm, Loss functions
import torchvision.datasets as datasets # Has standard datasets we can import in a nice way
import torchvision.transforms as transforms # Transformations we can perform on our dataset
import torch.nn.functional as F # All functions that don't have any parameters
from torch.utils.data import DataLoader, Dataset # Gives easier dataset managment and creates mini batches
from torchvision.datasets import ImageFolder
import torch.optim as optim # For all Optimization algorithms, SGD, Adam, etc.
from PIL import Image

In [16]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu") # use gpu or cpu

In [17]:

# ImageLoader Class

class vv(Dataset):
    def __init__(self, images, labels, transform=None):
        self.images = images
        self.labels=labels
        self.transform = transform
        
    
    def __getitem__(self, index):
        label = self.labels[index]
        image = self.images[index]
        image = image.astype('float32')
        if self.transform is not None:
            image = self.transform(image)
        

        return image, label

    def __len__(self):
        return len(self.images)
    
    

In [18]:

train_dataset = vv(X_train, y_train)
test_dataset = vv(X_test, y_test)

In [19]:
train_loader = DataLoader(train_dataset, batch_size=10, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=10, shuffle=True)

In [20]:
a = iter(train_loader)
img,mask = a.next()
print(img.shape,mask.shape)

torch.Size([10, 3, 16, 64, 64]) torch.Size([10])


# 2d cnn + lstm

In [1]:
import os
import numpy as np
from PIL import Image
from torch.utils import data
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.models as models
import torchvision.transforms as transforms
from tqdm import tqdm
from sklearn.preprocessing import OneHotEncoder, LabelEncoder

In [2]:
x_3d= torch.rand(10,16,3,64,64)


In [3]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load in 

import numpy as np # linear algebra
import json
from matplotlib import pyplot as plt
from skimage import color
from skimage.feature import hog
from sklearn import svm
from sklearn.metrics import classification_report,accuracy_score
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
# Input data files are available in the "../input/" directory.
# For example, running this (by clicking run or pressing Shift+Enter) will list the files in the input directory

from subprocess import check_output
print(check_output(["ls", "../input"]).decode("utf8"))

# Any results you write to the current directory are saved as output.

real-life-violence-situations-dataset



In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
class MyNet(nn.Module):
    def __init__(self):
        super(MyNet, self).__init__()
        self.fc1 = nn.Linear(4, 8)
        self.fc2 = nn.Linear(8, 16)
        self.fc3 = nn.Linear(16, 4)
        self.fc4 = nn.Linear(4,1)
    def forward(self, x):
        x = F.relu(self.fc1(x))
        print(x.shape)
        x = F.relu(self.fc2(x))
        print(x.shape)
        x = F.relu(self.fc3(x))
        print(x.shape)
        return torch.sigmoid(self.fc4(x))

model = MyNet()
print(model)

In [None]:
x= torch.rand(16,4)
y=model(x)
y.shape

In [None]:
ppc=4

x=torch.moveaxis(x_3d,2,-1)
hog_features = []
for sx in x:
    #print(x.shape) 
    for xx in sx:
        #print(xx.shape)
        fd,hog_image = hog(xx, orientations=4, pixels_per_cell=(ppc,ppc),cells_per_block=(4, 4),block_norm= 'L2',visualize=True, channel_axis=-1)
        #print(fd.shape)
        #print(hog_image.shape)
        hog_features.append(fd)
    x_fe =  np.array(hog_features)
    print(x_fe.shape)
#     pca.fit(x_fe)
#     x_pca = pca.transform(x_fe)
    
        
    

In [None]:
hog(xx, orientations=4, pixels_per_cell=(ppc,ppc),cells_per_block=(4, 4),block_norm= 'L2',visualize=True, channel_axis=-1)

In [None]:

for t in range(x_3d.size(1)):
            print(t)
            # ResNet CNN
            with torch.no_grad():
                print(x_3d[:, t, :, :, :].shape)
                x = resnet(x_3d[:, t, :, :, :])  # ResNet
                print(x.shape)
                x = x.view(x.size(0), -1)
                print(x.shape)

In [30]:
class ResCNNEncoder(nn.Module):
    def __init__(self, fc_hidden1=512, fc_hidden2=512, drop_p=0.3, CNN_embed_dim=300):
        """Load the pretrained ResNet-152 and replace top fc layer."""
        super(ResCNNEncoder, self).__init__()

        self.fc_hidden1, self.fc_hidden2 = fc_hidden1, fc_hidden2
        self.drop_p = drop_p

        resnet = models.resnet152(pretrained=True)
        modules = list(resnet.children())[:-1]      # delete the last fc layer.
        self.resnet = nn.Sequential(*modules)
        
        self.fc1 = nn.Linear(resnet.fc.in_features, fc_hidden1)
        self.bn1 = nn.BatchNorm1d(fc_hidden1, momentum=0.01)
        self.fc2 = nn.Linear(fc_hidden1, fc_hidden2)
        self.bn2 = nn.BatchNorm1d(fc_hidden2, momentum=0.01)
        self.fc3 = nn.Linear(fc_hidden2, CNN_embed_dim)
        
    def forward(self, x_3d):
        cnn_embed_seq = []
        for t in range(x_3d.size(1)):
            # ResNet CNN
            with torch.no_grad():
                x = self.resnet(x_3d[:, t, :, :, :])  # ResNet
                print(x.shape)
                x = x.view(x.size(0), -1)             # flatten output of conv
            print(x.shape)
            # FC layers
            x = self.bn1(self.fc1(x))
            x = F.relu(x)
            print(x.shape)
            x = self.bn2(self.fc2(x))
            x = F.relu(x)
            x = F.dropout(x, p=self.drop_p, training=self.training)
            print(x.shape)
            x = self.fc3(x)
            print(x.shape)
            cnn_embed_seq.append(x)

        # swap time and sample dim such that (sample dim, time dim, CNN latent dim)
        cnn_embed_seq = torch.stack(cnn_embed_seq, dim=0).transpose_(0, 1)
        # cnn_embed_seq: shape=(batch, time_step, input_size)
        print(cnn_embed_seq.shape)
        return cnn_embed_seq


class DecoderRNN(nn.Module):
    def __init__(self, CNN_embed_dim=300, h_RNN_layers=3, h_RNN=256, h_FC_dim=128, drop_p=0.3, num_classes=2):
        super(DecoderRNN, self).__init__()

        self.RNN_input_size = CNN_embed_dim
        self.h_RNN_layers = h_RNN_layers   # RNN hidden layers
        self.h_RNN = h_RNN                 # RNN hidden nodes
        self.h_FC_dim = h_FC_dim
        self.drop_p = drop_p
        self.num_classes = num_classes

        self.LSTM = nn.LSTM(
            input_size=self.RNN_input_size,
            hidden_size=self.h_RNN,        
            num_layers=h_RNN_layers,       
            batch_first=True,       # input & output will has batch size as 1s dimension. e.g. (batch, time_step, input_size)
        )

        self.fc1 = nn.Linear(self.h_RNN, self.h_FC_dim)
        self.fc2 = nn.Linear(self.h_FC_dim, self.num_classes)

    def forward(self, x_RNN):
        
        self.LSTM.flatten_parameters()
        RNN_out, (h_n, h_c) = self.LSTM(x_RNN, None)  
        """ h_n shape (n_layers, batch, hidden_size), h_c shape (n_layers, batch, hidden_size) """ 
        """ None represents zero initial hidden state. RNN_out has shape=(batch, time_step, output_size) """
        print(RNN_out.shape)
        # FC layers
        x = self.fc1(RNN_out[:, -1, :])   # choose RNN_out at the last time step
        x = F.relu(x)
        x = F.dropout(x, p=self.drop_p, training=self.training)
        print(x.shape)
        x = self.fc2(x)
        print(x.shape)
        return x

        

In [29]:
class ResCNNEncoder(nn.Module):
    def __init__(self, fc_hidden1=512, fc_hidden2=512, drop_p=0.3, CNN_embed_dim=300):
        """Load the pretrained ResNet-152 and replace top fc layer."""
        super(ResCNNEncoder, self).__init__()

        self.fc_hidden1, self.fc_hidden2 = fc_hidden1, fc_hidden2
        self.drop_p = drop_p

        resnet = models.resnet152(pretrained=True)
        modules = list(resnet.children())[:-1]      # delete the last fc layer.
        self.resnet = nn.Sequential(*modules)
        
        self.fc1 = nn.Linear(resnet.fc.in_features, fc_hidden1)
        self.bn1 = nn.BatchNorm1d(fc_hidden1, momentum=0.01)
        self.fc2 = nn.Linear(fc_hidden1, fc_hidden2)
        self.bn2 = nn.BatchNorm1d(fc_hidden2, momentum=0.01)
        self.fc3 = nn.Linear(fc_hidden2, CNN_embed_dim)
        self.fc11 = nn.Linear(10816,fc_hidden1)
        
    def forward(self, x_3d):
        hog_ans=[]
        
        inp = torch.moveaxis(x_3d,2,-1)
        
        for sx in inp:
            hog_features = []
            for xx in sx:
        
                fd,hog_image = hog(xx, orientations=4, pixels_per_cell=(4,4),cells_per_block=(4, 4),block_norm= 'L2',visualize=True, channel_axis=-1)
        
                hog_features.append(fd)
            
            x_fe =  torch.tensor(hog_features)
            x = self.bn1(self.fc11(x_fe))
            x = F.relu(x)
            x = self.bn2(self.fc2(x))
            x = F.relu(x)
            x = F.dropout(x, p=self.drop_p, training=self.training)
            x = self.fc3(x)
            
            hog_ans.append(x)
            
        hog_ans = torch.stack(hog_ans, dim=0)
        
        
        cnn_embed_seq = []
        for t in range(x_3d.size(1)):
            # ResNet CNN
            with torch.no_grad():
                x = self.resnet(x_3d[:, t, :, :, :])  # ResNet
                x = x.view(x.size(0), -1)             # flatten output of conv

            # FC layers
            
            x = self.bn1(self.fc1(x))
            x = F.relu(x)
            x = self.bn2(self.fc2(x))
            x = F.relu(x)
            x = F.dropout(x, p=self.drop_p, training=self.training)
            x = self.fc3(x)

            cnn_embed_seq.append(x)

        # swap time and sample dim such that (sample dim, time dim, CNN latent dim)
        cnn_embed_seq = torch.stack(cnn_embed_seq, dim=0).transpose_(0, 1)
        # cnn_embed_seq: shape=(batch, time_step, input_size)
        
        return cnn_embed_seq + hog_ans


class DecoderRNN(nn.Module):
    def __init__(self, CNN_embed_dim=300, h_RNN_layers=3, h_RNN=256, h_FC_dim=128, drop_p=0.3, num_classes=2):
        super(DecoderRNN, self).__init__()

        self.RNN_input_size = CNN_embed_dim
        self.h_RNN_layers = h_RNN_layers   # RNN hidden layers
        self.h_RNN = h_RNN                 # RNN hidden nodes
        self.h_FC_dim = h_FC_dim
        self.drop_p = drop_p
        self.num_classes = num_classes

        self.LSTM = nn.LSTM(
            input_size=self.RNN_input_size,
            hidden_size=self.h_RNN,        
            num_layers=h_RNN_layers,       
            batch_first=True,       # input & output will has batch size as 1s dimension. e.g. (batch, time_step, input_size)
        )

        self.fc1 = nn.Linear(self.h_RNN, self.h_FC_dim)
        self.fc2 = nn.Linear(self.h_FC_dim, self.num_classes)

    def forward(self, x_RNN):
        
        self.LSTM.flatten_parameters()
        RNN_out, (h_n, h_c) = self.LSTM(x_RNN, None)  
        """ h_n shape (n_layers, batch, hidden_size), h_c shape (n_layers, batch, hidden_size) """ 
        """ None represents zero initial hidden state. RNN_out has shape=(batch, time_step, output_size) """

        # FC layers
        x = self.fc1(RNN_out[:, -1, :])   # choose RNN_out at the last time step
        x = F.relu(x)
        x = F.dropout(x, p=self.drop_p, training=self.training)
        x = self.fc2(x)

        return x

        

In [10]:
model = nn.Sequential(ResCNNEncoder(),
                      DecoderRNN()
                        )

In [31]:
x=torch.rand(10,16,300)

In [32]:
m=DecoderRNN()
y=m(x)
y.shape

torch.Size([10, 16, 256])
torch.Size([10, 128])
torch.Size([10, 2])


torch.Size([10, 2])

In [33]:
model.to(device)

Sequential(
  (0): ResCNNEncoder(
    (resnet): Sequential(
      (0): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
      (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU(inplace=True)
      (3): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
      (4): Sequential(
        (0): Bottleneck(
          (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (relu): ReLU(inplace=True)
        

# mlp mixer + lstm

In [40]:
import os
import numpy as np
from PIL import Image
from torch.utils import data
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.models as models
import torchvision.transforms as transforms
from tqdm import tqdm
from sklearn.preprocessing import OneHotEncoder, LabelEncoder

In [41]:
pip install mlp-mixer-pytorch

Collecting mlp-mixer-pytorch
  Downloading mlp_mixer_pytorch-0.1.1-py3-none-any.whl (4.2 kB)
Collecting einops>=0.3
  Downloading einops-0.6.0-py3-none-any.whl (41 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m41.6/41.6 kB[0m [31m3.3 MB/s[0m eta [36m0:00:00[0m:00:01[0m
Installing collected packages: einops, mlp-mixer-pytorch
Successfully installed einops-0.6.0 mlp-mixer-pytorch-0.1.1
[0mNote: you may need to restart the kernel to use updated packages.


In [42]:
import torch
from mlp_mixer_pytorch import MLPMixer

In [43]:

mlp_mixer = MLPMixer(
    image_size = (64,64),
    channels = 3,
    patch_size = 16,
    dim = 512,
    depth = 24,
    num_classes = 256
)


In [44]:
class ResCNNEncoder(nn.Module):
    def __init__(self, fc_hidden1=512, fc_hidden2=512, drop_p=0.3, CNN_embed_dim=256):
        """Load the pretrained ResNet-152 and replace top fc layer."""
        super(ResCNNEncoder, self).__init__()

        self.fc_hidden1, self.fc_hidden2 = fc_hidden1, fc_hidden2
        self.drop_p = drop_p

        #resnet = models.resnet152(pretrained=True)
        #modules = list(resnet.children())[:-1]      # delete the last fc layer.
        #self.resnet = nn.Sequential(*modules)
        mlp_mixer = MLPMixer(image_size = (64,64),channels = 3,patch_size = 8,dim = 512,depth = 16,num_classes = 1024)
        #modules = list(mlp_mixer.children())[:-2]
        #self.mlp_mixer = nn.Sequential(*modules)
        self.mlp_mixer = mlp_mixer
        self.fc1 = nn.Linear(1024, fc_hidden1)
        #self.fc1 = nn.Linear(mlp_mixer. , fc_hidden1)
        self.bn1 = nn.BatchNorm1d(fc_hidden1, momentum=0.01)
        self.fc2 = nn.Linear(fc_hidden1, fc_hidden2)
        self.bn2 = nn.BatchNorm1d(fc_hidden2, momentum=0.01)
        self.fc3 = nn.Linear(fc_hidden2, CNN_embed_dim)
        
    def forward(self, x_3d):
        cnn_embed_seq = []
        for t in range(x_3d.size(1)):
            # ResNet CNN
            with torch.no_grad():
                x = self.mlp_mixer(x_3d[:, t, :, :, :])  # ResNet
                x = x.view(x.size(0), -1)             # flatten output of conv

            # FC layers
            x = self.bn1(self.fc1(x))
            x = F.relu(x)
            x = self.bn2(self.fc2(x))
            x = F.relu(x)
            x = F.dropout(x, p=self.drop_p, training=self.training)
            x = self.fc3(x)

            cnn_embed_seq.append(x)

        # swap time and sample dim such that (sample dim, time dim, CNN latent dim)
        cnn_embed_seq = torch.stack(cnn_embed_seq, dim=0).transpose_(0, 1)
        # cnn_embed_seq: shape=(batch, time_step, input_size)

        return cnn_embed_seq


class DecoderRNN(nn.Module):
    def __init__(self, CNN_embed_dim=256, h_RNN_layers=3, h_RNN=256, h_FC_dim=128, drop_p=0.3, num_classes=2):
        super(DecoderRNN, self).__init__()

        self.RNN_input_size = CNN_embed_dim
        self.h_RNN_layers = h_RNN_layers   # RNN hidden layers
        self.h_RNN = h_RNN                 # RNN hidden nodes
        self.h_FC_dim = h_FC_dim
        self.drop_p = drop_p
        self.num_classes = num_classes

        self.LSTM = nn.LSTM(
            input_size=self.RNN_input_size,
            hidden_size=self.h_RNN,        
            num_layers=h_RNN_layers,       
            batch_first=True,       # input & output will has batch size as 1s dimension. e.g. (batch, time_step, input_size)
        )

        self.fc1 = nn.Linear(self.h_RNN, self.h_FC_dim)
        self.fc2 = nn.Linear(self.h_FC_dim, self.num_classes)

    def forward(self, x_RNN):
        
        self.LSTM.flatten_parameters()
        RNN_out, (h_n, h_c) = self.LSTM(x_RNN, None)  
        """ h_n shape (n_layers, batch, hidden_size), h_c shape (n_layers, batch, hidden_size) """ 
        """ None represents zero initial hidden state. RNN_out has shape=(batch, time_step, output_size) """

        # FC layers
        x = self.fc1(RNN_out[:, -1, :])   # choose RNN_out at the last time step
        x = F.relu(x)
        x = F.dropout(x, p=self.drop_p, training=self.training)
        x = self.fc2(x)

        return x

        

In [45]:
model = nn.Sequential(ResCNNEncoder(),
                      DecoderRNN()
                        )

In [46]:
enm=ResCNNEncoder()

inp = torch.randn(32, 16, 3, 64,64)
x=enm(inp)
x.shape

torch.Size([32, 16, 256])

In [47]:
dnm=DecoderRNN()

inp = torch.randn(32, 16,256)
x=dnm(inp)
x.shape

torch.Size([32, 2])

In [48]:
x=torch.rand(10,16,3,64,64)

In [49]:
y=model(x)
y.shape

torch.Size([10, 2])

In [50]:
model.to(device)

Sequential(
  (0): ResCNNEncoder(
    (mlp_mixer): Sequential(
      (0): Rearrange('b c (h p1) (w p2) -> b (h w) (p1 p2 c)', p1=8, p2=8)
      (1): Linear(in_features=192, out_features=512, bias=True)
      (2): Sequential(
        (0): PreNormResidual(
          (fn): Sequential(
            (0): Conv1d(64, 256, kernel_size=(1,), stride=(1,))
            (1): GELU()
            (2): Dropout(p=0.0, inplace=False)
            (3): Conv1d(256, 64, kernel_size=(1,), stride=(1,))
            (4): Dropout(p=0.0, inplace=False)
          )
          (norm): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
        )
        (1): PreNormResidual(
          (fn): Sequential(
            (0): Linear(in_features=512, out_features=256, bias=True)
            (1): GELU()
            (2): Dropout(p=0.0, inplace=False)
            (3): Linear(in_features=256, out_features=512, bias=True)
            (4): Dropout(p=0.0, inplace=False)
          )
          (norm): LayerNorm((512,), eps=1e-05, el

# 3d cnn

In [None]:
#need to hpertune

In [21]:
keepprobab = 1
class CNN(torch.nn.Module):

    def __init__(self,flag=0):
        super(CNN, self).__init__()
        self.flag=flag
        self.layer1 = torch.nn.Sequential(
            torch.nn.Conv3d(3, 32, kernel_size=(3,3,3), stride=1, padding=1),
            torch.nn.ReLU(),
            torch.nn.MaxPool3d(kernel_size=(2,2,2), stride=2),
            torch.nn.Dropout(p=1 - keepprobab))
        self.layer2 = torch.nn.Sequential(
            torch.nn.Conv3d(32, 64, kernel_size=(3,3,3), stride=1, padding=1),
            torch.nn.ReLU(),
            torch.nn.MaxPool3d(kernel_size=(2,2,2), stride=2),
            torch.nn.Dropout(p=1 - keepprobab))
        self.layer3 = torch.nn.Sequential(
            torch.nn.Conv3d(64, 128, kernel_size=(3,3,3), stride=1, padding=1),
            torch.nn.ReLU(),
            torch.nn.MaxPool3d(kernel_size=(2,2,2), stride=2, padding=1),
            torch.nn.Dropout(p=1 - keepprobab))

        self.layer4 = torch.nn.Sequential(
            torch.nn.Conv3d( 128, 256, kernel_size=(3,3,3), stride=1, padding=1),
            torch.nn.ReLU(),
            torch.nn.MaxPool3d(kernel_size=(2,2,2), stride=2, padding=1),
            torch.nn.Dropout(p=1 - keepprobab))
        
        self.fc1 = torch.nn.Linear(12800, 1024, bias=True)
        torch.nn.init.xavier_uniform(self.fc1.weight)
        self.layer5 = torch.nn.Sequential(
            self.fc1,
            torch.nn.ReLU(),
            torch.nn.Dropout(p=1 - keepprobab))
       
        self.fc2 = torch.nn.Linear(1024, 256, bias=True)
        # initialize parameters
        torch.nn.init.xavier_uniform_(self.fc2.weight) 
        
        self.fc3 = torch.nn.Linear(256, 2, bias=True)
        
    def forward(self, y):
        if self.flag==1:
            print(y.shape)
        output = self.layer1(y)
        if self.flag==1:
            print(output.shape)
        output = self.layer2(output)
        if self.flag==1:
            print(output.shape)
        output = self.layer3(output)
        if self.flag==1:
            print(output.shape)
        output = self.layer4(output)
        if self.flag==1:
            print(output.shape)
         # Flatten them for FC
        output = output.view(output.size(0), -1)
        if self.flag==1:
            print(output.shape)
        output = self.fc1(output)
        if self.flag==1:
            print(output.shape)
        output = self.fc2(output)
        if self.flag==1:
            print(output.shape)
        output = self.fc3(output)
        if self.flag==1:
            print(output.shape)
        return output
model = CNN()
model.to(device)



CNN(
  (layer1): Sequential(
    (0): Conv3d(3, 32, kernel_size=(3, 3, 3), stride=(1, 1, 1), padding=(1, 1, 1))
    (1): ReLU()
    (2): MaxPool3d(kernel_size=(2, 2, 2), stride=2, padding=0, dilation=1, ceil_mode=False)
    (3): Dropout(p=0, inplace=False)
  )
  (layer2): Sequential(
    (0): Conv3d(32, 64, kernel_size=(3, 3, 3), stride=(1, 1, 1), padding=(1, 1, 1))
    (1): ReLU()
    (2): MaxPool3d(kernel_size=(2, 2, 2), stride=2, padding=0, dilation=1, ceil_mode=False)
    (3): Dropout(p=0, inplace=False)
  )
  (layer3): Sequential(
    (0): Conv3d(64, 128, kernel_size=(3, 3, 3), stride=(1, 1, 1), padding=(1, 1, 1))
    (1): ReLU()
    (2): MaxPool3d(kernel_size=(2, 2, 2), stride=2, padding=1, dilation=1, ceil_mode=False)
    (3): Dropout(p=0, inplace=False)
  )
  (layer4): Sequential(
    (0): Conv3d(128, 256, kernel_size=(3, 3, 3), stride=(1, 1, 1), padding=(1, 1, 1))
    (1): ReLU()
    (2): MaxPool3d(kernel_size=(2, 2, 2), stride=2, padding=1, dilation=1, ceil_mode=False)
    (3

# mlp-mixer + resnet with lstm

In [53]:
class ResCNNEncoder(nn.Module):
    def __init__(self, fc_hidden1=512, fc_hidden2=512, drop_p=0.3, CNN_embed_dim=300):
        """Load the pretrained ResNet-152 and replace top fc layer."""
        super(ResCNNEncoder, self).__init__()

        self.fc_hidden1, self.fc_hidden2 = fc_hidden1, fc_hidden2
        self.drop_p = drop_p

        resnet = models.resnet152(pretrained=True)
        modules = list(resnet.children())[:-1]      # delete the last fc layer.
        self.resnet = nn.Sequential(*modules)
        
        mlp_mixer = MLPMixer(image_size = (64,64),channels = 3,patch_size = 8,dim = 512,depth = 16,num_classes = 1024)
        
        self.mlp_mixer = mlp_mixer
        self.fc11 = nn.Linear(1024, fc_hidden1)
        
        self.fc1 = nn.Linear(resnet.fc.in_features, fc_hidden1)
        self.bn1 = nn.BatchNorm1d(fc_hidden1, momentum=0.01)
        self.fc2 = nn.Linear(fc_hidden1, fc_hidden2)
        self.bn2 = nn.BatchNorm1d(fc_hidden2, momentum=0.01)
        self.fc3 = nn.Linear(fc_hidden2, CNN_embed_dim)
        
    def forward(self, x_3d):
        cnn_embed_seq = []
        cnn_embed_seq2 = []
        for t in range(x_3d.size(1)):
            # ResNet CNN
            with torch.no_grad():
                x = self.resnet(x_3d[:, t, :, :, :])  # ResNet
                x = x.view(x.size(0), -1)             # flatten output of conv

            # FC layers
            x = self.bn1(self.fc1(x))
            x = F.relu(x)
            x = self.bn2(self.fc2(x))
            x = F.relu(x)
            x = F.dropout(x, p=self.drop_p, training=self.training)
            x = self.fc3(x)

            cnn_embed_seq.append(x)

        # swap time and sample dim such that (sample dim, time dim, CNN latent dim)
        cnn_embed_seq = torch.stack(cnn_embed_seq, dim=0).transpose_(0, 1)
        # cnn_embed_seq: shape=(batch, time_step, input_size)
        
        for t in range(x_3d.size(1)):
            # ResNet CNN
            with torch.no_grad():
                x = self.mlp_mixer(x_3d[:, t, :, :, :])  # ResNet
                x = x.view(x.size(0), -1)             # flatten output of conv

            # FC layers
            x = self.bn1(self.fc11(x))
            x = F.relu(x)
            x = self.bn2(self.fc2(x))
            x = F.relu(x)
            x = F.dropout(x, p=self.drop_p, training=self.training)
            x = self.fc3(x)

            cnn_embed_seq2.append(x)

        # swap time and sample dim such that (sample dim, time dim, CNN latent dim)
        cnn_embed_seq2 = torch.stack(cnn_embed_seq2, dim=0).transpose_(0, 1)
        # cnn_embed_seq: shape=(batch, time_step, input_size)

        return cnn_embed_seq + cnn_embed_seq2
    

    



class DecoderRNN(nn.Module):
    def __init__(self, CNN_embed_dim=300, h_RNN_layers=3, h_RNN=256, h_FC_dim=128, drop_p=0.3, num_classes=2):
        super(DecoderRNN, self).__init__()

        self.RNN_input_size = CNN_embed_dim
        self.h_RNN_layers = h_RNN_layers   # RNN hidden layers
        self.h_RNN = h_RNN                 # RNN hidden nodes
        self.h_FC_dim = h_FC_dim
        self.drop_p = drop_p
        self.num_classes = num_classes

        self.LSTM = nn.LSTM(
            input_size=self.RNN_input_size,
            hidden_size=self.h_RNN,        
            num_layers=h_RNN_layers,       
            batch_first=True,       # input & output will has batch size as 1s dimension. e.g. (batch, time_step, input_size)
        )

        self.fc1 = nn.Linear(self.h_RNN, self.h_FC_dim)
        self.fc2 = nn.Linear(self.h_FC_dim, self.num_classes)

    def forward(self, x_RNN):
        
        self.LSTM.flatten_parameters()
        RNN_out, (h_n, h_c) = self.LSTM(x_RNN, None)  
        """ h_n shape (n_layers, batch, hidden_size), h_c shape (n_layers, batch, hidden_size) """ 
        """ None represents zero initial hidden state. RNN_out has shape=(batch, time_step, output_size) """

        # FC layers
        x = self.fc1(RNN_out[:, -1, :])   # choose RNN_out at the last time step
        x = F.relu(x)
        x = F.dropout(x, p=self.drop_p, training=self.training)
        x = self.fc2(x)

        return x

        

In [54]:
model = nn.Sequential(ResCNNEncoder(),
                      DecoderRNN()
                        )
model.to(device)

Sequential(
  (0): ResCNNEncoder(
    (resnet): Sequential(
      (0): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
      (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU(inplace=True)
      (3): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
      (4): Sequential(
        (0): Bottleneck(
          (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (relu): ReLU(inplace=True)
        

# train and test

In [22]:
from tqdm import tqdm

In [23]:
# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0005)


# Train and test

def train(num_epoch, model):
    for epoch in range(0, num_epoch):
#         current_loss = 0.0
#         current_corrects = 0
        losses = []
        model.train()
        loop = tqdm(enumerate(train_loader), total=len(train_loader)) # create a progress bar
        for batch_idx, (data, targets) in loop:
            data = data.to(device=device)
            targets = targets.to(device=device)
            scores = model(data)
            
            
            
            loss = criterion(scores, targets)
            optimizer.zero_grad()
            losses.append(loss)
            loss.backward()
            optimizer.step()
            _, preds = torch.max(scores, 1)
#             current_loss += loss.item() * data.size(0)
#             current_corrects += (preds == targets).sum().item()
#             accuracy = int(current_corrects / len(train_loader.dataset) * 100)
            loop.set_description(f"Epoch {epoch+1}/{num_epoch} process: {int((batch_idx / len(train_loader)) * 100)}")
            loop.set_postfix(loss=loss.data.item())
        
        # save model
#         torch.save({ 
#                     'model_state_dict': model.state_dict(), 
#                     'optimizer_state_dict': optimizer.state_dict(), 
#                     }, 'checpoint_epoch_'+str(epoch)+'.pt')


        
# model.eval() is a kind of switch for some specific layers/parts of the model that behave differently,
# during training and inference (evaluating) time. For example, Dropouts Layers, BatchNorm Layers etc. 
# You need to turn off them during model evaluation, and .eval() will do it for you. In addition, 
# the common practice for evaluating/validation is using torch.no_grad() in pair with model.eval() 
# to turn off gradients computation:
        
def test():
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for x, y in test_loader:
            x = x.to(device)
            #print(x.shape)
            y = y.to(device)
            #print(y.shape)
            output = model(x)
            #print(output.shape)
            _, predictions = torch.max(output, 1)
            
            #print(predictions.shape)
            correct += (predictions == y).sum().item()
            test_loss = criterion(output, y)
            
    test_loss /= len(test_loader.dataset)
    print("Average Loss: ", test_loss, "  Accuracy: ", correct, " / ",
    len(test_loader.dataset), "  ", int(correct / len(test_loader.dataset) * 100), "%")

In [24]:
if __name__ == "__main__":
    train(20, model) # train
    test() # test

Epoch 1/20 process: 99: 100%|██████████| 180/180 [00:13<00:00, 13.83it/s, loss=0.44] 
Epoch 2/20 process: 99: 100%|██████████| 180/180 [00:06<00:00, 26.64it/s, loss=0.481]
Epoch 3/20 process: 99: 100%|██████████| 180/180 [00:06<00:00, 26.55it/s, loss=0.418] 
Epoch 4/20 process: 99: 100%|██████████| 180/180 [00:06<00:00, 27.13it/s, loss=0.649] 
Epoch 5/20 process: 99: 100%|██████████| 180/180 [00:06<00:00, 26.72it/s, loss=0.322] 
Epoch 6/20 process: 99: 100%|██████████| 180/180 [00:06<00:00, 26.99it/s, loss=0.334]  
Epoch 7/20 process: 99: 100%|██████████| 180/180 [00:06<00:00, 27.03it/s, loss=0.0495] 
Epoch 8/20 process: 99: 100%|██████████| 180/180 [00:06<00:00, 26.94it/s, loss=0.0292] 
Epoch 9/20 process: 99: 100%|██████████| 180/180 [00:06<00:00, 26.95it/s, loss=0.000641]
Epoch 10/20 process: 99: 100%|██████████| 180/180 [00:06<00:00, 27.45it/s, loss=0.301]   
Epoch 11/20 process: 99: 100%|██████████| 180/180 [00:06<00:00, 26.96it/s, loss=0.0268]  
Epoch 12/20 process: 99: 100%|████

Average Loss:  tensor(0.0008, device='cuda:0')   Accuracy:  182  /  200    91 %


In [None]:
#resnet +mlp-mixer+lstm

In [None]:
torch.save({ 
                    'model_state_dict': model.state_dict(), 
                    'optimizer_state_dict': optimizer.state_dict(), 
                    }, 'checpoint_epoch_'+'.pt')