## House Prices Fraud Detection

The purpose of this notebook is to demonstrate an example scenario for house-price fraud analysis. Tax authorities around the world face fraud related to house prices: sometimes the price paid is under-reported, which is illegal in most countries and leads to reduced tax receipts.

The fraud detection problem can be translated into an anomaly detection problem: a transaction is suspicious if the price paid is very low considering the location and the type of the property. Property size, number of bedrooms and similar attributes might also play a significant role.
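A minimal sketch of this idea, assuming a model already provides an expected price for each transaction: a sale is flagged when the price paid falls below a chosen fraction of that expected price. The function name `flag_underpriced`, the row layout and the 0.5 threshold are hypothetical choices for illustration, not part of the original analysis.

```python
from typing import List, Tuple


def flag_underpriced(
    rows: List[Tuple[str, float, float]], min_ratio: float = 0.5
) -> List[str]:
    """Return the ids of transactions whose price paid is suspiciously low.

    Each row is (transaction_id, price_paid, expected_price). A row is flagged
    when price_paid < min_ratio * expected_price. The default min_ratio of 0.5
    is a hypothetical threshold chosen for illustration only.
    """
    flagged = []
    for transaction_id, price_paid, expected_price in rows:
        # Skip rows without a usable (positive) expected price
        if expected_price <= 0:
            continue
        if price_paid < min_ratio * expected_price:
            flagged.append(transaction_id)
    return flagged


# Hypothetical transactions: (id, price paid, price expected by a model)
example = [
    ("a", 250_000, 260_000),  # close to the expected price
    ("b", 90_000, 240_000),   # well below the expected price
    ("c", 410_000, 380_000),  # above the expected price
]
print(flag_underpriced(example))  # ['b']
```

In practice the threshold would have to be tuned, for example by checking how many flagged cases turn out to be genuine under-reporting.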

For this notebook we use the following <a href="https://www.kaggle.com/hm-land-registry/uk-housing-prices-paid">dataset</a> found on kaggle.com. The dataset simply records the price paid for each property transaction. In a previous demo I used AWS Glue DataBrew to split the dataset into training and test datasets. First, let's load the file into a Spark DataFrame.



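In [1]:
```python
# Location of the input CSV file on S3
dataset_location = "s3://samatas/house_prices_fraud/edited/rename_16Dec2020_1608136675571/rename_16Dec2020_1608136675571_part00000.csv"
# Read the CSV using the first row as the header; without a schema every column is read as a string
dataset = spark.read.format("csv").option("header", "true").load(dataset_location)
# Keep the DataFrame in memory, since several actions below reuse it
dataset = dataset.cache()
dataset.printSchema()
```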


```
root
 |-- id: string (nullable = true)
 |-- price_paid: string (nullable = true)
 |-- date_of_transfer: string (nullable = true)
 |-- property_type: string (nullable = true)
 |-- new: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- town: string (nullable = true)
 |-- district: string (nullable = true)
 |-- county: string (nullable = true)
 |-- category_type: string (nullable = true)
 |-- record_status: string (nullable = true)
```

Now let's count the rows, just to validate that the file was loaded successfully.

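In [3]:
```python
import locale

# Use thousands separators when printing large counts
locale.setlocale(locale.LC_ALL, 'en_US')
# locale.format() was removed in Python 3.12; format_string() is the supported replacement
print("Size of the dataset is " + locale.format_string("%d", dataset.count(), grouping=True))
```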


Size of the dataset is 22,489,348

In [4]:
```python
dataset.show(5)
```


```
+--------------------+----------+----------------+-------------+---+--------+----------+------------------+------------------+-------------+-------------+
|                  id|price_paid|date_of_transfer|property_type|new|duration|      town|          district|            county|category_type|record_status|
+--------------------+----------+----------------+-------------+---+--------+----------+------------------+------------------+-------------+-------------+
|{81B82214-7FBC-41...|     25000|1995-08-18 00:00|            T|  N|       F|    OLDHAM|            OLDHAM|GREATER MANCHESTER|            A|            A|
|{8046EC72-1466-42...|     42500|1995-08-09 00:00|            S|  N|       F|     GRAYS|          THURROCK|          THURROCK|            A|            A|
|{278D581A-5BF3-4F...|     45000|1995-06-30 00:00|            T|  N|       F|HIGHBRIDGE|         SEDGEMOOR|          SOMERSET|            A|            A|
|{1D861C06-A416-48...|     43150|1995-11-24 00:00|            T|  N|  
```


For linear regression we need numerical values, so we can use Spark ML's built-in `StringIndexer` to map each string category to a numeric index. For this example, let's start with the property type. First, let's explore the different property types in our dataset. For this purpose I will use the full dataset, since it has not been split yet.

In [5]:
```python
dataset.groupby("property_type").count().show()
```


```
+-------------+-------+
|property_type|  count|
+-------------+-------+
|            D|5170327|
|            S|6216218|
|            T|6918811|
|            F|4083424|
|            O| 100568|
+-------------+-------+
```

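The dataset contains only five property types. In the Price Paid Data these codes stand for Detached (D), Semi-Detached (S), Terraced (T), Flats/Maisonettes (F) and Other (O). Let's now index them and see the results.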
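By default `StringIndexer` orders labels by descending frequency (`stringOrderType="frequencyDesc"`), so the most common value receives index 0.0. The plain-Python sketch below is a simplified illustration of that rule, not Spark's implementation, applied to the counts from the table above; it matches the indices in the next output (T → 0.0, S → 1.0). The helper name `frequency_desc_index` is hypothetical, and breaking ties alphabetically is a choice made for this sketch.

```python
from typing import Dict


def frequency_desc_index(counts: Dict[str, int]) -> Dict[str, float]:
    """Assign indices by descending frequency, breaking ties alphabetically."""
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


# property_type counts taken from the groupby output above
property_type_counts = {
    "D": 5170327,
    "S": 6216218,
    "T": 6918811,
    "F": 4083424,
    "O": 100568,
}
print(frequency_desc_index(property_type_counts))
# {'T': 0.0, 'S': 1.0, 'D': 2.0, 'F': 3.0, 'O': 4.0}
```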

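In [6]:
```python
from pyspark.ml.feature import StringIndexer

# Map each property_type string to a numeric index (by default the most frequent value gets 0.0)
indexer = StringIndexer(inputCol="property_type", outputCol="property_type_index")
prepared_dataset = indexer.fit(dataset).transform(dataset)
prepared_dataset.show(5)
```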

```
+--------------------+----------+----------------+-------------+---+--------+----------+------------------+------------------+-------------+-------------+-------------------+
|                  id|price_paid|date_of_transfer|property_type|new|duration|      town|          district|            county|category_type|record_status|property_type_index|
+--------------------+----------+----------------+-------------+---+--------+----------+------------------+------------------+-------------+-------------+-------------------+
|{81B82214-7FBC-41...|     25000|1995-08-18 00:00|            T|  N|       F|    OLDHAM|            OLDHAM|GREATER MANCHESTER|            A|            A|                0.0|
|{8046EC72-1466-42...|     42500|1995-08-09 00:00|            S|  N|       F|     GRAYS|          THURROCK|          THURROCK|            A|            A|                1.0|
|{278D581A-5BF3-4F...|     45000|1995-06-30 00:00|            T|  N|       F|HIGHBRIDGE|         SEDGEMOOR|          SOMERSET
```

In [7]:
```python
prepared_dataset.printSchema()
```


```
root
 |-- id: string (nullable = true)
 |-- price_paid: string (nullable = true)
 |-- date_of_transfer: string (nullable = true)
 |-- property_type: string (nullable = true)
 |-- new: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- town: string (nullable = true)
 |-- district: string (nullable = true)
 |-- county: string (nullable = true)
 |-- category_type: string (nullable = true)
 |-- record_status: string (nullable = true)
 |-- property_type_index: double (nullable = false)
```

Notice that `property_type_index` is of type `double`, which is exactly what we want. However, the `price_paid` column is still of type `string`, because the CSV was read without a schema. Let's convert it to a numerical data type.

In [8]:
```python
from pyspark.sql.types import IntegerType

# Cast price_paid from string to integer so it can be used as the regression label
prepared_dataset = prepared_dataset.withColumn("price_paid", prepared_dataset["price_paid"].cast(IntegerType()))
prepared_dataset.show(5)
```

```
+--------------------+----------+----------------+-------------+---+--------+----------+------------------+------------------+-------------+-------------+-------------------+
|                  id|price_paid|date_of_transfer|property_type|new|duration|      town|          district|            county|category_type|record_status|property_type_index|
+--------------------+----------+----------------+-------------+---+--------+----------+------------------+------------------+-------------+-------------+-------------------+
|{81B82214-7FBC-41...|     25000|1995-08-18 00:00|            T|  N|       F|    OLDHAM|            OLDHAM|GREATER MANCHESTER|            A|            A|                0.0|
|{8046EC72-1466-42...|     42500|1995-08-09 00:00|            S|  N|       F|     GRAYS|          THURROCK|          THURROCK|            A|            A|                1.0|
|{278D581A-5BF3-4F...|     45000|1995-06-30 00:00|            T|  N|       F|HIGHBRIDGE|         SEDGEMOOR|          SOMERSET
```

Let's repeat the indexing process for the `town` column.

In [9]:
```python
# Index the town names the same way as property_type
indexer = StringIndexer(inputCol="town", outputCol="town_index")
prepared_dataset = indexer.fit(prepared_dataset).transform(prepared_dataset)
```

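One caveat: the index produced for `town` (and `property_type`) is an arbitrary position in a frequency ranking, yet `LinearRegression` treats it as a quantity with a single coefficient, so the model assumes the town with index 200 has exactly twice the price effect of the town with index 100. A common alternative is one-hot encoding, which Spark ML provides as `OneHotEncoder` in `pyspark.ml.feature`. The plain-Python sketch below only illustrates the encoding itself; the helper name `one_hot` is hypothetical.

```python
from typing import List


def one_hot(index: int, num_categories: int, drop_last: bool = True) -> List[float]:
    """Turn a category index into a 0/1 vector.

    With drop_last=True the last category is encoded as all zeros, which avoids
    a redundant column (Spark's OneHotEncoder uses the same default).
    """
    if index < 0 or index >= num_categories:
        raise ValueError("index must be in [0, num_categories)")
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if index < size:
        vector[index] = 1.0
    return vector


# Five property types indexed 0.0 to 4.0
print(one_hot(1, 5))  # [0.0, 1.0, 0.0, 0.0]
print(one_hot(4, 5))  # [0.0, 0.0, 0.0, 0.0]
```

Each category then gets its own coefficient instead of sharing one slope, at the cost of many more features for a column with as many distinct values as `town`.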

Next, let's take the year in which the property was sold and add it as another numerical column. First, we need to extract the year from the `date_of_transfer` string.
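PySpark's `Column.substr(startPos, length)` uses a 1-based start position, so `substr(1, 4)` keeps the first four characters of a value such as `"1995-08-18 00:00"`. In plain Python, where slicing is 0-based, the equivalent step is sketched below; the helper name `year_from_transfer` is hypothetical.

```python
def year_from_transfer(date_of_transfer: str) -> int:
    """Extract the year from a value such as "1995-08-18 00:00".

    Python slices are 0-based, so [0:4] matches Spark's substr(1, 4).
    """
    return int(date_of_transfer[0:4])


print(year_from_transfer("1995-08-18 00:00"))  # 1995
```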

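In [10]:
```python
# date_of_transfer looks like "1995-08-18 00:00", so its first four characters are the year
prepared_dataset = prepared_dataset.withColumn('year_of_transfer', prepared_dataset['date_of_transfer'].substr(1, 4))
# Cast the extracted year from string to integer
prepared_dataset = prepared_dataset.withColumn("year_of_transfer", prepared_dataset["year_of_transfer"].cast(IntegerType()))
prepared_dataset.printSchema()
```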


```
root
 |-- id: string (nullable = true)
 |-- price_paid: integer (nullable = true)
 |-- date_of_transfer: string (nullable = true)
 |-- property_type: string (nullable = true)
 |-- new: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- town: string (nullable = true)
 |-- district: string (nullable = true)
 |-- county: string (nullable = true)
 |-- category_type: string (nullable = true)
 |-- record_status: string (nullable = true)
 |-- property_type_index: double (nullable = false)
 |-- town_index: double (nullable = false)
 |-- year_of_transfer: integer (nullable = true)
```

Next, we split the dataset into two parts, training and test. Apache Spark's built-in `randomSplit` method randomly splits a DataFrame according to the given weights. For this example, we allocate 80% of the rows to model training and 20% to testing.
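Conceptually, a weighted random split normalises the weights and assigns every row independently to one split based on a uniform random draw, so the resulting sizes are only approximately 80/20 (the split below puts 17,993,102 of 22,489,348 rows, about 80.01%, in the training set). The following is a simplified plain-Python illustration of that idea, not Spark's implementation; `weighted_random_split` is a hypothetical name.

```python
import random
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def weighted_random_split(
    rows: Sequence[T], weights: Sequence[float], seed: int = 0
) -> List[List[T]]:
    """Assign each row to one split with probability proportional to its weight."""
    total = sum(weights)
    # Cumulative upper bounds, e.g. [0.8, 0.2] -> [0.8, 1.0]
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    rng = random.Random(seed)
    splits: List[List[T]] = [[] for _ in weights]
    for row in rows:
        draw = rng.random()  # uniform in [0.0, 1.0)
        for i, upper in enumerate(bounds):
            # The last split also catches draws lost to floating-point rounding
            if draw < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


demo_train, demo_test = weighted_random_split(range(10_000), [0.8, 0.2], seed=42)
print(len(demo_train), len(demo_test))  # roughly 8000 and 2000; exact sizes depend on the seed
```

Fixing the seed makes the assignment reproducible, which is also why Spark's `randomSplit` accepts an optional `seed` argument.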

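In [11]:
```python
# Randomly split the rows roughly 80/20; pass seed=... to make the split reproducible
splits = prepared_dataset.randomSplit([0.8, 0.2])
train = splits[0]
test = splits[1]
```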


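In [12]:
```python
# locale.format_string() replaces locale.format(), which was removed in Python 3.12
print("Size of train dataset is " + locale.format_string("%d", train.count(), grouping=True))
print("Size of test dataset is " + locale.format_string("%d", test.count(), grouping=True))
```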

```
Size of train dataset is 17,993,102
Size of test dataset is 4,496,246
```

In [13]:
```python
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Combine the numeric columns into the single vector column that Spark ML expects
assembler = VectorAssembler(
    inputCols=["property_type_index", "town_index", "year_of_transfer"],
    outputCol="features")
training = assembler.transform(train)

# Elastic-net regularised linear regression with price_paid as the label
lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8).setLabelCol("price_paid")
lrModel = lr.fit(training)
```

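For reference, the Spark ML documentation describes the elastic-net objective that `LinearRegression` minimises as shown below, where $\lambda$ is `regParam` (0.3 here) and $\alpha$ is `elasticNetParam` (0.8 here), so the penalty is mostly L1 (lasso) with a smaller L2 (ridge) share. Spark also standardises the features by default (`standardization=True`) before applying the penalty.

$$
\min_{w,\,b}\; \frac{1}{2n}\sum_{i=1}^{n}\left(w^\top x_i + b - y_i\right)^2 + \lambda\left(\alpha\,\lVert w\rVert_1 + \frac{1-\alpha}{2}\,\lVert w\rVert_2^2\right)
$$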

In [14]:
```python
print("Coefficients: %s" % str(lrModel.coefficients))
print("Intercept: %s" % str(lrModel.intercept))

# Summarize the model over the training set and print out some metrics
trainingSummary = lrModel.summary
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)
```


```
Coefficients: [32020.562122876876,-2.245252766067222,11567.104989562413]
Intercept: -23056521.72478374
numIterations: 6
objectiveHistory: [0.4999999999999999, 0.49055061404863065, 0.481015457287159, 0.47863065259146675, 0.47803444273831025, 0.4778357060209848]
+-------------------+
|          residuals|
+-------------------+
|  -16175.4566793181|
|-11307.487819101661|
|  52960.72514574602|
| -35914.41576190293|
|  75126.70848384872|
|  846.9086215086281|
|  156398.2706067264|
|-20900.944245308638|
| -4814.560096248984|
| 13205.647178642452|
| 19680.594550233334|
| -35433.66074741259|
| 1279.1122651137412|
|  21167.47788162157|
|-24333.231814343482|
|   52858.5769601576|
|  56126.70848384872|
|  235147.2706067264|
| -99659.06138571724|
|-22688.236829079688|
+-------------------+
only showing top 20 rows

RMSE: 384335.690430
r2: 0.044329
```

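To read these numbers: RMSE is the square root of the mean squared residual, in the same units as `price_paid` (pounds), and R² is one minus the ratio of the residual sum of squares to the total sum of squares around the mean. A training r2 of 0.044 therefore means the three features explain only about 4.4% of the variance in prices. A minimal plain-Python version of these standard definitions (the function names are illustrative):

```python
import math
from typing import Sequence


def rmse(actual: Sequence[float], predicted: Sequence[float]) -> float:
    """Root mean squared error."""
    squared = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return math.sqrt(sum(squared) / len(squared))


def r2(actual: Sequence[float], predicted: Sequence[float]) -> float:
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean = sum(actual) / len(actual)
    ss_res = sum((a - p) ** 2 for a, p in zip(actual, predicted))
    ss_tot = sum((a - mean) ** 2 for a in actual)
    return 1.0 - ss_res / ss_tot


prices = [100.0, 200.0, 300.0]
print(r2(prices, prices))                    # 1.0 for perfect predictions
print(r2(prices, [200.0, 200.0, 200.0]))     # 0.0 when always predicting the mean
print(rmse(prices, [200.0, 200.0, 200.0]))   # about 81.65
```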
In [15]:
```python
from pyspark.ml.evaluation import RegressionEvaluator

# Apply the same feature assembly to the test set and score it with the trained model
testing = assembler.transform(test)
lr_predictions = lrModel.transform(testing)

lr_predictions.select("prediction", "price_paid", "features").show(5)

# Compute R squared of the predictions against the actual prices
lr_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="price_paid", metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))
```


```
+------------------+----------+------------------+
|        prediction|price_paid|          features|
+------------------+----------+------------------+
| 19596.77057794109|     33000|[0.0,114.0,1995.0]|
|19567.582291983068|     16000|[0.0,127.0,1995.0]|
|115914.41576190293|     55000|  [3.0,0.0,1995.0]|
|19408.169345591217|     39250|[0.0,198.0,1995.0]|
|18965.854550678283|     54000|[0.0,395.0,1995.0]|
+------------------+----------+------------------+
only showing top 5 rows
```

R Squared (R2) on test data = 0.0470255
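Each prediction is the intercept plus the dot product of the coefficients with the feature vector `[property_type_index, town_index, year_of_transfer]`. Plugging the printed coefficients and intercept into that formula by hand reproduces the predictions in the table above, as the plain-Python snippet below shows (the function name `predict` is illustrative). It also explains the large negative intercept: it offsets the year term, which contributes about 23.08 million for 1995.

```python
# Coefficients and intercept copied from the lrModel output above,
# in the order of the VectorAssembler inputCols.
coefficients = [32020.562122876876, -2.245252766067222, 11567.104989562413]
intercept = -23056521.72478374


def predict(features):
    """Linear model prediction: intercept + sum(coefficient * feature)."""
    return intercept + sum(c * x for c, x in zip(coefficients, features))


print(round(predict([0.0, 114.0, 1995.0]), 2))  # 19596.77, the first row above
print(round(predict([3.0, 0.0, 1995.0]), 2))    # 115914.42, the third row above
```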

Let's write our predictions back to S3.

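In [16]:
```python
# Save the test-set predictions as Parquet, replacing any previous output at this path
lr_predictions.write.parquet("s3://samatas/house_prices_fraud/predictions/", mode="overwrite")
```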
