# Proceso para convertir datos de CSV a formato Parquet usando Pyspark

## Instalamos la librería pyspark

In [11]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


## Carga de los archivos

Subir los archivos a la ruta "/content/kaggle":


*   /content/kaggle/country_wise_latest.csv
*   /content/kaggle/covid_19_clean_complete.csv
*   /content/kaggle/day_wise.csv
*   /content/kaggle/full_grouped.csv
*   /content/kaggle/usa_county_wise.csv
*   /content/kaggle/worldometer_data.csv






## Importamos la librerías

In [12]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType
import csv

## Definimos la Clase CSVtoParquet

La clase "CSVtoParquet" realizará la lectura de los archivos ".csv" y los escribirá en formato parquet, considerando el tipo de cada campo

In [43]:
class CSVtoParquet:
    def __init__(self, sc, spark):
        self.sc = sc
        self.spark = spark

    # Se define la función convert_country_csv para leer el archivo "country_wise_latest.csv" y escribirlo en formato parquet
    def convert_country_csv(self, file_path, output_path):
        try:
          # Leer CSV haciendo uso de RDD
          csv_rdd = self.sc.textFile(file_path)

          # Parsea RDD dentro de una lista de tuplas con tipos de datos específicos
          header = csv_rdd.first()
          data = csv_rdd.filter(lambda x: x != header).map(lambda x: x.split(","))
          schema = StructType([
              StructField('country_region', StringType(), True),
              StructField('confirmed', IntegerType(), True),
              StructField('deaths', IntegerType(), True),
              StructField('recovered', IntegerType(), True),
              StructField('active', IntegerType(), True),
              StructField('new_cases', IntegerType(), True),
              StructField('new_deaths', IntegerType(), True),
              StructField('new_recovered', IntegerType(), True),
              StructField('deaths_100_cases', DoubleType(), True),
              StructField('recovered_100_cases', DoubleType(), True),
              StructField('deaths_100_recovered', DoubleType(), True),
              StructField('confirmed_last_week', IntegerType(), True),
              StructField('week_change', IntegerType(), True),
              StructField('week_perc_increase', DoubleType(), True),
              StructField('who_region', StringType(), True),
          ])
          data = data.map(lambda x: (x[0], int(x[1]), int(x[2]), int(x[3]), int(x[4]), int(x[5]), int(x[6]), int(x[7]), float(x[8]), float(x[9]), float(x[10]), int(x[11]), int(x[12]), float(x[13]), x[14]))

          # Convertir RDD a DataFrame con un esquema específico
          df = self.spark.createDataFrame(data, schema)

          df.show(5)
          df.printSchema()

          # Escribir DataFrame a formato Parquet
          df.write.format("parquet").mode("overwrite").save(output_path)
          print("\n", "Se generó en formato parquet en:",output_path)
        except Exception as error:
          print("Ocurrió un error:", error)
        
    # Se define la función convert_covid_csv para leer el archivo "covid_19_clean_complete.csv" y escribirlo en formato parquet
    def convert_covid_csv(self, file_path, output_path):
        try:
          # Leer CSV haciendo uso de RDD
          csv_rdd = self.sc.textFile(file_path)

          # Parsea RDD dentro de una lista de tuplas con tipos de datos específicos
          header = csv_rdd.first()
          data = csv_rdd.filter(lambda x: x != header).map(lambda x: x.split(","))
          schema = StructType([
              StructField('province_state', StringType(), True),
              StructField('country_region', StringType(), True),
              StructField('lat', DoubleType(), True),
              StructField('long', DoubleType(), True),
              StructField('date', StringType(), True),
              StructField('confirmed', IntegerType(), True),
              StructField('deaths', IntegerType(), True),
              StructField('recovered', IntegerType(), True),
              StructField('active', IntegerType(), True),
              StructField('who_region', StringType(), True),
          ])
          data = data.map(lambda x: (x[0], x[1], float(x[2]), float(x[3]), x[4], int(x[5]), int(x[6]), int(x[7]), int(x[8]), x[9]))

          # Convertir RDD a DataFrame con un esquema específico
          df = self.spark.createDataFrame(data, schema)

          df.show(5)
          df.printSchema()
          # Escribir DataFrame a formato Parquet
          df.write.format("parquet").mode("overwrite").save(output_path)
          print("\n", "Se generó en formato parquet en:",output_path)
        except Exception as error:
          print("Ocurrió un error:", error)
      
    # Se define la función convert_day_wise_csv para leer el archivo "day_wise.csv" y escribirlo en formato parquet
    def convert_day_wise_csv(self, file_path, output_path):
        try:
          # Leer CSV haciendo uso de RDD
          csv_rdd = self.sc.textFile(file_path)

          # Parsea RDD dentro de una lista de tuplas con tipos de datos específicos
          header = csv_rdd.first()
          data = csv_rdd.filter(lambda x: x != header).map(lambda x: x.split(","))
          schema = StructType([
              StructField('date', StringType(), True),
              StructField('confirmed', IntegerType(), True),
              StructField('deaths', IntegerType(), True),
              StructField('recovered', IntegerType(), True),
              StructField('active', IntegerType(), True),
              StructField('new_cases', IntegerType(), True),
              StructField('new_deaths', IntegerType(), True),
              StructField('new_recovered', IntegerType(), True),
              StructField('deaths_100_cases', DoubleType(), True),
              StructField('recovered_100_cases', DoubleType(), True),
              StructField('deaths_100_recovered', DoubleType(), True),
              StructField('nro_countries', IntegerType(), True),
          ])
          data = data.map(lambda x: (x[0], int(x[1]), int(x[2]), int(x[3]), int(x[4]), int(x[5]), int(x[6]), int(x[7]), float(x[8]), float(x[9]), float(x[10]), int(x[11])))

          # Convertir RDD a DataFrame con un esquema específico
          df = self.spark.createDataFrame(data, schema)

          df.show(5)
          df.printSchema()
          # Escribir DataFrame a formato Parquet
          df.write.format("parquet").mode("overwrite").save(output_path)
          print("\n", "Se generó en formato parquet en:",output_path)
        except Exception as error:
          print("Ocurrió un error:", error)

    # Se define la función convert_full_grouped_csv para leer el archivo "full_grouped.csv" y escribirlo en formato parquet
    def convert_full_grouped_csv(self, file_path, output_path):
        try:
          # Leer CSV haciendo uso de RDD
          csv_rdd = self.sc.textFile(file_path)

          # Parsea RDD dentro de una lista de tuplas con tipos de datos específicos
          header = csv_rdd.first()
          data = csv_rdd.filter(lambda x: x != header).map(lambda x: x.split(","))
          schema = StructType([
              StructField('date', StringType(), True),
              StructField('country_region', StringType(), True),
              StructField('confirmed', IntegerType(), True),
              StructField('deaths', IntegerType(), True),
              StructField('recovered', IntegerType(), True),
              StructField('active', IntegerType(), True),
              StructField('new_cases', IntegerType(), True),
              StructField('new_deaths', IntegerType(), True),
              StructField('new_recovered', IntegerType(), True),
              StructField('who_region', StringType(), True),
          ])
          data = data.map(lambda x: (x[0], x[1], int(x[2]), int(x[3]), int(x[4]), int(x[5]), int(x[6]), int(x[7]), int(x[8]), x[9]))

          # Convertir RDD a DataFrame con un esquema específico
          df = self.spark.createDataFrame(data, schema)

          df.show(5)
          df.printSchema()
          # Escribir DataFrame a formato Parquet
          df.write.format("parquet").mode("overwrite").save(output_path)
          print("\n", "Se generó en formato parquet en:",output_path)
        except Exception as error:
          print("Ocurrió un error:", error)

    # Se define la función convert_usa_county_csv para leer el archivo "usa_county_wise.csv" y escribirlo en formato parquet
    def convert_usa_county_csv(self, file_path, output_path):
        try:
          # Leer CSV haciendo uso de RDD
          csv_rdd = self.sc.textFile(file_path)

          # Parsea RDD dentro de una lista de tuplas con tipos de datos específicos
          header = csv_rdd.first()
          data = csv_rdd.filter(lambda x: x != header).map(lambda linea: next(csv.reader([linea])))
          schema = StructType([
              StructField('uid', IntegerType(), True),
              StructField('iso2', StringType(), True),
              StructField('iso3', StringType(), True),
              StructField('code3', IntegerType(), True),
              StructField('fips', DoubleType(), True),
              StructField('admin2', StringType(), True),
              StructField('province_state', StringType(), True),
              StructField('country_region', StringType(), True),
              StructField('lat', DoubleType(), True),
              StructField('long', DoubleType(), True),
              StructField('combined_key', StringType(), True),
              StructField('date', StringType(), True),
              StructField('confirmed', IntegerType(), True),
              StructField('deaths', IntegerType(), True),
          ])
          data = data.map(lambda x: (int(x[0]), x[1], x[2], int(x[3]), float(x[4]) if(x[4] != '') else 0.0 , x[5], x[6], x[7], float(x[8]), float(x[9]), x[10], x[11], int(x[12]), int(x[13])))

          # Convertir RDD a DataFrame con un esquema específico
          df = self.spark.createDataFrame(data, schema)

          df.show(5)
          df.printSchema()
          # Escribir DataFrame a formato Parquet
          df.write.format("parquet").mode("overwrite").save(output_path)
          print("\n", "Se generó en formato parquet en:",output_path)
        except Exception as error:
          print("Ocurrió un error:", error)

    # Se define la función convert_worldometer_csv para leer el archivo "worldometer_data.csv" y escribirlo en formato parquet
    def convert_worldometer_csv(self, file_path, output_path):
        try:
          # Leer CSV haciendo uso de RDD
          csv_rdd = self.sc.textFile(file_path)

          # Parsea RDD dentro de una lista de tuplas con tipos de datos específicos
          header = csv_rdd.first()
          data = csv_rdd.filter(lambda x: x != header).map(lambda linea: next(csv.reader([linea])))
          schema = StructType([
              StructField('country_region', StringType(), True),
              StructField('continent', StringType(), True),
              StructField('population', IntegerType(), True),
              StructField('totalcases', IntegerType(), True),
              StructField('newcases', IntegerType(), True),
              StructField('totaldeaths', IntegerType(), True),
              StructField('newdeaths', IntegerType(), True),
              StructField('totalrecovered', IntegerType(), True),
              StructField('newrecovered', IntegerType(), True),
              StructField('activecases', IntegerType(), True),
              StructField('serious_critical', IntegerType(), True),
              StructField('tot_cases_1m_pop', IntegerType(), True),
              StructField('deaths_1m_pop', DoubleType(), True),
              StructField('totaltests', IntegerType(), True),
              StructField('tests_1m_pop', IntegerType(), True),
              StructField('who_region', StringType(), True),
          ])
          data = data.map(lambda x: (x[0], x[1], cast_string_to_int(x[2]), cast_string_to_int(x[3]), cast_string_to_int(x[4]), cast_string_to_int(x[5]), cast_string_to_int(x[6]), cast_string_to_int(x[7]), cast_string_to_int(x[8]), cast_string_to_int(x[9]), cast_string_to_int(x[10]), cast_string_to_int(x[11]), cast_string_to_float(x[12]), cast_string_to_int(x[13]), cast_string_to_int(x[14]), x[15]))

          # Convertir RDD a DataFrame con un esquema específico
          df = self.spark.createDataFrame(data, schema)

          df.show(5)
          df.printSchema()
          # Escribir DataFrame a formato Parquet
          df.write.format("parquet").mode("overwrite").save(output_path)
          print("\n", "Se generó en formato parquet en:",output_path)
        except Exception as error:
          print("Ocurrió un error:", error)


In [35]:
# Se define la función cast_string_to_int para realizar el casteo de datos String a entero
def cast_string_to_int(string):
  if(string != ''):
    return int(string)
  else:
    return 0

In [36]:
# Se define la función cast_string_to_int para realizar el casteo de datos String a decimales
def cast_string_to_float(string):
  if(string != ''):
    return float(string)
  else:
    return 0.0

In [37]:
# Se especifica la configuración
conf = SparkConf().setAppName("CSV to Parquet")

In [38]:
# Se instancia sparkContext con la configuración especificada
sc = SparkContext(conf=conf)

In [39]:
# Se crea la sessión de spark
spark = SparkSession(sc)

# Intancia y ejecución de la clase CSVtoParquet

In [44]:
# Se instancia la clase
csv_to_parquet = CSVtoParquet(sc,spark)

In [45]:
# Se ejecuta el método que convierte el archivo "country_wise_latest.csv" en formato parquet
csv_to_parquet.convert_country_csv("/content/kaggle/country_wise_latest.csv", "/content/parquet/parquet_country_wise_latest")

+--------------+---------+------+---------+------+---------+----------+-------------+----------------+-------------------+--------------------+-------------------+-----------+------------------+--------------------+
|country_region|confirmed|deaths|recovered|active|new_cases|new_deaths|new_recovered|deaths_100_cases|recovered_100_cases|deaths_100_recovered|confirmed_last_week|week_change|week_perc_increase|          who_region|
+--------------+---------+------+---------+------+---------+----------+-------------+----------------+-------------------+--------------------+-------------------+-----------+------------------+--------------------+
|   Afghanistan|    36263|  1269|    25198|  9796|      106|        10|           18|             3.5|              69.49|                5.04|              35526|        737|              2.07|Eastern Mediterra...|
|       Albania|     4880|   144|     2745|  1991|      117|         6|           63|            2.95|              56.25|              

In [46]:
# Se ejecuta el método que convierte el archivo "covid_19_clean_complete.csv" en formato parquet
csv_to_parquet.convert_covid_csv("/content/kaggle/covid_19_clean_complete.csv", "/content/parquet/parquet_covid_19_clean_complete")

+--------------+--------------+--------+---------+----------+---------+------+---------+------+--------------------+
|province_state|country_region|     lat|     long|      date|confirmed|deaths|recovered|active|          who_region|
+--------------+--------------+--------+---------+----------+---------+------+---------+------+--------------------+
|              |   Afghanistan|33.93911|67.709953|2020-01-22|        0|     0|        0|     0|Eastern Mediterra...|
|              |       Albania| 41.1533|  20.1683|2020-01-22|        0|     0|        0|     0|              Europe|
|              |       Algeria| 28.0339|   1.6596|2020-01-22|        0|     0|        0|     0|              Africa|
|              |       Andorra| 42.5063|   1.5218|2020-01-22|        0|     0|        0|     0|              Europe|
|              |        Angola|-11.2027|  17.8739|2020-01-22|        0|     0|        0|     0|              Africa|
+--------------+--------------+--------+---------+----------+---

In [47]:
# Se ejecuta el método que convierte el archivo "day_wise.csv" en formato parquet
csv_to_parquet.convert_day_wise_csv("/content/kaggle/day_wise.csv", "/content/parquet/parquet_day_wise")

+----------+---------+------+---------+------+---------+----------+-------------+----------------+-------------------+--------------------+-------------+
|      date|confirmed|deaths|recovered|active|new_cases|new_deaths|new_recovered|deaths_100_cases|recovered_100_cases|deaths_100_recovered|nro_countries|
+----------+---------+------+---------+------+---------+----------+-------------+----------------+-------------------+--------------------+-------------+
|2020-01-22|      555|    17|       28|   510|        0|         0|            0|            3.06|               5.05|               60.71|            6|
|2020-01-23|      654|    18|       30|   606|       99|         1|            2|            2.75|               4.59|                60.0|            8|
|2020-01-24|      941|    26|       36|   879|      287|         8|            6|            2.76|               3.83|               72.22|            9|
|2020-01-25|     1434|    42|       39|  1353|      493|        16|         

In [48]:
# Se ejecuta el método que convierte el archivo "full_grouped.csv" en formato parquet
csv_to_parquet.convert_full_grouped_csv("/content/kaggle/full_grouped.csv", "/content/parquet/parquet_full_grouped")

+----------+--------------+---------+------+---------+------+---------+----------+-------------+--------------------+
|      date|country_region|confirmed|deaths|recovered|active|new_cases|new_deaths|new_recovered|          who_region|
+----------+--------------+---------+------+---------+------+---------+----------+-------------+--------------------+
|2020-01-22|   Afghanistan|        0|     0|        0|     0|        0|         0|            0|Eastern Mediterra...|
|2020-01-22|       Albania|        0|     0|        0|     0|        0|         0|            0|              Europe|
|2020-01-22|       Algeria|        0|     0|        0|     0|        0|         0|            0|              Africa|
|2020-01-22|       Andorra|        0|     0|        0|     0|        0|         0|            0|              Europe|
|2020-01-22|        Angola|        0|     0|        0|     0|        0|         0|            0|              Africa|
+----------+--------------+---------+------+---------+--

In [49]:
# Se ejecuta el método que convierte el archivo "usa_county_wise.csv" en formato parquet
csv_to_parquet.convert_usa_county_csv("/content/kaggle/usa_county_wise.csv", "/content/parquet/parquet_usa_county_wise")

+--------+----+----+-----+-------+--------+--------------------+--------------+-------------------+------------------+--------------------+-------+---------+------+
|     uid|iso2|iso3|code3|   fips|  admin2|      province_state|country_region|                lat|              long|        combined_key|   date|confirmed|deaths|
+--------+----+----+-----+-------+--------+--------------------+--------------+-------------------+------------------+--------------------+-------+---------+------+
|      16|  AS| ASM|   16|   60.0|        |      American Samoa|            US|-14.270999999999999|          -170.132|  American Samoa, US|1/22/20|        0|     0|
|     316|  GU| GUM|  316|   66.0|        |                Guam|            US|            13.4443|          144.7937|            Guam, US|1/22/20|        0|     0|
|     580|  MP| MNP|  580|   69.0|        |Northern Mariana ...|            US|            15.0979|          145.6739|Northern Mariana ...|1/22/20|        0|     0|
|63072001|

In [50]:
# Se ejecuta el método que convierte el archivo "worldometer_data.csv" en formato parquet
csv_to_parquet.convert_worldometer_csv("/content/kaggle/worldometer_data.csv", "/content/parquet/parquet_worldometer")

+--------------+-------------+----------+----------+--------+-----------+---------+--------------+------------+-----------+----------------+----------------+-------------+----------+------------+--------------+
|country_region|    continent|population|totalcases|newcases|totaldeaths|newdeaths|totalrecovered|newrecovered|activecases|serious_critical|tot_cases_1m_pop|deaths_1m_pop|totaltests|tests_1m_pop|    who_region|
+--------------+-------------+----------+----------+--------+-----------+---------+--------------+------------+-----------+----------------+----------------+-------------+----------+------------+--------------+
|           USA|North America| 331198130|   5032179|       0|     162804|        0|       2576668|           0|    2292707|           18296|           15194|        492.0|  63139605|      190640|      Americas|
|        Brazil|South America| 212710692|   2917562|       0|      98644|        0|       2047660|           0|     771258|            8318|           13716

In [52]:
# finaliza la sessión de spark
spark.stop()