In [1]:
from generate_signal import generate_X_matrix
from beamforming import beamforming_method
from generate_signal import generate_A_matrix
from generate_signal import generate_noise
from generate_signal import generate_R_hat_with_phase
from generate_signal import generate_R_hat
from music import music_method
from matplotlib import pyplot as plt
from sklearn.model_selection import train_test_split 
import numpy as np


nbSources = 2 # Nombre de sources
nbSensors = 10 # Nombre de capteurs
nbTimePoints = 100 # Nombre de points temporels
signal_noise_ratio = 3 # Rapport signal sur bruit en décibels. Si 'False', cela revient à une absence totale de bruit.
L = 10
T = 100
correlation_List = [0.4] # Liste des corrélations. Il y a une corrélation nécéssaire pour chaque paire distincte de sources différentes: 0 pour 1 source, 1 pour 2 sources, 3 pour 3 sources, 6 pour 4 sources etc...
# Ordre de remplisage de la correlation_List: de gauche à droite et ligne par ligne, moitié haut-droite de la matrice uniquement, puis symétrie de ces valeurs pour la moitié bas-gauche.


In [4]:
num_samples = 1000 # Nombre d'échantillons à générer

# Initialiser les tableaux pour stocker les matrices de covariance et les angles estimés
all_R_hat = np.zeros((num_samples, L, L,3), dtype=complex)
all_estimated_angles = []
for i in range(num_samples):
    # Générer des angles aléatoires pour chaque échantillon
    theta1 = np.random.uniform(-90, 90)
    theta2 = np.random.uniform(-90, 90)
    var1 = np.random.uniform(0,1000000000000)
    var2 = np.random.uniform(0,1000000000000)
    thetaList = [theta1, theta2]
    varList = [var1, var2]

    # Générer la matrice X
    X = generate_X_matrix(nbSources, L, T, thetaList, varList, correlation_List, signal_noise_ratio)

    # Calcul de la matrice de covariance pour l'échantillon actuel
    R_hat = generate_R_hat_with_phase(X)
    all_R_hat[i] = R_hat

    # Calcul du spectre MUSIC pour l'échantillon actuel
    estimated_angles = music_method(X, L, T, print_angles = False, draw_plot = False)
    all_estimated_angles.append(estimated_angles)

# Division des données en ensembles d'entraînement et de test
all_estimated_angles = np.array(all_estimated_angles)  # Conversion en numpy array si nécessaire
R_hat_train, R_hat_test, angles_train, angles_test = train_test_split(
    all_R_hat, all_estimated_angles, test_size=0.2, random_state=42
)

[[7.66631036e+11 2.83705710e+11]
 [2.83705710e+11 6.56190251e+11]]
[[7.65929069e+11 2.38900667e+11]
 [2.38900667e+11 4.65721394e+11]]
[[1.95416697e+11 1.64514738e+11]
 [1.64514738e+11 8.65621369e+11]]
[[1.71012287e+11 1.63007361e+11]
 [1.63007361e+11 9.71107114e+11]]
[[4.88635718e+11 1.08131594e+11]
 [1.08131594e+11 1.49554683e+11]]
[[8.47901836e+11 2.19313998e+11]
 [2.19313998e+11 3.54541556e+11]]
[[4.97546379e+11 2.04113415e+11]
 [2.04113415e+11 5.23346766e+11]]
[[9.81190560e+11 9.39034829e+10]
 [9.39034829e+10 5.61681419e+10]]
[[5.70334660e+11 2.99408611e+11]
 [2.99408611e+11 9.82378449e+11]]
[[1.12974091e+11 1.24770145e+11]
 [1.24770145e+11 8.61236687e+11]]
[[7.22694827e+11 2.24299160e+11]
 [2.24299160e+11 4.35091267e+11]]
[[3.89208539e+09 2.14750686e+10]
 [2.14750686e+10 7.40571125e+11]]
[[1.67245381e+11 1.57636963e+11]
 [1.57636963e+11 9.28628492e+11]]
[[4.16635419e+11 2.08288271e+11]
 [2.08288271e+11 6.50808867e+11]]
[[2.69542005e+11 1.18747989e+11]
 [1.18747989e+11 3.26968629e+

In [73]:
import torch
import torch.nn as nn
import torch.nn.functional as F

# Définir l'input_size et le nombre de classes
input_size = (10, 10)  # Remplacez par la taille correcte de vos données d'entrée
num_classes = 1799  # Remplacez par le nombre correct de classes

class DOAEstimator(nn.Module):
    def __init__(self, input_size, num_classes):
        super(DOAEstimator, self).__init__()

        # Modifier les couches en fonction de l'input_size
        self.conv1 = nn.Conv1d(32, 128, kernel_size=3, stride=2)
        self.conv2 = nn.Conv1d(128, 128, kernel_size=2)
        self.conv3 = nn.Conv1d(128, 128, kernel_size=2)
        self.conv4 = nn.Conv1d(128, 128, kernel_size=2)

        # Couches de normalisation par lots
        self.bn1 = nn.BatchNorm1d(128)
        self.bn2 = nn.BatchNorm1d(128)
        self.bn3 = nn.BatchNorm1d(128)
        self.bn4 = nn.BatchNorm1d(128)

        # Couche de mise à plat
        self.flatten = nn.Flatten()

        # Couches entièrement connectées (FC)
        self.fc1 = nn.Linear(128 * (((input_size[0] - 6) // 2) - 6) * (((input_size[1] - 6) // 2) - 6), 2048)
        self.fc2 = nn.Linear(2048, 1024)
        self.fc3 = nn.Linear(1024, num_classes)

    def forward(self, x):
        # Couches de convolution
        x = F.relu(self.conv1(x))
        x = F.relu(self.bn1(x))
        x = F.relu(self.conv2(x))
        x = F.relu(self.bn2(x))
        x = F.relu(self.conv3(x))
        x = F.relu(self.bn3(x))
        x = F.relu(self.conv4(x))
        x = F.relu(self.bn4(x))

        # Couche de mise à plat
        x = self.flatten(x)

        # Couches entièrement connectées (FC)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = torch.sigmoid(self.fc3(x))

        return x

# Créer une instance du modèle avec les dimensions appropriées
model = DOAEstimator(input_size=input_size, num_classes=num_classes)

# Afficher le modèle
print(model)


DOAEstimator(
  (conv1): Conv1d(32, 128, kernel_size=(3,), stride=(2,))
  (conv2): Conv1d(128, 128, kernel_size=(2,), stride=(1,))
  (conv3): Conv1d(128, 128, kernel_size=(2,), stride=(1,))
  (conv4): Conv1d(128, 128, kernel_size=(2,), stride=(1,))
  (bn1): BatchNorm1d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (bn2): BatchNorm1d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (bn3): BatchNorm1d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (bn4): BatchNorm1d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (fc1): Linear(in_features=2048, out_features=2048, bias=True)
  (fc2): Linear(in_features=2048, out_features=1024, bias=True)
  (fc3): Linear(in_features=1024, out_features=1799, bias=True)
)


In [74]:
import torch
from torch.utils.data import Dataset, DataLoader

class DOADataset(Dataset):
    def __init__(self, covariance_matrices, doa_angles):
        self.covariance_matrices = covariance_matrices
        self.doa_angles = doa_angles

    def __len__(self):
        return len(self.covariance_matrices)

    def __getitem__(self, idx):
        cov_matrix = self.covariance_matrices[idx]
        angle = self.doa_angles[idx]

        # Conversion en tenseurs PyTorch
        cov_matrix = torch.from_numpy(np.real(cov_matrix)).float()
        angle = torch.tensor(angle).float()

        return cov_matrix, angle

# Création des instances de DOADataset
train_dataset = DOADataset(R_hat_train, angles_train)
test_dataset = DOADataset(R_hat_test, angles_test)

# Création des DataLoaders pour l'entraînement et le test
train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=32, shuffle=False)


In [75]:
device = (
    "cuda"
    if torch.cuda.is_available()
    else "cpu"
)

print(f"Using {device} device")

Using cpu device


In [76]:
import torch.optim as optim

# Configuration de l'optimiseur
optimizer = optim.Adam(model.parameters(), lr=0.001)
loss_fn = nn.MSELoss()

# Fonctions de boucle d'entraînement et d'évaluation
def train_loop(dataloader, model, loss_fn, optimizer):
    total_loss = 0
    for batch, (X, y) in enumerate(dataloader):
        # Aplatir les données
        X_flattened = X.view(X.size(0), -1)

        # Compute prediction and loss
        pred = model(X_flattened)
        loss = loss_fn(pred, y)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

        if batch % 100 == 0:
            loss, current = loss.item(), batch * len(X)
            print(f"loss: {loss:>7f}  [{current:>5d}/{len(dataloader.dataset):>5d}]")

    return total_loss / len(dataloader)

def eval_loop(dataloader, model, loss_fn):
    model.eval()
    total_loss = 0
    with torch.no_grad():
        for X, y in dataloader:
            X_flattened = X.view(X.size(0), -1)
            pred = model(X_flattened)
            total_loss += loss_fn(pred, y).item()

    avg_loss = total_loss / len(dataloader)
    print(f"Test Avg loss: {avg_loss:>8f} \n")
    return avg_loss

# Nombre d'époques
epochs = 10

# Boucle d'entraînement
for epoch in range(epochs):
    print(f"Epoch {epoch+1}\n-------------------------------")
    train_loss = train_loop(train_dataloader, model, loss_fn, optimizer)
    print(f"Training Loss: {train_loss}")
    test_loss = eval_loop(test_dataloader, model, loss_fn)
    print(f"Testing Loss: {test_loss}")
print("Done!")


Epoch 1
-------------------------------


RuntimeError: running_mean should contain 49 elements not 128

In [80]:
import torch
import numpy as np

# Assume que input_data est un tableau complexe (complex128)
input_data_real = np.concatenate([np.real(input_data), np.imag(input_data)], axis=-1)

# Afficher des informations détaillées sur les données
print("---- Informations sur les données d'entrée ----")
print("Type de données d'entrée:", input_data_real.dtype)
print("Shape des données d'entrée:", input_data_real.shape)
print("Statistiques descriptives des données d'entrée:")
print("   - Moyenne:", input_data_real.mean())
print("   - Écart-type:", input_data_real.std())
print("\n---- Informations sur les étiquettes ----")
print("Type des étiquettes:", labels.dtype)
print("Shape des étiquettes:", labels.shape)
print("Statistiques descriptives des étiquettes:")
print("   - Moyenne:", np.mean(labels))
print("   - Écart-type:", np.std(labels))

# Créer une instance du modèle
model = DOAEstimator(input_channels=20, num_classes=100)  # Mettez à jour input_channels en fonction de la nouvelle dimension des données

# Passer les données à travers le modèle
with torch.no_grad():
    model.eval()
    output = model(torch.from_numpy(input_data_real).float())

# Afficher les informations sur les dimensions des couches
print("\nDimensions de la sortie du modèle:", output.shape)

# Afficher la structure du modèle
print("\nStructure du modèle:")
print(model)


---- Informations sur les données d'entrée normalisées ----
Type de données d'entrée normalisée: complex128
Shape des données d'entrée normalisée: (1000, 10, 10)
Statistiques descriptives des données d'entrée normalisée:
   - Moyenne: (-3.430145056881884e-17+6.805667140952211e-19j)
   - Écart-type: 0.9999999999999999


In [82]:
# Normalisation des données d'entrée
mean_input = input_data.mean()
std_input = input_data.std()
input_data_normalized = (input_data - mean_input) / std_input

# Afficher des informations sur les données normalisées
print("---- Informations sur les données d'entrée normalisées ----")
print("Type de données d'entrée normalisée:", input_data_normalized.dtype)
print("Shape des données d'entrée normalisée:", input_data_normalized.shape)
print("Statistiques descriptives des données d'entrée normalisée:")
print("   - Moyenne:", input_data_normalized.mean())
print("   - Écart-type:", input_data_normalized.std())


---- Informations sur les données d'entrée normalisées ----
Type de données d'entrée normalisée: complex128
Shape des données d'entrée normalisée: (1000, 10, 10)
Statistiques descriptives des données d'entrée normalisée:
   - Moyenne: (-3.430145056881884e-17+6.805667140952211e-19j)
   - Écart-type: 0.9999999999999999


In [70]:
import numpy as np

# Remplacez ces valeurs factices par vos propres étiquettes
labels = all_estimated_angles  # Remplacez ceci par vos étiquettes réelles

# Obtenez le nombre de classes distinctes
num_classes = len(np.unique(labels))
print("Nombre de classes :", num_classes)


Nombre de classes : 1799


In [72]:
# Remplacez ceci par vos propres données d'entrée
input_data = all_R_hat  # Remplacez ceci par vos données d'entrée réelles

# Obtenez la forme des données d'entrée
input_size = input_data.shape
print("Input Size :", input_size)


Input Size : (1000, 10, 10)


In [83]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.utils.data as data

class DOAEstimationCNN(nn.Module):
    def __init__(self, input_channels=3, num_classes=1, num_filters=256, kernel_sizes=[5, 5, 3, 3]):
        super(DOAEstimationCNN, self).__init__()

        # Couches de convolution
        self.conv1 = nn.Conv2d(input_channels, num_filters, kernel_size=kernel_sizes[0], stride=1, padding=1)
        self.conv2 = nn.Conv2d(num_filters, num_filters, kernel_size=kernel_sizes[1], stride=1, padding=1)
        self.conv3 = nn.Conv2d(num_filters, num_filters, kernel_size=kernel_sizes[2], stride=1, padding=1)
        self.conv4 = nn.Conv2d(num_filters, num_filters, kernel_size=kernel_sizes[3], stride=1, padding=1)

        # Couche de mise à plat
        self.flatten = nn.Flatten()

        # Couches entièrement connectées (FC)
        self.fc1 = nn.Linear(num_filters * 10 * 10, 1024)
        self.fc2 = nn.Linear(1024, num_classes)

    def forward(self, x):
        # Couches de convolution
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.relu(self.conv3(x))
        x = F.relu(self.conv4(x))

        # Couche de mise à plat
        x = self.flatten(x)

        # Couches entièrement connectées (FC)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)

        return x

# Normalisation des données d'entrée
def normalize_input_data(data):
    # Normalisation des parties réelle et imaginaire séparément
    real_part = data.real
    imag_part = data.imag

    real_mean = real_part.mean()
    real_std = real_part.std()
    imag_mean = imag_part.mean()
    imag_std = imag_part.std()

    normalized_data = (data - (real_mean + 1j * imag_mean)) / (real_std + 1j * imag_std)
    
    return normalized_data

# Exemple d'utilisation de la normalisation
input_data_normalized = normalize_input_data(input_data)

# Création d'une instance du modèle avec les dimensions appropriées
model = DOAEstimationCNN(input_channels=3, num_classes=1, num_filters=256, kernel_sizes=[5, 5, 3, 3])

# Affichage du modèle
print(model)


import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader

# Fonction de perte (à adapter en fonction de votre tâche)
criterion = nn.CrossEntropyLoss()

# Optimiseur
optimizer = optim.SGD(model_cnn.parameters(), lr=0.01, momentum=0.9)

# Exemple de DataLoader (à remplacer par vos propres DataLoader)
train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Fonction d'entraînement
def train(model, train_dataloader, criterion, optimizer):
    model.train()
    running_loss = 0.0

    for inputs, labels in train_dataloader:
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    return running_loss / len(train_dataloader)

# Fonction d'évaluation
def evaluate(model, val_dataloader, criterion):
    model.eval()
    running_loss = 0.0
    correct_predictions = 0
    total_samples = 0

    with torch.no_grad():
        for inputs, labels in val_dataloader:
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            running_loss += loss.item()

            _, predicted = torch.max(outputs, 1)
            correct_predictions += (predicted == labels).sum().item()
            total_samples += labels.size(0)

    accuracy = correct_predictions / total_samples
    return running_loss / len(val_dataloader), accuracy

# Boucle d'entraînement
num_epochs = 10

for epoch in range(num_epochs):
    train_loss = train(model, train_dataloader, criterion, optimizer)
    val_loss, val_accuracy = evaluate(model, val_dataloader, criterion)

    print(f'Epoch {epoch + 1}/{num_epochs} => '
          f'Training Loss: {train_loss:.4f}, '
          f'Validation Loss: {val_loss:.4f}, '
          f'Validation Accuracy: {val_accuracy:.4f}')

# À la fin de l'entraînement, vous pouvez sauvegarder le modèle si nécessaire
torch.save(model_cnn.state_dict(), 'doa_estimator_cnn.pth')



DOAEstimationCNN(
  (conv1): Conv2d(3, 256, kernel_size=(5, 5), stride=(1, 1), padding=(1, 1))
  (conv2): Conv2d(256, 256, kernel_size=(5, 5), stride=(1, 1), padding=(1, 1))
  (conv3): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv4): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (fc1): Linear(in_features=25600, out_features=1024, bias=True)
  (fc2): Linear(in_features=1024, out_features=1, bias=True)
)


RuntimeError: Given groups=1, weight of size [256, 3, 5, 5], expected input[1, 32, 10, 10] to have 3 channels, but got 32 channels instead