# Urbanization Ratio Extraction for Water Quality Datasets

This notebook computes urbanization ratios around sampling locations from the provided water quality training dataset and submission template.

The urbanization ratio is calculated as the fraction of pixels classified as Urban areas (class 190) in the ESA CCI Land Cover dataset (annual, 300 m resolution) within buffer zones (e.g., 1 km and 5 km) around each sampling point.

**Data sources:**

- `water_quality_training_dataset.csv`: training data with latitude and longitude
- `submission_template.csv`: validation set locations

**Outputs:**

This notebook reads both CSVs, extracts the unique sampling coordinates, computes urbanization ratios for each year from 2011 to 2015 and for each buffer distance (1 km and 5 km, as set in `buffer_distances`), and merges the results back into the training and submission datasets.

Note: Running the ratio computation across all points may take significant time since it downloads and processes raster tiles from the Microsoft Planetary Computer. A progress bar is displayed during computation.

In [None]:
# Installs the required packages; comment out the following line if they are already installed
!pip install pystac-client planetary-computer geopandas shapely numpy pandas rioxarray rasterio tqdm

In [None]:
import pandas as pd
import geopandas as gpd
import numpy as np
from shapely.geometry import mapping, box
import requests
import time
from tqdm import tqdm

import planetary_computer

# Raster handling
import rioxarray as rxr
import rasterio

# For functional operations
from functools import reduce
from functools import lru_cache

In [None]:
# STAC search helper with pagination
STAC_SEARCH_URL = 'https://planetarycomputer.microsoft.com/api/stac/v1/search'

# HTTP status codes that indicate a transient failure worth retrying.
_RETRYABLE_STATUS = frozenset({429, 500, 502, 503, 504})


def _next_page_request(page, previous_body):
    """Return ``(method, url, body)`` for the page after *page*, or None when there is none.

    A STAC ``next`` link may carry ``method``, ``body`` and ``merge`` fields; a
    POST link must be followed with a POST of its body (merged into the previous
    request body when ``merge`` is true), not with a plain GET on ``href``.
    """
    for link in page.get('links') or []:
        if link.get('rel') != 'next' or not link.get('href'):
            continue
        method = (link.get('method') or 'GET').upper()
        if method != 'POST':
            return 'GET', link['href'], None
        body = link.get('body') or {}
        if link.get('merge'):
            body = {**(previous_body or {}), **body}
        return 'POST', link['href'], body
    return None


def search_items_stac(collection_id, bbox, start_date, end_date,
                      batch_size=200, sleep_seconds=0.2, max_retries=5):
    """Perform a STAC search and return all items within the date range and bounding box.

    Args:
        collection_id: STAC collection to search.
        bbox: Bounding box sent as the ``bbox`` search parameter.
        start_date: Start of the ``datetime`` interval.
        end_date: End of the ``datetime`` interval.
        batch_size: Page size sent as the ``limit`` search parameter.
        sleep_seconds: Pause between page requests; also the initial retry backoff.
        max_retries: Maximum number of attempts for the whole paginated search.

    Returns:
        A list of the item (feature) dicts collected from every result page.
        An empty list is returned when ``max_retries`` is less than 1.

    Raises:
        requests.exceptions.HTTPError: On a non-retryable status, or when a
            retryable status persists through the last attempt.
        requests.exceptions.ConnectionError, requests.exceptions.Timeout: When
            the network failure persists through the last attempt.
    """
    payload = {
        'collections': [collection_id],
        'bbox': bbox,
        'datetime': f'{start_date}/{end_date}',
        'limit': batch_size
    }

    # Initial wait for the exponential backoff between attempts.
    backoff = sleep_seconds

    with requests.Session() as session:
        for attempt in range(max_retries):
            # Every attempt restarts the search from the first page, so the
            # result list is reset to keep a retry from duplicating items.
            items = []
            try:
                method, url, body = 'POST', STAC_SEARCH_URL, payload
                while True:
                    if method == 'POST':
                        response = session.post(url, json=body, timeout=30)
                    else:
                        response = session.get(url, timeout=30)
                    response.raise_for_status()
                    page = response.json()
                    items.extend(page.get('features') or [])

                    following = _next_page_request(page, body)
                    if following is None:
                        return items
                    method, url, body = following
                    time.sleep(sleep_seconds)

            except requests.exceptions.HTTPError as err:
                # Read the status from the response that actually failed; it may
                # be a later page rather than the initial request.
                status = err.response.status_code if err.response is not None else None
                if status not in _RETRYABLE_STATUS or attempt >= max_retries - 1:
                    raise
            except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
                if attempt >= max_retries - 1:
                    raise

            # Exponential backoff before the next attempt.
            time.sleep(backoff)
            backoff *= 2

    return []  # only reached when max_retries < 1

In [None]:
import random

def open_raster_with_retry(href, tries=5, base_sleep=0.5):
    """Open a raster dataset with retries to handle transient I/O errors.

    Args:
        href: Path or URL handed to ``rioxarray.open_rasterio``.
        tries: Maximum number of open attempts; must be at least 1.
        base_sleep: Base delay in seconds; the wait doubles after every failed
            attempt and gets up to 0.2 s of random jitter added.

    Returns:
        Whatever ``rxr.open_rasterio(href, masked=True)`` returns.

    Raises:
        ValueError: If ``tries`` is less than 1.
        rasterio.errors.RasterioIOError: If the last attempt fails as well.
    """
    if tries < 1:
        raise ValueError("tries must be at least 1")
    for attempt in range(tries):
        try:
            return rxr.open_rasterio(href, masked=True)
        except rasterio.errors.RasterioIOError:
            # No attempt follows the last one, so re-raise immediately instead
            # of sleeping first.
            if attempt == tries - 1:
                raise
            time.sleep(base_sleep * (2 ** attempt) + random.random() * 0.2)

@lru_cache(maxsize=16)
def search_stac_cached(collection_id, bbox, start_date, end_date):
    """Memoised front end to ``search_items_stac``.

    The arguments form the cache key and therefore must be hashable (pass
    ``bbox`` as a tuple). The found items are returned as a tuple.
    """
    found = search_items_stac(collection_id, bbox, start_date, end_date)
    return tuple(found)

from shapely.geometry import box

def pick_item_for_buffer(items, buffer_geom, year):
    """Choose the STAC item best matching the bounding box of *buffer_geom*.

    Items whose ``id`` contains ``-<year>-`` are preferred; when none match,
    every item is considered. Among those, the smallest-area item bbox that
    fully contains the buffer bbox wins, then the smallest one that merely
    intersects it, then simply the first candidate. Returns None when there
    are no candidates at all.
    """
    target = box(*buffer_geom.bounds)

    # Narrow to the requested year when the ids carry it
    year_tag = f"-{year}-"
    same_year = [item for item in items if year_tag in item.get("id", "")]
    pool = same_year or list(items)

    covering = []
    touching = []
    for item in pool:
        extent = item.get("bbox")
        if not extent:
            continue
        footprint = box(*extent)
        if footprint.contains(target):
            covering.append((footprint.area, item))
        elif footprint.intersects(target):
            touching.append((footprint.area, item))

    # Full coverage beats partial overlap; within a tier the smallest footprint wins
    for tier in (covering, touching):
        if tier:
            return sorted(tier, key=lambda pair: pair[0])[0][1]

    return pool[0] if pool else None

def compute_builtup_ratio(
    gdf, buffer_distance, year, landcover_collection="esa-cci-lc",
    batch_size=200, sleep_seconds=0.2):
    """Compute the built-up ratio for each geometry in gdf at the given buffer distance and year.

    The ratio is the share of valid (non-NaN) land-cover pixels inside the
    buffer whose value is 190.

    Args:
        gdf: GeoDataFrame of sampling locations; it must have a CRS set.
        buffer_distance: Buffer radius in metres on the ground.
        year: Land-cover year; also used to build the search date range.
        landcover_collection: STAC collection id to search.
        batch_size: Accepted for interface compatibility; currently unused.
        sleep_seconds: Accepted for interface compatibility; currently unused.

    Returns:
        A DataFrame with one row per geometry and the columns
        ``geometry_index`` (positional index into ``gdf``) and
        ``built_up_ratio_<distance>m_<year>``. The ratio is NaN when no item
        matched, the buffer held no valid pixels, or the raster could not be
        read or clipped.
    """
    ratio_column = f"built_up_ratio_{int(buffer_distance)}m_{year}"
    if len(gdf) == 0:
        # Nothing to buffer; an empty frame has no usable bounding box to search.
        return pd.DataFrame(columns=["geometry_index", ratio_column])

    start_date = f"{year}-01-01"
    end_date = f"{year}-12-31"

    # Web Mercator stretches ground distances by about 1 / cos(latitude), so the
    # projected radius is scaled per geometry to keep the buffer
    # `buffer_distance` metres wide on the ground.
    latitudes = gdf.to_crs("EPSG:4326").geometry.representative_point().y.to_numpy()
    mercator_radii = buffer_distance / np.cos(np.radians(latitudes))
    buffers_3857 = gdf.to_crs("EPSG:3857").geometry.buffer(mercator_radii)
    buffers_4326 = gpd.GeoSeries(buffers_3857, crs="EPSG:3857").to_crs("EPSG:4326")

    # Unified bounding box of all buffers, searched once per year
    union_minx, union_miny, union_maxx, union_maxy = buffers_4326.total_bounds
    union_bbox = (union_minx, union_miny, union_maxx, union_maxy)
    items = list(search_stac_cached(landcover_collection, union_bbox, start_date, end_date))

    results = []
    failures = 0
    first_error = None
    for idx, buffer_geom in tqdm(enumerate(buffers_4326), total=len(buffers_4326),
                                 desc=f"Buffer {buffer_distance}m, Year {year}"):
        ratio = np.nan
        selected_item = pick_item_for_buffer(items, buffer_geom, year)
        if selected_item is not None:
            assets = selected_item["assets"]
            asset_key = "map" if "map" in assets else list(assets.keys())[0]
            signed_href = planetary_computer.sign(assets[asset_key]["href"])
            try:
                # The context manager closes the raster handle after each point.
                with open_raster_with_retry(signed_href) as arr:
                    # State the geometry CRS explicitly instead of relying on
                    # it matching the raster's CRS.
                    clipped = arr.rio.clip([mapping(buffer_geom)], crs="EPSG:4326",
                                           drop=True, invert=False)
                    data = clipped.values
                valid = data[~np.isnan(data)]
                if len(valid) > 0:
                    ratio = (valid == 190).sum() / len(valid)
            except Exception as err:
                # Best effort: the point keeps NaN, but the failure is counted
                # so that systematic errors do not pass unnoticed.
                failures += 1
                if first_error is None:
                    first_error = err
        results.append({"geometry_index": idx, ratio_column: ratio})

    if failures:
        tqdm.write(
            f"Warning: {failures} of {len(results)} buffers failed for "
            f"{ratio_column} and were set to NaN; first error: {first_error!r}"
        )
    return pd.DataFrame(results)

In [None]:
# Input CSV locations (adjust if necessary)
train_path = 'water_quality_training_dataset.csv'
submission_path = 'submission_template.csv'

# Load both tables
train_df = pd.read_csv(train_path)
submission_df = pd.read_csv(submission_path)

print('Training data shape:', train_df.shape)
print('Submission data shape:', submission_df.shape)

# Distinct (Latitude, Longitude) pairs of each table, re-indexed from zero
train_coords = train_df[['Latitude', 'Longitude']].drop_duplicates(ignore_index=True)
submission_coords = submission_df[['Latitude', 'Longitude']].drop_duplicates(ignore_index=True)

# Combine both sets and drop the locations they share
all_coords = pd.concat([train_coords, submission_coords]).drop_duplicates(ignore_index=True)

# Point layer in WGS 84: x is the longitude, y is the latitude
points_gdf = gpd.GeoDataFrame(
    all_coords,
    geometry=gpd.points_from_xy(x=all_coords['Longitude'], y=all_coords['Latitude']),
    crs='EPSG:4326',
)

print('Total unique points:', len(points_gdf))

In [None]:
# Buffer distances and land-cover years to evaluate
buffer_distances = [1000, 5000]
years = [2011, 2012, 2013, 2014, 2015]

ratio_frames = []
for d in buffer_distances:
    # One ratio frame per year for this buffer distance
    year_frames = []
    for yr in years:
        df_ratio = compute_builtup_ratio(
            points_gdf, d, yr, landcover_collection='esa-cci-lc'
        )
        year_frames.append(df_ratio)
    # Join the yearly columns side by side on the point index
    merged_years = reduce(
        lambda acc, nxt: pd.merge(acc, nxt, on='geometry_index'),
        year_frames,
    )
    ratio_frames.append(merged_years)

# Join the per-distance tables the same way
final_ratio = reduce(
    lambda acc, nxt: pd.merge(acc, nxt, on='geometry_index'),
    ratio_frames,
)

# Put the coordinates back in front of the ratio columns (rows align by position)
final_ratio = pd.concat(
    [
        points_gdf[['Latitude', 'Longitude']].reset_index(drop=True),
        final_ratio.drop(columns=['geometry_index']),
    ],
    axis=1,
)

# Attach the ratios to every training and submission row by coordinates
train_with_ratio = pd.merge(train_df, final_ratio, on=['Latitude', 'Longitude'], how='left')
submission_with_ratio = pd.merge(submission_df, final_ratio, on=['Latitude', 'Longitude'], how='left')

# Preview the results
print('Training data with ratios:')
display(train_with_ratio.head())
print('Submission data with ratios:')
display(submission_with_ratio.head())

In [None]:
# Show the submission table with the ratio columns as this cell's output
submission_with_ratio

In [None]:
# Show the training table with the ratio columns as this cell's output
train_with_ratio

In [None]:
# Write both enriched tables to CSV in the working directory, without the index column
train_with_ratio.to_csv("train_with_ratio.csv", index=False)
submission_with_ratio.to_csv("submission_with_ratio.csv", index=False)

In [None]:
# Get the active Snowpark session; it is used below to run the PUT upload statements
import snowflake
from snowflake.snowpark.context import get_active_session
session = get_active_session()

In [None]:
# Write the training table to a temporary CSV, without the index column
train_with_ratio.to_csv("/tmp/urbanization_train.csv",index = False)

# PUT the file to the workspace path, uncompressed, overwriting any existing copy
session.sql("""
    PUT file:///tmp/urbanization_train.csv
    'snow://workspace/USER$.PUBLIC."EY-AI-and-Data-Challenge-2"/versions/live/'
    AUTO_COMPRESS=FALSE
    OVERWRITE=TRUE
""").collect()

print("File saved! Refresh the browser to see the files in the sidebar")

In [None]:
# Write the submission (validation) table to a temporary CSV, without the index
# column. The validation file must hold the submission rows, not the training rows.
submission_with_ratio.to_csv("/tmp/urbanization_val.csv", index=False)

# PUT the file to the workspace path, uncompressed, overwriting any existing copy
session.sql("""
    PUT file:///tmp/urbanization_val.csv
    'snow://workspace/USER$.PUBLIC."EY-AI-and-Data-Challenge-2"/versions/live/'
    AUTO_COMPRESS=FALSE
    OVERWRITE=TRUE
""").collect()

print("File saved! Refresh the browser to see the files in the sidebar")