# Data Ingestion


In [2]:

!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
# formality 1

In [3]:
!kaggle datasets download -d mohamedmustafa/real-life-violence-situations-dataset


Dataset URL: https://www.kaggle.com/datasets/mohamedmustafa/real-life-violence-situations-dataset
License(s): copyright-authors
Downloading real-life-violence-situations-dataset.zip to /content
100% 3.58G/3.58G [00:50<00:00, 76.9MB/s]
100% 3.58G/3.58G [00:50<00:00, 75.7MB/s]


In [4]:
import tqdm
import collections
import os
import numpy as np
import tensorflow as tf
import keras
from keras import layers

In [9]:
import zipfile


In [57]:

import zipfile
zip_ref = zipfile.ZipFile('/content/dataset.zip', 'r')
zip_ref.extractall('/content/data')
zip_ref.close()



In [58]:
def get_class(url):
  classes=os.listdir(url)
  return classes

In [59]:
import os

In [67]:

def get_files_with_class(url):
  classes=get_class(url)
  files=collections.defaultdict(list)
  for class_ in classes:
    paths=os.path.join(url,class_)
    files[class_].extend(os.listdir(paths))
  return files



In [68]:
dataset_url="/content/data/Real Life Violence Dataset"

In [69]:
files=get_files_with_class(dataset_url)

In [70]:
len(files) # 2 CLasses : 1 violence and 1 non violence

2

In [71]:
len(files['NonViolence']) # 1000 videos each

1000

# Preprocessing

In [72]:
import random
import shutil  # to move files

In [73]:
files.keys()

dict_keys(['Violence', 'NonViolence'])

In [74]:
def divide_data(files,source,split_ratio):
  classes=os.listdir(source)
  print(classes)
  for class_name in classes:
    print(class_name)
    #getting videos from path
    class_path = os.path.join(source,str(class_name))
    videos=[f for f in os.listdir(class_path) if f.endswith('.mp4')]
    random.shuffle(videos)
    # setting the split
    split_point=int(len(videos)*split_ratio)
    train_videos=videos[:split_point]
    test_videos=videos[split_point:]

    # passing from each file
    for phase,file_list in [("train",train_videos),("test",test_videos)]:
      destination=os.path.join(source,phase,class_name)
      os.makedirs(destination,exist_ok=True)

      for vid in file_list:
        src=os.path.join(class_path,vid)
        dest=os.path.join(destination,vid)
        shutil.move(src,dest)



In [76]:
divide_data(files,'/content/data/Real Life Violence Dataset',0.5)

['Violence', 'NonViolence']
Violence
NonViolence


In [122]:
len(os.listdir("/content/data/Real Life Violence Dataset/test/Violence"))

500

In [81]:
shutil.rmtree("/content/data/real life violence situations")

#  ---

In [123]:
import cv2

In [82]:
def format_frames(frame,output_size):
  #setting frames
  #padding and resizing
  #output size
  frame=tf.image.convert_image_dtype(frame,tf.float32)
  frame=tf.image.resize_with_pad(frame,*output_size)
  return frame

In [128]:
def frames_from_video_file(video_path, n_frames, output_size = (224,224), frame_step = 15):
  """
    Creates frames from each video file present for each category.

    Args:
      video_path: File path to the video.
      n_frames: Number of frames to be created per video file.
      output_size: Pixel size of the output frame image.

    Return:
      An NumPy array of frames in the shape of (n_frames, height, width, channels).
  """
  # Read each video frame by frame
  result = []
  src = cv2.VideoCapture(str(video_path))

  video_length = src.get(cv2.CAP_PROP_FRAME_COUNT)

  need_length = 1 + (n_frames - 1) * frame_step

  if need_length > video_length:
    start = 0
  else:
    max_start = video_length - need_length
    start = random.randint(0, max_start + 1)

  src.set(cv2.CAP_PROP_POS_FRAMES, start)
  # ret is a boolean indicating whether read was successful, frame is the image itself
  ret, frame = src.read()
  result.append(format_frames(frame, output_size))

  for _ in range(n_frames - 1):
    for _ in range(frame_step):
      ret, frame = src.read()
    if ret:
      frame = format_frames(frame, output_size)
      result.append(frame)
    else:
      result.append(np.zeros_like(result[0]))
  src.release()
  result = np.array(result)[..., [2, 1, 0]]

  return result

# Frame generator

In [129]:
class FrameGenerator:
  def __init__(self,path,n_frames,training=False):
    self.path = path
    self.n_frames = n_frames
    self.training = training
    self.class_names = sorted(set(p.name for p in self.path.iterdir() if p.is_dir()))
    self.class_ids_for_name = dict((name, idx) for idx, name in enumerate(self.class_names))

  def get_files_and_class_names(self):
    video_paths=list(self.path.glob('*/*.mp4'))
    classes=[p.parent.name for p in video_paths]
    return video_paths,classes


  def __call__(self):
    video_paths, classes = self.get_files_and_class_names()

    pairs = list(zip(video_paths, classes))

    if self.training:
      random.shuffle(pairs)

    for path, name in pairs:
      video_frames = frames_from_video_file(path, self.n_frames)
      label = self.class_ids_for_name[name] # Encode labels
      yield video_frames, label

In [130]:
from pathlib import Path
# Create the training set
output_signature = (
    tf.TensorSpec(shape=(10, 224, 224, 3), dtype=tf.float32),
    tf.TensorSpec(shape=(),                  dtype=tf.int32)
)
fg_train = FrameGenerator(Path('/content/data/Real Life Violence Dataset/train'), n_frames=10, training=True)

train_ds = tf.data.Dataset.from_generator(fg_train, output_signature=output_signature)


In [131]:
for frames,labels in train_ds.take(10):
  print(labels)

tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(1, shape=(), dtype=int32)
tf.Tensor(1, shape=(), dtype=int32)
tf.Tensor(1, shape=(), dtype=int32)
tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(1, shape=(), dtype=int32)
tf.Tensor(1, shape=(), dtype=int32)
tf.Tensor(1, shape=(), dtype=int32)
tf.Tensor(1, shape=(), dtype=int32)
tf.Tensor(0, shape=(), dtype=int32)


In [132]:
#validation dataset

output_signature = (
    tf.TensorSpec(shape=(10, 224, 224, 3), dtype=tf.float32),
    tf.TensorSpec(shape=(),                  dtype=tf.int32)
)

fg_val = FrameGenerator(Path('/content/data/Real Life Violence Dataset/test'), n_frames=10, training=False)

val_ds = tf.data.Dataset.from_generator(fg_val, output_signature=output_signature)


In [133]:

train_ds = train_ds.batch(2)
val_ds = val_ds.batch(2)

# Model Creation

In [143]:
# TRansfer Learning:
    #fine tuning

In [144]:
import tensorflow
from tensorflow.keras import layers

In [145]:
models=[
    tensorflow.keras.applications.EfficientNetB0,
    tensorflow.keras.applications.MobileNetV2

]

In [146]:
shape=(224,224,3)

In [161]:
#trying without fine tuning

for model_fn in models:

  base_model=model_fn(include_top=False,pooling='avg',input_shape=(224,224,3))
  base_model.trainable=False
  print(f" Using base model: {model_fn.__name__}")
  model =tf.keras.Sequential([
    tf.keras.layers.Input(shape=(10, 224, 224, 3)),
    tf.keras.layers.Rescaling(scale=1./255),
    tf.keras.layers.TimeDistributed(base_model ),
    tf.keras.layers.GlobalAveragePooling1D(),
    tf.keras.layers.Dense(2)


  ])
  model.compile(optimizer='adam',loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),metrics=['accuracy'])
  model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=3,
    steps_per_epoch=10,
    validation_steps=2,
)


 Using base model: EfficientNetB0
Epoch 1/3
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m172s[0m 6s/step - accuracy: 0.5339 - loss: 0.7052 - val_accuracy: 0.0000e+00 - val_loss: 0.9165
Epoch 2/3
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m23s[0m 2s/step - accuracy: 0.6666 - loss: 0.6554 - val_accuracy: 1.0000 - val_loss: 0.1752
Epoch 3/3
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m21s[0m 2s/step - accuracy: 0.6574 - loss: 0.7740 - val_accuracy: 1.0000 - val_loss: 0.2952
 Using base model: MobileNetV2
Epoch 1/3
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m89s[0m 3s/step - accuracy: 0.5466 - loss: 0.7462 - val_accuracy: 1.0000 - val_loss: 0.2078
Epoch 2/3
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m20s[0m 2s/step - accuracy: 0.4537 - loss: 1.0649 - val_accuracy: 1.0000 - val_loss: 0.2273
Epoch 3/3
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m18s[0m 2s/step - accuracy: 0.4425 - loss: 0.9342 - val_accuracy: 0.

In [163]:
import tensorflow as tf
import numpy as np
from pathlib import Path
print("model : ",model_fn.__name__)
# Path to  single test video
test_video_path = Path("/content/data/Real Life Violence Dataset/test/NonViolence/NV_101.mp4")

# 1. Extract frames from video
frames = frames_from_video_file(test_video_path, n_frames=10)  # shape: (10, 224, 224, 3)

# 2. Add a batch dimension: (1, 10, 224, 224, 3)
frames = tf.expand_dims(frames, axis=0)

# 3. Pass it to your model
pred = model(frames)  # shape: (1, num_classes)

# 4. Get predicted label
pred_label = tf.argmax(pred, axis=-1).numpy()[0]
print("Predicted class ID:", pred_label)

model :  MobileNetV2
Predicted class ID: 0


In [None]:

#0: non violence