In [0]:
# Pull the AWS key pair from the Databricks secret scope so it is never typed into the notebook.
access_key = dbutils.secrets.get(scope="aws", key="access_key")
secret_key = dbutils.secrets.get(scope="aws", key="secret_key")
bucket_name = "week9-project-s3"
file_path = "Amazon Sale Report.csv"

# The S3A key pair is supplied as reader options on this read,
# so the object URI itself stays free of credentials.
df = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true")
    .options(**{
        "fs.s3a.access.key": access_key,
        "fs.s3a.secret.key": secret_key,
    })
    .load(f"s3a://{bucket_name}/bronze/{file_path}")
)

display(df)


In [0]:
# 1. SETUP YOUR KEYS
# Credentials come from the Databricks secret scope (never hard-code them).
access_key = dbutils.secrets.get(scope="aws", key="access_key")
secret_key = dbutils.secrets.get(scope="aws", key="secret_key")
BUCKET_NAME = "week9-project-s3"

# 2. THE BRONZE PATH
# Plain s3a://BUCKET_NAME/FOLDER/FILE URI with no credentials in it.
# Putting ACCESS_KEY:SECRET_KEY@ inside the URI exposes the secret wherever the path
# is shown (logs, error messages, query plans), breaks when the secret contains
# "/" or "+", and is not accepted by newer S3A connector releases.
bronze_path = f"s3a://{BUCKET_NAME}/bronze/Amazon Sale Report.csv"

# 3. READ FROM BRONZE (Raw)
# The key pair is passed as reader options instead of being part of the path.
raw_df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .option("fs.s3a.access.key", access_key)
    .option("fs.s3a.secret.key", secret_key)
    .load(bronze_path)
)

# 4. TRANSFORM TO SILVER (Cleaned)
# Drop rows whose Amount is missing, then drop exact duplicate rows.
silver_df = raw_df.dropna(subset=["Amount"]).dropDuplicates()

# 5. SHOW RESULTS
display(silver_df)

In [0]:
from pyspark.sql import functions as F

# 1. Insight: Which category has the highest sales?
# Sum Amount per Category, largest total first.
gold_top_category = (
    silver_df.groupBy("Category")
    .agg(F.sum("Amount").alias("Total_Sales"))
    .orderBy(F.desc("Total_Sales"))
)

# 2. Insight: Which state brings the most revenue?
# Sum Amount per ship-state, largest total first.
gold_top_state = (
    silver_df.groupBy("ship-state")
    .agg(F.sum("Amount").alias("Total_Revenue"))
    .orderBy(F.desc("Total_Revenue"))
)

# 3. Insight: Average Basket Value
# Mean of Amount / Qty, computed only over rows with Qty > 0 so the ratio is never
# taken against a zero quantity.
# NOTE(review): Amount / Qty is a per-unit figure; confirm this is the intended
# definition of "basket value".
gold_avg_basket = silver_df.filter(F.col("Qty") > 0).select(
    F.avg(F.col("Amount") / F.col("Qty")).alias("Avg_Basket_Value")
)

# --- DISPLAYING THE GOLD RESULTS ---
# Headings previously contained mis-decoded emoji bytes; the intended characters are restored.
print("🏆 TOP 5 SALES CATEGORIES")
display(gold_top_category.limit(5))

print("📍 TOP 5 REVENUE STATES")
display(gold_top_state.limit(5))

print("🛒 AVERAGE BASKET VALUE")
display(gold_avg_basket)

In [0]:
import matplotlib.pyplot as plt
import pandas as pd
from io import BytesIO
import boto3

# 1. Convert the top-5 Spark Gold rows to a Pandas frame (easier for plotting)
plot_df = gold_top_category.limit(5).toPandas()

# 2. Create the bar chart on an explicit Figure/Axes pair rather than pyplot's implicit state
fig, ax = plt.subplots(figsize=(10, 6))
ax.bar(plot_df['Category'], plot_df['Total_Sales'], color='skyblue')
ax.set_xlabel('Category')
ax.set_ylabel('Total Sales (Amount)')
ax.set_title('Top 5 Sales Categories - Gold Insights')

# 3. Render the plot as a PNG into an in-memory buffer and rewind it for reading
img_data = BytesIO()
fig.savefig(img_data, format='png')
img_data.seek(0)

# 4. Upload the PNG to S3 with the same key pair used for the Spark reads
chart_key = 'gold/insights/top_categories.png'
s3 = boto3.client('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key)
s3.put_object(Bucket=BUCKET_NAME, Key=chart_key, Body=img_data, ContentType='image/png')

# The reported location is built from the bucket and key actually used for the upload,
# so the message cannot drift from the real destination.
print(f"✅ Visualization created and uploaded to s3://{BUCKET_NAME}/{chart_key}")
plt.show()

In [0]:
# Destination of the cleaned (Silver) data set.
# The URI deliberately contains no credentials: it is printed below, and a path of the
# form s3a://ACCESS_KEY:SECRET_KEY@bucket/... would expose the key pair in the cell output.
silver_output_path = f"s3a://{BUCKET_NAME}/silver/amazon_sales_cleaned"

# Save the data in Parquet format — a columnar, compressed format that is
# typically smaller and faster to read than CSV.
# The key pair is passed as writer options instead of being part of the path.
(
    silver_df.write.format("parquet")
    .mode("overwrite")  # replace any previous output at this location
    .option("fs.s3a.access.key", access_key)
    .option("fs.s3a.secret.key", secret_key)
    .save(silver_output_path)
)

print(f"✅ Silver layer successfully saved to: {silver_output_path}")