# Utilize data from Zillow and combine that with Social Vulnerability (SVI) index
Data available through CDC to demonstrate relationship between social
vulnerability and their impact on, home sale prices, days between on-market to
pending, price increases etc.

 - Zillow data is available at: https://www.zillow.com/research/data/
 - SVI data is available at:
https://www.atsdr.cdc.gov/placeandhealth/svi/data_documentation_download.html
 - In this example, data sets may use different geo-identifiers so you may have to
cross walk tables provided by HUD or come up with your own mechanism to
harmonize the different geographical divisions.
(https://www.huduser.gov/portal/datasets/usps_crosswalk.html)

# Plan to Obtain Zillow Days on Market Data and HUD's Vulnerability Index

## Step 1: Create geocoding tables
1. **Secure Gazzeteer files**
    - Since Zillow uses MSA and Zipcodes for publishing their data and CDC uses FIPS code (county level), it is critical that we have a geospatial baseline to correlate the two
2. **Latitude and Longitude**
    - While there are files that provide county FIPS codes (as GeoIDs), the grains are NOT consistent across these two datasets we will adopt Lat and Lon codes for every MSA and CDC counties to tether the two data layers
3. **S2 Codes**
    - The latitude and longitudes of the two datasets, being floating point numbers, leave plenty of theta join conditions to marry the two. Its non-performant and prone to gaps. We will therefore use s2sphere ("locality preserving" space filling curve index which has similar 1D indices for geolocations that are closer to each other on 2D earth) to quickly locate fuzzy proximate matching between two layer coordinates
    - We will stamp in s2codes and marry the two Zillow zipcodes and CDC FIPS codes using lat/lon. We will merge these using `pandas.merge_asof` operation.

## Step 2: Obtain Zillow Days on Market Data
1. **Identify Data Source**: 
    - Zillow provides various datasets, including Days on Market (DoM) data.
    - The data can be accessed from Zillow's research data page: [Zillow Research Data](https://www.zillow.com/research/data/).

2. **Download Data**:
    - Download the DoM data from Zillow's website.
    - Ensure the data is in a suitable format (e.g., CSV).

3. **Load Data into Jupyter Notebook**:
    - Use `pandas` to read the CSV file into a DataFrame.
    - Perform initial data cleaning and preprocessing if necessary.

4. **Save Data Locally**:
    - Save the cleaned data to a local file for future use.

## Step 3: Obtain HUD's Vulnerability Index
1. **Identify Data Source**:
    - The Social Vulnerability Index (SVI) data is available through the CDC.
    - The data can be accessed from the CDC's SVI page: [CDC SVI Data](https://www.atsdr.cdc.gov/placeandhealth/svi/data_documentation_download.html).

2. **Download Data**:
    - Download the SVI data from the CDC's website.
    - Ensure the data is in a suitable format (e.g., CSV).

3. **Load Data into Jupyter Notebook**:
    - Use `pandas` to read the CSV file into a DataFrame.
    - Perform initial data cleaning and preprocessing if necessary.

4. **Save Data Locally**:
    - Save the cleaned data to a local file for future use.

## Step 3: Data Harmonization
1. **Geographical Crosswalk**:
    - Use geographical crosswalk tables (reference tables built in first step) provided by HUD to harmonize different geographical divisions.
    - Ensure that the Zillow and SVI data can be joined on common s2spehere cell IDs using "fuzzy" `pd.merge_asof` logic.

2. **Data Integration**:
    - Integrate the Zillow DoM data with the SVI data.
    - Ensure that the integrated dataset is clean and ready for analysis.
    - Persist this data for analysis in a separate notebook.

# Spark Init

In [1]:
import ast
import atexit
import io
import itertools
import json
import os
import re
import sys
import uuid
from datetime import datetime
import pandas as pd
from IPython.display import HTML, display

if "spark" not in vars():
    import findspark
    findspark.init()
    import pyspark.sql.functions as F
    import pyspark.sql.types as T
    from pyspark.sql import DataFrame, SparkSession
    from pyspark.sql.types import (
        FloatType,
        IntegerType,
        StringType,
        StructField,
        StructType,
    )
    from pyspark.sql.window import Window
    from pyspark.storagelevel import StorageLevel

    spark = (
        SparkSession.builder.master("local[8,2]")
        .config("spark.driver.memory", "1g")
        .enableHiveSupport()
        .getOrCreate()
    )
    atexit.register(lambda: spark.stop())

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import DataFrame, Row, SparkSession
from pyspark.sql.window import Window


def cleanse_val(val):
    return re.sub(r"([^\s\w\d])+", "", val.lower()).strip() if val else ""


def reg(spark_df, name=None):
    uniqsig = "df_{0}".format(cleanse_val(str(uuid.uuid4()))) if not name else name
    spark_df.createOrReplaceTempView(uniqsig)
    return uniqsig


def show(df, rows=5):
    display(df.limit(rows).toPandas())


# Override table show/registration functions
DataFrame.reg = reg
DataFrame.dshow = show

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/08 09:41:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Step 1. Geocoding Data from Census Gezzetter files

This is the source that first maps county level FIPS to lat/lon info.

In [2]:
zip_county_geocode = (
    pd.read_csv(
        "data/2024_Gaz_tracts_national.txt",
        sep=r"\s+",
        usecols=["USPS", "GEOID", "INTPTLAT", "INTPTLONG"],
        dtype=str,
    )
    .fillna("")
    .astype(str)
)
zip_county_geocode_df = spark.createDataFrame(zip_county_geocode).cache()
zip_county_geocode_df = (
    zip_county_geocode_df.groupBy(
        F.col("USPS").alias("State"),
        F.expr('substr(lpad(GEOID, 11, "0"), 0, 5)').alias("GeoID"),
    )
    .agg(F.mean("INTPTLAT").alias("Latitude"), F.mean("INTPTLONG").alias("Longitude"))
    .cache()
)
zip_county_geocode_df.dshow()

  if should_localize and is_datetime64tz_dtype(s.dtype) and s.dt.tz is not None:
                                                                                

Unnamed: 0,State,GeoID,Latitude,Longitude
0,AK,2100,59.098404,-135.575791
1,CA,6059,33.730653,-117.860925
2,GA,13095,31.564073,-84.163457
3,ID,16017,48.26341,-116.62218
4,IL,17125,40.259177,-89.900969


## S2 Cell Mapping Logic

The S2 geometry library is used to map latitude and longitude coordinates to S2 cells. S2 cells are hierarchical, equal-area cells that cover the surface of a sphere. This allows for efficient spatial indexing and querying.

### Function Description

The function `latlon_to_s2id` is a pyspark UDF that takes latitude and longitude coordinates and maps them to S2 cell ID. A 20-character long number. LongLongType in `C++`. To shim it in Python, we will use `string` serde format.

### Steps
1. **Latitude and Longitude Validation**:
    - The function first checks if the latitude and longitude are valid floating-point numbers within their respective ranges (-90 to 90 for latitude and -180 to 180 for longitude).

2. **Conversion to S2 Cell ID**:
    - If the coordinates are valid, they are converted to an S2 cell ID using the `s2sphere` library. This involves creating an `LatLng` object from the latitude and longitude, and then converting it to an S2 cell ID.

3. **Return S2 Cell ID**:
    - The function returns the S2 cell ID. If the input coordinates are invalid, it returns `None`.

## Step 2. Geocoding Data from HUD crosswalk files

This is the source that first city level zip to lat/lon info.

In [3]:
# Data is at https://www.huduser.gov/apps/public/uspscrosswalk/home
county_zip_df = spark.createDataFrame(
    pd.read_excel(
        "data/COUNTY_SUB_ZIP_062024.xlsx",
        usecols=["COUNTY_SUB", "ZIP", "USPS_ZIP_PREF_CITY", "USPS_ZIP_PREF_STATE"],
        dtype=str,
    )
    .fillna("")
    .astype(str)
)
county_zip_df = county_zip_df.select(
    F.expr('lpad(COUNTY_SUB, 10, "0")').alias("GeoID"),
    F.expr('substr(lpad(COUNTY_SUB, 10, "0"), 0, 5)').alias("CountyFIPS"),
    F.col("ZIP"),
    F.col("USPS_ZIP_PREF_CITY").alias("City"),
    F.col("USPS_ZIP_PREF_STATE").alias("State"),
)

from s2sphere import CellId, LatLng


# Function to convert latitude and longitude to S2 cell ID
def latlon_to_s2id(lat, lon):
    # Check if lat and lon are floats and within valid ranges
    if isinstance(lat, (int, float)) and isinstance(lon, (int, float)):
        if -90 <= lat <= 90 and -180 <= lon <= 180:
            latlng = LatLng.from_degrees(lat, lon)
            cell = CellId.from_lat_lng(latlng)
            return cell.id()
    # Return None if the inputs are invalid
    return None


# Map S2 Code for Geo geometry
geocodes_df = (
    spark.sql(
        f"""select a.*, b.Latitude, b.Longitude from {county_zip_df.reg()} a
    left join {zip_county_geocode_df.reg()} b
    on a.CountyFIPS=b.GeoID and a.State=b.State"""
    )
    .drop_duplicates(["ZIP"])
    .withColumn(
        "S2Id",
        F.lpad(F.udf(latlon_to_s2id, T.StringType())("Latitude", "Longitude"), 20, "0"),
    )
).cache()

geocodes_df.dshow()

  if should_localize and is_datetime64tz_dtype(s.dtype) and s.dt.tz is not None:
                                                                                

Unnamed: 0,GeoID,CountyFIPS,ZIP,City,State,Latitude,Longitude,S2Id
0,2501730700,25017,2053,MEDWAY,MA,42.44269,-71.246332,9935956960593095687
1,2502182315,25021,2070,SHELDONVILLE,MA,42.204774,-71.143206,9936209396979585163
2,2502116495,25021,2090,WESTWOOD,MA,42.204774,-71.143206,9936209396979585163
3,3301104900,33011,3442,BENNINGTON,NH,42.881262,-71.548593,9935984454483744731
4,2303137270,23031,3904,KITTERY,ME,43.427069,-70.6225,5526674384555133445


## Read GeoID to Zipcode Mapping

The data from Zillow is by GeoID (for metrics) and Zipcode (as primary key). Its unfortunate that they have two level referential mapping: Zillow maps housing metrics to MSA in one file. In a separate file, without a consistent MSA *"ID"*, they list zipcodes and normalized string attributes for MSA in another file. So we have to download both referential files and map the geocodes for their metrics.

 - 1. First file gather metrics (in real numbers) by MSA city, state, county (in string form)
 - 2. Second, we gather the zip code to MSA string attributes
 - 3. Third, we crosswalk MSA metrics to zipcode

In [4]:
import pandas as pd

zillow_metro_map_file = "data/zillow_metro_ref_map.csv"
zillow_metro_missing_map_file = "data/Missing_Zips.csv"
zillow_metro_map_file_exists = os.path.exists(
    zillow_metro_map_file
)  # Does the mapping file aready exist?

# Read from Internet if mapping from Metro to Geo data is NOT already present
zillow_home_values_zip_ref = (
    pd.concat(
        [
            pd.read_csv(
                "https://files.zillowstatic.com/research/public_csvs/zhvf_growth/Zip_zhvf_growth_uc_sfrcondo_tier_0.33_0.67_month.csv?t=1730940305",
                usecols=[
                    "RegionID",
                    "SizeRank",
                    "RegionName",
                    "RegionType",
                    "StateName",
                    "State",
                    "City",
                    "Metro",
                    "CountyName",
                ],
                dtype=str,
            )
            .fillna("")
            .astype(str),
            pd.read_csv(zillow_metro_missing_map_file, dtype=str)
            .fillna("")
            .astype(str),
        ],
        ignore_index=True,
    ).reset_index()
    if not zillow_metro_map_file_exists
    else pd.read_csv(zillow_metro_map_file, dtype=str).fillna("").astype(str)
)

if not zillow_metro_map_file_exists:
    zillow_home_values_zip_ref["City"] = zillow_home_values_zip_ref["City"].apply(
        lambda x: x.strip().upper() if x else ""
    )
    zillow_home_values_zip_ref["County"] = (
        zillow_home_values_zip_ref["CountyName"]
        .str.replace(r" COUNTY$", "", case=False, regex=True)
        .apply(lambda x: x.strip().upper())
    )

    zillow_home_values_zip_ref.to_csv(
        zillow_metro_map_file,
        index=False,
    )  # Save for later
    zillow_metro_map_file_exists = os.path.exists(zillow_metro_map_file)

display(zillow_home_values_zip_ref.head())

Unnamed: 0,index,RegionID,SizeRank,RegionName,RegionType,StateName,State,City,Metro,CountyName,County
0,0,91982,1,77494,zip,TX,TX,KATY,"Houston-The Woodlands-Sugar Land, TX",Fort Bend County,FORT BEND
1,1,61148,2,8701,zip,NJ,NJ,LAKEWOOD,"New York-Newark-Jersey City, NY-NJ-PA",Ocean County,OCEAN
2,2,91940,3,77449,zip,TX,TX,KATY,"Houston-The Woodlands-Sugar Land, TX",Harris County,HARRIS
3,3,62080,4,11368,zip,NY,NY,NEW YORK,"New York-Newark-Jersey City, NY-NJ-PA",Queens County,QUEENS
4,4,91733,5,77084,zip,TX,TX,HOUSTON,"Houston-The Woodlands-Sugar Land, TX",Harris County,HARRIS


## Zillow Step 2. Days on Market values
Get data from Zillow for DoM and persist locally

In [5]:
import re

zillow_metro_dom_file = "data/zillow_dom_metro_data.csv"
zillow_metro_dom_file_exists = os.path.exists(
    zillow_metro_dom_file
)  # Does the days on market file aready exist?

# Read from Internet if mapping from Metro to Geo data is NOT already present
zillow_dom_data_timeseries = (
    pd.read_csv(
        "https://files.zillowstatic.com/research/public_csvs/mean_doz_pending/Metro_mean_doz_pending_uc_sfrcondo_sm_week.csv?t=1730940305",
        dtype=str,
    )
    .fillna("")
    .astype(str)
    if not zillow_metro_dom_file_exists
    else pd.read_csv(zillow_metro_dom_file, dtype=str).fillna("").astype(str)
)

latest_date = sorted(
    [col for col in zillow_dom_data_timeseries.columns if col[:2] == "20"], reverse=True
)[0]
zillow_dom_data_timeseries = zillow_dom_data_timeseries[
    [col for col in zillow_dom_data_timeseries.columns if col[:2] != "20"]
    + [latest_date]
]
if not zillow_metro_dom_file_exists:
    zillow_dom_data_timeseries = zillow_dom_data_timeseries[
        zillow_dom_data_timeseries.RegionType.str.startswith("msa")
    ]
    zillow_dom_data_timeseries["City"] = zillow_dom_data_timeseries.apply(
        lambda x: re.sub(f", {x.StateName}$", "", str(x.RegionName)).strip(), axis=1
    ).apply(lambda x: str(x).strip().upper() if x else "")
    zillow_dom_data_timeseries.to_csv(
        zillow_metro_dom_file,
        index=False,
    )  # Save for later
    zillow_metro_dom_file_exists = os.path.exists(zillow_metro_dom_file)

display(zillow_dom_data_timeseries)
display(latest_date)

Unnamed: 0,RegionID,SizeRank,RegionName,RegionType,StateName,2024-10-05,City
1,394913,1,"New York, NY",msa,NY,58.0,NEW YORK
2,753899,2,"Los Angeles, CA",msa,CA,39.0,LOS ANGELES
3,394463,3,"Chicago, IL",msa,IL,32.0,CHICAGO
4,394514,4,"Dallas, TX",msa,TX,55.0,DALLAS
5,394692,5,"Houston, TX",msa,TX,61.0,HOUSTON
...,...,...,...,...,...,...,...
334,394472,492,"Clearlake, CA",msa,CA,86.0,CLEARLAKE
335,394759,519,"Laconia, NH",msa,NH,39.0,LACONIA
336,845162,535,"Granbury, TX",msa,TX,72.0,GRANBURY
337,753905,604,"Newport, OR",msa,OR,99.0,NEWPORT


'2024-10-05'

## Zillow Step 3. Map Region IDs to see Zip Info

Process in Pyspark just to handle large joins.

In [6]:
zillow_dom_data_timeseries_df = spark.createDataFrame(
    zillow_dom_data_timeseries
).cache()
zillow_home_values_zip_ref_df = spark.createDataFrame(
    zillow_home_values_zip_ref
).cache()
zillow_dom_data_timeseries_df.orderBy('RegionID').dshow()
zillow_home_values_zip_ref_df.orderBy('RegionID').dshow()

  if should_localize and is_datetime64tz_dtype(s.dtype) and s.dt.tz is not None:


Unnamed: 0,RegionID,SizeRank,RegionName,RegionType,StateName,2024-10-05,City
0,394298,473,"Aberdeen, WA",msa,WA,51.0,ABERDEEN
1,394299,251,"Abilene, TX",msa,TX,60.0,ABILENE
2,394304,83,"Akron, OH",msa,OH,24.0,AKRON
3,394306,295,"Albany, GA",msa,GA,65.0,ALBANY
4,394307,330,"Albany, OR",msa,OR,39.0,ALBANY


Unnamed: 0,index,RegionID,SizeRank,RegionName,RegionType,StateName,State,City,Metro,CountyName,County
0,30959,-1,1,3756,zip,NH,NH,LEBANON,Lebanon-NH,Grafton County,GRAFTON
1,8021,100000,8306,98848,zip,WA,WA,QUINCY,"Moses Lake, WA",Grant County,GRANT
2,17875,100002,22198,98850,zip,WA,WA,ROCK ISLAND,"Wenatchee, WA",Douglas County,DOUGLAS
3,10719,100003,11596,98851,zip,WA,WA,SOAP LAKE,"Moses Lake, WA",Grant County,GRANT
4,20101,100004,26355,98852,zip,WA,WA,STEHEKIN,"Wenatchee, WA",Chelan County,CHELAN


## Zillow Step 3. Ensure MSA as Zipcode

Crosswalk and add Zipcodes to MSA Info. Ensure no duplicates and fuzzy search based on City Name and Code.

In [7]:
# Fuzzy join on City name and Deduplicate/Group; Collect Mode of County and Zipcode
data_cols = ", ".join([f"""`{col}`""" for col in zillow_dom_data_timeseries_df.columns])
zillow_zip_referenced = spark.sql(
    f"""select z.*, Latitude, Longitude, substr(lpad(GeoID, 10, "0"), 0, 10) GeoID, CountyFIPS, c.S2Id from
        (select {data_cols}, cast(mode(ZipCode) as int) ZipCode, mode(County) County from 
            (select a.*, b.ZipCode, b.County from {zillow_dom_data_timeseries_df.reg()} a
                left join 
                    (select *, substr(lpad(RegionName, 5, "0"), 0, 5) as ZipCode from {zillow_home_values_zip_ref_df.reg()}) b
                    on 
                    (a.City=b.City or instr(a.City, b.City) >=1 or instr(b.City, a.City) >=1) and 
                    a.StateName=b.StateName and b.RegionName is not null) x
        group by {data_cols}) z
    left join {geocodes_df.reg()} c on z.ZipCode=c.ZIP"""
)
zillow_zip_referenced.orderBy("RegionID").dshow()

                                                                                

Unnamed: 0,RegionID,SizeRank,RegionName,RegionType,StateName,2024-10-05,City,ZipCode,County,Latitude,Longitude,GeoID,CountyFIPS,S2Id
0,394298,473,"Aberdeen, WA",msa,WA,51.0,ABERDEEN,99170,PIERCE,47.678393,-117.371865,5306390056,53063,6097338212157320367
1,394299,251,"Abilene, TX",msa,TX,60.0,ABILENE,76311,TAYLOR,33.92034,-98.546437,4848594215,48485,9679116401234704717
2,394304,83,"Akron, OH",msa,OH,24.0,AKRON,45362,SUMMIT,40.11659,-84.612424,3903701294,39037,9817824436034892991
3,394306,295,"Albany, GA",msa,GA,65.0,ALBANY,31707,DOUGHERTY,31.564073,-84.163457,1309593312,13095,9868083232745789563
4,394307,330,"Albany, OR",msa,OR,39.0,ALBANY,97350,CLATSOP,44.555663,-122.930637,4104391904,41043,6107003858569160703


## Zillow Step 4. Check all MSAs are Mapped

In [8]:
assert(zillow_zip_referenced.filter("ZipCode is null").count() == 0)

# Download Social Vulnerability Index Data from CDC

In [9]:
import re

svi_data_file = "data/svi_data_2022.csv"
svi_data_file_exists = os.path.exists(svi_data_file)  # Does the SVI file aready exist?

# Read from Internet if mapping from Metro to Geo data is NOT already present
svi_data = (
    pd.read_csv(
        "https://svi.cdc.gov/Documents/Data/2022/csv/states/SVI_2022_US.csv",
        dtype=str,
    )
    .fillna("")
    .astype(str)
    if not svi_data_file_exists
    else pd.read_csv(svi_data_file, dtype=str).fillna("").astype(str)
)

svi_data = svi_data[
    [
        col
        for col in svi_data.columns
        if col[:3] not in ("MP_", "EP_") and col[:2] not in ("M_")
    ]
]

# Comprehensive rename dictionary based on the SVI documentation and image categories
rename_dict = {
    # Basic Information
    "LOCATION": "Location Description",
    "AREA_SQMI": "Area in Square Miles",
    # Population and Housing Estimates
    "E_TOTPOP": "Estimated Total Population",
    "M_TOTPOP": "Margin of Error for Total Population",
    "E_HU": "Estimated Housing Units",
    "M_HU": "Margin of Error for Housing Units",
    "E_HH": "Estimated Households",
    "M_HH": "Margin of Error for Households",
    # Socioeconomic Status (Theme 1)
    "EP_POV150": "Percentage Below 150% Poverty",
    "MP_POV150": "Margin of Error for Percentage Below 150% Poverty",
    "EP_UNEMP": "Unemployment Rate",
    "MP_UNEMP": "Margin of Error for Unemployment Rate",
    "EP_HBURD": "Percentage with Housing Cost Burden",
    "MP_HBURD": "Margin of Error for Housing Cost Burden",
    "EP_NOHSDP": "Percentage with No High School Diploma",
    "MP_NOHSDP": "Margin of Error for No High School Diploma",
    "EP_UNINSUR": "Percentage Uninsured",
    "MP_UNINSUR": "Margin of Error for Uninsured",
    # Household Characteristics (Theme 2)
    "EP_AGE65": "Percentage Aged 65 & Older",
    "MP_AGE65": "Margin of Error for Aged 65 & Older",
    "EP_AGE17": "Percentage Aged 17 & Younger",
    "MP_AGE17": "Margin of Error for Aged 17 & Younger",
    "EP_DISABL": "Percentage with a Disability",
    "MP_DISABL": "Margin of Error for Disability",
    "EP_SNGPNT": "Percentage Single-Parent Households",
    "MP_SNGPNT": "Margin of Error for Single-Parent Households",
    "EP_LIMENG": "Percentage with Limited English Proficiency",
    "MP_LIMENG": "Margin of Error for Limited English Proficiency",
    # Racial & Ethnic Minority Status (Theme 3)
    "EP_MINRTY": "Percentage Minority",
    "MP_MINRTY": "Margin of Error for Minority Status",
    "EP_HISP": "Percentage Hispanic or Latino",
    "MP_HISP": "Margin of Error for Hispanic or Latino",
    "EP_AFAM": "Percentage Black or African American, Not Hispanic",
    "MP_AFAM": "Margin of Error for Black or African American, Not Hispanic",
    "EP_ASIAN": "Percentage Asian, Not Hispanic",
    "MP_ASIAN": "Margin of Error for Asian, Not Hispanic",
    "EP_AIAN": "Percentage American Indian/Alaska Native, Not Hispanic",
    "MP_AIAN": "Margin of Error for American Indian/Alaska Native, Not Hispanic",
    "EP_NHPI": "Percentage Native Hawaiian/Pacific Islander, Not Hispanic",
    "MP_NHPI": "Margin of Error for Native Hawaiian/Pacific Islander, Not Hispanic",
    "EP_TWOMORE": "Percentage Two or More Races, Not Hispanic",
    "MP_TWOMORE": "Margin of Error for Two or More Races, Not Hispanic",
    "EP_OTHERRACE": "Percentage Other Race, Not Hispanic",
    "MP_OTHERRACE": "Margin of Error for Other Race, Not Hispanic",
    # Raw Estimates for Racial & Ethnic Groups
    "E_AFAM": "Estimated Count of Black or African American, Not Hispanic",
    "E_HISP": "Estimated Count of Hispanic or Latino (of any race)",
    "E_ASIAN": "Estimated Count of Asian, Not Hispanic",
    "E_AIAN": "Estimated Count of American Indian or Alaska Native, Not Hispanic",
    "E_NHPI": "Estimated Count of Native Hawaiian or Other Pacific Islander, Not Hispanic",
    "E_TWOMORE": "Estimated Count of Two or More Races, Not Hispanic",
    "E_OTHERRACE": "Estimated Count of Other Race, Not Hispanic",
    # Housing Type & Transportation (Theme 4)
    "EP_MUNIT": "Percentage in Multi-Unit Structures",
    "MP_MUNIT": "Margin of Error for Multi-Unit Structures",
    "EP_MOBILE": "Percentage Mobile Homes",
    "MP_MOBILE": "Margin of Error for Mobile Homes",
    "EP_CROWD": "Percentage Crowded Housing",
    "MP_CROWD": "Margin of Error for Crowded Housing",
    "EP_NOVEH": "Percentage with No Vehicle",
    "MP_NOVEH": "Margin of Error for No Vehicle",
    "EP_GROUPQ": "Percentage in Group Quarters",
    "MP_GROUPQ": "Margin of Error for Group Quarters",
    # Percentile Rankings for Each Theme
    "RPL_THEME1": "Rank for Socioeconomic Status",
    "RPL_THEME2": "Rank for Household Characteristics",
    "RPL_THEME3": "Rank for Racial & Ethnic Minority Status",
    "RPL_THEME4": "Rank for Housing Type & Transportation",
    "RPL_THEMES": "Overall Social Vulnerability Rank",
    # Flag Indicators
    "F_POV150": "Flag for Below 150% Poverty",
    "F_UNEMP": "Flag for Unemployment",
    "F_HBURD": "Flag for Housing Cost Burden",
    "F_NOHSDP": "Flag for No High School Diploma",
    "F_UNINSUR": "Flag for Uninsured",
    "F_THEME1": "Sum of Flags for Socioeconomic Status",
    "F_AGE65": "Flag for Aged 65 & Older",
    "F_AGE17": "Flag for Aged 17 & Younger",
    "F_DISABL": "Flag for Disability",
    "F_SNGPNT": "Flag for Single-Parent Households",
    "F_LIMENG": "Flag for Limited English",
    "F_THEME2": "Sum of Flags for Household Characteristics",
    "F_MINRTY": "Flag for Minority Status",
    "F_THEME3": "Sum of Flags for Racial & Ethnic Minority Status",
    "F_MUNIT": "Flag for Multi-Unit Housing",
    "F_MOBILE": "Flag for Mobile Homes",
    "F_CROWD": "Flag for Crowding",
    "F_NOVEH": "Flag for No Vehicle",
    "F_GROUPQ": "Flag for Group Quarters",
    "F_THEME4": "Sum of Flags for Housing Type & Transportation",
    "F_TOTAL": "Sum of All Flags",
    # Adjunct Variables (for context)
    "E_DAYPOP": "Estimated Daytime Population",
    "E_NOINT": "Households without Internet",
    "M_NOINT": "Margin of Error for Households without Internet",
}

# Apply the renaming to the DataFrame
# svi_data.rename(columns=rename_dict, inplace=True)

if not svi_data_file_exists:
    svi_data.to_csv(
        svi_data_file,
        index=False,
    )  # Save for later
    svi_data_file_exists = os.path.exists(svi_data_file)

# Stamp the DataFrame with Latitude, Longitude and S2Id from the geocodes DataFrame
svi_data_df = (
    spark.sql(
        f"""select a.*, c.Latitude, c.Longitude, substr(lpad(GeoID, 10, "0"), 0, 10) GeoID, CountyFIPS, ZIP, S2Id
    from {spark.createDataFrame(svi_data).withColumn('rowId', F.monotonically_increasing_id()).reg()} a
    left join {geocodes_df.reg()} c
    on substr(lpad(a.FIPS, 10, "0"), 0, 10)=substr(lpad(c.GeoID, 10, "0"), 0, 10) or (a.STCNTY=c.CountyFIPS and a.ST_ABBR=c.State)"""
    )
    .drop_duplicates(["rowId"])
    .cache()
)
svi_data_df.dshow()

  if should_localize and is_datetime64tz_dtype(s.dtype) and s.dt.tz is not None:
24/11/08 09:43:21 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
24/11/08 09:43:21 WARN TaskSetManager: Stage 34 contains a task of very large size (4670 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

Unnamed: 0,ST,STATE,ST_ABBR,STCNTY,COUNTY,FIPS,LOCATION,AREA_SQMI,E_TOTPOP,E_HU,...,E_NHPI,E_TWOMORE,E_OTHERRACE,rowId,Latitude,Longitude,GeoID,CountyFIPS,ZIP,S2Id
0,1,Alabama,AL,1003,Baldwin County,1003010708,Census Tract 107.08; Baldwin County; Alabama,31.471145448452,11526,4313,...,0,477,49,26,30.521592,-87.749136,100390846,1003,36526,9843299529118305827
1,1,Alabama,AL,1003,Baldwin County,1003010711,Census Tract 107.11; Baldwin County; Alabama,1.370962487356,3890,1925,...,0,182,0,29,30.521592,-87.749136,100390846,1003,36526,9843299529118305827
2,1,Alabama,AL,1073,Jefferson County,1073000500,Census Tract 5; Jefferson County; Alabama,1.807211029014,2995,1540,...,0,0,0,474,33.517576,-86.817855,107392385,1073,35444,9838425136801829203
3,1,Alabama,AL,1097,Mobile County,1097003100,Census Tract 31; Mobile County; Alabama,2.380561302056,4711,1973,...,0,338,0,964,30.681075,-88.156308,109792187,1097,36689,9843264900684253535
4,4,Arizona,AZ,4005,Coconino County,4005000700,Census Tract 7; Coconino County; Arizona,2.3890833454,4667,1893,...,86,345,8,1677,35.515056,-111.63734,400591198,4005,85931,9741971041521842387


# Exporting Geomapped CDC and Zillow Data

This section of the code is responsible for exporting and persisting crosswalked files as CSVs.
Saving these files is crucial for future analysis as it allows us to maintain a record of the data transformations and mappings that have been applied. This ensures that we can trace back the steps taken during data processing, verify the accuracy of our transformations, and reproduce results if needed. Additionally, having these files saved facilitates further analysis and reporting, enabling us to build upon the existing data without reprocessing from scratch.


In [10]:
svi_data_df.toPandas().to_csv(
    "vulnerability_index.csv", index=False
)
zillow_zip_referenced.withColumnRenamed(latest_date, "DOM").toPandas().to_csv(
    "zillow_dom.csv", index=False
)

                                                                                

# Normalization and Scaling

Normalizing and standardizing the vulnerability indices correctly is essential for meaningful comparisons, especially when dealing with diverse regions with varying population sizes, geographic areas, and socioeconomic conditions. Here’s how to approach this:

### Step 1: Normalize Indices
First, adjust indices to make them population- or area-weighted as appropriate. This ensures that metrics like income, racial mix, or housing types account for regional population density or area. Common approaches include:
   - **Population-normalized values**: Convert counts to percentages (e.g., percentage of population below the poverty line).
   - **Area-normalized values**: Calculate values per unit area if certain indices depend on geography (e.g., housing density).

### Step 2: Standardize to Z-Scores (Unit Normal Scale)
Standardizing the indices helps you compare across metros with inherently different scales. This can be achieved using **z-scores**, which center the values around a mean of 0 and standard deviation of 1.

### Step 3: Standardize Zillow Metrics
Since Zillow metrics like home prices may vary widely across metros, it could also be beneficial to standardize these values. This ensures that analyses are focused on relative trends rather than absolute values.

### Step 4: Merge and Analyze
With standardized values, you can now merge the Zillow and SVI data, making meaningful comparisons between indices and housing metrics across regions.

In [11]:
import pandas as pd
from sklearn.preprocessing import StandardScaler

# Load Zillow and SVI data as strings to prevent unwanted type conversions
zillow_data = pd.read_csv("zillow_dom.csv", dtype=str)
svi_data = pd.read_csv("vulnerability_index.csv", dtype=str)

# Drop rows from SVI data where population is zero (likely unusable data points)
svi_data = svi_data.loc[svi_data["E_TOTPOP"] != 0]

# Drop rows from Zillow data where 'DOM' (Days on Market) is null
zillow_data = zillow_data.loc[~zillow_data["DOM"].isna()]

# Convert 'DOM' column to float for further numeric operations
zillow_data['DENSITY_DOM'] = zillow_data['DOM'].astype(float)

# Define key columns for normalization calculations
population_col = "E_TOTPOP"       # Total population
area_col = "AREA_SQMI"            # Area in square miles
housing_units_col = "E_HU"        # Housing units count

# Calculate population density as people per square mile and add to SVI data
svi_data["POP_DENSITY"] = svi_data[population_col].astype(float) / svi_data[area_col].astype(float)

# Normalize key SVI metrics by population density to get density-based attributes
# Each calculation creates a new column in SVI data, e.g., 'DENSITY_POV150' for poverty density
svi_data["DENSITY_POV150"] = svi_data["E_POV150"].astype(float) / svi_data["POP_DENSITY"]
svi_data["DENSITY_UNEMP"] = svi_data["E_UNEMP"].astype(float) / svi_data["POP_DENSITY"]
svi_data["DENSITY_NOHSDP"] = svi_data["E_NOHSDP"].astype(float) / svi_data["POP_DENSITY"]
svi_data["DENSITY_UNINSUR"] = svi_data["E_UNINSUR"].astype(float) / svi_data["POP_DENSITY"]
svi_data["DENSITY_AGE65"] = svi_data["E_AGE65"].astype(float) / svi_data["POP_DENSITY"]
svi_data["DENSITY_AGE17"] = svi_data["E_AGE17"].astype(float) / svi_data["POP_DENSITY"]
svi_data["DENSITY_DISABL"] = svi_data["E_DISABL"].astype(float) / svi_data["POP_DENSITY"]
svi_data["DENSITY_SNGPNT"] = svi_data["E_SNGPNT"].astype(float) / svi_data["POP_DENSITY"]
svi_data["DENSITY_LIMENG"] = svi_data["E_LIMENG"].astype(float) / svi_data["POP_DENSITY"]
svi_data["DENSITY_MINRTY"] = svi_data["E_MINRTY"].astype(float) / svi_data["POP_DENSITY"]
svi_data["DENSITY_MUNIT"] = svi_data["E_MUNIT"].astype(float) / svi_data["POP_DENSITY"]
svi_data["DENSITY_MOBILE"] = svi_data["E_MOBILE"].astype(float) / svi_data["POP_DENSITY"]
svi_data["DENSITY_CROWD"] = svi_data["E_CROWD"].astype(float) / svi_data["POP_DENSITY"]
svi_data["DENSITY_NOVEH"] = svi_data["E_NOVEH"].astype(float) / svi_data["POP_DENSITY"]
svi_data["DENSITY_GROUPQ"] = svi_data["E_GROUPQ"].astype(float) / svi_data["POP_DENSITY"]

# List of SVI columns to be standardized later in the code
svi_columns_to_standardize = [
    "DENSITY_POV150", "DENSITY_UNEMP", "DENSITY_NOHSDP", "DENSITY_UNINSUR", 
    "DENSITY_AGE65", "DENSITY_AGE17", "DENSITY_DISABL", "DENSITY_SNGPNT", 
    "DENSITY_LIMENG", "DENSITY_MINRTY", "DENSITY_MUNIT", "DENSITY_MOBILE", 
    "DENSITY_CROWD", "DENSITY_NOVEH", "DENSITY_GROUPQ",
]

# Zillow columns to standardize, which currently includes only 'DENSITY_DOM'
zillow_columns_to_standardize = ["DENSITY_DOM"]

# Remove rows with null or invalid S2 IDs (spatial ID) from both datasets
zillow_data = zillow_data.dropna(subset=["S2Id"]).sort_values(by="S2Id")
svi_data = svi_data.dropna(subset=["S2Id"]).sort_values(by="S2Id")

# Convert S2Id to integer (assuming S2Id needs truncation and standardization)
zillow_data["S2Id"] = zillow_data["S2Id"].apply(lambda x: int(x[:16]) if x else None).astype(int)
svi_data["S2Id"] = svi_data["S2Id"].apply(lambda x: int(x[:16]) if x else None).astype(int)

# Perform an approximate merge on S2 IDs to match the nearest S2 ID between Zillow and SVI data
# This uses a "nearest" join on sorted S2 IDs to allow approximate spatial matching
merged_data = pd.merge_asof(
    zillow_data, svi_data, on="S2Id", direction="nearest", suffixes=("_zillow", "_svi")
)

# Standardize SVI and Zillow columns based on density (z-score normalization)
# This ensures each density column has a mean of 0 and standard deviation of 1 for consistent scaling
scaler = StandardScaler()
merged_data[svi_columns_to_standardize + zillow_columns_to_standardize] = scaler.fit_transform(
    merged_data[svi_columns_to_standardize + zillow_columns_to_standardize]
)

# Display and save the merged data to a CSV file
# The standardized data is saved to 'training_ready_data.csv' for further analysis
display(merged_data)
merged_data.to_csv('training_ready_data.csv', index=False)

Unnamed: 0,RegionID,SizeRank,RegionName,RegionType,StateName,DOM,City,ZipCode,County,Latitude_zillow,...,DENSITY_AGE17,DENSITY_DISABL,DENSITY_SNGPNT,DENSITY_LIMENG,DENSITY_MINRTY,DENSITY_MUNIT,DENSITY_MOBILE,DENSITY_CROWD,DENSITY_NOVEH,DENSITY_GROUPQ
0,394359,288,"Bangor, ME",msa,ME,41.0,BANGOR,4402,PENOBSCOT,44.99074795000001,...,-0.081803,-0.075641,-0.092566,-0.163310,-0.087080,-0.064732,-0.089347,-0.070081,-0.029047,-0.251746
1,394788,367,"Lewiston, ME",msa,ME,18.0,LEWISTON,4050,ANDROSCOGGIN,43.751804898765435,...,-0.090074,-0.096018,-0.092566,-0.178616,-0.086562,-0.073884,-0.101208,-0.082287,-0.075013,-0.172835
2,394997,105,"Portland, ME",msa,ME,29.0,PORTLAND,4101,CUMBERLAND,43.751804898765435,...,-0.090074,-0.096018,-0.092566,-0.178616,-0.086562,-0.073884,-0.101208,-0.082287,-0.075013,-0.172835
3,394353,339,"Augusta, ME",msa,ME,36.0,AUGUSTA,4050,KENNEBEC,43.751804898765435,...,-0.090074,-0.096018,-0.092566,-0.178616,-0.086562,-0.073884,-0.101208,-0.082287,-0.075013,-0.172835
4,394759,519,"Laconia, NH",msa,NH,39.0,LACONIA,3246,BELKNAP,43.52958755555556,...,-0.072527,-0.083714,-0.066159,-0.179198,-0.084591,-0.073884,-0.095713,-0.091870,-0.077453,-0.264472
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
327,394719,328,"Jamestown, NY",msa,NY,38.0,JAMESTOWN,11770,JEFFERSON,40.82983666857143,...,-0.095663,-0.103980,-0.092464,-0.177978,-0.089131,-0.073884,-0.101361,-0.091050,-0.080189,-0.264161
328,845159,86,"Poughkeepsie, NY",msa,NY,57.0,POUGHKEEPSIE,11770,JEFFERSON,40.82983666857143,...,-0.095663,-0.103980,-0.092464,-0.177978,-0.089131,-0.073884,-0.101361,-0.091050,-0.080189,-0.264161
329,394633,331,"Glens Falls, NY",msa,NY,33.0,GLENS FALLS,11770,JEFFERSON,40.82983666857143,...,-0.095663,-0.103980,-0.092464,-0.177978,-0.089131,-0.073884,-0.101361,-0.091050,-0.080189,-0.264161
330,394387,195,"Binghamton, NY",msa,NY,34.0,BINGHAMTON,11770,BROOME,40.82983666857143,...,-0.095663,-0.103980,-0.092464,-0.177978,-0.089131,-0.073884,-0.101361,-0.091050,-0.080189,-0.264161


# Analyze

Analyze the dataset for any Zillow metrics correlations with CDC vulnerability metrics (normalized/standardized) in a separate notebook