In [0]:
# =========================================
# silver_to_gold.py (Updated)
# Description: Add summary column using DeepSeek + TTS audio via Azure
# =========================================

import requests
import os
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType
import json
import pandas 

# Dataset locations: silver is the input, gold receives the enriched table
# and the generated audio files.
silver_path = "abfss://silver@kkstoragemo.dfs.core.windows.net/BooksDatasetCleaned.parquet"
gold_path = "abfss://gold@kkstoragemo.dfs.core.windows.net/BooksDatasetWithSummary.parquet"
gold_audio_base_uri = "abfss://gold@kkstoragemo.dfs.core.windows.net/audio_summaries/"

# Hand Spark the account key for the kkstoragemo ADLS Gen2 endpoint.
# NOTE(review): the key value is an empty string here -- presumably redacted
# before sharing; confirm it is injected from a secret store at run time.
spark.conf.set(
    "fs.azure.account.key.kkstoragemo.dfs.core.windows.net",
    "",
)

In [0]:
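# API keys: OPENROUTER_API_KEY, AZURE_TTS_KEY and AZURE_REGION are read by the
# cells below and must be defined here. Load them from a secret store or
# environment variables; do not paste the values into the notebook.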


In [0]:
# --- DeepSeek Summary Function ---
def summarize_with_deepseek(text, timeout=60):
    """Summarize a book description with DeepSeek via the OpenRouter chat API.

    Best-effort: every failure is printed and reported as ``None`` so that a
    single bad row does not fail the Spark job this function runs in as a UDF.

    Args:
        text: Book description to summarize. Falsy values (``None``, ``""``)
            are skipped without calling the API.
        timeout: Seconds to wait for the HTTP request before giving up.
            Without a timeout ``requests`` waits indefinitely, which would
            stall the Spark task on a hung connection.

    Returns:
        The summary text with surrounding whitespace stripped, or ``None``
        when the input is empty, the API returns a non-200 status, the
        response carries no message content, or any exception occurs.
    """
    if not text:
        return None
    try:
        headers = {
            "Authorization": f"Bearer {OPENROUTER_API_KEY}",
            "Content-Type": "application/json",
            "HTTP-Referer": "https://kukushorts.ai",  # Replace with your actual domain or local dev URL
            "X-Title": "KukuShorts"  # Your app/project name
        }
        data = {
            "model": "deepseek/deepseek-r1-zero:free",
            "messages": [
                {"role": "system", "content": "You are a helpful assistant that summarizes book descriptions."},
                {"role": "user", "content": f"Summarize this book: {text}"}
            ]
        }
        response = requests.post(
            "https://openrouter.ai/api/v1/chat/completions",
            headers=headers,
            data=json.dumps(data),
            timeout=timeout,
        )
        if response.status_code != 200:
            print("DeepSeek Error:", response.status_code, response.text)
            return None

        content = response.json()["choices"][0]["message"]["content"]
        if content is None:
            # A 200 response can still carry a null content field; report it
            # explicitly instead of failing on None.strip().
            print("DeepSeek Error: response contained no message content")
            return None
        return content.strip()
    except Exception as e:
        # Deliberately broad: network errors, malformed JSON and missing keys
        # are all treated as "no summary for this row".
        print("DeepSeek Exception:", e)
        return None

# Register UDF for Summary.
# Spark treats UDFs as deterministic by default and may therefore invoke them
# more times than they appear in the query. Each invocation here is a remote
# LLM request whose output can differ between calls, so the UDF is marked
# non-deterministic to stop the optimizer from duplicating it.
summary_udf = udf(summarize_with_deepseek, StringType()).asNondeterministic()

# --- Read Silver Dataset ---
df_silver = spark.read.parquet(silver_path)

# --- Add Summary Column ---
# Lazy: no API call happens until an action runs on the resulting DataFrame.
df_with_summary = df_silver.withColumn("Summary", summary_udf(col("Description")))

# --- Azure TTS Function ---
def generate_tts_and_return_path(title, summary, timeout=60):
    """Synthesize ``summary`` to MP3 with Azure TTS and return its gold URI.

    The audio is written under ``/dbfs/mnt/kkstoragemo/gold/audio_summaries/``
    using a filename derived from ``title``; the returned value is
    ``gold_audio_base_uri`` plus that filename.
    NOTE(review): this assumes the /dbfs mount and ``gold_audio_base_uri``
    point at the same storage location -- confirm against the mount config.

    Best-effort: failures are printed and reported as ``None``.

    Args:
        title: Book title, used only to build the output filename.
        summary: Text to speak. It is XML-escaped before being embedded in the
            SSML document so characters such as ``&`` and ``<`` cannot
            produce an invalid request.
        timeout: Seconds to wait for the TTS HTTP request.

    Returns:
        The ``abfss://`` URI string of the written MP3, or ``None`` when the
        title or summary is empty, the title has no filename-safe characters,
        the service returns a non-200 status, or any exception occurs.
    """
    # Imported locally: the notebook only imports `escape` in a later cell,
    # and this function is serialized and run as a Spark UDF.
    from xml.sax.saxutils import escape

    if not summary or not title:
        return None
    try:
        # Build the filename first so no TTS request is spent on a row whose
        # audio could not be stored under a usable name.
        safe_title = "".join(c for c in title if c.isalnum() or c in (' ', '-', '_')).rstrip()
        if not safe_title:
            print("TTS Error: title has no filename-safe characters:", repr(title))
            return None
        file_name = f"{safe_title}.mp3"
        file_path = f"/dbfs/mnt/kkstoragemo/gold/audio_summaries/{file_name}"

        url = f"https://{AZURE_REGION}.tts.speech.microsoft.com/cognitiveservices/v1"
        headers = {
            "Ocp-Apim-Subscription-Key": AZURE_TTS_KEY,
            "Content-Type": "application/ssml+xml",
            "X-Microsoft-OutputFormat": "audio-16khz-128kbitrate-mono-mp3"
        }
        ssml = f"""<speak version='1.0' xml:lang='en-US'>
            <voice xml:lang='en-US' name='en-US-GuyNeural'>
                {escape(summary)}
            </voice>
        </speak>"""
        response = requests.post(url, headers=headers, data=ssml.encode("utf-8"), timeout=timeout)

        if response.status_code != 200:
            print("TTS Error:", response.status_code, response.text)
            return None

        os.makedirs(os.path.dirname(file_path), exist_ok=True)
        with open(file_path, "wb") as audio_file:
            audio_file.write(response.content)

        return f"{gold_audio_base_uri}{file_name}"
    except Exception as e:
        # Deliberately broad so one failing row does not abort the Spark job.
        print("TTS Exception:", e)
        return None

# Register UDF for Audio Path.
# The wrapped function calls a remote TTS service and writes an MP3 file as a
# side effect. Spark assumes UDFs are deterministic and may invoke them more
# times than they appear in the query, so mark this one non-deterministic to
# avoid duplicated requests and file writes.
tts_udf = udf(generate_tts_and_return_path, StringType()).asNondeterministic()

# --- Add AudioPath Column ---
df_gold = df_with_summary.withColumn("AudioPath", tts_udf(col("Title"), col("Summary")))

# --- Write to Gold ---
# This write is the action that actually triggers the UDF calls above.
df_gold.write.mode("overwrite").parquet(gold_path)

In [0]:
from pyspark.sql.functions import col, count, when

# Read the gold dataset back from gold_path
df_gold = spark.read.parquet(gold_path)

# Tally rows with a missing vs. present Summary in a single aggregation
df_gold.agg(
    count(when(col("Summary").isNull(), True)).alias("Null_Summary_Count"),
    count(when(col("Summary").isNotNull(), True)).alias("NonNull_Summary_Count"),
).show()

+------------------+---------------------+
|Null_Summary_Count|NonNull_Summary_Count|
+------------------+---------------------+
|                 1|                   49|
+------------------+---------------------+



In [0]:
import requests
import os
from xml.sax.saxutils import escape
from pyspark.sql.functions import col

# Azure TTS credentials.
# Read from the environment instead of embedding the subscription key in the
# notebook, where it is visible to every reader and ends up in version control.
# NOTE(review): the key that was previously hardcoded here should be rotated.
tts_key = os.environ.get("AZURE_TTS_KEY")
if not tts_key:
    raise RuntimeError(
        "AZURE_TTS_KEY is not set; provide the Azure Speech subscription key "
        "via the environment before running this cell."
    )
tts_region = os.environ.get("AZURE_REGION", "southcentralus")

# Step 1: Filter non-null TTSInput
# NOTE(review): df_gold_filled (and its TTSInput column) is not defined in the
# visible cells -- confirm the cell that builds it runs before this one.
rows = df_gold_filled.select("Title", "TTSInput").where(col("TTSInput").isNotNull()).collect()

# Step 2: TTS generation function
def generate_tts(text, title, timeout=60):
    """Synthesize ``text`` to MP3 with Azure TTS and copy it to gold storage.

    The audio is first written to ``/tmp`` on the driver, copied to the
    ``audio_summaries`` folder of the gold container with ``dbutils.fs.cp``,
    and the temporary file is then removed.

    The output filename is ``title`` with spaces, ``/`` and ``:`` replaced by
    ``_`` and truncated to 30 characters, so titles sharing their first 30
    characters overwrite each other's audio file.

    Args:
        text: Plain text to speak; it is XML-escaped before being embedded in
            the SSML document.
        title: Book title used to derive the output filename.
        timeout: Seconds to wait for the TTS HTTP request.

    Returns:
        The ``abfss://`` path of the saved MP3 on success, otherwise ``None``
        (missing input, non-200 response, or any exception). Progress and
        errors are also printed.
    """
    # Guard before touching `title`: a None title would otherwise raise
    # outside the try block and abort the caller's whole loop.
    if not text or not title:
        print(f"Skipped TTS (missing title or text): {title}")
        return None

    filename = title.replace(" ", "_").replace("/", "_").replace(":", "_")[:30]
    local_path = f"/tmp/{filename}.mp3"
    abfss_path = f"abfss://gold@kkstoragemo.dfs.core.windows.net/audio_summaries/{filename}.mp3"

    escaped_text = escape(text)

    endpoint = f"https://{tts_region}.tts.speech.microsoft.com/cognitiveservices/v1"
    headers = {
        "Ocp-Apim-Subscription-Key": tts_key,
        "Content-Type": "application/ssml+xml",
        "X-Microsoft-OutputFormat": "audio-16khz-32kbitrate-mono-mp3"
    }

    ssml = f"""
    <speak version='1.0' xml:lang='en-US'>
        <voice xml:lang='en-US' xml:gender='Female' name='en-US-AriaNeural'>
            {escaped_text}
        </voice>
    </speak>
    """

    try:
        response = requests.post(
            endpoint, headers=headers, data=ssml.encode("utf-8"), timeout=timeout
        )
        if response.status_code != 200:
            print(f"❌ TTS Error ({response.status_code}): {title}")
            return None

        try:
            with open(local_path, "wb") as f:
                f.write(response.content)
            dbutils.fs.cp(f"file:{local_path}", abfss_path)
        finally:
            # Do not let per-title temp files accumulate on the driver disk.
            if os.path.exists(local_path):
                os.remove(local_path)

        print(f"✅ Saved: {abfss_path}")
        return abfss_path
    except Exception as e:
        # Best-effort: report and move on so one title cannot stop the batch.
        print(f"⚠️ Error for {title}: {str(e)}")
        return None

# Step 3: Generate audio for every collected row
for row in rows:
    title = row["Title"]
    text = row["TTSInput"]
    # Drop the "\boxed{" wrapper text; note that every "}" in the text is removed too.
    for wrapper_token in ("\\boxed{", "}"):
        text = text.replace(wrapper_token, "")
    text = text.strip()
    generate_tts(text, title)

✅ Saved: abfss://gold@kkstoragemo.dfs.core.windows.net/audio_summaries/Goat_Brothers.mp3
✅ Saved: abfss://gold@kkstoragemo.dfs.core.windows.net/audio_summaries/The_Missing_Person.mp3
✅ Saved: abfss://gold@kkstoragemo.dfs.core.windows.net/audio_summaries/Don't_Eat_Your_Heart_Out_Cookb.mp3
✅ Saved: abfss://gold@kkstoragemo.dfs.core.windows.net/audio_summaries/When_Your_Corporate_Umbrella_B.mp3
✅ Saved: abfss://gold@kkstoragemo.dfs.core.windows.net/audio_summaries/Amy_Spangler's_Breastfeeding__.mp3
✅ Saved: abfss://gold@kkstoragemo.dfs.core.windows.net/audio_summaries/The_Foundation_of_Leadership__.mp3
✅ Saved: abfss://gold@kkstoragemo.dfs.core.windows.net/audio_summaries/Chicken_Soup_for_the_Soul__101.mp3
✅ Saved: abfss://gold@kkstoragemo.dfs.core.windows.net/audio_summaries/Journey_Through_Heartsongs.mp3
✅ Saved: abfss://gold@kkstoragemo.dfs.core.windows.net/audio_summaries/In_Search_of_Melancholy_Baby.mp3
✅ Saved: abfss://gold@kkstoragemo.dfs.core.windows.net/audio_summaries/Christmas_