In [2]:
import csv
import pandas as pd
from sklearn.preprocessing import OneHotEncoder
def save_data(lips, Y, data_type):
    """Dump lip volumes and their class labels to ``{data_type}_lips.csv``.

    The first row is a header: ``label`` followed by one
    ``pixel_<frame>_<row>_<col>`` column per element of ``lips[0]``, so every
    sample is expected to share the 3-D shape of the first one. Each
    following row holds ``Y[i].argmax()`` and then the flattened values of
    ``lips[i]``.

    Args:
        lips: Indexable collection of 3-D arrays (frames, height, width).
        Y: Indexable collection of per-sample score / one-hot vectors; only
            the position of the largest entry is written.
        data_type: Prefix of the output file name (e.g. ``'train'``).
    """
    with open(f'{data_type}_lips.csv', mode='w', newline='') as out_file:
        csv_out = csv.writer(out_file)

        # Column names follow the flatten() order: frame, then row, then column.
        first_shape = lips[0].shape
        column_names = ['label'] + [
            f'pixel_{frame}_{row}_{col}'
            for frame in range(first_shape[0])
            for row in range(first_shape[1])
            for col in range(first_shape[2])
        ]
        csv_out.writerow(column_names)

        for idx, volume in enumerate(lips):
            record = [Y[idx].argmax()]
            record.extend(volume.flatten())
            csv_out.writerow(record)
            
            
def load_data():
    """Read the train / valid / test CSV splits from the working directory.

    Each file is loaded into a DataFrame; its ``label`` column is split off
    as the target Series and removed from the feature frame.

    Returns:
        Tuple ``(X_train, Y_train, X_valid, Y_valid, X_test, Y_test)``.
    """
    loaded = []
    for csv_name in ('train_lips.csv', 'valid_lips.csv', 'test_lips.csv'):
        features = pd.read_csv(csv_name)
        # pop() hands back the label column and drops it from the features.
        labels = features.pop('label')
        loaded.extend((features, labels))
    return tuple(loaded)

In [3]:
# Load the three splits back from the CSV files read by load_data().
X_train, Y_train, X_valid, Y_valid, X_test, Y_test = load_data()
import numpy as np
import tensorflow

# Restore every flattened CSV row to a 5-D sample of shape (11, 60, 100, 1);
# the leading -1 lets numpy infer the number of samples.
x_train = np.array(X_train, dtype='float32').reshape(-1, 11, 60, 100, 1)
x_valid = np.array(X_valid, dtype='float32').reshape(-1, 11, 60, 100, 1)
x_test = np.array(X_test, dtype='float32').reshape(-1, 11, 60, 100, 1)

onehot_encoder = OneHotEncoder()

# OneHotEncoder expects a 2-D input, hence one label per row.
Y_train_reshaped = np.array(Y_train).reshape(-1, 1)
Y_valid_reshaped = np.array(Y_valid).reshape(-1, 1)
Y_test_reshaped = np.array(Y_test).reshape(-1, 1)

# Fit the encoder on the training labels ONLY and reuse that mapping for the
# other splits. Re-fitting per split (the previous behaviour) gives each split
# its own category -> column mapping, so a split that lacks a class produces a
# narrower / misaligned one-hot matrix. With the default handle_unknown='error'
# a label never seen in training now raises instead of being silently
# mis-encoded.
Y_train_onehot = onehot_encoder.fit_transform(Y_train_reshaped)
Y_valid_onehot = onehot_encoder.transform(Y_valid_reshaped)
Y_test_onehot = onehot_encoder.transform(Y_test_reshaped)

# Convert the sparse one-hot matrices to dense arrays
Y_train = Y_train_onehot.toarray()
Y_valid = Y_valid_onehot.toarray()
Y_test = Y_test_onehot.toarray()


In [5]:
# Standardise every split with statistics computed over the TRAINING samples
# only (axis 0), so validation / test data do not influence the preprocessing.
mean=x_train.mean(axis=0)
std=x_train.std(axis=0)
# A position whose value is identical in every training sample has std == 0;
# dividing by it would produce NaN/inf. Use 1 there so such positions become 0.
std[std == 0] = 1.0

X_train=np.array((x_train-mean)/std)
x_train=None  # drop the reference so the un-normalised array can be freed
X_valid=np.array((x_valid-mean)/std)
x_valid=None
X_test=np.array((x_test-mean)/std)
x_test=None

print(X_train.shape)


(4768, 11, 60, 100, 1)


In [7]:
import wandb

# Log in to the wandb service. The run-tracking calls further down
# (wandb.init / wandb.finish) are currently commented out.
wandb.login()



True

In [None]:
# This script needs these libraries to be installed:
#   tensorflow, numpy, wandb

import wandb
from wandb.keras import WandbMetricsLogger, WandbModelCheckpoint

import random
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import *

# Start a run, tracking hyperparameters
'''
wandb.init(
    # set the wandb project where this run will be logged
    project="LipReadingModel",

    # track hyperparameters and run metadata with wandb.config
    config={
        "num_layers": 3,
        "layer_1": 64,
        "activation_1": "relu",
        "dropout": random.uniform(0.01, 0.50),
        "resblock_size": 32,
        "resblock_activation": "relu",
        "optimizer": "adam",
        "loss": "categorical_crossentropy",
        "metric": "accuracy",
        "epoch": 10,
        "batch_size": 32
    }
)
'''
# [optional] use wandb.config as your config
#config = wandb.config
from keras import backend as K
def channel_normalization(x):
    """Scale ``x`` by its largest absolute activation along axis 2.

    Args:
        x: Floating-point tensor with at least three axes.

    Returns:
        Tensor with the same shape as ``x``: every slice along axis 2 is
        divided by its maximum absolute value plus 1e-5 (the small constant
        avoids division by zero).
    """
    # reduce_max, not argmax: the peak magnitude itself is needed, not its
    # (integer) index. keepdims=True keeps axis 2 with length 1 so the
    # division broadcasts back over x.
    max_values = tf.reduce_max(tf.abs(x), axis=2, keepdims=True) + 1e-5
    out = x / max_values
    return out

from tensorflow.keras.layers import Conv2D, Conv3D, MaxPooling3D, Flatten, Dense, Dropout, BatchNormalization, Input, ReLU, GlobalAveragePooling3D, add
from tensorflow.keras import Model

output_shape = 6  # units of the classification layers built on top of this backbone
# One sample: 11 x 60 x 100 volume with a single channel.
input = Input(shape=(11, 60, 100, 1))


def _conv_relu_bn(tensor, filters, kernel_size, strides, conv_name, relu_name, bn_name):
    """Apply Conv3D -> ReLU -> BatchNormalization and return all three outputs."""
    conv = Conv3D(filters=filters, kernel_size=kernel_size, strides=strides, padding='same',
                  use_bias=False, name=conv_name, kernel_initializer='he_normal')(tensor)
    activated = ReLU(name=relu_name)(conv)
    normalized = BatchNormalization(name=bn_name)(activated)
    return conv, activated, normalized


# Shape comments below are (dim1, dim2, dim3, channels), derived from the
# input shape: 'same' convolutions give ceil(n / stride), the pooling floor(n / 2).

# ---- block 0: stem convolution, then 2x max-pooling on every axis ----
b0_conv3d_1, b0_relu_1, b0_bn_1 = _conv_relu_bn(
    input, 64, (2, 3, 3), (1, 1, 1), 'b0_conv3d_1', 'b0_relu_1', 'b0_bn_1')
b0_out = MaxPooling3D(pool_size=(2, 2, 2))(b0_bn_1)  # -> (5, 30, 50, 64)

# ---- block 1: two strided convolutions ----
b1_cnv3d_1, b1_relu_1, b1_bn_1 = _conv_relu_bn(
    b0_out, 16, (3, 3, 3), (2, 2, 2), 'b1_cnv3d_1', 'b1_relu_1', 'b1_bn_1')  # -> (3, 15, 25, 16)
b1_cnv3d_2, b1_relu_2, b1_out = _conv_relu_bn(
    b1_bn_1, 32, (1, 1, 1), (2, 2, 2), 'b1_cnv3d_2', 'b1_relu_2', 'b1_out')  # -> (2, 8, 13, 32)

# ---- block 2: 1x1x1 residual branch, then strided convolution ----
b2_cnv3d_1, b2_relu_1, b2_bn_1 = _conv_relu_bn(
    b1_out, 32, (1, 1, 1), (1, 1, 1), 'b2_cnv3d_1', 'b2_relu_1', 'b2_bn_1')  # -> (2, 8, 13, 32)
b2_add = add([b1_out, b2_bn_1])  # skip connection around the 1x1x1 branch
b2_cnv3d_2, b2_relu_2, b2_out = _conv_relu_bn(
    b2_add, 64, (3, 3, 3), (2, 2, 2), 'b2_cnv3d_2', 'b2_relu_2', 'b2_bn_2')  # -> (1, 4, 7, 64)

# ---- block 3: same pattern as block 2 with twice the filters ----
b3_cnv3d_1, b3_relu_1, b3_bn_1 = _conv_relu_bn(
    b2_out, 64, (1, 1, 1), (1, 1, 1), 'b3_cnv3d_1', 'b3_relu_1', 'b3_bn_1')  # -> (1, 4, 7, 64)
b3_add = add([b2_out, b3_bn_1])  # skip connection around the 1x1x1 branch
b3_cnv3d_2, b3_relu_2, b3_out = _conv_relu_bn(
    b3_add, 128, (3, 3, 3), (2, 2, 2), 'b3_cnv3d_2', 'b3_relu_2', 'b3_out')  # -> (1, 2, 4, 128)

'''TCN block 1'''
n_layers=2
activation='relu'
from tensorflow.keras.layers import Lambda


def _tcn_activation(tensor, kind):
    """Apply the activation selected by ``kind`` to ``tensor``.

    Args:
        tensor: Keras tensor produced by the preceding dropout layer.
        kind: ``'relu'``, ``'wavenet'`` or any activation name accepted by
            ``tf.keras.layers.Activation``.

    Returns:
        The activated tensor.

    Raises:
        NotImplementedError: for ``'wavenet'``; no such activation is defined
            in this notebook and ``tf.keras`` does not provide one.
    """
    if kind == 'relu':
        return tf.keras.activations.relu(tensor)
    if kind == 'wavenet':
        raise NotImplementedError("'wavenet' activation is not implemented in this notebook")
    # Activation lives in tf.keras.layers, not directly under tf.keras.
    return tf.keras.layers.Activation(kind)(tensor)


# ---- Encoder ----
# Each layer zero-pads dim1 by 2 and dim2 by 1 on both sides, then applies a
# 'same' convolution, so the feature map grows by (4, 2, 0) per layer.
# The optional channel normalisation (channel_normalization) is disabled.
# No pooling is applied here: the previous MaxPooling1D call sat in an
# unreachable branch and cannot take a 5-D tensor.
for _ in range(n_layers):
    b3_out = tf.keras.layers.ZeroPadding3D((2, 1, 0))(b3_out)
    b3_out = tf.keras.layers.Conv3D(32, (1, 2, 3), padding='same')(b3_out)
    b3_out = tf.keras.layers.SpatialDropout3D(0.3)(b3_out)
    b3_out = _tcn_activation(b3_out, activation)

# ---- Decoder ----
# Runs AFTER the encoder, once per layer. It used to be indented inside the
# encoder loop (and reused its loop variable), which executed n_layers**2
# decoder stages and blew the feature map up to 156 x 102 x 34.
for _ in range(n_layers):
    b3_out = tf.keras.layers.UpSampling3D(2)(b3_out)
    b3_out = tf.keras.layers.ZeroPadding3D((2, 1, 0))(b3_out)
    b3_out = tf.keras.layers.Conv3D(32, (1, 2, 3), padding='same')(b3_out)
    # Trim one element from each side of the last spatial axis.
    b3_out = tf.keras.layers.Cropping3D((0, 0, 1))(b3_out)
    b3_out = tf.keras.layers.SpatialDropout3D(0.3)(b3_out)
    b3_out = _tcn_activation(b3_out, activation)

'''end block'''


# Classification head: a Dense projection applied to every slice along axis 1,
# averaged over all remaining positions, then a softmax layer.
b3_out =tf.keras.layers.TimeDistributed(Dense(output_shape, name='model_output', activation='relu',
                       kernel_initializer='he_uniform'))(b3_out)
b3_out = GlobalAveragePooling3D()(b3_out)
b3_out=Flatten()(b3_out)
# Use output_shape instead of repeating the literal class count, so both
# layers stay in sync when the number of classes changes.
output=Dense(output_shape, activation='softmax')(b3_out)
model = Model(input, output)



# compile the model
model.compile(optimizer='adam',
              loss='categorical_crossentropy',
              metrics=['accuracy']
              )

# Stop when val_accuracy has not improved by at least 0.001 for 5 epochs and
# roll back to the best weights seen.
es = tf.keras.callbacks.EarlyStopping(monitor='val_accuracy', verbose=1, patience=5, restore_best_weights=True,  min_delta=0.001)

model.summary()
# Only early stopping is attached; the imported wandb callbacks
# (WandbMetricsLogger / WandbModelCheckpoint) are not used in this run.
history = model.fit(X_train,Y_train,
                    epochs=20,
                    batch_size=32,verbose=1,
                    validation_data=(X_valid, Y_valid),callbacks=[es])



# [optional] finish the wandb run, necessary in notebooks 
# wandb.finish()

Epoch 1/20


Evaluate the model

In [ ]:
from sklearn.metrics import classification_report

# Model outputs for every test sample (one score per class).
preds = model.predict(X_test)

# Reduce the one-hot targets and the predicted scores to class indices
# before building the per-class report.
true_classes = np.argmax(Y_test, axis=1)
predicted_classes = np.argmax(preds, axis=1)
print(classification_report(true_classes, predicted_classes))

Save the model

In [ ]:
# Write the model to 'model.test3.keras' (path is relative to the working directory).
model.save('model.test3.keras')