In [None]:
from google.colab import drive
drive.mount('/content/drive/')



In [None]:
import os
import cv2
import numpy as np
np.random.seed(1337) 
import tensorflow
from tensorflow import keras
from tensorflow.keras import applications
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.optimizers import SGD
from tensorflow.keras import Model, Sequential
import tensorflow.keras.layers
from tensorflow.keras.layers import Dropout, Dense, Flatten
from tensorflow.keras.layers import ConvLSTM2D
from sklearn.model_selection import train_test_split
import tensorflow.keras.metrics as km
from sklearn.metrics import classification_report
from sklearn.metrics import accuracy_score
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
from sklearn.metrics import f1_score
from sklearn.metrics import cohen_kappa_score
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler



import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import models
from tensorflow.keras import layers
from tensorflow.keras.models import Model
from tensorflow.keras import backend as K
from tensorflow.keras.datasets import mnist
from tensorflow.keras.utils import plot_model
from tensorflow.keras.models import Sequential
from tensorflow.keras.preprocessing import image
from tensorflow.keras.applications.vgg16 import VGG16
from tensorflow.keras.applications.vgg16 import preprocess_input
from tensorflow.keras.applications import VGG16,VGG19,inception_v3
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.optimizers import RMSprop,Adam,Optimizer,Optimizer, SGD
from tensorflow.keras.layers import Input, Lambda,Dense, Dropout, Flatten, Conv2D, MaxPool2D, BatchNormalization,MaxPooling2D,BatchNormalization,\
                                    Permute, TimeDistributed, Bidirectional,GRU, SimpleRNN,\
                                    LSTM, GlobalAveragePooling2D, SeparableConv2D, ZeroPadding2D, Convolution2D, ZeroPadding2D,Reshape,\
                                    Conv2DTranspose, LeakyReLU, Conv1D, AveragePooling1D, MaxPooling1D

In [None]:
data_dir = "/content/drive/MyDrive/real_life_violence_situations/Real_Life_Violence_Dataset"

img_height , img_width = 90, 90
seq_len = 40 #number of images we pass per video
 
classes = ["NonViolence", "Violence"]

In [None]:
#The below function takes the path of a single video and then extracts frames from that video.
def frames_extraction(video_path):
    frames_list = []
     
    vidObj = cv2.VideoCapture(video_path)
    
    count = 1
    num = 0
    num22=0
    while count <= seq_len: 
        success, image = vidObj.read() 
        if success:
            image = cv2.resize(image, (img_height, img_width))
            frames_list.append(image)
            count += 1
        else:
            num+=1
            print(f"{num} Deafected Frame")
            
            break
 
            
    return frames_list

In [None]:
def create_data(input_dir):
    X = []
    Y = []
     
    classes_list = os.listdir(input_dir)
     
    for c in classes_list:
        print(c)
        files_list = os.listdir(os.path.join(input_dir, c))
        for f in files_list[:600]:
            frames = frames_extraction(os.path.join(os.path.join(input_dir, c), f))
            if len(frames) == seq_len:
                X.append(frames)
             
                y = [0]*len(classes)
                y[classes.index(c)] = 1
                Y.append(y)
     
    X = np.asarray(X)
    Y = np.asarray(Y)
    return X, Y
 
X, Y = create_data(data_dir)

In [None]:
print(np.shape(X))
print(np.shape(Y))

### Baseline

In [None]:
X2 = np.reshape(X, (1199, 40*90*90*3))
print(np.shape(X2))
X2 = np.concatenate((X2,np.zeros((1199,1))), axis = 1)
print(np.shape(X2))

In [None]:
X_train, X_test, y_train, y_test = train_test_split(X2, Y, test_size=0.2, shuffle=True, random_state=1)

In [None]:
ff_model = keras.Sequential([
    keras.layers.InputLayer(input_shape=X_train.shape[1:]),
    keras.layers.Dense(units=5, activation="relu"),
    keras.layers.Dense(units=1),

])
opt = SGD(lr=0.001)
ff_model.compile(loss='binary_crossentropy', optimizer=opt, metrics=["accuracy"])
ff_model.summary()

In [None]:
hist = ff_model.fit(X_train, y_train ,batch_size=5, epochs=10)

In [None]:
y_pred = ff_model.predict(X_test)
y_pred = np.argmax(y_pred, axis = 1)
y_test = np.argmax(y_test, axis = 1)


In [None]:
print(classification_report(y_test, y_pred))

In [None]:
from sklearn.metrics import confusion_matrix
mypreds = ff_model.predict(X_test).argmax(axis=1)
cm = confusion_matrix(y_test, mypreds)
print(cm)

In [None]:
print(hist.history.keys())

In [None]:
from matplotlib import pyplot as plt
plt.plot(hist.history['loss'])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.show()


In [None]:
plt.plot(hist.history['accuracy'])
plt.title('model accuracy')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.show()

## ConvLSTM2D 

### Experiment 1

In [None]:
X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=0.2, shuffle=True, random_state=1)


In [None]:
model = Sequential()
model.add(ConvLSTM2D(filters = 72, kernel_size = (3, 3), return_sequences = False, data_format = "channels_last", input_shape = (seq_len, img_height, img_width, 3)))
model.add(Dropout(0.2))
model.add(Flatten())
model.add(Dense(254, activation="relu"))
model.add(Dropout(0.3))
model.add(Dense(2, activation = "softmax"))

In [None]:
opt = SGD(lr=0.001)
model.compile(loss='categorical_crossentropy', optimizer=opt, metrics=["accuracy"])

In [None]:
from keras.callbacks import ModelCheckpoint, EarlyStopping

In [None]:

earlystop = EarlyStopping(patience=7)
callbacks = [earlystop]

In [None]:
model.summary()

In [None]:
history = model.fit(x=X_train, y=y_train, epochs=25, batch_size = 6,shuffle = False, validation_split=0.2,callbacks=callbacks)


In [None]:
model.save('./model1.h5')

In [None]:
from IPython.display import FileLink
FileLink(r'./model1.h5')

In [None]:
y_pred = model.predict(X_test)
y_pred = np.argmax(y_pred, axis = 1)
y_test = np.argmax(y_test, axis = 1)

In [None]:
print(classification_report(y_test, y_pred))

In [None]:
from sklearn.metrics import confusion_matrix
mypreds = model.predict(X_test).argmax(axis=1)
cm = confusion_matrix(y_test, mypreds)
print(cm)

In [None]:
from matplotlib import pyplot as plt
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss']) 
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.show()


In [None]:
import matplotlib.pyplot as plt

plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.title('model accuracy')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.show()

### Experiment 2 
with 2 hidden layer
Dropout(0.8)
Dense 254 and 128

In [None]:
model = Sequential()
model.add(ConvLSTM2D(filters = 72, kernel_size = (3, 3), return_sequences = False, data_format = "channels_last", input_shape = (seq_len, img_height, img_width, 3)))
model.add(Dropout(0.8))
model.add(Flatten())
model.add(Dense(254, activation="relu"))
model.add(Dropout(0.8))
model.add(Dense(128, activation = "softmax"))

In [None]:
opt = SGD(lr=0.001)
model.compile(loss='categorical_crossentropy', optimizer=opt, metrics=["accuracy"])

In [None]:
 history3 = model.fit(X_train, y_train, epochs=25, batch_size = 5,shuffle = False, validation_split=0.2,callbacks=callbacks)



### Experiment 3 
with Dropout(0.9)
Dense 265 and 2


In [None]:
model2 = Sequential()
model2.add(ConvLSTM2D(filters = 72, kernel_size = (3, 3), return_sequences = False, data_format = "channels_last", input_shape = (seq_len, img_height, img_width, 3)))
model2.add(Dropout(0.9))
model2.add(Flatten())
model2.add(Dense(265, activation="relu"))
model2.add(Dropout(0.9))
model2.add(Dense(2, activation = "softmax"))


In [None]:
opt = SGD(lr=0.001)
model2.compile(loss='categorical_crossentropy', optimizer=opt, metrics=["accuracy"])

In [None]:
 history6 = model2.fit(X_train, y_train, epochs=35, batch_size = 5,shuffle = False, validation_split=0.2,callbacks=callbacks)


### Conv1D
### Experiment 4


In [None]:
from keras.layers import Input, Dense, LSTM, MaxPooling1D, Conv1D
from keras.models import Model
modell = Sequential()
modell.add(Conv1D(filters=32,kernel_size=8 ,  input_shape = ( seq_len,img_height, img_width,3)))
modell.add(Dropout(0.2))
modell.add(Flatten())
modell.add(Dense(32, activation="relu"))
modell.add(Dropout(0.3))
modell.add(Dense(2, activation = "softmax")
)


In [None]:
modell.compile(loss='categorical_crossentropy', optimizer='adam',metrics=['accuracy'])

In [None]:
modell.summary()

In [None]:
history2 = modell.fit(X_train[0:50], y_train[0:50], epochs=25, batch_size = 5, validation_split=0.2)


### Transfare Learning

### InceptionV3

In [None]:
from keras.applications.inception_v3 import InceptionV3
import cv2
import os
import sys
import glob
from tqdm import tqdm
import argparse

In [None]:
X2 = np.reshape(X, (1199, 40*90*90*3))
print(np.shape(X2))
X2 = np.concatenate((X2,np.zeros((1199,1))), axis = 1)
print(np.shape(X2))

In [None]:
X_train, X_test, y_train, y_test = train_test_split(X2, Y, test_size=0.2, shuffle=True, random_state=1)

In [None]:
 
baseModel = InceptionV3(weights="imagenet",include_top=False,input_shape=(img_height, img_width, 3))

In [None]:
from keras.layers.pooling import AveragePooling2D
headModel = baseModel.output
headModel = Flatten(name="flatten")(headModel)
headModel = Dense(256, activation="relu")(headModel)
headModel = Dropout(0.5)(headModel)
headModel = Dense(len(classes), activation="softmax")(headModel)


In [None]:

model = Model(inputs=baseModel.input, outputs=headModel)

In [None]:

model.trainable = True

In [None]:
args = {
    "dataset": "data",
    "model": "model/violence_model.h5",
    "label-bin": "model/lb.pickle",
    "epochs": 10,
    "plot": "plot.png"
    
}

In [None]:
 
print('-'*100)
print("[INFO] compiling model...")
print('-'*100)
opt = SGD(lr=1e-4, momentum=0.9, decay=1e-4 / args["epochs"])
model.compile(loss="categorical_crossentropy", optimizer=opt, metrics=["accuracy"])
print(model.summary())

In [None]:

trainAug = ImageDataGenerator(
    rotation_range=30,
    zoom_range=0.15,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.15,
    horizontal_flip=True,
    fill_mode="nearest")

In [None]:

print('-'*50)
print("[INFO] training head...")
print('-'*50)
H = model.fit_generator(
    trainAug.flow(X_train, y_train, batch_size=5),
    steps_per_epoch=len(X_train) // 5,
    validation_data=valAug.flow(testX, testY),
    validation_steps=len(X_test) // 5,
    epochs=args["epochs"])

In [None]:

print('-'*100)
print("[INFO] plot the training loss and accuracy...")
print('-'*100)
N = args["epochs"]
plt.style.use("ggplot")
plt.figure()
plt.plot(np.arange(0, N), H.history["loss"], label="train_loss")
plt.plot(np.arange(0, N), H.history["val_loss"], label="val_loss")
plt.plot(np.arange(0, N), H.history["accuracy"], label="train_accuracy")
plt.plot(np.arange(0, N), H.history["val_accuracy"], label="val_accuracy")
plt.title("Training Loss and Accuracy on Dataset")
plt.xlabel("Epoch #")
plt.ylabel("Loss/Accuracy")
plt.legend(loc="lower left")
plt.savefig(args["plot"])

### VGG16

In [None]:
### VGG16
#It's a simple model which uses 
#VGG16 features (GloabAveragePooling) for each frame, and LSTM to classify the frame sequences.

In [None]:
from tensorflow.keras.layers import *
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.optimizers import Adam
import tensorflow as tf
import numpy as np

from tensorflow.keras.applications import VGG16
conv_base = VGG16(weights='imagenet',
                  include_top=False,
                  input_shape=(90, 90, 3))

IMG_SIZE=(90,90,3)
num_class = 2

def create_base():
  conv_base = VGG16(weights='imagenet',
                  include_top=False,
                  input_shape=(90, 90, 3))
  x = GlobalAveragePooling2D()(conv_base.output)
  base_model = Model(conv_base.input, x)
  return base_model

conv_base = create_base()

ip = Input(shape=(40,90,90,3))
t_conv = TimeDistributed(conv_base)(ip) # vgg16 feature extractor

t_lstm = LSTM(10, return_sequences=False)(t_conv)

f_softmax = Dense(num_class, activation='softmax')(t_lstm)

model = Model(ip, f_softmax)

model.summary()

In [None]:
opt = SGD(lr=0.001)
model.compile(loss='categorical_crossentropy', optimizer=opt, metrics=["accuracy"])

In [None]:
from keras.callbacks import ModelCheckpoint, EarlyStopping
earlystop = EarlyStopping(patience=7)
callbacks = [earlystop]

In [None]:
 history3 = model.fit(X_train, y_train, epochs=25, batch_size = 5,shuffle = False, validation_split=0.2,callbacks=callbacks)


### Experiment 5

In [None]:
model = Sequential()
model.add(ConvLSTM2D(filters = 72, kernel_size = (3, 3), return_sequences = False, data_format = "channels_last", input_shape = (seq_len, img_height, img_width, 3)))
model.add(Dropout(0.8))
model.add(Flatten())
model.add(Dense(254, activation="relu"))
model.add(Dropout(0.8))
model.add(Dense(2, activation = "softmax"))

In [None]:
opt = SGD(lr=0.001)
model.compile(loss='categorical_crossentropy', optimizer=opt, metrics=["accuracy"])

In [None]:
history3 = model.fit(X_train[0:200], y_train[0:200], epochs=25, batch_size = 5,shuffle = False, validation_split=0.2,callbacks=callbacks)

### Experiment 6

In [None]:
model1 = Sequential()
model1.add(ConvLSTM2D(filters = 72, kernel_size = (3, 3), return_sequences = False, data_format = "channels_last", input_shape = (seq_len, img_height, img_width, 3)))
model1.add(Dropout(0.8))
model1.add(Flatten())
model1.add(Dense(254, activation="relu"))
model1.add(Dropout(0.8))
model1.add(Dense(2, activation = "softmax"))


In [None]:
opt = SGD(lr=0.001)
model1.compile(loss='categorical_crossentropy', optimizer=opt, metrics=["accuracy"])

In [None]:
history1 = model1.fit(X_train, y_train, epochs=35, batch_size = 5,shuffle = False, validation_split=0.2,callbacks=callbacks)

### Experiment 7

In [None]:
model = Sequential()
model.add(ConvLSTM2D(filters = 72, kernel_size = (3, 3), return_sequences = False, data_format = "channels_last", input_shape = (seq_len, img_height, img_width, 3)))
model.add(Dropout(0.2))
model.add(Flatten())
model.add(Dense(254, activation="relu"))
model.add(Dropout(0.3))
model.add(Dense(2, activation = "softmax"))

In [None]:
model.summary()

In [None]:
opt = SGD(lr=0.001)
model.compile(loss='categorical_crossentropy', optimizer=opt, metrics=["accuracy"])
history = model.fit(x=X_train, y=y_train, epochs=25, batch_size = 5,shuffle = False, validation_split=0.2)


In [None]:
model.save('./model.h5')


In [None]:
from IPython.display import FileLink
FileLink(r'./model.h5')

In [None]:
y_pred = model.predict(X_test)
y_pred = np.argmax(y_pred, axis = 1)
y_test = np.argmax(y_test, axis = 1)

In [None]:
print(classification_report(y_test, y_pred))


In [None]:
from sklearn.metrics import confusion_matrix
mypreds = model.predict(X_test).argmax(axis=1)
cm = confusion_matrix(y_test, mypreds)
print(cm)

In [None]:
import matplotlib.pyplot as plt

plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.title('model accuracy')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.show()

In [None]:
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss']) 
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.show()

### Method to Predict  if video is violant or not

In [None]:
from keras.models import load_model
import cv2
import numpy as np

model = load_model('./model.h5')

video_path = 'fight.mp4'
category =['NonViolence', 'Violence']
frames = []
frame_count = 0
queue=[]

try:
    capture = cv2.VideoCapture(video_path)
    while (capture.isOpened()):
        ret, frame = capture.read()
          
        frame = cv2.resize(frame, (90, 90))
        if not ret:
            break

   
        frame_count += 1
        frames.append(frame)

        if frame_count < 40:
            continue 
        else:
            frame_count = 0
  
        sequence = []
        for image in frames:
            sequence.append(image) 

        prediction = model.predict(np.expand_dims(sequence, axis=0))
        print(prediction)
        queue.append(prediction)
        results = np.array(queue).mean(axis=0)
        i = category[np.argmax(results)]
except Exception as e:
    print(str(e))
print('Predicted: '+ i)
capture.release()

In [None]:
from keras.models import load_model
import cv2
import numpy as np

model = load_model('./model.h5')

video_path = 'Greeting.mp4'
category =['NonViolence', 'Violence']
frames = []
frame_count = 0
queue=[]

try:
    capture = cv2.VideoCapture(video_path)
    while (capture.isOpened()):
        ret, frame = capture.read()
          
        frame = cv2.resize(frame, (90, 90))
        if not ret:
            break

   
        frame_count += 1
        frames.append(frame)

        if frame_count < 40:
            continue 
        else:
            frame_count = 0
  
        sequence = []
        for image in frames:
            sequence.append(image) 

        prediction = model.predict(np.expand_dims(sequence, axis=0))
        print(prediction)
        queue.append(prediction)
        results = np.array(queue).mean(axis=0)
        i = category[np.argmax(results)]
except Exception as e:
    print(str(e))
print('Predicted: '+ i)
capture.release()