# Data Processing using Pyspark

In [1]:
#configuración en google colab de spark y pyspark
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [2]:
#instalar java y spark
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz
!tar xf spark-3.3.1-bin-hadoop3.tgz
!pip install -q findspark

In [3]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.1-bin-hadoop3"

In [4]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

In [5]:
spark

In [6]:
sc

In [7]:
# Load csv Dataset 
#df=spark.read.csv('s3://<bucket/dir>/sample_data.csv',inferSchema=True,header=True)
df=spark.read.csv('/content/gdrive/MyDrive/Telematica/st0263-2266/bigdata/datasets/covid19/Casos_positivos_de_COVID-19_en_Colombia-100K.csv',inferSchema=True,header=True)

In [8]:
#columns of dataframe
df.columns

['fecha reporte web',
 'ID de caso',
 'Fecha de notificación',
 'Código DIVIPOLA departamento',
 'Nombre departamento',
 'Código DIVIPOLA municipio',
 'Nombre municipio',
 'Edad',
 'Unidad de medida de edad',
 'Sexo',
 'Tipo de contagio',
 'Ubicación del caso',
 'Estado',
 'Código ISO del país',
 'Nombre del país',
 'Recuperado',
 'Fecha de inicio de síntomas',
 'Fecha de muerte',
 'Fecha de diagnóstico',
 'Fecha de recuperación',
 'Tipo de recuperación',
 'Pertenencia étnica',
 'Nombre del grupo étnico']

In [9]:
#check number of columns
len(df.columns)

23

In [10]:
#number of records in dataframe
df.count()

100000

In [11]:
#shape of dataset
print((df.count(),len(df.columns)))

(100000, 23)


In [12]:
#printSchema
df.printSchema()

root
 |-- fecha reporte web: string (nullable = true)
 |-- ID de caso: integer (nullable = true)
 |-- Fecha de notificación: string (nullable = true)
 |-- Código DIVIPOLA departamento: integer (nullable = true)
 |-- Nombre departamento: string (nullable = true)
 |-- Código DIVIPOLA municipio: integer (nullable = true)
 |-- Nombre municipio: string (nullable = true)
 |-- Edad: integer (nullable = true)
 |-- Unidad de medida de edad: integer (nullable = true)
 |-- Sexo: string (nullable = true)
 |-- Tipo de contagio: string (nullable = true)
 |-- Ubicación del caso: string (nullable = true)
 |-- Estado: string (nullable = true)
 |-- Código ISO del país: integer (nullable = true)
 |-- Nombre del país: string (nullable = true)
 |-- Recuperado: string (nullable = true)
 |-- Fecha de inicio de síntomas: string (nullable = true)
 |-- Fecha de muerte: string (nullable = true)
 |-- Fecha de diagnóstico: string (nullable = true)
 |-- Fecha de recuperación: string (nullable = true)
 |-- Tipo de r

In [13]:
#fisrt few rows of dataframe
df.show()

+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------------+----+----------------+------------------+------+-------------------+--------------------+----------+---------------------------+---------------+--------------------+---------------------+--------------------+------------------+-----------------------+
|fecha reporte web|ID de caso|Fecha de notificación|Código DIVIPOLA departamento|Nombre departamento|Código DIVIPOLA municipio|Nombre municipio|Edad|Unidad de medida de edad|Sexo|Tipo de contagio|Ubicación del caso|Estado|Código ISO del país|     Nombre del país|Recuperado|Fecha de inicio de síntomas|Fecha de muerte|Fecha de diagnóstico|Fecha de recuperación|Tipo de recuperación|Pertenencia étnica|Nombre del grupo étnico|
+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----

In [14]:
#select only 2 columns
df.select('fecha reporte web','Nombre municipio', 'Estado').show(5)

+-----------------+----------------+------+
|fecha reporte web|Nombre municipio|Estado|
+-----------------+----------------+------+
| 6/3/2020 0:00:00|          BOGOTA|  Leve|
| 9/3/2020 0:00:00|            BUGA|  Leve|
| 9/3/2020 0:00:00|        MEDELLIN|  Leve|
|11/3/2020 0:00:00|        MEDELLIN|  Leve|
|11/3/2020 0:00:00|        MEDELLIN|  Leve|
+-----------------+----------------+------+
only showing top 5 rows



In [15]:
#change column name
df.select('ID de caso', 'Estado').withColumnRenamed('ID de caso', 'ID').show(5)

+---+------+
| ID|Estado|
+---+------+
|  1|  Leve|
|  2|  Leve|
|  3|  Leve|
|  4|  Leve|
|  5|  Leve|
+---+------+
only showing top 5 rows



In [16]:
#add column
df.select('ID de caso', 'Estado').withColumn('ID mas 10', (10+df['ID de caso'])).show(5)

+----------+------+---------+
|ID de caso|Estado|ID mas 10|
+----------+------+---------+
|         1|  Leve|       11|
|         2|  Leve|       12|
|         3|  Leve|       13|
|         4|  Leve|       14|
|         5|  Leve|       15|
+----------+------+---------+
only showing top 5 rows



In [17]:
#borrar columna
df.select('ID de caso','fecha reporte web','Nombre municipio', 'Estado').drop('fecha reporte web').show(5)

+----------+----------------+------+
|ID de caso|Nombre municipio|Estado|
+----------+----------------+------+
|         1|          BOGOTA|  Leve|
|         2|            BUGA|  Leve|
|         3|        MEDELLIN|  Leve|
|         4|        MEDELLIN|  Leve|
|         5|        MEDELLIN|  Leve|
+----------+----------------+------+
only showing top 5 rows



In [18]:
#filter the records 
df.filter(df['Estado']=='Fallecido').show()

+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+----------------+----+------------------------+----+----------------+------------------+---------+-------------------+--------------------+----------+---------------------------+-----------------+--------------------+---------------------+--------------------+------------------+-----------------------+
|fecha reporte web|ID de caso|Fecha de notificación|Código DIVIPOLA departamento|Nombre departamento|Código DIVIPOLA municipio|Nombre municipio|Edad|Unidad de medida de edad|Sexo|Tipo de contagio|Ubicación del caso|   Estado|Código ISO del país|     Nombre del país|Recuperado|Fecha de inicio de síntomas|  Fecha de muerte|Fecha de diagnóstico|Fecha de recuperación|Tipo de recuperación|Pertenencia étnica|Nombre del grupo étnico|
+-----------------+----------+---------------------+----------------------------+-------------------+-------------------------+-----------

In [19]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, DoubleType, IntegerType

estado_udf = udf(lambda estado: 'Estable' if estado == 'Leve' else 'Muerto', StringType())

#apply udf on dataframe
df.select('ID de caso', 'Estado').withColumn('Condicion', estado_udf(df.Estado)).show(5)

+----------+------+---------+
|ID de caso|Estado|Condicion|
+----------+------+---------+
|         1|  Leve|  Estable|
|         2|  Leve|  Estable|
|         3|  Leve|  Estable|
|         4|  Leve|  Estable|
|         5|  Leve|  Estable|
+----------+------+---------+
only showing top 5 rows



In [20]:
#Punto 3 Dataframes

In [21]:
#Los 10 departamentos con más casos de covid en Colombia ordenados de mayor a menor.

df.groupBy('Nombre departamento').count().orderBy('count',ascending=False).show(10,False)

+-------------------+-----+
|Nombre departamento|count|
+-------------------+-----+
|BOGOTA             |30016|
|BARRANQUILLA       |13065|
|ATLANTICO          |10994|
|VALLE              |10404|
|CARTAGENA          |8333 |
|ANTIOQUIA          |4554 |
|NARIÑO             |3520 |
|CUNDINAMARCA       |2827 |
|AMAZONAS           |2317 |
|CHOCO              |1636 |
+-------------------+-----+
only showing top 10 rows



In [22]:
#Las 10 ciudades con más casos de covid en Colombia ordenados de mayor a menor.

df.groupBy('Nombre municipio').count().orderBy('count',ascending=False).show(10,False)

+----------------+-----+
|Nombre municipio|count|
+----------------+-----+
|BOGOTA          |30016|
|BARRANQUILLA    |13065|
|CARTAGENA       |8333 |
|CALI            |7747 |
|SOLEDAD         |6233 |
|LETICIA         |2194 |
|MEDELLIN        |2137 |
|TUMACO          |1501 |
|BUENAVENTURA    |1453 |
|QUIBDO          |1367 |
+----------------+-----+
only showing top 10 rows



In [23]:
#Los 10 días con más casos de covid en Colombia ordenados de mayor a menor.

df.groupBy('Fecha de notificación').count().orderBy('count',ascending=False).show(10,False)

+---------------------+-----+
|Fecha de notificación|count|
+---------------------+-----+
|18/6/2020 0:00:00    |3477 |
|19/6/2020 0:00:00    |3328 |
|17/6/2020 0:00:00    |3318 |
|16/6/2020 0:00:00    |3232 |
|23/6/2020 0:00:00    |3230 |
|11/6/2020 0:00:00    |2747 |
|20/6/2020 0:00:00    |2684 |
|12/6/2020 0:00:00    |2679 |
|10/6/2020 0:00:00    |2650 |
|24/6/2020 0:00:00    |2599 |
+---------------------+-----+
only showing top 10 rows



In [24]:
#Distribución de casos por edades de covid en Colombia.

df.groupBy('Edad').count().orderBy('Edad',ascending=True).show()

+----+-----+
|Edad|count|
+----+-----+
|   1|  485|
|   2|  440|
|   3|  449|
|   4|  373|
|   5|  425|
|   6|  431|
|   7|  442|
|   8|  461|
|   9|  467|
|  10|  530|
|  11|  566|
|  12|  562|
|  13|  531|
|  14|  580|
|  15|  560|
|  16|  600|
|  17|  685|
|  18| 1160|
|  19| 1567|
|  20| 1674|
+----+-----+
only showing top 20 rows



In [25]:
#Distribucion de casos por genero. 

df.groupBy('Sexo').count().orderBy('Sexo',ascending=True).show()

+----+-----+
|Sexo|count|
+----+-----+
|   F|45902|
|   M|54098|
+----+-----+



In [26]:
#Punto 3 SparkSQL

In [27]:
df.createOrReplaceTempView("covid19")

In [28]:
#Los 10 departamentos con más casos de covid en Colombia ordenados de mayor a menor.

spark.sql("""SELECT `Nombre departamento`, COUNT(*) FROM covid19 GROUP BY `Nombre departamento` ORDER BY 2 DESC""").show(10)

+-------------------+--------+
|Nombre departamento|count(1)|
+-------------------+--------+
|             BOGOTA|   30016|
|       BARRANQUILLA|   13065|
|          ATLANTICO|   10994|
|              VALLE|   10404|
|          CARTAGENA|    8333|
|          ANTIOQUIA|    4554|
|             NARIÑO|    3520|
|       CUNDINAMARCA|    2827|
|           AMAZONAS|    2317|
|              CHOCO|    1636|
+-------------------+--------+
only showing top 10 rows



In [29]:
#Las 10 ciudades con más casos de covid en Colombia ordenados de mayor a menor.

spark.sql("""SELECT `Nombre municipio`, COUNT(*) FROM covid19 GROUP BY `Nombre municipio` ORDER BY 2 DESC""").show(10)

+----------------+--------+
|Nombre municipio|count(1)|
+----------------+--------+
|          BOGOTA|   30016|
|    BARRANQUILLA|   13065|
|       CARTAGENA|    8333|
|            CALI|    7747|
|         SOLEDAD|    6233|
|         LETICIA|    2194|
|        MEDELLIN|    2137|
|          TUMACO|    1501|
|    BUENAVENTURA|    1453|
|          QUIBDO|    1367|
+----------------+--------+
only showing top 10 rows



In [30]:
#Los 10 días con más casos de covid en Colombia ordenados de mayor a menor.

spark.sql("""SELECT `Fecha de notificación`, COUNT(*) FROM covid19 GROUP BY `Fecha de notificación` ORDER BY 2 DESC""").show(10)

+---------------------+--------+
|Fecha de notificación|count(1)|
+---------------------+--------+
|    18/6/2020 0:00:00|    3477|
|    19/6/2020 0:00:00|    3328|
|    17/6/2020 0:00:00|    3318|
|    16/6/2020 0:00:00|    3232|
|    23/6/2020 0:00:00|    3230|
|    11/6/2020 0:00:00|    2747|
|    20/6/2020 0:00:00|    2684|
|    12/6/2020 0:00:00|    2679|
|    10/6/2020 0:00:00|    2650|
|    24/6/2020 0:00:00|    2599|
+---------------------+--------+
only showing top 10 rows



In [31]:
#Distribución de casos por edades de covid en Colombia.

spark.sql("""SELECT `Edad`, COUNT(*) FROM covid19 GROUP BY `Edad` ORDER BY 1 ASC""").show()

+----+--------+
|Edad|count(1)|
+----+--------+
|   1|     485|
|   2|     440|
|   3|     449|
|   4|     373|
|   5|     425|
|   6|     431|
|   7|     442|
|   8|     461|
|   9|     467|
|  10|     530|
|  11|     566|
|  12|     562|
|  13|     531|
|  14|     580|
|  15|     560|
|  16|     600|
|  17|     685|
|  18|    1160|
|  19|    1567|
|  20|    1674|
+----+--------+
only showing top 20 rows



In [32]:
#Distribucion de casos por genero. 

spark.sql("""SELECT `Sexo`, COUNT(*) FROM covid19 GROUP BY `Sexo` ORDER BY 1 ASC""").show()

+----+--------+
|Sexo|count(1)|
+----+--------+
|   F|   45902|
|   M|   54098|
+----+--------+



In [33]:
#Guardar

In [34]:
#target directory 
write_uri='/content/gdrive/MyDrive/Telematica/googlecolab_csv'

In [35]:
#save the dataframe as single csv 
df.coalesce(1).write.format("csv").option("header","true").save(write_uri)

In [36]:
# parquet

In [37]:
#target location
parquet_uri='/content/gdrive/MyDrive/Telematica/googlecolab_parquet'

In [38]:
#save the data into parquet format 
df.write.format('parquet').save(parquet_uri)