<a href="https://colab.research.google.com/github/jenieto/computer-vision/blob/k-fold-cross-validation/computer-vision.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Trabajo M0

In [0]:
# Mount Google Drive (Colab only) so the dataset archive stored there is reachable
from google.colab import drive
drive.mount('/content/drive/')
# Extract the dataset archive into /content/, overwriting existing files without prompting (-o)
!unzip -o "/content/drive/My Drive/Datasets/computer-vision-M2.zip" -d /content/

In [0]:
# Importamos librerias
import tensorflow as tf
import pandas as pd
import numpy as np
import cv2
import os
import re
from google.colab.patches import cv2_imshow
from sklearn.model_selection import train_test_split
from sklearn import preprocessing

In [0]:
# Configuration variables
items_path = '/content/computer-vision-M2/anotaciones_itemsEvaluables_v4.csv'  # annotations CSV (dir; image; figure; coords; valid)
quality_path = '/content/computer-vision-M2/anotacionCalidadCopia.csv'  # per-image quality CSV (image; quality)
images_path = '/content/computer-vision-M2/dataset2'  # root directory of the PNG files opened by read_sample
min_quality = 4  # rows whose quality is below this value are dropped in read_csv_data
pattern = 'a13'  # figure identifier selected by read_data
IMAGE_CHANNELS = 1  # number of sub-images stacked as channels in read_sample (1, 2 or 3)
IMAGE_SIZE = (128, 128, IMAGE_CHANNELS)  # size every sub-image is resized to; also the CNN input shape
test_dataset_size = 0.20  # only referenced by the commented-out train_test_split call

In [0]:
# Funciones para leer los datos

def remove_spaces(data, keys=None):
    """Strip every space character from the string cells of the given columns.

    The frame is modified in place. Cells that are not strings (NaN, numbers,
    booleans) are left untouched, so the function is safe to call on columns
    of mixed or non-text dtype.

    Args:
        data: pandas DataFrame to clean.
        keys: iterable of column names to process; all columns when None.
    """
    if keys is None:
        keys = data.keys()
    for key in list(keys):
        # Guard on str so missing values and numeric columns do not raise TypeError
        data[key] = data[key].map(
            lambda value: value.replace(' ', '') if isinstance(value, str) else value
        )

def read_csv_data():
  """Read both annotation CSV files and return the usable annotation rows.

  The item annotations (items_path) are left-joined with the quality
  annotations (quality_path) on the 'image' column. Only rows whose quality
  is at least min_quality and whose 'valid' flag equals True are returned.
  """
  quality_columns = ['image', 'quality']
  item_columns = ['dir', 'image', 'figure', 'coords', 'valid']
  quality_data = pd.read_csv(quality_path, sep=';', names=quality_columns, skipinitialspace=True)
  items_data = pd.read_csv(items_path, sep=';', names=item_columns, index_col=False, skipinitialspace=True)
  # Only the quality sheet's image ids are stripped of spaces; items_data is used as read
  remove_spaces(quality_data, keys=['image'])
  merged_data = pd.merge(items_data, quality_data, how='left', left_on='image', right_on='image')
  # Rows without a quality match hold NaN and therefore fail the >= comparison
  good_quality = merged_data['quality'] >= min_quality
  flagged_valid = merged_data['valid'] == True
  return merged_data[good_quality & flagged_valid]

In [0]:
# Load the filtered annotation rows and display them as the cell output
data = read_csv_data()
data

In [0]:
# Create the StandardScaler instance that read_sample uses to standardize each sub-image
scaler = preprocessing.StandardScaler()

# Point-selection rule for every figure pattern: (mode, number of points).
#   'exact'  -> the annotation must hold exactly that many points; all of them are kept
#   'ends'   -> the annotation must hold at least that many points; only the first and last are kept
#   'ignore' -> the pattern is never considered valid
_COORD_RULES = {
  'a1': ('exact', 1),    # most samples have 1 point
  'a2': ('ignore', 0),   # too much variation, ignored for now
  'a3': ('exact', 1),    # all samples have 1 point
  'a4': ('ends', 2),     # 2 points needed: first and last
  'a5': ('exact', 2),    # enough images have 2 points
  'a6': ('ignore', 0),   # too much variation, ignored for now
  'a7': ('exact', 2),    # many samples have 2 points
  'a8': ('exact', 2),    # many samples have 2 points
  'a9': ('ignore', 0),   # too much variation, ignored for now
  'a10': ('exact', 2),   # 2 points needed; only annotations with exactly 2 are accepted
  'a11': ('exact', 1),   # all samples have 1 point
  'a12': ('ignore', 0),  # not enough data for this pattern
  'a13': ('exact', 3),   # the CSV was edited by hand so there are always 3 points
  'a14': ('exact', 1),   # all samples have 1 point
  'a15': ('ends', 2),    # 2 points needed: first and last
  'a16': ('ends', 2),    # 2 points needed: first and last
  'a17': ('exact', 1),   # all samples have 1 point
  'a18': ('ends', 2),    # 2 points needed: first and last
}

# Generate coordinates
def generate_coords(row, pattern):
  """Parse the annotated points of a row and select those relevant to `pattern`.

  Args:
    row: mapping with a 'coords' entry, a string containing the point
      coordinates as unsigned integers (any separators are accepted).
    pattern: figure identifier ('a1' ... 'a18').

  Returns:
    (mat, valid): `mat` is an (n_points, 2) integer array and `valid` tells
    whether the row satisfies the rule of the pattern. `mat` is only
    meaningful when `valid` is True. A pattern without a rule is passed
    through unchanged with valid=True, as the original if/elif chain did.
  """
  # Raw string: '\d' is an invalid escape sequence in a normal string literal
  flat = np.array([int(s) for s in re.findall(r'\d+', row['coords'])], dtype=int)
  if flat.size % 2 != 0:
    # An odd amount of numbers cannot be paired into points: flag the row
    # as invalid instead of letting reshape raise and abort the whole load
    return np.empty((0, 2), dtype=int), False
  mat = flat.reshape((-1, 2)) # One (first, second) coordinate pair per row

  rule = _COORD_RULES.get(pattern)
  if rule is None:
    return mat, True
  mode, n_points = rule
  if mode == 'ignore':
    return mat, False
  if mode == 'exact':
    return mat, mat.shape[0] == n_points
  # mode == 'ends': keep only the first and the last annotated point
  if mat.shape[0] >= n_points:
    return np.array([mat[0], mat[-1]]), True
  return mat, False

# Read one image and its coordinates
def read_sample(row, pattern='a1'):
  """Load the composite image of an annotation row together with its target points.

  The PNG on disk is a 2x2 mosaic that is split into quadrants:
  top-left = original image (not used), top-right = inverted original,
  bottom-left = high-detail graph, bottom-right = low-detail graph.
  The first IMAGE_CHANNELS of (inverted, high-detail, low-detail) are resized
  to IMAGE_SIZE and stacked as channels.

  Args:
    row: annotation row providing 'dir', 'image' and 'coords'.
    pattern: figure identifier forwarded to generate_coords.

  Returns:
    (raw_X, raw_y, proc_X, proc_y)
    raw_X:  resized sub-images, shape (1, H, W, IMAGE_CHANNELS).
    proc_X: the same sub-images after StandardScaler.fit_transform.
    raw_y:  flattened point coordinates in resized-image pixels, shape (1, 2 * n_points).
    proc_y: raw_y divided by the image width/height.
    All four are None when the image file cannot be read; raw_y and proc_y
    are None when generate_coords reports the row as invalid.

  Raises:
    ValueError: if IMAGE_CHANNELS is not 1, 2 or 3.
  """
  if IMAGE_CHANNELS not in (1, 2, 3):
    raise ValueError(f'IMAGE_CHANNELS must be 1, 2 or 3, got {IMAGE_CHANNELS}')

  path = os.path.join(images_path, 'grafos_' + row['dir'] + '_limpiezaManual', 'grafo_' + row['image'] + '.png')
  composite = cv2.imread(path, 0) # Flag 0: load as single-channel grayscale
  if composite is None: # TODO: some image files do not exist
    return None, None, None, None

  half_h = composite.shape[0] // 2
  half_w = composite.shape[1] // 2
  quadrants = [
    composite[:half_h, half_w:], # Inverted original image
    composite[half_h:, :half_w], # High-detail graph image
    composite[half_h:, half_w:], # Low-detail graph image
  ]
  subimage_shape = quadrants[0].shape # Size of one quadrant before resizing

  # IMAGE_SIZE is (height, width, channels) while cv2.resize expects dsize=(width, height)
  target_h, target_w = IMAGE_SIZE[0], IMAGE_SIZE[1]
  resized = [cv2.resize(quadrant, dsize=(target_w, target_h)) for quadrant in quadrants[:IMAGE_CHANNELS]]
  # The scaler is refit on every sub-image (simpler alternative: sub_image / 255)
  standardized = [scaler.fit_transform(sub_image) for sub_image in resized]

  # Stack the channels last, then prepend a batch axis of length 1
  raw_X = np.expand_dims(np.stack(resized, axis=-1), axis=0)
  proc_X = np.expand_dims(np.stack(standardized, axis=-1), axis=0)

  raw_y = None
  proc_y = None
  mat, valid = generate_coords(row, pattern)
  if valid:
    # Map annotation coordinates to resized-image pixels.
    # NOTE(review): the factor 2 presumably means the annotations are expressed
    # at twice the quadrant resolution - confirm against the annotation tool.
    scale = np.array([2 * target_w / subimage_shape[1], 2 * target_h / subimage_shape[0]])
    raw_y = mat * scale
    proc_y = raw_y / np.array([target_w, target_h])
    raw_y = np.expand_dims(raw_y.flatten(), axis=0)
    proc_y = np.expand_dims(proc_y.flatten(), axis=0)
  return raw_X, raw_y, proc_X, proc_y

# Return the images and the coordinates
def read_data(data, pattern):
  """Build the image and label arrays for every usable row of one figure pattern.

  Args:
    data: annotation DataFrame with at least a 'figure' column plus the
      columns read_sample needs.
    pattern: figure identifier; only rows whose 'figure' equals it are used.

  Returns:
    (images, labels, X, y)
    images: raw resized images, one per usable row.
    labels: raw point coordinates in pixels.
    X:      standardized images (network input).
    y:      regression targets; currently a copy of the raw pixel
            coordinates (the normalized proc_y is not used).

  Raises:
    ValueError: if no row of the pattern yields a readable image with valid
      coordinates.
  """
  data = data[data['figure'] == pattern]
  raw_images, raw_labels, proc_images = [], [], []
  for _, row in data.iterrows():
    raw_X, raw_y, proc_X, proc_y = read_sample(row, pattern)
    # Skip rows whose image is missing or whose coordinates are invalid
    if raw_X is None or raw_y is None or proc_y is None:
      continue
    raw_images.append(raw_X)
    raw_labels.append(raw_y)
    proc_images.append(proc_X)

  if not raw_images:
    raise ValueError(f"No valid samples found for pattern '{pattern}'")

  # Concatenate once at the end: doing it inside the loop copies the whole
  # dataset on every iteration
  images = np.concatenate(raw_images, axis=0)
  labels = np.concatenate(raw_labels, axis=0)
  X = np.concatenate(proc_images, axis=0)
  y = labels.copy() # Independent array so editing y never alters labels
  return images, labels, X, y

In [0]:
# Build the image arrays and the regression targets, then report their shapes
images, labels, X, y = read_data(data, pattern)
for array in (images, labels, X, y):
  print(array.shape)

In [0]:
# Show an image, optionally with the figure points marked on it
def show_image(image, point=None):
  """Display the first channel of `image`, with `point` drawn as red dots when given.

  Args:
    image: array indexed as (row, column, channel); only channel 0 is shown.
    point: optional flat sequence (x0, y0, x1, y1, ...) in pixel coordinates.
      When None the image is displayed without markers.
  """
  im_color = cv2.cvtColor(image[:, :, 0], cv2.COLOR_GRAY2BGR)
  if point is not None:
    for i in range(point.shape[0]//2):
      # (0, 0, 255) is red in OpenCV's BGR channel order; thickness -1 fills the circle
      im_color = cv2.circle(im_color, (int(point[2*i]), int(point[2*i+1])), 3, (0, 0, 255), -1)
  cv2_imshow(im_color)

show_image(images[20], labels[20])

In [0]:
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Dropout, Flatten, Dense, Activation, BatchNormalization
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.regularizers import l2

# Create the model
def create_model(network_type=1, n_outputs=None):
  """Build an (uncompiled) Keras Sequential regression model.

  Args:
    network_type: 0 for a MobileNetV2 backbone with a dense head, 1 for a
      small single-convolution CNN (default, the architecture used so far).
    n_outputs: number of regression outputs. Defaults to y.shape[1], i.e.
      two values per predicted point, read from the global target array.

  Returns:
    The tf.keras Sequential model.

  Raises:
    ValueError: if network_type is not 0 or 1.
  """
  if n_outputs is None:
    n_outputs = y.shape[1]
  model = tf.keras.models.Sequential()

  if network_type == 0:
    # NOTE(review): this backbone expects 224x224x3 inputs, which does not
    # match IMAGE_SIZE - the data must be adapted before using this branch.
    conv = MobileNetV2(weights='imagenet', include_top=False, input_shape=(224, 224, 3), pooling='avg')
    model.add(conv) # Convolutional base; all of its layers are left trainable
    model.add(Dense(512, activation='relu'))
    model.add(BatchNormalization())
    model.add(Dropout(0.1))
    model.add(Dense(256, activation='relu'))
    model.add(BatchNormalization())
    model.add(Dropout(0.1))
    model.add(Dense(128, activation='relu'))
    model.add(BatchNormalization())
    model.add(Dense(n_outputs, activation='relu'))

  elif network_type == 1:
    model.add(Conv2D(32, (3, 3), activation='relu', input_shape=IMAGE_SIZE))
    model.add(MaxPooling2D((2, 2)))
    model.add(Flatten())
    model.add(Dense(64, activation='relu'))
    model.add(Dropout(0.25))
    model.add(Dense(n_outputs, activation='relu', kernel_regularizer=l2(0.001)))

  else:
    # An unknown type used to return an empty model that only failed later in fit()
    raise ValueError(f'Unknown network_type: {network_type}')
  return model

In [0]:
# model = create_model()
# train_images, test_images, train_labels, test_labels = train_test_split(X, y, test_size=test_dataset_size) # Creamos los datasets de train y test

In [0]:
from tensorflow.keras.callbacks import ModelCheckpoint, TensorBoard, EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.optimizers import Adam
import tensorflow.keras.backend as K
from datetime import datetime
from sklearn.model_selection import KFold

# Fix the random seeds for reproducibility
seed = 7
np.random.seed(seed)
# Keras weight initialization and dropout draw from TensorFlow's generator,
# which np.random.seed does not affect (requires TensorFlow 2.x)
tf.random.set_seed(seed)

# Custom metric: Euclidean distance between predicted and true points
def eu(y_true, y_pred):
  """Sum of the Euclidean distances between matching points, per sample.

  Both tensors have shape (batch, 2 * n_points) with the coordinates laid out
  as (x0, y0, x1, y1, ...), which is how read_sample flattens the points.

  Args:
    y_true: ground-truth coordinates.
    y_pred: predicted coordinates.

  Returns:
    Tensor of shape (batch,) holding, for every sample, the sum over its
    points of the distance between prediction and ground truth. Keras
    averages it over the samples when reporting the metric.
  """
  y_true = K.cast(y_true, y_pred.dtype)
  # Even columns hold x, odd columns hold y: point n lives at columns 2n and 2n+1
  dx = y_pred[:, 0::2] - y_true[:, 0::2]
  dy = y_pred[:, 1::2] - y_true[:, 1::2]
  return K.sum(K.sqrt(K.square(dx) + K.square(dy)), axis=-1)

def initializeCallbacks():
    """Create a fresh list of Keras callbacks for one training run.

    Returns:
        [ModelCheckpoint, TensorBoard, EarlyStopping]
        - ModelCheckpoint writes 'model.h5' whenever val_loss improves
          (save_best_only=True), so the file holds the best epoch of the run.
        - TensorBoard logs to a new timestamped directory under logs/fit/.
        - EarlyStopping halts training after 20 epochs without improvement
          of its default monitored quantity (val_loss).
    """
    best_model_saver = ModelCheckpoint('model.h5', monitor='val_loss', verbose=1, save_best_only=True)
    run_directory = "logs/fit/" + datetime.now().strftime("%Y%m%d-%H%M%S")
    board_logger = TensorBoard(log_dir=run_directory, write_graph=True, write_images=True)
    patience_stop = EarlyStopping(patience=20, verbose=1)
    return [best_model_saver, board_logger, patience_stop]

# Define the 5-fold cross-validation test harness
kfold = KFold(n_splits=5, shuffle=True, random_state=seed)
cvscores = []
for train, test in kfold.split(X, y):
  # Materialize this fold's split from the index arrays returned by KFold.
  # These names stay bound to the last fold once the loop finishes.
  train_images, train_labels = X[train], y[train]
  test_images, test_labels = X[test], y[test]
  # Create a fresh model so no weights leak from one fold to the next
  model = create_model()
  # Compile and train the model
  model.compile(optimizer=Adam(learning_rate=0.001), loss='mean_squared_error', metrics=[eu])
  # model.summary()
  # NOTE(review): the held-out fold also drives early stopping and checkpointing,
  # so the score reported below is an optimistic estimate.
  output = model.fit(train_images, train_labels, validation_data=(test_images, test_labels), epochs=100, verbose=1, callbacks=initializeCallbacks())
  scores = model.evaluate(test_images, test_labels, verbose=0)
  # scores[1] is the eu metric, a distance in pixels - not a ratio to scale by 100
  print(f"{model.metrics_names[1]}: {scores[1]:.2f}")
  cvscores.append(scores[1])
print(f'{np.mean(cvscores):.2f} (+/- {np.std(cvscores):.2f})')

In [0]:
# Load the TensorBoard notebook extension
%load_ext tensorboard

In [0]:
# Start TensorBoard within the notebook using magics
%tensorboard --logdir logs

In [0]:
import matplotlib.pyplot as plt
import math

# Evaluate the result
def show_test_image(image, point=None):
  """Return a displayable 3-channel uint8 copy of `image` with `point` drawn on it.

  Args:
    image: array indexed as (row, column, channel); only channel 0 is used.
      Any value range is accepted: the channel is min-max rescaled to 0..255.
    point: optional flat sequence (x0, y0, x1, y1, ...) in pixel coordinates.
      When None the image is returned without markers.

  Returns:
    uint8 array of shape (rows, columns, 3).
  """
  gray = image[:, :, 0].astype('float32')
  low, high = float(gray.min()), float(gray.max())
  if high > low:
    gray = (gray - low) / (high - low) * 255
  else:
    gray = np.zeros_like(gray) # Constant image: nothing to rescale
  # uint8 keeps matplotlib from clipping float RGB data to the [0, 1] range
  canvas = cv2.cvtColor(gray.astype('uint8'), cv2.COLOR_GRAY2BGR)
  if point is not None:
    for i in range(point.shape[0]//2):
      # Filled circle of radius 3; (255, 0, 0) shows as red once matplotlib reads the array as RGB
      canvas = cv2.circle(canvas, (int(point[2*i]), int(point[2*i+1])), 3, (255, 0, 0), -1)
  return canvas


def plot_image_matrix(images=[], coords=[]):
    """Plot the images in a 3-column grid, each with its points drawn on it.

    Args:
        images: sequence of images accepted by show_test_image.
        coords: sequence with the flat point coordinates of every image,
            in the same order as `images`.

    Nothing is plotted when `images` is empty.
    """
    if len(images) == 0:
        return
    columns = 3
    rows = math.ceil(len(images) / columns)
    # squeeze=False keeps `ax` two-dimensional even when there is a single row
    fig, ax = plt.subplots(nrows=rows, ncols=columns, figsize=(10, 10), squeeze=False)

    for i, img in enumerate(images):
        row, col = divmod(i, columns)
        image_with_points = show_test_image(img, coords[i])
        ax[row][col].imshow(image_with_points, cmap='gray', vmin=0, vmax=255)
    # Hide the cells of the last row that received no image
    for i in range(len(images), rows * columns):
        row, col = divmod(i, columns)
        ax[row][col].axis('off')
    plt.show()


model.load_weights("model.h5") # Load the weights saved by the ModelCheckpoint callback

# Held-out samples of the last cross-validation fold: `test` is the index
# array left bound by the KFold loop
eval_images = X[test]
eval_labels = y[test]

# Distinct names so the dataset array `images` is not overwritten
preview_images = []
preview_coords = []
for i in range(min(9, len(eval_images))): # Never index past the fold size
  image = eval_images[i]
  prediction = model.predict(np.expand_dims(image, axis=0)) # Add the batch axis
  preview_images.append(image)
  preview_coords.append(prediction[0])
  print(f'Predicted coordinates: {prediction[0]} -- Real coordinates: {eval_labels[i]}')
plot_image_matrix(preview_images, preview_coords)
