In [10]:
# ---- user config ----
YOUR_NAME = 'sara'
AWS_PROFILE = 'cities'

# If you want to limit the run (debug):
# LIMIT = 20
LIMIT = None

# Cities to ignore for now
IGNORE_CITIES = [
    'Ngo__Nigeria',
    'Bugama__Nigeria',
    'Mubi__Nigeria',
    'San_Pedro_de_Macoris__Dominican_Republic',
]

# Dask tuning
PARTITION_SIZE = 1   # 1 city per task (safer, better isolation)


In [11]:
%load_ext autoreload
%autoreload 2


The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [12]:
# ---- shared S3 paths (mirrors pre_processing.py) ----
MAIN_PATH = 's3://wri-cities-sandbox/identifyingLandSubdivisions/data'
INPUT_PATH = f'{MAIN_PATH}/input'
CITY_INFO_PATH = f'{INPUT_PATH}/city_info'
EXTENTS_PATH = f'{CITY_INFO_PATH}/extents'
BUILDINGS_PATH = f'{INPUT_PATH}/buildings'
BUILDINGS_DISTANCES_PATH = f'{INPUT_PATH}/buildings_with_distances'  # not used directly by current function
ROADS_PATH = f'{INPUT_PATH}/roads'
INTERSECTIONS_PATH = f'{INPUT_PATH}/intersections'
NATURAL_FEATURES_PATH = f'{INPUT_PATH}/natural_features_and_railroads'
GRIDS_PATH = f'{INPUT_PATH}/city_info/grids'
SEARCH_BUFFER_PATH = f'{INPUT_PATH}/city_info/search_buffers'
BLOCKS_PATH = f'{INPUT_PATH}/blocks'
OUTPUT_PATH = f'{MAIN_PATH}/output'


In [13]:
# ---- check S3 connection using AWS_PROFILE ----
import boto3, os

session = boto3.Session(profile_name=AWS_PROFILE)
s3 = session.client('s3')

# Export profile so s3fs/cloudpathlib inherit it
os.environ['AWS_PROFILE'] = AWS_PROFILE

s3.list_buckets()


{'ResponseMetadata': {'RequestId': 'GWWACT5M89F5X4NF',
  'HostId': '3OQV2ulEq745mJroZawOFr1aNfIi1+gBvoqaNRWwgN5c+3oMGMo9wyhtIJguNnRJxIOB4bO0/VFZkkVJTtu1hidKujzlfVpTKHQlcr7LYmo=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': '3OQV2ulEq745mJroZawOFr1aNfIi1+gBvoqaNRWwgN5c+3oMGMo9wyhtIJguNnRJxIOB4bO0/VFZkkVJTtu1hidKujzlfVpTKHQlcr7LYmo=',
   'x-amz-request-id': 'GWWACT5M89F5X4NF',
   'date': 'Sat, 17 Jan 2026 23:10:51 GMT',
   'content-type': 'application/xml',
   'transfer-encoding': 'chunked',
   'server': 'AmazonS3'},
  'RetryAttempts': 0},
 'Buckets': [{'Name': 'aft-sandbox-540362055257',
   'CreationDate': datetime.datetime(2022, 9, 13, 15, 12, 20, tzinfo=tzutc()),
   'BucketArn': 'arn:aws:s3:::aft-sandbox-540362055257'},
  {'Name': 'amplify-citiesindicatorsapi-dev-10508-deployment',
   'CreationDate': datetime.datetime(2023, 8, 30, 5, 5, 13, tzinfo=tzutc()),
   'BucketArn': 'arn:aws:s3:::amplify-citiesindicatorsapi-dev-10508-deployment'},
  {'Name': 'cities-heat',
   'Crea

In [14]:
# ---- start Coiled cluster (adjust n_workers / instance type as needed) ----
import coiled

cluster = coiled.Cluster(
    workspace='wri-cities-data',
    name=f'ils-preproc-{YOUR_NAME}',
    region='us-west-2',
    arm=True,
    worker_vm_types='r8g.xlarge',
    spot_policy='spot',
    n_workers=8,
    package_sync_ignore=['pyspark', 'pypandoc'],
    worker_options={'nthreads': 1},
)
client = cluster.get_client()
print(f'Started Dask client. Dashboard: {client.dashboard_link}')


[2026-01-17 18:10:50,811][INFO    ][coiled] Fetching latest package priorities...
[2026-01-17 18:10:50,813][INFO    ][coiled.package_sync] Resolving your local subdivisions2 Python environment...
[2026-01-17 18:10:51,232][INFO    ][coiled.package_sync] Scanning 446 conda packages...
[2026-01-17 18:10:51,239][INFO    ][coiled.package_sync] Scanning 261 python packages...
[2026-01-17 18:10:51,991][INFO    ][coiled.software_utils] No username or password found for https://conda.anaconda.org/conda-forge
[2026-01-17 18:10:52,516][INFO    ][coiled] Running pip check...
[2026-01-17 18:10:52,962][INFO    ][coiled] Validating environment...
[2026-01-17 18:10:56,685][INFO    ][coiled] Creating wheel for ~/Documents/Identifying Land Subdivisions/identifyingLandSubdivisions...
[2026-01-17 18:10:56,815][INFO    ][coiled] Creating wheel for /opt/spark-2.2.0/python...
[2026-01-17 18:10:56,982][INFO    ][coiled.package_sync] Package - aiobotocore, Pip check had the following issues that need resolving

Started Dask client. Dashboard: https://cluster-vhdph.dask.host/YwcDGTBBc9-0vRBT/status



+---------+--------+-----------+---------+
| Package | Client | Scheduler | Workers |
+---------+--------+-----------+---------+
| lz4     | 4.4.4  | 4.4.5     | 4.4.5   |
+---------+--------+-----------+---------+


2026-01-17 21:42:05,061 - distributed.client - ERROR - Failed to reconnect to scheduler after 30.00 seconds, closing client


In [15]:
# ---- build city list from S3 (roads folders) ----
import s3fs

fs = s3fs.S3FileSystem(anon=False)

road_city_dirs = fs.ls(ROADS_PATH)
all_cities = sorted([p.split('/')[-1] for p in road_city_dirs])
all_cities = [c for c in all_cities if c and c not in ['.DS_Store']]

cities = [c for c in all_cities if c not in set(IGNORE_CITIES)]

print('roads city dirs:', len(all_cities))
print('cities to run:', len(cities))

if LIMIT:
    cities = cities[:LIMIT]
    print('LIMIT applied ->', len(cities))


roads city dirs: 1234
cities to run: 1234


In [16]:
# ---- logging + runner (inspired by gather_data_executor) ----
import os, time, socket, traceback
from datetime import datetime, timezone
from cloudpathlib import S3Path

import dask
from dask import compute

from pre_processing import calculate_building_distances_to_roads, produce_azimuths

RUN_ID = datetime.utcnow().strftime('%Y%m%dT%H%M%SZ')
LOGS_S3_DIR = f"{OUTPUT_PATH}/logs/pre_processing/{RUN_ID}"
SUMMARY_S3_PATH = f"{LOGS_S3_DIR}/summary.csv"
LOCAL_LOG_DIR = f"/tmp/pre_processing_logs/{RUN_ID}"


def utc_now():
    return datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')


def append_log(path, msg):
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, 'a', encoding='utf-8') as f:
        f.write(msg)


def s3_exists(uri: str) -> bool:
    # cheap existence check for s3://...
    try:
        return fs.exists(uri.replace('s3://', ''))
    except Exception:
        return False


def required_inputs_exist(city_name: str):
    # For building distances: needs buildings + roads
    buildings_path = f"{BUILDINGS_PATH}/{city_name}/Overture_building_{city_name}.geoparquet"
    roads_path = f"{ROADS_PATH}/{city_name}/{city_name}_OSM_roads.geoparquet"

    # For azimuths: unknown; assume it will handle its own reads, but roads+buildings are usually required
    ok_buildings = s3_exists(buildings_path)
    ok_roads = s3_exists(roads_path)

    missing = []
    if not ok_buildings:
        missing.append('buildings')
    if not ok_roads:
        missing.append('roads')

    return (len(missing) == 0), missing, {'buildings': buildings_path, 'roads': roads_path}


def dist_output_path(city_name: str):
    # matches calculate_building_distances_to_roads() naming
    p = f"{BUILDINGS_PATH}/{city_name}/Overture_building_{city_name}.geoparquet"
    return p.replace('.geoparquet', '_with_distances.geoparquet')


def run_city_preproc(city_name: str):
    host = socket.gethostname()
    t0 = time.time()
    local_log = f"{LOCAL_LOG_DIR}/{city_name}.log"

    ok, missing, paths = required_inputs_exist(city_name)
    if not ok:
        return {
            'city': city_name,
            'status': 'missing_inputs',
            'missing': ','.join(missing),
            'ts_start': utc_now(),
            'ts_end': utc_now(),
            'secs': 0.0,
            'host': host,
            'distances_path': '',
            'azimuths_result': '',
            'log_s3': '',
        }

    # --- skip distances if already exists ---
    out_dist = dist_output_path(city_name)
    do_dist = not s3_exists(out_dist)

    append_log(
        local_log,
        f"[{utc_now()}] city={city_name} host={host} start\n"
    )

    append_log(
        local_log,
        f"[{utc_now()}] inputs: {paths}\n"
    )

    append_log(
        local_log,
        f"[{utc_now()}] distances_out: {out_dist} (will_run={do_dist})\n"
    )


    status = 'ok'
    err = ''
    distances_res = ''
    azimuths_res = ''

    try:
        tasks = []
        if do_dist:
            tasks.append(calculate_building_distances_to_roads(city_name))
        else:
            distances_res = out_dist

        # Always run azimuths (we don't know a reliable output path to skip safely)
        tasks.append(produce_azimuths(city_name, YOUR_NAME))

        # --- Step 1: distances (must finish before azimuths reads) ---
        if do_dist:
            distances_res = compute(calculate_building_distances_to_roads(city_name))[0]

            # hard check: ensure file exists on S3 before moving on
            if not s3_exists(out_dist):
                raise FileNotFoundError(f"distances output not found after compute: {out_dist}")
        else:
            # if we skipped, still ensure it truly exists (otherwise azimuths will fail)
            if not s3_exists(out_dist):
                raise FileNotFoundError(f"distances file expected but missing: {out_dist}")
            distances_res = out_dist

        # --- Step 2: azimuths (reads buildings_with_distances) ---
        azimuths_res = compute(produce_azimuths(city_name, YOUR_NAME))[0]


        append_log(
            local_log,
            f"[{utc_now()}] distances_res={distances_res}\n"
        )

        append_log(
            local_log,
            f"[{utc_now()}] azimuths_res={azimuths_res}\n"
        )

    except Exception as e:
        status = 'error'
        err = repr(e)
        append_log(
            local_log,
            f"[{utc_now()}] ERROR: {err}\n"
        )

        append_log(
            local_log,
            traceback.format_exc() + "\n"
        )

    # upload log
    log_s3 = f"{LOGS_S3_DIR}/city_logs/{city_name}.log"
    try:
        S3Path(log_s3).parent.mkdir(parents=True, exist_ok=True)
        S3Path(log_s3).upload_from(local_log)
    except Exception as e:
        append_log(local_log, f"[{utc_now()}] WARN: failed to upload log to s3: {repr(e)}")

    secs = time.time() - t0
    rec = {
        'city': city_name,
        'status': status,
        'error': err,
        'missing': '',
        'ts_start': '',
        'ts_end': utc_now(),
        'secs': round(secs, 3),
        'host': host,
        'distances_path': str(distances_res),
        'azimuths_result': str(azimuths_res),
        'log_s3': log_s3,
        'distances_skipped': (not do_dist),
    }
    return rec

  RUN_ID = datetime.utcnow().strftime('%Y%m%dT%H%M%SZ')


In [17]:
# ---- execute across cities with dask.bag ----
import dask.bag as db
import pandas as pd

bag = db.from_sequence(cities, partition_size=PARTITION_SIZE)
records = bag.map(run_city_preproc).compute()

summary = pd.DataFrame(records)
print(summary['status'].value_counts(dropna=False))

local_summary = f"/tmp/summary_pre_processing_{RUN_ID}.csv"
summary.to_csv(local_summary, index=False, sep=';')

S3Path(LOGS_S3_DIR).mkdir(parents=True, exist_ok=True)
S3Path(SUMMARY_S3_PATH).upload_from(local_summary)

print('✅ Summary written to:', SUMMARY_S3_PATH)


status
ok    1234
Name: count, dtype: int64
✅ Summary written to: s3://wri-cities-sandbox/identifyingLandSubdivisions/data/output/logs/pre_processing/20260117T231251Z/summary.csv


In [18]:
# ---- quick peek at failures / missing ----
errs = summary[summary['status'].isin(['error', 'missing_inputs'])].copy()
print('n problematic:', len(errs))
errs.head(25)


n problematic: 0


Unnamed: 0,city,status,error,missing,ts_start,ts_end,secs,host,distances_path,azimuths_result,log_s3,distances_skipped


### Notes
- City list is built from existing **ROADS** folders on S3 and skips the 4 ignore cities.
- **Building distances** are skipped if the `*_with_distances.geoparquet` already exists.
- **Azimuths** are always run (no reliable output path to skip without seeing `calculate_azimuths` implementation).
- Logs + summary CSV are uploaded to: `s3://.../output/logs/pre_processing/<RUN_ID>/`.
