## Setting environment variables

In [1]:
import os

# Set HADOOP_HOME to the folder whose 'bin' subfolder contains winutils.exe
os.environ["HADOOP_HOME"] = r"C:\winutils-master\hadoop-3.0.0"
os.environ["PATH"] += os.pathsep + r"C:\winutils-master\hadoop-3.0.0\bin"

# Optional: set JAVA_HOME only if it is not already set
os.environ.setdefault("JAVA_HOME", r"C:\Program Files\Eclipse Adoptium\jdk-17.0.15.6-hotspot")

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, avg, when, array_contains
from pyspark.sql import functions as F

In [3]:
# create (or reuse) a local Spark session
spark = SparkSession.builder \
    .appName("DataFormatting") \
    .master("local[*]") \
    .getOrCreate()


In [4]:
# set the paths for the input and output data
landing_zone = "landing_zone"
formatted_zone = "formatted_zone"

In [5]:
# load all files from each folder of the landing zone
df_idealista = spark.read.option("multiline", True).json(f"{landing_zone}/idealista")
df_income = spark.read.option("header", True).csv(f"{landing_zone}/Income")
df_lookup = spark.read.option("header", True).csv(f"{landing_zone}/lookup_tables")

## Idealista data

In [6]:
df_idealista.show()

+--------------------+---------+-------+--------------------+--------+--------------+--------+-----------------+-----+------+---------+-------+-------+----------+--------+----------+---------+--------------------+--------------------+--------------+----------------------+---------+---------+------------------+---------+-----------+------------+------------+---------+-----+-----------+-----+------+--------------------+--------------------+-----------------+--------------------+
|             address|bathrooms|country|        detailedType|distance|      district|exterior|externalReference|floor|has360|has3DTour|hasLift|hasPlan|hasStaging|hasVideo|  latitude|longitude|        municipality|        neighborhood|newDevelopment|newDevelopmentFinished|numPhotos|operation|      parkingSpace|    price|priceByArea|propertyCode|propertyType| province|rooms|showAddress| size|status|      suggestedTexts|           thumbnail|topNewDevelopment|                 url|
+--------------------+---------+----

We will only predict prices for the municipality of Barcelona, so we filter the data accordingly.

In [7]:
df_idealista = df_idealista.filter(F.col("municipality") == "Barcelona")

Next, we check how many rows remain in the dataframe.

In [8]:
n_rows = df_idealista.count()
print(n_rows)

7856


For our specific task, we want to include each property only once, so we also check for duplicates.

In [9]:
df_idealista.groupBy("propertyCode") \
    .agg(count("*").alias("occurrences")) \
    .filter("occurrences > 1") \
    .show(truncate=False)


+------------+-----------+
|propertyCode|occurrences|
+------------+-----------+
|90204735    |5          |
|92524329    |6          |
|91622667    |4          |
|91358097    |3          |
|87258588    |4          |
|91903585    |12         |
|91901786    |9          |
|92771991    |5          |
|91560330    |4          |
|92108611    |10         |
|92411589    |2          |
|92557089    |2          |
|87265378    |3          |
|92962742    |6          |
|40084649    |2          |
|92474319    |3          |
|91566532    |2          |
|88667699    |4          |
|88863152    |4          |
|91671895    |3          |
+------------+-----------+
only showing top 20 rows


In [10]:
from IPython.display import Markdown, display
n_duplicates = (
    df_idealista
    .groupBy("propertyCode") 
    .agg(count("*").alias("occurrences"))
    .filter("occurrences > 1")
    .count()
)

display(Markdown(f"We identified **{n_duplicates}** duplicated properties in our data."))

We identified **1389** duplicated properties in our data.

The Idealista JSON files were originally split by year, month, and day. Distinguishing between dates is not relevant for our analysis, and property prices do not show significant variation within our timeframe, so we will remove these duplicate entries. We also cannot leverage temporal information, because the Idealista dataset spans 2020–2021 whereas the income dataset covers 2007–2017.

Therefore, we will ignore the time dimension entirely.

This approach also resolves the issue of repeated files, for example `2020_12_28_idealista(1).json` and `2020_12_28_idealista.json`, which are likely duplicates. By retaining a single row per `propertyCode`, we ensure that such repetitions do not introduce bias into our analysis.
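As a rough illustration of what keeping one row per `propertyCode` means, the plain-Python sketch below collapses a few toy listings. The records, prices, the second file name, and the helper `keep_one_per_code` are hypothetical. The sketch keeps the first occurrence, whereas Spark's `dropDuplicates` does not guarantee which of the duplicate rows survives.

```python
def keep_one_per_code(rows):
    """Return one row per propertyCode, keeping the first one seen."""
    seen = set()
    unique_rows = []
    for row in rows:
        code = row["propertyCode"]
        if code not in seen:
            seen.add(code)
            unique_rows.append(row)
    return unique_rows


# Toy listings (hypothetical values): property "A1" appears in two daily files.
listings = [
    {"propertyCode": "A1", "price": 250000.0, "source": "2020_12_28_idealista.json"},
    {"propertyCode": "A1", "price": 250000.0, "source": "2020_12_29_idealista.json"},
    {"propertyCode": "B2", "price": 410000.0, "source": "2020_12_28_idealista.json"},
]

unique_listings = keep_one_per_code(listings)
print(len(unique_listings), [row["propertyCode"] for row in unique_listings])
```

If the choice of surviving row mattered (for instance, keeping the most recent price), the rows would need an explicit ordering before deduplication. Here the time dimension is ignored, so any one row per property is acceptable.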

In [11]:
# remove duplicates
df_idealista = df_idealista.dropDuplicates(["propertyCode"])

In [12]:
n_rows = df_idealista.count()
display(Markdown(f"We will perform our analysis using information for **{n_rows}** properties."))

We will perform our analysis using information for **4062** properties.

In [13]:
# Count total rows
total_rows = df_idealista.count()

# Count unique propertyCodes
unique_property_codes = df_idealista.select("propertyCode").distinct().count()

print(f"Total rows: {total_rows}")
print(f"Unique property codes: {unique_property_codes}")

# Check if propertyCode is unique
if total_rows == unique_property_codes:
    print("propertyCode is unique for all rows.")
else:
    print("There are duplicate propertyCode values.")


Total rows: 4062
Unique property codes: 4062
propertyCode is unique for all rows.


## Income Data

In [14]:
df_income.show()

+----+--------------+--------------+----------+--------------------+--------+-------------------------+
| Any|Codi_Districte| Nom_Districte|Codi_Barri|           Nom_Barri|Població|Índex RFD Barcelona = 100|
+----+--------------+--------------+----------+--------------------+--------+-------------------------+
|2007|             1|  Ciutat Vella|         1|            el Raval|   46595|                     64.7|
|2007|             1|  Ciutat Vella|         2|      el Barri Gòtic|   27946|                     86.5|
|2007|             1|  Ciutat Vella|         3|      la Barceloneta|   15921|                     66.7|
|2007|             1|  Ciutat Vella|         4|Sant Pere, Santa ...|   22572|                     80.2|
|2007|             2|      Eixample|         5|       el Fort Pienc|   31521|                    107.9|
|2007|             2|      Eixample|         6|  la Sagrada Família|   52185|                    101.8|
|2007|             2|      Eixample|         7|la Dreta de l'Eix

In the income dataframe there are no exact duplicates, since the population and the RFD index change over the years, so we use a different mechanism to remove the time dimension:
- we group by neighborhood,
- we compute the average population and the average RFD index.
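The two steps above can be sketched in plain Python, independently of Spark. This is only an illustration under assumptions: the helper names `to_number` and `average_by_barri` are hypothetical, the second and third toy rows are invented, and values that are not well-formed numbers are skipped rather than repaired.

```python
import re
from collections import defaultdict

INT_PATTERN = re.compile(r"\d+")              # whole string must be digits
DECIMAL_PATTERN = re.compile(r"\d+(\.\d+)?")  # digits with an optional decimal part


def to_number(text, pattern):
    """Return float(text) if the whole string matches pattern, else None."""
    if text is not None and pattern.fullmatch(text):
        return float(text)
    return None


def average_by_barri(rows):
    """Average population and RFD index per neighborhood, skipping invalid values."""
    values = defaultdict(lambda: {"pop": [], "rfd": []})
    for barri, population, rfd_index in rows:
        pop_num = to_number(population, INT_PATTERN)
        rfd_num = to_number(rfd_index, DECIMAL_PATTERN)
        if pop_num is not None:
            values[barri]["pop"].append(pop_num)
        if rfd_num is not None:
            values[barri]["rfd"].append(rfd_num)

    def mean(numbers):
        return sum(numbers) / len(numbers) if numbers else None

    return {barri: (mean(v["pop"]), mean(v["rfd"])) for barri, v in values.items()}


rows = [
    ("el Raval", "46595", "64.7"),  # 2007 values shown in the output above
    ("el Raval", "47405", "65.3"),  # hypothetical later year
    ("el Raval", "n.d.", "-"),      # hypothetical non-numeric entries
]
averages = average_by_barri(rows)
print(averages["el Raval"])
```

Skipping invalid values means a neighborhood's average is computed over fewer years when some entries are malformed, which is the same trade-off as averaging over nulls.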

In [15]:
# We first tried a plain groupBy on 'Nom_Barri' followed by an average of 'Població' and
# 'Índex RFD Barcelona = 100', but we found that some values were not numeric.
# Here we keep only well-formed numbers; anything else becomes null.
population_col = col("Població")
rfd_col = col("Índex RFD Barcelona = 100")
df_income_cleaned = (
    df_income
    .withColumn("Poblacio_num", when(population_col.rlike(r"^\d+$"), population_col.cast("double")))
    .withColumn("Index_RFD_num", when(rfd_col.rlike(r"^\d+(\.\d+)?$"), rfd_col.cast("double")))
)

# group by neighborhood and average (avg ignores nulls)
df_income_barri = df_income_cleaned.groupBy("Nom_Barri").agg(
    avg("Poblacio_num").alias("Poblacio_average"),
    avg("Index_RFD_num").alias("Index_RFD_average"))


In [16]:
df_income_barri.show()

+--------------------+------------------+------------------+
|           Nom_Barri|  Poblacio_average| Index_RFD_average|
+--------------------+------------------+------------------+
|         el Poblenou|32450.454545454544| 92.63636363636364|
|   la Vila de Gràcia| 51166.63636363636|104.81818181818181|
|el Besòs i el Mar...|23435.454545454544| 56.07272727272727|
|        la Guineueta|15231.727272727272|63.818181818181806|
|        la Teixonera|11400.727272727272| 71.89999999999999|
|la Dreta de l'Eix...| 43410.36363636364|155.25454545454548|
|      el Barri Gòtic| 18795.81818181818| 98.14545454545454|
|         el Guinardó|35770.818181818184| 85.58181818181818|
|            Vallbona|1338.1818181818182| 48.96363636363637|
|           Canyelles| 7169.181818181818| 64.53636363636365|
|Provençals del Po...|19809.090909090908| 87.87272727272727|
| la Verneda i la Pau|29134.545454545456| 62.77272727272727|
|Vilapicina i la T...| 25575.81818181818| 72.43636363636364|
|l'Antiga Esquerra...| 4

## Lookup Data

To merge the datasets, we will use the `neighborhood` column as the key. For the Idealista properties, we have neighborhood information available for all properties located within the municipality of Barcelona.

In [17]:
# Checking missing values for neighborhood
missing_neighborhood = df_idealista.filter(col("neighborhood").isNull()).count()
display(Markdown(f"We identified **{missing_neighborhood}** missing values for neighborhood in the idealista df."))

We identified **0** missing values for neighborhood in the idealista df.

First, we will create a comprehensive mapping of all possible neighborhood names by aggregating the different naming conventions found in the `neighborhood`, `neighborhood_n_reconciled`, and `neighborhood_n` columns of `df_lookup`, grouped by `neighborhood_id`. We will then merge `df_idealista` with this enhanced lookup table, so that each property is matched to its correct neighborhood regardless of naming variations. Next, we bring in the neighborhood-level income index by merging with `df_income_barri`, matching any of the possible neighborhood names to `df_income_barri.Nom_Barri`. This process ensures that each property receives the most granular income index available.
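The mapping idea can be sketched in plain Python before turning to Spark. The toy lookup rows below are hypothetical (only the two ids and their spellings echo values visible in the notebook output), and the helpers `collapse_names` and `matching_ids` are illustrative names, not part of the pipeline. Matching is exact string membership, so a spelling that is absent from the set does not match.

```python
from collections import defaultdict

# Toy lookup rows (hypothetical):
# (neighborhood_id, neighborhood, neighborhood_n_reconciled, neighborhood_n)
lookup_rows = [
    ("Q3751072", "Vallbona", "Vallbona", "vallbona"),
    ("Q3750558", "Porta", "Porta", "porta"),
]


def collapse_names(rows):
    """Collect every distinct, non-null spelling of a neighborhood under its id."""
    all_names = defaultdict(set)
    for neighborhood_id, *names in rows:
        for name in names:
            if name is not None:
                all_names[neighborhood_id].add(name)
    return dict(all_names)


def matching_ids(name, all_names):
    """Return every id whose set of spellings contains `name` exactly."""
    return sorted(nid for nid, names in all_names.items() if name in names)


all_names = collapse_names(lookup_rows)
print(matching_ids("vallbona", all_names))
print(matching_ids("VALLBONA", all_names))
```

An empty result corresponds to an unmatched property in a left join, and more than one id would duplicate the property's row, which is why comparing row counts before and after the join is a useful check.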

In [18]:
# Step 1: Stack all three columns into one column with the corresponding id
grouped = (
    df_lookup
    .select("neighborhood_id", "neighborhood")
    .unionByName(df_lookup.select("neighborhood_id", "neighborhood_n_reconciled").withColumnRenamed("neighborhood_n_reconciled", "neighborhood"))
    .unionByName(df_lookup.select("neighborhood_id", "neighborhood_n").withColumnRenamed("neighborhood_n", "neighborhood"))
    .filter(F.col("neighborhood").isNotNull())
    .dropDuplicates()
)

# Step 2: Group by id, collect all unique names in a list
lookup_collapsed = (
    grouped
    .groupBy("neighborhood_id")
    .agg(F.collect_set("neighborhood").alias("all_names"))
)

In [19]:
lookup_collapsed.show()

+---------------+--------------------+
|neighborhood_id|           all_names|
+---------------+--------------------+
|       Q3320806|[vilapicina i la ...|
|       Q3321805|[el putxet i el f...|
|       Q3291762|[L'Antiga Esquerr...|
|       Q3813818|[el Congrés i els...|
|       Q3320705|[La Teixonera, la...|
|       Q3045547|[el guinardo, El ...|
|       Q1932090|[el coll, el Coll...|
|       Q3773169|[sant marti de pr...|
|       Q3294602|[el camp de l arp...|
|       Q1425291|[Baró de Viver, b...|
|       Q3296693|[Les Corts, les C...|
|       Q1026658|[la nova esquerra...|
|        Q542473|[La Verneda i la ...|
|       Q3751072|[vallbona, Vallbona]|
|       Q2562684|[pedralbes, Pedra...|
|        Q524311|[El Turó de la Pe...|
|       Q3773462|[la sagrera, la S...|
|       Q1627690|[les roquetes, Le...|
|       Q3320699|[La Salut, la Sal...|
|       Q3750558|      [porta, Porta]|
+---------------+--------------------+
only showing top 20 rows


In [20]:
# Merge df_idealista with lookup_collapsed using array_contains on 'all_names'
df_idealista_lookup = df_idealista.join(
    lookup_collapsed,
    array_contains(lookup_collapsed['all_names'], df_idealista['neighborhood']),
    how='left'
)

In [21]:
# Sanity check
display(Markdown(f"The row count should match between **{df_idealista_lookup.count()}** and **{df_idealista.count()}**."))

The row count should match between **4062** and **4062**.

In [22]:
# Merge df_idealista_lookup with df_income_barri using array_contains on 'all_names'
df_idealista_barri = df_idealista_lookup.join(
    df_income_barri.select('Nom_Barri', 'Index_RFD_average', 'Poblacio_average'),
    array_contains(df_idealista_lookup['all_names'], df_income_barri['Nom_Barri']),
    how='left'
).drop('Nom_Barri')


In [23]:
# Sanity check
display(Markdown(f"The row count should match between **{df_idealista_barri.count()}** and **{df_idealista_lookup.count()}**."))

The row count should match between **4062** and **4062**.

We now check how many properties are missing an `Index_RFD_average` value. A property without a match would be one whose neighborhood name is not present in any entry of the lookup table.
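Because the income value arrives through two consecutive left joins, a missing value can originate in either one: the property's name may not appear in the lookup table, or the lookup may succeed while none of the spellings equals a `Nom_Barri`. The plain-Python sketch below separates the two cases on toy joined rows. The rows, the category labels, and the helper `classify_missing` are hypothetical.

```python
from collections import Counter


def classify_missing(rows):
    """Count rows by whether, and why, they lack an income index."""
    reasons = Counter()
    for row in rows:
        if row["Index_RFD_average"] is not None:
            reasons["matched"] += 1
        elif row["neighborhood_id"] is None:
            reasons["no_lookup_match"] += 1   # name absent from the lookup table
        else:
            reasons["no_income_match"] += 1   # lookup found, but no Nom_Barri matched
    return reasons


# Toy joined rows (hypothetical values)
joined_rows = [
    {"propertyCode": "A1", "neighborhood_id": "Q3751072", "Index_RFD_average": 48.9},
    {"propertyCode": "B2", "neighborhood_id": None, "Index_RFD_average": None},
    {"propertyCode": "C3", "neighborhood_id": "Q3750558", "Index_RFD_average": None},
]

reasons = classify_missing(joined_rows)
print(dict(reasons))
```

In this notebook both counts turn out to be zero, so the distinction only matters when the check is reused on new data.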

In [24]:
# Count rows where there is a valid neighborhood_id but missing Index_RFD_average
missing_income = df_idealista_barri.filter(
    (F.col('neighborhood_id').isNotNull()) &
    (F.col('Index_RFD_average').isNull())
).count()

display(Markdown(
    f"Rows with non-null <b>neighborhood_id</b> but missing <b>Index_RFD_average</b>: <b>{missing_income}</b>.<br>"
    + ("<span style='color:green'>PASS</span>" if missing_income == 0 else "<span style='color:red'>CHECK DATA</span>")
))


Rows with non-null <b>neighborhood_id</b> but missing <b>Index_RFD_average</b>: <b>0</b>.<br><span style='color:green'>PASS</span>

In [25]:
df_idealista_barri.filter(F.col("Index_RFD_average").isNull()).count()

0

We remove the columns that will not be useful for our analysis.

In [26]:
final_cleaned = df_idealista_barri.drop(
    "address", "country", "detailedType", "externalReference", "municipality",
    "operation", "province", "suggestedTexts", "thumbnail", "url", "all_names",
)

In [27]:
final_cleaned.show()

+---------+--------+-------------------+--------+-----+------+---------+-------+-------+----------+--------+----------+---------+--------------------+--------------+----------------------+---------+------------------+---------+-----------+------------+------------+-----+-----------+-----+------+-----------------+---------------+------------------+------------------+
|bathrooms|distance|           district|exterior|floor|has360|has3DTour|hasLift|hasPlan|hasStaging|hasVideo|  latitude|longitude|        neighborhood|newDevelopment|newDevelopmentFinished|numPhotos|      parkingSpace|    price|priceByArea|propertyCode|propertyType|rooms|showAddress| size|status|topNewDevelopment|neighborhood_id| Index_RFD_average|  Poblacio_average|
+---------+--------+-------------------+--------+-----+------+---------+-------+-------+----------+--------+----------+---------+--------------------+--------------+----------------------+---------+------------------+---------+-----------+------------+----------

In [28]:
# write the formatted data as Parquet files, replacing any previous output
final_cleaned.write.mode("overwrite").parquet(f"{formatted_zone}/formatted_data")