In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
from getpass import getpass

# Prompt for token securely
token = getpass('Enter your GitHub personal access token: ')

username = "pabasara-samarakoon-4176"

# Clone using token authentication
!git clone https://{username}:{token}@github.com/pabasara-samarakoon-4176/MDT_prediction.git

Enter your GitHub personal access token: ··········
Cloning into 'MDT_prediction'...
remote: Enumerating objects: 163, done.[K
remote: Counting objects: 100% (163/163), done.[K
remote: Compressing objects: 100% (147/147), done.[K
remote: Total 163 (delta 77), reused 50 (delta 14), pack-reused 0 (from 0)[K
Receiving objects: 100% (163/163), 5.13 MiB | 9.04 MiB/s, done.
Resolving deltas: 100% (77/77), done.


In [None]:
!cp /content/drive/MyDrive/Colab\ Notebooks/Inference_and_validation.ipyn.ipynb /content/MDT_prediction/

In [None]:
# This code is for infer and validate the fine-tuned model.
# Load a dataset consists of antenna features: Cell_ID, Site_ID, Site_latitude, Site_longitude, tilt, azimuth, antenna_height, EARFCN_DL
# Use a GPU

In [None]:
import pandas as pd
import numpy as np

import warnings
warnings.filterwarnings('ignore')

In [None]:
test_df_antenna = pd.read_csv('test_df.csv')

In [None]:
import pandas as pd
import numpy as np
import geohash2
from geopy.distance import distance

# === Your given max_range function ===
def max_range(earfcn_dl, antenna_height):
    if earfcn_dl == 525:
        base_range = 5000
    elif earfcn_dl == 1650:
        base_range = 3000
    else:
        base_range = 1500
    return base_range + (antenna_height * 10)

In [None]:
import math
import geohash2
import pandas as pd
import numpy as np
import folium
from shapely.geometry import Point, Polygon
from shapely.ops import unary_union

In [None]:
def create_sector_polygon(lat, lon, azimuth, radius_m, beamwidth=65, num_points=100):
    """Return a shapely polygon representing the coverage sector."""
    # Convert degrees to radians
    az_rad = math.radians(azimuth)
    half_bw = math.radians(beamwidth / 2)

    # Earth radius (meters)
    R = 6371000

    # Compute arc boundary points
    lats, lons = [], []
    for theta in np.linspace(az_rad - half_bw, az_rad + half_bw, num_points):
        d = radius_m
        lat2 = math.degrees(math.asin(math.sin(math.radians(lat)) * math.cos(d/R) +
                                      math.cos(math.radians(lat)) * math.sin(d/R) * math.cos(theta)))
        lon2 = lon + math.degrees(math.atan2(math.sin(theta) * math.sin(d/R) * math.cos(math.radians(lat)),
                                             math.cos(d/R) - math.sin(math.radians(lat)) * math.sin(math.radians(lat2))))
        lats.append(lat2)
        lons.append(lon2)

    # Build polygon (center + arc points)
    points = [(lat, lon)] + list(zip(lats, lons)) + [(lat, lon)]
    return Polygon([(p[1], p[0]) for p in points])  # lon, lat order for shapely

In [None]:
def polygon_to_geohash8(polygon, precision=8, step_m=50):
    """Fill the polygon with geohash8 tiles spaced roughly step_m apart."""
    minx, miny, maxx, maxy = polygon.bounds

    # Create grid points inside bounding box
    lats = np.arange(miny, maxy, step_m / 111320)  # 1 deg ≈ 111.32 km
    lons = np.arange(minx, maxx, step_m / (111320 * math.cos(math.radians((miny+maxy)/2))))

    geohashes = set()
    for lat in lats:
        for lon in lons:
            p = Point(lon, lat)
            if polygon.contains(p):
                geohashes.add(geohash2.encode(lat, lon, precision))

    return list(geohashes)

In [None]:
geo_rows = []

In [None]:
for _, row in test_df_antenna.iterrows():
    site_lat = row['Site_latitude']
    site_lon = row['Site_longitude']
    azimuth = row['azimuth']
    tilt = row['tilt']
    h = row['antenna_height']
    cell_id = row['Cell_ID']
    site_id = row['Site_ID']
    earfcn = row['EARFCN_DL']
    r_max = max_range(earfcn, h)
    poly = create_sector_polygon(site_lat, site_lon, azimuth, r_max)
    geohashes = polygon_to_geohash8(poly)

    for gh in geohashes:
        geo_rows.append({
            'Geohash': gh,
            'Cell_ID': cell_id,
            'Site_ID': site_id,
            'Site_Latitude': site_lat,
            'Site_Longitude': site_lon,
            'azimuth': azimuth,
            'tilt': tilt,
            'antenna_height': h,
            'EARFCN_DL': earfcn
        })

In [None]:
test_df_regen = pd.DataFrame(geo_rows)

In [None]:
import pandas as pd
import geopandas as gpd
import numpy as np
import geohash2
import rasterio
from rasterio.transform import rowcol
from scipy.spatial import cKDTree
from tqdm import tqdm

def enrich_with_environmental_features(prediction_df,
                                       building_data_file,
                                       road_data_file,
                                       elevation_raster_file,
                                       ndvi_files,
                                       population_file):
    """
    Enrich prediction_df with building_count, total_road_length,
    elevation, NDVI, and population_density features.

    Args:
        prediction_df (pd.DataFrame): must contain 'Geohash'
        building_data_file (str): path to OSM buildings shapefile
        road_data_file (str): path to OSM roads shapefile
        elevation_raster_file (str): path to elevation .tif
        ndvi_files (list): list of NDVI raster .tif files
        population_file (str): path to population density CSV

    Returns:
        pd.DataFrame: enriched prediction_df
    """
    tqdm.pandas()

    # --- Buildings ---
    building_data = gpd.read_file(building_data_file).to_crs("EPSG:4326")
    building_data["Centroid"] = building_data.geometry.centroid
    building_data['Geohash'] = building_data["Centroid"].progress_apply(
        lambda pt: geohash2.encode(pt.y, pt.x, precision=8))
    building_counts = building_data.groupby('Geohash').size().reset_index(name='building_count')
    prediction_df = prediction_df.merge(building_counts, on='Geohash', how='left')
    prediction_df['building_count'] = prediction_df['building_count'].fillna(0).astype(int)

    # --- Roads ---
    road_data = gpd.read_file(road_data_file)
    road_data = road_data.set_crs("EPSG:4326", allow_override=True).to_crs("EPSG:5234")
    road_data['length_m'] = road_data.geometry.length
    road_data['centroid'] = road_data.geometry.centroid
    centroids_geo = gpd.GeoSeries(road_data["centroid"], crs="EPSG:5234").to_crs("EPSG:4326")
    road_data["Geohash"] = centroids_geo.apply(lambda pt: geohash2.encode(pt.y, pt.x, precision=8))
    road_grouped = road_data.groupby("Geohash").agg(total_road_length=("length_m", "sum")).reset_index()
    prediction_df = prediction_df.merge(road_grouped, on="Geohash", how="left")
    prediction_df['total_road_length'] = prediction_df['total_road_length'].fillna(0).astype(int)

    # --- Elevation ---
    def geohash_to_latlon_center(gh):
        lat, lon, _, _ = geohash2.decode_exactly(gh)
        return lat, lon
    prediction_df['lat'], prediction_df['lon'] = zip(*prediction_df['Geohash'].map(geohash_to_latlon_center))

    def get_raster_value(raster, lon, lat):
        try:
            coords = [(lon, lat)]
            for val in raster.sample(coords):
                return val[0] if val[0] != raster.nodata else np.nan
        except:
            return np.nan

    with rasterio.open(elevation_raster_file) as elev_src:
        prediction_df['elevation'] = prediction_df.progress_apply(
            lambda row: get_raster_value(elev_src, row['lon'], row['lat']), axis=1
        )

    # --- NDVI ---
    rasters = []
    for file in ndvi_files:
        src = rasterio.open(file)
        data = src.read(1).astype(np.float32) / 65535.0
        rasters.append((src, data))

    def get_ndvi(lat, lon):
        for src, ndvi_data in rasters:
            try:
                row, col = rowcol(src.transform, lon, lat)
                if (0 <= row < ndvi_data.shape[0]) and (0 <= col < ndvi_data.shape[1]):
                    return float(ndvi_data[row, col])
            except:
                continue
        return np.nan

    prediction_df["NDVI"] = prediction_df.progress_apply(
        lambda row: get_ndvi(row["lat"], row["lon"]), axis=1
    )

    # --- Population density ---
    df_pop = pd.read_csv(population_file)
    gdf = gpd.GeoDataFrame(prediction_df,
                           geometry=gpd.points_from_xy(prediction_df.lon, prediction_df.lat),
                           crs="EPSG:4326").to_crs("EPSG:5234")
    gdf_pop = gpd.GeoDataFrame(df_pop,
                               geometry=gpd.points_from_xy(df_pop.X, df_pop.Y),
                               crs="EPSG:4326").to_crs("EPSG:5234")
    points = np.array(list(zip(gdf.geometry.x, gdf.geometry.y)))
    pop_points = np.array(list(zip(gdf_pop.geometry.x, gdf_pop.geometry.y)))
    population_tree = cKDTree(pop_points)
    distances, indices = population_tree.query(points, distance_upper_bound=1000)

    prediction_df['population_density'] = [gdf_pop.iloc[i]['Z'] if i < len(gdf_pop) else 0
                                           for i in tqdm(indices)]

    # --- Cleanup ---
    return prediction_df.drop(columns=['lat', 'lon'], errors='ignore')

In [None]:
test_df_regen = enrich_with_environmental_features(
    test_df_regen,
    building_data_file="/content/drive/MyDrive/Final_year_project/building/gis_osm_buildings_a_free_1.shp",
    road_data_file="/content/drive/MyDrive/Final_year_project/roads/gis_osm_roads_free_1.shp",
    elevation_raster_file="/content/drive/MyDrive/Final_year_project/terrain/elevation.tif",
    ndvi_files=[
        "/content/drive/MyDrive/Final_year_project/vegetation/ndvi_west.tiff",
        "/content/drive/MyDrive/Final_year_project/vegetation/ndvi_east.tiff"
    ],
    population_file="/content/drive/MyDrive/Final_year_project/population/population_density.csv"
)

In [None]:
import geohash2
import pandas as pd

def add_geohash_centroids(prediction_df, geohash_col="Geohash"):
    """
    Adds centroid latitude and longitude of geohashes in prediction_df.

    Args:
        prediction_df (pd.DataFrame): must contain a 'Geohash' column
        geohash_col (str): name of the column with geohash strings (default: "Geohash")

    Returns:
        pd.DataFrame: with two new columns 'lat' and 'lon'
    """
    def geohash_to_latlon_center(gh):
        lat, lon, _, _ = geohash2.decode_exactly(gh)
        return lat, lon

    prediction_df = prediction_df.copy()
    prediction_df['lat'], prediction_df['lon'] = zip(*prediction_df[geohash_col].map(geohash_to_latlon_center))
    return prediction_df

In [None]:
test_df_regen = add_geohash_centroids(test_df_regen)

In [None]:
test_df_regen = test_df_regen.rename(columns={
    'Site_Latitude': 'Site_latitude',
    'Site_Longitude': 'Site_longitude'
})

In [None]:
from tqdm import tqdm

test_df_cartesian = pd.DataFrame()

for cell_id, group in tqdm(test_df_regen.groupby("Cell_ID")):
    site_lat = group["Site_latitude"].iloc[0]
    site_lon = group["Site_longitude"].iloc[0]

    group_cartesian = latlon_to_cartesian(group, site_lat, site_lon,
                                          lat_col="lat", lon_col="lon")
    test_df_cartesian = pd.concat([test_df_cartesian, group_cartesian])

In [None]:
test_df_rot_ind = pd.DataFrame()

In [None]:
for cell_id, group in test_df_cartesian.groupby("Cell_ID"):
    print(f"Cell ID: {cell_id}\n")
    group = group.copy()
    group["angle_deg"] = np.degrees(np.arctan2(group["y"], group["x"])) % 360
    group["folded_angle"] = group["angle_deg"].progress_apply(mirror_azimuth)
    group["rotation_angle_deg"] = -(group["angle_deg"] - group["folded_angle"])
    group["rotation_angle_rad"] = np.deg2rad(group["rotation_angle_deg"])

    cos_a = np.cos(group["rotation_angle_rad"])
    sin_a = np.sin(group["rotation_angle_rad"])
    x_new = group["x"] * cos_a - group["y"] * sin_a
    y_new = group["x"] * sin_a + group["y"] * cos_a

    group["x"] = x_new
    group["y"] = y_new

    test_df_rot_ind = pd.concat([test_df_rot_ind, group])

In [None]:
features = [
    'EARFCN_DL',
    'antenna_height',
    'azimuth',
    'tilt',
    'building_count',
    'total_road_length',
    'elevation',
    'NDVI',
    'population_density'
]

positional_encoding = ['x', 'y']

target = ['RSRP', 'RSRQ']

In [None]:
feature_scalers = {}
for col in features:
    scaler = StandardScaler()
    train_df[col] = scaler.fit_transform(train_df[[col]])
    feature_scalers[col] = scaler

target_scalers = {}
for col in target:
    scaler = StandardScaler()
    train_df[col] = scaler.fit_transform(train_df[[col]])
    target_scalers[col] = scaler

In [None]:
for col in features:
    scaler = feature_scalers[col]
    test_df_processed[col] = scaler.transform(test_df_processed[[col]])

In [None]:
def prepare_prediction_tensor(df, seq_len, feature_cols, pos_cols):
    N = (len(df) // seq_len) * seq_len
    if N == 0:
        raise ValueError("Not enough rows in prediction_df_processed for one full sequence.")

    df = df.iloc[:N]
    num_seq = N // seq_len

    X_tensor = torch.tensor(df[feature_cols].values, dtype=torch.float32).view(num_seq, seq_len, -1)
    pos_tensor = torch.tensor(df[pos_cols].values, dtype=torch.float32).view(num_seq, seq_len, -1)
    return X_tensor, pos_tensor

In [None]:
sequence_length = 256
X_pred, pos_pred = prepare_prediction_tensor(
    test_df_processed,
    sequence_length,
    features,
    ['x', 'y']
)

In [None]:
# Transformer only (+FFNNs)

import torch.nn.functional as F
import torch.nn as nn
import torch
import numpy as np

class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super().__init__()
        self.num_heads = num_heads
        self.d_k = d_model // num_heads
        self.qkv_proj = nn.Linear(d_model, d_model * 3)
        self.out_proj = nn.Linear(d_model, d_model)

    def forward(self, x):
        B, S, D = x.shape
        qkv = self.qkv_proj(x).reshape(B, S, self.num_heads, 3 * self.d_k).transpose(1, 2)
        Q, K, V = qkv.chunk(3, dim=-1)
        scores = Q @ K.transpose(-2, -1) / np.sqrt(self.d_k)
        attn = F.softmax(scores, dim=-1)
        context = attn @ V
        context = context.transpose(1, 2).reshape(B, S, D)
        return self.out_proj(context)

class TransformerBlock(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout=0.5):
        super().__init__()
        self.attn = MultiHeadAttention(d_model, num_heads)
        self.norm1 = nn.LayerNorm(d_model)
        self.ff = nn.Sequential(
            nn.Linear(d_model, d_ff),
            nn.ReLU(),
            nn.Linear(d_ff, d_model)
        )
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        x = self.norm1(x + self.dropout(self.attn(x)))
        return self.norm2(x + self.dropout(self.ff(x)))

class TransformerModel(nn.Module):
    def __init__(self, input_dim, output_dim, d_model=128, num_heads=4, num_layers=6, d_ff=256):
        super().__init__()
        self.input_proj = nn.Linear(input_dim, d_model)
        self.pos_proj = nn.Linear(2, d_model)
        self.layers = nn.ModuleList([
            TransformerBlock(d_model, num_heads, d_ff) for _ in range(num_layers)
        ])
        self.output_layer = nn.Linear(d_model, output_dim)

    def forward(self, x, pos):
        x = self.input_proj(x) + self.pos_proj(pos)
        for layer in self.layers:
            x = layer(x)
        return self.output_layer(x)

In [None]:
# Setup the fine-tuned model location

In [None]:
input_dim = len(features)
output_dim = len(target)
d_model = 128
num_heads = 4
num_layers = 6
d_ff = 256

inf_model = TransformerModel(
    input_dim=input_dim,
    output_dim=output_dim,
    d_model=d_model,
    num_heads=num_heads,
    num_layers=num_layers,
    d_ff=d_ff
).to(device)

best_model_path = "/content/best_model.pt"
best_model = torch.load(best_model_path, map_location=device)

inf_model.load_state_dict(best_model["model_state_dict"])

In [None]:
inf_model.eval()
with torch.no_grad():
    X_pred, pos_pred = X_pred.to(device), pos_pred.to(device)
    preds = inf_model(X_pred, pos_pred).cpu().numpy()

In [None]:
preds_flat = preds.reshape(-1, preds.shape[-1])

# Inverse scale each target
preds_df = pd.DataFrame(preds_flat, columns=target)
for col in target:
    scaler = target_scalers[col]
    preds_df[col] = scaler.inverse_transform(preds_df[[col]])

preds_df.head()

In [None]:
test_results = test_df_processed.iloc[:len(preds_df)].copy()
test_results[['RSRP', 'RSRQ']] = preds_df[['RSRP', 'RSRQ']]

In [None]:
for col in features:
    scaler = feature_scalers[col]
    test_results[col] = scaler.inverse_transform(test_results[[col]])
test_results.head()

In [None]:
# Set the locations for Prediction and Actual visualisations

In [None]:
import pandas as pd
import folium
import geohash
import geohash2

# --- Select the site of interest ---
site_id = str(input("Enter the site ID: "))   # <-- change this to the Site_ID you want
site_df = test_results_processed[test_results_processed['Site_ID'] == site_id].copy()

# Extract all cells belonging to that site
site_cells = site_df['Cell_ID'].unique()
print(f"Plotting {len(site_cells)} cells for site {site_id}")

# Use precision 8 for geohash aggregation
site_df['geohash8'] = site_df['Geohash'].str[:8]

# --- Aggregate average RSRP for each geohash ---
agg_data = []
for gh, group in site_df.groupby('geohash8'):
    lat, lon = geohash.decode(gh)
    avg_rsrp = group['RSRP'].mean()
    count = len(group)
    cell_ids = group['Cell_ID'].unique().tolist()
    agg_data.append((gh, lat, lon, avg_rsrp, count, cell_ids))

agg_df = pd.DataFrame(agg_data, columns=['geohash8', 'avg_lat', 'avg_lon', 'avg_RSRP', 'point_count', 'Cell_IDs'])

# --- Color mapping for RSRP ---
def rsrp_to_color(rsrp):
    if rsrp == 0 or pd.isna(rsrp):
        return "#606060"  # gray
    elif rsrp <= -85:
        return "red"
    elif rsrp <= -75:
        return "orange"
    elif rsrp <= -65:
        return "yellow"
    else:
        return "green"

# --- Initialize map centered on site ---
center_lat = site_df['Site_latitude'].mean()
center_lon = site_df['Site_longitude'].mean()
m_site = folium.Map(location=[center_lat, center_lon], zoom_start=13)

# --- Draw rectangles for all geohash cells ---
for _, row in agg_df.iterrows():
    bbox = geohash2.decode_exactly(row['geohash8'])
    lat, lon, lat_err, lon_err = bbox
    lat_min, lat_max = lat - lat_err, lat + lat_err
    lon_min, lon_max = lon - lon_err, lon + lon_err

    color = rsrp_to_color(row['avg_RSRP'])
    cell_list = ", ".join(map(str, row['Cell_IDs']))
    popup_text = (
        f"<b>Site:</b> {site_id}<br>"
        f"<b>Cells:</b> {cell_list}<br>"
        f"Points: {row['point_count']}<br>"
        f"Avg RSRP: {row['avg_RSRP']:.2f} dBm"
    )

    folium.Rectangle(
        bounds=[[lat_min, lon_min], [lat_max, lon_max]],
        color=color,
        fill=True,
        fill_opacity=0.6,
        popup=popup_text
    ).add_to(m_site)

# --- Plot site marker ---
site_lat = site_df['Site_latitude'].iloc[0]
site_lon = site_df['Site_longitude'].iloc[0]
folium.Marker(
    location=[site_lat, site_lon],
    icon=folium.Icon(color='black', icon='signal', prefix='fa'),
    popup=f"Site ID: {site_id}"
).add_to(m_site)

# --- Title ---
title_html = f'''
<div style="
    position: fixed;
    top: 10px;
    left: 50%;
    transform: translateX(-50%);
    z-index: 9999;
    font-size: 22px;
    font-weight: bold;
    background-color: white;
    padding: 5px 10px;
    border: 2px solid grey;
    border-radius: 5px;
    box-shadow: 2px 2px 5px rgba(0,0,0,0.3);
">
RSRP Distribution Map — Site {site_id} (Prediction)
</div>
'''
m_site.get_root().html.add_child(folium.Element(title_html))

# --- Legend ---
legend_html = '''
<div style="position: fixed;
            bottom: 50px; left: 50px; width: 200px; height: 160px;
            border:2px solid grey; z-index:9999; font-size:14px;
            background-color:white;
            padding:5px;">
<b>RSRP (dBm)</b><br>
<i style="background:red;color:red">....</i>&nbsp; ≤ -85 (Very Weak)<br>
<i style="background:orange;color:orange">....</i>&nbsp; -85 to -75 (Weak–Medium)<br>
<i style="background:yellow;color:yellow">....</i>&nbsp; -75 to -65 (Medium–Strong)<br>
<i style="background:green;color:green">....</i>&nbsp; > -65 (Very Strong)<br>
<i style="background:#606060;color:#606060">....</i>&nbsp; Zero/No Data
</div>
'''
m_site.get_root().html.add_child(folium.Element(legend_html))

m_site.save(f'site_{site_id}_prediction_map.html')

In [None]:
import pandas as pd
import folium
import geohash
import geohash2

# --- Select the site of interest ---
site_id = str(input("Enter the site ID: "))   # <-- change this to the Site_ID you want
site_df = test_df[test_df['Site_ID'] == site_id].copy()

# Extract all cells belonging to that site
site_cells = site_df['Cell_ID'].unique()
print(f"Plotting {len(site_cells)} cells for site {site_id}")

# Use precision 8 for geohash aggregation
site_df['geohash8'] = site_df['Geohash'].str[:8]

# --- Aggregate average RSRP for each geohash ---
agg_data = []
for gh, group in site_df.groupby('geohash8'):
    lat, lon = geohash.decode(gh)
    avg_rsrp = group['RSRP'].mean()
    count = len(group)
    cell_ids = group['Cell_ID'].unique().tolist()
    agg_data.append((gh, lat, lon, avg_rsrp, count, cell_ids))

agg_df = pd.DataFrame(agg_data, columns=['geohash8', 'avg_lat', 'avg_lon', 'avg_RSRP', 'point_count', 'Cell_IDs'])

# --- Color mapping for RSRP ---
def rsrp_to_color(rsrp):
    if rsrp == 0 or pd.isna(rsrp):
        return "#606060"  # gray
    elif rsrp <= -85:
        return "red"
    elif rsrp <= -75:
        return "orange"
    elif rsrp <= -65:
        return "yellow"
    else:
        return "green"

# --- Initialize map centered on site ---
center_lat = site_df['Site_latitude'].mean()
center_lon = site_df['Site_longitude'].mean()
m_site = folium.Map(location=[center_lat, center_lon], zoom_start=13)

# --- Draw rectangles for all geohash cells ---
for _, row in agg_df.iterrows():
    bbox = geohash2.decode_exactly(row['geohash8'])
    lat, lon, lat_err, lon_err = bbox
    lat_min, lat_max = lat - lat_err, lat + lat_err
    lon_min, lon_max = lon - lon_err, lon + lon_err

    color = rsrp_to_color(row['avg_RSRP'])
    cell_list = ", ".join(map(str, row['Cell_IDs']))
    popup_text = (
        f"<b>Site:</b> {site_id}<br>"
        f"<b>Cells:</b> {cell_list}<br>"
        f"Points: {row['point_count']}<br>"
        f"Avg RSRP: {row['avg_RSRP']:.2f} dBm"
    )

    folium.Rectangle(
        bounds=[[lat_min, lon_min], [lat_max, lon_max]],
        color=color,
        fill=True,
        fill_opacity=0.6,
        popup=popup_text
    ).add_to(m_site)

# --- Plot site marker ---
site_lat = site_df['Site_latitude'].iloc[0]
site_lon = site_df['Site_longitude'].iloc[0]
folium.Marker(
    location=[site_lat, site_lon],
    icon=folium.Icon(color='black', icon='signal', prefix='fa'),
    popup=f"Site ID: {site_id}"
).add_to(m_site)

# --- Title ---
title_html = f'''
<div style="
    position: fixed;
    top: 10px;
    left: 50%;
    transform: translateX(-50%);
    z-index: 9999;
    font-size: 22px;
    font-weight: bold;
    background-color: white;
    padding: 5px 10px;
    border: 2px solid grey;
    border-radius: 5px;
    box-shadow: 2px 2px 5px rgba(0,0,0,0.3);
">
RSRP Distribution Map — Site {site_id} (Actual)
</div>
'''
m_site.get_root().html.add_child(folium.Element(title_html))

# --- Legend ---
legend_html = '''
<div style="position: fixed;
            bottom: 50px; left: 50px; width: 200px; height: 160px;
            border:2px solid grey; z-index:9999; font-size:14px;
            background-color:white;
            padding:5px;">
<b>RSRP (dBm)</b><br>
<i style="background:red;color:red">....</i>&nbsp; ≤ -85 (Very Weak)<br>
<i style="background:orange;color:orange">....</i>&nbsp; -85 to -75 (Weak–Medium)<br>
<i style="background:yellow;color:yellow">....</i>&nbsp; -75 to -65 (Medium–Strong)<br>
<i style="background:green;color:green">....</i>&nbsp; > -65 (Very Strong)<br>
<i style="background:#606060;color:#606060">....</i>&nbsp; Zero/No Data
</div>
'''
m_site.get_root().html.add_child(folium.Element(legend_html))

m_site.save(f'site_{site_id}_actual_map.html')

In [None]:
import pandas as pd
import numpy as np
import folium
import geohash
import matplotlib.colors as mcolors
from scipy.stats import pearsonr

def compare_rsrp_maps_black_missing(df_pred, df_actual, site_id, metric='RSRP', generate_map=True):
    """
    Compare predicted vs actual RSRP maps for a given Cell_ID, including all Geohash cells.
    Missing cells are colored black on the difference map.

    Args:
        df_pred (pd.DataFrame): Prediction Data Frame
        df_actual (pd.DataFrame): Actual Data Frame
        generate_map (bool): If True, generates folium difference map.

    Returns:
        dict: similarity metrics and optionally folium map object.
    """

    df_pred = df_pred[df_pred['Site_ID'] == site_id].copy()
    df_actual = df_actual[df_actual['Site_ID'] == site_id].copy()

    if df_pred.empty or df_actual.empty:
        print(f"No data found for Cell ID {cell_id}")
        return {}

    def aggregate(df):
        agg_data = []
        for gh, group in df.groupby('Geohash'):
            lat, lon = geohash.decode(gh)
            avg_val = group[metric].mean()
            count = len(group)
            agg_data.append((gh, lat, lon, avg_val, count))
        return pd.DataFrame(agg_data, columns=['Geohash', 'avg_lat', 'avg_lon', f'avg_{metric}', 'point_count'])

    agg_pred = aggregate(df_pred)
    agg_actual = aggregate(df_actual)

    merged = pd.merge(
        agg_pred[['Geohash', 'avg_RSRP', 'avg_lat', 'avg_lon']],
        agg_actual[['Geohash', 'avg_RSRP']],
        on='Geohash',
        how='outer',
        suffixes=('_pred', '_actual')
    )

    merged['avg_lat'] = merged['avg_lat'].combine_first(
        merged['Geohash'].apply(lambda gh: geohash.decode(gh)[0])
    )
    merged['avg_lon'] = merged['avg_lon'].combine_first(
        merged['Geohash'].apply(lambda gh: geohash.decode(gh)[1])
    )

    overlap = merged.dropna(subset=['avg_RSRP_pred', 'avg_RSRP_actual'])
    if len(overlap) > 1:
        corr, _ = pearsonr(overlap['avg_RSRP_pred'], overlap['avg_RSRP_actual'])
        mae = np.mean(np.abs(overlap['avg_RSRP_pred'] - overlap['avg_RSRP_actual']))
        rmse = np.sqrt(np.mean((overlap['avg_RSRP_pred'] - overlap['avg_RSRP_actual'])**2))
        similarity_score = (corr + 1) / 2
        status = "Exact overlap"
    else:
        from sklearn.neighbors import NearestNeighbors

        pred_valid = merged.dropna(subset=['avg_lat', 'avg_lon', 'avg_RSRP_pred']).copy()
        actual_valid = merged.dropna(subset=['avg_lat', 'avg_lon', 'avg_RSRP_actual']).copy()

        if len(pred_valid) > 1 and len(actual_valid) > 1:
            nn = NearestNeighbors(n_neighbors=1).fit(actual_valid[['avg_lat', 'avg_lon']])
            distances, indices = nn.kneighbors(pred_valid[['avg_lat', 'avg_lon']])

            paired_pred_rsrp = pred_valid['avg_RSRP_pred'].values
            paired_actual_rsrp = actual_valid.iloc[indices.flatten()]['avg_RSRP_actual'].values

            # Now both have equal length
            mae = np.mean(np.abs(paired_pred_rsrp - paired_actual_rsrp))
            rmse = np.sqrt(np.mean((paired_pred_rsrp - paired_actual_rsrp)**2))
            corr = np.corrcoef(paired_pred_rsrp, paired_actual_rsrp)[0, 1]
            similarity_score = (corr + 1) / 2
            status = "Spatially approximated (no exact overlap)"
        else:
            mae = rmse = np.nan
            corr = np.nan
            similarity_score = 0.0
            status = "No overlap or insufficient data"

    results = {
        'Site ID': site_id,
        'MAE': mae,
        'RMSE': rmse,
        'Pearson Correlation': corr,
        'Similarity Score (0-1)': similarity_score,
        'Status': status
    }

    if generate_map:
        merged['RSRP_diff'] = merged['avg_RSRP_pred'] - merged['avg_RSRP_actual']
        center_lat = merged['avg_lat'].mean()
        center_lon = merged['avg_lon'].mean()

        vmax = merged['RSRP_diff'].abs().quantile(0.95)
        if vmax < 2: vmax = 2
        norm_diff = mcolors.TwoSlopeNorm(vmin=-vmax, vcenter=0, vmax=vmax)
        cmap_diff = mcolors.LinearSegmentedColormap.from_list("", ["blue","white","red"])

        m_diff = folium.Map(location=[center_lat, center_lon], zoom_start=15)

        for _, row in merged.iterrows():
            bbox = geohash.bbox(row['Geohash'])

            if pd.isna(row['avg_RSRP_pred']) or pd.isna(row['avg_RSRP_actual']):
                color = "#000000"  # black for missing cells
                popup_text = "Cell missing in one dataset"
            else:
                color = mcolors.to_hex(cmap_diff(norm_diff(row['RSRP_diff'])))
                popup_text = f"RSRP Diff (Pred - Actual): {row['RSRP_diff']:.2f} dBm"

            folium.Rectangle(
                bounds=[[bbox['s'], bbox['w']], [bbox['n'], bbox['e']]],
                color=color, fill=True, fill_opacity=0.6, popup=popup_text
            ).add_to(m_diff)

        # --- Add legend ---
        legend_html = '''
        <div style="position: fixed; bottom: 50px; left: 50px; width: 240px;
                    background-color: white; border:2px solid grey; z-index:9999;
                    padding:10px; font-size:14px;">
        <b>RSRP Difference (Pred - Actual)</b><br>
        <i style="background:blue;color:blue">....</i>&nbsp; Under-predicted<br>
        <i style="background:white;color:white;border:1px solid grey">....</i>&nbsp; Accurate<br>
        <i style="background:red;color:red">....</i>&nbsp; Over-predicted<br>
        <i style="background:black;color:black">....</i>&nbsp; Missing cell
        </div>
        '''
        m_diff.get_root().html.add_child(folium.Element(legend_html))
        results['Difference Map'] = m_diff

    return results

In [None]:
# setup the validation map outputs for each site

In [None]:
site_id = str(input("Enter the Site ID: "))
result = compare_rsrp_maps_black_missing(
    df_pred=test_results_processed,
    df_actual=test_df,
    site_id=site_id,
    generate_map=True
)

In [None]:
results = []
for site_id in test_df['Site_ID'].unique().tolist():
    result = compare_rsrp_maps_black_missing(
        df_pred=test_results_processed,
        df_actual=test_df,
        site_id=site_id,
        generate_map=False
    )
    results.append(result)

In [None]:
results_df = pd.DataFrame(results)
results_df.to_csv('/content/drive/MyDrive/Final_year_project/models/1026v2/results1026v2.csv', index=False)
results_df.head()

In [None]:
overall_metrics = {
    "Mean MAE": results_df["MAE"].mean(),
    "Mean RMSE": results_df["RMSE"].mean(),
    "Mean Pearson Correlation": results_df["Pearson Correlation"].mean(),
    "Mean Similarity Score (0-1)": results_df["Similarity Score (0-1)"].mean(),
    "Total Sites": len(results_df)
}

In [None]:
print("=== Overall Evaluation Metrics ===")
for k, v in overall_metrics.items():
    print(f"{k}: {v:.4f}")