# ST0263 - Tópicos Especiales de Telemática

- Nombre: Miguel Angel Martinez Florez

- Correo: mamartinef@eafit.edu.co

## Profesor

- Nombre: Edwin Nelson Montoya Munera

- Correo: emontoya@eafit.edu.co

# Proyecto 3 - Spark con Notebooks y PySpark.

En el Proyecto 3, se abordarán datos sobre casos positivos de COVID-19 en Colombia empleando Apache Spark y PySpark en Jupyter Notebooks. Se llevará a cabo un análisis exploratorio y se resolverán preguntas de negocio, utilizando dos métodos de procesamiento en Spark: DataFrames y SparkSQL. Los datos serán almacenados y procesados tanto en AWS S3 como en Google Drive.




**Almacenamiento de datos en AWS S3 y en google drive**

In [1]:
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


In [2]:
#instalar java y spark
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
!tar xf spark-3.5.1-bin-hadoop3.tgz
!pip install -q findspark

In [3]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.1-bin-hadoop3"

In [4]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

In [5]:
spark

In [6]:
sc

In [7]:
df=spark.read.csv('drive/MyDrive/Casos_positivos_de_COVID-19_en_Colombia..csv',inferSchema=True,header=True)


**2) Análisis exploratorio de datos en dataframes, cargando los datos con programas en JupyterHub y Google Colab.**

2.1 columnas

In [18]:
#Columnas
df.columns

['fecha reporte web',
 'ID de caso',
 'Fecha de notificación',
 'Código DIVIPOLA departamento',
 'Nombre departamento',
 'Código DIVIPOLA municipio',
 'Nombre municipio',
 'Edad',
 'Unidad de medida de edad',
 'Sexo',
 'Tipo de contagio',
 'Ubicación del caso',
 'Estado',
 'Código ISO del país',
 'Nombre del país',
 'Recuperado',
 'Fecha de inicio de síntomas',
 'Fecha de muerte',
 'Fecha de diagnóstico',
 'Fecha de recuperación',
 'Tipo de recuperación',
 'Pertenencia étnica',
 'Nombre del grupo étnico']

2.2 tipos de datos

In [9]:
#Tipos de datos
df.printSchema()

root
 |-- fecha reporte web: timestamp (nullable = true)
 |-- ID de caso: integer (nullable = true)
 |-- Fecha de notificación: timestamp (nullable = true)
 |-- Código DIVIPOLA departamento: integer (nullable = true)
 |-- Nombre departamento: string (nullable = true)
 |-- Código DIVIPOLA municipio: integer (nullable = true)
 |-- Nombre municipio: string (nullable = true)
 |-- Edad: integer (nullable = true)
 |-- Unidad de medida de edad: integer (nullable = true)
 |-- Sexo: string (nullable = true)
 |-- Tipo de contagio: string (nullable = true)
 |-- Ubicación del caso: string (nullable = true)
 |-- Estado: string (nullable = true)
 |-- Código ISO del país: integer (nullable = true)
 |-- Nombre del país: string (nullable = true)
 |-- Recuperado: string (nullable = true)
 |-- Fecha de inicio de síntomas: timestamp (nullable = true)
 |-- Fecha de muerte: timestamp (nullable = true)
 |-- Fecha de diagnóstico: timestamp (nullable = true)
 |-- Fecha de recuperación: timestamp (nullable = tr

2.3 seleccionar algunas columnas

In [10]:
#Seleccionar algunas columnas
df.show(5)
df.select('Edad','Nombre municipio').show(5)

+-------------------+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------------+----+----------------+------------------+---------+-------------------+---------------+----------+---------------------------+-------------------+--------------------+---------------------+--------------------+------------------+-----------------------+
|  fecha reporte web|ID de caso|Fecha de notificación|Código DIVIPOLA departamento|Nombre departamento|Código DIVIPOLA municipio|Nombre municipio|Edad|Unidad de medida de edad|Sexo|Tipo de contagio|Ubicación del caso|   Estado|Código ISO del país|Nombre del país|Recuperado|Fecha de inicio de síntomas|    Fecha de muerte|Fecha de diagnóstico|Fecha de recuperación|Tipo de recuperación|Pertenencia étnica|Nombre del grupo étnico|
+-------------------+----------+---------------------+----------------------------+-------------------+-------------------------+-----------

2.4 RENOMBRAR COLUMNAS (se recomienda hacerlo para facilitar el procesamiento posterior)

In [11]:
#Renombrar columna
df=df.withColumnRenamed('Nombre municipio','Municipios')

2.5 agregar columnas

In [12]:
#Agregar columna
df = df.withColumn('Tiempo diagnostico',(df["Fecha de diagnóstico"] - df["Fecha de inicio de síntomas"]))

2.6 borrar columnas

In [13]:
#Borrar columna
df = df.drop("Tiempo diagnostico")

2.7 filtrar datos

In [14]:
#Filtrar datos
df.filter(df['Estado']=='Fallecido').show()

+-------------------+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------------+----+----------------+------------------+---------+-------------------+---------------+----------------+---------------------------+-------------------+--------------------+---------------------+--------------------+------------------+-----------------------+
|  fecha reporte web|ID de caso|Fecha de notificación|Código DIVIPOLA departamento|Nombre departamento|Código DIVIPOLA municipio|Nombre municipio|Edad|Unidad de medida de edad|Sexo|Tipo de contagio|Ubicación del caso|   Estado|Código ISO del país|Nombre del país|Paciente de alta|Fecha de inicio de síntomas|    Fecha de muerte|Fecha de diagnóstico|Fecha de recuperación|Tipo de recuperación|Pertenencia étnica|Nombre del grupo étnico|
+-------------------+----------+---------------------+----------------------------+-------------------+-------------------------

2.8 Ejecutar una función UDF o lambda sobre alguna columna creando una nueva.

In [15]:
#Ejecutar alguna función lambda
#using lambda function
# UDF
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
age_udf = udf(lambda age: "Joven" if age <= 18 else "Adulto", StringType())
#apply udf on dataframe
df.withColumn("Grupo Edad", age_udf(df.Edad)).show(10,False)

+-------------------+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------------+----+----------------+------------------+---------+-------------------+---------------+----------------+---------------------------+-------------------+--------------------+---------------------+--------------------+------------------+-----------------------+----------+
|fecha reporte web  |ID de caso|Fecha de notificación|Código DIVIPOLA departamento|Nombre departamento|Código DIVIPOLA municipio|Nombre municipio|Edad|Unidad de medida de edad|Sexo|Tipo de contagio|Ubicación del caso|Estado   |Código ISO del país|Nombre del país|Paciente de alta|Fecha de inicio de síntomas|Fecha de muerte    |Fecha de diagnóstico|Fecha de recuperación|Tipo de recuperación|Pertenencia étnica|Nombre del grupo étnico|Grupo Edad|
+-------------------+----------+---------------------+----------------------------+-------------------+---

**3) Contestacion de las preguntas de negocio sobre los datos del COVID**

3.1 Los 10 departamentos con más casos de covid en Colombia ordenados de mayor a menor

In [8]:
from pyspark.sql.functions import col

# Agrupar por 'Nombre departamento' y contar los casos
departamentos_df = df.groupBy("Nombre departamento").count()

# Renombrar la columna para claridad
departamentos_df = departamentos_df.withColumnRenamed("count", "total_casos")

# Ordenar los departamentos por casos en orden descendente y seleccionar los 10 primeros
top_departamentos_df = departamentos_df.orderBy(col("total_casos").desc()).limit(10)

# Mostrar el resultado
top_departamentos_df.show()


+-------------------+-----------+
|Nombre departamento|total_casos|
+-------------------+-----------+
|             BOGOTA|    1888137|
|          ANTIOQUIA|     955271|
|              VALLE|     572724|
|       CUNDINAMARCA|     331331|
|          SANTANDER|     297370|
|       BARRANQUILLA|     277989|
|          CARTAGENA|     163526|
|          ATLANTICO|     141072|
|             BOYACA|     131133|
|             TOLIMA|     127764|
+-------------------+-----------+



In [9]:
# Crear una vista temporal para usar SQL
df.createOrReplaceTempView("covid")

# Consulta SQL para obtener los 10 departamentos con más casos
top_departamentos_sql = spark.sql("""
    SELECT `Nombre departamento`, COUNT(*) as total_casos
    FROM covid
    GROUP BY `Nombre departamento`
    ORDER BY total_casos DESC
    LIMIT 10
""")

# Mostrar el resultado
top_departamentos_sql.show()

+-------------------+-----------+
|Nombre departamento|total_casos|
+-------------------+-----------+
|             BOGOTA|    1888137|
|          ANTIOQUIA|     955271|
|              VALLE|     572724|
|       CUNDINAMARCA|     331331|
|          SANTANDER|     297370|
|       BARRANQUILLA|     277989|
|          CARTAGENA|     163526|
|          ATLANTICO|     141072|
|             BOYACA|     131133|
|             TOLIMA|     127764|
+-------------------+-----------+



3.2 Las 10 ciudades con más casos de covid en Colombia ordenados de mayor a menor

In [10]:
# Agrupar por 'Nombre municipio' y contar los casos
ciudades_df = df.groupBy("Nombre municipio").count()

# Renombrar la columna para claridad
ciudades_df = ciudades_df.withColumnRenamed("count", "total_casos")

# Ordenar las ciudades por casos en orden descendente y seleccionar los 10 primeros
top_ciudades_df = ciudades_df.orderBy(col("total_casos").desc()).limit(10)

# Mostrar el resultado
top_ciudades_df.show()

+----------------+-----------+
|Nombre municipio|total_casos|
+----------------+-----------+
|          BOGOTA|    1888137|
|        MEDELLIN|     550790|
|            CALI|     406751|
|    BARRANQUILLA|     277989|
|       CARTAGENA|     163526|
|     BUCARAMANGA|     142842|
|          IBAGUE|      91598|
|     SANTA MARTA|      84863|
|       MANIZALES|      84478|
|          CUCUTA|      77359|
+----------------+-----------+



In [11]:
# Consulta SQL para obtener las 10 ciudades con más casos
top_ciudades_sql = spark.sql("""
    SELECT `Nombre municipio`, COUNT(*) as total_casos
    FROM covid
    GROUP BY `Nombre municipio`
    ORDER BY total_casos DESC
    LIMIT 10
""")

# Mostrar el resultado
top_ciudades_sql.show()

+----------------+-----------+
|Nombre municipio|total_casos|
+----------------+-----------+
|          BOGOTA|    1888137|
|        MEDELLIN|     550790|
|            CALI|     406751|
|    BARRANQUILLA|     277989|
|       CARTAGENA|     163526|
|     BUCARAMANGA|     142842|
|          IBAGUE|      91598|
|     SANTA MARTA|      84863|
|       MANIZALES|      84478|
|          CUCUTA|      77359|
+----------------+-----------+



3.3 Los 10 días con más casos de covid en Colombia ordenados de mayor a menor


In [13]:
# Agrupar por 'fecha reporte web' y contar los casos
dias_df = df.groupBy("fecha reporte web").count()

# Renombrar la columna para claridad
dias_df = dias_df.withColumnRenamed("count", "total_casos")

# Ordenar los días por casos en orden descendente y seleccionar los 10 primeros
top_dias_df = dias_df.orderBy(col("total_casos").desc()).limit(10)

# Mostrar el resultado
top_dias_df.show()


+-------------------+-----------+
|  fecha reporte web|total_casos|
+-------------------+-----------+
|2022-01-15 00:00:00|      35576|
|2022-01-14 00:00:00|      34924|
|2021-06-26 00:00:00|      33593|
|2021-06-24 00:00:00|      32997|
|2021-06-25 00:00:00|      32732|
|2021-06-27 00:00:00|      32377|
|2022-01-16 00:00:00|      32318|
|2022-01-08 00:00:00|      31170|
|2022-01-21 00:00:00|      31039|
|2022-01-09 00:00:00|      30630|
+-------------------+-----------+



In [21]:
# Consulta SQL para obtener los 10 días con más casos
top_dias_sql = spark.sql("""
    SELECT `fecha reporte web`, COUNT(*) as total_casos
    FROM covid
    GROUP BY `fecha reporte web`
    ORDER BY total_casos DESC
    LIMIT 10
""")

# Mostrar el resultado
top_dias_sql.show()


+-------------------+-----------+
|  fecha reporte web|total_casos|
+-------------------+-----------+
|2022-01-15 00:00:00|      35576|
|2022-01-14 00:00:00|      34924|
|2021-06-26 00:00:00|      33593|
|2021-06-24 00:00:00|      32997|
|2021-06-25 00:00:00|      32732|
|2021-06-27 00:00:00|      32377|
|2022-01-16 00:00:00|      32318|
|2022-01-08 00:00:00|      31170|
|2022-01-21 00:00:00|      31039|
|2022-01-09 00:00:00|      30630|
+-------------------+-----------+



3.4 Distribución de casos por edades de covid en Colombia

In [14]:
# Agrupar por 'Edad' y contar los casos
edades_df = df.groupBy("Edad").count()

# Renombrar la columna para claridad
edades_df = edades_df.withColumnRenamed("count", "total_casos")

# Mostrar la distribución de casos por edades
edades_df.show()

+----+-----------+
|Edad|total_casos|
+----+-----------+
|  31|     149885|
|  85|      13303|
|  65|      52856|
|  53|      88702|
|  78|      21996|
| 108|         10|
|  34|     138847|
| 101|        284|
|  81|      18615|
|  28|     151306|
|  76|      24920|
|  27|     149393|
|  26|     148680|
|  44|     105934|
| 103|        128|
|  12|      34208|
|  91|       5931|
|  22|     115027|
|  93|       3964|
|  47|      94180|
+----+-----------+
only showing top 20 rows



In [23]:
# Consulta SQL para obtener la distribución de casos por edades
distribucion_edades_sql = spark.sql("""
    SELECT Edad, COUNT(*) as total_casos
    FROM covid
    GROUP BY Edad
    ORDER BY Edad
""")

# Mostrar el resultado
distribucion_edades_sql.show()

+----+-----------+
|Edad|total_casos|
+----+-----------+
|   1|      33429|
|   2|      26207|
|   3|      24223|
|   4|      23905|
|   5|      24768|
|   6|      26380|
|   7|      26067|
|   8|      28717|
|   9|      30256|
|  10|      31768|
|  11|      34814|
|  12|      34208|
|  13|      36646|
|  14|      39774|
|  15|      43189|
|  16|      46546|
|  17|      52437|
|  18|      64310|
|  19|      80783|
|  20|      93686|
+----+-----------+
only showing top 20 rows



3.5 Pregunta de negocio: ¿Cuál es la ciudad con mayor promedio de casos diarios?

In [15]:
# Calcular el promedio de casos diarios por ciudad
promedio_diario_df = df.groupBy("Nombre municipio").avg("Edad")  # Usamos Edad como proxy para contar los casos

# Renombrar la columna para claridad
promedio_diario_df = promedio_diario_df.withColumnRenamed("avg(Edad)", "promedio_casos_diarios")

# Ordenar las ciudades por el promedio de casos diarios en orden descendente y seleccionar la primera
ciudad_max_promedio_df = promedio_diario_df.orderBy(col("promedio_casos_diarios").desc()).limit(1)

# Mostrar el resultado
ciudad_max_promedio_df.show()

+----------------+----------------------+
|Nombre municipio|promedio_casos_diarios|
+----------------+----------------------+
|          Gameza|                  94.0|
+----------------+----------------------+



In [25]:
# Consulta SQL para obtener la ciudad con mayor promedio de casos diarios
ciudad_max_promedio_sql = spark.sql("""
    SELECT `Nombre municipio`, AVG(Edad) as promedio_casos_diarios  -- Usamos Edad como proxy para contar los casos
    FROM covid
    GROUP BY `Nombre municipio`
    ORDER BY promedio_casos_diarios DESC
    LIMIT 1
""")

# Mostrar el resultado
ciudad_max_promedio_sql.show()


+----------------+----------------------+
|Nombre municipio|promedio_casos_diarios|
+----------------+----------------------+
|          Gameza|                  94.0|
+----------------+----------------------+



**4) Guarde los datos del numeral 3 en el bucket público de cada estudiante.**

In [4]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("data_processing") \
    .master("local[*]") \
    .config("spark.driver.memory", "16G") \
    .config("spark.driver.maxResultSize", "0") \
    .config("spark.kryoserializer.buffer.max", "2000M") \
    .config("spark.jars.packages", "com.amazonaws:aws-java-sdk:1.11.1034,org.apache.hadoop:hadoop-aws:3.3.4") \
    .config("fs.s3a.access.key", "ASIA5CVVWKQQINO7RPWO") \
    .config("fs.s3a.secret.key", "fAGIJSoMr2oCC4p2CSZPnETogGBZYPniMA6z10VQ") \
    .config("fs.s3a.session.token", "IQoJb3JpZ2luX2VjEMb//////////wEaCXVzLXdlc3QtMiJGMEQCIFO/A0m2C1BpwDBqmgQ7xTRliZgtIA/94ZJTXXczxbk/AiATUQMcI8KM5L43AsMj+w/oWkhzKHBl3muXMIgYv6wuAiqxAghPEAAaDDg5OTEwMjk1NDUyOCIMZQd5VHlvSeuK1L4+Ko4CSA+6yHhhJnitFvrR+yt/x4H7qBczkygGuD36hDSRmqzyAuZeu0SnFrOBrUBDZMmRX3YJK18PIX8KfVAnbYLpmsVsq6dedQ89qwfh4kcbdBjLw5jkK6Up4mYfNpye79Qawo3Fi9NbksHYx6NTPs8cJ5kYXeQB18OmA6/bXisXrPdEKu/93nqZEVAwDT/mR+UIOo8aMM3mtY4CcM8VwKJhd6RLg8OExxYKA1d/WSHIqkGYVGTsIzmT9vK/knC35fc15sc4UWFZ1XEsQaEXR5spy+dg+m5b7trN++CfmxvZE8k4UBHgfOKmzRlLXgQhPIslaa04frYgEVeOWSWciNjUM2LPJgxoaXCSvAD6eXIrML6T6bIGOp4BF9fcQEbQHKdS0LdZGOsHhxuJoGhxYzKe1FdhgwRt+ISApiNcPJUP8wcjxOaXxKeFSHDSGiSKNcGHzYCRmzT6qRROeK8GbryDrj06m2aooFEDHZAsNx+sHEUkxkHZLioPlrKtnZ6dBQMJz4NBH4YYH8kXOn/e4G+P+s1PaK1fpnkwpy7LfkJbCGmVZSVUsl1Qi5CarF7zudbMO1sgpzo=") \
    .config("fs.s3a.path.style.access", "true") \
    .config("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("fs.s3a.endpoint", "s3.amazonaws.com") \
    .getOrCreate()

sc = spark.sparkContext


In [5]:
# desde S3
df=spark.read.csv('s3a://proyecto-telematica/Casos_positivos_de_COVID-19_en_Colombia..csv',inferSchema=True,header=True)

4.1 Los 10 departamentos con más casos de covid en Colombia ordenados de mayor a menor.

In [12]:
top_departamentos_df.write.format('csv').mode('overwrite').option('header', 'true').save('s3a://proyecto-telematica/top_10_casos_por_departamento.csv')

4.2 Las 10 ciudades con más casos de covid en Colombia ordenados de mayor a menor.

In [16]:
top_ciudades_df.write.format('csv').mode('overwrite').option('header', 'true').save('s3a://proyecto-telematica/top_10_casos_por_ciudad.csv')

4.3 Los 10 días con más casos de covid en Colombia ordenados de mayor a menor.

In [17]:
top_dias_df.write.format('csv').mode('overwrite').option('header', 'true').save('s3a://proyecto-telematica/top_10_casos_por_dia.csv')

4.4 Distribución de casos por edades de covid en Colombia.

In [19]:
edades_df.write.format('csv').mode('overwrite').option('header', 'true').save('s3a://proyecto-telematica/distribucion_casos_por_edades.csv')

4.5 Realice la pregunta de negocio que quiera sobre los datos y respondala con la correspondiente programación en spark.

In [20]:
ciudad_max_promedio_df.write.format('csv').mode('overwrite').option('header', 'true').save('s3a://proyecto-telematica/ciudad_mayor_promedio_casos_diarios.csv')