In this file the data is reshaped, aggregated, and merged. String variables are hot hot encoded.

In [173]:
import pandas as pd
import glob
import re

from pyspark.sql import SparkSession
from pyspark import  SparkConf
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.functions import to_date
from pyspark.sql.functions import col, explode
from pyspark.ml.feature import Imputer
from pyspark.sql.functions import mean

# Set up spark session

In [174]:
appName = "app"
master = "local[*]" # Spark will use all cores (*) available
if not 'spark' in globals(): # This 'trick' makes sure the SparkContext sc is initialized exactly once
  conf = SparkConf().setAppName(appName).setMaster(master)
  spark = SparkSession.builder \
        .config(conf=conf) \
        .getOrCreate()

In [175]:
#Display the information kept in the spark variable
spark

# Merge Price and Income

In [176]:
# Load prices parquet
prices = spark.read.parquet("../formatted_zone/prices.parquet")
prices.show(5)


+---+-----------+-------------------+--------------------+------+---------+-----------+--------------+----+
|_id|district_id|           district|        neighborhood|amount|per_meter|used_amount|used_per_meter|year|
+---+-----------+-------------------+--------------------+------+---------+-----------+--------------+----+
| 37|          7|     Horta-Guinardó|        la Teixonera| 106.6|   1812.9|      106.6|        1889.4|2016|
| 70|         10|         Sant Martí|Provençals del Po...| 246.6|   3092.1|      283.4|        3558.6|2016|
| 28|          6|             Gràcia|             el Coll| 169.7|   2283.5|      151.4|        2135.3|2016|
| 65|         10|         Sant Martí|el Parc i la Llac...| 250.5|   3129.7|      229.4|        3089.1|2016|
| 25|          5|Sarrià-Sant Gervasi|Sant Gervasi - Ga...| 582.0|   4416.5|      568.0|        4336.0|2016|
+---+-----------+-------------------+--------------------+------+---------+-----------+--------------+----+
only showing top 5 rows



In [177]:
# Load the income parquet
income = spark.read.parquet("../formatted_zone/income.parquet")
income.show(5)

+-----------+------------+----------+--------------------+--------+---------+----+
|district_id|    district|codi_barri|        neighborhood|població|index_rfd|year|
+-----------+------------+----------+--------------------+--------+---------+----+
|          1|Ciutat Vella|         1|            el Raval|   49225|     60.3|2013|
|          1|Ciutat Vella|         2|      el Barri Gòtic|   16327|    103.6|2013|
|          1|Ciutat Vella|         3|      la Barceloneta|   15571|     82.1|2013|
|          1|Ciutat Vella|         4|Sant Pere, Santa ...|   22821|     91.2|2013|
|          2|    Eixample|         5|       el Fort Pienc|   31754|     99.0|2013|
+-----------+------------+----------+--------------------+--------+---------+----+
only showing top 5 rows



In [178]:
# check ranges of year columns
income.select('year').distinct().show()

+----+
|year|
+----+
|2013|
|2010|
|2015|
|2011|
|2014|
|2009|
|2007|
|2012|
|2008|
|2017|
|2016|
+----+



In [179]:
prices.select('year').distinct().show()

+----+
|year|
+----+
|2016|
|2017|
|2015|
|2013|
|2014|
+----+



Year columns dont align. So as not to loose data, we create columns for each year in the income data

In [180]:
# Reshape wide, group 
wide_income = income.groupBy("district_id", "district") \
    .pivot("year") \
    .agg(
        F.mean("index_rfd").alias("index_rfd"),  
        F.mean("població").alias("poblacio")   
    )
wide_income.show(5)

+-----------+--------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+
|district_id|      district|    2007_index_rfd|     2007_poblacio|    2008_index_rfd|     2008_poblacio|    2009_index_rfd|     2009_poblacio|    2010_index_rfd|     2010_poblacio|    2011_index_rfd|     2011_poblacio|    2012_index_rfd|     2012_poblacio|    2013_index_rfd|     2013_poblacio|    2014_index_rfd|     2014_poblacio|    2015_index_rfd|     2015_poblacio|    2016_index_rfd|     2016_poblacio|   2017_index_rfd|     2017_poblacio|
+-----------+--------------+------------------+------------------+------------------+------------------+----

In [181]:
# Reshape wide
wide_prices = prices.groupBy("district_id", "district") \
    .pivot("year") \
    .agg(
        F.mean("per_meter").alias("per_meter"),
        F.mean("used_per_meter").alias("used_per_meter"),
        F.mean("amount").alias("amount"),
        F.mean("used_amount").alias("used_amount")
    )
wide_prices.show(5)

+-----------+-------------------+------------------+-------------------+------------------+------------------+------------------+-------------------+------------------+------------------+------------------+-------------------+------------------+------------------+------------------+-------------------+------------------+------------------+------------------+-------------------+------------------+------------------+
|district_id|           district|    2013_per_meter|2013_used_per_meter|       2013_amount|  2013_used_amount|    2014_per_meter|2014_used_per_meter|       2014_amount|  2014_used_amount|    2015_per_meter|2015_used_per_meter|       2015_amount|  2015_used_amount|    2016_per_meter|2016_used_per_meter|       2016_amount|  2016_used_amount|    2017_per_meter|2017_used_per_meter|       2017_amount|  2017_used_amount|
+-----------+-------------------+------------------+-------------------+------------------+------------------+------------------+-------------------+-------------

In [182]:
# Merge on distric
prices_income_wide = wide_prices.join(wide_income, wide_prices.district == wide_income.district)
prices_income_wide.show(5)

# drop duplicated columns
selected_columns = [wide_prices.district] + \
                   [wide_prices.district_id] + \
                   [col for col in wide_prices.columns if col != 'district' and col != 'district_id']  + \
                   [col for col in wide_income.columns if col != 'district' and col != 'district_id']

prices_income_wide = prices_income_wide.select(selected_columns)

+-----------+--------------+------------------+-------------------+------------------+------------------+------------------+-------------------+------------------+------------------+------------------+-------------------+------------------+------------------+------------------+-------------------+------------------+------------------+------------------+-------------------+------------------+------------------+-----------+--------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+
|district_id|      district|    2013_per_meter|2013_used_per_meter|       2013_amount|  2013_used_amount|    2014_per_meter|2014_used_per_mete

In [183]:
prices_income_wide.printSchema()

root
 |-- district: string (nullable = true)
 |-- district_id: long (nullable = true)
 |-- 2013_per_meter: double (nullable = true)
 |-- 2013_used_per_meter: double (nullable = true)
 |-- 2013_amount: double (nullable = true)
 |-- 2013_used_amount: double (nullable = true)
 |-- 2014_per_meter: double (nullable = true)
 |-- 2014_used_per_meter: double (nullable = true)
 |-- 2014_amount: double (nullable = true)
 |-- 2014_used_amount: double (nullable = true)
 |-- 2015_per_meter: double (nullable = true)
 |-- 2015_used_per_meter: double (nullable = true)
 |-- 2015_amount: double (nullable = true)
 |-- 2015_used_amount: double (nullable = true)
 |-- 2016_per_meter: double (nullable = true)
 |-- 2016_used_per_meter: double (nullable = true)
 |-- 2016_amount: double (nullable = true)
 |-- 2016_used_amount: double (nullable = true)
 |-- 2017_per_meter: double (nullable = true)
 |-- 2017_used_per_meter: double (nullable = true)
 |-- 2017_amount: double (nullable = true)
 |-- 2017_used_amount:

# Merge with idealista

In [184]:
idealista = spark.read.parquet("../formatted_zone/idealista.parquet")
idealista.show(5)


+--------------------+---------+-------+--------+-----------+--------+-----+------+---------+-------+-------+----------+--------+----------+---------+------------+--------------------+--------------+---------+---------+--------+-----------+------------+------------+---------+-----+-----------+-----+-----------------+----------+---------------+-----------------------------+-----------------+--------+----+
|             address|bathrooms|country|distance|   district|exterior|floor|has360|has3dtour|haslift|hasplan|hasstaging|hasvideo|  latitude|longitude|municipality|        neighborhood|newdevelopment|numphotos|operation|   price|pricebyarea|propertycode|propertytype| province|rooms|showaddress| size|topnewdevelopment|      date|hasparkingspace|isparkingspaceincludedinprice|parkingspaceprice|typology|year|
+--------------------+---------+-------+--------+-----------+--------+-----+------+---------+-------+-------+----------+--------+----------+---------+------------+--------------------+

In [185]:
# Merge 
prices_income_idealista = idealista.join(prices_income_wide, prices_income_wide.district == idealista.district)
# Keep only one district column
selected_columns = [prices_income_wide.district] + \
                   [col for col in idealista.columns if col != 'district'] 

prices_income_idealista = prices_income_idealista.select(selected_columns)

prices_income_idealista.show(5)


+----------+--------------------+---------+-------+--------+--------+-----+------+---------+-------+-------+----------+--------+----------+---------+------------+--------------------+--------------+---------+---------+--------+-----------+------------+------------+---------+-----+-----------+-----+-----------------+----------+---------------+-----------------------------+-----------------+--------+----+
|  district|             address|bathrooms|country|distance|exterior|floor|has360|has3dtour|haslift|hasplan|hasstaging|hasvideo|  latitude|longitude|municipality|        neighborhood|newdevelopment|numphotos|operation|   price|pricebyarea|propertycode|propertytype| province|rooms|showaddress| size|topnewdevelopment|      date|hasparkingspace|isparkingspaceincludedinprice|parkingspaceprice|typology|year|
+----------+--------------------+---------+-------+--------+--------+-----+------+---------+-------+-------+----------+--------+----------+---------+------------+--------------------+---

In [186]:
# count the rows to check all merged
idealista.count()

12267

In [187]:
# One hot encode columns
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml import Pipeline

def one_hot_encode(df, input_col):
    # Create a StringIndexer to convert string values to numeric indices
    indexer = StringIndexer(inputCol=input_col, outputCol=input_col + "_index")
    # Create a OneHotEncoder to convert numeric indices to one-hot encoded vectors
    encoder = OneHotEncoder(inputCols=[input_col + "_index"], outputCols=[input_col + "_enc"])
    
    # Set up the Pipeline with both stages: indexing and encoding
    pipeline = Pipeline(stages=[indexer, encoder])
    
    # Fit the pipeline to the data and then transform the data
    model = pipeline.fit(df)
    encoded = model.transform(df)
    
    # Optionally drop the original input column if no longer needed
    encoded = encoded.drop(input_col)
    encoded = encoded.drop(input_col + "_index")
    
    return encoded

In [188]:
prices_income_idealista = one_hot_encode(prices_income_idealista, "district")
prices_income_idealista = one_hot_encode(prices_income_idealista, "neighborhood")
prices_income_idealista = one_hot_encode(prices_income_idealista, "propertytype")
prices_income_idealista = one_hot_encode(prices_income_idealista, "typology")
prices_income_idealista.show(5)

+--------------------+---------+-------+--------+--------+-----+------+---------+-------+-------+----------+--------+----------+---------+------------+--------------+---------+---------+--------+-----------+------------+---------+-----+-----------+-----+-----------------+----------+---------------+-----------------------------+-----------------+----+-------------+----------------+----------------+-------------+
|             address|bathrooms|country|distance|exterior|floor|has360|has3dtour|haslift|hasplan|hasstaging|hasvideo|  latitude|longitude|municipality|newdevelopment|numphotos|operation|   price|pricebyarea|propertycode| province|rooms|showaddress| size|topnewdevelopment|      date|hasparkingspace|isparkingspaceincludedinprice|parkingspaceprice|year| district_enc|neighborhood_enc|propertytype_enc| typology_enc|
+--------------------+---------+-------+--------+--------+-----+------+---------+-------+-------+----------+--------+----------+---------+------------+--------------+----

In [189]:
# Generate index column 
prices_income_idealista = prices_income_idealista.withColumn("index", F.monotonically_increasing_id())

# Drop columsn string columns that wont be used in analysis
prices_income_idealista = prices_income_idealista.drop('address', 'country', 'district', 'municipality', 'province', 'operation', 'date')

In [190]:
prices_income_idealista.printSchema()

root
 |-- bathrooms: integer (nullable = true)
 |-- distance: double (nullable = true)
 |-- exterior: boolean (nullable = true)
 |-- floor: integer (nullable = true)
 |-- has360: boolean (nullable = true)
 |-- has3dtour: boolean (nullable = true)
 |-- haslift: boolean (nullable = true)
 |-- hasplan: boolean (nullable = true)
 |-- hasstaging: boolean (nullable = true)
 |-- hasvideo: boolean (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- newdevelopment: boolean (nullable = true)
 |-- numphotos: long (nullable = true)
 |-- price: double (nullable = true)
 |-- pricebyarea: double (nullable = true)
 |-- propertycode: string (nullable = true)
 |-- rooms: integer (nullable = true)
 |-- showaddress: boolean (nullable = true)
 |-- size: double (nullable = true)
 |-- topnewdevelopment: boolean (nullable = true)
 |-- date: date (nullable = true)
 |-- hasparkingspace: boolean (nullable = true)
 |-- isparkingspaceincludedinprice: boolean (nul

In [191]:
# Save to exploitation zone
prices_income_idealista.write.mode('overwrite').parquet("../exploitation_zone/idealista.parquet")

# Check inegrity

In [195]:
from pyspark.sql.functions import col, count, isnan, when, avg, min, max, stddev, corr

## Check for missing values in columns

In [199]:
for column in ['price', 'size', 'rooms', 'bathrooms', 'latitude', 'longitude']:
    missing_count = prices_income_idealista.filter((col(column).isNull()) | (col(column) == '')).count()
    print(f"Missing count for {column}: {missing_count}")


Missing count for price: 0
Missing count for size: 0
Missing count for rooms: 0
Missing count for bathrooms: 0
Missing count for latitude: 0
Missing count for longitude: 0


## Check for duplicates

In [201]:
prices_income_idealista.groupBy(prices_income_idealista.columns).count().filter("count > 1").show()


+---------+--------+--------+-----+------+---------+-------+-------+----------+--------+--------+---------+--------------+---------+-----+-----------+------------+-----+-----------+----+-----------------+----+---------------+-----------------------------+-----------------+----+------------+----------------+----------------+------------+-----+-----+
|bathrooms|distance|exterior|floor|has360|has3dtour|haslift|hasplan|hasstaging|hasvideo|latitude|longitude|newdevelopment|numphotos|price|pricebyarea|propertycode|rooms|showaddress|size|topnewdevelopment|date|hasparkingspace|isparkingspaceincludedinprice|parkingspaceprice|year|district_enc|neighborhood_enc|propertytype_enc|typology_enc|index|count|
+---------+--------+--------+-----+------+---------+-------+-------+----------+--------+--------+---------+--------------+---------+-----+-----------+------------+-----+-----------+----+-----------------+----+---------------+-----------------------------+-----------------+----+------------+-------

# Output summary statistics

In [203]:
prices_income_idealista.describe().show()


+-------+------------------+------------------+-----------------+-------------------+--------------------+------------------+-----------------+------------------+-------------------+------------------+------------------+------------------+-------------------+------------------+
|summary|         bathrooms|          distance|            floor|           latitude|           longitude|         numphotos|            price|       pricebyarea|       propertycode|             rooms|              size| parkingspaceprice|               year|             index|
+-------+------------------+------------------+-----------------+-------------------+--------------------+------------------+-----------------+------------------+-------------------+------------------+------------------+------------------+-------------------+------------------+
|  count|              6621|              6621|             6621|               6621|                6621|              6621|             6621|              6621| 

## Correlation between price and size

In [204]:
print(prices_income_idealista.stat.corr('price', 'size'))


0.8643497235900238


## Distribution of price by room count

In [205]:
prices_income_idealista.groupBy('rooms').agg(avg('price').alias('avg_price'), min('price').alias('min_price'), max('price').alias('max_price')).orderBy('avg_price').show()


+-----+------------------+---------+---------+
|rooms|         avg_price|min_price|max_price|
+-----+------------------+---------+---------+
|    0|265666.12903225806|  34000.0|1600000.0|
|    1|298887.50741839764|  39000.0|3750000.0|
|    2| 391471.4365957447|  89000.0|2750000.0|
|    3| 513620.8333333333|  48000.0|2800000.0|
|    4| 629133.9960291198|  39000.0|    1.2E7|
|    5|1284932.2766570605| 169000.0|8000000.0|
|   15|         1350000.0|1350000.0|1350000.0|
|    6|1948355.1401869159| 419000.0|6800000.0|
|    8|         2505000.0| 880000.0|4950000.0|
|   13|         3730000.0|3730000.0|3730000.0|
|   12|         3750000.0|3750000.0|3750000.0|
|    7|3788253.9682539683| 820000.0|    1.0E7|
|    9|         3806000.0|3600000.0|3900000.0|
|   10|         3918750.0|1950000.0|4600000.0|
|   11| 5109090.909090909|3800000.0|5600000.0|
+-----+------------------+---------+---------+



## Check for trends by geograph and time

In [206]:
# Geographic Analysis
prices_income_idealista.groupBy('latitude', 'longitude').agg(avg('price').alias('avg_price')).show()

# Temporal Trends
prices_income_idealista.groupBy('date').agg(avg('price').alias('avg_price')).orderBy('date').show()


+----------+---------+---------+
|  latitude|longitude|avg_price|
+----------+---------+---------+
|41.3784791|2.1234518| 410000.0|
|41.3766203|2.1235422| 275000.0|
| 41.390968| 2.125664| 695000.0|
|41.3764138|2.1240547| 199000.0|
|41.3652336|2.1383753| 244000.0|
|41.3587512|2.1440876| 186000.0|
|41.3728028|2.1280125| 210000.0|
|41.3612185|2.1390853| 185000.0|
|41.3731361|2.1442186| 360000.0|
|41.3603742|2.1414026| 315000.0|
|41.3738539|2.1398227| 239000.0|
|41.3603689|2.1390959| 230000.0|
|41.3819925|2.1402918| 379000.0|
|41.3749363|2.1616583| 250000.0|
|41.3735834|2.1622435| 380000.0|
|41.3783279|2.1263543| 335000.0|
|41.3737705|2.1423725| 290000.0|
|41.3731006| 2.156417| 261500.0|
|41.3730297|2.1390113| 264000.0|
|41.3724028|2.1294125| 185000.0|
+----------+---------+---------+
only showing top 20 rows

+----------+------------------+
|      date|         avg_price|
+----------+------------------+
|2020-01-02| 308176.4705882353|
|2020-01-08|          676000.0|
|2020-01-10|         1