Load the Data

In [9]:
import pandas as pd
import numpy as np
import os
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from sklearn.preprocessing import MinMaxScaler


dataframes_list = []
directory_path = r'c:\Users\bounn\Documents\train10\train10'

for subdir in os.listdir(directory_path):
    subdir_path = os.path.join(directory_path, subdir)

    if os.path.isdir(subdir_path):  
        word_dataframes = []

        for file in os.listdir(subdir_path):
            file_path = os.path.join(subdir_path, file)
            df = pd.read_csv(file_path)
            word_dataframes.append(df)

        dataframes_list.append(word_dataframes) # List of list of dataframes, dataframes_list[1] is the list of all data from the csv files in the addition directory



Definition of some useful functions

In [16]:

def nb_changes(polarity_list):
    changes = np.diff(polarity_list)
    return np.sum(changes != 0)

def nb_positive_changes(polarity_list):
    if not isinstance(polarity_list, list):
        return 0  # Return 0 if the input is not a list
    
    positive_changes = 0
    
    for i in range(1, len(polarity_list)):
        if polarity_list[i] == 1 and polarity_list[i-1] == 0:
            positive_changes += 1
    
    return positive_changes

Slice into frames and extract features from the data we have before normalizing them.
We extract the number of changes each pixel had during a timeframe, the number of positive changes it had (going from 0 to 1) aswell as the sum of the polarities captured.
We then store all of these informations into a list.

In [19]:
x_range = range(0, 224)
y_range = range(0, 90)
DF2 = pd.DataFrame([(x, y) for x in x_range for y in y_range], columns=['x', 'y'])
all_results = []
num_frames=30
for list_of_df in dataframes_list:
    list_results = []  # List of results for each time interval

    # Loop over each dataframe in the inner list
    for df in list_of_df:
        results = []
        med=df.loc[:,'time'].median()
        if med <600000 :
            interval_to_slice=df[(df['time'] < 1200000)]
        elif med >2400000 :
            interval_to_slice=df[(df['time'])>1800000]
        else :
            interval_to_slice=df[(df['time']>med-600000) & (df['time']<med+600000)]
        
        
        time_intervals = np.linspace(interval_to_slice['time'].to_numpy()[0], interval_to_slice['time'].to_numpy()[len(interval_to_slice)-1], num=num_frames + 1)
        for i in range(len(time_intervals) - 1):    
            start_time, end_time = time_intervals[i], time_intervals[i + 1]
            interval_data = interval_to_slice[(interval_to_slice['time'] >= start_time) & (interval_to_slice['time'] < end_time)]
            last_polarity_per_location =interval_data.groupby(['x', 'y'])['polarity'].sum().reset_index()
            last_pol_merged=pd.merge(DF2, last_polarity_per_location, on=['x', 'y'], how='left').fillna(0)
            scaler = MinMaxScaler()
            last_pol_merged['polarity']=scaler.fit_transform(last_pol_merged['polarity'].to_numpy().reshape(-1, 1))


            grouped_data = interval_data.groupby(['x', 'y']).agg({'polarity': nb_changes}).reset_index()
            DF_merged = pd.merge(DF2, grouped_data, on=['x', 'y'], how='left')
            DF_merged['polarity'].fillna(0, inplace=True)
            DF_merged['polarity'] = scaler.fit_transform(DF_merged[['polarity']])
            
            grouped_data2=pd.DataFrame(interval_data.groupby(['x', 'y']).agg({'polarity': list}).reset_index())
            DF_merged2 = pd.merge(DF2, grouped_data2, on=['x', 'y'], how='left')
            DF_merged2['polarity'].fillna(0, inplace=True)
            DF_merged2['polarity'] = scaler.fit_transform(DF_merged2.apply(lambda row: nb_positive_changes(row['polarity']), axis=1).to_numpy().reshape(-1, 1))
            


            matrix1=last_pol_merged.pivot(index='y', columns='x', values='polarity').values 
            matrix2=DF_merged2.pivot(index='y', columns='x', values='polarity').values ##nb of changes
            matrix3=DF_merged.pivot(index='y', columns='x', values='polarity').values ##nb of changes
            
            

            matrix3D = np.dstack((matrix1,matrix2,matrix3))
            results.append(matrix3D)
        list_results.append(results)
    all_results.append(list_results)

Store all the information in a tensor of shape 320,30,90,224,3 and save it

In [20]:

flattened_matrices = [matrix for list_of_results in all_results for results in list_of_results for matrix in results]

X = np.array(flattened_matrices)
X1 = X.reshape(320,30,90,224,3)
np.save('newX5.npy',X) ##30 frames, sum and nb of changes

In [24]:
import numpy as np
X = np.load('newX5.npy')
X1 = X.reshape(320,30,90,224,3)

Crop the grid to keep only the lips 

In [25]:
X3 = X1[:,:, 20:70,55:170:]

CNN 3D GRU architecture

In [36]:
import tensorflow as tf
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, BatchNormalization, Dropout,MaxPooling2D, Conv3D, GRU, Reshape
from tensorflow.keras.optimizers import Adam,SGD
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.utils import to_categorical
from sklearn.model_selection import train_test_split
from keras.regularizers import l2
from keras.callbacks import EarlyStopping, ReduceLROnPlateau
from keras.regularizers import l2
from keras.losses import SparseCategoricalCrossentropy
early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=3, min_lr=1e-6)

# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X3[:,:,:,:,:], np.array([i for i in range(10) for _ in range(32)]), test_size=0.20, random_state=41)

# Build the model
custom_optimizer = Adam(learning_rate=0.0001)
model = tf.keras.models.Sequential()

model.add(tf.keras.layers.Conv3D(16, (2,2,2),padding='same', kernel_regularizer=l2(0.015), input_shape=(30,50,115,3)))
model.add(tf.keras.layers.Activation('relu'))
model.add(tf.keras.layers.Conv3D(32, (2,2,2),padding='same',kernel_regularizer=l2(0.015)))
model.add(tf.keras.layers.Activation('relu'))
model.add(tf.keras.layers.MaxPooling3D((2,2,2)))

model.add(Reshape((15,-1)))
model.add(GRU(units=16, return_sequences=True, kernel_regularizer=l2(0.015),recurrent_dropout=0.1,activation='relu'))

model.add(tf.keras.layers.Flatten())
model.add(tf.keras.layers.Dense(32))

model.add(tf.keras.layers.Activation('relu'))

'''model.add(tf.keras.layers.Dense(128,kernel_regularizer=l2(0.01)))
model.add(tf.keras.layers.Activation('relu'))
model.add(tf.keras.layers.Dropout(0.1))'''


model.add(tf.keras.layers.Dense(10))

# Compile the model
model.compile(optimizer=custom_optimizer, loss=SparseCategoricalCrossentropy(from_logits=True), metrics=['accuracy'])

# Train the model
model.fit(X_train, y_train, epochs=200, batch_size=16, validation_data=(X_test, y_test), callbacks=[early_stopping,reduce_lr])

GRU only

In [34]:
from sklearn.model_selection import train_test_split
from keras.models import Sequential
from keras.layers import LSTM, Dense, TimeDistributed, Flatten,GRU, Conv2D, MaxPooling2D
from keras.utils import to_categorical
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.optimizers import Adam,SGD
from keras.layers import Dropout
from keras.callbacks import EarlyStopping
from keras.regularizers import l2
from keras.layers import BatchNormalization
import tensorflow as tf
from keras.losses import SparseCategoricalCrossentropy


early_stopping = EarlyStopping(monitor='val_loss', patience=6, restore_best_weights=True)


custom_optimizer = Adam(learning_rate=0.00009)

# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X3[:,:,:,:,:].reshape(320,30,50*115*3), np.array([i for i in range(10) for _ in range(32)]), test_size=0.20, random_state=41)


# Create the model
model = tf.keras.models.Sequential()


model.add(GRU(units=64, return_sequences=True, kernel_regularizer=l2(0.016),dropout=0.1,recurrent_dropout=0.4, input_shape=(30, 50*115*3)))
model.add(GRU(units=128, return_sequences=True, kernel_regularizer=l2(0.016),activation='relu',dropout=0.2,recurrent_dropout=0.2))


model.add(Flatten())

model.add(tf.keras.layers.Dense(64, activation='relu', kernel_regularizer=l2(0.016)))
model.add(Dropout(0.06))
model.add(tf.keras.layers.Dense(128, activation='relu', kernel_regularizer=l2(0.016)))



model.add(Dense(units=10))

# Compile the model
model.compile(optimizer=custom_optimizer, loss=SparseCategoricalCrossentropy(from_logits=True), metrics=['accuracy'])

# Train the model
model.fit(X_train, y_train, epochs=100, batch_size=8, validation_data=(X_test, y_test), shuffle=True, callbacks=[early_stopping])

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100
Epoch 28/100
Epoch 29/100
Epoch 30/100
Epoch 31/100
Epoch 32/100
Epoch 33/100
Epoch 34/100


<keras.src.callbacks.History at 0x260f53e8640>

In [None]:
##resized crop : same result
from sklearn.model_selection import train_test_split
from keras.models import Sequential
from keras.layers import LSTM, Dense, TimeDistributed, Flatten,GRU, Conv2D, MaxPooling2D
from keras.utils import to_categorical
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.optimizers import Adam,SGD
from keras.layers import Dropout
from keras.callbacks import EarlyStopping, ReduceLROnPlateau
from keras.regularizers import l2
from keras.layers import BatchNormalization
import tensorflow as tf
from keras.losses import SparseCategoricalCrossentropy


early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)


custom_optimizer = Adam(learning_rate=0.00008)

# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X3[:,:,:,:,:].reshape(320,30,50*115*2), np.array([i for i in range(10) for _ in range(32)]), test_size=0.2, random_state=35)


# Create the model
model = tf.keras.models.Sequential()


model.add(GRU(units=32, return_sequences=True, kernel_regularizer=l2(0.015),dropout=0.015,recurrent_dropout=0.25, input_shape=(30, 50*115*2)))

model.add(GRU(units=64, return_sequences=True, kernel_regularizer=l2(0.015),dropout=0.015,recurrent_dropout=0.25,activation='relu'))

#model.add(GRU(units=64, return_sequences=True, kernel_regularizer=l2(0.015),recurrent_dropout=0.3,activation='relu'))

model.add(Flatten())
model.add(tf.keras.layers.Dense(32, activation='relu'))
model.add(tf.keras.layers.Dense(64, activation='relu'))
model.add(Dropout(0.05))



model.add(Dense(units=10))

# Compile the model
model.compile(optimizer=custom_optimizer, loss=SparseCategoricalCrossentropy(from_logits=True), metrics=['accuracy'])

# Train the model
model.fit(X_train, y_train, epochs=100, batch_size=16, validation_data=(X_test, y_test), shuffle=True, callbacks=[early_stopping])