In [8]:
import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as SparkFunctions
from pyspark.sql.types import IntegerType
from pyspark.sql.types import FloatType
from pyspark.sql.types import StringType
from pyspark.sql import Window
import sys
spark = SparkSession.builder.appName('housing').getOrCreate()
import pandas
import matplotlib.pyplot as plot

# Load the raw income CSV. Only the header option is set (no schema is
# supplied), so each column is cast explicitly afterwards.
income_reader = spark.read.format("csv").options(header="true")
income_data = income_reader.load("datasets/income_data.csv")
income_data.show()

# Target Spark type per column, applied in this order.
income_casts = [
    ("income_year", IntegerType()),
    ("income_regions", StringType()),
    ("income_age_group", StringType()),
    ("income_value", FloatType()),
]
for column_name, spark_type in income_casts:
    income_data = income_data.withColumn(column_name, income_data[column_name].cast(spark_type))

# Inspect the resulting schema and the summary statistics.
income_data.printSchema()
income_data.columns  # bare expression kept from the notebook; it prints nothing here
income_data.describe().show()

# Show the distinct values of the year, region and age-group columns.
for column_name in ("income_year", "income_regions", "income_age_group"):
    income_data.select(column_name).distinct().show()

# Load the raw expenditure CSV. Only the header option is set (no schema is
# supplied), so each column is cast explicitly afterwards.
expenditure_reader = spark.read.format("csv").options(header="true")
expenditure_data = expenditure_reader.load("datasets/expenditure_data.csv")
expenditure_data.show()

# Target Spark type per column, applied in this order.
expenditure_casts = [
    ("expenditure_year", IntegerType()),
    ("expenditure_regions", StringType()),
    ("expenditure_category", StringType()),
    ("expenditure_value", FloatType()),
]
for column_name, spark_type in expenditure_casts:
    expenditure_data = expenditure_data.withColumn(
        column_name, expenditure_data[column_name].cast(spark_type)
    )

# Inspect the resulting schema and the summary statistics.
expenditure_data.printSchema()
expenditure_data.columns  # bare expression kept from the notebook; it prints nothing here
expenditure_data.describe().show()

# Show the distinct values of the year, region and category columns.
for column_name in ("expenditure_year", "expenditure_regions", "expenditure_category"):
    expenditure_data.select(column_name).distinct().show()

# Load the raw house-sales CSV. Only the header option is set (no schema is
# supplied), so the columns used later are cast explicitly.
sales_reader = spark.read.format("csv").options(header="true")
sales_data = sales_reader.load("datasets/sales_data.csv")
sales_data.show()

# Target Spark type per column, applied in this order.
sales_casts = [
    ("sale_year", IntegerType()),
    ("sale_region", StringType()),
    ("gross_sale_price", FloatType()),
]
for column_name, spark_type in sales_casts:
    sales_data = sales_data.withColumn(column_name, sales_data[column_name].cast(spark_type))

# Inspect the resulting schema and the summary statistics.
sales_data.printSchema()
sales_data.columns  # bare expression kept from the notebook; it prints nothing here
sales_data.describe().show()

# Load the raw consumer-price-index CSV. Only the header option is set (no
# schema is supplied), so the numeric columns are cast explicitly.
cpi_reader = spark.read.format("csv").options(header="true")
cpi_data = cpi_reader.load("datasets/cpi_data.csv")
cpi_data.show()

# The year is an integer; the four quarterly readings are floats.
cpi_casts = [
    ("cpi_year", IntegerType()),
    ("cpi_quarter_01", FloatType()),
    ("cpi_quarter_02", FloatType()),
    ("cpi_quarter_03", FloatType()),
    ("cpi_quarter_04", FloatType()),
]
for column_name, spark_type in cpi_casts:
    cpi_data = cpi_data.withColumn(column_name, cpi_data[column_name].cast(spark_type))

# Inspect the resulting schema and the summary statistics.
cpi_data.printSchema()
cpi_data.columns  # bare expression kept from the notebook; it prints nothing here
cpi_data.describe().show()

# Show the distinct series identifiers and years.
for column_name in ("cpi_series_id", "cpi_year"):
    cpi_data.select(column_name).distinct().show()

# Load the raw house-price-index CSV. Only the header option is set (no
# schema is supplied), so the numeric columns are cast explicitly.
# Unlike the other datasets, the raw rows are not displayed here.
hpi_reader = spark.read.format("csv").options(header="true")
hpi_data = hpi_reader.load("datasets/hpi_data.csv")

# The year is an integer; the four quarterly readings are floats.
hpi_casts = [
    ("hpi_year", IntegerType()),
    ("hpi_quarter_01", FloatType()),
    ("hpi_quarter_02", FloatType()),
    ("hpi_quarter_03", FloatType()),
    ("hpi_quarter_04", FloatType()),
]
for column_name, spark_type in hpi_casts:
    hpi_data = hpi_data.withColumn(column_name, hpi_data[column_name].cast(spark_type))

# Inspect the resulting schema and the summary statistics.
hpi_data.printSchema()
hpi_data.columns  # bare expression kept from the notebook; it prints nothing here
hpi_data.describe().show()

# Show the distinct series identifiers and years.
for column_name in ("hpi_series_id", "hpi_year"):
    hpi_data.select(column_name).distinct().show()

def spark_dataframe_shape(dataFrame):
    """Return ``(row_count, column_count)`` for *dataFrame*.

    The row count comes from ``dataFrame.count()`` and the column count from
    the length of ``dataFrame.columns``; the two are returned as a tuple in
    that order.
    """
    row_count = dataFrame.count()
    column_count = len(dataFrame.columns)
    return (row_count, column_count)
# Attach the helper to Spark's DataFrame class so every frame exposes a
# ``shape()`` method (it must be called; it is not a property).
pyspark.sql.dataframe.DataFrame.shape = spark_dataframe_shape

# Print (rows, columns) for each loaded dataset.
for loaded_frame in (income_data, expenditure_data, sales_data, cpi_data, hpi_data):
    print(loaded_frame.shape())

# INCOME DATA PREPARATION
# Keep only the three age bands from 20 to 34.
age_band_filter = "income_age_group == '20 to 24' OR income_age_group == '25 to 29' OR income_age_group == '30 to 34'"
income_data = income_data.filter(age_band_filter)
income_data.select("income_age_group").distinct().show()

# One null-count aggregate per column. The filter below keeps the same
# columns, so the same expressions serve the "before" and "after" reports.
income_null_counts = [
    SparkFunctions.count(
        SparkFunctions.when(SparkFunctions.col(column_name).isNull(), column_name)
    ).alias(column_name)
    for column_name in income_data.columns
]
income_data.select(income_null_counts).show()

# Discard rows that have no income value, then confirm no nulls remain.
income_data = income_data.filter(income_data.income_value.isNotNull())
income_data.select(income_null_counts).show()
income_data.show()

# Assign and add age group code
# "20 to 24" -> 1, "25 to 29" -> 2, any other value -> 3.
age_band = SparkFunctions.col("income_age_group")
age_group_code = (
    SparkFunctions.when(age_band == "20 to 24", 1)
    .when(age_band == "25 to 29", 2)
    .otherwise(3)
)
income_data = income_data.withColumn("age_group_code", age_group_code)
income_data.select("age_group_code").distinct().show()

# Assign and add region code
# Region name -> region code, evaluated in this order; names that are not
# listed fall back to "r000".
income_region_codes = [
    ("Northland Region", "r002"),
    ("Auckland Region", "r001"),
    ("Waikato Region", "r002"),
    ("Bay of Plenty Region", "r002"),
    ("Gisborne/Hawkes Bay Regions", "r002"),
    ("Taranaki Region", "r002"),
    ("Manawatu-Wanganui Region", "r002"),
    ("Wellington Region", "r003"),
    ("Nelson/Tasman/Marlborough/West Coast Regions", "r005"),
    ("Canterbury Region", "r004"),
    ("Otago Region", "r005"),
    ("Southland Region", "r005"),
]
income_region = SparkFunctions.col("income_regions")
first_region, first_code = income_region_codes[0]
income_region_code = SparkFunctions.when(income_region == first_region, first_code)
for region_label, region_code in income_region_codes[1:]:
    income_region_code = income_region_code.when(income_region == region_label, region_code)
income_data = income_data.withColumn("income_region_code", income_region_code.otherwise("r000"))
income_data.select("income_region_code").distinct().show()

# The region name is no longer needed once the code exists.
income_data = income_data.drop("income_regions")
income_data.show(5)

# Average income value per (region code, age group code, year). Only the
# grouping keys and the aggregate column survive the aggregation.
income_grouping = income_data.groupBy("income_region_code", "age_group_code", "income_year")
income_data = income_grouping.mean("income_value")
income_data.show(5)
# EXPENDITURE DATA PREPARATION
# Report the number of nulls in each column.
expenditure_null_counts = [
    SparkFunctions.count(
        SparkFunctions.when(SparkFunctions.col(column_name).isNull(), column_name)
    ).alias(column_name)
    for column_name in expenditure_data.columns
]
expenditure_data.select(expenditure_null_counts).show()

# Keep only the rows of the 'Total' category.
total_category_filter = "expenditure_category == 'Total'"
expenditure_data = expenditure_data.filter(total_category_filter)
expenditure_data.select("expenditure_category").distinct().show()

# NOTE(review): the divisor 3 is presumably an assumed household size used to
# turn household expenditure into a per-person figure - confirm.
per_person_expenditure = SparkFunctions.col("expenditure_value") / 3
expenditure_data = expenditure_data.withColumn("expenditure_per_person", per_person_expenditure)
expenditure_data.show(5)

# The category and the raw value are not used after this point.
expenditure_data = expenditure_data.drop("expenditure_category", "expenditure_value")
expenditure_data.show()

# Assign and add region code
# Region name -> region code, evaluated in this order; names that are not
# listed fall back to "r000".
expenditure_region_codes = [
    ("Auckland", "r001"),
    ("Rest of North Island", "r002"),
    ("Wellington", "r003"),
    ("Canterbury", "r004"),
    ("Rest of South Island", "r005"),
]
expenditure_region = SparkFunctions.col("expenditure_regions")
first_region, first_code = expenditure_region_codes[0]
expenditure_region_code = SparkFunctions.when(expenditure_region == first_region, first_code)
for region_label, region_code in expenditure_region_codes[1:]:
    expenditure_region_code = expenditure_region_code.when(
        expenditure_region == region_label, region_code
    )
expenditure_data = expenditure_data.withColumn(
    "expenditure_region_code", expenditure_region_code.otherwise("r000")
)
expenditure_data.select("expenditure_region_code").distinct().show()
income_data.show()

# Give the aggregate column a readable name. The remaining columns already
# carry their final names, so no other rename is needed.
income_data = income_data.withColumnRenamed("avg(income_value)", "average_weekly_income")
income_data.printSchema()

# Outlier fences for the income column: quartiles plus the conventional
# 1.5 * IQR lower/upper bounds. A relativeError of 0 is passed to
# approxQuantile (exact quantiles, per the PySpark documentation).
income_data_outliers = income_data.select("average_weekly_income")

bounds = {
    c: dict(
        zip(["q1", "q3"], income_data_outliers.approxQuantile(c, [0.25, 0.75], 0))
    )
    for c in income_data_outliers.columns
}
for c in bounds:
    iqr = bounds[c]['q3'] - bounds[c]['q1']
    bounds[c]['lower'] = bounds[c]['q1'] - (iqr * 1.5)
    bounds[c]['upper'] = bounds[c]['q3'] + (iqr * 1.5)
# The bounds are only reported; no rows are filtered with them here.
print(bounds)



# The expenditure columns already carry their final names, so no rename is
# needed before inspecting the schema.
expenditure_data.printSchema()

# Outlier fences for per-person expenditure: quartiles plus the conventional
# 1.5 * IQR lower/upper bounds. A relativeError of 0 is passed to
# approxQuantile (exact quantiles, per the PySpark documentation).
expenditure_data_outliers = expenditure_data.select("expenditure_per_person")

bounds = {
    c: dict(
        zip(["q1", "q3"], expenditure_data_outliers.approxQuantile(c, [0.25, 0.75], 0))
    )
    for c in expenditure_data_outliers.columns
}
for c in bounds:
    iqr = bounds[c]['q3'] - bounds[c]['q1']
    bounds[c]['lower'] = bounds[c]['q1'] - (iqr * 1.5)
    bounds[c]['upper'] = bounds[c]['q3'] + (iqr * 1.5)
# The bounds are only reported; no rows are filtered with them here.
print(bounds)

# MERGE INCOME + EXPENDITURE
# Match rows on region code and year. join() is called without `how`, so
# PySpark's default join type applies.
same_region = income_data.income_region_code == expenditure_data.expenditure_region_code
same_year = income_data.income_year == expenditure_data.expenditure_year
merge_income_expenditure = income_data.join(expenditure_data, same_region & same_year)
merge_income_expenditure.show(5)

# Each join key now exists twice; keep income_region_code and
# expenditure_year and drop their counterparts.
redundant_keys = ("expenditure_region_code", "income_year")
merge_income_expenditure = merge_income_expenditure.drop(*redundant_keys)
merge_income_expenditure.show(5)
# SALES DATA PREPARATION
# Report the number of nulls in each column.
sales_null_counts = [
    SparkFunctions.count(
        SparkFunctions.when(SparkFunctions.col(column_name).isNull(), column_name)
    ).alias(column_name)
    for column_name in sales_data.columns
]
sales_data.select(sales_null_counts).show()

# Columns that play no part in the rest of the analysis.
unused_sales_columns = (
    "sales_qv_id",
    "sale_id",
    "sales_city",
    "sale_region_id",
    "sale_date",
    "net_sale_price",
    "capital_value",
    "land_value",
    "improvement_value",
    "house_age",
)
sales_data = sales_data.drop(*unused_sales_columns)
sales_data.show(5)
# Assign and add region code
# Region name -> region code, evaluated in this order; names that are not
# listed fall back to "r000".
# NOTE(review): no Otago or Southland entry exists here, so such rows (if the
# data contains any) would be coded "r000" - confirm against the dataset.
sales_region_codes = [
    ("Auckland (Unitary)", "r001"),
    ("Bay of Plenty Region", "r002"),
    ("Gisborne (Unitary)", "r002"),
    ("Hawkes Bay Region", "r002"),
    ("Manawatu-Whanganui Region", "r002"),
    ("Northland Region", "r002"),
    ("Taranaki Region", "r002"),
    ("Waikato Region", "r002"),
    ("Wellington Region", "r003"),
    ("Canterbury Region", "r004"),
    ("Tasman Nelson Marlborough", "r005"),
    ("West Coast Region", "r005"),
]
sale_region = SparkFunctions.col("sale_region")
first_region, first_code = sales_region_codes[0]
sales_region_code = SparkFunctions.when(sale_region == first_region, first_code)
for region_label, region_code in sales_region_codes[1:]:
    sales_region_code = sales_region_code.when(sale_region == region_label, region_code)
sales_data = sales_data.withColumn("sales_region_code", sales_region_code.otherwise("r000"))
sales_data.select("sales_region_code").distinct().show()

# The region name is no longer needed once the code exists.
sales_data = sales_data.drop("sale_region")

# Average gross sale price per (year, region code).
sales_grouping = sales_data.groupBy("sale_year", "sales_region_code")
sales_data = sales_grouping.mean("gross_sale_price")
sales_data.show(5)
# Give the aggregate column a readable name. sale_year and sales_region_code
# already carry their final names, so no other rename is needed.
sales_data = sales_data.withColumnRenamed("avg(gross_sale_price)", "average_sale_price")
sales_data.printSchema()

# Outlier fences for the average sale price: quartiles plus the conventional
# 1.5 * IQR lower/upper bounds. A relativeError of 0 is passed to
# approxQuantile (exact quantiles, per the PySpark documentation).
sales_data_outliers = sales_data.select("average_sale_price")

bounds = {
    c: dict(
        zip(["q1", "q3"], sales_data_outliers.approxQuantile(c, [0.25, 0.75], 0))
    )
    for c in sales_data_outliers.columns
}
for c in bounds:
    iqr = bounds[c]['q3'] - bounds[c]['q1']
    bounds[c]['lower'] = bounds[c]['q1'] - (iqr * 1.5)
    bounds[c]['upper'] = bounds[c]['q3'] + (iqr * 1.5)
# The bounds are only reported; no rows are filtered with them here.
print(bounds)


# MERGE INCOME + EXPENDITURE + SALES DATA
# Match rows on region code and year. join() is called without `how`, so
# PySpark's default join type applies.
region_matches = merge_income_expenditure.income_region_code == sales_data.sales_region_code
year_matches = merge_income_expenditure.expenditure_year == sales_data.sale_year
merge_income_expenditure_sales = merge_income_expenditure.join(
    sales_data, region_matches & year_matches
)
merge_income_expenditure_sales.show(5)
# HOUSE PRICE INDEX DATA PREPARATION
def _hpi_null_counts(frame):
    """Return one null-count aggregate expression per column of *frame*."""
    return [
        SparkFunctions.count(
            SparkFunctions.when(SparkFunctions.col(column_name).isNull(), column_name)
        ).alias(column_name)
        for column_name in frame.columns
    ]


hpi_data.select(_hpi_null_counts(hpi_data)).show()
hpi_data.show(30)

# The series identifier is not used after this point.
hpi_data = hpi_data.drop("hpi_series_id")
hpi_data.printSchema()

# Null counts before and after filling missing fourth-quarter readings.
# NOTE(review): 2740.3 is a hard-coded replacement value whose origin is not
# visible here - confirm where it comes from.
hpi_data.select(_hpi_null_counts(hpi_data)).show()
hpi_data = hpi_data.na.fill(value=2740.3, subset=["hpi_quarter_04"])
hpi_data.select(_hpi_null_counts(hpi_data)).show()

# Annual index = arithmetic mean of the four quarterly readings.
hpi_quarters = ["hpi_quarter_01", "hpi_quarter_02", "hpi_quarter_03", "hpi_quarter_04"]
hpi_quarter_total = SparkFunctions.col(hpi_quarters[0])
for quarter_name in hpi_quarters[1:]:
    hpi_quarter_total = hpi_quarter_total + SparkFunctions.col(quarter_name)
hpi_data = hpi_data.withColumn("annual_hpi", hpi_quarter_total / 4)
hpi_data.show(30)

# Only the year and the annual figure are kept.
hpi_data = hpi_data.drop("hpi_series_id", *hpi_quarters)
hpi_data.show(30)

# Outlier fences for the annual index: quartiles plus the conventional
# 1.5 * IQR lower/upper bounds. They are reported only, not applied.
hpi_data_outliers = hpi_data.select("annual_hpi")

bounds = {}
for column_name in hpi_data_outliers.columns:
    quartiles = hpi_data_outliers.approxQuantile(column_name, [0.25, 0.75], 0)
    bounds[column_name] = dict(zip(["q1", "q3"], quartiles))
for column_name, fences in bounds.items():
    spread = fences['q3'] - fences['q1']
    fences['lower'] = fences['q1'] - (spread * 1.5)
    fences['upper'] = fences['q3'] + (spread * 1.5)
print(bounds)

# CONSUMER PRICE INDEX DATA PREPARATION
# Report the number of nulls in each column.
cpi_null_counts = [
    SparkFunctions.count(
        SparkFunctions.when(SparkFunctions.col(column_name).isNull(), column_name)
    ).alias(column_name)
    for column_name in cpi_data.columns
]
cpi_data.select(cpi_null_counts).show()
cpi_data.show(30)

# The series identifier is not used after this point.
cpi_data = cpi_data.drop("cpi_series_id")
cpi_data.printSchema()

# Annual index = arithmetic mean of the four quarterly readings.
cpi_quarters = ["cpi_quarter_01", "cpi_quarter_02", "cpi_quarter_03", "cpi_quarter_04"]
cpi_quarter_total = SparkFunctions.col(cpi_quarters[0])
for quarter_name in cpi_quarters[1:]:
    cpi_quarter_total = cpi_quarter_total + SparkFunctions.col(quarter_name)
cpi_data = cpi_data.withColumn("annual_cpi", cpi_quarter_total / 4)
cpi_data.show(5)

# Only the year and the annual figure are kept.
cpi_data = cpi_data.drop("cpi_series_id", *cpi_quarters)
cpi_data.show(30)

# Outlier fences for the annual index: quartiles plus the conventional
# 1.5 * IQR lower/upper bounds. They are reported only, not applied.
cpi_data_outliers = cpi_data.select("annual_cpi")

bounds = {}
for column_name in cpi_data_outliers.columns:
    quartiles = cpi_data_outliers.approxQuantile(column_name, [0.25, 0.75], 0)
    bounds[column_name] = dict(zip(["q1", "q3"], quartiles))
for column_name, fences in bounds.items():
    spread = fences['q3'] - fences['q1']
    fences['lower'] = fences['q1'] - (spread * 1.5)
    fences['upper'] = fences['q3'] + (spread * 1.5)
print(bounds)


# MERGE HPI + CPI
# Pair the two annual indices by year.
index_years_match = hpi_data.hpi_year == cpi_data.cpi_year
merge_hpi_cpi = hpi_data.join(cpi_data, index_years_match)
merge_hpi_cpi.show(30)

# MERGE ALL DATA
# Attach the indices to the income/expenditure/sales rows of the same year.
years_match = merge_income_expenditure_sales.expenditure_year == merge_hpi_cpi.hpi_year
cleaned_data = merge_income_expenditure_sales.join(merge_hpi_cpi, years_match)
cleaned_data.show(5)

# Remove the join keys that duplicate expenditure_year and income_region_code.
duplicate_keys = ("hpi_year", "cpi_year", "sale_year", "sales_region_code")
cleaned_data = cleaned_data.drop(*duplicate_keys)
cleaned_data.show(5)
# Scale the two weekly figures to a year (x 52).
cleaned_data = cleaned_data.withColumn("annual_income", SparkFunctions.col("average_weekly_income") * 52)
cleaned_data = cleaned_data.withColumn("annual_expenditure", SparkFunctions.col("expenditure_per_person") * 52)
cleaned_data.show(5)

# Savings are what is left of the annual income after the annual expenditure.
cleaned_data = cleaned_data.withColumn(
    "annual_savings",
    SparkFunctions.col("annual_income") - SparkFunctions.col("annual_expenditure"),
)
cleaned_data.show(5)

# Down-payment capacity is three times the annual savings.
# NOTE(review): presumably "three years of saving" - confirm.
cleaned_data = cleaned_data.withColumn("downpayment_capacity", SparkFunctions.col("annual_savings") * 3)
cleaned_data.show(5)

# Despite its name this column holds one tenth of the average sale price.
# NOTE(review): presumably the deposit needed at a 90% loan-to-value ratio -
# confirm. The column name is kept because it is part of the exported data.
cleaned_data = cleaned_data.withColumn("loan_to_value_ratio", SparkFunctions.col("average_sale_price") / 10)
cleaned_data.show(5)

# Down-payment capacity as a percentage of that one-tenth figure.
cleaned_data = cleaned_data.withColumn(
    "remainder",
    SparkFunctions.col("downpayment_capacity") / SparkFunctions.col("loan_to_value_ratio"),
)
cleaned_data = cleaned_data.withColumn("percent", SparkFunctions.col("remainder") * 100)
cleaned_data.select("percent").show(5)
cleaned_data.printSchema()

# Bucket the percentage into bands: <=20 -> 1, <=40 -> 2, <=60 -> 3,
# <=80 -> 4, <=100 -> 5, >100 -> 6. The branches are tested in order, so each
# one only needs its upper bound. Rows matching no branch (e.g. a null
# percentage) get 0 - an integer, so that every branch of the expression has
# the same type and the column is not turned into a string column.
percent = SparkFunctions.col("percent")
affordability = SparkFunctions.when(percent <= 20, 1)
for upper_bound, band in ((40, 2), (60, 3), (80, 4), (100, 5)):
    affordability = affordability.when(percent <= upper_bound, band)
affordability = affordability.when(percent > 100, 6).otherwise(0)
cleaned_data = cleaned_data.withColumn("affordability", affordability)
cleaned_data.select("percent", "affordability").show(5)

# Keep the final columns in presentation order. Intermediate columns
# (annual_income, annual_expenditure, remainder, percent) are left out by
# this explicit selection, so no separate drop is required.
cleaned_data = cleaned_data.select(
    "expenditure_year",
    "income_region_code",
    "expenditure_regions",
    "age_group_code",
    "average_weekly_income",
    "expenditure_per_person",
    "average_sale_price",
    "annual_hpi",
    "annual_cpi",
    "annual_savings",
    "downpayment_capacity",
    "loan_to_value_ratio",
    "affordability",
)
cleaned_data.show(5)


# Final column names for the exported dataset. Only the columns whose name
# actually changes are listed; every other column keeps its current name.
final_column_names = [
    ("expenditure_year", "year"),
    ("income_region_code", "region_code"),
    ("expenditure_regions", "regions"),
    ("annual_hpi", "hpi"),
    ("annual_cpi", "cpi"),
]
for current_name, final_name in final_column_names:
    cleaned_data = cleaned_data.withColumnRenamed(current_name, final_name)
cleaned_data.show(5)
cleaned_data.printSchema()


# EXPORT CSV
# Collect the Spark frame into a pandas DataFrame, then write it to
# 'cleaned_data.csv' using pandas' default to_csv options.
cleaned_pandas_frame = cleaned_data.toPandas()
cleaned_pandas_frame.to_csv('cleaned_data.csv')

# Refer to 70-30_split_data_mining_models file for model comparisons

+--------------+-----------+----------------+------------+
|income_regions|income_year|income_age_group|income_value|
+--------------+-----------+----------------+------------+
|             0|          0|               0|           1|
+--------------+-----------+----------------+------------+

+--------------+-----------+----------------+------------+
|income_regions|income_year|income_age_group|income_value|
+--------------+-----------+----------------+------------+
|             0|          0|               0|           0|
+--------------+-----------+----------------+------------+

+--------------+
|age_group_code|
+--------------+
|             1|
|             3|
|             2|
+--------------+

+------------------+
|income_region_code|
+------------------+
|              r002|
|              r001|
|              r003|
|              r004|
|              r005|
+------------------+

+-----------+----------------+------------+--------------+------------------+
|income_year|income_

+-----------------+
|sales_region_code|
+-----------------+
|             r002|
|             r001|
|             r003|
|             r004|
|             r005|
+-----------------+

+---------+-----------------+---------------------+
|sale_year|sales_region_code|avg(gross_sale_price)|
+---------+-----------------+---------------------+
|     2010|             r005|   345598.64387917327|
|     2010|             r004|   329218.27931034483|
|     2014|             r002|    314156.8007335681|
|     2016|             r002|   364626.93061757897|
|     2007|             r004|    331251.7724867725|
+---------+-----------------+---------------------+
only showing top 5 rows

root
 |-- sale_year: integer (nullable = true)
 |-- sales_region_code: string (nullable = false)
 |-- average_sale_price: double (nullable = true)

{'average_sale_price': {'upper': 850819.0894790316, 'q3': 539228.9590792839, 'q1': 331502.20547945204, 'lower': 19912.075079704286}}
+-------------+--------+--------------+------

+--------+----------+
|cpi_year|annual_cpi|
+--------+----------+
|    1988|    522.75|
|    1989|     552.5|
|    1990|     586.0|
|    1991|     601.5|
|    1992|     607.5|
|    1993|     615.5|
|    1994|     626.0|
|    1995|     649.5|
|    1996|     664.5|
|    1997|    672.25|
|    1998|    680.75|
|    1999|     680.0|
|    2000|    697.75|
|    2001|    716.25|
|    2002|    735.25|
|    2003|     748.0|
|    2004|    765.25|
|    2005|     788.5|
|    2006|     815.0|
|    2007|     834.5|
|    2008|    867.25|
|    2009|     886.0|
|    2010|     906.0|
|    2011|     943.0|
|    2012|    952.75|
|    2013|     963.5|
|    2014|    975.25|
|    2015|    978.25|
|    2016|     984.5|
|    2017|   1002.75|
+--------+----------+
only showing top 30 rows

{'annual_cpi': {'upper': 1400.375, 'q3': 963.5, 'q1': 672.25, 'lower': 235.375}}
