
## S3 to Bronze

* Reads raw CSV files from S3 and writes them as Delta tables to himalaya.bronze

* Source: s3://secret/raw/

* Destination: himalaya.bronze

In [0]:
%run /Workspace/Repos/nikum.vedansh@gmail.com/himalayan-expeditions-project/0_scripts/configs/credentials

In [0]:
%run /Workspace/Repos/nikum.vedansh@gmail.com/himalayan-expeditions-project/0_scripts/configs/config

In [0]:
import boto3
import pandas as pd
from io import StringIO
import os
from datetime import datetime

In [0]:
# boto3 keyword argument -> environment variable that supplies its value.
# NOTE(review): presumably these variables are set by the credentials
# notebook run at the top of this file — confirm.
_S3_ENV_VARS = {
    "aws_access_key_id": "AWS_ACCESS_KEY_ID",
    "aws_secret_access_key": "AWS_SECRET_ACCESS_KEY",
    "region_name": "AWS_DEFAULT_REGION",
}

# Each lookup uses os.environ[...] so a missing variable raises KeyError
# here instead of producing a client with incomplete settings.
s3_client = boto3.client(
    "s3",
    **{kwarg: os.environ[var] for kwarg, var in _S3_ENV_VARS.items()},
)

In [0]:
# Ingest every entry of DATASETS from S3 into its bronze Delta table.
#
# This cell relies on names defined outside it: DATASETS and S3_BUCKET
# (presumably provided by the config notebook run above — confirm), the
# `spark` session, and `s3_client`.
#
# Each table name ends up in exactly one of three lists:
#   loaded  - table did not exist and was written successfully
#   skipped - table already existed, so nothing was read or written
#   failed  - any exception was raised while processing the dataset
print("Starting S3 → Bronze ingestion...\n")
loaded = []
skipped = []
failed = []

for dataset in DATASETS:
    # Reset on every iteration so the error handler can tell whether the
    # table name was resolved and never reports a stale name from a
    # previous dataset.
    table = None
    try:
        table = dataset["bronze_name"]

        # Skip if table already exists
        if spark.catalog.tableExists(table):
            skipped.append(table)
            continue

        # Read from S3: the object key is s3_path with the
        # "s3://<bucket>/" prefix stripped, followed by the file name.
        key = dataset["s3_path"].replace(f"s3://{S3_BUCKET}/", "") + dataset["file"]
        obj = s3_client.get_object(Bucket=S3_BUCKET, Key=key)
        body = obj["Body"]
        try:
            pandas_df = pd.read_csv(body, encoding="utf-8")
        finally:
            # Release the underlying HTTP connection even when parsing fails.
            body.close()

        # Add ingestion timestamp
        pandas_df["ingested_at"] = datetime.now()

        # Convert to Spark and write to Delta
        spark_df = spark.createDataFrame(pandas_df)
        spark_df.write.format("delta").mode("overwrite").saveAsTable(table)

        loaded.append(table)

    except Exception as e:
        # Do not index `dataset` again here: if the "bronze_name" lookup is
        # what failed, repeating it would raise inside the handler and abort
        # the remaining datasets.
        label = table if table is not None else f"<unnamed dataset: {dataset!r}>"
        failed.append(label)
        print(f"❌ Failed {label}: {str(e)}")

def print_summary(label, items):
    """Print a labelled, bulleted list followed by a blank line.

    The header line is ``"<label> (<count>):"``. Each item is then printed
    on its own line as ``"   - <item>"``; when ``items`` is empty a single
    ``"   - none"`` line is printed instead.

    Args:
        label: Heading text shown before the item count.
        items: Sized collection of values to list.
    """
    print(f"{label} ({len(items)}):")
    # An empty collection is falsy, so fall back to a single placeholder entry.
    for entry in items or ["none"]:
        print(f"   - {entry}")
    print()

# Report the three outcome buckets in a fixed order, then the closing banner.
for summary_label, summary_items in (
    ("✅ Loaded", loaded),
    ("⏭  Skipped", skipped),
    ("❌ Failed", failed),
):
    print_summary(summary_label, summary_items)
print("✓ Bronze ingestion complete")