# Covid19 PySpark

In [None]:
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [None]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3.tgz
!tar xf spark-3.5.3-bin-hadoop3.tgz
!pip install -q findspark

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.2.0/hadoop-aws-3.2.0.jar
!wget -q https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.375/aws-java-sdk-bundle-1.11.375.jar

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.3-bin-hadoop3"

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("data_processing")\
    .master("local[*]")\
    .config("spark.driver.memory","16G")\
    .config("spark.driver.maxResultSize", "0") \
    .config("spark.kryoserializer.buffer.max", "2000M")\
    .config("spark.jars.packages", "com.amazonaws:aws-java-sdk:1.11.1034,org.apache.hadoop:hadoop-aws:3.3.4")\
    .config('fs.s3a.access.key', 'ACCESS_KEY') \
    .config('fs.s3a.secret.key', 'SECRET_KEY') \
    .config('fs.s3a.session.token', 'SESSION_TOKEN') \
    .config('fs.s3a.path.style.access', 'true') \
    .config('fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem') \
    .config('fs.s3a.endpoint', 's3.amazonaws.com') \
    .getOrCreate()
sc = spark.sparkContext

In [None]:
spark

In [None]:
sc

In [None]:
df = spark.read.csv('/content/gdrive/MyDrive/labs/bigdata//covid19/CasosPositivosCovid19.csv', inferSchema=True, header=True)

In [None]:
df.columns

['fecha reporte web',
 'ID de caso',
 'Fecha de notificación',
 'Código DIVIPOLA departamento',
 'Nombre departamento',
 'Código DIVIPOLA municipio',
 'Nombre municipio',
 'Edad',
 'Unidad de medida de edad',
 'Sexo',
 'Tipo de contagio',
 'Ubicación del caso',
 'Estado',
 'Código ISO del país',
 'Nombre del país',
 'Recuperado',
 'Fecha de inicio de síntomas',
 'Fecha de muerte',
 'Fecha de diagnóstico',
 'Fecha de recuperación',
 'Tipo de recuperación',
 'Pertenencia étnica',
 'Nombre del grupo étnico']

In [None]:
len(df.columns)

23

In [None]:
df.count()

100000

In [None]:
print((df.count(), len(df.columns)))

(100000, 23)


In [None]:
df.printSchema()

root
 |-- fecha reporte web: string (nullable = true)
 |-- ID de caso: integer (nullable = true)
 |-- Fecha de notificación: string (nullable = true)
 |-- Código DIVIPOLA departamento: integer (nullable = true)
 |-- Nombre departamento: string (nullable = true)
 |-- Código DIVIPOLA municipio: integer (nullable = true)
 |-- Nombre municipio: string (nullable = true)
 |-- Edad: integer (nullable = true)
 |-- Unidad de medida de edad: integer (nullable = true)
 |-- Sexo: string (nullable = true)
 |-- Tipo de contagio: string (nullable = true)
 |-- Ubicación del caso: string (nullable = true)
 |-- Estado: string (nullable = true)
 |-- Código ISO del país: integer (nullable = true)
 |-- Nombre del país: string (nullable = true)
 |-- Recuperado: string (nullable = true)
 |-- Fecha de inicio de síntomas: string (nullable = true)
 |-- Fecha de muerte: string (nullable = true)
 |-- Fecha de diagnóstico: string (nullable = true)
 |-- Fecha de recuperación: string (nullable = true)
 |-- Tipo de r

In [None]:
df = df.withColumnRenamed("fecha reporte web", "fecha_reporte_web") \
       .withColumnRenamed("ID de caso", "id_caso") \
       .withColumnRenamed("Fecha de notificación", "fecha_notificacion") \
       .withColumnRenamed("Código DIVIPOLA departamento", "codigo_divipola_departamento") \
       .withColumnRenamed("Nombre departamento", "nombre_departamento") \
       .withColumnRenamed("Código DIVIPOLA municipio", "codigo_divipola_municipio") \
       .withColumnRenamed("Nombre municipio", "nombre_municipio") \
       .withColumnRenamed("Edad", "edad") \
       .withColumnRenamed("Unidad de medida de edad", "unidad_medida_edad") \
       .withColumnRenamed("Sexo", "sexo") \
       .withColumnRenamed("Tipo de contagio", "tipo_contagio") \
       .withColumnRenamed("Ubicación del caso", "ubicacion_caso") \
       .withColumnRenamed("Estado", "estado") \
       .withColumnRenamed("Código ISO del país", "codigo_iso_pais") \
       .withColumnRenamed("Nombre del país", "nombre_pais") \
       .withColumnRenamed("Recuperado", "recuperado") \
       .withColumnRenamed("Fecha de inicio de síntomas", "fecha_inicio_sintomas") \
       .withColumnRenamed("Fecha de muerte", "fecha_muerte") \
       .withColumnRenamed("Fecha de diagnóstico", "fecha_diagnostico") \
       .withColumnRenamed("Fecha de recuperación", "fecha_recuperacion") \
       .withColumnRenamed("Tipo de recuperación", "tipo_recuperacion") \
       .withColumnRenamed("Pertenencia étnica", "pertenencia_etnica") \
       .withColumnRenamed("Nombre del grupo étnico", "nombre_grupo_etnico")

In [None]:
df.show(5)

+-----------------+-------+------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------+----+-------------+--------------+------+---------------+-----------+----------+---------------------+------------+-----------------+------------------+-----------------+------------------+-------------------+
|fecha_reporte_web|id_caso|fecha_notificacion|codigo_divipola_departamento|nombre_departamento|codigo_divipola_municipio|nombre_municipio|edad|unidad_medida_edad|sexo|tipo_contagio|ubicacion_caso|estado|codigo_iso_pais|nombre_pais|recuperado|fecha_inicio_sintomas|fecha_muerte|fecha_diagnostico|fecha_recuperacion|tipo_recuperacion|pertenencia_etnica|nombre_grupo_etnico|
+-----------------+-------+------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------+----+-------------+--------------+------+---------------+-----------+----------+--------------

In [None]:
df.select('edad', 'sexo').show(5)

+----+----+
|edad|sexo|
+----+----+
|  19|   F|
|  34|   M|
|  50|   F|
|  55|   M|
|  25|   M|
+----+----+
only showing top 5 rows



In [None]:
df.describe().show()

+-------+-----------------+------------------+------------------+----------------------------+-------------------+-------------------------+----------------+------------------+-------------------+------+-------------+--------------+---------+------------------+-----------+----------+---------------------+----------------+-----------------+------------------+-----------------+------------------+-------------------+
|summary|fecha_reporte_web|           id_caso|fecha_notificacion|codigo_divipola_departamento|nombre_departamento|codigo_divipola_municipio|nombre_municipio|              edad| unidad_medida_edad|  sexo|tipo_contagio|ubicacion_caso|   estado|   codigo_iso_pais|nombre_pais|recuperado|fecha_inicio_sintomas|    fecha_muerte|fecha_diagnostico|fecha_recuperacion|tipo_recuperacion|pertenencia_etnica|nombre_grupo_etnico|
+-------+-----------------+------------------+------------------+----------------------------+-------------------+-------------------------+----------------+-------

In [None]:
from pyspark.sql.functions import when

In [None]:
df = df.withColumn("es_español", when(df["nombre_pais"] == "ESPAÑA", True).otherwise(False))
df.show()

+-----------------+-------+------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------+----+-------------+--------------+------+---------------+--------------------+----------+---------------------+------------+-----------------+------------------+-----------------+------------------+-------------------+----------+
|fecha_reporte_web|id_caso|fecha_notificacion|codigo_divipola_departamento|nombre_departamento|codigo_divipola_municipio|nombre_municipio|edad|unidad_medida_edad|sexo|tipo_contagio|ubicacion_caso|estado|codigo_iso_pais|         nombre_pais|recuperado|fecha_inicio_sintomas|fecha_muerte|fecha_diagnostico|fecha_recuperacion|tipo_recuperacion|pertenencia_etnica|nombre_grupo_etnico|es_español|
+-----------------+-------+------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------+----+-------------+--------------+------+-------------

In [None]:
# Delete Column
df = df.drop("es_español")
df.show()

+-----------------+-------+------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------+----+-------------+--------------+------+---------------+--------------------+----------+---------------------+------------+-----------------+------------------+-----------------+------------------+-------------------+--------------+-------------+
|fecha_reporte_web|id_caso|fecha_notificacion|codigo_divipola_departamento|nombre_departamento|codigo_divipola_municipio|nombre_municipio|edad|unidad_medida_edad|sexo|tipo_contagio|ubicacion_caso|estado|codigo_iso_pais|         nombre_pais|recuperado|fecha_inicio_sintomas|fecha_muerte|fecha_diagnostico|fecha_recuperacion|tipo_recuperacion|pertenencia_etnica|nombre_grupo_etnico|categoria_edad|mayor_de_edad|
+-----------------+-------+------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------+----+-------------

In [None]:
df.filter(df['nombre_departamento'] =='ANTIOQUIA').show()

+-----------------+-------+------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------+----+-------------+--------------+------+---------------+--------------------+----------+---------------------+------------+-----------------+------------------+-----------------+------------------+-------------------+--------------+-------------+
|fecha_reporte_web|id_caso|fecha_notificacion|codigo_divipola_departamento|nombre_departamento|codigo_divipola_municipio|nombre_municipio|edad|unidad_medida_edad|sexo|tipo_contagio|ubicacion_caso|estado|codigo_iso_pais|         nombre_pais|recuperado|fecha_inicio_sintomas|fecha_muerte|fecha_diagnostico|fecha_recuperacion|tipo_recuperacion|pertenencia_etnica|nombre_grupo_etnico|categoria_edad|mayor_de_edad|
+-----------------+-------+------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------+----+-------------

In [None]:
#filter the records
df.filter(df['nombre_departamento'] =='ANTIOQUIA').select('tipo_contagio', 'estado', 'tipo_recuperacion').show(10)

+-------------+------+-----------------+
|tipo_contagio|estado|tipo_recuperacion|
+-------------+------+-----------------+
|    Importado|  Leve|              PCR|
|  Relacionado|  Leve|              PCR|
|  Relacionado|  Leve|              PCR|
|  Relacionado|  Leve|              PCR|
|  Relacionado|  Leve|              PCR|
|  Relacionado|  Leve|              PCR|
|    Importado|  Leve|              PCR|
|    Importado|  Leve|              PCR|
|    Importado|  Leve|              PCR|
|    Importado|  Leve|              PCR|
+-------------+------+-----------------+
only showing top 10 rows



In [None]:
df.filter((df['nombre_departamento'] =='ANTIOQUIA') & (df['edad'] > 40)).show()

+-----------------+-------+------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------+----+-------------+--------------+------+---------------+--------------------+----------+---------------------+------------+-----------------+------------------+-----------------+------------------+-------------------+--------------+-------------+
|fecha_reporte_web|id_caso|fecha_notificacion|codigo_divipola_departamento|nombre_departamento|codigo_divipola_municipio|nombre_municipio|edad|unidad_medida_edad|sexo|tipo_contagio|ubicacion_caso|estado|codigo_iso_pais|         nombre_pais|recuperado|fecha_inicio_sintomas|fecha_muerte|fecha_diagnostico|fecha_recuperacion|tipo_recuperacion|pertenencia_etnica|nombre_grupo_etnico|categoria_edad|mayor_de_edad|
+-----------------+-------+------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------+----+-------------

In [None]:
# Value counts
df.groupBy('codigo_iso_pais').count().orderBy('count', ascending=False).show(5, False)

+---------------+-----+
|codigo_iso_pais|count|
+---------------+-----+
|NULL           |99088|
|724            |258  |
|840            |212  |
|76             |59   |
|218            |59   |
+---------------+-----+
only showing top 5 rows



In [None]:
df.groupBy('codigo_iso_pais').sum().show(5, False)

+---------------+------------+---------------------------------+------------------------------+---------+-----------------------+--------------------+-----------------------+
|codigo_iso_pais|sum(id_caso)|sum(codigo_divipola_departamento)|sum(codigo_divipola_municipio)|sum(edad)|sum(unidad_medida_edad)|sum(codigo_iso_pais)|sum(pertenencia_etnica)|
+---------------+------------+---------------------------------+------------------------------+---------+-----------------------+--------------------+-----------------------+
|858            |2425        |76                               |76113                         |24       |1                      |858                 |6                      |
|530            |6262        |82                               |82045                         |326      |8                      |4240                |48                     |
|756            |962         |76                               |76001                         |68       |1                   

In [None]:
# Value counts
df.groupBy('codigo_iso_pais').max().show(5, False)

+---------------+------------+---------------------------------+------------------------------+---------+-----------------------+--------------------+-----------------------+
|codigo_iso_pais|max(id_caso)|max(codigo_divipola_departamento)|max(codigo_divipola_municipio)|max(edad)|max(unidad_medida_edad)|max(codigo_iso_pais)|max(pertenencia_etnica)|
+---------------+------------+---------------------------------+------------------------------+---------+-----------------------+--------------------+-----------------------+
|858            |2425        |76                               |76113                         |24       |1                      |858                 |6                      |
|530            |1588        |11                               |11001                         |74       |1                      |530                 |6                      |
|756            |962         |76                               |76001                         |68       |1                   

In [None]:
# Value counts
df.groupBy('codigo_iso_pais').min().show(5, False)

+---------------+------------+---------------------------------+------------------------------+---------+-----------------------+--------------------+-----------------------+
|codigo_iso_pais|min(id_caso)|min(codigo_divipola_departamento)|min(codigo_divipola_municipio)|min(edad)|min(unidad_medida_edad)|min(codigo_iso_pais)|min(pertenencia_etnica)|
+---------------+------------+---------------------------------+------------------------------+---------+-----------------------+--------------------+-----------------------+
|858            |2425        |76                               |76113                         |24       |1                      |858                 |6                      |
|530            |262         |5                                |5038                          |28       |1                      |530                 |6                      |
|756            |962         |76                               |76001                         |68       |1                   

In [None]:
#Aggregation
df.groupBy('codigo_iso_pais').agg({'edad': 'sum'}).show(5, False)

+---------------+---------+
|codigo_iso_pais|sum(edad)|
+---------------+---------+
|858            |24       |
|530            |326      |
|756            |68       |
|300            |206      |
|784            |45       |
+---------------+---------+
only showing top 5 rows



In [None]:
# Lambda
mayor_de_edad_lambda = udf(lambda edad: "Sí" if edad >= 18 else "No", StringType())

In [None]:
df = df.withColumn("mayor_de_edad", mayor_de_edad_lambda(df.edad))

In [None]:
df.show()

+-----------------+-------+------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------+----+-------------+--------------+------+---------------+--------------------+----------+---------------------+------------+-----------------+------------------+-----------------+------------------+-------------------+--------------+-------------+
|fecha_reporte_web|id_caso|fecha_notificacion|codigo_divipola_departamento|nombre_departamento|codigo_divipola_municipio|nombre_municipio|edad|unidad_medida_edad|sexo|tipo_contagio|ubicacion_caso|estado|codigo_iso_pais|         nombre_pais|recuperado|fecha_inicio_sintomas|fecha_muerte|fecha_diagnostico|fecha_recuperacion|tipo_recuperacion|pertenencia_etnica|nombre_grupo_etnico|categoria_edad|mayor_de_edad|
+-----------------+-------+------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------+----+-------------

In [None]:
# saving file (csv)

In [None]:
#current working directory
!pwd

/content


In [None]:
!mkdir -p /usr/local/spark/jars/

!wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar
!wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.1026/aws-java-sdk-bundle-1.11.1026.jar
!cp hadoop-aws-3.3.4.jar /usr/local/spark/jars/
!cp aws-java-sdk-bundle-1.11.1026.jar /usr/local/spark/jars/

--2024-11-07 23:44:53--  https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar
Resolving repo1.maven.org (repo1.maven.org)... 199.232.192.209, 199.232.196.209, 2a04:4e42:4c::209, ...
Connecting to repo1.maven.org (repo1.maven.org)|199.232.192.209|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 962685 (940K) [application/java-archive]
Saving to: ‘hadoop-aws-3.3.4.jar.1’


2024-11-07 23:44:54 (15.3 MB/s) - ‘hadoop-aws-3.3.4.jar.1’ saved [962685/962685]

--2024-11-07 23:44:54--  https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.1026/aws-java-sdk-bundle-1.11.1026.jar
Resolving repo1.maven.org (repo1.maven.org)... 199.232.192.209, 199.232.196.209, 2a04:4e42:4c::209, ...
Connecting to repo1.maven.org (repo1.maven.org)|199.232.192.209|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 226379782 (216M) [application/java-archive]
Saving to: ‘aws-java-sdk-bundle-1.11.1026.jar.1’


2024-11-07 23:

In [None]:
write_uri='s3a://labs/bigdata/colab/csv'

In [None]:
df.coalesce(1).write.format("csv").option("header", "true").save(write_uri)