# 🟤 Bronze Layer: Raw Data Ingestion

This notebook step performs the initial ingestion of raw text data from the provided CSV file into a Spark DataFrame (`raw_df`).  

**Data Source:**
- **File**: `AI_Data_Developer_Technical_Challenge___Sample_Raw_Data.csv`
- **Format**: CSV
- **Header Included**: Yes
- **Schema Inference**: Disabled (all columns initially read as strings)

The output displays the first 10 rows of the raw dataset to confirm successful data ingestion.
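These CSV options have two visible effects. With `header` set to true and `inferSchema` set to false, the first row supplies the column names and every value, including `id`, is read as a string. Quoted fields also keep their embedded commas. The sketch below reproduces those parsing rules with Python's standard `csv` module rather than Spark, so a file's quoting can be sanity-checked locally. The two rows are abbreviated from the sample file, and `read_rows` is a hypothetical helper name.

```python
import csv
import io

# Two rows abbreviated from the sample file; the first text field is quoted
# because it contains a comma.
SAMPLE = (
    "id,category,text\n"
    '1,customer_review,"I recently filed a claim with Rival Insurance, and the process was slower than expected."\n'
    "2,customer_review,Amazing experience! Rival Insurance approved my claim in just two days.\n"
)

def read_rows(raw: str) -> list[dict]:
    # First row = header (like header=true), comma delimiter (like sep=",").
    # As with inferSchema=false, every value comes back as a str.
    return list(csv.DictReader(io.StringIO(raw), delimiter=","))

rows = read_rows(SAMPLE)
for row in rows:
    print(type(row["id"]).__name__, row["category"], row["text"][:40])
```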


In [0]:
# File location and type
file_location = "/FileStore/tables/AI_Data_Developer_Technical_Challenge___Sample_Raw_Data.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","

raw_df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(raw_df.limit(10))

id,category,text
1,customer_review,"I recently filed a claim with Rival Insurance, and the process was slower than expected. The customer service was helpful, but I had to follow up multiple times."
2,customer_review,Amazing experience! Rival Insurance approved my claim in just two days. Very satisfied with the service.
3,customer_review,Frustrating experience. My claim was denied without a clear explanation. I will consider switching providers.
4,policy_document,"Policyholder agrees to maintain comprehensive coverage for damages not caused by collision, including theft and natural disasters."
5,policy_document,"This insurance policy covers personal injury protection (PIP) up to $50,000 per insured person per accident."
6,policy_document,"Exclusions: This policy does not cover damages resulting from intentional acts, fraud, or unauthorized vehicle usage."
7,claim_note,Claim #11234: Reviewed documents and found discrepancies in reported damages. Requested additional evidence from claimant.
8,claim_note,Claim #23567: Approved for payout after verification of supporting documents and repair estimates.
9,claim_note,Claim #34890: Denied due to policy exclusion on pre-existing damages. Notified policyholder of appeal options.
10,customer_review,Had to wait on hold for over an hour before speaking to a representative. Not a great experience.


In [0]:
dbutils.fs.mkdirs("/mnt/data/bronze")
dbutils.fs.mkdirs("/mnt/data/silver")
dbutils.fs.mkdirs("/mnt/data/gold")


True

In [0]:
raw_df.write \
  .format("delta") \
  .mode("overwrite") \
  .save("/mnt/data/bronze/BronzeTable")

# 🥈 Silver Layer: Data Cleaning and Preprocessing

This step transforms the raw data (`bronze_df`) into a clean and structured format suitable for analytics and modeling.

**Data Cleaning Steps Applied:**
- **Duplicate Removal**: Eliminated duplicate records based on the original `text` column.
- **Text Normalization**:
  - Converted all text to lowercase.
  - Removed special characters, retaining only alphanumeric and whitespace characters.
  - Condensed multiple spaces into single spaces.
  - Trimmed leading and trailing whitespace.
- **Null and Empty Handling**: Removed records with empty or null text after cleaning.
- **Type Casting**: Cast the `id` column from string to integer.

The resulting DataFrame (`cleaned_df`) keeps the original `text` alongside the normalized `clean_text` column. It is written to the Silver Delta table and is then ready for summarization in the Gold layer.
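The Silver rules can also be checked outside Spark with a plain-Python equivalent. This is a sketch that assumes ASCII input. Python's `re` treats `\s` as Unicode whitespace, while Spark's Java regex matches only ASCII whitespace by default. Also, `dropDuplicates` keeps an arbitrary row from each duplicate group, whereas this version keeps the first. The function names are illustrative. The sample rows are abbreviated from the Bronze data, plus one hypothetical row that cleans to an empty string.

```python
import re
from typing import Optional

def clean_text(text: Optional[str]) -> Optional[str]:
    """Apply the Silver-layer normalization; return None if nothing is left."""
    if text is None:
        return None
    cleaned = text.lower()                            # lower(col("text"))
    cleaned = re.sub(r"[^a-zA-Z0-9\s]", "", cleaned)  # strip special characters
    cleaned = re.sub(r"\s+", " ", cleaned)            # collapse whitespace runs
    cleaned = cleaned.strip()                         # trim(...)
    return cleaned or None                            # "" -> None

def to_int(value: Optional[str]) -> Optional[int]:
    """Approximates Spark's cast("integer"): unparseable values become None."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return None

def clean_records(records: list[dict]) -> list[dict]:
    seen = set()
    result = []
    for rec in records:
        # Deduplicate on the ORIGINAL text, before any normalization
        if rec["text"] in seen:
            continue
        seen.add(rec["text"])
        cleaned = clean_text(rec["text"])
        if cleaned is None:  # same effect as na.drop(subset=["clean_text"])
            continue
        result.append({**rec, "id": to_int(rec["id"]), "clean_text": cleaned})
    return result

records = [
    {"id": "6", "category": "policy_document",
     "text": "Exclusions: This policy does not cover damages resulting from intentional acts, fraud, or unauthorized vehicle usage."},
    {"id": "7", "category": "claim_note",
     "text": "Claim #11234: Reviewed documents and found discrepancies in reported damages."},
    {"id": "7", "category": "claim_note",  # exact duplicate, dropped
     "text": "Claim #11234: Reviewed documents and found discrepancies in reported damages."},
    {"id": "99", "category": "claim_note", "text": "!!! ???"},  # hypothetical: cleans to ""
]

for rec in clean_records(records):
    print(rec["id"], rec["clean_text"])
```

Deduplicating before normalization matches the notebook, but it means texts that differ only in case or punctuation both survive. Moving the dedup to `clean_text` would change that.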


In [0]:
bronze_df = spark.read.format("delta").load("/mnt/data/bronze/BronzeTable")

In [0]:
from pyspark.sql.functions import col, lower, regexp_replace, trim, when

# Clean Bronze data and prepare Silver-layer DataFrame
cleaned_df = (bronze_df
    # Remove duplicate records based on 'text' column
    .dropDuplicates(["text"])

    # Convert text to lowercase for consistency
    .withColumn("clean_text", lower(col("text")))

    # Remove all special characters (retain alphanumeric and whitespace)
    .withColumn("clean_text", regexp_replace(col("clean_text"), "[^a-zA-Z0-9\\s]", ""))

    # Replace multiple consecutive whitespaces with a single space
    .withColumn("clean_text", regexp_replace(col("clean_text"), "\\s+", " "))

    # Remove leading/trailing whitespace
    .withColumn("clean_text", trim(col("clean_text")))

    # Set empty strings to null to filter them out later
    .withColumn("clean_text", when(col("clean_text") == "", None).otherwise(col("clean_text")))
    
    # Convert 'id' column explicitly to integer
    .withColumn("id", col("id").cast("integer"))                     

    # Remove rows where 'clean_text' is null or empty after cleaning
    .na.drop(subset=["clean_text"])
)


# Write to Silver
cleaned_df.write.format("delta") \
    .mode("overwrite") \
    .save("/mnt/data/silver/SilverCleanData")

In [0]:
silver_df = spark.read.format("delta").load("/mnt/data/silver/SilverCleanData")

# 🥇 Gold Layer: AI-Powered Text Summarization
In this final stage, the cleaned data from the Silver layer is processed by the Azure OpenAI (GPT-4) Large Language Model (LLM) to produce concise summaries. This summarized dataset is intended for advanced analytics, reporting, or downstream consumption.

Key Steps:

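- Summarize the `clean_text` column with the Azure OpenAI GPT-4 deployment (currently only a 3-row sample, `silver_df.limit(3)`).
- Retry failed API calls up to three times with exponential backoff. If every attempt fails, the summary is `"Summarization Failed"`.
- Process rows in pandas batches with `mapInPandas`, creating one Azure OpenAI client per partition so the work runs on the executors.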
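The retry behaviour described above can be separated from the OpenAI client and tested on its own. This is a minimal sketch that assumes any exception is worth retrying, as in the notebook's `summarize_text`. `call_with_backoff` and its injectable `sleep` parameter are hypothetical names introduced here so the delays can be checked without actually waiting.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")

def call_with_backoff(
    fn: Callable[[], T],
    retries: int = 3,
    base_delay: float = 1.0,
    fallback: Optional[T] = None,
    sleep: Callable[[float], None] = time.sleep,
) -> Optional[T]:
    """Call fn up to `retries` times, doubling the wait after each failure."""
    for attempt in range(retries):
        try:
            return fn()
        except Exception as exc:  # catch-all, mirroring summarize_text
            print(f"Attempt {attempt + 1} failed: {exc}")
            if attempt < retries - 1:  # no wait after the final attempt
                sleep(base_delay * 2 ** attempt)  # 1s, 2s, 4s, ...
    return fallback

# Usage: a fake call that fails twice, then succeeds
calls = {"n": 0}

def flaky() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("simulated rate limit")
    return "summary text"

delays: list[float] = []
result = call_with_backoff(flaky, sleep=delays.append)
print(result, delays)  # summary text [1.0, 2.0]
```

In production, code like this would usually retry only transient errors (for example rate limits and timeouts) and add random jitter to the delay. The sketch keeps the notebook's catch-all behaviour instead.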

In [0]:
import time
from typing import Iterator

import pandas as pd
from openai import AzureOpenAI

# Azure OpenAI configuration
endpoint = "https://mahma-m851s8eb-eastus2.cognitiveservices.azure.com/"
model_name = "gpt-4"
deployment = "gpt-4"  # Azure deployment name, passed as `model` in the API call

subscription_key = dbutils.secrets.get(scope="azure-openai", key="api_key")
api_version = "2024-12-01-preview"


# Driver-side client for ad-hoc calls, e.g. summarize_text("...", client);
# the Spark job below creates its own client inside each partition
client = AzureOpenAI(
    api_version=api_version,
    azure_endpoint=endpoint,
    api_key=subscription_key,
)

# Summarize one text, retrying with exponential backoff (1 s, 2 s, ...) on any error
def summarize_text(text, client, retries=3):
    for attempt in range(retries):
        try:
            response = client.chat.completions.create(
                messages=[
                    {"role": "system", "content": "Summarize the following text concisely."},
                    {"role": "user", "content": text}
                ],
                max_tokens=100,
                temperature=0.5,
                top_p=0.9,
                model=deployment
            )
            return response.choices[0].message.content
        except Exception as e:
            print(f"Attempt {attempt+1} failed: {e}")
            if attempt < retries - 1:
                time.sleep(2 ** attempt)  # exponential backoff; no wait after the last attempt
    return "Summarization Failed"

# Batch summarization for mapInPandas: receives an iterator of pandas DataFrames
# (one partition) and yields one DataFrame per input batch
def batch_summarize(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    # Initialize Azure OpenAI client within each Spark partition (avoids serialization issues)
    client = AzureOpenAI(
        api_version=api_version,
        azure_endpoint=endpoint,
        api_key=subscription_key,
    )

    for pdf in iterator:
        # Apply summarization on the 'clean_text' column (one API call per row)
        pdf['summary'] = pdf['clean_text'].apply(lambda x: summarize_text(x, client))
        # Match the `id long` field of the output schema
        pdf['id'] = pdf['id'].astype('int64')
        # Return only the columns declared in the output schema
        yield pdf[['id', 'text', 'clean_text', 'summary']]

# Summarize a 3-row sample of the Silver data with Spark's mapInPandas
small_spark_df = silver_df.limit(3)
gold_df = small_spark_df.mapInPandas(batch_summarize, schema="id long, text string, clean_text string, summary string")


# Persist the summarized data in Delta or Azure Blob Storage (Gold Layer)
#gold_df.write.format("delta").mode("overwrite").save("/mnt/data/gold/gold_summarized")
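`mapInPandas` calls `batch_summarize` once per partition. It passes in an iterator of pandas DataFrames (Arrow batches) and expects an iterator of DataFrames back, which is why the client is created before the loop rather than once per row. The stdlib-only sketch below mimics that contract. Lists of dicts stand in for DataFrames, and a hypothetical `FakeClient` stands in for `AzureOpenAI`.

```python
from typing import Iterator

class FakeClient:
    """Hypothetical stand-in for AzureOpenAI that counts how often it is built."""
    created = 0

    def __init__(self) -> None:
        FakeClient.created += 1

    def summarize(self, text: str) -> str:
        return " ".join(text.split()[:3])  # placeholder "summary": first three words

def batch_summarize_sketch(batches: Iterator[list[dict]]) -> Iterator[list[dict]]:
    client = FakeClient()  # one client per partition, reused for every batch
    for batch in batches:
        # Rows inside a batch are still summarized one call at a time
        yield [{**row, "summary": client.summarize(row["clean_text"])} for row in batch]

# One partition delivered as two batches
partition = iter([
    [{"id": 2, "clean_text": "amazing experience rival insurance approved my claim"}],
    [{"id": 3, "clean_text": "frustrating experience my claim was denied"}],
])
for batch in batch_summarize_sketch(partition):
    print(batch)
```

Each row is a separate, sequential API call, so parallelism comes from the number of partitions rather than from batching. With the 3-row `limit(3)` sample, the work likely runs as a single task.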

# 📦 Saving Summarized Data to Azure Blob Storage

In this step, the summarized data (`gold_df`) from the Gold Layer is written to **Azure Blob Storage** in Parquet, a compressed columnar format suited to downstream analytics or API consumption. Because `gold_df` is evaluated lazily, this write is the action that actually triggers the Azure OpenAI calls.

**Key Details:**
- **Storage Account**: `textsummarizationstorage`
- **Container Name**: `gold-layer`
- **Output Path**: `gold-summarized/`
- **Security**: The SAS token is read from the `azure-storage` Databricks secret scope instead of being hard-coded in the notebook.
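The SAS token is registered under a Spark config key that encodes both the container and the account, and the `wasbs://` output path must use the same pair. The helper below is a hypothetical addition, not part of the notebook. It derives both strings from the values listed above so they cannot drift apart.

```python
def blob_sas_settings(storage_account: str, container_name: str, folder: str) -> tuple[str, str]:
    """Return (Spark conf key for the SAS token, wasbs:// output path)."""
    host = f"{storage_account}.blob.core.windows.net"
    conf_key = f"fs.azure.sas.{container_name}.{host}"
    output_path = f"wasbs://{container_name}@{host}/{folder.strip('/')}/"
    return conf_key, output_path

key, path = blob_sas_settings("textsummarizationstorage", "gold-layer", "gold-summarized")
print(key)   # fs.azure.sas.gold-layer.textsummarizationstorage.blob.core.windows.net
print(path)  # wasbs://gold-layer@textsummarizationstorage.blob.core.windows.net/gold-summarized/
```

In the notebook, the two values would feed `spark.conf.set(key, sas_token)` and `gold_df.write.mode("overwrite").parquet(path)`.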



In [0]:
# Define Azure storage account details
storage_account = "textsummarizationstorage"
container_name = "gold-layer"

# Set Azure Blob Storage SAS token securely from Databricks Secrets
sas_token = dbutils.secrets.get(scope = "azure-storage", key = "sas_token")

spark.conf.set(
    f"fs.azure.sas.{container_name}.{storage_account}.blob.core.windows.net",
    sas_token
)

# Define the Azure Blob Storage output path
output_path = f"wasbs://{container_name}@{storage_account}.blob.core.windows.net/gold-summarized/"

# Write the summarized DataFrame to Blob Storage in Parquet format
gold_df.write.mode("overwrite").parquet(output_path)


# Retrieving Summarized Data from Azure Function API
This step fetches summarized text from an Azure Function API endpoint and loads it into a Spark DataFrame in Databricks. The Gold-layer summaries served by the API can then be processed further in the notebook.

Steps involved:

 - Securely fetching the Azure Function key from Databricks Secrets.
 - Making an HTTP GET request to the Azure Function API.
 - Handling and verifying the API response.
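The same call can be written with Python's standard library to make authentication and response validation explicit. This sketch rests on two assumptions. First, the function key is sent in the `x-functions-key` header, which Azure Functions accepts as an alternative to the `code` query parameter. Second, the endpoint returns a JSON array of objects with `text` and `summary` fields, as the output further down suggests. The helper names are hypothetical.

```python
import json
import urllib.request

def build_summaries_request(function_url: str, function_key: str) -> urllib.request.Request:
    # Key goes in a header rather than the query string, so it is not part of the URL
    return urllib.request.Request(
        function_url, headers={"x-functions-key": function_key}, method="GET"
    )

def parse_summaries(body: bytes) -> list[dict]:
    """Reject payloads that are not a list of {"text": ..., "summary": ...} objects."""
    payload = json.loads(body)
    if not isinstance(payload, list):
        raise ValueError("expected a JSON array of records")
    for i, record in enumerate(payload):
        if not isinstance(record, dict) or not {"text", "summary"}.issubset(record):
            raise ValueError(f"record {i} lacks a 'text' or 'summary' field")
    return payload

def fetch_summaries(function_url: str, function_key: str, timeout: float = 30.0) -> list[dict]:
    request = build_summaries_request(function_url, function_key)
    # urlopen raises urllib.error.HTTPError for 4xx/5xx responses
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return parse_summaries(response.read())
```

In the notebook this would become `data = fetch_summaries(function_url, function_key)`, after which `pd.DataFrame(data)` works as in the cell that follows.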

In [0]:
import requests
import pandas as pd

# Azure Function endpoint and function key
function_url = "https://textsummarizationapi.azurewebsites.net/api/summaries"
function_key = dbutils.secrets.get(scope="azure-function-apps", key="summarize_api_key")

# Make an HTTP GET request to fetch summarized data; `params` URL-encodes the key
# as ?code=..., and `timeout` (seconds) keeps the cell from hanging indefinitely
response = requests.get(function_url, params={"code": function_key}, timeout=30)

# Check response
if response.status_code == 200:
    # Successful API call, parse JSON response
    data = response.json()
    print("API call successful!")
else:
    # API call failed; print status code and error details, then stop so the
    # next cell does not run with `data` undefined
    print(f"API call failed with status code: {response.status_code}")
    print(response.text)
    raise RuntimeError(f"Azure Function call failed with status {response.status_code}")


API call successful!


In [0]:
# Load JSON response into Pandas DataFrame first
df = pd.DataFrame(data)

# Convert Pandas DataFrame to Spark DataFrame
spark_df = spark.createDataFrame(df)

# Display the DataFrame
display(spark_df)


text,summary
Claim #38945: Initial payout approved. Funds to be disbursed soon.,"Claim #38945 has been approved for initial payout, with funds expected to be disbursed shortly."
Filed a claim through the mobile app. Process was smooth and easy.,The claim filing process via the mobile app was smooth and easy.
Claim #43298: Claim rejected due to policy exclusions.,Claim #43298 was rejected because it falls under policy exclusions.
