In [None]:
%set_env CUDA_VISIBLE_DEVICES=2

# Classification models training

Use this notebook to train classification models (KNN, SVM, etc.) for leaf color classification.

## Data gathering

In [None]:
# Use pre-annotated dataset with each leaf segmentation and class
# From the data.yaml of this dataset, the label number to corresponding class is:
# 0=dark, 1=dead, 2=light, 3=medium

# This creates a list called 'obj_data', which contains every object as a tuple...
# ...containing (obj_classnum, obj_crop)

from typing import List
import scg_detection_tools.utils.image_tools as imtools
import scg_detection_tools.utils.cvt as cvt
from scg_detection_tools.utils.file_handling import get_all_files_from_paths, get_annotation_files
from scg_detection_tools.dataset import read_dataset_annotation
import cv2
import numpy as np
import matplotlib.pyplot as plt

# Dataset location: image directory and the directory holding the matching label files.
# NOTE(review): absolute, machine-specific paths — adjust before running elsewhere.
IMG_DIR = "/home/juliocesar/leaf-detection/imgs/light_group/images"
LBL_DIR = "/home/juliocesar/leaf-detection/imgs/light_group/labels"

# Alternative dataset, kept commented out (presumably pairs with the commented class_map below).
#IMG_DIR = "/home/juliocesar/leaf-detection/imgs/hemacias/annotated/images"
#LBL_DIR = "/home/juliocesar/leaf-detection/imgs/hemacias/annotated/labels"

# Image paths under IMG_DIR; annotation/config extensions are passed as skip_ext.
imgs = get_all_files_from_paths(IMG_DIR, skip_ext=[".txt", ".json", ".yaml"])

# Size every object crop is resized to before being stored as a sample.
STANDARD_SIZE = (32, 32)
# NOTE(review): not referenced in the visible cells (extraction is called with max_cls_ratio=None).
MAX_MEDIUM_RATIO = 0.30

# !!!!!! taken from data.yaml
class_map = {0: "dark", 1: "dead", 2: "light", 3: "medium"}
#class_map = {0: "purple", 1: "white"}

# NOTE(review): never filled or read in the visible cells.
sizes = []

def extract_objects_from_annotations(imgs: List[str], ann_dir: str, std_size=(32,32), max_cls_ratio: dict[int, float] = None, use_boxes=False):
    """
    Build a list of (class_number, crop) samples from annotated images.

    Every annotation of every image becomes one sample: the object is cropped
    out of the RGB image and resized to ``std_size``.

    Args:
        imgs: Paths of the images to process.
        ann_dir: Directory in which the annotation file of each image is looked up.
        std_size: Size passed to ``cv2.resize`` for every crop.
        max_cls_ratio: Optional ``{class_number: ratio}``. An annotation of a listed
            class is skipped while that class already makes up at least ``ratio``
            of the samples collected so far.
        use_boxes: If True, crop the bounding box straight from the image
            (segment annotations are converted to a box first). If False, pixels
            outside the segmentation mask are zeroed before cropping.

    Returns:
        List of ``(class_number, crop)`` tuples.

    Raises:
        FileNotFoundError: If an image cannot be read by ``cv2.imread``.
    """
    img_ann = get_annotation_files(imgs, ann_dir)
    obj_data = []
    # Counted on demand so a class missing from the global class_map cannot raise KeyError
    sample_count = {}
    for img in imgs:
        ann_file = img_ann[img]
        annotations = read_dataset_annotation(ann_file, separate_class=False)

        orig = cv2.imread(img)
        if orig is None:
            # cv2.imread signals failure by returning None; fail here with a clear
            # message instead of an opaque error inside cvtColor
            raise FileNotFoundError(f"Could not read image: {img!r}")
        orig = cv2.cvtColor(orig, cv2.COLOR_BGR2RGB)
        imgsz = orig.shape[:2]

        for ann in annotations:
            nclass = ann[0]
            contour = ann[1:]

            if (max_cls_ratio is not None) and (nclass in max_cls_ratio):
                # Ratio is measured against all samples accepted so far
                if obj_data and (sample_count.get(nclass, 0) / len(obj_data)) >= max_cls_ratio[nclass]:
                    continue
            sample_count[nclass] = sample_count.get(nclass, 0) + 1

            if use_boxes:
                # Four values are treated as an existing box; anything else is a segment
                box = contour
                if len(box) != 4:
                    box = cvt.segment_to_box(box, normalized=True, imgsz=imgsz)
                obj_crop = imtools.crop_box_image(orig, box)
            else:
                # Keep only the segmented object (assumes a 0/1 mask — TODO confirm)
                mask = cvt.contours_to_masks([contour], imgsz=imgsz, normalized=True)[0]
                mask = mask.reshape(mask.shape[0], mask.shape[1], 1)
                masked = orig * mask.astype(orig.dtype)

                # Crop a box around the segment
                points = np.array(contour).reshape(len(contour) // 2, 2)
                box = cvt.segment_to_box(points, normalized=True, imgsz=imgsz)
                obj_crop = imtools.crop_box_image(masked, box)

            # Resize to the standard sample size and store
            obj_crop = cv2.resize(obj_crop, std_size, interpolation=cv2.INTER_CUBIC)
            obj_data.append((nclass, obj_crop))
    return obj_data

# Extract segment-masked crops (no per-class cap) and report how many samples each class has
obj_data = extract_objects_from_annotations(
    imgs,
    LBL_DIR,
    STANDARD_SIZE,
    max_cls_ratio=None,
    use_boxes=False,
)
ncls = [cls_num for cls_num, _ in obj_data]
for cls in np.unique(ncls):
    n_samples = sum(1 for c in ncls if c == cls)
    print(f"Samples of type {cls}: {class_map[cls]!r} = {n_samples}")


In [None]:
# Test data loaded
# Show one extracted sample, titled with its class name, and print the crop shape
idx = 100
sample_class, sample_crop = obj_data[idx]
plt.title(class_map[sample_class])
plt.axis("off")
plt.imshow(sample_crop.astype(np.int32))
print(sample_crop.shape)

In [2]:
# Split between Train and Test to evaluate model as well
from sklearn.model_selection import train_test_split

# Crops are the inputs, class numbers the targets
X = [crop for _, crop in obj_data]
y = [cls_num for cls_num, _ in obj_data]

# Stratified split with a fixed seed so class proportions and results are repeatable
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    stratify=y,
    random_state=0,
)
# Display names in class_map order, used as labels by the evaluate() calls
class_labels = list(class_map.values())

In [None]:
# Preprocessing functions (so that clf.predict(imgs) can be called directly, instead of extracting features first and then calling clf.predict(features))
# -> rn*_feature_preprocess: use ResNet feature extraction to train the classifiers
# -> channels_feature_preprocess: extract RGB, HSV and Gray values from a 32x32 image as features

from object_detection_analysis.classifiers.feature_extract import (
    rn18_feature_preprocess, rn34_feature_preprocess, rn50_feature_preprocess,
    channels_feature_preprocess, norm_channels_feature_preprocess, norm_image_to_tensor
)

In [None]:
# Visualize ResNet feature extraction with PCA

from sklearn.decomposition import PCA

colors=["red","black","deepskyblue","green"]
# Legend names follow class_map (0=dark, 1=dead, 2=light, 3=medium)
labels=["dark","dead","light","medium"]
def test_pca(objs, tgts):
    """
    Project ResNet50 features of the objects onto 2 PCA components and plot them.

    Args:
        objs: Object crops, passed to rn50_feature_preprocess.
        tgts: Class number of each object; it indexes the module-level
            `colors` and `labels` lists.
    """
    rn_features = rn50_feature_preprocess(objs)
    pca = PCA(n_components=2)
    transX = np.asarray(pca.fit_transform(rn_features))
    class_ids = np.asarray(tgts)

    fig, ax = plt.subplots(layout="tight")
    # One scatter call per class (instead of one per point) so each class gets a legend entry
    for class_id in np.unique(class_ids):
        points = transX[class_ids == class_id]
        ax.scatter(
            points[:, 0],
            points[:, 1],
            c=colors[int(class_id)],
            label=labels[int(class_id)],
        )
    ax.set(xlabel="PCA component 0", ylabel="PCA component 1")
    ax.legend()
    plt.show()

test_pca(X, y)

## Parameters test training (e.g. optimal k value for KNN, loss for SGD)

### KNN K

In [None]:
############### K VALUE TEST FOR KNN WITH MANUAL CHANNELS FEATURE EXTRACTION #################

################ CHANNELS FEATURE EXTRACTION
### LAST TESTED OPTIMAL SEGMENTS: K=5 (no nca)
### "" BOXES: K=8

from object_detection_analysis.classifiers import KNNClassifier

# Fit and evaluate one KNN per candidate K on the channel features (NCA disabled)
MAX_K = 25
for k in range(1, MAX_K+1):
    knn = KNNClassifier(
        n_neighbors=k,
        enable_nca=False,
        preprocess=channels_feature_preprocess,
    )
    knn.fit(X_train, y_train)

    # Three-line banner separating the report of each K
    print("\n".join(("_"*82, f"EVALUATION: K = {k}", "_"*82)))
    knn.evaluate(X_test, y_test, disp_labels=class_labels)


In [None]:
################ K VALUE TEST FOR KNN WITH RESNET FEATURE EXTRACTION ################ 
### BEST FOR RESNET18: K=5 (SEGMENT) K=3 (BOX)
### BEST FOR RESNET34: K=6 (SEGMENT) 
### BEST FOR RESNET50: NONE

from object_detection_analysis.classifiers import KNNClassifier

# Fit and evaluate one KNN per candidate K on ResNet34 features (NCA disabled)
MAX_K = 15
for k in range(1, MAX_K+1):
    knn = KNNClassifier(
        n_neighbors=k,
        preprocess=rn34_feature_preprocess,
        enable_nca=False,
    )
    knn.fit(X_train, y_train)

    # Three-line banner separating the report of each K
    print("\n".join(("_"*82, f"EVALUATION: K = {k}", "_"*82)))
    knn.evaluate(X_test, y_test, disp_labels=class_labels)

### CNN FC test

In [None]:
from object_detection_analysis.classifiers import CNNFCClassifier

# Derive the class count from class_map instead of hard-coding 4, so the cell
# keeps working when the dataset / class mapping is switched
cnn = CNNFCClassifier(n_classes=len(class_map), preprocess=norm_image_to_tensor)
cnn.to("cuda")
# The test split is passed to fit() alongside the training split
cnn.fit(X_train, y_train, X_test, y_test, epochs=60, batch=4)

In [None]:
# Pass the class names like every other evaluate() call in this notebook
cnn.evaluate(X_test, y_test, disp_labels=class_labels)

## Actual training

### KNN

In [None]:
#####################################################
##### TRAIN KNN WITH RESNET FEATURE EXTRACTION #####
#####################################################

from object_detection_analysis.classifiers import KNNClassifier

# LEAF CLASSIFICATION: (SEGMENTS): RESNET18-K=5, RESNET34-K=6, RESNET50-K=None
#                      (BOXES): RESNET18-K=3
# BLOOD CELL CLASSIFICATION: K = ?

# KNN on ResNet34 features with K=6 (the segment optimum noted above).
# NOTE(review): the K sweeps pass enable_nca=False, while this call leaves enable_nca
# at its default — confirm the default matches the setting the optimum was found with.
resnet_knn = KNNClassifier(n_neighbors=6, preprocess=rn34_feature_preprocess)
# Commented-out alternative: fit on the full dataset instead of the training split.
# resnet_knn.fit(X, y)
resnet_knn.fit(X_train, y_train)
resnet_knn.evaluate(X_test, y_test, disp_labels=class_labels)

In [7]:
# Persist the ResNet KNN; the file name mirrors its configuration (rn34 features, K=6).
resnet_knn.save_state("knn_rn34_k6.skl")

In [None]:
#############################################################
##### TRAIN KNN WITH MANUAL CHANNELS FEATURE EXTRACTION #####
#############################################################

from object_detection_analysis.classifiers import KNNClassifier

# LEAF CLASSIFICATION: K = 5 (SEGMENT); K = 8 (BOXES)
# BCELL CLASSIFICATION: K = 5

# obj_data is extracted with use_boxes=False (segments), whose tested optimum is K=5;
# K=8 is the box optimum. K=5 also matches the "knn_k5.skl" name this model is saved under.
knn = KNNClassifier(n_neighbors=5, preprocess=channels_feature_preprocess)
# Commented-out alternative: fit on the full dataset instead of the training split.
# knn.fit(X, y)
knn.fit(X_train, y_train)
knn.evaluate(X_test, y_test, disp_labels=class_labels)

In [14]:
# Persist the channel-features KNN.
# NOTE(review): the file name encodes K=5 — confirm it matches the n_neighbors the model was fitted with.
knn.save_state("knn_k5.skl")

### SVM

In [None]:
####################################################
##### TRAIN SVM WITH RESNET FEATURE EXTRACTION #####
####################################################

# BEST WITH RESNET34

from object_detection_analysis.classifiers import SVMClassifier

# Use ResNet34 features: noted above as the best backbone, and the one the
# "svm_rn34.skl" file name this model is saved under refers to
sv = SVMClassifier(preprocess=rn34_feature_preprocess)
# Commented-out alternative: fit on the full dataset instead of the training split.
# sv.fit(X, y)
sv.fit(X_train, y_train)
sv.evaluate(X_test, y_test, disp_labels=class_labels)

In [11]:
# Persist the ResNet-features SVM.
# NOTE(review): the file name encodes ResNet34 — confirm `sv` was built with rn34_feature_preprocess.
sv.save_state("svm_rn34.skl")

In [None]:
############################################################
##### TRAIN SVM WITH MANUAL CHANNEL FEATURE EXTRACTION #####
############################################################

from object_detection_analysis.classifiers import SVMClassifier

# SVM on the manual channel features; rebinds `sv`, replacing any SVM created in an earlier cell.
sv = SVMClassifier(preprocess=channels_feature_preprocess)
# Commented-out alternative: fit on the full dataset instead of the training split.
# sv.fit(X, y)
sv.fit(X_train, y_train)
sv.evaluate(X_test, y_test, disp_labels=class_labels)

In [14]:
# Persist whichever SVM `sv` currently refers to (the channel-features one when cells run in order).
sv.save_state("svm.skl")

### MLP

In [None]:
####################################################
##### TRAIN MLP WITH RESNET FEATURE EXTRACTION #####
####################################################

#### BEST RESULTS: RESNET34

from object_detection_analysis.classifiers import MLPClassifier

RN = [rn18_feature_preprocess, rn34_feature_preprocess, rn50_feature_preprocess]
RNID = [18, 34, 50]

# Train, evaluate and save one MLP per ResNet backbone.
# The loop variable is `rn_id` (not `id`) so the builtin id() is not shadowed
# for every later cell of the notebook.
for rn_id, func in zip(RNID, RN):
    # Feature width given to the MLP: 512 for ResNet18/34, 2048 for ResNet50
    rn_out_features = 512 if rn_id != 50 else 2048
    mlp = MLPClassifier(n_features=rn_out_features, n_classes=len(class_map), preprocess=func)

    # Commented-out alternative: fit on the full dataset instead of the training split.
    # mlp.fit(X, y, epochs=50)

    print("_"*82, "\nRESNET:", rn_id, "\n", "_"*82)
    mlp.fit(X_train, y_train, epochs=50)
    mlp.evaluate(X_test, y_test, disp_labels=class_labels)
    print("_"*82)

    mlp.save_state(f"mlp_rn{rn_id}.pt")

In [None]:
#################################################################
##### TRAIN MLP WITH NORMALIZED CHANNELS FEATURE EXTRACTION #####
#################################################################

from object_detection_analysis.classifiers import MLPClassifier

# Per pixel: 3 RGB + 3 HSV + 1 Gray values. The pixel count comes from STANDARD_SIZE
# (the size every crop is resized to) instead of a second hard-coded 32*32.
n_features = STANDARD_SIZE[0] * STANDARD_SIZE[1] * (3 + 3 + 1)
mlp = MLPClassifier(n_features=n_features, n_classes=len(class_map), preprocess=norm_channels_feature_preprocess)

# Commented-out alternative: fit on the full dataset instead of the training split.
# mlp.fit(X, y, epochs=50)
mlp.fit(X_train, y_train, epochs=70)
mlp.evaluate(X_test, y_test, disp_labels=class_labels)

In [17]:
# Persist whichever MLP `mlp` currently refers to (the channel-features one when cells run in order).
mlp.save_state("mlp.pt")

### CNN FC

In [None]:
######################################################
########## TRAIN CNN_FC WITH OBJECTS CROPS ###########
######################################################

from object_detection_analysis.classifiers import CNNFCClassifier

# CNN trained directly on normalized crop tensors; the class count follows class_labels
cnn = CNNFCClassifier(
    preprocess=norm_image_to_tensor,
    n_classes=len(class_labels),
)
cnn.to("cuda")
# The test split is passed to fit() alongside the training split
cnn.fit(
    X_train, y_train,
    X_test, y_test,
    epochs=10,
    batch=8,
    save_best=True,
)
cnn.evaluate(X_test, y_test, disp_labels=class_labels)


In [None]:
# Load the best checkpoint under its own name: rebinding `cnn` here would make the
# following `cnn.save_state("cnn_last.pt")` cell write the best model out as "last"
cnn_best = CNNFCClassifier.from_state("cnn_best.pt")
cnn_best.to("cuda")
cnn_best.evaluate(X_test, y_test, disp_labels=class_labels)

In [19]:
# Persist the model `cnn` currently refers to as the "last" state.
# NOTE(review): if a cell that rebinds `cnn` from a checkpoint ran in between, that loaded
# model is what gets written here — confirm `cnn` is still the freshly trained network.
cnn.save_state("cnn_last.pt")

## Checking saved states

In [None]:
###############################################################################################################
## CELLS BELOW ARE FOR CHECKING SAVED MODELS

In [None]:
from object_detection_analysis.classifiers import KNNClassifier

# Reload a saved KNN checkpoint and evaluate it on the current test split.
# NOTE(review): the checkpoint name ends in "_box" while obj_data is extracted with
# use_boxes=False — confirm the test crops match how this model was trained.
knn = KNNClassifier.from_state("/home/juliocesar/leaf-detection/checkpoints/classifiers/knn_k8_box.skl")
knn.evaluate(X_test, y_test, disp_labels=class_labels)

In [None]:
from object_detection_analysis.classifiers import SVMClassifier

# Reload a saved SVM checkpoint and evaluate it on the current test split.
# NOTE(review): the checkpoint name ends in "_box" while obj_data is extracted with
# use_boxes=False — confirm the test crops match how this model was trained.
svm = SVMClassifier.from_state("/home/juliocesar/leaf-detection/checkpoints/classifiers/svm_box.skl")
svm.evaluate(X_test, y_test, disp_labels=class_labels)

In [None]:
from object_detection_analysis.classifiers import MLPClassifier

# Reload the saved channel-features MLP checkpoint and evaluate it on the current test split.
# NOTE(review): the checkpoint name ends in "_box" while obj_data is extracted with
# use_boxes=False — confirm the test crops match how this model was trained.
mlp = MLPClassifier.from_state("/home/juliocesar/leaf-detection/checkpoints/classifiers/mlp_box.pt")
mlp.evaluate(X_test, y_test, disp_labels=class_labels)

In [None]:
from object_detection_analysis.classifiers import MLPClassifier

# Reload the saved ResNet34-features MLP checkpoint and evaluate it on the current test split.
# Rebinds `mlp`, replacing the model loaded in the previous cell.
# NOTE(review): the checkpoint name ends in "_box" while obj_data is extracted with
# use_boxes=False — confirm the test crops match how this model was trained.
mlp = MLPClassifier.from_state("/home/juliocesar/leaf-detection/checkpoints/classifiers/mlp_rn34_box.pt")
mlp.evaluate(X_test, y_test, disp_labels=class_labels)

In [None]:
from object_detection_analysis.classifiers import CNNFCClassifier

# FOR BOX: cnn_box.pt
# FOR SEGMENTED: cnn.pt
# cnn = CNNFCClassifier.from_state("/home/juliocesar/leaf-detection/checkpoints/classifiers/cnn_box.pt")
# Evaluate the best and the last CNN checkpoints in turn; `cnn` ends up bound to the last one
for state_file in ("cnn_best.pt", "cnn_last.pt"):
    cnn = CNNFCClassifier.from_state(state_file)
    cnn.to("cuda")
    cnn.evaluate(X_test, y_test, disp_labels=class_labels)

## Clustering

In [5]:
# Helper functions (Voronoi plot, Silhouette analysis)
from typing import Tuple
from scipy.spatial import Voronoi, voronoi_plot_2d
import matplotlib.pyplot as plt
import matplotlib.cm as cm
from sklearn.metrics import silhouette_samples, silhouette_score
from sklearn.pipeline import Pipeline
import numpy as np

def plot_voronoi(centers):
    """
    Draw the Voronoi diagram of `centers` with the centers scattered on the same axes.

    Args:
        centers: Array of points; columns 0 and 1 are used as x and y.
    """
    diagram = Voronoi(centers)
    _, axis = plt.subplots(figsize=(12,8))
    xs, ys = centers[:,0], centers[:,1]
    axis.scatter(xs, ys, marker='o', s=10)
    voronoi_plot_2d(diagram, ax=axis)
    plt.show()

def silhouette_analyze(clusterer_pca: Pipeline, X: np.ndarray, n_clusters: int, sil_range: Tuple[float,float] = (-1.0,1.0)):
    """
    Fit a (transform..., clusterer) pipeline on X and draw a silhouette analysis.

    Left axes: sorted silhouette coefficients of each cluster, with the average
    marked by a dashed red line. Right axes: the samples in the first two
    transformed dimensions, colored by cluster, with the cluster centers on top.

    The silhouette values are computed on the transformed data, i.e. in the same
    space the clusterer was fitted in and that the right-hand plot shows.

    Args:
        clusterer_pca: Pipeline whose last step is the clusterer (it must provide
            fit_predict and cluster_centers_) and whose earlier steps transform X.
        X: Samples to cluster.
        n_clusters: Number of clusters; it drives the plot layout and colors, so it
            is expected to match the clusterer's own setting.
        sil_range: x-axis range of the silhouette plot; must lie within [-1, 1].
    """
    assert sil_range[0] >= -1.0 and sil_range[1] <= 1.0
    # ax1 is for the silhouette plot, ax2 is for the clusters plot
    _, (ax1, ax2) = plt.subplots(nrows=1, ncols=2, figsize=(12,12))
    # (n_clusters+1)*10 leaves blank space between the silhouette plots of individual clusters
    ax1.set(
        xlim=sil_range,
        ylim=[0, len(X) + (n_clusters + 1) * 10],
    )

    # Transform with every step but the last, then cluster in the transformed space
    Xpca = clusterer_pca[:-1].fit_transform(X)
    cluster_labels = clusterer_pca[-1].fit_predict(Xpca)

    # Per-sample coefficients in the clustering space; their mean is the silhouette
    # score, so the pairwise distances are only computed once
    sample_sil_values = silhouette_samples(Xpca, cluster_labels)
    sil_avg = float(np.mean(sample_sil_values))
    print("_"*82)
    print(f"\t\tn_clusters = {n_clusters} | average silhouette_score = {sil_avg}")

    y_lower = 10
    for cluster in range(n_clusters):
        # Sorted silhouette coefficients of the samples assigned to this cluster
        cluster_sil_values = np.sort(sample_sil_values[cluster_labels == cluster])

        size_cluster = cluster_sil_values.shape[0]
        y_upper = y_lower + size_cluster

        color = cm.nipy_spectral(float(cluster) / n_clusters)
        ax1.fill_betweenx(
            np.arange(y_lower, y_upper),
            0,
            cluster_sil_values,
            facecolor=color,
            edgecolor=color,
            alpha=0.7
        )
        # Cluster number next to the middle of its band
        ax1.text(-0.05, y_lower + 0.5 * size_cluster, str(cluster))

        # y_lower for the next band, leaving a 10-sample gap
        y_lower = y_upper + 10

    ax1.set(
        xlabel="Silhouette coefficient values",
        ylabel="Cluster label",
        yticks=[],
        xticks=np.arange(start=sil_range[0], stop=sil_range[1]+0.2, step=0.2),
    )
    # Vertical line at the average silhouette score
    ax1.axvline(x=sil_avg, color="red", linestyle="--")

    # 2nd plot with the actual clusters, using the same colormap as the bands
    colors = cm.nipy_spectral(cluster_labels.astype(float) / n_clusters)

    ax2.scatter(
        Xpca[:,0], Xpca[:,1], marker='.', s=30, lw=0, alpha=0.7, c=colors, edgecolor='k',
    )
    centers = clusterer_pca[-1].cluster_centers_
    ax2.scatter(
        centers[:,0],
        centers[:,1],
        marker='o',
        c="white",
        alpha=1,
        s=200,
        edgecolor='k',
    )
    # Plot the cluster number on top of each center
    for i, c in enumerate(centers):
        ax2.scatter(c[0], c[1], marker="$%d$" % i, alpha=1, s=50, edgecolor='k')

    ax2.set(
        xlabel="PCA feature 0",
        ylabel="PCA feature 1",
    )

    plt.show()

In [6]:
# Extract features from objects
# Keep only the crops (class numbers are dropped) and turn them into ResNet18 features
X = [crop for _, crop in obj_data]
rn_features = rn18_feature_preprocess(X)

In [None]:
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.pipeline import Pipeline

# Sweep the number of clusters: silhouette analysis for each value, inertia collected for the final plot
n_clusters = 15
n_inertia = np.arange(2, n_clusters+1)
inertias = []
for n in range(2, n_clusters+1):
    pca_step = ("pca", PCA(n_components=2))
    kmeans_step = ("kmeans", KMeans(init="k-means++", n_clusters=n, n_init=10, random_state=42))
    kmeans = Pipeline(steps=[pca_step, kmeans_step])

    # silhouette_analyze fits the pipeline, so inertia_ is available afterwards
    silhouette_analyze(kmeans, rn_features, n, sil_range=(-1.0,1.0))
    inertias.append(kmeans.named_steps["kmeans"].inertia_)

# Inertia plot
fig, ax = plt.subplots(layout="tight")
ax.plot(n_inertia, inertias)
ax.set_xlabel("K")
ax.set_ylabel("Inertia")
plt.show()

In [None]:
# Testing K-means segmentation
from sklearn.cluster import KMeans
import matplotlib.pyplot as plt
import cv2

# Cluster the pixels of one crop into 2 groups by color
kmeans = KMeans(n_clusters=2, init="k-means++", n_init=10, random_state=42)
obj = obj_data[24][1]
height, width = obj.shape[:2]
pixel_labels = kmeans.fit_predict(obj.reshape(height * width, 3))

# The pixels were flattened row by row, so reshaping the labels to (height, width)
# restores the image layout. The previous manual mapping divided the flat index by
# the height instead of the width, which is only correct for square crops.
segmented = pixel_labels.reshape(height, width).astype(np.uint8)

# Original crop on top, cluster label of each pixel below
_,axs = plt.subplots(nrows=2, layout="tight")
axs[0].imshow(obj)
axs[1].imshow(segmented, cmap="gray")
axs[0].axis("off")
axs[1].axis("off")
plt.show()