##Padrón de Madrid

####1. Create a DataFrame with the given .csv

In [0]:
df = (spark.read.format("csv").option("inferSchema", "true").option("header", "true").option("sep", ";").load("dbfs:/FileStore/shared_uploads/sergio.salgado@n.world/Rango_Edades_Seccion_202209__1_.csv"))

####2. Clean the data

In [0]:
from pyspark.sql.functions import *

clean_df = df.na.fill(value=0).withColumn("DESC_DISTRITO", rtrim(col("DESC_DISTRITO"))).withColumn("DESC_BARRIO", rtrim(col("DESC_BARRIO")))
clean_df.display()



COD_DISTRITO,DESC_DISTRITO,COD_DIST_BARRIO,DESC_BARRIO,COD_BARRIO,COD_DIST_SECCION,COD_SECCION,COD_EDAD_INT,EspanolesHombres,EspanolesMujeres,ExtranjerosHombres,ExtranjerosMujeres
1,CENTRO,101,PALACIO,1,1001,1,0,1,1,0,0
1,CENTRO,101,PALACIO,1,1001,1,1,3,3,1,0
1,CENTRO,101,PALACIO,1,1001,1,2,5,1,1,1
1,CENTRO,101,PALACIO,1,1001,1,3,3,1,1,2
1,CENTRO,101,PALACIO,1,1001,1,4,3,1,1,0
1,CENTRO,101,PALACIO,1,1001,1,5,0,1,0,2
1,CENTRO,101,PALACIO,1,1001,1,6,2,2,1,1
1,CENTRO,101,PALACIO,1,1001,1,7,1,1,0,0
1,CENTRO,101,PALACIO,1,1001,1,8,2,0,0,0
1,CENTRO,101,PALACIO,1,1001,1,9,1,2,0,1


####3. Count the number of different neighborhood

In [0]:
from pyspark.sql.functions import countDistinct
number_diff_neighborhoods = (clean_df
                             .select(countDistinct("DESC_BARRIO").alias("number_diff_neighborhoods"))
                             .display()
                            )

number_diff_neighborhoods
132


####4. Create a temporal view and count the number of different neighborhood

In [0]:
#First we create the view
df.createOrReplaceTempView("number_diff_neighborhoods_view")

spark.sql("SELECT COUNT(DISTINCT DESC_BARRIO) AS number_diff_neighborhoods FROM number_diff_neighborhoods_view").display()

number_diff_neighborhoods
132


####5. Create a new column that shows the length of the fields in the DESC_DISTRITO column and call it "longitud"

In [0]:
from pyspark.sql.functions import length, col

clean_df_longitud = (clean_df
                     .withColumn("longitud", length("DESC_DISTRITO"))
                     .display()
                    )

COD_DISTRITO,DESC_DISTRITO,COD_DIST_BARRIO,DESC_BARRIO,COD_BARRIO,COD_DIST_SECCION,COD_SECCION,COD_EDAD_INT,EspanolesHombres,EspanolesMujeres,ExtranjerosHombres,ExtranjerosMujeres,longitud
1,CENTRO,101,PALACIO,1,1001,1,0,1,1,0,0,6
1,CENTRO,101,PALACIO,1,1001,1,1,3,3,1,0,6
1,CENTRO,101,PALACIO,1,1001,1,2,5,1,1,1,6
1,CENTRO,101,PALACIO,1,1001,1,3,3,1,1,2,6
1,CENTRO,101,PALACIO,1,1001,1,4,3,1,1,0,6
1,CENTRO,101,PALACIO,1,1001,1,5,0,1,0,2,6
1,CENTRO,101,PALACIO,1,1001,1,6,2,2,1,1,6
1,CENTRO,101,PALACIO,1,1001,1,7,1,1,0,0,6
1,CENTRO,101,PALACIO,1,1001,1,8,2,0,0,0,6
1,CENTRO,101,PALACIO,1,1001,1,9,1,2,0,1,6


####6. Crea una nueva columna que muestre el valor 5 para cada uno de los registros de la tabla

In [0]:
from pyspark.sql.functions import lit

clean_df_five = (clean_df
                 .withColumn("five", lit("5"))
                 .display()
                )

COD_DISTRITO,DESC_DISTRITO,COD_DIST_BARRIO,DESC_BARRIO,COD_BARRIO,COD_DIST_SECCION,COD_SECCION,COD_EDAD_INT,EspanolesHombres,EspanolesMujeres,ExtranjerosHombres,ExtranjerosMujeres,five
1,CENTRO,101,PALACIO,1,1001,1,0,1,1,0,0,5
1,CENTRO,101,PALACIO,1,1001,1,1,3,3,1,0,5
1,CENTRO,101,PALACIO,1,1001,1,2,5,1,1,1,5
1,CENTRO,101,PALACIO,1,1001,1,3,3,1,1,2,5
1,CENTRO,101,PALACIO,1,1001,1,4,3,1,1,0,5
1,CENTRO,101,PALACIO,1,1001,1,5,0,1,0,2,5
1,CENTRO,101,PALACIO,1,1001,1,6,2,2,1,1,5
1,CENTRO,101,PALACIO,1,1001,1,7,1,1,0,0,5
1,CENTRO,101,PALACIO,1,1001,1,8,2,0,0,0,5
1,CENTRO,101,PALACIO,1,1001,1,9,1,2,0,1,5


####7. Borra esta columna

In [0]:
clean_df_no_five = (clean_df
            .drop("five")
            .printSchema()
           )

root
 |-- COD_DISTRITO: integer (nullable = true)
 |-- DESC_DISTRITO: string (nullable = true)
 |-- COD_DIST_BARRIO: integer (nullable = true)
 |-- DESC_BARRIO: string (nullable = true)
 |-- COD_BARRIO: integer (nullable = true)
 |-- COD_DIST_SECCION: integer (nullable = true)
 |-- COD_SECCION: integer (nullable = true)
 |-- COD_EDAD_INT: integer (nullable = true)
 |-- EspanolesHombres: integer (nullable = true)
 |-- EspanolesMujeres: integer (nullable = true)
 |-- ExtranjerosHombres: integer (nullable = true)
 |-- ExtranjerosMujeres: integer (nullable = true)



####8. Particiona el DataFrame por las variables DESC_DISTRITO y DESC_BARRIO

In [0]:
clean_df_partitioned = (clean_df
                        .repartition(col("DESC_DISTRITO"), col("DESC_BARRIO"))
                      )

####9. Almacénalo en caché. Consulta en el puerto 4040 (UI de Spark) de tu usuario local el estado de los rdds almacenados.

In [0]:
clean_df_partitioned.cache()

Out[11]: DataFrame[COD_DISTRITO: int, DESC_DISTRITO: string, COD_DIST_BARRIO: int, DESC_BARRIO: string, COD_BARRIO: int, COD_DIST_SECCION: int, COD_SECCION: int, COD_EDAD_INT: int, EspanolesHombres: int, EspanolesMujeres: int, ExtranjerosHombres: int, ExtranjerosMujeres: int]

####10. Lanza una consulta contra el DF resultante en la que muestre el número total de "espanoleshombres", "espanolesmujeres", extranjeroshombres" y "extranjerosmujeres" para cada barrio de cada distrito. Las columnas distrito y barrio deben ser las primeras en aparecer en el show. Los resultados deben estar ordenados en orden de más a menos según la columna "extranjerosmujeres" y desempatarán por la columna "extranjeroshombres".

In [0]:
from pyspark.sql.functions import col, count, sum

clean_df_partitioned_consulta = (clean_df
                                 .select("DESC_DISTRITO", "DESC_BARRIO", "espanoleshombres", "espanolesmujeres", "extranjeroshombres", "extranjerosmujeres")
                                 .groupBy("DESC_DISTRITO", "DESC_BARRIO")
                                 .agg(sum(col("espanoleshombres")).alias("espanoleshombres"), sum(col("espanolesmujeres")).alias("espanolesmujeres"), sum(col("extranjeroshombres")).alias("extranjeroshombres"), sum(col("extranjerosmujeres")).alias("extranjerosmujeres"))
                                )
clean_df_partitioned_consulta.display()

DESC_DISTRITO,DESC_BARRIO,espanoleshombres,espanolesmujeres,extranjeroshombres,extranjerosmujeres
HORTALEZA,VALDEFUENTES,30129,30655,3426,4305
MORATALAZ,MARROQUINA,11278,13136,883,1106
CENTRO,JUSTICIA,6905,6538,2405,2328
CENTRO,UNIVERSIDAD,12163,12278,4199,4284
VICALVARO,CASCO H.VICALVARO,13241,14437,3881,4066
ARGANZUELA,LEGAZPI,8786,9161,755,814
CENTRO,EMBAJADORES,16266,16138,8040,6014
VILLAVERDE,LOS ANGELES,12596,14446,3488,3841
CARABANCHEL,ABRANTES,11120,12965,3666,4049
CHAMBERI,ARAPILES,9391,11602,1445,1749


####11. Elimina el registro en caché

In [0]:
spark.catalog.clearCache()
# for an especific view --->
#     spark.catalog.uncacheTable(clean_df_partitioned)
 

####12. Crea un nuevo DataFrame a partir del original que muestre únicamente una columna con DESC_BARRIO, otra con DESC_DISTRITO y otra con el número total de "espanoleshombres" residentes en cada distrito de cada barrio. Únelo (con un join) con el DataFrame original a través de las columnas en común.

In [0]:
from pyspark.sql.functions import col, count

nuevo_df = (clean_df
            .select("DESC_DISTRITO", "DESC_BARRIO", "espanoleshombres")
            .groupBy("DESC_DISTRITO", "DESC_BARRIO")
            .agg(count(col("espanoleshombres")).alias("espanoleshombres"))
            .orderBy(asc(col("DESC_DISTRITO")))
           )
nuevo_df_2 = (nuevo_df
              .join(clean_df, [(nuevo_df.DESC_BARRIO == clean_df.DESC_BARRIO) & (nuevo_df.DESC_DISTRITO == clean_df.DESC_DISTRITO)], "leftouter")
             )

nuevo_df.display()
nuevo_df_2.display()

DESC_DISTRITO,DESC_BARRIO,espanoleshombres
ARGANZUELA,LEGAZPI,1129
ARGANZUELA,CHOPERA,1576
ARGANZUELA,PALOS DE MOGUER,1791
ARGANZUELA,DELICIAS,1942
ARGANZUELA,IMPERIAL,1687
ARGANZUELA,ACACIAS,2642
ARGANZUELA,ATOCHA,95
BARAJAS,AEROPUERTO,99
BARAJAS,CASCO H.BARAJAS,486
BARAJAS,ALAMEDA DE OSUNA,1361


DESC_DISTRITO,DESC_BARRIO,espanoleshombres,COD_DISTRITO,DESC_DISTRITO.1,COD_DIST_BARRIO,DESC_BARRIO.1,COD_BARRIO,COD_DIST_SECCION,COD_SECCION,COD_EDAD_INT,EspanolesHombres,EspanolesMujeres,ExtranjerosHombres,ExtranjerosMujeres
ARGANZUELA,ACACIAS,2642,2,ARGANZUELA,202,ACACIAS,2,2015,15,0,4,5,1,1
ARGANZUELA,ACACIAS,2642,2,ARGANZUELA,202,ACACIAS,2,2015,15,1,4,5,0,1
ARGANZUELA,ACACIAS,2642,2,ARGANZUELA,202,ACACIAS,2,2015,15,2,4,11,1,0
ARGANZUELA,ACACIAS,2642,2,ARGANZUELA,202,ACACIAS,2,2015,15,3,5,5,0,1
ARGANZUELA,ACACIAS,2642,2,ARGANZUELA,202,ACACIAS,2,2015,15,4,7,1,0,2
ARGANZUELA,ACACIAS,2642,2,ARGANZUELA,202,ACACIAS,2,2015,15,5,6,4,1,0
ARGANZUELA,ACACIAS,2642,2,ARGANZUELA,202,ACACIAS,2,2015,15,6,12,5,0,0
ARGANZUELA,ACACIAS,2642,2,ARGANZUELA,202,ACACIAS,2,2015,15,7,4,8,0,1
ARGANZUELA,ACACIAS,2642,2,ARGANZUELA,202,ACACIAS,2,2015,15,8,7,4,1,0
ARGANZUELA,ACACIAS,2642,2,ARGANZUELA,202,ACACIAS,2,2015,15,9,5,8,0,0
