# TP : Pipeline de Données Météorologiques en Temps Réel avec Kafka

**Objectif** : Dans ce TP, vous allez utiliser Kafka pour transmettre des données météorologiques en temps réel d’un producteur (qui récupère les données) vers un consumer (qui les affiche). Vous configurerez un producteur pour récupérer les données météo de plusieurs villes via une API, puis un consumer pour consommer et afficher ces données en continu.

---

## Partie 1 : Producteur Kafka – Récupération et Envoi des Données Météorologiques

Le producteur Kafka va récupérer les données météo pour trois villes cibles et les envoyer dans un topic Kafka nommé `tp-meteo`. 

### Étapes :


1. **Configuration de l'API et des Villes** :
   - Utilisez une clé API pour accéder à OpenWeatherMap (une API de données météo).
   - Définissez trois villes cibles : `Paris`, `London`, et `Tokyo`.

In [5]:
# Configuration
API_KEY = '60c351069138b06a8e6cc9b06d8c4752'  # Clé API
CITIES = ['Paris', 'London', 'Tokyo']  # Villes cibles

2. **Initialisation du Producteur Kafka** :
   - Configurez Kafka pour qu’il envoie des messages au serveur `localhost:9092` dans le topic `tp-meteo`.
   - Kafka sera utilisé pour envoyer les données météorologiques formatées en JSON.

In [None]:
from kafka import KafkaProducer
import json

# Configuration
KAFKA_SERVER = 'localhost:9092'

# Initialisation du producteur Kafka
producer = KafkaProducer(
    bootstrap_servers=[KAFKA_SERVER],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

for city in CITIES : 
  # Envoi des données en continu
  producer.send("tp-meteo",  value=city)


<kafka.producer.future.FutureRecordMetadata at 0x7f3e7b9d3220>

3. **Fonction de Récupération des Données** :
   - Écrivez une fonction `get_weather_data` qui récupère les informations météo pour une ville donnée.
   - La fonction envoie une requête à l’API et retourne les données si la requête est réussie.

In [8]:
import requests
# Fonction pour récupérer les données météo
def get_weather_data(city):
    url = f"http://api.openweathermap.org/data/2.5/weather?q={city}&appid={API_KEY}&units=metric"
    response = requests.get(url)
    return response.json() if response.status_code == 200 else None

get_weather_data("London")

{'coord': {'lon': -0.1257, 'lat': 51.5085},
 'weather': [{'id': 804,
   'main': 'Clouds',
   'description': 'overcast clouds',
   'icon': '04d'}],
 'base': 'stations',
 'main': {'temp': 11.78,
  'feels_like': 11,
  'temp_min': 10.86,
  'temp_max': 12.81,
  'pressure': 1032,
  'humidity': 76,
  'sea_level': 1032,
  'grnd_level': 1027},
 'visibility': 10000,
 'wind': {'speed': 2.57, 'deg': 10},
 'clouds': {'all': 100},
 'dt': 1731587989,
 'sys': {'type': 2,
  'id': 2075535,
  'country': 'GB',
  'sunrise': 1731568648,
  'sunset': 1731600763},
 'timezone': 0,
 'id': 2643743,
 'name': 'London',
 'cod': 200}


4. **Boucle d’Envoi en Continu** :
   - Utilisez une boucle pour envoyer les données de chaque ville au topic Kafka toutes les minutes.
   - Affichez un message de confirmation dans la console pour chaque envoi réussi.

In [13]:
import time 

# Boucle d'envoi en continu
while True:
    for city in CITIES:
        weather_data = get_weather_data(city)
        
        if weather_data:
            # Extraire les données pertinentes
            data = {
                'city': city,
                'temp': weather_data['main']['temp'],
                'feels_like': weather_data['main']['feels_like'],
                'temp_min': weather_data['main']['temp_min'],
                'temp_max': weather_data['main']['temp_max'],
                'pressure': weather_data['main']['pressure'],
                'humidity': weather_data['main']['humidity'],
                'sea_level': weather_data['main']['sea_level'],
                'grnd_level': weather_data['main']['grnd_level']
            }
            
            # Envoi des données dans le topic Kafka 'tp-meteo'
            producer.send('tp-meteo', value=data)
            
            # Affichage du message de confirmation
            print(f"Les données météo pour {city} ont été envoyées avec succès.")
        else:
            print(f"Impossible de récupérer les données pour {city}.")
    
    # Attendre 60 secondes avant le prochain envoi
    time.sleep(60)

# Assurer que toutes les données sont envoyées avant de fermer le producteur
producer.flush()

# Fermeture du producteur après l'envoi
producer.close()

Les données météo pour Paris ont été envoyées avec succès.
Les données météo pour London ont été envoyées avec succès.
Les données météo pour Tokyo ont été envoyées avec succès.
Les données météo pour Paris ont été envoyées avec succès.
Les données météo pour London ont été envoyées avec succès.
Les données météo pour Tokyo ont été envoyées avec succès.


KeyboardInterrupt: 