In [None]:
import functions_framework
import os
import tempfile
from google.cloud import storage, bigquery
import pandas as pd

@functions_framework.http
def process_and_save_to_bq(request):
    """HTTP Cloud Function: clean every Parquet object in the bucket and load it into BigQuery.

    Each object in the bucket whose name ends in ``.parquet`` is downloaded to a
    temporary directory, read with pandas, stripped of rows containing nulls
    and of duplicated rows, and written to the destination table.

    The first file processed replaces the table contents and every following
    file is appended, so the table ends up holding the cleaned rows of all
    files rather than only those of the last file listed.

    Args:
        request: The incoming HTTP request. It is not inspected.

    Returns:
        A success message string, or a ``(message, 500)`` tuple when any step
        fails, so that callers can detect the failure from the HTTP status.
    """
    # Configuration
    bucket_name = "prueba_jd_01"
    project_id = "prueba-406313"
    table_id = "prueba-406313.DB_Google_Yelp.Restaurantes_Google"

    try:
        storage_client = storage.Client()

        # List every object in the bucket (no prefix filter).
        blobs = storage_client.get_bucket(bucket_name).list_blobs()

        # Truncate the table on the first write only; later files are appended.
        if_exists = "replace"

        for blob in blobs:
            if not blob.name.endswith(".parquet"):
                continue

            # The context manager removes the directory even if a step raises.
            with tempfile.TemporaryDirectory() as temp_dir:
                local_file_path = os.path.join(temp_dir, os.path.basename(blob.name))
                blob.download_to_filename(local_file_path)
                df = pd.read_parquet(local_file_path)

            # Drop rows with any null value, then drop duplicated rows.
            df = df.dropna().drop_duplicates()

            # DataFrame.to_gbq takes no job-config argument; the write mode is
            # controlled solely through ``if_exists``.
            df.to_gbq(
                destination_table=table_id,
                project_id=project_id,
                if_exists=if_exists,
            )
            if_exists = "append"

        return f"Tabla {table_id} actualizada en BigQuery"
    except Exception as e:
        # Report the failure with an error status instead of a 200 response.
        return f"Error: {str(e)}", 500


## Prueba 2:

In [None]:
import functions_framework
import os
import tempfile
from google.cloud import storage, bigquery
import pandas as pd
import pandas_gbq

@functions_framework.http
def process_and_save_to_bq(request):
    """HTTP Cloud Function: clean every Parquet object in the bucket and append it to BigQuery.

    Each object in the bucket whose name ends in ``.parquet`` is downloaded to a
    temporary directory, read with pandas, stripped of rows containing nulls
    and of duplicated rows, and appended to the destination table.

    Args:
        request: The incoming HTTP request. It is not inspected.

    Returns:
        A success message string, or a ``(message, 500)`` tuple when any step
        fails, so that callers can detect the failure from the HTTP status.
    """
    # Configuration
    bucket_name = "prueba_jd_01"
    project_id = "prueba-406313"
    table_id = "prueba-406313.DB_Google_Yelp.Restaurantes_Google"

    try:
        storage_client = storage.Client()

        # List every object in the bucket (no prefix filter).
        blobs = storage_client.get_bucket(bucket_name).list_blobs()

        for blob in blobs:
            if not blob.name.endswith(".parquet"):
                continue

            # The context manager removes the directory even if a step raises.
            with tempfile.TemporaryDirectory() as temp_dir:
                local_file_path = os.path.join(temp_dir, os.path.basename(blob.name))
                blob.download_to_filename(local_file_path)
                df = pd.read_parquet(local_file_path)

            # Drop rows with any null value, then drop duplicated rows.
            df = df.dropna().drop_duplicates()

            # DataFrame.to_gbq takes no job-config argument; the write mode is
            # controlled solely through ``if_exists``.
            df.to_gbq(
                destination_table=table_id,
                project_id=project_id,
                if_exists='append',
            )

        return f"Tabla {table_id} actualizada en BigQuery"
    except Exception as e:
        # Report the failure with an error status instead of a 200 response.
        return f"Error: {str(e)}", 500

## Prueba 3:

In [None]:
import functions_framework
import pandas as pd
from pandas.io import gbq
from google.cloud import storage, bigquery

@functions_framework.cloud_event
def etl_google_res(cloud_event):
    """Cloud Storage-triggered ETL: log the event and load the uploaded Parquet file into BigQuery.

    Two writes are performed, both in append mode:

    1. One row describing the event goes to the
       ``Google_restaurants_metadata`` table.
    2. The contents of the Parquet object named in the event, minus the
       ``avg_rating`` column, go to a table named after the object's file name
       (folder prefix and extension removed).

    Args:
        cloud_event: The triggering CloudEvent. ``id`` and ``type`` are read
            from the event itself; ``bucket``, ``name``, ``timeCreated`` and
            ``updated`` are read from its ``data`` payload.
    """
    event = cloud_event.data
    file_name = event['name']
    # Object names may carry a folder prefix ("folder/file.parquet"); keep only
    # the final path component so the table name contains no '/'.
    table_name = file_name.rsplit('/', 1)[-1].split('.')[0]
    project_id = "prueba-406313"
    dataset_id = "prueba-406313.DB_Google_Yelp."

    # Event and file metadata, written to BigQuery as a single-row frame.
    metadata_record = {
        'Event_ID': cloud_event["id"],
        'Event_type': cloud_event["type"],
        'Bucket_name': event['bucket'],
        'File_name': event['name'],
        'Created': event['timeCreated'],
        'Updated': event['updated'],
    }
    df_metadata = pd.DataFrame.from_records([metadata_record])
    # ``if_exists`` only accepts 'fail', 'replace' or 'append'; append keeps
    # one row per received event.
    df_metadata.to_gbq(dataset_id + 'Google_restaurants_metadata',
                       project_id=project_id,
                       if_exists='append')

    # Actual file data, read straight from Cloud Storage and written to BigQuery.
    df_data = pd.read_parquet('gs://' + event['bucket'] + '/' + file_name)

    df_data = df_data.drop(columns="avg_rating")

    df_data.to_gbq(dataset_id + table_name,
                   project_id=project_id,
                   if_exists='append')

In [1]:
import pandas as pd
# Read the reviews Parquet file into a DataFrame and preview its first rows.
reviews_path = "Data/yelp_latin_reviews.parquet"
df = pd.read_parquet(reviews_path)
df.head()

Unnamed: 0,review_id,user_id,business_id,stars,text,date
0,UBp0zWyH60Hmw6Fsasei7w,4Uh27DgGzsp6PqrH913giQ,otQS34_MymijPTdNBoBdCw,4,The bun makes the Sonoran Dog. It's like a snu...,2011-10-27 17:12:05
1,jlvaJo1I56NrZ1Q1CUuuRw,17jzGkFYCvB5Q0fjJEzVAA,otQS34_MymijPTdNBoBdCw,4,I was told this place is a must for a Sonoran ...,2017-06-17 17:17:29
2,yx1IGiMSFDeuosuoRE1gpQ,EYg-VaZlk13-blZxyohLDg,JUlsvVAvZvGHWFfkKm0nlg,3,We went to this restaurant after running in th...,2010-11-23 13:45:24
3,7ouwt0DtmhHxjQpDwDkN7Q,dme9K6sQrgpojAay5r8IAA,JUlsvVAvZvGHWFfkKm0nlg,3,"I loves me some seitan, and that's why I keep ...",2010-03-12 23:41:38
4,7ym4hsISxBUwa3ugL7cL9Q,SEPSy_TUicGdTg-_72aOmQ,IwqFmo-RJs15WvA8PVHFnA,5,This place just opened up near rivers edge apa...,2018-05-19 15:04:56


In [2]:
# Print a summary of the DataFrame: column dtypes, non-null counts and memory usage.
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 14539 entries, 0 to 14538
Data columns (total 6 columns):
 #   Column       Non-Null Count  Dtype         
---  ------       --------------  -----         
 0   review_id    14539 non-null  object        
 1   user_id      14539 non-null  object        
 2   business_id  14539 non-null  object        
 3   stars        14539 non-null  int64         
 4   text         14539 non-null  object        
 5   date         14539 non-null  datetime64[ns]
dtypes: datetime64[ns](1), int64(1), object(4)
memory usage: 681.6+ KB
