In [1]:
from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten
from keras.layers.convolutional import Conv2D, MaxPooling2D, ZeroPadding2D
from keras.datasets import mnist
from keras.utils import to_categorical
from keras.preprocessing.image import ImageDataGenerator
from keras.preprocessing import image
from keras import optimizers, regularizers, models, layers
from keras.callbacks import EarlyStopping, ModelCheckpoint
from keras.applications import VGG16
from random import shuffle, randint
import numpy as np
from PIL import Image

import matplotlib.pyplot as plt
import os, shutil
from os import listdir
from os.path import isfile, join
from keras.models import load_model, model_from_json
from pathlib import Path
from file_utils import make_dir_helper, delete_dir_helper, is_jpeg

Using TensorFlow backend.


In [2]:
project_dir_name = 'Esri Project'
base_scripts_dir =  join(os.path.expanduser('~'), 'Desktop', project_dir_name, 'scripts');
model_dir =  join(os.path.expanduser('~'), 'Desktop', project_dir_name, 'models');

POSITIVE_CLASS = 'roads'
NEGATIVE_CLASS = 'others'
MODEL_FILE_NAME = POSITIVE_CLASS + '_model_road_detector.h5'
ext = '.jpg'
IMAGE_SIZE = 256
EPOCHS = 30
BATCH_SIZE = 10
STEPS_PER_EPOCH = 50

dataset_dir_name =   'redlands roads dataset'
original_dataset_dir = join(os.path.expanduser('~'),'Desktop', project_dir_name, 'datasets', dataset_dir_name);
positive_dataset_dir = join(original_dataset_dir, POSITIVE_CLASS)
negative_dataset_dir = join(original_dataset_dir, NEGATIVE_CLASS)

base_data_dir =  join(os.path.expanduser('~'), 'Desktop', project_dir_name, 'temp data');


train_dir = join(base_data_dir, 'train')
validation_dir = join(base_data_dir, 'validation')
test_dir = join(base_data_dir, 'test')

model_file_path = join(model_dir, MODEL_FILE_NAME)

train_pos_dir = join(train_dir, POSITIVE_CLASS)
validation_pos_dir = join(validation_dir, POSITIVE_CLASS)
test_pos_dir = join(test_dir, POSITIVE_CLASS)

train_neg_dir = join(train_dir, NEGATIVE_CLASS)
validation_neg_dir = join(validation_dir, NEGATIVE_CLASS)
test_neg_dir = join(test_dir, NEGATIVE_CLASS)

In [3]:
delete_dir_helper(base_data_dir)

# Make base directories
make_dir_helper(base_data_dir)
make_dir_helper(train_dir)
make_dir_helper(validation_dir)
make_dir_helper(test_dir)
make_dir_helper(model_dir)

# Make positive class sub directories
make_dir_helper(train_pos_dir)
make_dir_helper(validation_pos_dir)
make_dir_helper(test_pos_dir)

# Make negative class sub directories
make_dir_helper(train_neg_dir)
make_dir_helper(validation_neg_dir)
make_dir_helper(test_neg_dir)

True

In [4]:
fnames = [f for f in listdir(positive_dataset_dir) if isfile(join(positive_dataset_dir, f))]
shuffle(fnames)

no_samples = len(fnames)
no_training_samples = round(no_samples * .70)

for idx, fname in enumerate(fnames):
    
    src_dir = os.path.join(positive_dataset_dir, fname)
    if idx < no_training_samples:
        
        dst_dir = os.path.join(train_pos_dir, fname)
        shutil.copyfile(src_dir, dst_dir)
    
    elif no_training_samples <= idx < no_samples:
        
        dst_dir = os.path.join(validation_pos_dir, fname)
        shutil.copyfile(src_dir, dst_dir)
        
        dst_dir = os.path.join(test_pos_dir, fname)
        shutil.copyfile(src_dir, dst_dir)

In [5]:
fnames = [f for f in os.listdir(negative_dataset_dir) if isfile(join(negative_dataset_dir, f))]
shuffle(fnames)

no_samples = len(fnames)
no_training_samples = int(no_samples * .70)
  
for idx, fname in enumerate(fnames):
    
    src_dir = os.path.join(negative_dataset_dir, fname)
    if idx < no_training_samples:
        
        dst_dir = os.path.join(train_neg_dir, fname)
        shutil.copyfile(src_dir, dst_dir)
        
    elif no_training_samples <= idx < no_samples:
        
        dst_dir = os.path.join(validation_neg_dir, fname)
        shutil.copyfile(src_dir, dst_dir)
        dst_dir = os.path.join(test_neg_dir, fname)
        shutil.copyfile(src_dir, dst_dir)

In [6]:
train_datagen = ImageDataGenerator(rescale=1./255,
                                   vertical_flip=True,
                                   shear_range=0.2,
                                   zoom_range=0.2,
                                   rotation_range=40,
                                   width_shift_range=0.2,
                                   height_shift_range=0.2,
                                   horizontal_flip=True)   

test_datagen = ImageDataGenerator(rescale=1./255)

train_generator = train_datagen.flow_from_directory(
        train_dir,  
        target_size=(IMAGE_SIZE, IMAGE_SIZE),  
        batch_size=BATCH_SIZE,
        class_mode='binary')  

validation_generator = test_datagen.flow_from_directory(
        validation_dir,
        target_size=(IMAGE_SIZE, IMAGE_SIZE),
        batch_size=BATCH_SIZE,
        class_mode='binary')

Found 2570 images belonging to 2 classes.
Found 1102 images belonging to 2 classes.


In [7]:
model = models.Sequential()
conv_base = VGG16(weights='imagenet' ,include_top=False, input_shape=(IMAGE_SIZE, IMAGE_SIZE, 3))

for layer in conv_base.layers:
    if 'block_5' in layer.name:
        layer.trainable = True
    else:
        layer.trainable = False


model.add(conv_base)
model.add(layers.Flatten())
model.add(layers.Dense(128, activation='relu', kernel_regularizer=regularizers.l2(0.001)))
model.add(layers.Dropout(0.3))
model.add(layers.Dense(1, activation='sigmoid'))

model.compile(loss='binary_crossentropy',
            optimizer=optimizers.RMSprop(lr=1e-4),
             metrics=['acc'])
conv_base.summary()

callbacks = [
    EarlyStopping(monitor='acc', patience=3, mode='auto'),
    ModelCheckpoint(monitor='val_loss', save_best_only=True, filepath=model_file_path)
]

_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_1 (InputLayer)         (None, 256, 256, 3)       0         
_________________________________________________________________
block1_conv1 (Conv2D)        (None, 256, 256, 64)      1792      
_________________________________________________________________
block1_conv2 (Conv2D)        (None, 256, 256, 64)      36928     
_________________________________________________________________
block1_pool (MaxPooling2D)   (None, 128, 128, 64)      0         
_________________________________________________________________
block2_conv1 (Conv2D)        (None, 128, 128, 128)     73856     
_________________________________________________________________
block2_conv2 (Conv2D)        (None, 128, 128, 128)     147584    
_________________________________________________________________
block2_pool (MaxPooling2D)   (None, 64, 64, 128)       0         
__________

In [8]:
history = model.fit_generator(
      train_generator,
      steps_per_epoch=STEPS_PER_EPOCH,
      epochs=EPOCHS,
      callbacks = callbacks,
      validation_data=validation_generator,
      validation_steps=STEPS_PER_EPOCH)

model.save(model_file_path) 

acc = history.history['acc']
val_acc = history.history['val_acc']
loss = history.history['loss']
val_loss = history.history['val_loss']

epochs = range(len(acc))

plt.plot(epochs, acc, 'b+')
plt.plot(epochs, val_acc, 'bo')
plt.title('Training and validation accuracy')

plt.figure()

plt.plot(epochs, loss, 'b+')
plt.plot(epochs, val_loss, 'bo')
plt.title('Training and validation loss')

plt.show()

Epoch 1/30
Epoch 2/30
Epoch 3/30
Epoch 4/30
Epoch 5/30
10/50 [=====>........................] - ETA: 92s - loss: 0.6101 - acc: 0.7000

KeyboardInterrupt: 

In [None]:
fnames = []
target_dir = test_dir
dnames = [join(target_dir, dname) for dname in listdir(target_dir)]

for dname in dnames:
    
    fnames =([join(dname, fname) for fname in listdir(dname)])
    
    cur_class = dname.split('\\')[-1]
    print(cur_class, end=':- ')

    correct_predict_count = 0
    high_accuracy_count = 0
    
    for index, img_path in enumerate(fnames):
          
        img = image.load_img(img_path, target_size=(IMAGE_SIZE, IMAGE_SIZE)) 
        
        img_tensor = image.img_to_array(img)
        img_tensor = img_tensor.reshape((1,) + img_tensor.shape)
        img_tensor /= 255.
        prediction = model.predict(img_tensor)[0]
        
        if prediction[0][0] == 0:
            correct_predict_count +=1
            
        if max_val > 0.8:
            high_accuracy_count += 1
          
    print('Accuracy: ' + str(correct_predict_count / len(fnames) * 100) + ' , High count: ' + str(high_accuracy_count))
    

In [None]:
images_array = np.array([np.array(Image.open(fname)) for fname in listdir(test_pos_dir) if is_jpeg(fname)])