In [None]:
# Download the fight / noFight surveillance clips; later cells expect the
# repository to be cloned under /content/.
! git clone https://github.com/seymanurakti/fight-detection-surv-dataset.git

In [None]:
import numpy as np
import cv2
import tensorflow as tf
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras.applications import VGG16
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
import matplotlib.pyplot as plt
from google.colab import drive


In [None]:
# Both class folders live inside the cloned dataset repository: 'fight' holds
# the fight clips, 'noFight' the non-fight clips.
fight_data_path, non_fight_data_path = (
    '/content/fight-detection-surv-dataset/' + class_dir
    for class_dir in ('fight', 'noFight')
)

### Frame extraction

In [None]:
import cv2

def extract_frames(video_path, num_frames):
    """Sample ``num_frames`` frames spread evenly across a video file.

    Args:
        video_path: Path of the video, opened with ``cv2.VideoCapture``.
        num_frames: Number of frames to sample.

    Returns:
        List of frames exactly as returned by ``cap.read()``, in playback
        order. The list is empty when the video cannot be opened or
        ``num_frames`` is not positive, and shorter than ``num_frames`` when
        individual reads fail (each failure is reported with a message).
    """
    frames = []
    if num_frames <= 0:
        # Nothing was requested; this also avoids dividing by zero below.
        return frames

    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            print(f"Error: Unable to open video file {video_path}")
            return frames

        # Clamp so that a non-positive reported frame count cannot produce
        # negative seek positions.
        total_frames = max(int(cap.get(cv2.CAP_PROP_FRAME_COUNT)), 0)

        for count in range(num_frames):
            # Multiply before dividing. A pre-truncated step
            # (total_frames // num_frames) collapses to 0 for clips shorter
            # than num_frames, which would return the first frame every time,
            # and it never reaches the tail of longer clips.
            frame_number = count * total_frames // num_frames
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
            ret, frame = cap.read()
            if not ret:
                print(f"Error: Unable to read frame {frame_number}")
                continue
            frames.append(frame)
    finally:
        # Release the capture even if seeking or reading raises.
        cap.release()

    return frames

# Example usage
# video_path = "path_to_your_video.mp4"
# num_frames = 10

# frames = extract_frames(video_path, num_frames)
# print(f"Total frames extracted: {len(frames)}")


In [None]:
# Smoke test: sample 10 frames from a single fight clip.
frames=extract_frames('/content/fight-detection-surv-dataset/fight/fi010.mp4',10)

In [None]:
import glob

In [None]:
# File names (relative to each class folder) of every .mp4 clip.
# These are sorted lists rather than one-shot iglob iterators, so the loading
# cells can be re-run without silently producing empty results, and the clip
# order is the same on every run.
fight_video_data_path = sorted(glob.glob('*.mp4', root_dir=fight_data_path))
non_fight_video_data_path = sorted(glob.glob('*.mp4', root_dir=non_fight_data_path))

In [None]:
# Sample 10 frames from every fight clip. A clip that cannot be opened, or
# that yields fewer than 10 frames, is skipped: keeping it would give that
# video a different sequence length from the rest and produce ragged feature
# arrays further down.
fight_frames=[]
for path in fight_video_data_path:
    video_path=fight_data_path + '/' + path
    video_frames=extract_frames(video_path,10)
    if len(video_frames) != 10:
        print(f"Skipping {video_path}: extracted {len(video_frames)} of 10 frames")
        continue
    fight_frames.append(video_frames)

In [None]:
# Sample 10 frames from every non-fight clip. A clip that cannot be opened, or
# that yields fewer than 10 frames, is skipped: keeping it would give that
# video a different sequence length from the rest and produce ragged feature
# arrays further down.
non_fight_frames=[]
for path in non_fight_video_data_path:
    video_path=non_fight_data_path + '/' + path
    video_frames=extract_frames(video_path,10)
    if len(video_frames) != 10:
        print(f"Skipping {video_path}: extracted {len(video_frames)} of 10 frames")
        continue
    non_fight_frames.append(video_frames)

### Resizing

In [None]:
import numpy as np

In [None]:
# Peek at the first frame of one clip to see the native frame shape before
# any resizing.
cap=cv2.VideoCapture('/content/fight-detection-surv-dataset/fight/fi003.mp4')
try:
    frame_number = 0
    cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
    ret, frame = cap.read()
finally:
    # Always hand the file handle back, even if the read raises.
    cap.release()

if ret:
    print(frame.shape)
else:
    # frame is None when the read fails; report it instead of raising
    # AttributeError on frame.shape.
    print(f"Error: Unable to read frame {frame_number}")

In [None]:
def resize_frame(frame, target_size, interpolation=cv2.INTER_CUBIC):
    """Return ``frame`` resized to ``target_size`` with ``cv2.resize``.

    Args:
        frame: Image array to resize.
        target_size: Output size, passed to OpenCV as ``dsize``.
        interpolation: OpenCV interpolation flag; bicubic by default.
    """
    resized = cv2.resize(frame, dsize=target_size, interpolation=interpolation)
    return resized

In [None]:
def resize_video(list_of_frames, target_size):
    """Resize every frame of one video and return the results as a new list.

    Args:
        list_of_frames: Iterable of frames belonging to a single video.
        target_size: Size forwarded unchanged to ``resize_frame``.

    Returns:
        List of resized frames in the same order as the input.
    """
    return [resize_frame(single_frame, target_size) for single_frame in list_of_frames]

In [None]:
# Resize every sampled fight frame to 224x224, the input size the VGG16
# feature extractor is built with. One inner list per video.
target_size=(224,224)
fight_resize=[resize_video(frame_list,target_size) for frame_list in fight_frames]

In [None]:
# Spot-check: shape of the first resized frame of the tenth fight video.
fight_resize[9][0].shape

In [None]:
# Resize every sampled non-fight frame to 224x224, the input size the VGG16
# feature extractor is built with. One inner list per video.
target_size=(224,224)
non_fight_resize=[resize_video(frame_list,target_size) for frame_list in non_fight_frames]

In [None]:
# Spot-check: shape of the first resized frame of the fourth non-fight video.
non_fight_resize[3][0].shape

In [None]:
import numpy as np
import albumentations as A

In [None]:
# Function to augment a single frame
def augment_frame(frame,augmentation_pipeline):
    """Run one frame through ``augmentation_pipeline`` and return the image.

    The pipeline is called with the frame as its ``image`` keyword argument,
    and the ``'image'`` entry of the mapping it returns is handed back.
    """
    return augmentation_pipeline(image=frame)['image']

In [None]:
def augment_frames(frames_list, num_augmented_per_frame):
    """Return every frame followed by ``num_augmented_per_frame`` random variants.

    Args:
        frames_list: Frames (image arrays) of one video.
        num_augmented_per_frame: Number of augmented copies to create per frame.

    Returns:
        Flat list ordered ``[frame0, aug, ..., frame1, aug, ...]`` with
        ``len(frames_list) * (1 + num_augmented_per_frame)`` items. Each
        augmented copy is resized back to the height and width of the frame
        it came from, so all frames derived from one input share its size.
    """
    augmentation_pipeline = A.Compose([
        A.HorizontalFlip(p=0.4),                     # mirror left/right, 40% of the time
        A.RandomBrightnessContrast(p=0.48),          # brightness/contrast jitter, 48% of the time
        A.GaussianBlur(blur_limit=(1, 3), p=0.25),   # light blur, 25% of the time
        A.RandomScale(scale_limit=(-0.15, 0.15), p=0.3),  # rescale by up to +/-15%, 30% of the time
        A.ShiftScaleRotate(shift_limit=0.1, scale_limit=0.1, rotate_limit=20, p=0.3),  # small shift/scale/rotation, 30% of the time
        # Always crop the central 171x175 region. A probability has to lie in
        # [0, 1]; the previous value of 4 is out of range and may be rejected
        # by albumentations releases that validate it.
        A.CenterCrop(height=171, width=175, p=1.0),
    ])

    augmented_frames = []
    for frame in frames_list:
        # Keep the untouched frame first, then its augmented copies.
        augmented_frames.append(frame)
        height, width = frame.shape[:2]
        for _ in range(num_augmented_per_frame):
            augmented_frame = augment_frame(frame, augmentation_pipeline)
            if augmented_frame.shape[:2] != (height, width):
                # The scale and crop steps change the spatial size. Restore the
                # source frame's size so originals and copies can go through
                # the same fixed-size model input. cv2 takes (width, height).
                augmented_frame = cv2.resize(augmented_frame, (width, height))
            augmented_frames.append(augmented_frame)

    return augmented_frames

# Example usage of augment_frames function
# augmented_frames = augment_frames(frames_list, num_augmented_per_frame)


In [None]:
# For every fight video, keep each resized frame and add 4 augmented copies
# of it. One inner list per video.
fight_augmented=[augment_frames(frame_list,4) for frame_list in fight_resize]

In [None]:
import matplotlib.pyplot as plt

def show_image_matplotlib(frame_array):
    """Display one BGR frame with matplotlib, converted to RGB, axes hidden."""
    plt.imshow(cv2.cvtColor(frame_array, cv2.COLOR_BGR2RGB))
    plt.axis('off')
    plt.show()


In [None]:
# Visual sanity check on the fourth fight video. With 4 augmented copies per
# frame, index 4 is the last augmented copy of that video's first frame.
show_image_matplotlib(fight_augmented[3][4])

In [None]:
# For every non-fight video, keep each resized frame and add 4 augmented
# copies of it. One inner list per video.
non_fight_augmented=[augment_frames(frame_list,4) for frame_list in non_fight_resize]

### PROCESSING DATA FOR VGG INPUTS AND TRAIN TEST SPLIT

In [None]:
import random

# Pair every video (its list of original + augmented frames) with a class
# label: fight_augmented gets label 1 and non_fight_augmented gets label 0.
combined_list = fight_augmented + non_fight_augmented
labels = [1] * len(fight_augmented) + [0] * len(non_fight_augmented)

combined_with_labels = list(zip(combined_list, labels))

# Shuffle with a dedicated, seeded generator. The test set is sliced from the
# front of this shuffled list, so an unseeded shuffle would change the
# held-out videos (and the reported accuracy) on every run. A local Random
# instance leaves the global random state untouched.
random.Random(42).shuffle(combined_with_labels)

# Split the pairs back into parallel sequences. Comprehensions, unlike
# zip(*pairs), also work when there are no videos at all.
shuffled_combined_list = tuple(video for video, _ in combined_with_labels)
shuffled_labels = tuple(label for _, label in combined_with_labels)

# Convert back to lists
input_data = list(shuffled_combined_list)
input_labels = list(shuffled_labels)

In [None]:
# Spot-check: shape of the first frame of the first shuffled video.
input_data[0][0].shape

In [None]:
# Hold out the first `num_test_samples` shuffled videos for testing; everything
# after them is used for training. Data and labels are cut at the same index
# so they stay aligned.
num_test_samples = 25

test_df, train_df = input_data[:num_test_samples], input_data[num_test_samples:]
test_labels, train_labels = input_labels[:num_test_samples], input_labels[num_test_samples:]

In [None]:
# Report how many fight (1) and non-fight (0) videos landed in each split.
for split_name, split_labels in (('train', train_labels), ('test', test_labels)):
    for class_id in (1, 0):
        print(f'{split_name}-{class_id} ', split_labels.count(class_id))
#no oversampling

### Model building

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.applications import VGG16
from tensorflow.keras.layers import Input, Dense, LSTM, Bidirectional, Dropout, GlobalMaxPooling2D, GlobalMaxPooling1D, Masking, Attention, Concatenate
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint

In [None]:
# Constants
num_frames_per_video = 50  # Each video has 50 frames
num_augmentations_per_frame = 4  # Assuming 4 augmentations per frame
frame_height = 224
frame_width = 224
channels = 3  # Assuming RGB channels
num_classes = 2  # Number of output classes (fight and non-fight)
batch_size = 8

### VGG MODEL

In [None]:
def build_vgg16_feature_extractor(input_shape=(224, 224, 3)):
    """Build a frozen VGG16 backbone followed by global max pooling.

    The ImageNet-pretrained convolutional base (no classifier head) is marked
    non-trainable and wrapped in a model that takes one frame-shaped input
    and returns the globally max-pooled backbone output.

    NOTE(review): callers pass frames to this model without applying
    VGG16's ``preprocess_input``; confirm that is intended.

    Args:
        input_shape: Shape of a single input frame.

    Returns:
        A Keras ``Model`` mapping frames to pooled feature vectors.
    """
    backbone = VGG16(weights='imagenet', include_top=False, input_shape=input_shape)
    backbone.trainable = False

    frame_input = Input(shape=input_shape)
    pooled_features = GlobalMaxPooling2D()(backbone(frame_input))
    return Model(inputs=frame_input, outputs=pooled_features)

In [None]:
def extract_features_from_frames_training(video, model, num_frames_per_segment=5):
    """Turn one video's frames into a sequence of averaged feature vectors.

    Consecutive frames are grouped into segments of ``num_frames_per_segment``.
    Every frame of a segment is passed through ``model`` on its own, and the
    resulting feature arrays are averaged into one array per segment.

    Args:
        video: Sequence of frames for a single video.
        model: Object with a Keras-style ``predict`` method.
        num_frames_per_segment: Frames per segment. The default of 5 matches
            one original frame followed by its 4 augmented copies.

    Returns:
        List with one averaged feature array per segment; each array keeps the
        leading batch axis of size 1 produced by ``predict``. Trailing frames
        that do not fill a whole segment are ignored.

    Raises:
        ValueError: If ``num_frames_per_segment`` is not positive.
    """
    if num_frames_per_segment <= 0:
        raise ValueError("num_frames_per_segment must be a positive integer")

    video_features = []
    num_segments = len(video) // num_frames_per_segment

    for segment_index in range(num_segments):
        start = segment_index * num_frames_per_segment
        # Frames go through the model one at a time (a batch of size 1 each).
        # verbose=0 stops predict from printing a progress bar per frame.
        segment_features = [
            model.predict(np.expand_dims(frame, axis=0), verbose=0)
            for frame in video[start:start + num_frames_per_segment]
        ]
        # Average (not concatenate) the per-frame features of the segment.
        video_features.append(np.mean(segment_features, axis=0))

    return video_features

In [None]:
# EXTRACTING FEATURES FOR EVERY VIDEO
def df_feature_extractor(input_data):
    """Extract segment features for every video in ``input_data``.

    A fresh VGG16 feature extractor is built once per call and reused for all
    videos.

    Args:
        input_data: Iterable of videos, each a sequence of frames.

    Returns:
        List with one entry per video, each being the list of segment
        features returned by ``extract_features_from_frames_training``.
    """
    frame_encoder = build_vgg16_feature_extractor()
    return [
        extract_features_from_frames_training(single_video, frame_encoder)
        for single_video in input_data
    ]
# train_data = df_feature_extractor(train_df)
# print('train data features extracted')
# test_data = df_feature_extractor(test_df)
# print('test data features extracted')

In [None]:
# Extract per-segment VGG16 features for every training video.
train_data=df_feature_extractor(train_df)

In [None]:
# Extract per-segment VGG16 features for every held-out test video.
test_data=df_feature_extractor(test_df)

In [None]:
# Stack the per-video feature lists into arrays and drop the singleton batch
# axis left by the per-frame predict calls:
# (num_videos, 10, 1, 512) -> (num_videos, 10, 512).
train_data_np = np.squeeze(np.array(train_data), axis=2)
test_data_np = np.squeeze(np.array(test_data), axis=2)

### LSTM MODEL

In [None]:
def build_lstm_model(input_shape):
    """Build and compile the sequence classifier that sits on top of the frame features.

    Layer stack: bidirectional LSTM (256 units, full sequence returned),
    self-attention over the LSTM outputs, two ReLU dense layers (1024 and 50
    units), global max pooling over the time axis, and a softmax layer with
    ``num_classes`` outputs. Dropout of 0.5 follows the LSTM and each dense
    layer.

    Args:
        input_shape: Shape of one input sequence, without the batch axis.

    Returns:
        Keras ``Model`` compiled with Adam, sparse categorical cross-entropy
        and an accuracy metric.
    """
    sequence_input = Input(shape=input_shape)

    encoded = Bidirectional(LSTM(256, return_sequences=True))(sequence_input)
    encoded = Dropout(0.5)(encoded)

    # Self-attention: the encoded sequence is both query and value.
    attended = Attention()([encoded, encoded])

    hidden = Dense(1024, activation='relu')(attended)
    hidden = Dropout(0.5)(hidden)
    hidden = Dense(50, activation='relu')(hidden)
    hidden = Dropout(0.5)(hidden)

    # Collapse the time axis only after the dense layers.
    pooled = GlobalMaxPooling1D()(hidden)
    class_probabilities = Dense(num_classes, activation='softmax')(pooled)

    classifier = Model(inputs=sequence_input, outputs=class_probabilities)
    classifier.compile(optimizer=Adam(), loss='sparse_categorical_crossentropy', metrics=['accuracy'])
    return classifier

In [None]:
def model_training():
    """Train the sequence classifier and report its accuracy on the test set.

    Reads the notebook-level ``train_data_np``, ``train_labels``,
    ``test_data_np`` and ``test_labels``. The weights of the best epoch (as
    judged by the checkpoint callback on the validation split) are written to
    disk, reloaded after training, and used for the final evaluation.

    Returns:
        Tuple ``(history, seq_model)``: the object returned by ``fit`` and the
        trained model with the best checkpoint loaded.
    """
    # Weights-only checkpoints must end in ".weights.h5" under Keras 3; older
    # tf.keras versions accept the same name and write HDF5 weights.
    filepath = "/content/model.weights.h5"
    checkpoint = ModelCheckpoint(
        filepath, save_weights_only=True, save_best_only=True, verbose=1
    )

    train_labels_array = np.array(train_labels)
    test_labels_array = np.array(test_labels)

    # (10, 512): 10 time steps per video, each a 512-value feature vector
    # averaged over the 5 frames of one segment.
    seq_model = build_lstm_model((10, 512))
    history = seq_model.fit(
        train_data_np,
        train_labels_array,
        validation_split=0.2,
        epochs=30,
        batch_size=11,
        callbacks=[checkpoint],
    )

    # Evaluate with the best saved weights rather than the last epoch's.
    seq_model.load_weights(filepath)
    _, accuracy = seq_model.evaluate(test_data_np, test_labels_array)
    print(f"Test accuracy: {round(accuracy * 100, 2)}%")

    return history, seq_model

In [None]:
# Run training and evaluation; run_exp is the (history, seq_model) tuple
# returned by model_training.
run_exp=model_training()