In [37]:
import requests
from time import sleep
from ipyleaflet import Map, Marker, Rectangle
from ipywidgets import HTML, VBox, IntProgress
import threading

center = [44.8378, -0.5792]
m = Map(center=center, zoom=13)

output = HTML("<strong>Coordonnées sélectionnées :</strong><br>Aucune sélection pour l'instant.")
address_output = HTML("<strong>Adresses trouvées :</strong><br>Aucune adresse pour l'instant.")
progress_bar = IntProgress(min=0, max=1, value=0, description='Progression')  
display(VBox([m, output, address_output, progress_bar]))

start_point = None
end_point = None
rectangle = None
markers = []  # Liste pour stocker les marqueurs ajoutés

def handle_map_click(**kwargs):
    global start_point, end_point, rectangle, markers
    
    if kwargs.get('type') == 'click':
        latlng = kwargs.get('coordinates')
        
        # Si deux points ont déjà été sélectionnés, on réinitialise tout
        if start_point is not None and end_point is not None:
            if rectangle is not None:
                m.remove_layer(rectangle)
            for marker in markers:
                m.remove_layer(marker)
            markers.clear()
            
            start_point = None
            end_point = None
            rectangle = None
            address_output.value = "<strong>Adresses trouvées :</strong><br>Aucune adresse pour l'instant."
            progress_bar.value = 0
        
        if start_point is None:
            start_point = latlng
            marker = Marker(location=start_point)
            markers.append(marker)
            m.add_layer(marker)
            output.value = f"Point 1 sélectionné : {start_point}"
        
        elif end_point is None:
            end_point = latlng
            marker = Marker(location=end_point)
            markers.append(marker)
            m.add_layer(marker)
            
            bounds = [start_point, end_point]
            rectangle = Rectangle(bounds=bounds, color="blue", fill_opacity=0.3)
            m.add_layer(rectangle)
            
            output.value = (f"<strong>Coordonnées sélectionnées :</strong><br>"
                            f"Coin supérieur gauche : {start_point}<br>"
                            f"Coin inférieur droit : {end_point}")

            # Exécuter la récupération des adresses et l'estimation de la population dans un thread séparé
            threading.Thread(target=process_and_estimate_population, args=(start_point, end_point)).start()

def process_and_estimate_population(start, end):
    """
    Récupère les adresses avec Overpass, puis estime la population uniquement pour les adresses résidentielles.
    """
    addresses = get_addresses_with_overpass(start, end)
    if addresses:
        estimate_population_from_addresses(addresses)
    else:
        address_output.value += "<br><strong>Aucune adresse trouvée pour l'estimation.</strong>"

def get_electricity_consumption(address, is_residential):
    """
    Appelle l'API d'Enedis pour récupérer la consommation annuelle résidentielle d'une adresse.
    
    Si l'adresse exacte est trouvée avec le bon libelle_de_voie et numero_de_voie, utilise celle-ci.
    Sinon, cherche la consommation pour la rue et prend le numero_de_voie le plus proche.
    
    :param address: Dictionnaire représentant l'adresse complète avec 'housenumber', 'street' et 'city'.
    :param is_residential: Booléen indiquant si l'adresse est résidentielle.
    :return: Consommation annuelle totale de l'adresse en MWh ou None si non trouvée.
    """
    api_url = "https://data.enedis.fr/api/explore/v2.1/catalog/datasets/consommation-annuelle-residentielle-par-adresse/records"

    def sanitize(text):
        """Supprime les caractères spéciaux susceptibles de poser problème dans les requêtes."""
        return text.replace("'", " ").upper().strip()
    
    street = ' '.join(address['street'].split()[1:])  # Enlever le type de voie (RUE, AVENUE, etc.)
    street = sanitize(street)
    housenumber = ''.join(filter(str.isdigit, address['housenumber']))
    city = sanitize(address['city'])
    
    # Étape 1 : Chercher l'adresse exacte (libelle_de_voie + numero_de_voie)
    where_clauses_exact = (
        f"nom_commune like '{city}' AND "
        f"libelle_de_voie = '{street}' AND "
        f"numero_de_voie = '{housenumber}'"
    )
    
    params_exact = {
        "where": where_clauses_exact,
        "limit": 1
    }
    
    try:
        # Afficher l'URL complète de la requête exacte
        full_url_exact = requests.Request('GET', api_url, params=params_exact).prepare().url
        print(f"URL envoyée (exacte) : {full_url_exact}")
        
        response = requests.get(api_url, params=params_exact, timeout=10)
        response.raise_for_status()
        data = response.json()
        
        # Si une correspondance exacte est trouvée
        if data["total_count"] > 0:
            record = data["results"][0]
            consommation_annuelle = record.get("consommation_annuelle_totale_de_l_adresse_mwh", 0)
            segment_client = record.get("segment_de_client", "").upper()
            
            if not is_residential or segment_client != "RESIDENTIEL":
                print(f"Adresse non résidentielle : {address}")
                return None
            
            return consommation_annuelle
        
        # Étape 2 : Si l'adresse exacte n'est pas trouvée, chercher uniquement par rue
        print(f"Aucune correspondance exacte trouvée pour l'adresse : {address}. Recherche par rue...")

        street = sanitize(address['street'].split()[-1])  # Dernier mot de la rue
        where_clauses_street = (
            f"nom_commune like '{city}' AND "
            f"libelle_de_voie like '{street}' AND "
            f"segment_de_client = 'RESIDENTIEL'"
        )
        
        params_street = {
            "where": where_clauses_street,
            "limit": 100  # Limite à 100 adresses dans la rue
        }
        
        # Afficher l'URL complète de la requête par rue
        full_url_street = requests.Request('GET', api_url, params=params_street).prepare().url
        print(f"URL envoyée (par rue) : {full_url_street}")
        
        response = requests.get(api_url, params=params_street, timeout=10)
        response.raise_for_status()
        data = response.json()
        
        if data["total_count"] == 0:
            print(f"Aucune donnée trouvée pour la rue : {street}, {city}")
            return None
        
        # Trier localement les adresses par proximité du numéro de voie
        records = data["results"]
        records.sort(key=lambda r: abs(int(''.join(filter(str.isdigit, str(r["numero_de_voie"])))) - int(housenumber)))
        
        # Prendre la consommation de l'adresse la plus proche
        closest_record = records[0]
        closest_housenumber = closest_record["numero_de_voie"]
        consommation_annuelle = closest_record.get("consommation_annuelle_totale_de_l_adresse_mwh", 0)
        segment_client = closest_record.get("segment_de_client", "").upper()
        
        print(f"Correspondance proche trouvée : {closest_housenumber} {street}, {city}")
        
        if not is_residential or segment_client != "RESIDENTIEL":
            print(f"Adresse proche non résidentielle : {address}")
            return None
        
        return consommation_annuelle
    
    except requests.exceptions.RequestException as e:
        print(f"Erreur lors de l'appel à l'API : {e}")
        return None


def estimate_population_from_addresses(addresses):
    """
    Estime la population à partir des adresses résidentielles en utilisant la consommation électrique.
    
    :param addresses: Liste de dictionnaires contenant les adresses.
    """
    total_population = 0
    average_consumption_per_person = 1.7  # Consommation moyenne par habitant en MWh/an
    missing_data_addresses = []  # Liste pour les adresses sans données
    non_residential_addresses = []  # Liste pour les adresses non résidentielles
    
    for address_dict in addresses:
        # Construire l'adresse complète sous forme de chaîne
        address = f"{address_dict['housenumber']} {address_dict['street']}, {address_dict['city']}"
        
        # Appeler l'API pour récupérer la consommation
        consumption = get_electricity_consumption(address_dict, address_dict['residential_status'])
        
        if consumption is not None:
            population_estimee = consumption / average_consumption_per_person
            total_population += round(population_estimee)
        else:
            if not(address_dict['residential_status']):
                non_residential_addresses.append(address)
            else:
                missing_data_addresses.append(address)
        
        sleep(1)  # Respecter les limites de l'API en ajoutant une pause entre les requêtes
    
    result = f"<strong>Population totale estimée :</strong> {total_population} habitants<br>"
    
    if missing_data_addresses:
        result += ("<strong>Adresses sans données de consommation :</strong><br>" +
                   "<br>".join(missing_data_addresses))
    
    if non_residential_addresses:
        result += ("<br><strong>Adresses non résidentielles ignorées :</strong><br>" +
                   "<br>".join(non_residential_addresses))
    
    address_output.value = result


def get_addresses_with_overpass(start, end):
    """
    Récupère les objets avec des adresses dans la zone définie
    par deux points GPS en utilisant l'API Overpass.
    
    :param start: Coordonnées du coin supérieur gauche (lat, lon)
    :param end: Coordonnées du coin inférieur droit (lat, lon)
    :return: Liste de dictionnaires contenant les détails des adresses
    """
    addresses = []
    overpass_url = "http://overpass-api.de/api/interpreter"
    
    # Extraire les coordonnées des points
    lat1, lon1 = start
    lat2, lon2 = end
    
    # Construire la bounding box dans le bon ordre : south, west, north, east
    bbox = f"{min(lat1, lat2)},{min(lon1, lon2)},{max(lat1, lat2)},{max(lon1, lon2)}"
    
    # Construire la requête Overpass
    query = f"""
    [out:json];
    (
      node["addr:housenumber"]({bbox});
      way["addr:housenumber"]({bbox});
      relation["addr:housenumber"]({bbox});
    );
    out body;
    """
    
    print(f"URL de la requête : {overpass_url}?data={query}")
    
    try:
        response = requests.get(overpass_url, data=query, timeout=30)
        response.raise_for_status()
        data = response.json()
        
        if data['elements']:
            total_elements = len(data['elements'])
            progress_bar.max = total_elements
            
            for i, element in enumerate(data['elements'], start=1):
                tags = element.get('tags', {})
                housenumber = tags.get('addr:housenumber', '').upper()
                street = tags.get('addr:street', '').upper()
                city = tags.get('addr:city', '')
                postcode = tags.get('addr:postcode', '')
                
                # Récupérer les coordonnées de l'élément
                if 'lat' in element and 'lon' in element:
                    lat = element['lat']
                    lon = element['lon']
                elif 'center' in element:
                    lat = element['center']['lat']
                    lon = element['center']['lon']
                else:
                    continue
                
                # Si le nom de la rue est manquant, utiliser le reverse geocoding
                if housenumber and not street:
                    reverse_address = reverse_geocode(lat, lon)
                    street = reverse_address.get("street", "").upper()
                    city = reverse_address.get("city", "").upper()

                # Vérifier si l'élément est résidentiel
                if not tags.get('name') and not tags.get('shop'):
                    residential_status = 1
                else:
                    residential_status = 0
                
                # Construire un dictionnaire contenant les détails de l'adresse
                address_dict = {
                    "housenumber": housenumber,
                    "street": street,
                    "city": city,
                    "postcode": postcode,
                    "lat": lat,
                    "lon": lon,
                    "residential_status": residential_status
                }
                addresses.append(address_dict)
                
                # Mettre à jour l'affichage sous forme de texte
                address_display = ", ".join(
                    part for part in [housenumber, street, city, postcode] if part
                )
                progress_bar.value = i
                progress_bar.description = f"{i}/{total_elements}"
                sleep(0.1)
            
            # Afficher les adresses sous forme de texte
            address_output.value = ("<strong>Adresses trouvées :</strong><br>" +
                                    "<br>".join(f"{addr['housenumber']} {addr['street']}, {addr['city']} {addr['postcode']}"
                                                for addr in addresses))
        else:
            address_output.value = "<strong>Adresses trouvées :</strong><br>Aucune adresse trouvée."
            progress_bar.value = 0
        
        return addresses
    
    except requests.exceptions.RequestException as e:
        address_output.value = f"<strong>Erreur lors de la récupération des adresses :</strong><br>{e}"
        progress_bar.value = 0
        print(f"Erreur : {e}")
        return []


def reverse_geocode(lat, lon):
    """
    Utilise l'API Nominatim pour effectuer un reverse geocoding
    et récupérer l'adresse complète à partir des coordonnées GPS.
    
    :param lat: Latitude
    :param lon: Longitude
    :return: Dictionnaire contenant 'street' et 'city' en majuscules
    """
    nominatim_url = "https://nominatim.openstreetmap.org/reverse"
    params = {
        "lat": lat,
        "lon": lon,
        "format": "json",
        "addressdetails": 1
    }
    
    headers = {
        "User-Agent": "MyPythonApp/1.0 (contact@example.com)"
    }
    
    try:
        response = requests.get(nominatim_url, params=params, headers=headers, timeout=10)
        response.raise_for_status()
        data = response.json()
        
        address = data.get("address", {})
        return {
            "street": address.get("road", "").upper(),
            "city": address.get("city", "").upper()
        }
    
    except requests.exceptions.RequestException as e:
        print(f"Erreur lors du reverse geocoding : {e}")
        return {"street": "", "city": ""}



# Lier la fonction de gestion des clics à la carte
m.on_interaction(handle_map_click)

VBox(children=(Map(center=[44.8378, -0.5792], controls=(ZoomControl(options=['position', 'zoom_in_text', 'zoom…

URL de la requête : http://overpass-api.de/api/interpreter?data=
    [out:json];
    (
      node["addr:housenumber"](44.82188239335125,-0.5819562077522279,44.82236182011501,-0.5816504359245301);
      way["addr:housenumber"](44.82188239335125,-0.5819562077522279,44.82236182011501,-0.5816504359245301);
      relation["addr:housenumber"](44.82188239335125,-0.5819562077522279,44.82236182011501,-0.5816504359245301);
    );
    out body;
    
URL envoyée (exacte) : https://data.enedis.fr/api/explore/v2.1/catalog/datasets/consommation-annuelle-residentielle-par-adresse/records?where=nom_commune+like+%27BORDEAUX%27+AND+libelle_de_voie+%3D+%27DE+L+ARGONNE%27+AND+numero_de_voie+%3D+%27269%27&limit=1
Aucune correspondance exacte trouvée pour l'adresse : {'housenumber': '269', 'street': "COURS DE L'ARGONNE", 'city': 'BORDEAUX', 'postcode': '', 'lat': 44.8222402, 'lon': -0.5817402, 'residential_status': 1}. Recherche par rue...
URL envoyée (par rue) : https://data.enedis.fr/api/explore/v2.1/catal