## Trusted zone

### rent_price

In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import col, expr, lit, min, max

In [2]:
spark = SparkSession.builder\
    .config("spark.jars", "duckdb.jar") \
    .getOrCreate()

24/04/24 19:54:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# rent_price
DF = spark.read \
  .format("jdbc") \
  .option("url", "jdbc:duckdb:formatted_zone/freshdata.db") \
  .option("driver", "org.duckdb.DuckDBDriver") \
  .option("query", "SELECT * FROM rent_price") \
  .load()

# verificar duplicats
print(f"Hi ha {DF.count() - DF.distinct().count()} valors duplicats al DataFrame 'formatted_zone/rent_price.db' de {DF.count()} valors totals")

# verificar NA
print(f'Hi ha {DF.count() - DF.na.drop().count()} NAs')

# escalar obteninint mínim i màxims via query en funció de la unitat
min_max = spark.read \
  .format("jdbc") \
  .option("url", "jdbc:duckdb:formatted_zone/freshdata.db") \
  .option("driver", "org.duckdb.DuckDBDriver") \
  .option("query", 'SELECT "Average _rent", min(Price) AS min_price , max(Price) AS max_price FROM rent_price GROUP BY "Average _rent"') \
  .load().collect()

DF = DF.withColumn('Price_Escalat', F.when(DF['Average _rent'] == min_max[0][0], (DF['Price'] - min_max[0].min_price)/(min_max[0].max_price - min_max[0].min_price)).otherwise((DF['Price'] - min_max[1].min_price)/(min_max[1].max_price - min_max[1].min_price)))

print("S'ha escalat la columna 'Price' en funció de la columna 'Average _rent'")

# outliers
IQR = DF.approxQuantile('Price', [0.25, 0.75], 0.001)
IQR_values = IQR[1] - IQR[0]
        
DF = DF.withColumn('IQR', lit(IQR_values))
                
not_outliers = DF.filter(col('Price') >= IQR[0] - 1.5 * IQR_values)\
            .filter(col('Price') <= IQR[1] + 1.5 * IQR_values)

outliers = DF.subtract(not_outliers)
print(f"Hi ha {outliers.count()} outliers detectats per cuartils")

DF = DF.drop('IQR')

# substituïr apostrof i espacios por _ en los valores de la columna Neighborhood
DF = DF.withColumn('Neighbourhood', F.regexp_replace('Neighbourhood', ' ', '_'))
DF = DF.withColumn('Neighbourhood', F.regexp_replace('Neighbourhood', "'", '_'))
DF = DF.withColumn('Neighbourhood', F.regexp_replace('Neighbourhood', ",", '_'))
DF = DF.withColumn('District', F.regexp_replace('District', ',', '_'))
DF = DF.withColumn('District', F.regexp_replace('District', ' ', '_'))
DF = DF.withColumn('District', F.regexp_replace('District', "'", '_'))

Hi ha 0 valors duplicats al DataFrame 'formatted_zone/rent_price.db' de 4622 valors totals
Hi ha 0 NAs
S'ha escalat la columna 'Price' en funció de la columna 'Average _rent'




CodeCache: size=131072Kb used=23084Kb max_used=23084Kb free=107987Kb
 bounds [0x00000001089e0000, 0x000000010a090000, 0x00000001109e0000]
 total_blobs=9317 nmethods=8380 adapters=848
 compilation: disabled (not enough contiguous free space left)
Hi ha 5 outliers detectats per cuartils


In [4]:
DF.write \
        .format("jdbc") \
        .option("url", f"jdbc:duckdb:trusted_zone/freshdata_trusted.db") \
        .option("dbtable", "rent_price") \
        .option("driver", "org.duckdb.DuckDBDriver") \
        .mode("overwrite") \
        .save()

                                                                                

### renda

In [5]:
# rent_price
DF = spark.read \
  .format("jdbc") \
  .option("url", "jdbc:duckdb:formatted_zone/freshdata.db") \
  .option("driver", "org.duckdb.DuckDBDriver") \
  .option("query", "SELECT * FROM renda") \
  .load()

# verificar duplicats
print(f"Hi ha {DF.count() - DF.distinct().count()} valors duplicats al DataFrame 'formatted_zone/renda.db' de {DF.count()} valors totals")

# verificar NA
print(f'Hi ha {DF.count() - DF.na.drop().count()} NAs')

# escalar obteninint mínim i màxims via query en funció de la unitat
min_value = DF.select(min("Import_Euros"), max("Import_Euros")).collect()[0][0]
max_value = DF.select(min("Import_Euros"), max("Import_Euros")).collect()[0][1]
DF = DF.withColumn('Import_Euros_Escalat', (DF['Import_Euros'] - min_value)/(max_value - min_value))

print("S'ha escalat la columna 'Import_Euros' en funció de la columna 'Average _rent'")

# outliers
IQR = DF.approxQuantile('Import_Euros', [0.25, 0.75], 0.001)
IQR_values = IQR[1] - IQR[0]
        
DF = DF.withColumn('IQR', lit(IQR_values))
                
not_outliers = DF.filter(col('Import_Euros') >= IQR[0] - 1.5 * IQR_values)\
            .filter(col('Import_Euros') <= IQR[1] + 1.5 * IQR_values)

outliers = DF.subtract(not_outliers)
print(f"Hi ha {outliers.count()} outliers detectats per cuartils")

DF = DF.drop('IQR')

# substituïr apostrof i espacios por _ en los valores de la columna Neighborhood
DF = DF.withColumn('Nom_Barri', F.regexp_replace('Nom_Barri', ' ', '_'))
DF = DF.withColumn('Nom_Barri', F.regexp_replace('Nom_Barri', "'", '_'))
DF = DF.withColumn('Nom_Barri', F.regexp_replace('Nom_Barri', ",", '_'))
DF = DF.withColumn('Nom_Districte', F.regexp_replace('Nom_Districte', ",", '_'))
DF = DF.withColumn('Nom_Districte', F.regexp_replace('Nom_Districte', " ", '_'))
DF = DF.withColumn('Nom_Districte', F.regexp_replace('Nom_Districte', "'", '_'))

Hi ha 0 valors duplicats al DataFrame 'formatted_zone/renda.db' de 7476 valors totals
Hi ha 7476 NAs
S'ha escalat la columna 'Import_Euros' en funció de la columna 'Average _rent'
Hi ha 4471 outliers detectats per cuartils


In [6]:
DF.write \
        .format("jdbc") \
        .option("url", f"jdbc:duckdb:trusted_zone/freshdata_trusted.db") \
        .option("dbtable", "renda") \
        .option("driver", "org.duckdb.DuckDBDriver") \
        .mode("overwrite") \
        .save()

                                                                                

### compravenda_sup

In [7]:

DF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:duckdb:formatted_zone/freshdata.db") \
    .option("driver", "org.duckdb.DuckDBDriver") \
    .option("query", "SELECT * FROM compravenda_sup") \
    .load()


In [8]:

# verificar duplicats
print(f"Hi ha {DF.count() - DF.distinct().count()} valors duplicats al DataFrame 'formatted_zone/compravenda_sup.db' de {DF.count()} valors totals")

# substituir "--" por NA
DF = DF.withColumn('Nombre', F.when(col('Nombre') == '--', None).otherwise(col('Nombre')))
# verificar NA
print(f'Hi ha {DF.count() - DF.na.drop().count()} NAs')
# imprimir toda la informació que té un NA
valorsnull = DF.filter(DF.Nombre.isNull())

# passar a numeric
DF = DF.withColumn('Nombre', DF['Nombre'].cast('float'))

# Imputar NA de Nombre amb la mitjana de "Nombre" per "Nom_barri"
mean_nombre_nom_barri = DF.groupBy('Nom_barri').agg(F.mean('Nombre').alias('mean_nombre_nom_barri'))

DF = DF.join(mean_nombre_nom_barri, on='Nom_barri', how='left')
DF = DF.withColumn('Nombre', F.when(DF['Nombre'].isNull(), DF['mean_nombre_nom_barri']).otherwise(DF['Nombre']))
# Verificar NAs després de la imputació per barri
print(f'Hi ha {DF.count() - DF.na.drop().count()} NAs en barri')

# Imputar NA de Nombre amb la mitjana de "Nombre" per "Nom_districte"
mean_nombre_nom_districte = DF.groupBy('Nom_Districte').agg(F.mean('Nombre').alias('mean_nombre_nom_districte'))

DF = DF.join(mean_nombre_nom_districte, on='Nom_Districte', how='left')
DF = DF.withColumn('Nombre', F.when(DF['Nombre'].isNull(), DF['mean_nombre_nom_districte']).otherwise(DF['Nombre']))

# Verificar NAs després de la imputació per districte
print(f'Hi ha {DF.count() - DF.na.drop().count()} NAs en districte')

# Imprimir totes les files amb NA a la columna "Nombre"
DF.filter(DF.Nombre.isNull()).show()


min_value = DF.groupBy('Superfície_mitjana_(m2_construïts)').agg(F.min('Nombre').alias('min_value'))
max_value = DF.groupBy('Superfície_mitjana_(m2_construïts)').agg(F.max('Nombre').alias('max_value'))

DF = DF.join(min_value, on='Superfície_mitjana_(m2_construïts)', how='left')
DF = DF.join(max_value, on='Superfície_mitjana_(m2_construïts)', how='left')

DF = DF.withColumn('Nombre_escalat', F.when(DF['Superfície_mitjana_(m2_construïts)'] == 0, 0).otherwise(DF['Nombre']-DF['min_value'])/(DF['max_value']-DF['min_value']))

# outliers
IQR = DF.approxQuantile('Nombre', [0.25, 0.75], 0.001)
IQR_values = IQR[1] - IQR[0]
        
DF = DF.withColumn('IQR', lit(IQR_values))
                
not_outliers = DF.filter(col('Nombre') >= IQR[0] - 1.5 * IQR_values)\
            .filter(col('Nombre') <= IQR[1] + 1.5 * IQR_values)

outliers = DF.subtract(not_outliers)
print(f"Hi ha {outliers.count()} outliers detectats per quartils")
  
# eliminar columnes auxiliars
DF = DF.drop('min_value', 'max_value', 'mean_nombre_nom_barri', 'mean_nombre_nom_districte', 'IQR')

# substituïr apostrof i espacios por _ en los valores de la columna Neighborhood
DF = DF.withColumn('Nom_Barri', F.regexp_replace('Nom_Barri', ' ', '_'))
DF = DF.withColumn('Nom_Barri', F.regexp_replace('Nom_Barri', "'", '_'))
DF = DF.withColumn('Nom_Barri', F.regexp_replace('Nom_Barri', ",", '_'))
DF = DF.withColumn('Nom_Districte', F.regexp_replace('Nom_Districte', ",", '_'))
DF = DF.withColumn('Nom_Districte', F.regexp_replace('Nom_Districte', " ", '_'))
DF = DF.withColumn('Nom_Districte', F.regexp_replace('Nom_Districte', "'", '_'))

Hi ha 0 valors duplicats al DataFrame 'formatted_zone/compravenda_sup.db' de 876 valors totals
Hi ha 290 NAs
Hi ha 12 NAs en barri
Hi ha 12 NAs en districte
+-------------+---------+---+---------+--------------+----------+----------------------------------+------+---------------------+-------------------------+
|Nom_Districte|Nom_Barri|Any|Trimestre|Codi_Districte|Codi_Barri|Superfície_mitjana_(m2_construïts)|Nombre|mean_nombre_nom_barri|mean_nombre_nom_districte|
+-------------+---------+---+---------+--------------+----------+----------------------------------+------+---------------------+-------------------------+
+-------------+---------+---+---------+--------------+----------+----------------------------------+------+---------------------+-------------------------+



24/04/24 19:55:10 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


Hi ha 109 outliers detectats per quartils


In [9]:
DF.write \
        .format("jdbc") \
        .option("url", f"jdbc:duckdb:trusted_zone/freshdata_trusted.db") \
        .option("dbtable", "compravenda_sup") \
        .option("driver", "org.duckdb.DuckDBDriver") \
        .mode("overwrite") \
        .save()

In [10]:
DF.show()

+----------------------------------+--------------+--------------------+----+---------+--------------+----------+------------------+-------------------+
|Superfície_mitjana_(m2_construïts)| Nom_Districte|           Nom_Barri| Any|Trimestre|Codi_Districte|Codi_Barri|            Nombre|     Nombre_escalat|
+----------------------------------+--------------+--------------------+----+---------+--------------+----------+------------------+-------------------+
|                             Total|     Les_Corts|           les_Corts|2023|        1|             4|        19|  80.9000015258789|0.16117216998719475|
|                             Total|     Les_Corts|la_Maternitat_i_S...|2023|        1|             4|        20|              91.0|0.20227920475625832|
|                             Total|     Les_Corts|           Pedralbes|2023|        1|             4|        21| 145.3000030517578|0.42328043749190036|
|                             Total|Sants-Montjuïc|   la_Marina_de_Port|2023|     