# In [None]:
import numpy as np
import matplotlib.pyplot as plt
import cv2
import os
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder
from sklearn.metrics import classification_report, confusion_matrix
import seaborn as sns

 
class LeNet5:
    """LeNet-5 style convolutional classifier written with NumPy only.

    Layer stack for ``3 x 32 x 32`` inputs::

        C1 conv 6@5x5 -> tanh -> S2 2x2 average pool
        C3 conv 16@5x5 -> tanh -> S4 2x2 average pool -> flatten (400)
        C5 dense 120 -> tanh -> F6 dense 84 -> tanh
        output dense num_classes -> softmax

    Images are channel-first, ``(batch, 3, 32, 32)``. One-hot labels are
    column-major, ``(num_classes, batch)``: the batch size is read from
    ``Y.shape[1]`` and class scores are compared along axis 0.
    """

    # Names of the trainable arrays, in the order Adam updates them.
    _PARAM_NAMES = ('W1', 'b1', 'W3', 'b3', 'W5', 'b5', 'W6', 'b6', 'W7', 'b7')

    def __init__(self, learning_rate=0.001, num_classes=33):
        """Store the hyper-parameters and draw the initial weights.

        Args:
            learning_rate: step size used by the Adam update.
            num_classes: number of output classes (rows of the softmax).
        """
        self.learning_rate = learning_rate
        self.num_classes = num_classes
        self.initialize_parameters()

    def initialize_parameters(self):
        """(Re)initialise the weights, the biases and the Adam moments."""
        # C1: convolution layer (6 filters 5x5), 3 input channels for RGB
        self.W1 = np.random.randn(6, 3, 5, 5) * 0.1
        self.b1 = np.zeros((6, 1))
        # C3: convolution layer (16 filters 5x5)
        self.W3 = np.random.randn(16, 6, 5, 5) * 0.1
        self.b3 = np.zeros((16, 1))
        # C5: fully connected (400 -> 120)
        self.W5 = np.random.randn(120, 400) * 0.1
        self.b5 = np.zeros((120, 1))
        # F6: fully connected (120 -> 84)
        self.W6 = np.random.randn(84, 120) * 0.1
        self.b6 = np.zeros((84, 1))
        # Output layer (84 -> num_classes)
        self.W7 = np.random.randn(self.num_classes, 84) * 0.1
        self.b7 = np.zeros((self.num_classes, 1))

        # Adam optimizer state: first (m_*) and second (v_*) moment per array
        for name in self._PARAM_NAMES:
            value = getattr(self, name)
            setattr(self, 'm_' + name, np.zeros_like(value))
            setattr(self, 'v_' + name, np.zeros_like(value))

    def tanh(self, x):
        """Return the element-wise hyperbolic tangent of ``x``."""
        return np.tanh(x)

    def tanh_derivative(self, x):
        """Return d tanh(x) / dx evaluated at the pre-activation ``x``."""
        return 1 - np.tanh(x)**2

    def softmax(self, x):
        """Return the column-wise softmax of ``x`` (classes along axis 0)."""
        # Subtracting the column maximum avoids overflow in exp().
        exp_x = np.exp(x - np.max(x, axis=0, keepdims=True))
        return exp_x / np.sum(exp_x, axis=0, keepdims=True)

    def convolution2d(self, input_data, kernel, bias, stride=1):
        """Apply a bank of filters as a valid (unpadded) cross-correlation.

        Args:
            input_data: array ``(batch, in_channels, H, W)``; a single
                ``(in_channels, H, W)`` image is promoted to a batch of one.
            kernel: filters ``(num_filters, in_channels, kH, kW)``.
            bias: one bias per filter, e.g. shaped ``(num_filters, 1)``.
            stride: step between two consecutive filter positions.

        Returns:
            Array ``(batch, num_filters, out_H, out_W)``.
        """
        if input_data.ndim == 3:
            input_data = input_data[np.newaxis]
        batch_size, _, in_height, in_width = input_data.shape
        num_filters, _, kernel_height, kernel_width = kernel.shape

        out_height = (in_height - kernel_height) // stride + 1
        out_width = (in_width - kernel_width) // stride + 1

        output = np.zeros((batch_size, num_filters, out_height, out_width))

        # Iterate over the kernel taps instead of the output pixels: each tap
        # contributes to every output position at once, so the batch, filter
        # and spatial loops all run inside NumPy.
        for ki in range(kernel_height):
            rows = slice(ki, ki + stride * out_height, stride)
            for kj in range(kernel_width):
                cols = slice(kj, kj + stride * out_width, stride)
                patch = input_data[:, :, rows, cols]
                output += np.einsum('bchw,fc->bfhw', patch, kernel[:, :, ki, kj])

        output += np.reshape(bias, (1, num_filters, 1, 1))
        return output

    def _convolution_backward(self, d_output, input_data, kernel, stride=1,
                              need_input_grad=True):
        """Back-propagate through ``convolution2d``.

        Args:
            d_output: gradient w.r.t. the convolution output,
                ``(batch, num_filters, out_H, out_W)``.
            input_data: the 4-D input that was fed to the convolution.
            kernel: the filters that were applied.
            stride: stride that was used in the forward pass.
            need_input_grad: set to False to skip the input gradient (the
                first layer has nothing upstream to propagate to).

        Returns:
            ``(d_input, d_kernel, d_bias)``: gradients summed over the batch
            (not averaged). ``d_input`` is None when ``need_input_grad`` is
            False.
        """
        _, _, out_height, out_width = d_output.shape
        kernel_height, kernel_width = kernel.shape[2:]

        d_kernel = np.zeros(kernel.shape)
        d_input = np.zeros(input_data.shape) if need_input_grad else None

        for ki in range(kernel_height):
            rows = slice(ki, ki + stride * out_height, stride)
            for kj in range(kernel_width):
                cols = slice(kj, kj + stride * out_width, stride)
                patch = input_data[:, :, rows, cols]
                d_kernel[:, :, ki, kj] = np.einsum('bfhw,bchw->fc', d_output, patch)
                if need_input_grad:
                    d_input[:, :, rows, cols] += np.einsum(
                        'bfhw,fc->bchw', d_output, kernel[:, :, ki, kj]
                    )

        d_bias = d_output.sum(axis=(0, 2, 3)).reshape(-1, 1)
        return d_input, d_kernel, d_bias

    def average_pooling(self, input_data, pool_size=2, stride=2):
        """Average each ``pool_size x pool_size`` window of every feature map.

        Args:
            input_data: array ``(batch, channels, H, W)``.
            pool_size: side length of the pooling window.
            stride: step between two consecutive windows.

        Returns:
            Array ``(batch, channels, out_H, out_W)``.
        """
        batch_size, num_filters, in_height, in_width = input_data.shape
        out_height = (in_height - pool_size) // stride + 1
        out_width = (in_width - pool_size) // stride + 1

        output = np.zeros((batch_size, num_filters, out_height, out_width))

        # Sum the window one offset at a time, then divide by the window area.
        for pi in range(pool_size):
            rows = slice(pi, pi + stride * out_height, stride)
            for pj in range(pool_size):
                cols = slice(pj, pj + stride * out_width, stride)
                output += input_data[:, :, rows, cols]

        output /= pool_size * pool_size
        return output

    def _average_pooling_backward(self, d_output, input_shape, pool_size=2, stride=2):
        """Back-propagate through ``average_pooling``.

        Every input cell of a window receives an equal share
        (``1 / pool_size**2``) of that window's output gradient.

        Args:
            d_output: gradient w.r.t. the pooled output.
            input_shape: shape of the array that was pooled.
            pool_size: window side used in the forward pass.
            stride: stride used in the forward pass.

        Returns:
            Gradient w.r.t. the pooling input, shaped ``input_shape``.
        """
        _, _, out_height, out_width = d_output.shape
        d_input = np.zeros(input_shape)
        share = d_output / (pool_size * pool_size)

        for pi in range(pool_size):
            rows = slice(pi, pi + stride * out_height, stride)
            for pj in range(pool_size):
                cols = slice(pj, pj + stride * out_width, stride)
                d_input[:, :, rows, cols] += share

        return d_input

    def forward_propagation(self, X):
        """Run the network on ``X`` and cache every intermediate activation.

        Args:
            X: images ``(batch, 3, 32, 32)`` or a single ``(3, 32, 32)`` image.

        Returns:
            Class probabilities ``(num_classes, batch)``; each column sums to 1.
        """
        # A 3-D array whose first axis is not the channel axis is reinterpreted
        # as a flat buffer of 3x32x32 images (raw reshape, no axis transpose).
        if len(X.shape) == 3 and X.shape[0] != 3:
            X = X.reshape(-1, 3, 32, 32)
        # Promote a single image so that the cached input is always 4-D.
        if X.ndim == 3:
            X = X[np.newaxis]

        self.X_input = X

        # C1: convolution
        self.Z1 = self.convolution2d(X, self.W1, self.b1)
        self.A1 = self.tanh(self.Z1)

        # S2: average pooling
        self.A2 = self.average_pooling(self.A1)

        # C3: convolution
        self.Z3 = self.convolution2d(self.A2, self.W3, self.b3)
        self.A3 = self.tanh(self.Z3)

        # S4: average pooling
        self.A4 = self.average_pooling(self.A3)

        # Flatten to one column per sample: (400, batch_size)
        batch_size = self.A4.shape[0]
        self.A4_flat = self.A4.reshape(batch_size, -1).T

        # C5: fully connected
        self.Z5 = np.dot(self.W5, self.A4_flat) + self.b5
        self.A5 = self.tanh(self.Z5)

        # F6: fully connected
        self.Z6 = np.dot(self.W6, self.A5) + self.b6
        self.A6 = self.tanh(self.Z6)

        # Output layer
        self.Z7 = np.dot(self.W7, self.A6) + self.b7
        self.A7 = self.softmax(self.Z7)

        return self.A7

    def compute_cost(self, A7, Y):
        """Return the mean cross-entropy between ``A7`` and one-hot ``Y``.

        Both arrays are ``(num_classes, batch)``. A small constant is added
        inside the logarithm to keep it finite for zero probabilities.
        """
        m = Y.shape[1]
        cost = -np.sum(Y * np.log(A7 + 1e-8)) / m
        return cost

    def predict(self, X):
        """Return the most probable class index for every sample in ``X``."""
        A7 = self.forward_propagation(X)
        return np.argmax(A7, axis=0)

    def backward_propagation(self, X, Y):
        """Compute the cross-entropy gradient of every parameter.

        Must be called right after ``forward_propagation`` on the same batch:
        the gradients are computed from the activations cached by that call
        (``X`` itself is not read again).

        Args:
            X: the batch that was fed to ``forward_propagation``.
            Y: one-hot labels ``(num_classes, batch)``.

        Returns:
            Dict mapping ``'dW1'``, ``'db1'``, ..., ``'dW7'``, ``'db7'`` to
            gradients averaged over the batch.
        """
        m = Y.shape[1]

        # Softmax + cross-entropy: the output error is simply A7 - Y.
        dZ7 = self.A7 - Y
        dW7 = np.dot(dZ7, self.A6.T) / m
        db7 = np.sum(dZ7, axis=1, keepdims=True) / m

        dA6 = np.dot(self.W7.T, dZ7)
        dZ6 = dA6 * self.tanh_derivative(self.Z6)
        dW6 = np.dot(dZ6, self.A5.T) / m
        db6 = np.sum(dZ6, axis=1, keepdims=True) / m

        dA5 = np.dot(self.W6.T, dZ6)
        dZ5 = dA5 * self.tanh_derivative(self.Z5)
        dW5 = np.dot(dZ5, self.A4_flat.T) / m
        db5 = np.sum(dZ5, axis=1, keepdims=True) / m

        # Undo the flatten: (400, batch) -> (batch, 16, 5, 5)
        dA4_flat = np.dot(self.W5.T, dZ5)
        dA4 = dA4_flat.T.reshape(self.A4.shape)

        # S4 -> C3
        dA3 = self._average_pooling_backward(dA4, self.A3.shape)
        dZ3 = dA3 * self.tanh_derivative(self.Z3)
        dA2, dW3, db3 = self._convolution_backward(dZ3, self.A2, self.W3)
        dW3 /= m
        db3 /= m

        # S2 -> C1 (the gradient w.r.t. the images themselves is not needed)
        dA1 = self._average_pooling_backward(dA2, self.A1.shape)
        dZ1 = dA1 * self.tanh_derivative(self.Z1)
        _, dW1, db1 = self._convolution_backward(
            dZ1, self.X_input, self.W1, need_input_grad=False
        )
        dW1 /= m
        db1 /= m

        gradients = {
            'dW7': dW7, 'db7': db7,
            'dW6': dW6, 'db6': db6,
            'dW5': dW5, 'db5': db5,
            'dW3': dW3, 'db3': db3,
            'dW1': dW1, 'db1': db1
        }
        return gradients

    def update_parameters_adam(self, gradients, t, beta1=0.9, beta2=0.999, epsilon=1e-8):
        """Apply one Adam step to every parameter.

        Args:
            gradients: dict returned by ``backward_propagation``.
            t: 1-based step counter used for the bias correction.
            beta1: decay rate of the first-moment estimate.
            beta2: decay rate of the second-moment estimate.
            epsilon: constant added to the denominator for stability.
        """
        for param in self._PARAM_NAMES:
            m_name = 'm_' + param
            v_name = 'v_' + param
            grad = gradients['d' + param]

            first_moment = beta1 * getattr(self, m_name) + (1 - beta1) * grad
            second_moment = beta2 * getattr(self, v_name) + (1 - beta2) * (grad ** 2)
            setattr(self, m_name, first_moment)
            setattr(self, v_name, second_moment)

            # Bias-corrected estimates compensate for the zero initialisation.
            m_corrected = first_moment / (1 - beta1 ** t)
            v_corrected = second_moment / (1 - beta2 ** t)
            step = self.learning_rate * m_corrected / (np.sqrt(v_corrected) + epsilon)
            setattr(self, param, getattr(self, param) - step)

    def train(self, X_train, Y_train, X_val, Y_val, epochs=50, optimizer='adam'):
        """Train with full-batch gradient steps and track the metrics.

        Args:
            X_train: training images ``(batch, 3, 32, 32)``.
            Y_train: one-hot training labels ``(num_classes, batch)``.
            X_val: validation images.
            Y_val: one-hot validation labels ``(num_classes, batch)``.
            epochs: number of passes (one parameter update per pass).
            optimizer: only ``'adam'`` is implemented.

        Returns:
            ``(costs, train_accuracies, val_accuracies)``, one entry per epoch.
            The cost is measured before the epoch's update, the accuracies
            after it.

        Raises:
            ValueError: if ``optimizer`` is not ``'adam'``.
        """
        # Fail loudly instead of looping without ever updating the weights.
        if optimizer != 'adam':
            raise ValueError(
                f"Unsupported optimizer: {optimizer!r} (only 'adam' is implemented)"
            )

        costs = []
        train_accuracies = []
        val_accuracies = []

        for epoch in range(epochs):
            A7 = self.forward_propagation(X_train)
            cost = self.compute_cost(A7, Y_train)
            costs.append(cost)
            gradients = self.backward_propagation(X_train, Y_train)

            self.update_parameters_adam(gradients, epoch + 1)

            train_pred = self.predict(X_train)
            train_acc = np.mean(train_pred == np.argmax(Y_train, axis=0))
            train_accuracies.append(train_acc)

            val_pred = self.predict(X_val)
            val_acc = np.mean(val_pred == np.argmax(Y_val, axis=0))
            val_accuracies.append(val_acc)

            if epoch % 10 == 0:
                print(f"Epoch {epoch}, Cost: {cost:.4f}, Train Acc: {train_acc:.4f}, Val Acc: {val_acc:.4f}")

        return costs, train_accuracies, val_accuracies


# --- Loading the data with OpenCV ---

import pandas as pd
from sklearn.preprocessing import LabelEncoder

# Path to the folder that contains one sub-folder per class
data_dir = r"C:\Users\user\Desktop\IMSD\amhcd-data-64\tifinagh-images"

# Collect the relative path and the class index of every image.
# os.listdir returns entries in arbitrary order, so both listings are sorted
# to keep the row order (and therefore the seeded splits) reproducible.
# Only directories are enumerated so that stray files do not consume a label.
image_paths = []
labels = []

class_folders = sorted(
    entry for entry in os.listdir(data_dir)
    if os.path.isdir(os.path.join(data_dir, entry))
)
for label, class_folder in enumerate(class_folders):
    class_path = os.path.join(data_dir, class_folder)
    for img_file in sorted(os.listdir(class_path)):
        image_paths.append(os.path.join(class_folder, img_file))
        labels.append(label)

labels_df = pd.DataFrame({'image_path': image_paths, 'label_encoded': labels})

# Encode the class folder names; label_encoder.classes_ then maps every
# integer label back to the folder it came from
label_encoder = LabelEncoder()
labels_df['label_encoded'] = label_encoder.fit_transform(labels_df['image_path'].apply(os.path.dirname))

def load_and_preprocess_image(image_path, target_size=(32, 32)):
    """Load one image as a normalised, channel-first RGB array.

    Args:
        image_path: path of the image file to read.
        target_size: ``(width, height)`` passed to ``cv2.resize``.

    Returns:
        ``float32`` array shaped ``(3, height, width)`` with values in [0, 1].

    Raises:
        OSError: if OpenCV cannot read or decode the file.
    """
    img = cv2.imread(image_path, cv2.IMREAD_COLOR)
    # cv2.imread signals failure by returning None rather than raising.
    if img is None:
        raise OSError(f"Could not read or decode image: {image_path!r}")
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img = cv2.resize(img, target_size)
    img = img.astype(np.float32) / 255.0
    return img.transpose(2, 0, 1)  # Convert (H, W, C) to (C, H, W)

# Load every image listed in labels_df into a single array
X = np.array([
    load_and_preprocess_image(os.path.join(data_dir, relative_path))
    for relative_path in labels_df['image_path']
])
y = labels_df['label_encoded'].values

# Hold out 20% of the samples for testing, then 25% of the remainder for
# validation; both splits are stratified on the labels
X_temp, X_test, y_temp, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42
)
X_train, X_val, y_train, y_val = train_test_split(
    X_temp, y_temp, test_size=0.25, stratify=y_temp, random_state=42
)

# Convert explicitly to NumPy arrays
X_train, X_val, X_test = np.array(X_train), np.array(X_val), np.array(X_test)
y_train, y_val, y_test = np.array(y_train), np.array(y_val), np.array(y_test)

assert (
    X_train.shape[0] + X_val.shape[0] + X_test.shape[0] == X.shape[0]
), "Train-val-test split sizes must sum to total samples"
print(f"Train: {X_train.shape[0]} samples, Validation: {X_val.shape[0]} samples, Test: {X_test.shape[0]} samples")

# One-hot encode the labels; the encoder is fitted on the training labels
# only and then reused for the validation and test labels
one_hot_encoder = OneHotEncoder(sparse_output=False)
y_train_one_hot = np.array(one_hot_encoder.fit_transform(y_train.reshape(-1, 1)))
y_val_one_hot = np.array(one_hot_encoder.transform(y_val.reshape(-1, 1)))
y_test_one_hot = np.array(one_hot_encoder.transform(y_test.reshape(-1, 1)))

# Build the model
num_classes = len(np.unique(y))
model = LeNet5(learning_rate=0.001, num_classes=num_classes)

# Training
# LeNet5 reads the batch size from Y.shape[1] and takes argmax over axis 0,
# i.e. it expects labels shaped (num_classes, n_samples). The one-hot arrays
# are (n_samples, num_classes), so they are transposed before being passed in.
print("Début de l'entraînement...")
costs, train_acc, val_acc = model.train(
    X_train, y_train_one_hot.T, X_val, y_val_one_hot.T,
    epochs=50, optimizer='adam'
)

# Evaluation: predictions are class indices, so compare them with the
# integer labels directly
test_pred = model.predict(X_test)
test_accuracy = np.mean(test_pred == y_test)
print(f"Accuracy sur le test set: {test_accuracy:.4f}")

# Classification report
print("\nRapport de classification (Test set) :")
print(classification_report(y_test, test_pred, target_names=label_encoder.classes_))

# Confusion matrix
cm = confusion_matrix(y_test, test_pred)
plt.figure(figsize=(10, 8))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
plt.title('Matrice de Confusion (Test set)')
plt.xlabel('Prédiction')
plt.ylabel('Vérité')
plt.show()

# Learning curves
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))
ax1.plot(costs)
ax1.set_title('Courbe de perte')
ax1.set_xlabel('Époque')
ax1.set_ylabel('Perte')

ax2.plot(train_acc, label='Train Accuracy')
ax2.plot(val_acc, label='Validation Accuracy')
ax2.set_title('Courbes d\'accuracy')
ax2.set_xlabel('Époque')
ax2.legend()
ax2.set_ylabel('Accuracy')
plt.tight_layout()
plt.show()