<a target="_blank" href="http://colab.research.google.com/github/woflydev/odyssey_nnn/blob/main/utils/notebooks/training.ipynb">
        <img src="https://i.ibb.co/2P3SLwK/colab.png"  style="padding-bottom:5px;" />Run in Google Colab</a>

In [None]:
import tensorflow as tf
import cv2
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
from pandas import read_csv
from sklearn.model_selection import train_test_split
import math
# from skimage import data
from skimage import io
from skimage.exposure import match_histograms

In [None]:
# input image resolution to the DNN model (smaller than the original dataset image resolution: 320x240x3)
# new camera has width and height of (640, 360)
img_width = 640
img_height = 360
img_channels = 3

model_name = "model_opt"
model_file = "models/{}-{}x{}x{}".format(model_name[6:], img_width, img_height, img_channels)
pre="resize" # [resize|crop]

In [None]:
import requests 
import os
import zipfile

dataset_dir = "dataset"
dataset_zipfile = 'Dataset-kucsl-Apr2022.zip' # 'labtrack-leftonly-RTCSA22.zip' # 'Dataset.zip'
RunningInCOLAB = 'google.colab' in str(get_ipython())
force_unzip = True
use_match_histogram = False

if RunningInCOLAB: 
    print("in colab")
    from google.colab import files
    uploaded = files.upload()
    for fn in uploaded.keys():
        print('User uploaded file "{name}" with length {length} bytes'.format(
            name=fn, length=len(uploaded[fn])))
        dataset_zipfile = fn  # expecting a zip file to be uploaded. 
else:
    print ("not in colab")
    # RPI_IP=None # replace this with the IP address of your raspberry pi 
    # if RPI_IP is not None:
    #     URL="http://"+RPI_IP+":8000/"+dataset_zipfile
    #     filename=URL.split('/')[-1]
    #     r = requests.get(URL)
    #     open(filename, 'wb').write(r.content)

if not os.path.isfile(dataset_zipfile):
    #default_dataset_URL = "https://raw.githubusercontent.com/heechul/DeepPicar-v3/devel/Dataset-kucsl-Apr2022.zip"    
    ##print ("No existing dataset. download from ", default_dataset_URL)
    #r = requests.get(default_dataset_URL)
    #open(dataset_zipfile, 'wb').write(r.content)
    pass

if not os.path.isdir(dataset_dir) or force_unzip==True:
    #print ('unzip %s into %s folder' % (dataset_zipfile, dataset_dir))
    #zip_ref = zipfile.ZipFile(dataset_zipfile)
    #zip_ref.extractall(dataset_dir)
    pass

In [None]:
# run this first
#!pip uninstall --yes gdown # After running this line, restart Colab runtime.

# then this
!pip install --upgrade --no-cache-dir gdown

# replace this with id from gdrive
!gdown --id 1Q4-JTe_lxXonSl2VjDHz7FEjJjZjHvV3

# unzip
!unzip dataset.zip -d dataset/

In [None]:
if use_match_histogram:
    refs = []
    num_spectrums = 5
    size = img_width*img_height
    num_colors = int(256 / num_spectrums)
    color_pixels = int(size / num_colors)
    color = 0
    for s in range(num_spectrums): 
        num_color = 0
        img = []
        for i in range(img_height):
            img.append([])
            for j in range(img_width):
                img[i].append([])
                #for k in range(3):
                img[i][j].append(color)
                img[i][j].append(color)
                img[i][j].append(color)
                num_color += 1
                if num_color == color_pixels - 1:
                    if not color == 255:
                        color += 1
                    num_color = 0            
        img = np.asarray(img, dtype=np.uint8) / 255.
        refs.append(img)

In [None]:
# Train/test data lists
imgs = []
vals = []

def rad2deg(rad):
	return 180.0 * rad / math.pi

def crop_image(img):
	scaled_img = cv2.resize(img, (max(int(img_height * 4 / 3), img_width), img_height))
	fb_h, fb_w, fb_c = scaled_img.shape
	# print(scaled_img.shape)
	startx = int((fb_w - img_width) / 2)
	starty = int((fb_h - img_height) / 2)
	return scaled_img[starty:starty+img_height, startx:startx+img_width,:]

def data_generator():
	for vid_file_path in glob.iglob(f'{dataset_dir}/*.mp4'):
		csv_file_path = vid_file_path.replace('video', 'key').replace('mp4', 'csv') #used to be avi but different encoding now
		print(vid_file_path, csv_file_path)
		
		vid = cv2.VideoCapture(vid_file_path)
		df = read_csv(csv_file_path)
		
		for val in df["wheel"].values:
			ret,img = vid.read()
			if ret is not False:
				if pre == "crop":
					img = crop_image(img)
					print(img.shape)
				else:
					img = cv2.resize(img, (img_width, img_height))

				# Convert to grayscale and readd channel dimension
				if img_channels == 1:
						img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
						img = np.reshape(img, (img_height, img_width, img_channels))
				img = (img / 255).astype(np.uint8) # <--- prevents ram overload

				if use_match_histogram == True:
					for i in range(num_spectrums):
						img = match_histograms(img, refs[i], multichannel=True) # channel_axis=-1

				yield img, tf.constant(val) # returns the frame and data
			else:
				print("video seems to be empty. nothing to do!")
				continue

		

import glob
for vid_file_path in glob.iglob(f'{dataset_dir}/*.mp4'):
		csv_file_path = vid_file_path.replace('video', 'key').replace('mp4', 'csv') #used to be avi but different encoding now
		print(vid_file_path, csv_file_path)
		vid = cv2.VideoCapture(vid_file_path)
		df = read_csv(csv_file_path)
		for val in df["wheel"].values:
				ret,img = vid.read()
				if ret is not False:
					if pre=="crop":
						img = crop_image(img)
						print(img.shape)
					else:
						img = cv2.resize(img, (img_width, img_height))
					# Convert to grayscale and readd channel dimension
					if img_channels == 1:
							img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
							img = np.reshape(img, (img_height, img_width, img_channels))
					img = (img / 255).astype(np.uint8) # <--- prevents ram overload
					imgs.append(img)
					vals.append(val)
					if use_match_histogram == True:
							for i in range(num_spectrums):
									matched = match_histograms(img, refs[i], multichannel=True) # channel_axis=-1
									imgs.append(matched)
									vals.append(val)
				else:
					 print("video seems to be empty. nothing to do!")
		print(len(imgs), len(vals))

list_ds = tf.data.Dataset.from_generator(data_generator, output_signature=(
		tf.TensorSpec(shape=(img_height, img_width, img_channels), dtype=tf.float32),
		tf.TensorSpec(shape=(1), dtype = tf.float32)
))

# Convert lists to numpy arrays and ensure they are of equal length    
imgs = np.asarray(imgs)  # input images
vals = np.asarray(vals)  # steering angles
assert len(imgs) == len(vals)
print("Loaded {} smaples".format(len(imgs)))

In [None]:
if use_match_histogram:
    fig, axises = plt.subplots(nrows=1, ncols=num_spectrums+1, figsize=(8, 4),
                                        sharex=True, sharey=True)
    for i in range(num_spectrums+1):
        aa = axises[i]
        aa.imshow(imgs[240 + i])
    plt.tight_layout()
    plt.show()

In [None]:
# Split the dataset for train and test 
print("Splitting the dataset for training and testing...")
x_train, x_test, y_train, y_test = train_test_split(imgs, vals, test_size=0.35)

In [None]:
len(x_train)

In [None]:
from keras import layers

use_depthmodel = False

if use_depthmodel == True: 
  h_len = img_height
  w_len = img_width
  d_len = img_channels

  depth_multiplier = 1.0
  dw_conv_str="1111"
  fc_str="111"
  # depthwise convolution model (doesn't work for some reason!!!)
  model = tf.keras.Sequential()
  
  dw_conv_arr = []
  dw_conv_arr.append(layers.DepthwiseConv2D((5,5), strides=(2, 2)))
  dw_conv_arr.append(layers.Conv2D(round(36 * depth_multiplier), (1, 1), strides=(1, 1)))

  dw_conv_arr.append(layers.DepthwiseConv2D((5,5), strides=(2, 2)))
  dw_conv_arr.append(layers.Conv2D(round(48 * depth_multiplier), (1, 1), strides=(1, 1)))

  dw_conv_arr.append(layers.DepthwiseConv2D((3,3), strides=(1, 1)))
  dw_conv_arr.append(layers.Conv2D(round(64 * depth_multiplier), (1, 1), strides=(1, 1)))

  dw_conv_arr.append(layers.DepthwiseConv2D((3,3), strides=(1, 1)))
  dw_conv_arr.append(layers.Conv2D(round(64 * depth_multiplier), (1, 1), strides=(1, 1)))

  fc_arr = []
  fc_arr.append(layers.Dense(round(100 * depth_multiplier), activation='relu'))
  fc_arr.append(layers.Dense(round(50 * depth_multiplier), activation='relu'))
  fc_arr.append(layers.Dense(round(10 * depth_multiplier), activation='relu'))

  # input layer
  model.add(layers.Input(shape=(h_len,w_len,d_len), name='input'))
  model.add(layers.DepthwiseConv2D((5,5), strides=(2, 2), name='dwi'))
  model.add(layers.Conv2D(round(24 * depth_multiplier), (1, 1), strides=(1, 1), name='pwi'))
  model.add(layers.BatchNormalization())
  model.add(layers.ReLU())

  # Add Conv2D layers
  # for i in range(len(conv_str)):
  # 	if int(conv_str[i]) == 1:
  # 		model.add(conv2d_arr[i])

  for i in range(0, len(dw_conv_str)):
    if int(dw_conv_str[i]) == 1:
      model.add(dw_conv_arr[i*2])
      model.add(dw_conv_arr[i*2+1])
      model.add(layers.BatchNormalization())
      model.add(layers.ReLU())

  # Add Flatten layer
  model.add(layers.Flatten())

  # Add FC layers
  for i in range(len(fc_str)):
    if int(fc_str[i]) == 1:
      model.add(fc_arr[i])

  # Add final output layer
  model.add(layers.Dense(1, name="output"))

else:
  # original model
  model = tf.keras.Sequential()
  model.add(layers.Conv2D(24, (5,5), strides=(2,2), activation='relu', input_shape=(img_height,img_width,img_channels)))
  model.add(layers.Conv2D(36, (5,5), strides=(2,2), activation='relu'))
  model.add(layers.Conv2D(48, (5,5), strides=(2,2), activation='relu'))
  model.add(layers.Conv2D(64, (3,3), activation='relu'))
  model.add(layers.Conv2D(64, (3,3), activation='relu'))
  model.add(layers.Flatten())
  model.add(layers.Dense(100, activation='relu'))
  model.add(layers.Dense(50, activation='relu'))
  model.add(layers.Dense(10, activation='relu'))
  model.add(layers.Dense(1, name="output"))

model.summary()

In [None]:
print("Quantize to make it extreme zoom...")
if RunningInCOLAB:
  !pip install tensorflow_model_optimization

import tensorflow_model_optimization as tfmot
quantize_model = tfmot.quantization.keras.quantize_model
q_aware_model = quantize_model(model)

print ("Compiling model...")
q_aware_model.compile(optimizer=tf.keras.optimizers.Adam(1e-4),
		                loss=tf.keras.losses.MeanSquaredError())
q_aware_model.summary()

In [None]:
print ("Model fit")
# q_aware_model.reset_states()
history = q_aware_model.fit(x_train, y_train, batch_size=128,
                    epochs=50, validation_data=(x_test, y_test))

# Plot training and validation losses 
print(history.history.keys())
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.show()

In [None]:
angle_thresh=10
# Helper functions for evaluating model accuracy

def get_action(angle_rad):
  degree = rad2deg(angle_rad)
  if degree < angle_thresh and degree > -angle_thresh:
    return "center"
  elif degree >= angle_thresh:
    return "right" 
  elif degree <-angle_thresh:
    return "left"

In [None]:
pred_angle = q_aware_model.predict(x_test)
pred = np.array(list(map(get_action, pred_angle)))
ground = np.array(list(map(get_action, y_test)))
print('Accuracy is %.3f' % np.mean(pred == ground))

visualization

In [None]:
def plot_image(i, prediction_array, groundtrue_array, img_array):
  angle, predicted_label, groundtrue_label, img = int(rad2deg(pred_angle[i])), prediction_array[i], groundtrue_array[i], img_array[i]
  plt.grid(True)
  plt.xticks([])
  plt.yticks([])
  img = np.squeeze(img)
  plt.imshow(img, cmap=plt.cm.binary)
  if predicted_label == groundtrue_label:
    color = 'green'
  else:
    color = 'red'  
  plt.xlabel("Predict:{} ({}) Actual:{}".format(predicted_label,angle,groundtrue_label), color=color)

In [None]:
#@title Visualize the outputs { run: "auto" }
index = 20 #@param {type:"slider", min:0, max:244, step:6}
for i in range(3):
  plt.figure(figsize=(10,3))
  plt.subplot(1,2,1)
  plot_image(index + 2*i, pred, ground, x_test)
  plt.subplot(1,2,2)
  plot_image(index + 2*i+1, pred, ground, x_test)
  plt.show()

In [None]:
# Save both the Keras and TFLite models      
print("Model Save")                  
q_aware_model.save(model_file+".h5")
files.download(model_file+".h5")

print("TFLite Model")
converter = tf.lite.TFLiteConverter.from_keras_model(q_aware_model)

converter.optimizations = [tf.lite.Optimize.DEFAULT]

# def representative_data_gen():
#   for input_value in tf.data.Dataset.from_tensor_slices(imgs).batch(1).take(100):
#     yield [input_value]

# converter.representative_dataset = representative_data_gen
# converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
# converter.inference_input_type = tf.int8  # or tf.uint8
# converter.inference_output_type = tf.int8  # or tf.uint8

quantized_tflite_model = converter.convert()

interpreter = tf.lite.Interpreter(model_content=quantized_tflite_model)
input_type = interpreter.get_input_details()[0]['dtype']
print('input: ', input_type)
output_type = interpreter.get_output_details()[0]['dtype']
print('output: ', output_type)

with open(model_file+".tflite", 'wb') as f:
    f.write(quantized_tflite_model)

if RunningInCOLAB: 
    from google.colab import files
    files.download(model_file+".tflite")