In [1]:
import tensorflow as tf
import keras
from keras.layers import Conv2D, MaxPool2D, GlobalAveragePooling2D
from keras.layers import Flatten, Dense, Dropout, BatchNormalization, Reshape, Permute, Multiply, Input, Activation
from keras.models import Sequential, Model
import tensorflow.keras.backend as K
import numpy as np
import cv2
#for GPU training
import tensorflow as tf
from tensorflow.python.client import device_lib
# Show every compute device TensorFlow can see.
print(device_lib.list_local_devices())

# TF1-style session shared with Keras. The GPU options are passed in one go:
# a 0.33 per-process memory fraction plus allow_growth.
config = tf.compat.v1.ConfigProto(
    gpu_options=tf.compat.v1.GPUOptions(
        per_process_gpu_memory_fraction=0.33,
        allow_growth=True,
    )
)
session = tf.compat.v1.Session(config=config)
tf.compat.v1.keras.backend.set_session(session)

[name: "/device:CPU:0"
device_type: "CPU"
memory_limit: 268435456
locality {
}
incarnation: 11180419703988896659
, name: "/device:GPU:0"
device_type: "GPU"
memory_limit: 3173331764
locality {
  bus_id: 1
  links {
  }
}
incarnation: 4603016271983946287
physical_device_desc: "device: 0, name: NVIDIA GeForce GTX 960M, pci bus id: 0000:02:00.0, compute capability: 5.0"
]


In [2]:
def SqueezeExcite(x, ratio=16, name=''):
    """Re-weight the channels of a feature map with a squeeze-and-excitation gate.

    The input is globally average-pooled, passed through a two-layer dense
    bottleneck (relu, then sigmoid) and the resulting per-channel gate is
    multiplied back onto the input.

    Args:
        x: Keras tensor whose last axis is the channel axis; it is pooled with
            GlobalAveragePooling2D.
        ratio: Divisor applied to the channel count to size the bottleneck.
        name: Suffix appended to every layer name created here.

    Returns:
        The output of the Multiply layer applied to ``[x, gate]``.
    """
    nb_chan = K.int_shape(x)[-1]
    # Integer division yields 0 when there are fewer channels than `ratio`;
    # keep at least one unit so the bottleneck Dense layer stays valid.
    bottleneck_units = max(1, nb_chan // ratio)

    y = GlobalAveragePooling2D(name='SE_avg_{}'.format(name))(x)
    y = Dense(bottleneck_units, activation='relu', name='se_dense1_{}'.format(name))(y)
    # Layer names are kept exactly as they were so previously built models
    # and saved weights keep matching.
    y = Dense(nb_chan, activation='sigmoid', name='dense2_{}'.format(name))(y)

    # The gate has one value per channel; Multiply is relied on to broadcast
    # it against the spatial axes of x.
    y = Multiply(name='se_mul_{}'.format(name))([x, y])
    return y

def SE_Conv2D_block(x, filters, kernel_size, name):
    """Convolution stage: conv -> squeeze-excite -> batch norm -> relu -> 2x2 max-pool -> dropout.

    Args:
        x: Input Keras tensor.
        filters: Number of convolution filters.
        kernel_size: Convolution kernel size.
        name: Tag embedded in the names of the layers created here.

    Returns:
        The tensor produced by the final Dropout(0.25) layer.
    """
    conv_out = Conv2D(filters, kernel_size, padding='same', name='conv_{}'.format(name))(x)
    # Channel gating is applied to the raw convolution output, before normalisation.
    gated = SqueezeExcite(conv_out, ratio=16, name=name)
    normed = BatchNormalization(name='BatchNorm_{}'.format(name))(gated)
    activated = Activation('relu', name='Activation_{}'.format(name))(normed)
    pooled = MaxPool2D(pool_size=(2, 2), padding='same', name='{}_pool'.format(name))(activated)
    return Dropout(0.25)(pooled)


def SE_Dense_block(x, size, name, bn=True):
    """Fully connected stage: dense -> optional batch norm -> relu.

    Args:
        x: Input Keras tensor.
        size: Number of units in the Dense layer.
        name: Tag embedded in the names of the layers created here.
        bn: When true, a BatchNormalization layer sits between the Dense
            layer and the activation.

    Returns:
        The tensor produced by the relu Activation layer.
    """
    hidden = Dense(size, name='dense_{}'.format(name))(x)
    pre_activation = (
        BatchNormalization(name='BatchNorm_{}'.format(name))(hidden) if bn else hidden
    )
    return Activation('relu', name='Activation_{}'.format(name))(pre_activation)

In [4]:
#huge thanks to -->https://github.com/arthurdouillard/keras-squeeze_and_excitation_network


def SE_Model(nb_class, input_shape, include_top=True, weights = False):
    """Build the single-stream squeeze-and-excitation CNN.

    Four SE_Conv2D_block stages (32, 64, 128 and 256 filters, all 3x3) are
    stacked on the input. With ``include_top`` the result is flattened and fed
    to a sigmoid Dense layer with ``nb_class`` units.

    Args:
        nb_class: Number of units in the output Dense layer.
        input_shape: Shape passed to the Input layer (batch axis excluded).
        include_top: Whether to append the Flatten + Dense head.
        weights: Falsy to skip loading; otherwise handed to
            ``model.load_weights``.

    Returns:
        The assembled keras Model.
    """
    block_specs = [
        (32, (3, 3)),
        (64, (3, 3)),
        (128, (3, 3)),
        (256, (3, 3)),
    ]

    img_input = Input(shape=input_shape)

    # Stages are named block1..block4 in stacking order.
    features = img_input
    for index, (n_filters, k_size) in enumerate(block_specs, start=1):
        features = SE_Conv2D_block(features, n_filters, k_size, name='block{}'.format(index))

    if include_top:
        # Hidden SE_Dense_block layers (512/128/32 units) were tried here and
        # are currently disabled; the head is a single sigmoid layer.
        flat = Flatten(name='flatten')(features)
        features = Dense(nb_class, activation='sigmoid')(flat)

    model = Model(inputs=img_input, outputs=features)

    if not weights:
        return model

    print('Loading')
    model.load_weights(weights)
    print("Weight Loaded")
    return model

# Build the single-output classifier for 128x128x3 inputs and restore its weights.
model = SE_Model(nb_class=1, input_shape=(128, 128, 3), weights='SEModelWeights.h5')
model.summary()

# The model is not compiled; the objects below are standalone.
# NOTE(review): nothing in the visible cells uses them - presumably they serve a
# custom training/evaluation loop elsewhere; confirm before removing.

# `learning_rate` is the supported keyword; `lr` is only a deprecated alias.
optimizer = keras.optimizers.Adam(learning_rate=0.00001)

# SE_Model ends in a sigmoid Dense layer, so its outputs are already
# probabilities. from_logits must therefore be False; True would treat those
# probabilities as raw logits and squash them a second time.
loss_fn = keras.losses.BinaryCrossentropy(from_logits=False)
acc_metric = keras.metrics.BinaryAccuracy()
val_acc_metric = keras.metrics.BinaryAccuracy()
val_loss_fn = keras.losses.BinaryCrossentropy(from_logits=False)

Loading
Weight Loaded
Model: "model_1"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_2 (InputLayer)            [(None, 128, 128, 3) 0                                            
__________________________________________________________________________________________________
conv_block1 (Conv2D)            (None, 128, 128, 32) 896         input_2[0][0]                    
__________________________________________________________________________________________________
SE_avg_block1 (GlobalAveragePoo (None, 32)           0           conv_block1[0][0]                
__________________________________________________________________________________________________
se_dense1_block1 (Dense)        (None, 2)            66          SE_avg_block1[0][0]              
______________________________________________________________________

In [5]:
@tf.function
def test_step(x):
    """Run the module-level `model` on `x` with training=False and return its output."""
    return model(x, training=False)

In [7]:
# Live inference on webcam 0: classify each frame, overlay the score and label,
# and stop when 'q' is pressed in the preview window.
cap = cv2.VideoCapture(0)

try:
    if not cap.isOpened():
        raise RuntimeError('Could not open webcam (device 0)')

    while True:
        # Capture frame-by-frame; stop cleanly if the camera yields nothing.
        ret, frame = cap.read()
        if not ret or frame is None:
            break
        frame = cv2.flip(frame, 1)  # mirror horizontally for a selfie-style preview

        # Resample the whole frame down to the 128x128 network input.
        # (np.resize would not rescale the image: it only truncates/repeats the
        # flattened pixel buffer, so the model would see just the top rows.)
        small = cv2.resize(frame, (128, 128), interpolation=cv2.INTER_AREA)
        # Add the batch axis -> (1, 128, 128, 3).
        # NOTE(review): pixels are passed as BGR in the 0..255 range - confirm this
        # matches the channel order and scaling used when the weights were trained.
        to_test = np.expand_dims(small, axis=0).astype(np.float32)

        result = test_step(to_test)
        result = np.reshape(tf.get_static_value(result), (1, 1))[0][0]

        # Scores above 0.5 are labelled non-drowsy, everything else drowsy.
        prediction = 'non-drowsy' if result > 0.5 else 'drowsy'

        # Display the resulting frame with the raw score and the label.
        cv2.putText(frame, str(result), (10, 160), cv2.FONT_HERSHEY_PLAIN, 2, (255,255,255), 3)
        cv2.putText(frame, prediction, (10, 200), cv2.FONT_HERSHEY_PLAIN, 2, (255,255,255), 3)
        cv2.imshow('frame',frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Always release the camera and close the window, even if inference fails.
    cap.release()
    cv2.destroyAllWindows()