# AI Offline Parcel Legacy Format

This notebook processes pre-downloaded AI Feature API payloads into parcel rollups in the AI offline parcel format, then maps those rollups to the legacy output format.

from pathlib import Path
import pandas as pd
from tqdm import tqdm
import json
import ast
import shapely.wkt
import geopandas as gpd
import concurrent.futures
import os

from nmaipy.feature_api import FeatureApi
from nmaipy import parcels
from nmaipy.constants import (
    LAT_LONG_CRS,
    BUILDING_ID,
    ROOF_ID,
    TRAMPOLINE_ID,
    POOL_ID,
    CONSTRUCTION_ID,
    SOLAR_ID,
    VEG_IDS,
    SURFACES_IDS,
    ROOF_CHAR_IDS,
)

pd.set_option('display.max_rows', 500)

# Country code passed to parcels.filter_features_in_parcels (used for projections)
COUNTRY = "us"
# Number of concurrent worker processes (set to the number of available CPU cores)
WORKERS = 4
# Maximum number of data rows per batch file; batches are the unit of concurrency
BATCH_SIZE = 10_000
# Maximum number of batches to create per source file, for dev testing only
# (set to None to disable)
LIMIT = None

# Data and output paths
BASE_DATA_DIR = Path("/home/jovyan/data/parcel_rollup")
SOURCE_DIR = BASE_DATA_DIR / "source"        # raw "|"-separated source CSVs
BATCHES_DIR = BASE_DATA_DIR / "batches"      # source CSVs split into batches
PROCESSED_DIR = BASE_DATA_DIR / "processed"  # per-batch AI offline parcel output
OUTPUT_DIR = BASE_DATA_DIR / "output"        # per-source combined AI offline parcel output
LEGACY_DIR = BASE_DATA_DIR / "legacy"        # per-source output in the legacy format

for directory in (BATCHES_DIR, PROCESSED_DIR, OUTPUT_DIR, LEGACY_DIR):
    directory.mkdir(parents=True, exist_ok=True)

# Column ordering
FIRST_COLUMNS = ["aoi_id", "date", "mesh_date", "link", "system_version"]
LAST_COLUMNS = ["geometry"]

In [2]:
# Feature classes this rollup is interested in
classes = [BUILDING_ID, ROOF_ID, TRAMPOLINE_ID, POOL_ID, CONSTRUCTION_ID, SOLAR_ID]

# Fetch the class lookup from the AI Feature API and keep only "Feature"-type
# rows whose id (the index) is one of the classes above
feature_api = FeatureApi()
classes_df = feature_api.get_feature_classes()
is_feature = classes_df["type"] == "Feature"
is_wanted = classes_df.index.isin(classes)
classes_df = classes_df[is_feature & is_wanted]
display(classes_df)

Unnamed: 0_level_0,type,description,schema
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
0339726f-081e-5a6e-b9a9-42d95c1b5c8a,Feature,Swimming Pool,
3680e1b8-8ae1-5a15-8ec7-820078ef3298,Feature,Solar Panel,
753621ee-0b9f-515e-9bcf-ea40b96612ab,Feature,Trampoline,
a2a81381-13c6-57dc-a967-af696e45f6c7,Feature,Construction Site,
a2e4ae39-8a61-5515-9d18-8900aa6e6072,Feature,Building,
c08255a4-ba9f-562b-932c-ff76f2faeeeb,Feature,Roof,


In [3]:
# Split source files into smaller chunks to enable concurrency
def batch_csv(source_path):
    """
    Split a source CSV into batch files of at most ``BATCH_SIZE`` data rows.

    Each batch is written to ``BATCHES_DIR`` as ``<source stem>_<NNN>.csv`` and
    starts with a copy of the source's header line. When ``LIMIT`` is truthy,
    at most ``LIMIT`` batches are written for this source file. A source with
    no data rows produces no batch files.

    NOTE(review): splitting is line-based, so this assumes no record spans
    multiple lines (e.g. no quoted newlines inside payloads) — confirm.
    """
    batch_size = int(BATCH_SIZE)
    with open(source_path, "r") as fs:
        header = fs.readline()
        counter = 0
        while not (LIMIT and counter >= LIMIT):
            # Read the rows first and only then create the target file, so a
            # source whose row count is an exact multiple of the batch size does
            # not leave a trailing header-only batch (which process_batch cannot
            # handle).
            lines = []
            for _ in range(batch_size):
                line = fs.readline()
                if line == "":
                    break
                lines.append(line)
            if not lines:
                break
            target_path = BATCHES_DIR / f"{source_path.stem}_{str(counter).zfill(3)}.csv"
            with open(target_path, "w") as ft:
                ft.write(header)
                ft.writelines(lines)
            counter += 1

# Split every source CSV into batches in parallel. Calling result() on each
# future re-raises any exception from a worker instead of silently dropping it.
with concurrent.futures.ProcessPoolExecutor(WORKERS) as executor:
    batch_jobs = [
        executor.submit(batch_csv, source_path)
        for source_path in SOURCE_DIR.glob("*.csv")
    ]
    for job in batch_jobs:
        job.result()

In [4]:
def process_batch(source_path, force=False):
    """
    Process one batch file and save it in the AI offline parcel format.

    Reads a "|"-separated batch CSV with ``parcelPtId``, ``payload`` and
    ``geometry`` columns. It then:

    * filters each parcel's features by the parcel geometry
      (``parcels.filter_features_in_parcels``);
    * builds a per-parcel rollup (``parcels.parcel_rollup``);
    * joins in the parcel geometry and payload metadata.

    The result is written as a comma-separated CSV with the same file name
    under ``PROCESSED_DIR``.

    Parameters
    ----------
    source_path : pathlib.Path
        Batch CSV to process.
    force : bool
        Reprocess even if the output file already exists.
    """
    # Skip batches that already have output unless forced. Output is written
    # to a temp file and then moved into place, so a half-written file is
    # never mistaken for a finished batch.
    outpath = PROCESSED_DIR / source_path.name
    outpath_temp = PROCESSED_DIR / f"{source_path.name}.tmp"
    if outpath.is_file() and not force:
        return

    source_df = pd.read_csv(source_path, sep="|")
    if source_df.empty:
        # A header-only batch has no parcels; zip(*payloads) below would fail.
        return

    # Parse the payload column and rename the parcel ID. ast.literal_eval (not
    # json) is used, so payloads are presumably Python-literal dict reprs —
    # NOTE(review): confirm against the source export.
    source_df["payload"] = source_df["payload"].apply(ast.literal_eval)
    source_df = source_df.rename(columns={"parcelPtId": "aoi_id"})

    # Extract to parcels GeoDataFrame
    parcels_gdf = gpd.GeoDataFrame(source_df[['aoi_id']], geometry=source_df.geometry.apply(shapely.wkt.loads))
    parcels_gdf = parcels_gdf.set_crs(LAT_LONG_CRS)

    # Extract features and metadata GeoDataFrames
    payloads = [FeatureApi.payload_gdf(row.payload, row.aoi_id) for row in source_df.itertuples()]
    features, metadata = zip(*payloads)
    features_gdf = pd.concat(features)
    metadata_df = pd.DataFrame(metadata)

    # Filter features based on parcel geometry
    features_gdf = parcels.filter_features_in_parcels(
        parcels_gdf, features_gdf, country=COUNTRY
    )

    # Create rollup
    rollup_df = parcels.parcel_rollup(parcels_gdf, features_gdf, classes_df)

    # Combine data sets and order columns: FIRST_COLUMNS, the rest, LAST_COLUMNS
    final_df = rollup_df.merge(parcels_gdf, on="aoi_id")
    final_df = final_df.merge(metadata_df, on="aoi_id")
    edge_columns = set(FIRST_COLUMNS + LAST_COLUMNS)
    middle_columns = [c for c in final_df.columns if c not in edge_columns]
    final_df = final_df[FIRST_COLUMNS + middle_columns + LAST_COLUMNS]
    final_df = final_df.rename(columns={"aoi_id": "parcelPtId"})

    # Save. os.replace (unlike os.rename) also overwrites an existing file on
    # Windows, which happens when force=True re-processes a finished batch.
    final_df.to_csv(outpath_temp, index=False)
    os.replace(outpath_temp, outpath)


In [5]:
# Process every batch file in parallel, showing progress as results are collected
with concurrent.futures.ProcessPoolExecutor(WORKERS) as executor:
    futures = [
        executor.submit(process_batch, batch_path)
        for batch_path in BATCHES_DIR.glob("*.csv")
    ]
    # result() waits for each job and re-raises any exception from its worker
    for future in tqdm(futures):
        future.result()

100%|██████████| 20/20 [06:12<00:00, 18.65s/it]


In [6]:
def map_to_legacy_schema(dfr):
    """
    Map a dataframe in the AI offline parcel schema to the legacy format.

    The mapping:

    * derives ``dominant_roof_material`` and ``storey_category``, each with a
      confidence column;
    * renames the directly equivalent columns;
    * converts ``"Y"``/``"N"`` in every ``*_present`` column to the strings
      ``"True"``/``"False"`` (missing values become ``"False"``);
    * fills missing ``*_confidence`` values with 1.0;
    * returns only the legacy columns, in legacy order.

    The input dataframe is not modified.
    """
    dfr = dfr.copy()

    # Dominant roof material: "unknown" by default, "not available" when there
    # is no roof. If several materials are flagged dominant, the last one in the
    # list below wins.
    dfr["dominant_roof_material"] = "unknown"
    dfr.loc[dfr["roof_present"] == "N", "dominant_roof_material"] = "not available"
    dfr["dominant_roof_material_confidence"] = 1.0

    for name, cname in [("Tile Roof", "tile"), ("Shingle Roof", "shingle"), ("Metal Roof", "metal")]:
        mask = dfr[f"primary_roof_{cname}_roof_dominant"] == "Y"
        dfr.loc[mask, "dominant_roof_material"] = name
        dfr.loc[mask, "dominant_roof_material_confidence"] = dfr.loc[mask, f"primary_roof_{cname}_roof_confidence"]

    # Number of storeys: pick the storey class with the highest confidence
    dfr["storey_category"] = "not available"
    dfr["storey_category_confidence"] = 1.0

    storey_labels = {
        f"primary_building_num_storeys_{storeys}_confidence": storeys
        for storeys in ("1", "2", "3+")
    }
    # Use dataframe column order so ties resolve to the first such column
    storey_columns = [c for c in dfr.columns if c in storey_labels]
    if storey_columns:
        storey_conf = dfr[storey_columns]
        # Rows with every storey confidence missing keep "not available".
        # Calling idxmax on all-NaN rows is deprecated in pandas (and raises in
        # newer versions), so fill NaN with -inf and mask those rows out instead.
        has_storeys = storey_conf.notna().any(axis=1)
        best_column = storey_conf.fillna(float("-inf")).idxmax(axis=1).where(has_storeys)
        for column in storey_columns:
            mask = best_column == column
            dfr.loc[mask, "storey_category"] = storey_labels[column]
            dfr.loc[mask, "storey_category_confidence"] = dfr.loc[mask, column]

    # Direct column mappings
    direct_mappings = {
         'parcelPtId': 'parcel_id',
         'geometry': 'wkt',
         'link': 'mapbrowser_url',
         'date': 'survey_date',
         'primary_roof_tree_overhang_present': 'tree_overhang_present',
         'primary_roof_tree_overhang_confidence': 'tree_overhang_confidence',
         'primary_roof_hip_present': 'hip_roof_type_present',
         'primary_roof_hip_confidence': 'hip_roof_type_confidence',
         'primary_roof_gable_present': 'gable_roof_type_present',
         'primary_roof_gable_confidence': 'gable_roof_type_confidence',
         'primary_roof_flat_present': 'flat_roof_type_present',
         'primary_roof_flat_confidence': 'flat_roof_type_confidence',
         'primary_roof_turret_present': 'turret_roof_type_present',
         'primary_roof_turret_confidence': 'turret_roof_type_confidence',
         'primary_roof_pitch_degrees': 'roof_pitch_degrees',
         'primary_building_area_sqft': 'area_under_roof_sqft',
         'primary_building_area_sqm': 'area_under_roof_sqm',
         'primary_building_height_m': 'building_height_m',
         'primary_building_height_ft': 'building_height_ft'
    }

    dfr = dfr.rename(columns=direct_mappings)

    # Map Y/N flags to "True"/"False" strings and default missing confidences.
    # Each column is transformed on its own rather than re-scanning the whole
    # frame once per column.
    for column in dfr.columns:
        if "_present" in column:
            dfr[column] = dfr[column].replace({"Y": "True", "N": "False"}).fillna("False")
        if "_confidence" in column:
            dfr[column] = dfr[column].fillna(1.0)

    # Filter columns
    final_columns = [
         'parcel_id',
         'wkt',
         'mapbrowser_url',
         'survey_date',
         'roof_present',
         'roof_confidence',
         'dominant_roof_material',
         'dominant_roof_material_confidence',
         'solar_panel_present',
         'solar_panel_confidence',
         'tree_overhang_present',
         'tree_overhang_confidence',
         'swimming_pool_present',
         'swimming_pool_confidence',
         'trampoline_present',
         'trampoline_confidence',
         'hip_roof_type_present',
         'hip_roof_type_confidence',
         'gable_roof_type_present',
         'gable_roof_type_confidence',
         'flat_roof_type_present',
         'flat_roof_type_confidence',
         'turret_roof_type_present',
         'turret_roof_type_confidence',
         'roof_pitch_degrees',
         'area_under_roof_sqm',
         'area_under_roof_sqft',
         'building_height_m',
         'building_height_ft',
         'storey_category',
         'storey_category_confidence'
    ]

    return dfr[final_columns]

In [7]:
# Combine each source's processed batches and map them to the legacy format
for source in SOURCE_DIR.glob("*.csv"):
    # Match only "<stem>_<digits>.csv" so a source whose name is contained in
    # another's (e.g. "ca" vs "ca_north") does not pick up the other's batches.
    # Sort numerically by batch number so rows keep their source order.
    prefix = f"{source.stem}_"
    batch_paths = sorted(
        (
            p for p in PROCESSED_DIR.glob("*.csv")
            if p.stem.startswith(prefix) and p.stem[len(prefix):].isdigit()
        ),
        key=lambda p: int(p.stem[len(prefix):]),
    )
    if not batch_paths:
        print(f"No processed batches found for {source.name}; skipping")
        continue
    full_df = pd.concat([pd.read_csv(p) for p in batch_paths], ignore_index=True)
    full_df.to_csv(OUTPUT_DIR / f"{source.stem}.csv", index=False)
    transformed_full_df = map_to_legacy_schema(full_df)
    transformed_full_df.to_csv(LEGACY_DIR / f"{source.stem}.csv", index=False)
