# D√©monstration : Attaques DoS/DDoS et D√©fenses

Ce notebook simule des attaques par d√©ni de service (DoS/DDoS) et pr√©sente les m√©canismes de d√©fense.

‚ö†Ô∏è **AVERTISSEMENT** : Ce contenu est √† but √©ducatif uniquement. Les attaques DoS/DDoS sont **ill√©gales** sans autorisation explicite.

## üìö Concepts couverts

1. **SYN Flood Attack** (simulation)
2. **Amplification Attacks** (DNS, NTP)
3. **D√©fenses** : SYN Cookies, Rate Limiting, Filtering
4. **D√©tection** d'anomalies de trafic

---

In [None]:
import time
import random
from collections import deque, defaultdict
from typing import List, Tuple
import matplotlib.pyplot as plt
import numpy as np

## 1. Simulation SYN Flood Attack

### Principe

Une attaque **SYN Flood** exploite le handshake TCP en trois √©tapes :

```
1. Client ‚Üí Server : SYN
2. Server ‚Üí Client : SYN-ACK (serveur alloue ressources)
3. Client ‚Üí Server : ACK (connexion √©tablie)
```

**Attaque** : L'attaquant envoie des milliers de paquets SYN avec des adresses IP sources forg√©es, mais ne r√©pond jamais avec l'ACK final. Le serveur garde des connexions semi-ouvertes jusqu'√† timeout, √©puisant ses ressources.

### Simulation

In [None]:
class TCPServer:
    """Serveur TCP simul√© vuln√©rable au SYN Flood"""
    
    def __init__(self, max_connections=100, timeout=5.0):
        self.max_connections = max_connections
        self.timeout = timeout
        # Connexions semi-ouvertes : {(src_ip, src_port): timestamp}
        self.half_open_connections = {}
        # Connexions √©tablies
        self.established_connections = set()
        # Statistiques
        self.syn_received = 0
        self.connections_established = 0
        self.connections_refused = 0
    
    def receive_syn(self, src_ip: str, src_port: int) -> bool:
        """Re√ßoit un paquet SYN"""
        self.syn_received += 1
        
        # Nettoie les connexions expir√©es
        self._cleanup_expired()
        
        # V√©rifie si on peut accepter la connexion
        if len(self.half_open_connections) >= self.max_connections:
            self.connections_refused += 1
            return False  # Table de connexions pleine
        
        # Ajoute la connexion semi-ouverte
        conn_id = (src_ip, src_port)
        self.half_open_connections[conn_id] = time.time()
        
        # Envoie SYN-ACK (simul√©)
        return True
    
    def receive_ack(self, src_ip: str, src_port: int) -> bool:
        """Re√ßoit le ACK final pour √©tablir la connexion"""
        conn_id = (src_ip, src_port)
        
        if conn_id in self.half_open_connections:
            # Connexion √©tablie
            del self.half_open_connections[conn_id]
            self.established_connections.add(conn_id)
            self.connections_established += 1
            return True
        
        return False
    
    def _cleanup_expired(self):
        """Supprime les connexions expir√©es"""
        current_time = time.time()
        expired = [
            conn_id for conn_id, timestamp in self.half_open_connections.items()
            if current_time - timestamp > self.timeout
        ]
        for conn_id in expired:
            del self.half_open_connections[conn_id]
    
    def get_stats(self) -> dict:
        """Retourne les statistiques du serveur"""
        return {
            'syn_received': self.syn_received,
            'half_open': len(self.half_open_connections),
            'established': len(self.established_connections),
            'connections_established': self.connections_established,
            'connections_refused': self.connections_refused
        }

In [None]:
def simulate_syn_flood(server: TCPServer, num_packets: int, legitimate_ratio: float = 0.1):
    """Simule une attaque SYN Flood"""
    print(f"üéØ Simulation SYN Flood : {num_packets} paquets")
    print(f"   Ratio l√©gitime : {legitimate_ratio*100:.0f}%\n")
    
    for i in range(num_packets):
        # G√©n√®re une adresse IP al√©atoire (attaque avec spoofing)
        src_ip = f"{random.randint(1,255)}.{random.randint(1,255)}.{random.randint(1,255)}.{random.randint(1,255)}"
        src_port = random.randint(10000, 65535)
        
        # Envoie le SYN
        accepted = server.receive_syn(src_ip, src_port)
        
        # Les connexions l√©gitimes envoient l'ACK final
        if accepted and random.random() < legitimate_ratio:
            server.receive_ack(src_ip, src_port)
        
        # Simule un d√©lai entre les paquets
        if i % 100 == 0:
            time.sleep(0.01)  # 10ms
    
    stats = server.get_stats()
    print("\nüìä R√©sultats :")
    print(f"   SYN re√ßus : {stats['syn_received']}")
    print(f"   Connexions semi-ouvertes : {stats['half_open']}")
    print(f"   Connexions √©tablies : {stats['connections_established']}")
    print(f"   Connexions refus√©es : {stats['connections_refused']}")
    
    if stats['connections_refused'] > 0:
        print("\n‚ùå ATTAQUE R√âUSSIE : Le serveur a refus√© des connexions (DoS)")
    else:
        print("\n‚úÖ Le serveur a r√©sist√© √† l'attaque")

In [None]:
# Test 1 : Serveur vuln√©rable avec faible capacit√©
print("=" * 60)
print("TEST 1: Serveur vuln√©rable (max 100 connexions)")
print("=" * 60)

vulnerable_server = TCPServer(max_connections=100, timeout=5.0)
simulate_syn_flood(vulnerable_server, num_packets=500, legitimate_ratio=0.05)

# Test 2 : Serveur avec capacit√© augment√©e
print("\n" + "=" * 60)
print("TEST 2: Serveur avec plus de capacit√© (max 1000 connexions)")
print("=" * 60)

robust_server = TCPServer(max_connections=1000, timeout=5.0)
simulate_syn_flood(robust_server, num_packets=500, legitimate_ratio=0.05)

## 2. D√©fense : SYN Cookies

### Principe

Les **SYN Cookies** √©vitent de stocker l'√©tat des connexions semi-ouvertes. Le serveur encode l'information dans le num√©ro de s√©quence du SYN-ACK :

```
seq_number = Hash(src_ip, src_port, dst_ip, dst_port, timestamp, secret)
```

Lorsque le client renvoie l'ACK, le serveur peut v√©rifier et reconstruire l'√©tat sans avoir stock√© la connexion.

In [None]:
import hashlib

class TCPServerWithSynCookies:
    """Serveur TCP avec protection SYN Cookies"""
    
    def __init__(self):
        self.secret = random.randbytes(32)
        self.established_connections = set()
        # Statistiques
        self.syn_received = 0
        self.connections_established = 0
        self.invalid_acks = 0
    
    def _generate_cookie(self, src_ip: str, src_port: int, timestamp: int) -> int:
        """G√©n√®re un SYN cookie"""
        data = f"{src_ip}:{src_port}:{timestamp}".encode()
        h = hashlib.sha256(self.secret + data).digest()
        # Prend les 4 premiers bytes comme cookie
        cookie = int.from_bytes(h[:4], 'big')
        return cookie
    
    def receive_syn(self, src_ip: str, src_port: int) -> Tuple[bool, int]:
        """Re√ßoit un SYN et g√©n√®re un cookie"""
        self.syn_received += 1
        
        # G√©n√®re un cookie bas√© sur le timestamp actuel
        timestamp = int(time.time() / 60)  # Window de 60 secondes
        cookie = self._generate_cookie(src_ip, src_port, timestamp)
        
        # Retourne le cookie (qui sera le seq_number du SYN-ACK)
        return True, cookie
    
    def receive_ack(self, src_ip: str, src_port: int, ack_number: int) -> bool:
        """V√©rifie l'ACK avec le cookie"""
        # V√©rifie le cookie pour la fen√™tre actuelle et pr√©c√©dente
        current_time = int(time.time() / 60)
        
        for timestamp in [current_time, current_time - 1]:
            expected_cookie = self._generate_cookie(src_ip, src_port, timestamp)
            
            if ack_number == expected_cookie + 1:  # ACK = SEQ + 1
                conn_id = (src_ip, src_port)
                self.established_connections.add(conn_id)
                self.connections_established += 1
                return True
        
        self.invalid_acks += 1
        return False
    
    def get_stats(self) -> dict:
        return {
            'syn_received': self.syn_received,
            'established': len(self.established_connections),
            'connections_established': self.connections_established,
            'invalid_acks': self.invalid_acks
        }

In [None]:
def simulate_syn_flood_with_cookies(server: TCPServerWithSynCookies, num_packets: int, legitimate_ratio: float = 0.1):
    """Simule une attaque SYN Flood contre un serveur avec SYN Cookies"""
    print(f"üéØ Simulation SYN Flood avec SYN Cookies : {num_packets} paquets")
    print(f"   Ratio l√©gitime : {legitimate_ratio*100:.0f}%\n")
    
    for i in range(num_packets):
        src_ip = f"{random.randint(1,255)}.{random.randint(1,255)}.{random.randint(1,255)}.{random.randint(1,255)}"
        src_port = random.randint(10000, 65535)
        
        # Envoie le SYN et re√ßoit le cookie
        accepted, cookie = server.receive_syn(src_ip, src_port)
        
        # Les connexions l√©gitimes envoient l'ACK avec le bon cookie
        if accepted and random.random() < legitimate_ratio:
            ack_number = cookie + 1
            server.receive_ack(src_ip, src_port, ack_number)
    
    stats = server.get_stats()
    print("\nüìä R√©sultats :")
    print(f"   SYN re√ßus : {stats['syn_received']}")
    print(f"   Connexions √©tablies : {stats['connections_established']}")
    print(f"   ACK invalides : {stats['invalid_acks']}")
    print("\n‚úÖ Serveur prot√©g√© : Aucune ressource √©puis√©e (pas de stockage d'√©tat)")

# Test avec SYN Cookies
print("=" * 60)
print("TEST 3: Serveur avec SYN Cookies")
print("=" * 60)

protected_server = TCPServerWithSynCookies()
simulate_syn_flood_with_cookies(protected_server, num_packets=1000, legitimate_ratio=0.05)

## 3. Amplification Attack Simulation

### Principe

Une **attaque par amplification** exploite des serveurs tiers (DNS, NTP, etc.) pour amplifier le trafic vers la victime :

```
1. Attaquant envoie une petite requ√™te (64 bytes) avec IP source forg√©e (victime)
2. Serveur DNS r√©pond avec une grosse r√©ponse (3000+ bytes) √† la victime
3. Facteur d'amplification : 3000/64 ‚âà 47x
```

### Calcul du facteur d'amplification

In [None]:
class AmplificationAttack:
    """Calcule l'amplification de diff√©rents protocoles"""
    
    # Tailles typiques (en bytes)
    PROTOCOLS = {
        'DNS': {'request': 60, 'response': 3000, 'description': 'ANY query'},
        'NTP': {'request': 48, 'response': 468, 'description': 'monlist command'},
        'SSDP': {'request': 120, 'response': 1500, 'description': 'SSDP discovery'},
        'Memcached': {'request': 15, 'response': 750000, 'description': 'stats command'},
        'SNMP': {'request': 150, 'response': 1500, 'description': 'GetBulk request'},
    }
    
    @staticmethod
    def calculate_amplification(protocol: str) -> dict:
        """Calcule le facteur d'amplification"""
        data = AmplificationAttack.PROTOCOLS[protocol]
        factor = data['response'] / data['request']
        
        return {
            'protocol': protocol,
            'request_size': data['request'],
            'response_size': data['response'],
            'factor': factor,
            'description': data['description']
        }
    
    @staticmethod
    def simulate_attack(protocol: str, num_reflectors: int, attacker_bandwidth_mbps: float):
        """Simule une attaque par amplification"""
        amp = AmplificationAttack.calculate_amplification(protocol)
        
        # Bande passante de l'attaquant en bytes/s
        attacker_bw_bytes = (attacker_bandwidth_mbps * 1_000_000) / 8
        
        # Nombre de requ√™tes par seconde
        requests_per_sec = attacker_bw_bytes / amp['request_size']
        
        # Trafic total vers la victime (amplifi√©)
        victim_traffic_bytes = requests_per_sec * amp['response_size']
        victim_traffic_mbps = (victim_traffic_bytes * 8) / 1_000_000
        
        # Avec plusieurs reflectors
        victim_traffic_total_mbps = victim_traffic_mbps * num_reflectors
        
        return {
            'attacker_bw_mbps': attacker_bandwidth_mbps,
            'num_reflectors': num_reflectors,
            'requests_per_sec': requests_per_sec,
            'victim_traffic_mbps': victim_traffic_total_mbps,
            'amplification_factor': amp['factor']
        }

# Analyse des diff√©rents protocoles
print("=" * 70)
print("FACTEURS D'AMPLIFICATION PAR PROTOCOLE")
print("=" * 70)

for protocol in AmplificationAttack.PROTOCOLS.keys():
    amp = AmplificationAttack.calculate_amplification(protocol)
    print(f"\n{protocol:12s} | {amp['description']}")
    print(f"  Requ√™te  : {amp['request_size']:8d} bytes")
    print(f"  R√©ponse  : {amp['response_size']:8d} bytes")
    print(f"  Facteur  : {amp['factor']:8.1f}x")

# Simulation d'attaque
print("\n" + "=" * 70)
print("SIMULATION: Attaque DNS Amplification")
print("=" * 70)

result = AmplificationAttack.simulate_attack(
    protocol='DNS',
    num_reflectors=1000,
    attacker_bandwidth_mbps=10
)

print(f"\nSc√©nario :")
print(f"  Bande passante attaquant : {result['attacker_bw_mbps']:.0f} Mbps")
print(f"  Nombre de r√©flecteurs    : {result['num_reflectors']}")
print(f"  Facteur d'amplification  : {result['amplification_factor']:.1f}x")
print(f"\nüéØ R√©sultat :")
print(f"  Requ√™tes par seconde     : {result['requests_per_sec']:,.0f}")
print(f"  Trafic vers la victime   : {result['victim_traffic_mbps']:,.0f} Mbps ({result['victim_traffic_mbps']/1000:.1f} Gbps)")
print(f"\n‚ùå Impact : L'attaquant avec seulement 10 Mbps g√©n√®re {result['victim_traffic_mbps']/result['attacker_bw_mbps']:.0f}x plus de trafic!")

## 4. Rate Limiting (D√©fense)

Le **rate limiting** limite le nombre de requ√™tes par IP/utilisateur dans une fen√™tre de temps.

In [None]:
class RateLimiter:
    """Implemente un rate limiter avec sliding window"""
    
    def __init__(self, max_requests: int, window_seconds: float):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        # {client_id: deque[timestamps]}
        self.requests = defaultdict(deque)
    
    def allow_request(self, client_id: str) -> bool:
        """V√©rifie si la requ√™te est autoris√©e"""
        current_time = time.time()
        client_requests = self.requests[client_id]
        
        # Supprime les requ√™tes hors de la fen√™tre
        while client_requests and current_time - client_requests[0] > self.window_seconds:
            client_requests.popleft()
        
        # V√©rifie si on peut accepter
        if len(client_requests) < self.max_requests:
            client_requests.append(current_time)
            return True
        
        return False

# D√©monstration
print("=" * 60)
print("D√âMONSTRATION: Rate Limiting")
print("=" * 60)
print("Configuration: Max 10 requ√™tes par 5 secondes\n")

limiter = RateLimiter(max_requests=10, window_seconds=5.0)

# Client l√©gitime
print("--- Client l√©gitime (taux normal) ---")
for i in range(15):
    allowed = limiter.allow_request("client_1")
    status = "‚úÖ Autoris√©e" if allowed else "‚ùå Bloqu√©e"
    print(f"Requ√™te {i+1:2d}: {status}")
    time.sleep(0.3)  # 300ms entre requ√™tes

# Attaquant (burst)
print("\n--- Attaquant (burst rapide) ---")
allowed_count = 0
blocked_count = 0

for i in range(100):
    allowed = limiter.allow_request("attacker")
    if allowed:
        allowed_count += 1
    else:
        blocked_count += 1

print(f"Autoris√©es : {allowed_count}")
print(f"Bloqu√©es   : {blocked_count}")
print(f"\n‚úÖ Rate limiting effectif : {blocked_count/100*100:.0f}% des requ√™tes bloqu√©es")

## 5. D√©tection d'anomalies de trafic

D√©tection d'une attaque DoS par analyse statistique du trafic.

In [None]:
def generate_traffic(duration_seconds: int, attack_start: int, attack_duration: int):
    """G√©n√®re un trafic simul√© avec une p√©riode d'attaque"""
    traffic = []
    
    for second in range(duration_seconds):
        if attack_start <= second < attack_start + attack_duration:
            # Pendant l'attaque : trafic tr√®s √©lev√©
            packets = random.randint(8000, 12000)
        else:
            # Trafic normal : avec variation
            baseline = 1000
            variation = random.randint(-200, 200)
            packets = baseline + variation
        
        traffic.append(packets)
    
    return traffic

def detect_anomalies(traffic: List[int], threshold_std: float = 3.0) -> List[int]:
    """D√©tecte les anomalies avec la m√©thode du Z-score"""
    mean = np.mean(traffic)
    std = np.std(traffic)
    
    anomalies = []
    for i, value in enumerate(traffic):
        z_score = (value - mean) / std
        if abs(z_score) > threshold_std:
            anomalies.append(i)
    
    return anomalies, mean, std

# G√©n√®re et analyse le trafic
print("=" * 60)
print("D√âTECTION D'ANOMALIES DE TRAFIC")
print("=" * 60)

duration = 120  # 2 minutes
attack_start = 60
attack_duration = 20

traffic = generate_traffic(duration, attack_start, attack_duration)
anomalies, mean, std = detect_anomalies(traffic, threshold_std=3.0)

print(f"\nStatistiques du trafic :")
print(f"  Moyenne : {mean:.0f} paquets/s")
print(f"  √âcart-type : {std:.0f} paquets/s")
print(f"  Seuil d'alerte : {mean + 3*std:.0f} paquets/s")
print(f"\nAnomalies d√©tect√©es : {len(anomalies)} secondes")

if anomalies:
    print(f"P√©riode d'attaque d√©tect√©e : secondes {min(anomalies)} √† {max(anomalies)}")
    print(f"P√©riode d'attaque r√©elle   : secondes {attack_start} √† {attack_start + attack_duration}")
    
    # Calcule la pr√©cision de la d√©tection
    true_attack_seconds = set(range(attack_start, attack_start + attack_duration))
    detected_seconds = set(anomalies)
    true_positives = len(true_attack_seconds & detected_seconds)
    precision = true_positives / len(detected_seconds) if detected_seconds else 0
    recall = true_positives / len(true_attack_seconds) if true_attack_seconds else 0
    
    print(f"\nüìä Pr√©cision de d√©tection :")
    print(f"   Precision : {precision*100:.1f}%")
    print(f"   Recall    : {recall*100:.1f}%")

# Visualisation
plt.figure(figsize=(12, 6))
plt.plot(traffic, label='Trafic', linewidth=1)
plt.axhline(y=mean, color='g', linestyle='--', label='Moyenne', alpha=0.7)
plt.axhline(y=mean + 3*std, color='r', linestyle='--', label='Seuil (3œÉ)', alpha=0.7)
plt.axvline(x=attack_start, color='orange', linestyle=':', label='D√©but attaque', alpha=0.7)
plt.axvline(x=attack_start + attack_duration, color='orange', linestyle=':', label='Fin attaque', alpha=0.7)

# Marque les anomalies
if anomalies:
    plt.scatter(anomalies, [traffic[i] for i in anomalies], 
                color='red', s=50, label='Anomalies d√©tect√©es', zorder=5)

plt.xlabel('Temps (secondes)')
plt.ylabel('Paquets par seconde')
plt.title('D√©tection d\'attaque DoS par analyse de trafic')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

print("\n‚úÖ Anomalie d√©tect√©e avec succ√®s!")

## üìä R√©sum√© des d√©fenses contre DoS/DDoS

### ‚úÖ D√©fenses au niveau r√©seau

1. **SYN Cookies** : √âvite l'√©puisement de la table de connexions TCP
2. **Rate Limiting** : Limite les requ√™tes par IP/client
3. **IP Filtering** : Bloque les adresses IP suspectes
4. **Geo-blocking** : Bloque les pays sources d'attaques
5. **BPF (Berkeley Packet Filter)** : Filtrage au niveau kernel

### ‚úÖ D√©fenses au niveau infrastructure

1. **CDN** (Cloudflare, Akamai) : Distribue le trafic g√©ographiquement
2. **Scrubbing Centers** : Nettoie le trafic malveillant avant qu'il atteigne la cible
3. **Anycast** : Distribue les requ√™tes sur plusieurs serveurs
4. **Load Balancing** : R√©partit la charge sur plusieurs machines

### ‚úÖ D√©tection et mitigation

1. **IDS/IPS** : D√©tecte les patterns d'attaque
2. **NetFlow Analysis** : Analyse statistique du trafic
3. **Machine Learning** : D√©tection d'anomalies
4. **CAPTCHA** : Distingue humains et bots
5. **Blackholing** : Redirige le trafic malveillant vers un null route

---

## ‚ö†Ô∏è Limitations

Aucune d√©fense n'est parfaite contre les DDoS massifs :
- **Attaques volum√©triques** (Tbps) peuvent saturer m√™me les CDN
- **Co√ªt** : Mitigation professionnelle tr√®s co√ªteuse
- **Faux positifs** : Le rate limiting peut bloquer des utilisateurs l√©gitimes
- **Attaques cibl√©es** : Difficile de distinguer l√©gitime vs malveillant

---

## üîó R√©f√©rences

- **RFC 4987** : TCP SYN Flooding Attacks and Common Mitigations
- **Cloudflare DDoS Reports** : https://blog.cloudflare.com/tag/ddos/
- **OWASP** : Denial of Service Cheat Sheet