In [None]:
!pip install --upgrade osmnx

Collecting osmnx
  Downloading osmnx-2.0.3-py3-none-any.whl.metadata (4.9 kB)
Downloading osmnx-2.0.3-py3-none-any.whl (100 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m100.2/100.2 kB[0m [31m2.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: osmnx
Successfully installed osmnx-2.0.3


# Imports and Setup

In [None]:
import csv
import math
import os
from geopy.distance import geodesic
from PIL import Image, ImageDraw
import pandas as pd
from shapely.geometry import Point, Polygon
import matplotlib.pyplot as plt
from google.colab import drive
import requests
import sys
from IPython.display import Image as IPyImage, display
from shapely.geometry import shape, Polygon, Point
import osmnx as ox
from PIL import Image, ImageDraw
import os
import cv2
import numpy as np

Mounted at /content/drive


#  Geocoding & Map Display

In [None]:
def get_lat_lon_from_address(address):
    """
    Uses the Google Geocoding API to convert a physical address into
    geographic coordinates (latitude and longitude).
    Parameters:
        address (str): The address to geocode.

    Returns:
        tuple: (latitude, longitude) if successful.
    Raises:
        ValueError: If no results are found for the given address.
        Exception: If the API request fails.
    """
    base_url = "https://maps.googleapis.com/maps/api/geocode/json"
    params = {
        "address": address,
        "key": GOOGLE_API_KEY
    }
    response = requests.get(base_url, params=params)
    if response.status_code == 200:
        results = response.json().get('results')
        if results:
            location = results[0]['geometry']['location']
            return location['lat'], location['lng']
        else:
            raise ValueError(f"No results found for address: {address}")
    else:
        raise Exception(f"Geocoding API error: {response.status_code}")


def geocode_and_display_map(lat, lon,address):
    """
    Displays a satellite map of the given latitude and longitude using Google Static Maps API.

    Parameters:
        lat (float): Latitude of the location.
        lon (float): Longitude of the location.
    """


    try:
        print(f"Coordinates: {lat}, {lon}")
        lat, lon = get_lat_lon_from_address(address)
        # Build the Google Static Map URL with a marker at the location
        map_url = (
            f"https://maps.googleapis.com/maps/api/staticmap?"
            f"center={lat},{lon}&zoom=16&size=600x400&scale=2&maptype=satellite"
            f"&markers=size:tiny|color:red|label:A|{lat},{lon}"
            f"&key={GOOGLE_API_KEY}"
        )


        # print("Map URL:", map_url)
        display(IPyImage(url=map_url))

    except Exception as e:
        # print(f"Error: {e}")
        print(f"\nNo image footprint contains the target location)")
        print("Please try another address or check your image metadata.")
        sys.exit()

 # Coordinate & Pixel Utilities

In [None]:
# Create folders for output
os.makedirs("cropped_images", exist_ok=True)
os.makedirs("annotated_images", exist_ok=True)
os.makedirs("test", exist_ok=True)


def parse_image_coords(coords_str):
    part = coords_str.split("POINT(")[1].split(")")[0]
    lon_str, lat_str = part.strip().split()
    return float(lat_str), float(lon_str)

def compute_bearing(lat1, lon1, lat2, lon2):
    """
    Calculates the bearing (direction angle) from a start point (lat1, lon1)
    to an end point (lat2, lon2).

    Parameters:
        lat1, lon1 (float): Latitude and longitude of the start point.
        lat2, lon2 (float): Latitude and longitude of the target point.

    Returns:
        float: Bearing in degrees, ranging from 0° to 360°.
    """
    lat1_rad = math.radians(lat1)
    lat2_rad = math.radians(lat2)
    dlon_rad = math.radians(lon2 - lon1)
    x = math.sin(dlon_rad) * math.cos(lat2_rad)
    y = math.cos(lat1_rad) * math.sin(lat2_rad) - math.sin(lat1_rad) * math.cos(lat2_rad) * math.cos(dlon_rad)
    bearing = math.degrees(math.atan2(x, y))
    return (bearing + 360) % 360


def get_next_folder_number(base_path):
    existing = [int(name) for name in os.listdir(base_path) if name.isdigit()]
    return str(max(existing, default=0) + 1)

# Lat/Lon to Pixel Conversion

In [None]:
def latlon_to_pixel(drone_lat, drone_lon, altitude, heading, gimbal_pitch,
                    target_lat, target_lon, image_width, image_height, hfov, vfov, image_name="unknown"):
    """
    Converts the geographic coordinates of a target point into pixel coordinates
    within an image captured by a UAV, accounting for camera orientation and FOV.

    For nadir images, it uses a GSD-based projection.
    For oblique images, it uses angular projection based on heading and pitch.

    Parameters:
        drone_lat, drone_lon (float): Coordinates of the UAV.
        altitude (float): UAV altitude above ground in meters.
        heading (float): UAV heading angle in degrees.
        gimbal_pitch (float): Camera gimbal pitch angle in degrees.
        target_lat, target_lon (float): Geographic coordinates of the target.
        image_width, image_height (int): Dimensions of the image in pixels.
        hfov, vfov (float): Horizontal and vertical field of view in degrees.

    Returns:
        tuple: (x_pixel, y_pixel, delta_azimuth, delta_elevation,
                horizontal_distance, total_pitch)
    """
    if gimbal_pitch <= -87 and gimbal_pitch >= -92:  # Nadir case

        # Compute the full ground coverage in meters based on altitude and camera FOV
        ground_width = 2 * altitude * math.tan(math.radians(hfov / 2))
        ground_height = 2 * altitude * math.tan(math.radians(vfov / 2))
        meter_per_pixel_x = ground_width / image_width
        meter_per_pixel_y = ground_height / image_height

        dlat_deg = target_lat - drone_lat
        dlon_deg = target_lon - drone_lon
        dy_m = dlat_deg * 110540         # north/south difference in meters
        dx_m = dlon_deg * (111320 * math.cos(math.radians(drone_lat)))  # east/west difference

        dx_px = -dx_m / meter_per_pixel_x
        dy_px =  dy_m / meter_per_pixel_y

        x_pixel = image_width / 2 + dx_px
        y_pixel = image_height / 2 + dy_px
        horizontal_distance = geodesic((drone_lat, drone_lon), (target_lat, target_lon)).meters

        return x_pixel, y_pixel, 0, 90, horizontal_distance, -90

    else:
        # Oblique images
        horizontal_distance = geodesic((drone_lat, drone_lon), (target_lat, target_lon)).meters
        measured_bearing = compute_bearing(drone_lat, drone_lon, target_lat, target_lon)
        normalized_heading = heading % 360
        delta_azimuth = measured_bearing - normalized_heading

        if delta_azimuth > 180:
            delta_azimuth -= 360
        elif delta_azimuth < -180:
            delta_azimuth += 360

        total_pitch = -90.0 if horizontal_distance == 0 else -math.degrees(math.atan2(altitude, horizontal_distance))
        delta_elevation = total_pitch - gimbal_pitch

        x_pixel = image_width / 2 + (delta_azimuth * image_width / hfov)
        y_pixel = image_height / 2 - (delta_elevation * image_height / vfov)
        return x_pixel, y_pixel, delta_azimuth, delta_elevation, horizontal_distance, total_pitch

# Image Selection

In [None]:
def extract_polygon_coords(polygon_str):
    """
    Extracts coordinate pairs from a WKT polygon string (SRID=4326;POLYGON).
    This is typically used to retrieve the footprint of an image.

    Parameters:
        polygon_str (str): Polygon in WKT format as a string.
            Example: 'SRID=4326;POLYGON((lon1 lat1, lon2 lat2, ..., lonN latN))'

    Returns:
        list of tuples: A list of (longitude, latitude) coordinates defining the polygon.
    """
    coords = polygon_str.replace("SRID=4326;POLYGON((", "").replace("))", "").split(",")
    return [(float(coord.split()[0]), float(coord.split()[1])) for coord in coords]


def load_and_filter_metadata(target_lat, target_lon,metadata_path):
    """
    Loads image metadata from a CSV file and filters it to retain only those images
    whose polygon footprints contain the specified target location.

    Parameters:
        target_lat (float): Latitude of the target location.
        target_lon (float): Longitude of the target location.
        metadata_path (str): Full path to the metadata CSV file.

    Returns:
        DataFrame: A filtered DataFrame containing only images that contain the target point.
    """
    df = pd.read_csv(metadata_path)
    df[['longitude', 'latitude']] = df['image_coords'].str.extract(
        r'SRID=4326;POINT\(([-\d.]+) ([-]?\d+\.\d+)\)'
    ).astype(float)
    df["polygon_coords"] = df['footprint'].apply(extract_polygon_coords)
    target_point = Point(target_lon, target_lat)
    df["contains_target"] = df["polygon_coords"].apply(lambda coords: Polygon(coords).contains(target_point))
    df_filtered = df[df["contains_target"]].copy()

    if df_filtered.empty:
        print(f"\nNo image footprint contains the target location: ({target_lat}, {target_lon})")
        print("Please try another address or check your image metadata.")

        sys.exit()


    if not df_filtered.empty:
        df_filtered["distance"] = df_filtered.apply(
            lambda row: geodesic(
                (row["latitude"], row["longitude"]), (target_lat, target_lon)
            ).meters,
            axis=1
        )
    return df_filtered



def select_nadir_oblique_images(df_filtered):
    """
    From the filtered metadata, selects the most suitable nadir image (camera facing down)
    and one oblique image per cardinal direction (North, East, South, West) if available.

    Parameters:
        df_filtered (DataFrame): The filtered metadata containing relevant UAV images.

    Returns:
        DataFrame: A combined DataFrame of the selected nadir and oblique images with direction labels.
    """
    nadir_range_filtered = df_filtered[
    (df_filtered["gimbal_pitch"] >= -92) &
    (df_filtered["gimbal_pitch"] <= -87)
]
    nadir_idx = (nadir_range_filtered["gimbal_pitch"] + 90).abs().idxmin() \
        if not nadir_range_filtered.empty else None
    nadir_image = df_filtered.loc[[nadir_idx]] if not pd.isna(nadir_idx) else pd.DataFrame()
    df_filtered["normalized_heading"] = df_filtered["heading"].apply(lambda x: (x + 180) % 360 - 180)
    directions = {"North": 0, "East": 90, "South": 180, "West": -90}
    df_oblique_candidates = df_filtered[
    ((df_filtered["gimbal_pitch"] >= 15) & (df_filtered["gimbal_pitch"] <= 45)) |
    ((df_filtered["gimbal_pitch"] >= -45) & (df_filtered["gimbal_pitch"] <= -15))
    ].copy()

    oblique_images = pd.DataFrame()
    for direction, angle in directions.items():
        heading_min = angle - 15
        heading_max = angle + 15
        condition_heading = df_oblique_candidates["normalized_heading"].between(heading_min, heading_max)
        oblique = df_oblique_candidates[condition_heading].sort_values("distance").head(1)
        if not oblique.empty:
            oblique["direction"] = direction
            oblique_images = pd.concat([oblique_images, oblique])

    if not nadir_image.empty:
        nadir_image["direction"] = "Nadir (Center)"
    return pd.concat([nadir_image, oblique_images]).reset_index(drop=True)
def display_metadata(filtered_df, selected):
    print("\nAll available images with their metadata:")
    for _, row in filtered_df.iterrows():
        img_filename = os.path.basename(row['image_uri'])
        print(f"Image URI: {img_filename}, Latitude: {row['latitude']}, Longitude: {row['longitude']}, Heading: {row['heading']}°, Gimbal Pitch: {row['gimbal_pitch']}°")

    print("\nSelected images for cropping:")
    for _, row in selected.iterrows():
        print(f"{row['direction']}: {row['image_uri']} (Heading: {row['heading']}°, Pitch: {row['gimbal_pitch']}°)")

# OSM Polygon Extraction

In [None]:
def extract_building_polygon(lat, lon):
    """
    Extract the building polygon (if any) from OSM that contains the given lat/lon.
    Returns a Shapely polygon or None if not found.
    """
    target_point = Point(lon, lat)
    tags = {"building": True}
    buildings = ox.features_from_point((lat, lon), tags=tags, dist=50)
    polygons = buildings[buildings.geom_type.isin(['Polygon', 'MultiPolygon'])]
    matched = polygons[polygons.geometry.contains(target_point)]
    return matched.geometry.iloc[0] if not matched.empty else None


def polygon_to_pixels(polygon, nadir_row, width, height, hfov, vfov, shift_x=0, shift_y=0):
    """
    Convert lat/lon coordinates of a polygon to pixel coordinates using nadir image parameters.
    Apply an  pixel shift for better alignment.
    """
    coords = list(polygon.exterior.coords)
    pixel_coords = []
    for lon, lat in coords:
        px, py, *_ = latlon_to_pixel(
            nadir_row['latitude'], nadir_row['longitude'], nadir_row['altitude'],
            nadir_row['heading'], nadir_row['gimbal_pitch'],
            lat, lon,
            width, height,
            hfov, vfov
        )

        pixel_coords.append((px + shift_x, py + shift_y))
    return pixel_coords


def crop_nadir_building_polygon(nadir_row, target_lat, target_lon, image_dir, hfov, vfov):
    image_name = os.path.basename(nadir_row['image_uri'])
    image_path = os.path.join(image_dir, image_name)
    if not os.path.exists(image_path):
        print(f"Image not found: {image_path}")
        return None

    image = Image.open(image_path)
    width, height = image.size

    # Get building polygon from OSM
    building_polygon = extract_building_polygon(target_lat, target_lon)
    if building_polygon is None:
        print("❌ No building polygon found at this location.")
        return None

    pixel_coords = polygon_to_pixels(building_polygon, nadir_row, width, height, hfov, vfov, shift_x=0, shift_y=0)
    xs, ys = zip(*pixel_coords)
    min_x, max_x = min(xs), max(xs)
    min_y, max_y = min(ys), max(ys)

    # Calculate width and height of the box
    box_width = max_x - min_x
    box_height = max_y - min_y

    # Expand by 20%
    expand_x = box_width * 0.2
    expand_y = box_height * 0.2

    crop_box = (
        int(max(min_x - expand_x / 2, 0)),
        int(max(min_y - expand_y / 2, 0)),
        int(min(max_x + expand_x / 2, width)),
        int(min(max_y + expand_y / 2, height))
    )

    cropped = image.crop(crop_box)

    # Save the cropped image
    output_path = os.path.join("cropped_images", "nadir_building_crop.jpg")
    cropped.save(output_path)
    print(f"✅ Nadir building crop saved to: {output_path}")

    # Overlay polygon on the original image
    overlay = image.copy()
    draw = ImageDraw.Draw(overlay)
    draw.polygon(pixel_coords, outline="red", width=10)
    overlay_path = os.path.join("annotated_images", "nadir_building_overlay.jpg")
    overlay.save(overlay_path)
    print(f"✅ Nadir image with polygon overlay saved to: {overlay_path}")
    return ("Nadir", cropped)

def overlay_nadir_building_on_obliques(selected_df, building_polygon, image_dir, hfov=69.6, vfov=55.2):
    """
    Projects OSM building polygon onto oblique images with oblique-specific correction.
    """
    cropped_oblique_buildings = []
    numbered_folder = get_next_folder_number("cropped_images")
    cropped_folder = os.path.join("cropped_images", numbered_folder)
    annotated_folder = os.path.join("annotated_images", numbered_folder)
    os.makedirs(cropped_folder, exist_ok=True)
    os.makedirs(annotated_folder, exist_ok=True)
    for _, row in selected_df.iterrows():
        if row['direction'] == "Nadir (Center)":
            continue

        image_name = os.path.basename(row['image_uri'])
        image_path = os.path.join(image_dir, image_name)
        if not os.path.exists(image_path):
            print(f"Image not found: {image_path}")
            continue

        image = Image.open(image_path)
        width, height = image.size

        pixel_coords = []
        for lon, lat in building_polygon.exterior.coords:
            px, py, *_ = latlon_to_pixel(
                row['latitude'], row['longitude'], row['altitude'],
                row['heading'], row['gimbal_pitch'],
                lat, lon,
                width, height, hfov, vfov
            )

            # Apply **oblique-specific offset**
            # print(px, py)
            px, py = apply_polynomial_offset_model(px, py)
            # print(px, py)

            pixel_coords.append((px, py))
        xs, ys = zip(*pixel_coords)
        min_x, max_x = min(xs), max(xs)
        min_y, max_y = min(ys), max(ys)
        box_width = max_x - min_x
        box_height = max_y - min_y


        # Expand%
        expand_x = box_width * 0.8
        expand_y = box_height * 0.8
        vertical_shift = int(0.4 * box_height)

        crop_box = (
      int(max(min_x - expand_x / 2, 0)),
      int(max(min_y - expand_y / 2 + vertical_shift, 0)),
      int(min(max_x + expand_x / 2, width)),
      int(min(max_y + expand_y / 2 + vertical_shift, height))
  )

        cropped = image.crop(crop_box)
        print(crop_box)


        # Annotate polygon on oblique image
        annotated = image.copy()
        draw = ImageDraw.Draw(annotated)
        draw.polygon(pixel_coords, outline="blue", width=10)

        overlay_path = os.path.join(annotated_folder, f"oblique_overlay_{row['direction']}_{image_name}")
        annotated.save(overlay_path)
        print(f"✅ Saved oblique overlay: {overlay_path}")
        cropped_path = os.path.join(cropped_folder, f"oblique_crop_{row['direction']}_{image_name}")
        cropped.save(cropped_path)
        cropped_oblique_buildings.append((row['direction'], cropped))
    return cropped_oblique_buildings


# Offset Correction Models

In [None]:
import joblib
model_dx = joblib.load("model_dx.pkl")
model_dy = joblib.load("model_dy.pkl")


def apply_polynomial_offset_model(x, y):
    input_point = np.array([[x, y]])
    dx = model_dx.predict(input_point)[0]
    dy = model_dy.predict(input_point)[0]
    return x + dx, y + dy



# Main Execution

In [1]:
if __name__ == "__main__":
  image_dir = '/content/images'
  metadata_file = '/content/NE-metadata-sample.csv'
  GOOGLE_API_KEY = ''

  os.makedirs('cropped_images', exist_ok=True)
  os.makedirs('annotated_images', exist_ok=True)

  if not os.path.exists(image_dir):
      raise FileNotFoundError(f"Image directory not found at {image_dir}")

  if not os.path.exists(metadata_file):
      raise FileNotFoundError(f"Metadata file not found at {metadata_file}")

  if not GOOGLE_API_KEY:
      raise ValueError("Google API key is missing. Please provide a valid key.")


  # For testing with existing images, you can use this address:
  # PWPC+V3F Lakewood, Colorado, USA
  address =input("Enter an address: ")
  lat, lon = get_lat_lon_from_address(address)
  geocode_and_display_map(lat, lon,address)

  filtered_df = load_and_filter_metadata(lat, lon,metadata_file)
  selected = select_nadir_oblique_images(filtered_df)
  display_metadata(filtered_df, selected)

  building_polygon = extract_building_polygon(lat, lon)

  if not building_polygon:
    print("No building found for the given coordinates.")



  if building_polygon is not None:
    nadir_selected = selected[selected["direction"] == "Nadir (Center)"]
    if not nadir_selected.empty:
        nadir_row = nadir_selected.iloc[0]
        nadir_cropped = crop_nadir_building_polygon(nadir_row, lat, lon, image_dir, hfov=69.6, vfov=55.2)
        oblique_cropped = overlay_nadir_building_on_obliques(selected, building_polygon, image_dir, hfov=69.6, vfov=55.2)
        all_cropped = [nadir_cropped] + oblique_cropped
    else:
        print("⚠️ No nadir image found. Proceeding with oblique images only.")
        all_cropped = overlay_nadir_building_on_obliques(selected, building_polygon, image_dir, hfov=69.6, vfov=55.2)

    if all_cropped:
        fig, axs = plt.subplots(1, len(all_cropped), figsize=(5 * len(all_cropped), 5))
        if len(all_cropped) == 1:
            axs = [axs]
        for ax, (label, img) in zip(axs, all_cropped):
            ax.imshow(img)
            ax.set_title(label)
            ax.axis('off')
        plt.tight_layout()
        plt.savefig("combined_cropped_images.jpg")
        plt.show()
        plt.close(fig)



NameError: name 'os' is not defined

#Training

In [None]:
!curl -s ifconfig.me

34.73.25.36

In [None]:
import pandas as pd
import joblib
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import PolynomialFeatures
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error
from math import sqrt

#  Load your data from Excel
df = pd.read_excel("offset_data_v2.xlsx")  # Replace with your filename

# 📐 Define input features and target offsets
X = df[['x_pixel_pred', 'y_pixel_pred']].values
y_dx = df['x_pixel_actual'] - df['x_pixel_pred']
y_dy = df['y_pixel_actual'] - df['y_pixel_pred']

# 🧪 Split into training and test sets
X_train, X_test, y_dx_train, y_dx_test, y_dy_train, y_dy_test = train_test_split(
    X, y_dx, y_dy, test_size=0.2, random_state=42
)

#  Create polynomial regression models
model_dx = make_pipeline(PolynomialFeatures(2), LinearRegression())
model_dy = make_pipeline(PolynomialFeatures(2), LinearRegression())

#  Train the models
model_dx.fit(X_train, y_dx_train)
model_dy.fit(X_train, y_dy_train)

#  Evaluate the models
dx_pred = model_dx.predict(X_test)
dy_pred = model_dy.predict(X_test)



rmse_dx = sqrt(mean_squared_error(y_dx_test, dx_pred))
rmse_dy = sqrt(mean_squared_error(y_dy_test, dy_pred))


print(f"📏 Test RMSE (dx): {rmse_dx:.2f} pixels")
print(f"📏 Test RMSE (dy): {rmse_dy:.2f} pixels")


# 🔁 Load trained models
joblib.dump(model_dx, "model_dx.pkl")
joblib.dump(model_dy, "model_dy.pkl")


📏 Test RMSE (dx): 118.40 pixels
📏 Test RMSE (dy): 56.75 pixels


['model_dy.pkl']