# Universidad EAFIT

## ST0263 - Tópicos Especiales en Telemática, 2024-1



*   **Estudiante**: Tomás Bernal Zuluaga - tbernalz@eafit.edu.co
*   **Profesor**: Edwin Montoya - emontoya@eafit.edu.co


---

## Proyecto 3 - Spark con Notebooks y PySpark
En este proyecto se explorarán y analizarán los datos de casos positivos de COVID-19 en Colombia utilizando PySpark y Jupyter Notebooks. El objetivo es realizar un análisis exploratorio de los datos, responder preguntas de negocio específicas y almacenar los datos procesados tanto en AWS S3 como en Google Drive. El proyecto se desarrollará en dos plataformas: AWS-EMR-JupyterHub y Google Colab.


## 1. Almacenar datos en AWS S3 y en google drive

### Configuración en Google Colab de Spark y PySpark






In [1]:
# Montar Google Drive
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
# Instalar Java y Spark en Google Colab
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
!tar xf spark-3.5.1-bin-hadoop3.tgz
!pip install -q findspark

In [3]:
# Configuración del entorno de Java y Spark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.1-bin-hadoop3"

### Crear una Sesión de Spark con Configuraciones Adicionales

In [5]:
# Configurar credenciales de AWS
os.environ['AWS_ACCESS_KEY_ID'] = 'tu info aqui'
os.environ['AWS_SECRET_ACCESS_KEY'] = 'tu info aqui'
os.environ['AWS_SESSION_TOKEN'] = 'tu info aqui'

# Inicialización de findspark con las configuraciones establecidas anteriormente
import findspark
findspark.init()
from pyspark.sql import SparkSession

# Detener cualquier sesión de Spark existente
if 'spark' in globals():
    spark.stop()

# Crear una sesión de Spark con configuraciones adicionales para AWS S3
spark = SparkSession.builder \
    .appName("spark_data_processing")\
    .master("local[*]")\
    .config("spark.driver.memory","16G")\
    .config("spark.driver.maxResultSize", "0") \
    .config("spark.kryoserializer.buffer.max", "2000M")\
    .config("spark.jars.packages", "com.amazonaws:aws-java-sdk:1.11.1034,org.apache.hadoop:hadoop-aws:3.3.4")\
    .config('fs.s3a.access.key', os.environ["AWS_ACCESS_KEY_ID"]) \
    .config('fs.s3a.secret.key', os.environ["AWS_SECRET_ACCESS_KEY"]) \
    .config('fs.s3a.session.token', os.environ["AWS_SESSION_TOKEN"]) \
    .config('fs.s3a.path.style.access', 'true') \
    .config('fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem') \
    .config('fs.s3a.endpoint', 's3.amazonaws.com') \
    .getOrCreate()

sc = spark.sparkContext

### Verificar la conexión con Spark

In [6]:
spark

In [7]:
sc

### Cargar dataset desde Google Drive hacia Spark (se debe tener el dataset almacenado en drive)

In [None]:
# Guardar datos procesados en Google Drive
df = spark.read.csv('/content/drive/MyDrive/dataset_covid_colombia/covid_colombia_data.csv', header=True, inferSchema=True)

### Cargar dataset desde AWS S3 hacia Spark (se debe tener el dataset almacenado en un bucket de AWS S3)

In [8]:
# Guardar datos procesados en un bucket de S3
df = spark.read.csv('s3a://tbernalz-p3/dataset_covid_colombia/covid_colombia_data.csv', header=True, inferSchema=True)

## 2. Análisis Exploratorio de Datos
2.1 Mostrar las columnas

In [9]:
df.columns

['fecha reporte web',
 'ID de caso',
 'Fecha de notificación',
 'Código DIVIPOLA departamento',
 'Nombre departamento',
 'Código DIVIPOLA municipio',
 'Nombre municipio',
 'Edad',
 'Unidad de medida de edad',
 'Sexo',
 'Tipo de contagio',
 'Ubicación del caso',
 'Estado',
 'Código ISO del país',
 'Nombre del país',
 'Recuperado',
 'Fecha de inicio de síntomas',
 'Fecha de muerte',
 'Fecha de diagnóstico',
 'Fecha de recuperación',
 'Tipo de recuperación',
 'Pertenencia étnica',
 'Nombre del grupo étnico']

2.2 Mostrar los tipos de datos

In [10]:
df.printSchema()

root
 |-- fecha reporte web: string (nullable = true)
 |-- ID de caso: integer (nullable = true)
 |-- Fecha de notificación: timestamp (nullable = true)
 |-- Código DIVIPOLA departamento: integer (nullable = true)
 |-- Nombre departamento: string (nullable = true)
 |-- Código DIVIPOLA municipio: integer (nullable = true)
 |-- Nombre municipio: string (nullable = true)
 |-- Edad: integer (nullable = true)
 |-- Unidad de medida de edad: integer (nullable = true)
 |-- Sexo: string (nullable = true)
 |-- Tipo de contagio: string (nullable = true)
 |-- Ubicación del caso: string (nullable = true)
 |-- Estado: string (nullable = true)
 |-- Código ISO del país: integer (nullable = true)
 |-- Nombre del país: string (nullable = true)
 |-- Recuperado: string (nullable = true)
 |-- Fecha de inicio de síntomas: timestamp (nullable = true)
 |-- Fecha de muerte: timestamp (nullable = true)
 |-- Fecha de diagnóstico: timestamp (nullable = true)
 |-- Fecha de recuperación: string (nullable = true)
 |

2.3 seleccionar algunas columnas

In [11]:
# Seleccionar columnas específicas
df_selected = df.select('ID de caso', 'Nombre municipio', 'Edad', 'Estado', 'Fecha de inicio de síntomas')
df_selected.show(5)



+----------+----------------+----+---------+---------------------------+
|ID de caso|Nombre municipio|Edad|   Estado|Fecha de inicio de síntomas|
+----------+----------------+----+---------+---------------------------+
|   1556979|            CALI|  67|     Leve|        2020-12-21 00:00:00|
|   1556980|            CALI|  66|     Leve|        2020-12-07 00:00:00|
|   1556981|            CALI|  68|     Leve|        2020-12-18 00:00:00|
|   1556982|            CALI|  74|Fallecido|        2020-12-17 00:00:00|
|   1556983|            CALI|  65|     Leve|        2020-12-21 00:00:00|
+----------+----------------+----+---------+---------------------------+
only showing top 5 rows



2.4 Renombrar Columnas (esto se recomienda hacerlo para facilitar el procesamiento posterior)

In [12]:
df_selected = df_selected.withColumnRenamed('ID de caso', 'id') \
                         .withColumnRenamed('Nombre municipio', 'city')\
                         .withColumnRenamed('Edad', 'age')\
                         .withColumnRenamed('Estado', 'state')\
                         .withColumnRenamed('Fecha de inicio de síntomas', 'symptom_onset_date')
df_selected.show(5)


+-------+----+---+---------+-------------------+
|     id|city|age|    state| symptom_onset_date|
+-------+----+---+---------+-------------------+
|1556979|CALI| 67|     Leve|2020-12-21 00:00:00|
|1556980|CALI| 66|     Leve|2020-12-07 00:00:00|
|1556981|CALI| 68|     Leve|2020-12-18 00:00:00|
|1556982|CALI| 74|Fallecido|2020-12-17 00:00:00|
|1556983|CALI| 65|     Leve|2020-12-21 00:00:00|
+-------+----+---+---------+-------------------+
only showing top 5 rows



2.5 Agregar columnas

In [13]:
from pyspark.sql.functions import when

# Agregar una nueva columna
df = df.withColumn('edad_grupo',
                                     when(df['Edad'] < 18, 'Niño')
                                     .when((df['Edad'] >= 18) & (df['Edad'] < 45), 'Joven')
                                     .when((df['Edad'] >= 45) & (df['Edad'] < 65), 'Adulto')
                                     .otherwise('Adulto Mayor'))
df.select('ID de caso', 'Edad','edad_grupo').show(5)

+----------+----+------------+
|ID de caso|Edad|  edad_grupo|
+----------+----+------------+
|   1556979|  67|Adulto Mayor|
|   1556980|  66|Adulto Mayor|
|   1556981|  68|Adulto Mayor|
|   1556982|  74|Adulto Mayor|
|   1556983|  65|Adulto Mayor|
+----------+----+------------+
only showing top 5 rows



2.6 Borrar columnas

In [14]:
# Borrar una columna
df = df.drop('edad_grupo')

2.7 Filtrar datos

In [15]:
# Filtrar datos para casos activos
df.filter(df['Nombre municipio'] == 'MEDELLIN').show(5)
df.filter(df['Nombre municipio'] == 'ENVIGADO').show(5)

+-------------------+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------------+----+----------------+------------------+------+-------------------+---------------+----------+---------------------------+---------------+--------------------+---------------------+--------------------+------------------+-----------------------+
|  fecha reporte web|ID de caso|Fecha de notificación|Código DIVIPOLA departamento|Nombre departamento|Código DIVIPOLA municipio|Nombre municipio|Edad|Unidad de medida de edad|Sexo|Tipo de contagio|Ubicación del caso|Estado|Código ISO del país|Nombre del país|Recuperado|Fecha de inicio de síntomas|Fecha de muerte|Fecha de diagnóstico|Fecha de recuperación|Tipo de recuperación|Pertenencia étnica|Nombre del grupo étnico|
+-------------------+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----+---

2.8 Ejecutar una función UDF sobre alguna columna (Edad) creando una nueva (Edad_grupo)

In [16]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Definir una UDF para categorizar edades
def categorizar_edad(edad):
    if edad < 18:
        return 'Niño'
    elif 18 <= edad < 45:
        return 'Joven'
    elif 45 <= edad < 65:
        return 'Adulto'
    else:
        return 'Adulto Mayor'

categorizar_edad_udf = udf(categorizar_edad, StringType())
df_spark = df.withColumn('Edad_grupo', categorizar_edad_udf(df['Edad']))

df_spark.select('Edad_grupo').show(5)


+------------+
|  Edad_grupo|
+------------+
|Adulto Mayor|
|Adulto Mayor|
|Adulto Mayor|
|Adulto Mayor|
|Adulto Mayor|
+------------+
only showing top 5 rows



## 3. Preguntas de Negocio

3.1 Departamentos con Más Casos de Covid en Colombia ordenados de mayor a menor

In [17]:
# Spark DataFrame API
departamentos_mas_casos = df.groupBy('Nombre departamento').count().orderBy('count', ascending=False).limit(10)
departamentos_mas_casos.show()

# Spark SQL
df.createOrReplaceTempView('covid_data')
departamentos_mas_casos_sql = spark.sql("""
SELECT `Nombre departamento`, COUNT(*) as total
FROM covid_data
GROUP BY `Nombre departamento`
ORDER BY total DESC
LIMIT 10
""")
departamentos_mas_casos_sql.show()

+-------------------+------+
|Nombre departamento| count|
+-------------------+------+
|             BOGOTA|658308|
|          ANTIOQUIA|352813|
|              VALLE|171008|
|       CUNDINAMARCA|117111|
|       BARRANQUILLA|112080|
|          SANTANDER| 95906|
|          ATLANTICO| 66023|
|          CARTAGENA| 60560|
|             CALDAS| 50222|
|              CESAR| 48292|
+-------------------+------+

+-------------------+------+
|Nombre departamento| total|
+-------------------+------+
|             BOGOTA|658308|
|          ANTIOQUIA|352813|
|              VALLE|171008|
|       CUNDINAMARCA|117111|
|       BARRANQUILLA|112080|
|          SANTANDER| 95906|
|          ATLANTICO| 66023|
|          CARTAGENA| 60560|
|             CALDAS| 50222|
|              CESAR| 48292|
+-------------------+------+



3.2 Las 10 ciudades con más casos de covid en Colombia ordenados de mayor a menor

In [18]:
# Spark DataFrame API
ciudades_mas_casos = df.groupBy('Nombre municipio').count().orderBy('count', ascending=False).limit(10)
ciudades_mas_casos.show()

# Spark SQL
ciudades_mas_casos_sql = spark.sql("""
SELECT `Nombre municipio`, COUNT(*) as total
FROM covid_data
GROUP BY `Nombre municipio`
ORDER BY total DESC
LIMIT 10
""")
ciudades_mas_casos_sql.show()

+----------------+------+
|Nombre municipio| count|
+----------------+------+
|          BOGOTA|658308|
|        MEDELLIN|195405|
|            CALI|115651|
|    BARRANQUILLA|112080|
|       CARTAGENA| 60560|
|     BUCARAMANGA| 43798|
|       MANIZALES| 35812|
|      VALLEDUPAR| 34245|
|     SANTA MARTA| 33666|
|          IBAGUE| 32885|
+----------------+------+

+----------------+------+
|Nombre municipio| total|
+----------------+------+
|          BOGOTA|658308|
|        MEDELLIN|195405|
|            CALI|115651|
|    BARRANQUILLA|112080|
|       CARTAGENA| 60560|
|     BUCARAMANGA| 43798|
|       MANIZALES| 35812|
|      VALLEDUPAR| 34245|
|     SANTA MARTA| 33666|
|          IBAGUE| 32885|
+----------------+------+



3.3 Los 10 días con más casos de covid en Colombia ordenados de mayor a menor

In [19]:
# Spark DataFrame API
dias_mas_casos = df.groupBy('Fecha de diagnóstico').count().orderBy('count', ascending=False).limit(10)
dias_mas_casos.show()

# Spark SQL
dias_mas_casos_sql = spark.sql("""
SELECT `Fecha de diagnóstico`, COUNT(*) as total
FROM covid_data
GROUP BY `Fecha de diagnóstico`
ORDER BY total DESC
LIMIT 10
""")
dias_mas_casos_sql.show()

+--------------------+-----+
|Fecha de diagnóstico|count|
+--------------------+-----+
| 2021-06-02 00:00:00|27439|
| 2021-06-03 00:00:00|26826|
| 2021-06-01 00:00:00|24619|
| 2021-05-26 00:00:00|24577|
| 2021-06-04 00:00:00|24152|
| 2021-05-24 00:00:00|24025|
| 2021-05-25 00:00:00|23755|
| 2021-05-27 00:00:00|23584|
| 2021-05-28 00:00:00|23567|
| 2021-05-31 00:00:00|23558|
+--------------------+-----+

+--------------------+-----+
|Fecha de diagnóstico|total|
+--------------------+-----+
| 2021-06-02 00:00:00|27439|
| 2021-06-03 00:00:00|26826|
| 2021-06-01 00:00:00|24619|
| 2021-05-26 00:00:00|24577|
| 2021-06-04 00:00:00|24152|
| 2021-05-24 00:00:00|24025|
| 2021-05-25 00:00:00|23755|
| 2021-05-27 00:00:00|23584|
| 2021-05-28 00:00:00|23567|
| 2021-05-31 00:00:00|23558|
+--------------------+-----+



3.4 Distribución de casos por edades de covid en Colombia

In [20]:
# Spark DataFrame API
casos_por_edad = df.groupBy('Edad').count().orderBy('Edad')
casos_por_edad.show()

# Spark SQL
casos_por_edad_sql = spark.sql("""
SELECT Edad, COUNT(*) as total
FROM covid_data
GROUP BY Edad
ORDER BY Edad
""")
casos_por_edad_sql.show()

+----+-----+
|Edad|count|
+----+-----+
|NULL|    4|
|   1| 8021|
|   2| 7282|
|   3| 7263|
|   4| 7658|
|   5| 8173|
|   6| 8582|
|   7| 8501|
|   8| 9636|
|   9|10028|
|  10|10659|
|  11|11804|
|  12|12237|
|  13|13093|
|  14|14315|
|  15|15340|
|  16|16517|
|  17|18749|
|  18|22887|
|  19|29456|
+----+-----+
only showing top 20 rows

+----+-----+
|Edad|total|
+----+-----+
|NULL|    4|
|   1| 8021|
|   2| 7282|
|   3| 7263|
|   4| 7658|
|   5| 8173|
|   6| 8582|
|   7| 8501|
|   8| 9636|
|   9|10028|
|  10|10659|
|  11|11804|
|  12|12237|
|  13|13093|
|  14|14315|
|  15|15340|
|  16|16517|
|  17|18749|
|  18|22887|
|  19|29456|
+----+-----+
only showing top 20 rows



3.5 Pregunda de negocio: ¿Cuál es el promedio de edad de los casos recuperados?


In [21]:
# Filtrar los casos recuperados
casos_recuperados = df.filter(df['Tipo de recuperación'] != 'NULL')

# Calcular el promedio de edad de los casos activos
promedio_edad_recuperados = casos_recuperados.agg({'Edad': 'avg'})



promedio_edad_recuperados.show()

+------------------+
|         avg(Edad)|
+------------------+
|38.981673922328746|
+------------------+



### 4. Almacenamiento de Resultados en un Bucke público de AWS S3


In [25]:
# 3.1 Departamentos con Más Casos de Covid en Colombia ordenados de mayor a menor
departamentos_mas_casos.write.csv('s3a://tbernalz-p3/dataset_covid_colombia/departamentos_mas_casos.csv', header=True)

# 3.2 Las 10 ciudades con más casos de covid en Colombia ordenados de mayor a menor
ciudades_mas_casos.write.csv('s3a://tbernalz-p3/dataset_covid_colombia/ciudades_mas_casos.csv', header=True)

# 3.3 Los 10 días con más casos de covid en Colombia ordenados de mayor a menor
dias_mas_casos.write.csv('s3a://tbernalz-p3/dataset_covid_colombia/dias_mas_casos.csv', header=True)

# 3.4 Distribución de casos por edades de covid en Colombia
casos_por_edad.write.csv('s3a://tbernalz-p3/dataset_covid_colombia/casos_por_edad.csv', header=True)

# 3.5 Pregunda de negocio: ¿Cuál es el promedio de edad de los casos recuperados?
promedio_edad_recuperados.write.csv('s3a://tbernalz-p3/dataset_covid_colombia/promedio_edad_recuperados.csv', header=True)

## Conclusión

En este proyecto se realizó un análisis detallado de los datos de COVID-19 en Colombia utilizando PySpark. Se almacenaron los datos en AWS S3 y Google Drive, se llevó a cabo un análisis exploratorio de datos y se respondieron preguntas de negocio utilizando tanto la API de DataFrame de Spark como SparkSQL. Finalmente, los resultados se almacenaron en AWS S3 para su posterior análisis y consulta.



## Referencias

https://www.oreilly.com/library/view/learning-spark-2nd/9781492050032/ch04.html

https://www.datos.gov.co/api/views/gt2j-8ykr/rows.csv?accessType=DOWNLOAD