In [25]:
import cv2
import time
import numpy as np
from glob import glob
import tensorflow as tf
import albumentations as A
from numpy.random import default_rng

# Ask TensorFlow to allocate GPU memory on demand instead of reserving all of it.
# When no GPU is listed the loop body simply never runs.
gpus = tf.config.experimental.list_physical_devices('GPU')
try:
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
except RuntimeError as e:
    # Report the problem and carry on with TensorFlow's default allocation.
    # NOTE(review): presumably raised when the GPUs were already initialised - confirm.
    print(e)

In [2]:
# Input params
frames_num = 8     # number of random frame sequences sampled from the video
seq_length = 16    # consecutive frames in each sampled sequence
input_size = 224   # every frame is resized to input_size x input_size

backbone = '0'     # EfficientNet variant: '0' selects EfficientNetB0, ... '7' selects EfficientNetB7
classes = dict(enumerate(['default', 'bienie']))   # thresholded prediction (0/1) -> class name

# Locations of the video dataset and of the model weights
DATASET_PATH = "/kaggle/input/bienievideos"
WEIGHTS_PATH = "/kaggle/input/bienieweights"

In [14]:
# Gather video file(s) path(s)
# glob yields matches in arbitrary, filesystem-dependent order; sorting makes
# `videos[0]` refer to the same file on every run and on every machine.
videos = sorted(glob(f"{DATASET_PATH}/*/*.mp4"))

In [15]:
# Class for reading video and taking frames
class VideoReader:
    """Sample fixed-length runs of consecutive frames from a video file via OpenCV."""

    # Pixel columns removed from the left and from the right edge of every frame
    # before it is resized.
    # NOTE(review): frames narrower than 2 * SIDE_CROP would crop to nothing - confirm
    # that all input videos are wide enough.
    SIDE_CROP = 128

    def __init__(self):
        """Create a reader that owns an unseeded NumPy random generator."""
        self.rng = default_rng()

    def read_random_frames_sequences(self, path, frames_num, seq_length, input_size):
        """Read `frames_num` randomly placed sequences of `seq_length` consecutive frames.

        Args:
            path: Video file path handed to `cv2.VideoCapture`.
            frames_num: Number of distinct sequences (distinct start indices) to sample.
            seq_length: Number of consecutive frames in each sequence.
            input_size: Side length every frame is resized to.

        Returns:
            dict mapping each start frame index (plain `int`) to the list of
            `seq_length` RGB frames read from that index onwards.

        Raises:
            ValueError: If the video does not report enough frames to place
                `frames_num` distinct sequences of `seq_length` frames.
            RuntimeError: If a frame inside a sampled sequence cannot be read.
        """
        capture = cv2.VideoCapture(path)
        try:
            frames_count = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))

            # Valid start indices are 0 .. frames_count - seq_length inclusive, so the
            # population size is one larger than the difference.
            starts_count = frames_count - seq_length + 1
            if starts_count < frames_num:
                raise ValueError(
                    f"Cannot sample {frames_num} distinct sequences of {seq_length} frames "
                    f"from '{path}': the capture reports {frames_count} frames "
                    f"(was the file opened successfully?)"
                )

            frames = {}
            for start_frame_idx in self.rng.choice(starts_count, size=frames_num, replace=False):
                # Plain ints keep the dictionary keys (and their printed form) free of
                # NumPy scalar wrappers.
                start_frame_idx = int(start_frame_idx)
                sequence = []
                for frame_idx in range(start_frame_idx, start_frame_idx + seq_length):
                    frame = self._read_frame_at_index(path, capture, frame_idx, input_size)
                    if frame is None:
                        # A missing frame would otherwise surface much later as an
                        # array-conversion error with no hint of its cause.
                        raise RuntimeError(f"Failed to read frame {frame_idx} from '{path}'")
                    sequence.append(frame)
                frames[start_frame_idx] = sequence
            return frames
        finally:
            # Release the capture on every exit path, including errors.
            capture.release()

    def _read_frame_at_index(self, path, capture, frame_idx, input_size):
        """Seek `capture` to `frame_idx` and return that frame cropped, as RGB, resized.

        `path` is accepted for interface compatibility and is not used here.

        Returns:
            The frame with `SIDE_CROP` columns removed on both sides, converted from
            BGR to RGB and resized to `input_size` x `input_size`; `None` when the
            read fails.
        """
        capture.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
        ret, frame = capture.read()
        if not ret or frame is None:
            return None

        margin = self.SIDE_CROP
        rgb = cv2.cvtColor(frame[:, margin:-margin], cv2.COLOR_BGR2RGB)
        return A.Resize(height=input_size, width=input_size)(image=rgb)['image']

In [31]:
# Model architecture
def efnet_lstm(backbone='0', seq_length=16, input_size=224, weights='imagenet'):
    """Build an EfficientNet + LSTM binary classifier for frame sequences.

    An EfficientNet (without its classification head, global-average pooled) is
    applied to every frame through `TimeDistributed`; the per-frame features go
    through an LSTM(128), a Dense(64, elu) layer and a single sigmoid unit.

    Args:
        backbone: EfficientNet variant, '0'..'7' (integers 0..7 are accepted too).
        seq_length: Number of frames per input sequence.
        input_size: Height and width of every input frame.
        weights: Forwarded to the EfficientNet constructor. Keeps the original
            default 'imagenet'; pass `None` to skip fetching ImageNet weights,
            e.g. when a full checkpoint is loaded right after building the model.

    Returns:
        A `tf.keras.Model` taking inputs of shape
        (batch, seq_length, input_size, input_size, 3) and producing one sigmoid
        output per sequence.

    Raises:
        ValueError: If `backbone` does not name a supported variant.
    """
    variant = str(backbone)
    supported = [str(i) for i in range(8)]
    # Validate up front: an unknown value used to fall through every branch and
    # fail later with an UnboundLocalError on `EFNet`.
    if variant not in supported:
        raise ValueError(
            f"Unsupported backbone {backbone!r}; expected one of {', '.join(supported)}"
        )
    EFNet = getattr(tf.keras.applications.efficientnet, f"EfficientNetB{variant}")

    bottleneck = EFNet(weights=weights, include_top=False, pooling='avg')
    inp = tf.keras.layers.Input((seq_length, input_size, input_size, 3))
    x = tf.keras.layers.TimeDistributed(bottleneck)(inp)
    x = tf.keras.layers.LSTM(128)(x)
    x = tf.keras.layers.Dense(64, activation='elu')(x)
    x = tf.keras.layers.Dense(1, activation='sigmoid')(x)

    model = tf.keras.Model(inp, x)

    return model

In [32]:
# Build the network from the configured parameters, then load the checkpoint
# stored under WEIGHTS_PATH into it
model = efnet_lstm(
    input_size=input_size,
    seq_length=seq_length,
    backbone=backbone,
)
model.load_weights(f"{WEIGHTS_PATH}/best.hdf5")

In [33]:
# Initialize video reader and select video file
video_reader = VideoReader()
if not videos:
    # Fail with an explicit message rather than a bare IndexError on videos[0].
    raise FileNotFoundError(f"No video files matched {DATASET_PATH}/*/*.mp4")
video = videos[0]

In [34]:
# Inference
print(f"Video file: {video}\n")

# perf_counter is monotonic, so the elapsed time cannot be skewed by clock changes
t = time.perf_counter()
print(f"Reading {frames_num} random frames sequences...")
frames = video_reader.read_random_frames_sequences(video, frames_num, seq_length, input_size)
# Cast the keys to plain ints so they do not print as NumPy scalar reprs
print(f"Reading random frames sequences starting at indices {[int(idx) for idx in frames]} has been finished\n")

print("Generating predictions...")
# Stack all sampled sequences into one array (one row per sequence) and run a single
# predict call instead of calling predict once per sequence inside a Python loop.
# batch_size=1 still feeds the model one sequence at a time, as before.
batch = np.stack([np.asarray(images) for images in frames.values()])
preds = model.predict(batch, batch_size=1)

# The ground-truth label is the name of the directory that contains the video
gt = video.split('/')[-2]
# Average over the predictions actually produced rather than the configured count
conf = float(np.mean(preds))
# Report the confidence of whichever class ends up being predicted
probability = conf if conf > 0.5 else 1 - conf
pred = classes[int(conf > 0.5)]
print(f"The video contains {pred.upper()} scenario with {100*probability:.2f}% probability")
print(f"Ground truth: {gt.upper()}")
print(f"Inference time: {time.perf_counter()-t:.2f} seconds")