# Imports, data load and cleaning

In [None]:
import timeit

import itables
import polars as pl
import pandas as pd
import plotly.express as px
import plotly.io as pio
import seaborn as sns

sns.set_theme()

pl.Config.set_fmt_str_lengths(250)
pl.Config.set_tbl_rows(40)

import jupyter_black

jupyter_black.load(line_length=120)

Let's load in all tax assessments for the 2.54M properties in MA as of 2023.

Data sourced from https://www.mass.gov/info-details/massgis-data-property-tax-parcels as GIS files, but I also made a CSV extract available at https://drive.google.com/file/d/1h8sZ3U2nmurJ5BxfngAdhQfb0U13ladB/view?usp=drive_link

Note the term "town" is used interchangeably with "city"

In [None]:
# TODO(you): Update these paths to match where you downloaded the CSV assessments file and the
# boundary files.

# Home prices as an easy to use CSV.
# Update to point to where you downloaded the CSV file per the README.
MA_HOME_PRICES_CSV_PATH = "../ma-home-prices/mass_building_assessments_2023.csv"

# State parcels, used for town boundaries.
# Update to point to where you downloaded the CSV file per the README.
PARCEL_BOUNDARIES_GDB = "../ma-home-prices/mass_parcels_2024/MassGIS_L3_Parcels.gdb"

# Property parcels, used for individual property boundaries.
# In theory the file above has the data in the file below, but due to a corruption issue we can't pull it so we load both.
# Update to point to where you downloaded the CSV file per the README.
PARCEL_BOUNDARIES_EAST_SHP = "/Users/mdezube/Documents/QGIS/Reference Data/MA Full Dataset/MassGIS SHP format/gisdata/men1/parcels/L3_TAXPAR_POLY_ASSESS_EAST.shp"

In [None]:
start = timeit.default_timer()

assessments = pl.read_csv(
    MA_HOME_PRICES_CSV_PATH,
    infer_schema_length=250000,
    dtypes={"ZIP": pl.Utf8},
)

end = timeit.default_timer()
print(f"Loaded data in {end - start:0.1f} seconds")

## How does pandas compare in speed?

In [None]:
start = timeit.default_timer()

pandas_df = pd.read_csv(MA_HOME_PRICES_CSV_PATH, low_memory=False)

end = timeit.default_timer()
print(f"Loaded data in {end - start:0.1f} seconds")

## How does pandas compare in memory usage?

In [None]:
pandas_df.info(memory_usage="deep", verbose=False)

In [None]:
del pandas_df

**Out of core compuation** 

We won't do it today, but you can stream a CSV  to load only part of a CSV that wouldn't fit in memory, see https://docs.pola.rs/py-polars/html/reference/api/polars.scan_csv.html

In [None]:
# Clean up a few column names and content
assessments = assessments.rename({"TOWN_ID": "TAXING_TOWN_ID", "CITY": "LOCAL_TOWN"})
assessments = assessments.with_columns(
    pl.col("LOCAL_TOWN").str.to_uppercase(),
    pl.col("LS_DATE").cast(pl.Utf8).str.to_date("%Y%m%d", strict=False),
    pl.col("OWNER1").str.replace(r"COMMONWLTH\b|COMMWLTH\b", "COMMONWEALTH").str.replace(r"MASS\b", "MASSACHUSETTS"),
)
assessments = assessments.with_columns(pl.col("LS_DATE").dt.year().alias("ls_year"))

print(f"{assessments.shape[0]:,} rows at {round(assessments.estimated_size(unit='gb'), 1)}GB")

# Data exploration

In [None]:
assessments[1000:1005]

In [None]:
# Join in official town names to their IDs.
town_id_to_name = pl.read_csv("town_id_to_name.csv")
town_id_to_name = town_id_to_name.rename({"TOWN": "TAXING_TOWN"})
assessments = assessments.join(
    town_id_to_name,
    left_on="TAXING_TOWN_ID",
    right_on="TOWN_ID",
)

Three main contexts in polars, i.e. ways to view the data:

1. Selection: df.select(...), df.with_columns(...)
2. Filtering: df.filter()
3. Group by / Aggregation: df.group_by(...).agg(...)

But don't worry, standard items likes pivots, joins, melts etc are also supported.

## Selection

In [None]:
# Average parcel value in MA, includes commercial and residential and gov't.
assessments.select(
    pl.col("BLDG_VAL").mean().alias("BLDG_VAL_mean"),
    pl.col("LAND_VAL").mean().alias("LAND_VAL_mean"),
    pl.col("BLDG_VAL").sum().alias("BLDG_VAL_sum"),
    pl.col("LAND_VAL").sum().alias("LAND_VAL_sum"),
)

### Select columns by regex

In [None]:
# Shorthand for selecting columns by a regex.  Wrap it in ^$
assessments.select(pl.col("^*_VAL$")).mean()

### Select columns by type

In [None]:
# Shorthand for selecting columns by type.  Very useful if we want the averages for every column
# where that concept is valid.
assessments.select(pl.col(pl.Int64, pl.UInt32, pl.Float32, pl.Float64)).mean()

In [None]:
# For clarity, let's drop some IDs we won't use.
assessments = assessments.select(
    pl.exclude(
        [
            "PROP_ID",
            "LOC_ID",
            "MA_PROP_ID",
            "ZONING",
            "FULL_STR_STD",
            "CAMA_ID",
            "REG_ID",
            "LOCATION",
        ]
    )
)

## Filtering

In [None]:
# Let's look at Newton!

_df = assessments.filter(pl.col("LOCAL_TOWN") == "BOSTON")
itables.show(_df.to_pandas(), maxBytes=1024 * 1024, column_filters="footer")

In [None]:
# What about properties owned by Boston University in Boston?
# As a non profit you can generally look up the total assets through public filings – for those interested!

_df = assessments.filter(pl.col("OWNER1").str.to_uppercase().str.contains("BOSTON UNIVERSITY")).sort(
    by="TOTAL_VAL", descending=True
)

print(f"Total value: ${_df.select('TOTAL_VAL').sum().item():,}\n")

itables.show(_df.to_pandas(), maxBytes=1024 * 1024 * 5, column_filters="footer")

## Groups bys

In [None]:
assessments.filter(pl.col("TAXING_TOWN") == "BOSTON").shape

### What towns are owned by parent towns?

In [None]:
pl.Config.set_fmt_table_cell_list_len(10)

_df = (
    assessments.group_by(pl.col("TAXING_TOWN"))
    .agg(pl.col("LOCAL_TOWN").unique().sort(), pl.col("LOCAL_TOWN").n_unique().alias("n_reporting_towns"))
    .filter(pl.col("n_reporting_towns") > 1)
    .sort(by="n_reporting_towns", descending=True)
)

_df.head(10)

### Top property owners in Boston

In [None]:
# Who owns Boston?
# Admittedly there is nuance here, e.g. Boston University's owners are sometimes "Trustees of Boston University", but
# others times "Trustees" is abbreviated.  Important to clean up to get more precise results.

_df = (
    assessments.filter(pl.col("TAXING_TOWN") == "BOSTON")
    .group_by(pl.col("OWNER1"))
    .agg(
        pl.col("TOTAL_VAL").sum(),
    )
    .with_columns((100 * pl.col("TOTAL_VAL") / pl.col("TOTAL_VAL").sum()).alias("% of Total"))
    .to_pandas()
    .sort_values(by="TOTAL_VAL", ascending=False)
    .reset_index(drop=True)
)

_df[:20].style.format(thousands=",", precision=2)

### Average home value by town

In [None]:
# Now let's ask questions by use code/area, i.e. let's ask:
# What are the average residential home values by town?  Avg cost per sqft?  Etc.

# Use CODES
# 101 ......Single Family
# 102 ......Condominium
# 103 ......Mobile Home (includes land used for purpose of a mobile home park)
# 104 ......Two-Family
# 105 ......Three-Family
# 106 ......Accessory Land with Improvement - garage, etc.
# 107 ......(Intentionally left blank)
# 108 ......(Intentionally left blank)
# 109 ......Multiple Houses on one parcel (for example, a single and a two-family on one parcel)

residential = assessments.filter(pl.col("USE_CODE").str.contains("^10[1-9]"))
residential_price_analysis = (
    residential.group_by(["TAXING_TOWN", "TAXING_TOWN_ID"])
    .agg(
        pl.col("LOCAL_TOWN"),
        pl.sum("BLDG_VAL").alias("bldg_val_sum"),
        pl.sum("LAND_VAL").alias("land_val_sum"),
        pl.sum("OTHER_VAL").alias("other_val_sum"),
        pl.sum("TOTAL_VAL").alias("total_val_sum"),
        pl.mean("TOTAL_VAL").alias("total_val_avg"),
        pl.mean("RES_AREA").alias("res_area_avg"),
        pl.median("RES_AREA").alias("res_area_median"),
        (pl.sum("TOTAL_VAL") / pl.sum("RES_AREA")).alias("cost_per_sq_ft"),
        pl.count("BLDG_VAL").alias("num_buildings"),
    )
    .sort(by="total_val_avg", descending=True)
    .with_columns(pl.col("LOCAL_TOWN").list.unique())
)
# We convert to pandas at the end just so we can format the numbers with commas.
residential_price_analysis.head(20).to_pandas().style.format(thousands=",", precision=0)

In [None]:
# Above we see more than one city name per town ID, how can this be?

In [None]:
# Output is also hard to read and needs formatting, let's convert to pandas for that.

# Only pull the columns we care about.
residential_price_analysis_pd = residential_price_analysis.select(
    [
        "TAXING_TOWN",
        "TAXING_TOWN_ID",
        "total_val_avg",
        "res_area_avg",
        "cost_per_sq_ft",
        "num_buildings",
        "LOCAL_TOWN",
    ]
).to_pandas()
# Top twenty towns by their average home value.
residential_price_analysis_pd.head(50).style.format(thousands=",", precision=0)

In [None]:
ax = sns.barplot(
    data=residential_price_analysis_pd[:25],
    x="total_val_avg",
    y="TAXING_TOWN",
    palette="flare",
    hue="total_val_avg",
)
ax.set(xlabel="Average Home Value", ylabel="Town")

# GIS

## Average property value by town

### Load in the boundaries for each town

In [None]:
import fiona
import folium
import geopandas as gpd

layers = [layer for layer in fiona.listlayers(PARCEL_BOUNDARIES_GDB)]
print(layers)

town_boundaries = gpd.read_file(PARCEL_BOUNDARIES_GDB, layer="L3_TOWNFY", use_arrow=True)
town_boundaries.TOWN = town_boundaries.TOWN.str.upper()
town_boundaries.head(1)

### Choropleth visual

In [None]:
_df = town_boundaries.merge(
    residential_price_analysis_pd,
    left_on="TOWN_ID",
    right_on="TAXING_TOWN_ID",
).drop("LOCAL_TOWN", axis="columns")

# Cap the avg value at 1.5M so Nantucket doesn't destroy the legend.
_df["total_val_avg"] = _df["total_val_avg"].apply(lambda x: max(min(x, 1500000), 150000))
_df.explore(
    "total_val_avg",
    style_kwds={"fillOpacity": 0.8},
    cmap="viridis",
    legend=True,
    k=10,
    scheme="equal_interval",
    legend_kwds={
        "colorbar": False,
        "caption": "Average home value (clipped to .15M - 1.5M]",
        "max_labels": 5,
        "fmt": "{:.0f}",
    },
)

## View individual homes

In [None]:
import warnings
import geopandas as gpd

# A trick to silence warnings, but only specifically from one imported module as not to hide important ones.
warnings.filterwarnings("ignore", module="pyogrio")

parcel_boundaries = gpd.read_file(
    # We use this shape file for now as the .gdb file that has east and west together has some invalid geometries that can't be loaded
    # via pyogrio.
    PARCEL_BOUNDARIES_EAST_SHP,
    engine="pyogrio",
    use_arrow=True,
)
parcel_boundaries = parcel_boundaries.set_index(["TOWN_ID", "PROP_ID"])
parcel_boundaries["total_val_clip"] = parcel_boundaries["TOTAL_VAL"].clip(0, 5000000)

In [None]:
parcels_of_interest = parcel_boundaries[
    (parcel_boundaries.CITY.str.upper().str.contains("NEWTON|BROOKLINE"))
    & (parcel_boundaries.USE_CODE.str.contains("^101"))
]

In [None]:
map = parcels_of_interest.explore(
    column="total_val_clip",
    popup=True,
    tooltip=[
        "CITY",
        "TOTAL_VAL",
        "BLDG_VAL",
        "LAND_VAL",
        "BLD_AREA",
        "YEAR_BUILT",
        "OWNER1",
        "RES_AREA",
        "STYLE",
    ],
    legend=False,
    cmap="viridis",
)
folium.TileLayer("openstreetmap", opacity=0.5, name="OpenStreetMap .5", show=False).add_to(map)
folium.TileLayer("cartodb positron", show=True).add_to(map)
folium.LayerControl().add_to(map)

map

# Using SQL in Polars

## Average sale price by year

In [None]:
# Requires:
#   Sales prices under $7M
#   Avg sale per town <= $2M and avg sale per town >= $400k
#   Must have >= 3 years of data
#   Must have >= 100 sales in each year
query = """
WITH avg_sale_price_by_year AS (
    SELECT
      AVG(LS_PRICE) AS ls_avg_price,
      TAXING_TOWN,
      ls_year,
      COUNT(*) AS num_sales
    FROM frame
    WHERE ls_year IN (2019, 2020, 2021, 2022, 2022) AND LS_PRICE != 0 AND LS_PRICE < 7000000
    GROUP BY TAXING_TOWN, ls_year
    HAVING ls_avg_price >= 400000 AND ls_avg_price <= 2000000 AND num_sales >= 200
    ORDER BY TAXING_TOWN, ls_year
),

add_window_functions AS (
    SELECT
        *,
        MIN(num_sales) OVER(PARTITION BY TAXING_TOWN) AS min_sales_in_lowest_year,
        MAX(ls_year) OVER(PARTITION BY TAXING_TOWN) - MIN(ls_year) OVER(PARTITION BY TAXING_TOWN) + 1 AS years_in_range
    FROM avg_sale_price_by_year
)

SELECT *
FROM add_window_functions
WHERE min_sales_in_lowest_year >= 100 AND years_in_range >= 3
"""
_df = pl.SQLContext(frame=residential).execute(query, eager=True)
_df.head(10)

In [None]:
# Unusued atm, can calculate YoY increases.
# _df = _df.with_columns(
#     (pl.col("LS_AVG_PRICE") / pl.col("LS_AVG_PRICE").shift(1).over('TAXING_TOWN')).fill_null(1).alias('PCT_INCREASE')
# )

## Average sale price by year visual

In [None]:
fig = px.line(
    _df.drop_nulls().to_pandas(),
    x="ls_year",
    y="ls_avg_price",
    color="TAXING_TOWN",
    markers=True,
    custom_data=["TAXING_TOWN", "num_sales"],
    height=600,
)
fig.update_traces(
    hovertemplate="<br>".join(
        [
            "%{customdata[0]}",
            "Year: %{x}",
            "Avg Price: %{y}",
            "NUM_SALES: %{customdata[1]}",
        ]
    )
)
fig.show()