In [None]:
!pip install remotezip tqdm opencv-python einops

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
import tqdm
import random
import pathlib
import itertools
import collections
import pandas as pd
from pandas import *

import cv2
import einops
import numpy as np
import remotezip as rz
import seaborn as sns
import matplotlib.pyplot as plt

import tensorflow as tf
import keras
from sklearn.datasets import make_blobs
from keras.utils import to_categorical
from keras import layers
from keras.layers import Dense
from sklearn.neural_network import MLPClassifier

#import tensorflow.compat.v1.keras.backend as K
import tensorflow as tf
#tf.compat.v1.disable_eager_execution()

Data Download

In [None]:
def format_frames(frame, output_size):
  """
    Convert a video frame to float32, then resize it with padding.

    Args:
      frame: Image to convert; passed to `tf.image.convert_image_dtype`.
      output_size: Sequence unpacked as the target-size arguments of
        `tf.image.resize_with_pad`.

    Return:
      The float32 frame, resized and padded to the requested size.
  """
  as_float = tf.image.convert_image_dtype(frame, tf.float32)
  return tf.image.resize_with_pad(as_float, *output_size)

def frames_from_video_file(video_path, n_frames, output_size = (224,224), frame_step = 15):
  """
    Sample `n_frames` frames, `frame_step` source frames apart, from a video file.

    When the video is long enough, a random start position is chosen so that the
    whole sampled span fits inside it. Otherwise sampling starts at frame 0 and
    frames that cannot be read are replaced by all-zero frames.

    Args:
      video_path: File path to the video.
      n_frames: Number of frames to be created per video file.
      output_size: Pixel size of the output frame image.
      frame_step: Number of source frames to advance between two sampled frames.

    Return:
      A NumPy array of frames in the shape of (n_frames, height, width, channels),
      with the channel axis reversed (OpenCV decodes frames as BGR).

    Raises:
      ValueError: If not even the first frame can be read from `video_path`.
  """
  src = cv2.VideoCapture(str(video_path))
  try:
    # The frame count is reported as a float; random.randint needs integers.
    video_length = int(src.get(cv2.CAP_PROP_FRAME_COUNT))
    need_length = 1 + (n_frames - 1) * frame_step

    if need_length > video_length:
      start = 0
    else:
      # randint includes both endpoints, so max_start itself is the last start
      # position for which the full span still fits inside the video.
      max_start = video_length - need_length
      start = random.randint(0, max_start)

    src.set(cv2.CAP_PROP_POS_FRAMES, start)
    # ret is a boolean indicating whether read was successful, frame is the image itself
    ret, frame = src.read()
    if not ret:
      raise ValueError(f"Could not read any frame from video: {video_path}")
    result = [format_frames(frame, output_size)]

    for _ in range(n_frames - 1):
      # Skip ahead; only the last frame read in this inner loop is kept.
      for _ in range(frame_step):
        ret, frame = src.read()
      if ret:
        result.append(format_frames(frame, output_size))
      else:
        # Ran past the end of the video: pad with a blank frame of the same shape.
        result.append(np.zeros_like(result[0]))
  finally:
    # Always free the capture handle, even if decoding or formatting fails.
    src.release()

  return np.array(result)[..., [2, 1, 0]]

class FrameGenerator:
  """Callable that yields one (frames, label) pair per video path."""

  def __init__(self, path, clas, n_frames, training = False):
    """ Store the video paths and their labels.

      Args:
        path: Iterable of video file paths.
        clas: Iterable of labels, one per entry of `path`. Each label is either
          a key of `self.categories` or an integer-like class id.
        n_frames: Number of frames.
        training: Boolean to determine if training dataset is being created.
          When True, the (path, label) pairs are shuffled on every call.
    """
    self.path = path
    self.clas = clas
    self.n_frames = n_frames
    # NOTE(review): "low susceptiblity" is spelled differently from the other two
    # keys — confirm it matches the labels in the data before changing it.
    self.categories = {"low susceptiblity": 0, "mid susceptibility": 1,  "high susceptibility": 2}

    self.training = training
    self.class_names = list(self.categories.keys())

  def _encode_label(self, raw_label):
    """Map a raw label (category name or integer-like class id) to its class id."""
    if raw_label in self.categories:
      return self.categories[raw_label]
    try:
      class_id = int(raw_label)
    except (TypeError, ValueError):
      raise KeyError(f"Unknown label: {raw_label!r}") from None
    if class_id not in self.categories.values():
      raise KeyError(f"Label id out of range: {raw_label!r}")
    return class_id

  def _pairs(self):
    """Return the (path, class id) pairs in input order."""
    paths = list(self.path)
    labels = list(self.clas)
    if len(paths) != len(labels):
      raise ValueError(
          f"Got {len(paths)} video paths but {len(labels)} labels; they must match.")
    return [(video_path, self._encode_label(raw_label))
            for video_path, raw_label in zip(paths, labels)]

  def __call__(self):
    """Yield (video_frames, label) for every video, shuffled when training."""
    pairs = self._pairs()
    if self.training:
      random.shuffle(pairs)

    for video_path, label in pairs:
      video_frames = frames_from_video_file(video_path, self.n_frames)
      yield video_frames, label





In [None]:
# Load the table that maps each video to its susceptibility label.
video_data = pd.read_csv("/content/drive/MyDrive/Files/final_model_dataset2.csv")
X = video_data.Videos
Y = video_data.Susceptibility

from sklearn.model_selection import train_test_split

# A fixed random_state keeps the train/validation/test membership identical
# across kernel restarts, so results can be reproduced.
X_train_video, X_test_video, Y_train_video, Y_test_video = train_test_split(
    X, Y, test_size = 0.30, random_state = 42)

# Split the held-out 30% again: 90% of it for validation, 10% for final testing.
X_test_val_video, X_test_test_video, Y_test_val_video, Y_test_test_video = train_test_split(
    X_test_video, Y_test_video, test_size = 0.10, random_state = 42)

# Smoke test: pull a single example through the generator.
fg = FrameGenerator(X_train_video, Y_train_video, 10, training=True)

frames, label = next(fg())

print(f"Shape: {frames.shape}")
print(f"Label: {label}")

X_train_video.shape


NotImplementedError: ignored

In [None]:
n_frames = 60
batch_size = 2

# Each element is (video, label): a float32 rank-4 tensor with 3 channels in the
# last axis (the other axes are left unspecified) and a scalar int16 label.
output_signature = (tf.TensorSpec(shape = (None, None, None, 3), dtype = tf.float32),
                    tf.TensorSpec(shape = (), dtype = tf.int16))

# Use n_frames for every split so the three datasets cannot drift apart.
train_ds = tf.data.Dataset.from_generator(FrameGenerator(X_train_video, Y_train_video, n_frames, training=True),
                                          output_signature = output_signature)

# Batch the data
train_ds = train_ds.batch(batch_size)


val_ds = tf.data.Dataset.from_generator(FrameGenerator(X_test_val_video, Y_test_val_video, n_frames),
                                        output_signature = output_signature)
val_ds = val_ds.batch(batch_size)

test_ds = tf.data.Dataset.from_generator(FrameGenerator(X_test_test_video, Y_test_test_video, n_frames),
                                         output_signature = output_signature)

test_ds = test_ds.batch(batch_size)

Model Development

In [None]:
HEIGHT = 224
WIDTH = 224
class Conv2Plus1D(keras.layers.Layer):
  """A spatial Conv3D followed by a temporal Conv3D, applied in sequence."""

  def __init__(self, filters, kernel_size, padding):
    """
      Build the two convolutions that together replace one full 3D convolution.

      Args:
        filters: Number of filters used by both convolutions.
        kernel_size: Indexable of three kernel sizes; index 0 is used by the
          temporal convolution, indices 1 and 2 by the spatial one.
        padding: Padding mode forwarded to both convolutions.
    """
    super().__init__()
    # Spatial part: kernel extent 1 along the first kernel axis.
    spatial_conv = layers.Conv3D(filters=filters,
                                 kernel_size=(1, kernel_size[1], kernel_size[2]),
                                 padding=padding)
    # Temporal part: kernel extent 1 along the last two kernel axes.
    temporal_conv = layers.Conv3D(filters=filters,
                                  kernel_size=(kernel_size[0], 1, 1),
                                  padding=padding)
    self.seq = keras.Sequential([spatial_conv, temporal_conv])

  def call(self, x):
    """Run `x` through the spatial and then the temporal convolution."""
    return self.seq(x)

In [None]:
class ResidualMain(keras.layers.Layer):
  """
    Main branch of a residual block:
    Conv2Plus1D -> LayerNormalization -> ReLU -> Conv2Plus1D -> LayerNormalization.
  """
  def __init__(self, filters, kernel_size):
    super().__init__()

    def make_conv():
      # Both convolutions share the same configuration and use 'same' padding.
      return Conv2Plus1D(filters=filters,
                         kernel_size=kernel_size,
                         padding='same')

    main_path = [
        make_conv(),
        layers.LayerNormalization(),
        layers.ReLU(),
        make_conv(),
        layers.LayerNormalization(),
    ]
    self.seq = keras.Sequential(main_path)

  def call(self, x):
    """Apply the main branch to `x`."""
    return self.seq(x)

In [None]:
class Project(keras.layers.Layer):
  """
    Map the last dimension of a tensor to `units` through two Dense layers
    followed by layer normalization.
  """
  def __init__(self, units):
    super().__init__()
    # NOTE(review): the first Dense applies a softmax before the second Dense;
    # confirm this activation is intended inside a projection.
    projection = [
        layers.Dense(units, activation = "softmax"),
        layers.Dense(units),
        layers.LayerNormalization(),
    ]
    self.seq = keras.Sequential(projection)

  def call(self, x):
    """Apply the projection to `x`."""
    return self.seq(x)


In [None]:
def add_residual_block(input, filters, kernel_size):
  """
    Append a residual block: the sum of ResidualMain(input) and a shortcut.

    The shortcut is `input` itself when its last dimension already equals that
    of the main branch; otherwise `input` is first passed through `Project`.

    Args:
      input: Tensor the block is applied to.
      filters: Number of filters for the main branch.
      kernel_size: Kernel size for the main branch.

    Return:
      The element-wise sum of the shortcut and the main branch.
  """
  main_branch = ResidualMain(filters, kernel_size)(input)
  out_channels = main_branch.shape[-1]

  # Only project when the last dimensions differ, so the two can be added.
  shortcut = Project(out_channels)(input) if out_channels != input.shape[-1] else input

  return layers.add([shortcut, main_branch])

In [None]:
class ResizeVideo(keras.layers.Layer):
  """Resize every frame of a batch of videos to a fixed height and width."""

  def __init__(self, height, width):
    super().__init__()
    self.height = height
    self.width = width
    self.resizing_layer = layers.Resizing(self.height, self.width)

  def call(self, video):
    """
      Resize all frames by temporarily folding the time axis into the batch axis.

      Args:
        video: Tensor laid out as (batch, time, height, width, channels).

      Return:
        The video with each frame resized to (self.height, self.width), in the
        same (batch, time, height, width, channels) layout.
    """
    # Remember the time length so the batch axis can be split back afterwards.
    frames_per_video = einops.parse_shape(video, 'b t h w c')['t']
    flat_frames = einops.rearrange(video, 'b t h w c -> (b t) h w c')
    resized_frames = self.resizing_layer(flat_frames)
    return einops.rearrange(
        resized_frames, '(b t) h w c -> b t h w c',
        t = frames_per_video)

In [None]:
# Image augmentation pipeline: random horizontal flip, rotation and zoom on
# inputs of shape (HEIGHT, WIDTH, 3).
# NOTE(review): no other cell visible in this notebook references
# `data_augmentation` — confirm whether it should be wired into the model.
data_augmentation = tf.keras.Sequential(
    [
        layers.RandomFlip("horizontal",
                      input_shape=(HEIGHT,
                                  WIDTH,
                                  3)),
        layers.RandomRotation(0.1),
        layers.RandomZoom(0.1),
    ]
)

In [None]:
# Clips are laid out as (batch, frames, height, width, channels).
# The frame count (60) must match the n_frames used to build the datasets.
input_shape = (None, 60, HEIGHT, WIDTH, 3)
# Named `inputs` so the builtin `input()` is not shadowed for the rest of the notebook.
inputs = layers.Input(shape=(input_shape[1:]))
print(input_shape[1:])


x = inputs
x = Conv2Plus1D(filters=16, kernel_size=(3, 7, 7), padding='same')(x)
x = layers.BatchNormalization()(x)
x = layers.ReLU()(x)
x = ResizeVideo(HEIGHT // 2, WIDTH // 2)(x)

# Block 1
x = add_residual_block(x, 20, (3, 3, 3))
x = ResizeVideo(HEIGHT // 4, WIDTH // 4)(x)

# Block 2
x = add_residual_block(x, 30, (3, 3, 3))
x = ResizeVideo(HEIGHT // 8, WIDTH // 8)(x)

# Block 3
x = add_residual_block(x, 78, (3, 3, 3))
x = ResizeVideo(HEIGHT // 16, WIDTH // 16)(x)

# Block 4
x = add_residual_block(x, 160, (3, 3, 3))

# Global pooling already yields a (batch, channels) tensor, so no Flatten
# layer is needed before the Dense layers.
x = layers.GlobalAveragePooling3D()(x)
x = layers.Dense(12, activation = 'relu')(x)

# Three raw outputs, one per class; the loss is configured with from_logits=True.
x = layers.Dense(3)(x)
model = keras.Model(inputs, x)

In [None]:
# Compile, train and evaluate the video model defined in the previous cell.
#def CNN_model(train_ds,test_ds, val_ds):
# Pull one batch from the training dataset.
frames, label = next(iter(train_ds))
# NOTE(review): `frames` is a batch tensor, whereas Keras documents Model.build
# as taking an input shape — confirm this call is needed for a model that was
# already constructed with the functional API.
model.build(frames)
# The final Dense layer has no activation, hence from_logits=True.
model.compile(loss = keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                optimizer = keras.optimizers.Adam(learning_rate = 0.0001),
                metrics = ['accuracy'])
# `history` is consumed by plot_history in a later cell.
history = model.fit(x = train_ds,
                      epochs = 50,
                      validation_data = val_ds )
# Report metrics on the held-out test split as a dict.
model.evaluate(test_ds, return_dict=True)

#CNN_model(train_ds,test_ds,val_ds)

In [None]:
def plot_history(history):
  """
    Draw the training and validation learning curves (loss on top, accuracy below).

    Args:
      history: Object whose `.history` mapping holds the per-epoch 'loss',
        'val_loss', 'accuracy' and 'val_accuracy' lists.
  """
  metrics = history.history
  fig, (loss_ax, acc_ax) = plt.subplots(2)
  fig.set_size_inches(18.5, 10.5)

  # Loss panel
  loss_ax.set_title('Loss')
  loss_ax.plot(metrics['loss'], label = 'train')
  loss_ax.plot(metrics['val_loss'], label = 'test')
  loss_ax.set_ylabel('Loss')

  # Upper y-limit: largest loss seen in either curve, rounded up.
  loss_ceiling = np.ceil(max(metrics['loss'] + metrics['val_loss']))
  loss_ax.set_ylim([0, loss_ceiling])
  loss_ax.set_xlabel('Epoch')
  loss_ax.legend(['Train', 'Validation'])

  # Accuracy panel
  acc_ax.set_title('Accuracy')
  acc_ax.plot(metrics['accuracy'],  label = 'train')
  acc_ax.plot(metrics['val_accuracy'], label = 'test')
  acc_ax.set_ylabel('Accuracy')
  acc_ax.set_ylim([0, 1])
  acc_ax.set_xlabel('Epoch')
  acc_ax.legend(['Train', 'Validation'])

  plt.show()

plot_history(history)

In [None]:
def get_actual_predicted_labels(dataset):
  """
    Collect ground-truth labels and the model's predicted classes for a dataset.

    The dataset is traversed exactly once. A generator-backed dataset may
    reshuffle its examples (and resample frames) on every iteration, so reading
    the labels in one pass and predicting in another could pair each label with
    the prediction of a different video.

    Args:
      dataset: A batched iterable of (features, labels), such as a TensorFlow Dataset.

    Return:
      Ground truth and predicted class ids for the dataset, as two 1-D tensors
      in matching order.
  """
  actual_batches = []
  predicted_batches = []
  for frames, labels in dataset:
    # Uses the notebook-level `model`; the class is the index of the largest output.
    outputs = model.predict_on_batch(frames)
    actual_batches.append(labels)
    predicted_batches.append(tf.argmax(outputs, axis=1))

  actual = tf.concat(actual_batches, axis=0)
  predicted = tf.concat(predicted_batches, axis=0)

  return actual, predicted

In [None]:
def plot_confusion_matrix(actual, predicted, labels, ds_type):
  """
    Draw the confusion matrix of `actual` versus `predicted` as an annotated heatmap.

    Args:
      actual: Ground-truth class ids.
      predicted: Predicted class ids.
      labels: Class names used as tick labels, ordered by class id.
      ds_type: Name of the dataset split, appended to the plot title.
  """
  # Fix the matrix size to the number of labels so the tick labels always line
  # up, even when some class does not occur in `actual` or `predicted`.
  cm = tf.math.confusion_matrix(actual, predicted, num_classes=len(labels))
  # Theme settings only affect figures created afterwards, so apply them
  # before the heatmap is drawn.
  sns.set(rc={'figure.figsize':(12, 12)}, font_scale=1.4)
  ax = sns.heatmap(cm, annot=True, fmt='g')
  ax.set_title('Confusion matrix of action recognition for ' + ds_type)
  ax.set_xlabel('Predicted Action')
  ax.set_ylabel('Actual Action')
  plt.xticks(rotation=90)
  plt.yticks(rotation=0)
  ax.xaxis.set_ticklabels(labels)
  ax.yaxis.set_ticklabels(labels)

In [None]:
# Build a generator only to read its category names, which serve as the
# tick labels of the confusion matrix.
fg = FrameGenerator(X_train_video, Y_train_video, n_frames, training=True)
labels = list(fg.categories.keys())

In [None]:
# Confusion matrix of the model's predictions on the training dataset.
actual, predicted = get_actual_predicted_labels(train_ds)
plot_confusion_matrix(actual, predicted, labels, 'training')

In [None]:
# EfficientNetB0 without its classification head, frozen so that only the
# layers added below are trained.
net = tf.keras.applications.EfficientNetB0(include_top = False)
net.trainable = False

# One output per susceptibility class (labels are encoded as 0, 1, 2).
NUM_CLASSES = 3

# NOTE: this rebinds the notebook-level `model` used by the cells that follow.
model = tf.keras.Sequential([
    # NOTE(review): assumes the frames arrive scaled to [0, 1] — confirm.
    tf.keras.layers.Rescaling(scale=255),
    # Run the image backbone on every frame independently.
    tf.keras.layers.TimeDistributed(net),
    tf.keras.layers.Dense(NUM_CLASSES),
    tf.keras.layers.GlobalAveragePooling3D()
])

model.compile(optimizer = 'adam',
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits = True),
              metrics=['accuracy'])

# `callbacks` is documented as a list of callback instances.
model.fit(train_ds,
          epochs = 10,
          validation_data = val_ds,
          callbacks = [tf.keras.callbacks.EarlyStopping(patience = 2, monitor = 'val_loss')])

MLP Classifier

In [None]:
numericalData = pd.read_csv("/content/drive/MyDrive/Files/final_model_dataset2.csv")

# Target column.
Y= numericalData.Susceptibility

# Features: every column except the target, the video path and 'Exits'.
X = numericalData.drop(["Susceptibility", 'Videos', 'Exits'], axis = 1)

from sklearn.model_selection import train_test_split

# A fixed random_state keeps the train/test membership identical across runs.
X_train, X_test, y_train, Y_test = train_test_split(X,Y, test_size = 0.2, random_state = 42)

# Preview a few rows instead of rendering the whole frame.
X.head()

In [None]:
# Train a multi-layer perceptron on the tabular features and report its test accuracy.
import math
from sklearn.metrics  import mean_squared_error
from sklearn.metrics import accuracy_score
# Two hidden layers (100 and 3 units), tanh activation, alpha = 1.
mlp = MLPClassifier(hidden_layer_sizes=(100, 3),
                    activation = 'tanh',
                    alpha = 1,
                    max_iter = 1000)
mlp.fit(X_train, y_train)

# Predictions on both splits; only the test predictions are scored below.
y_train_pred = mlp.predict(X_train)

y_test_pred = mlp.predict(X_test)

# Testing accuracy

mlp_test_accuracy = accuracy_score(Y_test, y_test_pred)

print(mlp_test_accuracy)
#accuracy_mse = mean_squared_error(Y_test, y_test_pred)
#accuracy_RMSE = math.sqrt(accuracy_mse)

Stacking Model/Meta Model

In [None]:
from os import makedirs
# NOTE(review): `directory` is not used by this cell and its value ends with a
# space — confirm whether it was meant to name the folder created below.
directory = "model "

#path = os.path.join(parent_dir, directory)
# exist_ok=True lets the cell be re-run without raising FileExistsError.
makedirs("models", exist_ok=True)

In [None]:
from sklearn.ensemble import StackingClassifier

!pip install scikeras
from scikeras.wrappers import KerasClassifier
from sklearn.linear_model import LogisticRegression


model._estimator_type = "classifier"




estimator_list = [('cnn', model),
                ('mlp', mlp)
                ]

#Build stack model

stack_model = StackingClassifier(
    estimators= estimator_list,
    final_estimator = LogisticRegression()#MLPClassifier(alpha = 1, max_iter = 1000)
)
print(y_train)
stack_model.fit(X_train_video, y_train)

#make predictions

y_train_pred =stack_model.predict(X_train)

y_test_pred = stack_model.predict(Y_test)

#make accuracy on testing data

stack_model_accuracy = accuracy_score(Y_test, y_test_pred)
#stack_model_accuracy_mse = mean_squared_error(Y_test, y_test_pred)
#stack_model_accuracy_RMSE = math.sqrt(accuracy_mse)
