# 🔄 Notebook: 01 ETL Silver Layer

This notebook handles the **Silver Layer (Data Transformation & Processing)** in the Medallion Architecture of the AI-powered claims pipeline. It focuses on **audio conversion**, **metadata extraction**, and **speech-to-text transcription** using the [OpenAI Whisper model](https://openai.com/index/whisper/).

---

## 🧱 Purpose

To convert raw audio files into a consistent format (MP3), calculate metadata (duration), and transcribe the content into structured text to support downstream AI analytics.



In [0]:
%pip install pydub mutagen openai-whisper numpy>=1.24
dbutils.library.restartPython()

Collecting pydub
  Downloading pydub-0.25.1-py2.py3-none-any.whl.metadata (1.4 kB)
Collecting mutagen
  Downloading mutagen-1.47.0-py3-none-any.whl.metadata (1.7 kB)
Collecting openai-whisper
  Downloading openai-whisper-20240930.tar.gz (800 kB)
[?25l     [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m0.0/800.5 kB[0m [31m?[0m eta [36m-:--:--[0m
[2K     [91m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m[90m‚ï∫[0m[90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m174.1/800.5 kB[0m [31m5.5 MB/s[0m eta [36m0:00:01[0m
[2K     [91m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m[91m‚ï∏[0m[90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m491.5/800.5 kB[0m [31m7.5 MB/s[0m eta [36m0:00:01[0m
[2K     [91m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ

In [0]:
%run "./resources/init" 

DataFrame[]

True

In [0]:
# Candidate recordings: every row currently in the Bronze table.
bronze_df = spark.table(f"{CATALOG}.{SCHEMA}.{BRONZE_TABLE}")

meta_table_name = f"{CATALOG}.{SCHEMA}.{META_TABLE}"
# Use the public catalog API instead of the private `_jsparkSession` handle,
# which is not available on Spark Connect based sessions.
if spark.catalog.tableExists(meta_table_name):
    # Drop every Bronze row whose file_name is already flagged as processed.
    processed_df = spark.table(meta_table_name).filter("processed = True")
    file_reference_df = bronze_df.join(processed_df, "file_name", "left_anti")
else:
    # No metadata table yet: treat every Bronze row as pending.
    file_reference_df = bronze_df

# Nothing left to do -> stop the notebook before any audio work starts.
if file_reference_df.isEmpty():
    dbutils.notebook.exit("‚úÖ No new files to process. Exiting Silver Layer.")

In [0]:
from pydub import AudioSegment
import os

def _mp3_target_path(source_path, target_dir):
    """Return the path inside ``target_dir`` for the MP3 version of ``source_path``.

    Only the final extension of the file's base name is swapped for ``.mp3``.
    A name without an extension simply gets ``.mp3`` appended, and extension
    text occurring elsewhere in the name is left untouched.
    """
    stem, _ = os.path.splitext(os.path.basename(source_path))
    return os.path.join(target_dir, stem + ".mp3")


# Make sure the destination directory exists before exporting into it.
dbutils.fs.mkdirs(mp3_audio_path)

# Convert each pending file to mp3 and save it under mp3_audio_path.
for row in file_reference_df.collect():
    file_path = row['file_path']
    try:
        audio = AudioSegment.from_file(file_path)
        new_file_path = _mp3_target_path(file_path, mp3_audio_path)
        audio.export(new_file_path, format="mp3")
    except Exception as e:
        # Best effort: report the failing file and keep converting the rest.
        print(f"‚ö†Ô∏è Error converting {file_path}: {e}")

In [0]:
from mutagen.mp3 import MP3
from pyspark.sql.types import FloatType
import pyspark.sql.functions as F

# Names of the files selected for processing in this run.
pending_file_names = [row["file_name"] for row in file_reference_df.collect()]

# One row per entry listed under mp3_audio_path.
mp3_listing_df = spark.createDataFrame(dbutils.fs.ls(mp3_audio_path))

mp3_df = (
    mp3_listing_df
    # Skip the first five characters of `path` (the "dbfs:" scheme prefix).
    .withColumn("file_path", F.expr("substring(path, 6, length(path))"))
    # Drop the last four characters of `name` (the ".mp3" suffix).
    .withColumn("file_name", F.expr("substring(name, 1, length(name) - 4)"))
    # Keep only the MP3s that belong to files pending in this run.
    .filter(F.col("file_name").isin(pending_file_names))
)

def get_audio_duration(file_path):
    """Return the length reported by mutagen for an MP3 file, as a float.

    Args:
        file_path: Path of the MP3 file to inspect.

    Returns:
        ``float(MP3(file_path).info.length)``, or ``None`` when anything goes
        wrong while reading the file (the error is printed, not raised).
    """
    try:
        duration = float(MP3(file_path).info.length)
    except Exception as e:
        print(f"‚ö†Ô∏è Error getting duration for {file_path}: {e}")
        duration = None
    return duration

# Wrap the Python helper as a Spark UDF returning a nullable float.
duration_udf = F.udf(f=get_audio_duration, returnType=FloatType())

# Duration of each MP3, rounded to zero decimal places.
rounded_duration = F.round(duration_udf(F.col("file_path")), 0)
mp3_df = mp3_df.withColumn("audio_duration", rounded_duration)

display(mp3_df)

path,name,size,modificationTime,file_path,file_name,audio_duration
dbfs:/Volumes/samantha_wise/ai_claims_processing_customer_demo/audio_recordings/mp3_audio_recordings/5e7e3k53_AGT002_2025-01-15 13_35_10.mp3,5e7e3k53_AGT002_2025-01-15 13_35_10.mp3,724461,1747047262000,/Volumes/samantha_wise/ai_claims_processing_customer_demo/audio_recordings/mp3_audio_recordings/5e7e3k53_AGT002_2025-01-15 13_35_10.mp3,5e7e3k53_AGT002_2025-01-15 13_35_10,91.0
dbfs:/Volumes/samantha_wise/ai_claims_processing_customer_demo/audio_recordings/mp3_audio_recordings/ct4m50n5_AGT005_2025-03-01 12_36_07.mp3,ct4m50n5_AGT005_2025-03-01 12_36_07.mp3,865197,1747047253000,/Volumes/samantha_wise/ai_claims_processing_customer_demo/audio_recordings/mp3_audio_recordings/ct4m50n5_AGT005_2025-03-01 12_36_07.mp3,ct4m50n5_AGT005_2025-03-01 12_36_07,108.0
dbfs:/Volumes/samantha_wise/ai_claims_processing_customer_demo/audio_recordings/mp3_audio_recordings/nv7032f9_AGT001_2025-02-27 12_40_45.mp3,nv7032f9_AGT001_2025-02-27 12_40_45.mp3,914349,1747047255000,/Volumes/samantha_wise/ai_claims_processing_customer_demo/audio_recordings/mp3_audio_recordings/nv7032f9_AGT001_2025-02-27 12_40_45.mp3,nv7032f9_AGT001_2025-02-27 12_40_45,114.0
dbfs:/Volumes/samantha_wise/ai_claims_processing_customer_demo/audio_recordings/mp3_audio_recordings/pxvlh18a_AGT001_2025-02-11 11_33_33.mp3,pxvlh18a_AGT001_2025-02-11 11_33_33.mp3,946605,1747047258000,/Volumes/samantha_wise/ai_claims_processing_customer_demo/audio_recordings/mp3_audio_recordings/pxvlh18a_AGT001_2025-02-11 11_33_33.mp3,pxvlh18a_AGT001_2025-02-11 11_33_33,118.0
dbfs:/Volumes/samantha_wise/ai_claims_processing_customer_demo/audio_recordings/mp3_audio_recordings/ulnocrnh_AGT005_2025-02-04 05_42_51.mp3,ulnocrnh_AGT005_2025-02-04 05_42_51.mp3,956013,1747047260000,/Volumes/samantha_wise/ai_claims_processing_customer_demo/audio_recordings/mp3_audio_recordings/ulnocrnh_AGT005_2025-02-04 05_42_51.mp3,ulnocrnh_AGT005_2025-02-04 05_42_51,119.0


In [0]:
from pyspark.sql.functions import regexp_extract

# Discover files under mp3_audio_path with Auto Loader ("cloudFiles"),
# reading each one in the binaryFile format.
autoloader_options = {
    "cloudFiles.format": "binaryFile",
    "recursiveFileLookup": "true",
}
audio_stream_df = (
    spark.readStream
    .format("cloudFiles")
    .options(**autoloader_options)
    .load(mp3_audio_path)
)

raw_audio_checkpoint = f"/Volumes/{CATALOG}/{SCHEMA}/checkpoints/raw_audio"
raw_audio_table = f"{CATALOG}.{SCHEMA}.raw_audio_binaries"

# availableNow: process what is currently available, then stop the stream.
raw_audio_query = (
    audio_stream_df.writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", raw_audio_checkpoint)
    .toTable(raw_audio_table)
)
# Block until the ingest has finished so later cells can read the table.
raw_audio_query.awaitTermination()

In [0]:
# raw_audio_binaries receives every file the ingest stream has ever written,
# while mp3_df only holds the files pending in this run. Restrict the binaries
# to those pending paths BEFORE calling the model, so earlier recordings are
# not sent to the transcription endpoint again on every run.
pending_audio_df = (
    spark.table(f"`{CATALOG}`.`{SCHEMA}`.`raw_audio_binaries`")
    .join(mp3_df.select("path"), on="path", how="left_semi")
)

# Transcribe each pending binary with the 'va_whisper_large_v3' endpoint;
# failOnError => True makes a failed request fail the query.
transcriptions_df = pending_audio_df.selectExpr(
    "path",
    "ai_query('va_whisper_large_v3', content, failOnError => True) AS transcription",
)

# Join the transcriptions back to the original DataFrame
transcribed_df = (
    mp3_df.join(transcriptions_df, on="path", how="inner")
    .select("file_path", "file_name", "transcription", "audio_duration")
)

In [0]:
# Show the joined result: file_path, file_name, transcription, audio_duration.
display(transcribed_df)

file_path,file_name,transcription,audio_duration
/Volumes/samantha_wise/ai_claims_processing_customer_demo/audio_recordings/mp3_audio_recordings/5e7e3k53_AGT002_2025-01-15 13_35_10.mp3,5e7e3k53_AGT002_2025-01-15 13_35_10,"Hello, you've reached out VitalGuard. This is John speaking. How can I assist you today? Hi, this is Sophia Wilson. I was hoping to change my payment method. Of course, Sophia. For security, can I confirm your date of birth and policy number, please? Sure. My date of birth is 15th August 1995 and my policy number is VG924695. Thank you for confirming. What is your current payment method and how would you like to change it? I currently pay by direct debit, but I would like to switch to paying by credit card. Hmm, I can assist you with that. Please note that changing your payment method may affect your policy terms. Would you like me to explain the details? Yes, please do. Let me check that for you. Okay, I've updated your payment method to credit card. Your new payment schedule will be sent to you via email. Also, I'd like to take this opportunity to explain your coverage details. That sounds great, thank you! You're welcome! Just to summarize, I have changed your payment method to credit card and explained your coverage details. If you need further assistance, don't hesitate to reach out and have a wonderful day! Thanks, you too!",91.0
/Volumes/samantha_wise/ai_claims_processing_customer_demo/audio_recordings/mp3_audio_recordings/ct4m50n5_AGT005_2025-03-01 12_36_07.mp3,ct4m50n5_AGT005_2025-03-01 12_36_07,"Hello, this is VitalGuard. You're speaking to John. How can I help you today? Hi, this is Emma Johnson. I'm hoping to get some advice on my coverage options. Of course, Emma. For security, can I confirm your date of birth and policy number, please? Sure. My DOB is the 22nd of July, 1990, and my policy number is VG434271. Thank you for confirming. What kind of coverage details are you looking for? I want to know if my policy covers physiotherapy sessions. Okay, let me check that for you. Yes, your current plan includes up to 10 physiotherapy sessions per year, but they need to be prescribed by a GP. That's great to know. Also, if I need to make a complaint about a billing issue, how do I do that? I understand. I will provide you with a complaint reference number and you can submit your complaint through our online portal or over the phone. Here is your reference number. VGCMP4823. Thanks, that helps. Of course, you're very welcome. Is there anything else I can help you with today? No, that's all for now Great Just to summarize, Emma I have confirmed your policy covers physiotherapy with the GP referral and I have provided a complaint reference number for your billing issue If you do need further assistance, don't hesitate to reach out Have a wonderful day Thanks, you too",108.0
/Volumes/samantha_wise/ai_claims_processing_customer_demo/audio_recordings/mp3_audio_recordings/nv7032f9_AGT001_2025-02-27 12_40_45.mp3,nv7032f9_AGT001_2025-02-27 12_40_45,"Hello, thank you for calling VitalGuard. My name is Michelle. How can I assist you today? Hey, this is Liam. Liam Brown. And I was hoping to get a duplicate policy document. Of course Liam. For security, can I confirm your date of birth and policy number? Sure. It's 10th of May 1978 and my policy number is VG8-29910. Thank you for confirming. I understand you're requesting a duplicate policy document. Can you please tell me what happened to the original document? I lost it during the move and now I need it for my records. I apologise for the inconvenience, Liam. I'm going to go ahead and send you a duplicate policy document via email. You should receive it within the next 24 hours. Hmm, okay, that sounds okay. But I'm also a bit frustrated with the premium costs. Can you explain the breakdown? I completely understand your concern, Liam. I'd be happy to provide you with a premium breakdown. Your monthly premium is ¬£3,100, which includes ¬£350 for medical coverage and ¬£320 for dental and ¬£330 for administrative fees. I see. That helps clarify things. You're welcome, Liam. Just to summarize, I've sent a duplicate policy document to your email, and I've provided a premium breakdown. If you have any further questions or concerns, please don't hesitate to reach out. Is there anything else I can assist you with today? No, that's all for now. Thanks. Great. It's a pleasure assisting you today, Liam. Have a wonderful day.",114.0
/Volumes/samantha_wise/ai_claims_processing_customer_demo/audio_recordings/mp3_audio_recordings/pxvlh18a_AGT001_2025-02-11 11_33_33.mp3,pxvlh18a_AGT001_2025-02-11 11_33_33,"Hello, thank you for calling VitalGuard. My name is Lucy. How can I assist you today? Hi, this is John Smith. How are you doing? I'm doing good, thank you. How are you? Yeah, pretty good, thanks. Um, yeah, so how can I assist you today? Yeah, well, so I was hoping to get some help because I've had some payment issues recently. Of course, John. For security, can I confirm your date of birth and policy number? Of course. So my full name is John Smith. My date of birth is 3rd December 1985. And my policy number is VG493147. Thank you for confirming. Can you please tell me more about the payment issue you're experiencing? Yeah, I think there might be some errors with the last time I was trying to make the payment, but I'm not sure what exactly was the issue. I understand. Let me check that for you. Yes, it seems that there was a minor issue with the payment pre-processing. I'm going to go ahead and correct that for you. That would be great. Thank you so much. you're welcome john uh as a precaution i'm also going to issue a new insurance card to ensure you have the most up-to-date information you should receive it within the next seven to ten working days oh that sounds good yeah thank you so much for your help you're very welcome so just to summarize i have corrected the payment issue and i'm issuing a new insurance card which you will receive shortly if you have any further questions or concerns please don't hesitate to reach out. Is there anything I can help you with today? Oh no, that's all for now. Thank you so much. Great. It was a pleasure assisting you, John, and have a wonderful day. Thank you. You too.",118.0
/Volumes/samantha_wise/ai_claims_processing_customer_demo/audio_recordings/mp3_audio_recordings/ulnocrnh_AGT005_2025-02-04 05_42_51.mp3,ulnocrnh_AGT005_2025-02-04 05_42_51,"Hello, thank you for calling VitalGuard. My name is Sophia. How can I assist you today? Hi, this is Noah Taylor. I was hoping to report a lost insurance card. I'm so sorry to hear that, Noah. For security, can I confirm your date of birth and policy number? Sure. My date of birth is 30th November 1983 and my policy number is VG997352. Thank you for confirming. Can you please tell me more about what happened to your insurance card? I think I might have misplaced it when I was moving houses. I've looked everywhere but can't seem to find it. Don't worry, Noah. I'm here to help. I can arrange a replacement card to be sent to you. Would you like me to do that? Yes, please. That would be great. Thank you. I've processed the request for your replacement card. You should receive it within the next 7 to 10 working days. In the meantime, if you need any medical attention, you can contact us and we will provide you with a temporary cover note. That sounds good. What about the status of my claim? Will this affect it? I'll check on the status of your claim. Everything seems to be in order. I will send you an email with an update on your claim status. If there are any changes, we will notify you promptly. That's great. Such a relief. Thank you. You're welcome, Noah. So just to summarize, I have arranged a replacement insurance card to be sent to you and I will email you with an update on your insurance claim status. If you have any further questions or concerns, please don't hesitate to reach out. Is there anything I can help you with further today? That's all for today. Thank you so much. You're welcome, Noah. It was my pleasure to assist you. Have a great day and we will be in touch soon.",119.0


In [0]:
##### For simulated data purposes, not part of final solution

from pyspark.sql.functions import expr, substring, regexp_replace, col, split, to_timestamp, concat_ws

silver_table_path = f"{CATALOG}.{SCHEMA}.{SILVER_TABLE}"


def _with_call_metadata(df):
    """Derive ``call_id``, ``agent_id`` and ``call_datetime`` from ``file_name``.

    The file name is split on "_": item 0 becomes ``call_id``, item 1 becomes
    ``agent_id`` and items 2-4 are joined with ":" and parsed as a timestamp
    (e.g. ``5e7e3k53_AGT002_2025-01-15 13_35_10`` -> ``2025-01-15 13:35:10``).
    A trailing ``.mp3`` is stripped from ``file_name`` first.

    Args:
        df: DataFrame with a string ``file_name`` column.

    Returns:
        ``df`` with ``file_name`` cleaned and the three derived columns added.
    """
    name_parts = split(col("file_name"), "_")
    return (
        df
        # Escaped dot + end anchor: only a literal ".mp3" suffix is removed.
        # The unanchored pattern ".mp3" would also delete any character
        # followed by "mp3" anywhere inside the name.
        .withColumn("file_name", regexp_replace(col("file_name"), r"\.mp3$", ""))
        .withColumn("call_id", name_parts.getItem(0))
        .withColumn("agent_id", name_parts.getItem(1))
        .withColumn(
            "call_datetime",
            to_timestamp(
                concat_ws(
                    ":",
                    name_parts.getItem(2),
                    name_parts.getItem(3),
                    name_parts.getItem(4),
                )
            ),
        )
    )


if first_run:
    # Load existing transcriptions from the simulated table
    existing_transcriptions_df = spark.table(f"{CATALOG}.{SCHEMA}.simulated_transcriptions")

    # Add a file_name column so the columns line up with transcribed_df for
    # unionByName; the value is recomputed from file_path after the union.
    existing_transcriptions_df = existing_transcriptions_df.withColumn("file_name", expr("substring(file_path, -39, 35)"))

    # Combine existing (simulated) transcriptions with new transcriptions
    combined_transcriptions_df = existing_transcriptions_df.unionByName(transcribed_df)

    # Take the file name from the last path component (independent of how
    # deep the directory is), then derive the call metadata from it.
    combined_transcriptions_df = _with_call_metadata(
        combined_transcriptions_df.withColumn("file_name", expr("substring_index(file_path, '/', -1)"))
    )

    # Display the combined DataFrame
    display(combined_transcriptions_df)

    # Overwrite the Silver table with the combined DataFrame
    combined_transcriptions_df.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable(silver_table_path)

else:
    transcribed_df = _with_call_metadata(transcribed_df)

    display(transcribed_df.select("file_name", "call_id", "agent_id", "call_datetime", "audio_duration", "transcription"))

    # Public catalog API; the private `_jsparkSession` handle is not available
    # on Spark Connect based sessions.
    if not spark.catalog.tableExists(silver_table_path):
        # First write: create the table.
        transcribed_df.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable(silver_table_path)
    else:
        # Table already exists: add this run's transcriptions to it.
        transcribed_df.write.mode("append").saveAsTable(silver_table_path)

    print(f"‚úÖ Transcriptions written to Silver table: {silver_table_path}")



## ✅ Output
- A clean, enriched Delta table: transcriptions_silver
- Includes transcription text, call metadata, and audio duration for each entry.