In [16]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

In [17]:
# https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.SparkConf
# Build the Spark configuration in one go from (key, value) pairs.
# NOTE(review): the session below uses master("local"), where tasks run inside
# the driver JVM, so spark.executor.memory likely has no effect here — confirm.
config = SparkConf().setAll([
    ("spark.driver.memory", "2g"),
    ("spark.executor.memory", "1g"),
])
config

<pyspark.conf.SparkConf at 0x111214310>

In [18]:
# https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SparkSession
# getOrCreate() returns the already-active session if there is one; otherwise
# it starts a new single-threaded local session using `config`.
spark = (
    SparkSession.builder
    .config(conf=config)
    .master("local")
    .appName("Analyzing Real Estate Sales")
    .getOrCreate()
)

In [4]:
# Bare expression: Jupyter displays the SparkSession object's repr.
spark

In [19]:
# Load the raw sales CSV using its first line as column names. No schema is
# given and inference is off, so every column comes back as a string.
df = spark.read.csv('../Real_Estate_Sales_2001-2017.csv', header=True)

In [7]:
# Print the schema: every column is a nullable string because the CSV was
# read without a schema or schema inference.
df.printSchema()

root
 |-- ID: string (nullable = true)
 |-- SerialNumber: string (nullable = true)
 |-- ListYear: string (nullable = true)
 |-- DateRecorded: string (nullable = true)
 |-- Town: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- AssessedValue: string (nullable = true)
 |-- SaleAmount: string (nullable = true)
 |-- SalesRatio: string (nullable = true)
 |-- PropertyType: string (nullable = true)
 |-- ResidentialType: string (nullable = true)
 |-- NonUseCode: string (nullable = true)
 |-- Remarks: string (nullable = true)



In [27]:
# Summary statistics for every column. The differing per-column counts reveal
# nulls (e.g. SaleAmount has fewer non-null values than ID). The table is very
# wide, so describing a few columns at a time is easier to read.
df.describe().show()

+-------+------------------+------------------+------------------+--------------------+---------+------------------+------------------+------------------+-----------------+--------------+---------------+-----------------+--------------------+
|summary|                ID|      SerialNumber|          ListYear|        DateRecorded|     Town|           Address|     AssessedValue|        SaleAmount|       SalesRatio|  PropertyType|ResidentialType|       NonUseCode|             Remarks|
+-------+------------------+------------------+------------------+--------------------+---------+------------------+------------------+------------------+-----------------+--------------+---------------+-----------------+--------------------+
|  count|            861597|            861597|            861597|              861589|   861597|            861594|            861292|            830804|           861596|        815904|         861596|           830867|              826698|
|   mean|          430799.0|

In [29]:
# Statistics for the Town column alone; mean/stddev come out null because the
# values are non-numeric strings.
df.describe("Town").show()

+-------+---------+
|summary|     Town|
+-------+---------+
|  count|   861597|
|   mean|     null|
| stddev|     null|
|    min|  Andover|
|    max|Woodstock|
+-------+---------+



In [30]:
# Column names as a Python list, in file order.
df.columns

['ID',
 'SerialNumber',
 'ListYear',
 'DateRecorded',
 'Town',
 'Address',
 'AssessedValue',
 'SaleAmount',
 'SalesRatio',
 'PropertyType',
 'ResidentialType',
 'NonUseCode',
 'Remarks']

In [31]:
# Print the physical plan; for the freshly loaded CSV it is a single FileScan.
df.explain()

== Physical Plan ==
*(1) FileScan csv [ID#257,SerialNumber#258,ListYear#259,DateRecorded#260,Town#261,Address#262,AssessedValue#263,SaleAmount#264,SalesRatio#265,PropertyType#266,ResidentialType#267,NonUseCode#268,Remarks#269] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/pmacharl/git-projects/personal/github.com/spark_experiments/Real_Es..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<ID:string,SerialNumber:string,ListYear:string,DateRecorded:string,Town:string,Address:stri...


In [8]:
# Total number of rows. This is an action, so Spark actually reads the file.
df.count()

861597

In [9]:
# Print the first rows (show() defaults to 20); long values are truncated.
df.show()

+------+------------+--------+--------------------+----------+--------------------+-------------+----------+-----------------+------------+---------------+----------------+--------------------+
|    ID|SerialNumber|ListYear|        DateRecorded|      Town|             Address|AssessedValue|SaleAmount|       SalesRatio|PropertyType|ResidentialType|      NonUseCode|             Remarks|
+------+------------+--------+--------------------+----------+--------------------+-------------+----------+-----------------+------------+---------------+----------------+--------------------+
|815906|      170177|    2017|04/05/1999 12:00:...|New London|      293 PEQUOT AVE|       132440|    252500|            0.525|        null|     Two Family|            null|                null|
|     2|      900035|    2009|07/20/2010 12:00:...|   Andover|     1 DOGWOOD DRIVE|        55600|     99000|0.561616161616162| Vacant Land|             NA|              NA|                  NA|
|     3|       14011|    2014|

In [32]:
# First 5 rows as a list of Row objects (df.head(5) is equivalent).
df.take(5)

[Row(ID='815906', SerialNumber='170177', ListYear='2017', DateRecorded='04/05/1999 12:00:00 AM', Town='New London', Address='293 PEQUOT AVE', AssessedValue='132440', SaleAmount='252500', SalesRatio='0.525', PropertyType=None, ResidentialType='Two Family', NonUseCode=None, Remarks=None),
 Row(ID='2', SerialNumber='900035', ListYear='2009', DateRecorded='07/20/2010 12:00:00 AM', Town='Andover', Address='1 DOGWOOD DRIVE', AssessedValue='55600', SaleAmount='99000', SalesRatio='0.561616161616162', PropertyType='Vacant Land', ResidentialType='NA', NonUseCode='NA', Remarks='NA'),
 Row(ID='3', SerialNumber='14011', ListYear='2014', DateRecorded='01/14/2015 12:00:00 AM', Town='Andover', Address='1 JUROVATY LANE', AssessedValue='153100', SaleAmount='190000', SalesRatio='0.805789474', PropertyType='Residential', ResidentialType='Single Family', NonUseCode='NA', Remarks='NA'),
 Row(ID='4', SerialNumber='80009', ListYear='2008', DateRecorded='01/21/2009 12:00:00 AM', Town='Andover', Address='1 ROSE

In [22]:
# Preview just the Town column.
df.select(df.Town).show()

+----------+
|      Town|
+----------+
|New London|
|   Andover|
|   Andover|
|   Andover|
|   Andover|
|   Andover|
|   Shelton|
|   Andover|
|   Andover|
|   Andover|
|   Andover|
|   Andover|
|   Andover|
|   Andover|
|   Andover|
|   Andover|
|   Andover|
|   Andover|
|   Andover|
|   Andover|
+----------+
only showing top 20 rows



In [10]:
# limit(10) yields a new 10-row DataFrame; show() then prints all of it.
df.limit(10).show()

+------+------------+--------+--------------------+----------+--------------------+-------------+----------+-----------------+------------+---------------+----------------+--------------------+
|    ID|SerialNumber|ListYear|        DateRecorded|      Town|             Address|AssessedValue|SaleAmount|       SalesRatio|PropertyType|ResidentialType|      NonUseCode|             Remarks|
+------+------------+--------+--------------------+----------+--------------------+-------------+----------+-----------------+------------+---------------+----------------+--------------------+
|815906|      170177|    2017|04/05/1999 12:00:...|New London|      293 PEQUOT AVE|       132440|    252500|            0.525|        null|     Two Family|            null|                null|
|     2|      900035|    2009|07/20/2010 12:00:...|   Andover|     1 DOGWOOD DRIVE|        55600|     99000|0.561616161616162| Vacant Land|             NA|              NA|                  NA|
|     3|       14011|    2014|

In [11]:
# DataFrames are immutable: dropna() returns a NEW DataFrame and leaves df
# untouched, so a bare `df.dropna()` throws its result away. Bind it to a
# name so the null-free frame can actually be used downstream.
# Caveat: dropna() only removes rows containing real nulls. This CSV also uses
# the literal string "NA" for missing values (see ResidentialType/NonUseCode
# in the rows shown above), which dropna() does not treat as null; reading
# with .option("nullValue", "NA") would be needed to catch those.
df_no_nulls = df.dropna()
df_no_nulls

DataFrame[ID: string, SerialNumber: string, ListYear: string, DateRecorded: string, Town: string, Address: string, AssessedValue: string, SaleAmount: string, SalesRatio: string, PropertyType: string, ResidentialType: string, NonUseCode: string, Remarks: string]

In [12]:
# New DataFrame without the ID column; df itself is unchanged.
df1 = df.drop(df["ID"])
df1.show()

+------------+--------+--------------------+----------+--------------------+-------------+----------+-----------------+------------+---------------+----------------+--------------------+
|SerialNumber|ListYear|        DateRecorded|      Town|             Address|AssessedValue|SaleAmount|       SalesRatio|PropertyType|ResidentialType|      NonUseCode|             Remarks|
+------------+--------+--------------------+----------+--------------------+-------------+----------+-----------------+------------+---------------+----------------+--------------------+
|      170177|    2017|04/05/1999 12:00:...|New London|      293 PEQUOT AVE|       132440|    252500|            0.525|        null|     Two Family|            null|                null|
|      900035|    2009|07/20/2010 12:00:...|   Andover|     1 DOGWOOD DRIVE|        55600|     99000|0.561616161616162| Vacant Land|             NA|              NA|                  NA|
|       14011|    2014|01/14/2015 12:00:...|   Andover|     1 JUR

In [13]:
# Unique PropertyType values. Missing data appears two ways: as a real null
# and as the literal string "NA".
distinct_property_type = df1.select("PropertyType").dropDuplicates()
distinct_property_type.show()

+--------------+
|  PropertyType|
+--------------+
|    Apartments|
|   Vacant Land|
|            NA|
|   Residential|
|          null|
|    Industrial|
|         Condo|
|Public Utility|
|10 Mill Forest|
|    Commercial|
+--------------+



In [14]:
# Number of distinct PropertyType values; the count includes both null and
# the "NA" string, so there are fewer real property types than this.
distinct_property_type.count()

10

In [23]:
# Keep only sales in the town of Andover (where() is an alias of filter()).
df_andover_town = df.where(df.Town == 'Andover')

In [25]:
# Print up to 30 Andover rows instead of show()'s default of 20.
df_andover_town.show(30)

+---+------------+--------+--------------------+-------+--------------------+-------------+----------+-----------------+------------+---------------+----------------+--------------------+
| ID|SerialNumber|ListYear|        DateRecorded|   Town|             Address|AssessedValue|SaleAmount|       SalesRatio|PropertyType|ResidentialType|      NonUseCode|             Remarks|
+---+------------+--------+--------------------+-------+--------------------+-------------+----------+-----------------+------------+---------------+----------------+--------------------+
|  2|      900035|    2009|07/20/2010 12:00:...|Andover|     1 DOGWOOD DRIVE|        55600|     99000|0.561616161616162| Vacant Land|             NA|              NA|                  NA|
|  3|       14011|    2014|01/14/2015 12:00:...|Andover|     1 JUROVATY LANE|       153100|    190000|      0.805789474| Residential|  Single Family|              NA|                  NA|
|  4|       80009|    2008|01/21/2009 12:00:...|Andover|    

In [35]:
# Spark writes a *directory* named "blahfolder" containing one
# part-*.snappy.parquet file per partition (plus a _SUCCESS marker), not a
# single file.
# mode("overwrite") makes this cell re-runnable: the default save mode
# ("errorifexists") fails when the directory already exists, which breaks
# Restart & Run All. Overwrite replaces any previous contents of the folder.
df.write.mode("overwrite").parquet("blahfolder")

In [15]:
# Stop the SparkSession and its underlying SparkContext to release resources.
# Cells that use `spark` or any DataFrame need the session recreated after this.
spark.stop()