# Tests de charge

In [2]:
# installations
!pip install -q aiohttp
!pip install -q nest_asyncio
!pip install -q pyngrok


## Mesurer le temps de réponse moyen (latence) lorsque l'API reçoit un grand nombre de requêtes simultanées.

In [10]:
import asyncio
import aiohttp
import time

# CORRECTION : Utiliser le point de terminaison de l'API, pas la page de documentation
API_URL = "http://34.38.214.124/analyze" 

async def send_request(session, text ):
    payload = {"text": text, "model": "simple"}
    async with session.post(API_URL, json=payload) as r:
        # Vérifier que la requête a réussi (Code 200)
        if r.status != 200:
            print(f"Erreur HTTP: {r.status}")
        return await r.json()

async def load_test(n_users=5000):
    """Simule 5000 utilisateurs envoyant une requête en même temps."""
    start = time.time()
    async with aiohttp.ClientSession( ) as session:
        tasks = [send_request(session, "Hello world! This is a test comment.") for _ in range(n_users)]
        await asyncio.gather(*tasks)
    
    duration = time.time() - start
    rps = n_users / duration
    print(f"\n--- Test de Charge Basique ---")
    print(f"{n_users} requêtes exécutées en {duration:.2f}s")
    print(f"Débit (RPS) : {rps:.2f}")

await load_test()
# asyncio.run(load_test())


  async def __aenter__(self) -> _RetType:



--- Test de Charge Basique ---
5000 requêtes exécutées en 25.97s
Débit (RPS) : 192.52


## Déterminer la capacité maximale de l'API avant qu'elle ne commence à générer des erreurs (5xx) ou que la latence ne devienne inacceptable.

In [5]:
async def stress_test(start_users=100, step=100, max_steps=5):
    """Augmente progressivement la charge pour trouver le point de rupture."""
    print(f"\n--- Test de Résistance (Stress Test) ---")
    for i in range(max_steps):
        n_users = start_users + i * step
        start = time.time()
        
        async with aiohttp.ClientSession( ) as session:
            tasks = [send_request(session, "Stress test payload.") for _ in range(n_users)]
            results = await asyncio.gather(*tasks, return_exceptions=True)
        
        duration = time.time() - start
        rps = n_users / duration
        
        # Compter les erreurs (exceptions ou erreurs HTTP)
        errors = sum(1 for r in results if isinstance(r, Exception) or (isinstance(r, dict) and 'error' in r))
        
        print(f"Charge: {n_users} requêtes | RPS: {rps:.2f} | Erreurs: {errors}")
        
        # Si le taux d'erreur dépasse 5%, on considère que le point de rupture est atteint
        if errors / n_users > 0.05 and n_users > start_users:
            print(f"Point de rupture probable atteint à {n_users} requêtes.")
            break

await stress_test()



--- Test de Résistance (Stress Test) ---
Charge: 100 requêtes | RPS: 91.72 | Erreurs: 0
Charge: 200 requêtes | RPS: 155.25 | Erreurs: 0
Charge: 300 requêtes | RPS: 164.49 | Erreurs: 0
Charge: 400 requêtes | RPS: 166.39 | Erreurs: 0
Charge: 500 requêtes | RPS: 128.73 | Erreurs: 0


## Détecter les problèmes de performance qui n'apparaissent qu'après une utilisation prolongée, comme les fuites de mémoire ou la dégradation des performances.

In [11]:
async def soak_test(duration_minutes=10, interval_seconds=5, users_per_interval=50):
    """Maintient une charge constante pendant une longue période."""
    print(f"\n--- Test de Durée (Soak Test) ---")
    end_time = time.time() + duration_minutes * 60
    total_requests = 0
    
    while time.time() < end_time:
        start_interval = time.time()
        async with aiohttp.ClientSession( ) as session:
            tasks = [send_request(session, "Soak test payload.") for _ in range(users_per_interval)]
            await asyncio.gather(*tasks)
        
        total_requests += users_per_interval
        elapsed = time.time() - start_interval
        
        # Attendre pour maintenir l'intervalle
        await asyncio.sleep(max(0, interval_seconds - elapsed))
        
        print(f"Temps écoulé: {(time.time() - (end_time - duration_minutes * 60)) / 60:.1f} min | Total requêtes: {total_requests}")

await soak_test()



--- Test de Durée (Soak Test) ---
Temps écoulé: 0.1 min | Total requêtes: 50
Temps écoulé: 0.2 min | Total requêtes: 100
Temps écoulé: 0.3 min | Total requêtes: 150
Temps écoulé: 0.4 min | Total requêtes: 200
Temps écoulé: 0.5 min | Total requêtes: 250
Temps écoulé: 0.5 min | Total requêtes: 300
Temps écoulé: 0.6 min | Total requêtes: 350
Temps écoulé: 0.7 min | Total requêtes: 400
Temps écoulé: 0.8 min | Total requêtes: 450
Temps écoulé: 0.9 min | Total requêtes: 500
Temps écoulé: 1.0 min | Total requêtes: 550
Temps écoulé: 1.0 min | Total requêtes: 600
Temps écoulé: 1.1 min | Total requêtes: 650
Temps écoulé: 1.2 min | Total requêtes: 700
Temps écoulé: 1.3 min | Total requêtes: 750
Temps écoulé: 1.4 min | Total requêtes: 800
Temps écoulé: 1.5 min | Total requêtes: 850
Temps écoulé: 1.5 min | Total requêtes: 900
Temps écoulé: 1.6 min | Total requêtes: 950
Temps écoulé: 1.7 min | Total requêtes: 1000
Temps écoulé: 1.8 min | Total requêtes: 1050
Temps écoulé: 1.9 min | Total requêtes: 

## Mesurer la différence de latence entre le traitement d'un texte court et simple et celui d'un texte long et complexe (avec beaucoup de ponctuation, d'entités nommées, etc.).

In [15]:
async def scenario_test(n_requests=1000):
    """Compare la latence entre des requêtes simples et complexes."""
    
    simple_text = "Derrick Feliho, a very bad student at the University of Paris"
    complex_text = "Dr. John Smith, a resident of 123 Main St, New York, NY, wrote a very long and complicated fake comment about the organization's policy on the 15th of March 2025 and he insult all the students in this school by motherfucker."

    print(f"\n--- Test de Cas d'Usage (Scénario) ---")
    
    # Scénario 1 : Texte Simple
    start_simple = time.time()
    async with aiohttp.ClientSession( ) as session:
        tasks = [send_request(session, simple_text) for _ in range(n_requests)]
        await asyncio.gather(*tasks)
    duration_simple = time.time() - start_simple
    print(f"Simple ({n_requests} req): {duration_simple:.2f}s | Latence Moyenne: {duration_simple/n_requests*1000:.2f}ms")

    # Scénario 2 : Texte Complexe (met à l'épreuve l'anonymisation NLTK)
    start_complex = time.time()
    async with aiohttp.ClientSession( ) as session:
        tasks = [send_request(session, complex_text) for _ in range(n_requests)]
        await asyncio.gather(*tasks)
    duration_complex = time.time() - start_complex
    print(f"Complexe ({n_requests} req): {duration_complex:.2f}s | Latence Moyenne: {duration_complex/n_requests*1000:.2f}ms")

await scenario_test()



--- Test de Cas d'Usage (Scénario) ---
Simple (1000 req): 6.85s | Latence Moyenne: 6.85ms
Complexe (1000 req): 8.07s | Latence Moyenne: 8.07ms


## Mesurer le temps de réponse de la toute première requête après une période d'inactivité (mise à l'échelle de 0 à 1).

In [16]:
async def cold_start_test():
    """Mesure le temps de réponse du premier appel (cold start) et des suivants."""
    print(f"\n--- Test de Montée en Charge à Froid (Cold Start) ---")
    
    # Étape 1 : Le premier appel (Cold Start)
    start_cold = time.time()
    async with aiohttp.ClientSession( ) as session:
        await send_request(session, "First call to wake up the service.")
    duration_cold = time.time() - start_cold
    print(f"Latence du Premier Appel (Cold Start) : {duration_cold:.2f}s")

    # Étape 2 : Les appels suivants (Warm Start)
    n_warm = 100
    start_warm = time.time()
    async with aiohttp.ClientSession( ) as session:
        tasks = [send_request(session, "Warm call.") for _ in range(n_warm)]
        await asyncio.gather(*tasks)
    duration_warm = time.time() - start_warm
    avg_warm_latency = (duration_warm / n_warm) * 1000
    print(f"Latence Moyenne des {n_warm} Appels Suivants : {avg_warm_latency:.2f}ms")
    
    # Évaluation : Le temps de Cold Start doit être significativement plus long que la latence moyenne.

await cold_start_test()



--- Test de Montée en Charge à Froid (Cold Start) ---
Latence du Premier Appel (Cold Start) : 0.07s
Latence Moyenne des 100 Appels Suivants : 17.13ms


## Vérifier que l'API ne plante pas (pas d'erreur 500) face à des textes très longs, des entrées vides, ou des entrées contenant uniquement des données personnelles (PII).

In [17]:
async def edge_case_test():
    """Teste la résilience de l'API face à des entrées extrêmes."""
    print(f"\n--- Test de Cas Limites (Edge Case) ---")
    
    # 1. Texte Extrêmement Long (simule un article entier)
    long_text = "A" * 10000 + " This is the end."
    
    # 2. Texte PII Pur (simule une tentative d'injection de données personnelles)
    pii_text = "My name is John Smith and my credit card is 1234567890123456. I live at 123 Main Street."
    
    # 3. Texte Vide
    empty_text = ""
    
    test_cases = {
        "Long Text (10k chars)": long_text,
        "PII Text (RGPD test)": pii_text,
        "Empty Text": empty_text
    }
    
    async with aiohttp.ClientSession( ) as session:
        for name, text in test_cases.items():
            start = time.time()
            result = await send_request(session, text)
            duration = time.time() - start
            
            status = "SUCCÈS" if result and 'social_score' in result else "ÉCHEC (500)"
            
            print(f"Cas: {name} | Statut: {status} | Latence: {duration:.2f}s")
            
            # Évaluation RGPD : Vérifier que le score est calculé et que le service n'a pas planté.
            if name == "PII Text (RGPD test)" and status == "SUCCÈS":
                print(f"  -> Score obtenu: {result.get('social_score')}")

await edge_case_test()



--- Test de Cas Limites (Edge Case) ---
Erreur HTTP: 422
Cas: Long Text (10k chars) | Statut: ÉCHEC (500) | Latence: 0.09s
Cas: PII Text (RGPD test) | Statut: ÉCHEC (500) | Latence: 0.10s
Erreur HTTP: 422
Cas: Empty Text | Statut: ÉCHEC (500) | Latence: 0.10s


## Simuler un pic de 1000 requêtes, mais en limitant la concurrence à 500 pour éviter de saturer le client de test et pour mesurer la performance sous une charge contrôlée.

In [18]:
import asyncio
import aiohttp
import time

# HYPOTHÈSE : L'URL de l'API est http://34.38.214.124/analyze
API_URL = "http://34.38.214.124/analyze" 

# --- Configuration du Test ---
TOTAL_REQUESTS = 1000
MAX_CONCURRENCY = 500
TEST_TEXT = "This is a test comment for the concurrent load test."

# --- Fonctions de Test ---

async def send_request_with_semaphore(session, text, semaphore ):
    """Envoie une requête, mais attend un jeton du sémaphore avant de commencer."""
    # Attendre qu'un des 500 slots de concurrence soit libre
    async with semaphore:
        start_time = time.time()
        payload = {"text": text, "model": "simple"}
        
        async with session.post(API_URL, json=payload) as r:
            # Lire la réponse pour s'assurer que le temps de réponse est complet
            await r.read()
            end_time = time.time()
            
            # Retourner le temps de latence et le statut
            return end_time - start_time, r.status

async def concurrent_load_test():
    print(f"\n--- Test de Charge (Total: {TOTAL_REQUESTS} | Simultanéité: {MAX_CONCURRENCY}) ---")
    
    # Créer le sémaphore pour limiter la concurrence
    semaphore = asyncio.Semaphore(MAX_CONCURRENCY)
    
    start_total = time.time()
    
    async with aiohttp.ClientSession( ) as session:
        # Créer toutes les tâches
        tasks = [
            send_request_with_semaphore(session, TEST_TEXT, semaphore) 
            for _ in range(TOTAL_REQUESTS)
        ]
        
        # Exécuter les tâches, limitées par le sémaphore
        results = await asyncio.gather(*tasks)
    
    duration_total = time.time() - start_total
    
    # --- Analyse des Résultats ---
    latencies = [r[0] for r in results]
    statuses = [r[1] for r in results]
    
    avg_latency = sum(latencies) / TOTAL_REQUESTS
    rps = TOTAL_REQUESTS / duration_total
    
    errors = statuses.count(500) + statuses.count(400)
    
    print(f"Total des requêtes exécutées : {TOTAL_REQUESTS}")
    print(f"Durée totale : {duration_total:.2f}s")
    print(f"Débit (RPS) : {rps:.2f}")
    print(f"Latence Moyenne : {avg_latency * 1000:.2f}ms")
    print(f"Nombre d'erreurs (4xx/5xx) : {errors}")

# Exécution du test
await concurrent_load_test()



--- Test de Charge (Total: 1000 | Simultanéité: 500) ---
Total des requêtes exécutées : 1000
Durée totale : 6.81s
Débit (RPS) : 146.76
Latence Moyenne : 2586.52ms
Nombre d'erreurs (4xx/5xx) : 0


## Mesurer la latence et le débit lorsque les 500 workers simultanés traitent des données qui sollicitent fortement le CPU (anonymisation et tokenisation NLTK).

In [19]:
import asyncio
import aiohttp
import time

# HYPOTHÈSE : L'URL de l'API est http://34.38.214.124/analyze
API_URL = "http://34.38.214.124/analyze" 

# --- Configuration du Test ---
TOTAL_REQUESTS = 1000
MAX_CONCURRENCY = 500
# Texte complexe pour solliciter l'anonymisation (regex et NE chunking ) et la tokenisation
COMPLEX_TEXT = "Dr. John Smith, a resident of 123 Main St, New York, NY, wrote a very long and complicated comment about the organization's policy on the 15th of March 2025. This comment is highly toxic and offensive, and I hate it."

# --- Fonctions de Test (Réutilisées) ---
async def send_request_with_semaphore(session, text, semaphore):
    # ... (même fonction que précédemment)
    async with semaphore:
        start_time = time.time()
        payload = {"text": text, "model": "simple"}
        
        async with session.post(API_URL, json=payload) as r:
            await r.read()
            end_time = time.time()
            return end_time - start_time, r.status

async def complex_data_concurrency_test():
    print(f"\n--- Test de Concurrence avec Données Complexes (Total: {TOTAL_REQUESTS} | Simultanéité: {MAX_CONCURRENCY}) ---")
    
    semaphore = asyncio.Semaphore(MAX_CONCURRENCY)
    start_total = time.time()
    
    async with aiohttp.ClientSession( ) as session:
        tasks = [
            send_request_with_semaphore(session, COMPLEX_TEXT, semaphore) 
            for _ in range(TOTAL_REQUESTS)
        ]
        results = await asyncio.gather(*tasks)
    
    duration_total = time.time() - start_total
    
    # --- Analyse des Résultats ---
    latencies = [r[0] for r in results]
    avg_latency = sum(latencies) / TOTAL_REQUESTS
    rps = TOTAL_REQUESTS / duration_total
    
    print(f"Durée totale : {duration_total:.2f}s")
    print(f"Débit (RPS) : {rps:.2f}")
    print(f"Latence Moyenne : {avg_latency * 1000:.2f}ms")

await complex_data_concurrency_test()



--- Test de Concurrence avec Données Complexes (Total: 1000 | Simultanéité: 500) ---
Durée totale : 7.84s
Débit (RPS) : 127.56
Latence Moyenne : 3254.56ms


## Mesurer l'impact de l'ajout d'un petit délai entre les requêtes sur la latence globale. Simule un client qui envoie des requêtes en rafale.

In [20]:
async def send_request_with_delay(session, text, semaphore):
    # Ajout d'un micro-délai pour simuler un client qui n'est pas instantané
    await asyncio.sleep(0.001) 
    return await send_request_with_semaphore(session, text, semaphore)

async def throttled_concurrency_test():
    print(f"\n--- Test de Concurrence avec Délai (Total: {TOTAL_REQUESTS} | Simultanéité: {MAX_CONCURRENCY}) ---")
    
    semaphore = asyncio.Semaphore(MAX_CONCURRENCY)
    start_total = time.time()
    
    async with aiohttp.ClientSession( ) as session:
        tasks = [
            send_request_with_delay(session, TEST_TEXT, semaphore) 
            for _ in range(TOTAL_REQUESTS)
        ]
        results = await asyncio.gather(*tasks)
    
    duration_total = time.time() - start_total
    
    # Analyse des Résultats
    latencies = [r[0] for r in results]
    avg_latency = sum(latencies) / TOTAL_REQUESTS
    rps = TOTAL_REQUESTS / duration_total
    
    print(f"Durée totale : {duration_total:.2f}s")
    print(f"Débit (RPS) : {rps:.2f}")
    print(f"Latence Moyenne : {avg_latency * 1000:.2f}ms")

await throttled_concurrency_test()


--- Test de Concurrence avec Délai (Total: 1000 | Simultanéité: 500) ---
Durée totale : 6.61s
Débit (RPS) : 151.32
Latence Moyenne : 2638.96ms


## Mesurer le taux d'erreur (codes 5xx) et la capacité du système à se stabiliser après un pic de charge.

In [21]:
async def resilience_concurrency_test():
    print(f"\n--- Test de Concurrence avec Évaluation des Erreurs (Total: {TOTAL_REQUESTS} | Simultanéité: {MAX_CONCURRENCY}) ---")
    
    semaphore = asyncio.Semaphore(MAX_CONCURRENCY)
    start_total = time.time()
    
    async with aiohttp.ClientSession( ) as session:
        tasks = [
            send_request_with_semaphore(session, TEST_TEXT, semaphore) 
            for _ in range(TOTAL_REQUESTS)
        ]
        results = await asyncio.gather(*tasks)
    
    duration_total = time.time() - start_total
    
    # --- Analyse des Résultats ---
    statuses = [r[1] for r in results]
    
    # Compter les erreurs
    errors_5xx = statuses.count(500) + statuses.count(502) + statuses.count(503)
    errors_4xx = statuses.count(400) + statuses.count(404)
    
    print(f"Durée totale : {duration_total:.2f}s")
    print(f"Taux de Succès : {100 - (errors_5xx + errors_4xx) / TOTAL_REQUESTS * 100:.2f}%")
    print(f"Erreurs Serveur (5xx) : {errors_5xx}")
    print(f"Erreurs Client (4xx) : {errors_4xx}")

await resilience_concurrency_test()



--- Test de Concurrence avec Évaluation des Erreurs (Total: 1000 | Simultanéité: 500) ---
Durée totale : 7.53s
Taux de Succès : 100.00%
Erreurs Serveur (5xx) : 0
Erreurs Client (4xx) : 0


## 1. Montée en Charge Progressive (Ramp-up Test)
Objectif
Évaluation
Mesurer comment le système réagit à une augmentation constante et progressive de la charge.

In [26]:
async def progressive_ramp_up_test(max_concurrency=500, step=50, step_duration=10):
    """Augmente la charge de 50 utilisateurs toutes les 10 secondes jusqu'à 500."""
    print("\n--- 1. Montée en Charge Progressive ---")
    
    current_concurrency = 0
    while current_concurrency <= max_concurrency:
        current_concurrency += step
        if current_concurrency > max_concurrency:
            current_concurrency = max_concurrency
            
        print(f"\nCharge actuelle: {current_concurrency} utilisateurs pendant {step_duration}s")
        
        start_time = time.time()
        semaphore = asyncio.Semaphore(current_concurrency)
        
        async with aiohttp.ClientSession() as session:
            # Créer un grand nombre de tâches pour s'assurer que le sémaphore est toujours plein
            tasks = [send_request_with_semaphore(session, "Ramp-up test.", semaphore) for _ in range(current_concurrency * 5)]
            
            # Utiliser asyncio.gather au lieu de asyncio.wait pour obtenir les résultats
            try:
                results = await asyncio.wait_for(asyncio.gather(*tasks, return_exceptions=True), timeout=step_duration)
            except asyncio.TimeoutError:
                print(f"  Timeout après {step_duration}s")
                results = []
            
        # Analyser les métriques (RPS, Latence, Erreurs)
        duration = time.time() - start_time
        
        # Extraire les latences et les statuts des résultats
        latencies = []
        statuses = []
        errors = 0
        for r in results:
            if isinstance(r, tuple) and len(r) == 2:
                latencies.append(r[0])
                statuses.append(r[1])
                if r[1] >= 400:  # erreur 4xx ou 5xx
                    errors += 1
            elif isinstance(r, Exception):
                errors += 1
        
        if latencies:
            avg_latency = sum(latencies) / len(latencies)
            rps = len(latencies) / duration if duration > 0 else 0
            print(f"  -> Requêtes complétées: {len(latencies)}/{current_concurrency * 5}")
            print(f"  -> RPS: {rps:.2f}")
            print(f"  -> Latence Moyenne: {avg_latency * 1000:.2f}ms")
            print(f"  -> Erreurs: {errors}")
        else:
            print(f"  -> Aucune requête complétée")
        
        if current_concurrency == max_concurrency:
            break

await progressive_ramp_up_test()


--- 1. Montée en Charge Progressive ---

Charge actuelle: 50 utilisateurs pendant 10s
  -> Requêtes complétées: 250/250
  -> RPS: 155.06
  -> Latence Moyenne: 280.93ms
  -> Erreurs: 0

Charge actuelle: 100 utilisateurs pendant 10s
  -> Requêtes complétées: 250/250
  -> RPS: 155.06
  -> Latence Moyenne: 280.93ms
  -> Erreurs: 0

Charge actuelle: 100 utilisateurs pendant 10s
  -> Requêtes complétées: 500/500
  -> RPS: 109.90
  -> Latence Moyenne: 818.67ms
  -> Erreurs: 0

Charge actuelle: 150 utilisateurs pendant 10s
  -> Requêtes complétées: 500/500
  -> RPS: 109.90
  -> Latence Moyenne: 818.67ms
  -> Erreurs: 0

Charge actuelle: 150 utilisateurs pendant 10s
  -> Requêtes complétées: 750/750
  -> RPS: 149.12
  -> Latence Moyenne: 948.86ms
  -> Erreurs: 0

Charge actuelle: 200 utilisateurs pendant 10s
  -> Requêtes complétées: 750/750
  -> RPS: 149.12
  -> Latence Moyenne: 948.86ms
  -> Erreurs: 0

Charge actuelle: 200 utilisateurs pendant 10s
  -> Requêtes complétées: 1000/1000
  -> RP

## Détecter les fuites de mémoire, la dégradation des performances ou les problèmes de garbage collection sur une longue période.

In [None]:
# (Utilisez le code soak_test de la réponse précédente en fixant duration_minutes=30)

## Vérifier la capacité du système à absorber un pic de charge soudain et à se rétablir rapidement.

In [27]:
async def spike_test(spike_concurrency=1000, normal_concurrency=50):
    """Simule un pic soudain de 1000 requêtes."""
    print("\n--- 3. Burst Trafic Soudain (Spike Test) ---")
    
    # Phase 1 : Le Spike (1000 requêtes simultanées)
    print(f"Phase 1: Spike de {spike_concurrency} requêtes...")
    start_spike = time.time()
    semaphore_spike = asyncio.Semaphore(spike_concurrency)
    async with aiohttp.ClientSession( ) as session:
        tasks = [send_request_with_semaphore(session, "Spike payload.", semaphore_spike) for _ in range(spike_concurrency)]
        await asyncio.gather(*tasks)
    duration_spike = time.time() - start_spike
    print(f"Spike terminé en {duration_spike:.2f}s")
    
    # Phase 2 : Récupération (Charge nominale)
    print(f"Phase 2: Vérification de la récupération ({normal_concurrency} requêtes)...")
    start_recovery = time.time()
    semaphore_recovery = asyncio.Semaphore(normal_concurrency)
    async with aiohttp.ClientSession( ) as session:
        tasks = [send_request_with_semaphore(session, "Recovery payload.", semaphore_recovery) for _ in range(normal_concurrency)]
        await asyncio.gather(*tasks)
    duration_recovery = time.time() - start_recovery
    print(f"Récupération terminée en {duration_recovery:.2f}s")
    
    # Évaluation : La latence de la Phase 2 doit être proche de la latence normale.
    print(f"Temps de récupération par requête: {duration_recovery / normal_concurrency * 1000:.2f}ms")

await spike_test()


--- 3. Burst Trafic Soudain (Spike Test) ---
Phase 1: Spike de 1000 requêtes...
Spike terminé en 7.62s
Phase 2: Vérification de la récupération (50 requêtes)...
Spike terminé en 7.62s
Phase 2: Vérification de la récupération (50 requêtes)...
Récupération terminée en 0.31s
Temps de récupération par requête: 6.30ms
Récupération terminée en 0.31s
Temps de récupération par requête: 6.30ms


## 8. Traitement Payloads Lourds (Large JSON)
Objectif

Code (Logique)
Mesurer l'impact du transfert de données volumineuses (bien que le texte soit généralement petit, nous simulons un grand corps de requête).

In [28]:
async def large_payload_test(n_requests=100):
    """Teste l'impact d'un corps de requête volumineux."""
    print("\n--- 8. Traitement Payloads Lourds ---")
    
    # Créer un texte de 1 Mo (environ 1 million de caractères)
    large_text = "A" * 1000000 
    
    start_time = time.time()
    semaphore = asyncio.Semaphore(10) # Limiter la concurrence pour ne pas saturer
    
    async with aiohttp.ClientSession( ) as session:
        tasks = [send_request_with_semaphore(session, large_text, semaphore) for _ in range(n_requests)]
        await asyncio.gather(*tasks)
        
    duration = time.time() - start_time
    print(f"{n_requests} requêtes de 1 Mo exécutées en {duration:.2f}s")
    
    # Évaluation : La latence doit rester acceptable. Surveiller l'utilisation de la mémoire dans Cloud Monitoring.

await large_payload_test()



--- 8. Traitement Payloads Lourds ---
100 requêtes de 1 Mo exécutées en 14.83s
100 requêtes de 1 Mo exécutées en 14.83s
