In [0]:
# --- Storage settings ---
storage_account_name = "voc43cdmqlsgj6nostorage"
container_name = "data"
mount_point = "/mnt/data"

# Pull the storage account key from the secret scope rather than
# hard-coding it in the notebook.
try:
    secret_key = dbutils.secrets.get(scope="voc_project_scope", key="storage-account-key")
except Exception as e:
    print("Looks like your secret scope or key isn't set up.")
    raise e

# Name of the configuration entry that carries the account key
config = f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net"

# Look through the current mounts first so that re-running this cell
# does not attempt to mount the same path twice.
already_mounted = any(
    existing.mountPoint == mount_point for existing in dbutils.fs.mounts()
)

if already_mounted:
    print(f"{mount_point} is already mounted.")
else:
    print(f"Mounting {mount_point}...")

    # Attach the blob container at the mount point
    dbutils.fs.mount(
        source=f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net",
        mount_point=mount_point,
        extra_configs={config: secret_key},
    )
    print(f"Successfully mounted {mount_point}!")

# Sanity check: show what is visible under the mount point
print("--- Files in mount point ---")
display(dbutils.fs.ls(mount_point))

In [0]:
# "Nuke and pave": remove any previous copy of the bronze table so the
# ETL job always starts from a clean slate.
drop_statement = "DROP TABLE IF EXISTS voc_bronze_layer"
spark.sql(drop_statement)

print("Old 'voc_bronze_layer' table (if any) has been dropped.")

In [0]:
# Import all the functions we'll need
from pyspark.sql.functions import col, explode, split, udf, lit, regexp_replace, length
from pyspark.sql.types import StructType, StructField, StringType
import mailbox
import email # We need the 'email' library

# -----------------------------------------------------------------
# 1. --- Read the Data ---
# -----------------------------------------------------------------
# wholeTextFiles loads every matching .mbox file as a single
# (path, full text) record, so each file is held entirely in memory.
rdd = spark.sparkContext.wholeTextFiles("/mnt/data/*.mbox")
df_raw_files = rdd.toDF(["filepath", "raw_text"])

# -----------------------------------------------------------------
# 2. --- Split Files into Individual Emails ---
# -----------------------------------------------------------------
# Cut each file at the "\nFrom " separator (newline, then "From" and a
# space) and emit one row per resulting piece.
email_chunks = explode(split(col("raw_text"), "\nFrom ")).alias("raw_email_text")

# Pieces of 10 characters or fewer are treated as empty splits and dropped.
has_content = length(col("raw_email_text")) > 10

df_split_emails = df_raw_files.select("filepath", email_chunks).where(has_content)

# -----------------------------------------------------------------
# 3. --- Define the Robust Parsing UDF ---
# -----------------------------------------------------------------

def get_plain_text_payload(msg):
    """
    Return the decoded body of the first usable 'text/plain' part.

    Args:
        msg: An email.message.Message (or a subclass such as
            mailbox.Message); may be single-part or multipart.

    Returns:
        The decoded text as a str, or None when the message contains no
        'text/plain' part that can be decoded.
    """
    # walk() yields the message itself for a single-part email and every
    # nested part for a multipart one, so one loop covers both shapes.
    for part in msg.walk():
        if part.get_content_type() != 'text/plain':
            continue
        try:
            raw_bytes = part.get_payload(decode=True)
            charset = part.get_content_charset() or 'utf-8'
            try:
                return raw_bytes.decode(charset, 'ignore')
            except LookupError:
                # The declared charset label is not a codec Python knows.
                # Fall back to UTF-8 instead of discarding the whole body.
                return raw_bytes.decode('utf-8', 'ignore')
        except Exception:
            # This part could not be decoded; a later 'text/plain' part
            # may still be usable, so keep looking.
            continue
    return None  # No decodable 'text/plain' part found

def parse_email_text(raw_text):
    """
    Parse one raw email chunk into a 5-tuple of strings.

    The tuple is (Message-ID, From, Subject, Date, body). Missing headers
    are reported as 'N/A', a missing plain-text body as
    'N/A_NO_PLAIN_TEXT', and any failure while parsing produces a tuple
    of five 'PARSE_ERROR' markers.
    """
    error_row = ('PARSE_ERROR',) * 5
    header_names = ('Message-ID', 'From', 'Subject', 'Date')

    try:
        # Prepend "From " so the chunk again starts with the separator line
        # the parser expects.
        message = mailbox.Message("From " + raw_text)

        # Extract the plain-text body, substituting a marker when absent
        plain_body = get_plain_text_payload(message)
        if plain_body is None:
            plain_body = "N/A_NO_PLAIN_TEXT"

        header_values = tuple(message.get(name, 'N/A') for name in header_names)
        return header_values + (plain_body,)
    except Exception:
        # Malformed input is reported as a marker row rather than raised
        return error_row

# Struct returned by the UDF: five nullable string fields, in the same
# order as the tuple produced by parse_email_text.
email_schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in ("message_id", "sender", "subject", "date_str", "body")
])

# Wrap the Python parser so it can be applied to a DataFrame column
parse_email_udf = udf(parse_email_text, email_schema)

print("UDF is defined and registered.")

# -----------------------------------------------------------------
# 4. --- Apply UDF, Clean, and Save ---
# -----------------------------------------------------------------
print("Applying UDF...")
# Parse every raw email chunk into a struct column of headers + body
df_parsed = df_split_emails.withColumn("parsed_email", parse_email_udf(col("raw_email_text")))

print("Cleaning and filtering...")
# Flatten the parsed struct into top-level columns, then drop rows the
# parser flagged as unparseable or as having no plain-text body.
df_bronze_clean = df_parsed.select(
    "filepath",
    col("parsed_email.message_id").alias("message_id"),
    col("parsed_email.sender").alias("sender"),
    col("parsed_email.subject").alias("subject"),
    col("parsed_email.date_str").alias("date_str"),
    col("parsed_email.body").alias("body_raw"),
    col("raw_email_text")
).where(col("message_id") != "PARSE_ERROR") \
 .where(col("body_raw") != "N/A_NO_PLAIN_TEXT") # Filter out HTML-only emails

# Clean the body (working from the 'body_raw' column)
df_bronze_clean = df_bronze_clean.withColumn(
    "body_no_quotes",
    regexp_replace(col("body_raw"), r"(?m)^\>.*", "") # 1. Remove ">" quoted lines
).withColumn(
    "body_clean",
    # 2. Remove signatures. The conventional delimiter is a line holding
    #    exactly "-- " (dash, dash, space); a bare "--" line is accepted too.
    #    Anchoring with ^ keeps lines that merely END in "--" (for example
    #    an ASCII rule "-----") from truncating the rest of the body, and
    #    the optional \r tolerates CRLF line endings.
    regexp_replace(col("body_no_quotes"), r"(?ms)^-- ?\r?\n.*", "")
)

# -----------------------------------------------------------------
# 5. --- Save as "Bronze" Delta Lake Table ---
# -----------------------------------------------------------------
print("Saving to Delta table 'voc_bronze_layer'...")
(df_bronze_clean
    .write
    .format("delta")
    .mode("overwrite") # Replace whatever the table previously held
    .saveAsTable("voc_bronze_layer")
)

print("--- Successfully REBUILT voc_bronze_layer Delta table! ---")