In [None]:
# Instalar Java y Spark
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz
!tar xf spark-3.3.1-bin-hadoop3.tgz
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.1-bin-hadoop3"

In [None]:
# Trabajar en Spark
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

In [None]:
spark

In [None]:
sc

In [9]:
# Conectamos nuestro Drive
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [10]:
# Cargar datos desde Drive
df=spark.read.csv('/content/gdrive/MyDrive/datasets/covid19/Casos_positivos_de_COVID-19_en_Colombia-100K.csv',inferSchema=True,header=True)

In [11]:
# Columnas
df.columns

['fecha reporte web',
 'ID de caso',
 'Fecha de notificación',
 'Código DIVIPOLA departamento',
 'Nombre departamento',
 'Código DIVIPOLA municipio',
 'Nombre municipio',
 'Edad',
 'Unidad de medida de edad',
 'Sexo',
 'Tipo de contagio',
 'Ubicación del caso',
 'Estado',
 'Código ISO del país',
 'Nombre del país',
 'Recuperado',
 'Fecha de inicio de síntomas',
 'Fecha de muerte',
 'Fecha de diagnóstico',
 'Fecha de recuperación',
 'Tipo de recuperación',
 'Pertenencia étnica',
 'Nombre del grupo étnico']

In [12]:
# Tipo de dato
df.printSchema()

root
 |-- fecha reporte web: string (nullable = true)
 |-- ID de caso: integer (nullable = true)
 |-- Fecha de notificación: string (nullable = true)
 |-- Código DIVIPOLA departamento: integer (nullable = true)
 |-- Nombre departamento: string (nullable = true)
 |-- Código DIVIPOLA municipio: integer (nullable = true)
 |-- Nombre municipio: string (nullable = true)
 |-- Edad: integer (nullable = true)
 |-- Unidad de medida de edad: integer (nullable = true)
 |-- Sexo: string (nullable = true)
 |-- Tipo de contagio: string (nullable = true)
 |-- Ubicación del caso: string (nullable = true)
 |-- Estado: string (nullable = true)
 |-- Código ISO del país: integer (nullable = true)
 |-- Nombre del país: string (nullable = true)
 |-- Recuperado: string (nullable = true)
 |-- Fecha de inicio de síntomas: string (nullable = true)
 |-- Fecha de muerte: string (nullable = true)
 |-- Fecha de diagnóstico: string (nullable = true)
 |-- Fecha de recuperación: string (nullable = true)
 |-- Tipo de r

In [14]:
# Seleccionar 3 columnas
df.select('ID de caso','Edad', 'Sexo').show(5)

+----------+----+----+
|ID de caso|Edad|Sexo|
+----------+----+----+
|         1|  19|   F|
|         2|  34|   M|
|         3|  50|   F|
|         4|  55|   M|
|         5|  25|   M|
+----------+----+----+
only showing top 5 rows



In [15]:
# Cambiar nombre de columna
df.select('ID de caso', 'Nombre del país').withColumnRenamed('Nombre del país', 'País').show(5)


+----------+------+
|ID de caso|  País|
+----------+------+
|         1|ITALIA|
|         2|ESPAÑA|
|         3|ESPAÑA|
|         4|  null|
|         5|  null|
+----------+------+
only showing top 5 rows



In [16]:
# Agregar columna (Aumentar 20 años la edad)
df.select('ID de caso', 'Edad').withColumn('Edad mas 20', (df['Edad']+20)).show(5)


+----------+----+-----------+
|ID de caso|Edad|Edad mas 20|
+----------+----+-----------+
|         1|  19|         39|
|         2|  34|         54|
|         3|  50|         70|
|         4|  55|         75|
|         5|  25|         45|
+----------+----+-----------+
only showing top 5 rows



In [17]:
# Borrar una columna
df.select('ID de caso','Edad','Sexo', 'Nombre del grupo étnico').drop('Nombre del grupo étnico').show(5)

+----------+----+----+
|ID de caso|Edad|Sexo|
+----------+----+----+
|         1|  19|   F|
|         2|  34|   M|
|         3|  50|   F|
|         4|  55|   M|
|         5|  25|   M|
+----------+----+----+
only showing top 5 rows



In [19]:
# Filtrar datos
df.filter(df['Edad']=='19').show(5)

+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------------+----+----------------+------------------+------+-------------------+--------------------+----------+---------------------------+---------------+--------------------+---------------------+--------------------+------------------+-----------------------+
|fecha reporte web|ID de caso|Fecha de notificación|Código DIVIPOLA departamento|Nombre departamento|Código DIVIPOLA municipio|Nombre municipio|Edad|Unidad de medida de edad|Sexo|Tipo de contagio|Ubicación del caso|Estado|Código ISO del país|     Nombre del país|Recuperado|Fecha de inicio de síntomas|Fecha de muerte|Fecha de diagnóstico|Fecha de recuperación|Tipo de recuperación|Pertenencia étnica|Nombre del grupo étnico|
+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----

In [24]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
# Funcion lambda
riesgo_udf = udf(lambda riesgo: "Moderado" if riesgo <= 60 else "Alto", StringType())
# Usar funcion
df.withColumn("Riesgo de muerte", riesgo_udf(df.Edad)).show(10)

+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------------+----+----------------+------------------+------+-------------------+--------------------+----------+---------------------------+---------------+--------------------+---------------------+--------------------+------------------+-----------------------+----------------+
|fecha reporte web|ID de caso|Fecha de notificación|Código DIVIPOLA departamento|Nombre departamento|Código DIVIPOLA municipio|Nombre municipio|Edad|Unidad de medida de edad|Sexo|Tipo de contagio|Ubicación del caso|Estado|Código ISO del país|     Nombre del país|Recuperado|Fecha de inicio de síntomas|Fecha de muerte|Fecha de diagnóstico|Fecha de recuperación|Tipo de recuperación|Pertenencia étnica|Nombre del grupo étnico|Riesgo de muerte|
+-----------------+----------+---------------------+----------------------------+-------------------+-------------

In [33]:
# Punto 3 Dataframes

# Los 10 departamentos con más casos de covid en Colombia ordenados de mayor a menor.
df.groupBy('Nombre departamento').count().orderBy('count',ascending=False).show(10,False)

+-------------------+-----+
|Nombre departamento|count|
+-------------------+-----+
|BOGOTA             |30016|
|BARRANQUILLA       |13065|
|ATLANTICO          |10994|
|VALLE              |10404|
|CARTAGENA          |8333 |
|ANTIOQUIA          |4554 |
|NARIÑO             |3520 |
|CUNDINAMARCA       |2827 |
|AMAZONAS           |2317 |
|CHOCO              |1636 |
+-------------------+-----+
only showing top 10 rows



In [26]:
# Las 10 ciudades con más casos de covid en Colombia ordenados de mayor a menor.
df.groupBy('Nombre municipio').count().orderBy('count',ascending=False).show(10,False)

+----------------+-----+
|Nombre municipio|count|
+----------------+-----+
|BOGOTA          |30016|
|BARRANQUILLA    |13065|
|CARTAGENA       |8333 |
|CALI            |7747 |
|SOLEDAD         |6233 |
|LETICIA         |2194 |
|MEDELLIN        |2137 |
|TUMACO          |1501 |
|BUENAVENTURA    |1453 |
|QUIBDO          |1367 |
+----------------+-----+
only showing top 10 rows



In [30]:
# Los 10 días con más casos de covid en Colombia ordenados de mayor a menor.
df.groupBy('Fecha de notificación').count().orderBy('count',ascending=False).show(10,False)

+---------------------+-----+
|Fecha de notificación|count|
+---------------------+-----+
|18/6/2020 0:00:00    |3477 |
|19/6/2020 0:00:00    |3328 |
|17/6/2020 0:00:00    |3318 |
|16/6/2020 0:00:00    |3232 |
|23/6/2020 0:00:00    |3230 |
|11/6/2020 0:00:00    |2747 |
|20/6/2020 0:00:00    |2684 |
|12/6/2020 0:00:00    |2679 |
|10/6/2020 0:00:00    |2650 |
|24/6/2020 0:00:00    |2599 |
+---------------------+-----+
only showing top 10 rows



In [34]:
# Distribución de casos por edades de covid en Colombia.
df.groupBy('Edad').count().orderBy('Edad',ascending=True).show()

+----+-----+
|Edad|count|
+----+-----+
|   1|  485|
|   2|  440|
|   3|  449|
|   4|  373|
|   5|  425|
|   6|  431|
|   7|  442|
|   8|  461|
|   9|  467|
|  10|  530|
|  11|  566|
|  12|  562|
|  13|  531|
|  14|  580|
|  15|  560|
|  16|  600|
|  17|  685|
|  18| 1160|
|  19| 1567|
|  20| 1674|
+----+-----+
only showing top 20 rows



In [37]:
# Distribucion de casos por estado. 
df.groupBy('Estado').count().orderBy('count',ascending=False).show()

+---------+-----+
|   Estado|count|
+---------+-----+
|     Leve|94367|
|Fallecido| 4663|
|      N/A|  970|
+---------+-----+



In [38]:
# Punto 3 SparkSQL
df.createOrReplaceTempView("covid19")

In [45]:
# Los 10 departamentos con más casos de covid en Colombia ordenados de mayor a menor.
spark.sql("""SELECT `Nombre departamento`, COUNT(*) FROM covid19 GROUP BY `Nombre departamento` ORDER BY COUNT(*) DESC""").show(10)

+-------------------+--------+
|Nombre departamento|count(1)|
+-------------------+--------+
|             BOGOTA|   30016|
|       BARRANQUILLA|   13065|
|          ATLANTICO|   10994|
|              VALLE|   10404|
|          CARTAGENA|    8333|
|          ANTIOQUIA|    4554|
|             NARIÑO|    3520|
|       CUNDINAMARCA|    2827|
|           AMAZONAS|    2317|
|              CHOCO|    1636|
+-------------------+--------+
only showing top 10 rows



In [42]:
# Las 10 ciudades con más casos de covid en Colombia ordenados de mayor a menor.
spark.sql("""SELECT `Nombre municipio`, COUNT(*) FROM covid19 GROUP BY `Nombre municipio` ORDER BY COUNT(*) DESC""").show(10)

+----------------+--------+
|Nombre municipio|count(1)|
+----------------+--------+
|          BOGOTA|   30016|
|    BARRANQUILLA|   13065|
|       CARTAGENA|    8333|
|            CALI|    7747|
|         SOLEDAD|    6233|
|         LETICIA|    2194|
|        MEDELLIN|    2137|
|          TUMACO|    1501|
|    BUENAVENTURA|    1453|
|          QUIBDO|    1367|
+----------------+--------+
only showing top 10 rows



In [43]:
# Los 10 días con más casos de covid en Colombia ordenados de mayor a menor.
spark.sql("""SELECT `Fecha de notificación`, COUNT(*) FROM covid19 GROUP BY `Fecha de notificación` ORDER BY COUNT(*) DESC""").show(10)

+---------------------+--------+
|Fecha de notificación|count(1)|
+---------------------+--------+
|    18/6/2020 0:00:00|    3477|
|    19/6/2020 0:00:00|    3328|
|    17/6/2020 0:00:00|    3318|
|    16/6/2020 0:00:00|    3232|
|    23/6/2020 0:00:00|    3230|
|    11/6/2020 0:00:00|    2747|
|    20/6/2020 0:00:00|    2684|
|    12/6/2020 0:00:00|    2679|
|    10/6/2020 0:00:00|    2650|
|    24/6/2020 0:00:00|    2599|
+---------------------+--------+
only showing top 10 rows



In [46]:
# Distribución de casos por edades de covid en Colombia.
spark.sql("""SELECT `Edad`, COUNT(*) FROM covid19 GROUP BY `Edad` ORDER BY Edad ASC""").show()

+----+--------+
|Edad|count(1)|
+----+--------+
|   1|     485|
|   2|     440|
|   3|     449|
|   4|     373|
|   5|     425|
|   6|     431|
|   7|     442|
|   8|     461|
|   9|     467|
|  10|     530|
|  11|     566|
|  12|     562|
|  13|     531|
|  14|     580|
|  15|     560|
|  16|     600|
|  17|     685|
|  18|    1160|
|  19|    1567|
|  20|    1674|
+----+--------+
only showing top 20 rows



In [51]:
# Distribucion de casos por estado. 
spark.sql("""SELECT `Estado`, COUNT(*) FROM covid19 GROUP BY `Estado` ORDER BY COUNT(*) DESC""").show()

+---------+--------+
|   Estado|count(1)|
+---------+--------+
|     Leve|   94367|
|Fallecido|    4663|
|      N/A|     970|
+---------+--------+



In [52]:
#Guardar
# Directorio 
write_uri='/content/gdrive/MyDrive/datasets/googlecolab_csv'
# Guardar como csv 
df.coalesce(1).write.format("csv").option("header","true").save(write_uri)

In [53]:
# parquet
# Directorio
parquet_uri='/content/gdrive/MyDrive/datasets/googlecolab_parquet'
# Guardar como parquet
df.write.format('parquet').save(parquet_uri)