# Lab 3 - Creating partitioned parquet files

In this lab, we will perform our first round of data preparation by writing the larger files (`XREF` and the yearly `parcel` files) to the `parquet` format.  

In the process, we will discuss and investigate an important concept in managing lots of data: the principle of locality.  Big data problems are I/O bound, meaning that almost all of the time and resources are spent managing the input and output of data.  The principle of locality holds that data is often reused over a short period of time (temporal locality) and that data stored in nearby locations tends to be used at similar points in a program (spatial locality).  The `parquet` format allows us to partition a data set to leverage these properties.   *The correct partitioning can result in orders of magnitude speed up in processing time!*



## The Principle of Locality

![image.png](attachment:image.png)

We can leverage the behavior of the operating system (OS), in particular the loading of chunks of data in proximity and keeping that data in memory for a time, by partitioning our data so that similar data is stored together.
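
To make the idea concrete, here is a minimal sketch using only the standard library. It counts the contiguous runs of identical keys in a sequence of records, as a rough proxy for how many separate chunks a reader would have to touch to collect every row for one lake. The lake ids and the `count_runs` helper are hypothetical and exist only for illustration.

```python
from itertools import groupby


def count_runs(keys):
    """Count contiguous runs of identical keys in a sequence."""
    return sum(1 for _ in groupby(keys))


# Hypothetical lake ids in arrival order: rows for the same lake are scattered.
scattered = ["A", "B", "A", "C", "B", "A", "C", "B"]

# The same rows after grouping by lake id: each lake occupies one contiguous block.
grouped = sorted(scattered)

runs_scattered = count_runs(scattered)
runs_grouped = count_runs(grouped)
print(runs_scattered, runs_grouped)
```

Fewer runs means that reading all rows for one lake touches fewer, larger contiguous blocks, which is the situation partitioning tries to create on disk.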

## We need to group the data by lake and distance to the lake

![image.png](attachment:image.png)

## Important notes on data size and platform selection

The primary challenge in big data processing is the size of the data.  In this lab, we will be working with the parcel files, which individually are small enough to fit into memory on a single machine, but become problematic when processed in bulk.  Furthermore, we will be aggregating and filtering the data during feature engineering, so storing the data in a columnar format will be beneficial for performance.

Consequently, we will need to use the `polars` support for lazy data frames, such as `scan_csv`, `scan_parquet`, and `sink_parquet`, which will allow us to take full advantage of predicate and projection pushdown when processing the data.  This minimizes the amount of data that needs to be read and processed, which improves performance.

## Problem 1 - Understanding the big picture and table keys

**Tasks.**  

1. Explain why, in the case of parcel data, it is important to group the rows by lake id and distance to the lake.
2. Neither of these columns is present in the parcel data files.  How will we go about adding this information?

> <font color="orange"> Your thoughts here </font>

Grouping parcel data by lake ID and distance is critical for spatial analysis, allowing us to study property trends relative to specific water bodies, not just the general area. Since this data is missing, we must generate it using GIS: first, a spatial join assigns a unique Lake ID to each parcel by associating it with the nearest lake boundary. Second, a nearest neighbor analysis calculates the straight-line distance to the lake from the parcel's center. These steps enrich the dataset for precise, comparative lake-effect analysis.

## Problem 2 - Writing the XREF to a partitioned parquet "file"

**Tasks.**  

1. Use `polars` to build a lazy query that performs the following tasks.
   - Scan the `XREF` data and select the relevant columns (Lake ID, centroid lat & long, distance to the lake).  Be sure to set `infer_schema=False` to keep the latitude and longitude columns as strings.<br>
   - Create a new categorical variable with three categories based on distance to the lake: `within 500m`, `between 501-1600m`, and `over 1600m`.<br>
2. Test your query by collecting the first 10 or so rows using limit and collect.
3. Write the table as a partitioned parquet file (by Lake ID).

In [2]:
import polars as pl
import polars.selectors as cs
from glob import glob
import re
from functools import reduce
import requests, zipfile, os, shutil, io
from humanize import naturalsize
from pathlib import Path
from toolz import pipe
from toolz.curried import map
from more_itertools import consume

In [3]:
lake_path = 'data/MinneMUDAC_raw_files/Parcel_Lake_Monitoring_Site_Xref.txt'

lake_query = (
    pl.scan_csv(
        lake_path,
        has_header=True,
        separator="\t"
    )
    .limit(5)
    .collect()
)
print(lake_query)

shape: (5, 10)
┌───────────┬───────────┬───────────┬───────────┬───┬───────────┬───────────┬───────────┬──────────┐
│ Parcel_PI ┆ Monit_MAP ┆ Monit_SIT ┆ Monit_LAK ┆ … ┆ Distance_ ┆ centroid_ ┆ centroid_ ┆ Parcel_p │
│ N         ┆ _CODE1    ┆ E_CODE    ┆ E_SITE    ┆   ┆ Parcel_La ┆ long      ┆ lat       ┆ key      │
│ ---       ┆ ---       ┆ ---       ┆ ---       ┆   ┆ ke_meters ┆ ---       ┆ ---       ┆ ---      │
│ str       ┆ str       ┆ i64       ┆ i64       ┆   ┆ ---       ┆ f64       ┆ f64       ┆ i64      │
│           ┆           ┆           ┆           ┆   ┆ f64       ┆           ┆           ┆          │
╞═══════════╪═══════════╪═══════════╪═══════════╪═══╪═══════════╪═══════════╪═══════════╪══════════╡
│ null      ┆ 19007900- ┆ 19007900  ┆ 1         ┆ … ┆ 2571.5267 ┆ -93.11451 ┆ 44.94283  ┆ 2163034  │
│           ┆ 01        ┆           ┆           ┆   ┆ 92        ┆           ┆           ┆          │
│ null      ┆ 19007900- ┆ 19007900  ┆ 1         ┆ … ┆ 2515.3738 ┆ -93.11539 

In [4]:
xref_lazy = (
    pl.scan_csv(
        lake_path,
        has_header=True,
        separator="\t",
        infer_schema_length=0,
        schema_overrides={
            "centroid_lat": pl.String(),
            "centroid_long": pl.String()
        }
    )
    .select([
        "Monit_MAP_CODE1",
        "centroid_lat",
        "centroid_long",
        "Distance_Parcel_Lake_meters"
    ])
    .with_columns([
        pl.col("Distance_Parcel_Lake_meters").cast(pl.Float64)
    ])
    .with_columns([
        pl.when(pl.col("Distance_Parcel_Lake_meters") <= 500)
          .then(pl.lit("within_500m"))
          .when((pl.col("Distance_Parcel_Lake_meters") > 500) & (pl.col("Distance_Parcel_Lake_meters") <= 1600))
          .then(pl.lit("between_501_1600m"))
          .otherwise(pl.lit("over_1600m"))
          .alias("distance_category")
    ])
)

xref_lazy.limit(10).collect()

Monit_MAP_CODE1,centroid_lat,centroid_long,Distance_Parcel_Lake_meters,distance_category
str,str,str,f64,str
"""19007900-01""","""44.94283""","""-93.11451""",2571.526792,"""over_1600m"""
"""19007900-01""","""44.94234""","""-93.11539""",2515.373802,"""over_1600m"""
"""19007900-01""","""44.94231""","""-93.11556""",2511.924959,"""over_1600m"""
"""19007900-01""","""44.94223""","""-93.11572""",2502.99164,"""over_1600m"""
"""19007900-01""","""44.94189""","""-93.1158""",2465.206234,"""over_1600m"""
"""19007900-01""","""44.94325""","""-93.11218""",2631.461256,"""over_1600m"""
"""19007900-01""","""44.94711""","""-93.11036""",3074.8658,"""over_1600m"""
"""19007900-01""","""44.94712""","""-93.11048""",3074.67095,"""over_1600m"""
"""19007900-01""","""44.9468""","""-93.11042""",3040.091456,"""over_1600m"""
"""19007900-01""","""44.94678""","""-93.11296""",3016.673976,"""over_1600m"""


In [7]:
lake_partitioned_out_path = lake_path.replace('.txt', '_by_base.parquet')

(
    xref_lazy
    .sink_parquet(
        # PartitionByKey is a Python object that splits the output by Lake ID
        pl.PartitionByKey(lake_partitioned_out_path, by="Monit_MAP_CODE1"),
        mkdir=True,  # create the output directories if they do not exist
    )
)

### Part 3 - Inspecting the partitioned `parquet` file

**Tasks.** Inspect the resulting "file" (actually a folder) from the last step and answer the following questions.

1. What impact did the partitioning have on the way the data was saved?
2. How would this structure help `polars` apply predicate pushdown?
3. How would this structure provide help via the principle of locality?  
4. When working with a cluster of machines, operations such as `group_by` are WIDE operations, meaning they generally need to shuffle data between machines.  Such a shuffle is *very* expensive.  In a future lab, we will be creating features for each lake by grouping and aggregating on the lakes and years.  How would applying a similar structure to the parcel data help in this case? **Hint.** Remember that the data will be distributed across multiple machines using the partitions, i.e., each machine will load all or some of the same partition(s).


> <font color="orange"> Your thoughts here </font>
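
A small helper can make the inspection easier. The sketch below uses only the standard library to list each partition folder with its file count and total size. It builds a stand-in directory tree so that it runs on its own; the hive-style `key=value` folder names are an assumption about how the partitioned output is laid out, and `summarize_partitions` is a hypothetical helper, not part of `polars`.

```python
import tempfile
from pathlib import Path


def summarize_partitions(root):
    """Map each partition folder name to (number of files, total bytes)."""
    summary = {}
    for folder in sorted(p for p in Path(root).iterdir() if p.is_dir()):
        files = [f for f in folder.rglob("*") if f.is_file()]
        summary[folder.name] = (len(files), sum(f.stat().st_size for f in files))
    return summary


# Stand-in tree: real partition folders would hold parquet files, not dummy bytes.
root = Path(tempfile.mkdtemp()) / "toy_by_lake.parquet"
for lake_id, payload in [("19007900-01", b"x" * 10), ("02009100-03", b"y" * 25)]:
    folder = root / f"Monit_MAP_CODE1={lake_id}"  # assumed hive-style naming
    folder.mkdir(parents=True)
    (folder / "0.parquet").write_bytes(payload)

summary = summarize_partitions(root)
for name, (n_files, n_bytes) in summary.items():
    print(name, n_files, n_bytes)
```

Pointing `summarize_partitions` at the real output folder should show one entry per lake, which is why a reader that filters on the partition key can skip whole folders.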


### Part 4 - Filter parcels and joining lake id - one file

One of our programming principles is "do it to one before you do it to all".  Let's practice this by building a query using the `2004` file. Start by saving the path to the `2004` parcel file to the variable `q`.

**Tasks.**

1. Write a query in `polars` that performs the following tasks.
   - Scans the corresponding CSV,
   - Selects the common columns listed in the script file from the previous lab,
   - Joins on the necessary info from the `XREF` (lake ID, distance to the lake, distance category defined above, and centroid lat & long).
2. Test your query by collecting the first 10 or so rows using limit and collect.

Do this in a single `polars` dot chain.

**Important note.** When using a lazy query with a join, both tables must be lazy.  You can achieve this by using `pl.scan_csv` for the parcel data and `pl.read_parquet(...).lazy()` for the `XREF` data.

In [8]:
xref_parcel = (
    pl.scan_csv(
        'data/MinneMUDAC_raw_files/Parcel_Lake_Monitoring_Site_Xref.txt',
        has_header=True,
        separator="\t",
        infer_schema_length=0,  # disables schema inference
        schema_overrides={
            "centroid_lat": pl.Float64(),
            "centroid_long": pl.Float64()
        }
    )
     .select([
        "Monit_MAP_CODE1",
        "centroid_lat",
        "centroid_long",
        "Distance_Parcel_Lake_meters"
    ])
    .with_columns([
        pl.col("centroid_lat").cast(pl.Float64),
        pl.col("centroid_long").cast(pl.Float64),
        pl.col("Distance_Parcel_Lake_meters").cast(pl.Float64)
    ])

    .with_columns([
        pl.when(pl.col("Distance_Parcel_Lake_meters") <= 500)
          .then(pl.lit("within_500m"))
          .when((pl.col("Distance_Parcel_Lake_meters") > 500) & (pl.col("Distance_Parcel_Lake_meters") <= 1600))
          .then(pl.lit("between_501_1600m"))
          .otherwise(pl.lit("over_1600m"))
          .alias("distance_category")
    ])
)

parcel_04 = pl.scan_csv(
    "data/MinneMUDAC_raw_files/2004_metro_tax_parcels.txt",
    has_header=True,
    separator="|",
    infer_schema_length=0,
    schema_overrides={
            "centroid_lat": pl.Float64(),
            "centroid_long": pl.Float64()
        }
)

parcel_query_04 = (
    parcel_04
    .select([
        "ACRES_DEED", "ACRES_POLY", "AGPRE_ENRD", "AGPRE_EXPD", "AG_PRESERV",
        "BASEMENT", "BLDG_NUM", "BLOCK", "CITY", "CITY_USPS", "COOLING",
        "COUNTY_ID", "DWELL_TYPE", "EMV_BLDG", "EMV_LAND", "EMV_TOTAL",
        "FIN_SQ_FT", "GARAGE", "GARAGESQFT", "GREEN_ACRE", "HEATING",
        "HOMESTEAD", "HOME_STYLE", "LANDMARK", "LOT", "MULTI_USES",
        "NUM_UNITS", "OPEN_SPACE", "OWNER_MORE", "OWNER_NAME", "OWN_ADD_L1",
        "OWN_ADD_L2", "OWN_ADD_L3", "PARC_CODE", "PIN", "PLAT_NAME",
        "PREFIXTYPE", "PREFIX_DIR", "SALE_DATE", "SALE_VALUE", "SCHOOL_DST",
        "SPEC_ASSES", "STREETNAME", "STREETTYPE", "SUFFIX_DIR", "Shape_Area",
        "Shape_Leng", "TAX_ADD_L1", "TAX_ADD_L2", "TAX_ADD_L3", "TAX_CAPAC",
        "TAX_EXEMPT", "TAX_NAME", "TOTAL_TAX", "UNIT_INFO", "USE1_DESC",
        "USE2_DESC", "USE3_DESC", "USE4_DESC", "WSHD_DIST", "XUSE1_DESC",
        "XUSE2_DESC", "XUSE3_DESC", "XUSE4_DESC", "YEAR_BUILT", "Year",
        "ZIP", "ZIP4", "centroid_lat", "centroid_long"
    ])
    .join(
        xref_parcel,
        on=["centroid_lat", "centroid_long"],
        how="left"
    )
    .limit(10)
    .collect()

)

print(parcel_query_04)

shape: (10, 73)
┌───────────┬───────────┬───────────┬───────────┬───┬───────────┬───────────┬───────────┬──────────┐
│ ACRES_DEE ┆ ACRES_POL ┆ AGPRE_ENR ┆ AGPRE_EXP ┆ … ┆ centroid_ ┆ Monit_MAP ┆ Distance_ ┆ distance │
│ D         ┆ Y         ┆ D         ┆ D         ┆   ┆ long      ┆ _CODE1    ┆ Parcel_La ┆ _categor │
│ ---       ┆ ---       ┆ ---       ┆ ---       ┆   ┆ ---       ┆ ---       ┆ ke_meters ┆ y        │
│ str       ┆ str       ┆ str       ┆ str       ┆   ┆ f64       ┆ str       ┆ ---       ┆ ---      │
│           ┆           ┆           ┆           ┆   ┆           ┆           ┆ f64       ┆ str      │
╞═══════════╪═══════════╪═══════════╪═══════════╪═══╪═══════════╪═══════════╪═══════════╪══════════╡
│ 0.0       ┆ 8.03      ┆ null      ┆ null      ┆ … ┆ -93.26739 ┆ 02009100- ┆ 7441.1003 ┆ over_160 │
│           ┆           ┆           ┆           ┆   ┆           ┆ 03        ┆ 46        ┆ 0m       │
│ 0.0       ┆ 0.93      ┆ null      ┆ null      ┆ … ┆ -93.2701  ┆ 02009100-

### Part 5 - Filter parcels and joining lake id - All files

Next, we will partition and write each of the 2004-2015 parcel files to a `parquet` "file".  To do this, complete each of the following tasks.

**Tasks.**

2. We need to adapt the query from the previous step to work for all the years, which can be accomplished by placing that query in a comprehension.  The query should perform the following tasks for each year from 2004-2015.
   - Scan the corresponding CSV (again, `infer_schema=False` to keep lat & long as strings), <br>
   - Select the common columns (import from `parcel.py`),<br>
   - Join on the necessary info from the `XREF` (lake ID, distance to the lake, distance category defined above, and centroid lat & long).<br>
   - Compute the distance to lake using the great circle distance, then construct the lake distance categorical variable as defined above.<br>
   <br>
   Do this in a single `polars` dot chain.<br>
3. Use a comprehension to create a list of lazy data frames for each year from 2004-2015 that contain the joined parcel data.  The expression should use the query from the previous step.  First, test your query on a limited number of rows (e.g., 1000) to ensure it works correctly.
4. Now we want to sink all of the parcel data to a single partitioned parquet "file".  You will need to
    
**Important note.** When writing the parcel files separately, each parcel file took 10+ minutes on my laptop, so writing the combined file will take a while.  Pick a convenient time and be sure to plug in your laptop!

In [9]:
# great circle distance
import math

def haversine(lat1, lon1, lat2, lon2):
    # radius of Earth in meters
    R = 6371000
    dlat = math.radians(lat2 - lat1)
    dlon = math.radians(lon2 - lon1)
    a = (math.sin(dlat/2)**2 +
         math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) *
         math.sin(dlon/2)**2)
    return 2 * R * math.asin(math.sqrt(a))

# xref_parcel
xref_parcel = (
    pl.scan_csv(
        "data/MinneMUDAC_raw_files/Parcel_Lake_Monitoring_Site_Xref.txt",
        has_header=True,
        separator="\t",
        infer_schema_length=0,
        schema_overrides={
            "centroid_lat": pl.Float64(),
            "centroid_long": pl.Float64()
        }
    )
    .with_columns([
        pl.col("centroid_lat").cast(pl.Float64),
        pl.col("centroid_long").cast(pl.Float64),
        pl.col("Distance_Parcel_Lake_meters").cast(pl.Float64)
    ])
    .with_columns([
        pl.when(pl.col("Distance_Parcel_Lake_meters") <= 500)
          .then(pl.lit("within_500m"))
          .when((pl.col("Distance_Parcel_Lake_meters") > 500) & (pl.col("Distance_Parcel_Lake_meters") <= 1600))
          .then(pl.lit("between_501_1600m"))
          .otherwise(pl.lit("over_1600m"))
          .alias("distance_category")
    ])
)

common_cols = [
    "ACRES_DEED", "ACRES_POLY", "AGPRE_ENRD", "AGPRE_EXPD", "AG_PRESERV",
    "BASEMENT", "BLDG_NUM", "BLOCK", "CITY", "CITY_USPS", "COOLING",
    "COUNTY_ID", "DWELL_TYPE", "EMV_BLDG", "EMV_LAND", "EMV_TOTAL",
    "FIN_SQ_FT", "GARAGE", "GARAGESQFT", "GREEN_ACRE", "HEATING",
    "HOMESTEAD", "HOME_STYLE", "LANDMARK", "LOT", "MULTI_USES",
    "NUM_UNITS", "OPEN_SPACE", "OWNER_MORE", "OWNER_NAME", "OWN_ADD_L1",
    "OWN_ADD_L2", "OWN_ADD_L3", "PARC_CODE", "PIN", "PLAT_NAME",
    "PREFIXTYPE", "PREFIX_DIR", "SALE_DATE", "SALE_VALUE", "SCHOOL_DST",
    "SPEC_ASSES", "STREETNAME", "STREETTYPE", "SUFFIX_DIR", "Shape_Area",
    "Shape_Leng", "TAX_ADD_L1", "TAX_ADD_L2", "TAX_ADD_L3", "TAX_CAPAC",
    "TAX_EXEMPT", "TAX_NAME", "TOTAL_TAX", "UNIT_INFO", "USE1_DESC",
    "USE2_DESC", "USE3_DESC", "USE4_DESC", "WSHD_DIST", "XUSE1_DESC",
    "XUSE2_DESC", "XUSE3_DESC", "XUSE4_DESC", "YEAR_BUILT", "Year",
    "ZIP", "ZIP4", "centroid_lat", "centroid_long"
]


In [10]:
# comprehension of all years
years = range(2004, 2016)  # 2004-2015 inclusive; range() excludes the stop value

parcel_queries = [
    (
        pl.scan_csv(
            f"data/MinneMUDAC_raw_files/{year}_metro_tax_parcels.txt",
            has_header=True,
            separator="|",
            infer_schema_length=0,
            schema_overrides={
                "centroid_lat": pl.Float64(),
                "centroid_long": pl.Float64()
            }
        )
        .select(common_cols)
        .with_columns([
            pl.col("centroid_lat").cast(pl.Float64),
            pl.col("centroid_long").cast(pl.Float64),
            pl.lit(year).alias("YearTag")  # add explicit year tag
        ])
        .join(
            xref_parcel,
            on=["centroid_lat", "centroid_long"],
            how="left"
        )
    )
    for year in years
]

In [None]:
import polars as pl

combined_lazy = pl.concat(parcel_queries)

parcel_partitioned_out_path = "output/parcel_combined_by_year.parquet"

combined_lazy.sink_parquet(
    pl.PartitionByKey(parcel_partitioned_out_path, by="YearTag"),
    mkdir=True
)
