In [1]:
######################################################################################################################
# VERSION  	DESARROLLADOR             FECHA        DESCRIPCION
# -------------------------------------------------------------
#  1        Walter Albites Azarte     02/02/2023   Challenge Coronavirus report Notebook utilizando clases
#                                                  utilitarias y de negocio en pyspark.
#                                                  Objetivo Casos a nivel Mundial de Coronavirus reportados por Mes
#
# Dataset : https://www.kaggle.com/datasets/imdevskp/corona-virus-report
######################################################################################################################

In [2]:
import findspark
findspark.init()
findspark.find()

'C:\\spark-3.1.2-bin-hadoop2.7'

In [3]:
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
conf= pyspark.SparkConf().setAppName('SparkApp').setMaster('local')
sc = pyspark.SparkContext(conf=conf)
spark=SparkSession(sc)

In [4]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [5]:
from utilitario import Utils
from logica_negocio import BusinessLogic

In [6]:
utils = Utils()

In [7]:
businesslogic=BusinessLogic()

In [8]:
path_raw = 'C:/Users/walbi/Documents/challenge-covid/clases/raw/'

In [9]:
path_udv = 'C:/Users/walbi/Documents/challenge-covid/clases/udv/'

In [10]:
#Leer csv con la sesion de spark
#df_covid_19=utils.read_csv(spark,path_name)
#df_covid_19.show(5,False)

In [11]:
############################################ worldometer

In [12]:
titles=['Country/Region',
 'Continent',
 'Population',
 'TotalCases',
 'NewCases',
 'TotalDeaths',
 'NewDeaths',
 'TotalRecovered',
 'NewRecovered',
 'ActiveCases',
 'Serious,Critical',
 'Tot Cases/1M pop',
 'Deaths/1M pop',
 'TotalTests',
 'Tests/1M pop',
 'WHO Region']

In [13]:
#Hay que hacer orientado a objetos (Clases en notebooks).
#Leer csv rdd de la clase Utils
df_covid_19_rdd=utils.read_csv_rdd(sc,path_raw+'worldometer_data.csv',titles)
df_covid_19_rdd.show(5,False)

+--------------+-------------+----------+----------+--------+-----------+---------+--------------+------------+-----------+----------------+----------------+-------------+----------+------------+--------------+
|Country/Region|Continent    |Population|TotalCases|NewCases|TotalDeaths|NewDeaths|TotalRecovered|NewRecovered|ActiveCases|Serious,Critical|Tot Cases/1M pop|Deaths/1M pop|TotalTests|Tests/1M pop|WHO Region    |
+--------------+-------------+----------+----------+--------+-----------+---------+--------------+------------+-----------+----------------+----------------+-------------+----------+------------+--------------+
|USA           |North America|331198130 |5032179   |        |162804     |         |2576668       |            |2292707    |18296           |15194           |492          |63139605  |190640      |Americas      |
|Brazil        |South America|212710692 |2917562   |        |98644      |         |2047660       |            |771258     |8318            |13716           

In [14]:
df_covid_19_rdd.printSchema()

root
 |-- Country/Region: string (nullable = true)
 |-- Continent: string (nullable = true)
 |-- Population: string (nullable = true)
 |-- TotalCases: string (nullable = true)
 |-- NewCases: string (nullable = true)
 |-- TotalDeaths: string (nullable = true)
 |-- NewDeaths: string (nullable = true)
 |-- TotalRecovered: string (nullable = true)
 |-- NewRecovered: string (nullable = true)
 |-- ActiveCases: string (nullable = true)
 |-- Serious,Critical: string (nullable = true)
 |-- Tot Cases/1M pop: string (nullable = true)
 |-- Deaths/1M pop: string (nullable = true)
 |-- TotalTests: string (nullable = true)
 |-- Tests/1M pop: string (nullable = true)
 |-- WHO Region: string (nullable = true)



In [15]:
#Hay que tratar los tipos de datos en la ultima capa (Ej: Todo que es número, 
#debe ser número, no string).
df_covid_19_udv=businesslogic.transformacion_tipos_datos_covid(df_covid_19_rdd)
df_covid_19_udv.show(5,False)

+-------------+-------------+----------+----------+--------+-----------+---------+--------------+------------+-----------+----------------+--------------+------------+----------+-----------+--------------+
|CountryRegion|Continent    |Population|TotalCases|NewCases|TotalDeaths|NewDeaths|TotalRecovered|NewRecovered|ActiveCases|Serious/Critical|TotCases/1Mpop|Deaths/1Mpop|TotalTests|Tests/1Mpop|WHORegion     |
+-------------+-------------+----------+----------+--------+-----------+---------+--------------+------------+-----------+----------------+--------------+------------+----------+-----------+--------------+
|USA          |North America|331198130 |5032179   |null    |162804     |null     |2576668       |null        |2292707    |18296           |15194         |492         |63139605  |190640     |Americas      |
|Brazil       |South America|212710692 |2917562   |null    |98644      |null     |2047660       |null        |771258     |8318            |13716         |464         |13206188 

In [16]:
df_covid_19_udv.printSchema()

root
 |-- CountryRegion: string (nullable = true)
 |-- Continent: string (nullable = true)
 |-- Population: integer (nullable = true)
 |-- TotalCases: integer (nullable = true)
 |-- NewCases: integer (nullable = true)
 |-- TotalDeaths: integer (nullable = true)
 |-- NewDeaths: integer (nullable = true)
 |-- TotalRecovered: integer (nullable = true)
 |-- NewRecovered: integer (nullable = true)
 |-- ActiveCases: integer (nullable = true)
 |-- Serious/Critical: integer (nullable = true)
 |-- TotCases/1Mpop: integer (nullable = true)
 |-- Deaths/1Mpop: integer (nullable = true)
 |-- TotalTests: integer (nullable = true)
 |-- Tests/1Mpop: integer (nullable = true)
 |-- WHORegion: string (nullable = true)



In [17]:
df_covid_19_udv.count()

209

In [18]:
#Escribir en parquet
utils.write_df_parquet(df_covid_19_udv,path_udv+'covid_19_udv')

In [19]:
#verificar parquet
df_covid_19_udv_read=utils.read_parquet(spark,path_udv+'covid_19_udv')
df_covid_19_udv_read.count()

209

In [20]:
############################################ full_grouped

In [21]:
titles_grouped=['Date',
 'Country/Region',
 'Confirmed',
 'Deaths',
 'Recovered',
 'Active',
 'New cases',
 'New deaths',
 'New recovered',
 'WHO Region']

In [22]:
#Leer csv rdd de la clase Utils
df_covid_19_grouped_rdd=utils.read_csv_rdd(sc,path_raw+'full_grouped.csv',titles_grouped)
df_covid_19_grouped_rdd.show(5,False)

+----------+--------------+---------+------+---------+------+---------+----------+-------------+---------------------+
|Date      |Country/Region|Confirmed|Deaths|Recovered|Active|New cases|New deaths|New recovered|WHO Region           |
+----------+--------------+---------+------+---------+------+---------+----------+-------------+---------------------+
|2020-01-22|Afghanistan   |0        |0     |0        |0     |0        |0         |0            |Eastern Mediterranean|
|2020-01-22|Albania       |0        |0     |0        |0     |0        |0         |0            |Europe               |
|2020-01-22|Algeria       |0        |0     |0        |0     |0        |0         |0            |Africa               |
|2020-01-22|Andorra       |0        |0     |0        |0     |0        |0         |0            |Europe               |
|2020-01-22|Angola        |0        |0     |0        |0     |0        |0         |0            |Africa               |
+----------+--------------+---------+------+----

In [23]:
df_covid_19_grouped_rdd.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Country/Region: string (nullable = true)
 |-- Confirmed: string (nullable = true)
 |-- Deaths: string (nullable = true)
 |-- Recovered: string (nullable = true)
 |-- Active: string (nullable = true)
 |-- New cases: string (nullable = true)
 |-- New deaths: string (nullable = true)
 |-- New recovered: string (nullable = true)
 |-- WHO Region: string (nullable = true)



In [24]:
#Hay que tratar los tipos de datos en la ultima capa (Ej: Todo que es número, 
#debe ser número, no string).
df_covid_19_grouped_udv=businesslogic.transformacion_tipos_datos_covid_grouped(df_covid_19_grouped_rdd)
df_covid_19_grouped_udv.show(5,False)

+----------+-------------+---------+------+---------+------+--------+---------+------------+---------------------+
|Date      |CountryRegion|Confirmed|Deaths|Recovered|Active|NewCases|NewDeaths|NewRecovered|WHORegion            |
+----------+-------------+---------+------+---------+------+--------+---------+------------+---------------------+
|2020-01-22|Afghanistan  |0        |0     |0        |0     |0       |0        |0           |Eastern Mediterranean|
|2020-01-22|Albania      |0        |0     |0        |0     |0       |0        |0           |Europe               |
|2020-01-22|Algeria      |0        |0     |0        |0     |0       |0        |0           |Africa               |
|2020-01-22|Andorra      |0        |0     |0        |0     |0       |0        |0           |Europe               |
|2020-01-22|Angola       |0        |0     |0        |0     |0       |0        |0           |Africa               |
+----------+-------------+---------+------+---------+------+--------+---------+-

In [25]:
df_covid_19_grouped_udv.printSchema()

root
 |-- Date: string (nullable = true)
 |-- CountryRegion: string (nullable = true)
 |-- Confirmed: integer (nullable = true)
 |-- Deaths: integer (nullable = true)
 |-- Recovered: integer (nullable = true)
 |-- Active: integer (nullable = true)
 |-- NewCases: integer (nullable = true)
 |-- NewDeaths: integer (nullable = true)
 |-- NewRecovered: integer (nullable = true)
 |-- WHORegion: string (nullable = true)



In [26]:
df_covid_19_grouped_udv.count()

35156

In [27]:
#Escribir en parquet
utils.write_df_parquet(df_covid_19_grouped_udv,path_udv+'covid_19_grouped_udv')

In [28]:
#verificar parquet
df_covid_19_grouped_udv_read=utils.read_parquet(spark,path_udv+'covid_19_grouped_udv')
df_covid_19_grouped_udv_read.count()

35156

In [29]:
#Tablas para join
df_covid_19_udv_select=df_covid_19_udv_read.select('CountryRegion',
 'Continent',
 'Population',
 'TotalCases',
 'TotalDeaths',
 'TotalRecovered')
df_covid_19_udv_select.show(5,False)

+-------------+-------------+----------+----------+-----------+--------------+
|CountryRegion|Continent    |Population|TotalCases|TotalDeaths|TotalRecovered|
+-------------+-------------+----------+----------+-----------+--------------+
|USA          |North America|331198130 |5032179   |162804     |2576668       |
|Brazil       |South America|212710692 |2917562   |98644      |2047660       |
|India        |Asia         |1381344997|2025409   |41638      |1377384       |
|Russia       |Europe       |145940924 |871894    |14606      |676357        |
|South Africa |Africa       |59381566  |538184    |9604       |387316        |
+-------------+-------------+----------+----------+-----------+--------------+
only showing top 5 rows



In [30]:
df_covid_19_grouped_udv_read.show(5,False)

+----------+-------------+---------+------+---------+------+--------+---------+------------+---------------------+
|Date      |CountryRegion|Confirmed|Deaths|Recovered|Active|NewCases|NewDeaths|NewRecovered|WHORegion            |
+----------+-------------+---------+------+---------+------+--------+---------+------------+---------------------+
|2020-01-22|Afghanistan  |0        |0     |0        |0     |0       |0        |0           |Eastern Mediterranean|
|2020-01-22|Albania      |0        |0     |0        |0     |0       |0        |0           |Europe               |
|2020-01-22|Algeria      |0        |0     |0        |0     |0       |0        |0           |Africa               |
|2020-01-22|Andorra      |0        |0     |0        |0     |0       |0        |0           |Europe               |
|2020-01-22|Angola       |0        |0     |0        |0     |0       |0        |0           |Africa               |
+----------+-------------+---------+------+---------+------+--------+---------+-

In [31]:
#Join
df_result=businesslogic.join_df(df_covid_19_udv_select,df_covid_19_grouped_udv_read,'inner')
df_result.show(5,False)

+-------------+---------+----------+----------+-----------+--------------+----------+---------+------+---------+------+--------+---------+------------+---------------------+
|CountryRegion|Continent|Population|TotalCases|TotalDeaths|TotalRecovered|Date      |Confirmed|Deaths|Recovered|Active|NewCases|NewDeaths|NewRecovered|WHORegion            |
+-------------+---------+----------+----------+-----------+--------------+----------+---------+------+---------+------+--------+---------+------------+---------------------+
|Afghanistan  |Asia     |39009447  |36896     |1298       |25840         |2020-01-22|0        |0     |0        |0     |0       |0        |0           |Eastern Mediterranean|
|Albania      |Europe   |2877470   |6016      |188        |3155          |2020-01-22|0        |0     |0        |0     |0       |0        |0           |Europe               |
|Algeria      |Africa   |43926079  |33626     |1273       |23238         |2020-01-22|0        |0     |0        |0     |0       |0 

In [32]:
df_result_ordenado=df_result.select(
'Continent',
'CountryRegion',
 'Date',
date_format(to_date("Date", 'yyyy-MM-dd'), 'yyyy').alias('Anio'),
date_format(to_date("Date", 'yyyy-MM-dd'), 'MM').alias('Mes'),
 'Population',
 'TotalCases',
 'TotalDeaths',
 'TotalRecovered',
 'Confirmed',
 'Deaths',
 'Recovered',
 'Active',
 'NewCases',
 'NewDeaths',
 'NewRecovered'
).orderBy('CountryRegion','Date')

In [33]:
# Calcular Agrupamiento de casos por mes
df_result_calcular_casos=businesslogic.calcular_casos_por_mes(df_result_ordenado)
df_result_calcular_casos.show(5)

+-----------------+--------------------+----------+----+---+---------+-------+---------+--------+--------+---------+------------+
|        Continent|       CountryRegion|Population|Anio|Mes|Confirmed| Deaths|Recovered|  Active|NewCases|NewDeaths|NewRecovered|
+-----------------+--------------------+----------+----+---+---------+-------+---------+--------+--------+---------+------------+
|             Asia|         Afghanistan|  39009447|2020| 01|        0|      0|        0|       0|       0|        0|           0|
|             Asia|         Afghanistan|  39009447|2020| 02|        6|      0|        0|       6|       1|        0|           0|
|             Asia|         Afghanistan|  39009447|2020| 03|     1219|     29|       26|    1164|     173|        4|           5|
|             Asia|         Afghanistan|  39009447|2020| 04|    27237|    860|     2927|   23450|    1997|       60|         255|
|             Asia|         Afghanistan|  39009447|2020| 05|   225655|   4994|    24129|  