In [1]:
import cv2
import os
import pickle
import numpy as np
import PIL as pil
from tqdm.auto import tqdm

In [4]:
import os
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


## Drive files

1. Aceptar invitación a la unidad compartida Patrones2022

2. Abrir la unidad, y sobre la carpeta `home`, hacer click derecho y luego "Añadir acceso directo a Drive".

Si eso funcionó, al ejecutar la siguiente celda, debería verse `parameters.json   pickled_features  sample_data
pickled_database  raw_database	    trained_models` en el output.

In [5]:
HOME = "/content/drive/My Drive/home/"
!ls "{HOME}"

parameters.json   pickled_features  sample_data
pickled_database  raw_database	    trained_models


In [6]:
!ls "{HOME}/raw_database"

Bicicletas  Cachipun  Espinas  Letras  Lunares


In [7]:
def LoadImage(path: str, cmap: str = 'gray', echo: bool = True) -> np.ndarray:
    """
    Load an image from disk with OpenCV.

    Args:
        path: location of the image file.
        cmap: 'gray' reads with cv2.IMREAD_GRAYSCALE, 'rgb' reads with
            cv2.IMREAD_COLOR (note: OpenCV delivers colour channels in BGR order).
        echo: when True, print the path and the shape of the loaded image.

    Returns:
        The array produced by cv2.imread.

    Raises:
        AttributeError: if cmap is neither 'gray' nor 'rgb'.
        OSError: if the file is missing or cannot be decoded as an image.
    """
    if cmap == 'gray':
        cflag = cv2.IMREAD_GRAYSCALE
    elif cmap == 'rgb':
        cflag = cv2.IMREAD_COLOR
    else:
        # Same exception type as before, but now it carries the explanation.
        raise AttributeError(f"{cmap} is not a valid option")

    if echo:
        print("Image: " + path)

    img = cv2.imread(path, cflag)
    # cv2.imread does not raise on failure, it returns None; fail here with a
    # clear message instead of a confusing error on `.shape` further down.
    if img is None:
        raise OSError(f"Could not read image: {path}")

    if echo:
        print("Image size:", img.shape)
    return img


def NofClasses(path: str) -> int:
    """
    Return the number of entries listed in `path`, minus one.

    Note that the result is one less than the number of entries found by
    os.listdir; callers that need the real count have to add it back.
    """
    entries = os.listdir(path)
    n_entries = len(entries)
    return n_entries - 1


def NofSamples(path: str) -> list:
    """
    Get the number of samples for each class folder inside `path`.

    Args:
        path: dataset root; it may or may not end with a path separator.

    Returns:
        A list with one count per sub-folder, in os.listdir order.
        Entries of `path` that are not folders are ignored, and macOS
        '.DS_Store' files inside a class folder are not counted as samples.
    """
    samples = []
    for subdir in os.listdir(path):
        # os.path.join works with and without a trailing separator on `path`.
        class_dir = os.path.join(path, subdir)
        if not os.path.isdir(class_dir):
            # e.g. a stray '.DS_Store' at the dataset root
            continue
        n_files = sum(1 for fil in os.listdir(class_dir) if fil != '.DS_Store')
        samples.append(n_files)
    return samples


def GetMinDim(path: str, cmap: str = 'gray', echo: bool = False) -> tuple:
  """
  Returns the smallest (width, height) found among the class folders in path.

  Only the first image file of each class folder is inspected (the loop stops
  after one file per folder), so the result is the minimum over one sample
  image per class, not over every image.

  Args:
    path: dataset root containing one folder per class.
    cmap: colour mode forwarded to LoadImage.
    echo: verbosity flag forwarded to LoadImage.

  Returns:
    (min_width, min_height), or (None, None) when no image file was found.
  """
  minh = None
  minw = None
  for subdir in os.listdir(path):
    class_dir = os.path.join(path, subdir)
    if not os.path.isdir(class_dir):
      # stray files at the dataset root are not class folders
      continue
    for fil in os.listdir(class_dir):
      if fil == '.DS_Store':
        # macOS metadata, not an image
        continue
      # [:2] keeps this working for colour images, whose shape has 3 entries
      h, w = LoadImage(os.path.join(class_dir, fil), cmap=cmap, echo=echo).shape[:2]
      if minh is None or h < minh:
        minh = h
      if minw is None or w < minw:
        minw = w
      break
  return (minw, minh)


def BuildDataset(path: str, cmap: str = 'gray', echo: bool = False) -> tuple:
    """
    Build a dataset from a directory with one sub-folder per class.

    Every image is resized to the dimensions reported by GetMinDim and stored
    in a float array; the label of an image is the position of its folder in
    the os.listdir order.

    Args:
        path: dataset root, ending with a path separator.
        cmap: 'gray' or 'rgb', forwarded to LoadImage.
        echo: when True, print the target size and the details of the first
            image that is loaded.

    Returns:
        (X, y, #clas, [#sam]) where X holds the images, y is an (n, 1) array
        of labels, #clas is the value returned by NofClasses and [#sam] is the
        number of images actually loaded for each class.

    Raises:
        ValueError: if no image could be found under `path`.
    """
    imdim = GetMinDim(path)
    if imdim[0] is None or imdim[1] is None:
        raise ValueError(f"No images found in {path}")
    if echo:
        print(f"Smallest image size: {imdim}")

    classes = NofClasses(path)
    # Upper bound used for pre-allocation; the arrays are trimmed afterwards.
    capacity = sum(NofSamples(path))

    sample_shape = (imdim[1], imdim[0])
    if cmap == 'rgb':
        # colour images keep their three channels after cv2.resize
        sample_shape += (3,)
    Xsam = np.zeros((capacity,) + sample_shape)
    Ysam = np.zeros((capacity, 1))

    label = 0
    row = 0
    loaded_per_class = []
    # Only the first image is echoed, and only if the caller asked for it.
    echo_next = echo
    for subdir in os.listdir(path):
        class_dir = path + subdir
        if not os.path.isdir(class_dir):
            continue
        loaded = 0
        for fil in tqdm(os.listdir(class_dir)):
            if fil == '.DS_Store':
                continue
            img = LoadImage(class_dir + '/' + fil, cmap=cmap, echo=echo_next)
            echo_next = False
            Xsam[row] = cv2.resize(img, imdim, interpolation=cv2.INTER_AREA)
            Ysam[row] = label
            row += 1
            loaded += 1
        loaded_per_class.append(loaded)
        label += 1

    # Drop rows reserved for files that were skipped, so no all-zero image
    # labelled as class 0 leaks into the dataset.
    return (Xsam[:row], Ysam[:row], classes, loaded_per_class)
    

In [None]:
%%script echo skipping

# datasets = ["Bicicletas", "Cachipun", "Espinas", "Letras", "Lunares"]

# for DSNAME in datasets:
#   out = BuildDataset(f"{HOME}raw_database/{DSNAME}/", echo=True)
#   Xsam, Ysam, clas, sam = out

#   pick_insert = open(f'{HOME}pickled_database/{DSNAME}/Xsam.pkl', 'wb')
#   pickle.dump(Xsam, pick_insert)
#   pick_insert.close()

#   pick_insert = open(f'{HOME}pickled_database/{DSNAME}/Ysam.pkl', 'wb')
#   pickle.dump(Ysam, pick_insert)
#   pick_insert.close()

#   pick_insert = open(f'{HOME}pickled_database/{DSNAME}/clas.pkl', 'wb')
#   pickle.dump(clas, pick_insert)
#   pick_insert.close()

#   pick_insert = open(f'{HOME}pickled_database/{DSNAME}/sam.pkl', 'wb')
#   pickle.dump(sam, pick_insert)
#   pick_insert.close()

skipping


## Bloque 1: Lectura de parámetros
Librerías necesarias para este bloque:

In [8]:
import json

In [9]:
class Block1:
  """Reads the experiment configuration from a JSON parameters file."""

  def __init__(self, path):
    '''
    Store the location of the parameters file and parse it right away.

    INPUT:
      path: path to params file
    '''
    self.path = path
    self.load_params()

  def load_params(self):
    '''
    Parse the JSON file at self.path and expose its content as attributes:
    dataset, features, transformations and classifiers. Each classifier entry
    is flattened to a string "NAME-param:value-param:value".
    '''
    with open(self.path, encoding = 'utf-8') as file:
      params = json.load(file)

    self.dataset = params['dataset']
    self.features = params['features']
    self.transformations = params['transformations']

    def flatten(entry):
      # {"KNN": {"n_neighbors": 5}} -> "KNN-n_neighbors:5"
      tokens = []
      for clf_name, clf_params in entry.items():
        tokens.append(clf_name)
        tokens.extend(f"{key}:{value}" for key, value in clf_params.items())
      return "-".join(tokens)

    self.classifiers = [flatten(entry) for entry in params['classifiers']]

Ejemplo de uso

In [11]:
block1 = Block1(f'{HOME}parameters.json')
print(block1.classifiers)
print(block1.features)
print(block1.transformations)
print(block1.dataset)

['FOREST-max_depth:3-n_estimators:100', 'KNN-n_neighbors:5']
['HOG-5x5x9', 'LBP-8x8']
[['CLEAN', 'MINMAX', 'KBEST-50'], ['PCA-5'], ['MINMAX', 'SFS-8'], ['PCA-5', 'ICA-2'], ['CLEAN', 'MINMAX', 'KBEST-50', 'SFS-10', 'PCA-5', 'ICA-2']]
Espinas


In [None]:
del block1 # Importante para no llenar la RAM mientras se prueba

## Bloque 3: Extracción de características.
Librerías necesarias para este bloque:

In [None]:
from IPython.display import clear_output
!pip3 install scipy==1.2
!pip3 install pybalu==0.2.5
clear_output()

In [None]:
from pybalu.feature_extraction import hog_features, lbp_features

Formato del nombre para cada característica soportada:
  - HoG: "HOG-NxMxB"
  - LBP: "LBP-NxM"

Formato de archivo de características guardado en Drive:

"DSNAME_feature1+feature2.pkl"

Ej:
"Bicicletas_LBP-5x5+HOG-7x7x9.pkl"

In [None]:
class Block3:
  """Feature extraction stage: loads a dataset and builds its feature matrix."""

  def __init__(self, DSNAME, features, load_database=True, load_features=True, save=True):
    '''
    Extracts the features indicated from the DSNAME dataset.

    INPUT:
      DSNAME: name of the dataset stored in Drive.
      features: array of strings following feature name convention,
      load_database: True -> loads pickled images directly.
        False -> rebuilds the dataset from the raw images with BuildDataset.
      load_features: True -> tries to load the features from Drive in case they
        have already been extracted before. If the file cannot be loaded, the
        features are extracted.
      save:          True -> saves the extracted features to Drive.

    OUTPUT:
      Feature matrix is stored in the variable self.X.
      Ground truth is stored in the variable self.Ysam
    '''
    self.DSNAME = DSNAME
    self.features = features
    self.features_names, self.features_parameters = self.parse_features()
    self.save = save

    if load_database:
      dataset = self.import_pickled_dataset()
    else:
      dataset = BuildDataset(f"{HOME}raw_database/{self.DSNAME}/", echo=True)
    self.Xsam, self.Ysam, self.n_class, self.n_samples = dataset
    # The stored class count is incremented by one here; NofClasses returns
    # the number of directory entries minus one.
    self.n_class += 1

    self.identifier = self.DSNAME + "_" + "+".join(self.features)
    if load_features:
      self.X = self.load_features()
    else:
      self.X = self.extract_features()

  def parse_features(self):
    '''
    Split every entry of self.features ("NAME-params") into its name and its
    parameters. HOG and LBP parameters ("AxB" / "AxBxC") become lists of ints;
    parameters of any other name are kept as the raw string.

    Returns (features_names, features_parameters) where the second element is
    a dict indexed by feature name, so two entries with the same name share
    one slot and the last one wins.
    '''
    features_names = []
    features_parameters = dict()

    for feature in self.features:
      tokens = feature.split("-")
      name = tokens[0]
      parameters = tokens[1]
      if name in ("HOG", "LBP"):
        parameters = [int(param) for param in parameters.split("x")]
      features_names.append(name)
      features_parameters[name] = parameters

    return features_names, features_parameters

  def _load_pickle(self, filename):
    '''Load one file from the pickled database folder of this dataset.'''
    # NOTE: pickle.load can execute arbitrary code; only use trusted files.
    with open(f'{HOME}pickled_database/{self.DSNAME}/{filename}', 'rb') as file_read:
      return pickle.load(file_read)

  def import_pickled_dataset(self):
    '''
    Load the images, labels, class count and per-class sample counts that
    were pickled for this dataset. Returns (Xsam, Ysam, n_class, n_samples).
    '''
    Xsam = self._load_pickle('Xsam.pkl')
    Ysam = self._load_pickle('Ysam.pkl')
    n_class = self._load_pickle('clas.pkl')
    n_samples = self._load_pickle('sam.pkl')
    return Xsam, Ysam, n_class, n_samples

  def load_features(self):
    '''
    Return the feature matrix stored for self.identifier; if it cannot be
    loaded for any reason, report it and extract the features instead.
    '''
    feature_file = f'{HOME}pickled_features/{self.DSNAME}/{self.identifier}.pkl'
    try:
      # `with` closes the handle even when unpickling fails.
      # NOTE: pickle.load can execute arbitrary code; only use trusted files.
      with open(feature_file, 'rb') as read_file:
        return pickle.load(read_file)
    except Exception as err:
      # Best effort: fall back to extraction, but say why loading failed.
      print(f'Could not load {self.identifier}.pkl ({err})')
    return self.extract_features()

  def extract_features(self):
    '''
    Compute every requested feature for every image in self.Xsam and return
    the column-wise concatenation, in the order given by self.features_names.
    When self.save is set, the matrix is also pickled to Drive (best effort).
    '''
    total_images = sum(self.n_samples)
    feature_matrices = dict() # Dictionary indexed by feature name that saves feature matrix of feature.

    # Initialize feature matrices
    for feature in self.features_names:
      parameters = self.features_parameters[feature]
      if feature == "LBP":
        # this code reserves 59 columns per window for the 'nri_uniform' mapping
        n_columns = 59*parameters[0]*parameters[1]
      elif feature == "HOG":
        n_columns = parameters[0]*parameters[1]*parameters[2]
      else:
        continue
      feature_matrices[feature] = np.zeros((total_images, n_columns))

    # Extract each feature from each image.
    for t, image in enumerate(self.Xsam):
      for feature in self.features_names:
        parameters = self.features_parameters[feature]
        if feature == "LBP":
          feature_matrices[feature][t,:] = lbp_features(image, hdiv=parameters[0], vdiv=parameters[1], mapping='nri_uniform')
        elif feature == "HOG":
          feature_matrices[feature][t,:] = hog_features(image, v_windows=parameters[0], h_windows=parameters[1], n_bins=parameters[2])

    # Concatenate features
    X = np.concatenate(tuple([feature_matrices[feature] for feature in self.features_names]), axis=1)

    if self.save:
      feature_file = f'{HOME}pickled_features/{self.DSNAME}/{self.identifier}.pkl'
      try:
        with open(feature_file, 'wb') as pick_insert:
          pickle.dump(X, pick_insert)
      except Exception as err:
        # Saving is optional; keep the extracted matrix and report the cause.
        print(f'Could not save {self.identifier}.pkl ({err})')

    return X

  def __str__(self):
    return self.identifier

Ejemplo de uso

In [None]:
block3 = Block3("Espinas", ["LBP-6x6", "HOG-7x7x9"], load_database=True, load_features=False, save=True)

In [None]:
print(block3)
print(block3.features_names, block3.features_parameters)
print(block3.Xsam.shape, block3.Ysam.shape, block3.n_class, block3.n_samples)
print(block3.X.shape)

Espinas_LBP-5x5+HOG-7x7x9
['LBP', 'HOG'] {'LBP': [5, 5], 'HOG': [7, 7, 9]}
(640, 100, 100) (640, 1) 2 [320, 320]
(640, 1916)


In [None]:
del block3 # Importante para no llenar la RAM mientras se prueba

## Bloque 4: selección y transformación de características.
- Split Train-Validation
- Aplicación secuencial de alguna selección/transformación
  - Clean
  - MinMax Scaling
  - SelectKBest
  - SFS
  - PCA
  - ICA

In [None]:
from IPython.display import clear_output
!pip3 install scipy==1.2
!pip3 install pybalu==0.2.5
clear_output()

In [None]:
from sklearn.model_selection import train_test_split
from pybalu.feature_selection import clean
from sklearn.preprocessing import MinMaxScaler
from sklearn.feature_selection import SelectKBest, chi2
from pybalu.feature_selection import sfs
from sklearn.decomposition import PCA, FastICA

In [None]:
class CleanInterface:
  """
  Gives the result of pybalu's `clean` a `transform` method, so it can be
  chained with the other steps of Block4.
  """

  def __init__(self, X):
    # Whatever clean(X) returns is later used as a column index.
    selected_columns = clean(X)
    self.model = selected_columns

  def transform(self, X):
    """Return X restricted to the columns stored in self.model."""
    selected_columns = self.model
    return X[:, selected_columns]

class SFSInterface:
  """
  Gives the result of pybalu's `sfs` a `transform` method, so it can be
  chained with the other steps of Block4.
  """

  def __init__(self, X, y, s):
    # Whatever sfs(X, y, s) returns is later used as a column index.
    selected_columns = sfs(X, y, s, show=False)
    self.model = selected_columns

  def transform(self, X):
    """Return X restricted to the columns stored in self.model."""
    selected_columns = self.model
    return X[:, selected_columns]

In [None]:
class Block4:
  """
  Feature selection / transformation stage.

  Splits the data into train and validation sets, fits the requested sequence
  of steps on the train set only, and applies the fitted steps to both sets.
  """

  def __init__(self, X, y, sequence, ratio=0.3):
    """
    INPUT:
      X: feature matrix (converted with np.array).
      y: labels, also used to stratify the split.
      sequence: list of step names, e.g. ['CLEAN', 'MINMAX', 'KBEST-50'].
        CLEAN and MINMAX take no parameter; KBEST, SFS, PCA and ICA take an
        integer after the dash.
      ratio: fraction of the samples reserved for validation.

    OUTPUT:
      self.Xtrain / self.Xval hold the transformed features,
      self.ytrain / self.yval the matching labels,
      self.models the fitted steps in application order.
    """
    self.sequence = sequence

    self.models = []

    self.Xtrain, self.Xval, self.ytrain, self.yval = train_test_split(np.array(X), y, test_size=ratio, random_state=42, stratify=y)

    self.Xtrain = self.interative_fit()
    self.Xval = self.transform(self.Xval)

  def interative_fit(self):
    """
    Fit every step of self.sequence on the (progressively transformed) train
    set, store the fitted steps in self.models and return the transformed
    train set.

    Raises ValueError for an unknown step name or a missing parameter.
    """
    X = self.Xtrain

    for seq in self.sequence:
      tokens = seq.split('-')
      name = tokens[0]

      if name == 'CLEAN':
        model = CleanInterface(X)
      elif name == 'MINMAX':
        model = MinMaxScaler().fit(X)
      elif name in ('KBEST', 'SFS', 'PCA', 'ICA'):
        if len(tokens) < 2:
          raise ValueError(f'El modelo {name} necesita un parámetro, p. ej. "{name}-5"')
        param = int(tokens[1])
        if name == 'KBEST':
          model = SelectKBest(chi2, k=param).fit(X, self.ytrain)
        elif name == 'SFS':
          model = SFSInterface(X, self.ytrain, param)
        elif name == 'PCA':
          model = PCA(n_components=param).fit(X)
        else:
          model = FastICA(n_components=param, random_state=0).fit(X, self.ytrain)
      else:
        # Fail with the real cause instead of storing None and crashing later
        # on `None.transform`.
        raise ValueError(f'No existe el modelo {name}')

      self.models.append(model)
      X = model.transform(X)

    return X

  def transform(self, X):
    """Apply every fitted step, in order, to X and return the result."""
    for model in self.models:
      X = model.transform(X)
    return X

In [None]:
# DATOS DE PRUEBA
!gdown --id 1CA-l9_JjdjG_4kTuavKf8Wm27dt0jyqT
clear_output()

# Load the downloaded test data; `with` makes sure the file is closed again.
# NOTE: pickle.load can execute arbitrary code, only use it on trusted files.
with open('data.p', "rb") as f:
  data = pickle.load(f)
X = data['train']

# First 7000 samples are labelled 0, the remaining 7000 are labelled 1.
y = np.array([0 if i < 7000 else 1 for i in range(0, 14000)])

X.shape, len(y)

((14000, 1844), 14000)

In [None]:
block = Block4(np.array(X), y, ['CLEAN', 'MINMAX', 'KBEST-50', 'SFS-10', 'PCA-5', 'ICA-2'])
block.Xtrain.shape, block.Xval.shape

(9800, 1844) (4200, 1844) (9800,) (4200,)


((9800, 2), (4200, 2))

## Bloque 5

<div align="center">
<img src="https://i.imgur.com/jxpf9U9.png"></img>
</div>

### Modo de uso

**Block5 recibe dos elementos:**

- **blocks**: lista de instancias de la clase `Block4`
- **sequence**: lista de strings

Para sequence, un elemento de la lista puede verse así:

`FOREST-max_depth:3-n_estimators:100`

Es decir: `{CLASSIFIER_NAME}-{PARAM_1_NAME}:{PARAM_1_VALUE}-{PARAM_2_NAME}:{PARAM_2_VALUE}`

Puede que un Clasificador no tenga parámetros (como `QDA`). En ese caso basta sólo con el nombre.

El mapping de nombre --> clasificador se aprecia en `self.classifiers` de `Block5`. Ahí mismo, se aprecian los `kwargs`, es decir los parámetros que recibe el constructor de dicho clasificador.

Kwargs son los parámetros permitidos a personalizar. Lo demás será default.
Hay casos donde estos parámetros son obligatorios, y otros donde no.
A modo de maximizar el accuracy y evitar errores, ojalá todos estén presentes (mayor personalización).

Al momento de instanciar el bloque, inmediatamente se comienza el proceso de búsqueda y validación del mejor modelo.
Al terminar, en el atributo `best_classifier` queda guardado el mejor clasificador. Es decir, una tupla de:

`final_accuracy: float, <block4>: Block4, clf_class: object, params: dict`

In [None]:
from sklearn.neighbors import KNeighborsClassifier, NearestCentroid

from sklearn.svm import SVC

from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.naive_bayes import GaussianNB

from sklearn.discriminant_analysis import QuadraticDiscriminantAnalysis

from sklearn.metrics import accuracy_score
from sklearn.model_selection import cross_val_score

In [None]:
from typing import List

class Params:
    # Default number of folds used by Block5.cross_validation.
    N_FOLDS = 10

class Block5:
  """
  Classifier selection stage.

  Every (Block4, classifier description) pair is scored with cross-validation
  on the block's train set; the best pair is then evaluated once on the
  block's validation set.
  """

  def __init__(self, blocks: List['Block4'], sequence: List[str]):
    """
    INPUT:
      blocks: list of fitted Block4 instances.
      sequence: classifier descriptions such as
        "FOREST-max_depth:3-n_estimators:100" or just "QDA".

    OUTPUT:
      self.best_classifier = (final_accuracy, block, clf_class, params)
    """
    self.blocks = blocks
    self.sequence = sequence

    # "kwargs" lists the parameters that are meant to be customised; anything
    # else keeps the classifier default. Some of them are mandatory for the
    # classifier and some are not, so providing all of them is the safest.
    self.classifiers = {
          "KNN": {"clf": KNeighborsClassifier, "kwargs": ["n_neighbors"]},
          "DMIN": {"clf": NearestCentroid, "kwargs": []},
          "SVC": {"clf": SVC, "kwargs": ["kernel", "C", "gamma"]},
          "TREE": {"clf": DecisionTreeClassifier, "kwargs": ["max_depth"]},
          "FOREST": {"clf": RandomForestClassifier, "kwargs": ["max_depth", "n_estimators"]},
          "NBAYES": {"clf": GaussianNB, "kwargs": []},
          "QDA": {"clf": QuadraticDiscriminantAnalysis, "kwargs": []},
    }

    self.best_classifier = self.interative_classify()

  @staticmethod
  def _parse_gamma(value: str):
    """Convert an SVC gamma value: a number becomes float, 'scale'/'auto' stay text."""
    try:
      return float(value)
    except ValueError:
      return value

  def _parse_sequence_item(self, seq: str):
    """Turn "NAME-p1:v1-p2:v2" into (classifier class, {p1: v1, p2: v2})."""
    # "FOREST-max_depth:3-n_estimators:100" --> ["FOREST", "max_depth:3", "n_estimators:100"]
    tokens = seq.split('-')
    name = tokens[0]
    # [["max_depth", "3"], ["n_estimators", "100"]]
    raw_params = [token.split(":") for token in tokens[1:]]

    # e.g. RandomForestClassifier; an unknown name raises KeyError
    clf_class = self.classifiers[name]["clf"]

    if name == "SVC":
      # SVC needs dedicated types; gamma may be fractional or a keyword, so
      # it must not be forced through int().
      param_types = {"kernel": str, "C": float, "gamma": self._parse_gamma}
      params = {p_name: param_types[p_name](p_value) for p_name, p_value in raw_params}
    else:
      # every other classifier only takes integer parameters
      params = {p_name: int(p_value) for p_name, p_value in raw_params}

    return clf_class, params

  def interative_classify(self):
    """
    Score every block/classifier combination with cross-validation, keep the
    best one and return (final_accuracy, block, clf_class, params), where
    final_accuracy is the hold-out accuracy of the winner.

    Raises ValueError when there is nothing to evaluate.
    """
    best_score = None
    best_candidate = None

    for block in self.blocks:
      for seq in self.sequence:
        clf_class, params = self._parse_sequence_item(seq)
        crossval_score = self.cross_validation(block.Xtrain, block.ytrain, clf_class, params)
        # Track the best candidate directly instead of a dict keyed by score;
        # on equal scores the later candidate wins.
        if best_score is None or crossval_score >= best_score:
          best_score = crossval_score
          best_candidate = (block, clf_class, params)

    if best_candidate is None:
      raise ValueError("Block5 needs at least one block and one classifier")

    block, clf_class, params = best_candidate

    final_accuracy = self.hold_out(block.Xtrain, block.ytrain,
                                   block.Xval, block.yval,
                                   clf_class, params)

    return (final_accuracy, block, clf_class, params)


  def hold_out(self, X_train, y_train, X_test, y_test, clf, params) -> float:
    """Return the accuracy of the classifier trained on Train and evaluated on Test."""
    clf = clf(**params)
    fitted_clf = clf.fit(X_train, y_train)
    y_pred = fitted_clf.predict(X_test)

    return accuracy_score(y_test, y_pred)

  def cross_validation(self, X_train, y_train, clf, params, n_folds=Params.N_FOLDS) -> float:
    """Return the mean cross-validation score over n_folds folds."""
    cv_scores = cross_val_score(clf(**params), X_train, y_train, cv=n_folds)
    return cv_scores.mean()

Ejemplo de uso

In [None]:
# USADO PARA DATOS SAMPLE EN EJEMPLO

import pickle


def dump_sample_block4(blocks=None, base_path: str = ""):
  """
  Pickle three sample Block4 instances as block4_A.pkl, block4_B.pkl and
  block4_C.pkl.

  Args:
    blocks: the three objects to store, in A, B, C order. When omitted, the
      notebook globals block_A, block_B and block_C are used.
    base_path: folder to write into; the default is the working directory.

  Raises:
    ValueError: if `blocks` does not contain exactly three items.
  """
  if blocks is None:
    blocks = (block_A, block_B, block_C)
  blocks = tuple(blocks)
  if len(blocks) != 3:
    raise ValueError("dump_sample_block4 expects exactly three blocks")

  for suffix, block in zip("ABC", blocks):
    with open(os.path.join(base_path, f'block4_{suffix}.pkl'), 'wb') as handle:
      pickle.dump(block, handle)


def load_sample_block4(base_path: str = "/content/drive/MyDrive/home/sample_data/"):
  """
  Load the three sample Block4 pickles (block4_A.pkl, block4_B.pkl and
  block4_C.pkl) from `base_path`.

  Args:
    base_path: folder that contains the three files; defaults to the shared
      Drive folder used by this notebook.

  Returns:
    (block_A, block_B, block_C)
  """
  loaded = []
  for suffix in "ABC":
    # NOTE: pickle.load can execute arbitrary code; only use trusted files.
    with open(os.path.join(base_path, f'block4_{suffix}.pkl'), 'rb') as handle:
      loaded.append(pickle.load(handle))
  return tuple(loaded)

In [None]:
# EJEMPLO CON DATOS EN DRIVE

#block_A = Block4(np.array(X), y, ['CLEAN', 'MINMAX', 'KBEST-50', 'SFS-10', 'PCA-5', 'ICA-2'])
#block_B = Block4(np.array(X), y, ['CLEAN', 'MINMAX', 'KBEST-20', 'SFS-10', 'PCA-5'])
#block_C = Block4(np.array(X), y, ['CLEAN', 'MINMAX', 'KBEST-20', 'ICA-2'])

block_A, block_B, block_C = load_sample_block4()
BLOCKS = [block_A, block_B, block_C]

results = Block5(BLOCKS, ["FOREST-max_depth:3-n_estimators:100", "KNN-n_neighbors:15",
                          "NBAYES", "SVC-kernel:linear-C:0.025-gamma:2"])

results.best_classifier