In [1]:
import utilities
import tensorflow as tf
from keras.models import Sequential
from keras.layers import Convolution2D, MaxPooling2D, Flatten, Dense, Dropout , SpatialDropout2D
from keras.layers import AveragePooling2D, Activation
from keras.callbacks import History
import numpy as np
import pandas as pd
import os
import keras
from  keras import optimizers
from keras import losses
from keras.regularizers import l1_l2, l1,l2
from keras.models import model_from_json
from sklearn.utils import class_weight
import pyprog
#print (tf.__version__)

import os
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = "2"

SETTINGS_DIR = os.path.dirname(os.path.realpath('__file__'))
train_set_path = SETTINGS_DIR+'/images/Control/Train/'

#test_set_path = SETTINGS_DIR+'/images/Dysarthric/Test/Combined'
test_set_path = SETTINGS_DIR+"/images/Control/Test/"
#test_set_path = SETTINGS_DIR+"/images/Dysarthric/Test/F05"
#test_set_path = SETTINGS_DIR+"/images/Dysarthric/Test/M06"
#test_set_path = SETTINGS_DIR+"/images/Dysarthric/Test/M10"

dnn_file_name_structure = SETTINGS_DIR +"/cnn_control.json"
training_dynamics_path = SETTINGS_DIR+'/TrainingDynamics.csv'
dnn_file_name_weights = SETTINGS_DIR +  "/cnn_weight_control.h5"


# IMPORTANT: Deleting .ipynb_checkpoints from image paths
!find '.' -name '*.ipynb_checkpoints' -exec rm -r {} +

batch_size=128
image_input_size=(150,150)
vocab_size = utilities.get_no_folders_in_path(test_set_path)
print ("Vocabulary Size:",vocab_size)


  return f(*args, **kwds)
  return f(*args, **kwds)


<Figure size 640x480 with 1 Axes>

<Figure size 640x480 with 1 Axes>

Using TensorFlow backend.


Vocabulary Size: 155


In [2]:
def model_compile(model):
    model.compile(loss=losses.categorical_crossentropy,
                          optimizer=optimizers.Adadelta(),
                          metrics=['accuracy'])
    
def get_model():
    
    #droprate=0.4
    droprate=0.5

    classifier = Sequential()

    classifier.add( Convolution2D(  filters=32, kernel_size=(3,3), 
                                  input_shape= (*image_input_size,3), 
                                  activation='relu')  )
    classifier.add( SpatialDropout2D(droprate) )
    classifier.add( Convolution2D(  filters=32, kernel_size=(3,3),activation='relu'  )  )
    classifier.add(MaxPooling2D (pool_size=(2,2) ) )
    #classifier.add(Dropout(droprate))


    classifier.add( Convolution2D(  filters=64, kernel_size=(3,3),activation='relu'  )  )
    classifier.add( SpatialDropout2D(droprate) )
    classifier.add( Convolution2D(  filters=64, kernel_size=(3,3),activation='relu'  )  )
    #classifier.add(BatchNormalization())
    classifier.add(MaxPooling2D (pool_size=(2,2) ) )
    #classifier.add(Dropout(droprate))

    classifier.add( Convolution2D(  filters=128, kernel_size=(3,3),activation='relu'  )  )
    classifier.add( SpatialDropout2D(droprate) )
    classifier.add( Convolution2D(  filters=128, kernel_size=(3,3),activation='relu'  )  )
    #classifier.add(BatchNormalization())
    classifier.add(MaxPooling2D (pool_size=(2,2) ) )
    #classifier.add(Dropout(droprate))

    classifier.add( Convolution2D(  filters=256, kernel_size=(3,3),activation='relu'  )  )
    classifier.add( SpatialDropout2D(droprate) )
    classifier.add( Convolution2D(  filters=256, kernel_size=(3,3),activation='relu'  )  )
    #classifier.add(BatchNormalization())
    classifier.add(MaxPooling2D (pool_size=(2,2) ) )
    #classifier.add(Dropout(0.5))
    classifier.add(Dropout(droprate))

    classifier.add (Flatten( ) )

    classifier.add(Dense (units=vocab_size, activation='softmax' ))
    classifier.summary()

    return classifier

In [3]:
def predict_an_image(image_path, model):
    
    from tensorflow.keras.preprocessing import image

    test_image = image.load_img(image_path, target_size = image_input_size)
    test_image = image.img_to_array(test_image)
    test_image = np.expand_dims(test_image, axis = 0) 

    y_pred = model.predict_classes(test_image,batch_size)[0]
    classes =training_set.class_indices
    for key, value in classes.items():
        if value==y_pred:
            break       

    pred_key=utilities.dictionary .index [ utilities.dictionary  ['FILE NAME'] == key ] 
    predicted_word=utilities.dictionary .iloc[pred_key[0],0]
    # Get true label
    true_key=true_key=utilities.file_to_index(image_path)
    true_word = utilities.dictionary .iloc[true_key,0]
    #print("Predicted:",predicted_word,", True:",true_word)
    return predicted_word, true_word

In [4]:
def read_epoch():
    if os.path.exists(training_dynamics_path):
        training_dynamics=pd.read_csv(training_dynamics_path)
        return training_dynamics["Epoch"][len(training_dynamics)-1]
    else:
        return 0

def load_model(learning_rate=0.001):
    # Loading the CNN
    json_file = open(dnn_file_name_structure, 'r')
    loaded_model_json = json_file.read()
    json_file.close()
    model = model_from_json(loaded_model_json)
    # load weights into new model
    model.load_weights(dnn_file_name_weights)
    model_compile(model)
    return model

def save_model(model):
    # Save/overwrite the model
    model_json = model.to_json()
    with open(dnn_file_name_structure, "w") as json_file:
        json_file.write(model_json)
    # serialize weights to HDF5
    model.save_weights(dnn_file_name_weights)
    
def save_training_dynamics(epoch,history,with_header=False):
    training_dynamics=pd.DataFrame(
        data = [ [epoch, history.history['loss'][0] ,  history.history['acc'][0],  
                history.history['val_loss'][0],  history.history['val_acc'][0] ]],
        columns=["Epoch","TrainingLoss", "TrainingAccuracy","ValidationLoss","ValidationAccuracy"]
    )
    if (with_header):
        with open(training_dynamics_path, 'a') as csv_file:
            training_dynamics.to_csv(csv_file, header=True)
    else:
        with open(training_dynamics_path, 'a') as csv_file:
            training_dynamics.to_csv(csv_file, header=False)
            
def visualize_training():
    import matplotlib.pyplot as plt
    training_dynamics=pd.read_csv(training_dynamics_path)
    loss_values = training_dynamics["TrainingLoss"]
    val_loss_values = training_dynamics["ValidationLoss"]
    epochs = range(1, len (training_dynamics['Epoch'])+1)
    plt.plot(epochs, loss_values, 'b', label='Training loss')
    plt.plot(epochs, val_loss_values, 'g.-', label='Validation loss')
    plt.title('Training and validation loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()
    # Ploting Accuracy
    loss_values = training_dynamics["TrainingAccuracy"]
    val_loss_values = training_dynamics["ValidationAccuracy"]
    epochs = range(1, len (training_dynamics['Epoch'])+1)
    plt.plot(epochs, loss_values, 'b', label='Training loss')
    plt.plot(epochs, val_loss_values, 'g.-', label='Validation loss')
    plt.title('Training and validation Accuracy')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()

In [5]:
def get_train_test_sets():
        from keras.preprocessing.image import ImageDataGenerator
        
        # https://fairyonice.github.io/Learn-about-ImageDataGenerator.html
        train_datagen = ImageDataGenerator(
                    rescale=1./255,
            width_shift_range=0.30,
            shear_range=0.2,
            zoom_range=0.2,
            fill_mode='nearest',
            horizontal_flip=False)
        
        test_datagen = ImageDataGenerator(rescale=1./255)
        
        # If shuffle=False then the validation results will be different from classifier.predict_generator()
        training_set = train_datagen.flow_from_directory(
            train_set_path,
            target_size=image_input_size,
            batch_size=batch_size,
            class_mode='categorical',
            shuffle=True)
        
        test_set = test_datagen.flow_from_directory(
           test_set_path,
            target_size=image_input_size,
            batch_size=batch_size,
            class_mode='categorical',
            shuffle=False)
        return training_set, test_set

In [6]:
def test_generator(test_set_generator):
    steps=test_set_generator.samples/batch_size
    model = load_model()

    y_pred = model.evaluate_generator(test_set_generator, steps = steps, verbose = 1)
    y_test = test_set_generator.classes
    correct_classifications=0
    for pred,label in zip(y_pred, y_test):
        if pred.argmax()==label:
            correct_classifications+=1
    #print ("Testing acuracy:", correct_classifications/len(y_pred) *100,"%")
    print ("Testing acuracy:", y_pred[1] *100,"%")
    return 

def manual_testing():
    model = load_model() 
    #test_path = SETTINGS_DIR+"/images/Control/Test"
    #test_path = SETTINGS_DIR+"/images/Dysarthric/Test/F05"
    #test_path = SETTINGS_DIR+"/images/Dysarthric/Test/M06"
    #test_path = SETTINGS_DIR+"/images/Dysarthric/Test/M10"
    #test_path = SETTINGS_DIR+"/images/Control/Test"
    test_path = test_set_path
    
    correct_classifications=0
    i=0
    prog = pyprog.ProgressBar("Predicting ", " Done", utilities.get_no_files_in_path(test_path))
    # Show the initial status
    prog.update()
    no_processed=i
    for directory, s, files in os.walk(test_path):
            for f in files:
                file_path=directory+"/"+f
                if ("jpg" in f):                
                    predicted_word, true_word = predict_an_image(file_path,model)
                    #print (predicted_word,true_word)
                    if (predicted_word==true_word):
                        correct_classifications+=1
                    i+=1
                    prog.set_stat(i)
                    prog.update()

    prog.end()
    print ("Testing acuracy:", correct_classifications/i *100,"%")
    return (correct_classifications/i)

In [7]:
def train(ideal_loss=0.01, is_dnn_structure_changned=False,
              learning_rate=0.001,  max_epoch=50, enabled_trasfer_learning=False):
        
        is_new_dnn=False
        history = History()
        
        print("=================================================")
        
        if (os.path.isfile(dnn_file_name_structure) and
                (os.path.isfile(dnn_file_name_weights)) and 
                (is_dnn_structure_changned == False)):
            # load the previosly trained DNN
            if (enabled_trasfer_learning):
                # Enable Transfer Learning
                print ("Transfer learning is enabled.")
                model = FreezeLayers(load_model(),top_unfrozen_layer_name="conv2d_5" ) 
            else:
                print ("Transer learning is disabled.")
                model = load_model(learning_rate=learning_rate)
            print("CNN is loaded.")
        else:
            # Create a new model
            model =  get_model()                    
            print("CNN is created")
            # Erase the training_dynamic_csv file
            if os.path.exists(training_dynamics_path):
                os.remove(training_dynamics_path)
            is_new_dnn=True
            model_compile(model)
        
        ep= read_epoch()+1
        PringFrozenLayers(model)
        model.fit_generator(
            training_set,
            steps_per_epoch=training_set.samples/batch_size,
            #steps_per_epoch=500,
                             epochs=1,
                           
                             validation_data=test_set,
                             validation_steps=test_set.samples/batch_size,
                             workers=10, 
                             max_queue_size=10,  callbacks=[history])
        save_training_dynamics(ep,history,with_header=is_new_dnn)
       
        while (history.history['loss'][0] >= ideal_loss):
            print("Epoch", ep)
            model.fit_generator(
            training_set,
            steps_per_epoch=training_set.samples/batch_size,
            #steps_per_epoch=500,
                             epochs=1,
                             
                             validation_data=test_set,
                             validation_steps=test_set.samples/batch_size,
                             workers=10,
                             max_queue_size=10,  callbacks=[history])

            # Save/overwrite the model
            save_model(model)
            
            if (ep%5==0):
                if(manual_testing()>0.85):
                    break
                
            if   (history.history['val_acc'][0]>0.88):
                break
                
            ep += 1
            save_training_dynamics(ep,history,with_header=False)        

            # stop the traning if certain training accuracy is reached
            if (history.history['loss'][0]<ideal_loss):
                   break
            if (ep > max_epoch):
                break

        return history

In [8]:
# Transfer learning: freeze top layers but unfreeze all layers below the given layer
def FreezeLayers(model, top_unfrozen_layer_name):
    model.trainable=True
    set_trainable = False
    for layer in model.layers:
        if (layer.name==top_unfrozen_layer_name):
            set_trainable=True
        if (set_trainable):
            layer.trainable=True
        else:
            layer.trainable=False
    model_compile(model)
    return model

def PringFrozenLayers(model):
     for layer in model.layers:
            print ("Layer:",layer.name, "Frozen:",not layer.trainable)

In [9]:
# Load X and y
training_set, test_set =get_train_test_sets()

Found 15736 images belonging to 155 classes.
Found 1395 images belonging to 155 classes.


In [None]:
# Train       
history = train(is_dnn_structure_changned= False,  
                enabled_trasfer_learning=False,max_epoch=5000)

In [None]:
# Evaluate
y_pred=test_generator(training_set)

In [10]:
y_pred=test_generator(test_set)

Instructions for updating:
Colocations handled automatically by placer.
Instructions for updating:
Please use `rate` instead of `keep_prob`. Rate should be set to `rate = 1 - keep_prob`.
Testing acuracy: 84.08602134301245 %


In [None]:
# Making a single prediction
model = load_model() 
image_path = SETTINGS_DIR+"/images/Control/Test/C10/CM06_B1_C10_M3.jpg"
predicted_word, true_word = predict_an_image(image_path,model)
print ("Prediction:", predicted_word, " --- True Label:", true_word)

In [11]:
# Manual Prediction
manual_testing()

Predicting Progress: 100% ##################################################  Done
Testing acuracy: 72.68817204301075 %


0.7268817204301076

In [None]:
visualize_training()

In [None]:
seq=Sequential()
model = load_model()

for layer in model.layers[:-3]:
    seq.add(layer)


In [None]:
model.summary()

In [None]:
seq.summary()