# __ETL__ _(Extract, Transform, Load)_

## Introducción

Este notebook se enfoca en el proceso de **ETL** utilizando datos extraídos de las plataformas Yelp y Google Maps. Este proceso implica una _extracccion,transformación y carga_ de los datos con el objetivo de prepararlos para análisis posteriores. Este paso es crucial en cualquier proyecto de ciencia de datos para garantizar la calidad y utilidad de los datos.

## Configuraciones Globales e Importaciones

En esta sección, se instalan e importan todas las librerías y/o módulos necesarios para el proceso ETL (Extract, Transform, Load) y se establecen configuraciones globales de ser requerido. Se utilizan las siguientes librerías y herramientas:

In [52]:
#Se conecta Google Colaboratory con Google Drive
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


Instalamos SPARK para manejar grande volumnes de datos

In [53]:
#Instala pyspark en Google Colaboratory
!pip install pyspark



## Importamos librerias necesarias

In [54]:
import os # Proporciona funciones para interactuar con el sistema operativo.
import requests # Se utiliza para realizar solicitudes HTTP.
import pandas as pd # Una librería de análisis de datos.
import seaborn as sns #S e utiliza para la visualización de datos.
import pyspark.pandas as ps # Proporciona una interfaz para trabajar con datos en Spark utilizando el formato de DataFrame de pandas.
import json # Se utiliza para trabajar con datos en formato JSON.
from pyspark.sql import SparkSession # Se utiliza para crear una instancia de SparkSession, que es la entrada principal para trabajar con Spark SQL.
from pyspark.sql import functions as F #  Proporciona funciones para trabajar con datos en Spark DataFrame.
from pyspark.sql.functions import array_contains # Esta función se utiliza para filtrar los datos basados en la presencia de un valor en un array.
from pyspark.sql.functions import sum, col # Se utiliza para acceder a una columna en un DataFrame de Spark.
from pyspark.sql.functions import split, substring, concat_ws, when, date_format
from pyspark.sql.functions import expr, regexp_replace, countDistinct, length

In [55]:
import warnings
warnings.filterwarnings("ignore")

In [56]:
from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master("local")\
        .appName("ETL-ESTADOS")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()
spark

## RUTA DE LOS **ARCHIVOS**

In [57]:

# Rutas a los archivos JSON reviews-estados
file_path_NY = '/content/drive/MyDrive/Colab-Notebooks/reviews-estados/reviews-New_York'
file_path_CA = '/content/drive/MyDrive/Colab-Notebooks/reviews-estados/reviews-California'
file_path_WI = '/content/drive/MyDrive/Colab-Notebooks/reviews-estados/review-Wisconsin'

In [58]:
# Carga los JSON a un DataFrame de PySpark correspondiente
df_NY = spark.read.json(file_path_NY)
df_CA = spark.read.json(file_path_CA)
df_WI = spark.read.json(file_path_WI)

## **CONCATENACIÓN DE LOS ARCHIVOS**

In [59]:
# Concatenar los DataFrames
df = df_NY.union(df_CA).union(df_WI)

In [60]:
#Mostrar el DATAFRAME
df.show()

+--------------------+-----------------+----+------+----+--------------------+-------------+--------------------+
|             gmap_id|             name|pics|rating|resp|                text|         time|             user_id|
+--------------------+-----------------+----+------+----+--------------------+-------------+--------------------+
|0x89de09b4a848543...|      Sean Bailey|NULL|     3|NULL|Seems to be going...|1522976191648|11294936078038757...|
|0x89de09b4a848543...|  Rebekah Martino|NULL|     4|NULL|Great prices, pro...|1498137899161|10487654216930859...|
|0x89de09b4a848543...|Shirley Arrington|NULL|     4|NULL|Usually crowded w...|1497915516856|10310030037917646...|
|0x89de09b4a848543...|      Gees_Keeper|NULL|     1|NULL|pharmacy slow as ...|1497625448025|11320908474528403...|
|0x89de09b4a848543...|        Irene Ray|NULL|     5|NULL|             Love it|1514164540795|11173201801956000...|
|0x89de09b4a848543...|      Raúl Nieves|NULL|     5|NULL|         Best deals!|1519913588

### **CONTEO DE FILAS**

In [61]:
#Contamos filas
df.count()

6636482

### **VERIFICACIÓN DE NULOS**

In [62]:
# Cuenta el número de nulos en cada columna

def conteo_nulos(dataframe):
  conteo_nulos_por_columna = dataframe.agg(*[sum(col(c).isNull().cast("int")).alias(c) for c in dataframe.columns])

  # Muestra el resultado
  conteo_nulos_por_columna.show()

In [63]:
conteo_nulos(df)

+-------+----+-------+------+-------+-------+----+-------+
|gmap_id|name|   pics|rating|   resp|   text|time|user_id|
+-------+----+-------+------+-------+-------+----+-------+
|      0|   0|6413230|     0|5937784|2870737|   0|      0|
+-------+----+-------+------+-------+-------+----+-------+



### Eliminaremos las columnas que tienen el 90% de datos nulos

In [64]:
# Lista de columnas a eliminar
columnas_a_eliminar = ['pics', 'resp']

Buscamos por gmap_id a nuestro cliente, SGAMBATI'S para revisar que esten todos los datos los correctos y manipular y transformar aquellos que no lo estan para evitar perdidas importantes en nuestra base de dato

In [65]:
# Filtrar los datos por nombre y gmap_id
sgambatis_gmap_id = df.filter(col('gmap_id').like('0x8802e40e3b00b01b:0xbc746336817e4381'))

# Mostrar los datos de la columna gmap_id
sgambatis_gmap_id.show(truncate=False)

# Obtener el primer dato de la gmap_id
primer_dato_gmap_id = sgambatis_gmap_id.first()[0]

print("Primer dato en la columna gmap_id:", primer_dato_gmap_id)

+-------------------------------------+----------------------+----+------+----+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+---------------------+
|gmap_id                              |name                  |pics|rating|resp|text                                                                                                                                                                                                                                                                                         |time         |user_id              |
+-------------------------------------+----------------------+----+------+----+---------------------------------------------------------------------------------------------------------------------

Manipularemos la columna de *'text'* donde se encuentran los *NULL*

In [66]:
# Modificar los valores NULL en la columna 'text' que tienen referencia con "Sgambati"
df = df.withColumn('text',
                    when((col('gmap_id').like('0x8802e40e3b00b01b:0xbc746336817e4381')) & (col('text').isNull()),
                          'Modificacion por no tener comentarios')
                    .otherwise(col('text')))

# Mostrar los resultados modificados
df.filter(col('gmap_id').like('0x8802e40e3b00b01b:0xbc746336817e4381')).select('text').show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|text                                                                                                                                                                                                                                                                                         |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Owners constantly watch you in person and on camera. Very uncomfortable. A lot of drama with staff. Bathroom smells, some toilets do no

Ya solucionado los datos *NULL* de nuestro cliente, procederemos a eliminar las filas con datos nulos

In [67]:
# Eliminar filas donde 'text' está vacío
df = df.dropna(subset=['text'])

# Verificar si se eliminaron las filas nulas
conteo_nulos(df)

+-------+----+-------+------+-------+----+----+-------+
|gmap_id|name|   pics|rating|   resp|text|time|user_id|
+-------+----+-------+------+-------+----+----+-------+
|      0|   0|3562649|     0|3288161|   0|   0|      0|
+-------+----+-------+------+-------+----+----+-------+



ELIMINEMOS LAS COLUMNAS 'PICS' Y 'RESP'

In [68]:
# Eliminar las columnas
df = df.drop(*columnas_a_eliminar)

# Mostrar el DataFrame resultante
df.show(truncate=False)

+-------------------------------------+-----------------+------+-------------------------------------------------------------------------+-------------+---------------------+
|gmap_id                              |name             |rating|text                                                                     |time         |user_id              |
+-------------------------------------+-----------------+------+-------------------------------------------------------------------------+-------------+---------------------+
|0x89de09b4a8485433:0xc87e593123ea4c9a|Sean Bailey      |3     |Seems to be going downhill, employees rude and always crowding the aisles|1522976191648|112949360780387570651|
|0x89de09b4a8485433:0xc87e593123ea4c9a|Rebekah Martino  |4     |Great prices, produce can be iffy                                        |1498137899161|104876542169308598040|
|0x89de09b4a8485433:0xc87e593123ea4c9a|Shirley Arrington|4     |Usually crowded with long lines to cash out.                 

### **VERIFICAMOS QUE NUESTRO CLIENTE SIGA EN LA BASE DE DATOS**

In [69]:
# Filtrar los datos por nombre y gmap_id
sgambatis_gmap_id2 = df.filter(col('gmap_id').like('0x8802e40e3b00b01b:0xbc746336817e4381'))

# Mostrar los datos de la columna gmap_id
sgambatis_gmap_id2.show(truncate=False)

# Obtener el primer dato de la gmap_id
primer_dato_gmap_id = sgambatis_gmap_id2.first()[0]

print("Primer dato en la columna gmap_id:", primer_dato_gmap_id)

+-------------------------------------+----------------------+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+---------------------+
|gmap_id                              |name                  |rating|text                                                                                                                                                                                                                                                                                         |time         |user_id              |
+-------------------------------------+----------------------+------+---------------------------------------------------------------------------------------------------------------------------------------------------

### **VERIFICACIÓN DE DUPLICADOS**

In [70]:
# Eliminar duplicados basándote en todas las columnas
df_no_duplicates = df.dropDuplicates()

# Contar las filas después de eliminar duplicados
print("Número de filas después de eliminar duplicados:", df_no_duplicates.count())

Número de filas después de eliminar duplicados: 3564785


### **VERIFICAMOS QUE NUESTRO CLIENTE SIGA EN LA BASE DE DATOS**



In [71]:
# Filtrar los datos por nombre y gmap_id
sgambatis_gmap_id3 = df.filter(col('gmap_id').like('0x8802e40e3b00b01b:0xbc746336817e4381'))

# Mostrar los datos de la columna gmap_id
sgambatis_gmap_id3.show(truncate=False)

# Obtener el primer dato de la gmap_id
primer_dato_gmap_id = sgambatis_gmap_id3.first()[0]

print("Primer dato en la columna gmap_id:", primer_dato_gmap_id)

+-------------------------------------+----------------------+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+---------------------+
|gmap_id                              |name                  |rating|text                                                                                                                                                                                                                                                                                         |time         |user_id              |
+-------------------------------------+----------------------+------+---------------------------------------------------------------------------------------------------------------------------------------------------

### **ARREGLAMOS LA COLUMNA TIME**

Para que este en un formato fecha aceptado por nuestra base de dato

In [72]:
# Transforma la columna "time" de milisegundos a timestamp.
df = df.withColumn("time", (col("time") / 1000).cast('timestamp'))

In [73]:
# Extrae solo la fecha en el formato "año-mes-día"
df = df.withColumn("date", date_format(col("time"), "yyyy-MM-dd"))

In [74]:
# Selecciona las columnas 'user_id' y 'name' para crear la tabla user.
user = df.select('user_id', 'name').dropDuplicates()

In [75]:
user.show()

+--------------------+-------------------+
|             user_id|               name|
+--------------------+-------------------+
|10200822310914408...|      Anna Gauthier|
|10899488233995060...|                R Y|
|11351926334456464...|     Kathy Cevallos|
|10922617511167842...|      Kevin Millman|
|11069281380017154...|         Mike Vance|
|10575795692280106...|          Wen Huang|
|11402712242834283...|      ranko colovic|
|10305602353116809...|      Frank Stathes|
|10651900844440853...|      Avi Feinstein|
|10979552747501323...|Nora Anthony-Taylor|
|11189013353711104...|            Ana Fer|
|11501759531064434...|      Amanda Rivera|
|10771487996118912...| samrat rajbhandari|
|11161365802707260...|             John B|
|10694392856845369...|              Jay S|
|10345501977728651...|     Beatrice Field|
|11150668802353367...|                Jon|
|10772362994856931...|         Gray Snake|
|11140538301416996...|    Brendal Shumway|
|11788867336835484...|      Nick Winchell|
+----------

In [76]:
# Reemplazar las comas en la columna 'text'
df = df.withColumn('text', regexp_replace(col('text'), ',', ''))

# Mostrar el DataFrame final
df.show()

+--------------------+-----------------+------+--------------------+--------------------+--------------------+----------+
|             gmap_id|             name|rating|                text|                time|             user_id|      date|
+--------------------+-----------------+------+--------------------+--------------------+--------------------+----------+
|0x89de09b4a848543...|      Sean Bailey|     3|Seems to be going...|2018-04-06 00:56:...|11294936078038757...|2018-04-06|
|0x89de09b4a848543...|  Rebekah Martino|     4|Great prices prod...|2017-06-22 13:24:...|10487654216930859...|2017-06-22|
|0x89de09b4a848543...|Shirley Arrington|     4|Usually crowded w...|2017-06-19 23:38:...|10310030037917646...|2017-06-19|
|0x89de09b4a848543...|      Gees_Keeper|     1|pharmacy slow as ...|2017-06-16 15:04:...|11320908474528403...|2017-06-16|
|0x89de09b4a848543...|        Irene Ray|     5|             Love it|2017-12-25 01:15:...|11173201801956000...|2017-12-25|
|0x89de09b4a848543...|  

In [77]:
# Verificar si la columna 'rating' contiene solo valores numéricos
has_only_numeric_values = df.filter(col("user_id").cast("float").isNotNull()).count() == df.count()

if has_only_numeric_values:
    print("La columna 'user_id' contiene solo valores numéricos.")
else:
    print("La columna 'user_id' contiene valores que no son numéricos.")

La columna 'user_id' contiene solo valores numéricos.


In [78]:
# Verificar si la columna 'rating' contiene solo valores numéricos
has_only_numeric_values = df.filter(col("rating").cast("float").isNotNull()).count() == df.count()

if has_only_numeric_values:
    print("La columna 'user_id' contiene solo valores numéricos.")
else:
    print("La columna 'user_id' contiene valores que no son numéricos.")

La columna 'user_id' contiene solo valores numéricos.


In [79]:
# Calcular la longitud de cada valor en la columna
lengths_df = df.withColumn("length", length("gmap_id"))

# Verificar si todos los valores tienen la misma longitud
same_length = lengths_df.select(countDistinct("length")).collect()[0][0]

if same_length == 1:
    print("La columna 'gmap_id' tiene la misma longitud en todas las filas.")
else:
    print("La columna 'gmap_id' tiene diferentes longitudes en las filas.")

La columna 'gmap_id' tiene diferentes longitudes en las filas.


In [80]:
# Convertir la columna 'rating' a tipo float, reemplazando los valores no numéricos por 0
df = df.withColumn("rating", when(col("rating").cast("float").isNotNull(), col("rating").cast("float")).otherwise(0.0))

# Verificar si la columna 'rating' contiene solo valores numéricos
has_only_numeric_values = df.filter(col("rating").cast("float").isNotNull()).count() == df.count()

if has_only_numeric_values:
    print("La columna 'rating' ahora contiene solo valores numéricos.")
else:
    print("La columna 'rating' contiene valores que no son numéricos.")

La columna 'rating' ahora contiene solo valores numéricos.


In [81]:
# Reemplazar las comas en todas las columnas
df_no_commas = df_no_duplicates.select([
    regexp_replace(col(column), ',', '').alias(column)
    for column in df_no_duplicates.columns
])

# Mostrar el DataFrame final sin comas en ninguna columna
df_no_commas.show()

+--------------------+--------------------+------+--------------------+-------------+--------------------+
|             gmap_id|                name|rating|                text|         time|             user_id|
+--------------------+--------------------+------+--------------------+-------------+--------------------+
|0x89deb689147034d...|     Martin Benedict|     5|Very good food  e...|1517945745290|11534398727851645...|
|0x89deb689147034d...|        Chris Bohler|     4|   Ask for Stephanie|1567557921887|11599446027886131...|
|0x89c25e789205122...|          Luis Nouel|     5|     Love this place|1554396146101|10535304662219838...|
|0x89c25e789205122...|      Melissa Davila|     4|Quick and tasty p...|1485799857229|11311589939540197...|
|0x89de7a00e3f61b3...|      Becky Straubel|     5|Excellent custome...|1487449909715|11672851848588995...|
|0x89dcdf75cd22fec...|    Vanessa Holzmann|     5|Still can find so...|1565462351211|10472080595879299...|
|0x89dcdf75cd22fec...|         Eagley

In [82]:
# Reemplazar las comillas en todas las columnas
df_no_quote = df_no_commas.select([
    regexp_replace(col(column), '"', '').alias(column)
    for column in df_no_duplicates.columns
])

In [83]:
# Filtrar los valores de la columna "rating" que están entre 0 y 5
df_filtered = df.filter((col("rating") >= 0) & (col("rating") <= 5))

In [84]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
from textblob import TextBlob

# Función para determinar el sentimiento basado en el texto y la calificación
def get_sentiment(text, rating):
    if text:
        analysis = TextBlob(text)
        if analysis.sentiment.polarity > 0:
            return 1  # Sentimiento positivo
        elif analysis.sentiment.polarity < 0:
            return 2  # Sentimiento negativo
        else:
            return 0  # Sentimiento neutro
    else:
        if rating is not None:
            if rating >= 4:
                return 1  # Calificación alta considerada como sentimiento positivo
            elif rating <= 2:
                return 2  # Calificación baja considerada como sentimiento negativo
            else:
                return 0  # Calificación intermedia considerada como sentimiento neutro
        else:
            return 0  # Si tanto el texto como la calificación están vacíos, asignar 0

# Registrar la función como UDF
sentiment_udf = udf(lambda text, rating: get_sentiment(text, rating), IntegerType())


In [85]:
# Registrar la función como UDF
sentiment_udf = udf(lambda text, rating: get_sentiment(text, rating), IntegerType())

# Aplicar la función a la columna "review" y crear una nueva columna "sentiment"
df_with_rating_sentiment = df_filtered.withColumn("sentiment", sentiment_udf("text", "rating"))

# Mostrar el DataFrame resultante
df_with_rating_sentiment.show(truncate=False)

+-------------------------------------+-----------------+------+------------------------------------------------------------------------+-----------------------+---------------------+----------+---------+
|gmap_id                              |name             |rating|text                                                                    |time                   |user_id              |date      |sentiment|
+-------------------------------------+-----------------+------+------------------------------------------------------------------------+-----------------------+---------------------+----------+---------+
|0x89de09b4a8485433:0xc87e593123ea4c9a|Sean Bailey      |3.0   |Seems to be going downhill employees rude and always crowding the aisles|2018-04-06 00:56:31.648|112949360780387570651|2018-04-06|2        |
|0x89de09b4a8485433:0xc87e593123ea4c9a|Rebekah Martino  |4.0   |Great prices produce can be iffy                                        |2017-06-22 13:24:59.161|1048765421693085980

In [86]:
df_with_rating_sentiment.printSchema()

root
 |-- gmap_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- text: string (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- user_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- sentiment: integer (nullable = true)



In [87]:
# Eliminar las columnas
df_with_rating_sentiment = df_with_rating_sentiment.drop('time')

# Mostrar el DataFrame resultante
df_with_rating_sentiment.show(truncate=False)

+-------------------------------------+-----------------+------+------------------------------------------------------------------------+---------------------+----------+---------+
|gmap_id                              |name             |rating|text                                                                    |user_id              |date      |sentiment|
+-------------------------------------+-----------------+------+------------------------------------------------------------------------+---------------------+----------+---------+
|0x89de09b4a8485433:0xc87e593123ea4c9a|Sean Bailey      |3.0   |Seems to be going downhill employees rude and always crowding the aisles|112949360780387570651|2018-04-06|2        |
|0x89de09b4a8485433:0xc87e593123ea4c9a|Rebekah Martino  |4.0   |Great prices produce can be iffy                                        |104876542169308598040|2017-06-22|1        |
|0x89de09b4a8485433:0xc87e593123ea4c9a|Shirley Arrington|4.0   |Usually crowded with long lines

In [94]:
# Eliminar los saltos de página de la columna 'text'
df = df_with_rating_sentiment.withColumn('text', regexp_replace('text', '\n', ''))

In [89]:
# Mostrar el DataFrame resultante
df.show(truncate=False)

+-------------------------------------+-----------------+------+------------------------------------------------------------------------+---------------------+----------+---------+
|gmap_id                              |name             |rating|text                                                                    |user_id              |date      |sentiment|
+-------------------------------------+-----------------+------+------------------------------------------------------------------------+---------------------+----------+---------+
|0x89de09b4a8485433:0xc87e593123ea4c9a|Sean Bailey      |3.0   |Seems to be going downhill employees rude and always crowding the aisles|112949360780387570651|2018-04-06|2        |
|0x89de09b4a8485433:0xc87e593123ea4c9a|Rebekah Martino  |4.0   |Great prices produce can be iffy                                        |104876542169308598040|2017-06-22|1        |
|0x89de09b4a8485433:0xc87e593123ea4c9a|Shirley Arrington|4.0   |Usually crowded with long lines

In [90]:
df.printSchema()

root
 |-- gmap_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- text: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- sentiment: integer (nullable = true)



In [91]:
# Cambiar el tipo de datos de la columna 'rating' a int
df = df.withColumn("rating", col("rating").cast("int"))

In [96]:
from pyspark.sql.functions import col, regexp_replace

df_co = df.select([
    regexp_replace(col(column), '"', '').alias(column)
    if column != 'time' else col(column)
    for column in df.columns
])

In [98]:
from pyspark.sql.functions import col

# Verificar si hay comillas en alguna columna
comillas = df.select([col(c).contains('"').alias(c) for c in df.columns]).where(col('gmap_id') == True)

# Verificar si hay saltos de página en alguna columna
saltos_pagina = df.select([col(c).contains('\n').alias(c) for c in df.columns]).where(col('gmap_id') == True)


# Verificar si hay saltos de página en alguna columna
comas = df.select([col(c).contains(',').alias(c) for c in df.columns]).where(col('gmap_id') == True)

# Contar el número de filas con comillas o saltos de página
num_filas_comillas = comillas.count()
num_filas_saltos_pagina = saltos_pagina.count()
num_filas_comas = comas.count()

print(f"Número de filas con comillas: {num_filas_comillas}")
print(f"Número de filas con saltos de página: {num_filas_saltos_pagina}")
print(f"Número de filas con comas: {num_filas_comas}")

Número de filas con comillas: 0
Número de filas con saltos de página: 0
Número de filas con comas: 0


In [100]:
# Eliminar los saltos de página de la columna 'text'
df_ff = df_co .withColumn('text', regexp_replace('text', '\n', ''))

### **CARGA ARCHIVO**

In [101]:
# Ruta al archivo CSV local
file_path = '/content/drive/MyDrive/Colab-Notebooks/transformaciones/estados-limpios9.csv'

# Escribe el DataFrame a un solo archivo CSV localmente con separador ","
df_ff.coalesce(4).write.csv(file_path, header=True, sep=',')