In [15]:
import numpy as np
import polars as pl
import pandas as pd
import matplotlib.pyplot as plt
from pathlib import Path

from sodapy import Socrata
from datetime import datetime
import mpu

from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.pipeline import Pipeline
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler, MinMaxScaler, OneHotEncoder, OrdinalEncoder
from sklearn.compose import ColumnTransformer 
from sklearn.model_selection import cross_val_score 

# Data Preprocessing

We follow a number of steps here that include the following:
1. Load our data
2. Sample data and reduce the total number of rows used
3. Convert datetime fields correctly
4. Filter out nulls
5. Drop irrelevant columns

In [16]:
DATA_CALGARY_PATH = Path("data/calgary_2017_2021.parquet")
SOCRATA_API_KEY = "" # this should be move to an env var
COLUMN_DROP_LIST = [
    "roll_number",
    "sub_property_use",
    "point",
    "land_size_sf",
    "land_size_ac",
    "unique_key",
    "comm_name",
    "assessment_class_description",
    "address",
    "nr_assessed_value",
    "re_assessed_value",
    "land_use_designation",
    "fl_assessed_value",
    "property_type"
]
CALGARY_CENTER_LAT_LONG = (51.047956, -114.068913)
DATE_RANGE_SELECTED = (datetime(2017,1,1), datetime(2021,1,1))

def load_data() -> pl.DataFrame:
    """Checks if parquet file exists, if it does, loads that. If not, downloads from 
    the socrata API. 

    :return: datafram containing our records
    """
    
    if DATA_CALGARY_PATH.exists():
        return pl.read_parquet(DATA_CALGARY_PATH)
    else:
        client = Socrata("data.calgary.ca", SOCRATA_API_KEY)
        records = client.get("6zp6-pxei",  limit=3_000_000)
        results_df = pl.from_records(records, infer_schema_length=2000)
        results_df = results_df.with_columns(pl.col("roll_year").str.strptime(pl.Date, fmt='%Y').cast(pl.Datetime))
        results_df.write_parquet(DATA_CALGARY_PATH)
        
def pre_process_data(df: pl.DataFrame, drop_col_list: list, date_range: tuple[datetime, datetime]) -> pl.DataFrame:
    """_summary_

    :param df: _description_
    :param drop_col_list: _description_
    :param date_range: _description_
    :param sample_ratio: ratio of dataset to use
    :return: _description_
    """
    # Drop unneeded columns
    processed_df = df.drop(drop_col_list)
    
    # Reduce dataset to specified date range
    print(f"Filtering to set date range")
    len_prior = len(processed_df)
    processed_df = processed_df.filter(pl.col("roll_year").is_between(*date_range))
    print(f"Filtering for date complete. Old len: {len_prior}, new_len: {len(processed_df)}")
    
    # Filter dataset to residential properties only
    processed_df = processed_df.filter(pl.col("assessment_class") == "RE")
    
    # Cast to correct types
    processed_df  = processed_df.with_columns(pl.col(["assessed_value", "year_of_construction"]).cast(pl.Int32))
    processed_df = processed_df.with_columns(pl.col("land_size_sm").cast(pl.Float64))
    processed_df = processed_df.with_columns(pl.col(["longitude", "latitude"]).cast(pl.Float64))
    
    
    # Drop nulls
    processed_df = processed_df.drop_nulls() # will drop nulls across all columns 
    
    # Drop assessment class column since it is no longer required
    processed_df = processed_df.drop(["assessment_class"])

    return processed_df
    
    
    


# Feature Engineering

In order to capitalize on our dataset, we do some feature engineering that incldues: 

1. Calculate distance from city center
2. Find and set quadrant for each location 
3. One hot encode categorical columns
   1. Reduce cardinality of columns that have too many categories

We suspect that these features may provide some additional information when it comes to modeeling our dataset. 

In [17]:
def feature_engineer(df: pl.DataFrame) -> pl.DataFrame:
    """Do a few feature engineering things! 

    :param df: _description_
    :return: _description_
    """
    
    def cal_distance_from_center(lat: float, long: float) -> float:
        return mpu.haversine_distance((lat,long), CALGARY_CENTER_LAT_LONG)
    
    def quadrant(lat: float, long: float) -> str:
        NS = "N" if lat > CALGARY_CENTER_LAT_LONG[0] else "S"
        EW = "E" if long > CALGARY_CENTER_LAT_LONG[1] else "W"
        
        return NS + EW
    
    
    comm_codes = df.groupby("comm_code").agg([pl.count().alias("Count")]).sort("Count", descending=True)
    
    def map_community(comm_code):
        """Used for reducing cardinality in the high cardinality comm_code column! 
        Only keep values that have higher than 5000 occurences
        :param comm_code: _description_
        """
        if comm_code in comm_codes.filter(pl.col("Count") > 5000)["comm_code"].to_list():
            return comm_code
        else:
            return "Other"
    
    # Add columns for city quadrant and distance from center of the city 
    
    processed_df = df.with_columns(
        df.select(
            pl.struct(
                ["latitude", "longitude"]
            ).apply(
                lambda x: cal_distance_from_center(x["latitude"], x["longitude"]), pl.Float64
            ).alias("distance_from_center")
        )
    )
    processed_df = processed_df.with_columns(
        processed_df.select(
            pl.struct(
                ["latitude", "longitude"]
            ).apply(
                lambda x: quadrant(x["latitude"], x["longitude"]), pl.Utf8
            ).alias("quadrant")
        )
    )
    
    processed_df = processed_df.with_columns(
        processed_df.select(
            pl.struct(
                ["comm_code"]
            ).apply(
                lambda x: map_community(x["comm_code"]), pl.Utf8
            ).alias("comm_code")
        )
    )
    
    # Convert our date time column into a year column as that is the most useful piece of info there! 
    processed_df = processed_df.with_columns(
        pl.col("roll_year").dt.year().alias("roll_year")
    )
    
    return processed_df

# Prepare data for training testing! 

At this stage, we need to split the data into a training and test set.
We also need to convert data into a pandas format as sklearn pipeline column transformers don't work 
on polars dataframes.

We will also sample our dataset to reduce the training load

In [18]:
def preapre_data_for_training(df: pl.DataFrame, sample_frac: float = 0.3) -> tuple[pd.DataFrame, pd.DataFrame]:
    """We will do some final prep on our data to make sure its ready to go for training! 

    :param df: _description_
    :param sample_frac: _description_
    :return: _description_
    """
    
    processed_df = df.sample(frac=sample_frac, with_replacement=False, seed=42)
    
    X = processed_df.drop("assessed_value").to_pandas()
    y = processed_df["assessed_value"].to_pandas()
    
    return (X, y)
    

# We build our training pipeline here!

1. We first put together the pipeline via bunch of transformers on our columns
2. Then we put it all together as a sklearn pipeline 
3. We then run our training cross validatio to get our results 

In [19]:
def run_training(X: pd.DataFrame, y:pd.Series) -> np.array:
    ct = ColumnTransformer(
        [
            ("one_hot_commc_code", OneHotEncoder(), ["comm_code"]),
            ("one_hot_quadrant", OneHotEncoder(), ["quadrant"]),
            ("std_scaler_land_size", StandardScaler(), ["land_size_sm"]),
            ("std_scaler_distance_from_center", StandardScaler(), ["distance_from_center"]),
        ],
        remainder="passthrough",
    )

    regression_pipeline = Pipeline(
        steps=[("preprocessor", ct), ("regressor", GradientBoostingRegressor(random_state=42))]
    )
    scores = cross_val_score(regression_pipeline, X, y, cv=5, scoring="neg_root_mean_squared_error")
    
    return scores

# Putting it all together!

1. First we call our data loading function
2. We then pass that to our pre-processor
3. We then do some final touch ups on the data
4. We then run our training
5. We then show our training results!

In [20]:
# Prep data
data = load_data()
pre_processed_data = pre_process_data(data, COLUMN_DROP_LIST, DATE_RANGE_SELECTED)
engineered_data = feature_engineer(pre_processed_data)
X,y = preapre_data_for_training(engineered_data, 0.3)

# Run Training
scores = run_training(X, y)

Filtering to set date range
Filtering for date complete. Old len: 2432429, new_len: 2432429


In [21]:
print(scores)

[ -848298.43529913  -781377.23648934 -1041393.92058521  -737743.58112529
  -924187.35656956]
