# Lab 5 - Parcel Feature Extraction

Next, we will illustrate the construction of features related to our main task: finding the relationship between property development and water quality over time. In a previous lab, you identified lakes for which we have complete information for the years from 2004 to 2015. In this lab, we will build features from the parcel data for those lakes.

[Original Data and variable information](https://gisdata.mn.gov/organization/us-mn-state-metrogis?q=Metro+Regional+Parcel+Dataset&sort=score+desc%2C+metadata_modified+desc)

## Problem 1 - Feature construction

**Overview.** Remember that our target output file will have one row per lake-year combination. To attach property information, we will need to group and aggregate the parcel data to create features for each lake-year combination. When grouping the data, be sure to keep the variables needed to join to the water quality data, namely the lake ID and year. Since we are tracking property development and change over time, we will want to generate features that capture:

* Number of properties close to each lake,
* Summaries of the value of properties close to each lake,
* Aggregations on the size and type of the properties, and
* Other features that might impact water quality.
    
### Task 1. Understanding parcel variables

Before we can construct features, we need to make sure we understand the parcel data. The metro parcel data is provided by the State of Minnesota, and the metadata can be found online. For example, searching for *metro parcel 2014* led to [this site](https://geo.btaa.org/catalog/304cf3d8-a53b-4ea9-b02a-f550bd68e320). Clicking the *Meta data* button in the top left brought up more information. Clicking *Download* opened the metadata [in a separate page](https://resources.gisdata.mn.gov/pub/gdrs/data/pub/us_mn_state_metrogis/plan_regonal_parcels_2014/metadata/metadata.html).

Look through **Section 4: Attributes** and identify variables that might impact the water quality of nearby lakes.

> <font color="orange"> Your thoughts here </font>

### Task 2. Feature Brainstorming

Our objective is to build a feature table with one row per lake-year, using grouped summary statistics. Here are effective strategies for feature construction:

1. **Numerical summaries:** Calculate group-level statistics (mean, median, standard deviation, IQR, etc.) for numeric variables.
2. **Categorical summaries:** For text data, consider:
   - **Success rates:** Compute proportions for binary variables (e.g., percent of homes with basements).
   - **Label cleaning:** Review and standardize unique labels to remove duplicates or inconsistencies.
   - **Broader categories:** Recode variables with many rare categories into a smaller, more meaningful set.
   - **Indicator columns:** Create indicator variables and aggregate them to show presence/absence or proportions (e.g., count of each property use type).

Review the variables you identified earlier and outline a feature construction strategy for each.

> <font color="orange"> Your thoughts here </font>

### Task 3. Initial querying with filter and select

First, you should build a query that filters the parcel data to 
1. only include parcels within 1600 feet of the lakes we are studying, and 
2. only for the lakes with complete information.  

You should also select only the columns you will need for feature construction and joining to the water quality data.

In [1]:
import polars as pl

# Lazily scan the partitioned parquet files.
# The data is partitioned by Monit_MAP_CODE1, so scan_parquet includes it as a column.
parcels = (
    pl.scan_parquet("data/parcel.data/**/*.parquet")
    # NOTE: this column is in meters, while the task statement says 1600 feet
    # (about 488 m); confirm which unit is intended before relying on this cutoff.
    .filter(pl.col("Distance_Parcel_Lake_meters") <= 1600)
)

In [2]:
# Load and prepare water quality data (from lab4)
water_quality = (
    pl.read_csv(
        'data/MinneMUDAC_raw_files/mces_lakes_1999_2014_v2.txt',
        separator='\t',
        infer_schema_length=10000
    )
)

# Filter for years 2004-2015 and approved measurements
water_quality_filtered = (
    water_quality
    .filter(pl.col('Total_Phosphorus_QUALIFIER') == 'Approved')
    .with_columns(pl.col('START_DATE').str.extract(r'(\d{4})', 1).cast(pl.Int32).alias('Year'))
    .filter((pl.col('Year') >= 2004) & (pl.col('Year') <= 2015))
)

# Create summaries by lake and year
water_quality_summaries = (
    water_quality_filtered
    .group_by(['DNR_ID_Site_Number', 'LAKE_NAME', 'Year'])
    .agg([
        pl.col('Total_Phosphorus_RESULT').mean().alias('phosphorus_avg'),
        pl.col('Total_Phosphorus_RESULT').count().alias('phosphorus_count'),
        pl.col('Secchi_Depth_RESULT').mean().alias('secchi_avg'),
        pl.col('Secchi_Depth_RESULT').count().alias('secchi_count'),
    ])
)

# Keep lakes that have both measurements in at least one year.
# NOTE: this does not verify that every year from 2004 to 2015 is present.
dnr_ids = (
    water_quality_summaries
    .filter((pl.col('phosphorus_count') > 0) & (pl.col('secchi_count') > 0))
    .select('DNR_ID_Site_Number')
    .unique()
    .to_series()
    .to_list()
)

water_quality_summaries_complete = (
    water_quality_summaries
    .filter(pl.col('DNR_ID_Site_Number').is_in(dnr_ids))
)

print(f"Loaded water quality data: {water_quality_summaries_complete.shape[0]} rows")
water_quality_summaries_complete.head()

Loaded water quality data: 1687 rows


| DNR_ID_Site_Number | LAKE_NAME | Year | phosphorus_avg | phosphorus_count | secchi_avg | secchi_count |
| --- | --- | --- | --- | --- | --- | --- |
| str | str | i32 | f64 | u32 | f64 | u32 |
| 82031300-01 | Goetschel Lake | 2010 | 0.052429 | 7 | 1.348571 | 7 |
| 82004400-01 | West Boot Lake | 2006 | 0.024143 | 7 | 2.6822 | 5 |
| 70005400-01 | Spring Lake | 2007 | 0.059714 | 7 | 1.0 | 7 |
| 10005800-01 | Maria Lake | 2005 | 0.15625 | 12 | 0.725 | 12 |
| 10001800-01 | Schutz Lake | 2009 | 0.052 | 2 | null | 0 |


## Problem 2.  Numerical Summaries

Two important categories of property data involve the size (e.g., finished square footage) and value (e.g., assessed value and/or taxes paid).

**Tasks.** 

1. Identify 2-3 variables for each of these categories.
2. Write a query that computes the summary statistics for each of these variables for each lake-year.  
3. Write this summary table out to a CSV file named `parcel_numerical_summaries.csv`.  Again, you should partition by lake ID and year.

In [3]:
# Compute numerical summaries by PIN (property ID) first
parcel_summaries_by_pin = (
    parcels
    .with_columns([
        pl.col("FIN_SQ_FT").cast(pl.Float64),
        pl.col("ACRES_POLY").cast(pl.Float64),
        pl.col("EMV_TOTAL").cast(pl.Float64),
        pl.col("TOTAL_TAX").cast(pl.Float64),
    ])
    .group_by(["PIN", "Year", "centroid_lat", "centroid_long"])
    .agg([
        pl.col("FIN_SQ_FT").mean().alias("mean_FIN_SQ_FT"),
        pl.col("FIN_SQ_FT").median().alias("median_FIN_SQ_FT"),
        pl.col("ACRES_POLY").mean().alias("mean_acres"),
        pl.col("ACRES_POLY").median().alias("median_acres"),
        pl.col("EMV_TOTAL").mean().alias("mean_EMV_TOTAL"),
        pl.col("EMV_TOTAL").median().alias("median_EMV_TOTAL"),
        pl.col("TOTAL_TAX").mean().alias("mean_total_tax"),
        pl.col("TOTAL_TAX").median().alias("median_total_tax"),
    ]).collect()
)

# Join with xref to get lake IDs
xref_lakes = (
    pl.read_csv(
        'data/MinneMUDAC_raw_files/Parcel_Lake_Monitoring_Site_Xref.txt',
        separator='\t',
    )
    .select(['Monit_MAP_CODE1', 'centroid_lat', 'centroid_long'])
    .with_columns([
        pl.col('centroid_lat').cast(pl.Float64).round(6).alias('lat_r'),
        pl.col('centroid_long').cast(pl.Float64).round(6).alias('lon_r')
    ])
    .unique()
)

numerical_summaries = (
    parcel_summaries_by_pin
    .with_columns([
        pl.col('centroid_lat').cast(pl.Float64).round(6).alias('lat_r'),
        pl.col('centroid_long').cast(pl.Float64).round(6).alias('lon_r')
    ])
    .join(xref_lakes, on=['lat_r', 'lon_r'], how='inner')
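    .group_by(['Monit_MAP_CODE1', 'Year'])
    # NOTE: the median_* columns below average the per-PIN medians, so they are not
    # lake-level medians; with one row per PIN and year they equal the mean_* columns.
    .agg([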
        pl.col('mean_FIN_SQ_FT').mean().alias('mean_FIN_SQ_FT'),
        pl.col('median_FIN_SQ_FT').mean().alias('median_FIN_SQ_FT'),
        pl.col('mean_acres').mean().alias('mean_acres'),
        pl.col('median_acres').mean().alias('median_acres'),
        pl.col('mean_EMV_TOTAL').mean().alias('mean_EMV_TOTAL'),
        pl.col('median_EMV_TOTAL').mean().alias('median_EMV_TOTAL'),
        pl.col('mean_total_tax').mean().alias('mean_total_tax'),
        pl.col('median_total_tax').mean().alias('median_total_tax'),
    ])
    .with_columns(pl.col("Year").cast(pl.Int32))
)

numerical_summaries.write_csv('parcel_numerical_summaries.csv')
print(f"Wrote parcel_numerical_summaries.csv ({numerical_summaries.shape[0]} rows)")
numerical_summaries.head()

Wrote parcel_numerical_summaries.csv (5268 rows)


| Monit_MAP_CODE1 | Year | mean_FIN_SQ_FT | median_FIN_SQ_FT | mean_acres | median_acres | mean_EMV_TOTAL | median_EMV_TOTAL | mean_total_tax | median_total_tax |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| str | i32 | f64 | f64 | f64 | f64 | f64 | f64 | f64 | f64 |
| 82012400-01 | 2012 | 1923.013514 | 1923.013514 | 8.531351 | 8.531351 | 349878.378378 | 349878.378378 | 0.0 | 0.0 |
| 82009200-01 | 2006 | 1701.16318 | 1701.16318 | 0.334397 | 0.334397 | 420188.619247 | 420188.619247 | 4811.146444 | 4811.146444 |
| 70007200-02 | 2011 | 952.943272 | 952.943272 | 0.688892 | 0.688892 | 269843.007916 | 269843.007916 | 0.0 | 0.0 |
| 62005400-01 | 2004 | 1062.747898 | 1062.747898 | 0.770045 | 0.770045 | 304998.082745 | 304998.082745 | 0.0 | 0.0 |
| 27017901-01 | 2007 | 0.0 | 0.0 | 12.853274 | 12.853274 | 436796.412556 | 436796.412556 | 4778.753363 | 4778.753363 |


## Problem 3.  Simple categorical summaries.

In this part, you will create summary statistics for some of the simpler categorical variables.

**Binary variables.** There are two examples of binary variables, listed below.  You will need to compute the percent of `Yes` for each.

* GARAGE: Garage Y/N
* BASEMENT: Basement Y/N

**Other categorical variables.** There are a number of other categorical variables.  You need to select one of these variables, inspect/clean your variable as needed, create indicator variables for each resulting label, and compute summary statistics for each label.

* HOMESTEAD: Homestead Status
* TAX_EXEMPT: Tax Exempt Status 
* DWELL_TYPE: Dwelling Type 
* HOME_STYLE: Home Style
* HEATING: Heating type
* COOLING: Cooling type

**Tasks.**
Create a query that

1. Selects one binary and two other categorical variables for feature construction,
2. Reads in the parcel data and selects the relevant columns (be sure to keep the lake ID and year),
3. Inspects the unique labels and recodes/cleans them as needed,
4. Creates a literal column of ones, and
5. Pivots to get the counts of each label per lake-year (do this once per category).

Write this summary table out to a CSV file named `parcel_categorical_summaries.csv`.  Again, you should partition by lake ID and year.

In [4]:
from glob import glob
import polars as pl

# Load the reference table
xref = (
    pl.scan_csv(
        'data/MinneMUDAC_raw_files/Parcel_Lake_Monitoring_Site_Xref.txt',
        separator='\t',
    )
    .select(['Monit_MAP_CODE1', 'centroid_lat', 'centroid_long'])
    .collect()
)

xref_join = (
    xref
    .select(['Monit_MAP_CODE1', 'centroid_lat', 'centroid_long'])
    .with_columns([
        pl.col('centroid_lat').cast(pl.Float64).round(6).alias('lat_r'),
        pl.col('centroid_long').cast(pl.Float64).round(6).alias('lon_r')
    ])
    .select(['Monit_MAP_CODE1', 'lat_r', 'lon_r'])
    .unique()
)

categorical_results = []
for f in sorted(glob('data/MinneMUDAC_raw_files/*_metro_tax_parcels.txt')):
    year = int(f.split('/')[-1].split('_')[0])
    try:
        df = pl.read_csv(f, separator='|', ignore_errors=True)
        if 'centroid_lat' not in df.columns or 'centroid_long' not in df.columns:
            continue
        df = df.with_columns([
            pl.col('centroid_lat').cast(pl.Float64).round(6).alias('lat_r'),
            pl.col('centroid_long').cast(pl.Float64).round(6).alias('lon_r')
        ])

        joined = df.join(xref_join, on=['lat_r', 'lon_r'], how='inner')
        if joined.height == 0:
            continue

        garage_expr = (pl.col('GARAGE') == 'Y') if 'GARAGE' in joined.columns else pl.lit(False)
        basement_expr = (pl.col('BASEMENT') == 'Y') if 'BASEMENT' in joined.columns else pl.lit(False)

        agg = (
            joined
            .group_by('Monit_MAP_CODE1')
            .agg([
                garage_expr.mean().alias('pct_garage'),
                basement_expr.mean().alias('pct_basement')
            ])
            .with_columns(pl.lit(year).alias('Year'))
        )
        categorical_results.append(agg)
    except Exception as e:
        # Report the file that failed instead of silently skipping it
        print(f"Skipping {f}: {e}")

if categorical_results:
    parcel_categorical_summaries = pl.concat(categorical_results).sort(['Monit_MAP_CODE1', 'Year'])
    parcel_categorical_summaries.write_csv('parcel_categorical_summaries.csv')
    print(f"Wrote parcel_categorical_summaries.csv ({parcel_categorical_summaries.shape[0]} rows)")
    parcel_categorical_summaries.head()
else:
    parcel_categorical_summaries = pl.DataFrame()
    print('No categorical summaries produced (check parsing logs)')

Wrote parcel_categorical_summaries.csv (5720 rows)


## Problem 4.  Join all the summaries.

Finally, you need to join all the summaries created above, along with the water quality summaries created in a previous lab, into one overall summary file.  Write the resulting table to a CSV file named `water_quality_and_parcel_summaries_2004_to_2015.csv`.

In [5]:
import os

wq = water_quality_summaries_complete.rename({'DNR_ID_Site_Number': 'Monit_MAP_CODE1'})

if os.path.exists('parcel_numerical_summaries.csv') and os.path.exists('parcel_categorical_summaries.csv'):
    parcel_num = pl.read_csv('parcel_numerical_summaries.csv')
    parcel_cat = pl.read_csv('parcel_categorical_summaries.csv')

    parcel_num = parcel_num.with_columns(pl.col('Year').cast(pl.Int64))
    parcel_cat = parcel_cat.with_columns(pl.col('Year').cast(pl.Int64))
    wq = wq.with_columns(pl.col('Year').cast(pl.Int64))

    output = (
        wq
        .join(parcel_num, on=['Monit_MAP_CODE1', 'Year'], how='left')
        .join(parcel_cat, on=['Monit_MAP_CODE1', 'Year'], how='left')
    )
    output.write_csv('water_quality_and_parcel_summaries_2004_to_2015.csv')
    print(f"Wrote final output with {output.shape[0]} rows and {output.shape[1]} columns")
    output.head()
else:
    print('Parcel summary CSVs not found; run numerical and categorical summary cells first')

Wrote final output with 1687 rows and 17 columns


## Problem 5.  Put it all together

It is often useful to package all of the data construction steps together in one convenient place.  Your last task is to

1. Gather all of your data construction code below.
    * You don't need to include exploratory code, e.g., exploring join mismatches; only the code necessary to combine, clean, and write your data.
2. Clean/refactor the code.
3. Be sure to display all important intermediate results.

In [6]:
# Complete pipeline: Load water quality, build parcel features, and join
import polars as pl
import os
from glob import glob

# Step 1: Load water quality data
water_quality = (
    pl.read_csv(
        'data/MinneMUDAC_raw_files/mces_lakes_1999_2014_v2.txt',
        separator='\t',
        infer_schema_length=10000
    )
)
water_quality_filtered = (
    water_quality
    .filter(pl.col('Total_Phosphorus_QUALIFIER') == 'Approved')
    .with_columns(pl.col('START_DATE').str.extract(r'(\d{4})', 1).cast(pl.Int32).alias('Year'))
    .filter((pl.col('Year') >= 2004) & (pl.col('Year') <= 2015))
)
water_quality_summaries = (
    water_quality_filtered
    .group_by(['DNR_ID_Site_Number', 'LAKE_NAME', 'Year'])
    .agg([
        pl.col('Total_Phosphorus_RESULT').mean().alias('phosphorus_avg'),
        pl.col('Total_Phosphorus_RESULT').count().alias('phosphorus_count'),
        pl.col('Secchi_Depth_RESULT').mean().alias('secchi_avg'),
        pl.col('Secchi_Depth_RESULT').count().alias('secchi_count'),
    ])
)
dnr_ids = (
    water_quality_summaries
    .filter((pl.col('phosphorus_count') > 0) & (pl.col('secchi_count') > 0))
    .select('DNR_ID_Site_Number').unique().to_series().to_list()
)
wq = (
    water_quality_summaries
    .filter(pl.col('DNR_ID_Site_Number').is_in(dnr_ids))
    .rename({'DNR_ID_Site_Number': 'Monit_MAP_CODE1'})
)
print(f"✓ Water quality: {wq.shape[0]} rows")

# Step 2: Build parcel numerical summaries
parcels = pl.scan_parquet("data/parcel.data/**/*.parquet").filter(pl.col("Distance_Parcel_Lake_meters") <= 1600)

parcel_summaries_by_pin = (
    parcels
    .with_columns([
        pl.col("FIN_SQ_FT").cast(pl.Float64),
        pl.col("ACRES_POLY").cast(pl.Float64),
        pl.col("EMV_TOTAL").cast(pl.Float64),
        pl.col("TOTAL_TAX").cast(pl.Float64),
    ])
    .group_by(["PIN", "Year", "centroid_lat", "centroid_long"])
    .agg([
        pl.col("FIN_SQ_FT").mean().alias("mean_FIN_SQ_FT"),
        pl.col("FIN_SQ_FT").median().alias("median_FIN_SQ_FT"),
        pl.col("ACRES_POLY").mean().alias("mean_acres"),
        pl.col("ACRES_POLY").median().alias("median_acres"),
        pl.col("EMV_TOTAL").mean().alias("mean_EMV_TOTAL"),
        pl.col("EMV_TOTAL").median().alias("median_EMV_TOTAL"),
        pl.col("TOTAL_TAX").mean().alias("mean_total_tax"),
        pl.col("TOTAL_TAX").median().alias("median_total_tax"),
    ]).collect()
)

xref_lakes = (
    pl.read_csv('data/MinneMUDAC_raw_files/Parcel_Lake_Monitoring_Site_Xref.txt', separator='\t')
    .select(['Monit_MAP_CODE1', 'centroid_lat', 'centroid_long'])
    .with_columns([
        pl.col('centroid_lat').cast(pl.Float64).round(6).alias('lat_r'),
        pl.col('centroid_long').cast(pl.Float64).round(6).alias('lon_r')
    ]).unique()
)

numerical_summaries = (
    parcel_summaries_by_pin
    .with_columns([
        pl.col('centroid_lat').cast(pl.Float64).round(6).alias('lat_r'),
        pl.col('centroid_long').cast(pl.Float64).round(6).alias('lon_r')
    ])
    .join(xref_lakes, on=['lat_r', 'lon_r'], how='inner')
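    .group_by(['Monit_MAP_CODE1', 'Year'])
    # NOTE: the median_* columns below average the per-PIN medians, so they are not
    # lake-level medians; with one row per PIN and year they equal the mean_* columns.
    .agg([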
        pl.col('mean_FIN_SQ_FT').mean().alias('mean_FIN_SQ_FT'),
        pl.col('median_FIN_SQ_FT').mean().alias('median_FIN_SQ_FT'),
        pl.col('mean_acres').mean().alias('mean_acres'),
        pl.col('median_acres').mean().alias('median_acres'),
        pl.col('mean_EMV_TOTAL').mean().alias('mean_EMV_TOTAL'),
        pl.col('median_EMV_TOTAL').mean().alias('median_EMV_TOTAL'),
        pl.col('mean_total_tax').mean().alias('mean_total_tax'),
        pl.col('median_total_tax').mean().alias('median_total_tax'),
    ])
    .with_columns(pl.col("Year").cast(pl.Int32))
)
print(f"✓ Numerical summaries: {numerical_summaries.shape[0]} rows")

# Step 3: Build categorical summaries
xref_join = (
    pl.read_csv('data/MinneMUDAC_raw_files/Parcel_Lake_Monitoring_Site_Xref.txt', separator='\t')
    .select(['Monit_MAP_CODE1', 'centroid_lat', 'centroid_long'])
    .with_columns([
        pl.col('centroid_lat').cast(pl.Float64).round(6).alias('lat_r'),
        pl.col('centroid_long').cast(pl.Float64).round(6).alias('lon_r')
    ])
    .select(['Monit_MAP_CODE1', 'lat_r', 'lon_r']).unique()
)

categorical_results = []
for f in sorted(glob('data/MinneMUDAC_raw_files/*_metro_tax_parcels.txt')):
    year = int(f.split('/')[-1].split('_')[0])
    try:
        df = pl.read_csv(f, separator='|', ignore_errors=True)
        if 'centroid_lat' not in df.columns or 'centroid_long' not in df.columns:
            continue
        df = df.with_columns([
            pl.col('centroid_lat').cast(pl.Float64).round(6).alias('lat_r'),
            pl.col('centroid_long').cast(pl.Float64).round(6).alias('lon_r')
        ])
        joined = df.join(xref_join, on=['lat_r', 'lon_r'], how='inner')
        if joined.height == 0:
            continue
        garage_expr = (pl.col('GARAGE') == 'Y') if 'GARAGE' in joined.columns else pl.lit(False)
        basement_expr = (pl.col('BASEMENT') == 'Y') if 'BASEMENT' in joined.columns else pl.lit(False)
        agg = (
            joined.group_by('Monit_MAP_CODE1').agg([
                garage_expr.mean().alias('pct_garage'),
                basement_expr.mean().alias('pct_basement')
            ]).with_columns(pl.lit(year).alias('Year'))
        )
        categorical_results.append(agg)
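    except Exception as e:
        # Report the file that failed instead of silently skipping it
        print(f"Skipping {f}: {e}")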

categorical_summaries = pl.concat(categorical_results).sort(['Monit_MAP_CODE1', 'Year'])
print(f"✓ Categorical summaries: {categorical_summaries.shape[0]} rows")

# Step 4: Join all data
output = (
    wq
    .join(numerical_summaries, on=['Monit_MAP_CODE1', 'Year'], how='left')
    .join(categorical_summaries, on=['Monit_MAP_CODE1', 'Year'], how='left')
)
output.write_csv('water_quality_and_parcel_summaries_2004_to_2015.csv')
print(f"✓ Final output: {output.shape[0]} rows × {output.shape[1]} columns")
output.head()

✓ Water quality: 1687 rows
✓ Numerical summaries: 5268 rows
✓ Categorical summaries: 5720 rows
✓ Final output: 1687 rows × 17 columns


| Monit_MAP_CODE1 | LAKE_NAME | Year | phosphorus_avg | phosphorus_count | secchi_avg | secchi_count | mean_FIN_SQ_FT | median_FIN_SQ_FT | mean_acres | median_acres | mean_EMV_TOTAL | median_EMV_TOTAL | mean_total_tax | median_total_tax | pct_garage | pct_basement |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| str | str | i32 | f64 | u32 | f64 | u32 | f64 | f64 | f64 | f64 | f64 | f64 | f64 | f64 | f64 | f64 |
| 10004800-01 | Wasserman Lake | 2006 | 0.167969 | 32 | 1.25 | 12 | 1850.365217 | 1850.365217 | 1.596775 | 1.596775 | 380258.972332 | 380258.972332 | 3576.527273 | 3576.527273 | 0.664379 | null |
| 19002500-01 | Keller Lake | 2004 | 0.039733 | 15 | 1.52 | 15 | 2171.247892 | 2171.247892 | 0.400093 | 0.400093 | 248076.897133 | 248076.897133 | 2338.893339 | 2338.893339 | null | null |
| 82007600-01 | Barker Lake | 2008 | 0.108571 | 7 | 1.197429 | 7 | 0.0 | 0.0 | 22.651304 | 22.651304 | 352650.931677 | 352650.931677 | 263408.695652 | 263408.695652 | null | null |
| 82010900-01 | Eagle Point Lake | 2007 | 0.1705 | 2 | 0.85 | 2 | 0.0 | 0.0 | 0.0 | 0.0 | 875000.0 | 875000.0 | 0.0 | 0.0 | null | null |
| 02000500-01 | George Watch Lake | 2006 | 0.164286 | 7 | 0.728571 | 7 | 1637.532154 | 1637.532154 | 2.23492 | 2.23492 | 233350.482315 | 233350.482315 | 3405.800643 | 3405.800643 | null | 0.918143 |
