# Parte 2: Análisis de datos con Spark

## Instalación y configuración de Spark
#### ********************************************************************************************

In [1]:
# Instalación de Java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Descargar e instalar Spark
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz

# Instalación de findspark
!pip install -q findspark

# Configuración de las variables de entorno
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

# Inicialización de findspark
import findspark
findspark.init()

## Importacion de librerias
#### ********************************************************************************************

In [2]:
from pyspark.sql.functions import *
from google.colab import drive
drive.mount('/content/drive')

# Crear una sesión de Spark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

Mounted at /content/drive


In [3]:
# Rutas Drive
input_dir = '/content/drive/MyDrive/Big Data/Originales'
output_dir = '/content/drive/MyDrive/Big Data/Spark'

#### ********************************************************************************************
### Dataset: Bus Stops
#### ********************************************************************************************

In [65]:
# Definir la ruta de origen
src_bus_stops = input_dir + '/bus_stops.csv'

# Cargar las tablas de datos
bus_stops = spark.read.csv(src_bus_stops, header=True, inferSchema=True)

In [66]:
# Mostrar esquema de los datos
bus_stops.printSchema()

root
 |-- Code: string (nullable = true)
 |-- Transport: string (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Bus.Stop: string (nullable = true)
 |-- District.Name: string (nullable = true)
 |-- Neighborhood.Name: string (nullable = true)



In [None]:
# Mostrar las primeras filas
bus_stops.show(5)

+----+------------+---------+---------+----------+-------------------+--------------------+
|Code|   Transport|Longitude| Latitude|  Bus.Stop|      District.Name|   Neighborhood.Name|
+----+------------+---------+---------+----------+-------------------+--------------------+
|K014|Day bus stop| 2.171619|41.413744|BUS -192--|     Horta-Guinardó|         el Guinardó|
|K014|Day bus stop| 2.134902|41.420222|BUS -124--|             Gràcia|Vallcarca i els P...|
|K014|Day bus stop| 2.162913|41.423187|BUS -117--|     Horta-Guinardó|la Font d'en Fargues|
|K014|Day bus stop| 2.163667|41.422899|BUS -117--|     Horta-Guinardó|la Font d'en Fargues|
|K014|Day bus stop| 2.120212|41.397209|BUS -130--|Sarrià-Sant Gervasi|              Sarrià|
+----+------------+---------+---------+----------+-------------------+--------------------+
only showing top 5 rows



In [None]:
# Cantidad de columnas
num_columns = len(bus_stops.columns)
print(f"Cantidad de columnas: {num_columns}")

Cantidad de columnas: 7
Nombres de las columnas: ['Code', 'Transport', 'Longitude', 'Latitude', 'Bus.Stop', 'District.Name', 'Neighborhood.Name']


In [None]:
# Nombre de cada columna
column_names = bus_stops.columns
print(f"Nombres de las columnas: {column_names}")

Nombres de las columnas: ['Code', 'Transport', 'Longitude', 'Latitude', 'Bus.Stop', 'District.Name', 'Neighborhood.Name']


In [None]:
# Descripción de los datos
bus_stops.describe().show()

+-------+----+----------------+--------------------+-------------------+-------------+-------------------+-----------------+
|summary|Code|       Transport|           Longitude|           Latitude|     Bus.Stop|      District.Name|Neighborhood.Name|
+-------+----+----------------+--------------------+-------------------+-------------+-------------------+-----------------+
|  count|3162|            3162|                3162|               3162|         3162|               3146|             3146|
|   mean|null|            null|   2.159823969639465|  41.40500162333972|         null|               null|             null|
| stddev|null|            null|0.027898271557498862|0.02715006452368344|         null|               null|             null|
|    min|K014|Airport bus stop|            2.055835|          41.321643|AEROBUS -A1--|       Ciutat Vella|    Baró de Viver|
|    max|K017|  Night bus stop|            2.221753|          41.467592| NITBUS -N9--|Sarrià-Sant Gervasi|  les Tres Torres|


In [None]:
# Cantidad de registros (número de filas)
num_records = bus_stops.count()
print(f"Cantidad de registros: {num_records}")

Cantidad de registros: 3162


In [67]:
# Renombrar columnas
bus_stops = bus_stops.withColumnRenamed("Bus.Stop", "Bus_Stop") \
                     .withColumnRenamed("District.Name", "District_Name") \
                     .withColumnRenamed("Neighborhood.Name", "Neighborhood_Name")

# Verificar valores nulos
bus_stops.select([count(when(col(c).isNull(), c)).alias(c) for c in bus_stops.columns]).show()

+----+---------+---------+--------+--------+-------------+-----------------+
|Code|Transport|Longitude|Latitude|Bus_Stop|District_Name|Neighborhood_Name|
+----+---------+---------+--------+--------+-------------+-----------------+
|   0|        0|        0|       0|       0|           16|               16|
+----+---------+---------+--------+--------+-------------+-----------------+



In [68]:
#Si Neighborhood_Name o District_Name son nulos rellenar con texto vacío
bus_stops = bus_stops.na.fill({"District_Name": "", "Neighborhood_Name": ""})

In [69]:
# Verificar duplicados
print("Registros duplicados en bus_stops:")
print(bus_stops.count() - bus_stops.dropDuplicates().count())

Registros duplicados en bus_stops:
0


In [70]:
# Verificar claves primarias únicas
bus_stops.groupBy("Longitude", "Latitude", "Bus_Stop").count().filter(col("count") > 1).show()

+---------+--------+--------+-----+
|Longitude|Latitude|Bus_Stop|count|
+---------+--------+--------+-----+
+---------+--------+--------+-----+



In [71]:
# Guardar los datos limpios en un nuevo archivo
bus_stops.write.csv(output_dir + "/cleaned_bus_stops.csv", header=True)

#### ********************************************************************************************
### Dataset: Population
#### ********************************************************************************************

In [72]:
# Definir la ruta de origen
src_population = input_dir + '/population.csv'

# Cargar las tablas de datos
population = spark.read.csv(src_population, header=True, inferSchema=True)

In [73]:
# Mostrar esquema de los datos
population.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- District.Code: integer (nullable = true)
 |-- District.Name: string (nullable = true)
 |-- Neighborhood.Code: integer (nullable = true)
 |-- Neighborhood.Name: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Number: integer (nullable = true)



In [10]:
# Mostrar las primeras filas
population.show(5)

+----+-------------+-------------+-----------------+--------------------+------+---+------+
|Year|District.Code|District.Name|Neighborhood.Code|   Neighborhood.Name|Gender|Age|Number|
+----+-------------+-------------+-----------------+--------------------+------+---+------+
|2017|            1| Ciutat Vella|                1|            el Raval|  Male|0-4|   224|
|2017|            1| Ciutat Vella|                2|      el Barri Gòtic|  Male|0-4|    50|
|2017|            1| Ciutat Vella|                3|      la Barceloneta|  Male|0-4|    43|
|2017|            1| Ciutat Vella|                4|Sant Pere, Santa ...|  Male|0-4|    95|
|2017|            2|     Eixample|                5|       el Fort Pienc|  Male|0-4|   124|
+----+-------------+-------------+-----------------+--------------------+------+---+------+
only showing top 5 rows



In [11]:
# Cantidad de columnas
num_columns = len(population.columns)
print(f"Cantidad de columnas: {num_columns}")

Cantidad de columnas: 8


In [12]:
# Nombre de cada columna
column_names = population.columns
print(f"Nombres de las columnas: {column_names}")

Nombres de las columnas: ['Year', 'District.Code', 'District.Name', 'Neighborhood.Code', 'Neighborhood.Name', 'Gender', 'Age', 'Number']


In [13]:
# Descripción de los datos
population.describe().show()

+-------+------------------+------------------+-------------------+------------------+-----------------+------+-----+-----------------+
|summary|              Year|     District.Code|      District.Name| Neighborhood.Code|Neighborhood.Name|Gender|  Age|           Number|
+-------+------------------+------------------+-------------------+------------------+-----------------+------+-----+-----------------+
|  count|             70080|             70080|              70080|             70080|            70080| 70080|70080|            70080|
|   mean|            2015.0|6.2465753424657535|               null|              37.0|             null|  null| null|114.8447203196347|
| stddev|1.4142236524678082|2.7883557847683456|               null|21.071457844839458|             null|  null| null|96.54038907224755|
|    min|              2013|                 1|       Ciutat Vella|                 1|    Baró de Viver|Female|  0-4|                0|
|    max|              2017|                10|S

In [14]:
# Cantidad de registros (número de filas)
num_records = population.count()
print(f"Cantidad de registros: {num_records}")

Cantidad de registros: 70080


In [74]:
# Renombrar columnas
population = population.withColumnRenamed("District.Code", "District_Code") \
                       .withColumnRenamed("District.Name", "District_Name") \
                       .withColumnRenamed("Neighborhood.Code", "Neighborhood_Code") \
                       .withColumnRenamed("Neighborhood.Name", "Neighborhood_Name")

# Verificar valores nulos
population.select([count(when(col(c).isNull(), c)).alias(c) for c in population.columns]).show()

+----+-------------+-------------+-----------------+-----------------+------+---+------+
|Year|District_Code|District_Name|Neighborhood_Code|Neighborhood_Name|Gender|Age|Number|
+----+-------------+-------------+-----------------+-----------------+------+---+------+
|   0|            0|            0|                0|                0|     0|  0|     0|
+----+-------------+-------------+-----------------+-----------------+------+---+------+



In [75]:
# Verificar duplicados
print("Registros duplicados en population:")
print(population.count() - population.dropDuplicates().count())

Registros duplicados en population:
4277


In [76]:
# Eliminar duplicados
population = population.dropDuplicates()

In [77]:
# Verificar claves primarias únicas
population.groupBy("Year", "District_Code", "Neighborhood_Code", "Gender", "Age").count().filter(col("count") > 1).show()

+----+-------------+-----------------+------+-----+-----+
|Year|District_Code|Neighborhood_Code|Gender|  Age|count|
+----+-------------+-----------------+------+-----+-----+
|2015|            5|               26|  Male|35-39|    5|
|2014|            8|               49|Female|40-44|    5|
|2016|            8|               44|Female|85-89|    5|
|2015|            8|               47|Female|  0-4|    4|
|2014|            9|               61|  Male|20-24|    4|
|2014|            2|                7|Female|55-59|    5|
|2016|            5|               27|Female|50-54|    5|
|2015|            2|               10|  Male|50-54|    5|
|2017|           10|               72|Female|70-74|    5|
|2015|            9|               57|  Male|75-79|    4|
|2014|            8|               50|  Male|65-69|    5|
|2014|            4|               20|Female|35-39|    5|
|2013|            3|               13|Female|70-74|    5|
|2015|            8|               56|Female|20-24|    3|
|2016|        

In [78]:
# Determinar todas las columnas excepto 'Number'
group_columns = [col for col in population.columns if col != 'Number']

# Agrupo y sumo la cantidad
population = population.groupBy(group_columns).agg(sum("Number").alias("Number"))

In [79]:
# Guardar los datos limpios en un nuevo archivo
population.write.csv(output_dir + "/cleaned_population.csv", header=True)

#### ********************************************************************************************
### Dataset: Immigrants by nationality
#### ********************************************************************************************

In [80]:
# Definir la ruta de origen
src_immigrants_nationality = input_dir + '/immigrants_by_nationality.csv'

# Cargar las tablas de datos
immigrants_by_nationality = spark.read.csv(src_immigrants_nationality, header=True, inferSchema=True)

In [None]:
# Mostrar el esquema (tipo de datos de cada columna)
immigrants_by_nationality.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- District Code: integer (nullable = true)
 |-- District Name: string (nullable = true)
 |-- Neighborhood Code: integer (nullable = true)
 |-- Neighborhood Name: string (nullable = true)
 |-- Nationality: string (nullable = true)
 |-- Number: integer (nullable = true)



In [None]:
# Vista previa de las primeras filas
immigrants_by_nationality.show(5)

+----+-------------+-------------+-----------------+--------------------+-----------+------+
|Year|District Code|District Name|Neighborhood Code|   Neighborhood Name|Nationality|Number|
+----+-------------+-------------+-----------------+--------------------+-----------+------+
|2017|            1| Ciutat Vella|                1|            el Raval|      Spain|  1109|
|2017|            1| Ciutat Vella|                2|      el Barri Gòtic|      Spain|   482|
|2017|            1| Ciutat Vella|                3|      la Barceloneta|      Spain|   414|
|2017|            1| Ciutat Vella|                4|Sant Pere, Santa ...|      Spain|   537|
|2017|            2|     Eixample|                5|       el Fort Pienc|      Spain|   663|
+----+-------------+-------------+-----------------+--------------------+-----------+------+
only showing top 5 rows



In [None]:
# Cantidad de columnas
num_columns = len(immigrants_by_nationality.columns)
print(f"Cantidad de columnas: {num_columns}")

Cantidad de columnas: 7


In [None]:
# Nombre de cada columna
column_names = immigrants_by_nationality.columns
print(f"Nombres de las columnas: {column_names}")

Nombres de las columnas: ['Year', 'District Code', 'District Name', 'Neighborhood Code', 'Neighborhood Name', 'Nationality', 'Number']


In [None]:
# Descripción de los datos de cada columna
immigrants_by_nationality.describe().show()

+-------+------------------+------------------+-------------------+------------------+-----------------+-----------+-----------------+
|summary|              Year|     District Code|      District Name| Neighborhood Code|Neighborhood Name|Nationality|           Number|
+-------+------------------+------------------+-------------------+------------------+-----------------+-----------+-----------------+
|  count|             35224|             35224|              35224|             35224|            35224|      35224|            35224|
|   mean|2016.0105042016808|               7.5|               null| 37.83783783783784|             null|       null|7.707273449920509|
| stddev|0.8108460719474905|11.061716951762222|               null|22.119174116683812|             null|       null|50.42188329015287|
|    min|              2015|                 1|       Ciutat Vella|                 1|    Baró de Viver|Afghanistan|                0|
|    max|              2017|                99|Sarrià-S

In [None]:
# Cantidad de registros (número de filas)
num_records = immigrants_by_nationality.count()
print(f"Cantidad de registros: {num_records}")

Cantidad de registros: 35224


In [81]:
# Renombrar columnas
immigrants_by_nationality = immigrants_by_nationality.withColumnRenamed("District Code", "District_Code") \
                       .withColumnRenamed("District Name", "District_Name") \
                       .withColumnRenamed("Neighborhood Code", "Neighborhood_Code") \
                       .withColumnRenamed("Neighborhood Name", "Neighborhood_Name")

# Verificar valores nulos
immigrants_by_nationality.select([count(when(col(c).isNull(), c)).alias(c) for c in immigrants_by_nationality.columns]).show()

+----+-------------+-------------+-----------------+-----------------+-----------+------+
|Year|District_Code|District_Name|Neighborhood_Code|Neighborhood_Name|Nationality|Number|
+----+-------------+-------------+-----------------+-----------------+-----------+------+
|   0|            0|            0|                0|                0|          0|     0|
+----+-------------+-------------+-----------------+-----------------+-----------+------+



In [82]:
# Verificar registros duplicados
num_duplicates = immigrants_by_nationality.count() - immigrants_by_nationality.dropDuplicates().count()
print(f"Número de registros duplicados: {num_duplicates}")

Número de registros duplicados: 0


In [83]:
# Verificar si las claves primarias son únicas
primary_key_columns = ["Year", "District_Code", "Neighborhood_Code", "Nationality"]
immigrants_by_nationality.groupBy(primary_key_columns).count().filter(col("count") > 1).show()

+----+-------------+-----------------+-----------+-----+
|Year|District_Code|Neighborhood_Code|Nationality|count|
+----+-------------+-----------------+-----------+-----+
+----+-------------+-----------------+-----------+-----+



In [84]:
# Guardar los datos limpios en un nuevo archivo
immigrants_by_nationality.write.csv(output_dir + "/cleaned_immigrants_by_nationality.csv", header=True)

#### ********************************************************************************************
### Dataset: Births
#### ********************************************************************************************

In [85]:
# Definir la ruta de origen
src_births = input_dir + '/births.csv'

# Cargar las tablas de datos
births = spark.read.csv(src_births, header=True, inferSchema=True)

In [None]:
# Mostrar el esquema (tipo de datos de cada columna)
births.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- District Code: integer (nullable = true)
 |-- District Name: string (nullable = true)
 |-- Neighborhood Code: integer (nullable = true)
 |-- Neighborhood Name: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Number: integer (nullable = true)



In [None]:
# Vista previa de las primeras filas
births.show(5)

+----+-------------+-------------+-----------------+--------------------+------+------+
|Year|District Code|District Name|Neighborhood Code|   Neighborhood Name|Gender|Number|
+----+-------------+-------------+-----------------+--------------------+------+------+
|2017|            1| Ciutat Vella|                1|            el Raval|  Boys|   283|
|2017|            1| Ciutat Vella|                2|      el Barri Gòtic|  Boys|    56|
|2017|            1| Ciutat Vella|                3|      la Barceloneta|  Boys|    51|
|2017|            1| Ciutat Vella|                4|Sant Pere, Santa ...|  Boys|    90|
|2017|            2|     Eixample|                5|       el Fort Pienc|  Boys|   117|
+----+-------------+-------------+-----------------+--------------------+------+------+
only showing top 5 rows



In [None]:
# Cantidad de columnas
num_columns = len(births.columns)
print(f"Cantidad de columnas: {num_columns}")

Cantidad de columnas: 7


In [None]:
# Nombre de cada columna
column_names = births.columns
print(f"Nombres de las columnas: {column_names}")

Nombres de las columnas: ['Year', 'District Code', 'District Name', 'Neighborhood Code', 'Neighborhood Name', 'Gender', 'Number']


In [None]:
# Descripción de los datos de cada columna
births.describe().show()

+-------+------------------+-----------------+-------------------+------------------+-----------------+------+------------------+
|summary|              Year|    District Code|      District Name| Neighborhood Code|Neighborhood Name|Gender|            Number|
+-------+------------------+-----------------+-------------------+------------------+-----------------+------+------------------+
|  count|               734|              734|                734|               734|              734|   734|               734|
|   mean|2014.9972752043598|6.752043596730245|               null| 37.33787465940055|             null|  null| 91.75476839237058|
| stddev| 1.416138973152209|7.378009426880971|               null|21.518487469619597|             null|  null|61.928487762933905|
|    min|              2013|                1|       Ciutat Vella|                 1|    Baró de Viver|  Boys|                 0|
|    max|              2017|               99|Sarrià-Sant Gervasi|                99|  les

In [None]:
# Cantidad de registros (número de filas)
num_records = births.count()
print(f"Cantidad de registros: {num_records}")

Cantidad de registros: 734


In [86]:
# Renombrar columnas
births = births.withColumnRenamed("District Code", "District_Code") \
                       .withColumnRenamed("District Name", "District_Name") \
                       .withColumnRenamed("Neighborhood Code", "Neighborhood_Code") \
                       .withColumnRenamed("Neighborhood Name", "Neighborhood_Name")

# Verificar valores nulos
births.select([count(when(col(c).isNull(), c)).alias(c) for c in births.columns]).show()

+----+-------------+-------------+-----------------+-----------------+------+------+
|Year|District_Code|District_Name|Neighborhood_Code|Neighborhood_Name|Gender|Number|
+----+-------------+-------------+-----------------+-----------------+------+------+
|   0|            0|            0|                0|                0|     0|     0|
+----+-------------+-------------+-----------------+-----------------+------+------+



In [87]:
# Verificar registros duplicados
num_duplicates = births.count() - births.dropDuplicates().count()
print(f"Número de registros duplicados: {num_duplicates}")

Número de registros duplicados: 0


In [88]:
# Verificar si las claves primarias son únicas
primary_key_columns = ["Year", "District_Code", "Neighborhood_Code", "Gender"]
births.groupBy(primary_key_columns).count().filter(col("count") > 1).show()

+----+-------------+-----------------+------+-----+
|Year|District_Code|Neighborhood_Code|Gender|count|
+----+-------------+-----------------+------+-----+
+----+-------------+-----------------+------+-----+



In [89]:
# Guardar los datos limpios en un nuevo archivo
births.write.csv(output_dir + "/cleaned_births.csv", header=True)

#### ********************************************************************************************
### Dataset: Accidents
#### ********************************************************************************************


In [90]:
# Definir la ruta de origen
src_accidents = input_dir + '/accidents_2017.csv'

# Cargar las tablas de datos
accidents = spark.read.csv(src_accidents, header=True, inferSchema=True)

In [None]:
# Mostrar el esquema (tipo de datos de cada columna)
accidents.printSchema()

root
 |-- Id: string (nullable = true)
 |-- District Name: string (nullable = true)
 |-- Neighborhood Name: string (nullable = true)
 |-- Street: string (nullable = true)
 |-- Weekday: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- Day: integer (nullable = true)
 |-- Hour: integer (nullable = true)
 |-- Part of the day: string (nullable = true)
 |-- Mild injuries: integer (nullable = true)
 |-- Serious injuries: integer (nullable = true)
 |-- Victims: integer (nullable = true)
 |-- Vehicles involved: integer (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Latitude: double (nullable = true)



In [None]:
# Vista previa de las primeras filas
accidents.show(5)

+---------------+-------------+--------------------+--------------------+--------+---------+---+----+---------------+-------------+----------------+-------+-----------------+----------+-----------+
|             Id|District Name|   Neighborhood Name|              Street| Weekday|    Month|Day|Hour|Part of the day|Mild injuries|Serious injuries|Victims|Vehicles involved| Longitude|   Latitude|
+---------------+-------------+--------------------+--------------------+--------+---------+---+----+---------------+-------------+----------------+-------+-----------------+----------+-----------+
|2017S008429    |      Unknown|             Unknown|Número 27        ...|  Friday|  October| 13|   8|        Morning|            2|               0|      2|                2|2.12562442|41.34004482|
|2017S007316    |      Unknown|             Unknown|Número 3 Zona Fra...|  Friday|September|  1|  13|        Morning|            2|               0|      2|                2|2.12045245|41.33942606|
|2017S0102

In [None]:
# Cantidad de columnas
num_columns = len(accidents.columns)
print(f"Cantidad de columnas: {num_columns}")

Cantidad de columnas: 15


In [None]:
# Nombre de cada columna
column_names = accidents.columns
print(f"Nombres de las columnas: {column_names}")

Nombres de las columnas: ['Id', 'District Name', 'Neighborhood Name', 'Street', 'Weekday', 'Month', 'Day', 'Hour', 'Part of the day', 'Mild injuries', 'Serious injuries', 'Victims', 'Vehicles involved', 'Longitude', 'Latitude']


In [None]:
# Descripción de los datos de cada columna
accidents.describe().show()

+-------+---------------+-------------+-----------------+--------------------+---------+---------+------------------+------------------+---------------+------------------+--------------------+------------------+------------------+-------------------+--------------------+
|summary|             Id|District Name|Neighborhood Name|              Street|  Weekday|    Month|               Day|              Hour|Part of the day|     Mild injuries|    Serious injuries|           Victims| Vehicles involved|          Longitude|            Latitude|
+-------+---------------+-------------+-----------------+--------------------+---------+---------+------------------+------------------+---------------+------------------+--------------------+------------------+------------------+-------------------+--------------------+
|  count|          10339|        10339|            10339|               10339|    10339|    10339|             10339|             10339|          10339|             10339|             

In [None]:
# Cantidad de registros (número de filas)
num_records = accidents.count()
print(f"Cantidad de registros: {num_records}")

Cantidad de registros: 10339


In [91]:
# Renombrar columnas
accidents = accidents.withColumnRenamed("Part of the day", "Part_of_the_day") \
                       .withColumnRenamed("District Name", "District_Name") \
                       .withColumnRenamed("Mild injuries", "Mild_injuries") \
                       .withColumnRenamed("Neighborhood Name", "Neighborhood_Name") \
                       .withColumnRenamed("Serious injuries", "Serious_injuries") \
                       .withColumnRenamed("Vehicles involved", "Vehicles_involved")

# Verificar valores nulos
accidents.select([count(when(col(c).isNull(), c)).alias(c) for c in accidents.columns]).show()

+---+-------------+-----------------+------+-------+-----+---+----+---------------+-------------+----------------+-------+-----------------+---------+--------+
| Id|District_Name|Neighborhood_Name|Street|Weekday|Month|Day|Hour|Part_of_the_day|Mild_injuries|Serious_injuries|Victims|Vehicles_involved|Longitude|Latitude|
+---+-------------+-----------------+------+-------+-----+---+----+---------------+-------------+----------------+-------+-----------------+---------+--------+
|  0|            0|                0|     0|      0|    0|  0|   0|              0|            0|               0|      0|                0|        0|       0|
+---+-------------+-----------------+------+-------+-----+---+----+---------------+-------------+----------------+-------+-----------------+---------+--------+



In [92]:
# Verificar registros duplicados
num_duplicates = accidents.count() - accidents.dropDuplicates().count()
print(f"Número de registros duplicados: {num_duplicates}")

Número de registros duplicados: 4


In [93]:
# Eliminar registros duplicados
accidents = accidents.dropDuplicates()

In [94]:
# Verificar si las claves primarias son únicas
primary_key_columns = ["Id"]
accidents.groupBy(primary_key_columns).count().filter(col("count") > 1).show()

+---+-----+
| Id|count|
+---+-----+
+---+-----+



In [95]:
# Guardar los datos limpios en un nuevo archivo
accidents.write.csv(output_dir + "/cleaned_accidents.csv", header=True)

#### ********************************************************************************************
### Dataset: Life expectancy
#### ********************************************************************************************


In [96]:
# Definir la ruta de origen
src_life_expectancy = input_dir + '/life_expectancy.csv'

# Cargar las tablas de datos
life_expectancy = spark.read.csv(src_life_expectancy, header=True, inferSchema=True)

In [None]:
# Mostrar el esquema (tipo de datos de cada columna)
life_expectancy.printSchema()

In [None]:
# Vista previa de las primeras filas
life_expectancy.show(5)

+--------------------+---------+---------+---------+---------+---------+------+
|        Neighborhood|2006-2010|2007-2011|2008-2012|2009-2013|2010-2014|Gender|
+--------------------+---------+---------+---------+---------+---------+------+
|            el Raval|     87.5|     84.9|     84.7|     84.9|     85.3|Female|
|      el Barri Gòtic|     88.0|     84.3|     84.4|     87.5|     84.4|Female|
|      la Barceloneta|     88.2|     85.3|     84.4|     86.3|     84.7|Female|
|Sant Pere, Santa ...|     88.9|     85.9|     86.1|     85.5|     85.3|Female|
|       el Fort Pienc|     89.7|     87.1|     87.2|     87.7|     86.7|Female|
+--------------------+---------+---------+---------+---------+---------+------+
only showing top 5 rows



In [None]:
# Cantidad de columnas
num_columns = len(life_expectancy.columns)
print(f"Cantidad de columnas: {num_columns}")

In [None]:
# Nombre de cada columna
column_names = life_expectancy.columns
print(f"Nombres de las columnas: {column_names}")

In [None]:
# Descripción de los datos de cada columna
life_expectancy.describe().show()

In [None]:
# Cantidad de registros (número de filas)
num_records = life_expectancy.count()
print(f"Cantidad de registros: {num_records}")

In [97]:
# Verificar valores nulos
life_expectancy.select([count(when(col(c).isNull(), c)).alias(c) for c in life_expectancy.columns]).show()

+------------+---------+---------+---------+---------+---------+------+
|Neighborhood|2006-2010|2007-2011|2008-2012|2009-2013|2010-2014|Gender|
+------------+---------+---------+---------+---------+---------+------+
|           0|        8|        6|        8|        6|        4|     0|
+------------+---------+---------+---------+---------+---------+------+



In [98]:
# Sustituir valores nulos en todas las columnas excepto "Gender" y "Neighborhood" con 0.0
columns_to_fill = [c for c in life_expectancy.columns if c not in ["Gender", "Neighborhood"]]
life_expectancy = life_expectancy.fillna(0.0, subset=columns_to_fill)

In [99]:
# Verificar registros duplicados
num_duplicates = life_expectancy.count() - life_expectancy.dropDuplicates().count()
print(f"Número de registros duplicados: {num_duplicates}")

Número de registros duplicados: 0


In [100]:
# Verificar si las claves primarias son únicas
primary_key_columns = ["Neighborhood", "Gender"]
life_expectancy.groupBy(primary_key_columns).count().filter(col("count") > 1).show()

+------------+------+-----+
|Neighborhood|Gender|count|
+------------+------+-----+
+------------+------+-----+



In [101]:
# Guardar los datos limpios en un nuevo archivo
life_expectancy.write.csv(output_dir + "/cleaned_life_expectancy.csv", header=True)

#### ********************************************************************************************
### Dataset: Unemployment
#### ********************************************************************************************


In [102]:
# Definir la ruta de origen
src_unemployment = input_dir + '/unemployment.csv'

# Cargar las tablas de datos
unemployment = spark.read.csv(src_unemployment, header=True, inferSchema=True)

In [None]:
# Mostrar el esquema (tipo de datos de cada columna)
unemployment.printSchema()

In [None]:
# Vista previa de las primeras filas
unemployment.show(5)

In [None]:
# Cantidad de columnas
num_columns = len(unemployment.columns)
print(f"Cantidad de columnas: {num_columns}")

In [None]:
# Nombre de cada columna
column_names = unemployment.columns
print(f"Nombres de las columnas: {column_names}")

In [None]:
# Descripción de los datos de cada columna
unemployment.describe().show()

In [None]:
# Cantidad de registros (número de filas)
num_records = unemployment.count()
print(f"Cantidad de registros: {num_records}")

In [103]:
# Renombrar columnas
unemployment = unemployment.withColumnRenamed("District Code", "District_Code") \
                       .withColumnRenamed("District Name", "District_Name") \
                       .withColumnRenamed("Neighborhood Code", "Neighborhood_Code") \
                       .withColumnRenamed("Neighborhood Name", "Neighborhood_Name")

# Verificar valores nulos
unemployment.select([count(when(col(c).isNull(), c)).alias(c) for c in unemployment.columns]).show()

+----+-----+-------------+-------------+-----------------+-----------------+------+-----------------+------+
|Year|Month|District_Code|District_Name|Neighborhood_Code|Neighborhood_Name|Gender|Demand_occupation|Number|
+----+-----+-------------+-------------+-----------------+-----------------+------+-----------------+------+
|   0|    0|            0|            0|                0|                0|     0|                0|     0|
+----+-----+-------------+-------------+-----------------+-----------------+------+-----------------+------+



In [104]:
# Verificar registros duplicados
num_duplicates = unemployment.count() - unemployment.dropDuplicates().count()
print(f"Número de registros duplicados: {num_duplicates}")

Número de registros duplicados: 0


In [105]:
# Verificar si las claves primarias son únicas
primary_key_columns = ["Year", "Month", "District_Code", "Neighborhood_Code", "Gender", "Demand_occupation"]
unemployment.groupBy(primary_key_columns).count().filter(col("count") > 1).show()

+----+-----+-------------+-----------------+------+-----------------+-----+
|Year|Month|District_Code|Neighborhood_Code|Gender|Demand_occupation|count|
+----+-----+-------------+-----------------+------+-----------------+-----+
+----+-----+-------------+-----------------+------+-----------------+-----+



In [106]:
# Guardar los datos limpios en un nuevo archivo
unemployment.write.csv(output_dir + "/cleaned_unemployment.csv", header=True)

#### ********************************************************************************************
### Dataset: Air quality
#### ********************************************************************************************

In [107]:
# Definir la ruta de origen
src_air_quality = input_dir + '/air_quality_Nov2017.csv'

# Cargar las tablas de datos
air_quality = spark.read.csv(src_air_quality, header=True, inferSchema=True)

In [None]:
# Mostrar el esquema (tipo de datos de cada columna)
air_quality.printSchema()

In [None]:
# Vista previa de las primeras filas
air_quality.show(5)

In [None]:
# Cantidad de columnas
num_columns = len(air_quality.columns)
print(f"Cantidad de columnas: {num_columns}")

In [None]:
# Nombre de cada columna
column_names = air_quality.columns
print(f"Nombres de las columnas: {column_names}")

In [None]:
# Descripción de los datos de cada columna
air_quality.describe().show()

In [None]:
# Cantidad de registros (número de filas)
num_records = air_quality.count()
print(f"Cantidad de registros: {num_records}")

In [108]:
# Renombrar columnas
air_quality = air_quality.withColumnRenamed("Air Quality", "Air_Quality") \
                         .withColumnRenamed("O3 Hour", "O3_Hour") \
                         .withColumnRenamed("O3 Quality", "O3_Quality") \
                         .withColumnRenamed("O3 Value", "O3_Value") \
                         .withColumnRenamed("NO2 Hour", "NO2_Hour") \
                         .withColumnRenamed("NO2 Quality", "NO2_Quality") \
                         .withColumnRenamed("NO2 Value", "NO2_Value") \
                         .withColumnRenamed("PM10 Hour", "PM10_Hour") \
                         .withColumnRenamed("PM10 Quality", "PM10_Quality") \
                         .withColumnRenamed("PM10 Value", "PM10_Value") \
                         .withColumnRenamed("Date Time", "Date_Time")

# Verificar valores nulos
air_quality.select([count(when(col(c).isNull(), c)).alias(c) for c in air_quality.columns]).show()

+-------+-----------+---------+--------+-------+----------+--------+--------+-----------+---------+---------+------------+----------+---------+---------+
|Station|Air_Quality|Longitude|Latitude|O3_Hour|O3_Quality|O3_Value|NO2_Hour|NO2_Quality|NO2_Value|PM10_Hour|PM10_Quality|PM10_Value|Generated|Date_Time|
+-------+-----------+---------+--------+-------+----------+--------+--------+-----------+---------+---------+------------+----------+---------+---------+
|      0|          0|        0|       0|      0|         0|       0|       0|          0|        0|        0|           0|         0|        0|        0|
+-------+-----------+---------+--------+-------+----------+--------+--------+-----------+---------+---------+------------+----------+---------+---------+



In [109]:
# Verificar registros duplicados
num_duplicates = air_quality.count() - air_quality.dropDuplicates().count()
print(f"Número de registros duplicados: {num_duplicates}")

Número de registros duplicados: 0


In [110]:
# Verificar si las claves primarias son únicas
primary_key_columns = ["Station", "Date_Time"]
air_quality.groupBy(primary_key_columns).count().filter(col("count") > 1).show()

+-------+---------+-----+
|Station|Date_Time|count|
+-------+---------+-----+
+-------+---------+-----+



In [111]:
# Guardar los datos limpios en un nuevo archivo
air_quality.write.csv(output_dir + "/cleaned_air_quality.csv", header=True)

#### ********************************************************************************************
### Dataset: Most frequent names
#### ********************************************************************************************


In [112]:
# Definir la ruta de origen
src_most_frequent_names = input_dir + '/most_frequent_names.csv'

# Cargar las tablas de datos
most_frequent_names = spark.read.csv(src_most_frequent_names, header=True, inferSchema=True)

In [None]:
# Mostrar el esquema (tipo de datos de cada columna)
most_frequent_names.printSchema()

In [None]:
# Vista previa de las primeras filas
most_frequent_names.show(5)

In [None]:
# Cantidad de columnas
num_columns = len(most_frequent_names.columns)
print(f"Cantidad de columnas: {num_columns}")

In [None]:
# Nombre de cada columna
column_names = most_frequent_names.columns
print(f"Nombres de las columnas: {column_names}")

In [None]:
# Descripción de los datos de cada columna
most_frequent_names.describe().show()

In [None]:
# Cantidad de registros (número de filas)
num_records = most_frequent_names.count()
print(f"Cantidad de registros: {num_records}")

In [113]:
# Verificar valores nulos
most_frequent_names.select([count(when(col(c).isNull(), c)).alias(c) for c in most_frequent_names.columns]).show()

+-----+----+------+------+---------+
|Order|Name|Gender|Decade|Frequency|
+-----+----+------+------+---------+
|    0|   0|     0|     0|        0|
+-----+----+------+------+---------+



In [114]:
# Verificar registros duplicados
num_duplicates = most_frequent_names.count() - most_frequent_names.dropDuplicates().count()
print(f"Número de registros duplicados: {num_duplicates}")

Número de registros duplicados: 0


In [115]:
# Verificar si las claves primarias son únicas
primary_key_columns = ["Order", "Decade", "Name", "Gender"]
most_frequent_names.groupBy(primary_key_columns).count().filter(col("count") > 1).show()

+-----+------+----+------+-----+
|Order|Decade|Name|Gender|count|
+-----+------+----+------+-----+
+-----+------+----+------+-----+



In [116]:
# Guardar los datos limpios en un nuevo archivo
most_frequent_names.write.csv(output_dir + "/cleaned_most_frequent_names.csv", header=True)

#### ********************************************************************************************
### Dataset: Transports
#### ********************************************************************************************


In [117]:
# Definir la ruta de origen
src_transports = input_dir + '/transports.csv'

# Cargar las tablas de datos
transports = spark.read.csv(src_transports, header=True, inferSchema=True)

In [5]:
# Mostrar el esquema (tipo de datos de cada columna)
transports.printSchema()

root
 |-- Code: string (nullable = true)
 |-- Transport: string (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Station: string (nullable = true)
 |-- District.Name: string (nullable = true)
 |-- Neighborhood.Name: string (nullable = true)



In [6]:
# Vista previa de las primeras filas
transports.show(5)

+----+-----------+---------+---------+--------------------+-------------------+--------------------+
|Code|  Transport|Longitude| Latitude|             Station|      District.Name|   Neighborhood.Name|
+----+-----------+---------+---------+--------------------+-------------------+--------------------+
|K001|Underground|  2.11937|41.399203|FGC (L6) - REINA ...|Sarrià-Sant Gervasi|              Sarrià|
|K001|Underground| 2.135427|41.397791|FGC (L6) - LA BON...|Sarrià-Sant Gervasi|Sant Gervasi - Ga...|
|K001|Underground| 2.185391|41.451492|METRO (L11) - CAS...|         Nou Barris|    la Trinitat Nova|
|K001|Underground| 2.174473|41.460889|METRO (L11) - CIU...|         Nou Barris|    Ciutat Meridiana|
|K001|Underground| 2.168588|  41.3872|METRO (L1) - CATA...|           Eixample|la Dreta de l'Eix...|
+----+-----------+---------+---------+--------------------+-------------------+--------------------+
only showing top 5 rows



In [7]:
# Cantidad de columnas
num_columns = len(transports.columns)
print(f"Cantidad de columnas: {num_columns}")

Cantidad de columnas: 7


In [8]:
# Nombre de cada columna
column_names = transports.columns
print(f"Nombres de las columnas: {column_names}")

Nombres de las columnas: ['Code', 'Transport', 'Longitude', 'Latitude', 'Station', 'District.Name', 'Neighborhood.Name']


In [9]:
# Descripción de los datos de cada columna
transports.describe().show()

+-------+----+-------------+-------------------+--------------------+--------------------+-------------------+-----------------+
|summary|Code|    Transport|          Longitude|            Latitude|             Station|      District.Name|Neighborhood.Name|
+-------+----+-------------+-------------------+--------------------+--------------------+-------------------+-----------------+
|  count| 651|          651|                651|                 651|                 651|                487|              487|
|   mean|null|         null|  2.156516794162825|   41.39935084024583|                null|               null|             null|
| stddev|null|         null|0.04006143105067931|0.028519542256909725|                null|               null|             null|
|    min|K001|Airport train|            2.00528|           41.304419| Estació Marítima A-|       Ciutat Vella|        Canyelles|
|    max|K011|  Underground|           2.249093|           41.496321|Tren AEROPORT- AE...|Sarrià-

In [10]:
# Cantidad de registros (número de filas)
num_records = transports.count()
print(f"Cantidad de registros: {num_records}")

Cantidad de registros: 651


In [118]:
# Renombrar columnas
transports = transports.withColumnRenamed("District.Name", "District_Name") \
                         .withColumnRenamed("Neighborhood.Name", "Neighborhood_Name")

# Verificar valores nulos
transports.select([count(when(col(c).isNull(), c)).alias(c) for c in transports.columns]).show()

+----+---------+---------+--------+-------+-------------+-----------------+
|Code|Transport|Longitude|Latitude|Station|District_Name|Neighborhood_Name|
+----+---------+---------+--------+-------+-------------+-----------------+
|   0|        0|        0|       0|      0|          164|              164|
+----+---------+---------+--------+-------+-------------+-----------------+



In [119]:
#Si Neighborhood_Name o District_Name son nulos rellenar con texto vacío
transports = transports.na.fill({"District_Name": "", "Neighborhood_Name": ""})

In [120]:
# Verificar registros duplicados
num_duplicates = transports.count() - transports.dropDuplicates().count()
print(f"Número de registros duplicados: {num_duplicates}")

Número de registros duplicados: 0


In [121]:
# Verificar si las claves primarias son únicas
primary_key_columns = ["Code", "Longitude", "Latitude"]
transports.groupBy(primary_key_columns).count().filter(col("count") > 1).show()

+----+---------+--------+-----+
|Code|Longitude|Latitude|count|
+----+---------+--------+-----+
+----+---------+--------+-----+



In [122]:
# Guardar los datos limpios en un nuevo archivo
transports.write.csv(output_dir + "/cleaned_transports.csv", header=True)

#### ********************************************************************************************
### Dataset: Immigrants and emigrants by age
#### ********************************************************************************************


In [123]:
# Definir la ruta de origen
src_immigrants_emigrants_by_age = input_dir + '/immigrants_emigrants_by_age.csv'

# Cargar las tablas de datos
immigrants_emigrants_by_age = spark.read.csv(src_immigrants_emigrants_by_age, header=True, inferSchema=True)

In [29]:
# Mostrar el esquema (tipo de datos de cada columna)
immigrants_emigrants_by_age.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- District Code: integer (nullable = true)
 |-- District Name: string (nullable = true)
 |-- Neighborhood Code: integer (nullable = true)
 |-- Neighborhood Name: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Immigrants: integer (nullable = true)
 |-- Emigrants: integer (nullable = true)



In [30]:
# Vista previa de las primeras filas
immigrants_emigrants_by_age.show(5)

+----+-------------+-------------+-----------------+--------------------+---+----------+---------+
|Year|District Code|District Name|Neighborhood Code|   Neighborhood Name|Age|Immigrants|Emigrants|
+----+-------------+-------------+-----------------+--------------------+---+----------+---------+
|2017|            1| Ciutat Vella|                1|            el Raval|0-4|       154|      108|
|2017|            1| Ciutat Vella|                2|      el Barri Gòtic|0-4|        58|       33|
|2017|            1| Ciutat Vella|                3|      la Barceloneta|0-4|        38|       37|
|2017|            1| Ciutat Vella|                4|Sant Pere, Santa ...|0-4|        56|       55|
|2017|            2|     Eixample|                5|       el Fort Pienc|0-4|        79|       60|
+----+-------------+-------------+-----------------+--------------------+---+----------+---------+
only showing top 5 rows



In [31]:
# Cantidad de columnas
num_columns = len(immigrants_emigrants_by_age.columns)
print(f"Cantidad de columnas: {num_columns}")

Cantidad de columnas: 8


In [32]:
# Nombre de cada columna
column_names = immigrants_emigrants_by_age.columns
print(f"Nombres de las columnas: {column_names}")

Nombres de las columnas: ['Year', 'District Code', 'District Name', 'Neighborhood Code', 'Neighborhood Name', 'Age', 'Immigrants', 'Emigrants']


In [33]:
# Descripción de los datos de cada columna
immigrants_emigrants_by_age.describe().show()

+-------+------------------+------------------+-------------------+-----------------+-----------------+-----+------------------+------------------+
|summary|              Year|     District Code|      District Name|Neighborhood Code|Neighborhood Name|  Age|        Immigrants|         Emigrants|
+-------+------------------+------------------+-------------------+-----------------+-----------------+-----+------------------+------------------+
|  count|              4662|              4662|               4662|             4662|             4662| 4662|              4662|              4662|
|   mean|            2016.0|               7.5|               null|37.83783783783784|             null| null|58.232732732732735|33.684041184041185|
| stddev|0.8165841643654697|11.062746475423248|               null|22.12123276754366|             null| null|108.97053575400817|    45.92536502973|
|    min|              2015|                 1|       Ciutat Vella|                1|    Baró de Viver|  0-4|   

In [34]:
# Cantidad de registros (número de filas)
num_records = immigrants_emigrants_by_age.count()
print(f"Cantidad de registros: {num_records}")

Cantidad de registros: 4662


In [124]:
# Renombrar columnas
immigrants_emigrants_by_age = immigrants_emigrants_by_age.withColumnRenamed("District Name", "District_Name") \
                         .withColumnRenamed("Neighborhood Name", "Neighborhood_Name") \
                         .withColumnRenamed("District Code", "District_Code") \
                         .withColumnRenamed("Neighborhood Code", "Neighborhood_Code")

# Verificar valores nulos
immigrants_emigrants_by_age.select([count(when(col(c).isNull(), c)).alias(c) for c in immigrants_emigrants_by_age.columns]).show()

+----+-------------+-------------+-----------------+-----------------+---+----------+---------+
|Year|District_Code|District_Name|Neighborhood_Code|Neighborhood_Name|Age|Immigrants|Emigrants|
+----+-------------+-------------+-----------------+-----------------+---+----------+---------+
|   0|            0|            0|                0|                0|  0|         0|        0|
+----+-------------+-------------+-----------------+-----------------+---+----------+---------+



In [125]:
# Verificar registros duplicados
num_duplicates = immigrants_emigrants_by_age.count() - immigrants_emigrants_by_age.dropDuplicates().count()
print(f"Número de registros duplicados: {num_duplicates}")

Número de registros duplicados: 0


In [126]:
# Verificar si las claves primarias son únicas
primary_key_columns = ["Year", "District_Code", "Neighborhood_Code", "Age"]
immigrants_emigrants_by_age.groupBy(primary_key_columns).count().filter(col("count") > 1).show()

+----+-------------+-----------------+---+-----+
|Year|District_Code|Neighborhood_Code|Age|count|
+----+-------------+-----------------+---+-----+
+----+-------------+-----------------+---+-----+



In [127]:
# Guardar los datos limpios en un nuevo archivo
immigrants_emigrants_by_age.write.csv(output_dir + "/cleaned_immigrants_emigrants_by_age.csv", header=True)

#### ********************************************************************************************
### Dataset: Deaths
#### ********************************************************************************************


In [128]:
# Definir la ruta de origen
src_deaths = input_dir + '/deaths.csv'

# Cargar las tablas de datos
deaths = spark.read.csv(src_deaths, header=True, inferSchema=True)

In [41]:
# Mostrar el esquema (tipo de datos de cada columna)
deaths.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- District.Code: integer (nullable = true)
 |-- District.Name: string (nullable = true)
 |-- Neighborhood.Code: integer (nullable = true)
 |-- Neighborhood.Name: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Number: integer (nullable = true)



In [42]:
# Vista previa de las primeras filas
deaths.show(5)

+----+-------------+-------------+-----------------+--------------------+---+------+
|Year|District.Code|District.Name|Neighborhood.Code|   Neighborhood.Name|Age|Number|
+----+-------------+-------------+-----------------+--------------------+---+------+
|2017|            1| Ciutat Vella|                1|            el Raval|0-4|     1|
|2017|            1| Ciutat Vella|                2|      el Barri Gòtic|0-4|     1|
|2017|            1| Ciutat Vella|                3|      la Barceloneta|0-4|     0|
|2017|            1| Ciutat Vella|                4|Sant Pere, Santa ...|0-4|     0|
|2017|            2|     Eixample|                5|       el Fort Pienc|0-4|     0|
+----+-------------+-------------+-----------------+--------------------+---+------+
only showing top 5 rows



In [43]:
# Cantidad de columnas
num_columns = len(deaths.columns)
print(f"Cantidad de columnas: {num_columns}")

Cantidad de columnas: 7


In [44]:
# Nombre de cada columna
column_names = deaths.columns
print(f"Nombres de las columnas: {column_names}")

Nombres de las columnas: ['Year', 'District.Code', 'District.Name', 'Neighborhood.Code', 'Neighborhood.Name', 'Age', 'Number']


In [45]:
# Descripción de los datos de cada columna
deaths.describe().show()

+-------+------------------+------------------+-------------------+-----------------+-----------------+-----+------------------+
|summary|              Year|     District.Code|      District.Name|Neighborhood.Code|Neighborhood.Name|  Age|            Number|
+-------+------------------+------------------+-------------------+-----------------+-----------------+-----+------------------+
|  count|              4599|              4599|               4599|             4599|             4599| 4599|              4599|
|   mean|            2016.0|6.2465753424657535|               null|             37.0|             null| null|10.051098064796696|
| stddev|0.8165853643324936| 2.788639085941505|               null|21.07359873688776|             null| null|18.559140929441728|
|    min|              2015|                 1|       Ciutat Vella|                1|    Baró de Viver|  0-4|                 0|
|    max|              2017|                10|Sarrià-Sant Gervasi|               73|  les Tres T

In [46]:
# Cantidad de registros (número de filas)
num_records = deaths.count()
print(f"Cantidad de registros: {num_records}")

Cantidad de registros: 4599


In [129]:
# Renombrar columnas
deaths = deaths.withColumnRenamed("District.Code", "District_Code") \
                       .withColumnRenamed("District.Name", "District_Name") \
                       .withColumnRenamed("Neighborhood.Code", "Neighborhood_Code") \
                       .withColumnRenamed("Neighborhood.Name", "Neighborhood_Name")

# Verificar valores nulos
deaths.select([count(when(col(c).isNull(), c)).alias(c) for c in deaths.columns]).show()

+----+-------------+-------------+-----------------+-----------------+---+------+
|Year|District_Code|District_Name|Neighborhood_Code|Neighborhood_Name|Age|Number|
+----+-------------+-------------+-----------------+-----------------+---+------+
|   0|            0|            0|                0|                0|  0|     0|
+----+-------------+-------------+-----------------+-----------------+---+------+



In [130]:
# Verificar registros duplicados
num_duplicates = deaths.count() - deaths.dropDuplicates().count()
print(f"Número de registros duplicados: {num_duplicates}")

Número de registros duplicados: 0


In [131]:
# Verificar si las claves primarias son únicas
primary_key_columns = ["Year", "District_Code", "Neighborhood_Code", "Age"]
deaths.groupBy(primary_key_columns).count().filter(col("count") > 1).show()

+----+-------------+-----------------+---+-----+
|Year|District_Code|Neighborhood_Code|Age|count|
+----+-------------+-----------------+---+-----+
+----+-------------+-----------------+---+-----+



In [132]:
# Guardar los datos limpios en un nuevo archivo
deaths.write.csv(output_dir + "/cleaned_deaths.csv", header=True)

#### ********************************************************************************************
### Dataset: Gross income
#### ********************************************************************************************


In [133]:
# Definir la ruta de origen
src_gross_income = input_dir + '/2021_atles_renda_bruta_llar.csv'

# Cargar las tablas de datos
gross_income = spark.read.csv(src_gross_income, header=True, inferSchema=True)

In [54]:
# Mostrar el esquema (tipo de datos de cada columna)
gross_income.printSchema()

root
 |-- Any: integer (nullable = true)
 |-- Codi_Districte: integer (nullable = true)
 |-- Nom_Districte: string (nullable = true)
 |-- Codi_Barri: integer (nullable = true)
 |-- Nom_Barri: string (nullable = true)
 |-- Seccio_Censal: integer (nullable = true)
 |-- Import_Renda_Bruta_€: integer (nullable = true)



In [55]:
# Vista previa de las primeras filas
gross_income.show(5)

+----+--------------+-------------+----------+---------+-------------+--------------------+
| Any|Codi_Districte|Nom_Districte|Codi_Barri|Nom_Barri|Seccio_Censal|Import_Renda_Bruta_€|
+----+--------------+-------------+----------+---------+-------------+--------------------+
|2021|             1| Ciutat Vella|         1| el Raval|            1|               34839|
|2021|             1| Ciutat Vella|         1| el Raval|            2|               27777|
|2021|             1| Ciutat Vella|         1| el Raval|            3|               31082|
|2021|             1| Ciutat Vella|         1| el Raval|            4|               34635|
|2021|             1| Ciutat Vella|         1| el Raval|            5|               27950|
+----+--------------+-------------+----------+---------+-------------+--------------------+
only showing top 5 rows



In [56]:
# Cantidad de columnas
num_columns = len(gross_income.columns)
print(f"Cantidad de columnas: {num_columns}")

Cantidad de columnas: 7


In [57]:
# Nombre de cada columna
column_names = gross_income.columns
print(f"Nombres de las columnas: {column_names}")

Nombres de las columnas: ['Any', 'Codi_Districte', 'Nom_Districte', 'Codi_Barri', 'Nom_Barri', 'Seccio_Censal', 'Import_Renda_Bruta_€']


In [58]:
# Descripción de los datos de cada columna
gross_income.describe().show()

+-------+------+------------------+-------------------+-----------------+---------------+-----------------+--------------------+
|summary|   Any|    Codi_Districte|      Nom_Districte|       Codi_Barri|      Nom_Barri|    Seccio_Censal|Import_Renda_Bruta_€|
+-------+------+------------------+-------------------+-----------------+---------------+-----------------+--------------------+
|  count|  1068|              1068|               1068|             1068|           1068|             1068|                1068|
|   mean|2021.0|  5.72378277153558|               null|33.24625468164794|           null|60.64513108614232|   53840.52808988764|
| stddev|   0.0|2.9385324132584225|               null|21.89298833329705|           null|41.56954070940254|   20741.88373398768|
|    min|  2021|                 1|       Ciutat Vella|                1|  Baró de Viver|                1|               25218|
|    max|  2021|                10|Sarrià-Sant Gervasi|               73|les Tres Torres|        

In [59]:
# Cantidad de registros (número de filas)
num_records = gross_income.count()
print(f"Cantidad de registros: {num_records}")

Cantidad de registros: 1068


In [134]:
# Verificar valores nulos
gross_income.select([count(when(col(c).isNull(), c)).alias(c) for c in gross_income.columns]).show()

+---+--------------+-------------+----------+---------+-------------+--------------------+
|Any|Codi_Districte|Nom_Districte|Codi_Barri|Nom_Barri|Seccio_Censal|Import_Renda_Bruta_€|
+---+--------------+-------------+----------+---------+-------------+--------------------+
|  0|             0|            0|         0|        0|            0|                   0|
+---+--------------+-------------+----------+---------+-------------+--------------------+



In [135]:
# Verificar registros duplicados
num_duplicates = gross_income.count() - gross_income.dropDuplicates().count()
print(f"Número de registros duplicados: {num_duplicates}")

Número de registros duplicados: 0


In [136]:
# Verificar si las claves primarias son únicas
primary_key_columns = ["Any", "Codi_Districte", "Codi_Barri", "Seccio_Censal"]
gross_income.groupBy(primary_key_columns).count().filter(col("count") > 1).show()

+---+--------------+----------+-------------+-----+
|Any|Codi_Districte|Codi_Barri|Seccio_Censal|count|
+---+--------------+----------+-------------+-----+
+---+--------------+----------+-------------+-----+



In [137]:
# Guardar los datos limpios en un nuevo archivo
gross_income.write.csv(output_dir + "/cleaned_gross_income.csv", header=True)