# Building a Model

In this chapter we'll learn how to choose which type of model we want. Then we will learn how to apply our data to the model and evaluate it. Lastly, we'll learn how to interpret the results and save the model for later!

## Preparing the environment

### Importing libraries

In [1]:
import pandas as pd
import seaborn as sns
import sys
import matplotlib.pyplot as plt

from environment import histogram_boxplot, linear_model_plot, labeled_barplot
from typing import List
from pprint import pprint
from datetime import timedelta, date

from pyspark.sql.types import (_parse_datatype_string, StructType, StructField,
                               DoubleType, IntegerType, StringType)
from pyspark.sql import SparkSession, Row, functions as F
from pyspark.sql.window import Window
from pyspark.ml import Pipeline
from pyspark.ml.feature import (Binarizer, Bucketizer, OneHotEncoder, StringIndexer,
                                VectorAssembler)
from pyspark.ml.regression import (RandomForestRegressor, RandomForestRegressionModel,
                                   GBTRegressor, GBTRegressionModel)
from pyspark.ml.evaluation import RegressionEvaluator

### Connect to Spark

In [2]:
spark = SparkSession.builder.getOrCreate()

# eval DataFrame in notebooks
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

# activate gpu
spark.conf.set("spark.driver.resource.gpu.amount","1")

In [3]:
sc = spark.sparkContext

### Checking the version of PySpark and Python

In [4]:
print('Spark  version:', spark.version)
print('Python version:', sys.version_info)

Spark  version: 3.5.1
Python version: sys.version_info(major=3, minor=11, micro=9, releaselevel='final', serial=0)


### Loading data

In [5]:
# read file
real_state = spark.read.csv('data-sources/2017_StPaul_MN_Real_Estate.csv', header=True, inferSchema=True)

# save to parquet format
file_path = 'spark-warehouse/real_state.parquet'
real_state.write.parquet(file_path, mode="overwrite")
real_state = spark.read.parquet(file_path)
real_state.createOrReplaceTempView("real_state")
print('Dataframe shape:', (real_state.count(), len(real_state.columns)))
real_state.printSchema()
real_state.limit(2)

Dataframe shape: (5000, 74)
root
 |-- No.: integer (nullable = true)
 |-- MLSID: string (nullable = true)
 |-- StreetNumberNumeric: integer (nullable = true)
 |-- streetaddress: string (nullable = true)
 |-- STREETNAME: string (nullable = true)
 |-- PostalCode: integer (nullable = true)
 |-- StateOrProvince: string (nullable = true)
 |-- City: string (nullable = true)
 |-- SalesClosePrice: integer (nullable = true)
 |-- LISTDATE: string (nullable = true)
 |-- LISTPRICE: integer (nullable = true)
 |-- LISTTYPE: string (nullable = true)
 |-- OriginalListPrice: integer (nullable = true)
 |-- PricePerTSFT: double (nullable = true)
 |-- FOUNDATIONSIZE: integer (nullable = true)
 |-- FENCE: string (nullable = true)
 |-- MapLetter: string (nullable = true)
 |-- LotSizeDimensions: string (nullable = true)
 |-- SchoolDistrictNumber: string (nullable = true)
 |-- DAYSONMARKET: integer (nullable = true)
 |-- offmarketdate: string (nullable = true)
 |-- Fireplaces: integer (nullable = true)
 |-- R

No.,MLSID,StreetNumberNumeric,streetaddress,STREETNAME,PostalCode,StateOrProvince,City,SalesClosePrice,LISTDATE,LISTPRICE,LISTTYPE,OriginalListPrice,PricePerTSFT,FOUNDATIONSIZE,FENCE,MapLetter,LotSizeDimensions,SchoolDistrictNumber,DAYSONMARKET,offmarketdate,Fireplaces,RoomArea4,roomtype,ROOF,RoomFloor4,PotentialShortSale,PoolDescription,PDOM,GarageDescription,SQFTABOVEGROUND,Taxes,RoomFloor1,RoomArea1,TAXWITHASSESSMENTS,TAXYEAR,LivingArea,UNITNUMBER,YEARBUILT,ZONING,STYLE,ACRES,CoolingDescription,APPLIANCES,backonmarketdate,ROOMFAMILYCHAR,RoomArea3,EXTERIOR,RoomFloor3,RoomFloor2,RoomArea2,DiningRoomDescription,BASEMENT,BathsFull,BathsHalf,BATHQUARTER,BATHSTHREEQUARTER,Class,BATHSTOTAL,BATHDESC,RoomArea5,RoomFloor5,RoomArea6,RoomFloor6,RoomArea7,RoomFloor7,RoomArea8,RoomFloor8,Bedrooms,SQFTBELOWGROUND,AssumableMortgage,AssociationFee,ASSESSMENTPENDING,AssessedValuation
1,RMLS,11511,11511 Stillwater ...,Stillwater,55042,MN,LELM - Lake Elmo,143000,7/15/2017 0:00,139900,Exclusive Right,139900,145.9184,980,Other,C4,279X200,834 - Stillwater,10,7/30/2017 0:00,0,12 x 9,"Living Room, Dini...",,Main,No,,10,Attached Garage,980,1858,Main,16 x 13,1858.0,2017,980,,1950,Residential-Single,(SF) One Story,1.28,Central,"Range, Dishwasher...",,,,Vinyl,,Main,9 x 7,Eat In Kitchen,Full,1,1,0,0,SF,2,Main Floor 3/4 Ba...,13 x 11,Main,10 x 10,Main,,,,,3,0,,0,Unknown,0.0
2,RMLS,11200,11200 31st St N,31st,55042,MN,LELM - Lake Elmo,190000,10/9/2017 0:00,210000,Exclusive Right,210000,85.2783,1144,,C1,100x140,834 - Stillwater,4,10/13/2017 0:00,0,11x11,"Living Room, Dini...","Asphalt Shingles,...",Main,No,,4,"Attached Garage, ...",1268,1640,Main,22x14,1640.0,2017,2228,,1971,Residential-Single,(SF) Split Entry ...,0.32,Central,"Range, Microwave,...",,Lower Level,22x14,Vinyl,Lower,Main,11x12,Informal Dining R...,"Full, Partial Fin...",1,0,0,2,SF,3,Main Floor Full B...,15x11,Main,14x11,Main,10x11,Main,11x11,Lower,4,960,,0,Unknown,0.0


In [6]:
# read file
coord_df = spark.read.csv('data-sources/real-state-only-coord.csv', header=True, inferSchema=True)

# save to parquet format
file_path = 'spark-warehouse/real_state_coord.parquet'
coord_df.write.parquet(file_path, mode="overwrite")
coord_df = spark.read.parquet(file_path)
coord_df.createOrReplaceTempView("coord")
print('Dataframe shape:', (coord_df.count(), len(coord_df.columns)))
coord_df.printSchema()
coord_df.limit(2)

Dataframe shape: (5000, 3)
root
 |-- No.: integer (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)



No.,latitude,longitude
1,45.00585,-92.87286
2,44.99488,-92.87927


In [7]:
# read file
neighborhood_walk_df = spark.read.csv('data-sources/walk.csv', header=True, inferSchema=True)

# cast to int
for col_name in ['bikescore', 'transitscore']:
    neighborhood_walk_df = neighborhood_walk_df.withColumn(col_name, neighborhood_walk_df[col_name].cast('int'))
    
# save to parquet format
file_path = 'spark-warehouse/neighborhood_walk.parquet'
neighborhood_walk_df.write.parquet(file_path, mode="overwrite")
neighborhood_walk_df = spark.read.parquet(file_path)
neighborhood_walk_df.createOrReplaceTempView("neighborhood_walk")
print('Dataframe shape:', (neighborhood_walk_df.count(), len(neighborhood_walk_df.columns)))
neighborhood_walk_df.printSchema()
neighborhood_walk_df.limit(2)

Dataframe shape: (4243, 5)
root
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- walkscore: integer (nullable = true)
 |-- bikescore: integer (nullable = true)
 |-- transitscore: integer (nullable = true)



latitude,longitude,walkscore,bikescore,transitscore
44.92635,-93.18928,61,72,
44.9581,-93.17982,74,83,


In [8]:
# read file
hist_realstate_price_df = spark.read.csv('data-sources/real-state-medianprices.csv', 
                                         header=True, inferSchema=True)

# save to parquet format
hist_realstate_price_df.createOrReplaceTempView("hist_realstate_price")
print('Dataframe shape:', (hist_realstate_price_df.count(), len(hist_realstate_price_df.columns)))
hist_realstate_price_df.printSchema()
hist_realstate_price_df.limit(2)

Dataframe shape: (10, 3)
root
 |-- MedianCity: string (nullable = true)
 |-- MedianHomeValue: integer (nullable = true)
 |-- MedianYear: integer (nullable = true)



MedianCity,MedianHomeValue,MedianYear
LELM - Lake Elmo,401000,2016
MAPW - Maplewood,193000,2016


In [9]:
# Loading the data
mort_df = spark.read.csv('data-sources/mortage-data.csv', header=True, inferSchema=True)
mort_df = mort_df.withColumn('DATE', F.to_date('DATE', format='M/d/yyyy'))
hist_realstate_price_df.createOrReplaceTempView("mortage")
print('Dataframe shape:', (mort_df.count(), len(mort_df.columns)))
mort_df.printSchema()
mort_df.show(2)

Dataframe shape: (261, 2)
root
 |-- DATE: date (nullable = true)
 |-- MORTGAGE30US: double (nullable = true)

+----------+------------+
|      DATE|MORTGAGE30US|
+----------+------------+
|2013-10-10|        4.23|
|2013-10-17|        4.28|
+----------+------------+
only showing top 2 rows



### Tables catalogue

In [10]:
spark.catalog.listTables()

[Table(name='coord', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='hist_realstate_price', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='mortage', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='neighborhood_walk', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='real_state', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

### Util funtions

In [11]:
# Define a function for splitting list columns in a set of additional features
def split_and_pivot(df: pd.DataFrame, split_col: str, split_char=', ') -> pd.DataFrame:
    '''Return a new DataFrame where a column that can be interpreted as list is replaced
    with a new set of columns where each item is counted in each column.
    p.e. col1 in df have strings like 'itema, itemb, itemc', the resulted df will contain 
    the columns col1_itema, col1_itemb, col1_itemc instead of col1.
    Input:
        pd: DataFrame
        split_col: String with the name of the column to split and then explode
        split_char: Characters to make the split on, default is ', '
    Return New Dataframe'''

    # Prepare the dataframe
    df = (df.withColumn('ROW_ID', F.monotonically_increasing_id() + 1)   # RowID column for pivot
            .withColumn(f'{split_col}_list', F.split(df[split_col], ', ')))
    
    # Explode the values into new records
    ex_df = (df.withColumn(f'ex_{split_col}_list', F.explode(df[f'{split_col}_list']))
               .withColumn(f'ex_{split_col}_list', F.concat(F.lit(f'{split_col}_'), f'ex_{split_col}_list'))
               .withColumn('constant_val', F.lit(1)))                       # Dummy column fot pivot
                 
    # Pivot on the exploded column
    piv_df = (ex_df.groupBy('ROW_ID')
                   .pivot(f'ex_{split_col}_list')
                   .agg(F.coalesce(F.first('constant_val'))))
    binary_cols = piv_df.columns[1:]

    # Join the dataframes together
    joined_df = df.join(piv_df, on='ROW_ID', how='left')

    # Dropping dummy columns
    joined_df = joined_df.drop(*['ROW_ID', f'{split_col}_list', split_col])

    return joined_df.repartition(5), binary_cols

In [12]:
# Define a function for pivoting catego columns
def pivot_catego_col(df: pd.DataFrame, catego_col: str) -> pd.DataFrame:
    '''Return a new DataFrame where a catego column is transposed into a set of columns.
    p.e. col1 in df can have one value from these options: a, b, the resulted df will contain 
    the columns col1:a, col1:b, col1:c instead of the catego col1.
    Input:
        pd: DataFrame
        catego_col: String with the name of the column to pivot
    Return New Dataframe'''

    # Prepare the dataframe
    null_col = 'NULL_COL'
    df = (df.withColumn('ROW_ID', F.monotonically_increasing_id() + 1)   # RowID column for pivot=
            .withColumn(catego_col, F.concat(F.lit(f'{catego_col}:'), catego_col))
            .withColumn('constant_val', F.lit(1))                        # Dummy column fot pivot
            .fillna(null_col, subset=[catego_col])
    )                       
              
    # Pivot on the catego column
    piv_df = (df.groupBy('ROW_ID')
                .pivot(catego_col)
                .agg(F.coalesce(F.first('constant_val')))
                .fillna(0))
    binary_cols = piv_df.columns[1:]
    if null_col in binary_cols:
        del binary_cols[binary_cols.index(null_col)]
        
    # Join the dataframes together
    joined_df = df.join(piv_df, on='ROW_ID', how='left')
    joined_df - joined_df.select(joined_df.columns[:-1])

    # Dropping dummy columns
    joined_df = joined_df.drop(*['ROW_ID', 'constant_val', catego_col, null_col])

    return joined_df.repartition(5), binary_cols

In [13]:
def saving_to_kitchen(df: pd.DataFrame, table_name: str='real_state') -> None:
    '''Save the DataFrame to HD'''
    file_path = f'spark-warehouse/{table_name}_kitchen.parquet'
    df.write.parquet(file_path, mode="overwrite")

In [14]:
def read_from_kitchen(table_name: str='real_state') -> pd.DataFrame:
    '''Read DataFrame from HD'''
    file_path = f'spark-warehouse/{table_name}_kitchen.parquet'
    df_parquet = spark.read.parquet(file_path).repartition(5)
    df_parquet.createOrReplaceTempView(table_name)
    print('Dataframe shape:', (df_parquet.count(), len(df_parquet.columns)))
    return df_parquet

In [15]:
def counting_null_values(df: pd.DataFrame, show_only_nulls=True) -> None:
    '''Print how many nulls are per column'''
    Dict_Null = {col: df.filter(df[f"`{col}`"].isNull()).count() for col in df.columns}
    
    # Filtering columns without null values
    if show_only_nulls:
        Dict_Null = {k: v for k, v in Dict_Null.items() if v != 0}
    pprint(Dict_Null)

In [16]:
# Define a function for pivoting catego columns
def catego_one_hot(df: pd.DataFrame, index_col: str, catego_col: str) -> pd.DataFrame:
    '''Return a one hot encoder for a specific column in DataFrame.
    Input:
        pd: DataFrame
        index_col: String with the name of the row unique identifier col
        catego_col: String with the name of the catego col
    Return New Dataframe'''

    
    def extract(row):
        return tuple(map(lambda x: row[x], row.__fields__)) + tuple(row.c_idx_vec.toArray().tolist())

    # Prepare the df
    df_subset = df[[index_col, catego_col]].fillna('None')
    
    string_indexer = StringIndexer(inputCol=catego_col, outputCol="c_idx")
    encoded_df = string_indexer.fit(df_subset).transform(df_subset)
    
    one_hot_encoder = OneHotEncoder(inputCol="c_idx", outputCol="c_idx_vec")
    one_hot_encoder.setDropLast(False)
    one_hot_encoder = one_hot_encoder.fit(encoded_df)
    vector_df = one_hot_encoder.transform(encoded_df)

    # Get c and its repective index. One hot encoder will put those on same index in vector
    colIdx = vector_df.select(catego_col, "c_idx").distinct().rdd.collectAsMap()
    colIdx =  sorted((value, f"{catego_col}:" + key) for (key, value) in colIdx.items())
    
    newCols = list(map(lambda x: x[1], colIdx))
    actualCol = vector_df.columns
    allColNames = actualCol + newCols

    transformed_df = vector_df.rdd.map(extract).toDF(allColNames)
    for col in newCols:
        transformed_df = transformed_df.withColumn(col, transformed_df[f'`{col}`'].cast("int"))
    transformed_df = transformed_df.drop(catego_col, 'c_idx', 'c_idx_vec', newCols[-1]).repartition(5)
    result = df.drop(catego_col).join(transformed_df, on='No.', how='left')
    return result, newCols[:-1]

In [17]:
def split_data_by_date(df_original, split_col, test_days=45):
    '''Return dataset splitted in two sets: training and testing'''

    # Find the date to use in spitting test and train
    max_date = df_original.agg(F.max(split_col)).collect()[0][0]
    split_date = max_date - timedelta(days=test_days)
    
    # Create Sequential Test and Training Sets
    train_df = df_original.where(df_original['offmarketdate'] < split_date) 

    # An extra where is needed on LISTDATE to ensure it contains items listed as of the split_date.
    test_df = (df_original.where(df_original['offmarketdate'] >= split_date)
                          .where(df_original['LISTDATE'] <= split_date) )
    return train_df, test_df

## Feature engineering

Preparing the database for this track

### Casting to proper format

- BigInt: `['SQFTBELOWGROUND', 'AssociationFee']`
- Date: `['offmarketdate', 'LISTDATE']`
- Double: `['backonmarketdate']`

In [18]:
def cast_to_proper_format(df: pd.DataFrame) -> pd.DataFrame:
    '''Some columns are casted to bigint, date, and double'''
    # cast columns - bigint
    for col_name in ['SQFTBELOWGROUND', 'AssociationFee']:
        df = df.withColumn(col_name, df[col_name].cast('bigint'))
    
    # cast columns - date
    for col_name in ['offmarketdate', 'LISTDATE']:
        df = df.withColumn(col_name, F.to_date(col_name, format='M/d/yyyy H:m'))
    
    # cast columns - double
    for col_name in ['backonmarketdate']:
        df = df.withColumn(col_name, df[col_name].cast('double'))
    
    print('Cast - Dataframe shape:', (df.count(), len(df.columns)))
    return df

### DateTime components

In [19]:
def work_with_datetime_features(df: pd.DataFrame) -> pd.DataFrame:
    '''New features related to datetime are added'''
    df = (df.withColumn('LIST_DAYOFMONTH', F.dayofmonth('LISTDATE'))
            .withColumn('LIST_DAYOFYEAR', F.dayofyear('LISTDATE'))
            .withColumn('LIST_MONTH', F.month('LISTDATE'))
            .withColumn('LIST_WEEKOFYEAR', F.weekofyear('LISTDATE'))
            .withColumn('LIST_YEAR', F.year('LISTDATE')))
    
    print('DateTime Features - Dataframe shape:', (df.count(), len(df.columns)))
    return df

### Join with other datasets

In [20]:
def enrich_joining_more_tables(df: pd.DataFrame) -> pd.DataFrame:
    '''Adding neighborhood friendly, historical value'''
    
    global neighborhood_walk_df
    global coord_df
    global hist_realstate_price_df
    
    # join to add coord to real state dataset
    df = df.join(coord_df, on='No.', how='left')
    
    # join to add neighborhood_walk features
    df = df.withColumn('longitude', F.round('longitude', 5))  # To have same precision
    df = df.withColumn('latitude', F.round('latitude', 5))
    neighborhood_walk_df = neighborhood_walk_df.withColumn('longitude', F.round('longitude', 5))
    neighborhood_walk_df = neighborhood_walk_df.withColumn('latitude', F.round('latitude', 5))
    
    df = df.join(neighborhood_walk_df, on=['longitude', 'latitude'], how='left')
    
    # join to add historical median prices
    hmp = (hist_realstate_price_df.select(hist_realstate_price_df['MedianCity'].alias('City'),
                                          'MedianHomeValue',
                                          hist_realstate_price_df['MedianYear'].alias('REPORT_YEAR')))
    
    df = df.withColumn('REPORT_YEAR', (df['LIST_YEAR'] - 1))
    df = df.join(hmp, on=['City', 'REPORT_YEAR'], how='left')
    df = df.drop('REPORT_YEAR')
    
    print('Joining - Dataframe shape:', (df.count(), len(df.columns)))
    return df

### Window & Lags

In [21]:
def adding_lags_with_window(df: pd.DataFrame, show=True) -> pd.DataFrame:
    '''Adding mortage statistical data'''
    left_data = df.select('LISTDATE', '`No.`',
                          F.lit(None).alias('MORTGAGE30US'),
                          F.lit(None).alias('MORTGAGE30US-1WK'),
                          F.lit(None).alias('MORTGAGE30US-2WK'),
                          F.lit(None).alias('MORTGAGE30US-3WK'),
                          F.lit(None).alias('MORTGAGE30US-4WK'))
    
    # Preparing the right side with all lag values
    right_data = mort_df.select(mort_df['DATE'].alias('LISTDATE'), 'MORTGAGE30US', F.lit(None).alias('No.'))
    w = Window().orderBy(right_data['LISTDATE'])   # Create window for lags
    right_data = (right_data.withColumn('MORTGAGE30US-1WK', F.lag('MORTGAGE30US', offset=1).over(w))
                            .withColumn('MORTGAGE30US-2WK', F.lag('MORTGAGE30US', offset=2).over(w))
                            .withColumn('MORTGAGE30US-3WK', F.lag('MORTGAGE30US', offset=3).over(w))
                            .withColumn('MORTGAGE30US-4WK', F.lag('MORTGAGE30US', offset=4).over(w)))
    
    result_data = left_data.unionByName(right_data)
    w = Window().orderBy('LISTDATE', '`No.`').rowsBetween(Window.unboundedPreceding, -1)
    result_data = (result_data.withColumn('MORTGAGE30US', F.last('MORTGAGE30US', True).over(w))
                               .withColumn('MORTGAGE30US-1WK', F.last('MORTGAGE30US-1WK', True).over(w))
                               .withColumn('MORTGAGE30US-2WK', F.last('MORTGAGE30US-2WK', True).over(w))
                               .withColumn('MORTGAGE30US-3WK', F.last('MORTGAGE30US-3WK', True).over(w))
                               .withColumn('MORTGAGE30US-4WK', F.last('MORTGAGE30US-4WK', True).over(w))
                               .filter(~F.isnull('`No.`')))
    if show:
        result_data.orderBy(result_data.LISTDATE.desc()).show(5)
    
    df = df.join(result_data, on=['LISTDATE', 'No.'], how='left')
    
    
    print('Window & Lag - Dataframe shape:', (df.count(), len(df.columns)))
    return df

### Ratios

In [22]:
def adding_ratios_features(df: pd.DataFrame) -> pd.DataFrame:
    '''New proportion features added'''
    df = (df.withColumn('ASSESSED_TO_LIST', df['AssessedValuation'] / df['LISTPRICE'])
          .withColumn('BED_TO_BATHS', df['Bedrooms'] / df['BATHSTOTAL'])
          .withColumn('TAX_TO_LIST', df['Taxes'] / df['LISTPRICE'])
          .withColumn('SQFT_TOTAL', df['SQFTBELOWGROUND'] + df['SQFTABOVEGROUND'])
          .withColumn('LISTING_PRICE_PER_SQFT', df['LISTPRICE'] / (df['SQFTBELOWGROUND'] + 
                                                                   df['SQFTABOVEGROUND']))
          .withColumn('LISTING_TO_MEDIAN_RATIO', df['LISTPRICE'] / df['MedianHomeValue']))
    print('Ratios - Dataframe shape:', (df.count(), len(df.columns)))
    return df

# Others not included:
 # |-- PERCENT_BIGGER_SIZE_HOMES: double (nullable = true)
 # |-- PERCENT_NEWER_AGE_HOMES: double (nullable = true)
 # |-- PERCENT_OLDER_AGE_HOMES: double (nullable = true)
 # |-- PERCENT_SIMILAR_AGE_HOMES: double (nullable = true)
 # |-- PERCENT_SIMILAR_SIZE_HOMES: double (nullable = true)
 # |-- PERCENT_SMALLER_SIZE_HOMES: double (nullable = true)
 # |-- PRICE_REDUCTION_PERCENT: double (nullable = true)

### Dropping some columns

In [23]:
def removing_unuseful_columns(df: pd.DataFrame) -> pd.DataFrame:
    '''Removing columns that not add value to the problem'''
    df = df.drop(*['backonmarketdate', 'Class', 'LotSizeDimensions', 'MapLetter', 'MLSID', 
                   'PDOM', 'PostalCode',  'RoomArea1', 'RoomArea2', 'RoomArea3', 'RoomArea4', 
                   'RoomArea5', 'RoomArea6', 'RoomArea7', 'RoomArea8',  'ROOMFAMILYCHAR', 
                   'roomtype', 'TAXYEAR', 'UNITNUMBER', 'RoomFloor1', 'RoomFloor2', 'RoomFloor3', 
                   'RoomFloor4', 'RoomFloor5', 'RoomFloor6', 'RoomFloor7', 'RoomFloor8', 
                   'StateOrProvince', 'streetaddress', 'STREETNAME', 'StreetNumberNumeric'])
    print('Dropping 1 - Dataframe shape:', (df.count(), len(df.columns)))
    return df

### Splitting & Exploding

In [24]:
def explode_list_features(df: pd.DataFrame, list_features: List) -> pd.DataFrame:
    '''Transform list columns into columns features'''
    binary_cols = []
    
    for split_col in list_features:
        df, bin_cols = split_and_pivot(df=df, split_col=split_col, split_char=', ')
        binary_cols += bin_cols
        print(f'Explode {split_col} - Dataframe shape:', (df.count(), len(df.columns)))
    return df, binary_cols

### OneHot Encoder - catego columns

In [25]:
def explode_catego_features(df: pd.DataFrame, list_features: List) -> pd.DataFrame:
    '''Encode and explode catego features'''
    catego_cols = []
    for catego_col in list_features:
        df, cat_cols = catego_one_hot(df, index_col='`No.`', catego_col=catego_col)
        catego_cols += cat_cols
        print(f'Encode {catego_col} - Dataframe shape:', (df.count(), len(df.columns)))
    return df, catego_cols

In [26]:
def encode_catego_col(df: pd.DataFrame, catego_cols: List) -> pd.DataFrame:
    '''Encode catego features'''

    for catego in catego_cols:
        df = df.fillna('None', subset=[catego])
        string_indexer = StringIndexer(inputCol=catego, outputCol=f'{catego}_idx')
        index_df = string_indexer.fit(df).transform(df)
        encoder = OneHotEncoder(inputCol=f'{catego}_idx', outputCol = f'{catego}_vec')
        df = encoder.fit(index_df).transform(index_df)
        df = df.drop(*[catego, f'{catego}_idx'])
        print(f'Encode {catego} - Dataframe shape:', (df.count(), len(df.columns)))

    return df

### Dropping features with less than 30 observations

In [27]:
def removing_columns_less_obs(df: pd.DataFrame) -> pd.DataFrame:
    '''Dropping features with less than 30 observations'''
    # Conditional removing 
    obs_thresh = 30
    cols_to_remove = {f'`{col}`': df.where(F.isnull(f'`{col}`')).agg(F.sum(F.lit(1))).collect()[0][0]
                      for col in df.columns
                      if (df.where(~ F.isnull(f'`{col}`')).agg(F.sum(F.lit(1))).collect()[0][0] is None)
                         or (df.where(~ F.isnull(f'`{col}`')).agg(F.sum(F.lit(1))).collect()[0][0] < obs_thresh)
    }
    df = df.drop(*list(cols_to_remove.keys()))
    print('Dropping 2 - Dataframe shape after removing:', (df.count(), len(df.columns)))
    return df

### Final removing

In [28]:
def final_removing(df):
    '''Removing columns not required anymore'''
    df = df.drop(*['latitude', 'LISTDATE', 'longitude', 'No.', 'offmarketdate'])
    print('Dropping 2 - Dataframe shape:', (df.count(), len(df.columns)))
    return df

### Putting All together

In [29]:
# Step 1: Loading the data
df = real_state.select('*')

# Step 2: casting, joining, enrichment and cleaning
df = cast_to_proper_format(df)
df = work_with_datetime_features(df)
df = enrich_joining_more_tables(df)
df = adding_lags_with_window(df)
df = adding_ratios_features(df)
df = removing_unuseful_columns(df)

saving_to_kitchen(df)
df.limit(2)

Cast - Dataframe shape: (5000, 74)
DateTime Features - Dataframe shape: (5000, 79)
Joining - Dataframe shape: (5000, 85)
+----------+----+------------+----------------+----------------+----------------+----------------+
|  LISTDATE| No.|MORTGAGE30US|MORTGAGE30US-1WK|MORTGAGE30US-2WK|MORTGAGE30US-3WK|MORTGAGE30US-4WK|
+----------+----+------------+----------------+----------------+----------------+----------------+
|2018-01-10|3836|        3.95|            3.99|            3.94|            3.93|            3.94|
|2017-12-29| 144|        3.99|            3.94|            3.93|            3.94|             3.9|
|2017-12-28|1740|        3.99|            3.94|            3.93|            3.94|             3.9|
|2017-12-28|2381|        3.99|            3.94|            3.93|            3.94|             3.9|
|2017-12-27|2809|        3.94|            3.93|            3.94|             3.9|            3.92|
+----------+----+------------+----------------+----------------+----------------+------

LISTDATE,No.,City,longitude,latitude,SalesClosePrice,LISTPRICE,LISTTYPE,OriginalListPrice,PricePerTSFT,FOUNDATIONSIZE,FENCE,SchoolDistrictNumber,DAYSONMARKET,offmarketdate,Fireplaces,ROOF,PotentialShortSale,PoolDescription,GarageDescription,SQFTABOVEGROUND,Taxes,TAXWITHASSESSMENTS,LivingArea,YEARBUILT,ZONING,STYLE,ACRES,CoolingDescription,APPLIANCES,EXTERIOR,DiningRoomDescription,BASEMENT,BathsFull,BathsHalf,BATHQUARTER,BATHSTHREEQUARTER,BATHSTOTAL,BATHDESC,Bedrooms,SQFTBELOWGROUND,AssumableMortgage,AssociationFee,ASSESSMENTPENDING,AssessedValuation,LIST_DAYOFMONTH,LIST_DAYOFYEAR,LIST_MONTH,LIST_WEEKOFYEAR,LIST_YEAR,walkscore,bikescore,transitscore,MedianHomeValue,MORTGAGE30US,MORTGAGE30US-1WK,MORTGAGE30US-2WK,MORTGAGE30US-3WK,MORTGAGE30US-4WK,ASSESSED_TO_LIST,BED_TO_BATHS,TAX_TO_LIST,SQFT_TOTAL,LISTING_PRICE_PER_SQFT,LISTING_TO_MEDIAN_RATIO
2017-10-09,2,LELM - Lake Elmo,-92.87927,44.99488,190000,210000,Exclusive Right,210000,85.2783,1144,,834 - Stillwater,4,2017-10-13,0,"Asphalt Shingles,...",No,,"Attached Garage, ...",1268,1640,1640.0,2228,1971,Residential-Single,(SF) Split Entry ...,0.32,Central,"Range, Microwave,...",Vinyl,Informal Dining R...,"Full, Partial Fin...",1,0,0,2,3,Main Floor Full B...,4,960,,0,Unknown,0.0,9,282,10,41,2017,25,,,401000,3.85,3.83,3.83,3.78,3.78,0.0,1.3333333333333333,0.0078095238095238,2228,94.25493716337522,0.5236907730673317
2017-07-15,1,LELM - Lake Elmo,-92.87286,45.00585,143000,139900,Exclusive Right,139900,145.9184,980,Other,834 - Stillwater,10,2017-07-30,0,,No,,Attached Garage,980,1858,1858.0,980,1950,Residential-Single,(SF) One Story,1.28,Central,"Range, Dishwasher...",Vinyl,Eat In Kitchen,Full,1,1,0,0,2,Main Floor 3/4 Ba...,3,0,,0,Unknown,0.0,15,196,7,28,2017,32,,,401000,4.03,3.96,3.88,3.9,3.91,0.0,1.5,0.0132809149392423,980,142.75510204081633,0.3488778054862843


In [30]:
# Step 3: Explode list features
cols_to_split_and_explode = ['APPLIANCES', 'BASEMENT', 'BATHDESC', 'CoolingDescription',
                             'DiningRoomDescription', 'EXTERIOR', 'FENCE', 'GarageDescription',
                             'PoolDescription', 'ROOF', 'ZONING']

for col_name in cols_to_split_and_explode:
    df[[col_name]].distinct().show(1, truncate=60)

+------------------------------------------------------------+
|                                                  APPLIANCES|
+------------------------------------------------------------+
|Range, Microwave, Dishwasher, Refrigerator, Tankless Wate...|
+------------------------------------------------------------+
only showing top 1 row

+------------------------------------------------------------+
|                                                    BASEMENT|
+------------------------------------------------------------+
|Full, Finished (Livable), Drain Tiled, Sump Pump, Egress ...|
+------------------------------------------------------------+
only showing top 1 row

+------------------------------------------------------------+
|                                                    BATHDESC|
+------------------------------------------------------------+
|Main Floor 1/2 Bath, Upper Level Full Bath , Private Mast...|
+------------------------------------------------------------+
only sh

In [31]:
df = read_from_kitchen()
df, binary_cols = explode_list_features(df, cols_to_split_and_explode[:6])
saving_to_kitchen(df)

Dataframe shape: (5000, 65)
Explode APPLIANCES - Dataframe shape: (5000, 86)
Explode BASEMENT - Dataframe shape: (5000, 104)
Explode BATHDESC - Dataframe shape: (5000, 129)
Explode CoolingDescription - Dataframe shape: (5000, 134)
Explode DiningRoomDescription - Dataframe shape: (5000, 140)
Explode EXTERIOR - Dataframe shape: (5000, 150)


In [32]:
df = read_from_kitchen()
df, bin_cols = explode_list_features(df, cols_to_split_and_explode[6:])
binary_cols += bin_cols
saving_to_kitchen(df)

Dataframe shape: (5000, 150)
Explode FENCE - Dataframe shape: (5000, 160)
Explode GarageDescription - Dataframe shape: (5000, 187)
Explode PoolDescription - Dataframe shape: (5000, 193)
Explode ROOF - Dataframe shape: (5000, 206)
Explode ZONING - Dataframe shape: (5000, 210)


In [33]:
df = read_from_kitchen()
df.limit(2)

Dataframe shape: (5000, 210)


LISTDATE,No.,City,longitude,latitude,SalesClosePrice,LISTPRICE,LISTTYPE,OriginalListPrice,PricePerTSFT,FOUNDATIONSIZE,SchoolDistrictNumber,DAYSONMARKET,offmarketdate,Fireplaces,PotentialShortSale,SQFTABOVEGROUND,Taxes,TAXWITHASSESSMENTS,LivingArea,YEARBUILT,STYLE,ACRES,BathsFull,BathsHalf,BATHQUARTER,BATHSTHREEQUARTER,BATHSTOTAL,Bedrooms,SQFTBELOWGROUND,AssumableMortgage,AssociationFee,ASSESSMENTPENDING,AssessedValuation,LIST_DAYOFMONTH,LIST_DAYOFYEAR,LIST_MONTH,LIST_WEEKOFYEAR,LIST_YEAR,walkscore,bikescore,transitscore,MedianHomeValue,MORTGAGE30US,MORTGAGE30US-1WK,MORTGAGE30US-2WK,MORTGAGE30US-3WK,MORTGAGE30US-4WK,ASSESSED_TO_LIST,BED_TO_BATHS,TAX_TO_LIST,SQFT_TOTAL,LISTING_PRICE_PER_SQFT,LISTING_TO_MEDIAN_RATIO,APPLIANCES_Air-To-Air Exchanger,APPLIANCES_Central Vacuum,APPLIANCES_Cooktop,APPLIANCES_Dishwasher,APPLIANCES_Disposal,APPLIANCES_Dryer,APPLIANCES_Electronic Air Filter,APPLIANCES_Exhaust Fan/Hood,APPLIANCES_Freezer,APPLIANCES_Furnace Humidifier,APPLIANCES_Indoor Grill,APPLIANCES_Microwave,APPLIANCES_None,APPLIANCES_Other,APPLIANCES_Range,APPLIANCES_Refrigerator,APPLIANCES_Tankless Water Heater,APPLIANCES_Trash Compactor,APPLIANCES_Wall Oven,APPLIANCES_Washer,APPLIANCES_Water Softener - Owned,APPLIANCES_Water Softener - Rented,BASEMENT_Concrete Block,BASEMENT_Crawl Space,BASEMENT_Day/Lookout Windows,BASEMENT_Drain Tiled,BASEMENT_Drainage System,BASEMENT_Egress Windows,BASEMENT_Finished (Livable),BASEMENT_Full,BASEMENT_Insulating Concrete Forms,BASEMENT_None,BASEMENT_Partial,BASEMENT_Partial Finished,BASEMENT_Poured Concrete,BASEMENT_Slab,BASEMENT_Stone,BASEMENT_Sump Pump,BASEMENT_Unfinished,BASEMENT_Walkout,BASEMENT_Wood,BATHDESC_1/2 Basement,BATHDESC_1/2 Master,BATHDESC_3/4 Basement,BATHDESC_3/4 Master,BATHDESC_Basement,BATHDESC_Bathroom Ensuite,BATHDESC_Full Basement,BATHDESC_Full Master,BATHDESC_Jack & Jill 3/4,BATHDESC_Jack and Jill,BATHDESC_Main Floor 1/2 Bath,BATHDESC_Main Floor 3/4 Bath,BATHDESC_Main Floor Full Bath,BATHDESC_Master Walk-Thru,BATHDESC_Other,BATHDESC_Private Master,BATHDESC_Rough In,BATHDESC_Separate Tub & Shower,BATHDESC_Two Basement Baths,BATHDESC_Two Master Baths,BATHDESC_Upper Level 1/2 Bath,BATHDESC_Upper Level 3/4 Bath,BATHDESC_Upper Level Full Bath,BATHDESC_Walk Thru,BATHDESC_Walk-In Shower,BATHDESC_Whirlpool,CoolingDescription_Central,CoolingDescription_Ductless Mini-Split,CoolingDescription_Geothermal,CoolingDescription_None,CoolingDescription_Wall,CoolingDescription_Window,DiningRoomDescription_Breakfast Area,DiningRoomDescription_Eat In Kitchen,DiningRoomDescription_Informal Dining Room,DiningRoomDescription_Kitchen/Dining Room,DiningRoomDescription_Living/Dining Room,DiningRoomDescription_Other,DiningRoomDescription_Separate/Formal Dining Room,EXTERIOR_Block,EXTERIOR_Brick/Stone,EXTERIOR_Cement Board,EXTERIOR_Engineered Wood,EXTERIOR_Fiber Board,EXTERIOR_Metal,EXTERIOR_Other,EXTERIOR_Shakes,EXTERIOR_Stucco,EXTERIOR_Vinyl,EXTERIOR_Wood,FENCE_Chain Link,FENCE_Electric,FENCE_Full,FENCE_Invisible,FENCE_None,FENCE_Other,FENCE_Partial,FENCE_Privacy,FENCE_Rail,FENCE_Wire,FENCE_Wood,GarageDescription_Assigned,GarageDescription_Attached Garage,GarageDescription_Carport,GarageDescription_Contract Pkg Required,GarageDescription_Covered,GarageDescription_Detached Garage,GarageDescription_Driveway - Asphalt,GarageDescription_Driveway - Concrete,GarageDescription_Driveway - Gravel,GarageDescription_Driveway - Other Surface,GarageDescription_Driveway - Shared,GarageDescription_Garage Door Opener,GarageDescription_Heated Garage,GarageDescription_Insulated Garage,GarageDescription_More Parking Offsite for Fee,GarageDescription_More Parking Onsite for Fee,GarageDescription_No Int Access to Dwelling,GarageDescription_None,GarageDescription_On-Street Parking Only,GarageDescription_Other,GarageDescription_Secured,GarageDescription_Tandem,GarageDescription_Tuckunder,GarageDescription_Unassigned,GarageDescription_Uncovered/Open,GarageDescription_Underground Garage,GarageDescription_Units Vary,GarageDescription_Valet Parking for Fee,PoolDescription_Above Ground,PoolDescription_Below Ground,PoolDescription_Heated,PoolDescription_Indoor,PoolDescription_None,PoolDescription_Outdoor,PoolDescription_Shared,ROOF_Age 8 Years or Less,ROOF_Age Over 8 Years,ROOF_Asphalt Shingles,ROOF_Flat,ROOF_Metal,ROOF_Other,ROOF_Pitched,ROOF_Rubber,ROOF_Shakes,ROOF_Slate,ROOF_Tar/Gravel,ROOF_Tile,ROOF_Unspecified Shingle,ROOF_Wood Shingles,ZONING_Business/Commercial,ZONING_Industrial,ZONING_Other,ZONING_Residential-Multi-Family,ZONING_Residential-Single
2017-05-05,2024,STP - Saint Paul,-93.16495,44.95667,175000,175000,Exclusive Right,175000,171.737,625,625 - St. Paul,11,2017-05-16,1,No,1019,2188,2246.0,1019,1907,(SF) One 1/2 Stories,0.11,1,0,0,0,1,2,0,Not Assumable,0,No,58.2,5,125,5,18,2017,86,80.0,,172000,4.02,4.03,3.97,4.08,4.1,0.0003325714285714286,2.0,0.0125028571428571,1019,171.7369970559372,1.0174418604651163,,,,1.0,,,,,,,,,,,1,1,,,,,,,1.0,,1.0,,,,,1.0,,,,,,,,,1.0,,,,,,1.0,,,,,,,,,1,,,,,,,,,,,,,,1.0,,,,,,,1.0,,,,,1.0,,1.0,1.0,,,,,,,,,1.0,,,,,,1.0,,,,,,,,,,1,,,,,,,,,,,,,,,,,,,,,,,,,,,1.0,,,,,1,,,,,,,,,,,,,,,,1
2017-07-06,333,MAPW - Maplewood,-93.04813,44.99513,210000,204900,Exclusive Right,204900,111.1111,1082,622 - North St Pa...,7,2017-07-20,1,No,1070,2758,2758.0,1890,1971,(SF) Split Entry ...,0.46,1,0,0,1,2,3,820,,0,No,0.0,6,187,7,27,2017,22,,,193000,3.96,3.88,3.9,3.91,3.89,0.0,1.5,0.0134602244997559,1890,108.41269841269842,1.061658031088083,,,,,,1.0,,1.0,,,,,,,1,1,,,,1.0,,,,,,,,,,,,1.0,,,,,,,,,,,,,,,,,,,,,1.0,1,,,,,,,,,,,,,,,,,,,1.0,,,1.0,,,,,,,,,,,,1.0,,,,,,,,,,,,,,,,,,,,1,,1.0,,,,,,,,,1.0,,,,,,,,,,,,,,,,,,,1.0,,1,,,,1.0,,,,,,,,,,,,1


In [34]:
# Step 4: Encoding catego features
cols_to_pivot = ['ASSESSMENTPENDING', 'AssumableMortgage', 'City', 'PotentialShortSale',
                 'SchoolDistrictNumber', 'LISTTYPE', 'STYLE']

for col_name in cols_to_pivot:
    df[[col_name]].distinct().show(truncate=False)

+-----------------+
|ASSESSMENTPENDING|
+-----------------+
|Unknown          |
|No               |
|Yes              |
+-----------------+

+-------------------+
|AssumableMortgage  |
+-------------------+
|Yes w/ Qualifying  |
|Information Coming |
|Not Assumable      |
|Yes w/No Qualifying|
|NULL               |
+-------------------+

+----------------+
|City            |
+----------------+
|OAKD - Oakdale  |
|MAPW - Maplewood|
|WB - Woodbury   |
|LELM - Lake Elmo|
|STP - Saint Paul|
+----------------+

+------------------+
|PotentialShortSale|
+------------------+
|No                |
|Not Disclosed     |
+------------------+

+-----------------------------+
|SchoolDistrictNumber         |
+-----------------------------+
|832 - Mahtomedi              |
|625 - St. Paul               |
|623 - Roseville              |
|622 - North St Paul-Maplewood|
|833 - South Washington County|
|834 - Stillwater             |
|624 - White Bear Lake        |
|6 - South St. Paul           |
+--------

In [35]:
df = read_from_kitchen()
df, catego_cols = explode_catego_features(df, cols_to_pivot)
saving_to_kitchen(df)

Dataframe shape: (5000, 210)
Encode ASSESSMENTPENDING - Dataframe shape: (5000, 211)
Encode AssumableMortgage - Dataframe shape: (5000, 214)
Encode City - Dataframe shape: (5000, 217)
Encode PotentialShortSale - Dataframe shape: (5000, 217)
Encode SchoolDistrictNumber - Dataframe shape: (5000, 223)
Encode LISTTYPE - Dataframe shape: (5000, 226)
Encode STYLE - Dataframe shape: (5000, 242)


In [36]:
df = read_from_kitchen()
df.limit(2)

Dataframe shape: (5000, 242)


No.,LISTDATE,longitude,latitude,SalesClosePrice,LISTPRICE,OriginalListPrice,PricePerTSFT,FOUNDATIONSIZE,DAYSONMARKET,offmarketdate,Fireplaces,SQFTABOVEGROUND,Taxes,TAXWITHASSESSMENTS,LivingArea,YEARBUILT,ACRES,BathsFull,BathsHalf,BATHQUARTER,BATHSTHREEQUARTER,BATHSTOTAL,Bedrooms,SQFTBELOWGROUND,AssociationFee,AssessedValuation,LIST_DAYOFMONTH,LIST_DAYOFYEAR,LIST_MONTH,LIST_WEEKOFYEAR,LIST_YEAR,walkscore,bikescore,transitscore,MedianHomeValue,MORTGAGE30US,MORTGAGE30US-1WK,MORTGAGE30US-2WK,MORTGAGE30US-3WK,MORTGAGE30US-4WK,ASSESSED_TO_LIST,BED_TO_BATHS,TAX_TO_LIST,SQFT_TOTAL,LISTING_PRICE_PER_SQFT,LISTING_TO_MEDIAN_RATIO,APPLIANCES_Air-To-Air Exchanger,APPLIANCES_Central Vacuum,APPLIANCES_Cooktop,APPLIANCES_Dishwasher,APPLIANCES_Disposal,APPLIANCES_Dryer,APPLIANCES_Electronic Air Filter,APPLIANCES_Exhaust Fan/Hood,APPLIANCES_Freezer,APPLIANCES_Furnace Humidifier,APPLIANCES_Indoor Grill,APPLIANCES_Microwave,APPLIANCES_None,APPLIANCES_Other,APPLIANCES_Range,APPLIANCES_Refrigerator,APPLIANCES_Tankless Water Heater,APPLIANCES_Trash Compactor,APPLIANCES_Wall Oven,APPLIANCES_Washer,APPLIANCES_Water Softener - Owned,APPLIANCES_Water Softener - Rented,BASEMENT_Concrete Block,BASEMENT_Crawl Space,BASEMENT_Day/Lookout Windows,BASEMENT_Drain Tiled,BASEMENT_Drainage System,BASEMENT_Egress Windows,BASEMENT_Finished (Livable),BASEMENT_Full,BASEMENT_Insulating Concrete Forms,BASEMENT_None,BASEMENT_Partial,BASEMENT_Partial Finished,BASEMENT_Poured Concrete,BASEMENT_Slab,BASEMENT_Stone,BASEMENT_Sump Pump,BASEMENT_Unfinished,BASEMENT_Walkout,BASEMENT_Wood,BATHDESC_1/2 Basement,BATHDESC_1/2 Master,BATHDESC_3/4 Basement,BATHDESC_3/4 Master,BATHDESC_Basement,BATHDESC_Bathroom Ensuite,BATHDESC_Full Basement,BATHDESC_Full Master,BATHDESC_Jack & Jill 3/4,BATHDESC_Jack and Jill,BATHDESC_Main Floor 1/2 Bath,BATHDESC_Main Floor 3/4 Bath,BATHDESC_Main Floor Full Bath,BATHDESC_Master Walk-Thru,BATHDESC_Other,BATHDESC_Private Master,BATHDESC_Rough In,BATHDESC_Separate Tub & Shower,BATHDESC_Two Basement Baths,BATHDESC_Two Master Baths,BATHDESC_Upper Level 1/2 Bath,BATHDESC_Upper Level 3/4 Bath,BATHDESC_Upper Level Full Bath,BATHDESC_Walk Thru,BATHDESC_Walk-In Shower,BATHDESC_Whirlpool,CoolingDescription_Central,CoolingDescription_Ductless Mini-Split,CoolingDescription_Geothermal,CoolingDescription_None,CoolingDescription_Wall,CoolingDescription_Window,DiningRoomDescription_Breakfast Area,DiningRoomDescription_Eat In Kitchen,DiningRoomDescription_Informal Dining Room,DiningRoomDescription_Kitchen/Dining Room,DiningRoomDescription_Living/Dining Room,DiningRoomDescription_Other,DiningRoomDescription_Separate/Formal Dining Room,EXTERIOR_Block,EXTERIOR_Brick/Stone,EXTERIOR_Cement Board,EXTERIOR_Engineered Wood,EXTERIOR_Fiber Board,EXTERIOR_Metal,EXTERIOR_Other,EXTERIOR_Shakes,EXTERIOR_Stucco,EXTERIOR_Vinyl,EXTERIOR_Wood,FENCE_Chain Link,FENCE_Electric,FENCE_Full,FENCE_Invisible,FENCE_None,FENCE_Other,FENCE_Partial,FENCE_Privacy,FENCE_Rail,FENCE_Wire,FENCE_Wood,GarageDescription_Assigned,GarageDescription_Attached Garage,GarageDescription_Carport,GarageDescription_Contract Pkg Required,GarageDescription_Covered,GarageDescription_Detached Garage,GarageDescription_Driveway - Asphalt,GarageDescription_Driveway - Concrete,GarageDescription_Driveway - Gravel,GarageDescription_Driveway - Other Surface,GarageDescription_Driveway - Shared,GarageDescription_Garage Door Opener,GarageDescription_Heated Garage,GarageDescription_Insulated Garage,GarageDescription_More Parking Offsite for Fee,GarageDescription_More Parking Onsite for Fee,GarageDescription_No Int Access to Dwelling,GarageDescription_None,GarageDescription_On-Street Parking Only,GarageDescription_Other,GarageDescription_Secured,GarageDescription_Tandem,GarageDescription_Tuckunder,GarageDescription_Unassigned,GarageDescription_Uncovered/Open,GarageDescription_Underground Garage,GarageDescription_Units Vary,GarageDescription_Valet Parking for Fee,PoolDescription_Above Ground,PoolDescription_Below Ground,PoolDescription_Heated,PoolDescription_Indoor,PoolDescription_None,PoolDescription_Outdoor,PoolDescription_Shared,ROOF_Age 8 Years or Less,ROOF_Age Over 8 Years,ROOF_Asphalt Shingles,ROOF_Flat,ROOF_Metal,ROOF_Other,ROOF_Pitched,ROOF_Rubber,ROOF_Shakes,ROOF_Slate,ROOF_Tar/Gravel,ROOF_Tile,ROOF_Unspecified Shingle,ROOF_Wood Shingles,ZONING_Business/Commercial,ZONING_Industrial,ZONING_Other,ZONING_Residential-Multi-Family,ZONING_Residential-Single,ASSESSMENTPENDING:No,ASSESSMENTPENDING:Unknown,AssumableMortgage:None,AssumableMortgage:Not Assumable,AssumableMortgage:Information Coming,AssumableMortgage:Yes w/ Qualifying,City:STP - Saint Paul,City:WB - Woodbury,City:MAPW - Maplewood,City:OAKD - Oakdale,PotentialShortSale:No,SchoolDistrictNumber:625 - St. Paul,SchoolDistrictNumber:622 - North St Paul-Maplewood,SchoolDistrictNumber:833 - South Washington County,SchoolDistrictNumber:834 - Stillwater,SchoolDistrictNumber:623 - Roseville,SchoolDistrictNumber:832 - Mahtomedi,SchoolDistrictNumber:624 - White Bear Lake,LISTTYPE:Exclusive Right,LISTTYPE:Service Agreement,LISTTYPE:Exclusive Right with Exclusions,LISTTYPE:Exclusive Agency,STYLE:(SF) Two Stories,STYLE:(SF) One 1/2 Stories,STYLE:(SF) One Story,STYLE:(TH) Side x Side,STYLE:(SF) Split Entry (Bi-Level),STYLE:(CC) High Rise (4+ Levels),STYLE:(SF) Four or More Level Split,STYLE:(CC) Low Rise (3- Levels),STYLE:(SF) More Than Two Stories,STYLE:(TH) Quad/4 Corners,STYLE:(SF) Three Level Split,STYLE:(TH) Detached,STYLE:(SF) Modified Two Story,STYLE:(TW) Twin Home,STYLE:(CC) Manor/Village,STYLE:(CC) Converted Mansion,STYLE:(SF) Other
1285,2017-05-24,-93.13282,44.91974,120000,125000,125000,133.3333,900,17,2017-06-10,0,900,1316,1660.0,900,1921,0.13,1,0,0,0,1,2,0,0,344.0,24,144,5,21,2017,51,72.0,,172000,4.02,4.05,4.02,4.03,3.97,0.002752,2.0,0.010528,900,138.88888888888889,0.7267441860465116,,,,1,1,1,,1.0,,1.0,,1,,,1,1,,,,1,1.0,,,,,,,,,,,,1.0,,,,,,1.0,,,,,,,,,,,,,,,1.0,,,,,,,,,,,,,,1,,,,,,1.0,,,1.0,,,,,1,,,,,,,,1.0,,,,,,,,,,,,,,,,,,1,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,1.0,,,,,,,,,,,,,,,,,,1,0,1,0,1,0,0,1,0,0,0,1,1,0,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0
4846,2017-09-06,-92.90081,44.9223,455000,459900,474900,113.8639,1509,40,2017-10-19,1,2796,5761,6094.0,3996,1998,0.361,2,1,0,1,4,5,1200,480,333.0,6,249,9,36,2017,12,,,291000,3.82,3.86,3.89,3.9,3.93,0.0007240704500978474,1.25,0.0125266362252663,3996,115.09009009009009,1.5804123711340206,,,,1,1,1,,,,,,1,,,1,1,,,,1,,,,,,1.0,,1.0,,1.0,,,,,,,,1.0,,,,1.0,,,,,,,,,,1.0,,,1.0,,,,,,,,,1.0,,,,1,,,,,,,,,,,,,,1,,,,,,,,,,,,,,,,,,,,,,,,,,1,,,,,,,,,,,,,,,,,,,,,,,,,,,1.0,,,,1.0,1.0,,,,1.0,,,,,,,,,,,,1,0,1,1,0,0,0,0,1,0,0,1,0,0,1,0,0,0,0,1,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0


In [37]:
# # Step 5: last removing
# df = final_removing(df)
# df.printSchema()

## Choosing the Algorithm

### Test and Train Splits for Time Series

In [38]:
# Reviewing the data
df = read_from_kitchen()
column_list = ['offmarketdate']
pprint(df[column_list].dtypes)
df[column_list].show(5, truncate=False)

Dataframe shape: (5000, 242)
[('offmarketdate', 'date')]
+-------------+
|offmarketdate|
+-------------+
|2017-06-29   |
|2017-06-29   |
|2017-06-29   |
|2017-06-29   |
|2017-06-29   |
+-------------+
only showing top 5 rows



In [39]:
# Create variables for max and min dates in our dataset
max_date = df.agg({'offmarketdate': 'max'}).collect()[0][0]
min_date = df.agg({'offmarketdate': 'min'}).collect()[0][0]
print(f'''
Min date: {min_date}
Max date: {max_date}
''')


Min date: 2017-02-24
Max date: 2018-01-24



In [40]:
# Find how many days our data spans
max_date = F.max(df['offmarketdate'])
min_date = F.min(df['offmarketdate'])

range_in_days = df.agg(F.datediff(max_date, min_date)).collect()[0][0]
print(f'''
Range in days: {range_in_days}
''')


Range in days: 334



In [41]:
# Find the date to split the dataset on
split_in_days = round(range_in_days * 0.8)
split_date = df.agg(F.date_add(F.min(df['offmarketdate']), split_in_days)).collect()[0][0]
print(f'''
80% of the range in days: {split_in_days}
Threshold to split      : {split_date}
''')


80% of the range in days: 267
Threshold to split      : 2017-11-18



In [42]:
# Split the data into 80% train, 20% test
train_df = df.where(df['offmarketdate'] < split_date)
test_df = df.where(df['offmarketdate'] >= split_date)

In [43]:
train_df[['offmarketdate']].show(5, truncate=False)

+-------------+
|offmarketdate|
+-------------+
|2017-06-29   |
|2017-06-29   |
|2017-06-29   |
|2017-06-29   |
|2017-06-29   |
+-------------+
only showing top 5 rows



In [44]:
test_df[['offmarketdate']].show(5, truncate=False)

+-------------+
|offmarketdate|
+-------------+
|2017-12-17   |
|2017-12-12   |
|2017-12-12   |
|2017-12-12   |
|2017-12-12   |
+-------------+
only showing top 5 rows



## Ex. 1 - Creating Time Splits

In the video, we learned why splitting data randomly can be dangerous for time series as data from the future can cause overfitting in our model. Often with time series, you acquire new data as it is made available and you will want to retrain your model using the newest data. In the video, we showed how to do a percentage split for `test` and `training` sets but suppose you wish to train on all available data except for the last 45days which you want to use for a test set.

In this exercise, we will create a function to find the split date for using the last 45 days of data for testing and the rest for training. 

**Instructions:**

1. Create a function `train_test_split_date()` that takes in a dataframe, `df`, the date column to use for splitting `split_col` and the number of days to use for the test set, `test_days` and set it to have a default value of `45`.
2. Find the `min` and `max` dates for `split_col` using `,()`.
3. Find the date to split the test and training sets using `max_date` and subtract `test_days` from it by using `timedelta()` which takes a days parameter, in this case, pass in `test_days`
4. Using `OFFMKTDATE` as the `split_col` find `split_date` and use it to filter the dataframe into two new ones, `train_df` and `test_df`, Where `test_df` is only the last `45` days of the data. Additionally, ensure that the `test_df only` contains homes listed as of the split date by filtering `df['LISTDATE']` less than or equal to the `split_date`.

In [45]:
# Reviewing the data
column_list = ['offmarketdate', 'LISTDATE']
pprint(df[column_list].dtypes)
df[column_list].show(5, truncate=False)

[('offmarketdate', 'date'), ('LISTDATE', 'date')]
+-------------+----------+
|offmarketdate|LISTDATE  |
+-------------+----------+
|2017-11-09   |2017-07-05|
|2017-09-22   |2017-07-22|
|2017-06-13   |2017-05-19|
|2017-08-09   |2017-08-03|
|2017-08-03   |2017-07-26|
+-------------+----------+
only showing top 5 rows



In [46]:
def train_test_split_date(df, split_col, test_days=45):
    """Calculate the date to split test and training sets"""
    max_date = df.agg(F.max(split_col)).collect()[0][0]
    
    # Subtract an integer number of days from the last date in dataset
    split_date = max_date - timedelta(days=test_days)
    return split_date


# Find the date to use in spitting test and train
split_date = train_test_split_date(df, 'offmarketdate')


# Create Sequential Test and Training Sets
train_df = df.where(df['offmarketdate'] < split_date) 

# An extra where is needed on LISTDATE to ensure it contains items listed as of the split_date.
test_df = df.where(df['offmarketdate'] >= split_date).where(df['LISTDATE'] <= split_date) 

# Reviewing the data
print('Threshold date:', split_date)

# Inspect the result
column_list = ['offmarketdate', 'LISTDATE', 'DAYSONMARKET']
train_df[column_list].show(5)
test_df[column_list].show(5)

Threshold date: 2017-12-10
+-------------+----------+------------+
|offmarketdate|  LISTDATE|DAYSONMARKET|
+-------------+----------+------------+
|   2017-10-02|2017-09-14|          18|
|   2017-09-01|2017-07-28|          35|
|   2017-09-08|2017-06-09|          67|
|   2017-05-03|2017-04-27|           2|
|   2017-05-19|2017-05-04|          15|
+-------------+----------+------------+
only showing top 5 rows

+-------------+----------+------------+
|offmarketdate|  LISTDATE|DAYSONMARKET|
+-------------+----------+------------+
|   2017-12-19|2017-11-03|          46|
|   2017-12-17|2017-12-04|          13|
|   2017-12-15|2017-11-02|          43|
|   2018-01-04|2017-12-04|          31|
|   2017-12-31|2017-09-13|         109|
+-------------+----------+------------+
only showing top 5 rows



## Ex. 2 - Adjusting Time Features

We have mentioned throughout this course some of the dangers of leaking information to your model during training. Data leakage will cause your model to have very optimistic metrics for accuracy but once real data is run through it the results are often very disappointing.

In this exercise, we are going to ensure that `DAYSONMARKET` only reflects what information we have at the time of predicting the value. I.e., if the house is still on the market, we don't know how many more days it will stay on the market. We need to adjust our `test_df` to reflect what information we currently have as of `2017-12-10`.

**NOTE**: This example will use the `lit()` function. This function is used to allow single values where an entire column is expected in a function call.

**Instructions:**

1. Import the following functions from `pyspark.sql.functions` to use later on: `datediff()`, `to_date()`, `lit()`. Already done!
2. Convert the date string `'2017-12-10'` to a pyspark date by first calling the literal function, `lit()` on it and then `to_date()`
3. Create `test_df` by filtering `OFFMKTDATE` greater than or equal to the `split_date` and `LISTDATE` less than or equal to the `split_date` using `where()`.
4. Replace `DAYSONMARKET` by calculating a new column called `DAYSONMARKET`, the new column should be the difference between `split_date` and `LISTDATE` use `datediff()` to perform the date calculation. Inspect the new column and the original using the code provided.

In [47]:
# Create Sequential Test set
split_date = F.to_date(F.lit('2017-12-10'))
test_df = df.where(df['offmarketdate'] >= split_date).where(df['LISTDATE'] <= split_date)

# Create a copy of DAYSONMARKET to review later
test_df = test_df.withColumn('DAYSONMARKET_Original', test_df['DAYSONMARKET'])

# Recalculate DAYSONMARKET from what we know on our split date
test_df = test_df.withColumn('DAYSONMARKET', F.datediff(split_date, test_df['LISTDATE']))

# Review the difference
column_list = ['offmarketdate', 'LISTDATE', 'DAYSONMARKET_Original', 'DAYSONMARKET']
test_df[column_list].show(5)

+-------------+----------+---------------------+------------+
|offmarketdate|  LISTDATE|DAYSONMARKET_Original|DAYSONMARKET|
+-------------+----------+---------------------+------------+
|   2017-12-19|2017-11-03|                   46|          37|
|   2017-12-17|2017-12-04|                   13|           6|
|   2017-12-15|2017-11-02|                   43|          38|
|   2018-01-04|2017-12-04|                   31|           6|
|   2017-12-31|2017-09-13|                  109|          88|
+-------------+----------+---------------------+------------+
only showing top 5 rows



## Ex. 3 - Dropping Columns with Low Observations

After doing a lot of feature engineering it's a good idea to take a step back and look at what you've created. If you've used some automation techniques on your categorical features like exploding or OneHot Encoding you may find that you now have hundreds of new binary features. While the subject of feature selection is material for a whole other course but there are some quick steps you can take to reduce the dimensionality of your data set.

In this exercise, we are going to remove columns that have less than 30 observations. 30 is a common minimum number of observations for statistical significance. Any less than that and the relationships cause overfitting because of a sheer coincidence!

**Instructions:**

1. Using the provided for loop that iterates through the list of binary columns, calculate the sum of the values in the column using the agg function. Use `collect()` to run the calculation immediately and save the results to `obs_count`.
2. Compare `obs_count` to `obs_threshold`, the if statement should be true if `obs_count` is less than or equal to `obs_threshold`.
3. Remove columns that have been appended to `cols_to_remove` list by using `drop()`. Recall that the `*` allows the list to be unpacked.
4. Print the starting and ending shape of the PySpark dataframes by using `count()` for number of records and `len()` on `df.columns` or `new_df.columns` to find the number of columns.

In [48]:
# Reviewing the data
df = read_from_kitchen()
df.printSchema()

Dataframe shape: (5000, 242)
root
 |-- No.: integer (nullable = true)
 |-- LISTDATE: date (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- SalesClosePrice: integer (nullable = true)
 |-- LISTPRICE: integer (nullable = true)
 |-- OriginalListPrice: integer (nullable = true)
 |-- PricePerTSFT: double (nullable = true)
 |-- FOUNDATIONSIZE: integer (nullable = true)
 |-- DAYSONMARKET: integer (nullable = true)
 |-- offmarketdate: date (nullable = true)
 |-- Fireplaces: integer (nullable = true)
 |-- SQFTABOVEGROUND: integer (nullable = true)
 |-- Taxes: integer (nullable = true)
 |-- TAXWITHASSESSMENTS: double (nullable = true)
 |-- LivingArea: integer (nullable = true)
 |-- YEARBUILT: integer (nullable = true)
 |-- ACRES: double (nullable = true)
 |-- BathsFull: integer (nullable = true)
 |-- BathsHalf: integer (nullable = true)
 |-- BATHQUARTER: integer (nullable = true)
 |-- BATHSTHREEQUARTER: integer (nullable = true)
 |-- BATHSTOT

In [49]:
print(f'''
Total fields: {len(binary_cols)}
{binary_cols}
''')


Total fields: 156
['APPLIANCES_Air-To-Air Exchanger', 'APPLIANCES_Central Vacuum', 'APPLIANCES_Cooktop', 'APPLIANCES_Dishwasher', 'APPLIANCES_Disposal', 'APPLIANCES_Dryer', 'APPLIANCES_Electronic Air Filter', 'APPLIANCES_Exhaust Fan/Hood', 'APPLIANCES_Freezer', 'APPLIANCES_Furnace Humidifier', 'APPLIANCES_Indoor Grill', 'APPLIANCES_Microwave', 'APPLIANCES_None', 'APPLIANCES_Other', 'APPLIANCES_Range', 'APPLIANCES_Refrigerator', 'APPLIANCES_Tankless Water  Heater', 'APPLIANCES_Trash Compactor', 'APPLIANCES_Wall Oven', 'APPLIANCES_Washer', 'APPLIANCES_Water Softener - Owned', 'APPLIANCES_Water Softener - Rented', 'BASEMENT_Concrete Block', 'BASEMENT_Crawl Space', 'BASEMENT_Day/Lookout Windows', 'BASEMENT_Drain Tiled', 'BASEMENT_Drainage System', 'BASEMENT_Egress Windows', 'BASEMENT_Finished (Livable)', 'BASEMENT_Full', 'BASEMENT_Insulating Concrete Forms', 'BASEMENT_None', 'BASEMENT_Partial', 'BASEMENT_Partial Finished', 'BASEMENT_Poured Concrete', 'BASEMENT_Slab', 'BASEMENT_Stone', 'BA

In [50]:
obs_threshold = 30
cols_to_remove = list()

# Inspect first 10 binary columns in list
for col in binary_cols:
    # Count the number of 1 values in the binary column
    obs_count = df.agg(F.sum(f'`{col}`')).collect()[0][0]
    
    # If less than our observation threshold, remove
    if obs_count < obs_threshold:
        cols_to_remove.append(col)
    
print(f'''
Total fields: {len(cols_to_remove)}
{cols_to_remove}
''')


Total fields: 35
['APPLIANCES_Indoor Grill', 'APPLIANCES_None', 'APPLIANCES_Tankless Water  Heater', 'APPLIANCES_Trash Compactor', 'APPLIANCES_Water Softener - Rented', 'BASEMENT_Insulating Concrete Forms', 'BASEMENT_Wood', 'BATHDESC_Jack & Jill 3/4', 'BATHDESC_Other', 'BATHDESC_Two Basement Baths', 'BATHDESC_Two Master Baths', 'CoolingDescription_Geothermal ', 'DiningRoomDescription_Other', 'EXTERIOR_Block', 'FENCE_Electric', 'FENCE_Rail', 'GarageDescription_Carport', 'GarageDescription_Contract Pkg Required', 'GarageDescription_More Parking Offsite for Fee', 'GarageDescription_Unassigned', 'GarageDescription_Units Vary', 'GarageDescription_Valet Parking for Fee', 'PoolDescription_Above Ground', 'PoolDescription_Indoor', 'ROOF_Metal', 'ROOF_Other', 'ROOF_Shakes', 'ROOF_Slate', 'ROOF_Tar/Gravel', 'ROOF_Tile', 'ROOF_Unspecified Shingle', 'ROOF_Wood Shingles', 'ZONING_Business/Commercial', 'ZONING_Industrial', 'ZONING_Other']



In [51]:
# Drop columns and print starting and ending dataframe shapes
new_df = df.drop(*cols_to_remove)

print('Rows: ' + str(df.count()) + ' Columns: ' + str(len(df.columns)))
print('Rows: ' + str(new_df.count()) + ' Columns: ' + str(len(new_df.columns)))

Rows: 5000 Columns: 242
Rows: 5000 Columns: 207


## Ex. 4 - Naively Handling Missing and Categorical Values

Random Forest Regression is robust enough to allow us to ignore many of the more time consuming and tedious data preparation steps. While some implementations of Random Forest handle missing and categorical values automatically, PySpark's does not. The math remains the same however so we can get away with some naive value replacements.

For missing values since our data is strictly positive, we will assign -1. The random forest will split on this value and handle it differently than the rest of the values in the same feature.

For categorical values, we can just map the text values to numbers and again the random forest will appropriately handle them by splitting on them. In this example, we will dust off pipelines from Introduction to PySpark to write our code more concisely. Please note that the exercise will start by displaying the dtypes of the columns in the dataframe, compare them to the results at the end of this exercise.

**Instructions:**

1. Replace the values in `WALKSCORE` and `BIKESCORE` with `-1` using fillna() and the subset parameter.
2. Create a list of `StringIndexers` by using list comprehension to iterate over each column in `categorical_cols`.
3. Apply `fit()` and `transform()` to the pipeline `indexer_pipeline`.
4. Drop the `categorical_cols` using drop() since they are no longer needed. Inspect the result data types using `dtypes`.

In [52]:
# Loading the data
df = (real_state.select('*').join(coord_df, on='No.', how='left')
                .withColumn('longitude', F.round('longitude', 5))
                .withColumn('latitude', F.round('latitude', 5))
                .join((neighborhood_walk_df.withColumn('longitude', F.round('longitude', 5))
                                           .withColumn('latitude', F.round('latitude', 5))), 
                      on=['longitude', 'latitude'], how='left'))

# Replace missing values
df = df.fillna(-1, subset=['walkscore', 'bikescore'])

# Create list of StringIndexers using list comprehension
categorical_cols = ['City', 'LISTTYPE', 'SchoolDistrictNumber', 'PotentialShortSale', 
                    'STYLE', 'AssumableMortgage', 'ASSESSMENTPENDING']
indexers = [StringIndexer(inputCol=col, outputCol=col+"_IDX").setHandleInvalid("keep") 
            for col in categorical_cols]

# Create pipeline of indexers
indexer_pipeline = Pipeline(stages=indexers)

# Fit and Transform the pipeline to the original data
df_indexed = indexer_pipeline.fit(df).transform(df)

# Clean up redundant columns
df_indexed = df_indexed.drop(*categorical_cols)

# Inspect data transformations
df_indexed.dtypes

[('longitude', 'double'),
 ('latitude', 'double'),
 ('No.', 'int'),
 ('MLSID', 'string'),
 ('StreetNumberNumeric', 'int'),
 ('streetaddress', 'string'),
 ('STREETNAME', 'string'),
 ('PostalCode', 'int'),
 ('StateOrProvince', 'string'),
 ('SalesClosePrice', 'int'),
 ('LISTDATE', 'string'),
 ('LISTPRICE', 'int'),
 ('OriginalListPrice', 'int'),
 ('PricePerTSFT', 'double'),
 ('FOUNDATIONSIZE', 'int'),
 ('FENCE', 'string'),
 ('MapLetter', 'string'),
 ('LotSizeDimensions', 'string'),
 ('DAYSONMARKET', 'int'),
 ('offmarketdate', 'string'),
 ('Fireplaces', 'int'),
 ('RoomArea4', 'string'),
 ('roomtype', 'string'),
 ('ROOF', 'string'),
 ('RoomFloor4', 'string'),
 ('PoolDescription', 'string'),
 ('PDOM', 'int'),
 ('GarageDescription', 'string'),
 ('SQFTABOVEGROUND', 'int'),
 ('Taxes', 'int'),
 ('RoomFloor1', 'string'),
 ('RoomArea1', 'string'),
 ('TAXWITHASSESSMENTS', 'double'),
 ('TAXYEAR', 'int'),
 ('LivingArea', 'int'),
 ('UNITNUMBER', 'string'),
 ('YEARBUILT', 'int'),
 ('ZONING', 'string'),


## Building a Model

### Loading the data

In [53]:
# Step 1: Loading the data
df = real_state.select('*')

# Step 2: casting, joining, enrichment and cleaning
df = cast_to_proper_format(df)
df = work_with_datetime_features(df)
df = enrich_joining_more_tables(df)
df = adding_lags_with_window(df, show=False)
df = adding_ratios_features(df)
df = removing_unuseful_columns(df)

saving_to_kitchen(df)

Cast - Dataframe shape: (5000, 74)
DateTime Features - Dataframe shape: (5000, 79)
Joining - Dataframe shape: (5000, 85)
Window & Lag - Dataframe shape: (5000, 90)
Ratios - Dataframe shape: (5000, 96)
Dropping 1 - Dataframe shape: (5000, 65)


In [54]:
# Step 3: Explode list features
df = read_from_kitchen()
df, binary_cols = explode_list_features(df, cols_to_split_and_explode[:6])
saving_to_kitchen(df)

Dataframe shape: (5000, 65)
Explode APPLIANCES - Dataframe shape: (5000, 86)
Explode BASEMENT - Dataframe shape: (5000, 104)
Explode BATHDESC - Dataframe shape: (5000, 129)
Explode CoolingDescription - Dataframe shape: (5000, 134)
Explode DiningRoomDescription - Dataframe shape: (5000, 140)
Explode EXTERIOR - Dataframe shape: (5000, 150)


In [55]:
df = read_from_kitchen()
df, bin_cols = explode_list_features(df, cols_to_split_and_explode[6:])
binary_cols += bin_cols
saving_to_kitchen(df)

Dataframe shape: (5000, 150)
Explode FENCE - Dataframe shape: (5000, 160)
Explode GarageDescription - Dataframe shape: (5000, 187)
Explode PoolDescription - Dataframe shape: (5000, 193)
Explode ROOF - Dataframe shape: (5000, 206)
Explode ZONING - Dataframe shape: (5000, 210)


In [56]:
# Step 4: Encoding catego features
df = read_from_kitchen()
df = encode_catego_col(df, cols_to_pivot)
saving_to_kitchen(df)

Dataframe shape: (5000, 210)
Encode ASSESSMENTPENDING - Dataframe shape: (5000, 210)
Encode AssumableMortgage - Dataframe shape: (5000, 210)
Encode City - Dataframe shape: (5000, 210)
Encode PotentialShortSale - Dataframe shape: (5000, 210)
Encode SchoolDistrictNumber - Dataframe shape: (5000, 210)
Encode LISTTYPE - Dataframe shape: (5000, 210)
Encode STYLE - Dataframe shape: (5000, 210)


In [57]:
# Step 5: Removing features with less than 30 features
df = read_from_kitchen()
df = removing_columns_less_obs(df)
saving_to_kitchen(df)

Dataframe shape: (5000, 210)
Dropping 2 - Dataframe shape after removing: (5000, 210)


In [58]:
# Step 6: Replace missing values
df = read_from_kitchen()
df = df.fillna(-1)

Dataframe shape: (5000, 210)


### Vectorizing the features for the ml

In [59]:
# Splitting data between train and test
train_df, test_df = split_data_by_date(df, split_col='offmarketdate', test_days=45)
train_df = final_removing(train_df)
test_df = final_removing(test_df)

Dropping 2 - Dataframe shape: (4828, 205)
Dropping 2 - Dataframe shape: (154, 205)


In [60]:
target_col = 'SalesClosePrice' 
predict_col = 'PredPrice'

model = VectorAssembler(inputCols=train_df.drop(target_col).columns, outputCol='features')
train_df = model.transform(train_df)
test_df = model.transform(test_df)

train_df.limit(2)

SalesClosePrice,LISTPRICE,OriginalListPrice,PricePerTSFT,FOUNDATIONSIZE,DAYSONMARKET,Fireplaces,SQFTABOVEGROUND,Taxes,TAXWITHASSESSMENTS,LivingArea,YEARBUILT,ACRES,BathsFull,BathsHalf,BATHQUARTER,BATHSTHREEQUARTER,BATHSTOTAL,Bedrooms,SQFTBELOWGROUND,AssociationFee,AssessedValuation,LIST_DAYOFMONTH,LIST_DAYOFYEAR,LIST_MONTH,LIST_WEEKOFYEAR,LIST_YEAR,walkscore,bikescore,transitscore,MedianHomeValue,MORTGAGE30US,MORTGAGE30US-1WK,MORTGAGE30US-2WK,MORTGAGE30US-3WK,MORTGAGE30US-4WK,ASSESSED_TO_LIST,BED_TO_BATHS,TAX_TO_LIST,SQFT_TOTAL,LISTING_PRICE_PER_SQFT,LISTING_TO_MEDIAN_RATIO,APPLIANCES_Air-To-Air Exchanger,APPLIANCES_Central Vacuum,APPLIANCES_Cooktop,APPLIANCES_Dishwasher,APPLIANCES_Disposal,APPLIANCES_Dryer,APPLIANCES_Electronic Air Filter,APPLIANCES_Exhaust Fan/Hood,APPLIANCES_Freezer,APPLIANCES_Furnace Humidifier,APPLIANCES_Indoor Grill,APPLIANCES_Microwave,APPLIANCES_None,APPLIANCES_Other,APPLIANCES_Range,APPLIANCES_Refrigerator,APPLIANCES_Tankless Water Heater,APPLIANCES_Trash Compactor,APPLIANCES_Wall Oven,APPLIANCES_Washer,APPLIANCES_Water Softener - Owned,APPLIANCES_Water Softener - Rented,BASEMENT_Concrete Block,BASEMENT_Crawl Space,BASEMENT_Day/Lookout Windows,BASEMENT_Drain Tiled,BASEMENT_Drainage System,BASEMENT_Egress Windows,BASEMENT_Finished (Livable),BASEMENT_Full,BASEMENT_Insulating Concrete Forms,BASEMENT_None,BASEMENT_Partial,BASEMENT_Partial Finished,BASEMENT_Poured Concrete,BASEMENT_Slab,BASEMENT_Stone,BASEMENT_Sump Pump,BASEMENT_Unfinished,BASEMENT_Walkout,BASEMENT_Wood,BATHDESC_1/2 Basement,BATHDESC_1/2 Master,BATHDESC_3/4 Basement,BATHDESC_3/4 Master,BATHDESC_Basement,BATHDESC_Bathroom Ensuite,BATHDESC_Full Basement,BATHDESC_Full Master,BATHDESC_Jack & Jill 3/4,BATHDESC_Jack and Jill,BATHDESC_Main Floor 1/2 Bath,BATHDESC_Main Floor 3/4 Bath,BATHDESC_Main Floor Full Bath,BATHDESC_Master Walk-Thru,BATHDESC_Other,BATHDESC_Private Master,BATHDESC_Rough In,BATHDESC_Separate Tub & Shower,BATHDESC_Two Basement Baths,BATHDESC_Two Master Baths,BATHDESC_Upper Level 1/2 Bath,BATHDESC_Upper Level 3/4 Bath,BATHDESC_Upper Level Full Bath,BATHDESC_Walk Thru,BATHDESC_Walk-In Shower,BATHDESC_Whirlpool,CoolingDescription_Central,CoolingDescription_Ductless Mini-Split,CoolingDescription_Geothermal,CoolingDescription_None,CoolingDescription_Wall,CoolingDescription_Window,DiningRoomDescription_Breakfast Area,DiningRoomDescription_Eat In Kitchen,DiningRoomDescription_Informal Dining Room,DiningRoomDescription_Kitchen/Dining Room,DiningRoomDescription_Living/Dining Room,DiningRoomDescription_Other,DiningRoomDescription_Separate/Formal Dining Room,EXTERIOR_Block,EXTERIOR_Brick/Stone,EXTERIOR_Cement Board,EXTERIOR_Engineered Wood,EXTERIOR_Fiber Board,EXTERIOR_Metal,EXTERIOR_Other,EXTERIOR_Shakes,EXTERIOR_Stucco,EXTERIOR_Vinyl,EXTERIOR_Wood,FENCE_Chain Link,FENCE_Electric,FENCE_Full,FENCE_Invisible,FENCE_None,FENCE_Other,FENCE_Partial,FENCE_Privacy,FENCE_Rail,FENCE_Wire,FENCE_Wood,GarageDescription_Assigned,GarageDescription_Attached Garage,GarageDescription_Carport,GarageDescription_Contract Pkg Required,GarageDescription_Covered,GarageDescription_Detached Garage,GarageDescription_Driveway - Asphalt,GarageDescription_Driveway - Concrete,GarageDescription_Driveway - Gravel,GarageDescription_Driveway - Other Surface,GarageDescription_Driveway - Shared,GarageDescription_Garage Door Opener,GarageDescription_Heated Garage,GarageDescription_Insulated Garage,GarageDescription_More Parking Offsite for Fee,GarageDescription_More Parking Onsite for Fee,GarageDescription_No Int Access to Dwelling,GarageDescription_None,GarageDescription_On-Street Parking Only,GarageDescription_Other,GarageDescription_Secured,GarageDescription_Tandem,GarageDescription_Tuckunder,GarageDescription_Unassigned,GarageDescription_Uncovered/Open,GarageDescription_Underground Garage,GarageDescription_Units Vary,GarageDescription_Valet Parking for Fee,PoolDescription_Above Ground,PoolDescription_Below Ground,PoolDescription_Heated,PoolDescription_Indoor,PoolDescription_None,PoolDescription_Outdoor,PoolDescription_Shared,ROOF_Age 8 Years or Less,ROOF_Age Over 8 Years,ROOF_Asphalt Shingles,ROOF_Flat,ROOF_Metal,ROOF_Other,ROOF_Pitched,ROOF_Rubber,ROOF_Shakes,ROOF_Slate,ROOF_Tar/Gravel,ROOF_Tile,ROOF_Unspecified Shingle,ROOF_Wood Shingles,ZONING_Business/Commercial,ZONING_Industrial,ZONING_Other,ZONING_Residential-Multi-Family,ZONING_Residential-Single,ASSESSMENTPENDING_vec,AssumableMortgage_vec,City_vec,PotentialShortSale_vec,SchoolDistrictNumber_vec,LISTTYPE_vec,STYLE_vec,features
256232,250000,250000,121.322,1066,17,1,2112,2956,2956.0,2112,1913,0.161,1,1,0,1,3,4,0,0,0.0,1,244,9,35,2017,23,22,-1,172000,3.82,3.86,3.89,3.9,3.93,0.0,1.3333333333333333,0.011824,2112,118.37121212121212,1.4534883720930232,-1,-1,-1,1,-1,1,-1,-1,-1,-1,-1,1,-1,-1,1,1,-1,-1,-1,1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,1,-1,-1,-1,-1,-1,1,1,-1,1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,1,-1,-1,1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,1,-1,1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,1,"(2,[0],[1.0])","(4,[1],[1.0])","(4,[0],[1.0])","(1,[0],[1.0])","(7,[0],[1.0])","(4,[0],[1.0])","(17,[0],[1.0])","[250000.0,250000...."
500000,500000,500000,200.1601,1055,3,1,1698,8843,9450.0,2498,1938,0.24,1,1,0,1,3,4,800,0,607.0,28,209,7,30,2017,65,63,-1,172000,3.92,3.96,4.03,3.96,3.88,0.001214,1.3333333333333333,0.017686,2498,200.160128102482,2.9069767441860463,-1,-1,1,1,1,1,-1,1,-1,-1,-1,1,-1,-1,1,1,-1,-1,-1,1,-1,-1,-1,-1,1,-1,-1,1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,1,-1,-1,-1,1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,1,-1,-1,-1,-1,-1,-1,-1,-1,-1,1,-1,-1,-1,-1,-1,-1,1,-1,-1,-1,-1,-1,-1,1,-1,-1,-1,-1,1,-1,-1,-1,-1,1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,1,"(2,[1],[1.0])","(4,[1],[1.0])","(4,[0],[1.0])","(1,[0],[1.0])","(7,[0],[1.0])","(4,[0],[1.0])","(17,[1],[1.0])","[500000.0,500000...."


### Training a Random Forest

In [61]:
# Initialize model with columns to utilize
model_rfr = RandomForestRegressor(featuresCol="features", labelCol=target_col, predictionCol=predict_col,
                              seed=42)

# Train model
model_rfr = model_rfr.fit(train_df)

### Predicting with a Model

In [62]:
# Make predictions
predictions_rfr = model_rfr.transform(test_df)

# Inspect results
predictions_rfr.select(predict_col, target_col).show(5)

+------------------+---------------+
|         PredPrice|SalesClosePrice|
+------------------+---------------+
| 236471.6803406473|         224879|
|  175012.201655954|         175000|
|146472.71081608784|         138500|
| 374836.1408278668|         352450|
|272376.59048311797|         270000|
+------------------+---------------+
only showing top 5 rows



### Evaluating a Model

In [63]:
# Select columns to compute test error
evaluator = RegressionEvaluator(labelCol=target_col, predictionCol=predict_col)

# Create evaluation metrics
rmse = evaluator.evaluate(predictions_rfr, {evaluator.metricName: "rmse"})
r2 = evaluator.evaluate(predictions_rfr, {evaluator.metricName: "r2"})

# Print Model Metrics
print('RMSE: ' + str(rmse))
print('R^2: ' + str(r2))

RMSE: 19947.50830910839
R^2: 0.9746941367142943


## Ex. 5 - Building a Regression Model

One of the great things about` PySpar`k` M`L module is that most algorithms can be tried and tested without changing much code. Random Forest Regression is a fairly simple ensemble model, using bagging to fit. Another tree based ensemble model is Gradient Boosted Trees which uses a different approach called boosting to fit. In this exercise let's train a` GBTRegresso`r.**

Instructi:**
01.  XP
Im`port GBTRegr`essor `from pyspark.ml.regre`ssion which you will notice is the same modul`e as RandomForestRegr`es2. sor.
Instan`tiate GBTReg`ressor` with featu`resCol set to the vector column of our features n`amed, fe`at`ures, la`belCol set to our dependent vari`able, SALESCLOS`EPRICE and the random se`ed` 3. to 42
Train the model by c`allin`g fit() on gbt with the imported training` data, t`rain_df.

In [64]:
# Initialize model with columns to utilize
model_gbt = GBTRegressor(featuresCol="features", labelCol=target_col, predictionCol=predict_col,
                     seed=42)

# Train model
model_gbt = model_gbt.fit(train_df)

## Ex. 6 - Evaluating & Comparing Algorithms

Now that we've created a new model with `GBTRegressor` its time to compare it against our baseline of `RandomForestRegressor`. To do this we will compare the `predictions` of both models to the actual data and calculate `RMSE` and `R^2`.

**Instructions:**

1. Import `RegressionEvaluator` from `pyspark.ml.evaluation` so it is available for use later.
2. Initialize `RegressionEvaluator` by setting `labelCol` to our actual data, `SALESCLOSEPRICE` and `predictionCol` to our predicted data, `Prediction_Price`
3. To calculate our metrics, call evaluate on evaluator with the prediction values `preds` and create a dictionary with key `evaluator.metricName` and value of `rmse`, do the same for the `r2` metric.

In [65]:
# Complete the preditions for model_gbt
predictions_gbt = model_gbt.transform(test_df)

# Select columns to compute test error
evaluator = RegressionEvaluator(labelCol=target_col, predictionCol=predict_col)

# Dictionary of model predictions to loop over
models = {'Gradient Boosted Trees': predictions_gbt, 
          'Random Forest Regression': predictions_rfr}

for key, preds in models.items():
  # Create evaluation metrics
  rmse = evaluator.evaluate(preds, {evaluator.metricName: 'rmse'})
  r2 = evaluator.evaluate(preds, {evaluator.metricName: 'r2'})
  
  # Print Model Metrics
  print(key + ' RMSE: ' + str(rmse))
  print(key + ' R^2: ' + str(r2))

Gradient Boosted Trees RMSE: 16062.082720514649
Gradient Boosted Trees R^2: 0.9835923053714396
Random Forest Regression RMSE: 19947.50830910839
Random Forest Regression R^2: 0.9746941367142943


## Interpreting, Saving & Loading

### Interpreting a Model

In [66]:
# Convert feature importances to a pandas column
rfr_df = pd.DataFrame(model_rfr.featureImportances.toArray(),
                     columns = ['importance'])

# Convert list of feature names to pandas column
feature_cols = train_df.drop(target_col).columns
rfr_df['feature'] = pd.Series(feature_cols)

# Sort the data based on feature importance
rfr_df.sort_values(by=['importance'], ascending=False, inplace=True)

# Interpret results
rfr_df.head(9)

Unnamed: 0,importance,feature
0,0.398508,LISTPRICE
1,0.353775,OriginalListPrice
40,0.056558,LISTING_TO_MEDIAN_RATIO
38,0.048381,SQFT_TOTAL
6,0.025101,SQFTABOVEGROUND
9,0.017145,LivingArea
7,0.014175,Taxes
16,0.012765,BATHSTOTAL
8,0.012555,TAXWITHASSESSMENTS


### Saving & Loading Models

In [67]:
# Save model
model_rfr.write().overwrite().save('model/rfr_real_estate_model')

# Load model from
loaded_model = RandomForestRegressionModel.load('model/rfr_real_estate_model')
loaded_model

RandomForestRegressionModel: uid=RandomForestRegressor_6924256a3cef, numTrees=20, numFeatures=236

## Ex. 7 - Interpreting Results

It is almost always important to know which features are influencing your prediction the most. Perhaps its counterintuitive and that's an insight? Perhaps a hand full of features account for most of the accuracy of your model and you don't need to perform time acquiring or massaging other features.

In this example we will be looking at a model that has been trained without any LISTPRICE information. With that gone, what influences the price the most?

**Instructions:**

1. Create a pandas dataframe using the values of importances and name the column importance by setting the parameter columns.
2. Using the imported list of features names, `feature_cols`, create a new `pandas.Series` by wrapping it in the `pd.Series()` function. Set it to the column `fi_df['feature']`.
3. Sort the dataframe using `sort_values()`, setting the by parameter to our importance column and sort it descending by setting `ascending` to `False`. Inspect the results.

In [68]:
# Getting the required information
importances = model_gbt.featureImportances.toArray()
feature_cols = train_df.drop(target_col).columns

# Convert feature importances to a pandas column
fi_df = pd.DataFrame(importances, columns=['importance'])

# Convert list of feature names to pandas column
fi_df['feature'] = pd.Series(feature_cols)

# Sort the data based on feature importance
fi_df.sort_values(by=['importance'], ascending=False, inplace=True)

# Inspect Results
fi_df.head(9)

Unnamed: 0,importance,feature
0,0.736077,LISTPRICE
40,0.026718,LISTING_TO_MEDIAN_RATIO
2,0.02546,PricePerTSFT
6,0.025331,SQFTABOVEGROUND
4,0.020757,DAYSONMARKET
21,0.019973,LIST_DAYOFMONTH
11,0.016557,ACRES
9,0.016406,LivingArea
16,0.013993,BATHSTOTAL


## Ex. 8 - Saving & Loading Models

Often times you may find yourself going back to a previous model to see what assumptions or settings were used when diagnosing where your prediction errors were coming from. Perhaps there was something wrong with the data? Maybe you need to incorporate a new feature to capture an unusual event that occurred?

In this example, you will practice saving and loading a model.

**Instructions:**

1. Import `RandomForestRegressionModel` from `pyspark.ml.regression`.
2. Using the model in memory called model call the `save()` method on it and name the model `rfr_no_listprice`.
3. Reload the saved model file `rfr_no_listprice` by calling `load()` on `RandomForestRegressionModel` and storing it into `loaded_model`.

In [69]:
# Save model
model_gbt.write().overwrite().save('model/gbt_real_estate_model')

# Load model from
loaded_model = GBTRegressionModel.load('model/gbt_real_estate_model')
loaded_model

GBTRegressionModel: uid=GBTRegressor_98d23640d2a8, numTrees=20, numFeatures=236

## Close session

In [70]:
spark.stop()