# AI Final Project: Mapping Sun Exposure and Extreme Heat Risk in Grove Hall, Boston
## William Wang, MUSA 6950, Spring 2025

In [None]:
import os
import glob
import math
import numpy as np
import pandas as pd
import cv2
import requests
import rasterio
import matplotlib.pyplot as plt
import geopandas as gpd
from shapely.geometry import Point
from scipy.stats import gaussian_kde
import pvlib
from rasterio.transform import from_bounds
import torch
from torchvision import transforms
from efficientvit.models.utils import resize
from efficientvit.seg_model_zoo import create_efficientvit_seg_model
from eval_efficientvit_seg_model import ToTensor, get_canvas, CityscapesDataset

In [None]:
# -----------------------
# Step 1: Download Street View Panoramas
# -----------------------

API_KEY = "AIzaSyCdxDlGz2ebZBz_tBiBfc9l5k4ERwdFyak"
OUTPUT_FOLDER = "full_panoramas"
os.makedirs(OUTPUT_FOLDER, exist_ok=True)

# ========== FUNCTION DEFINITIONS ==========

def get_pano_metadata(lat, lon):
    url = f"https://maps.googleapis.com/maps/api/streetview/metadata?location={lat},{lon}&key={API_KEY}"
    response = requests.get(url)
    return response.json()

def download_tile(pano_id, zoom, x, y):
    url = f"https://cbk0.googleapis.com/cbk?output=tile&panoid={pano_id}&zoom={zoom}&x={x}&y={y}"
    response = requests.get(url)
    if response.status_code == 200:
        return Image.open(BytesIO(response.content))
    return None

def download_full_panorama(pano_id, save_path, zoom=3):
    tile_size = 512
    width, height = 26, 13
    panorama = Image.new("RGB", (tile_size * width, tile_size * height))

    for y in range(height):
        for x in range(width):
            tile = download_tile(pano_id, zoom, x, y)
            if tile:
                panorama.paste(tile, (x * tile_size, y * tile_size))
    panorama.save(save_path)

# ========== GENERATE GRID POINTS (Grove Hall) ==========
# Rough bounds for Grove Hall neighborhood
min_lat, max_lat = 42.310, 42.320
min_lon, max_lon = -71.095, -71.075
spacing = 0.001  # ~100m spacing

lat_points = np.round(np.arange(min_lat, max_lat, spacing), 6)
lon_points = np.round(np.arange(min_lon, max_lon, spacing), 6)
coords = [(lat, lon) for lat in lat_points for lon in lon_points]

# ========== MAIN LOOP ==========
metadata_records = []
print("📸 Starting download of full GSV panoramas...")
for lat, lon in tqdm(coords):
    try:
        meta = get_pano_metadata(lat, lon)
        if meta.get("status") == "OK" and "pano_id" in meta:
            pano_id = meta["pano_id"]
            date = meta.get("date", "")
            yaw = meta.get("pano_yaw_deg", "")
            if yaw is None:
              yaw = 0.0
            save_path = os.path.join(OUTPUT_FOLDER, f"{pano_id}.jpg")

            if not os.path.exists(save_path):  # Avoid re-download
                download_full_panorama(pano_id, save_path)

            metadata_records.append({
                "panoid": pano_id,
                "lat": lat,
                "lon": lon,
                "date": date,
                "yaw": yaw
            })
    except Exception as e:
        print(f"⚠️ Error at {lat},{lon}: {e}")
        continue
        
input_file = '/content/drive/MyDrive/Colab Notebooks/data/gsv_panoramas_grovehall_100m.csv'
output_file = 'gsv_grovehall_with_yaw_final.csv'

with open(input_file, 'r', encoding='utf-8') as fin, open(output_file, 'w', newline='', encoding='utf-8') as fout:
    reader = csv.DictReader(fin)
    fieldnames = reader.fieldnames + ['heading']
    writer = csv.DictWriter(fout, fieldnames=fieldnames)
    writer.writeheader()

    for row in reader:
        pid = row['panoid']
        lat = float(row['lat'])
        lon = float(row['lon'])

        try:
            panoids = search_panoramas(lat, lon)
            if panoids:
                heading = panoids[0].heading
            else:
                heading = None
        except Exception as e:
            print(f"Error at {pid}: {e}")
            heading = None

        row['heading'] = heading
        writer.writerow(row)

In [None]:
# -----------------------
# Step 2: Train or Load EfficientViT Model
# -----------------------

def load_model(model_name="efficientvit-seg-l1-cityscapes", weight_path=None):
    model = create_efficientvit_seg_model(model_name, pretrained=False).cuda()
    checkpoint = torch.load(weight_path)
    state_dict = checkpoint.get("state_dict", checkpoint)
    model.load_state_dict(state_dict)
    model.eval()
    return model

def segment_image(model, image_path, output_image_path=None, output_pred_path=None):
    image = np.array(Image.open(image_path).convert("RGB"))
    h, w = image.shape[:2]

    # Resize
    crop_size = 512
    if h < w:
        th = crop_size
        tw = math.ceil(w / h * th / 32) * 32
    else:
        tw = crop_size
        th = math.ceil(h / w * tw / 32) * 32
    data = cv2.resize(image, (tw, th), interpolation=cv2.INTER_CUBIC) if (h != th or w != tw) else image.copy()

    transform = transforms.Compose([
        ToTensor(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])
    data = transform({"data": data, "label": np.ones_like(data)})["data"]
    data = data.unsqueeze(0).cuda()

    with torch.inference_mode():
        output = model(data)
        if output.shape[-2:] != image.shape[:2]:
            output = resize(output, size=image.shape[:2])
        pred = torch.argmax(output, dim=1).cpu().numpy()[0]

    if output_image_path:
        canvas = get_canvas(image, pred, CityscapesDataset.class_colors)
        Image.fromarray(canvas).save(output_image_path)
    if output_pred_path:
        np.save(output_pred_path, pred)

    return pred

model = load_model(
    model_name="efficientvit-seg-l1-cityscapes",
    weight_path="checkpoints/efficientvit_seg_l1_cityscapes.pt"
)

#Compute Sky Masks
input_folder = "/mnt/data/stitched_panoramas/"
segmentation_output_folder = "/mnt/data/segmentation_predictions/"
os.makedirs(segmentation_output_folder, exist_ok=True)

stitched_images = glob.glob(os.path.join(input_folder, "*.jpg"))

for img_path in stitched_images:
    panoid = os.path.basename(img_path).replace(".jpg", "")
    seg_output_image_path = f"/mnt/data/segmentation_results/{panoid}_seg_vis.png"
    seg_output_pred_path = os.path.join(segmentation_output_folder, f"{panoid}_seg.npy")

    segment_image(
        model,
        image_path=img_path,
        output_image_path=seg_output_image_path,
        output_pred_path=seg_output_pred_path
    )


In [None]:
# -----------------------
# Step 3: Project to Fisheye with Yaw
# -----------------------

def pano_to_fisheye(pred_map, yaw_deg, output_size=512):
    height, width = pred_map.shape
    center = output_size // 2
    fisheye = np.zeros((output_size, output_size), dtype=np.uint8)

    yaw_rad = np.radians(yaw_deg)
    for y in range(output_size):
        for x in range(output_size):
            dx = x - center
            dy = y - center
            r = math.sqrt(dx**2 + dy**2)

            if r > center or r == 0:
                continue

            theta = math.atan2(dy, dx) - yaw_rad
            phi = (r / center) * (math.pi/2)

            if phi > (math.pi/2):
                continue

            u = (theta % (2 * math.pi)) / (2 * math.pi) * width
            v = (phi / (math.pi/2)) * height

            if np.isnan(u) or np.isnan(v):
                continue

            u = int(np.clip(u, 0, width-1))
            v = int(np.clip(v, 0, height-1))

            fisheye[y, x] = pred_map[v, u]

    return fisheye

# Batch process all panoramas
for idx, row in tqdm(df.iterrows(), total=len(df)):
    panoid = row["panoid"]
    yaw = row["yaw"]

    pred_path = os.path.join(segmentation_folder, f"{panoid}_stitched_seg.npy")
    if not os.path.exists(pred_path):
        print(f"⚠️ Missing segmentation for {panoid}")
        continue

    pred_map = np.load(pred_path)

    # Project to fisheye with yaw
    fisheye_pred = pano_to_fisheye(pred_map, yaw_deg=yaw, output_size=512)

    # Extract sky mask (Cityscapes sky class = 10)
    sky_mask = (fisheye_pred == 10).astype(np.uint8)

    # Save fisheye sky mask
    out_path = os.path.join(fisheye_output_folder, f"{panoid}_fisheye_sky_mask.png")
    cv2.imwrite(out_path, sky_mask * 255)

    print(f"✅ Saved fisheye sky mask for {panoid}")

In [None]:
# -----------------------
# Step 4: Simulate Sun Position & Exposure Calculation
# -----------------------

def get_daily_sun_positions(lat, lon, date, freq="10min"):
    times = pd.date_range(f"{date} 04:00", f"{date} 20:00", freq=freq, tz="America/New_York")
    solpos = pvlib.solarposition.get_solarposition(times, lat, lon)
    return solpos[['zenith', 'azimuth']]

def calculate_sun_exposure_for_day(sky_mask, solpos, image_size=512):
    center = image_size // 2
    radius = center
    exposed_minutes = 0

    for _, row in solpos.iterrows():
        zenith = row["zenith"]
        azimuth = row["azimuth"]
        if zenith > 90:
            continue

        r = (zenith / 90) * radius
        theta = np.radians(azimuth)
        x = int(center + r * np.sin(theta))
        y = int(center - r * np.cos(theta))

        if 0 <= x < image_size and 0 <= y < image_size:
            if sky_mask[y, x] > 0:
                exposed_minutes += 10

    return exposed_minutes / 60  # convert to hours

def calculate_average_july_exposure(sky_mask_path, lat, lon):
    sky_mask = cv2.imread(sky_mask_path, cv2.IMREAD_GRAYSCALE)
    sky_mask = (sky_mask > 127).astype(np.uint8)

    total_hours = 0
    for day in range(1, 32):
        date = f"2024-07-{day:02d}"
        solpos = get_daily_sun_positions(lat, lon, date)
        total_hours += calculate_sun_exposure_for_day(sky_mask, solpos)

    return total_hours / 31

In [None]:
# -----------------------
# Step 5: Raster Density Map Generation
# -----------------------

#Estimate Extreme Heat Risk

# Load sun exposure results
df = pd.read_csv("/mnt/data/july_avg_sun_exposure.csv")

# Define a function to classify heat risk
def classify_heat_risk(avg_sun_hours):
    if avg_sun_hours > 4:
        return "High"
    elif avg_sun_hours > 2:
        return "Moderate"
    else:
        return "Low"

# Apply classification
df['heat_risk'] = df['avg_sun_hours'].apply(classify_heat_risk)