In [None]:
from time import time
import os
import cv2
import numpy as np
from tensorflow.keras.models import load_model


feature_extractor = load_model('feature_extractor.h5')
sequence_model = load_model('sequence_model.h5')
IMG_SIZE = 224
MAX_SEQ_LENGTH = 20
NUM_FEATURES = 576


start = time()

def load_video(path, max_frames=0, resize=(IMG_SIZE, IMG_SIZE)):
    cap = cv2.VideoCapture(path)
    frames = []
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame = cv2.resize(frame, resize)
            frame = frame[:, :, [2, 1, 0]]
            frames.append(frame)

            if len(frames) == max_frames:
                break
    finally:
        cap.release()
    return np.array(frames)


def prepare_single_video(frames):
    frames = frames[None, ...]
    frame_mask = np.zeros(shape=(1, MAX_SEQ_LENGTH,), dtype="bool")
    frame_features = np.zeros(shape=(1, MAX_SEQ_LENGTH, NUM_FEATURES), dtype="float32")

    for i, batch in enumerate(frames):
        video_length = batch.shape[0]
        length = min(MAX_SEQ_LENGTH, video_length)
        for j in range(length):
            frame_features[i, j, :] = feature_extractor.predict(batch[None, j, :])
        frame_mask[i, :length] = 1  # 1 = not masked, 0 = masked

    return frame_features, frame_mask


def sequence_prediction(path):
    
    class_vocab = ['Dangerous', 'Safe']
    frames = load_video(os.path.join("test", path))
    start = time()
    frame_features, frame_mask = prepare_single_video(frames)
    probabilities = sequence_model.predict([frame_features, frame_mask])[0]
    end = time()
    print(f'Inference Time: {end - start}')
    print(f"{len(frames)} frames")
    inference_time = end - start

    for i in np.argsort(probabilities)[::-1]:
        print(f"  {class_vocab[i]}: {probabilities[i] * 100:5.2f}%")
    return inference_time, frames


test_video = "/content/Dangerous_1.avi"
print(f"Test video path: {test_video}")
inference_time, test_frames = sequence_prediction(test_video)
# to_gif(test_frames[:90])

end = time()

duration = end - start
print(duration)
print(duration/91)

In [None]:
from time import time
import os
import cv2
import numpy as np
from tensorflow.keras.models import load_model


def DangerousDrivingDetection(frames, 
                                feature_extractor_path='feature_extractor.h5',
                                sequence_model_path='sequence_model.h5',
                                IMG_SIZE = 224,
                                MAX_SEQ_LENGTH = 20,
                                NUM_FEATURES = 576):

    feature_extractor = load_model(feature_extractor_path)
    sequence_model = load_model(sequence_model_path)
    IMG_SIZE = IMG_SIZE
    MAX_SEQ_LENGTH = MAX_SEQ_LENGTH
    NUM_FEATURES = NUM_FEATURES


    start = time()

    def load_video(frames):
        return np.array(frames)



    def prepare_single_video(frames):
        frames = frames[None, ...]
        frame_mask = np.zeros(shape=(1, MAX_SEQ_LENGTH,), dtype="bool")
        frame_features = np.zeros(shape=(1, MAX_SEQ_LENGTH, NUM_FEATURES), dtype="float32")

        for i, batch in enumerate(frames):
            video_length = batch.shape[0]
            length = min(MAX_SEQ_LENGTH, video_length)
            for j in range(length):
                frame_features[i, j, :] = feature_extractor.predict(batch[None, j, :])
            frame_mask[i, :length] = 1  # 1 = not masked, 0 = masked

        return frame_features, frame_mask


    def sequence_prediction(frames):
        
        class_vocab = ['Dangerous', 'Safe']
        frames = load_video(frames)
        start = time()
        frame_features, frame_mask = prepare_single_video(frames)
        probabilities = sequence_model.predict([frame_features, frame_mask])[0]
        end = time()
        print(f'Inference Time: {end - start}')
        inference_time = end - start

        for i in np.argsort(probabilities)[::-1]:
            print(f"{i}  {class_vocab[i]}: {probabilities[i] * 100:5.2f}%")
        
        if probabilities[0] > probabilities[1]:
            pass

        return inference_time, frames

    inference_time, test_frames = sequence_prediction(frames)

    end = time()

    duration = end - start
    print(duration)


cap = cv2.VideoCapture("Dangerous_10.avi")

counter = 0
frames = []
while True:

    ret, frame = cap.read()
    counter += 1
    IMG_SIZE = 224
    frame = cv2.resize(frame, (IMG_SIZE, IMG_SIZE))
    cv2.imshow("Camera", frame)
    frame = frame[:, :, [2, 1, 0]]
    frames.append(frame)

    if counter == 90:
        DangerousDrivingDetection(frames=frames,
                                    sequence_model_path = "lstm.h5")
        counter = 0
        frames = []
    else:
        pass

    # if cv2.waitKey(0) == ord("q"):
    #     break
        # cap.release()
        # cv2.destroyAllWindows()
    # else:
    #     pass



In [5]:
import multiprocessing as mp
from multiprocessing import Pool
import os

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))


In [4]:
from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

In [27]:
import tensorflow as tf
from time import time
import os
import cv2
import numpy as np
from tensorflow.keras.models import load_model


def DangerousDrivingDetection(frames, 
                                feature_extractor_path='feature_extractor.h5',
                                sequence_model_path='sequence_model.h5',
                                IMG_SIZE = 224,
                                MAX_SEQ_LENGTH = 20,
                                NUM_FEATURES = 576):

    feature_extractor = load_model(feature_extractor_path)
    sequence_model = load_model(sequence_model_path)
    IMG_SIZE = IMG_SIZE
    MAX_SEQ_LENGTH = MAX_SEQ_LENGTH
    NUM_FEATURES = NUM_FEATURES


    start = time()

    def load_video(frames):
        return np.array(frames)



    def prepare_single_video(frames):
        frames = frames[None, ...]
        frame_mask = np.zeros(shape=(1, MAX_SEQ_LENGTH,), dtype="bool")
        frame_features = np.zeros(shape=(1, MAX_SEQ_LENGTH, NUM_FEATURES), dtype="int8")

        interpreter = tf.lite.Interpreter(model_path="feature_extractor_quantized.tflite")
        interpreter.allocate_tensors()
        input_details = interpreter.get_input_details()
        output_details = interpreter.get_output_details()
        input_shape = input_details[0]['shape']
        
        for i, batch in enumerate(frames):
            video_length = batch.shape[0]
            length = min(MAX_SEQ_LENGTH, video_length)
            for j in range(length):
                input = np.random.random_sample(input_shape)
                input[0] = batch[None, j, :]
                # input[0] = cv2.resize((224,224))
                input_data = np.array(input, dtype=np.float32)
                interpreter.set_tensor(input_details[0]['index'], input_data)
                interpreter.invoke()
                output_data = interpreter.get_tensor(output_details[0]['index'])
                
                # frame_features[i, j, :] = feature_extractor.predict(batch[None, j, :])
            frame_mask[i, :length] = 1  # 1 = not masked, 0 = masked

        return frame_features, frame_mask


    def sequence_prediction(frames):
        
        class_vocab = ['Dangerous', 'Safe']
        frames = load_video(frames)
        start = time()
        frame_features, frame_mask = prepare_single_video(frames)
        probabilities = sequence_model.predict([frame_features, frame_mask])[0]
        end = time()
        print(f'Inference Time: {end - start}')
        inference_time = end - start

        for i in np.argsort(probabilities)[::-1]:
            print(f"{i}  {class_vocab[i]}: {probabilities[i] * 100:5.2f}%")
        
        if probabilities[0] > probabilities[1]:
            pass

        return inference_time, frames

    inference_time, test_frames = sequence_prediction(frames)

    end = time()

    duration = end - start
    print(duration)


cap = cv2.VideoCapture("Dangerous_10.avi")

counter = 0
frames = []
while True:

    ret, frame = cap.read()
    counter += 1
    IMG_SIZE = 224
    frame = cv2.resize(frame, (IMG_SIZE, IMG_SIZE))
    # frame = cv2.resize(frame,dsize=(224, 224))
    cv2.imshow("Camera", frame)
    frame = frame[:, :, [2, 1, 0]]
    frames.append(frame)

    if counter == 90:
        DangerousDrivingDetection(frames=frames,
                                    sequence_model_path = "lstm.h5")
        counter = 0
        frames = []
    else:
        pass

    # if cv2.waitKey(0) == ord("q"):
    #     break
        # cap.release()
        # cv2.destroyAllWindows()
    # else:
    #     pass



Inference Time: 9.574350357055664
0  Dangerous: 84.66%
1  Safe: 15.34%
9.597288131713867


error: OpenCV(4.6.0) D:\a\opencv-python\opencv-python\opencv\modules\imgproc\src\resize.cpp:4052: error: (-215:Assertion failed) !ssize.empty() in function 'cv::resize'


In [None]:
import numpy as np
import tensorflow as tf

# Load the TFLite model and allocate tensors.
interpreter = tf.lite.Interpreter(model_path="converted_model.tflite")
interpreter.allocate_tensors()

# Get input and output tensors.
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

# Test the model on random input data.
input_shape = input_details[0]['shape']
input_data = np.array(np.random.random_sample(input_shape), dtype=np.float32)
interpreter.set_tensor(input_details[0]['index'], input_data)

interpreter.invoke()

# The function `get_tensor()` returns a copy of the tensor data.
# Use `tensor()` in order to get a pointer to the tensor.
output_data = interpreter.get_tensor(output_details[0]['index'])
print(output_data)


interpreter = tf.lite.Interpreter(TFLITE_FILE_PATH)
my_signature = interpreter.get_signature_runner()
output = my_signature(x=tf.constant([1.0], shape=(1,10), dtype=tf.float32))

In [22]:
interpreter = tf.lite.Interpreter(model_path="feature_extractor_quantized.tflite")
interpreter.allocate_tensors()
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()
input_shape = input_details[0]['shape']
input_data = np.array(np.random.random_sample(input_shape), dtype=np.float32)
interpreter.set_tensor(input_details[0]['index'], input_data)
interpreter.invoke()
output_data = interpreter.get_tensor(output_details[0]['index'])

(224, 224, 3)
