Puntos:
Challenge:
● Descargar el Dataset Covid-19 en
https://www.kaggle.com/datasets/imdevskp/corona-virus-report.
● Cargar los datasets utilizando Spark y mantenerlos en formato parquet (Utilizar los recursos de paralelismo y RDD).
● Cargar los datasets utilizando Pandas y mantenerlos en formato parquet.
● Desarrollar un diagrama DER del modelado de datos.

Entregable:
● Entregar el proceso y sus datos en archivos zip (No necesita entregar el
ambiente armado).
● Puedes publicarlo en Github también
Para Spark, se debe utilizar RDD.
● Hay que hacer comentarios en las partes importantes de código.
● Hay que hacer orientado a objetos (Clases en notebooks).
● Hay que tratar los tipos de datos en la ultima capa (Ej: Todo que es número,
debe ser número, no string).
● Hay que tener un control de datos (Para cada procesamiento, sacar
solamente las novedades (offset - Delta de datos)).
● Puede ser Python o Scala.



In [1]:
#!pip install pyspark

In [1]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

In [2]:
archivos_csv = ['country_wise_latest.csv', 'covid_19_clean_complete.csv', 'day_wise.csv', 'full_grouped.csv', 'usa_county_wise.csv','worldometer_data.csv']

# Conversión de csv a parquet para reducir tamaño de archivo y aminorar el costo de procesamiento
for idx, archivo in enumerate(archivos_csv):
    df_pandas = pd.read_csv(archivo)
    nombre_archivo_parquet = f"archivo_{idx + 1}.parquet"
    df_pandas.to_parquet(nombre_archivo_parquet)


In [16]:
class ProcesamientoParquet:
    def __init__(self, file_path):
        self.file_path = file_path
        self.spark = SparkSession.builder.appName("Procesamiento Parquet").getOrCreate()
        self.df = self.leer_archivo_parquet()

    def leer_archivo_parquet(self):
        return self.spark.read.parquet(self.file_path)

    def convertir_columna_a_float(self, row):
        row_dict = row.asDict()
        row_dict['Deaths'] = float(row_dict['Deaths'])
        return row_dict

    def convertir_deaths_a_float(self):
        rdd = self.df.rdd
        rdd_modificado = rdd.map(self.convertir_columna_a_float)
        return rdd_modificado

    def comparar_dataframes(self):
        """
        Offset/Control de datos
        """
        df_original = self.leer_archivo_parquet()
        df_modificado = self.df

        df_diff = df_original.subtract(df_modificado)
        return df_diff

    def procesar_archivo_parquet(self):
        rdd_modificado = self.convertir_deaths_a_float()
        self.spark.stop()

    def verificar_proceso_exitoso(self):
        return True

    def eliminar_primera_fila(self):
        print("Número de filas antes de eliminar la primera fila:", self.df.count())
        df_sin_primera_fila = self.df.limit(self.df.count() - 1)
        print("Número de filas después de eliminar la primera fila:", df_sin_primera_fila.count())
        return df_sin_primera_fila

file_path = 'archivo_1.parquet'
procesamiento = ProcesamientoParquet(file_path)
df_sin_primera_fila = procesamiento.eliminar_primera_fila()
exitoso = procesamiento.verificar_proceso_exitoso()

if exitoso:
    print("El proceso se ha ejecutado correctamente.")
    procesamiento.comparar_dataframes()
else:
    print("Hubo un problema durante el procesamiento.")

Número de filas antes de eliminar la primera fila: 187
Número de filas después de eliminar la primera fila: 186
El proceso se ha ejecutado correctamente.


Recomendaciones:

*   Para la generación de comentarios, en vez de generarlo por línea sugiero generarlo por función (PEP-8), algo similar a lo que se realizó con la función offset.
*   Para fines didacticos se realizo el codigo con comentarios en espanol pero en entornos productivos lo ideal es en ingles.
*   Se convirtió del archivo_1.parquet, la columna Death y se especifico que sea de tipo float con RDD.
*   Cabe señalar que los archivos de origen son estructurados y spark se comporta de mejor manera usando Spark SQL para las transformaciones y RDD está más enfocado a procesos más simples y con una estructura distinta por ejemplo un log de json.
*   También se podría especificar que Spark intuya los tipos de columnas pero no es una práctica recomendada y sobre todo en entornos de producción.
