## Library imports

In [None]:
# Please use these lines in every notebook you create
# They put the parent of the current working directory on `sys.path` so that the
# imports below it (`global_variables`, `python_scripts...`) can be resolved.
# NOTE(review): this relies on the kernel's working directory being the notebook's
# own folder — confirm when running from another location.

import os
import sys

# Get the current working directory
current_dir = os.getcwd()

# Get the parent directory of the current working directory
# (presumably the notebook lives in `maps` and the parent is `src` — confirm)
parent_dir = os.path.abspath(os.path.join(current_dir, os.pardir))

# Add the parent directory to the module search path (only once, so re-running the
# cell does not keep appending the same entry)
if parent_dir not in sys.path:
    sys.path.append(parent_dir)

# Star import: names such as PROVIDER, REGION, TECHNO, N_NEIGH and
# MEAN_DISTANCE_PARAMS used further down are not defined in this notebook and
# presumably come from here — verify against `global_variables`.
from global_variables import *

In [None]:
import pandas as pd
import numpy as np
from scipy.spatial import Delaunay, Voronoi
from copy import deepcopy
import math

from python_scripts.graphs.graphs_creation import delaunay_graph, kNN_graph
from python_scripts.neighbours_criteria.enhanced_criteria import distance_criterion_enhanced
from python_scripts.neighbours_criteria.simple_criteria import angle_criterion
from python_scripts.city.city_utils import mean_distance_to_NN, mean_distance_choice
from python_scripts.miscellaneaous.data_processing import extract_data

## Database import and data extraction

In [None]:
# Read the station table (semicolon-separated, comma as decimal mark).
df = pd.read_csv("../../database/data.csv", sep=";", decimal=",")
# Narrow the table with the project helper; the exact filtering rules live in
# `extract_data` (presumably one provider / region / technology — confirm there).
# The resulting index is used as the station identifier in the cells below.
df = extract_data(df, provider=PROVIDER, region=REGION, techno=TECHNO)
df.head()

In [None]:
# Read the azimuth/frequency table. `id_station_anfr` is forced to text so the
# identifiers are not parsed as numbers (presumably to keep leading zeros, which the
# prefix filter below depends on).
df_azimuth_raw = pd.read_csv("../../database/data_azimuth_freq.csv", sep=";", decimal=",", dtype={'id_station_anfr':str})

# Keep only the rows whose first column starts with one of these 3-character codes.
dep_codes = ['027', '076', '014', '050', '061']

# The first column is used positionally, as before (presumably `id_station_anfr` —
# confirm against the CSV header). A single boolean mask replaces the former
# row-by-row `pd.concat`, which was quadratic in the number of rows and turned every
# column into object dtype; the parsed dtypes are now preserved.
code_prefix = df_azimuth_raw.iloc[:, 0].astype(str).str[:3]
df_azimuth = df_azimuth_raw.loc[code_prefix.isin(dep_codes)].reset_index(drop=True)

df_azimuth.head()

## City detection

In [None]:
# Computed from the stations' x/y coordinates with N_NEIGH neighbours by the project
# helper (presumably one mean nearest-neighbour distance per station — confirm in
# `city_utils`). The result is passed to `distance_criterion_enhanced` further down.
mean_distances = mean_distance_to_NN(df[['x', 'y']], n_neighbours=N_NEIGH)

## Voronoi and Delaunay creation

In [None]:
# Build the candidate-neighbour graph from the station table. `pos` is later indexed
# by station id (`pos[bs_id]`) to obtain each station's position.
del_G, pos = delaunay_graph(df)
# Number of candidate edges before any filtering.
print(len(del_G.edges))

In [None]:
# Alternative candidate graph: k-nearest-neighbours with K neighbours per station.
# NOTE(review): `kNN_G`, `coordsXY` and `NearestNeighbors` are only referenced by the
# commented-out block below in the visible part of this notebook; the neighbour
# search further down uses the Delaunay graph `del_G`.
from sklearn.neighbors import NearestNeighbors
K=10
kNN_G, _ = kNN_graph(df, k=K)
coordsXY = df[['x','y']]

# Disabled experiment: drop a kNN edge when other stations fall inside the circle
# whose diameter is that edge.
# for edge in kNN_G.edges: # applying gabriel
#     pt1 = edge[0]
#     pt2 = edge[1]

#     middle_point = (coordsXY.loc[pt1] + coordsXY.loc[pt2])/2

#     neigh = NearestNeighbors(radius=np.sqrt(np.sum((coordsXY.loc[pt1] - coordsXY.loc[pt2])**2, axis=0))/2)
#     neigh.fit(coordsXY.values)

#     if(len(coordsXY.iloc[neigh.radius_neighbors([middle_point], sort_results=True)[1][0][:-2]].index)>0):
#         kNN_G.remove_edges_from([edge])
# print(len(kNN_G.edges))

## Finding neighbors

In [None]:
def calculate_alpha(station_pos, neighbor_pos, azimuth, beamwidth):
    """
    Check whether a neighbor lies inside an antenna sector and score the alignment.

    The direction from the station to the neighbor is computed as
    ``arctan2(delta[1], delta[0])`` (in degrees) and compared with ``azimuth``.

    Parameters:
    station_pos (numpy array): 2-D position of the base station.
    neighbor_pos (numpy array): 2-D position of the neighboring station, in the same
        coordinate order as ``station_pos``.
    azimuth (float): Azimuth angle of the sector, in degrees.
    beamwidth (float): Full angular width of the sector, in degrees.

    Returns:
    tuple: (within_coverage (bool), alpha (float)). ``within_coverage`` is True when
    the direction is at most ``beamwidth / 2`` away from the azimuth (so a beamwidth
    of 360 or more covers every direction). ``alpha`` is ``(1 + cos(offset)) / 2``,
    where ``offset`` is the angle between the direction and the azimuth: 1 when
    perfectly aligned, 0.5 at 90 degrees, 0 when opposite.
    """
    # NOTE(review): with positions ordered (north, east) this angle is a compass
    # bearing (clockwise from north); with (x, y) it is a counter-clockwise angle from
    # the x axis. Confirm that the ordering of `pos` matches the azimuth convention.
    direction_vector = neighbor_pos - station_pos
    direction_angle = (np.degrees(np.arctan2(direction_vector[1], direction_vector[0])) + 360) % 360

    # Signed angular offset between direction and azimuth, wrapped into [-180, 180).
    # Working on the offset handles the 0/360 wrap-around and full-circle sectors
    # without comparing against separately wrapped sector bounds.
    offset = (direction_angle - azimuth + 180) % 360 - 180

    within_coverage = bool(abs(offset) <= beamwidth / 2)

    # Cosine of the angle between the two unit vectors, mapped from [-1, 1] to [0, 1].
    alpha = float((1 + np.cos(np.radians(offset))) / 2)

    return within_coverage, alpha

    

In [None]:
def calculate_beamwidths(azimuths):
    """
    Calculate the beamwidth for each distinct azimuth.

    Azimuths are de-duplicated and sorted first; the returned list follows that
    sorted-unique order, NOT the order of the input. Callers that pair azimuths with
    beamwidths must therefore iterate over ``sorted(set(azimuths))``.

    For three or more azimuths, the beamwidth of an azimuth is the average of the
    angular gaps to its previous and next azimuths, the gaps being measured around the
    circle (0 degrees is adjacent to 360 degrees), so the beamwidths add up to 360.

    Parameters:
    azimuths (iterable of float): Azimuth angles in degrees.

    Returns:
    list of float: One beamwidth per distinct azimuth, in ascending azimuth order.
    An empty input gives an empty list and a single azimuth gives ``[180]``.
    """
    unique_azimuths = sorted(set(azimuths))
    num_azimuths = len(unique_azimuths)

    if num_azimuths == 0:
        return []

    if num_azimuths == 1:
        return [180]

    if num_azimuths == 2:
        # NOTE(review): this rule is kept as it was (half the gap for the first
        # azimuth, the remainder of the circle for the second). It is not the
        # "average of both gaps" rule used below, which would give 180 for both —
        # confirm that the asymmetry is intended.
        diff = (unique_azimuths[1] - unique_azimuths[0]) % 360
        beamwidth = diff / 2
        return [beamwidth, 360 - beamwidth]

    beamwidths = []
    for i, azimuth in enumerate(unique_azimuths):
        # Index -1 and the modulo wrap the list; `% 360` wraps the angles. Each gap is
        # wrapped exactly once: the former code shifted the previous azimuth by -360
        # AND added 360 for the first element, inflating its beamwidth by 180 degrees.
        gap_to_previous = (azimuth - unique_azimuths[i - 1]) % 360
        gap_to_next = (unique_azimuths[(i + 1) % num_azimuths] - azimuth) % 360
        beamwidths.append((gap_to_previous + gap_to_next) / 2)

    return beamwidths

In [None]:
def find_real_neighbors(df, df_azimuth, potential_neigh_G, pos, min_alpha=0.0):
    """
    Keep only the candidate edges whose two stations point an antenna sector at each other.

    For every edge of ``potential_neigh_G`` that touches a station of ``df``, the edge
    is kept when some sector of the station covers the neighbor AND some sector of the
    neighbor covers the station (coverage as decided by ``calculate_alpha``), and the
    best product of the two direction coefficients is at least ``min_alpha``.

    Parameters:
    df (DataFrame): Base station data; its index provides the station ids.
    df_azimuth (DataFrame): Azimuth data with columns ``id_station_anfr`` and ``angle_azimuth``.
    potential_neigh_G (Graph): Graph of candidate neighbors; it is not modified.
    pos (dict): Station positions, indexed by station id.
    min_alpha (float): Minimum combined direction coefficient for an edge to be kept.

    Returns:
    tuple: (neigh_G (Graph), edge_info (dict)). ``neigh_G`` is a deep copy of the
    candidate graph without the rejected edges. ``edge_info`` maps ``(station_id,
    neighbor_id)`` to ``{"station": [id, azimuth], "neighbor": [id, azimuth],
    "alpha": combined coefficient}`` for the best sector pair. Every kept edge is
    visited from both of its endpoints, so it is stored under both orientations.
    """
    neigh_G = deepcopy(potential_neigh_G)
    edge_info = {}
    sector_cache = {}

    def sectors_of(station_id):
        """Return the (azimuth, beamwidth) pairs of one station, computed once."""
        if station_id not in sector_cache:
            # `calculate_beamwidths` answers in sorted-unique azimuth order, so the
            # azimuths are put in that same order before pairing. Zipping the raw
            # column (unsorted, one row per antenna) mismatched azimuths and widths.
            station_azimuths = sorted(set(
                df_azimuth.loc[df_azimuth['id_station_anfr'] == station_id, 'angle_azimuth']
            ))
            sector_cache[station_id] = list(zip(station_azimuths, calculate_beamwidths(station_azimuths)))
        return sector_cache[station_id]

    for bs_id in df.index:
        for _, neigh_id in potential_neigh_G.edges(bs_id):
            best_pair = None  # best mutually-covering sector pair found so far

            for azimuth, beamwidth in sectors_of(bs_id):
                within_coverage, alpha = calculate_alpha(pos[bs_id], pos[neigh_id], azimuth, beamwidth)
                if not within_coverage:
                    continue

                for neighbor_azimuth, neighbor_beamwidth in sectors_of(neigh_id):
                    within_coverage_neigh, alpha_neigh = calculate_alpha(
                        pos[neigh_id], pos[bs_id], neighbor_azimuth, neighbor_beamwidth
                    )
                    if not within_coverage_neigh:
                        continue

                    combined_alpha = alpha * alpha_neigh
                    # Strict comparison: on a tie, the first pair found is kept.
                    if best_pair is None or combined_alpha > best_pair["alpha"]:
                        best_pair = {
                            "station": [bs_id, azimuth],
                            "neighbor": [neigh_id, neighbor_azimuth],
                            "alpha": combined_alpha,
                        }

            if best_pair is not None and best_pair["alpha"] >= min_alpha:
                edge_info[(bs_id, neigh_id)] = best_pair
            else:
                # `remove_edges_from` ignores edges that are already gone, which
                # happens when the edge was rejected from its other endpoint first.
                neigh_G.remove_edges_from([(bs_id, neigh_id)])

    return neigh_G, edge_info




In [None]:
# Minimum combined direction coefficient an edge must reach to be kept.
min_alpha = 0.5

# Use the new function with min_alpha parameter
# First filter: azimuth-based check on the Delaunay candidates.
neigh_G, edge_info = find_real_neighbors(df, df_azimuth, del_G, pos, min_alpha)
# Second filter: project distance criterion, fed with the mean distances computed in
# the "City detection" section. `edge_info` is not updated by this step, so it may
# still describe edges that are no longer in `neigh_G`.
neigh_G = distance_criterion_enhanced(neigh_G, pos, params=MEAN_DISTANCE_PARAMS, mean_distance_to_NN=mean_distances)

# Disabled: additional angle-based filter.
# neigh_G = angle_criterion(neigh_G, pos, min_angle=MIN_ANGLE, max_distance=None)
   

In [None]:
# Number of edges left after both filters (compare with the Delaunay edge count printed earlier).
len(neigh_G.edges())

## Map creation

In [None]:
import folium
import numpy as np
from networkx import Graph
from pandas import DataFrame

In [None]:
def add_graph_edges(G_base: Graph, G: Graph, df: DataFrame, fg: folium.FeatureGroup, colour: str):
    """
    Draw every edge of ``G_base`` that is absent from ``G`` as a polyline on ``fg``.

    Parameters:
    G_base (Graph): Graph whose edges are candidates for drawing.
    G (Graph): Edges present in this graph are skipped.
    df (DataFrame): Station table indexed by node id, with ``latitude`` and ``longitude`` columns.
    fg (folium.FeatureGroup): Layer that receives the polylines.
    colour (str): Line colour.
    """
    coordinate_columns = ['latitude', 'longitude']

    for edge in G_base.edges:
        if edge in G.edges:
            continue  # also present in G: not drawn

        # One (latitude, longitude) row per endpoint.
        endpoints = np.array([df.loc[node, coordinate_columns] for node in (edge[0], edge[1])])
        folium.PolyLine(endpoints, color=colour, weight=2.5, opacity=1).add_to(fg)


In [None]:
def generate_edge_info_popup_with_visualization(edge, df, df_azimuth, edge_info):
    """
    Build the HTML popup text describing one edge and its two stations.

    Parameters:
    edge (tuple): ``(station_id, neighbor_id)``.
    df (DataFrame): Station table indexed by station id, with ``nom_com``,
        ``latitude`` and ``longitude`` columns.
    df_azimuth (DataFrame): Azimuth table with ``id_station_anfr`` and ``angle_azimuth`` columns.
    edge_info (dict): Maps ``(station_id, neighbor_id)`` to a dict with keys
        ``"station"``, ``"neighbor"`` (each ``[id, azimuth]``) and ``"alpha"``.

    Returns:
    str: HTML snippet. When the edge is only stored under the reversed key, that entry
    is used with its two sides swapped. Values that are unavailable are shown as
    ``n/a`` instead of raising while formatting ``None``.
    """
    bs_id, neigh_id = edge
    bs_info = df.loc[bs_id]
    neigh_info = df.loc[neigh_id]

    azimuths_info = edge_info.get((bs_id, neigh_id))
    if azimuths_info is None:
        # Graph edges are undirected: the entry may exist under the other orientation,
        # in which case "station" and "neighbor" trade places.
        reversed_info = edge_info.get((neigh_id, bs_id))
        if reversed_info is not None:
            azimuths_info = {
                "station": reversed_info.get("neighbor"),
                "neighbor": reversed_info.get("station"),
                "alpha": reversed_info.get("alpha"),
            }
        else:
            azimuths_info = {}

    def coverage_label(entry):
        """Format the azimuth stored in an ``[id, azimuth]`` entry, or ``n/a``."""
        if entry and len(entry) > 1 and entry[1] is not None:
            return f"{entry[1]}°"
        return "n/a"

    alpha = azimuths_info.get("alpha")
    alpha_label = f"{alpha:.2f}" if alpha is not None else "n/a"

    unique_azimuths_bs = np.unique(df_azimuth.loc[df_azimuth['id_station_anfr'] == bs_id, 'angle_azimuth'])
    unique_azimuths_neigh = np.unique(df_azimuth.loc[df_azimuth['id_station_anfr'] == neigh_id, 'angle_azimuth'])

    popup_text = (
        f"<b>Base Station {bs_id} - {bs_info['nom_com']}</b><br>"
        f"Latitude: {bs_info['latitude']}, Longitude: {bs_info['longitude']}<br>"
        f"Azimuths: {'°, '.join(map(str, unique_azimuths_bs))}°<br>"
        f"Coverage Azimuths: <b>{coverage_label(azimuths_info.get('station'))}</b><br>"
        f"<br><b>Neighbor Station {neigh_id} - {neigh_info['nom_com']}</b><br>"
        f"Latitude: {neigh_info['latitude']}, Longitude: {neigh_info['longitude']}<br>"
        f"Azimuths: {'°, '.join(map(str, unique_azimuths_neigh))}°<br>"
        f"Coverage Azimuths: <b>{coverage_label(azimuths_info.get('neighbor'))}</b><br>"
        f"<br><b>Direction Coefficient (alpha): {alpha_label}</b><br>"
    )

    return popup_text



In [None]:
def add_graph_edges_with_visualization(G: Graph, df: DataFrame, df_azimuth: DataFrame, fg: folium.FeatureGroup, edge_info: dict):
    """
    Draw every edge of ``G`` on ``fg`` with a clickable popup describing the edge.

    Parameters:
    G (Graph): Graph whose edges are drawn.
    df (DataFrame): Station table indexed by node id, with ``latitude`` and ``longitude`` columns.
    df_azimuth (DataFrame): Azimuth table, forwarded to the popup builder.
    fg (folium.FeatureGroup): Layer that receives the polylines.
    edge_info (dict): Per-edge azimuth information, forwarded to the popup builder.
    """
    for edge in G.edges:
        endpoint_ids = [edge[0], edge[1]]
        endpoints = df.loc[endpoint_ids, ['latitude', 'longitude']].values

        popup_html = generate_edge_info_popup_with_visualization(edge, df, df_azimuth, edge_info)
        edge_popup = folium.Popup(popup_html, max_width=300)

        segment = folium.PolyLine(endpoints, color="#AAA662", weight=2.5, opacity=1)
        segment.add_child(edge_popup)
        segment.add_to(fg)

In [None]:
def add_azimuth_lines(df: DataFrame, df_azimuth: DataFrame, fg: folium.FeatureGroup, azimuth_length: float = 0.01):
    """
    Draw one short line per antenna azimuth, starting at the station's location.

    Azimuths are drawn as compass bearings: 0 degrees points towards increasing
    latitude and 90 degrees towards increasing longitude.

    Parameters:
    df (DataFrame): Station table indexed by station id, with ``latitude`` and ``longitude`` columns.
    df_azimuth (DataFrame): Azimuth table with ``id_station_anfr`` and ``angle_azimuth`` columns.
    fg (folium.FeatureGroup): Layer that receives the lines.
    azimuth_length (float): Line length, expressed in degrees of latitude. Defaults to
        the previously hard-coded 0.01.
    """
    for bs_id in df.index:
        lat = df.loc[bs_id, 'latitude']
        long = df.loc[bs_id, 'longitude']

        # A degree of longitude spans cos(latitude) times the distance of a degree of
        # latitude. Without this correction the east-west component is too short and
        # every line is drawn at a bearing skewed towards the north-south axis.
        # The floor avoids a division by zero at the poles.
        lon_scale = max(np.cos(np.radians(lat)), 1e-6)

        for azimuth in df_azimuth.loc[df_azimuth['id_station_anfr'] == bs_id, 'angle_azimuth']:
            azimuth_angle = np.radians(azimuth)
            end_lat = lat + azimuth_length * np.cos(azimuth_angle)
            end_lon = long + azimuth_length * np.sin(azimuth_angle) / lon_scale
            line = folium.PolyLine([(lat, long), (end_lat, end_lon)], color='black', weight=2, opacity=0.7)
            popup_text = f"Azimuth: {azimuth}°"
            popup = folium.Popup(popup_text, max_width=150)
            line.add_child(popup)
            line.add_to(fg)


def create_method_illustration_map(df: DataFrame, df_azimuth, del_graph: Graph, nei_graph: Graph, edge_info: dict, save_as: str, **kwargs):
    """
    Build the interactive map of the neighbor-finding method and save it as HTML.

    The map is centred on the mean station coordinates and has four layers: the
    Delaunay edges, the neighboring-graph edges (with popups), the azimuth lines and
    the base stations (with popups listing each antenna).

    Parameters:
    df (DataFrame): Station table indexed by station id.
    df_azimuth (DataFrame): Azimuth table with ``id_station_anfr``, ``angle_azimuth``
        and ``frequency_GHz`` columns.
    del_graph (Graph): Delaunay graph, drawn in light blue.
    nei_graph (Graph): Final neighboring graph.
    edge_info (dict): Per-edge azimuth information shown in the edge popups.
    save_as (str): File name (without extension) under ``../../out/maps/neighbours_finding/``.
    **kwargs: Accepted but not used by this function.
    """
    centre = list(np.mean(df[['latitude', 'longitude']], axis=0))
    base_map = folium.Map(location=centre, zoom_start=8.5, tiles="Cartodb Positron")

    # Layers are attached in this order so the layer control lists them the same way.
    delaunay_layer = folium.FeatureGroup(f"Edges - Delaunay triangulation ({len(del_graph.edges)})", show=True).add_to(base_map)
    neighbour_layer = folium.FeatureGroup(f"Edges - neighboring graph ({len(nei_graph.edges)})", show=True).add_to(base_map)
    azimuth_layer = folium.FeatureGroup(f"Azimuth Lines", show=True).add_to(base_map)

    # An empty graph as second argument means no Delaunay edge is skipped.
    add_graph_edges(del_graph, Graph(), df, delaunay_layer, colour="lightblue")
    add_graph_edges_with_visualization(nei_graph, df, df_azimuth, neighbour_layer, edge_info)
    add_azimuth_lines(df, df_azimuth, azimuth_layer)

    station_layer = folium.FeatureGroup(f"Base stations ({len(df)})").add_to(base_map)
    for bs_id in df.index:
        row = df.loc[bs_id]

        # Station summary first, then one paragraph per antenna of that station.
        popup_parts = [
            f"Station ID: {bs_id}<br>",
            f"Department: {row['nom_dep']}<br>",
            f"Commune: {row['nom_com']}<br>",
            f"Coordinates: ({row['latitude']}, {row['longitude']})<br>",
            f"2G: {row['site_2g']}<br>",
            f"3G: {row['site_3g']}<br>",
            f"4G: {row.get('site_4g', '1')}<br>",
            f"5G: {row['site_5g']}<br>",
        ]
        antennas = df_azimuth.loc[df_azimuth['id_station_anfr'] == bs_id]
        for _, antenna in antennas.iterrows():
            popup_parts.append(f"<br>Antenna Frequency: {antenna['frequency_GHz']} GHz<br>")
            popup_parts.append(f"Antenna Azimuth: {antenna['angle_azimuth']}°<br>")

        station_popup = folium.Popup("".join(popup_parts), max_width=200)
        marker = folium.CircleMarker(
            location=[row['latitude'], row['longitude']],
            color='blue',
            radius=3,
            popup=station_popup,
            fillOpacity=1,
            fill=True
        )
        marker.add_to(station_layer)

    folium.LayerControl().add_to(base_map)
    base_map.save(f"../../out/maps/neighbours_finding/{save_as}.html")




In [None]:
# Create and save map with visualization
# Output file: ../../out/maps/neighbours_finding/azimuth_enhanced_map.html
create_method_illustration_map(df, df_azimuth, del_G, neigh_G, edge_info, save_as="azimuth_enhanced_map")
