**INTERPOLATION DU FILTRE**

Le code ci-dessous permet d'abord d'interpoler le filtre de dimension 4x4 vers une dimension 50x50.
Ensuite, le filtre est projeté dans l'espace de Schwartz (voir l'explication dans le rapport).

In [None]:
from scipy.interpolate import RegularGridInterpolator
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
import torch 
import torch.nn as nn
import copy
from collections import defaultdict
# -----------------------------
# 1. Interpolation du filtre
# -----------------------------
def interpolate_filter(filter_weights, smooth_factor=50):
    """Upsample a discrete 2D filter onto a dense regular grid.

    The filter is treated as samples on the integer grid
    ``(0..h-1) x (0..w-1)`` and is resampled at ``smooth_factor`` evenly
    spaced positions along each axis (end points included).

    Cubic spline interpolation is used when both axes have at least four
    samples. Smaller kernels (e.g. 3x3) cannot support a cubic spline, so
    multilinear interpolation is used for them instead of failing.

    Args:
        filter_weights: 2D array-like of shape (h, w), with h >= 2 and w >= 2.
        smooth_factor: number of samples per axis in the output.

    Returns:
        Zsmooth: ndarray of shape (smooth_factor, smooth_factor).

    Raises:
        ValueError: if the input is not 2D or has fewer than two samples
            along an axis.
    """
    filter_weights = np.asarray(filter_weights)
    if filter_weights.ndim != 2:
        raise ValueError(
            f"filter_weights must be 2D, got shape {filter_weights.shape}"
        )
    h, w = filter_weights.shape
    if min(h, w) < 2:
        raise ValueError(
            f"filter_weights needs at least 2 samples per axis, got shape {(h, w)}"
        )

    # RegularGridInterpolator rejects method='cubic' when an axis has fewer
    # than 4 grid points, which would make every 2x2 / 3x3 kernel fail.
    method = 'cubic' if min(h, w) >= 4 else 'linear'

    interp_func = RegularGridInterpolator(
        (np.arange(h), np.arange(w)), filter_weights, method=method
    )

    ynew = np.linspace(0, h - 1, smooth_factor)
    xnew = np.linspace(0, w - 1, smooth_factor)
    # 'ij' indexing keeps the (row, column) order expected by the interpolator.
    Y, X = np.meshgrid(ynew, xnew, indexing='ij')
    points = np.column_stack([Y.ravel(), X.ravel()])

    Zsmooth = interp_func(points).reshape(smooth_factor, smooth_factor)
    return Zsmooth

In [None]:
# -----------------------------
# 2. Projection dans l'espace de Schwartz
# -----------------------------
def project_to_schwartz(Zsmooth, sigma=None):
    """Damp an interpolated filter with a 2D Gaussian window.

    The window is ``exp(-(x**2 + y**2) / (2 * sigma**2))`` where ``x`` and
    ``y`` are the column / row indices shifted by ``W/2`` and ``H/2``.
    The rapidly decaying window is what brings the filter "close to" the
    Schwartz space S(R^2).

    Args:
        Zsmooth: 2D array of shape (H, W).
        sigma: standard deviation of the window, in samples. Defaults to
            ``min(H, W) / 2``.

    Returns:
        Z_schwartz: the element-wise product of ``Zsmooth`` and the window.
    """
    n_rows, n_cols = Zsmooth.shape
    if sigma is None:
        sigma = min(n_rows, n_cols) / 2

    # Column vector of row offsets and row vector of column offsets;
    # broadcasting builds the full (H, W) grid of squared distances.
    row_offsets = (np.arange(n_rows) - n_rows / 2)[:, np.newaxis]
    col_offsets = (np.arange(n_cols) - n_cols / 2)[np.newaxis, :]
    squared_radius = col_offsets**2 + row_offsets**2

    window = np.exp(-squared_radius / (2 * sigma**2))
    return Zsmooth * window

**NORME DE SOBOLEV : MESURE DE REGULARITE** 

Le code ci-dessous permet de définir la norme de Sobolev utilisée pour mesurer la régularité des filtres.

In [None]:
# -----------------------------
# 1. Définition de la fenêtre gaussienne utilisée pour la définition de la norme locale
# -----------------------------
def gaussian_window(shape, sigma=1.0, center=None):
    """Build a 2D Gaussian window with unit L2 norm.

    Args:
        shape: ``(n, m)`` size of the window (rows, columns).
        sigma: standard deviation of the Gaussian, in samples.
        center: ``(row, column)`` index of the peak. Defaults to
            ``(n // 2, m // 2)``.

    Returns:
        g: array of shape ``(n, m)`` whose squared entries sum to 1.
    """
    n_rows, n_cols = shape
    if center is None:
        center = (n_rows // 2, n_cols // 2)

    # Offsets from the peak, laid out so that broadcasting yields an (n, m) grid.
    dy = np.arange(n_rows)[:, np.newaxis] - center[0]
    dx = np.arange(n_cols)[np.newaxis, :] - center[1]

    window = np.exp(-(dx**2 + dy**2) / (2 * sigma**2))
    l2_norm = np.sqrt(np.sum(window**2))
    return window / l2_norm
# -----------------------------
# 2. Définition de la norme de Sobolev locale
# -----------------------------
def local_sobolev_gauss_3D(h, block_size=(5,5), alpha=2, sigma=1.0):
    """Compute a map of local Sobolev-type regularity over a 2D filter.

    A window of size ``block_size`` slides over ``h`` with stride 1. Each
    block is multiplied by a unit-norm Gaussian window, transformed with a
    2D FFT, and scored as

        sum_{u, v} (1 + u**2 + v**2) ** alpha * |H(u, v)| ** 2

    where ``u`` and ``v`` are the normalised frequencies returned by
    ``np.fft.fftfreq`` (cycles per sample).

    Args:
        h: 2D array of shape (n, m).
        block_size: ``(bh, bw)`` size of the sliding block.
        alpha: exponent of the Sobolev frequency weight.
        sigma: standard deviation of the Gaussian window, in samples.

    Returns:
        reg_map: array of shape ``(n - bh + 1, m - bw + 1)``; entry
        ``[i, j]`` is the score of the block whose top-left corner is
        ``(i, j)``.
    """
    n, m = h.shape
    bh, bw = block_size
    n_rows, n_cols = n - bh + 1, m - bw + 1
    reg_map = np.zeros((n_rows, n_cols))

    # Everything below depends only on (block_size, sigma, alpha), so it is
    # built once instead of once per block position.
    g = gaussian_window((bh, bw), sigma=sigma)
    u = np.fft.fftshift(np.fft.fftfreq(bh))
    v = np.fft.fftshift(np.fft.fftfreq(bw))
    U, V = np.meshgrid(u, v, indexing='ij')
    freq_weight = (1 + U**2 + V**2)**alpha

    for i in range(n_rows):
        for j in range(n_cols):
            block_win = h[i:i+bh, j:j+bw] * g

            # The spectrum is shifted so that it lines up with the shifted
            # frequency grid used to build freq_weight.
            H = np.fft.fftshift(np.fft.fft2(block_win))

            reg_map[i, j] = np.sum(freq_weight * np.abs(H)**2)

    return reg_map


**TRAITEMENT DES POIDS**

Une fois la norme définie, nous définissons une méthode de comparaison des filtres sur la base de cette dernière.

In [None]:
# -----------------------------
# 1. Fonction de sélection (utilisée pour définir le P)
# -----------------------------
def graphcut_filter_surface(Zsmooth, percentile):
    """Keep only the entries whose magnitude is below a percentile threshold.

    The threshold is the ``percentile``-th percentile of ``|Zsmooth|``.
    Entries with ``|value| <= threshold`` are kept; the others are replaced
    by NaN so that the removed regions show up as holes when plotted.

    Args:
        Zsmooth: array of values to filter.
        percentile: percentile in [0, 100] used to derive the threshold.

    Returns:
        Zcut: copy of ``Zsmooth`` with the rejected entries set to NaN.
        mask: boolean array, True where the entry was kept.
    """
    magnitudes = np.abs(Zsmooth)
    cutoff = np.percentile(magnitudes, percentile)
    mask = magnitudes <= cutoff
    return np.where(mask, Zsmooth, np.nan), mask
def magnitude_mesure(Zcut):
    """Return the sum of the absolute values of ``Zcut``.

    NaN entries contribute zero to the sum.
    """
    magnitudes = np.abs(Zcut)
    total = np.nansum(magnitudes)
    return total

In [None]:
# -----------------------------
# 2. Calcul de la régularité sur l'ensemble du filtre
# -----------------------------
def ultimate_graphcut_filter_analysis_reg(filter_weights, percentile=80,
                                          block_size=(5, 5), alpha=2, sigma=58,
                                          use_schwartz_projection=False):
    """Compute the global regularity score of a single 2D filter.

    Pipeline:
      1. interpolate the filter onto a dense grid (``interpolate_filter``);
      2. optionally damp it with a Gaussian window (``project_to_schwartz``);
      3. compute the local Sobolev regularity map (``local_sobolev_gauss_3D``);
      4. keep only the map entries at or below the ``percentile``-th
         percentile (``graphcut_filter_surface``);
      5. sum the magnitudes of the kept entries (``magnitude_mesure``).

    Args:
        filter_weights: 2D array holding one convolution kernel.
        percentile: percentile of the regularity map that is kept (step 4).
        block_size: size of the sliding block used for the local norm.
        alpha: exponent of the Sobolev frequency weight.
        sigma: standard deviation of the local Gaussian window. With the
            default 5x5 block and sigma=58 the window varies by well under
            1% across the block, i.e. it is almost flat.
        use_schwartz_projection: when True, the regularity map is computed
            on the Gaussian-damped filter instead of the raw interpolation.

    Returns:
        The regularity score of the filter (smaller means more regular
        according to the rest of this file).

    NOTE(review): the previous version computed the Schwartz projection and
    then discarded it, measuring the raw interpolated filter. The default
    (False) keeps those numerical results; confirm against the report
    whether the projection was meant to be used.
    """
    Zsmooth = interpolate_filter(filter_weights)
    measured = project_to_schwartz(Zsmooth) if use_schwartz_projection else Zsmooth
    reg_map = local_sobolev_gauss_3D(measured, block_size=block_size, alpha=alpha, sigma=sigma)
    Zcut, _mask = graphcut_filter_surface(reg_map, percentile=percentile)
    return magnitude_mesure(Zcut)

**CLASSEMENT DES FILTRES**

Une fois la norme établie, nous parcourons l'ensemble des filtres de l'ensemble des couches de convolution afin de classer les filtres selon la valeur de la norme.

In [None]:
# -----------------------------
# 1. Classement des filtres selon leur valeur de régularité
# -----------------------------
# Every Conv2d of the model, in module traversal order. `model` is expected
# to be defined earlier in the notebook.
conv_layers = [m for m in model.modules() if isinstance(m, nn.Conv2d)]

# filter_ranking[layer_idx][out_channel][in_channel] = regularity score
filter_ranking = {}

for m_idx, conv_layer in enumerate(conv_layers):
    print(f"   Couche {m_idx+1}/{len(conv_layers)}...")

    # 1x1 convolutions are treated as bottlenecks and skipped: a single
    # weight has no spatial structure whose regularity could be measured.
    if conv_layer.kernel_size == (1, 1):
        print(f"   Couche {m_idx+1} est un bottleneck, on passe.")
        continue

    # One device-to-host transfer per layer instead of one per 2D kernel.
    # Shape: (out_channels, in_channels, kernel_h, kernel_w).
    layer_weights = conv_layer.weight.detach().cpu().numpy()

    layer_ranking = {}
    for oc, out_filter in enumerate(layer_weights):
        # Score each 2D kernel of this output channel; float() makes the
        # value serialisable as JSON.
        layer_ranking[oc] = {
            ic: float(ultimate_graphcut_filter_analysis_reg(kernel))
            for ic, kernel in enumerate(out_filter)
        }

    filter_ranking[m_idx] = layer_ranking




In [None]:
# -----------------------------
# 2. Enregistrement du dictionnaire
# -----------------------------
import json

# Persist the ranking. Note that JSON object keys are always strings, so the
# integer layer/channel indices come back as str when the file is reloaded.
with open("filter_ranking.json", "w", encoding="utf-8") as f:
    json.dump(filter_ranking, f, indent=4)

# Offer the file as a browser download when running in Google Colab; in any
# other environment the file simply stays in the working directory.
try:
    from google.colab import files
except ImportError:
    print("filter_ranking.json enregistré localement (google.colab indisponible).")
else:
    files.download("filter_ranking.json")

**PRUNING DU MODELE**

Une fois le dictionnaire de classement des filtres récupéré, nous élaguons (pruning) notre modèle en conservant un pourcentage fixe des filtres les plus réguliers. Nous reconstruisons ainsi un modèle plus léger, que nous ré-entraînons ensuite.

In [None]:
def keep_top_percentile_filters(filter_ranking, percentile):
    """Keep the ``percentile`` % most regular filters of a ranking.

    Throughout this file a *smaller* score means a *more regular* filter,
    and regular filters are the ones to keep. The threshold is therefore
    the ``percentile``-th percentile of all valid scores, and a filter is
    kept when its score is at or below that threshold.

    The previous implementation used the ``100 - percentile`` percentile
    together with a ``<=`` test, which kept roughly ``100 - percentile`` %
    of the filters instead of ``percentile`` %.

    Args:
        filter_ranking (dict): scores in the format
            ``{layer_idx: {out_channel_idx: {in_channel_idx: score}}}``.
        percentile (float): share of filters to keep, in [0, 100]
            (e.g. 10 keeps the 10% most regular filters).

    Returns:
        dict: a new dictionary with the same layer / output-channel keys,
        containing only the kept filters. Output channels that lose all
        their filters map to an empty dict. An empty dict is returned when
        the ranking holds no valid (non-NaN) score.
    """
    # Collect every valid score across all layers to derive one global threshold.
    all_importances = []
    for out_channels in filter_ranking.values():
        for in_channels in out_channels.values():
            all_importances.extend(
                score for score in in_channels.values() if not np.isnan(score)
            )

    if not all_importances:
        print("No valid importance values found.")
        return {}

    threshold = np.percentile(all_importances, percentile)

    top_filters_ranking = {}
    for layer_idx, out_channels in filter_ranking.items():
        top_filters_ranking[layer_idx] = {
            out_channel_idx: {
                in_channel_idx: importance
                for in_channel_idx, importance in in_channels.items()
                if not np.isnan(importance) and importance <= threshold
            }
            for out_channel_idx, in_channels in out_channels.items()
        }

    return top_filters_ranking

In [None]:
def structured_channel_pruning(model, importance_dict, keep_ratio=0.5):
    """Select, per layer, the output channels to keep based on regularity.

    Each output channel is scored by the mean regularity of its input-channel
    kernels. Channels are ranked in ascending order (small value = more
    regular = kept first) and the first ``keep_ratio`` share is retained,
    with at least one channel per layer.

    Args:
        model: not used by this function.
        importance_dict: ``{layer_idx: {out_channel: {in_channel: score}}}``.
            Keys may be ints or numeric strings; they are converted to int
            in the result.
        keep_ratio: fraction of output channels to keep in every layer.

    Returns:
        dict[layer_idx] = sorted list of output channel indices to keep
    """
    print(f"\n  DEBUG: importance_dict a {len(importance_dict)} couches")

    # Mean regularity of every output channel, layer by layer.
    mean_scores = {
        layer_key: {
            out_key: np.mean(list(per_input.values()))
            for out_key, per_input in per_output.items()
        }
        for layer_key, per_output in importance_dict.items()
    }

    selection = {}
    for layer_key, scores in mean_scores.items():
        # Channel keys ordered from most regular (smallest mean) to least.
        ranked_keys = sorted(scores, key=scores.get)

        total_channels = len(ranked_keys)
        n_keep = max(1, int(total_channels * keep_ratio))

        kept = sorted(int(ch) for ch in ranked_keys[:n_keep])
        selection[int(layer_key)] = kept

        ellipsis = '...' if len(kept) > 5 else ''
        print(f"  Layer {int(layer_key)}: garde {n_keep}/{total_channels} canaux -> {kept[:5]}{ellipsis}")

        if n_keep == total_channels:
            print(f"    ⚠️  ATTENTION: On garde 100% des canaux de la couche {layer_key}!")

    return selection


def _as_pair(value):
    """Return ``value`` as a (height, width) tuple, duplicating scalars."""
    if isinstance(value, (tuple, list)):
        return tuple(value)
    return (value, value)


def _window_output_length(size, kernel, stride, padding, dilation=1, ceil_mode=False):
    """Output length along one axis of a sliding-window layer (conv / pool)."""
    span = size + 2 * padding - dilation * (kernel - 1) - 1
    if not ceil_mode:
        return span // stride + 1
    out = -(-span // stride) + 1  # ceil division
    # With ceil_mode, a last window that would start inside the right-hand
    # padding is dropped.
    if (out - 1) * stride >= size + padding:
        out -= 1
    return out


def calculate_output_size_after_layer(layer, h, w, channels):
    """Compute the feature-map size after ``layer``.

    Handled layers:
      * ``nn.Conv2d``: per-axis kernel, stride, padding and dilation, plus
        the string paddings ``'same'`` and ``'valid'``.
      * ``nn.MaxPool2d`` / ``nn.AvgPool2d``: per-axis kernel, stride and
        padding, dilation when the layer has one, and ``ceil_mode``.
      * ``nn.AdaptiveAvgPool2d`` / ``nn.AdaptiveMaxPool2d``: fixed output
        size, where a ``None`` entry keeps the input size on that axis.

    Any other layer is assumed to leave the size unchanged.

    Args:
        layer: the module to account for.
        h: input height.
        w: input width.
        channels: input channel count.

    Returns:
        (h, w, channels) after the layer.
    """
    if isinstance(layer, nn.Conv2d):
        channels = layer.out_channels
        if layer.padding == 'same':
            # 'same' padding keeps the spatial size unchanged.
            return h, w, channels

        pad_h, pad_w = (0, 0) if layer.padding == 'valid' else _as_pair(layer.padding)
        k_h, k_w = _as_pair(layer.kernel_size)
        s_h, s_w = _as_pair(layer.stride)
        d_h, d_w = _as_pair(layer.dilation)

        h = _window_output_length(h, k_h, s_h, pad_h, d_h)
        w = _window_output_length(w, k_w, s_w, pad_w, d_w)

    elif isinstance(layer, (nn.MaxPool2d, nn.AvgPool2d)):
        k_h, k_w = _as_pair(layer.kernel_size)
        # A missing stride means "same as the kernel size".
        s_h, s_w = _as_pair(layer.stride if layer.stride else layer.kernel_size)
        pad_h, pad_w = _as_pair(layer.padding)
        # AvgPool2d has no dilation attribute.
        d_h, d_w = _as_pair(getattr(layer, 'dilation', 1))

        h = _window_output_length(h, k_h, s_h, pad_h, d_h, layer.ceil_mode)
        w = _window_output_length(w, k_w, s_w, pad_w, d_w, layer.ceil_mode)

    elif isinstance(layer, (nn.AdaptiveAvgPool2d, nn.AdaptiveMaxPool2d)):
        # Adaptive pooling forces a fixed output size.
        out_h, out_w = _as_pair(layer.output_size)
        h = h if out_h is None else out_h
        w = w if out_w is None else out_w

    return h, w, channels


def _build_pruned_conv(layer, keep_out, keep_in, device):
    """Build a Conv2d holding only the selected output/input channels of ``layer``."""
    new_conv = nn.Conv2d(
        in_channels=len(keep_in),
        out_channels=len(keep_out),
        kernel_size=layer.kernel_size,
        stride=layer.stride,
        padding=layer.padding,
        dilation=layer.dilation,
        padding_mode=layer.padding_mode,
        bias=(layer.bias is not None),
    ).to(device)

    out_idx = torch.as_tensor(keep_out, dtype=torch.long, device=layer.weight.device)
    in_idx = torch.as_tensor(keep_in, dtype=torch.long, device=layer.weight.device)
    with torch.no_grad():
        # Select rows (output channels) then columns (input channels) in one go.
        new_conv.weight.copy_(layer.weight.index_select(0, out_idx).index_select(1, in_idx))
        if layer.bias is not None:
            new_conv.bias.copy_(layer.bias.index_select(0, out_idx))
    return new_conv


def _build_pruned_batchnorm(layer, kept, device):
    """Build a BatchNorm2d restricted to the channels listed in ``kept``."""
    new_bn = nn.BatchNorm2d(
        len(kept),
        eps=layer.eps,
        momentum=layer.momentum,
        affine=layer.affine,
        track_running_stats=layer.track_running_stats,
    ).to(device)

    with torch.no_grad():
        if layer.affine:
            idx = torch.as_tensor(kept, dtype=torch.long, device=layer.weight.device)
            new_bn.weight.copy_(layer.weight.index_select(0, idx))
            new_bn.bias.copy_(layer.bias.index_select(0, idx))
        if layer.track_running_stats:
            idx = torch.as_tensor(kept, dtype=torch.long, device=layer.running_mean.device)
            new_bn.running_mean.copy_(layer.running_mean.index_select(0, idx))
            new_bn.running_var.copy_(layer.running_var.index_select(0, idx))
            new_bn.num_batches_tracked.copy_(layer.num_batches_tracked)
    return new_bn


def _build_first_linear(layer, new_in_features, kept_channels, old_channels, spatial, device):
    """Build the first Linear after the conv stack, reusing weights when possible."""
    new_linear = nn.Linear(
        in_features=new_in_features,
        out_features=layer.out_features,
        bias=(layer.bias is not None),
    ).to(device)

    # The old weights can be reused when the Linear input is the flattened
    # (channels, h, w) feature map: the columns of each kept channel then form
    # a contiguous group of `spatial` entries.
    # NOTE(review): assumes channel-major flattening (as nn.Flatten does).
    can_slice = (
        kept_channels is not None
        and old_channels is not None
        and spatial > 0
        and layer.in_features == old_channels * spatial
        and new_in_features == len(kept_channels) * spatial
    )

    with torch.no_grad():
        if can_slice:
            idx = torch.as_tensor(kept_channels, dtype=torch.long, device=layer.weight.device)
            grouped = layer.weight.reshape(layer.out_features, old_channels, spatial)
            new_linear.weight.copy_(
                grouped.index_select(1, idx).reshape(layer.out_features, new_in_features)
            )
            if layer.bias is not None:
                new_linear.bias.copy_(layer.bias)
        else:
            # Sizes cannot be matched: fall back to a fresh initialisation.
            nn.init.kaiming_normal_(new_linear.weight)
            if layer.bias is not None:
                new_linear.bias.zero_()
    return new_linear


def prune_model_by_channel_removal(model, channels_to_keep, input_size=(3, 32, 32)):
    """Rebuild a sequential model with whole channels removed.

    The model is iterated layer by layer (it must be iterable like
    ``nn.Sequential``):

      * each ``nn.Conv2d`` keeps the output channels listed in
        ``channels_to_keep`` (all of them when the layer has no entry) and
        the input channels kept by the previous convolution;
      * each ``nn.BatchNorm2d`` following a convolution is restricted to the
        same channels, keeping its hyper-parameters and statistics;
      * the first ``nn.Linear`` is resized to the new flattened feature size;
        its weights are sliced from the original layer when the sizes allow
        it and randomly re-initialised otherwise;
      * every other layer is deep-copied unchanged.

    The input model is not modified.

    Args:
        model: sequential model to prune.
        channels_to_keep: ``{conv_idx: [output channel indices]}`` where
            ``conv_idx`` counts the Conv2d layers in iteration order.
        input_size: ``(channels, height, width)`` of one input sample, used to
            size the first Linear layer.

    Returns:
        A new ``nn.Sequential`` on the same device as ``model``.
    """
    device = next(model.parameters()).device

    print(f"\n  DEBUG prune_model: channels_to_keep = {channels_to_keep}")

    conv_layers = [i for i, layer in enumerate(model) if isinstance(layer, nn.Conv2d)]
    print(f"  Couches Conv trouvées aux indices: {conv_layers}")

    new_layers = []
    prev_kept_channels = None   # output channels kept by the latest conv
    prev_total_channels = None  # original out_channels of that conv
    conv_idx = -1

    for i, layer in enumerate(model):
        if isinstance(layer, nn.Conv2d):
            conv_idx += 1

            # Output channels to keep.
            keep_out_channels = list(
                channels_to_keep.get(conv_idx, range(layer.out_channels))
            )

            print(f"  Conv {conv_idx} (indice {i}): {layer.out_channels} canaux -> garde {len(keep_out_channels)} canaux")

            # Input channels: whatever the previous conv kept.
            if conv_idx == 0 or not prev_kept_channels:
                keep_in_channels = list(range(layer.in_channels))
            else:
                keep_in_channels = prev_kept_channels

            new_conv = _build_pruned_conv(layer, keep_out_channels, keep_in_channels, device)

            print(f"    Nouvelle Conv: in={len(keep_in_channels)}, out={len(keep_out_channels)}")

            new_layers.append(new_conv)
            prev_kept_channels = keep_out_channels
            prev_total_channels = layer.out_channels

            # Report whether this layer actually shrank.
            if len(keep_out_channels) == layer.out_channels:
                print(f"    ⚠️  AUCUNE réduction pour Conv {conv_idx}!")
            else:
                reduction = (1 - len(keep_out_channels) / layer.out_channels) * 100
                print(f"    ✅ Réduction de {reduction:.1f}% pour Conv {conv_idx}")

        elif isinstance(layer, nn.BatchNorm2d):
            if prev_kept_channels is not None:
                new_layers.append(_build_pruned_batchnorm(layer, prev_kept_channels, device))
            else:
                new_layers.append(copy.deepcopy(layer))

        elif isinstance(layer, nn.Linear):
            is_first_linear = not any(isinstance(l, nn.Linear) for l in new_layers)
            if is_first_linear:
                # Replay the already rebuilt layers to get the real feature size.
                channels, h, w = input_size
                for prev_layer in new_layers:
                    h, w, channels = calculate_output_size_after_layer(prev_layer, h, w, channels)

                new_input_features = channels * h * w

                print(f"  Linear input: {new_input_features} (channels={channels}, h={h}, w={w})")

                new_layers.append(
                    _build_first_linear(
                        layer, new_input_features,
                        prev_kept_channels, prev_total_channels, h * w, device,
                    )
                )
            else:
                # Later Linear layers are unaffected by channel pruning.
                new_layers.append(copy.deepcopy(layer))

        else:
            # Other layers (ReLU, MaxPool, Dropout, Flatten, AdaptiveAvgPool)
            new_layers.append(copy.deepcopy(layer))

    return nn.Sequential(*new_layers).to(device)


def test_effective_pruning(model, importance_dict, trn_dl, val_dl, loss_fn,
                          keep_ratios=(0.9, 0.8, 0.7, 0.6, 0.5),
                          return_best=True,
                          input_size=(3, 32, 32),
                          finetune_epochs=5,
                          finetune_lr=1e-3):
    """Prune the model at several keep ratios, fine-tune and compare.

    For every ratio in ``keep_ratios`` the model is pruned with
    ``structured_channel_pruning`` + ``prune_model_by_channel_removal``,
    evaluated, fine-tuned and evaluated again. A failure for one ratio is
    printed with its traceback and does not stop the other ratios.

    The models are compared with
    ``score = accuracy_retention * log(compression_ratio + 1)``.

    Args:
        model: reference model.
        importance_dict: ``{layer_idx: {out_channel: {in_channel: score}}}``.
        trn_dl: training data loader used for fine-tuning.
        val_dl: validation data loader used for every evaluation.
        loss_fn: loss function passed to evaluation and fine-tuning.
        keep_ratios: fractions of channels to keep, tested in order.
        return_best: if True, also return the model with the best score.
        input_size: ``(channels, height, width)`` of one input sample.
        finetune_epochs: number of fine-tuning epochs per pruned model.
        finetune_lr: learning rate used for fine-tuning.

    Returns:
        ``(best_model, results)`` if ``return_best`` is True, otherwise
        ``results``. ``results`` is a list with one dict per successful
        ratio; ``best_model`` is None when every ratio failed.
    """
    import math
    import traceback

    print("Test de Pruning avec Réduction Effective des Paramètres")
    print("="*60)

    # Baseline
    orig_params = sum(p.numel() for p in model.parameters())
    orig_loss, orig_acc = evaluate_model(model, val_dl, loss_fn)

    print(f"Modèle original: {orig_acc:.4f} accuracy, {orig_params:,} paramètres\n")

    results = []
    best_model = None
    best_score = -float('inf')

    for keep_ratio in keep_ratios:
        print(f"Test avec keep_ratio = {keep_ratio} ({keep_ratio*100:.0f}% des canaux)")
        print("-" * 40)

        # Deliberately broad: one failing ratio must not abort the whole sweep.
        try:
            # Structured pruning: pick the channels to keep.
            channels_to_keep = structured_channel_pruning(model, importance_dict, keep_ratio)

            total_kept = sum(len(chs) for chs in channels_to_keep.values())
            print(f"  Total canaux gardés: {total_kept}")

            # Rebuild the smaller model.
            pruned_model = prune_model_by_channel_removal(model, channels_to_keep, input_size=input_size)

            # Measure the actual parameter reduction.
            pruned_params = sum(p.numel() for p in pruned_model.parameters())
            actual_reduction = (orig_params - pruned_params) / orig_params * 100

            print(f"\nParamètres avant: {orig_params:,}")
            print(f"Paramètres après: {pruned_params:,}")
            print(f"Réduction RÉELLE: {actual_reduction:.1f}%")

            # Accuracy before fine-tuning
            val_loss_before, val_acc_before = evaluate_model(pruned_model, val_dl, loss_fn)
            print(f"Accuracy avant fine-tuning: {val_acc_before:.4f}")

            # Fine-tuning
            print("Fine-tuning...")
            pruned_model = fine_tune_pruned_model(
                pruned_model, trn_dl, val_dl, loss_fn,
                epochs=finetune_epochs, lr=finetune_lr,
            )

            val_loss_final, val_acc_final = evaluate_model(pruned_model, val_dl, loss_fn)
            # A baseline accuracy of 0 makes retention undefined; report 0
            # instead of failing the ratio with a ZeroDivisionError.
            accuracy_retention = (val_acc_final / orig_acc) * 100 if orig_acc > 0 else 0.0
            compression_ratio = orig_params / pruned_params if pruned_params > 0 else 0

            print(f"Accuracy finale: {val_acc_final:.4f}")
            print(f"Rétention accuracy: {accuracy_retention:.1f}%")
            print(f"Ratio de compression: {compression_ratio:.2f}x\n")

            # Trade-off between accuracy and compression; the +1 keeps the
            # logarithm non-negative for ratios below 1.
            score = accuracy_retention * math.log(compression_ratio + 1)

            results.append({
                'keep_ratio': keep_ratio,
                'actual_reduction': actual_reduction,
                'final_accuracy': val_acc_final,
                'accuracy_retention': accuracy_retention,
                'compression_ratio': compression_ratio,
                'pruned_params': pruned_params,
                'model': pruned_model,
                'score': score
            })

            # Track the best model so far.
            if score > best_score:
                best_score = score
                best_model = pruned_model
                print(f"    🏆 Nouveau meilleur modèle! (score: {score:.2f})")

        except Exception as e:
            print(f"Erreur: {e}")
            traceback.print_exc()
            print()

    # Summary
    if results:
        print("\n" + "="*60)
        print("RÉSUMÉ DES RÉSULTATS")
        print("="*60)
        for r in results:
            # Identity check: we want the very object selected as best.
            marker = "🏆" if r['model'] is best_model else "  "
            print(f"{marker} Keep {r['keep_ratio']*100:.0f}%: {r['actual_reduction']:.1f}% réduction, "
                  f"{r['accuracy_retention']:.1f}% accuracy, {r['compression_ratio']:.2f}x compression")

    if return_best:
        return best_model, results
    return results


def evaluate_model(model, val_dl, loss_fn):
    """Evaluate a classification model on a data loader.

    The model is switched to eval mode and left in that mode.

    Args:
        model: model to evaluate; batches are moved to the device of its
            first parameter.
        val_dl: iterable of ``(inputs, targets)`` batches.
        loss_fn: callable ``loss_fn(predictions, targets)`` returning a
            scalar tensor holding the mean loss of the batch.

    Returns:
        (mean loss per sample, accuracy) over the whole loader.

    Raises:
        ValueError: if ``val_dl`` yields no sample.
    """
    model.eval()
    # The device does not change between batches: look it up once.
    device = next(model.parameters()).device

    total_loss = 0.0
    total_correct = 0
    total_samples = 0

    with torch.no_grad():
        for x, y in val_dl:
            x, y = x.to(device), y.to(device)
            pred = model(x)
            batch_size = x.size(0)
            # Weight the batch mean by its size so that a smaller last batch
            # does not bias the average.
            total_loss += loss_fn(pred, y).item() * batch_size
            total_correct += (pred.argmax(1) == y).sum().item()
            total_samples += batch_size

    if total_samples == 0:
        raise ValueError("val_dl did not yield any sample to evaluate on")

    return total_loss / total_samples, total_correct / total_samples


def fine_tune_pruned_model(pruned_model, trn_dl, val_dl, loss_fn, epochs=5, lr=1e-3):
    """Fine-tune a pruned model with Adam.

    Args:
        pruned_model: model to train; it is updated in place.
        trn_dl: iterable of ``(inputs, targets)`` training batches.
        val_dl: validation loader, evaluated after every epoch.
        loss_fn: callable ``loss_fn(outputs, targets)`` returning the loss.
        epochs: number of passes over ``trn_dl``.
        lr: Adam learning rate.

    Returns:
        The same ``pruned_model`` object, after training.
    """
    target_device = next(pruned_model.parameters()).device
    adam = torch.optim.Adam(pruned_model.parameters(), lr=lr)

    for epoch_no in range(1, epochs + 1):
        # evaluate_model() switches to eval mode, so training mode is
        # restored at the start of every epoch.
        pruned_model.train()
        for inputs, targets in trn_dl:
            inputs = inputs.to(target_device)
            targets = targets.to(target_device)

            adam.zero_grad()
            batch_loss = loss_fn(pruned_model(inputs), targets)
            batch_loss.backward()
            adam.step()

        # Validation runs every epoch; only the first and the even-numbered
        # epochs are reported.
        _, val_acc = evaluate_model(pruned_model, val_dl, loss_fn)
        if epoch_no == 1 or epoch_no % 2 == 0:
            print(f"  Epoch {epoch_no}: Val Acc = {val_acc:.4f}")

    return pruned_model



In [None]:
# Work on a copy so that the reference model stays untouched.
model_test = copy.deepcopy(model)
# With return_best=True (the default) test_effective_pruning returns a
# (best_model, results) tuple: unpack it so that pruned_model really is a model.
pruned_model, pruning_results = test_effective_pruning(
    model_test, filter_ranking, train_loader, val_loader, loss_fn
)