In [None]:
from tqdm import tqdm
import os
import cv2
import tensorflow as tf
import numpy as np
from random import shuffle
import time
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from tensorflow.keras.layers import Dropout, LSTM, Conv2D, Bidirectional, Dense, MaxPooling2D, MaxPooling3D, Reshape, Flatten, TimeDistributed
from tensorflow.keras.optimizers import SGD
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.losses import CategoricalCrossentropy
import gc

In [None]:
# # Assume that you have 12GB of GPU memory and want to allocate ~4GB:
# tf.compat.v1.GPUOptions(per_process_gpu_memory_fraction=0.333)

# gpus = tf.config.experimental.list_physical_devices('GPU')
# for gpu in gpus:
#   tf.config.experimental.set_memory_growth(gpu, True)

gpus = tf.config.list_physical_devices('GPU')
if gpus:
  # Restrict TensorFlow to only allocate 1GB of memory on the first GPU
  try:
    tf.config.set_logical_device_configuration(
        gpus[0],
        [tf.config.LogicalDeviceConfiguration(memory_limit=6000)])
    logical_gpus = tf.config.list_logical_devices('GPU')
    print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPUs")
  except RuntimeError as e:
    # Virtual devices must be set before GPUs have been initialized
    print(e)


1 Physical GPUs, 1 Logical GPUs


In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
#Global Variables
IMG_SIZE = 100
OPTIMIZER = 'SGD'
LEARNING_RATE = 0.01
DECAY = 1e-6
EPOCHS = 25
BATCH_SIZE = 5
LOSS = CategoricalCrossentropy()
NUM_FRAMES = 10
KERNEL = 64

In [None]:
OUTPUTFILEPATH ="./Hockeyfights/"
 
violenceIndicator ='fi'
nonViolenceIndicator = 'no'
 
if not os.path.exists(OUTPUTFILEPATH):
    os.mkdir(OUTPUTFILEPATH)

In [None]:
#Video Frame Extraction
def extractFrames(folder):
 
    if os.path.exists(folder) == False:
        print(str(folder) + " not found")
        return;
 
    c =0
    for files in tqdm(os.listdir(folder)):
        path = os.path.join(folder, files)
        cap = cv2.VideoCapture(path)
 
        success = True
 
        while success:
            success, image = cap.read()
 
            # RGB_img =cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
 
            RGB_img = image
 
            if not success:
                break
 
            if (violenceIndicator in files):
                if not os.path.exists(OUTPUTFILEPATH + "/" + violenceIndicator):
                    os.mkdir(OUTPUTFILEPATH + "/" + violenceIndicator +"/")
 
                cv2.resize(cv2.imwrite(OUTPUTFILEPATH + "/" + violenceIndicator+ "/"+ str(c) + '.jpg', RGB_img), (IMG_SIZE, IMG_SIZE))
            else:
                if not os.path.exists(OUTPUTFILEPATH + "/" + nonViolenceIndicator):
                    os.mkdir(OUTPUTFILEPATH + "/" + nonViolenceIndicator +"/")
                    
                cv2.resize(cv2.imwrite(OUTPUTFILEPATH + "/" + nonViolenceIndicator+ "/"+ str(c) + '.jpg', RGB_img), (IMG_SIZE, IMG_SIZE))
 
            c +=1
    print(c)
    print("Done with file extraction")

In [None]:
#dataset generation
def GenerateDataset():
    dataset = []
    images = []
    limit = 0
    
    violenceFilePath = OUTPUTFILEPATH + violenceIndicator +"/"
    nonViolenceFilePath = OUTPUTFILEPATH + nonViolenceIndicator + "/"
 
    for frames in tqdm(os.listdir(violenceFilePath)):
        path = os.path.join(violenceFilePath, frames)
        img = cv2.resize(cv2.imread(path), (IMG_SIZE, IMG_SIZE))
 
        images.append(np.array(img))
        limit +=1
    
 
        if limit == NUM_FRAMES:
            limit = 0
            dataset.append(np.array([images, np.array([0, 1])]))
            images = []
 
    for frames in tqdm(os.listdir(nonViolenceFilePath)):
        path = os.path.join(nonViolenceFilePath, frames)
        img = cv2.resize(cv2.imread(path), (IMG_SIZE, IMG_SIZE))
 
        images.append(np.array(img))
        limit +=1
    
 
        if limit == NUM_FRAMES:
            limit = 0
            dataset.append(np.array([images, np.array([1, 0])]))
            images = []
 
 
    print("Dataset Count " + str(len(dataset)))
    shuffle(dataset)
    np.save('datasetCNN.npy', dataset)
    return dataset

In [None]:
#Assign Extracted Data

#__Extract frames from data____
#extractFrames("./drive/MyDrive/NewTrainHockey")
#extractFrames("./drive/MyDrive/HockeyDataset")
extractFrames("./drive/MyDrive/HockeyFights")

data = GenerateDataset()

#Uncomment if data has been extracted
#data = np.load('datasetCNN.npy', allow_pickle=True)

#print(data)

./drive/MyDrive/HockeyDataset not found


100%|██████████| 1012/1012 [12:34<00:00,  1.34it/s]


41548
Done with file extraction


100%|██████████| 20926/20926 [00:36<00:00, 575.84it/s]
100%|██████████| 20622/20622 [00:36<00:00, 568.37it/s]


Dataset Count 4154


In [None]:
train, test = train_test_split(data, train_size=0.7, shuffle=True)

# print(train[0][1])
# print(train[1][1])


X = np.array([i[0] for i in train]).reshape(-1, NUM_FRAMES, IMG_SIZE, IMG_SIZE, 3)
y = np.array([i[1] for i in train])

x_valid = np.array([i[0] for i in test]).reshape(-1, NUM_FRAMES, IMG_SIZE, IMG_SIZE, 3)
y_valid = np.array([i[1] for i in test])

X = X.astype('float32')
x_valid = x_valid.astype('float32')

X /= 255
x_valid /=255

#print(X)
print("Training sample size = " +str(len(X)))
print("Testing sample size = " +str(len(x_valid)))
print(x_valid.shape)
# print(y[0])
# print(len(y))


In [None]:
num = 110
visible_frame = X[num]
visible_frame =  np.array(visible_frame).reshape(10,100,100,3)
 
plt.imshow(visible_frame[7])
print(y[num])
#print(np.array)

In [None]:
#clear up memory
del data
#del visible_frame
#del num
del train
del test
gc.collect()

In [None]:
#Architecture of Model

model = Sequential()
#input
model.add(Conv2D(KERNEL, (3, 3), activation='relu',  input_shape = (NUM_FRAMES, IMG_SIZE, IMG_SIZE, 3), padding ='same'))

#____v_____
model.add(Conv2D(KERNEL, (3, 3), activation='relu'))
model.add(MaxPooling3D((1,2,2)))
model.add(Dropout(0.25))

#____v_____
model.add(Conv2D(KERNEL, (3, 3), activation='relu'))
model.add(MaxPooling3D((1,2,2)))
model.add(Dropout(0.25))

#____v_____
model.add(Conv2D(KERNEL, (3, 3), activation='relu'))
model.add(MaxPooling3D((1,2,2)))
model.add(Dropout(0.25))

#___V____
#model.add(Flatten())
model.add(Reshape((NUM_FRAMES, KERNEL * IMG_SIZE)));

#____V_____________
lstmForward = LSTM(units=32)
lstmBackward = LSTM(units=32, go_backwards=True)
model.add(Bidirectional(lstmForward, backward_layer= lstmBackward))


#________V_________
model.add(Dense(64, activation='relu'))
model.add(Dense(32, activation='relu'))
#model.add(Dense(32, activation='sigmoid'))
model.add(Dropout(0.5))
#model.add(Dense(2, activation='softmax'))
model.add(Dense(2, activation='sigmoid'))


model.summary()


In [None]:
class MyCustomCallback(tf.keras.callbacks.Callback):
  def on_epoch_end(self, epoch, logs=None):
    gc.collect()

In [None]:
#Model Training

model.compile(loss= LOSS, optimizer= SGD(LEARNING_RATE), metrics=['accuracy'])

history = model.fit(X, y, batch_size=BATCH_SIZE, epochs= EPOCHS, verbose=1, validation_data=(x_valid,y_valid))


In [None]:
model.save("SavedFirst/BiLstMNet.h5");
print("Saved Model");

In [None]:
score = model.evaluate(x_valid, y_valid, verbose =0)
print('Test loss', score[0])
print('Test accuracy', score[1])

In [None]:
history_dict = history.history;

loss_value = history_dict['loss'];
val_loss_value = history_dict['val_loss'];
epoch = range(1,len(loss_value) + 1);

line1 =plt.plot(epoch, val_loss_value, label="Validation/Test loss");
line2 =plt.plot(epoch, loss_value, label="Training loss");

plt.setp(line1, linewidth=2.0, marker ='+', markersize = 10.0);
plt.setp(line2, linewidth=2.0, marker ='4', markersize = 10.0);

plt.xlabel('Epochs');
plt.ylabel('Loss');
plt.grid(True);
plt.legend()

plt.show();


In [None]:
history_dict = history.history;

acc_value = history_dict['accuracy'];
val_acc_value = history_dict['val_accuracy'];
epoch = range(1,len(acc_value) + 1);

line1 =plt.plot(epoch, val_acc_value, label="Validation/Test accuracy");
line2 =plt.plot(epoch, acc_value, label="Training accuracy");

plt.setp(line1, linewidth=2.0, marker ='+', markersize = 10.0);
plt.setp(line2, linewidth=2.0, marker ='4', markersize = 10.0);

plt.xlabel('Epochs');
plt.ylabel('Accuracy');
plt.grid(True);
plt.legend()

plt.show();


In [None]:
classifier = load_model("SavedFirst/BiLstMNet.h5")

classifier.summary()

score = classifier.evaluate(x_valid, y_valid, verbose =0)

print('Test loss', score[0])
print('Test accuracy', score[1])

In [None]:
def showModelTest(input_value, prediction):
  print("Prediction= " + str(prediction))
  if (prediction[0][0] > prediction[0][1]):
    print("No violence here")
  else:
    print("Violence Present")
  plt.imshow(input_value)

In [None]:
rand = np.random.randint(0, len(x_valid))
input_v = x_valid[rand]
#print(input_v.shape)

input_v = input_v[0].reshape(100,100,3)

#print(x_valid[rand].shape)
#print(x_valid.shape);
val = x_valid[rand].reshape(1,10,100,100,3)

result = classifier.predict_classes(val, 1,  verbose = 0)

#print(result)

showModelTest(input_v, result)