# Data Processing using Pyspark

In [1]:
#configuración en google colab de spark y pyspark
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [2]:
#instalar java y spark
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz
!tar xf spark-3.3.1-bin-hadoop3.tgz
!pip install -q findspark

In [3]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.1-bin-hadoop3"

In [4]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

In [5]:
spark

In [6]:
sc

In [7]:
# Load csv Dataset 
#df=spark.read.csv('s3://<bucket/dir>/sample_data.csv',inferSchema=True,header=True)
df=spark.read.csv('/content/gdrive/MyDrive/st0263-2266/bigdata/datasets/covid19/Casos_positivos_de_COVID-19_en_Colombia-100K.csv',inferSchema=True,header=True)

In [8]:
#columns of dataframe
df.columns

['fecha reporte web',
 'ID de caso',
 'Fecha de notificación',
 'Código DIVIPOLA departamento',
 'Nombre departamento',
 'Código DIVIPOLA municipio',
 'Nombre municipio',
 'Edad',
 'Unidad de medida de edad',
 'Sexo',
 'Tipo de contagio',
 'Ubicación del caso',
 'Estado',
 'Código ISO del país',
 'Nombre del país',
 'Recuperado',
 'Fecha de inicio de síntomas',
 'Fecha de muerte',
 'Fecha de diagnóstico',
 'Fecha de recuperación',
 'Tipo de recuperación',
 'Pertenencia étnica',
 'Nombre del grupo étnico']

In [9]:
#printSchema
df.printSchema()

root
 |-- fecha reporte web: string (nullable = true)
 |-- ID de caso: integer (nullable = true)
 |-- Fecha de notificación: string (nullable = true)
 |-- Código DIVIPOLA departamento: integer (nullable = true)
 |-- Nombre departamento: string (nullable = true)
 |-- Código DIVIPOLA municipio: integer (nullable = true)
 |-- Nombre municipio: string (nullable = true)
 |-- Edad: integer (nullable = true)
 |-- Unidad de medida de edad: integer (nullable = true)
 |-- Sexo: string (nullable = true)
 |-- Tipo de contagio: string (nullable = true)
 |-- Ubicación del caso: string (nullable = true)
 |-- Estado: string (nullable = true)
 |-- Código ISO del país: integer (nullable = true)
 |-- Nombre del país: string (nullable = true)
 |-- Recuperado: string (nullable = true)
 |-- Fecha de inicio de síntomas: string (nullable = true)
 |-- Fecha de muerte: string (nullable = true)
 |-- Fecha de diagnóstico: string (nullable = true)
 |-- Fecha de recuperación: string (nullable = true)
 |-- Tipo de r

In [12]:
df.select('ID de caso', 'fecha reporte web', 'Fecha de notificación', 'Sexo').show(4)

+----------+-----------------+---------------------+----+
|ID de caso|fecha reporte web|Fecha de notificación|Sexo|
+----------+-----------------+---------------------+----+
|         1| 6/3/2020 0:00:00|     2/3/2020 0:00:00|   F|
|         2| 9/3/2020 0:00:00|     6/3/2020 0:00:00|   M|
|         3| 9/3/2020 0:00:00|     7/3/2020 0:00:00|   F|
|         4|11/3/2020 0:00:00|     9/3/2020 0:00:00|   M|
+----------+-----------------+---------------------+----+
only showing top 4 rows



In [18]:
df.select('Nombre Departamento', 'Edad').withColumnRenamed('Nombre Departamento', 'Dep').show(5)

+---------+----+
|      Dep|Edad|
+---------+----+
|   BOGOTA|  19|
|    VALLE|  34|
|ANTIOQUIA|  50|
|ANTIOQUIA|  55|
|ANTIOQUIA|  25|
+---------+----+
only showing top 5 rows



In [24]:
df.select('ID de caso', 'Fecha de recuperación', 'Edad').withColumn("Edad luego 10 años",(df["Edad"]+10)).show(10,False)

+----------+---------------------+----+------------------+
|ID de caso|Fecha de recuperación|Edad|Edad luego 10 años|
+----------+---------------------+----+------------------+
|1         |13/3/2020 0:00:00    |19  |29                |
|2         |19/3/2020 0:00:00    |34  |44                |
|3         |15/3/2020 0:00:00    |50  |60                |
|4         |26/3/2020 0:00:00    |55  |65                |
|5         |23/3/2020 0:00:00    |25  |35                |
|6         |26/3/2020 0:00:00    |27  |37                |
|7         |17/3/2020 0:00:00    |85  |95                |
|8         |21/3/2020 0:00:00    |22  |32                |
|9         |23/3/2020 0:00:00    |28  |38                |
|10        |21/3/2020 0:00:00    |36  |46                |
+----------+---------------------+----+------------------+
only showing top 10 rows



In [25]:
df.select('ID de caso', 'Fecha de recuperación').drop('ID de caso', 'fecha de recuperacion').show(5)

+---------------------+
|Fecha de recuperación|
+---------------------+
|    13/3/2020 0:00:00|
|    19/3/2020 0:00:00|
|    15/3/2020 0:00:00|
|    26/3/2020 0:00:00|
|    23/3/2020 0:00:00|
+---------------------+
only showing top 5 rows



In [26]:
df.filter(df['Edad']=='19').show(5)

+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------------+----+----------------+------------------+------+-------------------+--------------------+----------+---------------------------+---------------+--------------------+---------------------+--------------------+------------------+-----------------------+
|fecha reporte web|ID de caso|Fecha de notificación|Código DIVIPOLA departamento|Nombre departamento|Código DIVIPOLA municipio|Nombre municipio|Edad|Unidad de medida de edad|Sexo|Tipo de contagio|Ubicación del caso|Estado|Código ISO del país|     Nombre del país|Recuperado|Fecha de inicio de síntomas|Fecha de muerte|Fecha de diagnóstico|Fecha de recuperación|Tipo de recuperación|Pertenencia étnica|Nombre del grupo étnico|
+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----

In [27]:
from pyspark.sql.functions import udf

In [28]:
from pyspark.sql.types import StringType

In [29]:
def menores_de_edad(Edad):
  if Edad >= '18':
    return "ERES MAYOR DE EDAD"
  else:
    return "No eres mayor de edad" 

In [30]:
eda_udf=udf(menores_de_edad,StringType())
df.select('Edad', 'Sexo').withColumn('Edades totales', eda_udf(df['Sexo'])).show(5)

+----+----+------------------+
|Edad|Sexo|    Edades totales|
+----+----+------------------+
|  19|   F|ERES MAYOR DE EDAD|
|  34|   M|ERES MAYOR DE EDAD|
|  50|   F|ERES MAYOR DE EDAD|
|  55|   M|ERES MAYOR DE EDAD|
|  25|   M|ERES MAYOR DE EDAD|
+----+----+------------------+
only showing top 5 rows



In [33]:
df.groupBy('Nombre departamento').count().orderBy('count',ascending=False).show(10,False)

+-------------------+-----+
|Nombre departamento|count|
+-------------------+-----+
|BOGOTA             |30016|
|BARRANQUILLA       |13065|
|ATLANTICO          |10994|
|VALLE              |10404|
|CARTAGENA          |8333 |
|ANTIOQUIA          |4554 |
|NARIÑO             |3520 |
|CUNDINAMARCA       |2827 |
|AMAZONAS           |2317 |
|CHOCO              |1636 |
+-------------------+-----+
only showing top 10 rows



In [34]:
df.groupBy('Nombre municipio').count().orderBy('count',ascending=False).show(10,False)

+----------------+-----+
|Nombre municipio|count|
+----------------+-----+
|BOGOTA          |30016|
|BARRANQUILLA    |13065|
|CARTAGENA       |8333 |
|CALI            |7747 |
|SOLEDAD         |6233 |
|LETICIA         |2194 |
|MEDELLIN        |2137 |
|TUMACO          |1501 |
|BUENAVENTURA    |1453 |
|QUIBDO          |1367 |
+----------------+-----+
only showing top 10 rows



In [35]:
df.groupBy('Fecha de notificación').count().orderBy('count',ascending=False).show(10,False)

+---------------------+-----+
|Fecha de notificación|count|
+---------------------+-----+
|18/6/2020 0:00:00    |3477 |
|19/6/2020 0:00:00    |3328 |
|17/6/2020 0:00:00    |3318 |
|16/6/2020 0:00:00    |3232 |
|23/6/2020 0:00:00    |3230 |
|11/6/2020 0:00:00    |2747 |
|20/6/2020 0:00:00    |2684 |
|12/6/2020 0:00:00    |2679 |
|10/6/2020 0:00:00    |2650 |
|24/6/2020 0:00:00    |2599 |
+---------------------+-----+
only showing top 10 rows



In [36]:
df.groupBy('Edad').count().orderBy('Edad',ascending=True).show()

+----+-----+
|Edad|count|
+----+-----+
|   1|  485|
|   2|  440|
|   3|  449|
|   4|  373|
|   5|  425|
|   6|  431|
|   7|  442|
|   8|  461|
|   9|  467|
|  10|  530|
|  11|  566|
|  12|  562|
|  13|  531|
|  14|  580|
|  15|  560|
|  16|  600|
|  17|  685|
|  18| 1160|
|  19| 1567|
|  20| 1674|
+----+-----+
only showing top 20 rows



In [37]:
df.groupBy('Estado').count().orderBy('Estado',ascending=True).show()

+---------+-----+
|   Estado|count|
+---------+-----+
|Fallecido| 4663|
|     Leve|94367|
|      N/A|  970|
+---------+-----+



In [46]:
# In Python
from pyspark.sql import SparkSession
# Create a SparkSession
spark = (SparkSession
  .builder
  .appName("SparkSQLExampleApp")
  .getOrCreate())
# Path to data set
csv_file = "/content/gdrive/MyDrive/st0263-2266/bigdata/datasets/covid19/Casos_positivos_de_COVID-19_en_Colombia-100K.csv"
# Read and create a temporary view
# Infer schema (note that for larger files you 
# may want to specify the schema)
df = (spark.read.format("csv")
  .option("inferSchema", "true")
  .option("header", "true")
  .load(csv_file))
df.createOrReplaceTempView("CASOS")

In [53]:
spark.sql("""SELECT Edad, COUNT(*) FROM CASOS GROUP BY Edad""").show(10)

+----+--------+
|Edad|count(1)|
+----+--------+
|  31|    2569|
|  85|     224|
|  65|     736|
|  53|    1307|
|  78|     346|
|  34|    2310|
| 101|       1|
|  81|     275|
|  28|    2540|
|  76|     361|
+----+--------+
only showing top 10 rows



In [52]:
spark.sql("""SELECT `Nombre departamento`, COUNT(*) FROM CASOS GROUP BY `Nombre departamento` ORDER BY 2  DESC""").show(10)

+-------------------+--------+
|Nombre departamento|count(1)|
+-------------------+--------+
|             BOGOTA|   30016|
|       BARRANQUILLA|   13065|
|          ATLANTICO|   10994|
|              VALLE|   10404|
|          CARTAGENA|    8333|
|          ANTIOQUIA|    4554|
|             NARIÑO|    3520|
|       CUNDINAMARCA|    2827|
|           AMAZONAS|    2317|
|              CHOCO|    1636|
+-------------------+--------+
only showing top 10 rows



In [54]:
spark.sql("""SELECT `Nombre municipio`, COUNT(*) FROM CASOS GROUP BY `Nombre municipio` ORDER BY 2  DESC""").show(10)

+----------------+--------+
|Nombre municipio|count(1)|
+----------------+--------+
|          BOGOTA|   30016|
|    BARRANQUILLA|   13065|
|       CARTAGENA|    8333|
|            CALI|    7747|
|         SOLEDAD|    6233|
|         LETICIA|    2194|
|        MEDELLIN|    2137|
|          TUMACO|    1501|
|    BUENAVENTURA|    1453|
|          QUIBDO|    1367|
+----------------+--------+
only showing top 10 rows



In [55]:
spark.sql("""SELECT `Fecha de notificación`, COUNT(*) FROM CASOS GROUP BY `Fecha de notificación` ORDER BY 2  DESC""").show(10)

+---------------------+--------+
|Fecha de notificación|count(1)|
+---------------------+--------+
|    18/6/2020 0:00:00|    3477|
|    19/6/2020 0:00:00|    3328|
|    17/6/2020 0:00:00|    3318|
|    16/6/2020 0:00:00|    3232|
|    23/6/2020 0:00:00|    3230|
|    11/6/2020 0:00:00|    2747|
|    20/6/2020 0:00:00|    2684|
|    12/6/2020 0:00:00|    2679|
|    10/6/2020 0:00:00|    2650|
|    24/6/2020 0:00:00|    2599|
+---------------------+--------+
only showing top 10 rows



In [56]:
spark.sql("""SELECT `Estado`, COUNT(*) FROM CASOS GROUP BY `Estado` ORDER BY 2  DESC""").show(10)

+---------+--------+
|   Estado|count(1)|
+---------+--------+
|     Leve|   94367|
|Fallecido|    4663|
|      N/A|     970|
+---------+--------+

