# Bronze Layer Creation - IESB BigData Class

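This notebook builds a bronze layer from PostgreSQL tables: it reads each table over JDBC, adds load-metadata columns, and writes the result to S3 as Parquet, organized by category and partitioned by load date.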

In [None]:
import sys
import boto3
import json
from datetime import datetime
from awsglue.transforms import *
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from pyspark.sql.functions import current_timestamp, lit

In [None]:
# Initialize contexts
# getOrCreate() reuses the SparkContext that is already active in this session
# (if any) instead of constructing a second one, so the cell can be re-run
# without failing with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
# SparkSession used by the cells below for JDBC reads and Parquet writes.
spark = glueContext.spark_session

In [None]:
# Configuration
# Target S3 bucket and the root prefix under which every bronze table is written.
BUCKET = "iesb-bigdata"
BRONZE_PATH = f"s3://{BUCKET}/bronze"
# Load date for this run as YYYY-MM-DD, taken from the clock of the machine
# executing the notebook (naive datetime, no explicit timezone).
BATCH_DATE = datetime.now().date().isoformat()

print(
    f"Bronze Layer Path: {BRONZE_PATH}",
    f"Batch Date: {BATCH_DATE}",
    sep="\n",
)

In [None]:
# Get RDS credentials
# The 'rds-secret' entry in AWS Secrets Manager is read and its SecretString
# parsed as JSON; the code below uses its host, port, db_name, username and
# password keys.
secrets_client = boto3.client('secretsmanager')
secret = secrets_client.get_secret_value(SecretId='rds-secret')
credentials = json.loads(secret['SecretString'])

# JDBC endpoint for the PostgreSQL database described by the secret.
jdbc_url = (
    f"jdbc:postgresql://{credentials['host']}:{credentials['port']}"
    f"/{credentials['db_name']}"
)

# Properties handed to spark.read.jdbc(): login plus the JDBC driver class.
connection_properties = dict(
    user=credentials['username'],
    password=credentials['password'],
    driver="org.postgresql.Driver",
)

# Only the settings are assembled here; no connection has been opened yet.
print("✓ Database connection configured")

In [None]:
# Organized table categories
# Maps each category name to the list of source table names that belong to it.
# The category name is later used as a sub-folder of the bronze path.
tables_config = {
    "geographic": [
        "municipio",
        "unidade_federacao",
        "regiao",
        "municipio_ride_brasilia",
    ],
    "education": [
        "ed_enem_2024_resultados",
        "ed_enem_2024_participantes",
        "educacao_basica",
        "censo_escolar_2024",
        "ed_superior_cursos",
        "ed_superior_ies",
    ],
    "health": [
        "sus_aih",
        "sus_procedimento_ambulatorial",
    ],
    "demographics": [
        # NOTE(review): "20222" and the mixed case look unusual - confirm this
        # matches the actual table name in the source database.
        "Censo_20222_Populacao_Idade_Sexo",
        "agregados_setores_censitarios",
    ],
    "economics": [
        "pib_municipios",
    ],
    "incidents": [
        "ocorrencia",
    ],
}

# Show organization: one header line per category, then one bullet per table.
for category, tables in tables_config.items():
    print(f"{category.upper()}: {len(tables)} tables")
    print("\n".join(f"  - {table}" for table in tables))

In [None]:
# Process one category as example (Geographic)
# For every table of the chosen category: read it from PostgreSQL over JDBC,
# print its schema, a 3-row sample and its row count, add load-metadata
# columns, and write it as Parquet under <BRONZE_PATH>/<category>/<table>,
# partitioned by bronze_load_date.
category = "geographic"
tables = tables_config[category]

print(f"Processing {category.upper()} tables...")

for table_name in tables:
    print(f"\n--- Processing {table_name} ---")

    # Read from PostgreSQL
    df = spark.read.jdbc(
        url=jdbc_url,
        table=table_name,
        properties=connection_properties
    )

    # Show schema and sample
    print(f"Schema for {table_name}:")
    df.printSchema()

    print("Sample data (first 3 rows):")
    df.show(3)

    # NOTE: df is not cached, so show(), count() and the write below are
    # separate Spark actions that each query the source again.
    print(f"Record count: {df.count()}")

    # Add metadata columns
    df_with_metadata = (
        df
        .withColumn("bronze_load_date", lit(BATCH_DATE))
        .withColumn("bronze_load_timestamp", current_timestamp())
        .withColumn("source_system", lit("postgresql"))
        .withColumn("table_category", lit(category))
    )

    # Write to bronze layer
    output_path = f"{BRONZE_PATH}/{category}/{table_name}"

    # Dynamic partition overwrite replaces only the bronze_load_date
    # partition(s) present in this DataFrame. With Spark's default (static)
    # mode, "overwrite" would first delete every existing partition under
    # output_path, wiping the loads of all previous batch dates. Re-running
    # the same day still replaces that day's data.
    (
        df_with_metadata.write
        .mode("overwrite")
        .option("partitionOverwriteMode", "dynamic")
        .partitionBy("bronze_load_date")
        .parquet(output_path)
    )

    print(f"✓ Saved to: {output_path}")

In [None]:
# Trigger crawler to update Glue catalog
# Best effort: any failure while creating the client or starting the
# 'iesb-s3-crawler' crawler is reported as a warning and does not stop the
# notebook.
try:
    glue_client = boto3.client('glue')
    response = glue_client.start_crawler(Name='iesb-s3-crawler')
except Exception as e:
    print(f"⚠ Could not start crawler: {e!s}")
else:
    # The call only starts the crawler; it does not wait for it to finish.
    print("✓ S3 Crawler started to update catalog")
    print("Check Glue console for crawler status")

In [None]:
# Verify bronze layer structure
# Prints the layout that tables_config and BATCH_DATE imply, as a text tree.
# Nothing is read from S3 here, so every configured category is listed
# whether or not it has actually been written.
layout_lines = ["Bronze Layer Structure:", f"s3://{BUCKET}/bronze/"]
for category, category_tables in tables_config.items():
    layout_lines.append(f"├── {category}/")
    for table in category_tables:
        layout_lines.append(f"│   ├── {table}/")
        layout_lines.append(f"│   │   └── bronze_load_date={BATCH_DATE}/")
print("\n".join(layout_lines))