# Obtención de datos

Los datos se obtienen clonando el repositorio de COVID19_Tweets_Dataset dentro del nodo master en un directorio con suficiente almacenamiento (más de 120 GB), en nuestro caso el directorio /datadrive.

```
cd /datadrive
git clone https://github.com/lopezbec/COVID19_Tweets_Dataset.git
```

Una vez hecho esto es necesario hacer una limpieza preliminar de los datos que consiste en simplemente remover aquellos directorios y archivos del dataset que no vamos a utilizar para que los scripts se ejecuten correctamente. Debemos remover las carpetas NYT_COVID_with_Reverse_Geo y Tweets_ID_Filter_requests así como todos los archivos restantes que no son carpetas, de modo que al final solo nos quedan las carpetas que corresponden a las siete tablas principales. Dentro de cada carpeta restante tenemos 12 carpetas que contienen la data de los 12 meses del 2022 además de archivos adicionales con análisis previos de las tablas, estos archivos deben ser removidos de modo que solo queden las carpetas.

```
rm archivo
rm -r carpeta
```

# Preprocesamiento de datos

Los scripts deben ser ejecutados desde el mismo directorio donde se clonó el dataset.

In [None]:
# Instalar de librerías necesarias
!pip install pandas
!pip install pyarrow
!pip install pyspark

In [None]:
# Importar dependencias
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import platform
from glob import glob
import os
import shutil
from pyspark.sql import SparkSession

# Definir la ruta del dataset
path = os.getcwd() + "/COVID19_Tweets_Dataset"
l = os.listdir(path)

In [None]:
# Mover los archivos de datos dentro de las carpetas de meses para ser accesibles directamente desde las carpetas principales de las tablas (solo se ejecuta una vez)
for direc in l:
    for mes in os.listdir(os.path.join(path, direc)):
        if not mes.endswith("csv"):
            if not os.listdir(os.path.join(path, direc, mes)):
                os.rmdir(os.path.join(path, direc, mes))
            else:
                for csv in os.listdir(os.path.join(path, direc, mes)):
                    shutil.move(os.path.join(path, direc, mes, csv), os.path.join(path, direc))

In [None]:
# Mapear cada archivo de datos con la tabla a la que pertenecen
tables = {}
for direct in l:
    tables[direct] = []
    for csv in os.listdir(os.path.join(path, direct)):
        if csv.endswith(".csv"):
            tables[direct].append(csv)

print(tables['Summary_Details'])

In [None]:
# Análisis de valores nulos en las tablas (opcional y no recomendado sobre todo el dataset)
for key in tables:
    print(key)
    for csv in tables[key]:
        df = pd.read_csv(os.path.join(path, key, csv))
        print(df.isnull().sum())
        break

In [None]:
# Preprocesamiento de los datos
for key in tables:
    print(key)
    for csv in tables[key]:
        if not os.path.exists(os.getcwd() + "/COVID19_Tweets_Dataset_Pq/" + key + "/" + f'{csv[:-4]}.parquet'):
            df = pd.read_csv(os.path.join(path, key, csv), dtype={'Country': str})

            if key == 'Summary_Details':
                # Remover la columna 'Country'
                df = df.drop('Country', axis=1)

            # Remover los NA
            df = df.dropna()

            if 'Tweet_ID' in df:
                # Cambiar la variable Tweet_ID de numérica a categórica
                df = df.astype({'Tweet_ID': str})

            # Remover espacios en nombre de columnas
            for col in df.columns:
                df = df.rename(columns={col: col.replace(" ", "_")})

            if not os.path.exists(os.getcwd() + "/COVID19_Tweets_Dataset_Pq/" + key):
                os.mkdir(os.getcwd() + "/COVID19_Tweets_Dataset_Pq/" + key)

            # Convertir los dataframes a formato parquet
            df.to_parquet(
                path=os.getcwd() + "/COVID19_Tweets_Dataset_Pq/" + key + "/" + f'{csv[:-4]}.parquet',
                engine='pyarrow',
                compression='gzip',
            )

# Lectura de datos parquet en el clúster HDFS

Una vez haya finalizado el preprocesamiento, puedes subir los archivos en formato parquet al clúster con los siguientes comandos:

```
hdfs dfs -mkdir -p /user/hadoop
hdfs dfs -mkdir -p datasets
hdfs dfs -put COVID19_Tweets_Dataset_Pq datasets
```

Luego de finalizar la subida, puedes probar que los archivos se lean correctamente con los siguientes scripts:

In [None]:
# Crear sesión de Spark
spark = SparkSession.builder.appName("HDFS Parquet Test").getOrCreate()

In [None]:
# Leer tabla Summary_Details y contar filas
df = spark.read.parquet(os.getcwd() + "/COVID19_Tweets_Dataset_Pq/Summary_Details")
df.count()