# ETL SILVER -> GOLD (PostgreSQL)
**Origem:** Tabela `silver.USER` (PostgreSQL)  
**Destino:** Schema `dw` (Star Schema)  
**Objetivo:** Modelagem dimensional com Surrogate Keys (SK/SRK) em **todas** as tabelas (Dimensões e Fato).

## 1. Configuração e Conexão
Definição das credenciais e conexão com o banco para leitura da Silver.

In [1]:
import os

import numpy as np
import pandas as pd
import psycopg2
from psycopg2.extras import execute_batch

# Database settings.
# Every value can be overridden with the standard libpq environment variables
# (PGHOST, PGPORT, PGDATABASE, PGUSER, PGPASSWORD) so real credentials do not
# have to be committed with the notebook. The fallbacks are the original
# local-development values, so running without any variable set behaves as before.
DB_CONFIG = {
    'host': os.environ.get('PGHOST', 'localhost'),
    # `or` also covers PGPORT being set to an empty string.
    'port': int(os.environ.get('PGPORT') or 5432),
    'database': os.environ.get('PGDATABASE', 'instagram_usage'),
    'user': os.environ.get('PGUSER', 'sbd2'),
    'password': os.environ.get('PGPASSWORD', 'sbd2123'),
}

# Schemas
SOURCE_SCHEMA = 'silver'
SOURCE_TABLE = 'USER'
TARGET_SCHEMA = 'dw'

## 2. Extract: Ler dados da Silver
Leitura direta do PostgreSQL.

In [2]:
def get_silver_data():
    """Read the whole Silver source table into a DataFrame.

    Connects with ``DB_CONFIG`` and runs ``SELECT *`` against
    ``SOURCE_SCHEMA."SOURCE_TABLE"``.

    Returns:
        pandas.DataFrame holding every row and column of the source table.

    Raises:
        Any exception raised by ``psycopg2.connect`` or ``pd.read_sql``.
        The connection is closed whether or not the query succeeds.
    """
    # The table name stays double-quoted: it is upper-case ("USER") and
    # PostgreSQL folds unquoted identifiers to lower case.
    query = f'SELECT * FROM {SOURCE_SCHEMA}."{SOURCE_TABLE}"'
    conn = psycopg2.connect(**DB_CONFIG)
    try:
        return pd.read_sql(query, conn)
    finally:
        # Previously a failing query skipped close() and leaked the connection.
        conn.close()

# Pull the Silver table into memory.
df_silver = get_silver_data()

# Make sure the login column is a datetime before it is used as a date key.
df_silver = df_silver.assign(
    last_login_date=pd.to_datetime(df_silver['last_login_date'])
)
print(f"Registros carregados da Silver: {len(df_silver.index)}")

## 3. Transform: Criação das Dimensões com Surrogate Keys (SK)
Geração de chaves sequenciais artificiais para cada dimensão.

In [None]:
def _build_user_dimension(source, attribute_cols, sk_name):
    """Build a dimension with one row per source row and a sequential surrogate key.

    Args:
        source: DataFrame containing ``user_id`` and every column in
            ``attribute_cols``.
        attribute_cols: Descriptive columns to copy into the dimension.
        sk_name: Name of the surrogate-key column to create (values 1..N in
            source row order).

    Returns:
        A new DataFrame whose columns are ``nk_user_id``, the attribute
        columns, and finally ``sk_name``.
    """
    dim = source[['user_id', *attribute_cols]].copy()
    dim[sk_name] = range(1, len(dim) + 1)
    # user_id is the natural key: keep its values, but name it nk_user_id,
    # which is the column the fact-table joins look up.
    return dim.rename(columns={'user_id': 'nk_user_id'})


# NOTE: the key columns are named sk_* / nk_user_id (not srk_*) because those
# are the names the fact-table merges and the load_to_dw() calls further down
# reference; with srk_* names the merges raise KeyError and no PK is created.

# --- 1. Date dimension ---
dates = df_silver['last_login_date'].unique()
dim_date = pd.DataFrame({'date_iso': dates})
dim_date = dim_date.sort_values('date_iso').reset_index(drop=True)

# Surrogate key: 1..N in chronological order
dim_date['sk_date'] = dim_date.index + 1

# Calendar attributes derived from the date
dim_date['day'] = dim_date['date_iso'].dt.day
dim_date['month'] = dim_date['date_iso'].dt.month
dim_date['year'] = dim_date['date_iso'].dt.year
dim_date['quarter'] = dim_date['date_iso'].dt.quarter
dim_date['day_of_week'] = dim_date['date_iso'].dt.day_name()

# --- 2. Demographics dimension ---
dim_demographics = _build_user_dimension(
    df_silver,
    [
        'age', 'gender', 'country', 'urban_rural', 'income_level',
        'employment_status', 'education_level', 'relationship_status', 'has_children',
    ],
    'sk_demographics',
)

# --- 3. Lifestyle dimension ---
dim_lifestyle = _build_user_dimension(
    df_silver,
    [
        'exercise_hours_per_week', 'sleep_hours_per_night', 'diet_quality',
        'smoking', 'alcohol_frequency', 'body_mass_index',
        'blood_pressure_systolic', 'blood_pressure_diastolic',
        'hobbies_count', 'social_events_per_month', 'books_read_per_year',
        'volunteer_hours_per_month', 'travel_frequency_per_year',
    ],
    'sk_lifestyle',
)

# --- 4. Security & Privacy dimension ---
dim_security = _build_user_dimension(
    df_silver,
    [
        'privacy_setting_level', 'two_factor_auth_enabled',
        'biometric_login_used', 'linked_accounts_count',
    ],
    'sk_security',
)

# --- 5. App Preferences dimension ---
dim_app = _build_user_dimension(
    df_silver,
    [
        'app_name', 'account_creation_year', 'uses_premium_features',
        'subscription_status', 'content_type_preference', 'preferred_content_theme',
    ],
    'sk_app_profile',
)

## 4. Transform: Montagem da Fato com SRK Própria
Agora a tabela fato também ganha sua própria Surrogate Key (`sk_fact_ads_performance`).

In [None]:
# Select the join keys and the metrics from Silver
fact_table = df_silver[[
    'user_id', 'last_login_date',
    'ads_clicked_per_day', 'ads_viewed_per_day',
    'daily_active_minutes_instagram', 'sessions_per_day', 'posts_created_per_week',
    'reels_watched_per_day', 'stories_viewed_per_day', 'likes_given_per_day',
    'comments_written_per_day', 'dms_sent_per_week', 'dms_received_per_week',
    'time_on_feed_per_day', 'time_on_explore_per_day', 'time_on_messages_per_day',
    'time_on_reels_per_day', 'average_session_length_minutes',
    'notification_response_rate', 'user_engagement_score',
    'perceived_stress_score', 'self_reported_happiness', 'daily_steps_count',
    'weekly_work_hours', 'followers_count', 'following_count'
]].copy()

# Resolve sk_date through the login date
fact_table = fact_table.merge(
    dim_date[['date_iso', 'sk_date']],
    left_on='last_login_date', right_on='date_iso', how='left',
)

# Resolve the remaining surrogate keys through the user's natural key.
# Each lookup has its key renamed to user_id so the merge joins on one shared
# column. Merging four times with right_on='nk_user_id' instead leaves
# nk_user_id / nk_user_id_x / nk_user_id_y copies in the fact table and makes
# the fourth merge collide on the _x/_y suffixes.
for dim, sk_col in (
    (dim_demographics, 'sk_demographics'),
    (dim_lifestyle, 'sk_lifestyle'),
    (dim_security, 'sk_security'),
    (dim_app, 'sk_app_profile'),
):
    lookup = dim[['nk_user_id', sk_col]].rename(columns={'nk_user_id': 'user_id'})
    fact_table = fact_table.merge(lookup, on='user_id', how='left')

# Click-through rate = clicks / views; rows without a positive view count get 0
fact_table['click_through_rate'] = fact_table.apply(
    lambda row: row['ads_clicked_per_day'] / row['ads_viewed_per_day']
    if row['ads_viewed_per_day'] > 0 else 0,
    axis=1,
)

# Surrogate key of the fact table itself: one sequential value per fact row.
# Named sk_fact_ads_performance to match the PK name passed to load_to_dw().
fact_table['sk_fact_ads_performance'] = range(1, len(fact_table) + 1)

# Final clean-up: the join helper columns are not part of the fact table
fact_table = fact_table.drop(columns=['user_id', 'last_login_date', 'date_iso'])

# Column order: fact SK -> dimension SKs -> metrics
sk_cols = ['sk_fact_ads_performance', 'sk_date', 'sk_demographics', 'sk_lifestyle', 'sk_security', 'sk_app_profile']
metric_cols = [c for c in fact_table.columns if c not in sk_cols]
fact_table = fact_table[sk_cols + metric_cols]

## 5. Load: Carga no Schema DW
Criação das tabelas e inserção dos dados com PKs definidas para todos.

In [5]:
def _pg_column_type(col, dtype, pk_col):
    """Return the PostgreSQL column type used in the generated DDL for one column."""
    if col == pk_col:
        return 'INTEGER PRIMARY KEY'
    # Any other sk_* column is a foreign key and is stored as INTEGER
    # regardless of the dtype pandas currently holds it in.
    if col.startswith('sk_'):
        return 'INTEGER'
    if dtype == 'float64':
        return 'FLOAT'
    if dtype == 'int64':
        return 'INTEGER'
    if dtype == 'bool':
        return 'BOOLEAN'
    if 'date' in str(dtype):
        return 'TIMESTAMP'
    return 'TEXT'


def _rows_for_insert(df):
    """Return the rows of ``df`` as tuples of plain Python values, with NaN/NaT as None."""
    # astype(object) boxes NumPy scalars into Python values; the mask then
    # replaces missing values with None so they are written as SQL NULL.
    boxed = df.astype(object).where(df.notna(), None)
    return list(boxed.itertuples(index=False, name=None))


def load_to_dw(df, table_name, pk_col):
    """Recreate ``TARGET_SCHEMA.table_name`` from ``df`` and insert all of its rows.

    The table is dropped (CASCADE) and recreated with one column per DataFrame
    column; ``pk_col`` becomes ``INTEGER PRIMARY KEY``. Schema creation, DDL and
    inserts run in a single transaction.

    Args:
        df: DataFrame to load; its column names are used as SQL column names.
        table_name: Target table name inside ``TARGET_SCHEMA``.
        pk_col: Name of the column to declare as primary key.

    Returns:
        None. Progress and errors are reported with ``print``; exceptions are
        caught so that one failing table does not abort the remaining loads.
    """
    conn = None
    try:
        if pk_col not in df.columns:
            # Still load, but make the missing key visible instead of silently
            # creating a table without a primary key.
            print(f"Aviso: coluna PK '{pk_col}' não encontrada em {table_name}; tabela será criada sem PRIMARY KEY.")

        # Dynamic DDL
        column_defs = ", ".join(
            f"{col} {_pg_column_type(col, dtype, pk_col)}"
            for col, dtype in df.dtypes.items()
        )
        create_query = f"CREATE TABLE {TARGET_SCHEMA}.{table_name} ({column_defs});"

        # Insert statement
        cols_list = df.columns.tolist()
        placeholders = ", ".join(["%s"] * len(cols_list))
        colnames = ", ".join(cols_list)
        insert_sql = f"INSERT INTO {TARGET_SCHEMA}.{table_name} ({colnames}) VALUES ({placeholders});"
        data_to_insert = _rows_for_insert(df)

        conn = psycopg2.connect(**DB_CONFIG)
        # `with conn` commits on success and rolls back on error; it does NOT
        # close the connection, which is why close() is called in `finally`.
        with conn:
            with conn.cursor() as cur:
                cur.execute(f"CREATE SCHEMA IF NOT EXISTS {TARGET_SCHEMA};")
                cur.execute(f"DROP TABLE IF EXISTS {TARGET_SCHEMA}.{table_name} CASCADE;")
                cur.execute(create_query)
                execute_batch(cur, insert_sql, data_to_insert, page_size=5000)

        # Reported only after the transaction has committed.
        print(f"Tabela {table_name} carregada com {len(data_to_insert)} linhas.")

    except Exception as e:
        # Deliberately broad: this is the notebook's top-level boundary and the
        # remaining tables should still be attempted.
        print(f"Erro ao carregar {table_name}: {e}")
    finally:
        if conn is not None:
            conn.close()

# Run the loads: dimensions first, then the fact table, each with its PK column
print("--- Iniciando Carga no DW ---")
load_plan = [
    (dim_date, 'dim_date', 'sk_date'),
    (dim_demographics, 'dim_demographics', 'sk_demographics'),
    (dim_lifestyle, 'dim_lifestyle', 'sk_lifestyle'),
    (dim_security, 'dim_security', 'sk_security'),
    (dim_app, 'dim_app_preferences', 'sk_app_profile'),
    # The fact table has its own PK as well: 'sk_fact_ads_performance'
    (fact_table, 'fact_ads_performance', 'sk_fact_ads_performance'),
]
for frame, target_table, pk_name in load_plan:
    load_to_dw(frame, target_table, pk_name)
print("--- Carga Finalizada ---")