In [1]:
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [2]:
#instalar java y spark
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3.tgz
!tar xf spark-3.5.3-bin-hadoop3.tgz
!pip install -q findspark

In [3]:

!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.2.0/hadoop-aws-3.2.0.jar
!wget -q https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.375/aws-java-sdk-bundle-1.11.375.jar

In [4]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.3-bin-hadoop3"

In [5]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("data_processing")\
    .master("local[*]")\
    .config("spark.driver.memory","16G")\
    .config("spark.driver.maxResultSize", "0") \
    .config("spark.kryoserializer.buffer.max", "2000M")\
    .config("spark.jars.packages", "com.amazonaws:aws-java-sdk:1.11.1034,org.apache.hadoop:hadoop-aws:3.3.4")\
    .config('fs.s3a.access.key', 'ACCES_KEY') \
    .config('fs.s3a.secret.key', 'SECRET_KEY') \
    .config('fs.s3a.session.token', 'SESSION_TOKEN') \
    .config('fs.s3a.path.style.access', 'true') \
    .config('fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem') \
    .config('fs.s3a.endpoint', 's3.amazonaws.com') \
    .getOrCreate()
sc = spark.sparkContext

In [6]:
spark

In [7]:
sc

In [10]:
df = spark.read.csv('/content/gdrive/MyDrive/content/gdrive/Casos_positivos_de_COVID-19_en_Colombia-100K.csv', inferSchema=True, header=True)


In [11]:
df.columns


['fecha reporte web',
 'ID de caso',
 'Fecha de notificación',
 'Código DIVIPOLA departamento',
 'Nombre departamento',
 'Código DIVIPOLA municipio',
 'Nombre municipio',
 'Edad',
 'Unidad de medida de edad',
 'Sexo',
 'Tipo de contagio',
 'Ubicación del caso',
 'Estado',
 'Código ISO del país',
 'Nombre del país',
 'Recuperado',
 'Fecha de inicio de síntomas',
 'Fecha de muerte',
 'Fecha de diagnóstico',
 'Fecha de recuperación',
 'Tipo de recuperación',
 'Pertenencia étnica',
 'Nombre del grupo étnico']

In [12]:
len(df.columns)


23

In [13]:
df.count()


100000

In [14]:
print((df.count(), len(df.columns)))


(100000, 23)


In [15]:
df.printSchema()


root
 |-- fecha reporte web: string (nullable = true)
 |-- ID de caso: integer (nullable = true)
 |-- Fecha de notificación: string (nullable = true)
 |-- Código DIVIPOLA departamento: integer (nullable = true)
 |-- Nombre departamento: string (nullable = true)
 |-- Código DIVIPOLA municipio: integer (nullable = true)
 |-- Nombre municipio: string (nullable = true)
 |-- Edad: integer (nullable = true)
 |-- Unidad de medida de edad: integer (nullable = true)
 |-- Sexo: string (nullable = true)
 |-- Tipo de contagio: string (nullable = true)
 |-- Ubicación del caso: string (nullable = true)
 |-- Estado: string (nullable = true)
 |-- Código ISO del país: integer (nullable = true)
 |-- Nombre del país: string (nullable = true)
 |-- Recuperado: string (nullable = true)
 |-- Fecha de inicio de síntomas: string (nullable = true)
 |-- Fecha de muerte: string (nullable = true)
 |-- Fecha de diagnóstico: string (nullable = true)
 |-- Fecha de recuperación: string (nullable = true)
 |-- Tipo de r

In [16]:
df = df.withColumnRenamed("fecha reporte web", "fecha_reporte_web") \
       .withColumnRenamed("ID de caso", "id_caso") \
       .withColumnRenamed("Fecha de notificación", "fecha_notificacion") \
       .withColumnRenamed("Código DIVIPOLA departamento", "codigo_divipola_departamento") \
       .withColumnRenamed("Nombre departamento", "nombre_departamento") \
       .withColumnRenamed("Código DIVIPOLA municipio", "codigo_divipola_municipio") \
       .withColumnRenamed("Nombre municipio", "nombre_municipio") \
       .withColumnRenamed("Edad", "edad") \
       .withColumnRenamed("Unidad de medida de edad", "unidad_medida_edad") \
       .withColumnRenamed("Sexo", "sexo") \
       .withColumnRenamed("Tipo de contagio", "tipo_contagio") \
       .withColumnRenamed("Ubicación del caso", "ubicacion_caso") \
       .withColumnRenamed("Estado", "estado") \
       .withColumnRenamed("Código ISO del país", "codigo_iso_pais") \
       .withColumnRenamed("Nombre del país", "nombre_pais") \
       .withColumnRenamed("Recuperado", "recuperado") \
       .withColumnRenamed("Fecha de inicio de síntomas", "fecha_inicio_sintomas") \
       .withColumnRenamed("Fecha de muerte", "fecha_muerte") \
       .withColumnRenamed("Fecha de diagnóstico", "fecha_diagnostico") \
       .withColumnRenamed("Fecha de recuperación", "fecha_recuperacion") \
       .withColumnRenamed("Tipo de recuperación", "tipo_recuperacion") \
       .withColumnRenamed("Pertenencia étnica", "pertenencia_etnica") \
       .withColumnRenamed("Nombre del grupo étnico", "nombre_grupo_etnico")

In [17]:
df.show(5)


+-----------------+-------+------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------+----+-------------+--------------+------+---------------+-----------+----------+---------------------+------------+-----------------+------------------+-----------------+------------------+-------------------+
|fecha_reporte_web|id_caso|fecha_notificacion|codigo_divipola_departamento|nombre_departamento|codigo_divipola_municipio|nombre_municipio|edad|unidad_medida_edad|sexo|tipo_contagio|ubicacion_caso|estado|codigo_iso_pais|nombre_pais|recuperado|fecha_inicio_sintomas|fecha_muerte|fecha_diagnostico|fecha_recuperacion|tipo_recuperacion|pertenencia_etnica|nombre_grupo_etnico|
+-----------------+-------+------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------+----+-------------+--------------+------+---------------+-----------+----------+--------------

In [18]:
df.select('edad', 'sexo').show(5)


+----+----+
|edad|sexo|
+----+----+
|  19|   F|
|  34|   M|
|  50|   F|
|  55|   M|
|  25|   M|
+----+----+
only showing top 5 rows



In [19]:
df.describe().show()


+-------+-----------------+------------------+------------------+----------------------------+-------------------+-------------------------+----------------+------------------+-------------------+------+-------------+--------------+---------+------------------+-----------+----------+---------------------+----------------+-----------------+------------------+-----------------+------------------+-------------------+
|summary|fecha_reporte_web|           id_caso|fecha_notificacion|codigo_divipola_departamento|nombre_departamento|codigo_divipola_municipio|nombre_municipio|              edad| unidad_medida_edad|  sexo|tipo_contagio|ubicacion_caso|   estado|   codigo_iso_pais|nombre_pais|recuperado|fecha_inicio_sintomas|    fecha_muerte|fecha_diagnostico|fecha_recuperacion|tipo_recuperacion|pertenencia_etnica|nombre_grupo_etnico|
+-------+-----------------+------------------+------------------+----------------------------+-------------------+-------------------------+----------------+-------

In [20]:
from pyspark.sql.functions import when
df = df.withColumn("es_español", when(df["nombre_pais"] == "ESPAÑA", True).otherwise(False))
df.show()

+-----------------+-------+------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------+----+-------------+--------------+------+---------------+--------------------+----------+---------------------+------------+-----------------+------------------+-----------------+------------------+-------------------+----------+
|fecha_reporte_web|id_caso|fecha_notificacion|codigo_divipola_departamento|nombre_departamento|codigo_divipola_municipio|nombre_municipio|edad|unidad_medida_edad|sexo|tipo_contagio|ubicacion_caso|estado|codigo_iso_pais|         nombre_pais|recuperado|fecha_inicio_sintomas|fecha_muerte|fecha_diagnostico|fecha_recuperacion|tipo_recuperacion|pertenencia_etnica|nombre_grupo_etnico|es_español|
+-----------------+-------+------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------+----+-------------+--------------+------+-------------

In [21]:
df = df.drop("es_español")
df.show()

+-----------------+-------+------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------+----+-------------+--------------+------+---------------+--------------------+----------+---------------------+------------+-----------------+------------------+-----------------+------------------+-------------------+
|fecha_reporte_web|id_caso|fecha_notificacion|codigo_divipola_departamento|nombre_departamento|codigo_divipola_municipio|nombre_municipio|edad|unidad_medida_edad|sexo|tipo_contagio|ubicacion_caso|estado|codigo_iso_pais|         nombre_pais|recuperado|fecha_inicio_sintomas|fecha_muerte|fecha_diagnostico|fecha_recuperacion|tipo_recuperacion|pertenencia_etnica|nombre_grupo_etnico|
+-----------------+-------+------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------+----+-------------+--------------+------+---------------+-------------------

In [22]:
#filter the records
df.filter(df['nombre_departamento'] =='BOGOTA').show()

+-----------------+-------+------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------+----+-------------+--------------+------+---------------+--------------------+----------+---------------------+------------+-----------------+------------------+-----------------+------------------+-------------------+
|fecha_reporte_web|id_caso|fecha_notificacion|codigo_divipola_departamento|nombre_departamento|codigo_divipola_municipio|nombre_municipio|edad|unidad_medida_edad|sexo|tipo_contagio|ubicacion_caso|estado|codigo_iso_pais|         nombre_pais|recuperado|fecha_inicio_sintomas|fecha_muerte|fecha_diagnostico|fecha_recuperacion|tipo_recuperacion|pertenencia_etnica|nombre_grupo_etnico|
+-----------------+-------+------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------+----+-------------+--------------+------+---------------+-------------------

In [23]:
df.filter(df['nombre_departamento'] =='BOGOTA').select('tipo_contagio', 'estado').show(10)


+-------------+------+
|tipo_contagio|estado|
+-------------+------+
|    Importado|  Leve|
|    Importado|  Leve|
|    Importado|  Leve|
|    Importado|  Leve|
|    Importado|  Leve|
|    Importado|  Leve|
|    Importado|  Leve|
|    Importado|  Leve|
|  Relacionado|  Leve|
|  Relacionado|  Leve|
+-------------+------+
only showing top 10 rows



In [25]:
df.filter((df['nombre_departamento'] =='BOGOTA') & (df['edad'] < 25)).show()


+-----------------+-------+------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------+----+-------------+--------------+------+---------------+--------------------+----------+---------------------+------------+-----------------+------------------+-----------------+------------------+-------------------+
|fecha_reporte_web|id_caso|fecha_notificacion|codigo_divipola_departamento|nombre_departamento|codigo_divipola_municipio|nombre_municipio|edad|unidad_medida_edad|sexo|tipo_contagio|ubicacion_caso|estado|codigo_iso_pais|         nombre_pais|recuperado|fecha_inicio_sintomas|fecha_muerte|fecha_diagnostico|fecha_recuperacion|tipo_recuperacion|pertenencia_etnica|nombre_grupo_etnico|
+-----------------+-------+------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------+----+-------------+--------------+------+---------------+-------------------

In [26]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def clasificar_edad(edad):
    if edad is None:
        return "Desconocido"
    elif edad > 18:
        return "Niño"
    elif 18 <= edad < 60:
        return "Adulto"
    else:
        return "Adulto Mayor"

clasificar_edad_udf = udf(clasificar_edad, StringType())

In [27]:
df = df.withColumn("categoria_edad", clasificar_edad_udf(df.edad))
df.show()

+-----------------+-------+------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------+----+-------------+--------------+------+---------------+--------------------+----------+---------------------+------------+-----------------+------------------+-----------------+------------------+-------------------+--------------+
|fecha_reporte_web|id_caso|fecha_notificacion|codigo_divipola_departamento|nombre_departamento|codigo_divipola_municipio|nombre_municipio|edad|unidad_medida_edad|sexo|tipo_contagio|ubicacion_caso|estado|codigo_iso_pais|         nombre_pais|recuperado|fecha_inicio_sintomas|fecha_muerte|fecha_diagnostico|fecha_recuperacion|tipo_recuperacion|pertenencia_etnica|nombre_grupo_etnico|categoria_edad|
+-----------------+-------+------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------+----+-------------+--------------+------+-----