# Attack Detection

## 1. AutoEncoder Method

In [15]:
import tensorflow as tf
import App.dataset as dataset
from App.config import BATCH_SIZE

import time
from memory_profiler import profile
import psutil

import numpy as np

### Desactivation de la GPU pour l'inférence sur la CPU uniquement

In [5]:
# Desactivation de la GPU
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        # Désactivez toutes les GPU
        tf.config.experimental.set_visible_devices([], 'GPU')
        print("GPU désactivée.")
    except RuntimeError as e:
        # La visibilité des GPU ne peut être modifiée qu'avant l'initialisation de TensorFlow
        print(e)
else:
    print("Aucune GPU disponible, utilisation du CPU.")

GPU désactivée.


In [7]:
# Load le modèle
model = tf.keras.models.load_model('bev_autoencoder.h5')



### Charge les données de test

In [12]:
test_dataset = dataset.create_bev_dataset_from_folder(
    folder_path="Data/dataset/sequences/00/velodyne_with_obstacles",
    batch_size=BATCH_SIZE
)

In [None]:
# Fonction qui detecte si oui ou non il y a un obstacle dans l'image en utilisant le modèle AutoEncoder

def detect_obstacle(model, image, threshold=0.05):
    """
    Détecte la présence d'un obstacle dans une image en comparant l'image originale avec l'image reconstruite.
    
    :param model: Modèle AutoEncoder chargé.
    :param image: Image à analyser (hauteur, largeur, canaux).
    :param threshold: Seuil pour la détection d'obstacle.
    :return: Booléen indiquant la présence d'un obstacle.
    """
    # Redimensionner l'image pour correspondre à l'entrée du modèle
    image = np.expand_dims(image, axis=0)  # Ajouter une dimension batch
    start_time = time.time()
    reconstructed_image = model.predict(image)
    end_time = time.time()
    print (f"Temps d'inférence : {end_time - start_time:.4f} secondes")

    # Calculer la différence entre l'image originale et l'image reconstruite
    difference = np.abs(image - reconstructed_image)
    print (f"Différence moyenne : {np.mean(difference)}")

    # Vérifier si la différence dépasse le seuil
    if np.mean(difference) > threshold:
        return True  # Obstacle détecté
    else:
        return False  # Pas d'obstacle détecté

# Fonction qui calcule le temps d'inférence d'un batch d'image
def infer_obstacle_detection(model, dataset):
    """
    Effectue une inférence de détection d'obstacle sur un dataset.
    
    :param model: Modèle AutoEncoder chargé.
    :param dataset: Dataset TensorFlow pour l'inférence.
    :return: Liste des résultats de détection d'obstacle.
    """
    results = []

    cpus = []

    for images, _ in dataset.take(1):
        for image in images:
            cpu_before = psutil.cpu_percent(interval=None)
            obstacle_detected = detect_obstacle(model, image.numpy())
            cpu_after = psutil.cpu_percent(interval=None)
            results.append(obstacle_detected)
            cpus.append(cpu_after - cpu_before)
    print ("CPU USAGE Mean : ", np.mean(cpus))

    return results  

# Exemple d'utilisation de la fonction de détection d'obstacle

def example_obstacle_detection(model, dataset, num_images=5):
    """
    Exemple d'utilisation de la fonction de détection d'obstacle sur un dataset.
    
    :param model: Modèle AutoEncoder chargé.
    :param dataset: Dataset TensorFlow pour l'inférence.
    :param num_images: Nombre d'images à analyser.
    """
    for images, _ in dataset.take(1):
        for i in range(num_images):
            image = images[i].numpy()
            obstacle_detected = detect_obstacle(model, image)
            print(f"Image {i + 1}: Obstacle détecté ? {'Oui' if obstacle_detected else 'Non'}")

In [None]:
infer_obstacle_detection(
    model=model,
    dataset=test_dataset
)

## 2. Ruled Based Method

In [None]:
# Fonction pour injecter des obstacles dans un nuage de points LiDAR
def inject_obstacles(lidar_points, num_obstacles=1, obstacle_size=1.0, density=1000, bounds=None):
    """
    Injecte un nombre donné d'obstacles dans un nuage de points LiDAR.
    
    :param lidar_points: Nuage de points original (numpy array de forme (N, 4) [x, y, z, intensité]).
    :param num_obstacles: Nombre d'obstacles à injecter.
    :param obstacle_size: Taille de chaque obstacle (rayon en mètres).
    :param density: Nombre de points par obstacle.
    :param bounds: Limites pour l'injection des obstacles (x_min, x_max, y_min, y_max, z_min, z_max).
                   Si None, les limites sont calculées à partir des points existants.
    :return: Nuage de points modifié avec les obstacles injectés.
    """
    # Calculer les limites si elles ne sont pas fournies
    if bounds is None:
        x_min, x_max = np.min(lidar_points[:, 0]), np.max(lidar_points[:, 0])
        y_min, y_max = np.min(lidar_points[:, 1]), np.max(lidar_points[:, 1])
        z_min, z_max = np.min(lidar_points[:, 2]), np.max(lidar_points[:, 2])
        
    else:
        x_min, x_max, y_min, y_max, z_min, z_max = bounds

    # Liste pour stocker les nouveaux points
    new_points = []

    for _ in range(num_obstacles):
        # Générer un centre aléatoire pour l'obstacle
        center_x = np.random.uniform(x_min, x_max)
        center_y = np.random.uniform(y_min, y_max)
        center_z = np.random.uniform(z_min, z_max)

        # Générer des points autour du centre
        obstacle_points = np.random.uniform(
            low=[center_x - obstacle_size, center_y - obstacle_size, center_z - obstacle_size, 0.1],
            high=[center_x + obstacle_size, center_y + obstacle_size, center_z + obstacle_size, 1.0],
            size=(density, 4)
        )
        new_points.append(obstacle_points)

    # Ajouter les nouveaux points au nuage de points existant
    new_points = np.vstack(new_points)
    lidar_points_with_obstacles = np.vstack((lidar_points, new_points))

    return lidar_points_with_obstacles

In [20]:
import os

# Function that read all the bin file, add obstacles and save them back into bin
def process_lidar_files_with_obstacles(source_dir, target_dir, num_obstacles=1, obstacle_size=1.0, density=50):
    """
    Traite tous les fichiers LiDAR dans un dossier, injecte des obstacles et enregistre les résultats.
    
    :param source_dir: Dossier source contenant les fichiers .bin LiDAR.
    :param target_dir: Dossier cible pour enregistrer les fichiers modifiés.
    :param num_obstacles: Nombre d'obstacles à injecter dans chaque nuage de points.
    :param obstacle_size: Taille de chaque obstacle (rayon en mètres).
    :param density: Nombre de points par obstacle.
    """
    if not os.path.exists(target_dir):
        os.makedirs(target_dir)

    for filename in os.listdir(source_dir):
        if filename.endswith('.bin'):
            file_path = os.path.join(source_dir, filename)
            lidar_points = dataset.read_kitti_lidar(file_path)
            lidar_points_with_obstacles = inject_obstacles(
                lidar_points,
                num_obstacles=num_obstacles,
                obstacle_size=obstacle_size,
                density=density
            )
            target_file_path = os.path.join(target_dir, filename)
            lidar_points_with_obstacles.astype(np.float32).tofile(target_file_path)

# Exemple d'utilisation de la fonction pour traiter les fichiers LiDAR
source_dir = "Data/dataset/sequences/02/velodyne/"
target_dir = "Data/dataset/sequences/02/velodyne_with_obstacles/"
process_lidar_files_with_obstacles(
    source_dir=source_dir,
    target_dir=target_dir,
    num_obstacles=10,
    obstacle_size=2.0,
    density=100
)


In [21]:
# Read the test dataset with obstacles
test_dataset_with_obstacles = dataset.create_bev_dataset_from_folder(
    folder_path="Data/dataset/sequences/02/velodyne_with_obstacles",
    batch_size=BATCH_SIZE
)

In [22]:
from scipy.ndimage import label
import time
from memory_profiler import memory_usage
import psutil


# Fonction qui utilise des règles pour la detection d'anomalies dans un lot d'images BEV
def rule_based_anomaly_detection(bev_batch, intensity_threshold=0.1, density_threshold=0.5, min_cluster_size=10000):
    """
    Détecte les anomalies dans un lot d'images BEV en utilisant des règles basées sur l'intensité et la densité.
    
    :param bev_batch: Lot d'images BEV (numpy array de forme [batch_size, channels, height, width]).
    :param intensity_threshold: Seuil minimal pour considérer un point comme valide (canal intensité).
    :param density_threshold: Seuil minimal pour considérer un point comme valide (canal densité).
    :param min_cluster_size: Taille minimale d'un cluster pour qu'il soit considéré comme valide.
    :return: Liste des indices des images contenant des anomalies.
    """
    anomaly_indices = []


    start_time = time.time()
    for i, bev in enumerate(bev_batch):
        valid_points = (bev[0] > intensity_threshold) & (bev[2] > density_threshold)
        labeled_clusters, num_clusters = label(valid_points)
        # Vérifier la taille des clusters
        for cluster_id in range(1, num_clusters + 1):
            cluster_size = np.sum(labeled_clusters == cluster_id)
            if cluster_size < min_cluster_size:
                anomaly_indices.append(i)
                break  # Anomalie détectée dans cette image

    end_time = time.time()

    return anomaly_indices, end_time - start_time

In [None]:
counter = 0
times = []
cpu_usasges = []

for images, _ in test_dataset_with_obstacles.take(1):
    # Convertir les images en numpy array
    bev_batch = images.numpy()
    # Détecter les anomalies dans le lot d'images en utilisant des règles

    cpu_before = psutil.cpu_percent(interval=None)
    anomalies_detected = rule_based_anomaly_detection(
        bev_batch=bev_batch,
        intensity_threshold=0.5,
        density_threshold=0.5,
        min_cluster_size=10000
    )
    cpu_after = psutil.cpu_percent(interval=None)

    memory_usage_in_mb = memory_usage((rule_based_anomaly_detection, (bev_batch, 0.5, 0.5, 10000)), max_usage=True)
    print (f"Utilisation maximale de la mémoire : {memory_usage_in_mb} MB")

    times.append(anomalies_detected[1])
    cpu_usasges.append(cpu_after - cpu_before)

    for i in anomalies_detected[0]:
        counter += 1
        print(f"Anomalie détectée dans l'image {i + 1} du lot.")

print(f"Nombre total d'anomalies détectées dans le lot : {counter}/{len(bev_batch)}")
print(f"Temps moyen de détection d'anomalies par image (CPU): {np.mean(cpu_usasges):.4f} %")
print (f"Temps moyen de détection d'anomalies par image : {np.mean(times):.4f} secondes")