**Engenharia de dados - Case GB 2**

Este notebook, desenvolvido no ambiente do Google Colab, implementa a carga de dados de um banco de dados relacional instanciado na nuvem (PostgreSQL - GCP Cloud SQL) para tabelas de um Data Warehouse (GCP BigQuery).

In [4]:
# Authenticate the current Colab user with Google; run this cell before any
# cell that calls gcloud or a Google Cloud client library.
from google.colab import auth
auth.authenticate_user()

In [6]:
# Initialize parameters shared by the rest of the notebook:
#   project_id  - GCP project, used for gcloud, the Cloud SQL connection name and the BigQuery client
#   pg_instance - Cloud SQL instance name, part of the connection name
#   region      - region of the Cloud SQL instance, part of the connection name
#   bq_dataset  - BigQuery dataset that prefixes every destination table id
project_id="case-gb-2"
pg_instance = "gb-sales-db"
region="us-central1"
bq_dataset="gb_vendas"

# Make this project the default for the gcloud commands run in later cells.
!gcloud config set project {project_id}

Updated property [core/project].


In [7]:
# grant Cloud SQL Client role to authenticated user
# The `!` capture stores the command output as a list of lines; the first line
# (the active account) is used as the IAM member below.
current_user = !gcloud auth list --filter=status:ACTIVE --format="value(account)"

!gcloud projects add-iam-policy-binding {project_id} --member=user:{current_user[0]} --role="roles/cloudsql.client"

Updated IAM policy for project [case-gb-2].
bindings:
- members:
  - serviceAccount:service-868746573412@gcp-sa-artifactregistry.iam.gserviceaccount.com
  role: roles/artifactregistry.serviceAgent
- members:
  - serviceAccount:service-868746573412@gcp-sa-bigquerydatatransfer.iam.gserviceaccount.com
  role: roles/bigquerydatatransfer.serviceAgent
- members:
  - serviceAccount:868746573412@cloudbuild.gserviceaccount.com
  role: roles/cloudbuild.builds.builder
- members:
  - serviceAccount:service-868746573412@gcp-sa-cloudbuild.iam.gserviceaccount.com
  role: roles/cloudbuild.serviceAgent
- members:
  - user:tat.arruda1@gmail.com
  role: roles/cloudsql.client
- members:
  - serviceAccount:service-868746573412@cloudcomposer-accounts.iam.gserviceaccount.com
  role: roles/composer.serviceAgent
- members:
  - serviceAccount:service-868746573412@compute-system.iam.gserviceaccount.com
  role: roles/compute.serviceAgent
- members:
  - serviceAccount:service-868746573412@container-engine-robot.ia

In [8]:
# Enable the Cloud SQL Admin API (sqladmin.googleapis.com) for the project
# configured in gcloud.
!gcloud services enable sqladmin.googleapis.com

In [9]:
!pip install "cloud-sql-python-connector[pg8000]"
from google.cloud.sql.connector import Connector
import pandas as pd
import datetime
from google.cloud import bigquery

import os
from getpass import getpass

# Cloud SQL connection name in the "<project>:<region>:<instance>" form that is
# passed to the Cloud SQL connector.
INSTANCE_CONNECTION = f"{project_id}:{region}:{pg_instance}"
print(f"Your instance connection name is: {INSTANCE_CONNECTION}")
DB_USER = "postgres"
# Never store the database password in the notebook: take it from the DB_PWD
# environment variable when set, otherwise prompt for it without echoing.
# NOTE(review): the password that used to be hardcoded here should be rotated.
DB_PWD = os.environ.get("DB_PWD") or getpass(f"Password for database user '{DB_USER}': ")
DB_NAME = "base_vendas"

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting cloud-sql-python-connector[pg8000]
  Downloading cloud_sql_python_connector-1.2.2-py2.py3-none-any.whl (35 kB)
Collecting aiohttp
  Downloading aiohttp-3.8.4-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.0/1.0 MB[0m [31m23.5 MB/s[0m eta [36m0:00:00[0m
Collecting pg8000==1.29.4
  Downloading pg8000-1.29.4-py3-none-any.whl (51 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m51.4/51.4 kB[0m [31m6.2 MB/s[0m eta [36m0:00:00[0m
Collecting scramp>=1.4.3
  Downloading scramp-1.4.4-py3-none-any.whl (13 kB)
Collecting async-timeout<5.0,>=4.0.0a3
  Downloading async_timeout-4.0.2-py3-none-any.whl (5.8 kB)
Collecting yarl<2.0,>=1.0
  Downloading yarl-1.9.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (268 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

Your instance connection name is: case-gb-2:us-central1:gb-sales-db


In [10]:
# Connector instance shared by every database connection opened below
connector = Connector()


def getconn():
    """Open a database connection through the shared ``connector``.

    Connects to ``INSTANCE_CONNECTION`` with the "pg8000" driver, using the
    module-level ``DB_USER``, ``DB_PWD`` and ``DB_NAME`` settings.

    Returns:
        The connection object returned by ``connector.connect``.
    """
    return connector.connect(
        INSTANCE_CONNECTION,
        "pg8000",
        user=DB_USER,
        password=DB_PWD,
        db=DB_NAME,
    )

In [11]:
# Run the four consolidation queries over a single Cloud SQL connection.
#
# The month and year columns are derived from data_venda, parsed with TO_DATE
# using the 'DD/MM/YYYY' format. The time-based queries group by the derived
# month and year (not by the raw data_venda value), so each one returns exactly
# one row per key and period instead of one row per distinct sale date.
# The derived values are cast to INTEGER because DATE_PART returns double
# precision, while the BigQuery tables declare mes/ano as INT64.
with getconn() as db_conn:

    # a. Table 1: sales consolidated by year and month (per brand and line)
    consolidado_1_query = pd.read_sql_query("""
        SELECT
          v.id_marca,
          v.marca,
          v.id_linha,
          v.linha,
          CAST(DATE_PART('month', TO_DATE(v.data_venda, 'DD/MM/YYYY')) AS INTEGER) AS mes,
          CAST(DATE_PART('year', TO_DATE(v.data_venda, 'DD/MM/YYYY')) AS INTEGER) AS ano,
          SUM(v.qtd_venda) AS qtd_venda
        FROM vendas_gerais v
        GROUP BY
          v.id_marca,
          v.marca,
          v.id_linha,
          v.linha,
          CAST(DATE_PART('month', TO_DATE(v.data_venda, 'DD/MM/YYYY')) AS INTEGER),
          CAST(DATE_PART('year', TO_DATE(v.data_venda, 'DD/MM/YYYY')) AS INTEGER)
        """, db_conn)

    consolidado_1_df = pd.DataFrame(
        consolidado_1_query,
        columns=['id_marca', 'marca', 'id_linha', 'linha', 'mes', 'ano', 'qtd_venda'],
    )

    # b. Table 2: sales consolidated by brand and line
    consolidado_2_query = pd.read_sql_query("""
        SELECT
          v.id_marca,
          v.marca,
          v.id_linha,
          v.linha,
          SUM(v.qtd_venda) AS qtd_venda
        FROM vendas_gerais v
        GROUP BY v.id_marca, v.marca, v.id_linha, v.linha
        """, db_conn)

    consolidado_2_df = pd.DataFrame(
        consolidado_2_query,
        columns=['id_marca', 'marca', 'id_linha', 'linha', 'qtd_venda'],
    )

    # c. Table 3: sales consolidated by brand, year and month
    consolidado_3_query = pd.read_sql_query("""
        SELECT
          v.id_marca,
          v.marca,
          CAST(DATE_PART('month', TO_DATE(v.data_venda, 'DD/MM/YYYY')) AS INTEGER) AS mes,
          CAST(DATE_PART('year', TO_DATE(v.data_venda, 'DD/MM/YYYY')) AS INTEGER) AS ano,
          SUM(v.qtd_venda) AS qtd_venda
        FROM vendas_gerais v
        GROUP BY
          v.id_marca,
          v.marca,
          CAST(DATE_PART('month', TO_DATE(v.data_venda, 'DD/MM/YYYY')) AS INTEGER),
          CAST(DATE_PART('year', TO_DATE(v.data_venda, 'DD/MM/YYYY')) AS INTEGER)
        """, db_conn)

    consolidado_3_df = pd.DataFrame(
        consolidado_3_query,
        columns=['id_marca', 'marca', 'mes', 'ano', 'qtd_venda'],
    )

    # d. Table 4: sales consolidated by line, year and month
    consolidado_4_query = pd.read_sql_query("""
        SELECT
          v.id_linha,
          v.linha,
          CAST(DATE_PART('month', TO_DATE(v.data_venda, 'DD/MM/YYYY')) AS INTEGER) AS mes,
          CAST(DATE_PART('year', TO_DATE(v.data_venda, 'DD/MM/YYYY')) AS INTEGER) AS ano,
          SUM(v.qtd_venda) AS qtd_venda
        FROM vendas_gerais v
        GROUP BY
          v.id_linha,
          v.linha,
          CAST(DATE_PART('month', TO_DATE(v.data_venda, 'DD/MM/YYYY')) AS INTEGER),
          CAST(DATE_PART('year', TO_DATE(v.data_venda, 'DD/MM/YYYY')) AS INTEGER)
        """, db_conn)

    consolidado_4_df = pd.DataFrame(
        consolidado_4_query,
        columns=['id_linha', 'linha', 'mes', 'ano', 'qtd_venda'],
    )

# Construct a BigQuery client object.
client = bigquery.Client(project_id)

  consolidado_1_query = pd.read_sql_query("""
  consolidado_2_query = pd.read_sql_query("""
  consolidado_3_query = pd.read_sql_query("""
  consolidado_4_query = pd.read_sql_query("""
  vendas_query = pd.read_sql_query("""


In [12]:
### Consolidated table 1
# Sales consolidated by year and month.
# To debug a type mismatch, compare consolidado_1_df.dtypes with the schema below.

table_id = f"{bq_dataset}.consolidado_1_vendas"

# The schema is built from (column name, BigQuery type) pairs, in column order.
job_config_1 = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField(column_name, column_type)
        for column_name, column_type in (
            ("id_marca", bigquery.enums.SqlTypeNames.STRING),
            ("marca", bigquery.enums.SqlTypeNames.STRING),
            ("id_linha", bigquery.enums.SqlTypeNames.STRING),
            ("linha", bigquery.enums.SqlTypeNames.STRING),
            ("mes", bigquery.enums.SqlTypeNames.INT64),
            ("ano", bigquery.enums.SqlTypeNames.INT64),
            ("qtd_venda", bigquery.enums.SqlTypeNames.INT64),
        )
    ],
    # Overwrite the destination table on every run.
    write_disposition="WRITE_TRUNCATE",
)

job = client.load_table_from_dataframe(
    dataframe=consolidado_1_df,
    destination=table_id,
    job_config=job_config_1,
)

# Wait for the load job to finish; as the last expression it is also displayed.
job.result()

LoadJob<project=case-gb-2, location=us-central1, id=13913d52-b594-498e-a32f-f9f5973a75aa>

In [13]:
### Consolidated table 2
# To debug a type mismatch, compare consolidado_2_df.dtypes with the schema below.

table_id = f"{bq_dataset}.consolidado_2_vendas"

# The schema is built from (column name, BigQuery type) pairs, in column order.
job_config_2 = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField(column_name, column_type)
        for column_name, column_type in (
            ("id_marca", bigquery.enums.SqlTypeNames.STRING),
            ("marca", bigquery.enums.SqlTypeNames.STRING),
            ("id_linha", bigquery.enums.SqlTypeNames.STRING),
            ("linha", bigquery.enums.SqlTypeNames.STRING),
            ("qtd_venda", bigquery.enums.SqlTypeNames.INT64),
        )
    ],
    # Overwrite the destination table on every run.
    write_disposition="WRITE_TRUNCATE",
)

job = client.load_table_from_dataframe(
    dataframe=consolidado_2_df,
    destination=table_id,
    job_config=job_config_2,
)

# Wait for the load job to finish; as the last expression it is also displayed.
job.result()

LoadJob<project=case-gb-2, location=us-central1, id=349d5a36-516f-40c9-a60d-5006aabf1b33>

In [14]:
### Consolidated table 3
# To debug a type mismatch, compare consolidado_3_df.dtypes with the schema below.

table_id = f"{bq_dataset}.consolidado_3_vendas"

# The schema is built from (column name, BigQuery type) pairs, in column order.
job_config_3 = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField(column_name, column_type)
        for column_name, column_type in (
            ("id_marca", bigquery.enums.SqlTypeNames.STRING),
            ("marca", bigquery.enums.SqlTypeNames.STRING),
            ("mes", bigquery.enums.SqlTypeNames.INT64),
            ("ano", bigquery.enums.SqlTypeNames.INT64),
            ("qtd_venda", bigquery.enums.SqlTypeNames.INT64),
        )
    ],
    # Overwrite the destination table on every run.
    write_disposition="WRITE_TRUNCATE",
)

job = client.load_table_from_dataframe(
    dataframe=consolidado_3_df,
    destination=table_id,
    job_config=job_config_3,
)

# Wait for the load job to finish; as the last expression it is also displayed.
job.result()

LoadJob<project=case-gb-2, location=us-central1, id=b29d3df6-aeec-49af-84b9-ff65c09be85e>

In [15]:
### Consolidated table 4
# To debug a type mismatch, compare consolidado_4_df.dtypes with the schema below.

table_id = f"{bq_dataset}.consolidado_4_vendas"

# The schema is built from (column name, BigQuery type) pairs, in column order.
job_config_4 = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField(column_name, column_type)
        for column_name, column_type in (
            ("id_linha", bigquery.enums.SqlTypeNames.STRING),
            ("linha", bigquery.enums.SqlTypeNames.STRING),
            ("mes", bigquery.enums.SqlTypeNames.INT64),
            ("ano", bigquery.enums.SqlTypeNames.INT64),
            ("qtd_venda", bigquery.enums.SqlTypeNames.INT64),
        )
    ],
    # Overwrite the destination table on every run.
    write_disposition="WRITE_TRUNCATE",
)

job = client.load_table_from_dataframe(
    dataframe=consolidado_4_df,
    destination=table_id,
    job_config=job_config_4,
)

# Wait for the load job to finish; as the last expression it is also displayed.
job.result()

LoadJob<project=case-gb-2, location=us-central1, id=8fdbd586-df68-44ee-aeef-cdc2db042f11>