# ETL For Zillow Research Data.
---

#### Set up Logging for ETL
----

In [2]:
import logging

# Create a custom logger
logger = logging.getLogger('zillow_etl')

# Set the log level
logger.setLevel(logging.DEBUG)

# Create handlers
c_handler = logging.StreamHandler()
f_handler = logging.FileHandler('etl.log')
c_handler.setLevel(logging.WARNING)
f_handler.setLevel(logging.DEBUG)

# Create formatters and add them to handlers
c_format = logging.Formatter('%(name)s - %(levelname)s - %(message)s')
f_format = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
c_handler.setFormatter(c_format)
f_handler.setFormatter(f_format)

# Add handlers to the logger
logger.addHandler(c_handler)
logger.addHandler(f_handler)

logger.info("Logger for Zillow ETL initialized")


## Extraction
---

#### Data URLs

In [3]:
zhvi_data_url = 'https://files.zillowstatic.com/research/public_csvs/zhvi/Metro_zhvi_uc_sfrcondo_tier_0.33_0.67_sm_sa_month.csv?t=1716516118'
zhvf_data_url = 'https://files.zillowstatic.com/research/public_csvs/zhvf_growth/Metro_zhvf_growth_uc_sfrcondo_tier_0.33_0.67_sm_sa_month.csv?t=1716516118'
rental_data_url = 'https://files.zillowstatic.com/research/public_csvs/zori/Metro_zori_uc_sfrcondomfr_sm_month.csv?t=1716516118'
listings_data_url = 'https://files.zillowstatic.com/research/public_csvs/invt_fs/Metro_invt_fs_uc_sfrcondo_sm_month.csv?t=1717335854'
sales_data_url = 'https://files.zillowstatic.com/research/public_csvs/sales_count_now/Metro_sales_count_now_uc_sfrcondo_month.csv?t=1717335854'
days_market_data_url = 'https://files.zillowstatic.com/research/public_csvs/mean_doz_pending/Metro_mean_doz_pending_uc_sfrcondo_sm_month.csv?t=1717335854'
market_heat_data_url ='https://files.zillowstatic.com/research/public_csvs/market_temp_index/Metro_market_temp_index_uc_sfrcondo_month.csv?t=1717335854'
new_construction_data_url = 'https://files.zillowstatic.com/research/public_csvs/new_con_sales_count_raw/Metro_new_con_sales_count_raw_uc_sfrcondo_month.csv?t=1717335854'

###Data Extraction
---

In [4]:
import pandas as pd
import aiohttp
import asyncio

async def fetch_data(url: str) -> pd.DataFrame:
    logger.info(f"Starting fetch_data from URL: {url}")
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                response.raise_for_status()
                data = await response.text()
                df = pd.read_csv(pd.compat.StringIO(data))
                logger.info("Data fetching complete")
                logger.debug(f"Data fetched: {df.head()}")
                return df
    except Exception as e:
        logger.error("Error during data fetching", exc_info=True)
        raise

#### DataFrames
----

In [5]:
async def fetch_all_data(urls: list) -> list:
    tasks = [fetch_data(url) for url in urls]
    return await asyncio.gather(*tasks)


# List of URLs
urls = [
    zhvi_data_url, zhvi_data_url, rental_data_url, listings_data_url,
    sales_data_url, days_market_data_url, market_heat_data_url, new_construction_data_url
]

# Fetch all data
dataframes = asyncio.run(fetch_all_data(urls))

RuntimeError: asyncio.run() cannot be called from a running event loop

## Transform
---

#### Data Cleaning
---



*   Drop any columns with 50 percent or more missing Data.
*   Any columns with missing data will be imputed.
* Convert date columns as their own seperate columns as 'Date', convert as datetime object data type
* Convert any unecessary float values to int32.



##### Dropping incomplete data with 50 percent or more empty data
----

In [None]:

import pandas as pd

def purge_data(df: pd.DataFrame) -> pd.DataFrame:
    purge_columns = [ c for c in df.columns if df[c].isna().mean()*100 > 50]
    purged_df = df

    for c in purge_columns:
      purged_df = df.drop(c, axis=1)
    return purged_df


In [None]:
purged_zhvi_dataframe = purge_data(zhvi_dataframe)
purged_zhvf_dataframe = purge_data(zhvf_dataframe)
purged_rental_dataframe = purge_data(rental_dataframe)
purged_sales_dataframe = purge_data(sales_dataframe)
purged_days_market_dataframe = purge_data(days_market_dataframe)


#### Filling Missing Data
---

In [None]:
import numpy as np
from sklearn.impute import SimpleImputer

def impute_numerical_data(df: pd.DataFrame, strategy: str) -> pd.DataFrame:
    logger.info(f"Starting impute_numerical_data with strategy: {strategy}")
    try:
        # Separate numerical and categorical columns
        numerical_data = df.select_dtypes(include=np.number)
        categorical_data = df.select_dtypes(exclude=np.number)

        logger.debug(f"Numerical columns: {numerical_data.columns.tolist()}")
        logger.debug(f"Categorical columns: {categorical_data.columns.tolist()}")

        # Initialize SimpleImputer with the given strategy
        imputer = SimpleImputer(missing_values=np.nan, strategy=strategy)

        # Fit and transform the numerical data
        imputed_numerical_data = imputer.fit_transform(numerical_data)

        # Convert the numpy array back to a DataFrame
        imputed_numerical_df = pd.DataFrame(imputed_numerical_data, columns=numerical_data.columns)

        # Concatenate the categorical data and the imputed numerical data
        imputed_df = pd.concat([categorical_data.reset_index(drop=True), imputed_numerical_df], axis=1)

        logger.info("Numerical Imputation complete")
        return imputed_df

    except Exception as e:
        logger.error("Error during imputation", exc_info=True)
        raise


In [None]:
def impute_categorical_data(df: pd.DataFrame, strategy: str) -> pd.DataFrame:
    try:
        logger.info(f"Starting impute_categorical_data with strategy: {strategy}")

        # Separate numerical and categorical columns
        numerical_data = df.select_dtypes(include=np.number)
        categorical_data = df.select_dtypes(exclude=np.number)

        logger.debug(f"Numerical columns: {numerical_data.columns.tolist()}")
        logger.debug(f"Categorical columns: {categorical_data.columns.tolist()}")

        # Initialize SimpleImputer with the given strategy
        simp = SimpleImputer(strategy=strategy)

        # Fit and transform the categorical data
        imp_cat_data = simp.fit_transform(categorical_data)

        # Convert the numpy array back to a DataFrame
        imp_cat_data_df = pd.DataFrame(imp_cat_data, columns=categorical_data.columns)

        # Concatenate the imputed categorical data and the numerical data
        new_df = pd.concat([imp_cat_data_df, numerical_data], axis=1)

        logger.info("Categorical Imputation Complete")
        return new_df

    except Exception as e:
        logger.error("Error during imputation", exc_info=True)
        raise

In [None]:
def preprocess_data(df: pd.DataFrame, cat_strat : str, num_strat: str) -> pd.DataFrame:
  purged_df = purge_data(df)
  num_imp_purged_df = impute_numerical_data(purged_df, num_strat)
  cat_imputed_df = impute_categorical_data(num_imp_purged_df, cat_strat)
  return cat_imputed_df


In [None]:
preprocessed_zhvi_dataframe = preprocess_data(zhvi_dataframe, 'most_frequent', 'median')
preprocessed_zhvf_dataframe = preprocess_data(zhvf_dataframe, 'most_frequent', 'median')
preprocessed_rental_dataframe = preprocess_data(rental_dataframe, 'most_frequent', 'median')
preprocessed_listings_dataframe = preprocess_data(listings_dataframe, 'most_frequent', 'median')
preprocessed_sales_dataframe = preprocess_data(sales_dataframe, 'most_frequent','median')
preprocessed_days_dataframe = preprocess_data(days_market_dataframe,'most_frequent','median')
preprocessed_market_heat_dataframe = preprocess_data(market_heat_dataframe,'most_frequent','median')
preprocessed_new_construction_dataframe = preprocess_data(new_construction_dataframe,'most_frequent','median')

In [None]:
preprocessed_zhvi_dataframe = preprocess_data(zhvi_dataframe, 'most_frequent', 'median')
preprocessed_zhvf_dataframe = preprocess_data(zhvf_dataframe, 'most_frequent', 'median')
preprocessed_rental_dataframe = preprocess_data(rental_dataframe, 'most_frequent', 'median')
preprocessed_listings_dataframe = preprocess_data(listings_dataframe, 'most_frequent', 'median')
preprocessed_sales_dataframe = preprocess_data(sales_dataframe, 'most_frequent','median')
preprocessed_days_dataframe = preprocess_data(days_market_dataframe,'most_frequent','median')
preprocessed_market_heat_dataframe = preprocess_data(market_heat_dataframe,'most_frequent','median')
preprocessed_new_construction_dataframe = preprocess_data(new_construction_dataframe,'most_frequent','median')

#### Transforming date columns
---

In [None]:
def melt_dates(df: pd.DataFrame, valName: str) -> pd.DataFrame:

    id_columns = [c for c in df.columns if '-' not in c]
    date_columns = [c for c in df.columns if '-' in c]
    df = pd.melt(frame=df, id_vars=id_columns, var_name='date', value_name=valName, ignore_index=True)
    #date type conversion
    df['date'] = pd.to_datetime(df['date'])
    return df


#### Currency formatting
---

In [None]:
def format_currency(x):
    return "${:,.2f}".format(x)

In [None]:
tr_rental_df = melt_dates(preprocessed_rental_dataframe, 'rent')

In [None]:
tr_rental_df['rent'] = tr_rental_df['rent'].astype('float')

In [None]:
preprocessed_zhvi_dataframe = preprocess_data(zhvi_dataframe,'most_frequent','median')

In [None]:
tr_zhvi_df = melt_dates(preprocessed_zhvi_dataframe, 'ZHVI')

In [None]:
tr_zhvi_df['ZHVI'] = tr_zhvi_df['ZHVI'].apply(format_currency)

In [None]:
tr_zhvf_df = melt_dates(preprocessed_zhvf_dataframe, 'ZHVF')

In [None]:
tr_listings_df = melt_dates(preprocessed_listings_dataframe,'Listings')

In [None]:
tr_sales_df = melt_dates(preprocessed_sales_dataframe, 'Sales')

In [None]:
tr_sales_df.head()

In [None]:
tr_days_market_df = melt_dates(preprocessed_days_dataframe,'Days')

In [None]:
tr_market_heat_df = melt_dates(preprocessed_market_heat_dataframe, 'Market Heat')

In [None]:
tr_market_heat_df.head()

In [None]:
tr_new_construction_df = melt_dates(preprocessed_new_construction_dataframe,'New Construction')

In [None]:
tr_new_construction_df.head()

#### Feature Engineering
---
* Season
* Political Leaning


#### Seasons
---

In [None]:
def map_season(df: pd.DataFrame) -> pd.DataFrame:
  month_to_season = {
    12: 'Winter', 1: 'Winter', 2: 'Winter',
    3: 'Spring', 4: 'Spring', 5: 'Spring',
    6: 'Summer', 7: 'Summer', 8: 'Summer',
    9: 'Fall', 10: 'Fall', 11: 'Fall'
  }
  df['Season'] = df['date'].dt.month.map(month_to_season)
  return df

In [None]:
#rental data
tr_rental_df = map_season(tr_rental_df)
#ZHVI
tr_zhvi_df = map_season(tr_zhvi_df)
#ZHVF
tr_zhvf_df = map_season(tr_zhvf_df)
#Listings
tr_listings_df = map_season(tr_listings_df)
#Sales
tr_sales_df = map_season(tr_sales_df)
#DaysMarket
tr_days_market_df = map_season(tr_days_market_df)

In [None]:
tr_rental_df.head()

In [None]:
tr_rental_df['RegionID'] = tr_rental_df['RegionID'].astype('int32')
tr_rental_df['SizeRank'] = tr_rental_df['SizeRank'].astype('int32')

In [None]:
tr_rental_df.head()

In [None]:
tr_days_market_df.head()

In [None]:
tr_zhvi_df.head()

In [None]:
tr_zhvi_df['RegionID'] = tr_zhvi_df['RegionID'].astype('int32')
tr_zhvi_df['SizeRank'] = tr_zhvi_df['SizeRank'].astype('int32')

In [None]:
tr_zhvi_df.head()

In [None]:
tr_listings_df.head()

In [None]:
tr_listings_df['Listings'] = tr_listings_df['Listings'].astype('int32')
tr_listings_df['RegionID'] = tr_listings_df['RegionID'].astype('int32')
tr_listings_df['SizeRank'] = tr_listings_df['SizeRank'].astype('int32')

In [None]:
tr_listings_df.head()

In [None]:
tr_sales_df.head()

In [None]:
tr_days_market_df.head()

In [None]:
tr_days_market_df['RegionID'] = tr_days_market_df['RegionID'].astype('int32')
tr_days_market_df['SizeRank'] = tr_days_market_df['SizeRank'].astype('int32')
tr_days_market_df['Days'] = tr_days_market_df['Days'].astype('int32')

In [None]:
tr_days_market_df.head()

#### Political Leaning
---

In [None]:
def map_political_leaning(df: pd.DataFrame) -> pd.DataFrame:
  state_political_leaning = {
    'AL': 'Red',
    'AK': 'Red',
    'AZ': 'Red',  # Note: Arizona has been a battleground state, but leaned Republican historically.
    'AR': 'Red',
    'CA': 'Blue',
    'CO': 'Blue',  # Note: Colorado has been trending blue in recent elections.
    'CT': 'Blue',
    'DE': 'Blue',
    'FL': 'Red',  # Note: Florida is a swing state but leaned Republican recently.
    'GA': 'Red',  # Note: Georgia has been a battleground state, leaning Democratic in 2020.
    'HI': 'Blue',
    'ID': 'Red',
    'IL': 'Blue',
    'IN': 'Red',
    'IA': 'Red',  # Note: Iowa has swung between parties but leaned Republican recently.
    'KS': 'Red',
    'KY': 'Red',
    'LA': 'Red',
    'ME': 'Blue',  # Note: Maine can split its electoral votes, with one district leaning Republican.
    'MD': 'Blue',
    'MA': 'Blue',
    'MI': 'Blue',  # Note: Michigan is a battleground state, leaning Democratic in recent elections.
    'MN': 'Blue',
    'MS': 'Red',
    'MO': 'Red',
    'MT': 'Red',
    'NE': 'Red',  # Note: Nebraska can split its electoral votes, with one district leaning Democratic.
    'NV': 'Blue',  # Note: Nevada has been leaning Democratic recently.
    'NH': 'Blue',
    'NJ': 'Blue',
    'NM': 'Blue',
    'NY': 'Blue',
    'NC': 'Red',  # Note: North Carolina is a battleground state.
    'ND': 'Red',
    'OH': 'Red',  # Note: Ohio has swung between parties but leaned Republican recently.
    'OK': 'Red',
    'OR': 'Blue',
    'PA': 'Blue',  # Note: Pennsylvania is a battleground state, leaning Democratic in recent elections.
    'RI': 'Blue',
    'SC': 'Red',
    'SD': 'Red',
    'TN': 'Red',
    'TX': 'Red',  # Note: Texas has been a solidly Republican state but is becoming more competitive.
    'UT': 'Red',
    'VT': 'Blue',
    'VA': 'Blue',  # Note: Virginia has been trending blue in recent elections.
    'WA': 'Blue',
    'WV': 'Red',
    'WI': 'Blue',  # Note: Wisconsin is a battleground state, leaning Democratic in recent elections.
    'WY': 'Red'
  }

  df['Political Leaning'] = df['StateName'].map(state_political_leaning)
  return df

In [None]:
#ZHVI Data
tr_zhvi_df = map_political_leaning(tr_zhvi_df)
#ZHVF Data
tr_zhvf_df = map_political_leaning(tr_zhvf_df)
#Rental Data
tr_rental_df = map_political_leaning(tr_rental_df)
#Listings Data
tr_listings_df = map_political_leaning(tr_listings_df)

## Loading Data
---

#### Load data to sqlite db
---

In [None]:
import sqlite3
conn = sqlite3.connect('../data/Zillow/zillow.db')

In [None]:
#rental data
tr_rental_df.to_sql('rent', conn,if_exists='replace')
#ZHVF
tr_zhvf_df.to_sql('zhvf', conn, if_exists='replace')
#ZHVI
tr_zhvi_df.to_sql('zhvi', conn, if_exists='replace')
#Listings
tr_listings_df.to_sql('listings', conn, if_exists='replace')
#Sales
tr_sales_df.to_sql('sales', conn, if_exists='replace')
#DaysMarket
tr_days_market_df.to_sql('days_market', conn, if_exists='replace')
conn.commit()
conn.close()