In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Creamos una sesión usando SparkSession
spark = (SparkSession
.builder
.appName("ejercicioPadron")
.getOrCreate())

In [0]:
# • 6.1) Comenzamos realizando la misma práctica que hicimos en Hive en Spark, importando el csv. Sería recomendable intentarlo con opciones que quiten las "" de los campos, que ignoren los espacios innecesarios en los campos, que sustituyan los valores vacíos por 0 y que infiera el esquema.

padronLimpio = spark.read.format("csv")\
 .option("header", "true")\
 .option("inferSchema", "true")\
 .option("delimiter", ";")\
 .option("emptyValue", "0")\
 .option("mode", "PERMISSION")\
 .csv("dbfs:/FileStore/shared_uploads/ricardo.afonso@bosonit.com/Rango_Edades_Seccion_202106_utf.csv",
  ignoreLeadingWhiteSpace=True,
  ignoreTrailingWhiteSpace=True,
  encoding="UTF-8",
  quote='"',
  escape='"')
padronLimpio.show()

In [0]:
# • 6.2) De manera alternativa también se puede importar el csv con menos tratamiento en la importación y hacer todas las modificaciones para alcanzar el mismo estado de limpieza de los datos con funciones de Spark.

padron = spark.read\
           .option("header", "true")\
           .option("inferSchema", "true")\
           .option("delimiter", ";")\
           .csv("dbfs:/FileStore/shared_uploads/ricardo.afonso@bosonit.com/Rango_Edades_Seccion_202106_utf.csv")
padron.show()

In [0]:
# • 6.3) Enumera todos los barrios diferentes.

barriosDif = padronLimpio.select("COD_BARRIO","DESC_BARRIO").distinct()
barriosDif.show()

In [0]:
# • 6.4) Crea una vista temporal de nombre "padron" y a través de ella cuenta el número de barrios diferentes que hay.

padronLimpio.createOrReplaceTempView("padronTmpView")
spark.read.table("padronTmpView").select(F.countDistinct("COD_BARRIO","DESC_BARRIO"))

In [0]:
# • 6.5) Crea una nueva columna que muestre la longitud de los campos de la columna DESC_DISTRITO y que se llame "longitud".

padronLimpio.withColumn("longitud", F.length("DESC_BARRIO")).show()

In [0]:
cleanPadron = padronLimpio.select("COD_DISTRITO",
                   F.regexp_replace(F.col("DESC_DISTRITO"), " ", "").alias("DESC_DISTRITO"),
                   "COD_DIST_BARRIO",
                   F.regexp_replace(F.col("DESC_BARRIO"), " ", "").alias("DESC_BARRIO"),
                   "COD_BARRIO",
                   "COD_DIST_SECCION",
                   "COD_SECCION",
                   "COD_EDAD_INT",
                   "EspanolesHombres",
                   "EspanolesMujeres",
                   "ExtranjerosHombres",
                   "ExtranjerosMujeres")

cleanPadron.withColumn("longitud", F.length("DESC_BARRIO")).show()

In [0]:
# • 6.6) Crea una nueva columna que muestre el valor 5 para cada uno de los registros de la tabla. 

cleanPadron = cleanPadron.withColumn("ColumnaDe5", F.lit(5))
cleanPadron.show()

In [0]:
# • 6.7) Borra esta columna.
cleanPadron = cleanPadron.drop(F.col("ColumnaDe5"))
cleanPadron.show()

In [0]:
# • 6.8) Particiona el DataFrame por las variables DESC_DISTRITO y DESC_BARRIO.

cleanPadron.write.partitionBy("DESC_DISTRITO", "DESC_BARRIO")
cleanPadron.rdd.getNumPartitions()

In [0]:
# • 6.9) Almacénalo en caché. Consulta en el puerto 4040 (UI de Spark) de tu usuario local el estado de los rdds almacenados.

cleanPadron.cache().count()

In [0]:
# • 6.10) Lanza una consulta contra el DF resultante en la que muestre el número total de "espanoleshombres", "espanolesmujeres", extranjeroshombres" y "extranjerosmujeres" para cada barrio de cada distrito. Las columnas distrito y barrio deben ser las primeras en aparecer en el show. Los resultados deben estar ordenados en orden de más a menos según la columna "extranjerosmujeres" y desempatarán por la columna "extranjeroshombres".

cleanPadron.groupBy("DESC_DISTRITO","DESC_BARRIO")\
           .agg(F.sum("espanoleshombres"),F.sum("espanolesmujeres"),F.sum("extranjeroshombres"),F.sum("extranjerosmujeres"))\
           .orderBy(F.sum("extranjerosmujeres"),F.sum("extranjeroshombres"), ascending = False)\
           .show()

In [0]:
# • 6.11) Elimina el registro en caché.

cleanPadron.unpersist()

In [0]:
# • 6.12) Crea un nuevo DataFrame a partir del original que muestre únicamente una columna con DESC_BARRIO, otra con DESC_DISTRITO y otra con el número total de "espanoleshombres" residentes en cada distrito de cada barrio. Únelo (con un join) con el DataFrame original a través de las columnas en común.

espHom = cleanPadron.groupBy("DESC_BARRIO","DESC_DISTRITO")\
           .agg(F.sum("espanoleshombres"))

In [0]:
espHom.show()

In [0]:
espHomDF = cleanPadron.join(espHom, ["DESC_BARRIO","DESC_DISTRITO"], how = 'left')
espHomDF.show()

In [0]:
# • 6.13) Repite la función anterior utilizando funciones de ventana. (over(Window.partitionBy.....)).

from pyspark.sql.window import Window
windowEspH = Window.partitionBy("DESC_BARRIO","DESC_DISTRITO")

In [0]:
espHomDF1 = cleanPadron.withColumn("SumEspanoleshombres",F.sum("espanoleshombres").over(windowEspH)).show()

In [0]:
# • 6.14) Mediante una función Pivot muestra una tabla (que va a ser una tabla de contingencia) que contenga los valores totales ()la suma de valores) de espanolesmujeres para cada distrito y en cada rango de edad (COD_EDAD_INT). Los distritos incluidos deben ser únicamente CENTRO, BARAJAS y RETIRO y deben figurar como columnas . El aspecto debe ser similar a este:

distritos = ["BARAJAS","CENTRO","RETIRO"]
pivotDF = cleanPadron.groupBy("COD_EDAD_INT").pivot("DESC_DISTRITO", distritos).sum("espanolesmujeres").orderBy("COD_EDAD_INT")
pivotDF.show()

In [0]:
# • 6.15) Utilizando este nuevo DF, crea 3 columnas nuevas que hagan referencia a qué porcentaje de la suma de "espanolesmujeres" en los tres distritos para cada rango de edad representa cada uno de los tres distritos. Debe estar redondeada a 2 decimales. Puedes imponerte la condición extra de no apoyarte en ninguna columna auxiliar creada para el caso.

pivotDF.withColumn("%BARAJAS", F.round((F.col("BARAJAS") / (F.col("BARAJAS") + F.col("CENTRO") + F.col("RETIRO"))),2))\
       .withColumn("%CENTRO", F.round((F.col("CENTRO") / (F.col("BARAJAS") + F.col("CENTRO") + F.col("RETIRO"))),2))\
       .withColumn("%RETIRO", F.round((F.col("RETIRO") / (F.col("BARAJAS") + F.col("CENTRO") + F.col("RETIRO"))),2))\
       .show()

In [0]:
# • 6.16) Guarda el archivo csv original particionado por distrito y por barrio (en ese orden) en un directorio local. Consulta el directorio para ver la estructura de los ficheros y comprueba que es la esperada.

padron.write.option("header", True) \
      .partitionBy("DESC_DISTRITO", "DESC_BARRIO") \
      .csv("dbfs:/FileStore/shared_uploads/ricardo.afonso@bosonit.com/padronParticionado")

In [0]:
%fs ls dbfs:/FileStore/shared_uploads/ricardo.afonso@bosonit.com/padronParticionado

path,name,size
dbfs:/FileStore/shared_uploads/ricardo.afonso@bosonit.com/padronParticionado/DESC_DISTRITO=ARGANZUELA /,DESC_DISTRITO=ARGANZUELA /,0
dbfs:/FileStore/shared_uploads/ricardo.afonso@bosonit.com/padronParticionado/DESC_DISTRITO=BARAJAS /,DESC_DISTRITO=BARAJAS /,0
dbfs:/FileStore/shared_uploads/ricardo.afonso@bosonit.com/padronParticionado/DESC_DISTRITO=CARABANCHEL /,DESC_DISTRITO=CARABANCHEL /,0
dbfs:/FileStore/shared_uploads/ricardo.afonso@bosonit.com/padronParticionado/DESC_DISTRITO=CENTRO /,DESC_DISTRITO=CENTRO /,0
dbfs:/FileStore/shared_uploads/ricardo.afonso@bosonit.com/padronParticionado/DESC_DISTRITO=CHAMARTIN /,DESC_DISTRITO=CHAMARTIN /,0
dbfs:/FileStore/shared_uploads/ricardo.afonso@bosonit.com/padronParticionado/DESC_DISTRITO=CHAMBERI /,DESC_DISTRITO=CHAMBERI /,0
dbfs:/FileStore/shared_uploads/ricardo.afonso@bosonit.com/padronParticionado/DESC_DISTRITO=CIUDAD LINEAL /,DESC_DISTRITO=CIUDAD LINEAL /,0
dbfs:/FileStore/shared_uploads/ricardo.afonso@bosonit.com/padronParticionado/DESC_DISTRITO=FUENCARRAL-EL PARDO /,DESC_DISTRITO=FUENCARRAL-EL PARDO /,0
dbfs:/FileStore/shared_uploads/ricardo.afonso@bosonit.com/padronParticionado/DESC_DISTRITO=HORTALEZA /,DESC_DISTRITO=HORTALEZA /,0
dbfs:/FileStore/shared_uploads/ricardo.afonso@bosonit.com/padronParticionado/DESC_DISTRITO=LATINA /,DESC_DISTRITO=LATINA /,0


In [0]:
# • 6.17) Haz el mismo guardado pero en formato parquet. Compara el peso del archivo con el resultado anterior

padron.write.option("header", True) \
      .partitionBy("DESC_DISTRITO", "DESC_BARRIO") \
      .format("parquet") \
      .save("dbfs:/FileStore/shared_uploads/ricardo.afonso@bosonit.com/padronParticionado2")

In [0]:
%fs ls dbfs:/FileStore/shared_uploads/ricardo.afonso@bosonit.com/padronParticionado2

path,name,size
dbfs:/FileStore/shared_uploads/ricardo.afonso@bosonit.com/padronParticionado2/DESC_DISTRITO=ARGANZUELA /,DESC_DISTRITO=ARGANZUELA /,0
dbfs:/FileStore/shared_uploads/ricardo.afonso@bosonit.com/padronParticionado2/DESC_DISTRITO=BARAJAS /,DESC_DISTRITO=BARAJAS /,0
dbfs:/FileStore/shared_uploads/ricardo.afonso@bosonit.com/padronParticionado2/DESC_DISTRITO=CARABANCHEL /,DESC_DISTRITO=CARABANCHEL /,0
dbfs:/FileStore/shared_uploads/ricardo.afonso@bosonit.com/padronParticionado2/DESC_DISTRITO=CENTRO /,DESC_DISTRITO=CENTRO /,0
dbfs:/FileStore/shared_uploads/ricardo.afonso@bosonit.com/padronParticionado2/DESC_DISTRITO=CHAMARTIN /,DESC_DISTRITO=CHAMARTIN /,0
dbfs:/FileStore/shared_uploads/ricardo.afonso@bosonit.com/padronParticionado2/DESC_DISTRITO=CHAMBERI /,DESC_DISTRITO=CHAMBERI /,0
dbfs:/FileStore/shared_uploads/ricardo.afonso@bosonit.com/padronParticionado2/DESC_DISTRITO=CIUDAD LINEAL /,DESC_DISTRITO=CIUDAD LINEAL /,0
dbfs:/FileStore/shared_uploads/ricardo.afonso@bosonit.com/padronParticionado2/DESC_DISTRITO=FUENCARRAL-EL PARDO /,DESC_DISTRITO=FUENCARRAL-EL PARDO /,0
dbfs:/FileStore/shared_uploads/ricardo.afonso@bosonit.com/padronParticionado2/DESC_DISTRITO=HORTALEZA /,DESC_DISTRITO=HORTALEZA /,0
dbfs:/FileStore/shared_uploads/ricardo.afonso@bosonit.com/padronParticionado2/DESC_DISTRITO=LATINA /,DESC_DISTRITO=LATINA /,0
