# Combinaison des Textes et images dans un réseau de neuronnes bimodal

In [None]:
import h5py
import numpy as np
import pandas as pd

from PIL import Image

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

from tensorflow.keras.preprocessing.text import Tokenizer
from keras.preprocessing import sequence
from keras.utils import to_categorical
from keras_preprocessing.sequence import pad_sequences
import datetime
from sklearn.preprocessing import LabelEncoder

import tensorflow as tf
from tensorflow.keras.callbacks import TensorBoard

import matplotlib.pyplot as plt
import seaborn as sns
import cv2

import warnings
warnings.filterwarnings("ignore")


In [None]:
import tensorflow as tf
print(tf.config.list_physical_devices('GPU'))

## Intégration des données et traitements

In [None]:
# Loading the 2 CSVs
X = pd.read_csv( 'X_train.csv', delimiter=',', index_col=0)
y = pd.read_csv( 'Y_train.csv', delimiter=',', index_col=0)

In [None]:
# On met de côté un jeu de test auquel on ne touchera pas jusqu'à la fin, au moment de mesurer la performance du modèle retenu
X, X_test, y, y_test = train_test_split(X, y, test_size=0.2, random_state=0, stratify=y)

In [None]:
X['img_path'] = 'image_' + X['imageid'].astype(str) + '_product_' + X['productid'].astype(str) + '.jpg'

In [None]:
label_encoder = LabelEncoder()
y_encoded = label_encoder.fit_transform(y)

# Nombre total de catégories
num_categories = len(label_encoder.classes_)

# Convertir en représentation one-hot
y_one_hot = to_categorical(y_encoded, num_categories)


In [None]:
X_train, X_val, y_train, y_val = train_test_split(X, y_one_hot, 
                                                          test_size=0.2, 
                                                          random_state=0, 
                                                          stratify=y)


In [None]:
n_class = 27
max_words = 10000
max_len = 34          # correspond au nombre de mots maximum du jeu de données

## Génération des images cropping et size 100x100x3

In [None]:
def img_preprocessing(img_filename, input_dir, width=100, height=100, cropping=True):

    """
    - Par défaut, recadre l'image en lui retirant ses marges blanches si elle en a ;
    - Redimensionne l'image en 224 x 224 par défaut, ou selon les dimensions passées en arguments 'width' et 'height'
    """

    # Lecture de l'image avec OpenCV
    img = cv2.imread(input_dir + img_filename)

    if cropping:
        # Conversion de l'image en niveaux de gris
        img_gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    
        # Inversion des niveaux de gris
        img_reversed = cv2.bitwise_not(img_gray)
    
        # Recherche des contours dans l'image
        contours, hierarchy = cv2.findContours(img_reversed, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    
        if contours:
            # Recherche du contour le plus externe
            contour_external = max(contours, key=cv2.contourArea)
    
            # Recherche des coordonnées du rectangle englobant le contour externe
            x_min, y_min, w, h = cv2.boundingRect(contour_external)
    
            # Rognement de l'image en utilisant les coordonnées du rectangle englobant
            img = img[y_min:y_min+h, x_min:x_min+w]

    # Redimensionnement de l'image
    img_resized = cv2.resize(img, (width, height))

    return img_resized;

In [None]:
import sys

folder_img = 'C:/Users/Nans/Documents/Rakuten Project/images/images/image_train/'
x_img = []

for idx, item in enumerate(X_train['img_path'].values):
    x_img.append(img_preprocessing(item, folder_img))
    sys.stdout.write("\rProgression : {}/{}".format(idx+1, len(X_train['img_path'].values)))
    sys.stdout.flush()
    

In [None]:
x_img_val = []

for idx, item in enumerate(X_val['img_path'].values):
    x_img_val.append(img_preprocessing(item, folder_img))
    sys.stdout.write("\rProgression : {}/{}".format(idx+1, len(X_val['img_path'].values)))
    sys.stdout.flush()

In [None]:
label_encoder.classes_

## Création du tokenizer et du modèle bimodal

In [None]:
# on importe le tokenizer créé dans la partie deep learning texte
import pickle

# Charger le tokenizer avec pickle
with open("../models/tokenizer.pkl", "rb") as f:
    tok = pickle.load(f)

In [None]:
# Définition du modèle pour le texte (embedding)
max_seq_length = 34  # Longueur maximale de la séquence de texte
vocab_size = 10000   # Taille du vocabulaire
embedding_dim = 100  # Dimension de l'embedding

# pour la partie train
#tok = Tokenizer(num_words=vocab_size)
#tok.fit_on_texts(X_train['designation'])
sequences = tok.texts_to_sequences(X_train['designation'])
sequences_matrix = pad_sequences(sequences,maxlen=max_seq_length)

# pour la partie test
test_sequences = tok.texts_to_sequences(X_val['designation'])
test_sequences_matrix = pad_sequences(test_sequences,maxlen=max_seq_length)


In [None]:
# Enregistrer le tableau en HDF5
with h5py.File('xy_val_100.h5', 'w') as f:
    f.create_dataset('x_img_val', data=x_img_val)
    f.create_dataset('test_sequences_matrix', data=test_sequences_matrix)
    f.create_dataset('y_val', data=y_val)

In [None]:
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping, ModelCheckpoint

# Early stopping
early_stopping = EarlyStopping( patience=2, # Attendre n epochs avant application
                                min_delta = 0.01, # si au bout de n epochs la fonction de perte ne varie pas de 1%, 
                                verbose=1, # Afficher à quel epoch on s'arrête
                                mode = 'min',
                                monitor='val_loss')

# Learning rate
reduce_learning_rate = ReduceLROnPlateau(
                                    monitor="val_loss",
                                    patience=2, #si val_loss stagne sur n epochs consécutives selon la valeur min_delta
                                    min_delta= 0.01,
                                    factor=0.5,  # On réduit le learning rate d'un facteur x
                                    cooldown = 4, # On attend n epochs avant de réitérer 
                                    verbose=1)

# on compile nos callbacks
callbacks_list = [early_stopping, reduce_learning_rate]


In [None]:
import tensorflow as tf
from tensorflow.keras.layers import Dense, Flatten, Input, Embedding, Concatenate
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.applications.vgg16 import VGG16
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam

# Définition du modèle pour les images (VGG16 pré-entraîné)
image_input = Input(shape=(100, 100, 3))
img_model = VGG16(weights='imagenet', include_top=False, input_tensor=image_input)
image_features = Flatten()(img_model.output)

# Définition du modèle pour le texte
text_input = Input(shape=(max_seq_length,))
text_embedding = Embedding(input_dim=vocab_size, output_dim=embedding_dim)(text_input)
text_features = Flatten()(text_embedding)

# Concaténation des caractéristiques des deux modalités
merged_features = Concatenate()([image_features, text_features])

# Couche dense pour la classification finale
output = Dense(27, activation='softmax')(merged_features)

# Création du modèle multimodal
model = Model(inputs=[image_input, text_input], outputs=output)

# Compiler le modèle
model.compile(optimizer=Adam(), loss='categorical_crossentropy', metrics=['accuracy'])

# Afficher un résumé du modèle
model.summary()

## Entrainement du modèle

In [None]:
# Entraîner le modèle en utilisant vos données
model.fit(x=[X_img, sequences_matrix], 
          y=y_train, 
          epochs=10, 
          batch_size=16, 
          callbacks=callbacks_list,
          validation_data=([X_img_val, test_sequences_matrix], y_val))


In [None]:
accr = model.evaluate([X_img_val, test_sequences_matrix], y_val)

In [None]:
# Le chemin vers la sauvegarde du réseau
model_savepath  = '../models/combi.h5'

# Sauvegarde du réseau après entrainement
model.save(model_savepath)


## Validation du modèle et résultats

In [None]:
from tensorflow.keras.models import load_model

model_savepath  = '../models/combi.h5'

# Charger le modèle
model = load_model(model_savepath)


In [None]:
pred = model.predict([X_img_val, test_sequences_matrix])

y_pred = np.argmax(pred, axis = -1)
y_true = np.argmax(y_val, axis = -1)

cm = tf.math.confusion_matrix(labels=y_true, predictions=y_pred).numpy()
normalized_cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]

In [None]:
from sklearn.metrics import classification_report

report = classification_report(np.array(y_true), np.array(y_pred), target_names=list(map(str,label_encoder.classes_)))
print(report)


In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

plt.figure(figsize=(15, 8))
sns.heatmap(normalized_cm, annot=True, cmap='Blues', fmt='.2f', 
            xticklabels=label_encoder.classes_, yticklabels=label_encoder.classes_)

plt.title('Matrice de confusion normalisée')
plt.xlabel('Prédictions')
plt.ylabel('Valeurs réelles')

# Rotation des étiquettes des axes pour éviter les chevauchements
plt.xticks(rotation=45, ha='right', fontsize=10)
plt.yticks(rotation=0, fontsize=10)

# Ajustement de l'espacement pour que tout soit bien visible
plt.tight_layout()

plt.show()