In [None]:
import functions_framework
import pandas as pd
from google.cloud import storage, bigquery
import json
import logging

@functions_framework.http
def ETL_reviews_bigquery(request):
    try:
        # Obtén la carpeta en el bucket desde los parámetros de la URL
        estado = request.args.get('estado')

        # Verifica si se proporcionó la carpeta en la URL
        if not estado:
            return "Error: El parámetro 'estado' es necesario en la URL", 400

        # Define el nombre del bucket y la ruta de la carpeta temporal
        bucket_name = 'data_lake_pf_henry'
        bucket_path = f'Google Maps/reviews-estados/{estado}/'
        temporal_folder_path = 'temporal/'

        # Crea un cliente de Cloud Storage
        storage_client = storage.Client()

        # Obtiene el bucket
        bucket = storage_client.bucket(bucket_name)

        # Lista todos los blobs en la carpeta
        blobs = bucket.list_blobs(prefix=bucket_path)

        # Inicializa una lista para almacenar los DataFrames de cada archivo
        dfs = []

        # Itera sobre cada archivo en la carpeta
        for blob in blobs:
            # Descarga el contenido del blob como un string
            json_string = blob.download_as_text()

            # Dividir el string por nuevas líneas y cargar cada objeto JSON
            json_objects = [json.loads(j) for j in json_string.splitlines()]

            # Convierte la lista de objetos JSON en un DataFrame de Pandas
            df = pd.DataFrame(json_objects)
            dfs.append(df)

        # Concatena todos los DataFrames en uno solo
        df_combined = pd.concat(dfs)

        #ETL
        for columna in df_combined.columns:
            if df_combined[columna].dtype == 'object':
                # Si la columna es de texto, llenar los valores nulos con "No data" y eliminar espacios en blanco y saltos de línea
                df_combined[columna] = df_combined[columna].fillna("No data").str.strip()
            elif df_combined[columna].dtype in ['int64', 'float64']:
                # Si la columna es numérica, llenar los valores nulos con 0
                df_combined[columna] = df_combined[columna].fillna(0)
        
        # Eliminar duplicados
        df_combined.drop_duplicates(inplace=True)


        # Guarda el DataFrame combinado como un archivo CSV en Cloud Storage en la carpeta temporal
        csv_file_path = 'gs://{}/{}'.format(bucket_name, temporal_folder_path + 'combined_data.json')
        df_combined.to_json(csv_file_path, orient="records", lines=True)


        #CARGA EN BIG QUERY:
        client = bigquery.Client()

        # Especifica el nombre del conjunto de datos y la tabla
        proyect_id='proyecto-final-henry-412703'
        dataset_id = 'reviews_estados'  # Reemplaza 'proyecto-final-henry-412703' con tu ID de proyecto
        table_id = estado  # Puedes cambiar esto según tus preferencias
        full_table_id = f'{proyect_id}.{dataset_id}.{table_id}'
        
        # Configura el job de carga con autodetección de esquema
        job_config = bigquery.LoadJobConfig(
            autodetect=True,  # Permite a BigQuery autodetectar el esquema
            source_format="NEWLINE_DELIMITED_JSON",
            create_disposition="CREATE_IF_NEEDED"
        )

        # Inicia el job de carga
        load_job = client.load_table_from_uri(csv_file_path, full_table_id, job_config=job_config)

        # Espera a que el job se complete
        load_job.result()

        # Devuelve la respuesta HTTP con el JSON del DataFrame
        return "Carga exitosa!", 200
    
    except Exception as e:
        logging.error(f"Error en la función: {e}", exc_info=True)
        return f"Error: {e}", 500

ETL archivo business.pkl

In [None]:

def archivo_df_business(bucket_path):
#Filtrado segun nombre archivo para su respectivo ETL
            df_bussines=pd.read_pickle(bucket_path)
            df_bussines.to_csv(bucket_path, index=False)
            df_bussines=pd.read_csv(bucket_path)
            df_bussines.drop(columns=['attributes', 'hours','postal_code','business_id.1', 'name.1',
            'address.1', 'city.1', 'state.1', 'postal_code.1', 'latitude.1',
            'longitude.1', 'stars.1', 'review_count.1', 'is_open.1', 'attributes.1',
            'categories.1', 'hours.1'], inplace=True)
            for columna in df_bussines.columns:
                if df_bussines[columna].dtype == 'object':
                    # Si la columna es de texto, llenar los valores nulos con "No data" y eliminar espacios en blanco y saltos de línea
                    df_bussines[columna] = df_bussines[columna].fillna("No data").str.strip()
                elif df_bussines[columna].dtype in ['int64', 'float64']:
                    # Si la columna es numérica, llenar los valores nulos con 0
                    df_bussines[columna] = df_bussines[columna].fillna(0)
            return (df_bussines)
def archivo_df_tip (bucket_path):
    df_tip=pd.read_json(bucket_path, lines=True)
    df_tip.drop(columns="compliment_count", inplace=True)
    return (df_tip)
def archivo_df_user(bucket_path):
    df_user001=pd.read_parquet(bucket_path)
    df_user001.drop(columns=['elite','friends','compliment_hot','compliment_more', 'compliment_profile', 'compliment_cute','compliment_list', 'compliment_note', 'compliment_plain',
        'compliment_cool', 'compliment_funny', 'compliment_writer','compliment_photos'], inplace=True)
    return (df_user001)

@functions_framework.http
def ETL_reviews_bigquery(request):
    try:
        # Obtén la carpeta en el bucket desde los parámetros de la URL
        archivo = request.args.get('archivo')

        # Verifica si se proporcionó la carpeta en la URL
        if not archivo:
            return "Error: El parámetro 'archivo' es necesario en la URL", 400

        # Define el nombre del bucket y la ruta de la carpeta temporal
        bucket_name = 'data_lake_pf_henry'
        bucket_path = f'YELP/Yelp/{archivo}/'
        temporal_folder_path = 'temporal/'

        # Crea un cliente de Cloud Storage
        storage_client = storage.Client()

        # Obtiene el bucket
        bucket = storage_client.bucket(bucket_name)

        # Lista todos los blobs en la carpeta
        blobs = bucket.list_blobs(prefix=bucket_path)
        
        if "business" in bucket_path:
            df_combined=archivo_df_business(bucket_path)
        if "tip" in bucket_path:
            df_combined=archivo_df_tip(bucket_path)
        if "user-001" in bucket_path:
            df_combined=archivo_df_user(bucket_path)
        

        # Guarda el DataFrame combinado como un archivo CSV en Cloud Storage en la carpeta temporal
        csv_file_path = 'gs://{}/{}'.format(bucket_name, temporal_folder_path + 'combined_data.json')
        df_combined.to_json(csv_file_path, orient="records", lines=True)


        #CARGA EN BIG QUERY:
        client = bigquery.Client()

        # Especifica el nombre del conjunto de datos y la tabla
        proyect_id='proyecto-final-henry-412703'
        dataset_id = 'reviews_archivos'  # Reemplaza 'proyecto-final-henry-412703' con tu ID de proyecto
        table_id = archivo  # Puedes cambiar esto según tus preferencias
        full_table_id = f'{proyect_id}.{dataset_id}.{table_id}'
        
        # Configura el job de carga con autodetección de esquema
        job_config = bigquery.LoadJobConfig(
            autodetect=True,  # Permite a BigQuery autodetectar el esquema
            source_format="NEWLINE_DELIMITED_JSON",
            create_disposition="CREATE_IF_NEEDED"
        )

        # Inicia el job de carga
        load_job = client.load_table_from_uri(csv_file_path, full_table_id, job_config=job_config)

        # Espera a que el job se complete
        load_job.result()

        # Devuelve la respuesta HTTP con el JSON del DataFrame
        return "Carga exitosa!", 200
    
    except Exception as e:
        logging.error(f"Error en la función: {e}", exc_info=True)
        return f"Error: {e}", 500

In [None]:
def archivo_df_review(bucket_path):
    df_review=pd.read_json(bucket_path, lines=True)
    data_user001.drop(columns=['elite','friends','compliment_hot','compliment_more', 'compliment_profile', 'compliment_cute','compliment_list', 'compliment_note', 'compliment_plain',
        'compliment_cool', 'compliment_funny', 'compliment_writer','compliment_photos'], inplace=True)