# Firearm-related action recognition with Transformers


Installazione dei pacchetti necessari

In [None]:
!pip install av
!pip install git+https://github.com/Atze00/MoViNet-pytorch.git
!pip install -q git+https://github.com/tensorflow/docs
!pip install gdown==4.6
!pip install codecarbon

Import delle librerie utilizzate

In [None]:
import time
import torchvision
import torch.nn.functional as F
import torchvision.transforms as transforms
import torch.optim as optim
from torch.utils.data import random_split, DataLoader
import torch
from movinets import MoViNet
from movinets.config import _C
import torch.nn as nn
import shutil
from sklearn.model_selection import train_test_split
import pandas as pd
import os
import keras
from keras import layers
from keras.applications.densenet import DenseNet121
from tensorflow_docs.vis import embed
import matplotlib.pyplot as plt
import numpy as np
import imageio
import cv2
import tensorflow as tf
from google.colab import files
from tensorflow.keras.preprocessing import image
import random
from PIL import Image
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt
from codecarbon import EmissionsTracker

Scelta dei parametri:

```
IMPORTA_ELABORAZIONI = True
```
Questa scelta consente di scaricare le feature map dei vari frame, andando a risparmiare qualche minuto.
```
ADDESTRA = False
```
Questa scelta consente di scaricare i pesi della rete, in maniera tale da non dover ripetere l'addestramento.
```
SALVA_PESI = False
```
Questa scelta, se settata a True, ci consente di salvare sul drive i pesi della rete addestrata e le elaborazioni dei frame.
```
MAX_SEQ_LENGTH = 20
```
Indica il numero di frame scelti per ogni video.
```
NUM_FEATURES = 2048
```
Indica il numero delle features estratte.
```
IMG_SIZE = 224
```
Indica la dimensione dei frame.
```
EPOCHS = 30
```
Indica il numero di epoche
```
random.seed(42)
```
Qua viene impostato un seed per la libreria random.
```
RISK_CLASSIFIER = True
```
Questo valore, se impostato a True, avvierà il modello di classificazione dei rischi; mentre, se impostato a False, avvirà il modello di gun_classifier.

In [None]:
IMPORTA_ELABORAZIONI = True #Risparmio tempo importando i file .npy
ADDESTRA = False
SALVA_PESI = False
MAX_SEQ_LENGTH = 20 #Frame da campionare: ne verrà preso uno al centro del video e gli altri in maniera equa a destra e sinistra
NUM_FEATURES = 2048
IMG_SIZE = 224 #Grandezza immagini
EPOCHS = 30
random.seed(42)
RISK_CLASSIFIER = True

In [None]:
if SALVA_PESI:
  from google.colab import drive
  drive.mount('/content/drive')

In [None]:
if RISK_CLASSIFIER:
  !gdown "1Q7P8p9nJkSkABHLDYxpv1hxBWCJK8Bbj" -O dataset.zip #dataset rischio
else:
  !gdown "1CdEx22PRxaJ0V7gaZqvlpt9g39ty04n1" -O dataset.zip #dataset gun classifier
'''
risk classifier dataset:
"No_risk": 0,
"Medium_risk": 1,
"High_risk": 2

gun classifier dataset:
"No_Gun": 0,
"Handgun": 1,
"Machine_Gun": 2
'''
!unzip dataset.zip

Di seguto viene inizializzato il dataset, splittando in 70/15/15 le varie parti.

In [None]:
df = pd.read_csv('dataset/class.csv')
X = df['filename']
y = df['tag']
X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.3, random_state=42, shuffle = True, stratify = y)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42, shuffle = True, stratify = y_temp)
print(len(X_train))
print(len(y_train))
print(len(X_val))
print(len(y_val))
print(len(X_test))
print(len(y_test))
df_train = pd.DataFrame({
    'filename': X_train,
    'tag': y_train
})
df_train.to_csv('dataset/train.csv', index=False)

df_val = pd.DataFrame({
    'filename': X_val,
    'tag': y_val
})
df_val.to_csv('dataset/val.csv', index=False)

df_test = pd.DataFrame({
    'filename': X_test,
    'tag': y_test
})
df_test.to_csv('dataset/test.csv', index=False)
# Percorsi dei file CSV
train_csv_path = 'dataset/train.csv'
test_csv_path = 'dataset/test.csv'
val_csv_path = 'dataset/val.csv'
# Cartella di origine dove sono memorizzati i video
source_folder = 'dataset'

def move_videos(csv_path, destination_folder):
    # Leggi il file CSV
    if not os.path.exists(destination_folder):
        os.makedirs(destination_folder)
    df = pd.read_csv(csv_path)
    # Itera su ogni riga del DataFrame
    for filename in df['filename']:
        # Costruisci il percorso completo del file video
        video_path = os.path.join(source_folder, filename)
        # Verifica se il file esiste
        if os.path.exists(video_path):
            # Costruisci il percorso di destinazione
            destination_path = os.path.join(destination_folder, filename)
            # Sposta il file
            shutil.move(video_path, destination_path)
            print(f"Spostato {filename} a {destination_folder}")
        else:
            print(f"Il file {filename} non esiste in {source_folder}")

# Processa i file specificati nei CSV
move_videos(train_csv_path, 'dataset/train')
move_videos(test_csv_path,'dataset/test')
move_videos(val_csv_path, 'dataset/val')
shutil.move(train_csv_path, 'dataset/train')
shutil.move(test_csv_path, 'dataset/test')
shutil.move(val_csv_path, 'dataset/val')
os.remove('dataset/class.csv')

Inizializzo il modello di MoViNet, utilizzato per estrarre le features.
Rimuovo il classificatore.

In [None]:
model = MoViNet(_C.MODEL.MoViNetA0, causal = False, pretrained = True )
print("Original model head:", model.classifier)
model.classifier = nn.Sequential(
    nn.Conv3d(480, 2048, kernel_size=(1, 1, 1), stride=(1, 1, 1)),  # Mantenere l'ultimo layer convoluzionale
    nn.Identity()  # Aggiungi un layer di identità
)
print("New model head:", model.classifier)

dummy_input = torch.randn(1, 3, 16, 224, 224)  # (batch_size, channels, frames, height, width)
output = model(dummy_input)

print("Output shape:", output.shape)

Definisco le funzioni utili per l'estrazione

In [None]:
train_df = pd.read_csv("dataset/train/train.csv")
val_df = pd.read_csv("dataset/val/val.csv")
test_df = pd.read_csv("dataset/test/test.csv")

print(f"Total videos for training: {len(train_df)}")
print(f"Total videos for validation: {len(val_df)}")
print(f"Total videos for testing: {len(test_df)}")

def format_frames(frame, output_size):
  """
    Pad and resize an image from a video.

    Args:
      frame: Image that needs to resized and padded.
      output_size: Pixel size of the output frame image.

    Return:
      Formatted frame with padding of specified output size.
  """
  frame = tf.image.convert_image_dtype(frame, tf.float32)
  frame = tf.image.resize_with_pad(frame, *output_size)
  return frame

# estrazione dei frame dal video
def frames_from_video_file(video_path, n_frames, output_size = (224,224)):
  """
    Creates frames from each video file present for each category.

    Args:
      video_path: File path to the video.
      n_frames: Number of frames to be created per video file.
      output_size: Pixel size of the output frame image.

    Return:
      An NumPy array of frames in the shape of (n_frames, height, width, channels).
  """
  # Read each video frame by frame
  result = []
  src = cv2.VideoCapture(str(video_path))

  video_length = src.get(cv2.CAP_PROP_FRAME_COUNT)

  if video_length > 300:
    frame_step = 15
  else:
    frame_step = int(video_length/n_frames) - 1

  need_length = 1 + (n_frames - 1) * frame_step

  if need_length > video_length:
    start = 0
  else:
    max_start = video_length - need_length
    start = random.randint(0, max_start + 1)

  src.set(cv2.CAP_PROP_POS_FRAMES, start)
  # ret is a boolean indicating whether read was successful, frame is the image itself
  ret, frame = src.read()
  result.append(format_frames(frame, output_size))

  for _ in range(n_frames - 1):
    for _ in range(frame_step):
      ret, frame = src.read()
    if ret:
      frame = format_frames(frame, output_size)
      result.append(frame)
    else:
      result.append(np.zeros_like(result[0]))
  src.release()
  result = np.array(result)[..., [2, 1, 0]]

  return result

# Label preprocessing with StringLookup.
label_processor = keras.layers.StringLookup(
    num_oov_indices=0, vocabulary=np.unique(train_df["tag"].astype(str)), mask_token=None
)
print(label_processor.get_vocabulary())

def prepare_all_videos(df, root_dir):
    num_samples = len(df)
    video_paths = df["filename"].values.tolist()
    totale = len(video_paths)
    labels = df["tag"].astype(str).values
    labels = label_processor(labels[..., None]).numpy()
    frame_features = np.zeros(
        shape=(num_samples, MAX_SEQ_LENGTH, NUM_FEATURES), dtype="float32"
    )

    for idx, path in enumerate(video_paths):
        progressivo = idx + 1
        print(f"{progressivo}/{totale}: {path}")
        # Create a directory for the current video frames if it doesn't exist

        # Gather all its frames and add a batch dimension.
        frames = frames_from_video_file(os.path.join(root_dir, path), MAX_SEQ_LENGTH, (IMG_SIZE, IMG_SIZE))
         # Risolvi la dimensione del batch e dimensione dei canali
        frames = np.transpose(frames, (3, 0, 1, 2))  # Da (frames, height, width, channels) a (channels, frames, height, width)
        frames = frames[None, ...]  # Aggiungi la dimensione del batch: (1, channels, frames, height, width)
        frames = torch.tensor(frames, dtype=torch.float32)  # Converti in tensor

        # Estrai le feature dai frame del video
        temp_frame_features = np.zeros((MAX_SEQ_LENGTH, NUM_FEATURES), dtype="float32")
        with torch.no_grad():
            for i in range(min(MAX_SEQ_LENGTH, frames.shape[2])):
                frame = frames[:, :, i, :, :].unsqueeze(2)  # Prendi un singolo frame mantenendo le dimensioni
                features = model(frame)
                temp_frame_features[i] = features.squeeze().numpy()

        frame_features[idx] = temp_frame_features


    return frame_features, labels


A seconda della mia scelta, estraggo i frame e ne carico i dati sul drive oppure scarico i file già pronti.
Siccome per la feature_extraction utilizzo una rete neurale, tengo conto dei consumi effettuati.

In [None]:
if IMPORTA_ELABORAZIONI:
  if RISK_CLASSIFIER:
    !gdown "1oR0M8H259uHW_lAUq9pE22LajOpQvhHv" -O "transformers.zip"
  else:
    !gdown "1xyBO0CRA4myRcTPQs_tZBMlX729mucG3" -O "transformers.zip"
  !unzip transformers.zip
  train_data = np.load("train_data.npy")
  train_labels = np.load("train_labels.npy")
  val_data = np.load("val_data.npy")
  val_labels = np.load("val_labels.npy")
  test_data = np.load("test_data.npy")
  test_labels = np.load("test_labels.npy")
else:
  tracker = EmissionsTracker()
  tracker.start()
  train_data, train_labels = prepare_all_videos(train_df, "dataset/train")
  test_data, test_labels = prepare_all_videos(test_df, "dataset/test")
  val_data, val_labels = prepare_all_videos(val_df, "dataset/val")
  emission: float = tracker.stop()
  print(f"Emission: {emission}")
  if SALVA_PESI:
    np.save("train_data.npy", train_data)
    np.save("train_labels.npy", train_labels)
    np.save("val_data.npy", val_data)
    np.save("val_labels.npy", val_labels)
    np.save("test_data.npy", test_data)
    np.save("test_labels.npy", test_labels)
    !zip -r transformersElabDS2.zip train_data.npy train_labels.npy test_data.npy test_labels.npy val_data.npy val_labels.npy
    shutil.copy("transformersElabDS2.zip", "/content/drive/My Drive/transformers/transformersElabDS2.zip")

La classe PositionalEmbedding crea un livello che aggiunge informazioni posizionali agli input. Questo è fondamentale per i Transformer perché, diversamente dalle reti neurali ricorrenti (RNN), i Transformer non hanno una nozione intrinseca dell'ordine degli elementi nella sequenza. L'aggiunta degli embedding posizionali permette al modello di tener conto della posizione dei frame nel video, migliorando la capacità del modello di apprendere le dipendenze temporali.

In [None]:
class PositionalEmbedding(layers.Layer):
    def __init__(self, sequence_length, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=output_dim
        )
        self.sequence_length = sequence_length
        self.output_dim = output_dim

    def build(self, input_shape):
        self.position_embeddings.build(input_shape)

    def call(self, inputs):
        # The inputs are of shape: `(batch_size, frames, num_features)`
        inputs = tf.cast(inputs, self.compute_dtype)
        length = tf.shape(inputs)[1]
        positions = tf.range(start=0, limit=length, delta=1)
        embedded_positions = self.position_embeddings(positions)
        return inputs + embedded_positions

La classe TransformerEncoder rappresenta un singolo strato di codifica dell'architettura Transformer. Questo strato combina meccanismi di attenzione multi-testa con una rete neurale feed-forward, supportata da tecniche di normalizzazione e connessioni residuali per facilitare l'apprendimento di rappresentazioni ricche e contestuali. Ecco una descrizione dettagliata delle componenti e delle operazioni eseguite da questa classe.
Il meccanismo di attenzione multi-testa permette al modello di focalizzarsi su diverse parti dell'input simultaneamente e di catturare vari tipi di relazioni e dipendenze a lungo raggio.

In [None]:
class TransformerEncoder(layers.Layer):
    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim, dropout=0.3
        )
        self.dense_proj = keras.Sequential(
            [
                layers.Dense(dense_dim, activation=keras.activations.gelu),
                layers.Dense(embed_dim),
            ]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()

    def call(self, inputs, mask=None):
        attention_output = self.attention(inputs, inputs, attention_mask=mask)
        proj_input = self.layernorm_1(inputs + attention_output)
        proj_output = self.dense_proj(proj_input)
        return self.layernorm_2(proj_input + proj_output)


Questo codice costruisce un modello di classificazione video utilizzando un Transformer Encoder per elaborare sequenze di frame. Il modello incorpora informazioni posizionali, cattura dipendenze temporali tra i frame, riduce la dimensionalità con global max pooling, e infine usa un layer denso con attivazione softmax per produrre le probabilità delle classi. Il modello è poi compilato con l'ottimizzatore Adam e una funzione di perdita adatta per classificazione multi-classe.

In [None]:
def get_compiled_model(shape):
    sequence_length = MAX_SEQ_LENGTH
    embed_dim = NUM_FEATURES
    dense_dim = 4
    num_heads = 1
    classes = len(label_processor.get_vocabulary())
    inputs = keras.Input(shape=shape)
    x = PositionalEmbedding(
        sequence_length, embed_dim, name="frame_position_embedding"
    )(inputs)
    x = TransformerEncoder(embed_dim, dense_dim, num_heads, name="transformer_layer")(x)
    x = layers.GlobalMaxPooling1D()(x)
    x = layers.Dropout(0.5)(x)
    outputs = layers.Dense(classes, activation="softmax")(x)
    model = keras.Model(inputs, outputs)

    model.compile(
        optimizer="adam",
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"],
    )
    return model


def run_experiment():
    filepath = "/tmp/video_classifier.weights.h5"
    checkpoint = keras.callbacks.ModelCheckpoint(
        filepath, save_weights_only=True, save_best_only=True, verbose=1
    )
    model = get_compiled_model(train_data.shape[1:])
    if ADDESTRA:
      tracker = EmissionsTracker()
      tracker.start()
      history = model.fit(
        train_data,
        train_labels,
        validation_data=(val_data, val_labels),
        epochs=EPOCHS,
        callbacks=[checkpoint],
        batch_size = 16,
    )
      emission: float = tracker.stop()
      print(f"Emission: {emission}")
      if SALVA_PESI: shutil.copy(filepath, "/content/drive/My Drive/transformers/video_gun_classifier.weights.h5")
    else:
      if RISK_CLASSIFIER:
        !gdown "1-E7XEIQkobqxHxnZKa7J4HC4ob_1Zdc1" -O "video_classifier.weights.h5"
      else:
        !gdown "1mfKophETzw_10_ByPClCDfaviXVZ701t" -O "video_classifier.weights.h5"
      model.load_weights("video_classifier.weights.h5")
    return model

In [None]:
trained_model = run_experiment()

Valuto il modello andando a stampare le metriche più importanti.

In [None]:
def evaluate_model(model, test_data, test_labels):
    # Predict the labels for the test set
    predictions = model.predict(test_data)
    predicted_labels = np.argmax(predictions, axis=1)

    # Calculate accuracy
    accuracy = accuracy_score(test_labels, predicted_labels)
    print(f"Test Accuracy: {accuracy}")

    # Print classification report
    print(classification_report(test_labels, predicted_labels))

    # Compute confusion matrix
    cm = confusion_matrix(test_labels, predicted_labels)

    # Plot confusion matrix
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt="d", cmap="Blues")
    plt.xlabel('Predicted Labels')
    plt.ylabel('True Labels')
    plt.show()

In [None]:
evaluate_model(trained_model, test_data, test_labels)