In [82]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.4.0/spark-3.4.0-bin-hadoop3.tgz
!tar xf spark-3.4.0-bin-hadoop3.tgz
!pip install -q findspark

In [83]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.0-bin-hadoop3"

In [84]:
import findspark
from pyspark.sql.functions import desc, asc
from pyspark.sql.functions import lit
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("data_processing")\
    .master("local[*]")\
    .config("spark.driver.memory","16G")\
    .config("spark.driver.maxResultSize", "0") \
    .config("spark.kryoserializer.buffer.max", "2000M")\
    .config("spark.jars.packages", "com.amazonaws:aws-java-sdk:1.11.1034,org.apache.hadoop:hadoop-aws:3.3.4")\
    .getOrCreate()
    
sc = spark.sparkContext

In [85]:

df=spark.read.csv("/content/drive/MyDrive/datasets/Casos_positivos_de_COVID-19_en_Colombia-100K.csv",inferSchema=True,header=True)


2. Realizar el siguiente analisis:

columnas

tipos de datos

seleccionar algunas columnas

RENOMBRAR COLUMNAS (esto se recomienda hacerlo para facilitar el 
procesamiento posterior)

agregar columnas

borrar columnas

filtrar datos

ejecutar alguna función UDF o lambda sobre alguna columna creando una nueva.

In [86]:
# columnas
print(df.columns)

['fecha reporte web', 'ID de caso', 'Fecha de notificación', 'Código DIVIPOLA departamento', 'Nombre departamento', 'Código DIVIPOLA municipio', 'Nombre municipio', 'Edad', 'Unidad de medida de edad', 'Sexo', 'Tipo de contagio', 'Ubicación del caso', 'Estado', 'Código ISO del país', 'Nombre del país', 'Recuperado', 'Fecha de inicio de síntomas', 'Fecha de muerte', 'Fecha de diagnóstico', 'Fecha de recuperación', 'Tipo de recuperación', 'Pertenencia étnica', 'Nombre del grupo étnico']


In [87]:
# tipos de datos
print([f.dataType for f in df.schema.fields])

[StringType(), IntegerType(), StringType(), IntegerType(), StringType(), IntegerType(), StringType(), IntegerType(), IntegerType(), StringType(), StringType(), StringType(), StringType(), IntegerType(), StringType(), StringType(), StringType(), StringType(), StringType(), StringType(), StringType(), IntegerType(), StringType()]


In [88]:
# seleccionar columnas
print(df.select("Nombre municipio").show())

+----------------+
|Nombre municipio|
+----------------+
|          BOGOTA|
|            BUGA|
|        MEDELLIN|
|        MEDELLIN|
|        MEDELLIN|
|          ITAGUI|
|       CARTAGENA|
|          BOGOTA|
|          BOGOTA|
|          BOGOTA|
|          BOGOTA|
|           NEIVA|
|           NEIVA|
|         PALMIRA|
|   VILLAVICENCIO|
|          BOGOTA|
|          BOGOTA|
|          BOGOTA|
|          BOGOTA|
|        MEDELLIN|
+----------------+
only showing top 20 rows

None


In [89]:
# renombrar columnas
print(df.withColumnRenamed("Sexo", "Género").columns)

['fecha reporte web', 'ID de caso', 'Fecha de notificación', 'Código DIVIPOLA departamento', 'Nombre departamento', 'Código DIVIPOLA municipio', 'Nombre municipio', 'Edad', 'Unidad de medida de edad', 'Género', 'Tipo de contagio', 'Ubicación del caso', 'Estado', 'Código ISO del país', 'Nombre del país', 'Recuperado', 'Fecha de inicio de síntomas', 'Fecha de muerte', 'Fecha de diagnóstico', 'Fecha de recuperación', 'Tipo de recuperación', 'Pertenencia étnica', 'Nombre del grupo étnico']


In [90]:
# agregar columnas
print(df.withColumn("Datos Analizados", lit(True)).show())

+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------------+----+----------------+------------------+------+-------------------+--------------------+----------+---------------------------+---------------+--------------------+---------------------+--------------------+------------------+-----------------------+----------------+
|fecha reporte web|ID de caso|Fecha de notificación|Código DIVIPOLA departamento|Nombre departamento|Código DIVIPOLA municipio|Nombre municipio|Edad|Unidad de medida de edad|Sexo|Tipo de contagio|Ubicación del caso|Estado|Código ISO del país|     Nombre del país|Recuperado|Fecha de inicio de síntomas|Fecha de muerte|Fecha de diagnóstico|Fecha de recuperación|Tipo de recuperación|Pertenencia étnica|Nombre del grupo étnico|Datos Analizados|
+-----------------+----------+---------------------+----------------------------+-------------------+-------------

In [91]:
# borrar columnas
print(df.drop("Código DIVIPOLA departamento").columns)

['fecha reporte web', 'ID de caso', 'Fecha de notificación', 'Nombre departamento', 'Código DIVIPOLA municipio', 'Nombre municipio', 'Edad', 'Unidad de medida de edad', 'Sexo', 'Tipo de contagio', 'Ubicación del caso', 'Estado', 'Código ISO del país', 'Nombre del país', 'Recuperado', 'Fecha de inicio de síntomas', 'Fecha de muerte', 'Fecha de diagnóstico', 'Fecha de recuperación', 'Tipo de recuperación', 'Pertenencia étnica', 'Nombre del grupo étnico']


In [92]:
# filtrar datos
print(df.filter(df["Nombre departamento"] == 'ANTIOQUIA').show())

+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------------+----+----------------+------------------+------+-------------------+--------------------+----------+---------------------------+---------------+--------------------+---------------------+--------------------+------------------+-----------------------+
|fecha reporte web|ID de caso|Fecha de notificación|Código DIVIPOLA departamento|Nombre departamento|Código DIVIPOLA municipio|Nombre municipio|Edad|Unidad de medida de edad|Sexo|Tipo de contagio|Ubicación del caso|Estado|Código ISO del país|     Nombre del país|Recuperado|Fecha de inicio de síntomas|Fecha de muerte|Fecha de diagnóstico|Fecha de recuperación|Tipo de recuperación|Pertenencia étnica|Nombre del grupo étnico|
+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----

In [93]:
# ejecutar alguna funcion UDF o Lambda
import pyspark.sql.functions as F
df.withColumn("Pertenencia étnica + 1", F.udf(lambda x: x+1)("Pertenencia étnica")).show()

+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------------+----+----------------+------------------+------+-------------------+--------------------+----------+---------------------------+---------------+--------------------+---------------------+--------------------+------------------+-----------------------+----------------------+
|fecha reporte web|ID de caso|Fecha de notificación|Código DIVIPOLA departamento|Nombre departamento|Código DIVIPOLA municipio|Nombre municipio|Edad|Unidad de medida de edad|Sexo|Tipo de contagio|Ubicación del caso|Estado|Código ISO del país|     Nombre del país|Recuperado|Fecha de inicio de síntomas|Fecha de muerte|Fecha de diagnóstico|Fecha de recuperación|Tipo de recuperación|Pertenencia étnica|Nombre del grupo étnico|Pertenencia étnica + 1|
+-----------------+----------+---------------------+----------------------------+-------------------+-

3. Contestar las siguientes preguntas con dataframes y sparkSQL

3.1 Los 10 departamentos con más casos de covid en Colombia ordenados de mayor a menor.

3.2 Las 10 ciudades con más casos de covid en Colombia ordenados de mayor a menor.

3.3 Los 10 días con más casos de covid en Colombia ordenados de mayor a menor.

3.4 Distribución de casos por edades de covid en Colombia.

3.5 Realice la pregunda de negocio que quiera sobre los datos y respondala con la correspondiente programación en spark.

In [94]:
# Los 10 departamentos con más casos de covid en Colombia ordenados de mayor a menor.


df.groupBy("Nombre departamento").count().orderBy(desc("count")).show(10)

+-------------------+-----+
|Nombre departamento|count|
+-------------------+-----+
|             BOGOTA|30016|
|       BARRANQUILLA|13065|
|          ATLANTICO|10994|
|              VALLE|10404|
|          CARTAGENA| 8333|
|          ANTIOQUIA| 4554|
|             NARIÑO| 3520|
|       CUNDINAMARCA| 2827|
|           AMAZONAS| 2317|
|              CHOCO| 1636|
+-------------------+-----+
only showing top 10 rows



In [95]:
# Las 10 ciudades con más casos de covid en Colombia ordenados de mayor a menor.


df.groupBy("Nombre municipio").count().orderBy(desc("count")).show(10)

+----------------+-----+
|Nombre municipio|count|
+----------------+-----+
|          BOGOTA|30016|
|    BARRANQUILLA|13065|
|       CARTAGENA| 8333|
|            CALI| 7747|
|         SOLEDAD| 6233|
|         LETICIA| 2194|
|        MEDELLIN| 2137|
|          TUMACO| 1501|
|    BUENAVENTURA| 1453|
|          QUIBDO| 1367|
+----------------+-----+
only showing top 10 rows



In [96]:
# Los 10 días con más casos de covid en Colombia ordenados de mayor a menor.


df.groupBy("Fecha de diagnóstico").count().orderBy(desc("count")).show(10)

+--------------------+-----+
|Fecha de diagnóstico|count|
+--------------------+-----+
|   26/6/2020 0:00:00| 4390|
|   27/6/2020 0:00:00| 4019|
|   28/6/2020 0:00:00| 3580|
|   25/6/2020 0:00:00| 3381|
|   19/6/2020 0:00:00| 3053|
|   18/6/2020 0:00:00| 3040|
|   23/6/2020 0:00:00| 3031|
|   22/6/2020 0:00:00| 2938|
|   21/6/2020 0:00:00| 2781|
|   24/6/2020 0:00:00| 2564|
+--------------------+-----+
only showing top 10 rows



In [97]:
# Distribución de casos por edades de covid en Colombia.
df.groupBy().pivot("Edad").count().show()

+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|  1|  2|  3|  4|  5|  6|  7|  8|  9| 10| 11| 12| 13| 14| 15| 16| 17|  18|  19|  20|  21|  22|  23|  24|  25|  26|  27|  28|  29|  30|  31|  32|  33|  34|  35|  36|  37|  38|  39|  40|  41|  42|  43|  44|  45|  46|  47|  48|  49|  50|  51|  52|  53|  54|  55|  56|  57|  58|  59|  60| 61| 62| 63| 64| 65| 66| 67| 68| 69| 70| 71| 72| 73| 74| 75| 76| 77| 78| 79| 80| 81| 82| 83| 84| 85| 86| 87| 88| 89| 90| 91| 92| 93| 94| 95| 96| 97| 98| 99|100|101|102|103|104|
+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+----+----

In [98]:
# Realice la pregunda de negocio que quiera sobre los datos y respondala con la correspondiente programación en spark.
# ciudades con menos casos de covid
df.groupBy("Nombre municipio").count().orderBy(asc("count")).show(10)

+------------------+-----+
|  Nombre municipio|count|
+------------------+-----+
|           CABRERA|    1|
|            TOTORO|    1|
|           YAGUARA|    1|
|            DARIEN|    1|
|             NEIRA|    1|
|      ANSERMANUEVO|    1|
|SAN MIGUEL DE SEMA|    1|
|          PAILITAS|    1|
|           BELTRAN|    1|
|           ANSERMA|    1|
+------------------+-----+
only showing top 10 rows

