## Assignment: Image recognition
- Alumno 1:
- Alumno 2:
- Alumno 3:

The goals of the assignment are:
* Develop proficiency in using Tensorflow/Keras for training Neural Nets (NNs).
* Put into practice the acquired knowledge to optimize the parameters and architecture of a feedforward Neural Net (ffNN), in the context of an image recognition problem.
* Put into practice NNs specifically designed for analysing images. Design and optimize the parameters of a Convolutional Neural Net (CNN) to deal with the previous task.
* Train popular architectures from scratch (e.g., GoogLeNet, VGG, ResNet, ...), and compare the results with the ones provided by their pre-trained versions using transfer learning.

Follow the link below to download the classification data set “xview_recognition”: [https://drive.upm.es/s/2DDPE2zHw5dbM3G](https://drive.upm.es/s/2DDPE2zHw5dbM3G)

In [1]:
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
import requests
import zipfile

# Download the "xview_recognition" archive, unpack it next to the notebook and
# load the training annotations into `json_data`.
url = 'https://drive.upm.es/s/2DDPE2zHw5dbM3G/download'
zip_name = 'dataset.zip'

# Stream the archive to disk. The timeout keeps an unreachable server from
# blocking the notebook forever, and the context manager releases the
# connection as soon as the download is finished.
with requests.get(url, stream=True, timeout=60) as r:
    with open(zip_name, 'wb') as f:
        # 1 MiB chunks: far fewer write calls than 1 KiB chunks.
        for chunk in r.iter_content(chunk_size=1024 * 1024):
            f.write(chunk)

# A tiny file, or a payload that is not a ZIP archive at all (e.g. an HTML
# error/login page), means the link did not deliver the data set.
if os.path.getsize(zip_name) < 10000 or not zipfile.is_zipfile(zip_name):
    print(f"ERREUR : Le fichier {zip_name} est trop petit. Le lien est invalide ou nécessite une connexion.")
else:
    with zipfile.ZipFile(zip_name, 'r') as z:
        z.extractall(".")

    # The archive layout is not assumed: search the extracted tree for the
    # annotation file and stop at the first match.
    target_file = 'xview_ann_train.json'
    found_path = next(
        (
            os.path.join(root, target_file)
            for root, dirs, files in os.walk(".")
            if target_file in files
        ),
        None,
    )

    if found_path:
        print(f"SUCCÈS : Fichier trouvé à : {found_path}")

        import json
        json_file = found_path

        with open(json_file, encoding='utf-8') as ifs:
            json_data = json.load(ifs)
        print("Base de données chargée avec succès !")

    else:
        print(f"ERREUR : {target_file} reste introuvable après extraction.")

SUCCÈS : Fichier trouvé à : ./xview_recognition/xview_ann_train.json
Base de données chargée avec succès !


In [2]:
import tensorflow as tf

# Ask TensorFlow which GPUs it can see.
gpus = tf.config.list_physical_devices('GPU')

if not gpus:
    print("GPU non détecté. Activez l'accélérateur dans les réglages du notebook.")
else:
    try:
        # Enable memory growth so that the whole GPU memory is not
        # allocated in one go.
        for device in gpus:
            tf.config.experimental.set_memory_growth(device, True)
        print(f"GPU activé : {len(gpus)} processeur(s) détecté(s)")
    except RuntimeError as e:
        # Report the configuration failure and carry on.
        print(e)

GPU non détecté. Activez l'accélérateur dans les réglages du notebook.


In [3]:
import uuid
import numpy as np

class GenericObject:
    """
    Container for one annotated object.

    A new instance receives a random UUID as ``id``; ``bb``, ``category``
    and ``score`` start as -1 placeholders until the caller fills them in.
    """
    def __init__(self):
        placeholder = -1
        self.id = uuid.uuid4()
        self.bb = (placeholder,) * 4
        self.category = placeholder
        self.score = placeholder

class GenericImage:
    """
    Container for one image file and the objects annotated on it.
    """
    def __init__(self, filename):
        self.filename = filename
        # Tile layout: (pt_x, pt_y, pt_x+width, pt_y+height); starts as all -1.
        self.tile = np.full(4, -1)
        self.objects = []

    def add_object(self, obj: GenericObject):
        """Append ``obj`` to this image's object list."""
        self.objects.append(obj)

In [4]:
# Class index -> category name; the index is the position in this list.
categories = dict(enumerate([
    'Cargo plane',
    'Small car',
    'Bus',
    'Truck',
    'Motorboat',
    'Fishing vessel',
    'Dump truck',
    'Excavator',
    'Building',
    'Helipad',
    'Storage tank',
    'Shipping container',
    'Pylon',
]))

In [5]:
!pip install rasterio



In [6]:
import warnings
import rasterio
import numpy as np

def load_geoimage(filename):
    """
    Read an image of the "xview_recognition" data set into a NumPy array.

    Args:
        filename: Image path relative to the ``./xview_recognition/`` folder.

    Returns:
        Array of shape (height, width, bands) using the dtype reported by
        the raster profile.
    """
    warnings.filterwarnings('ignore', category=rasterio.errors.NotGeoreferencedWarning)
    # The context manager closes the dataset even when a band read fails;
    # without it every call leaked an open file handle.
    with rasterio.open('./xview_recognition/'+filename, 'r') as src_raster:
        # RasterIO to OpenCV (see inconsistencies between libjpeg and libjpeg-turbo)
        input_type = src_raster.profile['dtype']
        input_channels = src_raster.count
        img = np.zeros((src_raster.height, src_raster.width, input_channels), dtype=input_type)
        # rasterio band indices are 1-based; copy each band into the last axis.
        for band in range(input_channels):
            img[:, :, band] = src_raster.read(band+1)
    return img

#### Training
Design and train a ffNN to deal with the “xview_recognition” classification task.

In [7]:
import json

# Load the training annotation database into `json_data`.
json_file = './xview_recognition/xview_ann_train.json'
# The `with` block closes the file on exit, so no explicit close() is needed.
# The encoding is pinned so the result does not depend on the platform default.
with open(json_file, encoding='utf-8') as ifs:
    json_data = json.load(ifs)

In [8]:
import numpy as np

# Build one GenericImage (holding a single GenericObject) per annotated sample
# and tally how many samples each category has.
counts = {name: 0 for name in categories.values()}
anns = []
image_entries = json_data['images'].values()
annotation_entries = json_data['annotations'].values()
# Images and annotations are paired positionally.
for json_img, json_ann in zip(image_entries, annotation_entries):
    image = GenericImage(json_img['filename'])
    image.tile = np.array([0, 0, json_img['width'], json_img['height']])
    obj = GenericObject()
    obj.bb = tuple(int(json_ann['bbox'][k]) for k in range(4))
    obj.category = json_ann['category_id']
    # Per-category sample count (printed below).
    counts[obj.category] += 1
    image.add_object(obj)
    anns.append(image)
print(counts)
# Category of the (single) object of every image, used to stratify the split.
labels = [entry.objects[0].category for entry in anns]

{'Cargo plane': 635, 'Small car': 3324, 'Bus': 1768, 'Truck': 2210, 'Motorboat': 1069, 'Fishing vessel': 706, 'Dump truck': 1236, 'Excavator': 789, 'Building': 3594, 'Helipad': 111, 'Storage tank': 1469, 'Shipping container': 1523, 'Pylon': 312}


In [9]:
from sklearn.model_selection import train_test_split

# Shuffled 90/10 hold-out split, stratified on the per-image category.
anns_train, anns_valid = train_test_split(
    anns,
    test_size=0.1,
    stratify=labels,
    shuffle=True,
    random_state=1,
)
print(f'Number of training images: {len(anns_train)}')
print(f'Number of validation images: {len(anns_valid)}')

Number of training images: 16871
Number of validation images: 1875


In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Activation, Flatten, Input, BatchNormalization, Dropout
from tensorflow.keras.optimizers import Nadam
from tensorflow.keras.regularizers import l2
import tensorflow as tf
from tensorflow.keras.losses import CategoricalFocalCrossentropy


# Feed-forward classifier on flattened 32x32x3 images.
stack = [
    Input(shape=(32, 32, 3)),
    # Augmentation layers: random flips and random rotations.
    tf.keras.layers.RandomFlip("horizontal_and_vertical"),
    tf.keras.layers.RandomRotation(0.25),  # 0.25 = 90°
    Flatten(),
]

# Two identical hidden blocks: Dense(1024) -> BatchNorm -> swish -> Dropout(0.2).
for _ in range(2):
    stack += [
        Dense(1024, kernel_initializer='he_normal'),
        BatchNormalization(),
        Activation('swish'),
        Dropout(0.2),
    ]

# A third hidden block (Dense(256) with l2(0.002), BatchNorm, swish,
# Dropout(0.2)) was previously present but is disabled.

# Output: one unit per category, softmax probabilities.
stack += [Dense(len(categories)), Activation('softmax')]

model = Sequential(stack)

model.summary()

In [11]:
#from tensorflow.keras.optimizers import Adam
#
## Learning rate is changed to 0.001
#opt = Adam(learning_rate=lr_schedule, beta_1=0.9, beta_2=0.999, epsilon=1e-8, amsgrad=True, clipnorm=1.0)
#model.compile(optimizer=opt, loss='categorical_crossentropy', metrics=['accuracy'])

In [None]:
from tensorflow.keras.callbacks import TerminateOnNaN, EarlyStopping, ReduceLROnPlateau, ModelCheckpoint

# Save the model to 'model.keras' only when val_loss reaches a new minimum.
model_checkpoint = ModelCheckpoint(
    'model.keras',
    monitor='val_loss',
    mode='min',
    save_best_only=True,
)
# Stop after 10 epochs without val_loss improvement and restore the best weights.
early_stop = EarlyStopping(
    monitor='val_loss',
    patience=10,
    restore_best_weights=True,
)
# ReduceLROnPlateau should be left out when CosineDecay is used.

# Abort training as soon as the loss becomes NaN.
terminate = TerminateOnNaN()

callbacks = [model_checkpoint, early_stop, terminate]

In [None]:
def generator_images(objs, batch_size, do_shuffle=False):
    """
    Endlessly yield ``(images, labels)`` batches for Keras training/validation.

    Each image is loaded with ``load_geoimage``, converted to float32 and
    resized to 32x32 with bicubic interpolation. Labels are one-hot vectors
    over ``categories``.

    Args:
        objs: Iterable of ``(filename, obj)`` pairs, where ``obj.category`` is
            one of the names stored in ``categories``. It is read once, so a
            one-shot iterable is fine.
        batch_size: Positive number of samples per batch. The last batch of a
            pass may be smaller.
        do_shuffle: When true, the samples are reshuffled at the start of
            every pass over the data.

    Yields:
        Tuple ``(images, labels)`` of NumPy arrays with one row per sample.

    Raises:
        ValueError: If ``objs`` is empty, ``batch_size`` is smaller than 1, or
            a sample has a category that is not in ``categories``. Because
            this is a generator, the error surfaces on the first ``next()``.
    """
    samples = list(objs)  # snapshot: never mutate the caller's sequence
    # Without these guards the loop below would spin forever without
    # ever yielding a batch.
    if not samples:
        raise ValueError("generator_images() needs at least one sample")
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")

    # Category name -> class index, built once instead of rebuilding two
    # lists per sample. setdefault keeps the first index for a name, which is
    # what list.index() returned before.
    category_to_index = {}
    for class_index, name in categories.items():
        category_to_index.setdefault(name, class_index)
    num_classes = len(categories)

    while True:
        working = list(samples)  # fresh copy so every pass starts from the same order
        if do_shuffle:
            np.random.shuffle(working)
        groups = [working[i:i+batch_size] for i in range(0, len(working), batch_size)]
        for group in groups:
            images, labels = [], []
            for (filename, obj) in group:
                img = load_geoimage(filename)
                img_tensor = tf.convert_to_tensor(img)
                img_tensor = tf.image.convert_image_dtype(img_tensor, tf.float32)
                img_resized = tf.image.resize(img_tensor, [32, 32], method='bicubic')
                images.append(img_resized.numpy())
                if obj.category not in category_to_index:
                    raise ValueError(f"Unknown category: {obj.category!r}")
                label_idx = category_to_index[obj.category]
                one_hot = tf.keras.utils.to_categorical(label_idx, num_classes=num_classes)
                labels.append(one_hot)
            yield np.array(images), np.array(labels)

In [None]:
# Flatten the annotations into (filename, object) pairs, one per annotated object.
objs_train = [
    (image.filename, item)
    for image in anns_train
    for item in image.objects
]
objs_valid = [
    (image.filename, item)
    for image in anns_valid
    for item in image.objects
]

In [None]:
import numpy as np
from sklearn.utils.class_weight import compute_class_weight

# labels_train = category of every object in objs_train, in the same order.
labels_train = [sample.category for (_, sample) in objs_train]
classes = np.array(sorted(set(labels_train)))

# 'balanced' weights from scikit-learn, one per entry of `classes`.
weights = compute_class_weight(class_weight='balanced', classes=classes, y=labels_train)
# NOTE(review): the keys are whatever obj.category holds; Keras' class_weight
# argument expects integer class indices — confirm before re-enabling it in fit().
class_weight_dict = {cls: weight for cls, weight in zip(classes, weights)}

In [None]:
# Endless batch generators: training data is reshuffled, validation data is not.
batch_size = 128  # change to 32 to stabilize the gradients
train_generator = generator_images(objs_train, batch_size=batch_size, do_shuffle=True)
valid_generator = generator_images(objs_valid, batch_size=batch_size, do_shuffle=False)

In [None]:
import numpy as np

# Per-class alpha for the focal loss: inverse class frequency, normalised so
# that the alphas sum to 1.
counts_array = np.array([counts[categories[i]] for i in range(len(categories))], dtype=np.float32)
alpha_per_class = 1.0 / counts_array
alpha_per_class = alpha_per_class / alpha_per_class.sum()

# Span the cosine schedule over the optimizer steps that training really
# runs. The step count is derived from `batch_size` with ceiling division
# (same value as math.ceil(len(objs_train) / batch_size)), so it matches the
# steps_per_epoch passed to fit() and follows any change of the batch size.
# A hard-coded 128 with floor division made the learning rate hit zero early.
schedule_epochs = 100  # keep in sync with the `epochs` passed to model.fit
steps_per_epoch = -(-len(objs_train) // batch_size)
lr_schedule = tf.keras.optimizers.schedules.CosineDecay(
    initial_learning_rate=0.001,
    decay_steps=schedule_epochs * steps_per_epoch
)
optimizer = Nadam(learning_rate=lr_schedule, weight_decay=1e-4)

model.compile(
    optimizer=optimizer,
    loss=CategoricalFocalCrossentropy(alpha=alpha_per_class.tolist(), gamma=2.0),
    metrics=['accuracy']
)

In [None]:
import math
import numpy as np

print('Training model')
epochs = 100  # model still improving after 20 epochs -> stabilization
# Number of batches needed to see every sample once (last batch may be partial).
train_steps = math.ceil(len(objs_train) / batch_size)
valid_steps = math.ceil(len(objs_valid) / batch_size)
# class_weight=class_weight_dict is left disabled here.
fit_options = dict(
    steps_per_epoch=train_steps,
    validation_data=valid_generator,
    validation_steps=valid_steps,
    epochs=epochs,
    callbacks=callbacks,
    verbose=1,
)
h = model.fit(train_generator, **fit_options)

# Best validation model: the epoch with the lowest validation loss.
val_losses = h.history['val_loss']
best_idx = int(np.argmin(val_losses))
best_value = np.min(val_losses)
print(f'Best validation model: epoch {best_idx + 1}', f' - val_loss {best_value!s}')

Training model
Epoch 1/100
[1m132/132[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m157s[0m 1s/step - accuracy: 0.2943 - loss: 0.4366 - val_accuracy: 0.3168 - val_loss: 0.4272
Epoch 2/100
[1m132/132[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m135s[0m 1s/step - accuracy: 0.4161 - loss: 0.3069 - val_accuracy: 0.3787 - val_loss: 0.3510
Epoch 3/100
[1m132/132[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m132s[0m 1s/step - accuracy: 0.4654 - loss: 0.2741 - val_accuracy: 0.4277 - val_loss: 0.3043
Epoch 4/100
[1m132/132[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m145s[0m 1s/step - accuracy: 0.4803 - loss: 0.2592 - val_accuracy: 0.3941 - val_loss: 0.3576
Epoch 5/100
[1m132/132[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m135s[0m 1s/step - accuracy: 0.4995 - loss: 0.2454 - val_accuracy: 0.4389 - val_loss: 0.3038
Epoch 6/100
[1m132/132[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m136s[0m 1s/step - accuracy: 0.5210 - loss: 0.2347 - val_accuracy: 0.4523 - val_loss: 0.2956
E

#### Validation
Compute validation metrics.

In [None]:
import matplotlib.pyplot as plt
import numpy as np
%matplotlib inline

def draw_confusion_matrix(cm, categories):
    """
    Plot a row-normalised confusion matrix as an annotated heat map.

    Args:
        cm: Square confusion matrix (rows: annotation, columns: prediction).
        categories: Mapping whose values are the tick labels, in matrix order.
    """
    tick_names = list(categories.values())
    scale = pow(len(categories), 0.5)

    # The figure grows with the square root of the number of categories.
    fig = plt.figure(figsize=[6.4*scale, 4.8*scale])
    ax = fig.add_subplot(111)

    # Normalise every row by its total; eps avoids dividing by zero on empty rows.
    row_totals = cm.sum(axis=1)[:, np.newaxis]
    cm = cm.astype('float') / np.maximum(row_totals, np.finfo(np.float64).eps)

    heatmap = ax.imshow(cm, interpolation='nearest', cmap=plt.colormaps['Blues'])
    ax.figure.colorbar(heatmap, ax=ax)
    ax.set(
        xticks=np.arange(cm.shape[1]),
        yticks=np.arange(cm.shape[0]),
        xticklabels=tick_names,
        yticklabels=tick_names,
        ylabel='Annotation',
        xlabel='Prediction',
    )
    # Rotate the x tick labels and anchor them on their right end.
    plt.setp(ax.get_xticklabels(), rotation=45, ha="right", rotation_mode="anchor")

    # Write every cell value; use white text on cells above half the maximum.
    thresh = cm.max() / 2.0
    text_size = int(20-scale)
    for (i, j), value in np.ndenumerate(cm):
        text_color = "white" if value > thresh else "black"
        ax.text(j, i, f'{value:.2f}', ha="center", va="center", color=text_color, fontsize=text_size)
    fig.tight_layout()
    plt.show()

In [None]:
import numpy as np
import tensorflow as tf

# Restore the checkpoint written by ModelCheckpoint.
model.load_weights('model.keras')
y_true, y_pred = [], []
all_images = []
temp_true_labels = []

print("Chargement des données de validation...")
for ann in anns_valid:
    # Same preprocessing as in training: load, convert to float32, then
    # bicubic resize to 32x32.
    image_tensor = tf.image.convert_image_dtype(
        tf.convert_to_tensor(load_geoimage(ann.filename)), tf.float32
    )
    image_resized = tf.image.resize(image_tensor, [32, 32], method='bicubic')

    # One entry per object annotated on the image.
    for obj_pred in ann.objects:
        all_images.append(image_resized.numpy())
        temp_true_labels.append(obj_pred.category)

if all_images:
    X_valid = np.array(all_images)
    print(f"Lancement de la prédiction sur {len(X_valid)} objets...")

    all_predictions = model.predict(X_valid, batch_size=64, verbose=1)

    # The highest-scoring output unit gives the predicted category name.
    category_names = list(categories.values())
    best_units = np.argmax(all_predictions, axis=1)
    y_pred.extend(category_names[unit] for unit in best_units)
    y_true.extend(temp_true_labels[:len(all_predictions)])

In [None]:
from sklearn.metrics import confusion_matrix

# Confusion matrix with rows/columns ordered like the values of `categories`.
category_order = list(categories.values())
cm = confusion_matrix(y_true, y_pred, labels=category_order)
draw_confusion_matrix(cm, categories)

In [None]:
import numpy as np

# Global metrics derived from the confusion matrix `cm`.
eps = np.finfo(np.float64).eps  # guards every division against a zero denominator
correct_samples_class = np.diag(cm).astype(float)
total_samples_class = cm.sum(axis=1).astype(float)
total_predicts_class = cm.sum(axis=0).astype(float)

overall_accuracy = correct_samples_class.sum() / total_samples_class.sum()
print(f'Mean Accuracy: {overall_accuracy * 100:.3f}%')
acc = correct_samples_class / np.maximum(total_samples_class, eps)
print(f'Mean Recall: {acc.mean() * 100:.3f}%')
acc = correct_samples_class / np.maximum(total_predicts_class, eps)
print(f'Mean Precision: {acc.mean() * 100:.3f}%')

# Per-category metrics.
grand_total = cm.sum()
for idx, name in enumerate(list(categories.values())):
    # TP/FP: predicted positives that were correct/incorrect.
    # TN/FN: predicted negatives that were correct/incorrect.
    tp = cm[idx, idx]
    fp = cm[:, idx].sum() - tp
    fn = cm[idx, :].sum() - tp
    tn = grand_total - tp - fp - fn
    # Recall (true positive rate): share of real positives predicted as positive.
    recall = tp / np.maximum(tp+fn, eps)
    # Precision: share of predicted positives that are real positives.
    precision = tp / np.maximum(tp+fp, eps)
    # Specificity (true negative rate): share of real negatives predicted as negative.
    specificity = tn / np.maximum(tn+fp, eps)
    # Dice = 2 |A∩B| / (|A|+|B|) = 2 TP / (2 TP + FP + FN)
    f1_score = 2 * ((precision * recall) / np.maximum(precision+recall, eps))
    print(f'> {name}: Recall: {recall*100:.3f}% Precision: {precision*100:.3f}% Specificity: {specificity*100:.3f}% Dice: {f1_score*100:.3f}%')

#### Testing
Try to improve the results provided in the competition.

In [None]:
import os
import numpy as np

# Collect every file below xview_test as a test sample.
# NOTE: this rebinds `anns`, which held the training annotations until now.
anns = []
root_dir = './xview_recognition/'
test_dir = os.path.join(root_dir, 'xview_test')
for dirpath, _subdirs, filenames in os.walk(test_dir):
    # Both values only depend on the folder, so compute them once per folder.
    rel_dir = os.path.relpath(dirpath, root_dir)
    folder_name = os.path.basename(dirpath)
    for filename in filenames:
        # Path relative to root_dir, as expected by load_geoimage.
        image = GenericImage(os.path.join(rel_dir, filename))
        image.tile = np.array([0, 0, 224, 224])
        obj = GenericObject()
        obj.bb = (0, 0, 224, 224)
        # The containing folder name is stored as the object's category.
        obj.category = folder_name
        image.add_object(obj)
        anns.append(image)
print(f'Number of testing images: {len(anns)}')

In [None]:
import numpy as np
import tensorflow as tf

# Restore the checkpoint written by ModelCheckpoint.
model.load_weights('model.keras')
predictions_data = {"images": {}, "annotations": {}}
all_test_images = []
metadata = []  # (image_id, bbox) records, in the same order as all_test_images
ann_id = 0

# Spatial size the network was built for. Test images must be resized to
# exactly this size, as in training and validation; the previous hard-coded
# 64x64 did not match the 32x32 model input and made predict() fail.
target_size = list(model.input_shape[1:3])

print("Préparation des images de test...")
for idx, ann in enumerate(anns):
    image_id = os.path.basename(ann.filename)
    predictions_data["images"][idx] = {
        "image_id": image_id,
        "filename": ann.filename,
        "width": int(ann.tile[2]),
        "height": int(ann.tile[3])
    }

    # Preprocess once per source image: float32 conversion + bicubic resize.
    image_raw = load_geoimage(ann.filename)
    image_tensor = tf.convert_to_tensor(image_raw)
    image_tensor = tf.image.convert_image_dtype(image_tensor, tf.float32)
    image_resized = tf.image.resize(image_tensor, target_size, method='bicubic')
    img_final = image_resized.numpy()

    # One network input (and one metadata record) per object of the image.
    for obj_pred in ann.objects:
        all_test_images.append(img_final)
        metadata.append({
            "image_id": image_id,
            "bbox": [int(x) for x in obj_pred.bb]
        })

# Predict all collected inputs in one batched call.
if all_test_images:
    X_test = np.array(all_test_images)
    print(f"Prédiction en cours sur {len(X_test)} détections...")

    all_preds = model.predict(X_test, batch_size=128, verbose=1)

    category_names = list(categories.values())

    # Rebuild the final dictionary: one annotation per prediction, labelled
    # with the name of the highest-scoring category.
    for i, pred in enumerate(all_preds):
        pred_category = category_names[np.argmax(pred)]

        predictions_data["annotations"][ann_id] = {
            "image_id": metadata[i]["image_id"],
            "category_id": pred_category,
            "bbox": metadata[i]["bbox"]
        }
        ann_id += 1
print("Test terminé.")

In [None]:
import json
# Serialise the predictions and write them to prediction.json.
serialized = json.dumps(predictions_data)
with open('prediction.json', 'w') as out_file:
    out_file.write(serialized)