In [0]:
from pyspark.sql import SparkSession

In [0]:
spark =(SparkSession
        .builder
        .appName("Padron")
        .getOrCreate())

In [0]:
## 6.1) Comenzamos realizando la misma práctica que hicimos en Hive en Spark, importando el csv. Sería recomendable intentarlo con opciones que quiten las "" de los campos, que ignoren los espacios innecesarios en los campos, que sustituyan los valores vacíos por 0 y que infiera el esquema.##

In [0]:
df = spark.read\
           .option("header", "true")\
           .option("inferSchema", "true")\
           .option("delimiter", ";")\
           .option("mode", "PERMISSION")\
           .option("emptyValue", 0)\
           .csv("dbfs:/FileStore/shared_uploads/nerea.gomez@bosonit.com/Rango_Edades_Seccion_202106-3.csv",
               encoding="UTF-8",
               ignoreLeadingWhiteSpace=True,
               ignoreTrailingWhiteSpace=True,
               quote='"',
               escape='"')

In [0]:
##REVISAR##
df = spark.read.format("csv")\
          .option("inferSchema", True)\
          .option("header", True)\
          .option("emptyValue", "0")\
          .option("quote", "\"")\
          .option("escape", "\"")\
          .option("ignoreLeadingWhiteSpace", True)\
          .option("ignoreTrailingWhiteSpace", True)\
          .option("sep", ";")\
          .option("encoding", "ISO-8859-1")\
          .load("dbfs:/FileStore/shared_uploads/nerea.gomez@bosonit.com/Rango_Edades_Seccion_202106-3.csv")

df.show()

In [0]:
##  6.2) De manera alternativa también se puede importar el csv con menos tratamiento en la importación y hacer todas las modificaciones para alcanzar el mismo estado de limpieza de los datos con funciones de Spark.

In [0]:
df1 = spark.read\
           .option("header", "true")\
           .option("inferSchema", "true")\
           .option("delimiter", ";")\
           .csv("dbfs:/FileStore/shared_uploads/nerea.gomez@bosonit.com/Rango_Edades_Seccion_202106-3.csv")
df1.show()

In [0]:
df2=df1.na.fill(0)
df2.show()

In [0]:
from pyspark.sql.functions import regexp_replace, col
cleanDF=df2.select("COD_DISTRITO",
                   regexp_replace(col("DESC_DISTRITO"), " ", "").alias("DESC_DISTRITO"),
                   "COD_DIST_BARRIO",
                   regexp_replace(col("DESC_BARRIO"), " ", "").alias("DESC_BARRIO"),
                   "COD_BARRIO",
                   "COD_DIST_SECCION",
                   "COD_SECCION",
                   "COD_EDAD_INT",
                   "EspanolesHombres",
                   "EspanolesMujeres",
                   "ExtranjerosHombres",
                   "ExtranjerosMujeres")
cleanDF.show()

In [0]:
##  6.3) Enumera todos los barrios diferentes.
cleanDF.select("COD_BARRIO","DESC_BARRIO").distinct().show()
cleanDF.select("COD_BARRIO","DESC_BARRIO").distinct().count()

In [0]:
## 6.4) Crea una vista temporal de nombre "padron" y a través de ella cuenta el número de barrios diferentes que hay.

In [0]:
cleanDF.createOrReplaceTempView("padron")

In [0]:
spark.read.table("padron").select("DESC_BARRIO").distinct().count()

In [0]:
##6.5) Crea una nueva columna que muestre la longitud de los campos de la columna DESC_DISTRITO y que se llame "longitud".

In [0]:
import pyspark.sql.functions as F
cleanDF.select("DESC_DISTRITO").withColumn("longitud", F.length("DESC_DISTRITO")).distinct().show()
#hago un distinct() para mostrar que se hace bien con todos los distritos

In [0]:
#prueba a ver si se han eliminado los espacios en blanco en el primer DF
df.select("DESC_DISTRITO").withColumn("longitud", F.length("DESC_DISTRITO")).distinct().show()

In [0]:
## 6.6) Crea una nueva columna que muestre el valor 5 para cada uno de los registros de la tabla. 

In [0]:
from pyspark.sql.functions import lit
withFive=cleanDF.withColumn("numberFive", lit(5))
withFive.show()

In [0]:
## 6.7) Borra esta columna.

In [0]:
withFive=withFive.select("*").drop("numberFive")
withFive.select("*")

In [0]:
## 6.8) Particiona el DataFrame por las variables DESC_DISTRITO y DESC_BARRIO.

In [0]:
partitionDF = cleanDF.repartition("DESC_DISTRITO", "DESC_BARRIO")

In [0]:
## 6.9) Almacénalo en caché. Consulta en el puerto 4040 (UI de Spark) de tu usuario local el estado de los rdds almacenados.

In [0]:
partitionDF.cache()

In [0]:
## 6.10) Lanza una consulta contra el DF resultante en la que muestre el número total de "espanoleshombres", "espanolesmujeres", extranjeroshombres" y "extranjerosmujeres" para cada barrio de cada distrito. Las columnas distrito y barrio deben ser las primeras en aparecer en el show. Los resultados deben estar ordenados en orden de más a menos según la columna "extranjerosmujeres" y desempatarán por la columna "extranjeroshombres".

In [0]:
partitionDF.groupBy("DESC_DISTRITO", "DESC_BARRIO")\
            .agg(F.sum("EspanolesHombres").alias("Sum_EsH"), F.sum("EspanolesMujeres").alias("Sum_EsM"), 
               F.sum("ExtranjerosHombres").alias("Sum_ExH"), F.sum("ExtranjerosMujeres").alias("Sum_ExM"))\
           .orderBy((F.col("Sum_ExM")), (F.col("Sum_ExH")), ascending=False )\
           .show()

In [0]:
## 6.11)Elimina el registro en caché.

In [0]:
partitionDF.unpersist()

In [0]:
##6.12) Crea un nuevo DataFrame a partir del original que muestre únicamente una columna con DESC_BARRIO, otra con DESC_DISTRITO y otra con el número total de "espanoleshombres" residentes en cada distrito de cada barrio. Únelo (con un join) con el DataFrame original a través de las columnas en común.

In [0]:
hombresDF = cleanDF.groupBy("DESC_BARRIO", "DESC_DISTRITO").agg(F.sum("EspanolesHombres").alias("num_EsH"))                 
hombresDF.show()

In [0]:
df_join = cleanDF.join(hombresDF, ((cleanDF["DESC_BARRIO"]==hombresDF["DESC_BARRIO"]) & (cleanDF["DESC_DISTRITO"]==hombresDF["DESC_DISTRITO"])), how="left_outer").select(cleanDF["*"], hombresDF["num_EsH"])
df_join.select("*").show()

In [0]:
## 6.13) Repite la función anterior utilizando funciones de ventana. (over(Window.partitionBy.....)).

In [0]:
from pyspark.sql.window import Window

In [0]:
particion = Window.partitionBy("DESC_BARRIO", "DESC_DISTRITO")
cleanDF.withColumn("Num_EsH", F.sum("EspanolesHombres").over(particion)).select("*").show()

In [0]:
## 6.14) Mediante una función Pivot muestra una tabla (que va a ser una tabla de contingencia) que contenga los valores totales ()la suma de valores) de espanolesmujeres para cada distrito y en cada rango de edad (COD_EDAD_INT). Los distritos incluidos deben ser únicamente CENTRO, BARAJAS y RETIRO y deben figurar como columnas 

In [0]:
distritos= cleanDF.where((cleanDF.DESC_DISTRITO=='CENTRO') | (cleanDF.DESC_DISTRITO=='BARAJAS') | (cleanDF.DESC_DISTRITO=='RETIRO'))
distritos.select("DESC_DISTRITO").distinct().show()

In [0]:
distritosmujeres = distritos.groupBy("COD_EDAD_INT")\
                   .pivot("DESC_DISTRITO")\
                   .agg(F.sum("EspanolesMujeres"))\
                   .sort("COD_EDAD_INT")
distritosmujeres.show()

In [0]:
## 6.15) Utilizando este nuevo DF, crea 3 columnas nuevas que hagan referencia a qué porcentaje de la suma de "espanolesmujeres" en los tres distritos para cada rango de edad representa cada uno de los tres distritos. Debe estar redondeada a 2 decimales. Puedes imponerte la condición extra de no apoyarte en ninguna columna auxiliar creada para el caso.


In [0]:
distritosmujeres.withColumn("total", F.col("BARAJAS")+ F.col("CENTRO")+F.col("RETIRO"))\
               .withColumn("%BAJARAS", F.round((F.col("BARAJAS")/F.col("total")),2))\
               .withColumn("%CENTRO", F.round((F.col("CENTRO")/F.col("total")),2))\
               .withColumn("%RETIRO", F.round((F.col("RETIRO")/F.col("total")),2))\
               .show()

In [0]:
## 6.16) Guarda el archivo csv original particionado por distrito y por barrio (en ese orden) en un directorio local. Consulta el directorio para ver la estructura de los ficheros y comprueba que es la esperada.

In [0]:
cleanDF.write.partitionBy("COD_DISTRITO", "COD_BARRIO").format("csv").save("dbfs:/FileStore/shared_uploads/nerea.gomez@bosonit.com/csv_path")

In [0]:
## 6.17) Haz el mismo guardado pero en formato parquet. Compara el peso del archivo con el resultado anterior.

In [0]:
cleanDF.write.partitionBy("COD_DISTRITO", "COD_BARRIO").format("parquet").save("dbfs:/FileStore/shared_uploads/nerea.gomez@bosonit.com/parquet_path")

In [0]:
%fs ls dbfs:/FileStore/shared_uploads/nerea.gomez@bosonit.com/parquet_path

path,name,size
dbfs:/FileStore/shared_uploads/nerea.gomez@bosonit.com/parquet_path/COD_DISTRITO=1/,COD_DISTRITO=1/,0
dbfs:/FileStore/shared_uploads/nerea.gomez@bosonit.com/parquet_path/COD_DISTRITO=10/,COD_DISTRITO=10/,0
dbfs:/FileStore/shared_uploads/nerea.gomez@bosonit.com/parquet_path/COD_DISTRITO=11/,COD_DISTRITO=11/,0
dbfs:/FileStore/shared_uploads/nerea.gomez@bosonit.com/parquet_path/COD_DISTRITO=12/,COD_DISTRITO=12/,0
dbfs:/FileStore/shared_uploads/nerea.gomez@bosonit.com/parquet_path/COD_DISTRITO=13/,COD_DISTRITO=13/,0
dbfs:/FileStore/shared_uploads/nerea.gomez@bosonit.com/parquet_path/COD_DISTRITO=14/,COD_DISTRITO=14/,0
dbfs:/FileStore/shared_uploads/nerea.gomez@bosonit.com/parquet_path/COD_DISTRITO=15/,COD_DISTRITO=15/,0
dbfs:/FileStore/shared_uploads/nerea.gomez@bosonit.com/parquet_path/COD_DISTRITO=16/,COD_DISTRITO=16/,0
dbfs:/FileStore/shared_uploads/nerea.gomez@bosonit.com/parquet_path/COD_DISTRITO=17/,COD_DISTRITO=17/,0
dbfs:/FileStore/shared_uploads/nerea.gomez@bosonit.com/parquet_path/COD_DISTRITO=18/,COD_DISTRITO=18/,0


In [0]:
#7.1) Por último, prueba a hacer los ejercicios sugeridos en la parte de Hive con el csv "Datos Padrón" (incluyendo la importación con Regex) utilizando desde Spark EXCLUSIVAMENTE sentencias spark.sql, es decir, importar los archivos desde local directamente como tablas de Hive y haciendo todas las consultas sobre estas tablas sin transformarlas en ningún momento en DataFrames ni DataSets.

In [0]:
spark.sql("""DROP DATABASE IF EXISTS datos_padron CASCADE; """)

In [0]:
spark.sql("""CREATE DATABASE datos_padron;""")

In [0]:
spark.sql("""use datos_padron""")

In [0]:
spark.sql("""DROP TABLE IF EXISTS padron_txt""")

In [0]:
dataFrame = "dbfs:/FileStore/shared_uploads/nerea.gomez@bosonit.com/Rango_Edades_Seccion_202106-3.csv"
spark.read.format("csv").option("header","true")\
  .option("inferSchema", "true").option("sep", ";").option("emptyValue", 0).load(dataFrame)\
  .createOrReplaceTempView("padron_txt")

In [0]:
spark.sql("""SELECT * FROM padron_txt""").show()

In [0]:
spark.sql("""CREATE TABLE IF NOT EXISTS padron_txt2 AS 
SELECT COD_DISTRITO,
TRIM(DESC_DISTRITO) as DESC_DISTRITO,
COD_DIST_BARRIO,
TRIM(DESC_BARRIO) as DESC_BARRIO,
COD_BARRIO,
COD_DIST_SECCION,
COD_SECCION,
COD_EDAD_INT,
EspanolesHombres,
EspanolesMujeres,
ExtranjerosHombres,
ExtranjerosMujeres
FROM padron_txt;""")

In [0]:
spark.sql("""SELECT * FROM padron_txt2 limit 10;""").show()

In [0]:
spark.sql("""SELECT COD_DISTRITO, COD_BARRIO, sum(EspanolesHombres) as N_EspH, sum(EspanolesMujeres) as N_EspM, sum(ExtranjerosHombres) as N_ExH, sum(ExtranjerosMujeres) as N_ExM
FROM padron_txt2
WHERE COD_DISTRITO IN (1, 5, 10, 19, 6, 21)
GROUP BY COD_DISTRITO, COD_BARRIO; """).show()

In [0]:
spark.sql("""CREATE TABLE padron_particionado
(DESC_DISTRITO String,
COD_DIST_BARRIO Int,
DESC_BARRIO String,
COD_DIST_SECCION Int,
COD_SECCION Int,
COD_EDAD_INT Int,
EspanolesHombres Int,
EspanolesMujeres Int,
ExtranjerosHombres Int,
ExtranjerosMujeres Int)
PARTITIONED BY(COD_DISTRITO INT , COD_BARRIO INT)
stored as parquet; """)

In [0]:
spark.sql("""set hive.exec.dynamic.partition.mode=nonstrict """)

In [0]:
spark.sql("""INSERT OVERWRITE TABLE padron_particionado PARTITION(COD_DISTRITO, COD_BARRIO)
SELECT DESC_DISTRITO,
COD_DIST_BARRIO,
DESC_BARRIO,
COD_DIST_SECCION,
COD_SECCION,
COD_EDAD_INT,
EspanolesHombres,
EspanolesMujeres,
ExtranjerosHombres,
ExtranjerosMujeres,
COD_DISTRITO,
COD_BARRIO
FROM padron_txt2; """)

In [0]:
spark.sql("""SELECT COD_DISTRITO, COD_BARRIO, sum(EspanolesHombres) as N_EspH, sum(EspanolesMujeres) as N_EspM, sum(ExtranjerosHombres) as N_ExH, sum(ExtranjerosMujeres) as N_ExM
FROM padron_particionado
WHERE COD_DISTRITO IN (1, 5, 10, 19, 6, 21)
GROUP BY COD_DISTRITO, COD_BARRIO; """).show()

In [0]:
spark.sql("""SELECT COD_DISTRITO, COD_BARRIO, 
max(EspanolesHombres) as N_EspH, max(EspanolesMujeres) as N_EspM, max(ExtranjerosHombres) as N_ExH, max(ExtranjerosMujeres) as N_ExM,
min(EspanolesHombres) as N_EspH, min(EspanolesMujeres) as N_EspM, min(ExtranjerosHombres) as N_ExH, min(ExtranjerosMujeres) as N_ExM,
avg(EspanolesHombres) as N_EspH, avg(EspanolesMujeres) as N_EspM, avg(ExtranjerosHombres) as N_ExH, avg(ExtranjerosMujeres) as N_ExM,
count(EspanolesHombres) as N_EspH, count(EspanolesMujeres) as N_EspM, count(ExtranjerosHombres) as N_ExH, count(ExtranjerosMujeres) as N_ExM
FROM padron_txt2
WHERE COD_DISTRITO IN (1, 5, 10, 19, 6, 21)
GROUP BY COD_DISTRITO, COD_BARRIO; """).show()

In [0]:
spark.sql("""SELECT COD_DISTRITO, COD_BARRIO, 
max(EspanolesHombres) as N_EspH, max(EspanolesMujeres) as N_EspM, max(ExtranjerosHombres) as N_ExH, max(ExtranjerosMujeres) as N_ExM,
min(EspanolesHombres) as N_EspH, min(EspanolesMujeres) as N_EspM, min(ExtranjerosHombres) as N_ExH, min(ExtranjerosMujeres) as N_ExM,
avg(EspanolesHombres) as N_EspH, avg(EspanolesMujeres) as N_EspM, avg(ExtranjerosHombres) as N_ExH, avg(ExtranjerosMujeres) as N_ExM,
count(EspanolesHombres) as N_EspH, count(EspanolesMujeres) as N_EspM, count(ExtranjerosHombres) as N_ExH, count(ExtranjerosMujeres) as N_ExM
FROM padron_particionado
WHERE COD_DISTRITO IN (1, 5, 10, 19, 6, 21)
GROUP BY COD_DISTRITO, COD_BARRIO; """).show()