## Title: N5 Now - Technical Challenge

<br>

#### Author: Nicolas San Martin

<br> 

#### Description: this development provides two ways of addressing the following item of the challenge: 'Load the datasets using Spark and keep them in parquet format'. The first part uses DataFrames; the second is an example of how to process one of the files with RDDs.

<br>

#### Initial considerations before starting the development:
<br>

- The schemas defined for the DataFrames do not validate the data (for example, requiring that a column contains no null values through the `nullable` parameter), nor do they add metadata.

- Although the challenge description specifies that the data should be processed in the last layer, a way of reading the datasets with a predefined schema is still provided.

- No exception handling is performed.

- Column names are not cleaned, except for a few `worldometer_data` columns that are renamed during the transformation step.

In [1]:
import findspark
findspark.init()
findspark.find()

'C:\\spark-3.4.1-bin-hadoop3'

In [2]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, LongType, IntegerType, FloatType, DoubleType, TimestampType, BooleanType, StringType, DateType, StructType
from pyspark.sql.functions import col, to_date

In [3]:
app_name = "challenge_tecnico_n5"
partitions_number = 3 

In [4]:
spark = SparkSession.builder \
    .appName(app_name) \
    .config("spark.sql.shuffle.partitions", partitions_number) \
    .getOrCreate()

## Alternative 1: Development with DataFrames

#### Path definitions

###### Define the path of the 'datalake' where the COVID data is stored, plus the file names, so they can be reused in later steps of the development.

In [5]:
path_datalake = "input/datalake/kaggle/covid"

file_name_country_wise_latest = "country_wise_latest"
file_name_covid_19_clean_complete_df = "covid_19_clean_complete"
file_name_day_wise_df = "day_wise"
file_name_full_grouped_df = "full_grouped"
file_name_usa_county_wise_df = "usa_county_wise"
file_name_worldometer_data_df = "worldometer_data"

#### Reading the CSV files

###### Following the guidelines in the challenge description, data types are not handled at this stage; the values are cast later, in the transformation step. For that reason, no schema is passed to the reader. Only `country_wise_latest` is read with `inferSchema = False` (every column loaded as a string); the other five files are read with `inferSchema = True`, as the read validation outputs show.
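As a rough illustration of what `inferSchema = True` does for the other five files, the plain-Python sketch below widens each column from integer to floating point to string until every value fits. It is a simplification: Spark's real CSV inference also considers other types (such as long, decimal, date, timestamp and boolean) and follows its own rules. `infer_value_type` and `infer_column_type` are hypothetical helpers.

```python
from __future__ import annotations

# Hypothetical, simplified sketch of CSV type inference (not Spark's actual code):
# a column is widened from int to float to str until every non-empty value fits.

_WIDENING_ORDER = [int, float, str]


def infer_value_type(value: str) -> type:
    """Return the narrowest of int, float or str that can represent value."""
    for candidate in (int, float):
        try:
            candidate(value)
            return candidate
        except ValueError:
            continue
    return str


def infer_column_type(values: list[str]) -> type:
    """Widen across all non-empty values; a column with no values stays str."""
    non_empty = [v for v in values if v != ""]
    if not non_empty:
        return str
    widest = int
    for value in non_empty:
        candidate = infer_value_type(value)
        if _WIDENING_ORDER.index(candidate) > _WIDENING_ORDER.index(widest):
            widest = candidate
    return widest


print(infer_column_type(["1", "2", "3"]))       # <class 'int'>
print(infer_column_type(["1", "2.5"]))          # <class 'float'>
print(infer_column_type(["CountryA", "1"]))     # <class 'str'>
```

With `inferSchema = False` this step is skipped and every column is left as a string, which is why `country_wise_latest_df` shows only `string` columns.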

In [6]:
# country_wise_latest is read without type inference, so every column is a string
country_wise_latest_df = spark.read.csv(f"{path_datalake}/{file_name_country_wise_latest}.csv",
                                        header=True,
                                        inferSchema=False)

# The remaining files are read with type inference enabled
covid_19_clean_complete_df = spark.read.csv(f"{path_datalake}/{file_name_covid_19_clean_complete_df}.csv",
                                            header=True,
                                            inferSchema=True)

day_wise_df = spark.read.csv(f"{path_datalake}/{file_name_day_wise_df}.csv",
                             header=True,
                             inferSchema=True)

full_grouped_df = spark.read.csv(f"{path_datalake}/{file_name_full_grouped_df}.csv",
                                 header=True,
                                 inferSchema=True)

usa_county_wise_df = spark.read.csv(f"{path_datalake}/{file_name_usa_county_wise_df}.csv",
                                    header=True,
                                    inferSchema=True)

worldometer_data_df = spark.read.csv(f"{path_datalake}/{file_name_worldometer_data_df}.csv",
                                     header=True,
                                     inferSchema=True)

#### Repartitioning

In [7]:
country_wise_latest_df = country_wise_latest_df.repartition(partitions_number)
covid_19_clean_complete_df = covid_19_clean_complete_df.repartition(partitions_number)
day_wise_df = day_wise_df.repartition(partitions_number)
full_grouped_df = full_grouped_df.repartition(partitions_number)
usa_county_wise_df = usa_county_wise_df.repartition(partitions_number)
worldometer_data_df = worldometer_data_df.repartition(partitions_number)
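When `repartition` receives only a number of partitions and no columns, Spark distributes rows with round-robin partitioning. The sketch below shows the idea in plain Python. It is simplified: Spark starts each input task at a pseudo-random offset instead of always at partition 0. `round_robin_partition` is a hypothetical helper.

```python
# Simplified sketch of round-robin partitioning, the scheme Spark uses for
# repartition(n) when no columns are given. Assumption: this version always
# starts at partition 0, whereas Spark picks a pseudo-random starting offset.


def round_robin_partition(rows, num_partitions):
    """Distribute rows over num_partitions buckets in round-robin order."""
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    partitions = [[] for _ in range(num_partitions)]
    for position, row in enumerate(rows):
        partitions[position % num_partitions].append(row)
    return partitions


parts = round_robin_partition(list(range(10)), 3)
print([len(p) for p in parts])  # [4, 3, 3]
```

Because rows are dealt out one at a time, partition sizes differ by at most one row. That balance is the main reason to call `repartition(partitions_number)` before writing: it produces evenly sized output files.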

#### Read validation

In [8]:
country_wise_latest_df

DataFrame[Country/Region: string, Confirmed: string, Deaths: string, Recovered: string, Active: string, New cases: string, New deaths: string, New recovered: string, Deaths / 100 Cases: string, Recovered / 100 Cases: string, Deaths / 100 Recovered: string, Confirmed last week: string, 1 week change: string, 1 week % increase: string, WHO Region: string]

In [9]:
covid_19_clean_complete_df

DataFrame[Province/State: string, Country/Region: string, Lat: double, Long: double, Date: date, Confirmed: int, Deaths: int, Recovered: int, Active: int, WHO Region: string]

In [10]:
day_wise_df

DataFrame[Date: date, Confirmed: int, Deaths: int, Recovered: int, Active: int, New cases: int, New deaths: int, New recovered: int, Deaths / 100 Cases: double, Recovered / 100 Cases: double, Deaths / 100 Recovered: double, No. of countries: int]

In [11]:
full_grouped_df

DataFrame[Date: date, Country/Region: string, Confirmed: int, Deaths: int, Recovered: int, Active: int, New cases: int, New deaths: int, New recovered: int, WHO Region: string]

In [12]:
usa_county_wise_df

DataFrame[UID: int, iso2: string, iso3: string, code3: int, FIPS: double, Admin2: string, Province_State: string, Country_Region: string, Lat: double, Long_: double, Combined_Key: string, Date: string, Confirmed: int, Deaths: int]

In [13]:
worldometer_data_df

DataFrame[Country/Region: string, Continent: string, Population: int, TotalCases: int, NewCases: int, TotalDeaths: int, NewDeaths: int, TotalRecovered: int, NewRecovered: int, ActiveCases: int, Serious,Critical: int, Tot Cases/1M pop: int, Deaths/1M pop: double, TotalTests: int, Tests/1M pop: int, WHO Region: string]

#### Data schema definitions

###### Although these schemas are not used, they are defined as an example.

In [14]:
optional_schema_country_wise_latest = StructType([
    StructField("Country/Region", StringType(), True),
    StructField("Confirmed", IntegerType(), True),
    StructField("Deaths", IntegerType(), True),
    StructField("Recovered", IntegerType(), True),
    StructField("Active", IntegerType(), True),
    StructField("New cases", IntegerType(), True),
    StructField("New deaths", IntegerType(), True),
    StructField("New recovered", IntegerType(), True),
    StructField("Deaths / 100 Cases", FloatType(), True),
    StructField("Recovered / 100 Cases", FloatType(), True),
    StructField("Deaths / 100 Recovered", FloatType(), True),
    StructField("Confirmed last week", IntegerType(), True),
    StructField("1 week change", IntegerType(), True),
    StructField("1 week % increase", FloatType(), True),
    StructField("WHO Region", StringType(), True)
])

In [15]:
optional_schema_covid_19_clean_complete = StructType([
    StructField("Province/State", StringType(), True),
    StructField("Country/Region", StringType(), True),
    StructField("Lat", DoubleType(), True),
    StructField("Long", DoubleType(), True),
    StructField("Date", DateType(), True),
    StructField("Confirmed", IntegerType(), True),
    StructField("Deaths", IntegerType(), True),
    StructField("Recovered", IntegerType(), True),
    StructField("Active", IntegerType(), True),
    StructField("WHO Region", StringType(), True)
])

In [16]:
optional_schema_day_wise = StructType([
    StructField("Date", DateType(), True), # This date uses a different format: d/M/yy
    StructField("Confirmed", IntegerType(), True),
    StructField("Deaths", IntegerType(), True),
    StructField("Recovered", IntegerType(), True),
    StructField("Active", IntegerType(), True),
    StructField("New cases", IntegerType(), True),
    StructField("New deaths", IntegerType(), True),
    StructField("New recovered", IntegerType(), True),
    StructField("Deaths / 100 Cases", FloatType(), True), # Only two significant decimals, so Float is used instead of Double
    StructField("Recovered / 100 Cases", FloatType(), True),
    StructField("Deaths / 100 Recovered", FloatType(), True),
    StructField("No. of countries", IntegerType(), True)
])

In [17]:
optional_schema_full_grouped = StructType([
    StructField("Date", DateType(), True),
    StructField("Country/Region", StringType(), True),
    StructField("Confirmed", IntegerType(), True),
    StructField("Deaths", IntegerType(), True),
    StructField("Recovered", IntegerType(), True),
    StructField("Active", IntegerType(), True),
    StructField("New cases", IntegerType(), True),
    StructField("New deaths", IntegerType(), True),
    StructField("New recovered", IntegerType(), True),
    StructField("WHO Region", StringType(), True)
])

In [18]:
# For the latitude and longitude attributes, I chose Double instead of Float
# given the number of significant digits observed in the records.
optional_schema_usa_county_wise = StructType([
    StructField("UID", IntegerType(), True),
    StructField("iso2", StringType(), True),
    StructField("iso3", StringType(), True),
    StructField("code3", IntegerType(), True),
    StructField("FIPS", IntegerType(), True),
    StructField("Admin2", StringType(), True),
    StructField("Province_State", StringType(), True),
    StructField("Country_Region", StringType(), True),
    StructField("Lat", DoubleType(), True),
    StructField("Long_", DoubleType(), True),
    StructField("Combined_Key", StringType(), True),
    StructField("Date", DateType(), True), # The date format is M/d/yy, so it will probably need to be handled later
    StructField("Confirmed", IntegerType(), True),
    StructField("Deaths", IntegerType(), True)
])
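The choice of Double over Float for `Lat` and `Long_` can be checked with the standard library. Packing a value into 4 bytes (the width of Spark's `FloatType`) and reading it back shows the rounding, while 8 bytes (`DoubleType`) preserves the value. The coordinate is an arbitrary example, not taken from the dataset, and both helpers are hypothetical.

```python
import struct


def round_trip_float32(value: float) -> float:
    """Store value as a 32-bit IEEE 754 float (like FloatType) and read it back."""
    return struct.unpack("<f", struct.pack("<f", value))[0]


def round_trip_float64(value: float) -> float:
    """Store value as a 64-bit IEEE 754 double (like DoubleType) and read it back."""
    return struct.unpack("<d", struct.pack("<d", value))[0]


latitude = 40.7127753  # arbitrary example coordinate
print(round_trip_float32(latitude))  # close to, but not exactly, the original
print(round_trip_float64(latitude))  # identical to the original
```

A 32-bit float keeps roughly seven significant decimal digits. That is fewer than a coordinate written with seven decimal places needs.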

In [19]:
optional_schema_worldometer_data = StructType([
    StructField("Country/Region", StringType(), True), 
    StructField("Continent", StringType(), True),
    StructField("Population", IntegerType(), True), 
    StructField("TotalCases", IntegerType(), True), 
    StructField("NewCases", IntegerType(), True), 
    StructField("TotalDeaths", IntegerType(), True), 
    StructField("NewDeaths", IntegerType(), True), 
    StructField("TotalRecovered", IntegerType(), True), 
    StructField("NewRecovered", IntegerType(), True),  
    StructField("ActiveCases", IntegerType(), True), 
    StructField("Serious,Critical", IntegerType(), True),  
    StructField("Tot Cases/1M pop", IntegerType(), True), 
    StructField("Deaths/1M pop", DoubleType(), True),  
    StructField("TotalTests", IntegerType(), True), 
    StructField("Tests/1M pop", IntegerType(), True),  
    StructField("WHO Region", StringType(), True),  
])

#### Data transformation

In [20]:
df_country_wise_latest_t = country_wise_latest_df.select(
    col("Country/Region"),
    col("Confirmed").cast(IntegerType()).alias("Confirmed"),
    col("Deaths").cast(IntegerType()).alias("Deaths"),
    col("Recovered").cast(IntegerType()).alias("Recovered"),
    col("Active").cast(IntegerType()).alias("Active"),
    col("New cases").cast(IntegerType()).alias("New cases"),
    col("New deaths").cast(IntegerType()).alias("New deaths"),
    col("New recovered").cast(IntegerType()).alias("New recovered"),
    col("Deaths / 100 Cases").cast(FloatType()).alias("Deaths / 100 Cases"),
    col("Recovered / 100 Cases").cast(FloatType()).alias("Recovered / 100 Cases"),
    col("Deaths / 100 Recovered").cast(FloatType()).alias("Deaths / 100 Recovered"),
    col("Confirmed last week").cast(IntegerType()).alias("Confirmed last week"),
    col("1 week change").cast(IntegerType()).alias("1 week change"),
    col("1 week % increase").cast(FloatType()).alias("1 week % increase"),
    col("WHO Region")
)

In [21]:
df_covid_19_clean_complete_t = covid_19_clean_complete_df.select(
    col("Province/State"),
    col("Country/Region"),
    col("Lat").cast(DoubleType()).alias("Lat"),
    col("Long").cast(DoubleType()).alias("Long"),
    col("Date").cast(DateType()).alias("Date"),
    col("Confirmed").cast(IntegerType()).alias("Confirmed"),
    col("Deaths").cast(IntegerType()).alias("Deaths"),
    col("Recovered").cast(IntegerType()).alias("Recovered"),
    col("Active").cast(IntegerType()).alias("Active"),
    col("WHO Region")
)

In [22]:
df_day_wise_t = day_wise_df.select(
    to_date(col("Date"), "d/M/yy").alias("Date"),  
    col("Confirmed").cast(IntegerType()).alias("Confirmed"),
    col("Deaths").cast(IntegerType()).alias("Deaths"),
    col("Recovered").cast(IntegerType()).alias("Recovered"),
    col("Active").cast(IntegerType()).alias("Active"),
    col("New cases").cast(IntegerType()).alias("New cases"),
    col("New deaths").cast(IntegerType()).alias("New deaths"),
    col("New recovered").cast(IntegerType()).alias("New recovered"),
    col("Deaths / 100 Cases").cast(FloatType()).alias("Deaths / 100 Cases"),
    col("Recovered / 100 Cases").cast(FloatType()).alias("Recovered / 100 Cases"),
    col("Deaths / 100 Recovered").cast(FloatType()).alias("Deaths / 100 Recovered"),
    # Backticks are needed: without them, the dot in the name is read as struct field access
    col("`No. of countries`").cast(IntegerType()).alias("No. of countries")
)


In [23]:
df_full_grouped_t = full_grouped_df.select(
    to_date(col("Date"), "d/M/yy").alias("Date"),  
    col("Country/Region"),
    col("Confirmed").cast(IntegerType()).alias("Confirmed"),
    col("Deaths").cast(IntegerType()).alias("Deaths"),
    col("Recovered").cast(IntegerType()).alias("Recovered"),
    col("Active").cast(IntegerType()).alias("Active"),
    col("New cases").cast(IntegerType()).alias("New cases"),
    col("New deaths").cast(IntegerType()).alias("New deaths"),
    col("New recovered").cast(IntegerType()).alias("New recovered"),
    col("WHO Region")
)

In [24]:
df_usa_county_wise_t = usa_county_wise_df.select(
    col("UID").cast(IntegerType()).alias("UID"),
    col("iso2"),
    col("iso3"),
    col("code3").cast(IntegerType()).alias("code3"),
    col("FIPS").cast(IntegerType()).alias("FIPS"),
    col("Admin2"),
    col("Province_State"),
    col("Country_Region"),
    col("Lat").cast(DoubleType()).alias("Lat"),
    col("Long_").cast(DoubleType()).alias("Long_"),
    col("Combined_Key"),
    to_date(col("Date"), "M/d/yy").alias("Date"), 
    col("Confirmed").cast(IntegerType()).alias("Confirmed"),
    col("Deaths").cast(IntegerType()).alias("Deaths")
)
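The pattern passed to `to_date` matters. This cell uses `"M/d/yy"`, while `day_wise` and `full_grouped` use `"d/M/yy"`, and a value such as `1/22/20` only parses under one of them. The sketch below reproduces the idea with Python's `datetime.strptime`. Mapping the Spark patterns to `%m/%d/%y` and `%d/%m/%y` is an approximation. Returning `None` on failure resembles, without exactly replicating, the way `to_date` typically yields null for unparsable strings in Spark's default (non-ANSI) mode. `parse_date_or_none` is a hypothetical helper.

```python
from datetime import date, datetime
from typing import Optional


def parse_date_or_none(value: str, fmt: str) -> Optional[date]:
    """Parse value with a strptime format; return None when it does not match."""
    try:
        return datetime.strptime(value, fmt).date()
    except ValueError:
        return None


# Spark "M/d/yy" roughly corresponds to "%m/%d/%y"; Spark "d/M/yy" to "%d/%m/%y".
print(parse_date_or_none("1/22/20", "%m/%d/%y"))  # 2020-01-22
print(parse_date_or_none("1/22/20", "%d/%m/%y"))  # None: 22 is not a valid month
```

A wrong pattern fails silently as nulls rather than with an error. Counting nulls in the `Date` column after the transformation is therefore a cheap sanity check.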

In [25]:
df_worldometer_data_t = worldometer_data_df.select(
    col("Country/Region"),
    col("Continent"),
    col("Population").cast(IntegerType()).alias("Population"),
    col("TotalCases").cast(IntegerType()).alias("TotalCases"),
    col("NewCases").cast(IntegerType()).alias("NewCases"),
    col("TotalDeaths").cast(IntegerType()).alias("TotalDeaths"),
    col("NewDeaths").cast(IntegerType()).alias("NewDeaths"),
    col("TotalRecovered").cast(IntegerType()).alias("TotalRecovered"),
    col("NewRecovered").cast(IntegerType()).alias("NewRecovered"),
    col("ActiveCases").cast(IntegerType()).alias("ActiveCases"),
    col("Serious,Critical").cast(IntegerType()).alias("Serious_Critical"),  
    col("Tot Cases/1M pop").cast(IntegerType()).alias("Tot_Cases_1M_pop"),  
    col("Deaths/1M pop").cast(DoubleType()).alias("Deaths_1M_pop"),  # read as double; keep the decimals
    col("TotalTests").cast(IntegerType()).alias("TotalTests"),
    col("Tests/1M pop").cast(IntegerType()).alias("Tests_1M_pop"), 
    col("WHO Region")
)
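The four `worldometer_data` aliases above follow a simple rule: spaces, commas and slashes become underscores. If that renaming were ever extended to other columns, a hypothetical helper like `sanitize_column_name` could derive the aliases instead of writing them by hand. This is only a sketch; it reproduces the four names used above.

```python
import re


def sanitize_column_name(name: str) -> str:
    """Replace runs of spaces, commas and slashes with a single underscore."""
    return re.sub(r"[ ,/]+", "_", name).strip("_")


worldometer_columns = ["Serious,Critical", "Tot Cases/1M pop", "Deaths/1M pop", "Tests/1M pop"]
print([sanitize_column_name(c) for c in worldometer_columns])
# ['Serious_Critical', 'Tot_Cases_1M_pop', 'Deaths_1M_pop', 'Tests_1M_pop']
```

In PySpark the resulting names could be applied with `DataFrame.withColumnRenamed` or `DataFrame.toDF(*new_names)`; that wiring is not shown here.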

#### Persisting the files in parquet format

In [26]:
path_output = "output/kaggle/pyspark"

In [27]:
df_usa_county_wise_t.write.mode("overwrite").parquet(f"{path_output}/{file_name_usa_county_wise_df}/")

In [28]:
df_day_wise_t.write.mode("overwrite").parquet(f"{path_output}/{file_name_day_wise_df}/")

In [29]:
df_country_wise_latest_t.write.mode("overwrite").parquet(f"{path_output}/{file_name_country_wise_latest}/")

In [30]:
df_full_grouped_t.write.mode("overwrite").parquet(f"{path_output}/{file_name_full_grouped_df}/")

In [31]:
df_covid_19_clean_complete_t.write.mode("overwrite").parquet(f"{path_output}/{file_name_covid_19_clean_complete_df}/")

In [32]:
df_worldometer_data_t.write.mode("overwrite").parquet(f"{path_output}/{file_name_worldometer_data_df}/")

## Alternative 2: Example development with RDDs

This alternative is not carried out for every file; it is only an example.

In [33]:
rdd_country_wise_latest = spark.sparkContext.textFile(f"{path_datalake}/{file_name_country_wise_latest}.csv")

rdd_covid_19_clean_complete = spark.sparkContext.textFile(f"{path_datalake}/{file_name_covid_19_clean_complete_df}.csv")

rdd_day_wise = spark.sparkContext.textFile(f"{path_datalake}/{file_name_day_wise_df}.csv")

rdd_full_grouped = spark.sparkContext.textFile(f"{path_datalake}/{file_name_full_grouped_df}.csv")

rdd_usa_county_wise = spark.sparkContext.textFile(f"{path_datalake}/{file_name_usa_county_wise_df}.csv")

rdd_worldometer_data = spark.sparkContext.textFile(f"{path_datalake}/{file_name_worldometer_data_df}.csv")

In [34]:
# Split each line of the file on ',' so the fields can be processed
rdd_country_wise_latest_transformed = rdd_country_wise_latest.map(lambda line: line.split(','))
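`line.split(',')` treats every comma as a separator. A quoted field that itself contains a comma would therefore be split into two columns and shift every index used later. The `Serious,Critical` header in `worldometer_data` shows that such fields can appear in these files. A minimal comparison with the standard `csv` module follows; the sample line is hypothetical and the helper names are illustrative.

```python
import csv


def split_naive(line: str) -> list:
    """What line.split(',') does: it ignores CSV quoting."""
    return line.split(",")


def split_csv(line: str) -> list:
    """Parse a single CSV line with the standard csv module, honoring quotes."""
    return next(csv.reader([line]))


line = '"Korea, South",100,2'  # hypothetical row with a quoted comma
print(split_naive(line))  # ['"Korea', ' South"', '100', '2']
print(split_csv(line))    # ['Korea, South', '100', '2']
```

Inside Spark, `split_csv` could be passed to `map` in place of the lambda. Parsing line by line still assumes that no quoted field spans several lines.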

In [35]:
# Keep the header so it can be excluded from the data transformation
header_rdd_country_wise_latest_transformed = rdd_country_wise_latest_transformed.first()  

In [36]:
# Exclude the header
rdd_country_wise_latest_filtered = rdd_country_wise_latest_transformed.filter(lambda line: line != header_rdd_country_wise_latest_transformed)  
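The filter above compares every row with the header. That costs one comparison per row, and it would also drop any data row that happens to equal the header. An alternative is a function for `RDD.mapPartitionsWithIndex` that skips only the first line of partition 0. It assumes the RDD comes from a single file read with `textFile`, so the header is the first element of partition 0. The plain lists below stand in for the RDD's partitions, the rows are made up, and `drop_header` is a hypothetical name.

```python
from itertools import islice


def drop_header(partition_index, rows):
    """Skip the first row of partition 0, where a single-file textFile puts the header."""
    if partition_index == 0:
        return islice(rows, 1, None)
    return rows


# Simulated partitions standing in for an RDD split into two partitions (made-up rows).
partitions = [["Country/Region,Confirmed", "CountryA,1"], ["CountryB,2"]]
print([list(drop_header(i, iter(p))) for i, p in enumerate(partitions)])
# [['CountryA,1'], ['CountryB,2']]
```

With Spark it would be used as `rdd_country_wise_latest_transformed.mapPartitionsWithIndex(drop_header)`. If several files were read at once, each one's header would sit in a different partition, and this shortcut would no longer apply.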

In [37]:
def cast_values_country_wise_latest(row):
    return (
        row[0],  # Column Country/Region
        int(row[1]),  # Column Confirmed
        int(row[2]),  # Column Deaths
        int(row[3]),  # Column Recovered
        int(row[4]),  # Column Active
        int(row[5]),  # Column New cases
        int(row[6]),  # Column New deaths
        int(row[7]),  # Column New recovered
        float(row[8]),  # Column Deaths / 100 Cases
        float(row[9]),  # Column Recovered / 100 Cases
        float(row[10]),  # Column Deaths / 100 Recovered
        int(row[11]),  # Column Confirmed last week
        int(row[12]),  # Column 1 week change
        float(row[13]),  # Column 1 week % increase
        row[14]  # Column WHO Region
    )
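`int()` and `float()` raise `ValueError` on empty or malformed strings. A single missing value would therefore make the Spark job fail when `cast_values_country_wise_latest` runs. If the files contain such values (not verified here), a hypothetical wrapper like `safe_cast` maps them to `None` instead:

```python
from typing import Callable, Optional


def safe_cast(value: Optional[str], caster: Callable[[str], object]) -> Optional[object]:
    """Apply caster to value; return None for missing, empty or unparsable values."""
    if value is None or value.strip() == "":
        return None
    try:
        return caster(value)
    except ValueError:
        return None


row = ["CountryA", "10", "", "3.5", "n/a"]  # made-up row with missing values
print([row[0], safe_cast(row[1], int), safe_cast(row[2], int),
       safe_cast(row[3], float), safe_cast(row[4], float)])
# ['CountryA', 10, None, 3.5, None]
```

`None` values become nulls after `toDF`. Inferring a column's type then depends on finding non-null values, so passing an explicit `StructType` (such as `optional_schema_country_wise_latest`) to `toDF` would avoid relying on inference.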

In [38]:
# Cast the data
rdd_country_wise_latest_casted = rdd_country_wise_latest_filtered.map(cast_values_country_wise_latest)

#### Persisting the data

In [39]:
df_country_wise_latest = rdd_country_wise_latest_casted.toDF(header_rdd_country_wise_latest_transformed)

In [40]:
df_country_wise_latest.write.mode("overwrite").parquet(f"{path_output}/rdd/{file_name_country_wise_latest}/")