In [62]:
!apt-get update # Update apt-get repository.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null # Install Java.
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz # Download Apache Sparks.
!tar xf spark-3.1.1-bin-hadoop3.2.tgz # Unzip the tgz file.
!pip install -q findspark # Install findspark. Adds PySpark to the System path during runtime.

# Set environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

!ls

# Initialize findspark
import findspark
findspark.init()

# Create a PySpark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu jammy-security InRelease [110 kB]
0% [Connecting to archive.ubuntu.com (185.125.190.39)] [1 InRelease 14.2 kB/110 kB 13%] [Connected t                                                                                                    Hit:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
Hit:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:5 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [119 kB]
Hit:6 https://ppa.launchpadcontent.net/c2d4u.team/c2d4u4.0+/ubuntu jammy InRelease
Hit:7 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:8 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:9 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:10 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Fetched 229 kB in

In [63]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.sql.functions import col, dayofmonth, month, year,  to_date, to_timestamp, weekofyear, dayofweek, udf
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.types import IntegerType, StringType

# Mount Google Drive and read the real-estate sales CSV into a Spark DataFrame.

from google.colab import drive
drive.mount('/content/drive')

csv_reader = spark.read.options(header=True, inferSchema=True, sep=",")
data = csv_reader.csv('/content/drive/My Drive/Colab Notebooks/Real_Estate_Sales_2001-2020_GL.csv')

# Report the frame's dimensions as (rows, columns).
n_rows = data.count()
n_cols = len(data.columns)
print('shape data {}, {}'.format(n_rows, n_cols))

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
shape data 997213, 11


In [64]:
# Preview the first 5 rows of the freshly loaded data.
data.show(5)

+-------------+---------+-------------+-------+--------------------+--------------+-----------+-----------+-------------+----------------+----------------+
|Serial Number|List Year|Date Recorded|   Town|             Address|Assessed Value|Sale Amount|Sales Ratio|Property Type|Residential Type|Years until sold|
+-------------+---------+-------------+-------+--------------------+--------------+-----------+-----------+-------------+----------------+----------------+
|      2020348|     2020|    9/13/2021|Ansonia|     230 WAKELEE AVE|        150500|   325000.0|      0.463|   Commercial|             Nan|               1|
|        20002|     2020|    10/2/2020|Ashford|     390 TURNPIKE RD|        253000|   430000.0|     0.5883|  Residential|   Single Family|               0|
|       200212|     2020|     3/9/2021|   Avon|    5 CHESTNUT DRIVE|        130400|   179900.0|     0.7248|  Residential|           Condo|               1|
|       200243|     2020|    4/13/2021|   Avon|111 NORTHINGTON D

In [65]:
# Print every column together with its Spark SQL type.
# The loop variable is deliberately NOT named `col`: that name is imported from
# pyspark.sql.functions, and rebinding it to a string here would make every
# later `col(...)` call fail until the function is imported again.
for column_name, type_ in data.dtypes:
    print('type of {} is {}'.format(column_name, type_))

type of Serial Number is int
type of List Year is int
type of Date Recorded is string
type of Town is string
type of Address is string
type of Assessed Value is int
type of Sale Amount is double
type of Sales Ratio is double
type of Property Type is string
type of Residential Type is string
type of Years until sold is int


In [66]:
# Parse the "Date Recorded" strings (month/day/year, e.g. 9/13/2021) into a date column.
recorded_date = to_date(data["Date Recorded"], "M/d/yyyy")
data = data.withColumn("Date Recorded", recorded_date)

In [67]:
# Print each column's name, type and nullability.
data.printSchema()

root
 |-- Serial Number: integer (nullable = true)
 |-- List Year: integer (nullable = true)
 |-- Date Recorded: date (nullable = true)
 |-- Town: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- Assessed Value: integer (nullable = true)
 |-- Sale Amount: double (nullable = true)
 |-- Sales Ratio: double (nullable = true)
 |-- Property Type: string (nullable = true)
 |-- Residential Type: string (nullable = true)
 |-- Years until sold: integer (nullable = true)



In [68]:
# Derive the calendar month of the recording date as its own column.
recorded_month = month(data["Date Recorded"])
data = data.withColumn("Month", recorded_month)

data.show(5)

+-------------+---------+-------------+-------+--------------------+--------------+-----------+-----------+-------------+----------------+----------------+-----+
|Serial Number|List Year|Date Recorded|   Town|             Address|Assessed Value|Sale Amount|Sales Ratio|Property Type|Residential Type|Years until sold|Month|
+-------------+---------+-------------+-------+--------------------+--------------+-----------+-----------+-------------+----------------+----------------+-----+
|      2020348|     2020|   2021-09-13|Ansonia|     230 WAKELEE AVE|        150500|   325000.0|      0.463|   Commercial|             Nan|               1|    9|
|        20002|     2020|   2020-10-02|Ashford|     390 TURNPIKE RD|        253000|   430000.0|     0.5883|  Residential|   Single Family|               0|   10|
|       200212|     2020|   2021-03-09|   Avon|    5 CHESTNUT DRIVE|        130400|   179900.0|     0.7248|  Residential|           Condo|               1|    3|
|       200243|     2020|   

In [69]:
# Per-column quality report: the share of non-null values, followed by the
# summary statistics produced by describe().
len_ = data.count()
for column_name in data.columns:
    print(column_name)
    non_null_rows = data.where(data[column_name].isNotNull()).count()
    print("not nulls are {}".format(non_null_rows / len_))
    data.describe(column_name).show()

Serial Number
not nulls are 1.0
+-------+-----------------+
|summary|    Serial Number|
+-------+-----------------+
|  count|           997213|
|   mean|431186.4096025623|
| stddev|6549219.174111525|
|    min|                0|
|    max|       2000500023|
+-------+-----------------+

List Year
not nulls are 1.0
+-------+------------------+
|summary|         List Year|
+-------+------------------+
|  count|            997213|
|   mean|2010.1898290535723|
| stddev| 6.237876523035602|
|    min|              2001|
|    max|              2020|
+-------+------------------+

Date Recorded
not nulls are 1.0
+-------+
|summary|
+-------+
|  count|
|   mean|
| stddev|
|    min|
|    max|
+-------+

Town
not nulls are 1.0
+-------+-------------+
|summary|         Town|
+-------+-------------+
|  count|       997213|
|   mean|         null|
| stddev|         null|
|    min|***Unknown***|
|    max|    Woodstock|
+-------+-------------+

Address
not nulls are 1.0
+-------+--------------------+
|summ

In [70]:
# Drop every row that has a null in any column, then rename the
# 'Years until sold' column to 'Years of sales'.
data = (
    data
    .na.drop()
    .withColumnRenamed('Years until sold', 'Years of sales')
)

In [71]:
# Aggregate the sales to one row per
# (List Year, Month, Town, Property Type, Residential Type):
#   - Assessed Value -> sum
#   - Sale Amount    -> sum
#   - Sales Ratio    -> average
#   - Years of sales -> average
# The dict form of agg() names the results "sum(...)"/"avg(...)".

data = data.groupby('List Year', 'Month', 'Town', 'Property Type', 'Residential Type').agg({'Assessed Value': 'sum', 'Sale Amount' : 'sum', 'Sales Ratio' : 'avg', 'Years of sales' : 'avg'})

data.show(5)

+---------+-----+-------------+-------------+----------------+----------------+-------------------+-------------------+------------------+
|List Year|Month|         Town|Property Type|Residential Type|sum(Sale Amount)|avg(Years of sales)|sum(Assessed Value)|  avg(Sales Ratio)|
+---------+-----+-------------+-------------+----------------+----------------+-------------------+-------------------+------------------+
|     2020|   11|    Southbury|  Vacant Land|             Nan|        470000.0|                0.0|             305120|            0.6619|
|     2020|   10|    Westbrook|  Residential|   Single Family|      1.493865E7|                0.0|            8207570|0.5888689655172414|
|     2020|    4|      Ansonia|  Residential|   Single Family|       3587900.0|                1.0|            2082500|0.6544944444444444|
|     2020|    1|East Hartford|  Residential|   Single Family|       9998420.0|                1.0|            5646000|0.5784075434117647|
|     2020|    7|      Enfi

In [72]:
# Replace the generated "sum(...)"/"avg(...)" column names with plain ones.
# The 'Assesed Value' spelling is kept as-is because later cells refer to it.
aggregate_renames = {
    'sum(Sale Amount)': 'Sales Amount',
    'avg(Years of sales)': 'Years of Sales',
    'sum(Assessed Value)': 'Assesed Value',
    'avg(Sales Ratio)': 'Sales Ratio',
}
for generated_name, plain_name in aggregate_renames.items():
    data = data.withColumnRenamed(generated_name, plain_name)

In [73]:
# Display the aggregated data.
data.show()

+---------+-----+-------------+-------------+----------------+------------+--------------+-------------+-------------------+
|List Year|Month|         Town|Property Type|Residential Type|Sales Amount|Years of Sales|Assesed Value|        Sales Ratio|
+---------+-----+-------------+-------------+----------------+------------+--------------+-------------+-------------------+
|     2020|   11|    Southbury|  Vacant Land|             Nan|    470000.0|           0.0|       305120|             0.6619|
|     2020|   10|    Westbrook|  Residential|   Single Family|  1.493865E7|           0.0|      8207570| 0.5888689655172414|
|     2020|    4|      Ansonia|  Residential|   Single Family|   3587900.0|           1.0|      2082500| 0.6544944444444444|
|     2020|    1|East Hartford|  Residential|   Single Family|   9998420.0|           1.0|      5646000| 0.5784075434117647|
|     2020|    7|      Enfield|  Vacant Land|             Nan|    950000.0|           1.0|         9140|        0.005077778|


In [74]:
def type_combiner(residential_type, property_type):
    """Pick the more specific of the two type labels.

    Returns ``property_type`` when ``residential_type`` is exactly the string
    'Nan' (the placeholder seen in the data); otherwise returns
    ``residential_type`` unchanged.
    """
    return property_type if residential_type == 'Nan' else residential_type

# Wrap the Python helper as a string-returning Spark UDF and use it to build a
# single "Type combined" column out of the two type columns.
type_combiner_udf = udf(type_combiner, returnType=StringType())

combined_type = type_combiner_udf(data["Residential Type"], data["Property Type"])
data = data.withColumn("Type combined", combined_type)

In [75]:
# Preview the first 10 rows of the data.
data.show(10)

+---------+-----+-------------+-------------+----------------+------------+--------------+-------------+-------------------+-------------+
|List Year|Month|         Town|Property Type|Residential Type|Sales Amount|Years of Sales|Assesed Value|        Sales Ratio|Type combined|
+---------+-----+-------------+-------------+----------------+------------+--------------+-------------+-------------------+-------------+
|     2020|   11|    Southbury|  Vacant Land|             Nan|    470000.0|           0.0|       305120|             0.6619|  Vacant Land|
|     2020|   10|    Westbrook|  Residential|   Single Family|  1.493865E7|           0.0|      8207570| 0.5888689655172414|Single Family|
|     2020|    4|      Ansonia|  Residential|   Single Family|   3587900.0|           1.0|      2082500| 0.6544944444444444|Single Family|
|     2020|    1|East Hartford|  Residential|   Single Family|   9998420.0|           1.0|      5646000| 0.5784075434117647|Single Family|
|     2020|    7|      Enfi

In [76]:
# Drop the rows whose combined type is still the 'Nan' placeholder and print
# the fraction of rows that remain.
len_1 = data.count()

has_known_type = data['Type combined'] != 'Nan'
data = data.where(has_known_type)

len_2 = data.count()

print(len_2 / len_1)

0.6904499448842569


In [77]:
# List the distinct values of "Type combined".
data.select('Type combined').distinct().show()

+--------------+
| Type combined|
+--------------+
|   Four Family|
|    Apartments|
|   Vacant Land|
|    Industrial|
|    Two Family|
| Single Family|
|         Condo|
|Public Utility|
|    Commercial|
|  Three Family|
+--------------+



In [78]:
# Count the distinct values of "Town".
data.select('Town').distinct().count()

170

In [79]:
# Keep only the columns that go into modelling.

model_columns = ['List Year', 'Month', 'Town', 'Sales Amount', 'Years of Sales', 'Assesed Value', 'Sales Ratio', 'Type combined']
data = data.select(*model_columns)


In [80]:
# Split into train/test with 0.8/0.2 weights and a fixed seed.

train_df, test_df = data.randomSplit([.8, .2], seed=123)

# Report (rows, columns) for each split.
for template, split_df in (('shape train {}, {}', train_df), ('shape test {}, {}', test_df)):
    print(template.format(split_df.count(), len(split_df.columns)))

shape train 44204, 8
shape test 10916, 8


In [81]:
# Build the modelling pipeline: index categoricals -> one-hot encode ->
# assemble the feature vector -> random forest regressor.

categorical_columns = ["Town", "Type combined"]
index_columns = [name + "_index" for name in categorical_columns]
encoded_columns = [name + "_encoded" for name in categorical_columns]

# Index the categorical columns; handleInvalid='keep' is set so that labels not
# seen while fitting do not make transform() fail.
indexers = [
    StringIndexer(inputCol=name, outputCol=index_name, handleInvalid='keep')
    for name, index_name in zip(categorical_columns, index_columns)
]

# One-hot encode the indexed columns.
# NOTE(review): the "*_encoded" outputs are not part of `features` below (the
# assembler consumes the "*_index" columns), so this stage only adds columns to
# the transformed output. It is kept so that output schema stays unchanged.
encoder = [OneHotEncoder(inputCols=index_columns,
                         outputCols=encoded_columns,
                         handleInvalid='keep')]

# Assemble all model inputs into a single vector column.
# NOTE(review): 'Sales Ratio' is presumably derived from the sale amount, which
# is the label here -- confirm it does not leak the target into the features.
features = ['List Year', 'Month', 'Town_index', 'Years of Sales', 'Assesed Value', 'Sales Ratio', 'Type combined_index']

assembler = VectorAssembler(inputCols=features,
                            outputCol="features")

# An explicit seed makes the fitted forest reproducible across kernel restarts;
# 123 matches the seed used for the train/test split.
# maxBins is presumably raised so the indexed "Town" categories fit -- TODO confirm
# whether a value this large is actually needed.
rf = RandomForestRegressor(
    featuresCol="features",
    labelCol="Sales Amount",
    maxBins=4000,
    seed=123)

stages = indexers + encoder + [assembler, rf]

rf_pipeline = Pipeline(stages=stages)


In [82]:
# Fit every pipeline stage on the training split.
rf_model = rf_pipeline.fit(train_df)

In [83]:
# Imported in this cell so that `col` is guaranteed to be the pyspark function here.
from pyspark.sql.functions import col

# Run the fitted pipeline on the test split and cast the prediction column to double.
test_predictions = (
    rf_model.transform(test_df)
    .withColumn("prediction", col("prediction").cast("double"))
)

In [84]:
# Inspect the scored test rows.
test_predictions.show()

+---------+-----+-----------+------------+--------------+-------------+-------------------+-------------+----------+-------------------+-----------------+---------------------+--------------------+--------------------+
|List Year|Month|       Town|Sales Amount|Years of Sales|Assesed Value|        Sales Ratio|Type combined|Town_index|Type combined_index|     Town_encoded|Type combined_encoded|            features|          prediction|
+---------+-----+-----------+------------+--------------+-------------+-------------------+-------------+----------+-------------------+-----------------+---------------------+--------------------+--------------------+
|     2006|    6|    Pomfret|    948000.0|           1.0|       511410|      0.54420917775|Single Family|     150.0|                0.0|(170,[150],[1.0])|       (11,[0],[1.0])|[2006.0,6.0,150.0...|  1931833.3919630633|
|     2006|    8|  Fairfield|    819000.0|           1.0|       537110|          0.6556795|   Two Family|      39.0|        

In [85]:
# Mean absolute error of the predictions against "Sales Amount" on the test split.

mae_evaluator = RegressionEvaluator(
    metricName="mae", labelCol="Sales Amount", predictionCol="prediction")

label_and_prediction = test_predictions.select("Sales Amount", "prediction")
mae = mae_evaluator.evaluate(label_and_prediction)

In [86]:
# Display the MAE computed on the test split.
mae

1742658.7553859495

In [90]:
# Spot-check one group: Town 'Killingly', Month 6, List Year 2012.
# NOTE(review): this displays 'Sales Ratio' next to the prediction although the
# model's label is 'Sales Amount' -- confirm which column was intended.
spot_check_filter = (col('List Year') == 2012) & (col('Month') == 6) & (col('Town') == 'Killingly')
test_predictions.filter(spot_check_filter).select('Sales Ratio', 'prediction').show()

+------------------+------------------+
|       Sales Ratio|        prediction|
+------------------+------------------+
|1.1328764274499998|3466364.2840406382|
+------------------+------------------+



In [98]:
# Swap the spaces in column names for underscores.
underscore_names = {
    'List Year': 'List_Year',
    'Sales Amount': 'Sales_Amount',
    'Years of Sales': 'Years_of_Sales',
    'Assesed Value': 'Assesed_Value',
    'Sales Ratio': 'Sales_Ratio',
    'Type combined': 'Type_combined',
    'Type combined_index': 'Type_combined_index',
    'Type combined_encoded': 'Type_combined_encoded',
}
for spaced_name, underscored_name in underscore_names.items():
    test_predictions = test_predictions.withColumnRenamed(spaced_name, underscored_name)

In [99]:
# Persist the scored test set to Google Drive as Parquet.
# mode("overwrite") keeps the cell re-runnable: with the default save mode the
# write raises as soon as the target path exists from a previous run.
# Note that an existing output at this path is replaced.
test_predictions.write.mode("overwrite").parquet('/content/drive/My Drive/Colab Notebooks/Real_Estate_Sales_prediction.parquet')

In [100]:
# Stop the Spark session now that the notebook is done with it.
spark.stop()