# Install required packages

In [1]:
%pip install pymongo
%pip install pandas

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


# Import required packages

In [10]:
import pandas as pd
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, when, isnull, lower, regexp_replace, array_contains, explode

## Reconciliation, deduplication, formatting and export

In [21]:
# Obtain the Spark session used by every step of this cell (an already running one is reused).
spark = (
    SparkSession.builder
    .appName("ParquetReadExample")
    .getOrCreate()
)

##################################################################################################
#                                        LOAD DATAFRAMES
##################################################################################################
# Lookup tables for the Idealista (rent) data: one for districts, one for neighborhoods.
df_rent_district_lookup = spark.read.json("./P2_data/lookup_tables/rent_lookup_district.json")
df_rent_neighborhood_lookup = spark.read.json("./P2_data/lookup_tables/rent_lookup_neighborhood.json")

# Lookup tables for the income open data: one for districts, one for neighborhoods.
df_income_district_lookup = spark.read.json("./P2_data/lookup_tables/income_lookup_district.json")
df_income_neighborhood_lookup = spark.read.json("./P2_data/lookup_tables/income_lookup_neighborhood.json")

# Idealista listings: every parquet file found two directory levels below the idealista folder.
# Quick sanity check while developing: parquetDF.filter(parquetDF["municipality"] == 'Barcelona').show()
parquetDF = spark.read.parquet("./P2_data/idealista/*/*")

# Income open data, one JSON record per line.
openData_df = spark.read.json("./P2_data/income_opendata/income_opendata_neighborhood.json")

# Third dataset: square-meter price evolution, one JSON record per line.
squaremeterpriceevolution_df = spark.read.json("./P2_data/SquareMeterPriceEvolution/squareMeterPriceEvolution.json")
##################################################################################################

##################################################################################################
#                                        RECONCILIATION
##################################################################################################
# IDEALISTA
# The district and neighborhood names of every listing are compared with each spelling stored
# in the corresponding lookup table. On a match the name is replaced by the reconciled spelling
# and the lookup row's `_id` is attached as `district_id` / `neighborhood_id`. Listings without
# a match keep their original name and get a null id (left outer join).

# Validate the schemas up front. Previously a failed check left `idealista_reconciled_df` as
# None (crashing later with an opaque AttributeError) or silently skipped `neighborhood_id`,
# which the final write needs as its partition column.
for _label, _frame, _needed in (
    ("Idealista listings", parquetDF, ["district", "neighborhood"]),
    ("rent district lookup", df_rent_district_lookup, ["_id", "di", "di_n", "di_re"]),
    ("rent neighborhood lookup", df_rent_neighborhood_lookup, ["_id", "ne", "ne_n", "ne_re"]),
):
    _absent = [c for c in _needed if c not in _frame.columns]
    if _absent:
        raise ValueError(f"{_label} is missing required column(s): {_absent}")

# --- District reconciliation ---------------------------------------------------------------
df1 = parquetDF.alias("df1")
df2 = df_rent_district_lookup.alias("df2")

# A listing matches a lookup row when its district equals any of the three spellings.
district_match = (
    (col("df1.district") == col("df2.di"))
    | (col("df1.district") == col("df2.di_n"))
    | (col("df1.district") == col("df2.di_re"))
)
joined_df = df1.join(df2, district_match, "left_outer") \
               .select("df1.*", col("df2._id").alias("district_id"), "df2.di_re")

# Overwrite "district" with the reconciled spelling when one was found, then drop the helper column.
idealista_reconciled_df = joined_df \
    .withColumn("district", when(col("di_re").isNotNull(), col("di_re")).otherwise(col("district"))) \
    .drop("di_re")

# --- Neighborhood reconciliation -----------------------------------------------------------
df1 = idealista_reconciled_df.alias("df1")
df2 = df_rent_neighborhood_lookup.alias("df2")

neighborhood_match = (
    (col("df1.neighborhood") == col("df2.ne"))
    | (col("df1.neighborhood") == col("df2.ne_n"))
    | (col("df1.neighborhood") == col("df2.ne_re"))
)
joined_df = df1.join(df2, neighborhood_match, "left_outer") \
               .select("df1.*", col("df2._id").alias("neighborhood_id"), "df2.ne_re")

# Overwrite "neighborhood" with the reconciled spelling when one was found.
idealista_reconciled_df = joined_df \
    .withColumn("neighborhood", when(col("ne_re").isNotNull(), col("ne_re")).otherwise(col("neighborhood"))) \
    .drop("ne_re")

#idealista_reconciled_df.show()

# INCOME_OPENDATA
# Same approach as for the other sources: match the district / neighborhood name against every
# spelling in the income lookup tables, replace it by the reconciled spelling and attach the
# lookup row's `_id`. The result is projected to the columns
# (district_id, district, neighborhood_id, neighborhood, info).
#
# NOTE: the neighborhood column is addressed as "neigh_name " (with a trailing space) everywhere
# below; presumably this mirrors the key used in the source JSON - confirm before "fixing" it.

# Validate the schemas up front. Previously a failed check left `incomeOpenData_reconciled_df`
# as None (or without `neighborhoodID`) and the notebook crashed later with an opaque error.
for _label, _frame, _needed in (
    ("income open data", openData_df, ["district_name", "neigh_name ", "info"]),
    ("income district lookup", df_income_district_lookup,
     ["_id", "district", "district_name", "district_reconciled"]),
    ("income neighborhood lookup", df_income_neighborhood_lookup,
     ["_id", "neighborhood", "neighborhood_name", "neighborhood_reconciled"]),
):
    _absent = [c for c in _needed if c not in _frame.columns]
    if _absent:
        raise ValueError(f"{_label} is missing required column(s): {_absent}")

# --- District reconciliation ---------------------------------------------------------------
df1 = openData_df.alias("df1")
df2 = df_income_district_lookup.alias("df2")

district_match = (
    (col("df1.district_name") == col("df2.district"))
    | (col("df1.district_name") == col("df2.district_name"))
    | (col("df1.district_name") == col("df2.district_reconciled"))
)
joined_df = df1.join(df2, district_match, "left_outer") \
               .select("df1.*", col("df2._id").alias("districtID"), "df2.district_reconciled")

# Overwrite "district_name" with the reconciled spelling when one was found.
incomeOpenData_reconciled_df = joined_df \
    .withColumn("district_name",
                when(col("district_reconciled").isNotNull(), col("district_reconciled"))
                .otherwise(col("district_name"))) \
    .drop("district_reconciled")

# --- Neighborhood reconciliation -----------------------------------------------------------
df1 = incomeOpenData_reconciled_df.alias("df1")
df2 = df_income_neighborhood_lookup.alias("df2")

# The last alternative compares against the reconciled name with double quotes stripped, so
# that lookup names containing literal '"' characters still match instead of producing nulls.
neighborhood_match = (
    (col("df1.neigh_name ") == col("df2.neighborhood"))
    | (col("df1.neigh_name ") == col("df2.neighborhood_name"))
    | (col("df1.neigh_name ") == col("df2.neighborhood_reconciled"))
    | (col("df1.neigh_name ") == regexp_replace(col("df2.neighborhood_reconciled"), "\"", ""))
)
joined_df = df1.join(df2, neighborhood_match, "left_outer") \
               .select("df1.*", col("df2._id").alias("neighborhoodID"), "df2.neighborhood_reconciled")

# Overwrite "neigh_name " with the reconciled spelling when one was found.
incomeOpenData_reconciled_df = joined_df \
    .withColumn("neigh_name ",
                when(col("neighborhood_reconciled").isNotNull(), col("neighborhood_reconciled"))
                .otherwise(col("neigh_name "))) \
    .drop("neighborhood_reconciled")

# Keep only the columns needed downstream, under the names shared by all three datasets.
# (The former positional `drop(columns[0]).drop(columns[1])` was removed: it depended on the
# inferred column order and was redundant, because this select already discards every other column.)
incomeOpenData_reconciled_df = incomeOpenData_reconciled_df.select(
    col("districtID").alias("district_id"),
    col("district_name").alias("district"),
    col("neighborhoodID").alias("neighborhood_id"),
    col("neigh_name ").alias("neighborhood"),
    col("info"),
)


#incomeOpenData_reconciled_df.show()

# SQUAREMETERPRICEEVOLUTION
# This dataset only carries a neighborhood name. It is reconciled case-insensitively against
# the income neighborhood lookup, and the district is then derived from the income district
# lookup, whose `neighborhood_id` array lists the neighborhoods belonging to each district.

# Validate the schemas up front. Previously a failed check left
# `squareMeterPriceEvolution_reconciled_df` as None and the join below crashed with an
# opaque AttributeError.
for _label, _frame, _needed in (
    ("square meter price evolution", squaremeterpriceevolution_df, ["neighborhood", "info"]),
    ("income neighborhood lookup", df_income_neighborhood_lookup,
     ["_id", "neighborhood", "neighborhood_name", "neighborhood_reconciled"]),
    ("income district lookup", df_income_district_lookup, ["_id", "district", "neighborhood_id"]),
):
    _absent = [c for c in _needed if c not in _frame.columns]
    if _absent:
        raise ValueError(f"{_label} is missing required column(s): {_absent}")

# --- Neighborhood reconciliation -----------------------------------------------------------
df1 = squaremeterpriceevolution_df.alias("df1")
df2 = df_income_neighborhood_lookup.alias("df2")

# Lower-case both sides so that differences in capitalisation do not prevent a match. The last
# alternative also strips double quotes from the reconciled name before comparing.
neighborhood_match = (
    (lower(col("df1.neighborhood")) == lower(col("df2.neighborhood")))
    | (lower(col("df1.neighborhood")) == lower(col("df2.neighborhood_name")))
    | (lower(col("df1.neighborhood")) == lower(col("df2.neighborhood_reconciled")))
    | (lower(col("df1.neighborhood")) == lower(regexp_replace(col("df2.neighborhood_reconciled"), "\"", "")))
)
joined_df = df1.join(df2, neighborhood_match, "left_outer") \
               .select("df1.*", col("df2._id").alias("neighborhoodID"), "df2.neighborhood_reconciled")

# Overwrite "neighborhood" with the reconciled spelling when one was found.
squareMeterPriceEvolution_reconciled_df = joined_df \
    .withColumn("neighborhood",
                when(col("neighborhood_reconciled").isNotNull(), col("neighborhood_reconciled"))
                .otherwise(col("neighborhood"))) \
    .drop("neighborhood_reconciled")

# --- District derivation -------------------------------------------------------------------
# Attach the district whose `neighborhood_id` array contains this row's neighborhood id.
squareMeterPriceEvolution_reconciled_df = squareMeterPriceEvolution_reconciled_df.join(
    df_income_district_lookup,
    array_contains(df_income_district_lookup.neighborhood_id,
                   squareMeterPriceEvolution_reconciled_df.neighborhoodID),
    "left",
)

# Project to the column layout shared by all three datasets. `_id` and `district` come from the
# district lookup.
# NOTE(review): the income dataset exposes the lookup's `district_reconciled` as "district",
# whereas this one exposes the lookup's raw `district` - confirm that this is intended.
squareMeterPriceEvolution_reconciled_df = squareMeterPriceEvolution_reconciled_df.select(
    col("_id").alias("district_id"),
    col("district"),
    col("neighborhoodID").alias("neighborhood_id"),
    col("neighborhood"),
    col("info"),
)

#squareMeterPriceEvolution_reconciled_df.show()
##################################################################################################

##################################################################################################
#                                        DEDUPLICATION
##################################################################################################

# Only rows that are identical in every column are removed. Rows sharing a propertyCode are
# kept on purpose: other analyses may want to follow the price of one property over time
# (e.g. to study seasonality).
idealista_deduplicated_df = idealista_reconciled_df.dropDuplicates()

##################################################################################################

##################################################################################################
#                                   FORMATTING & FILTERING & EXPANDING
##################################################################################################

# Keep only listings that have a strictly positive price.
idealista_filtered_df = (
    idealista_deduplicated_df
    .filter(col("price").isNotNull())
    .filter(col("price") > 0)
)

# Normalise street prefixes in "address". The pairs are applied one after another, in this order.
_address_rewrites = [
    ("Calle de ", "C/ "),
    ("Calle dels ", "C/ "),
    ("Calle ", "C/ "),
    ("barrio", "Barrio"),
]
for _pattern, _replacement in _address_rewrites:
    idealista_filtered_df = idealista_filtered_df.withColumn(
        "address", regexp_replace(col("address"), _pattern, _replacement)
    )

# Columns identifying a row in both yearly datasets.
_location_cols = ["district_id", "district", "neighborhood_id", "neighborhood"]

# Income: one output row per element of the "info" array, flattened to year / pop / RFD.
incomeOpenData_expanded_df = (
    incomeOpenData_reconciled_df
    .select(*_location_cols, explode("info").alias("info"))
    .select(*_location_cols, "info.year", "info.pop", "info.RFD")
)
#incomeOpenData_expanded_df.show()

# Square-meter price: one output row per element of the "info" array, flattened to year / preu_m2.
squareMeterPriceEvolution_expanded_df = (
    squareMeterPriceEvolution_reconciled_df
    .select(*_location_cols, explode("info").alias("info"))
    .select(*_location_cols, "info.year", "info.preu_m2")
)
#squareMeterPriceEvolution_expanded_df.show()

##################################################################################################

##################################################################################################
#                                        SAVE AS PARQUET FILES
##################################################################################################

# Every dataset is written the same way: snappy-compressed parquet, partitioned by
# "neighborhood_id", replacing whatever a previous run left in the target folder.
_outputs = (
    (idealista_filtered_df, "./distributedFolder/idealista"),
    (incomeOpenData_expanded_df, "./distributedFolder/incomeOpenData"),
    (squareMeterPriceEvolution_expanded_df, "./distributedFolder/squareMeterPriceEvolution"),
)
for _frame, _target in _outputs:
    (
        _frame.write
        .format("parquet")
        .mode("overwrite")
        .option("compression", "snappy")
        .partitionBy("neighborhood_id")
        .save(_target)
    )

# Release the Spark resources now that all outputs are written.
spark.stop()


                                                                                

In [None]:
def build_square_meter_price_json(data_dir="./P2_data/SquareMeterPriceEvolution",
                                  year_files=None,
                                  output_path=None):
    """Merge the yearly square-meter price CSVs into one JSON-lines file.

    Each CSV must provide the columns ``BARRIS`` (neighborhood name, optionally prefixed
    with a running number such as ``"1. "``) and ``PREU_M2``. The output holds one JSON
    object per line with the shape
    ``{"neighborhood": <name>, "info": [{"year": <year>, "preu_m2": <value>}, ...]}``.

    Parameters
    ----------
    data_dir : str
        Folder containing the yearly CSV files.
    year_files : dict[int, str] | None
        Mapping ``year -> CSV file name`` inside ``data_dir``, processed in insertion
        order. The first entry is the base year: only neighborhoods present in that file
        appear in the output; later years merely append to them. Defaults to the
        2008-2011 files.
    output_path : str | None
        Destination file. Defaults to ``<data_dir>/squareMeterPriceEvolution.json``.

    Returns
    -------
    list[dict]
        The records that were written, in output order.
    """
    if year_files is None:
        year_files = {
            2008: "DOSZEROZEROVUIT_SquareMeterPriceEvolution.csv",
            2009: "DOSZEROZERONOU_SquareMeterPriceEvolution.csv",
            2010: "DOSZERODEU_SquareMeterPriceEvolution.csv",
            2011: "DOSZEROONZE_SquareMeterPriceEvolution.csv",
        }
    if output_path is None:
        output_path = f"{data_dir}/squareMeterPriceEvolution.json"

    # name -> record; a dict gives O(1) lookups and keeps the base-year order.
    # Names repeated within the base year are merged into a single record.
    by_neighborhood = {}
    for position, (year, filename) in enumerate(year_files.items()):
        yearly = pd.read_csv(f"{data_dir}/{filename}")
        # Strip the leading "<number>. " prefix. `regex=True` is required: since pandas 2.0
        # the default is a literal match, which would leave the prefix untouched.
        yearly["BARRIS"] = yearly["BARRIS"].str.replace(r"^\d+\.\s+", "", regex=True)
        # Round-trip through JSON to obtain plain Python values (NaN becomes None).
        for row in json.loads(yearly.to_json(orient="records")):
            name = row["BARRIS"]
            if position == 0:
                by_neighborhood.setdefault(name, {"neighborhood": name, "info": []})
            entry = by_neighborhood.get(name)
            if entry is not None:
                entry["info"].append({"year": year, "preu_m2": row["PREU_M2"]})

    records = list(by_neighborhood.values())
    with open(output_path, "w", encoding="utf-8") as json_file:
        for record in records:
            # json.dumps (instead of str(...).replace("'", '"')) keeps apostrophes inside
            # names intact and writes missing prices as null, so every line is valid JSON.
            json_file.write(json.dumps(record, ensure_ascii=False) + "\n")
    return records


# Not invoked automatically. Call build_square_meter_price_json() once, before running the
# reconciliation cell, to (re)generate the squareMeterPriceEvolution.json file that it reads.