## ETL Preliminar

In [158]:
%pip install nltk
from google.cloud import storage
import pyarrow.parquet as pq
import pickle
import io
import pandas as pd
import json
from datetime import datetime
import nltk
nltk.download("vader_lexicon")
from nltk.sentiment.vader import SentimentIntensityAnalyzer

Collecting nltk
  Using cached nltk-3.8.1-py3-none-any.whl (1.5 MB)
Collecting regex>=2021.8.3 (from nltk)
  Using cached regex-2023.12.25-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (40 kB)
Using cached regex-2023.12.25-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (773 kB)
Installing collected packages: regex, nltk
Successfully installed nltk-3.8.1 regex-2023.12.25
Note: you may need to restart the kernel to use updated packages.


[nltk_data] Downloading package vader_lexicon to
[nltk_data]     /home/jupyter/nltk_data...
[nltk_data]   Package vader_lexicon is already up-to-date!


In [2]:
# Crea una instancia de cliente de GCS
storage_client = storage.Client()

# Especifica la ruta del archivo en GCS
bucket_name = 'project_yelp_parquet'
folder_name = 'Yelp'

file_check_in = 'checkin.json'
file_path_check_in = f'{folder_name}/{file_check_in}'

file_tip = 'tip.json'
file_path_tip = f'{folder_name}/{file_tip}'

file_review = 'review.parquet'
file_path_review = f'{folder_name}/{file_review}'

file_user = 'user.parquet'
file_path_user = f'{folder_name}/{file_user}'

file_business = 'business.pkl'
file_path_business = f'{folder_name}/{file_business}'

file_metadata = 'metadata-sitios.parquet'
file_path_metadata = f'gs://{bucket_name}/{file_metadata}'

# Obtén el archivo JSON directamente desde GCS
bucket = storage_client.get_bucket(bucket_name)

blob_check_in = bucket.blob(file_path_check_in)
blob_tip = bucket.blob(file_path_tip)
blob_review = bucket.blob(file_path_review)
blob_user = bucket.blob(file_path_user)
blob_business = bucket.blob(file_path_business)

# Business

In [165]:
# Carga el archivo pkl como un DataFrame usando pickle
# Descargar el archivo pkl desde GCS como un flujo de bytes
file_content_business = blob_business.download_as_bytes()
df_business = pd.read_pickle(io.BytesIO(file_content_business))

In [166]:
df_business.columns

Index(['business_id', 'name', 'address', 'city', 'state', 'postal_code',
       'latitude', 'longitude', 'stars', 'review_count', 'is_open',
       'attributes', 'categories', 'hours', 'business_id', 'name', 'address',
       'city', 'state', 'postal_code', 'latitude', 'longitude', 'stars',
       'review_count', 'is_open', 'attributes', 'categories', 'hours'],
      dtype='object')

In [167]:
# Crear un contador para el sufijo
suffix_counter = {}

# Renombrar las columnas agregando sufijos
new_columns = []
for col in df_business.columns:
    if col in suffix_counter:
        # Columna duplicada: agregar sufijo '_duplicada'
        new_col = col + '_duplicada'
    else:
        # Columna no duplicada: agregar sufijo '_1' y actualizar contador
        suffix_counter[col] = 1
        new_col = col + ''
    new_columns.append(new_col)

# Asignar los nuevos nombres de columna al DataFrame
df_business.columns = new_columns

In [168]:
# Filtrar las columnas que contienen la palabra "duplicada" en su nombre
columnas_a_eliminar = [col for col in df_business.columns if 'duplicada' in col]

# Eliminar las columnas seleccionadas del DataFrame
df_business = df_business.drop(columns=columnas_a_eliminar)

In [169]:
df_business.columns

Index(['business_id', 'name', 'address', 'city', 'state', 'postal_code',
       'latitude', 'longitude', 'stars', 'review_count', 'is_open',
       'attributes', 'categories', 'hours'],
      dtype='object')

In [170]:
duplicates_business = df_business.duplicated('business_id')
print(duplicates_business.sum())

0


In [171]:
# Estados contenidos en 'state'
print(df_business['state'].unique())

[nan 'CA' 'MO' 'AZ' 'PA' 'TN' 'FL' 'IN' 'LA' 'AB' 'NV' 'ID' 'DE' 'IL' 'NJ'
 'NC' 'CO' 'WA' 'HI' 'UT' 'TX' 'MT' 'MI' 'SD' 'XMS' 'MA' 'VI' 'VT']


In [172]:
# Filtar registros de Florida
df_business = df_business[df_business['state'].isin(['FL'])]
print(df_business['state'].unique())

In [176]:
df_business.isna().sum()

business_id        0
name               0
address            0
city               0
state              0
postal_code        0
latitude           0
longitude          0
stars              0
review_count       0
is_open            0
attributes      2446
categories        18
hours           4108
dtype: int64

In [177]:
# Reemplazar los valores nulos en la columna 'hours' con "Categories not specified"
df_business['categories'].fillna('Categories not specified', inplace=True)

In [178]:
# Lista de palabras clave gatronomia
gastronomia = ['Restaurant', 'Restaurants','Food','Bars','Bar','Café','Coffeehouse','Bistro','Tavern','Buffet','Brewpub','Pub','Brasserie','Specialty Coffee Shop','Pub','Churrería','Diner','Dining','Teahouse','Tea Room','Gas Station', 'Gas','Fuel Station','Fuel']

# Filtrar los registros que contienen al menos una palabra clave en 'categories'
df_business = df_business[df_business['categories'].str.contains('|'.join(gastronomia))]

In [179]:
# Crear una lista de palabras clave de categorías de restaurantes étnicos
comida_etnica = ['Chinese','Indian','Thai','Italian','Greek','Helthy','Helth','Latin','Mexican','Tacos','Burritos','Enchiladas','Argentinian','Peruvian','Ceviche','Lomo','Pisco','Colombian','Empanadas','Arepas','Asian','Japanese','Sushi','Ramen','Sashimi','Tempura','Korean','Kimchi','Vietnamese','African','Ethiopian','Nigerian','Middle Eastern','Lebanese','Hummus','Falafel','Shawarma','Tabbouleh','Israeli','Shakshuka','Falafel','Hummus','Iranian','Healthy','Vegetarian','Vegan','Gluten-free','Gluten-Free','Fresh','Seasonal','Casual']

# Filtrar las filas donde la columna 'categories' contiene las palabras clave de restaurantes étnicos o 'Gas Stations'
filtro_categorias = '|'.join(comida_etnica + ['Gas Stations'])
df_business = df_business[df_business['categories'].str.contains(filtro_categorias, case=False, na=False)]

In [180]:
# Listado de Categorias resultantes
valores_unicos = df_business['categories'].unique()
print(valores_unicos)

['Food, Delis, Italian, Bakeries, Restaurants'
 'Restaurants, Automotive, Delis, Gas Stations, Food, Coffee & Tea, Sandwiches, Convenience Stores'
 'Cocktail Bars, Italian, Nightlife, Seafood, Bars, Restaurants' ...
 'Restaurants, Japanese, Ramen'
 'Convenience Stores, Gas Stations, Automotive, Food, Coffee & Tea'
 'Mexican, Shaved Ice, Restaurants, Food, Food Stands']


In [181]:
df_business.isna().sum()

business_id       0
name              0
address           0
city              0
state             0
postal_code       0
latitude          0
longitude         0
stars             0
review_count      0
is_open           0
attributes       62
categories        0
hours           511
dtype: int64

In [182]:
df_business.shape

(3779, 14)

In [184]:
# Reemplazar los valores nulos en la columna 'hours' con "Hours not specified"
df_business['hours'].fillna("Hours not specified", inplace=True)

In [186]:
# Lista de columnas permitidas en el orden deseado
columnas_permitidas = ['business_id', 'name', 'address', 'city', 'state', 'postal_code', 'latitude', 'longitude', 'stars', 'review_count', 'categories', 'hours']

# Filtrar las columnas que se van a eliminar
columnas_a_eliminar = [col for col in df_business.columns if col not in columnas_permitidas]

# Imprimir las columnas que se eliminarán
print("Columnas a eliminar:")
for col in columnas_a_eliminar:
    print(col)

# Filtrar las columnas permitidas
df_business = df_business[columnas_permitidas]

Columnas a eliminar:
is_open
attributes


In [187]:
df_business.info()

<class 'pandas.core.frame.DataFrame'>
Index: 3779 entries, 14 to 150283
Data columns (total 12 columns):
 #   Column        Non-Null Count  Dtype 
---  ------        --------------  ----- 
 0   business_id   3779 non-null   object
 1   name          3779 non-null   object
 2   address       3779 non-null   object
 3   city          3779 non-null   object
 4   state         3779 non-null   object
 5   postal_code   3779 non-null   object
 6   latitude      3779 non-null   object
 7   longitude     3779 non-null   object
 8   stars         3779 non-null   object
 9   review_count  3779 non-null   object
 10  categories    3779 non-null   object
 11  hours         3779 non-null   object
dtypes: object(12)
memory usage: 383.8+ KB


In [188]:
df_business.head(1)

Unnamed: 0,business_id,name,address,city,state,postal_code,latitude,longitude,stars,review_count,categories,hours
14,0bPLkL0QhhPO5kt1_EXmNQ,Zio's Italian Market,2575 E Bay Dr,Largo,FL,33771,27.916116,-82.760461,4.5,100,"Food, Delis, Italian, Bakeries, Restaurants","{'Monday': '10:0-18:0', 'Tuesday': '10:0-20:0'..."


In [189]:
# Convertir el DataFrame a un archivo CSV en memoria
csv_buffer = io.StringIO()
df_business.to_csv(csv_buffer, index=False)
csv_content = csv_buffer.getvalue().encode('utf-8')

# Nombre del archivo en Google Cloud Storage
blob_name = 'archivos_csv/datasets/procesados/df_business_procesado.csv'

# Subir el archivo CSV al bucket
blob = bucket.blob(blob_name)
blob.upload_from_file(io.BytesIO(csv_content), content_type='text/csv')

# Check-in

In [190]:
# Lee el archivo JSON línea por línea y concatena los objetos JSON en una lista
data_check_in = []
for line in blob_check_in.download_as_text().split('\n'):
    try:
        json_obj = json.loads(line)
        data_check_in.append(json_obj)
    except json.JSONDecodeError:
        print(f"Error al decodificar la línea: {line}")

# Convierte los datos a un DataFrame de Pandas
df_check_in = pd.DataFrame(data_check_in)

Error al decodificar la línea: 


In [191]:
df_check_in.info()

Index(['business_id', 'date'], dtype='object')

In [192]:
# Filtrar df_check_in basado en los business_id de df_business
df_check_in = df_check_in[df_check_in['business_id'].isin(df_business['business_id'])]

In [193]:
df_check_in.shape

(3718, 2)

In [194]:
duplicates_check_in = df_check_in.duplicated('business_id')
print(duplicates_check_in.sum())

0


In [195]:
df_check_in.isna().sum()

business_id    0
date           0
dtype: int64

In [196]:
valor_date_checkin = df_check_in['date'].iloc[0]
valor_date_checkin

'2020-03-13 21:10:56, 2020-06-02 22:18:06, 2020-07-24 22:42:27, 2020-10-24 21:36:13, 2020-12-09 21:23:33, 2021-01-20 17:34:57, 2021-04-30 21:02:03, 2021-05-25 21:16:54, 2021-08-06 21:08:08, 2021-10-02 15:15:42, 2021-11-11 16:23:50'

In [197]:
# Función para normalizar una fecha individual en una cadena con múltiples fechas
def normalize_dates(date_str):
    dates_list = date_str.split(", ")
    return [datetime.strptime(date, '%Y-%m-%d %H:%M:%S') for date in dates_list]

# Normalizar la columna 'date' que contiene múltiples fechas en una sola cadena
df_check_in['date'] = df_check_in['date'].apply(normalize_dates)

# Explode la lista de fechas normalizadas en múltiples filas
df_check_in = df_check_in.explode('date')

In [198]:
# Renombrar la columna 'date' a 'date_and_hour'
df_check_in.rename(columns={'date': 'date_and_hour'}, inplace=True)

# Convertir la columna 'date_and_hour' a formato de cadena de texto
df_check_in['date_and_hour'] = df_check_in['date_and_hour'].astype(str)

# Dividir la columna 'date_and_hour' en dos nuevas columnas: 'date' y 'hour'
df_check_in[['date', 'hour']] = df_check_in['date_and_hour'].str.split(' ', expand=True)

# Convertir las columnas 'date' y 'hour' al formato datetime
df_check_in['date'] = pd.to_datetime(df_check_in['date'])
df_check_in['hour'] = pd.to_datetime(df_check_in['hour'], format='%H:%M:%S').dt.time

# Eliminar la columna 'date_and_hour'
df_check_in.drop(columns=['date_and_hour'], inplace=True)

In [199]:
df_check_in.info()

<class 'pandas.core.frame.DataFrame'>
Index: 599282 entries, 0 to 131910
Data columns (total 3 columns):
 #   Column       Non-Null Count   Dtype         
---  ------       --------------   -----         
 0   business_id  599282 non-null  object        
 1   date         599282 non-null  datetime64[ns]
 2   hour         599282 non-null  object        
dtypes: datetime64[ns](1), object(2)
memory usage: 18.3+ MB


In [200]:
# Reiniciar el índice
df_check_in = df_check_in.reset_index(drop=True)
df_check_in.head(12)

Unnamed: 0,business_id,date,hour
0,---kPU91CF4Lq2-WlRu9Lw,2020-03-13,21:10:56
1,---kPU91CF4Lq2-WlRu9Lw,2020-06-02,22:18:06
2,---kPU91CF4Lq2-WlRu9Lw,2020-07-24,22:42:27
3,---kPU91CF4Lq2-WlRu9Lw,2020-10-24,21:36:13
4,---kPU91CF4Lq2-WlRu9Lw,2020-12-09,21:23:33
5,---kPU91CF4Lq2-WlRu9Lw,2021-01-20,17:34:57
6,---kPU91CF4Lq2-WlRu9Lw,2021-04-30,21:02:03
7,---kPU91CF4Lq2-WlRu9Lw,2021-05-25,21:16:54
8,---kPU91CF4Lq2-WlRu9Lw,2021-08-06,21:08:08
9,---kPU91CF4Lq2-WlRu9Lw,2021-10-02,15:15:42


In [201]:
# Convertir el DataFrame df_check_in a un archivo CSV en memoria
csv_buffer = io.StringIO()
df_check_in.to_csv(csv_buffer, index=False)
csv_content = csv_buffer.getvalue().encode('utf-8')

# Nombre del archivo en Google Cloud Storage
blob_name = 'archivos_csv/df_check_in.csv'

# Subir el archivo CSV al bucket
blob = bucket.blob(blob_name)
blob.upload_from_file(io.BytesIO(csv_content), content_type='text/csv')

# Review

In [202]:
# Lee los archivos Parquet de review y user desde GCS
table_review = pq.read_table(blob_review.open('rb'))

# Convierte los datos a DataFrames de Pandas (opcional)
df_review = table_review.to_pandas()

In [203]:
df_review.info()

Index(['review_id', 'user_id', 'business_id', 'stars', 'useful', 'funny',
       'cool', 'text', 'date'],
      dtype='object')

In [204]:
# Filtrar df_review basado en los business_id de df_business
df_review = df_review[df_review['business_id'].isin(df_business['business_id'])]

In [205]:
df_review.shape

(352820, 9)

In [206]:
df_review.isna().sum()

review_id      0
user_id        0
business_id    0
stars          0
useful         0
funny          0
cool           0
text           0
date           0
dtype: int64

In [207]:
# Renombrar la columna 'date' a 'date_and_hour'
df_review.rename(columns={'date': 'date_and_hour'}, inplace=True)

# Convertir la columna 'date_and_hour' a formato de cadena de texto
df_review['date_and_hour'] = df_review['date_and_hour'].astype(str)

# Dividir la columna 'date_and_hour' en dos nuevas columnas: 'date' y 'hour'
df_review[['date', 'hour']] = df_review['date_and_hour'].str.split(' ', expand=True)

# Convertir las columnas 'date' y 'hour' al formato datetime
df_review['date'] = pd.to_datetime(df_review['date'])
df_review['hour'] = pd.to_datetime(df_review['hour'], format='%H:%M:%S').dt.time

# Eliminar la columna 'date_and_hour'
df_review.drop(columns=['date_and_hour'], inplace=True)

In [208]:
# Definir función para obtener la puntuación de sentimiento
def get_sentiment_score(text):
    if pd.isnull(text) or text == "":
        return 1  # Valor neutral si el texto está vacío o es NaN
    elif isinstance(text, str):
        # Realizar análisis de sentimiento
        sentiment = sia.polarity_scores(text)
        compound_score = sentiment["compound"]

        # Escalar la puntuación entre 1 y 5
        score = int(round(5 * compound_score))

        # Asignar la puntuación
        if score <= 2:
            return 1
        elif score <= 3:
            return 2
        elif score <= 4:
            return 3
        else:
            return 4
    else:
        return 1  # Valor neutral para datos que no son de tipo cadena

# Instanciar el modelo de análisis de sentimiento
sia = SentimentIntensityAnalyzer()

# Asegurarse de que la columna 'text' sea de tipo cadena
df_review["text"] = df_review["text"].astype(str)

# Aplicar la función de análisis de sentimiento a la columna 'text' y crear una nueva columna 'sentiment_analysis'
df_review["sentiment_analysis"] = df_review["text"].apply(get_sentiment_score)

# Convertir la columna 'sentiment_analysis' al tipo de dato flotante si es necesario
df_review["sentiment_analysis"] = df_review["sentiment_analysis"].astype(float)

In [209]:
# Lista de columnas permitidas en el orden deseado
columnas_permitidas = ['review_id', 'user_id', 'business_id', 'stars', 'date', 'hour', 'sentiment_analysis']

# Filtrar las columnas que se van a eliminar
columnas_a_eliminar = [col for col in df_review.columns if col not in columnas_permitidas]

# Imprimir las columnas que se eliminarán
print("Columnas a eliminar:")
for col in columnas_a_eliminar:
    print(col)

# Filtrar las columnas permitidas
df_review = df_review[columnas_permitidas]

Columnas a eliminar:
useful
funny
cool
text


In [210]:
# Verificar la información del DataFrame df_review
df_review.info()

<class 'pandas.core.frame.DataFrame'>
Index: 352820 entries, 9 to 6990266
Data columns (total 7 columns):
 #   Column              Non-Null Count   Dtype         
---  ------              --------------   -----         
 0   review_id           352820 non-null  object        
 1   user_id             352820 non-null  object        
 2   business_id         352820 non-null  object        
 3   stars               352820 non-null  float64       
 4   date                352820 non-null  datetime64[ns]
 5   hour                352820 non-null  object        
 6   sentiment_analysis  352820 non-null  float64       
dtypes: datetime64[ns](1), float64(2), object(4)
memory usage: 21.5+ MB


In [211]:
df_review.head(1)

Unnamed: 0,review_id,user_id,business_id,stars,date,hour,sentiment_analysis
9,pUycOfUwM8vqX7KjRRhUEA,59MxRhNVhU9MYndMkz0wtw,gebiRewfieSdtt17PTW6Zg,3.0,2016-07-25,07:31:06,4.0


In [212]:
# Convertir el DataFrame df_review a un archivo CSV en memoria
csv_buffer = io.StringIO()
df_review.to_csv(csv_buffer, index=False)
csv_content = csv_buffer.getvalue().encode('utf-8')

# Nombre del archivo en Google Cloud Storage
blob_name = 'archivos_csv/datasets/procesados/df_review_prosesado.csv'

# Subir el archivo CSV al bucket
blob = bucket.blob(blob_name)
blob.upload_from_file(io.BytesIO(csv_content), content_type='text/csv')