<a href="https://colab.research.google.com/github/cathay/cnn/blob/master/driver_action_detection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import os
import zipfile
import random
import tensorflow as tf
from shutil import copyfile
import pandas as pd
import numpy as np
import cv2
from sklearn.model_selection import train_test_split

#Import Google drive functions
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

In [0]:
#Authorize Google drive
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)

#download train and test images
#download = drive.CreateFile({'id': '1KPoJecKIQCufzJo-LANkySjF8rp4motV'})
#download.GetContentFile('driver-detection-train-data.zip')

#download = drive.CreateFile({'id': '1POWy7luJLkVWEo5Iq0Dkgj1EbGn81-RQ'})
#download.GetContentFile('driver-detection-test-data.zip')

#Extract files
#zip_ref = zipfile.ZipFile('driver-detection-train-data.zip', 'r')
#zip_ref.extractall('tmp/trains')
import fnmatch

##Not sure why it cannot detect the file
for file in os.listdir('.'):
    if fnmatch.fnmatch(file, '*test*.zip'):
        zip_ref = zipfile.ZipFile(file, 'r')
        zip_ref.extractall('tmp/test')

zip_ref.close()

In [25]:
from keras.utils import to_categorical

def get_im_cv2(path, w, h):
  img = cv2.imread(path)
  resized = cv2.resize(img, (w,h), cv2.INTER_LINEAR) 
  return resized

TRAINING_DIR = 'tmp/trains/driver-detection-train-data'
IMAGE_WIDTH=224
IMAGE_HEIGHT=224
IMAGE_SIZE=(IMAGE_WIDTH, IMAGE_HEIGHT)
IMAGE_CHANNELS=3

files = []
categories = []
paths = []

for i in range(10):
  for file in os.listdir("%s/c%s" %(TRAINING_DIR, i)):
    files.append(file)
    categories.append(i)
    paths.append("%s/c%s/%s" %(TRAINING_DIR, i, file))

df = pd.DataFrame({'file': files,
                   'category': categories,
                   'path': paths
                  })

df['Y'] = df['category'].map(lambda x: to_categorical(x, 10, dtype='uint8'))
df['X'] = df['path'].map(lambda path: get_im_cv2(path, IMAGE_WIDTH, IMAGE_HEIGHT))
df.sample(5, axis=None)

Using TensorFlow backend.


Unnamed: 0,file,category,path,Y,X
3943,img_21191.jpg,1,tmp/trains/driver-detection-train-data/c1/img_...,"[0, 1, 0, 0, 0, 0, 0, 0, 0, 0]","[[[10, 17, 10], [9, 16, 9], [8, 15, 8], [9, 16..."
7870,img_53391.jpg,3,tmp/trains/driver-detection-train-data/c3/img_...,"[0, 0, 0, 1, 0, 0, 0, 0, 0, 0]","[[[9, 14, 12], [9, 14, 12], [9, 14, 12], [11, ..."
19493,img_92021.jpg,8,tmp/trains/driver-detection-train-data/c8/img_...,"[0, 0, 0, 0, 0, 0, 0, 0, 1, 0]","[[[46, 53, 46], [45, 52, 45], [44, 51, 44], [4..."
16793,img_60726.jpg,7,tmp/trains/driver-detection-train-data/c7/img_...,"[0, 0, 0, 0, 0, 0, 0, 1, 0, 0]","[[[31, 41, 35], [29, 39, 32], [31, 42, 32], [3..."
2577,img_56169.jpg,1,tmp/trains/driver-detection-train-data/c1/img_...,"[0, 1, 0, 0, 0, 0, 0, 0, 0, 0]","[[[34, 42, 35], [29, 38, 31], [28, 36, 29], [2..."


In [0]:
train_df, validate_df = train_test_split(df, test_size=0.20, random_state=42)
train_df = train_df.reset_index(drop=True)
validate_df = validate_df.reset_index(drop=True)

x_train = []
x_test = []
y_train = []
y_test= []

for item in train_df['X']:
  x_train.append(item)

for item in validate_df['X']:
  x_test.append(item)

for item in train_df['Y']:
  y_train.append(item)

for item in validate_df['Y']:
  y_test.append(item)

x_train = np.array(x_train, dtype=np.uint8)
x_test = np.array(x_test, dtype=np.uint8)
y_train = np.array(y_train, dtype=np.uint8)
y_test = np.array(y_test, dtype=np.uint8)

In [27]:
#build model
from keras.models import Sequential
from keras.layers import Conv2D, MaxPooling2D, Dropout, Flatten, Dense, Activation, BatchNormalization
from keras.optimizers import RMSprop, SGD
from keras.applications.resnet50 import ResNet50
from keras.applications.vgg16 import VGG16

base_model = ResNet50(weights='imagenet', include_top=False, input_shape=(IMAGE_WIDTH, IMAGE_HEIGHT, IMAGE_CHANNELS))
#base_model = VGG16(weights='imagenet', include_top=False, input_shape=(IMAGE_WIDTH, IMAGE_HEIGHT, IMAGE_CHANNELS))

model = Sequential()
model.add(base_model)

model.add(Flatten())
model.add(Dense(1024))
model.add(BatchNormalization())
model.add(Activation('relu'))
model.add(Dropout(0.5))

model.add(Dense(512))
model.add(BatchNormalization())
model.add(Activation('relu'))
model.add(Dropout(0.5))
model.add(Dense(10, activation='softmax')) # 

for layer in base_model.layers:
    layer.trainable = False

#model.compile(loss='categorical_crossentropy', optimizer=RMSprop(lr=0.0001), metrics=['accuracy'])
#model.compile(loss='categorical_crossentropy', optimizer='rmsprop', metrics=['accuracy'])
model.compile(optimizer=SGD(lr=0.0001, momentum=0.9), loss='categorical_crossentropy', metrics=['accuracy'])

model.summary()

## Train
from keras.callbacks import EarlyStopping, ReduceLROnPlateau
from keras.applications.vgg16 import preprocess_input

earlystop = EarlyStopping(patience=5)
learning_rate_reduction = ReduceLROnPlateau(monitor='val_acc', 
                                            patience=2, 
                                            verbose=1, 
                                            factor=0.5, 
                                            min_lr=0.0001)
EPOCHS=10
BATCH_SIZE=15
callbacks = [learning_rate_reduction]

history = model.fit(x_train, 
                    y_train,
                    shuffle=True,
                    batch_size=BATCH_SIZE,
                    epochs=EPOCHS,
                    verbose=1,
                    validation_data=(x_test, y_test)
                    ,callbacks=callbacks)
score = model.evaluate(x_test, y_test, verbose=0)
print('Test loss:', score[0])
print('Test accuracy:', score[1])

















Downloading data from https://github.com/fchollet/deep-learning-models/releases/download/v0.2/resnet50_weights_tf_dim_ordering_tf_kernels_notop.h5

Instructions for updating:
Please use `rate` instead of `keep_prob`. Rate should be set to `rate = 1 - keep_prob`.


Model: "sequential_1"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
resnet50 (Model)             (None, 7, 7, 2048)        23587712  
_________________________________________________________________
flatten_1 (Flatten)          (None, 100352)            0         
_________________________________________________________________
dense_1 (Dense)              (None, 1024)              102761472 
_________________________________________________________________
batch_normalization_1 (Batch (None, 1024)              4096      
_________________________________________________________________
activation_50 (Activation)   (None, 1024)            

In [28]:
TEST_DIRECTORY = 'tmp/test/driver-detection-test-data/batch0'
test_data = pd.DataFrame({'filename': os.listdir(TEST_DIRECTORY)})

test_data_frame = test_data.sample(20, axis=None)
test_data_frame['X'] = test_data_frame['filename'].map(lambda file: get_im_cv2(TEST_DIRECTORY + "/" + file, IMAGE_WIDTH, IMAGE_HEIGHT))

x_perf=[]
for item in test_data_frame['X']:
  x_perf.append(item)

x_perf = np.array(x_perf, dtype=np.uint8)
predict = model.predict(x_perf)



FileNotFoundError: ignored

In [0]:
#Plot prediction
import matplotlib.pyplot as plt
label_map = {
    0: "normal driving",
    1: "texting - right",
    2: "talking on the phone - right",
    3: "texting - left",
    4: "talking on the phone - left",
    5: "operating the radio",
    6: "drinking",
    7: "reaching behind",
    8: "hair and makeup",
    9: "talking to passenger"
}

test_data_frame['category'] = np.argmax(predict, axis=1)
test_data_frame['action'] = test_data_frame['category'].map(lambda x: label_map.get(x))
print(test_data_frame.head())

#Draw predictions with images
from keras.preprocessing.image import load_img
sample_test = test_data_frame
plt.figure(figsize=(12, 24))
index=0
for _, row in sample_test.iterrows():
    filename = row['filename']
    category = row['action']
    index=index+1
    img = load_img(TEST_DIRECTORY+filename, target_size=IMAGE_SIZE)
    plt.subplot(6, 3, index)
    plt.imshow(img)
    plt.xlabel(filename + '(' + "{}".format(category) + ')' )
plt.tight_layout()
plt.show()