In [None]:
 # python standard libraries
import os
import random
import fnmatch
import datetime
import pickle

# data processing
import numpy as np
np.set_printoptions(formatter={'float_kind':lambda x: "%.4f" % x})

import pandas as pd
pd.set_option('display.width', 300)
pd.set_option('display.float_format', '{:,.4f}'.format)
pd.set_option('display.max_colwidth', 200)

# sklearn
from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split

# imaging
import cv2
from imgaug import augmenters as img_aug
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
%matplotlib inline
from PIL import Image

# ml models
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np

In [None]:
def create_data_sets(data_dir, visual=False, seed=None):
  """
  Build train / validation / test splits of image paths and steering angles.

  Every `*.png` file directly inside `data_dir` is used. The steering angle is
  parsed from the file name: the third `_`-separated field, up to the first
  `.` or space, e.g. `092` in `video01_143_092.png` (90 is go straight).

  15% of the samples are held out for testing; 10% of the remaining samples
  are then held out for validation.

  :param data_dir: directory that contains the labelled png images
  :param visual: if True, print the split sizes and plot angle histograms
  :param seed: optional `random_state` used for both splits; the default
      (None) gives a different random split on every call
  :return: X_train, Y_train, X_valid, Y_valid, X_test, Y_test, where each X
      is a list of image paths and each Y the matching list of int angles
  """
  pattern = "*.png"
  image_paths = []
  steering_angles = []
  # Sorted so that a given seed always produces the same split
  # (os.listdir returns the entries in arbitrary order).
  for filename in sorted(os.listdir(data_dir)):
      if fnmatch.fnmatch(filename, pattern):
          image_paths.append(os.path.join(data_dir, filename))
          # 092 part of video01_143_092.png is the angle. 90 is go straight
          angle = filename.split('_')[2].split('.')[0].split(' ')[0]
          steering_angles.append(int(angle))

  num_of_bins = 25

  def plot_pair(left_angles, left_title, right_angles, right_title):
      # Side-by-side histograms, to check that both distributions are consistent
      fig, axes = plt.subplots(1, 2, figsize=(12, 4))
      axes[0].hist(left_angles, bins=num_of_bins, width=1, color='blue')
      axes[0].set_title(left_title)
      axes[1].hist(right_angles, bins=num_of_bins, width=1, color='red')
      axes[1].set_title(right_title)

  if visual:
      # Look at the distribution of steering angle over the whole data set
      fig, axes = plt.subplots(1, 1, figsize=(12, 4))
      axes.hist(steering_angles, bins=num_of_bins, width=1, color='blue')

  # Split testing from training data: 85% training, 15% testing
  X_train, X_test, Y_train, Y_test = train_test_split(
      image_paths, steering_angles, test_size=0.15, random_state=seed)

  if visual:
      print("Training data: %d\nTesting data: %d" % (len(X_train), len(X_test)))
      plot_pair(Y_train, 'Training Data', Y_test, 'Testing Data')

  # Split validation from the remaining training data: 90% training, 10% validation
  X_train, X_valid, Y_train, Y_valid = train_test_split(
      X_train, Y_train, test_size=0.10, random_state=seed)

  if visual:
      print("Training data: %d\nValidation data: %d" % (len(X_train), len(X_valid)))
      # The right-hand panel must show the validation angles (not the test angles)
      plot_pair(Y_train, 'Training Data', Y_valid, 'Validation Data')

  return X_train, Y_train, X_valid, Y_valid, X_test, Y_test

In [None]:

def augment_images_for_preprocessing(X_train, Y_train, X_valid, Y_valid, X_test, Y_test, seed=None):
  """
  Load every data set from disk and turn it into arrays of preprocessed images.

  The training and validation arrays are filled by drawing `len(set)` random
  samples (with replacement) and randomly augmenting each draw. The test
  arrays contain every test image exactly once, in the given order and
  without augmentation, so no test image is duplicated or left out.

  :param X_train, X_valid, X_test: sequences of image file paths
  :param Y_train, Y_valid, Y_test: steering angles matching the paths
  :param seed: optional int; when given, the `random`, numpy and imgaug
      global generators are seeded first so that repeated calls should
      produce the same arrays. The default (None) leaves them untouched.
  :return: X_train_ppi, Y_train_ppi, X_valid_ppi, Y_valid_ppi, X_test_ppi,
      Y_test_ppi as numpy arrays (ppi = pre-processed images)
  :raises FileNotFoundError: if an image file cannot be read
  """
  if seed is not None:
      # The augmentations below draw from three different random sources
      import imgaug
      random.seed(seed)
      np.random.seed(seed)
      imgaug.seed(seed)

  def my_imread(image_path):
      image = cv2.imread(image_path)
      if image is None:
          # cv2.imread signals a missing/unreadable file by returning None
          raise FileNotFoundError("Could not read image: %s" % image_path)
      return cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

  def zoom(image):
      # zoom from 100% (no zoom) to 130%
      zoomer = img_aug.Affine(scale=(1, 1.3))
      return zoomer.augment_image(image)

  def pan(image):
      # pan left / right / up / down about 10%
      panner = img_aug.Affine(translate_percent={"x": (-0.1, 0.1), "y": (-0.1, 0.1)})
      return panner.augment_image(image)

  def adjust_brightness(image):
      # increase or decrease brightness by 30%
      brightness = img_aug.Multiply((0.7, 1.3))
      return brightness.augment_image(image)

  def blur(image):
      # kernel larger than 5 would make the image way too blurry
      kernel_size = random.randint(1, 5)
      return cv2.blur(image, (kernel_size, kernel_size))

  def random_flip(image, steering_angle):
      if random.randint(0, 1) == 1:
          # flip horizontally; the angle is mirrored around 90 (straight)
          image = cv2.flip(image, 1)
          steering_angle = 180 - steering_angle
      return image, steering_angle

  def random_augment(image, steering_angle):
      """
      Randomly augment images by panning, zooming, blurring, and adjusting brightness
      """
      for augmentation in (pan, zoom, blur, adjust_brightness):
          # each augmentation is applied independently with probability 0.5
          if np.random.rand() < 0.5:
              image = augmentation(image)
      return random_flip(image, steering_angle)

  def img_preprocess(image):
      height, _, _ = image.shape
      image = image[int(height/2):,:,:]   # remove top half of the image, as it is not relevant for lane following
      image = cv2.cvtColor(image, cv2.COLOR_RGB2YUV)  # Nvidia model said it is best to use YUV color space
      image = cv2.GaussianBlur(image, (3,3), 0)
      image = cv2.resize(image, (200,66)) # input image size (200,66) Nvidia model
      image = image / 255                 # normalizing
      return image

  def image_data_generator(image_paths, steering_angles, batch_size, is_training):
      # Endless generator of batches made of randomly drawn (with replacement) samples
      while True:
          batch_images = []
          batch_steering_angles = []

          for _ in range(batch_size):
              random_index = random.randint(0, len(image_paths) - 1)
              image = my_imread(image_paths[random_index])
              steering_angle = steering_angles[random_index]
              if is_training:
                  # training: augment image
                  image, steering_angle = random_augment(image, steering_angle)

              batch_images.append(img_preprocess(image))
              batch_steering_angles.append(steering_angle)

          yield (np.asarray(batch_images), np.asarray(batch_steering_angles))

  def preprocess_all(image_paths, steering_angles):
      # Every image exactly once, in order, without augmentation
      images = [img_preprocess(my_imread(path)) for path in image_paths]
      return np.asarray(images), np.asarray(list(steering_angles))

  # Create processed image sets (ppi = pre-processed images)
  X_train_ppi, Y_train_ppi = next(image_data_generator(X_train, Y_train, len(X_train), True))
  # NOTE(review): the validation set is also randomly sampled and augmented,
  # as in the original code - confirm that this is intended.
  X_valid_ppi, Y_valid_ppi = next(image_data_generator(X_valid, Y_valid, len(X_valid), True))
  # The test set must cover each test image once; random draws with
  # replacement would duplicate some images and skip others.
  X_test_ppi, Y_test_ppi = preprocess_all(X_test, Y_test)

  return X_train_ppi, Y_train_ppi, X_valid_ppi, Y_valid_ppi, X_test_ppi, Y_test_ppi


In [None]:
def load_model(path_to_model):
  """
  Load a saved Keras model.

  :param path_to_model: location of the saved model, passed straight to
      `keras.models.load_model`
  :return: the loaded model
  """
  restored = keras.models.load_model(path_to_model)
  return restored

In [None]:
class Distiller(keras.Model):
    """
    Keras model that trains a `student` network with the help of a `teacher`.

    Each training step combines the student's loss against the ground truth
    with a distillation loss between the (temperature-scaled) teacher and
    student predictions. Only the student's weights are updated.
    """

    def __init__(self, student, teacher):
        super(Distiller, self).__init__()
        self.teacher = teacher
        self.student = student

    def compile(
        self,
        optimizer,
        metrics,
        student_loss_fn,
        distillation_loss_fn,
        alpha=0.1,
        temperature=3,
    ):
        """ Configure the distiller.

        Args:
            optimizer: Keras optimizer for the student weights
            metrics: Keras metrics for evaluation
            student_loss_fn: Loss function of difference between student
                predictions and ground-truth
            distillation_loss_fn: Loss function of difference between soft
                student predictions and soft teacher predictions
            alpha: weight to student_loss_fn and 1-alpha to distillation_loss_fn
            temperature: Temperature for softening probability distributions.
                Larger temperature gives softer distributions.
        """
        super(Distiller, self).compile(optimizer=optimizer, metrics=metrics)
        self.student_loss_fn = student_loss_fn
        self.distillation_loss_fn = distillation_loss_fn
        self.alpha = alpha
        self.temperature = temperature

    def _soften(self, predictions):
        """Temperature-scale `predictions`; softmax is applied to multi-unit outputs only."""
        scaled = predictions / self.temperature
        if predictions.shape[-1] == 1:
            # Softmax over a single unit is identically 1.0, which would make
            # the distillation loss always 0 for single-output (regression)
            # networks. Compare the scaled raw predictions instead.
            return scaled
        return tf.nn.softmax(scaled, axis=1)

    def train_step(self, data):
        """Run one optimisation step of the student on a batch `(x, y)`.

        Returns a dict with the compiled metrics plus `student_loss` and
        `distillation_loss`.
        """
        # Unpack data
        x, y = data

        # Forward pass of teacher (outside the tape: the teacher is not trained)
        teacher_predictions = self.teacher(x, training=False)

        with tf.GradientTape() as tape:
            # Forward pass of student
            student_predictions = self.student(x, training=True)

            # Compute losses
            student_loss = self.student_loss_fn(y, student_predictions)

            # Compute scaled distillation loss from https://arxiv.org/abs/1503.02531
            # The magnitudes of the gradients produced by the soft targets scale
            # as 1/T^2, multiply them by T^2 when using both hard and soft targets.
            # (For single-unit outputs with an MSE distillation loss the 1/T
            # scaling and the T^2 factor cancel out exactly.)
            distillation_loss = (
                self.distillation_loss_fn(
                    self._soften(teacher_predictions),
                    self._soften(student_predictions),
                )
                * self.temperature**2
            )

            loss = self.alpha * student_loss + (1 - self.alpha) * distillation_loss

        # Compute gradients with respect to the student weights only
        trainable_vars = self.student.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)

        # Update weights
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))

        # Update the metrics configured in `compile()`.
        self.compiled_metrics.update_state(y, student_predictions)

        # Return a dict of performance
        results = {m.name: m.result() for m in self.metrics}
        results.update(
            {"student_loss": student_loss, "distillation_loss": distillation_loss}
        )
        return results

    def test_step(self, data):
        """Evaluate the student on a batch `(x, y)`; the teacher is not used.

        Returns a dict with the compiled metrics plus `student_loss`.
        """
        # Unpack the data
        x, y = data

        # Compute predictions
        y_prediction = self.student(x, training=False)

        # Calculate the loss
        student_loss = self.student_loss_fn(y, y_prediction)

        # Update the metrics.
        self.compiled_metrics.update_state(y, y_prediction)

        # Return a dict of performance
        results = {m.name: m.result() for m in self.metrics}
        results.update({"student_loss": student_loss})
        return results

In [None]:
from sklearn.metrics import mean_squared_error, r2_score

def summarize_prediction(Y_true, Y_pred):
    """
    Print and return the regression scores of `Y_pred` against `Y_true`.

    :param Y_true: ground-truth values
    :param Y_pred: predicted values
    :return: tuple (mse, r_squared) computed with sklearn's
        `mean_squared_error` and `r2_score`
    """
    scores = (mean_squared_error(Y_true, Y_pred), r2_score(Y_true, Y_pred))

    print(f'mse       = {scores[0]:.2f}')
    print(f'r_squared = {scores[1]:.2%}')
    print()
    return scores

def predict_and_summarize(X, Y, model):
    """
    Predict `X` with `model` and summarize the predictions against `Y`.

    :param X: model inputs, passed to `model.predict`
    :param Y: ground-truth values for `X`
    :param model: object exposing a `predict` method
    :return: tuple (mse, r_squared) as returned by `summarize_prediction`
    """
    return summarize_prediction(Y, model.predict(X))

def evaluate_and_summarize(X, Y, model):
    """
    Predict `X` with `model` and summarize the predictions against `Y`.

    Performs exactly the same steps as `predict_and_summarize`, so it simply
    forwards to it.

    :param X: model inputs, passed to `model.predict`
    :param Y: ground-truth values for `X`
    :param model: object exposing a `predict` method
    :return: tuple (mse, r_squared)
    """
    return predict_and_summarize(X, Y, model)

In [None]:
def distilled0_model():
    """
    Build and compile the student network `distilled0_model`.

    Two elu convolutions (12 filters 5x5 with stride 2, then 24 filters 3x3)
    on a (66, 200, 3) input, flattened into a single linear output unit.

    :return: the compiled keras Sequential model (MSE loss, Adam, lr 1e-3)
    """
    # elu = Exponential Linear Unit, similar to leaky ReLU
    stack = [
        # Convolution layers
        keras.layers.Conv2D(12, (5, 5), strides=(2, 2), input_shape=(66, 200, 3), activation='elu'),
        keras.layers.Conv2D(24, (3, 3), activation='elu'),
        # Flatten the feature maps for the fully connected part
        keras.layers.Flatten(),
        # Output layer: turn angle (from 45-135, 90 is straight, <90 turn left, >90 turn right)
        keras.layers.Dense(1),
    ]
    net = keras.models.Sequential(stack, name='distilled0_model')

    # This is a regression problem, not a classification problem,
    # so the loss function is MSE (Mean Squared Error).
    net.compile(loss='mse', optimizer=keras.optimizers.Adam(learning_rate=1e-3))
    return net

In [None]:
def distilled1_model():
    """
    Build and compile the student network `distilled1_model`.

    Two elu convolutions (24 filters 5x5 with stride 2, then 36 filters 3x3)
    on a (66, 200, 3) input, flattened into a single linear output unit.

    :return: the compiled keras Sequential model (MSE loss, Adam, lr 1e-3)
    """
    # elu = Exponential Linear Unit, similar to leaky ReLU
    stack = [
        # Convolution layers
        keras.layers.Conv2D(24, (5, 5), strides=(2, 2), input_shape=(66, 200, 3), activation='elu'),
        keras.layers.Conv2D(36, (3, 3), activation='elu'),
        # Flatten the feature maps for the fully connected part
        keras.layers.Flatten(),
        # Output layer: turn angle (from 45-135, 90 is straight, <90 turn left, >90 turn right)
        keras.layers.Dense(1),
    ]
    net = keras.models.Sequential(stack, name='distilled1_model')

    # This is a regression problem, not a classification problem,
    # so the loss function is MSE (Mean Squared Error).
    net.compile(loss='mse', optimizer=keras.optimizers.Adam(learning_rate=1e-3))
    return net

In [None]:
def distilled2_model():
    """
    Build and compile the student network `distilled2_model`.

    Two elu convolutions (36 filters 5x5 with stride 2, then 48 filters 3x3)
    on a (66, 200, 3) input, flattened into a single linear output unit.

    :return: the compiled keras Sequential model (MSE loss, Adam, lr 1e-3)
    """
    # elu = Exponential Linear Unit, similar to leaky ReLU
    stack = [
        # Convolution layers
        keras.layers.Conv2D(36, (5, 5), strides=(2, 2), input_shape=(66, 200, 3), activation='elu'),
        keras.layers.Conv2D(48, (3, 3), activation='elu'),
        # Flatten the feature maps for the fully connected part
        keras.layers.Flatten(),
        # Output layer: turn angle (from 45-135, 90 is straight, <90 turn left, >90 turn right)
        keras.layers.Dense(1),
    ]
    net = keras.models.Sequential(stack, name='distilled2_model')

    # This is a regression problem, not a classification problem,
    # so the loss function is MSE (Mean Squared Error).
    net.compile(loss='mse', optimizer=keras.optimizers.Adam(learning_rate=1e-3))
    return net

In [None]:
def distilled3_model():
    """
    Build and compile the student network `distilled3_model`.

    Two elu convolutions (48 filters 5x5 with stride 2, then 64 filters 3x3)
    on a (66, 200, 3) input, flattened into a single linear output unit.

    :return: the compiled keras Sequential model (MSE loss, Adam, lr 1e-3)
    """
    # elu = Exponential Linear Unit, similar to leaky ReLU
    stack = [
        # Convolution layers
        keras.layers.Conv2D(48, (5, 5), strides=(2, 2), input_shape=(66, 200, 3), activation='elu'),
        keras.layers.Conv2D(64, (3, 3), activation='elu'),
        # Flatten the feature maps for the fully connected part
        keras.layers.Flatten(),
        # Output layer: turn angle (from 45-135, 90 is straight, <90 turn left, >90 turn right)
        keras.layers.Dense(1),
    ]
    net = keras.models.Sequential(stack, name='distilled3_model')

    # This is a regression problem, not a classification problem,
    # so the loss function is MSE (Mean Squared Error).
    net.compile(loss='mse', optimizer=keras.optimizers.Adam(learning_rate=1e-3))
    return net

In [None]:
def distilled4_model():
    """
    Build and compile the student network `distilled4_model`.

    Two elu convolutions (12 filters 5x5 with stride 2, then 36 filters 3x3)
    on a (66, 200, 3) input, flattened into a single linear output unit.

    :return: the compiled keras Sequential model (MSE loss, Adam, lr 1e-3)
    """
    # elu = Exponential Linear Unit, similar to leaky ReLU
    stack = [
        # Convolution layers
        keras.layers.Conv2D(12, (5, 5), strides=(2, 2), input_shape=(66, 200, 3), activation='elu'),
        keras.layers.Conv2D(36, (3, 3), activation='elu'),
        # Flatten the feature maps for the fully connected part
        keras.layers.Flatten(),
        # Output layer: turn angle (from 45-135, 90 is straight, <90 turn left, >90 turn right)
        keras.layers.Dense(1),
    ]
    net = keras.models.Sequential(stack, name='distilled4_model')

    # This is a regression problem, not a classification problem,
    # so the loss function is MSE (Mean Squared Error).
    net.compile(loss='mse', optimizer=keras.optimizers.Adam(learning_rate=1e-3))
    return net

In [None]:
 def distilled5_model():
    """
    Build and compile the student network `distilled5_model`.

    Two elu convolutions (12 filters 5x5 with stride 2, then 48 filters 3x3)
    on a (66, 200, 3) input, flattened into a single linear output unit.

    :return: the compiled keras Sequential model (MSE loss, Adam, lr 1e-3)
    """
    # elu = Exponential Linear Unit, similar to leaky ReLU
    stack = [
        # Convolution layers
        keras.layers.Conv2D(12, (5, 5), strides=(2, 2), input_shape=(66, 200, 3), activation='elu'),
        keras.layers.Conv2D(48, (3, 3), activation='elu'),
        # Flatten the feature maps for the fully connected part
        keras.layers.Flatten(),
        # Output layer: turn angle (from 45-135, 90 is straight, <90 turn left, >90 turn right)
        keras.layers.Dense(1),
    ]
    net = keras.models.Sequential(stack, name='distilled5_model')

    # This is a regression problem, not a classification problem,
    # so the loss function is MSE (Mean Squared Error).
    net.compile(loss='mse', optimizer=keras.optimizers.Adam(learning_rate=1e-3))
    return net

In [None]:
 def distilled6_model():
    """
    Build and compile the student network `distilled6_model`.

    Two elu convolutions (12 filters 5x5 with stride 2, then 64 filters 3x3)
    on a (66, 200, 3) input, flattened into a single linear output unit.

    :return: the compiled keras Sequential model (MSE loss, Adam, lr 1e-3)
    """
    # elu = Exponential Linear Unit, similar to leaky ReLU
    stack = [
        # Convolution layers
        keras.layers.Conv2D(12, (5, 5), strides=(2, 2), input_shape=(66, 200, 3), activation='elu'),
        keras.layers.Conv2D(64, (3, 3), activation='elu'),
        # Flatten the feature maps for the fully connected part
        keras.layers.Flatten(),
        # Output layer: turn angle (from 45-135, 90 is straight, <90 turn left, >90 turn right)
        keras.layers.Dense(1),
    ]
    net = keras.models.Sequential(stack, name='distilled6_model')

    # This is a regression problem, not a classification problem,
    # so the loss function is MSE (Mean Squared Error).
    net.compile(loss='mse', optimizer=keras.optimizers.Adam(learning_rate=1e-3))
    return net

In [None]:
 def distilled7_model():
    """
    Build and compile the student network `distilled7_model`.

    Two elu convolutions (24 filters 5x5 with stride 2, then 48 filters 3x3)
    on a (66, 200, 3) input, flattened into a single linear output unit.

    :return: the compiled keras Sequential model (MSE loss, Adam, lr 1e-3)
    """
    # elu = Exponential Linear Unit, similar to leaky ReLU
    stack = [
        # Convolution layers
        keras.layers.Conv2D(24, (5, 5), strides=(2, 2), input_shape=(66, 200, 3), activation='elu'),
        keras.layers.Conv2D(48, (3, 3), activation='elu'),
        # Flatten the feature maps for the fully connected part
        keras.layers.Flatten(),
        # Output layer: turn angle (from 45-135, 90 is straight, <90 turn left, >90 turn right)
        keras.layers.Dense(1),
    ]
    net = keras.models.Sequential(stack, name='distilled7_model')

    # This is a regression problem, not a classification problem,
    # so the loss function is MSE (Mean Squared Error).
    net.compile(loss='mse', optimizer=keras.optimizers.Adam(learning_rate=1e-3))
    return net

In [None]:
def distilled8_model():
    """
    Build and compile the student network `distilled8_model`.

    Two elu convolutions (24 filters 5x5 with stride 2, then 64 filters 3x3)
    on a (66, 200, 3) input, flattened into a single linear output unit.

    :return: the compiled keras Sequential model (MSE loss, Adam, lr 1e-3)
    """
    # elu = Exponential Linear Unit, similar to leaky ReLU
    stack = [
        # Convolution layers
        keras.layers.Conv2D(24, (5, 5), strides=(2, 2), input_shape=(66, 200, 3), activation='elu'),
        keras.layers.Conv2D(64, (3, 3), activation='elu'),
        # Flatten the feature maps for the fully connected part
        keras.layers.Flatten(),
        # Output layer: turn angle (from 45-135, 90 is straight, <90 turn left, >90 turn right)
        keras.layers.Dense(1),
    ]
    net = keras.models.Sequential(stack, name='distilled8_model')

    # This is a regression problem, not a classification problem,
    # so the loss function is MSE (Mean Squared Error).
    net.compile(loss='mse', optimizer=keras.optimizers.Adam(learning_rate=1e-3))
    return net

In [None]:
def distilled9_model():
    """
    Build and compile the student network `distilled9_model`.

    Two elu convolutions (36 filters 5x5 with stride 2, then 64 filters 3x3)
    on a (66, 200, 3) input, flattened into a single linear output unit.

    :return: the compiled keras Sequential model (MSE loss, Adam, lr 1e-3)
    """
    # elu = Exponential Linear Unit, similar to leaky ReLU
    stack = [
        # Convolution layers
        keras.layers.Conv2D(36, (5, 5), strides=(2, 2), input_shape=(66, 200, 3), activation='elu'),
        keras.layers.Conv2D(64, (3, 3), activation='elu'),
        # Flatten the feature maps for the fully connected part
        keras.layers.Flatten(),
        # Output layer: turn angle (from 45-135, 90 is straight, <90 turn left, >90 turn right)
        keras.layers.Dense(1),
    ]
    net = keras.models.Sequential(stack, name='distilled9_model')

    # This is a regression problem, not a classification problem,
    # so the loss function is MSE (Mean Squared Error).
    net.compile(loss='mse', optimizer=keras.optimizers.Adam(learning_rate=1e-3))
    return net

In [None]:
def distill_teacher_to_student(student, teacher, temp, X_train_ppi, Y_train_ppi, X_valid_ppi, Y_valid_ppi,
                               alpha=0.1, epochs=40, steps_per_epoch=10):
  """
  Distill teacher model into student model

  The student is trained in place through a `Distiller`; both the student
  loss and the distillation loss are mean squared errors.

  :student: points to defined student model architecture
  :teacher: points to trained teacher model
  :temp: temperature to perform the distillations
  :X_train_ppi: training preprocessed images
  :Y_train_ppi: training preprocessed angles
  :X_valid_ppi: validation preprocessed images
  :Y_valid_ppi: validation preprocessed angles
  :alpha: weight of the student loss (1 - alpha goes to the distillation loss)
  :epochs: number of training epochs
  :steps_per_epoch: number of batches drawn per epoch
  :return: the fitted `Distiller` wrapping `student` and `teacher`
  """
  # Initialize and compile distiller
  distilled = Distiller(student=student, teacher=teacher)
  distilled.compile(
      optimizer=keras.optimizers.Adam(),
      # The models predict a continuous steering angle, so a regression
      # metric is reported (a categorical accuracy is meaningless here).
      metrics=[keras.metrics.MeanAbsoluteError()],
      student_loss_fn=tf.keras.losses.MeanSquaredError(),
      distillation_loss_fn=tf.keras.losses.MeanSquaredError(),
      alpha=alpha,
      temperature=temp,
  )
  # Distill teacher to student
  distilled.fit(x=X_train_ppi, y=Y_train_ppi,
                steps_per_epoch=steps_per_epoch,
                epochs=epochs,
                validation_data=(X_valid_ppi, Y_valid_ppi),
                verbose=1,
                shuffle=True)
  return distilled

In [None]:
import csv
def write_row_csv(data, csv_file):
  """
  Append a single row to a csv file (the file is created if it is missing)

  :param data: iterable of values to write as one row
  :param csv_file: path of the file you would like to write data into
  :return: none
  """
  # newline='' lets the csv writer control the line endings itself;
  # without it, extra blank lines appear on platforms that translate '\n'.
  with open(csv_file, 'a', newline='') as outfile:
    writer = csv.writer(outfile)
    writer.writerow(data)

In [None]:
def main():
  """
  Run the distillation experiment on Google Colab.

  Mounts Google Drive, builds the data sets, evaluates the trained teacher
  and then, for every student architecture and every temperature, runs 10
  distillation trials. Each trial saves the distilled student as an .h5 file
  and appends one row to the results csv:
  [model name, trial, temperature, teacher mse, teacher r_squared,
  student mse, student r_squared].
  """
  # Access data
  from google.colab import drive
  drive.mount('/content/drive')
  # change the data path on own google drive
  data_dir = '/content/drive/MyDrive/data/train_data/'

  # Create datasets
  X_train, Y_train, X_valid, Y_valid, X_test, Y_test = create_data_sets(data_dir)
  X_train_ppi, Y_train_ppi, X_valid_ppi, Y_valid_ppi, X_test_ppi, Y_test_ppi = augment_images_for_preprocessing(X_train, Y_train, X_valid, Y_valid, X_test, Y_test)

  # To make every run use the same data sets, persist the arrays once (for
  # example with np.save) and load them afterwards. np.savetxt is not
  # suitable: it only handles 1-D/2-D arrays and the image arrays are 4-D.

  # Load trained teacher model
  # (have a look at LeAnn's teacher model and generate your own or use it)
  path_to_teacher = '/content/drive/MyDrive/train_model/lane_navigation_final.h5'
  teacher = load_model(path_to_teacher)
  teacher_results = predict_and_summarize(X_test_ppi, Y_test_ppi, teacher)
  teacher.summary()

  # Deploy trials and save results
  result_file = '/content/drive/MyDrive/train_model/results.csv'

  # Independent variables
  models = [distilled0_model, distilled1_model, distilled2_model, distilled3_model, distilled4_model, distilled5_model, distilled6_model, distilled7_model, distilled8_model, distilled9_model]
  # temps parameters.
  temps = [20, 10, 5, 2.5]

  # For each model, and each temp, run 10 distilled trials
  for i, model in enumerate(models):
    print('Now testing model...', str(i))
    # Throw-away instance, built only to print the architecture
    model().summary()
    for temp in temps:
      print('Temp...', str(temp))
      for trial in range(1, 11):
        print('Trial...', str(trial))

        # Construct a fresh, untrained student model for this trial
        student = model()

        # Distill teacher to student (the student is trained in place)
        distill_teacher_to_student(student, teacher, temp, X_train_ppi, Y_train_ppi, X_valid_ppi, Y_valid_ppi)
        distilled_path = f'/content/drive/MyDrive/train_model/distilled{i}_trial{trial}_temp{temp}_model.h5'

        # Save results row by row iteratively
        student.save(distilled_path)
        student_results = predict_and_summarize(X_test_ppi, Y_test_ppi, student)
        # Use the function name: str(model) would store a repr containing a
        # memory address that differs from run to run.
        result_row = [model.__name__, str(trial), str(temp)] + list(teacher_results) + list(student_results)
        write_row_csv(result_row, result_file)


In [None]:
# Access data: mount Google Drive into the Colab runtime at /content/drive
# (this import only resolves when the notebook runs on Google Colab)
from google.colab import drive
drive.mount('/content/drive')
# Directory passed to create_data_sets as the image folder;
# change the data path to match your own Google Drive
data_dir = '/content/drive/MyDrive/data/train_data/'

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
