# Classification de cris d'insectes

Les populations d’insectes terrestres subissent actuellement un déclin massif d’environ 10% par décennie. Il est urgent de mieux comprendre les causes de ce déclin et surtout d’identifier les méthodes de gestion de l'environnement permettant de le limiter. Néanmoins la recherche traditionnelle en entomologie est peu outillée pour l’étude non invasive (sans capture et sans mise à mort) et à haute fréquence spatio-temporelle des milliers d’espèces qui caractérisent cette classe très diversifiée du règne animal.

Pour ces raisons, des approches de suivi des populations basées IA et capteurs émergent ; l'idée est d'utiliser des enregistreurs audio de type *soundscape* pour écouter et reconnaître la présence et la fréquence de cris des différentes espèces.

Dans ce projet, vous devrez mettre en place un réseau de neurones pour la reconnaissance d'espèces d'insectes chanteurs (cigales, grillons, sauterelles, criquets) à partir de courts échantillons audio.

In [None]:
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import datasets
import pandas as pd
import os
import sys
from tqdm import tqdm
import torch.nn.functional as F
import audiomentations as AA
import numpy as np
import os
import pandas as pd
from torch.utils.data import Dataset
import librosa
import torchaudio.transforms as T
import torchaudio
from python_speech_features import mfcc
from itertools import islice

### Extraction des audios et transformation en dataloder

In [None]:
class InsectImage(Dataset):
    def __init__(self, annotations_file, img_dir, data_type, transform, list_augment):
        #On récupère le fichier csv
        self.img_labels = pd.read_csv(annotations_file, sep=',')
        self.img_dir = img_dir
        
        # On ne fait qu'un nombre fixé d'augmentations selon le nb d'occurences des classes
        self.data_type = self.img_labels[
            (self.img_labels['class_ID'].isin(list_augment)) &
            (self.img_labels['data_set'] == data_type)
        ]
        # transformation pour l'augmentation de donnée 
        self.transform = transform

    def __len__(self):
        return len(self.data_type)

    def decoupage_segments(self, audio_path, duree_segment=5):
        """
        Méthode pour extraire des segments de 5 secondes pour chaque audio et remplissage avec des 0 pour le dernier segment
        si il ne dure pas 5 secondes.
        En sortie on a une liste comportant tous les segments de 5 secondes de l'audio actuel
        """
        audio, sr = librosa.load(audio_path, sr=None)
        frames_par_segment = int(sr * duree_segment)
        nombre_segments = int(np.ceil(len(audio) / frames_par_segment))
        segments = []
        for i in range(nombre_segments):
            segment_audio = audio[i * frames_par_segment: (i + 1) * frames_par_segment]
            dernier_segment_taille = len(segment_audio)
            if dernier_segment_taille < frames_par_segment:
                segment_audio = np.pad(segment_audio, (0, frames_par_segment - dernier_segment_taille), mode='constant')
            segments.append(segment_audio)
        return segments, sr

    def augmentation(self, segment):
        """
        Méthode permettant d'appliquer la transformation choisit par l'utilisateur
        """
        if self.transform=='left_shift':
            S_db = AA.Shift(min_shift=-0.5, max_shift=-0.1, p=1)(segment, sample_rate=44100)
        elif self.transform=='right_shift':
            S_db = AA.Shift(min_shift=0.1, max_shift=0.5, p=1)(segment, sample_rate=44100)
        elif self.transform == 'slow_stretch':
            S_db = AA.TimeStretch(min_rate=0.7, max_rate=0.9, leave_length_unchanged=True)(segment, sample_rate=44100)
        elif self.transform == 'accelerate_stretch':
            S_db = AA.TimeStretch(min_rate=1.7, max_rate=1.9, leave_length_unchanged=True)(segment, sample_rate=44100)
        elif self.transform == 'few_noise':
            S_db = AA.AddGaussianNoise(min_amplitude=0.001, max_amplitude=0.015, p=1)(segment, sample_rate=44100)
        elif self.transform == 'lot_noise':
            S_db = AA.AddGaussianNoise(min_amplitude=0.01, max_amplitude=0.15, p=1)(segment, sample_rate=44100)
        elif self.transform == 'lower_pitch':
            S_db =AA.PitchShift(min_semitones=-2, max_semitones=-1)(segment, sample_rate=44100)
        elif self.transform == 'higher_pitch':
            S_db =AA.PitchShift(min_semitones=1, max_semitones=2)(segment, sample_rate=44100)
        elif self.transform == 'time_mask':
            S_db = AA.TimeMask(min_band_part=0.1,max_band_part=0.15,fade=True,p=1.0)(segment, sample_rate=44100)
        elif self.transform == 'reverse':
            S_db = AA.Reverse(p=1.0)(segment, sample_rate=44100)
        else :
            S_db = segment
        return S_db
        
        
    def __getitem__(self, idx):
        """
        Pour chaque audio et chaque segment de l'audio on applique la transfromation et on transforme le signal en 
        un spectrogramme
        """
        audio_path = os.path.join(self.img_dir, self.data_type.iloc[idx, 1], self.data_type.iloc[idx, 0])
        ## Récupération du label associé à l'audio
        label = self.data_type.iloc[idx, 2]
        #Découpage de l'audio en segments
        segments, sr = self.decoupage_segments(audio_path)
        ##On compte le nb de segments pour l'audio (pour pouvoir ensuite calculer le nb d'occurences par classe)
        longueur_segment = len(segments)

        for segment in segments:
            ## On applique l'augmentation
            S_db = self.augmentation(segment)
            #On applique la stft pour avoir un spectrogramme
            D = librosa.stft(S_db.astype('float32'))
            S_db = librosa.amplitude_to_db(np.abs(D), ref=np.max)
            ##Une autre transformation possible sur le spectrogramme
            if self.transform == 'freq_mask':
                S_db = AA.SpecFrequencyMask(p=1.0)(S_db)
            S_db = torch.from_numpy(S_db)
            S_db = torch.unsqueeze(S_db, dim=0)
            
            return S_db, label, longueur_segment


Le chemin 'projetmodia' correspond au dossier qui contient les données, nous vous conseillons de le mettre dans le même dossier que ce notebook 

In [None]:
import os
# récupérer le chemin du répertoire courant
path = os.getcwd()
path = path + "/projetmodia/"

## Calcul du nombre d'occurences sans ré-équilibrage des classes

In [None]:
# Liste avec tous les labels
list_all_Labels = list(range(32))

In [None]:
dataset_train_cicadae = InsectImage(path+'Cicadidae.csv', path+'Cicadidae/Cicadidae/', data_type='train', transform = None, list_augment= list_all_Labels)
dataset_train_orthoptera = InsectImage(path+'Orthoptera.csv', path+'Orthoptera/Orthoptera/', data_type='train', transform = None, list_augment = list_all_Labels)
train = torch.utils.data.ConcatDataset([dataset_train_cicadae, dataset_train_orthoptera])
train_dataloader2 = DataLoader(train, batch_size=1, shuffle=True)

In [None]:
class_occ = {}
for _, data in enumerate(train_dataloader2):
    nb_donnees = data[2].item()
    class_label=data[1].item()
    if class_label in class_occ.keys():
        class_occ[class_label]+= nb_donnees
    else:
        class_occ[class_label]=nb_donnees

In [None]:
sorted_dict = dict(sorted(class_occ.items()))
print(sorted_dict)
dataframe = pd.DataFrame.from_dict(sorted_dict,  orient='index', columns = ['Occurences'])

In [None]:
class_mapping = {
    0: 'Azanicadazuluensis',
    1: 'Brevisianabrevis',
    2: 'Chorthippusbiguttulus',
    3: 'Chorthippusbrunneus',
    4: 'Grylluscampestris',
    5: 'Kikihiamuta',
    6: 'Myopsaltaleona',
    7: 'Myopsaltalongicauda',
    8: 'Myopsaltamackinlayi',
    9: 'Myopsaltamelanobasis',
    10: 'Myopsaltaxerograsidia',
    11: 'Nemobiussylvestris',
    12: 'Oecanthuspellucens',
    13: 'Pholidopteragriseoaptera',
    14: 'Platypleuracapensis',
    15: 'Platypleuracfcatenata',
    16: 'Platypleurachalybaea',
    17: 'Platypleuradeusta',
    18: 'Platypleuradivisa',
    19: 'Platypleurahaglundi',
    20: 'Platypleurahirtipennis',
    21: 'Platypleuraintercapedinis',
    22: 'Platypleuraplumosa',
    23: 'Platypleurasp04',
    24: 'Platypleurasp10',
    25: 'Platypleurasp11cfhirtipennis',
    26: 'Platypleurasp12cfhirtipennis',
    27: 'Platypleurasp13',
    28: 'Pseudochorthippusparallelus',
    29: 'Pycnasemiclara',
    30: 'Roeselianaroeselii',
    31: 'Tettigoniaviridissima'
}

# Appliquez la correspondance à la colonne des numéros de classes
dataframe['class_names'] = class_mapping

In [None]:
import plotly.express as px
fig = px.bar(dataframe, x='class_names', y='Occurences', title='Répartition des classes')
fig.show()

Nous remarquons que certaines classes ont beaucoup plus d'occurences que d'autres et cela peut impacter la classification

## Augmentation de données

Nous appliquons ensuite de l'augmentation de données pour à la fois avoir plus d'exemples car le dataset est petit mais aussi pour ré-équilibrer les classes

In [None]:
# Toutes les transformations possibles
transformations = ['left_shift', 'right_shift', 'slow_stretch', 'accelerate_stretch', 'few_noise', 'lot_noise','lower_pitch', 'higher_pitch', 'higher_pitch', 'time_mask', 'freq_mask', 'reverse']

In [None]:
# Calcul quartiles pour augmentation de données
q1 = dataframe['Occurences'].quantile(0.25)
median = dataframe['Occurences'].median()
q3 = dataframe['Occurences'].quantile(0.75)

# Listes des classes pour l'augmentations de données
classes_q1 = dataframe[dataframe['Occurences'] <= q1].index.tolist()
classes_q1_median = dataframe[(dataframe['Occurences'] > q1) & (dataframe['Occurences'] <= median)].index.tolist()
classes_median_q3 = dataframe[(dataframe['Occurences'] > median) & (dataframe['Occurences'] <= q3)].index.tolist()
classes_q3 = dataframe[dataframe['Occurences'] > q3].index.tolist()

In [None]:
import random
# On effectue les 10 transformations pour les classes qui ont un nb d'occurences inférieurs au 1er quartile
train_cicadae1 = InsectImage(path+'Cicadidae.csv', path+'Cicadidae/Cicadidae/', list_augment=classes_q1, data_type='train', transform = None)
train_orthoptera1 = InsectImage(path+'Orthoptera.csv', path+'Orthoptera/Orthoptera/', list_augment=classes_q1, data_type='train', transform = None)
train_dataset1 = torch.utils.data.ConcatDataset([train_cicadae1, train_orthoptera1])
transformQ1_list = random.sample(transformations, 11)
for transform in transformQ1_list :
    dataset_train_cicadae1 = InsectImage(path+'Cicadidae.csv', path+'Cicadidae/Cicadidae/', list_augment=classes_q1, data_type='train', transform = transform)
    dataset_train_orthoptera1 = InsectImage(path+'Orthoptera.csv', path+'Orthoptera/Orthoptera/', list_augment=classes_q1, data_type='train', transform = transform)
    train_dataset1 = torch.utils.data.ConcatDataset([train_dataset1, dataset_train_cicadae1, dataset_train_orthoptera1])
  

# On effectue 6 transformations choisis aléatoirement sans remise pour les classes qui ont un nb d'occurences compris entre le 1er quartile et la médiane
train_cicadae2 = InsectImage(path+'Cicadidae.csv', path+'Cicadidae/Cicadidae/', list_augment=classes_q1_median, data_type='train', transform = None)
train_orthoptera2 = InsectImage(path+'Orthoptera.csv', path+'Orthoptera/Orthoptera/', list_augment=classes_q1_median, data_type='train', transform = None)
train_dataset2 = torch.utils.data.ConcatDataset([train_cicadae2, train_orthoptera2])
transformQ2_list = random.sample(transformations, 6)
for transform in transformQ2_list :
    dataset_train_cicadae2 = InsectImage(path+'Cicadidae.csv', path+'Cicadidae/Cicadidae/', list_augment=classes_q1_median, data_type='train', transform = transform)
    dataset_train_orthoptera2 = InsectImage(path+'Orthoptera.csv', path+'Orthoptera/Orthoptera/', list_augment=classes_q1_median, data_type='train', transform = transform)
    train_dataset2 = torch.utils.data.ConcatDataset([train_dataset2, dataset_train_cicadae2, dataset_train_orthoptera2])
 


# On effectue 3 transformations choisis aléatoirement sans remise pour les classes qui ont un nb d'occurences compris entre la médiane et le 3ieme quartile
train_cicadae3 = InsectImage(path+'Cicadidae.csv', path+'Cicadidae/Cicadidae/', list_augment=classes_median_q3, data_type='train', transform = None)
train_orthoptera3 = InsectImage(path+'Orthoptera.csv', path+'Orthoptera/Orthoptera/', list_augment=classes_median_q3, data_type='train', transform = None)
train_dataset3 = torch.utils.data.ConcatDataset([train_cicadae3, train_orthoptera3])
transformQ3_list = random.sample(transformations, 3)
for transform in transformQ3_list :
    dataset_train_cicadae3 = InsectImage(path+'Cicadidae.csv', path+'Cicadidae/Cicadidae/', list_augment=classes_median_q3, data_type='train', transform = transform)
    dataset_train_orthoptera3 = InsectImage(path+'Orthoptera.csv', path+'Orthoptera/Orthoptera/', list_augment=classes_median_q3, data_type='train', transform = transform)
    train_dataset3 = torch.utils.data.ConcatDataset([train_dataset3, dataset_train_cicadae3, dataset_train_orthoptera3])


# On effectue 1 transformation choisi aléatoirement sans remise pour les classes qui ont un nb d'occurences > au 3ieme quartile
train_cicadae4 = InsectImage(path+'Cicadidae.csv', path+'Cicadidae/Cicadidae/', list_augment=classes_q3, data_type='train', transform = None)
train_orthoptera4 = InsectImage(path+'Orthoptera.csv', path+'Orthoptera/Orthoptera/', list_augment=classes_q3, data_type='train', transform = None)
train_dataset4 = torch.utils.data.ConcatDataset([train_cicadae4, train_orthoptera4])
transformQ4_list = random.sample(transformations, 1)
for transform in transformQ4_list :
    dataset_train_cicadae4 = InsectImage(path+'Cicadidae.csv', path+'Cicadidae/Cicadidae/', list_augment=classes_q3, data_type='train', transform = transform)
    dataset_train_orthoptera4 = InsectImage(path+'Orthoptera.csv', path+'Orthoptera/Orthoptera/', list_augment=classes_q3, data_type='train', transform = transform)
    train_dataset4 = torch.utils.data.ConcatDataset([train_dataset4, dataset_train_cicadae4, dataset_train_orthoptera4])

    

## Calcul du nombre d'occurences après augmentation

In [None]:
dataset_all_augments = torch.utils.data.ConcatDataset([train_dataset1, train_dataset2, train_dataset3, train_dataset4])
train_dataloader_complete = DataLoader(dataset_all_augments, batch_size=1, shuffle=True)
class_occ = {}

for _, data in enumerate(train_dataloader_complete):
    nb_donnees = data[2].item()
    class_label=data[1].item()
    if class_label in class_occ.keys():
        class_occ[class_label]+= nb_donnees
    else:
        class_occ[class_label]=nb_donnees

sorted_dict = dict(sorted(class_occ.items()))
dataframe_final = pd.DataFrame.from_dict(sorted_dict,  orient='index', columns = ['Occurences'])

In [None]:
class_mapping = {
    0: 'Azanicadazuluensis',
    1: 'Brevisianabrevis',
    2: 'Chorthippusbiguttulus',
    3: 'Chorthippusbrunneus',
    4: 'Grylluscampestris',
    5: 'Kikihiamuta',
    6: 'Myopsaltaleona',
    7: 'Myopsaltalongicauda',
    8: 'Myopsaltamackinlayi',
    9: 'Myopsaltamelanobasis',
    10: 'Myopsaltaxerograsidia',
    11: 'Nemobiussylvestris',
    12: 'Oecanthuspellucens',
    13: 'Pholidopteragriseoaptera',
    14: 'Platypleuracapensis',
    15: 'Platypleuracfcatenata',
    16: 'Platypleurachalybaea',
    17: 'Platypleuradeusta',
    18: 'Platypleuradivisa',
    19: 'Platypleurahaglundi',
    20: 'Platypleurahirtipennis',
    21: 'Platypleuraintercapedinis',
    22: 'Platypleuraplumosa',
    23: 'Platypleurasp04',
    24: 'Platypleurasp10',
    25: 'Platypleurasp11cfhirtipennis',
    26: 'Platypleurasp12cfhirtipennis',
    27: 'Platypleurasp13',
    28: 'Pseudochorthippusparallelus',
    29: 'Pycnasemiclara',
    30: 'Roeselianaroeselii',
    31: 'Tettigoniaviridissima'
}

dataframe_final['class_names'] = class_mapping
fig = px.bar(dataframe_final, x='class_names', y='Occurences', title='Répartition des classes')
fig.show()


Maintenant, nous voyons que les classes sont à peu près équilibrées 

## Construction du dataset d'entrainement, de validation et de test

In [None]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

Ajout de poids pour la loss 

In [None]:
class_weights = torch.tensor(1.0 / dataframe_final['Occurences'].values, dtype=torch.float32)
class_weights = class_weights.to(device)

Dataset pour l'entraînement avec batch de 32 et non 1

In [None]:
train_dataloader_final = DataLoader(dataset_all_augments, batch_size=32, shuffle=True)

Dataset de validation et de test

In [None]:
dataset_test_orthoptera = InsectImage(path+'Orthoptera.csv', path+'Orthoptera/Orthoptera/', data_type='test',transform = None, list_augment = list_all_Labels)
dataset_validation_orthoptera = InsectImage(path+'Orthoptera.csv', path+'Orthoptera/Orthoptera/', data_type='validation',transform =None, list_augment = list_all_Labels)

In [None]:
dataset_test_cicadae = InsectImage(path+'Cicadidae.csv', path+ 'Cicadidae/Cicadidae/', data_type='test', transform = None, list_augment=list_all_Labels)
dataset_validation_cicadae = InsectImage(path+'Cicadidae.csv',path+ 'Cicadidae/Cicadidae/', data_type='validation', transform =None, list_augment=list_all_Labels)

In [None]:
test_dataset = torch.utils.data.ConcatDataset([dataset_test_cicadae, dataset_test_orthoptera])
val_dataset = torch.utils.data.ConcatDataset([dataset_validation_cicadae, dataset_validation_orthoptera])

In [None]:
test_dataloader = DataLoader(test_dataset, batch_size=32, shuffle=True)
validation_dataloader = DataLoader(val_dataset, batch_size=32, shuffle=True)

# Modèle 

# Xception

Il vous faudra avoir la version 0.9.12 du package timm pour pouvoir exécuter la partie suivante  

In [None]:
import torch.nn as nn
import torch
import timm
model = timm.create_model('xception', pretrained=False, num_classes=1000)
path = os.getcwd()
model.conv1 = nn.Conv2d(1, 32, kernel_size=(3, 3), stride=(2, 2), bias=False)
model.fc = nn.Linear(in_features=2048, out_features=32, bias=True)
model.load_state_dict(torch.load(path+'/xception.pth'))

In [None]:
from torchvision import models, transforms
model = model.to(device)
criterion = nn.CrossEntropyLoss(weight=class_weights)
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001, weight_decay=0.001)
loss_values = []
accuracy_values = []
loss_values_validation = []
accuracy_values_validation = []

num_epochs = 10

for epoch in range(num_epochs):
    model.train()
    print('Epoch {}/{}'.format(epoch+1, num_epochs))
    print('-------------')
    running_loss = 0.0
    n_samples =0.0
    running_acc = 0.0
    for i, data in  enumerate(tqdm(train_dataloader_final)):
        img, label = data[0].to(device), data[1].to(device)
        optimizer.zero_grad()
        outputs = model(img)
        loss = criterion(outputs, label)
        loss.backward()
        optimizer.step()
        preds= torch.argmax(outputs, dim=1)
        running_loss += loss.item() * img.size(0)
        n_samples += label.size(0)
        running_acc += torch.sum(preds == label).item()
 
    epoch_loss = running_loss / n_samples
    epoch_accuracy = running_acc / n_samples * 100.0

    loss_values.append(epoch_loss)
    accuracy_values.append(epoch_accuracy)
    print('Epoch [{}/{}], Loss Train: {:.4f}, Accuracy Train: {:.4f}%'.format(epoch + 1, num_epochs, epoch_loss, epoch_accuracy))
    
    ##### Validation loop
    
    valid_loss = 0.0
    n_samples =0.0
    running_acc_validation = 0.0
    running_loss_validation = 0.0
    model.eval()     
    for i, data in enumerate(validation_dataloader):
        img, label = data[0].to(device), data[1].to(device)
        outputs = model(img)
        loss = criterion(outputs,label)
        preds = torch.argmax(outputs, dim=1)
        running_loss_validation += loss.item() * img.size(0)
        n_samples += label.size(0)
        running_acc_validation += torch.sum(preds == label).item()
 
    epoch_loss_validation = running_loss_validation / n_samples
    epoch_accuracy_validation = running_acc_validation / n_samples * 100.0

    loss_values_validation.append(epoch_loss_validation)
    accuracy_values_validation.append(epoch_accuracy_validation)
    print('Epoch [{}/{}], Loss Validation: {:.4f}, Accuracy Validation: {:.4f}%'.format(epoch + 1, num_epochs, epoch_loss_validation, epoch_accuracy_validation))
    


In [None]:
from sklearn.metrics import confusion_matrix, classification_report
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import f1_score
correct = 0
total = 0
model.eval() 
true_labels = []
predicted_labels = []

with torch.no_grad():
    for _, data in enumerate(test_dataloader):
        img, true_labels_batch = data[0].to(device), data[1].to(device)
        outputs = model(img)
        preds = torch.argmax(outputs, dim=1)
        true_labels.extend(true_labels_batch.cpu().numpy())
        predicted_labels.extend(preds.cpu().numpy())
        total += true_labels_batch.size(0)
        correct += (preds == true_labels_batch).sum().item()
        
acc = correct / total * 100.0
print('accuracy est de seulement: {:.2f}%'.format(acc))

## Calcul du f1-score : 
y_true = np.array(true_labels)
y_pred = np.array(predicted_labels)

sommaire = classification_report(y_true, y_pred, output_dict=True)

for class_label, metrics in sommaire.items():
    if class_label.isdigit():
        F1_score_class = metrics['f1-score']
        print(f"F1-score pour la classe {class_label} : {F1_score_class}")
        
f1_score_moyen = sommaire['weighted avg']['f1-score']
print(f"F1-score moyen : {f1_score_moyen}")


conf_matrix = confusion_matrix(y_true, y_pred)
# Afficher la matrice de confusion
plt.figure(figsize=(12, 10))
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', xticklabels=range(32), yticklabels=range(32))
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix')
plt.savefig('matrice_confusion.png')
plt.show()

