# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook. To start using your notebook, you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [None]:
%help

In [2]:
%%configure
{
    "--datalake-formats":"delta",
    "--conf":"spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog --conf spark.delta.logStore.class=org.apache.spark.sql.delta.storage.S3SingleDriverLogStore"
}

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.5 
The following configurations have been updated: {'--datalake-formats': 'delta', '--conf': 'spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog --conf spark.delta.logStore.class=org.apache.spark.sql.delta.storage.S3SingleDriverLogStore'}


####  Run this cell to set up and start your interactive session.


In [1]:
%idle_timeout 2880
%glue_version 4.0
%worker_type G.1X
%number_of_workers 5
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import functions as F
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
bucket_name="s3://bucket-for-requests/data/api_database/bronze/"

Current idle_timeout is None minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 4.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 5
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 5
Idle Timeout: 2880
Session ID: 079d17ef-5cc9-4528-a27b-de0ed36e263e
Applying the following default arguments:
--glue_kernel_version 1.0.5
--enable-glue-datacatalog true
--datalake-formats delta
--conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog --conf spark.delta.logStore.class=org.apache.spark.sql.delta.storage.S3SingleDriverLogStore
Waiting for session 079d17ef-5cc9-4528-a27b-de0ed36e263e to get into ready status...
Session 079d17ef-5cc9-4528-a27b-de0ed36e263e has been created.



In [2]:
df = spark.read.parquet(bucket_name) # We read from the folder path rather than a single file because Spark wrote the data as 3 partitions.
df.printSchema()

root
 |-- address: string (nullable = true)
 |-- bathrooms: long (nullable = true)
 |-- country: string (nullable = true)
 |-- description: string (nullable = true)
 |-- subTypology: string (nullable = true)
 |-- typology: string (nullable = true)
 |-- distance: string (nullable = true)
 |-- district: string (nullable = true)
 |-- exterior: boolean (nullable = true)
 |-- externalReference: string (nullable = true)
 |-- floor: string (nullable = true)
 |-- has360: boolean (nullable = true)
 |-- has3DTour: boolean (nullable = true)
 |-- hasLift: boolean (nullable = true)
 |-- hasPlan: boolean (nullable = true)
 |-- hasStaging: boolean (nullable = true)
 |-- hasVideo: boolean (nullable = true)
 |-- groupDescription: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- municipality: string (nullable = true)
 |-- neighborhood: string (nullable = true)
 |-- newDevelopment: boolean (nullable = true)
 |-- newDevelopmentFinished: boolean

In [3]:
df.show(1, vertical=True)

-RECORD 0---------------------------------------------
 address                       | vereda de Palacio, 4 
 bathrooms                     | 2                    
 country                       | es                   
 description                   | Venta. Piso. El E... 
 subTypology                   | null                 
 typology                      | flat                 
 distance                      | 13765                
 district                      | Encinar de los Reyes 
 exterior                      | true                 
 externalReference             | V2807I               
 floor                         | 1                    
 has360                        | false                
 has3DTour                     | true                 
 hasLift                       | true                 
 hasPlan                       | true                 
 hasStaging                    | true                 
 hasVideo                      | true                 
 groupDesc

# Cleaning the dataset

In [4]:
rows=df.count()
print(f"The dataframe contains {rows} rows") # We count the rows to check that the data loaded correctly and to keep a reference for later operations

The dataframe contains 7450 rows


### First, we drop all columns that carry no relevant information for the analysis.

In [5]:
columns_to_drop=["description", "distance", "has360", "has3dtour", "hasplan", "hasstaging", "hasvideo", "groupdescription" ,"latitude", "longitude", "numphotos","operation", "showaddress", "subtitle","thumbnail", "topnewdevelopment","topplus", "url", "title", "currencySuffix", "externalReference"]
df_dropped=df.drop(*columns_to_drop)
df_dropped.cache()
df_dropped.printSchema()

root
 |-- address: string (nullable = true)
 |-- bathrooms: long (nullable = true)
 |-- country: string (nullable = true)
 |-- subTypology: string (nullable = true)
 |-- typology: string (nullable = true)
 |-- district: string (nullable = true)
 |-- exterior: boolean (nullable = true)
 |-- floor: string (nullable = true)
 |-- hasLift: boolean (nullable = true)
 |-- municipality: string (nullable = true)
 |-- neighborhood: string (nullable = true)
 |-- newDevelopment: boolean (nullable = true)
 |-- newDevelopmentFinished: boolean (nullable = true)
 |-- hasParkingSpace: boolean (nullable = true)
 |-- isParkingSpaceIncludedInPrice: boolean (nullable = true)
 |-- parkingSpacePrice: double (nullable = true)
 |-- price: double (nullable = true)
 |-- priceByArea: double (nullable = true)
 |-- amount: double (nullable = true)
 |-- formerPrice: double (nullable = true)
 |-- priceDropPercentage: long (nullable = true)
 |-- priceDropValue: long (nullable = true)
 |-- propertyCode: string (nullabl

## We rename some columns to make them more self-explanatory and so that columns such as "size" do not interfere with Spark methods.

In [6]:
# Rename some columns so that their names convey more information.
df_dropped=df_dropped.withColumnRenamed("size", "size_m2")
df_dropped=df_dropped.withColumnRenamed("rooms", "rooms_count")
df_dropped=df_dropped.withColumnRenamed("priceByArea", "price_per_m2")
df_dropped=df_dropped.withColumnRenamed("bathrooms", "bathrooms_count")




In [7]:
df_dropped.printSchema()

root
 |-- address: string (nullable = true)
 |-- bathrooms_count: long (nullable = true)
 |-- country: string (nullable = true)
 |-- subTypology: string (nullable = true)
 |-- typology: string (nullable = true)
 |-- district: string (nullable = true)
 |-- exterior: boolean (nullable = true)
 |-- floor: string (nullable = true)
 |-- hasLift: boolean (nullable = true)
 |-- municipality: string (nullable = true)
 |-- neighborhood: string (nullable = true)
 |-- newDevelopment: boolean (nullable = true)
 |-- newDevelopmentFinished: boolean (nullable = true)
 |-- hasParkingSpace: boolean (nullable = true)
 |-- isParkingSpaceIncludedInPrice: boolean (nullable = true)
 |-- parkingSpacePrice: double (nullable = true)
 |-- price: double (nullable = true)
 |-- price_per_m2: double (nullable = true)
 |-- amount: double (nullable = true)
 |-- formerPrice: double (nullable = true)
 |-- priceDropPercentage: long (nullable = true)
 |-- priceDropValue: long (nullable = true)
 |-- propertyCode: string (

# Handling nulls / missing values

In [18]:
cols=df_dropped.columns
high_nulls_columns=[]
for col in cols:
    nulls=df_dropped.filter(df_dropped[col].isNull()).count() # One Spark job per column
    percentage=nulls/rows*100
    print(f"Column {col} has {nulls} null values, which is {percentage:.2f}% of the dataset")
    if percentage > 30:
        high_nulls_columns.append(col)

print(f"\nThe following columns contain more than 30% null values: {high_nulls_columns}")

Column address has 0 null values, which is 0.00% of the dataset
Column bathrooms_count has 0 null values, which is 0.00% of the dataset
Column country has 0 null values, which is 0.00% of the dataset
Column subTypology has 5273 null values, which is 70.78% of the dataset
Column typology has 0 null values, which is 0.00% of the dataset
Column district has 87 null values, which is 1.17% of the dataset
Column exterior has 1390 null values, which is 18.66% of the dataset
Column floor has 1604 null values, which is 21.53% of the dataset
Column hasLift has 1310 null values, which is 17.58% of the dataset
Column municipality has 0 null values, which is 0.00% of the dataset
Column neighborhood has 1408 null values, which is 18.90% of the dataset
Column newDevelopment has 0 null

Of the columns above, the ones that could add explanatory information or help with predictions are hasParkingSpace and parkingSpacePrice. However, because they have a large share of missing values, it is preferable to exclude them from the study.
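
The per-column loop above launches one Spark job for each column. As a rough alternative, the sketch below counts the nulls of every column in a single aggregation and then applies the same 30% threshold. The helper names (`null_count_exprs`, `null_counts`, `columns_over_threshold`) are hypothetical and not part of the original notebook; the sketch also assumes a non-empty DataFrame, since an empty one would return null sums.

```python
def null_count_exprs(columns):
    """Build one SQL aggregate per column that counts its NULL values."""
    return [
        f"sum(CASE WHEN `{c}` IS NULL THEN 1 ELSE 0 END) AS `{c}`"
        for c in columns
    ]


def null_counts(df):
    """Count NULLs for every column of a Spark DataFrame in one job.

    Returns a dict mapping column name -> number of NULL values.
    """
    return df.selectExpr(*null_count_exprs(df.columns)).first().asDict()


def columns_over_threshold(counts, total_rows, threshold_pct=30.0):
    """Names of the columns whose NULL share exceeds threshold_pct."""
    return [
        name
        for name, nulls in counts.items()
        if nulls / total_rows * 100 > threshold_pct
    ]
```

In the notebook this could be used as `high_nulls_columns = columns_over_threshold(null_counts(df_dropped), rows)`.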

### We simply drop from the study every column with more than 30% null values.

In [19]:
df_without_nulls=df_dropped.drop(*high_nulls_columns)
df_without_nulls.printSchema()

root
 |-- address: string (nullable = true)
 |-- bathrooms_count: long (nullable = true)
 |-- country: string (nullable = true)
 |-- typology: string (nullable = true)
 |-- district: string (nullable = true)
 |-- exterior: boolean (nullable = true)
 |-- floor: string (nullable = true)
 |-- hasLift: boolean (nullable = true)
 |-- municipality: string (nullable = true)
 |-- neighborhood: string (nullable = true)
 |-- newDevelopment: boolean (nullable = true)
 |-- price: double (nullable = true)
 |-- price_per_m2: double (nullable = true)
 |-- amount: double (nullable = true)
 |-- propertyCode: string (nullable = true)
 |-- propertyType: string (nullable = true)
 |-- province: string (nullable = true)
 |-- rooms_count: long (nullable = true)
 |-- size_m2: double (nullable = true)
 |-- status: string (nullable = true)


In [20]:
df_without_nulls.show(3)

+--------------------+---------------+-------+--------+--------------------+--------+-----+-------+------------+------------+--------------+---------+------------+---------+------------+------------+--------+-----------+-------+------+
|             address|bathrooms_count|country|typology|            district|exterior|floor|hasLift|municipality|neighborhood|newDevelopment|    price|price_per_m2|   amount|propertyCode|propertyType|province|rooms_count|size_m2|status|
+--------------------+---------------+-------+--------+--------------------+--------+-----+-------+------------+------------+--------------+---------+------------+---------+------------+------------+--------+-----------+-------+------+
|vereda de Palacio, 4|              2|     es|    flat|Encinar de los Reyes|    true|    1|   true| La Moraleja|        null|         false| 625000.0|      6443.0| 625000.0|   105083301|        flat|  Madrid|          2|   97.0|  good|
|calle de Darío Ap...|              5|     es|  chalet| 

Looking at the result, the amount variable stands out.

Price and amount carry essentially the same information: the price of the property.
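
Before dropping one of the two columns, the claim can be checked on the data. The following is a minimal sketch, assuming both columns are still present in the DataFrame; `count_price_amount_mismatches` is a hypothetical helper name. It relies on Spark SQL's null-safe equality operator `<=>`, so two nulls count as equal.

```python
def count_price_amount_mismatches(df):
    """Number of rows where price and amount differ.

    `<=>` is Spark SQL's null-safe equality, so rows where both
    values are NULL are not reported as mismatches.
    """
    return df.filter("NOT (price <=> amount)").count()
```

A result of 0 for `count_price_amount_mismatches(df_without_nulls)` would confirm that amount is redundant.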

In [21]:
df_without_nulls=df_without_nulls.drop("amount") # We drop it from the dataframe.
df_without_nulls.printSchema()

root
 |-- address: string (nullable = true)
 |-- bathrooms_count: long (nullable = true)
 |-- country: string (nullable = true)
 |-- typology: string (nullable = true)
 |-- district: string (nullable = true)
 |-- exterior: boolean (nullable = true)
 |-- floor: string (nullable = true)
 |-- hasLift: boolean (nullable = true)
 |-- municipality: string (nullable = true)
 |-- neighborhood: string (nullable = true)
 |-- newDevelopment: boolean (nullable = true)
 |-- price: double (nullable = true)
 |-- price_per_m2: double (nullable = true)
 |-- propertyCode: string (nullable = true)
 |-- propertyType: string (nullable = true)
 |-- province: string (nullable = true)
 |-- rooms_count: long (nullable = true)
 |-- size_m2: double (nullable = true)
 |-- status: string (nullable = true)


At this point we want to check for missing values in the floor variable, but only for flat-type properties.

If nulls exist and their incidence is not too large, we will consider imputing them.

In [22]:
filtered_df=df_without_nulls.filter((F.col("typology") == "flat") & (F.col("floor").isNull())).select("typology", "floor")
filtered_df.show() # There are nulls in the floor variable for flat-type properties.

+--------+-----+
|typology|floor|
+--------+-----+
|    flat| null|
|    flat| null|
|    flat| null|
|    flat| null|
|    flat| null|
|    flat| null|
|    flat| null|
|    flat| null|
|    flat| null|
|    flat| null|
|    flat| null|
|    flat| null|
|    flat| null|
|    flat| null|
|    flat| null|
|    flat| null|
|    flat| null|
|    flat| null|
|    flat| null|
|    flat| null|
+--------+-----+
only showing top 20 rows


In [23]:
df_flats=df_without_nulls.where(F.col("propertyType")=="flat").cache()




In [24]:
floor_nulls=df_flats.filter(df_flats["floor"].isNull()).count()
print(f"There are {floor_nulls} flats whose floor is unknown, which corresponds to {floor_nulls/rows*100:.2f}% of the dataset")

There are 230 flats whose floor is unknown, which corresponds to 3.09% of the dataset


In [25]:
df_flats.describe().show() # Show the summary statistics of the variables, focusing mainly on floor.

+-------+--------+------------------+-------+--------+----------+------------------+--------------------+--------------------+-----------------+-----------------+--------------------+------------+--------+------------------+-----------------+------+
|summary| address|   bathrooms_count|country|typology|  district|             floor|        municipality|        neighborhood|            price|     price_per_m2|        propertyCode|propertyType|province|       rooms_count|          size_m2|status|
+-------+--------+------------------+-------+--------+----------+------------------+--------------------+--------------------+-----------------+-----------------+--------------------+------------+--------+------------------+-----------------+------+
|  count|    5233|              5233|   5233|    5233|      5174|              5003|                5233|                4568|             5233|             5233|                5233|        5233|    5233|              5233|             5233|  5196|
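
The mean of floor can also be computed directly instead of being read off the `describe()` table. The sketch below is one way to do it, under the assumption that floor holds numeric strings; `mean_floor_for_flats` is a hypothetical helper name. With Spark's default (non-ANSI) settings, values that cannot be cast to a number become null and are ignored by `avg`.

```python
def mean_floor_for_flats(df):
    """Average floor of the rows whose propertyType is 'flat'.

    floor is a string column, so it is cast to DOUBLE first.
    Returns None when there is no numeric floor value at all.
    """
    row = (
        df.filter("propertyType = 'flat'")
        .selectExpr("avg(CAST(floor AS DOUBLE)) AS mean_floor")
        .first()
    )
    return row["mean_floor"]
```

How the resulting mean is rounded into a fill value is a separate choice and is not decided by this sketch.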


In [26]:
df_clean=df_without_nulls.withColumn(
    "floor",
    F.when((F.col("propertyType") == "flat") & F.col("floor").isNull(), F.lit("3")) # Fill the nulls with the mean seen above; the string literal keeps floor's string type explicit.
    .otherwise(F.col("floor"))
    )




In [27]:
df_clean.filter((F.col("floor").isNull()) & (F.col("propertyType") == "flat")).count() # Check that no nulls remain

0


# Next, we detect and handle duplicates.

In [28]:
df_no_duplicates=df_clean.dropDuplicates(["propertyCode"]) # We deduplicate on propertyCode because it is the identifier of each property listed for sale.




In [29]:
no_duplicates_rows=df_no_duplicates.count()
percentage=no_duplicates_rows/rows*100 # Percentage of rows kept after deduplication.
print(f"The dataframe without duplicates contains {no_duplicates_rows} rows")
print(f"The dataframe contains {100-percentage:.2f}% duplicated rows") # 100 minus the kept percentage gives the duplicated share.

The dataframe contains 6483 rows
The dataframe contains 12.98% duplicated rows


### Since 12.98% of the rows are duplicates, we remove them from the dataset and continue.
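
The duplicate share follows from the two row counts reported above (7450 rows before and 6483 after deduplication). The sketch below reproduces that arithmetic and adds a way to list which propertyCode values are repeated, which `dropDuplicates` alone does not show. Both helper names are hypothetical.

```python
def duplicate_share(total_rows, unique_rows):
    """Percentage of rows that deduplication removes."""
    if total_rows <= 0:
        raise ValueError("total_rows must be positive")
    return (total_rows - unique_rows) / total_rows * 100


def duplicated_keys(df, key="propertyCode"):
    """Spark DataFrame of key values that appear more than once, with counts."""
    return df.groupBy(key).count().filter("`count` > 1")


# Row counts taken from the cell outputs above: 7450 - 6483 = 967 duplicates.
print(f"{duplicate_share(7450, 6483):.2f}%")  # prints 12.98%
```

Calling `duplicated_keys(df_clean).show()` before deduplicating would make it possible to inspect a few repeated listings by hand.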

# Finally, we save the data in Delta format and create a table in the database using a crawler.

In [21]:
silver_path="s3://bucket-for-requests/data/api_database/silver_folder"
df_no_duplicates.write\
        .format("delta")\
        .mode("overwrite")\
        .save(silver_path)
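
A quick sanity check after the write is to read the Delta table back and compare row counts. This is only a sketch, assuming the session is still active and the Delta configuration from the `%%configure` cell is in effect; `verify_delta_write` is a hypothetical helper name.

```python
def verify_delta_write(spark, path, expected_rows):
    """Read a Delta table back and compare its row count with the expected one."""
    written_rows = spark.read.format("delta").load(path).count()
    return written_rows == expected_rows
```

In this notebook the call would be `verify_delta_write(spark, silver_path, no_duplicates_rows)`.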




In [None]:
job.commit()