In [22]:
import numpy as np # math calcuations and other matrix, vector processing
import pandas as pd # dataframe organization (similar to Excel)
import seaborn as sns # for plotting
import matplotlib.pyplot as plt # for plotting
from sklearn.preprocessing import StandardScaler #replace by MinMaxScaler
from tensorflow.keras.preprocessing.image import load_img
from tensorflow.keras.preprocessing.image import img_to_array
import tensorflow as tf
from tensorflow.keras import Sequential
from tensorflow.keras import layers
import random
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from tensorflow import keras
from tensorflow.keras.layers import Dense, Conv2D
from tensorflow.keras.layers import BatchNormalization, Flatten, Dropout, MaxPooling2D

In [33]:
train_csv = 'train.csv'
test_csv = 'test.csv'
label_name = 'label'
id_name = 'id'

In [32]:
# Read data
def read_data(csv_file):
    df = pd.read_csv(csv_file)
    return df
# collect distinct labels
def distinct(df,label_name):
    train_labels = df[label_name]
    data_labels = train_labels.unique().tolist()
    return data_labels, train_labels
# convert train images to matrix
def convert_train(df_train_labels, id_name, img_size, channels, X_train, y_train):
    train_ids = df_train_labels[id_name]
    for index, _train_id in enumerate(train_ids):
        _label = train_labels[index]
        y_train.append(data_labels.index(_label))
        # image to matrix and resize to img_size * img_size    
        _img = load_img(f'imgs/{_train_id}.jpeg', target_size=(img_size, img_size))
        img2arr = img_to_array(_img)
        X_train.append(img2arr)
    # convert list to array
    X_train = np.array(X_train, dtype=np.float32)/255
    y_train = np.array(y_train)
    return X_train, y_train
# convert test images to matrix
def convert_test(df_test, id_name, img_size, X_test):
    test_ids = df_test[id_name]
    for index, _test_id in enumerate(test_ids):
        _label = train_labels[index]
        _img = load_img(f'imgs/{_test_id}.jpeg', target_size=(img_size, img_size))
        img2arr = img_to_array(_img)
        X_test.append(img2arr)
        # convert list to array and normalize
        X_test = np.array(X_test, dtype=np.float32)/255
        return X_test
# Build CNN model
def build_CNN_model(input_shape):
    model = Sequential()
    model.add(Conv2D(64, kernel_size=(3,3),
                 strides=(1,1),
                 padding='same',
                 activation='relu',
                 input_shape=(48,48,3))
             ) # output size: 48x48x64

    model.add(BatchNormalization()) #optional

    model.add(Conv2D(64, kernel_size=(3,3),
                 strides=(1,1),
                 padding='valid',
                 activation='relu'
                 #nput_shape=(48,48,64)
                    )
             )
    model.add(BatchNormalization()) #optional
    model.add(MaxPooling2D(2,2)) #optional, but often used to reduce 
                             # the overfitting, get simpler representation
    model.add(Dropout(0.25)) # optional, reduce overfitting 

    model.add(Conv2D(128, kernel_size=(3,3),
                 strides=(1,1),
                 padding='valid',
                 activation='relu'                
                    )
             )       

    model.add(BatchNormalization()) #optional
    model.add(MaxPooling2D(2,2)) 

    model.add(Flatten())
    #Fully Connected Layer(=Dense), 512, ReLU, BatchNorm, dropout(0.5)
    model.add(Dense(512, activation='relu'))
    model.add(BatchNormalization()) #optional
    model.add(Dropout(0.5))
    #-> Dense, 128, ReLU, BatchNorm, dropout(0.5)
    model.add(Dense(128, activation='relu'))
    model.add(BatchNormalization()) #optional
    model.add(Dropout(0.5))
    #Dense, 2, softmax (Output layer)
    model.add(Dense(2, activation='softmax'))
    return model

In [15]:
# Read Training labels and collect distinct labels
df_train_labels = read_data(train_csv)
data_labels, train_labels = distinct(df_train_labels, label_name)


In [16]:
X_train = [] # matrix of train images
y_train = [] # labels
img_size = 48 # user-defined, original: 32x32
channels = 3
X_train, y_train = convert_train(df_train_labels, id_name, img_size, channels, X_train, y_train)

In [18]:
# Construct validation data 
X_train_, X_val, y_train_, y_val = train_test_split(
    X_train, y_train, stratify = y_train, test_size=0.3, 
    random_state=2022)

In [25]:
# Build the model
model = build_CNN_model(X_train_[0].shape)
model.summary()
#compile model
model.compile(loss='sparse_categorical_crossentropy', 
              optimizer='Adam', metrics=['accuracy'])
# optimizer: very important to get a better performance
# Adagrad, SGD, RMSprop

Model: "sequential_3"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d_6 (Conv2D)           (None, 48, 48, 64)        1792      
                                                                 
 batch_normalization_10 (Bat  (None, 48, 48, 64)       256       
 chNormalization)                                                
                                                                 
 conv2d_7 (Conv2D)           (None, 46, 46, 64)        36928     
                                                                 
 batch_normalization_11 (Bat  (None, 46, 46, 64)       256       
 chNormalization)                                                
                                                                 
 max_pooling2d_4 (MaxPooling  (None, 23, 23, 64)       0         
 2D)                                                             
                                                      

In [26]:
callback = tf.keras.callbacks.EarlyStopping(
    monitor="val_loss", 
    patience=3, 
    verbose=1,
    restore_best_weights=True                               
    )
data = {'X_train': X_train, 'y_train': y_train,
        'X_val' : X_val, 'y_val': y_val}
history = model.fit(x=X_train, y=y_train, 
                    batch_size=32,
                    validation_data=[data['X_val'], data['y_val']],
                    epochs=50,
                    callbacks=[callback],
                    verbose=1)

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 11: early stopping


In [38]:
# check accuracy on the Test prediction
# Check performance on test-train-split set:
#predicted score
y_pred_test = model.predict(X_val) #output of last layer
# predicted labels
y_pred_label_test = np.argmax(y_pred_test, axis=1)


print(f'Accuracy score = {accuracy_score(y_val,y_pred_label_test)}')


Accuracy score = 0.9788583509513742
sum of val prediction = 390
sum of val actual = 378


In [39]:
#predict the labels of the test data 
df_test = read_data(test_csv)
X_test = []
X_test = convert_test(df_test, id_name, img_size, X_test)

In [40]:
y_test_prob_pred = model.predict(X_test)
y_test_label_pred = np.argmax(y_test_prob_pred, axis=1)



In [41]:
# Save Predictions for Test Data
#predicted score
df_test_pred = pd.DataFrame(data=y_test_label_pred, columns=['label'])
df_test_pred.to_csv('prediction.csv')