In [None]:
import json
import requests
from shapely.ops import transform
import pyproj
import h3
import git
from geopy.distance import geodesic
import os
import matplotlib.pyplot as plt
from shapely.geometry import Polygon
from agent_components.llms.chatAI import ChatAIHandler
import re

In [None]:
with open(fr"{git.Repo('.', search_parent_directories=True).working_tree_dir}˙\data\processed_GeoCoDe_test.json", "r", encoding="utf-8") as f:
    geocode_test_set = json.load(f)

In [None]:
with open(
        fr"{git.Repo('.', search_parent_directories=True).working_tree_dir}˙\output\reflective_candidate_resolution\fatal_error_and_invalid_correction\GeoCoDe\meta-llama-3.1-8b-instruct_with_meta-llama-3.1-8b-instruct_critic\20250122_seed_24_1000_articles\correct_articles_k_161.json", "r", encoding="utf-8") as f:
    correctly_geocoded_articles = json.load(f)
print(len(correctly_geocoded_articles))
correctly_geocoded_articles

In [None]:
geo_relatable_harsh_from_geocoded_articles = [
    article for article in correctly_geocoded_articles
    if (any(word in article["article_text"] for word in ["lies", "located"]) and
        any(word in article["article_text"] for word in ["km", "kilometer", "kilometre"]))
]
print(len(geo_relatable_harsh_from_geocoded_articles))
print(f"# Toponyms: {sum(len(article['toponyms']) for article in geo_relatable_harsh_from_geocoded_articles)}")
geo_relatable_harsh_from_geocoded_articles

# Run LLM Georelating for all fully correctly geocoded georelatable articles

In [None]:
with open(
        fr"{git.Repo('.', search_parent_directories=True).working_tree_dir}˙\output\reflective_candidate_resolution\fatal_error_and_invalid_correction\GeoCoDe\meta-llama-3.1-8b-instruct_with_meta-llama-3.1-8b-instruct_critic\20250122_seed_24_1000_articles\articles_with_at_least_one_correct_toponym_k_161.json", "r", encoding="utf-8") as f:
    correctly_geocoded_articles = json.load(f)

In [None]:
def georelate(model_name, long_term_memory, article_text, mentioned_toponyms):
    prompt = long_term_memory.generate_georelating_prompt(article_text=article_text,
                                                          mentioned_toponyms=mentioned_toponyms,
                                                          example_path=fr"{git.Repo('.', search_parent_directories=True).working_tree_dir}˙\data\few_shot_example_georelation.json")
    handler = ChatAIHandler()
    model = handler.get_model(model_name)
    llm_answer = model.invoke(prompt)
    pattern = re.compile(r"<think>(.*?)</think>", re.DOTALL)

    thoughts = pattern.findall(llm_answer.content)  # Extract thoughts
    non_thoughts = pattern.sub("", llm_answer.content)  # Remove thoughts from text
    start = non_thoughts.find("```json")
    if start != -1:
        start = start + 7
        end = non_thoughts.find("```", start)  # Find the next occurrence after start
        content = non_thoughts[start:end]
        prediction = json.loads(content)
    elif non_thoughts.find("```") != -1:
        start = start + 4
        end = non_thoughts.find("```", start)  # Find the next occurrence after start
        content = non_thoughts[start:end]
        prediction = json.loads(content)
    else:
        prediction = json.loads(llm_answer.content)
    return thoughts, prediction

# Add ground truth center coordinates, bounding box and approx area to articles

In [None]:
def get_bounding_box(relation_id):
    query = f"""
    [out:json];
    relation({relation_id});
    out geom;
    """
    try:
        url = "https://overpass-api.de/api/interpreter"
        response = requests.post(url, data={"data": query})
        data = response.json()

        if not data['elements']:
            return None, None, None, None

        relation = data['elements'][0]
        if 'bounds' not in relation:
            return None, None, None, None

        bounds = relation['bounds']
        min_lat = bounds['minlat']
        max_lat = bounds['maxlat']
        min_lon = bounds['minlon']
        max_lon = bounds['maxlon']

        return min_lat, max_lat, min_lon, max_lon

    except Exception as e:
        print(f"Error fetching relation {relation_id}: {e}")
        return None, None, None, None

def get_osm_admin_center(osm_id):
    """
    Query Overpass API for the admin_center or center of the given OSM ID.
    """
    query = f"""
    [out:json];
    relation({osm_id});
    out center;
    """

    url = "https://overpass-api.de/api/interpreter"
    response = requests.get(url, params={"data": query})

    if response.status_code == 200:
        data = response.json()
        elements = data.get("elements", [])

        for element in elements:
            if "center" in element:
                return element["center"]  # Returns {lat, lon}

    return None  # Return None if no center/admin_center is found

def get_area_sq_km(geometry, source_crs="EPSG:4326", target_crs="EPSG:3857"):
    project = pyproj.Transformer.from_crs(source_crs, target_crs, always_xy=True).transform
    projected_geometry = transform(project, geometry)
    area_sq_km = projected_geometry.area / 1_000_000
    return area_sq_km


def process_articles(articles):
    """
    Extracts the last ID from article_id and fetches its coordinates.
    """
    processed_articles = []
    results = {}

    for article in articles:
        article_ids = article["article_id"].split()
        last_id = article_ids[-1]  # Get last ID

        centroid_coords = get_osm_admin_center(last_id)

        bb_min_lat, bb_max_lat, bb_min_lon, bb_max_lon = get_bounding_box(last_id)
        if all((bb_min_lat, bb_max_lat, bb_min_lon, bb_max_lon)):
            bounding_box_polygon = Polygon([(bb_min_lon, bb_min_lat), (bb_min_lon, bb_max_lat),
                                        (bb_max_lon, bb_max_lat), (bb_max_lon, bb_min_lat)])

            bb_area = get_area_sq_km(bounding_box_polygon)
            if centroid_coords and bb_area:
                article.update({'gt': {'centroid': centroid_coords,
                                       'bounding_box':{
                                           'bb_min_lat': bb_min_lat,
                                           'bb_max_lat': bb_max_lat,
                                           'bb_min_lon': bb_min_lon,
                                           'bb_max_lon': bb_max_lon
                                       },
                                       'bb area': bb_area}})
                processed_articles.append(article)

    return processed_articles

In [None]:
articles_for_georelation = process_articles(correctly_geocoded_articles)

### H3: Resolution handling

In [None]:
import pandas as pd
import numpy as np

# Define H3 resolutions with their corresponding hexagon areas (from table)
h3_data = [(res, h3.average_hexagon_area(res, unit="m^2")) for res in range(16)]  # resolutions 0–15

# Convert to DataFrame
df = pd.DataFrame(h3_data, columns=["Resolution", "Hexagon_Area_km2"])

def get_h3_resolution_scaled(area_km2):
    """Find the H3 resolution where the input area and hexagon area are most proportionate."""
    df["Scale_Diff"] = np.abs(np.log2(area_km2 / df["Hexagon_Area_km2"]))
    closest_resolution = df.loc[df["Scale_Diff"].idxmin()]
    return int(closest_resolution["Resolution"])

# Parsing of Complex Location Descriptions Evaluation

In [None]:
def load_json_file(directory: str, search_string: str):
    """
    Loops over all files in the specified directory and loads JSON files
    whose filenames contain the specified search string.

    :param directory: Path to the directory containing files.
    :param search_string: String to search for in filenames.
    :return: A dictionary with filenames as keys and loaded JSON content as values.
    """
    json_data = None

    if not os.path.isdir(directory):
        raise ValueError(f"The specified path '{directory}' is not a directory.")

    for filename in os.listdir(directory):
        if search_string in filename and filename.endswith('.json'):
            file_path = os.path.join(directory, filename)
            with open(file_path, 'r', encoding='utf-8') as file:
                json_data = json.load(file)
                break
    return json_data

GEORELATION_DIR = fr"{git.Repo('.', search_parent_directories=True).working_tree_dir}˙\output\georelation\all_articles_with_at_lest_one_correctly_geocoded_toponym\llama-3.3-70b-instruct\20250206"
k = 161

all_error_distances = []
all_squared_area_diffs = []
all_log_q = []
all_normalized_area_diffs = []
evaluation_results = []
very_off_area_predictions = []

for gt_article in articles_for_georelation:
    eval_results_for_article = {}
    article_ids = gt_article["article_id"].split()
    last_id = article_ids[-1]  # Get last ID

    # first, load generation
    try:
        georelated_article = load_json_file(GEORELATION_DIR, gt_article["article_id"])
        if not georelated_article:
            for id in article_ids:
                georelated_article = load_json_file(GEORELATION_DIR, id)
                if georelated_article:
                    break
    except Exception as e:
        continue
    if not georelated_article:
        continue

    # calculate distance of coordinates
    gt_coordinates = (gt_article['gt']['centroid']['lat'],
                      gt_article['gt']['centroid']['lon'])
    predicted_coordinates = (georelated_article['georelation']['coordinates of geographical unit']['latitude'],
                             georelated_article['georelation']['coordinates of geographical unit']['longitude'])
    error_distance = geodesic(gt_coordinates, predicted_coordinates).kilometers
    all_error_distances.append(error_distance)

    # calculate area error distance
    gt_area = gt_article['gt']['bb area']
    predicted_area = georelated_article["georelation"]["area in square km"]

    log_q = np.log(predicted_area/gt_area)
    all_log_q.append(log_q)

    squared_area_diff = (predicted_area - gt_area)**2
    all_squared_area_diffs.append(squared_area_diff)

    normalized_error_diff = (predicted_area - gt_area) / gt_area
    if normalized_error_diff > 10:
        very_off_area_predictions.append(georelated_article)
    all_normalized_area_diffs.append(normalized_error_diff)

    # calculate intersection
    # Define an H3 cell
    h3_index = h3.latlng_to_cell(
        lat=georelated_article["georelation"]["coordinates of geographical unit"]["latitude"],
        lng=georelated_article["georelation"]["coordinates of geographical unit"]["longitude"],
        res=get_h3_resolution_scaled(predicted_area)
    )

    # Get the boundary coordinates of the H3 cell
    hexagon = h3.cell_to_boundary(h3_index)

    # Convert polygon to shapely format
    hexagon_coords = [(lng, lat) for lat, lng in hexagon]  # Convert to (lng, lat) for Folium
    hexagon_polygon = Polygon(hexagon_coords)
    hexagon_area_estimate = get_area_sq_km(hexagon_polygon)

    bb = gt_article['gt']['bounding_box']
    bb_min_lat, bb_max_lat, bb_min_lon, bb_max_lon = bb['bb_min_lat'], bb['bb_max_lat'], bb['bb_min_lon'], bb['bb_max_lon']
    bounding_box_polygon = Polygon([(bb_min_lon, bb_min_lat), (bb_min_lon, bb_max_lat),
                                    (bb_max_lon, bb_max_lat), (bb_max_lon, bb_min_lat)])
    intersection = bounding_box_polygon.intersection(hexagon_polygon)
    if not intersection.is_empty:
        intersection_area = get_area_sq_km(intersection)
    else:
        intersection_area = 0

    eval_results_for_article.update({"article_id": last_id,
                                     "gt": gt_article['gt'],
                                     "georelation": georelated_article['georelation'],
                                     "absolute_error_distance": error_distance,
                                     "squared_area_error": squared_area_diff,
                                     "log_q": log_q,
                                     "normalized_area_error": normalized_error_diff,
                                     "hexagon_area_estimate": hexagon_area_estimate,
                                     "area_intersection": intersection_area})
    evaluation_results.append(eval_results_for_article)

all_normalized_area_diffs = np.array(all_normalized_area_diffs)
# Accuracy@k
within_k = [d <= k for d in all_error_distances]
accuracy_at_k = sum(within_k) / len(all_error_distances)
print(f"Accuracy@{k}: {accuracy_at_k}")

# AUC
def calculate_auc(sorted_values):
    max_error = 20038  # Earth's circumference in km / 2 (maximum possible distance)
    size = len(sorted_values)
    if size <= 1:
        return 0.0

    h = 1  # step size
    sum = 0.5 * (np.log(1 + sorted_values[0]) / np.log(max_error) + np.log(
        1 + sorted_values[-1]) / np.log(max_error))  # initial area

    for i in range(1, size - 1):
        sum += np.log(1 + sorted_values[i]) / np.log(max_error)

    auc = sum * h / (size - 1)
    return auc

sorted_error_distances = sorted(
    all_error_distances)  # assuming error_distances is a dictionary with error error_distances
auc = calculate_auc(sorted_error_distances)
print(f"AUC: {auc}")

# Mean error distance
mean_error_distance = np.mean(all_error_distances)
print(f"Mean error distance: {mean_error_distance}")

# Median error distance
median_error_distance = np.median(all_error_distances)
print(f"Median error distance: {median_error_distance}")

# 𝜁 Median Symmetric Accuracy
abs_all_log_q = np.abs(all_log_q)
all_log_q = np.array(all_log_q)
zeta = 100 * (np.exp(np.median(abs_all_log_q)) - 1)
print(f"Median Symmetric Accuracy: {zeta}")

# SSPB
sspb = 100 * np.sign(np.median(all_log_q)) * (np.exp(np.abs(np.median(all_log_q))) - 1)
print(f"Symmetric Signed Percentage Bias: {sspb}")

print(f"nof very off area predictions: {len(very_off_area_predictions)}")


In [None]:
average_precision = np.sum([evaluation["area_intersection"]/evaluation['hexagon_area_estimate'] for evaluation in evaluation_results])/len(evaluation_results)
average_recall = np.sum([evaluation["area_intersection"]/evaluation['gt']['bb area'] for evaluation in evaluation_results])/len(evaluation_results)
average_f1 = 2*average_precision*average_recall/(average_precision+average_recall)

print(f"average precision: {average_precision}, \naverage recall: {average_recall}, \naverage f1: {average_f1}")

### Calculate upper bound for F1 score

In [None]:
def create_hexagon(side_length=1):
    """Create a regular hexagon centered at the origin with a given side length."""
    angles = np.linspace(0, 2 * np.pi, 7)[:-1]  # Six angles
    vertices = [(side_length * np.cos(a), side_length * np.sin(a)) for a in angles]
    return Polygon(vertices)

def create_rectangle(width, height):
    """Create a rectangle centered at the origin with given width and height."""
    half_w, half_h = width / 2, height / 2
    vertices = [(-half_w, -half_h), (half_w, -half_h), (half_w, half_h), (-half_w, half_h)]
    return Polygon(vertices)

def compute_f1(hexagon, rectangle):
    """Compute F1 score based on intersection, precision, and recall."""
    intersection = hexagon.intersection(rectangle).area
    precision = intersection / hexagon.area
    recall = intersection / rectangle.area if rectangle.area > 0 else 0
    f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
    return f1

def simulate_f1():
    """Simulate the F1 score as the rectangle shrinks from the hexagon's bounding box and visualize with a heat map."""
    hexagon = create_hexagon()
    bbox = hexagon.bounds  # (minx, miny, maxx, maxy)
    max_width, max_height = bbox[2] - bbox[0], bbox[3] - bbox[1]

    max_overall = max(max_height, max_width)

    width_values = height_values = np.linspace(0.1, max_overall, 1000)

    f1_matrix = np.zeros((len(height_values), len(width_values)))
    best_f1, best_w, best_h = 0, 0, 0

    for i, h in enumerate(height_values):
        for j, w in enumerate(width_values):
            rectangle = create_rectangle(w, h)
            f1_score = compute_f1(hexagon, rectangle)
            f1_matrix[i, j] = f1_score
            if f1_score > best_f1:
                best_f1, best_w, best_h = f1_score, w, h

    plt.figure(figsize=(8, 6))
    plt.imshow(f1_matrix, extent=[0.1, max_overall, 0.1, max_overall], origin='lower', aspect='auto', cmap='coolwarm')
    plt.colorbar(label='F1 Score')
    plt.xlabel('Rectangle Width')
    plt.ylabel('Rectangle Height')
    plt.title('F1 Score Heat Map for Hexagon-Rectangle Overlap')
    plt.show()

    # Plot the hexagon and the best rectangle
    best_rectangle = create_rectangle(best_w, best_h)
    fig, ax = plt.subplots(figsize=(6, 6))

    x, y = hexagon.exterior.xy
    ax.plot(x, y, 'b-', label='Hexagon')

    x, y = best_rectangle.exterior.xy
    ax.plot(x, y, 'r-', label=f'Best Rectangle (w={best_w:.3f}, h={best_h:.3})')

    ax.set_xlim(-max_overall / 2, max_overall / 2)
    ax.set_ylim(-max_overall / 2, max_overall / 2)
    ax.set_xlabel('X')
    ax.set_ylabel('Y')
    ax.set_title('Best F1 Score Rectangle within Hexagon')
    ax.legend()
    plt.show()
    return f1_matrix, best_f1

# f1_matrix, best_f1 = simulate_f1()