In [None]:
import pandas as pd
import numpy as np
import pyodbc
import os
from datetime import datetime
from sqlalchemy import create_engine ,Integer, String, Date,VARCHAR
import subprocess
import sys
   #pip install sqlalchemy
   #tipos de dados ,Integer, String, Date,VARCHAR
   #conexão com  Postgres, MySQL, SQLite e muito mais com auto desempenho 

#importando bibliotecas e criando a conexão com o SQL Server para extrair os arquivos

server = 'localhost' # Substitua pelo nome do servidor SQL Server
database = 'AdventureWorks2019'  # Substitua pelo nome do banco de dados
conexaoDB = pyodbc.connect('DRIVER={ODBC Driver 17 for SQL Server};'
                      f'SERVER={server};'
                      f'DATABASE={database};'
                      'Trusted_Connection=yes;')

cursor = conexaoDB.cursor()   # criando cursor de comando 

In [14]:
SPECS = {
    "sales_order_header": {
        "table": "[Sales].[SalesOrderHeader]",
        "cols": [
            "SalesOrderID","OrderDate","DueDate","ShipDate",
            "CustomerID","TerritoryID","ShipMethodID",
            "SubTotal","TaxAmt","Freight","TotalDue","Status"
        ],
    },
    "sales_order_detail": {
        "table": "[Sales].[SalesOrderDetail]",
        "cols": [
            "SalesOrderID","SalesOrderDetailID","ProductID",
            "OrderQty","UnitPrice","UnitPriceDiscount","SpecialOfferID"
        ],
    },
    "product": {
        "table": "[Production].[Product]",
        "cols": ["ProductID","Name","ProductNumber","Color","StandardCost","ListPrice","ProductSubcategoryID"],
    },
    "product_subcategory": {
        "table": "[Production].[ProductSubcategory]",
        "cols": ["ProductSubcategoryID","Name","ProductCategoryID"],
    },
    "product_category": {
        "table": "[Production].[ProductCategory]",
        "cols": ["ProductCategoryID","Name"],
    },
    "customer": {
        "table": "[Sales].[Customer]",
        "cols": ["CustomerID","PersonID","StoreID","TerritoryID"],
    },
    "person": {
        "table": "[Person].[Person]",
        "cols": ["BusinessEntityID","FirstName","LastName"],
    },
    "store": {
        "table": "[Sales].[Store]",
        "cols": ["BusinessEntityID","Name"],
    },
    "special_offer": {
        "table": "[Sales].[SpecialOffer]",
        "cols": ["SpecialOfferID","Description","DiscountPct","Type","Category","StartDate","EndDate","MinQty","MaxQty"],
    },
    "special_offer_product": {
        "table": "[Sales].[SpecialOfferProduct]",
        "cols": ["SpecialOfferID","ProductID"],
    },
}

def extract_with_specs(conn):
    dfs = {}
    for alias, spec in SPECS.items():
        cols = ", ".join([f"[{c}]" for c in spec["cols"]])
        query = f"SELECT {cols} FROM {spec['table']};"
        df = pd.read_sql(query, conn)
        dfs[alias] = df
        print(f"[OK] {alias}: {df.shape}")
    return dfs

dfs = extract_with_specs(conexaoDB)

cursor.close()
conexaoDB.close()


  df = pd.read_sql(query, conn)


[OK] sales_order_header: (31465, 12)
[OK] sales_order_detail: (121317, 7)
[OK] product: (504, 7)
[OK] product_subcategory: (37, 3)
[OK] product_category: (4, 2)
[OK] customer: (19820, 4)
[OK] person: (19972, 3)
[OK] store: (701, 2)
[OK] special_offer: (16, 9)
[OK] special_offer_product: (538, 2)


In [None]:

#Validando dados corrompidos de Unicode

def find_bad_unicode(df: pd.DataFrame, max_rows=2000):
    obj_cols = df.select_dtypes(include=["object"]).columns.tolist()
    sample = df[obj_cols].head(max_rows).copy()

    for c in obj_cols:
        try:
            # tenta forçar para string e codificar em UTF-8
            sample[c].astype(str).apply(lambda x: x.encode("utf-8")).head(5)
        except Exception as e:
            print(f"Problema na coluna: {c} -> {e}")
            # tenta achar a linha exata
            for i, v in enumerate(sample[c].astype(str).tolist()):
                try:
                    v.encode("utf-8")
                except Exception:
                    print("Linha (amostra):", i, "Valor:", repr(v))
                    return c
    print("Nenhum problema encontrado nas primeiras linhas.")
    return None

bad_col = find_bad_unicode(dfs["sales_order_header"])
bad_col


Nenhum problema encontrado nas primeiras linhas.


In [None]:

#Sanitizando dados corrompidos de Unicode para carregar no Postgres
def sanitize_for_postgres(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()

    for c in df.columns:
        # se tiver bytes/memoryview, converte para str
        df[c] = df[c].apply(lambda x: x.tobytes().decode("cp1252", "ignore") if isinstance(x, memoryview) else x)
        df[c] = df[c].apply(lambda x: x.decode("cp1252", "ignore") if isinstance(x, (bytes, bytearray)) else x)

    # agora limpa strings para UTF-8
    obj_cols = df.select_dtypes(include=["object"]).columns
    for c in obj_cols:
        df[c] = df[c].apply(lambda x: x if x is None else str(x))
        df[c] = df[c].apply(lambda s: s.encode("utf-8", "ignore").decode("utf-8") if isinstance(s, str) else s)

    return df


In [None]:

# Conexão com o Postgres
dbname   = 'postgres'
user     = 'postgres'
password = '1003'
host     = 'localhost'
port     = '5432' 

# String de conexão
engine = create_engine(f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{dbname}")

def load_to_postgres(df, table, schema, engine):
    print(f"Enviando {schema}.{table} | linhas = {len(df)}")

    df = sanitize_for_postgres(df)

    df.to_sql(
        table,
        engine,
        schema=schema,
        if_exists="replace",
        index=False,
        chunksize=50_000,
        method="multi"
    )


In [None]:

# Testando conexão com o Postgres

import psycopg2

dbname   = 'postgres'
user     = 'postgres'
password = '1003'
host     = 'localhost'
port     = '5432'

try:
    conn = psycopg2.connect(
        dbname=dbname,
        user=user,
        password=password,
        host=host,
        port=port,
        options="-c client_encoding=UTF8"
    )
    cur = conn.cursor()
    cur.execute("SELECT version();")
    print(cur.fetchone())
    cur.close()
    conn.close()
    print("✅ Conexão OK")
except Exception as e:
    print("❌ Conexão falhou:", repr(e))


with engine.connect() as conn:
    conn.exec_driver_sql("SET client_encoding TO 'UTF8';")
#Transformação dos dados - Silver Special Offer

('PostgreSQL 18.1 on x86_64-windows, compiled by msvc-19.44.35221, 64-bit',)
✅ Conexão OK


In [None]:


# Exportando para a RAW – dados brutos
load_to_postgres(dfs["sales_order_header"], "sales_order_header", "raw", engine)
load_to_postgres(dfs["sales_order_detail"], "sales_order_detail", "raw", engine)

load_to_postgres(dfs["product"], "product", "raw", engine)
load_to_postgres(dfs["product_subcategory"], "product_subcategory", "raw", engine)
load_to_postgres(dfs["product_category"], "product_category", "raw", engine)

load_to_postgres(dfs["customer"], "customer", "raw", engine)
load_to_postgres(dfs["person"], "person", "raw", engine)
load_to_postgres(dfs["store"], "store", "raw", engine)

load_to_postgres(dfs["special_offer"], "special_offer", "raw", engine)
load_to_postgres(dfs["special_offer_product"], "special_offer_product", "raw", engine)

# Fechar a conexão
engine.dispose()

Enviando raw.sales_order_header | linhas = 31465
Enviando raw.sales_order_detail | linhas = 121317
Enviando raw.product | linhas = 504
Enviando raw.product_subcategory | linhas = 37
Enviando raw.product_category | linhas = 4
Enviando raw.customer | linhas = 19820
Enviando raw.person | linhas = 19972
Enviando raw.store | linhas = 701
Enviando raw.special_offer | linhas = 16
Enviando raw.special_offer_product | linhas = 538


Product -> Tratando

In [38]:
silver_product = dfs["product"].copy()

for c in ["Name","ProductNumber","Color"]:
    if c in silver_product.columns:
        silver_product[c] = silver_product[c].astype(str).str.strip()

silver_product["ListPrice"] = pd.to_numeric(silver_product["ListPrice"], errors="coerce")
silver_product["StandardCost"] = pd.to_numeric(silver_product["StandardCost"], errors="coerce")
silver_product = silver_product.dropna(subset=["ProductID","Name","ProductNumber","ListPrice"])

Category/SubCategory -> Tratando

In [39]:
silver_product_subcategory = dfs["product_subcategory"].copy()
silver_product_category = dfs["product_category"].copy()

silver_product_subcategory["Name"] = silver_product_subcategory["Name"].astype(str).str.strip()
silver_product_category["Name"] = silver_product_category["Name"].astype(str).str.strip()
silver_product_subcategory = silver_product_subcategory.dropna(subset=["ProductSubcategoryID","Name"])
silver_product_category = silver_product_category.dropna(subset=["ProductCategoryID","Name"])

Customer -> Tratando

In [40]:
silver_customer = dfs["customer"].copy()
silver_customer = silver_customer.dropna(subset=["CustomerID"])

Person -> Tratando

In [41]:
silver_person = dfs["person"].copy()
silver_person["FirstName"] = silver_person["FirstName"].astype(str).str.strip()
silver_person["LastName"] = silver_person["LastName"].astype(str).str.strip()
silver_person = silver_person.dropna(subset=["BusinessEntityID","FirstName","LastName"])

Store -> Tratando

In [42]:
silver_store = dfs["store"].copy()
silver_store["Name"] = silver_store["Name"].astype(str).str.strip()
silver_store = silver_store.dropna(subset=["BusinessEntityID","Name"])

SpecialOffer -> Tratando

In [43]:
silver_special_offer = dfs["special_offer"].copy()

silver_special_offer["StartDate"] = pd.to_datetime(silver_special_offer["StartDate"], errors="coerce")
silver_special_offer["EndDate"] = pd.to_datetime(silver_special_offer["EndDate"], errors="coerce")
silver_special_offer["DiscountPct"] = pd.to_numeric(silver_special_offer["DiscountPct"], errors="coerce").fillna(0).clip(0,1)
silver_special_offer = silver_special_offer.dropna(subset=["SpecialOfferID","Description","StartDate","EndDate"])

SpecialOfferProduct -> Tratando

In [44]:
silver_special_offer_product = dfs["special_offer_product"].copy()
silver_special_offer_product = silver_special_offer_product.dropna(subset=["SpecialOfferID","ProductID"])

SalesOrderDetail -> Tratando

SalesOrderHeader -> Tratando

In [45]:
hdr = dfs["sales_order_header"].copy()
det = dfs["sales_order_detail"].copy()

# Tipagem
for col in ["OrderDate","DueDate","ShipDate"]:
    hdr[col] = pd.to_datetime(hdr[col], errors="coerce")

# Garantir numéricos
for col in ["SubTotal","TaxAmt","Freight","TotalDue"]:
    hdr[col] = pd.to_numeric(hdr[col], errors="coerce")

for col in ["OrderQty","UnitPrice","UnitPriceDiscount"]:
    det[col] = pd.to_numeric(det[col], errors="coerce")

dq = {}

dq["total_rows"] = len(det)
dq["invalid_qty_pct"] = (det["OrderQty"] <= 0).mean()
dq["invalid_price_pct"] = (det["UnitPrice"] < 0).mean()
dq["discount_out_of_range_pct"] = ((det["UnitPriceDiscount"] < 0) | (det["UnitPriceDiscount"] > 1)).mean()
dq["null_orderdate_pct"] = hdr["OrderDate"].isna().mean()

print(dq)


# Regras básicas de qualidade
det = det[(det["OrderQty"] > 0) & (det["UnitPrice"] >= 0)]
det["UnitPriceDiscount"] = det["UnitPriceDiscount"].fillna(0).clip(0, 1)

# Métricas
det["gross_revenue"] = det["OrderQty"] * det["UnitPrice"]
det["discount_amount"] = det["gross_revenue"] * det["UnitPriceDiscount"]
det["net_revenue"] = det["gross_revenue"] - det["discount_amount"]


{'total_rows': 121317, 'invalid_qty_pct': np.float64(0.0), 'invalid_price_pct': np.float64(0.0), 'discount_out_of_range_pct': np.float64(0.0), 'null_orderdate_pct': np.float64(0.0)}


Enviando Dados para o Postgre -> Camada Silver

In [46]:
load_to_postgres(hdr, "sales_order_header", "silver", engine)
load_to_postgres(det, "sales_order_detail", "silver", engine)

load_to_postgres(silver_product, "product", "silver", engine)
load_to_postgres(silver_product_subcategory, "product_subcategory", "silver", engine)
load_to_postgres(silver_product_category, "product_category", "silver", engine)

load_to_postgres(silver_customer, "customer", "silver", engine)
load_to_postgres(silver_person, "person", "silver", engine)
load_to_postgres(silver_store, "store", "silver", engine)

load_to_postgres(silver_special_offer, "special_offer", "silver", engine)
load_to_postgres(silver_special_offer_product, "special_offer_product", "silver", engine)


Enviando silver.sales_order_header | linhas = 31465
Enviando silver.sales_order_detail | linhas = 121317
Enviando silver.product | linhas = 504
Enviando silver.product_subcategory | linhas = 37
Enviando silver.product_category | linhas = 4
Enviando silver.customer | linhas = 19820
Enviando silver.person | linhas = 19972
Enviando silver.store | linhas = 701
Enviando silver.special_offer | linhas = 16
Enviando silver.special_offer_product | linhas = 538


Validando qualidade dos dados 

In [None]:
print("Starting Data Quality pipeline...")
subprocess.run([sys.executable, "Data_Quality.py"], check=True)
print("Data Quality completed.")