### SETUP INICIAL DO PROJETO

In [1]:

#importação das bibliotecase e pacotes necessários para a análise

import json
import numpy as np
import os
import pandas as pd
import pandas_gbq as gbq
import re
import seaborn as sns
import matplotlib.pyplot as plt
from dotenv import load_dotenv
from google.cloud import bigquery
from google.cloud.bigquery_storage import BigQueryReadClient
from google.oauth2 import service_account

# Carrega o .env: onde estão as credenciais do projeto/repositório
load_dotenv("/mnt/c/Users/wrpen/OneDrive/Desktop/df_lh/.env")

# Detectar ambiente: como eu estou usando wsl-ubuntu, no VS Code  -  Windows, estava dando conflitos de path
if os.name == "nt":  # se Windows
    credentials_path = r"C:\Temp\desafiolh-445818-3cb0f62cb9ef.json"
else:  # se WSL/Linux
    credentials_path = "/mnt/c/Temp/desafiolh-445818-3cb0f62cb9ef.json"

# Parâmetros injetados pelo Papermill ou definidos manualmente, caso não existam no ambiente
# Tables_to_process: lista de tabelas que serão processadas
# Output_dataset: nome do dataset onde os dados processados serão armazenados, neste caso, raw_data_cleaned
if 'tables_to_process' not in locals():
    tables_to_process = [
        "desafioadventureworks-446600.raw_data.sales-customer"       
    ]

if 'output_dataset' not in locals():
    output_dataset = "desafioadventureworks-446600.raw_data_cleaned"

# Configs do cliente BigQuery: input de project e location de acordo com dados no Bigquery
credentials = service_account.Credentials.from_service_account_file(credentials_path)
client = bigquery.Client(credentials=credentials, project=os.getenv("BIGQUERY_PROJECT"), location="us-central1")


In [2]:
# Print com a tabela que vai ser processada nesse notebook

print("Tabelas a processar:", tables_to_process)

Tabelas a processar: ['desafioadventureworks-446600.raw_data.sales-customer']


In [3]:
# Nome do dataset no Bigquery com os dados brutos (.csv) extraídos pelo Meltano 
dataset_id = 'raw_data'
print(dataset_id)

# Lista de tabelas do dataset raw_data no Bigquery
tables = client.list_tables('raw_data')
print("Tabelas disponíveis:")
for table in tables:
    print(table.table_id)

raw_data
Tabelas disponíveis:
humanresources-employee
person-address
person-businessentity
person-person
person-stateprovince
production-location
production-product
production-productcategory
production-productinventory
production-productsubcategory
sales-customer
sales-salesorderdetail
sales-salesorderheader
sales-salesterritory
sales-store


# Exploratory Data Analysis (EDA) e Data Cleaning

### Glossário dos dados:

O termo ''doc:'', situado no rodapé de algumas cells, indica algo como:

- documentação: documentar decisões, análises e resultados;

- abreviações de termos, como bkp, df, entre outros.

In [4]:
# Configuração para que o df exiba todas as colunas e todas as linhas completas, e também, exiba o formato numérico com 2 dígitos após a vírgula

pd.set_option('display.max_columns', None)
#pd.set_option('display.max_rows', None)
pd.set_option('display.max_colwidth', None)
pd.set_option('display.width', 10000)
pd.options.display.float_format = '{:.2f}'.format


#doc: df = dataframe  

In [5]:
# Dicionário para armazenar os df processados
df_processados = {}

# Iteração das tabelas e armazenamento em df
for input_table in tables_to_process:
    print(f"Processando tabela: {input_table}")
    
    # Nome da tabela com substituição de '-' por '_'
    table_name = input_table.split(".")[-1].replace("-", "_")  
    
    # Ler os dados da tabela do BigQuery para um df
    print("Lendo os dados do BigQuery...")
    query = f"SELECT * FROM `{input_table}`"
    table_data = client.query(query).to_dataframe()
    
    # Armazenar o df no dicionário
    df_processados[table_name] = table_data
    print(f"Tabela {table_name} processada e armazenada com sucesso.")

# Print de validação
print("Todas as tabelas foram processadas com sucesso!")


Processando tabela: desafioadventureworks-446600.raw_data.sales-customer
Lendo os dados do BigQuery...
Tabela sales_customer processada e armazenada com sucesso.
Todas as tabelas foram processadas com sucesso!


In [6]:
# Listar todas as variáveis criadas dinamicamente
for table_name in df_processados.keys():
    print(f"Variável criada: {table_name}")  

Variável criada: sales_customer


In [7]:
# Atribuir o df a uma variável com nome mais simples
sales_customer = df_processados['sales_customer']

print(f"Colunas: {sales_customer.shape[1]}\nLinhas: {sales_customer.shape[0]}")

Colunas: 6
Linhas: 317120


In [8]:
# Identificar duplicadas com base em 'customerid'
duplicadas = sales_customer[
    sales_customer.duplicated(subset=['customerid'], keep=False)
]

# Verificar se existem duplicadas
if not duplicadas.empty:
    # Ordenar duplicadas por 'customerid' e 'modifieddate'
    duplicadas_ordenadas = duplicadas.sort_values(by=['customerid', 'modifieddate'])

    # Exibir duplicadas ordenadas
    print("duplicadas ordenadas:")
    print(duplicadas_ordenadas)
else:
    print("Não foram encontradas duplicadas.")

duplicadas ordenadas:
        customerid  personid  storeid  territoryid                               rowguid                     modifieddate
0                1      <NA>      934            1  3f5ae95e-b87d-4aed-95b4-c3797afcb74f 2014-09-12 11:15:07.263000+00:00
19820            1      <NA>      934            1  3f5ae95e-b87d-4aed-95b4-c3797afcb74f 2014-09-12 11:15:07.263000+00:00
39640            1      <NA>      934            1  3f5ae95e-b87d-4aed-95b4-c3797afcb74f 2014-09-12 11:15:07.263000+00:00
59460            1      <NA>      934            1  3f5ae95e-b87d-4aed-95b4-c3797afcb74f 2014-09-12 11:15:07.263000+00:00
79280            1      <NA>      934            1  3f5ae95e-b87d-4aed-95b4-c3797afcb74f 2014-09-12 11:15:07.263000+00:00
...            ...       ...      ...          ...                                   ...                              ...
221784       30118      1993     1994            3  2495b4eb-fe8b-459e-a1b6-dba25c04e626 2014-09-12 11:15:07.263000+00:00
24

In [9]:
# Remover duplicadas mantendo a última ocorrência com base em 'modifieddate', pois ela que indica a data da última modificação nos dados
# Importante, pois se houver erro na ingestão (duplicação), mantém os dados integros.

sales_customer = sales_customer.drop_duplicates(subset=['customerid'], keep='last')

print(f"Linhas após remover duplicadas (baseando-se na última 'modifieddate'): {len(sales_customer)}")

#bkp dos dados brutos
raw_data_bkp_2_sem_duplicadas = sales_customer.copy()


#doc: bkp = backup (cópia)

Linhas após remover duplicadas (baseando-se na última 'modifieddate'): 19820


In [10]:
# Ordenar e exibir o df por 'customerid'
sales_customer = sales_customer.sort_values(by=['customerid'])

print(sales_customer)

        customerid  personid  storeid  territoryid                               rowguid                     modifieddate
297300           1      <NA>      934            1  3f5ae95e-b87d-4aed-95b4-c3797afcb74f 2014-09-12 11:15:07.263000+00:00
297301           2      <NA>     1028            1  e552f657-a9af-4a7d-a645-c429d6e02491 2014-09-12 11:15:07.263000+00:00
301065           3      <NA>      642            4  130774b1-db21-4ef3-98c8-c104bcd6ed6d 2014-09-12 11:15:07.263000+00:00
301066           4      <NA>      932            4  ff862851-1daa-4044-be7c-3e85583c054d 2014-09-12 11:15:07.263000+00:00
301067           5      <NA>     1026            4  83905bdc-6f5e-4f71-b162-c98da069f38a 2014-09-12 11:15:07.263000+00:00
...            ...       ...      ...          ...                                   ...                              ...
309611       30114      1985     1986            7  97154f3d-28f1-4b15-ae03-9518b781ece3 2014-09-12 11:15:07.263000+00:00
307727       30115      

In [11]:
# Iterar por todas as colunas do df, para verificar valores ausentes

# Verificar valores ausentes na coluna
for column in sales_customer.columns:   
    missing_rows = sales_customer[sales_customer[column].isnull()]
    print(f"Coluna '{column}': {missing_rows.shape[0]} linhas ausentes.")
    
# Mostrar as primeiras linhas ausentes, se preciso for, limitar o head() para dar menos outputs ou limitar os outputs
    if not missing_rows.empty:
        print(f"Exibindo as primeiras linhas com valores ausentes em '{column}':")
        print(missing_rows.head(), "\n")
    else:
        print(f"Nenhuma linha com valores ausentes em '{column}'.\n")


#doc: as colunas personid e storeid possuem muitos valores ausentes, mas manter essas colunas ainda fazem sentido, se tratado de ID

Coluna 'customerid': 0 linhas ausentes.
Nenhuma linha com valores ausentes em 'customerid'.

Coluna 'personid': 701 linhas ausentes.
Exibindo as primeiras linhas com valores ausentes em 'personid':
        customerid  personid  storeid  territoryid                               rowguid                     modifieddate
297300           1      <NA>      934            1  3f5ae95e-b87d-4aed-95b4-c3797afcb74f 2014-09-12 11:15:07.263000+00:00
297301           2      <NA>     1028            1  e552f657-a9af-4a7d-a645-c429d6e02491 2014-09-12 11:15:07.263000+00:00
301065           3      <NA>      642            4  130774b1-db21-4ef3-98c8-c104bcd6ed6d 2014-09-12 11:15:07.263000+00:00
301066           4      <NA>      932            4  ff862851-1daa-4044-be7c-3e85583c054d 2014-09-12 11:15:07.263000+00:00
301067           5      <NA>     1026            4  83905bdc-6f5e-4f71-b162-c98da069f38a 2014-09-12 11:15:07.263000+00:00 

Coluna 'storeid': 18484 linhas ausentes.
Exibindo as primeiras linha

In [12]:
# Valores únicos por coluna, para verificar se colunas como flags, normalmente booleanas, possuem apenas 1 ou 2 valores.

valores_unicos = sales_customer.nunique(dropna=False)

print("Valores únicos incluindo NaN:")
print(valores_unicos)

#doc: currentflag possue somente 1 valor, o que indica que pode ser somente valores True ou False.

Valores únicos incluindo NaN:
customerid      19820
personid        19120
storeid           702
territoryid        10
rowguid         19820
modifieddate        1
dtype: int64


In [13]:
#verificar informações do df
sales_customer.info()

<class 'pandas.core.frame.DataFrame'>
Index: 19820 entries, 297300 to 301064
Data columns (total 6 columns):
 #   Column        Non-Null Count  Dtype              
---  ------        --------------  -----              
 0   customerid    19820 non-null  Int64              
 1   personid      19119 non-null  Int64              
 2   storeid       1336 non-null   Int64              
 3   territoryid   19820 non-null  Int64              
 4   rowguid       19820 non-null  object             
 5   modifieddate  19820 non-null  datetime64[us, UTC]
dtypes: Int64(4), datetime64[us, UTC](1), object(1)
memory usage: 1.1+ MB


In [14]:
# Atualizar o dicionário df_processados com o df ajustado
df_processados['sales_customer'] = sales_customer

In [15]:
# Padronizar colunas com valores textuais
sales_customer['rowguid'] = sales_customer['rowguid'].str.strip().str.upper()

print(sales_customer.head())

#doc: padronizar as strings nessa etapa, contribui para a execução das demais etapas do pipeline

        customerid  personid  storeid  territoryid                               rowguid                     modifieddate
297300           1      <NA>      934            1  3F5AE95E-B87D-4AED-95B4-C3797AFCB74F 2014-09-12 11:15:07.263000+00:00
297301           2      <NA>     1028            1  E552F657-A9AF-4A7D-A645-C429D6E02491 2014-09-12 11:15:07.263000+00:00
301065           3      <NA>      642            4  130774B1-DB21-4EF3-98C8-C104BCD6ED6D 2014-09-12 11:15:07.263000+00:00
301066           4      <NA>      932            4  FF862851-1DAA-4044-BE7C-3E85583C054D 2014-09-12 11:15:07.263000+00:00
301067           5      <NA>     1026            4  83905BDC-6F5E-4F71-B162-C98DA069F38A 2014-09-12 11:15:07.263000+00:00


In [16]:

# Garantir que apenas tabelas únicas sejam exportadas
unique_df_processados = {k: v for k, v in df_processados.items()}

# Exportar tabelas para o BigQuery
for table_name, df_cleaned in unique_df_processados.items():
    # Nome da tabela no BigQuery
    output_table = f"{output_dataset}.{table_name}"

    # Configurar job de exportação
    job_config = bigquery.LoadJobConfig(
        write_disposition="WRITE_TRUNCATE"  
    )
    
    # Exportar DataFrame para o BigQuery
    job = client.load_table_from_dataframe(df_cleaned, output_table, job_config=job_config)
    job.result()

    print(f"Tabela {table_name} exportada com sucesso para {output_table}.")

Tabela sales_customer exportada com sucesso para desafioadventureworks-446600.raw_data_cleaned.sales_customer.
