# Preprocessing

In [None]:
# Libraries
import tensorflow as tf
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
import os
from PIL import Image
import numpy as np
from sklearn.model_selection import train_test_split
import shutil
import cv2
import py360convert
import re
import folium
import plotly.express as px
from tensorflow.keras import layers
from tensorflow.keras.preprocessing.image import img_to_array, array_to_img
import math
import os
from PIL import Image
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score
from math import sqrt
from torchvision.utils import save_image
import torch.nn as nn
import torchvision.models as models
import torch.optim as optim
import time
from tqdm import tqdm

We first inspect the dataset.

In [None]:
# We do a first inspection, i.e. do the images have the same size, are we able to detect the black frames, etc.?
image_folder = 'C:/Users/Nutzer/Downloads/images(1)/images'

def inspect_images(image_root, num_images_per_folder=3):
    """Print a quick sanity report for a few images of every sub-folder.

    For each sub-directory of ``image_root`` the number of image files is
    printed, followed by the dimensions, the mean pixel value and a
    black-frame check (mean of the right-most pixel column) for the first
    ``num_images_per_folder`` images in sorted order.

    Args:
        image_root: Directory that contains one sub-folder per class.
        num_images_per_folder: How many images to inspect per sub-folder.
    """
    image_exts = ('.jpg', '.jpeg', '.png')

    for subfolder in sorted(os.listdir(image_root)):
        subfolder_path = os.path.join(image_root, subfolder)
        if not os.path.isdir(subfolder_path):
            continue

        print(f"Inspecting images from folder: '{subfolder}'")
        # Sorted so that the inspected sample is the same on every run.
        images = sorted(f for f in os.listdir(subfolder_path) if f.lower().endswith(image_exts))

        print(f"  Total images: {len(images)}")

        # Sample image inspection
        for img_name in images[:num_images_per_folder]:
            img_path = os.path.join(subfolder_path, img_name)
            with Image.open(img_path) as img:
                width, height = img.size
                # Convert to RGB so greyscale/palette images also give an
                # (H, W, 3) array; a 2-D array would break the edge slice below.
                np_img = np.array(img.convert('RGB'))

            img_mean_pixel_value = np.mean(np_img)

            # Check for black frame (at right edge)
            right_edge_mean = np.mean(np_img[:, -1, :])

            print(f"Image: {img_name}")
            print(f"Dimensions: {width}px (width) × {height}px (height)")
            print(f"Mean Pixel Value: {img_mean_pixel_value:.2f}")
            if right_edge_mean < 10:
                print(f"Possible black frame detected (right-edge mean: {right_edge_mean:.2f})")
            else:
                print(f"No black frame detected (right-edge mean: {right_edge_mean:.2f})")

# Run the inspection on the raw image folder when this cell/script is executed directly.
if __name__ == "__main__":
    inspect_images(image_folder)


The images are quite large, so we downsize them. We also detected black frames that have to be removed, as well as images that are entirely black.

We resize once and take out black borders

In [None]:
original_dir = 'C:/Users/Nutzer/Downloads/images(1)/images'
processed_dir = 'processed_images_final'
resize_dim_final = (4096, 2048) 

# resize function
def resize_final(img):
    """Return ``img`` resampled with LANCZOS to the module-level ``resize_dim_final`` size."""
    target_size = resize_dim_final
    return img.resize(target_size, Image.LANCZOS)

# crop function
def remove_black_borders(img, threshold=10):
    """Crop ``img`` to the bounding box of all pixels brighter than ``threshold``.

    Args:
        img: Image object that ``np.array`` can convert and that offers
            ``crop((left, upper, right, lower))`` (e.g. a PIL image).
        threshold: A pixel counts as content when its brightness (the mean
            over its channels) is strictly greater than this value.

    Returns:
        The cropped image, or ``img`` itself when no pixel exceeds the
        threshold (fully black image).
    """
    img_np = np.array(img)
    # Greyscale images arrive as 2-D arrays and have no channel axis to average.
    gray = img_np if img_np.ndim == 2 else np.mean(img_np, axis=2)
    mask = gray > threshold

    if not mask.any():
        # fully black = just return image
        print("Image fully black")
        return img

    # Bounding box from the row/column projections of the mask; this avoids
    # materialising one coordinate pair per bright pixel.
    rows = np.flatnonzero(mask.any(axis=1))
    cols = np.flatnonzero(mask.any(axis=0))
    y0, y1 = int(rows[0]), int(rows[-1]) + 1
    x0, x1 = int(cols[0]), int(cols[-1]) + 1
    return img.crop((x0, y0, x1, y1))

# Processing loop: resize every raw image, crop its black border and store the
# result as JPEG under processed_dir/<country>/<original file name>.
# NOTE(review): the crop happens after the resize, so a saved image can be
# smaller than resize_dim_final and no longer 2:1 — confirm this is intended.
panorama_images, labels = [], []

for country in os.listdir(original_dir):
    country_path = os.path.join(original_dir, country)
    if not os.path.isdir(country_path):
        continue

    for img_name in os.listdir(country_path):
        if not img_name.lower().endswith(('.jpg', '.jpeg', '.png')):
            continue
        img_path = os.path.join(country_path, img_name)

        # Context manager releases the file handle of the source image.
        with Image.open(img_path) as img:
            # JPEG cannot store alpha/palette modes, so normalise to RGB first.
            img_resized = resize_final(img.convert('RGB'))
        img_cropped = remove_black_borders(img_resized)

        # Save directly
        final_country_dir = os.path.join(processed_dir, country)
        os.makedirs(final_country_dir, exist_ok=True)
        final_img_path = os.path.join(final_country_dir, img_name)
        img_cropped.save(final_img_path, 'JPEG', quality=85)

        panorama_images.append(final_img_path)
        labels.append(country)

        print(f"Saved: {country}/{img_name}")


Image Transformation: The images are equirectangular. Thus, we can extract 4 perspectives from one picture. We do that using the cv2 and py360convert libraries using yaw angles.

We extract 4 views per image

In [None]:
# Input: resized/cropped panoramas; output: perspective views, one folder per country.
input_dir = "processed_images_final"
output_dir = "processed_images_final_perspectives"
os.makedirs(output_dir, exist_ok=True)

# dimensions, field-of-view, number of perspectives
img_width, img_height = 1024, 1024
fov_deg = (90, 90)
num_views = 4

def extract_views(equi_img, num_views, fov_deg, out_hw):
    """Return ``num_views`` perspective views of an equirectangular image.

    The yaw angles are spread evenly over [-180, 180) degrees; every view is
    rendered at pitch 0 with ``py360convert.e2p`` using bilinear sampling.

    Args:
        equi_img: Equirectangular image array handed to ``py360convert.e2p``.
        num_views: Number of yaw angles / views to render.
        fov_deg: Field of view passed through to ``e2p``.
        out_hw: Output (height, width) passed through to ``e2p``.

    Returns:
        List with one rendered view per yaw angle, in ascending yaw order.
    """
    return [
        py360convert.e2p(
            equi_img,
            fov_deg=fov_deg,
            u_deg=yaw,
            v_deg=0,
            out_hw=out_hw,
            mode='bilinear'
        )
        for yaw in np.linspace(-180, 180, num_views, endpoint=False)
    ]

# apply to each country folder
for country in os.listdir(input_dir):
    country_path = os.path.join(input_dir, country)
    if not os.path.isdir(country_path):
        continue

    # mirror the country folder below the output directory
    output_country_path = os.path.join(output_dir, country)
    os.makedirs(output_country_path, exist_ok=True)

    for img_name in os.listdir(country_path):
        is_image = img_name.lower().endswith(('.jpg', '.jpeg', '.png'))
        if not is_image:
            continue

        input_img_path = os.path.join(country_path, img_name)
        equi_img = cv2.imread(input_img_path)
        if equi_img is None:
            print(f"Warning: Could not read image {input_img_path}")
            continue

        # cv2 loads BGR; the projection below works on RGB data
        equi_img = cv2.cvtColor(equi_img, cv2.COLOR_BGR2RGB)
        views = extract_views(equi_img, num_views, fov_deg, (img_height, img_width))

        # every view keeps the original stem and gets a 1-based "_view_<i>" suffix
        base_name = os.path.splitext(img_name)[0]
        for view_no, view in enumerate(views, start=1):
            save_path = os.path.join(output_country_path, f"{base_name}_view_{view_no}.jpg")
            # back to BGR, the channel order cv2.imwrite expects
            cv2.imwrite(save_path, cv2.cvtColor(view, cv2.COLOR_RGB2BGR))

        # progress message per source image
        print(f"Processed {img_name} in {country} -> saved {num_views} perspective views.")

print("All images processed and saved.")

Inspecting Corrupted Images: When going through the images manually, we detected some entirely black images. As they provide no useful information for learning and prediction, they should be removed from the entire dataset. Additionally, we saw some images with an odd color spectrum that likely came from tunnels. We want to find out how many of those there are in order to decide how to deal with them.

In [None]:
# Root folder of the perspective views.
# NOTE(review): `dir` shadows the built-in dir(); the name is kept because later cells read it.
dir = "processed_images_final_perspectives"

# load an image and check whether it's abnormal
def check_abnormal(image_path):
    """Classify an image file as completely black, tunnel-like or normal.

    Args:
        image_path: Path of the image file to read with ``cv2.imread``.

    Returns:
        ``(rgb_image, reason)`` when the image is flagged, with ``reason``
        being ``"Completely black"`` or ``"Tunnel"``; ``(None, "error")``
        when the file cannot be read; ``(None, None)`` otherwise.
    """
    bgr = cv2.imread(image_path)
    if bgr is None:
        return None, "error"

    # cv2 delivers BGR; the channel checks below are written for RGB
    rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)

    # one mean value per colour channel, taken over all pixels
    channel_means = np.mean(rgb, axis=(0, 1))
    red, green, blue = channel_means

    # completely black: every channel mean is below 10
    if np.all(channel_means < 10):
        return rgb, "Completely black"

    # tunnel heuristic: red is very high while green and blue stay low
    looks_like_tunnel = red > 200 and green < 150 and blue < 150
    if looks_like_tunnel:
        return rgb, "Tunnel"

    return None, None

# loop over all images in the data and delete every flagged one
for country in os.listdir(dir):
    country_path = os.path.join(dir, country)
    if not os.path.isdir(country_path):
        continue

    for img_name in os.listdir(country_path):
        image_path = os.path.join(country_path, img_name)
        abnormal_img, reason = check_abnormal(image_path)

        # normal (or unreadable) files are left untouched
        if abnormal_img is None:
            continue

        print(f"abnormal image found: {image_path} - reason: {reason}")

        # because we found only 8 pictures with tunnels (2 locations),
        # we delete them globally as they won't contribute to model training and could influence it in a bad way
        os.remove(image_path)
        print(f"Deleted {image_path} - {reason}")


To get an overview, we explore the positions of the places portrayed via a map. Additionally, we check whether all the locations are actually in the 13 countries.

In [None]:
# we have filenames like 1741691006_-30.9927065_-68.8548332_view_1.jpg that contain the coordinates
pattern = re.compile(r'(\d+)_(-?\d+\.\d+)_(-?\d+\.\d+)_view_(\d+)(?:\.jpg)?$')

# (lat, lon) of every file below `dir` whose name matches the pattern
coords = [
    (float(m.group(2)), float(m.group(3)))
    for _, _, files in os.walk(dir)
    for m in map(pattern.match, files)
    if m
]

# remove duplicate coordinates since each location has 4 views
unique_coords = list(set(coords))

if unique_coords:
    # dataframe with unique coordinates
    df = pd.DataFrame(unique_coords, columns=['lat', 'lon'])
    print("coordinates:")
    print(df.head())

    # interactive map with plotly express, saved as html file
    fig = px.scatter_geo(
        df,
        lat='lat',
        lon='lon',
        title="Image Locations Map",
        projection="natural earth",
        hover_name="lat"
    )
    fig.update_traces(marker=dict(color='red', size=5))
    fig.write_html("map.html")
else:
    print("No coordinates extracted.")

In [None]:
# we detected some suspicious markers in the Indian Ocean. Investigate points where lat < -10 and lon between 50 and 100.
south_of_minus_10 = df['lat'] < -10
lon_between_50_and_100 = (df['lon'] > 50) & (df['lon'] < 100)
df_sus = df[south_of_minus_10 & lon_between_50_and_100]
print("sus coordinates:")
print(df_sus)

In [None]:
# investigate suspicious coordinates (-> probably belong to France, e.g. La Reunion)
# one (latitude, longitude, full_file_path) tuple per image whose name matches the pattern
coords_with_file = []

for subdir, dirs, files in os.walk(dir):
    for filename in files:
        match = pattern.match(filename)
        if match is None:
            continue
        lat, lon = match.group(2), match.group(3)
        coords_with_file.append((float(lat), float(lon), os.path.join(subdir, filename)))

# dataframe with one row per image file
df = pd.DataFrame(coords_with_file, columns=['lat', 'lon', 'filepath'])

# suspicious coordinates:
suspicious_coords = [
    (-21.058772, 55.368674),
    (-21.221322, 55.680430),
    (-21.232792, 55.667724)
]

# tolerance for floating point precision issues
def is_close(row, coord, tol=1e-6):
    """Return True when ``row`` lies within ``tol`` of ``coord`` in both lat and lon.

    Args:
        row: Either a mapping/Series with ``'lat'`` and ``'lon'`` entries or a
            plain ``(lat, lon)`` sequence.
        coord: Reference ``(lat, lon)`` pair.
        tol: Maximum absolute difference (exclusive) allowed per component.
    """
    try:
        lat, lon = row['lat'], row['lon']
    except (TypeError, IndexError, KeyError):
        # Tuples, lists and arrays cannot be indexed by name; read them by position.
        lat, lon = row[0], row[1]
    return (abs(lat - coord[0]) < tol) and (abs(lon - coord[1]) < tol)

# print the corresponding files for each suspicious coordinate
for coord in suspicious_coords:
    # boolean row mask: True where the file's coordinates match this suspicious point
    matches = df.apply(lambda row: is_close(row, coord), axis=1)
    filtered = df[matches]
    print("\nsuspicious coordinate {}:".format(coord))
    if not filtered.empty:
        print(filtered[['filepath']])
    else:
        print("no files found")


Note: Those pictures have to be treated as outliers. The vegetation might be very different from that of the rest of France and, more importantly, if we aim to predict coordinates, 3 distinct locations for one region are not enough to build the model on. We assume that the purpose of the project is to make predictions about locations in mainland France, which is why we delete the images from La Réunion, treating them as flaws in the data.

In [None]:
# delete suspicious files: every image whose filename coordinates match one of suspicious_coords
for subdir, dirs, files in os.walk(dir):
    for filename in files:
        match = pattern.match(filename)
        if not match:
            continue

        timestamp, lat_str, lon_str, view = match.groups()
        # is_close() reads its first argument via ['lat'] / ['lon'], so a plain
        # (lat, lon) tuple would raise a TypeError here; pass a mapping instead.
        file_coord = {'lat': float(lat_str), 'lon': float(lon_str)}

        # check again
        if any(is_close(file_coord, sc) for sc in suspicious_coords):
            full_path = os.path.join(subdir, filename)
            print(f"deleted {full_path}")
            os.remove(full_path)


# Splitting the data and exploring training data

We split the dataset

In [None]:
train_dir = "final_dataset/train"
val_dir = "final_dataset/validation"
test_dir = "final_dataset/test"

for split_root in (train_dir, val_dir, test_dir):
    os.makedirs(split_root, exist_ok=True)

# split ratios: 70% training, 15% validation, 15% test
first_split_ratio = 0.3  # 30% for temporary set (val + test)
second_split_ratio = 0.5  # split temp equally into validation and test

# NOTE(review): the split is done per file, so the views of one location can
# end up in different subsets — confirm that this overlap is acceptable.
for country in os.listdir(dir):
    country_input_path = os.path.join(dir, country)
    if not os.path.isdir(country_input_path):
        continue

    # image files of this country; empty folders are skipped
    images = [f for f in os.listdir(country_input_path) if f.lower().endswith(('.jpg', '.jpeg', '.png'))]
    if not images:
        continue

    # 1st split: training vs. temporary (val+test)
    train_imgs, temp_imgs = train_test_split(images, test_size=first_split_ratio, random_state=42)

    # 2nd split: temporary set halved into validation and test
    val_imgs, test_imgs = train_test_split(temp_imgs, test_size=second_split_ratio, random_state=42)

    # pair every subset with its per-country output folder
    subsets = (
        (train_imgs, os.path.join(train_dir, country)),
        (val_imgs, os.path.join(val_dir, country)),
        (test_imgs, os.path.join(test_dir, country)),
    )
    for _, target_dir in subsets:
        os.makedirs(target_dir, exist_ok=True)

    # copy training, validation and test images (in that order)
    for subset_imgs, target_dir in subsets:
        for img in subset_imgs:
            shutil.copy(os.path.join(country_input_path, img), os.path.join(target_dir, img))

    print(f"folder '{country}': {len(train_imgs)} train, {len(val_imgs)} validation, {len(test_imgs)} test images")

print("successfully split into train, validation, and test")


We inspect the training set: we look at the number of images per country in the set

In [None]:
# number of images per country and portray samples
import random  # needed for random.choice below; not part of the notebook's import cell

# seaborn style
sns.set(style="whitegrid", context="talk", palette="viridis")

train_dir = "final_dataset/train"

# dataframe with the number of images per country
data = []
for country in os.listdir(train_dir):
    country_path = os.path.join(train_dir, country)
    if os.path.isdir(country_path):
        images = [f for f in os.listdir(country_path) if f.lower().endswith(('.jpg', '.jpeg', '.png'))]
        data.append({"country": country, "count": len(images)})

# explicit columns keep the frame well-formed even when no country folder exists
df = pd.DataFrame(data, columns=["country", "count"])

# bar plot with n images per country
plt.figure(figsize=(10, 6))
ax = sns.barplot(x="country", y="count", data=df)
plt.title("Number of Training Images per Country")
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

# add sample image for each country
countries = df['country'].tolist()
n = len(countries)
if n:
    # squeeze=False always returns a 2-D axes array, so a single country
    # (n == 1) can be iterated just like several.
    fig, axes = plt.subplots(nrows=1, ncols=n, figsize=(3 * n, 4), squeeze=False)

    for ax, country in zip(axes[0], countries):
        country_path = os.path.join(train_dir, country)
        images = [os.path.join(country_path, f) for f in os.listdir(country_path)
                  if f.lower().endswith(('.jpg', '.jpeg', '.png'))]
        if images:
            sample_img_path = random.choice(images)
            # context manager closes the file once the pixels were handed to matplotlib
            with Image.open(sample_img_path) as img:
                ax.imshow(img)
            ax.set_title(country)
            ax.axis("off")
        else:
            ax.set_visible(False)

    plt.suptitle("sample training images per country", y=1.05)
    plt.tight_layout()
    plt.show()


The number of images per country varies greatly. We want 1000 images per country in the training data, so we apply image augmentation: we convert a PIL image to a tensor, apply the augmentation layers, and then convert the result back to a PIL image.

We augment the training dataset to reach 1000 images per country

In [None]:
# n images per country in training data
target_count = 1000

# augmentation layers
data_augmentation_layers = [
    layers.RandomFlip("horizontal"),
    layers.RandomRotation(0.1),
]

def random_augmentation(pil_img):
    """Run ``pil_img`` through every layer in ``data_augmentation_layers``.

    Args:
        pil_img: PIL image to augment.

    Returns:
        A new PIL image built from the output of the last augmentation layer.
    """
    # the layers are applied to a batch, so add a leading batch axis
    batch = tf.expand_dims(img_to_array(pil_img), 0)

    # apply the augmentation layers one after the other
    for layer in data_augmentation_layers:
        batch = layer(batch)

    # drop the batch axis again and convert back to PIL
    return array_to_img(tf.squeeze(batch, axis=0))

# process everything: top up every country folder in train_dir to target_count images
for country in os.listdir(train_dir):
    country_path = os.path.join(train_dir, country)
    if not os.path.isdir(country_path):
        continue

    # list of all files
    image_files = [f for f in os.listdir(country_path) if f.lower().endswith(('.jpg', '.jpeg', '.png'))]
    current_count = len(image_files)
    print(f"Country '{country}' initially has {current_count} images.")

    # Without a single source image the while-loop below could never make
    # progress and would spin forever, so skip such folders.
    if not image_files:
        print(f"Country '{country}' has no images to augment, skipping.\n")
        continue

    augment_index = 1

    # augment until the folder reaches the target count
    while current_count < target_count:
        for img_file in image_files:
            if current_count >= target_count:
                break
            img_path = os.path.join(country_path, img_file)
            # context manager releases the file handle of the source image
            with Image.open(img_path) as pil_img:
                augmented_img = random_augmentation(pil_img)

            base, ext = os.path.splitext(img_file)
            new_filename = f"{base}_aug{augment_index}{ext}"
            new_img_path = os.path.join(country_path, new_filename)
            augmented_img.save(new_img_path, quality=85)

            current_count += 1
            augment_index += 1

        # include newly generated images in list, so later passes also augment them
        image_files = [f for f in os.listdir(country_path) if f.lower().endswith(('.jpg', '.jpeg', '.png'))]

    print(f"'{country}' now has {current_count} images.\n")


# Geocell Design

The goal is to have a model that is able to predict not only the country of an image but the coordinates of the place portrayed. When searching for a pretrained model, we found the PIGEON project of Lukas Haas (see report). 

The PIGEON model defines a PyTorch neural network model designed for predicting geographic locations from image data. In particular, the author constructed a model for geolocation prediction via geocell classification:
The model predicts a location by classifying an image (or a set of images) into one of many predefined geographical cells (“geocells”). It loads the centroids of these cells from a CSV file and then uses a linear layer to convert image embeddings into a probability distribution over these geocells. The predicted location is determined by selecting the geocell with the highest probability.

The geocell design will be the basis of our model. The plot already showed that the images aren’t uniformly spread over each country’s bounding box. A fixed grid is thus not the best choice. We do k-means clustering per-country.

We perform K-means clustering and find the best number of clusters per country

In [None]:
# helper to parse lat/lon and country from a filepath
def parse_fp(fp):
    """Return ``(country, lat, lon)`` for an image path below a ``train`` folder.

    The second and third underscore-separated fields of the file stem are
    parsed as latitude and longitude; the country is the path component that
    directly follows the ``train`` component.
    """
    stem, _ = os.path.splitext(os.path.basename(fp))
    fields = stem.split("_")
    lat = float(fields[1])
    lon = float(fields[2])

    path_parts = os.path.normpath(fp).split(os.sep)
    country = path_parts[path_parts.index("train") + 1]
    return country, lat, lon

# coords per country: {country: [(lat, lon), ...]} for every .jpg below the train folder
root = "final_dataset/train"
coords_by_country = {}
for dp, _, fns in os.walk(root):
    for fn in fns:
        if fn.lower().endswith(".jpg"):
            fp = os.path.join(dp, fn)
            country, lat, lon = parse_fp(fp)
            coords_by_country.setdefault(country, []).append((lat, lon))

# elbow
def detect_elbow(Ks, inertias):
    """Return the K whose (K, inertia) point lies farthest from the end-to-end chord.

    The chord is the straight line through the first and the last
    (K, inertia) point; for ties the first such K is returned.

    Args:
        Ks: Candidate cluster counts.
        inertias: Inertia for every entry of ``Ks`` (same length and order).

    Returns:
        The element of ``Ks`` at the elbow. When the two chord end points
        coincide (e.g. a single candidate) ``Ks[0]`` is returned.

    Raises:
        ValueError: If ``Ks`` is empty.
    """
    if len(Ks) == 0:
        raise ValueError("detect_elbow needs at least one candidate K")

    x1, y1 = Ks[0], inertias[0]
    x2, y2 = Ks[-1], inertias[-1]
    denom = sqrt((y2 - y1)**2 + (x2 - x1)**2)
    if denom == 0:
        # Degenerate chord: there is no line to measure distances against.
        return Ks[0]

    # perpendicular distance of every point to the chord
    distances = [
        abs((y2 - y1)*x0 - (x2 - x1)*y0 + x2*y1 - y2*x1) / denom
        for x0, y0 in zip(Ks, inertias)
    ]
    elbow_idx = int(np.argmax(distances))
    return Ks[elbow_idx]

# find best k via elbow and silhouette
def find_best_k(coords, k_min=2, k_max=20):
    """Fit KMeans for a range of K, plot the diagnostics and return both K picks.

    K runs from ``k_min`` to ``min(k_max, len(coords) - 1)`` inclusive. Inertia
    and silhouette score are plotted against K on twin axes.

    Args:
        coords: Points to cluster.
        k_min: Smallest number of clusters to try.
        k_max: Largest number of clusters to try.

    Returns:
        ``(K_elbow, K_silhouette)``: the K chosen by ``detect_elbow`` and the
        K with the highest silhouette score.
    """
    largest_k = min(k_max, len(coords) - 1)
    Ks = list(range(k_min, largest_k + 1))

    inertias = []
    silhouettes = []
    for K in Ks:
        km = KMeans(n_clusters=K, random_state=0).fit(coords)
        inertias.append(km.inertia_)
        # no silhouette score is computed for K <= 1; NaN is stored instead
        if K > 1:
            silhouettes.append(silhouette_score(coords, km.labels_))
        else:
            silhouettes.append(np.nan)

    # plot: inertia on the left axis, silhouette on the right axis
    fig, ax1 = plt.subplots()
    ax1.plot(Ks, inertias, '-o', color='tab:blue', label='Inertia')
    ax1.set_xlabel('K')
    ax1.set_ylabel('Inertia', color='tab:blue')
    ax2 = ax1.twinx()
    ax2.plot(Ks, silhouettes, '-o', color='tab:red', label='Silhouette')
    ax2.set_ylabel('Silhouette', color='tab:red')
    plt.title('Elbow & Silhouette Analysis')
    plt.show()

    K_elbow = detect_elbow(Ks, inertias)
    best_silhouette_idx = int(np.nanargmax(silhouettes))
    return K_elbow, Ks[best_silhouette_idx]

# for all countries: try at most 40 clusters and at most one cluster per 10 points
for country, coords in coords_by_country.items():
    arr = np.array(coords)
    K_elbow, K_sil = find_best_k(arr, k_min=2, k_max=min(40, len(arr)//10))
    print(f"{country}: elbow K = {K_elbow}, silhouette K = {K_sil}")


The elbow Ks all lie in a reasonable range (5–8), whereas the silhouette Ks sometimes collapse to extreme values (e.g. 2 for France, 40 for Switzerland). We therefore stick to the elbow picks for the final clustering. The elbow point reflects where adding clusters stops giving big gains in compactness, and that yields a useful spatial resolution.

Construct the geocells for each country and save it in the csv file. 
We do that for each dataset

In [None]:
# elbow k's
country_to_K = {
  "argentina":   6,
  "austria":     7,
  "canada":      7,
  "chile":       7,
  "france":      6,    
  "iceland":     8,
  "italy":       5,    
  "japan":       7,
  "new_zealand": 7,
  "norway":      5,
  "peru":        6,
  "switzerland": 7,
}

# 1) rebuild train list: one record per .jpg below final_dataset/train
records = []
for dp, _, fns in os.walk("final_dataset/train"):
    for fn in fns:
        if not fn.lower().endswith(".jpg"):
            continue
        fp = os.path.join(dp, fn)

        # filename format: timestamp_lat_lon_view_x
        stem = os.path.splitext(fn)[0]
        lat_txt, lon_txt = stem.split("_")[1:3]

        # country folder right after 'train'
        parts = os.path.normpath(fp).split(os.sep)
        country = parts[parts.index("train") + 1]

        records.append({
            "filepath": fp,
            "lat": float(lat_txt),
            "lon": float(lon_txt),
            "country": country
        })

df = pd.DataFrame(records)
print("Rebuilt raw train DataFrame:", df.shape)

# cluster per country
train_cells = []
centroids   = []

for country, group in df.groupby("country"):
    # K for this country comes from the elbow table
    km = KMeans(n_clusters=country_to_K[country], random_state=0)
    km.fit(group[["lat", "lon"]].to_numpy())

    # tag every training image with its cluster: "<country>_<cluster index>"
    g = group.copy()
    g["local_id"] = km.labels_.astype(str)
    g["geocell_id"] = country + "_" + g["local_id"]
    train_cells.append(g)

    # one row per cluster centre, carrying the same id scheme
    centers = pd.DataFrame(
        km.cluster_centers_,
        columns=["centroid_lat", "centroid_lon"],
    )
    centers["local_id"] = centers.index.astype(str)
    centers["geocell_id"] = country + "_" + centers["local_id"]
    centers["country"] = country
    centroids.append(centers)

# results
df_train_cells = pd.concat(train_cells, ignore_index=True)
df_centroids   = pd.concat(centroids,   ignore_index=True)

# label mapping
# Build ONE geocell_id -> integer label table from the centroids and apply it
# to both frames. Encoding each frame separately only yields matching labels
# when both frames contain exactly the same set of geocell ids.
label_by_geocell = {
    geocell_id: code
    for code, geocell_id in enumerate(sorted(df_centroids["geocell_id"].unique()))
}
df_train_cells["label"] = df_train_cells["geocell_id"].map(label_by_geocell)
df_centroids["label"]   = df_centroids["geocell_id"].map(label_by_geocell)

# save csv
df_train_cells.to_csv("geocell_train.csv", index=False)
df_centroids.to_csv("geocell_centroids.csv", index=False)

print("geocell_train.csv saved", df_train_cells.label.nunique(), "classes")
print("geocell_centroids.csv saved", df_centroids.label.nunique(), "centroids")

In [None]:
# for validation data: build lookup: country -> centroid array, geocell_id list, label list
lookup = {}
for country, g in df_centroids.groupby("country"):
    coords = g[["centroid_lat","centroid_lon"]].to_numpy()
    ids    = g["geocell_id"].tolist()
    labels = g["label"].tolist()
    lookup[country] = (coords, ids, labels)

# now process validation folder and assign each image to the nearest centroid
val_recs = []
for dp, _, fns in os.walk("final_dataset/validation"):
    for fn in fns:
        if not fn.lower().endswith(".jpg"):
            continue
        fp = os.path.join(dp, fn)

        # parse lat/lon from filename (format: timestamp_lat_lon_view_x)
        base = os.path.splitext(fn)[0]
        _, lat, lon, *_ = base.split("_")
        lat, lon = float(lat), float(lon)

        # country from folder name
        country = os.path.normpath(fp).split(os.sep)[-2]

        # Same guard as in the test-set cell: a folder without centroids is
        # reported and skipped instead of aborting the run with a KeyError.
        if country not in lookup:
            print(f"Warning: '{country}' not in centroids. Skipping {fp}.")
            continue

        cents, ids, labels = lookup[country]

        # euclidean distance in lat/lon space
        dists = np.linalg.norm(cents - np.array([lat, lon]), axis=1)
        idx   = int(dists.argmin())

        val_recs.append({
            "filepath":     fp,
            "country":      country,
            "geocell_id":   ids[idx],
            "label":        labels[idx],
            "lat":          lat,
            "lon":          lon
        })

df_val = pd.DataFrame(val_recs)
df_val.to_csv("geocell_val.csv", index=False)
print("val complete:", df_val.shape)

In [None]:
# same for test data

# test image to nearest centroid of its own country
test_recs = []
for dp, _, fns in os.walk("final_dataset/test"):
    for fn in fns:
        if not fn.lower().endswith(".jpg"):
            continue
        fp = os.path.join(dp, fn)

        # filename format: timestamp_lat_lon_view_x
        stem = os.path.splitext(fn)[0]
        lat_txt, lon_txt = stem.split("_")[1:3]
        lat, lon = float(lat_txt), float(lon_txt)

        # the country is the name of the folder that holds the file
        country = os.path.normpath(fp).split(os.sep)[-2]
        if country not in lookup:
            print(f"Warning: '{country}' not in centroids. Skipping {fp}.")
            continue

        cents, ids, labels = lookup[country]
        # euclidean distance in lat/lon space, smallest one wins
        offsets = cents - np.array([lat, lon])
        idx = int(np.linalg.norm(offsets, axis=1).argmin())

        test_recs.append({
            "filepath":   fp,
            "country":    country,
            "geocell_id": ids[idx],
            "label":      labels[idx],
            "lat":        lat,
            "lon":        lon
        })

df_test = pd.DataFrame(test_recs)
df_test.to_csv("geocell_test.csv", index=False)
print("test complete:", df_test.shape)

# Stage 1: model training

In [None]:
import os
import shutil
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras import mixed_precision
from tensorflow.keras.applications.resnet50 import ResNet50, preprocess_input
from tensorflow.keras.layers import GlobalAveragePooling2D, Dense, Dropout
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, EarlyStopping
from sklearn.metrics import balanced_accuracy_score, classification_report
from tensorflow.keras.regularizers import l2
from tensorflow.keras.models    import Model, load_model
from tensorflow.keras.layers    import Dense, Dropout
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses    import SparseCategoricalCrossentropy
import math
from collections                import Counter
from sklearn.preprocessing      import LabelEncoder
import pickle


In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# paths: full-size splits on Google Drive ...
orig_train = "/content/drive/MyDrive/Colab Notebooks/data_assign2/final_dataset/train"
orig_val   = "/content/drive/MyDrive/Colab Notebooks/data_assign2/final_dataset/validation"
orig_test = "/content/drive/MyDrive/Colab Notebooks/data_assign2/final_dataset/test"

# ... and the folders that receive the downsized copies
out_train  = "/content/drive/MyDrive/Colab Notebooks/data_assign2/final_dataset/train_small"
out_val    = "/content/drive/MyDrive/Colab Notebooks/data_assign2/validation_small"
# The test copies need their own folder: pointing this at validation_small
# writes the test images into the validation set.
out_test    = "/content/drive/MyDrive/Colab Notebooks/data_assign2/test_small"

# resize images to 224 x 224
def resize_and_save(input_dir, output_dir, size=(224, 224)):
    """Write an RGB copy of every image in ``input_dir/<class>/`` resized to ``size``.

    The class sub-folder structure is mirrored below ``output_dir`` and file
    names are kept. A file that cannot be processed is reported on stdout and
    skipped.

    Args:
        input_dir: Directory containing one sub-folder per class.
        output_dir: Destination root; created if missing.
        size: Target size handed to ``Image.resize``.
    """
    os.makedirs(output_dir, exist_ok=True)

    for class_name in os.listdir(input_dir):
        src_class = os.path.join(input_dir, class_name)
        # only class folders are mirrored; loose files are ignored
        if not os.path.isdir(src_class):
            continue

        dst_class = os.path.join(output_dir, class_name)
        os.makedirs(dst_class, exist_ok=True)

        for fname in os.listdir(src_class):
            src_path = os.path.join(src_class, fname)
            try:
                with Image.open(src_path) as img:
                    resized = img.convert('RGB').resize(size, resample=Image.LANCZOS)
                    resized.save(os.path.join(dst_class, fname))
            except Exception as e:
                print(f"Skipped {src_path}: {e}")

# Create the downsized copies of all three splits (default size of resize_and_save).
resize_and_save(orig_train, out_train)
resize_and_save(orig_val,   out_val)
resize_and_save(orig_test,  out_test)

print("all images resized to and saved")


The Stage 1 model (country classifier) is shown for ResNet50; the changes needed to run the code with EfficientNet are written as comments in the code cells.

In [None]:
# mixed precision: set the global Keras dtype policy to 'mixed_float16'
mixed_precision.set_global_policy('mixed_float16')

# paths: training/validation data and the two checkpoint files on Google Drive
TRAIN_DIR       = "/content/drive/MyDrive/final_dataset/train"
VAL_DIR_GDRIVE  = "/content/drive/MyDrive/final_dataset/validation"
CHECKPOINT_HEAD = "/content/drive/MyDrive/full_finetune_head_resnet(0003_03).h5"
CHECKPOINT_FULL = "/content/drive/MyDrive/full_finetune_final_resnet(0003_03).keras"

# copy validation locally for speed (skipped when the local copy already exists)
LOCAL_VAL_DIR = "/tmp/validation_small"
if not os.path.exists(LOCAL_VAL_DIR):
    shutil.copytree(VAL_DIR_GDRIVE, LOCAL_VAL_DIR)
VAL_DIR = LOCAL_VAL_DIR

# hyperparameters
learning_rate = 0.0003
batch_size = 32
num_classes = 12
dropout = 0.3
input_shape = (224, 224, 3)
weight_decay = 1e-4  # L2 regularization

# build model: ImageNet backbone without its top, followed by a small dense head
base = ResNet50(weights="imagenet", include_top=False, input_shape=input_shape)
#base = EfficientNetB0(weights="imagenet", include_top=False, input_shape=input_shape)
x = GlobalAveragePooling2D()(base.output)
# two L2-regularised ReLU blocks (512 and 128 units), each followed by dropout
for units in (512, 128):
    x = Dense(units, activation='relu', kernel_regularizer=l2(weight_decay))(x)
    x = Dropout(dropout)(x)
# the softmax output is kept in float32 although the global policy is mixed_float16
outputs = Dense(num_classes, activation='softmax', dtype='float32')(x)

model = Model(inputs=base.input, outputs=outputs)

# data generators
train_datagen = ImageDataGenerator(
    preprocessing_function=preprocess_input,
    horizontal_flip=True,
    rotation_range=15,
    zoom_range=0.1
)
val_datagen = ImageDataGenerator(preprocessing_function=preprocess_input)

train_gen = train_datagen.flow_from_directory(
    TRAIN_DIR,
    target_size=(224,224),
    batch_size=32,
    class_mode='categorical'
)

# load all val in one batch
count_gen = val_datagen.flow_from_directory(
    VAL_DIR, target_size=(224,224),
    batch_size=1, shuffle=False, class_mode='categorical'
)
num_val = count_gen.samples

val_single = val_datagen.flow_from_directory(
    VAL_DIR,
    target_size=(224,224),
    batch_size=num_val,
    shuffle=False,
    class_mode='categorical'
)
val_x, val_y    = val_single[0]
val_y_idx       = val_single.classes
class_indices   = val_single.class_indices

# compute sample‐weights for imbalanced val
val_counts = {
    'argentina': 89, 'austria': 160, 'canada': 148, 'chile': 149,
    'france': 128,   'iceland': 71,  'italy': 150,   'japan': 150,
    'new_zealand':120,'norway':138,  'peru': 95,     'switzerland':183
}
total_val = sum(val_counts.values())
n_classes = len(val_counts)
class_weight_val = {
    class_indices[name]: total_val/(n_classes*cnt)
    for name,cnt in val_counts.items()
}
val_sample_weights = np.array([class_weight_val[i] for i in val_y_idx])

# callbacks for head‐only stage
checkpoint_cb = ModelCheckpoint(
    CHECKPOINT_HEAD, monitor='val_accuracy',
    save_best_only=True, verbose=1
)
reduce_lr = ReduceLROnPlateau(
    monitor='val_loss', factor=0.2, patience=2, verbose=1
)
early_stop = EarlyStopping(
    monitor='val_loss', patience=5,
    restore_best_weights=True, verbose=1
)

# Stage 1: Train entire network end-to-end on clean data (no layer freezing)

model.compile(
    optimizer=Adam(learning_rate),
    loss='categorical_crossentropy',
    metrics=['accuracy']
)

history_head = model.fit(
    train_gen,
    validation_data=(val_x, val_y, val_sample_weights),
    epochs=15,
    callbacks=[checkpoint_cb, reduce_lr, early_stop],
    verbose=1
)


# Stage 2: Continue fine-tuning end-to-end at lower LR (still no freezing)

model.load_weights(CHECKPOINT_HEAD)

# recompile with smaller LR
model.compile(
    optimizer=Adam(0.00003),
    loss='categorical_crossentropy',
    metrics=['accuracy']
)

checkpoint_full = ModelCheckpoint(
    CHECKPOINT_FULL, monitor='val_accuracy',
    save_best_only=True, verbose=1
)

history_full = model.fit(
    train_gen,
    validation_data=(val_x, val_y, val_sample_weights),
    epochs=5,
    callbacks=[checkpoint_full, reduce_lr, early_stop],
    verbose=1
)

# final evaluation
val_pred_idx = np.argmax(model.predict(val_x, batch_size=32), axis=1)
print("Balanced accuracy:", balanced_accuracy_score(val_y_idx, val_pred_idx))
print(classification_report(val_y_idx, val_pred_idx,
                            target_names=list(class_indices.keys())))

# save histories (including training accuracy)
pd.DataFrame(history_head.history).to_csv(
    "/content/drive/MyDrive/history_head_full_resnet(0003_03).csv",
    index=False
)
pd.DataFrame(history_full.history).to_csv(
    "/content/drive/MyDrive/history_full_finetune_resnet(0003_03).csv",
    index=False
)

# Stage 2: 12 country heads

Stage 2 model (Geocell Classification)

In [None]:
# Stage 2: train one geocell-classification head per country on top of the
# Stage-1 country model, and store each head together with its label encoder
# and the rare-cell merge map that was applied to its labels.
BASE   = "/content/drive/MyDrive/Colab Notebooks/Advanced Analytics/data_assign2"
TRAIN_CSV   = f"{BASE}/geocell_train.csv"
VAL_CSV     = f"{BASE}/geocell_val.csv"
CENT_CSV    = f"{BASE}/geocell_centroids.csv"
COUNTRY_MODEL = f"{BASE}/checkpoints/best_result.keras"
HEADS_DIR     = f"{BASE}/checkpointscountry_heads"
IMG_SIZE   = 224
BATCH_SIZE = 32

df_train = pd.read_csv(TRAIN_CSV)
df_val   = pd.read_csv(  VAL_CSV)
df_cent  = pd.read_csv(CENT_CSV).set_index("geocell_id")

# Normalise path separators FIRST (single or repeated backslashes -> "/") so
# that the prefix rewrites below also match paths written on Windows.
for df in (df_train, df_val):
    df["filepath"] = (df["filepath"]
        .str.replace(r"\\+", "/", regex=True)
        .str.replace(r"^final_dataset/train",
                    f"{BASE}/train_small",regex=True)
        .str.replace(r"^final_dataset/validation",
                    f"{BASE}/validation_small",regex=True)
    )

# 0) Common: load & freeze country backbone
stage1   = load_model(COUNTRY_MODEL)
for layer in stage1.layers:
    layer.trainable = False
base_feat = stage1.layers[-2].output
# for efficientnet: base_feat = stage1.layers[-6].output

# where to save each head + encoder (same folder the later cells read from)
OUT_DIR = HEADS_DIR
os.makedirs(OUT_DIR, exist_ok=True)


def hav(lat1,lon1,lat2,lon2):
    """Return the haversine distance in km between two (lat, lon) points given in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi    = math.radians(lat2-lat1)
    dlam    = math.radians(lon2-lon1)
    a = math.sin(dphi/2)**2 + math.cos(phi1)*math.cos(phi2)*math.sin(dlam/2)**2
    return 2*6371*math.atan2(math.sqrt(a),math.sqrt(1-a))


def make_ds(df, shuffle=True, augment=None):
    """Build a batched tf.data pipeline of (image, label) pairs from ``df``.

    ``df`` must provide the columns ``filepath`` and ``g_lbl``.
    ``augment`` switches the random flip / brightness jitter on or off; when
    left as ``None`` it follows ``shuffle``, so the unshuffled validation
    pipeline is evaluated on un-augmented images.
    """
    if augment is None:
        augment = shuffle
    paths  = df["filepath"].values
    lbls   = df["g_lbl"].values.astype(np.int32)
    ds     = tf.data.Dataset.from_tensor_slices((paths,lbls))
    if shuffle:
        ds = ds.shuffle(len(df))

    def _load(p,l):
        img = tf.io.read_file(p)
        img = tf.image.decode_jpeg(img,3)
        img = tf.image.resize(img,[IMG_SIZE,IMG_SIZE])
        if augment:
            img = tf.image.random_flip_left_right(img)
            # NOTE(review): the image is presumably still on a 0-255 float
            # scale here, so a brightness delta of 0.2 is a very small shift;
            # confirm the intended strength.
            img = tf.image.random_brightness(img,0.2)
        img = preprocess_input(img)
        return img, l

    return ds.map(_load, tf.data.AUTOTUNE)\
             .batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)


# 1) loop per country
for country in le_c.classes_:
    print(f"\n--- Training head for {country} ---")

    # a) subset
    df_tr_ctry = df_train[df_train.country == country].copy()
    df_va_ctry = df_val  [df_val.country   == country].copy()
    if df_tr_ctry.empty or df_va_ctry.empty:
        continue

    # b) merge rare geocells (<150 samples) to nearest neighbor
    counts = Counter(df_tr_ctry["geocell_id"])
    rare   = {g for g,c in counts.items() if c < 150}
    common = [g for g,c in counts.items() if c >= 150]

    # get centroids
    cent = df_cent.loc[list(rare)+common, ['centroid_lat','centroid_lon']]

    merge_map = {}
    if common:  # without any well-populated cell there is nothing to merge into
        for r in rare:
            lat0, lon0 = cent.loc[r]
            merge_map[r] = min(common, key=lambda g: hav(
                lat0, lon0,
                cent.loc[g, 'centroid_lat'],
                cent.loc[g, 'centroid_lon']))

    # simulate merge on train only, no mutation yet
    merged_train = df_tr_ctry["geocell_id"].map(lambda g: merge_map.get(g, g))
    unique_after = merged_train.nunique()

    if unique_after >= 2:
        # apply on train and val
        print(f"merging {len(merge_map)} rare cells {unique_after} total classes")
        df_tr_ctry["geocell_id"] = merged_train
        df_va_ctry["geocell_id"] = df_va_ctry["geocell_id"].map(lambda g: merge_map.get(g, g))
    else:
        # skip entirely
        print(f"  → skipping merge (would leave only {unique_after} class)")
        # The labels stay unmerged, so the map saved next to this head must
        # be empty too; otherwise later cells would merge the true geocells
        # although the head predicts unmerged ones.
        merge_map = {}

    # now encode & build class weights
    le_g_ctry         = LabelEncoder().fit(df_tr_ctry["geocell_id"])
    df_tr_ctry["g_lbl"] = le_g_ctry.transform(df_tr_ctry["geocell_id"])
    # NOTE(review): transform presumably fails if validation contains a
    # geocell that never occurs in this country's training rows.
    df_va_ctry["g_lbl"] = le_g_ctry.transform(df_va_ctry["geocell_id"])
    n_cells           = len(le_g_ctry.classes_)

    cnts = Counter(df_tr_ctry["g_lbl"])
    total = sum(cnts.values())
    class_weight = {lbl: total/(n_cells*c) for lbl,c in cnts.items()}

    # c) datasets: augmentation for training only
    ds_tr = make_ds(df_tr_ctry, shuffle=True)
    ds_va = make_ds(df_va_ctry, shuffle=False)

    # d) unfreeze the last ten layers of the shared Stage-1 model
    # NOTE(review): these layers are shared by every country model built in
    # this loop, so their weights carry over from one country to the next.
    for layer in stage1.layers[-10:]:
        layer.trainable = True

    # e) build & compile head with Dropout
    x    = Dropout(0.5, name=f"dropout_{country}")(base_feat)
    head = Dense(n_cells, activation="softmax", name=f"gc_{country}")(x)
    model_ct = Model(stage1.input, head, name=f"model_{country}")
    model_ct.compile(
        optimizer=Adam(1e-5),
        loss=SparseCategoricalCrossentropy(),
        metrics=["accuracy"]
    )

    # f) train
    hist = model_ct.fit(
        ds_tr,
        validation_data=ds_va,
        epochs=20,
        class_weight=class_weight,
        callbacks=[
            tf.keras.callbacks.ReduceLROnPlateau(
                monitor="val_accuracy", factor=0.5, patience=2, mode="max"),
            tf.keras.callbacks.EarlyStopping(
                monitor="val_accuracy", patience=6, restore_best_weights=True, mode="max")
        ],
        verbose=1
    )

    # g) report & save (the figures printed are those of the last epoch run)
    ta = hist.history["accuracy"][-1]
    va = hist.history["val_accuracy"][-1]
    print(f"{country}: train={ta:.2%}  val={va:.2%}")

    ckpt       = os.path.join(OUT_DIR, f"head_{country}.keras")
    enc_path   = os.path.join(OUT_DIR, f"le_g_{country}.pkl")
    merge_path = os.path.join(OUT_DIR, f"merge_map_{country}.pkl")

    model_ct.save(ckpt)
    with open(enc_path, "wb") as f:
        pickle.dump(le_g_ctry, f)
    with open(merge_path, "wb") as f:
        pickle.dump(merge_map, f)

    print(f"saved: {ckpt}, {enc_path}, {merge_path}")

# Stage 3: Regression

Stage 3 Model (Regression)

In [None]:
# 1) Hyperparameters
IMG_SIZE    = 224
BATCH_SIZE  = 32
EPOCHS      = 20
LR          = 3e-4

# 2) load Stage‐1 and rebuild country encoder
stage1 = tf.keras.models.load_model(COUNTRY_MODEL, compile=False)
le_c   = LabelEncoder().fit(df_train["country"])


# 3) load all 12 heads + encoders + merge_maps into dicts
# Countries without a saved head file are simply absent from the three dicts.
# The pickles are the ones written by the Stage-2 cell; only unpickle files
# you produced yourself.
heads      = {}
le_gs      = {}
merge_maps = {}
for country in le_c.classes_:
    hpth = os.path.join(HEADS_DIR, f"head_{country}.keras")
    epth = os.path.join(HEADS_DIR, f"le_g_{country}.pkl")
    mpth = os.path.join(HEADS_DIR, f"merge_map_{country}.pkl")
    if not os.path.exists(hpth):
        continue
    heads[country] = tf.keras.models.load_model(hpth, compile=False)
    # context managers close the files instead of leaking the handles
    with open(epth, "rb") as f:
        le_gs[country] = pickle.load(f)
    with open(mpth, "rb") as f:
        merge_maps[country] = pickle.load(f)


# 4) apply country-specific merges to build a global `geocell_merge`
def apply_merge(df, maps=None):
    """Return a copy of ``df`` with a ``geocell_merge`` column added.

    For each row the geocell id is looked up in the merge map of the row's
    country; ids without an entry (and countries without a map) keep their
    original id. The input frame is not modified.

    Parameters
    ----------
    df : DataFrame with the columns ``country`` and ``geocell_id``.
    maps : optional ``{country: {geocell_id: merged_id}}`` mapping; defaults
        to the notebook-level ``merge_maps``.
    """
    if maps is None:
        maps = merge_maps
    out = df.copy()
    # One pass over the two columns instead of a row-wise DataFrame.apply:
    # faster, and well-defined for an empty frame.
    out["geocell_merge"] = [
        maps.get(country, {}).get(cell, cell)
        for country, cell in zip(out["country"], out["geocell_id"])
    ]
    return out

# Add the merged geocell id to both splits.
df_train = apply_merge(df_train)
df_val   = apply_merge(df_val)


# 5) re-encode merged geocells and build centroids tensor
# The encoder is fitted on the training split only and then used for both.
le_g = LabelEncoder()
le_g.fit(df_train["geocell_merge"])
for split_frame in (df_train, df_val):
    split_frame["geocell_lbl"] = le_g.transform(split_frame["geocell_merge"])

# grab centroids in the same order as le_g.classes_
centroid_cols = ["centroid_lat", "centroid_lon"]
centroids = df_cent.loc[le_g.classes_, centroid_cols].values
centroids_tensor = tf.constant(centroids, dtype=tf.float32)  # shape [G,2]


# 6) build regression tf.data pipelines
def make_reg_ds(df, shuffle=True):
    """Build a batched dataset of (image, residual) pairs for the regressor.

    ``df`` must provide ``filepath``, ``geocell_lbl``, ``lat`` and ``lon``.
    The target of each example is ``[lat, lon]`` minus the centroid stored in
    ``centroids_tensor`` at the row's ``geocell_lbl``.
    """
    columns = (
        df["filepath"].values,
        df["geocell_lbl"].values.astype(np.int32),
        df["lat"].values.astype(np.float32),
        df["lon"].values.astype(np.float32),
    )
    dataset = tf.data.Dataset.from_tensor_slices(columns)
    if shuffle:
        dataset = dataset.shuffle(len(df))

    def _to_example(path, cell_idx, lat, lon):
        pixels = tf.io.read_file(path)
        pixels = tf.image.decode_jpeg(pixels, 3)
        pixels = tf.image.resize(pixels, [IMG_SIZE, IMG_SIZE])
        pixels = preprocess_input(pixels)
        centroid = tf.gather(centroids_tensor, cell_idx)   # [2]
        residual = tf.stack([lat, lon]) - centroid         # [2]
        return pixels, residual

    dataset = dataset.map(_to_example, tf.data.AUTOTUNE)
    dataset = dataset.batch(BATCH_SIZE)
    return dataset.prefetch(tf.data.AUTOTUNE)

train_reg_ds = make_reg_ds(df_train, shuffle=True)
val_reg_ds   = make_reg_ds(df_val,   shuffle=False)


# 7) build and compile the residual MLP
# reuse the frozen stage1 backbone up to its penultimate layer
# stage1 was freshly loaded in this cell, so it has to be frozen explicitly:
# otherwise the MSE training below would also update the weights that the
# country classifier itself relies on.
for layer in stage1.layers:
    layer.trainable = False
feat = stage1.layers[-2].output

r = Dense(512, activation="relu", name="dense_reg_1")(feat)
r = Dense(256, activation="relu", name="dense_reg_2")(r)
res_out = Dense(2, name="delta")(r)  # predicted (lat, lon) offset from the centroid

reg_model = Model(inputs=stage1.input, outputs=res_out)
reg_model.compile(
    optimizer=Adam(LR),
    loss="mse",
    metrics=["mae"]
)


# 8) train
reg_model.fit(
    train_reg_ds,
    validation_data=val_reg_ds,
    epochs=EPOCHS,
    verbose=1,
    callbacks=[
      tf.keras.callbacks.ReduceLROnPlateau(monitor="val_loss", factor=0.5, patience=3),
      tf.keras.callbacks.EarlyStopping(monitor="val_loss", patience=5, restore_best_weights=True),
    ]
)


# 9) inference helper: country -> head -> centroid -> residual

def predict_coord(img):
    """Predict a (lat, lon) pair for one preprocessed image.

    The image is passed through the country model, then through the head of
    the predicted country, and finally through the regressor; the result is
    the centroid of the predicted geocell plus the regressor's offset.
    Raises KeyError when no head was loaded for the predicted country.
    """
    single = tf.expand_dims(img, 0)  # add the batch axis once, reuse below

    # a) country
    country_scores = stage1(single, training=False)
    country = le_c.classes_[tf.argmax(country_scores, axis=1)[0].numpy()]

    # b) geocell head
    cell_scores = heads[country](single, training=False)
    raw_cell = le_gs[country].classes_[tf.argmax(cell_scores, axis=1)[0].numpy()]

    # c) apply merge
    merged_cell = merge_maps[country].get(raw_cell, raw_cell)
    row = np.where(le_g.classes_ == merged_cell)[0][0]

    # d) centroid + residual
    offset = reg_model(single, training=False).numpy()[0]
    return centroids[row] + offset


# Persist the trained residual regressor to Google Drive.
reg_model.save("/content/drive/MyDrive/model stage 1/resnet/model(0003_03)/regressor_savedmodel.keras")

# Results test set

The code predicts country and geocell from test images, maps them to coordinates, and evaluates accuracy and location error per country

In [None]:
# load heads, encoders, merge maps
# Countries without a saved head file are left out of all three dicts.
# The pickles are the ones written by the Stage-2 cell; only unpickle files
# you produced yourself.
heads, encoders, merge_maps = {}, {}, {}
for c in le_c.classes_:
    hp = os.path.join(HEADS_DIR, f"head_{c}.keras")
    ep = os.path.join(HEADS_DIR, f"le_g_{c}.pkl")
    mp = os.path.join(HEADS_DIR, f"merge_map_{c}.pkl")
    if not os.path.exists(hp):
        continue
    heads[c] = load_model(hp)
    # context managers close the files instead of leaking the handles
    with open(ep, "rb") as fh:
        encoders[c] = pickle.load(fh)
    with open(mp, "rb") as fh:
        merge_maps[c] = pickle.load(fh)

# helper to predict batch
def predict_batch(paths):
    """Predict country and geocell for the image files in ``paths``.

    Returns ``(c_pred, g_pred)``: the predicted country per image, and a list
    with the predicted geocell id per image (``None`` where no head is loaded
    for the predicted country).
    """
    preprocess = tf.keras.applications.resnet50.preprocess_input
    tensors = []
    for path in paths:
        raw = tf.io.read_file(path)
        decoded = tf.image.decode_jpeg(raw, 3)
        resized = tf.image.resize(decoded, [IMG_SIZE, IMG_SIZE])
        tensors.append(preprocess(resized))
    batch = tf.stack(tensors)

    # country for the whole batch in one forward pass
    country_scores = stage1(batch, training=False).numpy()
    c_pred = le_c.classes_[np.argmax(country_scores, axis=1)]

    # geocell image by image, using the head of the predicted country
    g_pred = []
    for image, country in zip(batch, c_pred):
        head = heads.get(country)
        enc = encoders.get(country)
        if head is None:
            g_pred.append(None)
            continue
        scores = head(image[None], training=False).numpy()
        best = np.argmax(scores, axis=1)[0]
        g_pred.append(enc.classes_[best])
    return c_pred, g_pred

# predictions on test set
all_true_c, all_pred_c = [], []
all_true_g, all_pred_g = [], []

for i in range(0, len(df_test), BATCH_SIZE):
    batch = df_test.iloc[i:i+BATCH_SIZE]
    pc, pg = predict_batch(batch.filepath.tolist())
    all_true_c.extend(batch.country)
    all_true_g.extend(batch.geocell_id)
    all_pred_c.extend(pc)
    all_pred_g.extend(pg)

# merge maps
# Countries without a loaded head have no merge map; their true geocell is
# kept unchanged instead of raising a KeyError.
true_merged = [
    merge_maps.get(c, {}).get(g, g)
    for c, g in zip(all_true_c, all_true_g)
]

# evaluation dataframe
df_map = pd.DataFrame({
    "true_country": all_true_c,
    "pred_country": all_pred_c,
    "true_geocell": true_merged,
    "pred_geocell": all_pred_g
})

# centroid coordinates
cent = df_cent[["centroid_lat","centroid_lon"]].to_dict("index")
_NO_COORD = (float("nan"), float("nan"))


def _centroid_or_nan(g):
    """Return the (lat, lon) centroid of geocell ``g``, or NaNs when no geocell was predicted."""
    if pd.isna(g):
        return _NO_COORD
    return tuple(cent[g].values())


df_map["true_coord"] = df_map["true_geocell"].map(_centroid_or_nan)
df_map["pred_coord"] = df_map["pred_geocell"].map(_centroid_or_nan)

# predict_batch yields no geocell when the predicted country has no head;
# make that visible because such rows get NaN coordinates and NaN errors.
_n_unpredicted = int(df_map["pred_geocell"].isna().sum())
if _n_unpredicted:
    print(f"warning: {_n_unpredicted} image(s) have no geocell prediction (no head for the predicted country)")

# haversine
def haversine_km(p1,p2):
    """Return the haversine distance in km between two points.

    ``p1`` and ``p2`` are indexable as ``(lat, lon)`` in degrees; an Earth
    radius of 6371 km is used.
    """
    phi_a = np.radians(p1[0])
    phi_b = np.radians(p2[0])
    delta_phi = np.radians(p2[0] - p1[0])
    delta_lam = np.radians(p2[1] - p1[1])
    chord = (
        np.sin(delta_phi/2)**2
        + np.cos(phi_a)*np.cos(phi_b)*np.sin(delta_lam/2)**2
    )
    return 2*6371*np.arctan2(np.sqrt(chord), np.sqrt(1-chord))

# errors
# A single pass over the coordinate pairs replaces three row-wise
# DataFrame.apply calls; the explicit float dtype keeps the columns numeric
# even when the frame is empty.
_coord_pairs = list(zip(df_map["true_coord"], df_map["pred_coord"]))
df_map["lat_err"] = pd.Series(
    [abs(p[0] - t[0]) for t, p in _coord_pairs], index=df_map.index, dtype=float)
df_map["lon_err"] = pd.Series(
    [abs(p[1] - t[1]) for t, p in _coord_pairs], index=df_map.index, dtype=float)
df_map["geo_err"] = pd.Series(
    [haversine_km(t, p) for t, p in _coord_pairs], index=df_map.index, dtype=float)
df_map["cty_corr"]  = (df_map.pred_country==df_map.true_country).astype(int)
df_map["cell_corr"] = (df_map.pred_geocell ==df_map.true_geocell).astype(int)

# group results
# Errors are distances between geocell centroids (true vs. predicted cell).
per_ctry = df_map.groupby("true_country").agg(
    lat_MAE    = ("lat_err",    "mean"),
    lon_MAE    = ("lon_err",    "mean"),
    geo_MAE_km = ("geo_err",    "mean"),
    country_acc= ("cty_corr",   "mean"),
    geocell_acc= ("cell_corr",  "mean")
).round(4)

print(per_ctry)


In [None]:
from sklearn.metrics import accuracy_score

# coordinates from df_map
# Split the (lat, lon) tuples of both coordinate columns into four arrays.
true_lats, true_lons, pred_lats, pred_lons = (
    np.array([pair[axis] for pair in df_map[column]])
    for column in ('true_coord', 'pred_coord')
    for axis in (0, 1)
)

# latitude and longitude mae
lat_mae = np.abs(pred_lats - true_lats).mean()
lon_mae = np.abs(pred_lons - true_lons).mean()

# global mae with haversine
def haversine_km(p1, p2):
    """Return the haversine distance in km between two ``(lat, lon)`` pairs in degrees.

    Each argument must unpack into exactly two values; an Earth radius of
    6371 km is used.
    """
    (lat_a, lon_a), (lat_b, lon_b) = p1, p2
    phi_a = np.radians(lat_a)
    phi_b = np.radians(lat_b)
    delta_phi = np.radians(lat_b - lat_a)
    delta_lam = np.radians(lon_b - lon_a)
    chord = (
        np.sin(delta_phi / 2)**2
        + np.cos(phi_a) * np.cos(phi_b) * np.sin(delta_lam / 2)**2
    )
    return 2 * 6371 * np.arctan2(np.sqrt(chord), np.sqrt(1 - chord))

# Distance between true and predicted centroid for every test image.
errors_km = list(map(haversine_km, df_map['true_coord'], df_map['pred_coord']))
global_mae_km = np.mean(errors_km)

# use geo_err for median if possible
median_error = (
    df_map["geo_err"].median()
    if "geo_err" in df_map.columns
    else np.median(errors_km)
)

# classification accuracy
country_acc = accuracy_score(df_map['true_country'], df_map['pred_country'])
geocell_acc = accuracy_score(df_map['true_geocell'], df_map['pred_geocell'])

# summary results
summary_lines = [
    "===== Global Evaluation Summary =====",
    f"Latitude MAE:            {lat_mae:.5f}°",
    f"Longitude MAE:           {lon_mae:.5f}°",
    f"Coordinate MAE (km):     {global_mae_km:.2f} km",
    f"Overall Median Error:    {median_error:.2f} km",
    f"Country Accuracy:        {country_acc:.4f}",
    f"Geocell Accuracy:        {geocell_acc:.4f}",
]
print("\n".join(summary_lines))


# Exploration

In [None]:
# Get 3 easiest and 3 hardest images to predict for model
# NOTE(review): this rebinds `model` to COUNTRY_MODEL; the name is not used
# again in this cell - confirm whether the assignment is still needed.
model = COUNTRY_MODEL

# filepaths from df_test
# Assigned by position (.values), so this assumes df_map has one row per
# df_test row, in the same order - TODO confirm.
df_map["filepath"] = df_test["filepath"].values

# helper to filter top N rows with unique true coordinates
def top_unique(df, column, n=5, largest=True):
    """Return up to ``n`` rows of ``df`` ranked by ``column``, one per ``true_coord``.

    Rows are sorted by ``column`` (descending when ``largest`` is true,
    ascending otherwise); for each distinct ``true_coord`` only the
    best-ranked row is kept. The result keeps the original index labels,
    columns and dtypes, also when it is empty. ``n <= 0`` yields no rows.
    """
    ranked = df.sort_values(by=column, ascending=not largest)
    # drop_duplicates keeps the first (= best-ranked) row per true_coord
    return ranked.drop_duplicates(subset="true_coord").head(max(n, 0))

# hardest and easiest based on geo_err, ensuring unique true_coords
report_columns = ["filepath", "true_coord", "pred_coord", "geo_err"]
hardest = top_unique(df_map, "geo_err", n=3, largest=True)[report_columns]
easiest = top_unique(df_map, "geo_err", n=3, largest=False)[report_columns]

# full titles separately
def print_image_titles(df, label):
    """Print a numbered list of error / true / predicted coordinates for ``df``.

    ``df`` must provide the columns ``geo_err``, ``true_coord`` and
    ``pred_coord``. Rows are numbered 1, 2, 3, ... in display order; the
    frame's index labels are not used for the numbering.
    """
    print(f"\n=== {label} ===")
    for rank, (_, row) in enumerate(df.iterrows(), start=1):
        print(f"{rank}. Error: {row['geo_err']:.1f} km | True: {row['true_coord']} | Pred: {row['pred_coord']}")

# Print the textual summary for both selections.
for selection, heading in ((hardest, "3 Hardest Images"), (easiest, "3 Easiest Images")):
    print_image_titles(selection, heading)

# images
def show_images(df, title):
    """Show the images listed in ``df`` side by side, each titled with its error in km.

    ``df`` must provide the columns ``filepath`` and ``geo_err``. Nothing is
    drawn for an empty frame.
    """
    n = len(df)
    if n == 0:
        # a grid with zero columns cannot be created
        return
    # squeeze=False always returns a 2-D array of axes, so a single image
    # (n == 1) is handled like any other count.
    fig, axes = plt.subplots(1, n, figsize=(3*n, 3), squeeze=False)
    fig.suptitle(title, fontsize=16)
    for ax, (_, row) in zip(axes.ravel(), df.iterrows()):
        # read the pixels while the file is open, then release the handle
        with Image.open(row["filepath"]) as img:
            ax.imshow(np.asarray(img))
        ax.set_title(f"{row['geo_err']:.1f} km")
        ax.axis("off")
    plt.tight_layout()
    plt.show()

# Plot both selections.
for selection, heading in ((hardest, "3 Hardest Images"), (easiest, "3 Easiest Images")):
    show_images(selection, heading)

In [None]:
# Attention mapping
# Pick a few test images with a medium-sized error and show their attention
# maps for the Stage-1 model.
# only keep non‐perfect predictions
df_nonzero = df_map[df_map.geo_err > 1e-3]

# recompute parts on that subset
# q1..q3 are the quartiles; q4 is the 85th percentile (not the maximum)
q1, q2, q3, q4 = np.percentile(df_nonzero.geo_err, [25, 50, 75, 85])

# sample from the middle two bins
# No random_state is passed, so the selection differs between runs.
# NOTE(review): .sample(k) presumably raises if a bin holds fewer than k rows.
mid_low  = df_nonzero[(df_nonzero.geo_err>=q1)&(df_nonzero.geo_err<q2)].sample(2)
mid_high = df_nonzero[(df_nonzero.geo_err>=q2)&(df_nonzero.geo_err<q3)].sample(2)
# NOTE(review): `high` is sampled but not part of `to_inspect` below.
high = df_nonzero[(df_nonzero.geo_err>=q3)&(df_nonzero.geo_err<q4)].sample(3)

to_inspect = pd.concat([mid_low, mid_high])
show_cam_row(to_inspect, "Attention Mapping", stage1)
