In [1]:
import findspark
findspark.init()

In [2]:
import pyspark #only run after findspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [3]:
df = spark.read.csv('price_paid_records.csv',header=True,sep= ",",inferSchema=True)

print(df.printSchema)
#Property type D = Detached, S = Semi-Detached, T = Terraced, F = Flats/Maisonettes, O = Other
#Duration relates to the tenure: F = Freehold, L= Leasehold
#PPD Category Type Indicates the type of Price Paid transaction, only differentiated since 2013
#Record Status - monthly file only indicates Additions, Changes and Deletions to the records.
# Old/New: Y = a newly built property, N = an established residential building

print('Count:', df.count())

<bound method DataFrame.printSchema of DataFrame[Transaction unique identifier: string, Price: int, Date of Transfer: string, Property Type: string, Old/New: string, Duration: string, Town/City: string, District: string, County: string, PPDCategory Type: string, Record Status - monthly file only: string]>
Count: 22489348


In [4]:
df.head()

Row(Transaction unique identifier='{81B82214-7FBC-4129-9F6B-4956B4A663AD}', Price=25000, Date of Transfer='1995-08-18 00:00', Property Type='T', Old/New='N', Duration='F', Town/City='OLDHAM', District='OLDHAM', County='GREATER MANCHESTER', PPDCategory Type='A', Record Status - monthly file only='A')

In [5]:
from pyspark.sql.functions import *

# Shows null values in each column
#df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]
   #).show()

#drops all null values, not needed for this dataset
#df = df.na.drop()

In [6]:
df.groupBy('PPDCategory Type').count().show()
#PPD category type B only makes up ~1% of data, and is not very informative, so it will be dropped

df = df.drop(col('PPDCategory Type'))

+----------------+--------+
|PPDCategory Type|   count|
+----------------+--------+
|               B|  348175|
|               A|22141173|
+----------------+--------+



In [7]:
df.groupBy('Record Status - monthly file only').count().show()
#Only additions are found in this dataset

df = df.drop(col('Record Status - monthly file only'))

+---------------------------------+--------+
|Record Status - monthly file only|   count|
+---------------------------------+--------+
|                                A|22489348|
+---------------------------------+--------+



In [8]:
# Will now drop duplicates, can check new count to see if any were found, then unique identifier can be removed
df = df.dropDuplicates()

print('Count:', df.count())

df = df.drop(col('Transaction unique identifier'))
df.printSchema

Count: 22489348


<bound method DataFrame.printSchema of DataFrame[Price: int, Date of Transfer: string, Property Type: string, Old/New: string, Duration: string, Town/City: string, District: string, County: string]>

In [9]:
#converts the Date of Transfer column to timestamp format
df = df.withColumn('TSDate of Transfer',to_timestamp('Date of Transfer'))

print(df.head())
print(df.printSchema)

Row(Price=99000, Date of Transfer='1995-10-16 00:00', Property Type='D', Old/New='N', Duration='F', Town/City='MARLBOROUGH', District='KENNET', County='WILTSHIRE', TSDate of Transfer=datetime.datetime(1995, 10, 16, 0, 0))
<bound method DataFrame.printSchema of DataFrame[Price: int, Date of Transfer: string, Property Type: string, Old/New: string, Duration: string, Town/City: string, District: string, County: string, TSDate of Transfer: timestamp]>


In [10]:
df = df.drop(col('Date of Transfer'))
df.printSchema

<bound method DataFrame.printSchema of DataFrame[Price: int, Property Type: string, Old/New: string, Duration: string, Town/City: string, District: string, County: string, TSDate of Transfer: timestamp]>

In [11]:
# Truncating the dates to the month
df = df.withColumn('TSDate of Transfer', date_trunc('month' ,col('TSDate of Transfer')))
print(df.head())

Row(Price=99000, Property Type='D', Old/New='N', Duration='F', Town/City='MARLBOROUGH', District='KENNET', County='WILTSHIRE', TSDate of Transfer=datetime.datetime(1995, 10, 1, 0, 0))


In [12]:
# Converting Old/New from N,Y to 0,1
df = df.withColumn('Old/New', when(df['Old/New']=='N', 0).otherwise(1))

In [13]:
df.groupBy('Old/New').count().show()

+-------+--------+
|Old/New|   count|
+-------+--------+
|      1| 2296672|
|      0|20192676|
+-------+--------+



In [14]:
# Converting Duration from F,L to 0,1
df = df.withColumn('Duration', when(df['Duration']=='F', 0).otherwise(1))

df.groupBy('Duration').count().show()

+--------+--------+
|Duration|   count|
+--------+--------+
|       1| 5312938|
|       0|17176410|
+--------+--------+



In [15]:
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler
import pandas as pd

# Returns pandas correlation matrix, corr_columns allows control of what variables to correlate

def correlation_matrix(df, corr_columns, method='pearson'):
    vector_col = 'features'
    assembler = VectorAssembler(inputCols=corr_columns, outputCol=vector_col)
    df_vector = assembler.transform(df).select(vector_col)
    matrix = Correlation.corr(df_vector, vector_col, method)

    result = matrix.collect()[0]["pearson({})".format(vector_col)].values
    return pd.DataFrame(result.reshape(-1, len(corr_columns)), columns=corr_columns, index=corr_columns)

In [None]:
# Correlation matrix for just price, old vs new, and duration. would like to include date and location info as well
corr_columns = ['Price','Old/New','Duration']

correlation_matrix(df, corr_columns)