In [1]:
import os
from google.cloud import bigquery
from google.cloud import storage
import pandas as pd
from typing import Optional

In [2]:
# Configuração do caminho para o arquivo de credenciais
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "../../key.json"

# Configuração do cliente BigQuery
client = bigquery.Client()
gsclient = storage.Client()

In [3]:
def load_bigquery(query: str, project_id: str = "bankmarketingdatapipeline", name_view: Optional[str] = None) -> pd.DataFrame:
    """
    Executa uma consulta no BigQuery e retorna os resultados como um DataFrame.

    Args:
        query (str): Consulta SQL a ser executada.
        project_id (str): ID do projeto do BigQuery.
        name_view (Optional[str]): Nome da visualização a ser criada (opcional).
    
    Returns:
        pd.DataFrame: Resultados da consulta como um DataFrame.
    """
    if name_view:
        create_view(project_id, query, name_view)
    
    query_job = client.query(query)
    return query_job.to_dataframe()


In [4]:
def create_view(project_id: str, query: str, name_view: str) -> None:
    """
    Cria ou substitui uma visualização no BigQuery.

    Args:
        project_id (str): ID do projeto no BigQuery.
        query (str): Consulta SQL a ser usada na visualização.
        name_view (str): Nome da visualização a ser criada ou substituída.
    """
    view_query = f"""
    CREATE OR REPLACE VIEW `{project_id}.{name_view}` AS {query}
    """
    client.query(view_query).result()  # Executa a criação da visualização
    print(f"View '{name_view}' criada com sucesso.")


In [5]:
def load_data_incrementally(uri: str, table_id: str, job_config: bigquery.LoadJobConfig) -> None:
    """
    Carrega dados incrementais para a tabela no BigQuery.
    
    Args:
        uri (str): URI do arquivo de dados a ser carregado.
        table_id (str): ID da tabela onde os dados serão carregados.
        job_config (bigquery.LoadJobConfig): Configuração do job de carregamento.
    """
    load_job = client.load_table_from_uri(uri, table_id, job_config=job_config)
    load_job.result()  # Aguarda a conclusão do job
    print(f"Dados carregados incrementalmente para a tabela '{table_id}'.")


In [6]:
def update_table_description(table_id: str, description: str) -> None:
    """
    Atualiza a descrição da tabela no BigQuery.

    Args:
        table_id (str): ID da tabela a ser atualizada.
        description (str): Nova descrição para a tabela.
    """
    table = client.get_table(table_id)
    table.description = description
    client.update_table(table, ["description"])
    print(f"Descrição da tabela '{table_id}' atualizada.")


In [None]:
# Configuração de carga incremental
job_config = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField("order_id", "INTEGER", mode="REQUIRED", description="ID único do pedido"),
        bigquery.SchemaField("order_date", "TIMESTAMP", mode="REQUIRED", description="Data e hora do pedido"),
        bigquery.SchemaField("order_customer_id", "INTEGER", mode="REQUIRED", description="ID do cliente"),
        bigquery.SchemaField("order_status", "STRING", mode="REQUIRED", description="Status do pedido")
    ],
    write_disposition="WRITE_APPEND",  # Incremental: adicionar dados sem sobrescrever
)



In [17]:
# Função para carregar dados incrementais no BigQuery com verificação de duplicidade
def load_data_incrementally_with_check(uri: str, table_id: str, job_config: bigquery.LoadJobConfig):
    # Carregar dados temporários para uma tabela temporária
    temp_table_id = table_id + "_temp"
    load_job = client.load_table_from_uri(uri, temp_table_id, job_config=job_config)
    load_job.result()  # Aguarda a conclusão do job

    print(f"Dados carregados temporariamente para a tabela '{temp_table_id}'.")

    # Realizar o merge (evitar duplicação de 'order_id')
    merge_query = f"""
    MERGE `{table_id}` AS target
    USING `{temp_table_id}` AS source
    ON target.order_id = source.order_id
    WHEN NOT MATCHED THEN
        INSERT (order_id, order_date, order_customer_id, order_status)
        VALUES (source.order_id, source.order_date, source.order_customer_id, source.order_status)
    """
    
    client.query(merge_query).result()  # Executa o merge
    print(f"Merge realizado. Dados inseridos na tabela '{table_id}'.")

    # Deletar a tabela temporária
    client.delete_table(temp_table_id)
    print(f"Tabela temporária '{temp_table_id}' deletada.")

# Configuração de carga incremental (com verificação de duplicidade)
job_config = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField("order_id", "INTEGER", mode="REQUIRED", description="ID único do pedido"),
        bigquery.SchemaField("order_date", "TIMESTAMP", mode="REQUIRED", description="Data e hora do pedido"),
        bigquery.SchemaField("order_customer_id", "INTEGER", mode="REQUIRED", description="ID do cliente"),
        bigquery.SchemaField("order_status", "STRING", mode="REQUIRED", description="Status do pedido")
    ],
    write_disposition="WRITE_APPEND",  # Incremental: adicionar dados sem sobrescrever
)




In [19]:
uri = "gs://raw_retail/orders/part-00008"
table_id = "bankmarketingdatapipeline.db_retail.trusted_order"

# Carregar dados de forma incremental
load_data_incrementally(uri, table_id, job_config)

# Atualizar a descrição da tabela
update_table_description(table_id, "Tabela contendo os pedidos confiáveis do banco de dados de varejo.")


BadRequest: 400 Error while reading data, error message: CSV processing encountered too many errors, giving up. Rows: 2; errors: 2; max bad: 0; error percent: 0; reason: invalid, message: Error while reading data, error message: CSV processing encountered too many errors, giving up. Rows: 2; errors: 2; max bad: 0; error percent: 0; reason: invalid, location: gs://raw_retail/orders/part-00008, message: Error while reading data, error message: Unable to parse; line_number: 1 byte_offset_to_start_of_line: 0 column_index: 0 column_name: "order_id" column_type: INT64 value: "order_id" File: gs://raw_retail/orders/part-00008; reason: invalid, location: gs://raw_retail/orders/part-00008, message: Error while reading data, error message: CSV table references column position 3, but line contains only 1 columns.; line_number: 3 byte_offset_to_start_of_line: 93 column_index: 3 column_name: "order_status" column_type: STRING File: gs://raw_retail/orders/part-00008; reason: invalid, message: You are loading data without specifying data format, data will be treated as CSV format by default. If this is not what you mean, please specify data format by --source_format.

In [18]:
# Caminho do arquivo mockado
uri = "gs://raw_retail/orders/part-00008"
table_id = "bankmarketingdatapipeline.db_retail.trusted_order"

# Carregar dados incrementais com verificação de duplicidade
load_data_incrementally_with_check(uri, table_id, job_config)

BadRequest: 400 Error while reading data, error message: CSV processing encountered too many errors, giving up. Rows: 2; errors: 2; max bad: 0; error percent: 0; reason: invalid, message: Error while reading data, error message: CSV processing encountered too many errors, giving up. Rows: 2; errors: 2; max bad: 0; error percent: 0; reason: invalid, location: gs://raw_retail/orders/part-00008, message: Error while reading data, error message: Unable to parse; line_number: 1 byte_offset_to_start_of_line: 0 column_index: 0 column_name: "order_id" column_type: INT64 value: "order_id" File: gs://raw_retail/orders/part-00008; reason: invalid, location: gs://raw_retail/orders/part-00008, message: Error while reading data, error message: CSV table references column position 3, but line contains only 1 columns.; line_number: 3 byte_offset_to_start_of_line: 93 column_index: 3 column_name: "order_status" column_type: STRING File: gs://raw_retail/orders/part-00008; reason: invalid, message: You are loading data without specifying data format, data will be treated as CSV format by default. If this is not what you mean, please specify data format by --source_format.