# Covid-19
Utilizar Spark para cargar los dataset de del reposirotio: 
    https://www.kaggle.com/datasets/imdevskp/corona-virus-report 

## Cargar información utilizando RDD

In [1]:
# Importar las bibliotecas de PySpark
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType,DateType
from pyspark.sql.functions import split

In [2]:
# Crear una instancia de SparkContext y SparkSession
sc = SparkContext("local", "CSV to Parquet")
spark = SparkSession(sc)

### Función para convertir un archivo CSV en un DataFrame

In [3]:
# Función para convertir un archivo CSV en un DataFrame
def csv_to_df(csv_path):
    try:
        # Leer el archivo CSV en un RDD de PySpark
        csv_rdd = spark.sparkContext.textFile(csv_path)
        #csv_rdd = spark.sparkContext.textFile(csv_path, use_unicode=False, quote='"')

        # Obtener la primera fila
        header = csv_rdd.first()
        
        # Contar los elementos de la primera fila
        num_columns = len(header.split(","))

        # Dividir cada línea del RDD en una lista de valores
        csv_rdd = csv_rdd.map(lambda x: x.split(","))
        
        # Se excluye el primer registro
        csv_rdd = csv_rdd.zipWithIndex().filter(lambda x: x[1] > 0).map(lambda x: x[0])

        # Convertir el RDD en un DataFrame de PySpark, 
        df = spark.createDataFrame(csv_rdd)

        # Imprimir el número de columnas del archivo CSV
        print(f"El archivo '{csv_path}' tiene {num_columns} columnas")

    except Exception as e:
        # Imprimir un mensaje de error si ocurre una excepción durante la lectura del archivo
        print(f"Error al leer el archivo '{csv_path}': {e}")
        df = None

    return df

### Leer y escrbir en parquet el archivo country_wise_latest

In [4]:
# Convertir el archivo country_wise_latest en Dataframe, el archivo debe tener 15 columnas
df_cwl = csv_to_df("covid19/country_wise_latest.csv")


El archivo 'covid19/country_wise_latest.csv' tiene 15 columnas


In [5]:
# Castear los campos
df_cwl = df_cwl.withColumn("CountryRegion", df_cwl["_1"].cast(StringType())).drop("_1")
df_cwl = df_cwl.withColumn("Confirmed", df_cwl["_2"].cast(IntegerType())).drop("_2")
df_cwl = df_cwl.withColumn("Deaths", df_cwl["_3"].cast(IntegerType())).drop("_3")
df_cwl = df_cwl.withColumn("Recovered", df_cwl["_4"].cast(IntegerType())).drop("_4")
df_cwl = df_cwl.withColumn("Active", df_cwl["_5"].cast(IntegerType())).drop("_5")
df_cwl = df_cwl.withColumn("NewCases", df_cwl["_6"].cast(IntegerType())).drop("_6")
df_cwl = df_cwl.withColumn("NewDeaths", df_cwl["_7"].cast(IntegerType())).drop("_7")
df_cwl = df_cwl.withColumn("NewRecovered", df_cwl["_8"].cast(IntegerType())).drop("_8")
df_cwl = df_cwl.withColumn("Deaths100Cases", df_cwl["_9"].cast(DoubleType())).drop("_9")
df_cwl = df_cwl.withColumn("Recovered100Cases", df_cwl["_10"].cast(DoubleType())).drop("_10")
df_cwl = df_cwl.withColumn("Deaths100Recovered", df_cwl["_11"].cast(DoubleType())).drop("_11")
df_cwl = df_cwl.withColumn("ConfirmedLastWeek", df_cwl["_12"].cast(IntegerType())).drop("_12")
df_cwl = df_cwl.withColumn("1WeekChange", df_cwl["_13"].cast(IntegerType())).drop("_13")
df_cwl = df_cwl.withColumn("1WeekIncrease", df_cwl["_14"].cast(DoubleType())).drop("_14")
df_cwl = df_cwl.withColumn("WhoRegion", df_cwl["_15"].cast(StringType())).drop("_15")

In [6]:
# Validar el data Frame
df_cwl.show(5)

+-------------+---------+------+---------+------+--------+---------+------------+--------------+-----------------+------------------+-----------------+-----------+-------------+--------------------+
|CountryRegion|Confirmed|Deaths|Recovered|Active|NewCases|NewDeaths|NewRecovered|Deaths100Cases|Recovered100Cases|Deaths100Recovered|ConfirmedLastWeek|1WeekChange|1WeekIncrease|           WhoRegion|
+-------------+---------+------+---------+------+--------+---------+------------+--------------+-----------------+------------------+-----------------+-----------+-------------+--------------------+
|  Afghanistan|    36263|  1269|    25198|  9796|     106|       10|          18|           3.5|            69.49|              5.04|            35526|        737|         2.07|Eastern Mediterra...|
|      Albania|     4880|   144|     2745|  1991|     117|        6|          63|          2.95|            56.25|              5.25|             4171|        709|         17.0|              Europe|
|    

In [7]:
# Escribir parquet
df_cwl.write.parquet("covid19/country_wise_latest.parquet")

### Leer y escrbir en parquet el archivo covid_19_clean_complete

In [8]:
# Convertir el archivo covid_19_clean_complete en Dataframe, el archivo debe tener 10 columnas
df_ccc = csv_to_df("covid19/covid_19_clean_complete.csv")

El archivo 'covid19/covid_19_clean_complete.csv' tiene 10 columnas


In [9]:
# Castear los campos
df_ccc = df_ccc.withColumn("ProvinceState", df_ccc["_1"].cast(StringType())).drop("_1")
df_ccc = df_ccc.withColumn("CountryRegion", df_ccc["_2"].cast(StringType())).drop("_2")
df_ccc = df_ccc.withColumn("Lat", df_ccc["_3"].cast(DoubleType())).drop("_3")
df_ccc = df_ccc.withColumn("Long", df_ccc["_4"].cast(DoubleType())).drop("_4")
df_ccc = df_ccc.withColumn("Date", df_ccc["_5"].cast(DateType())).drop("_5")
df_ccc = df_ccc.withColumn("Confirmed", df_ccc["_6"].cast(IntegerType())).drop("_6")
df_ccc = df_ccc.withColumn("Deaths", df_ccc["_7"].cast(IntegerType())).drop("_7")
df_ccc = df_ccc.withColumn("Recovered", df_ccc["_8"].cast(IntegerType())).drop("_8")
df_ccc = df_ccc.withColumn("Active", df_ccc["_9"].cast(IntegerType())).drop("_9")
df_ccc = df_ccc.withColumn("WhoRegion", df_ccc["_10"].cast(StringType())).drop("_10")

In [10]:
# Validar el data Frame
df_ccc.show(10)

+--------------------+-------------------+--------+---------+----------+---------+------+---------+------+--------------------+
|       ProvinceState|      CountryRegion|     Lat|     Long|      Date|Confirmed|Deaths|Recovered|Active|           WhoRegion|
+--------------------+-------------------+--------+---------+----------+---------+------+---------+------+--------------------+
|                    |        Afghanistan|33.93911|67.709953|2020-01-22|        0|     0|        0|     0|Eastern Mediterra...|
|                    |            Albania| 41.1533|  20.1683|2020-01-22|        0|     0|        0|     0|              Europe|
|                    |            Algeria| 28.0339|   1.6596|2020-01-22|        0|     0|        0|     0|              Africa|
|                    |            Andorra| 42.5063|   1.5218|2020-01-22|        0|     0|        0|     0|              Europe|
|                    |             Angola|-11.2027|  17.8739|2020-01-22|        0|     0|        0|     

In [11]:
# Escribir parquet
df_ccc.write.parquet("covid19/covid_19_clean_complete.parquet")

### Leer y escrbir en parquet el archivo day_wise

In [12]:
# Convertir el archivo day_wise en Dataframe, el archivo debe tener 12 columnas
df_dw = csv_to_df("covid19/day_wise.csv")

El archivo 'covid19/day_wise.csv' tiene 12 columnas


In [13]:
# Castear los campos
df_dw =  df_dw.withColumn("Date",  df_dw["_1"].cast(DateType())).drop("_1")
df_dw =  df_dw.withColumn("Confirmed",  df_dw["_2"].cast(IntegerType())).drop("_2")
df_dw =  df_dw.withColumn("Deaths",  df_dw["_3"].cast(IntegerType())).drop("_3")
df_dw =  df_dw.withColumn("Recovered",  df_dw["_4"].cast(IntegerType())).drop("_4")
df_dw =  df_dw.withColumn("Active",  df_dw["_5"].cast(IntegerType())).drop("_5")
df_dw =  df_dw.withColumn("NewCases",  df_dw["_6"].cast(IntegerType())).drop("_6")
df_dw =  df_dw.withColumn("NewDeaths",  df_dw["_7"].cast(IntegerType())).drop("_7")
df_dw =  df_dw.withColumn("NewRecovered",  df_dw["_8"].cast(IntegerType())).drop("_8")
df_dw =  df_dw.withColumn("Deaths100Cases",  df_dw["_9"].cast(DoubleType())).drop("_9")
df_dw =  df_dw.withColumn("Recovered100Cases",  df_dw["_10"].cast(DoubleType())).drop("_10")
df_dw =  df_dw.withColumn("Deaths100Recovered",  df_dw["_11"].cast(DoubleType())).drop("_11")
df_dw =  df_dw.withColumn("NoOfContries",  df_dw["_12"].cast(IntegerType())).drop("_12")

In [14]:
# Validar el data Frame
df_dw.show(5)

+----------+---------+------+---------+------+--------+---------+------------+--------------+-----------------+------------------+------------+
|      Date|Confirmed|Deaths|Recovered|Active|NewCases|NewDeaths|NewRecovered|Deaths100Cases|Recovered100Cases|Deaths100Recovered|NoOfContries|
+----------+---------+------+---------+------+--------+---------+------------+--------------+-----------------+------------------+------------+
|2020-01-22|      555|    17|       28|   510|       0|        0|           0|          3.06|             5.05|             60.71|           6|
|2020-01-23|      654|    18|       30|   606|      99|        1|           2|          2.75|             4.59|              60.0|           8|
|2020-01-24|      941|    26|       36|   879|     287|        8|           6|          2.76|             3.83|             72.22|           9|
|2020-01-25|     1434|    42|       39|  1353|     493|       16|           3|          2.93|             2.72|            107.69|      

In [15]:
# Escribir parquet
df_dw.write.parquet("covid19/day_wise.parquet")

### Leer y escrbir en parquet el archivo full_grouped

In [16]:
# Convertir el archivo full_grouped en Dataframe, el archivo debe tener 10 columnas
df_fg = csv_to_df("covid19/full_grouped.csv")

El archivo 'covid19/full_grouped.csv' tiene 10 columnas


In [17]:
# Castear los campos
df_fg =   df_fg.withColumn("Date",   df_fg["_1"].cast(DateType())).drop("_1")
df_fg =   df_fg.withColumn("CountryRegion", df_fg["_2"].cast(StringType())).drop("_2")
df_fg =  df_fg.withColumn("Confirmed",  df_fg["_3"].cast(IntegerType())).drop("_3")
df_fg =  df_fg.withColumn("Deaths",  df_fg["_4"].cast(IntegerType())).drop("_4")
df_fg =  df_fg.withColumn("Recovered",  df_fg["_5"].cast(IntegerType())).drop("_5")
df_fg =  df_fg.withColumn("Active",  df_fg["_6"].cast(IntegerType())).drop("_6")
df_fg =  df_fg.withColumn("NewCases",  df_fg["_7"].cast(IntegerType())).drop("_7")
df_fg =  df_fg.withColumn("NewDeaths",  df_fg["_8"].cast(IntegerType())).drop("_8")
df_fg =   df_fg.withColumn("NewRecovered",   df_fg["_9"].cast(IntegerType())).drop("_9")
df_fg =   df_fg.withColumn("WhoRegion",   df_fg["_10"].cast(StringType())).drop("_10")

In [18]:
# Validar el data Frame
df_fg.show(5)

+----------+-------------+---------+------+---------+------+--------+---------+------------+--------------------+
|      Date|CountryRegion|Confirmed|Deaths|Recovered|Active|NewCases|NewDeaths|NewRecovered|           WhoRegion|
+----------+-------------+---------+------+---------+------+--------+---------+------------+--------------------+
|2020-01-22|  Afghanistan|        0|     0|        0|     0|       0|        0|           0|Eastern Mediterra...|
|2020-01-22|      Albania|        0|     0|        0|     0|       0|        0|           0|              Europe|
|2020-01-22|      Algeria|        0|     0|        0|     0|       0|        0|           0|              Africa|
|2020-01-22|      Andorra|        0|     0|        0|     0|       0|        0|           0|              Europe|
|2020-01-22|       Angola|        0|     0|        0|     0|       0|        0|           0|              Africa|
+----------+-------------+---------+------+---------+------+--------+---------+---------

In [19]:
# Escribir parquet
df_dw.write.parquet("covid19/full_grouped.parquet")

### Leer y escrbir en parquet el archivo worldometer_data

In [20]:
# Convertir el archivo worldometer_data en Dataframe 
# El archivo debe tener 16 columnas pero al cargar dice 17 porque dentro de los titulos de columna hay una coma.
df_wd = csv_to_df("covid19/worldometer_data.csv")

El archivo 'covid19/worldometer_data.csv' tiene 17 columnas


In [21]:
# Castear los campos
df_wd = df_wd.withColumn("CountryRegion", df_wd["_1"].cast(StringType())).drop("_1")
df_wd = df_wd.withColumn("Continent", df_wd["_2"].cast(StringType())).drop("_2")
df_wd = df_wd.withColumn("Population", df_wd["_3"].cast(IntegerType())).drop("_3")
df_wd = df_wd.withColumn("TotalCases", df_wd["_4"].cast(IntegerType())).drop("_4")
df_wd = df_wd.withColumn("NewCases", df_wd["_5"].cast(IntegerType())).drop("_5")
df_wd = df_wd.withColumn("TotalDeaths", df_wd["_6"].cast(IntegerType())).drop("_6")
df_wd = df_wd.withColumn("NewDeaths", df_wd["_7"].cast(IntegerType())).drop("_7")
df_wd = df_wd.withColumn("TotalRecovered", df_wd["_8"].cast(IntegerType())).drop("_8")
df_wd = df_wd.withColumn("NewRecovered", df_wd["_9"].cast(IntegerType())).drop("_9")
df_wd = df_wd.withColumn("ActiveCases", df_wd["_10"].cast(IntegerType())).drop("_10")
df_wd = df_wd.withColumn("SeriousCritical", df_wd["_11"].cast(DoubleType())).drop("_11")
df_wd = df_wd.withColumn("TotCases1MPop", df_wd["_12"].cast(IntegerType())).drop("_12")
df_wd = df_wd.withColumn("Deaths1MPop", df_wd["_13"].cast(IntegerType())).drop("_13")
df_wd = df_wd.withColumn("TotalTests", df_wd["_14"].cast(IntegerType())).drop("_14")
df_wd = df_wd.withColumn("Tests1MPop", df_wd["_15"].cast(IntegerType())).drop("_15")
df_wd = df_wd.withColumn("Region", df_wd["_16"].cast(StringType())).drop("_16")

In [22]:
# Validar el data Frame
df_wd.show(5)

+-------------+-------------+----------+----------+--------+-----------+---------+--------------+------------+-----------+---------------+-------------+-----------+----------+----------+--------------+
|CountryRegion|    Continent|Population|TotalCases|NewCases|TotalDeaths|NewDeaths|TotalRecovered|NewRecovered|ActiveCases|SeriousCritical|TotCases1MPop|Deaths1MPop|TotalTests|Tests1MPop|        Region|
+-------------+-------------+----------+----------+--------+-----------+---------+--------------+------------+-----------+---------------+-------------+-----------+----------+----------+--------------+
|          USA|North America| 331198130|   5032179|    null|     162804|     null|       2576668|        null|    2292707|        18296.0|        15194|        492|  63139605|    190640|      Americas|
|       Brazil|South America| 212710692|   2917562|    null|      98644|     null|       2047660|        null|     771258|         8318.0|        13716|        464|  13206188|     62085|      

In [23]:
# Escribir parquet
df_dw.write.parquet("covid19/worldometer_data.parquet")

### Leer y escrbir en parquet el archivo usa_county_wise   
Con este archivo si tocó leerlo directo en un dataframe porque dentro del archivo vienen textos con comas.
Personalemnte prefiero usar DataFrame en lugar de la API de RDD siempre que sea posible, ya que los DataFrames ofrecen mejor rendimiento y la interfaz es mejor.


In [24]:
# Leer archivo usa_county_wise directamente a un DataFrame
df_ucw = spark.read.option("header", "true").option("sep", ",").option("quote", '"').csv("covid19/usa_county_wise.csv")

In [25]:
# Validar que se haya leido bien el archivo, es decir, que no se haya afectado la lectura por las comas dentro de los campos texto
df_ucw.show(5)

+--------+----+----+-----+-------+--------+--------------------+--------------+-------------------+------------------+--------------------+-------+---------+------+
|     UID|iso2|iso3|code3|   FIPS|  Admin2|      Province_State|Country_Region|                Lat|             Long_|        Combined_Key|   Date|Confirmed|Deaths|
+--------+----+----+-----+-------+--------+--------------------+--------------+-------------------+------------------+--------------------+-------+---------+------+
|      16|  AS| ASM|   16|   60.0|    null|      American Samoa|            US|-14.270999999999999|          -170.132|  American Samoa, US|1/22/20|        0|     0|
|     316|  GU| GUM|  316|   66.0|    null|                Guam|            US|            13.4443|          144.7937|            Guam, US|1/22/20|        0|     0|
|     580|  MP| MNP|  580|   69.0|    null|Northern Mariana ...|            US|            15.0979|          145.6739|Northern Mariana ...|1/22/20|        0|     0|
|63072001|

In [26]:
# Validar el nuemro de columnas, deben ser 14.
print("Número de columnas:", len(df_ucw.columns))

Número de columnas: 14


In [27]:
# Castear los campos
df_ucw = df_ucw.withColumn("UID", df_ucw["UID"].cast(IntegerType()))
df_ucw = df_ucw.withColumnRenamed("iso2", "ISO2")
df_ucw = df_ucw.withColumnRenamed("iso3", "ISO3") 
df_ucw = df_ucw.withColumnRenamed("code3", "Code3")## Hago esto para mantener el orden de las columnas
df_ucw = df_ucw.withColumn("Code3", df_ucw["Code3"].cast(IntegerType()))
df_ucw = df_ucw.withColumn("FIPS", df_ucw["FIPS"].cast(DoubleType()))
df_ucw = df_ucw.withColumnRenamed("Admin2","Admin2")
df_ucw = df_ucw.withColumnRenamed("Province_State","ProvinceState")
df_ucw = df_ucw.withColumnRenamed("Country_Region","CountryRegion")
df_ucw = df_ucw.withColumn("Lat", df_ucw["Lat"].cast(DoubleType()))
df_ucw = df_ucw.withColumnRenamed("Long_","Long")## Hago esto para mantener el orden de las columnas
df_ucw = df_ucw.withColumn("Long", df_ucw["Long"].cast(DoubleType()))
df_ucw = df_ucw.withColumnRenamed("Combined_Key","CombinedKey")
df_ucw = df_ucw.withColumn("Date", df_ucw["Date"].cast(DateType()))
df_ucw = df_ucw.withColumn("Confirmed", df_ucw["Confirmed"].cast(IntegerType()))
df_ucw = df_ucw.withColumn("Deaths", df_ucw["Deaths"].cast(IntegerType()))

In [28]:
# Validar el data Frame
df_ucw.show(5)

+--------+----+----+-----+-------+--------+--------------------+-------------+-------------------+------------------+--------------------+----+---------+------+
|     UID|ISO2|ISO3|Code3|   FIPS|  Admin2|       ProvinceState|CountryRegion|                Lat|              Long|         CombinedKey|Date|Confirmed|Deaths|
+--------+----+----+-----+-------+--------+--------------------+-------------+-------------------+------------------+--------------------+----+---------+------+
|      16|  AS| ASM|   16|   60.0|    null|      American Samoa|           US|-14.270999999999999|          -170.132|  American Samoa, US|null|        0|     0|
|     316|  GU| GUM|  316|   66.0|    null|                Guam|           US|            13.4443|          144.7937|            Guam, US|null|        0|     0|
|     580|  MP| MNP|  580|   69.0|    null|Northern Mariana ...|           US|            15.0979|          145.6739|Northern Mariana ...|null|        0|     0|
|63072001|  PR| PRI|  630|72001.0|

In [29]:
# Escribir parquet
df_ucw.write.parquet("covid19/usa_county_wise.parquet")

In [30]:
# Detener la sesión de Spark
spark.stop()

## Cargar Información utilizando Pandas

In [31]:
# Importar pandas
import pandas as pd

### Cargar el archivo country_wise_latest utilizando pandas y escribir archivo parquet 

In [32]:
# Cargar en un DtaFrame de pandas
df_cwlpd = pd.read_csv("covid19/country_wise_latest.csv", sep=',', quotechar='"')

# Contar número de columnas, Deben ser 15 columnas
print("Número de columnas:", df_cwlpd.shape[1])

Número de columnas: 15


In [33]:
# Corregir el nombre de las columnas
df_cwlpd.columns = ['CountryRegion', 'Confirmed', 'Deaths','Recovered','Active','NewCases','NewDeaths','NewRecovered','Deaths100Cases','Recovered100Cases','Deaths100Recovered','ConfirmedLastWeek','1WeekChange','1WeekIncrease','WhoRegion']

In [34]:
# Validar el Dataframe
print(df_cwlpd)

          CountryRegion  Confirmed  Deaths  Recovered  Active  NewCases  \
0           Afghanistan      36263    1269      25198    9796       106   
1               Albania       4880     144       2745    1991       117   
2               Algeria      27973    1163      18837    7973       616   
3               Andorra        907      52        803      52        10   
4                Angola        950      41        242     667        18   
..                  ...        ...     ...        ...     ...       ...   
182  West Bank and Gaza      10621      78       3752    6791       152   
183      Western Sahara         10       1          8       1         0   
184               Yemen       1691     483        833     375        10   
185              Zambia       4552     140       2815    1597        71   
186            Zimbabwe       2704      36        542    2126       192   

     NewDeaths  NewRecovered  Deaths100Cases  Recovered100Cases  \
0           10            18    

In [35]:
# Validar que los tipos de datos sean los correctos
print(df_cwlpd.dtypes)

CountryRegion          object
Confirmed               int64
Deaths                  int64
Recovered               int64
Active                  int64
NewCases                int64
NewDeaths               int64
NewRecovered            int64
Deaths100Cases        float64
Recovered100Cases     float64
Deaths100Recovered    float64
ConfirmedLastWeek       int64
1WeekChange             int64
1WeekIncrease         float64
WhoRegion              object
dtype: object


In [36]:
# Escribir el dataframe
df_cwlpd.to_parquet('covid19/pd/country_wise_latest.parquet')


### Cargar el archivo covid_19_clean_complete utilizando pandas y escribir archivo parquet 

In [37]:
# Cargar en un DtaFrame de pandas
df_cccpd = pd.read_csv("covid19/covid_19_clean_complete.csv", sep=',', quotechar='"')

# Contar número de columnas, Deben ser 10 columnas
print("Número de columnas:", df_cccpd.shape[1])

Número de columnas: 10


In [38]:
# Corregir el nombre de las columnas
df_cccpd.columns = ['ProvinceState', 'CountryRegion', 'Lat','Long','Date','Confirmed','Deaths','Recovered','Active','WhoRegion']

In [39]:
# Realizar casteo 
df_cccpd['Date'] = pd.to_datetime(df_cccpd['Date'])

In [40]:
# Validar que los tipos de datos sean los correctos
print(df_cccpd.dtypes)

ProvinceState            object
CountryRegion            object
Lat                     float64
Long                    float64
Date             datetime64[ns]
Confirmed                 int64
Deaths                    int64
Recovered                 int64
Active                    int64
WhoRegion                object
dtype: object


In [41]:
# Validar el Dataframe
print(df_cccpd)

      ProvinceState          CountryRegion        Lat       Long       Date  \
0               NaN            Afghanistan  33.939110  67.709953 2020-01-22   
1               NaN                Albania  41.153300  20.168300 2020-01-22   
2               NaN                Algeria  28.033900   1.659600 2020-01-22   
3               NaN                Andorra  42.506300   1.521800 2020-01-22   
4               NaN                 Angola -11.202700  17.873900 2020-01-22   
...             ...                    ...        ...        ...        ...   
49063           NaN  Sao Tome and Principe   0.186400   6.613100 2020-07-27   
49064           NaN                  Yemen  15.552727  48.516388 2020-07-27   
49065           NaN                Comoros -11.645500  43.333300 2020-07-27   
49066           NaN             Tajikistan  38.861000  71.276100 2020-07-27   
49067           NaN                Lesotho -29.610000  28.233600 2020-07-27   

       Confirmed  Deaths  Recovered  Active        

In [42]:
# Escribir el dataframe
df_cccpd.to_parquet('covid19/pd/covid_19_clean_complete.parquet')

### Cargar el archivo day_wise utilizando pandas y escribir archivo parquet 

In [43]:
# Cargar en un DtaFrame de pandas
df_dwpd = pd.read_csv("covid19/day_wise.csv", sep=',', quotechar='"')

# Contar número de columnas, Deben ser 12 columnas
print("Número de columnas:", df_dwpd.shape[1])

Número de columnas: 12


In [44]:
# Corregir el nombre de las columnas
df_dwpd.columns = ['Date', 'Confirmed', 'Deaths','Recovered','Active','NewCases','NewDeaths','NewRecovered','Deaths100Cases','Recovered100Cases','Deaths100Recovered','NoOfContries']

In [45]:
# Realizar casteo de las columnas
df_dwpd['Date'] = pd.to_datetime(df_dwpd['Date'])

In [46]:
# Validar que los tipos de datos sean los correctos
print(df_dwpd.dtypes)

Date                  datetime64[ns]
Confirmed                      int64
Deaths                         int64
Recovered                      int64
Active                         int64
NewCases                       int64
NewDeaths                      int64
NewRecovered                   int64
Deaths100Cases               float64
Recovered100Cases            float64
Deaths100Recovered           float64
NoOfContries                   int64
dtype: object


In [47]:
# Validar el Dataframe
print(df_dwpd)

          Date  Confirmed  Deaths  Recovered   Active  NewCases  NewDeaths  \
0   2020-01-22        555      17         28      510         0          0   
1   2020-01-23        654      18         30      606        99          1   
2   2020-01-24        941      26         36      879       287          8   
3   2020-01-25       1434      42         39     1353       493         16   
4   2020-01-26       2118      56         52     2010       684         14   
..         ...        ...     ...        ...      ...       ...        ...   
183 2020-07-23   15510481  633506    8710969  6166006    282756       9966   
184 2020-07-24   15791645  639650    8939705  6212290    281164       6144   
185 2020-07-25   16047190  644517    9158743  6243930    255545       4867   
186 2020-07-26   16251796  648621    9293464  6309711    204606       4104   
187 2020-07-27   16480485  654036    9468087  6358362    228693       5415   

     NewRecovered  Deaths100Cases  Recovered100Cases  Deaths100

In [48]:
# Escribir el dataframe
df_dwpd.to_parquet('covid19/pd/day_wise.parquet')

### Cargar el archivo full_grouped utilizando pandas y escribir archivo parquet 

In [49]:
# Cargar en un DtaFrame de pandas
df_fgpd = pd.read_csv("covid19/full_grouped.csv", sep=',', quotechar='"')

# Contar número de columnas, Deben ser 10 columnas
print("Número de columnas:", df_fgpd.shape[1])

Número de columnas: 10


In [50]:
# Corregir el nombre de las columnas
df_fgpd.columns = ['Date', 'CountryRegion', 'Confirmed','Deaths','Recovered','Active','NewCases','NewDeaths','NewRecovered','WhoRegion']

In [51]:
# Realizar casteo 
df_fgpd['Date'] = pd.to_datetime(df_fgpd['Date'])

In [52]:
# Validar que los tipos de datos sean los correctos
print(df_fgpd.dtypes)

Date             datetime64[ns]
CountryRegion            object
Confirmed                 int64
Deaths                    int64
Recovered                 int64
Active                    int64
NewCases                  int64
NewDeaths                 int64
NewRecovered              int64
WhoRegion                object
dtype: object


In [53]:
# Validar el Dataframe
print(df_fgpd)

            Date       CountryRegion  Confirmed  Deaths  Recovered  Active  \
0     2020-01-22         Afghanistan          0       0          0       0   
1     2020-01-22             Albania          0       0          0       0   
2     2020-01-22             Algeria          0       0          0       0   
3     2020-01-22             Andorra          0       0          0       0   
4     2020-01-22              Angola          0       0          0       0   
...          ...                 ...        ...     ...        ...     ...   
35151 2020-07-27  West Bank and Gaza      10621      78       3752    6791   
35152 2020-07-27      Western Sahara         10       1          8       1   
35153 2020-07-27               Yemen       1691     483        833     375   
35154 2020-07-27              Zambia       4552     140       2815    1597   
35155 2020-07-27            Zimbabwe       2704      36        542    2126   

       NewCases  NewDeaths  NewRecovered              WhoRegion

In [54]:
# Escribir el dataframe
df_fgpd.to_parquet('covid19/pd/full_grouped.parquet')

### Cargar el archivo usa_county_wise utilizando pandas y escribir archivo parquet 

In [55]:
# Cargar en un DtaFrame de pandas
df_ucwpd = pd.read_csv("covid19/usa_county_wise.csv", sep=',', quotechar='"')

# Contar número de columnas, Deben ser 14 columnas
print("Número de columnas:", df_ucwpd.shape[1])

Número de columnas: 14


In [56]:
# Corregir el nombre de las columnas
df_ucwpd.columns = ['UID', 'ISO2', 'ISO3','Code3','FIPS','Admin2','ProvinceState','CountryRegion','Lat','Long','CombinedKey','Date','Confirmed','Deaths']

In [57]:
# Realizar casteo 
df_ucwpd['Date'] = pd.to_datetime(df_ucwpd['Date'])

In [58]:
# Validar que los tipos de datos sean los correctos
print(df_ucwpd.dtypes)

UID                       int64
ISO2                     object
ISO3                     object
Code3                     int64
FIPS                    float64
Admin2                   object
ProvinceState            object
CountryRegion            object
Lat                     float64
Long                    float64
CombinedKey              object
Date             datetime64[ns]
Confirmed                 int64
Deaths                    int64
dtype: object


In [59]:
# Validar el Dataframe
print(df_ucwpd)

             UID ISO2 ISO3  Code3     FIPS          Admin2  \
0             16   AS  ASM     16     60.0             NaN   
1            316   GU  GUM    316     66.0             NaN   
2            580   MP  MNP    580     69.0             NaN   
3       63072001   PR  PRI    630  72001.0        Adjuntas   
4       63072003   PR  PRI    630  72003.0          Aguada   
...          ...  ...  ...    ...      ...             ...   
627915  84070016   US  USA    840      NaN    Central Utah   
627916  84070017   US  USA    840      NaN  Southeast Utah   
627917  84070018   US  USA    840      NaN  Southwest Utah   
627918  84070019   US  USA    840      NaN       TriCounty   
627919  84070020   US  USA    840      NaN    Weber-Morgan   

                   ProvinceState CountryRegion        Lat        Long  \
0                 American Samoa            US -14.271000 -170.132000   
1                           Guam            US  13.444300  144.793700   
2       Northern Mariana Islands    

In [60]:
# Escribir el dataframe
df_ucwpd.to_parquet('covid19/pd/usa_county_wise.parquet')

### Cargar el archivo worldometer_data utilizando pandas y escribir archivo parquet 

In [61]:
# Cargar en un DtaFrame de pandas
df_wdpd = pd.read_csv("covid19/worldometer_data.csv", sep=',', quotechar='"')

# Contar número de columnas, Deben ser 16 columnas
print("Número de columnas:", df_wdpd.shape[1])

Número de columnas: 16


In [62]:
# Corregir el nombre de las columnas
df_wdpd.columns = ['CountryRegion', 'Continent', 'Population','TotalCases','NewCases','TotalDeaths','NewDeaths','TotalRecovered','NewRecovered','ActiveCases','SeriousCritical','TotCases1MPop','Deaths1MPop','TotalTests','Tests1MPop','Region']

In [63]:
# Validar que los tipos de datos sean los correctos
print(df_wdpd.dtypes)

CountryRegion       object
Continent           object
Population         float64
TotalCases           int64
NewCases           float64
TotalDeaths        float64
NewDeaths          float64
TotalRecovered     float64
NewRecovered       float64
ActiveCases        float64
SeriousCritical    float64
TotCases1MPop      float64
Deaths1MPop        float64
TotalTests         float64
Tests1MPop         float64
Region              object
dtype: object


In [64]:
# Validar el Dataframe
print(df_wdpd)

             CountryRegion      Continent    Population  TotalCases  NewCases  \
0                      USA  North America  3.311981e+08     5032179       NaN   
1                   Brazil  South America  2.127107e+08     2917562       NaN   
2                    India           Asia  1.381345e+09     2025409       NaN   
3                   Russia         Europe  1.459409e+08      871894       NaN   
4             South Africa         Africa  5.938157e+07      538184       NaN   
..                     ...            ...           ...         ...       ...   
204             Montserrat  North America  4.992000e+03          13       NaN   
205  Caribbean Netherlands  North America  2.624700e+04          13       NaN   
206       Falkland Islands  South America  3.489000e+03          13       NaN   
207           Vatican City         Europe  8.010000e+02          12       NaN   
208         Western Sahara         Africa  5.986820e+05          10       NaN   

     TotalDeaths  NewDeaths

In [65]:
# Escribir el dataframe
df_wdpd.to_parquet('covid19/pd/worldometer_data.parquet')

### Crear Scripts para generar diagrama usando https://dbdiagram.io/

In [90]:
# Crear funcion que escriba create table en el formato de https://dbdiagram.io/ para hacer el DER de los dataframes
def generate_table(df,table_name):
    # Obtener los tipos de datos de cada columna
    column_types = {col: df[col].dtype for col in df.columns}
    
    # Construir la sentencia TABLE
    create_table_sql = f"TABLE {table_name} "
    create_table_sql += "{\n"
    for col, col_type in column_types.items():
        if col_type == 'object':
            col_type_sql = 'VARCHAR'
        elif col_type == 'int64':
            col_type_sql = 'INT'
        elif col_type == 'float64':
            col_type_sql = 'FLOAT'
        elif col_type == 'datetime64[ns]':
            col_type_sql = 'DATE'
        else:
            col_type_sql = 'VARCHAR(1000)'  # Tipo por defecto si no se reconoce el tipo de dato
    
        create_table_sql += f"{col} {col_type_sql} \n"
    
    create_table_sql = create_table_sql.rstrip(' ')  
    create_table_sql += "}"
    
    return create_table_sql

In [95]:
# Crear Script para la tabla country_wise_latest
print(generate_table(df_cwlpd,'country_wise_latest'))

TABLE country_wise_latest {
CountryRegion VARCHAR 
Confirmed INT 
Deaths INT 
Recovered INT 
Active INT 
NewCases INT 
NewDeaths INT 
NewRecovered INT 
Deaths100Cases FLOAT 
Recovered100Cases FLOAT 
Deaths100Recovered FLOAT 
ConfirmedLastWeek INT 
1WeekChange INT 
1WeekIncrease FLOAT 
WhoRegion VARCHAR 
}


In [96]:
# Crear Script para la tabla covid_19_clean_complete
print(generate_table(df_cccpd,'covid_19_clean_complete'))

TABLE covid_19_clean_complete {
ProvinceState VARCHAR 
CountryRegion VARCHAR 
Lat FLOAT 
Long FLOAT 
Date DATE 
Confirmed INT 
Deaths INT 
Recovered INT 
Active INT 
WhoRegion VARCHAR 
}


In [97]:
# Crear Script para la tabla day_wise
print(generate_table(df_dwpd,'day_wise'))

TABLE day_wise {
Date DATE 
Confirmed INT 
Deaths INT 
Recovered INT 
Active INT 
NewCases INT 
NewDeaths INT 
NewRecovered INT 
Deaths100Cases FLOAT 
Recovered100Cases FLOAT 
Deaths100Recovered FLOAT 
NoOfContries INT 
}


In [98]:
# Crear Script para la tabla full_grouped
print(generate_table(df_fgpd,'full_grouped'))

TABLE full_grouped {
Date DATE 
CountryRegion VARCHAR 
Confirmed INT 
Deaths INT 
Recovered INT 
Active INT 
NewCases INT 
NewDeaths INT 
NewRecovered INT 
WhoRegion VARCHAR 
}


In [99]:
# Crear Script para la tabla usa_county_wise
print(generate_table(df_ucwpd,'usa_county_wise'))

TABLE usa_county_wise {
UID INT 
ISO2 VARCHAR 
ISO3 VARCHAR 
Code3 INT 
FIPS FLOAT 
Admin2 VARCHAR 
ProvinceState VARCHAR 
CountryRegion VARCHAR 
Lat FLOAT 
Long FLOAT 
CombinedKey VARCHAR 
Date DATE 
Confirmed INT 
Deaths INT 
}


In [100]:
# Crear Script para la tabla worldometer_data
print(generate_table(df_wdpd,'worldometer_data'))

TABLE worldometer_data {
CountryRegion VARCHAR 
Continent VARCHAR 
Population FLOAT 
TotalCases INT 
NewCases FLOAT 
TotalDeaths FLOAT 
NewDeaths FLOAT 
TotalRecovered FLOAT 
NewRecovered FLOAT 
ActiveCases FLOAT 
SeriousCritical FLOAT 
TotCases1MPop FLOAT 
Deaths1MPop FLOAT 
TotalTests FLOAT 
Tests1MPop FLOAT 
Region VARCHAR 
}
