In [0]:
#Importacion csv. Sería recomendable intentarlo con opciones que quiten las "" de los campos, que ignoren los espacios innecesarios en los campos, que sustituyan los valores vacíos por 0 y que infiera el esquema
datos_padron_DF = spark.read\
                       .option("header", "true")\
                       .option("inferSchema", "true")\
                       .option("quote", "\"")\
                       .option("mode", "DROPMALFORMED")\
                       .option("delimiter", ";")\
                       .csv("/FileStore/tables/Rango_Edades_Seccion_202204.csv")
                               
                    

In [0]:
#lo vemos
datos_padron_DF.show()

+------------+--------------------+---------------+--------------------+----------+----------------+-----------+------------+----------------+----------------+------------------+------------------+
|COD_DISTRITO|       DESC_DISTRITO|COD_DIST_BARRIO|         DESC_BARRIO|COD_BARRIO|COD_DIST_SECCION|COD_SECCION|COD_EDAD_INT|EspanolesHombres|EspanolesMujeres|ExtranjerosHombres|ExtranjerosMujeres|
+------------+--------------------+---------------+--------------------+----------+----------------+-----------+------------+----------------+----------------+------------------+------------------+
|           1|CENTRO              |            101|PALACIO             |         1|            1001|          1|           0|               3|               1|                 1|              null|
|           1|CENTRO              |            101|PALACIO             |         1|            1001|          1|           1|               5|               2|              null|              null|
|         

In [0]:
#Reemplazamos las col por las mismas pero con trim para eliminar los espacios en blanco
datos_padron_DF = datos_padron_DF.withColumn("DESC_DISTRITO", F.trim(F.col("DESC_DISTRITO")))\
                                 .withColumn("DESC_BARRIO", F.trim(F.col("DESC_BARRIO")))

In [0]:
#Enumera todos los barrios diferentes
import pyspark.sql.functions as F
datos_padron_DF.select("DESC_BARRIO").distinct().withColumn("indice", F.monotonically_increasing_id()).show(200)



+--------------------+------+
|         DESC_BARRIO|indice|
+--------------------+------+
|       LOS JERONIMOS|     0|
|          NI�O JESUS|     1|
|            VALVERDE|     2|
|              CORTES|     3|
|   PALOMERAS SURESTE|     4|
|           TRAFALGAR|     5|
|              HELLIN|     6|
|        VISTA ALEGRE|     7|
|           FONTARRON|     8|
|              VENTAS|     9|
|    FUENTE DEL BERRO|    10|
|              LUCERO|    11|
|            PACIFICO|    12|
|      CUATRO CAMINOS|    13|
|   CASCO H.VICALVARO|    14|
|            EL PILAR|    15|
|           ARGUELLES|    16|
|       VALDEACEDERAS|    17|
|            CANILLAS|    18|
|        VALLEHERMOSO|    19|
|         ALMENDRALES|    20|
|          BUENAVISTA|    21|
|               IBIZA|    22|
|         EMBAJADORES|    23|
|          CANILLEJAS|    24|
|          CASTELLANA|    25|
|             LEGAZPI|    26|
|      HISPANOAMERICA|    27|
|ENSANCHE DE VALLECAS|    28|
|             PIOVERA|    29|
|         

In [0]:
#Crea una vista temporal de nombre "padron" y a través de ella cuenta el número de barrios diferentes que hay
padron = datos_padron_DF.select("DESC_BARRIO")
padron.createOrReplaceTempView("padron")

In [0]:
#verificamos que se creó como tabla temporal
spark.sql("show tables").show()

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
|        |   padron|       true|
+--------+---------+-----------+



In [0]:
#ahora realizamos la consulta de contar el número de barrios diferentes que hay
padron.distinct().count()

Out[30]: 132

In [0]:
#Crea una nueva columna que muestre la longitud de los campos de la columna DESC_DISTRITO y que se llame "longitud"
import pyspark.sql.functions as F
from pyspark.sql.functions import *
datos_padron_DF = datos_padron_DF.withColumn("longitud", F.length(trim(F.col("DESC_DISTRITO"))))

In [0]:
datos_padron_DF.select("DESC_DISTRITO", "longitud").distinct().show()

+-------------------+--------+
|      DESC_DISTRITO|longitud|
+-------------------+--------+
|         ARGANZUELA|      10|
|FUENCARRAL-EL PARDO|      19|
|              USERA|       5|
|          SALAMANCA|       9|
| PUENTE DE VALLECAS|      18|
|  VILLA DE VALLECAS|      17|
|           CHAMBERI|       8|
|          VICALVARO|       9|
|             RETIRO|       6|
|             CENTRO|       6|
|SAN BLAS-CANILLEJAS|      19|
|          CHAMARTIN|       9|
|             LATINA|       6|
|          MORATALAZ|       9|
|             TETUAN|       6|
|      CIUDAD LINEAL|      13|
|          HORTALEZA|       9|
|         VILLAVERDE|      10|
|        CARABANCHEL|      11|
|    MONCLOA-ARAVACA|      15|
+-------------------+--------+
only showing top 20 rows



In [0]:
#Crea una nueva columna que muestre el valor 5 para cada uno de los registros de la tabla
from pyspark.sql.functions import lit 
datos_padron_DF = datos_padron_DF.select("*", lit("5").alias("Col_5"))



In [0]:
datos_padron_DF.select("DESC_DISTRITO", "longitud", "Col_5").distinct().show()

+-------------------+--------+-----+
|      DESC_DISTRITO|longitud|Col_5|
+-------------------+--------+-----+
|    MONCLOA-ARAVACA|      15|    5|
|FUENCARRAL-EL PARDO|      19|    5|
|             TETUAN|       6|    5|
|         VILLAVERDE|      10|    5|
|             CENTRO|       6|    5|
|        CARABANCHEL|      11|    5|
|             LATINA|       6|    5|
|          HORTALEZA|       9|    5|
|           CHAMBERI|       8|    5|
|          MORATALAZ|       9|    5|
|          SALAMANCA|       9|    5|
|              USERA|       5|    5|
| PUENTE DE VALLECAS|      18|    5|
|  VILLA DE VALLECAS|      17|    5|
|             RETIRO|       6|    5|
|          CHAMARTIN|       9|    5|
|          VICALVARO|       9|    5|
|         ARGANZUELA|      10|    5|
|      CIUDAD LINEAL|      13|    5|
|SAN BLAS-CANILLEJAS|      19|    5|
+-------------------+--------+-----+
only showing top 20 rows



In [0]:
#Borra esta columna
datos_padron_DF = datos_padron_DF.drop("Col_5")


In [0]:
#Particiona el DataFrame por las variables DESC_DISTRITO y DESC_BARRIO
datos_padron_DF.repartition("DESC_DISTRITO", "DESC_BARRIO")
datos_padron_DF.show()

+------------+-------------+---------------+-----------+----------+----------------+-----------+------------+----------------+----------------+------------------+------------------+--------+
|COD_DISTRITO|DESC_DISTRITO|COD_DIST_BARRIO|DESC_BARRIO|COD_BARRIO|COD_DIST_SECCION|COD_SECCION|COD_EDAD_INT|EspanolesHombres|EspanolesMujeres|ExtranjerosHombres|ExtranjerosMujeres|longitud|
+------------+-------------+---------------+-----------+----------+----------------+-----------+------------+----------------+----------------+------------------+------------------+--------+
|           1|       CENTRO|            101|    PALACIO|         1|            1001|          1|           0|               3|               1|                 1|              null|       6|
|           1|       CENTRO|            101|    PALACIO|         1|            1001|          1|           1|               5|               2|              null|              null|       6|
|           1|       CENTRO|            101| 

In [0]:
#Almacénalo en caché. Consulta en el puerto 4040 (UI de Spark) de tu usuario local el estado de los rdds almacenados
datos_padron_DF.cache()

Out[37]: DataFrame[COD_DISTRITO: int, DESC_DISTRITO: string, COD_DIST_BARRIO: int, DESC_BARRIO: string, COD_BARRIO: int, COD_DIST_SECCION: int, COD_SECCION: int, COD_EDAD_INT: int, EspanolesHombres: int, EspanolesMujeres: int, ExtranjerosHombres: int, ExtranjerosMujeres: int, longitud: int]

In [0]:
#Lanza una consulta contra el DF resultante en la que muestre el número total de "espanoleshombres", "espanolesmujeres", extranjeroshombres" y "extranjerosmujeres" 
#para cada barrio de cada distrito. Las columnas distrito y barrio deben ser las primeras en aparecer en el show. Los resultados deben estar ordenados en orden de más a menos 
#según la columna "extranjerosmujeres" y desempatarán por la columna "extranjeroshombres"
import pyspark.sql.functions as F
datos_padron_DF.groupBy("DESC_DISTRITO", "DESC_BARRIO")\
               .agg(F.count("EspanolesHombres").alias("Cantidad Hombres Españoles"),\
                    F.count("EspanolesMujeres").alias("Cantidad Mujeres Españolas"),\
                    F.count("ExtranjerosHombres").alias("Cantidad Extranjeros Hombres"),\
                    F.count("ExtranjerosMujeres").alias("Cantidad Extranjeras Mujeres"))\
               .orderBy("Cantidad Extranjeras Mujeres", "Cantidad Extranjeros Hombres", ascending=False)\
               .show()

+-------------------+--------------------+--------------------------+--------------------------+----------------------------+----------------------------+
|      DESC_DISTRITO|         DESC_BARRIO|Cantidad Hombres Españoles|Cantidad Mujeres Españolas|Cantidad Extranjeros Hombres|Cantidad Extranjeras Mujeres|
+-------------------+--------------------+--------------------------+--------------------------+----------------------------+----------------------------+
|             LATINA|              ALUCHE|                      5366|                      5575|                        2604|                        2888|
|      CIUDAD LINEAL|        PUEBLO NUEVO|                      4229|                      4407|                        2318|                        2516|
|      CIUDAD LINEAL|              VENTAS|                      3778|                      3922|                        1912|                        2122|
|FUENCARRAL-EL PARDO|            VALVERDE|                      3531| 

In [0]:
#Elimina el registro en caché
datos_padron_DF.unpersist()

Out[39]: DataFrame[COD_DISTRITO: int, DESC_DISTRITO: string, COD_DIST_BARRIO: int, DESC_BARRIO: string, COD_BARRIO: int, COD_DIST_SECCION: int, COD_SECCION: int, COD_EDAD_INT: int, EspanolesHombres: int, EspanolesMujeres: int, ExtranjerosHombres: int, ExtranjerosMujeres: int, longitud: int]

In [0]:
#Crea un nuevo DataFrame a partir del original que muestre únicamente una columna con DESC_BARRIO, otra con DESC_DISTRITO y otra con el número total de "espanoleshombres" 
#residentes en cada distrito de cada barrio. Únelo (con un join) con el DataFrame original a través de las columnas en común

datos_padron_2 = datos_padron_DF.groupBy("DESC_BARRIO", "DESC_DISTRITO")\
                                .agg(F.count("EspanolesHombres").alias("Cantidad EspanolesHombres"))

joined_DF = datos_padron_DF.join(datos_padron_2, on = ["DESC_BARRIO", "DESC_DISTRITO"])

In [0]:
#Repite la función anterior utilizando funciones de ventana. (over(Window.partitionBy.....))
from pyspark.sql import Window
w = Window().partitionBy("DESC_BARRIO", "DESC_DISTRITO")
datos_padron_3 = datos_padron_DF.withColumn("Cantidad EspanolesHombres", F.count("EspanolesHombres").over(w))
datos_padron_3.select("DESC_BARRIO", "DESC_DISTRITO", "Cantidad EspanolesHombres").distinct().show()

+--------------------+-------------------+-------------------------+
|         DESC_BARRIO|      DESC_DISTRITO|Cantidad EspanolesHombres|
+--------------------+-------------------+-------------------------+
|            ABRANTES|        CARABANCHEL|                     1905|
|             ACACIAS|         ARGANZUELA|                     2439|
|             ADELFAS|             RETIRO|                     1116|
|          AEROPUERTO|            BARAJAS|                       94|
|    ALAMEDA DE OSUNA|            BARAJAS|                     1272|
|             ALMAGRO|           CHAMBERI|                     1631|
|            ALMENARA|             TETUAN|                     1465|
|         ALMENDRALES|              USERA|                     1290|
|              ALUCHE|             LATINA|                     5366|
|             AMPOSTA|SAN BLAS-CANILLEJAS|                      782|
|    APOSTOL SANTIAGO|          HORTALEZA|                     1004|
|            ARAPILES|           C

In [0]:
#Mediante una función Pivot muestra una tabla (que va a ser una tabla de contingencia) que contenga los valores totales (la suma de valores) 
#de espanolesmujeres para cada distrito y en cada rango de edad (COD_EDAD_INT). Los distritos incluidos deben ser únicamente 
#CENTRO, BARAJAS y RETIRO y deben figurar como columnas 
distrito_filtrado = datos_padron_DF.filter((trim(F.col("DESC_DISTRITO")) == "CENTRO")| (trim(F.col("DESC_DISTRITO")) == "BARAJAS")|(trim(F.col("DESC_DISTRITO")) == "RETIRO"))
pivot_df=distrito_filtrado.groupBy("COD_EDAD_INT")\
                          .pivot("DESC_DISTRITO").agg(
                           F.sum("EspanolesMujeres"))\
                          .orderBy("COD_EDAD_INT")

In [0]:
pivot_df.show()

+------------+-------+------+------+
|COD_EDAD_INT|BARAJAS|CENTRO|RETIRO|
+------------+-------+------+------+
|           0|    146|   262|   317|
|           1|    160|   240|   318|
|           2|    171|   200|   377|
|           3|    175|   216|   365|
|           4|    201|   239|   412|
|           5|    217|   224|   420|
|           6|    242|   231|   405|
|           7|    229|   237|   466|
|           8|    234|   228|   417|
|           9|    236|   235|   421|
|          10|    260|   252|   419|
|          11|    241|   231|   423|
|          12|    259|   244|   417|
|          13|    267|   274|   443|
|          14|    250|   249|   407|
|          15|    269|   251|   429|
|          16|    269|   267|   422|
|          17|    265|   261|   417|
|          18|    223|   285|   455|
|          19|    254|   304|   411|
+------------+-------+------+------+
only showing top 20 rows



In [0]:
#Utilizando este nuevo DF, crea 3 columnas nuevas que hagan referencia a qué porcentaje de la suma de "espanolesmujeres" en los tres distritos para cada rango de edad representa cada uno de los tres distritos. Debe estar redondeada a 2 decimales. Puedes imponerte la condición extra de no apoyarte en ninguna columna auxiliar creada para el caso
from pyspark.sql.functions import round
porcentaje_df = pivot_df.withColumn("Totales", F.col("BARAJAS") + F.col("CENTRO") + F.col("RETIRO"))\
                        .withColumn("% BARAJAS", round(F.col("BARAJAS")/F.col("Totales"),2))\
                        .withColumn("% CENTRO", round(F.col("CENTRO")/F.col("Totales"),2))\
                        .withColumn("% RETIRO", round(F.col("RETIRO")/F.col("Totales"),2))\
                        .drop("Totales")

In [0]:
porcentaje_df.show()

+------------+-------+------+------+---------+--------+--------+
|COD_EDAD_INT|BARAJAS|CENTRO|RETIRO|% BARAJAS|% CENTRO|% RETIRO|
+------------+-------+------+------+---------+--------+--------+
|           0|    146|   262|   317|      0.2|    0.36|    0.44|
|           1|    160|   240|   318|     0.22|    0.33|    0.44|
|           2|    171|   200|   377|     0.23|    0.27|     0.5|
|           3|    175|   216|   365|     0.23|    0.29|    0.48|
|           4|    201|   239|   412|     0.24|    0.28|    0.48|
|           5|    217|   224|   420|     0.25|    0.26|    0.49|
|           6|    242|   231|   405|     0.28|    0.26|    0.46|
|           7|    229|   237|   466|     0.25|    0.25|     0.5|
|           8|    234|   228|   417|     0.27|    0.26|    0.47|
|           9|    236|   235|   421|     0.26|    0.26|    0.47|
|          10|    260|   252|   419|     0.28|    0.27|    0.45|
|          11|    241|   231|   423|     0.27|    0.26|    0.47|
|          12|    259|   

In [0]:
#Guarda el archivo csv original particionado por distrito y por barrio (en ese orden) en un directorio local. 
#Consulta el directorio para ver la estructura de los ficheros y comprueba que es la esperada
datos_padron_DF.write.partitionBy("DESC_DISTRITO, DESC_BARRIO").csv

Out[54]: <bound method DataFrameWriter.csv of <pyspark.sql.readwriter.DataFrameWriter object at 0x7f00c79caa90>>

In [0]:
#Haz el mismo guardado pero en formato parquet. Compara el peso del archivo con el resultado anterior
datos_padron_DF.write.partitionBy("DESC_DISTRITO, DESC_BARRIO").parquet

Out[55]: <bound method DataFrameWriter.parquet of <pyspark.sql.readwriter.DataFrameWriter object at 0x7f00c69723d0>>