# Google Gen AI Capstone Project Q1

This notebook consolidates three projects from the **Google Gen AI Capstone Project Q1**:

### 📁 Project 1: AI Noise Reduction
- Uses Gemini function calling together with `librosa` and `noisereduce` to analyze and denoise uploaded audio through a Gradio interface.

### 📁 Project 2: Anomaly Detection in Text
- Leverages Gemini's embedding API to detect outliers in textual datasets by comparing distances in vector space.

### 📁 Project 3: Convert Notebook to PDF
- Demonstrates a simple Gradio UI to upload a notebook and convert it into a styled PDF or HTML output.

Each project retains its original structure and content.

## Conversational Audio Restoration Agent using Gemini and Gradio

This notebook implements an interactive web application where users can upload audio files, ask questions about them (like "What is the noise floor?"), and request noise reduction (like "Remove noise by 5 dB"). The application uses the Google Gemini API for natural language understanding and function calling, `librosa` and `noisereduce` for audio processing, and `Gradio` for the user interface.

**Key Features:**

*   **Conversational Interface:** Interact with the system using natural language queries.
*   **Audio Upload:** Supports uploading audio files (WAV recommended).
*   **Noise Floor Analysis:** Estimates and reports the background noise level in dB using the `get_noise_floor` function.
*   **Noise Reduction:** Applies noise reduction using spectral gating via the `reduce_noise_by_db` function, controlled by the user's request (e.g., "by 5 dB").
*   **Gemini Function Calling:** Leverages Gemini's ability to understand the user's intent and automatically call the appropriate Python function (`get_noise_floor` or `reduce_noise_by_db`) with the correct parameters.
*   **Visual Feedback:** Displays spectrograms of the original and denoised audio for visual comparison.
*   **Audio Playback:** Allows playback of the original (via re-upload if needed) and denoised audio.

**Potential Future Enhancements:**

*   **Support for More Formats:** Improve audio loading robustness (e.g., ensure FFmpeg is reliably used) to handle MP3, M4A, etc.
*   **Advanced Noise Reduction:** Integrate more sophisticated denoising models (e.g., deep learning based) or offer different algorithm choices.
*   **Parameter Tuning:** Allow users to fine-tune noise reduction parameters via the UI (e.g., aggressiveness, frequency range).
*   **Streaming Audio:** Support processing real-time audio streams.
*   **Deployment:** Package the application for deployment (e.g., using Docker, Hugging Face Spaces).


In [None]:
# Remove unused conflicting packages
#!pip uninstall -qqy jupyterlab kfp 2>/dev/null
# Install specific google-genai version used in the original notebook
!pip install -U -q "google-genai==1.7.0"

## 1. Install Dependencies

*   The cell below installs the remaining Python libraries required for the application.
*   `google-genai`: The official Google Gemini SDK for Python (installed by the previous cell, pinned to version 1.7.0).
*   `gradio`: Used to create the interactive web UI.
*   `librosa`: A powerful library for audio analysis (loading, spectrograms).
*   `matplotlib`: Used by librosa for plotting spectrograms.
*   `noisereduce`: Performs the noise reduction algorithm.
*   `soundfile`: Used by librosa (and directly) for reading/writing audio files (often needs system libraries like `libsndfile`).
*   `numpy`: Fundamental package for numerical operations.
*   `ffmpeg-python`: Python bindings for FFmpeg. `librosa`'s fallback audio loading mechanism (`audioread`) often requires the FFmpeg multimedia framework to be installed on your system to handle various audio formats (like MP3 or certain WAV encodings). You might need to install FFmpeg separately using your system's package manager (e.g., `sudo apt update && sudo apt install ffmpeg` on Debian/Ubuntu, `brew install ffmpeg` on macOS).
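
Because the FFmpeg binary is a system dependency rather than a pip package, it can help to confirm that it is on the `PATH` before loading compressed formats. A minimal sketch using only the standard library (the helper name `ffmpeg_available` is hypothetical and not part of the notebook):

```python
import shutil

def ffmpeg_available() -> bool:
    """Return True if an `ffmpeg` executable is found on the PATH."""
    return shutil.which("ffmpeg") is not None

if not ffmpeg_available():
    print("FFmpeg not found; formats such as MP3 may fail to load.")
```

This only checks that the executable can be located. It does not verify which codecs that FFmpeg build supports.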


In [None]:
# Install the remaining dependencies.
# Safe to re-run: pip skips packages that are already installed.
!pip install gradio librosa matplotlib noisereduce soundfile numpy ffmpeg-python --quiet

## 2. Import Libraries

*   Imports all the necessary modules from the installed libraries for use in the script.
*   `os`: For interacting with the operating system (e.g., setting environment variables, path operations).
*   `gradio` as `gr`: For building the user interface.
*   `numpy` as `np`: For numerical calculations.
*   `librosa`, `librosa.display`: For audio loading and spectrogram visualization.
*   `matplotlib.pyplot` as `plt`: For finalizing and customizing plots.
*   `noisereduce` as `nr`: For the noise reduction function.
*   `soundfile` as `sf`: For writing audio files.
*   `google.genai` as `genai`, `google.genai.types`: For interacting with the Gemini API and its specific types.
*   `traceback`: For getting detailed error information in exception handlers.
*   `time`: For generating unique timestamps for filenames.

In [None]:
import os
import gradio as gr
import numpy as np
import librosa
import librosa.display
import matplotlib.pyplot as plt
import noisereduce as nr
import soundfile as sf
from google import genai
from google.genai import types
import traceback # Import traceback for better error details
import time

## 3. Initialize Gemini Client

*   Reads the Google API key from Kaggle Secrets, stored under the name `GOOGLE_API_KEY`. **Add your key as a Kaggle secret with that name before running this cell.**
*   Creates the Gemini API client instance (`genai.Client`).
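
Outside Kaggle, the `kaggle_secrets` module is not importable, so the cell below would fail on its first line. One possible workaround is sketched here, assuming the key is exported as an environment variable with the same name. The helper `load_api_key` is hypothetical and not part of the original notebook.

```python
import os

def load_api_key(name: str = "GOOGLE_API_KEY") -> str:
    """Return the API key from Kaggle Secrets, else from the environment."""
    try:
        # Only importable inside a Kaggle kernel.
        from kaggle_secrets import UserSecretsClient
        return UserSecretsClient().get_secret(name)
    except ImportError:
        # Assumption: outside Kaggle the key lives in an environment variable.
        key = os.environ.get(name)
        if not key:
            raise RuntimeError(f"{name} is not set in the environment")
        return key
```

With this helper, the client could be created as `genai.Client(api_key=load_api_key())`.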


In [None]:
from kaggle_secrets import UserSecretsClient
user_secrets = UserSecretsClient()
api_key = user_secrets.get_secret("GOOGLE_API_KEY")
print("Running on Kaggle, API key loaded from Kaggle Secrets.")
client = genai.Client(api_key=api_key)

## 4. Define Audio Analysis Tool Functions

These Python functions will be made available to the Gemini model. The Gemini SDK's function calling feature allows the model to decide when to execute these functions based on the user's query.
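
The SDK's internal schema generation is not shown in this notebook, but the metadata it can draw on (parameter names, type hints, docstring) is easy to inspect with the standard library. The sketch below uses a hypothetical stub, `example_tool`, written in the same style as the tool functions:

```python
import inspect
from typing import get_type_hints

def example_tool(audio_path: str, reduction_db: int) -> dict:
    """Reduce noise in the audio file by a specified dB amount.

    Args:
        audio_path: Absolute path to the input audio file.
        reduction_db: Amount of noise reduction desired in dB.
    """
    return {"denoised_path": None}

# Parameter names and annotated types: the raw material for a tool schema.
hints = get_type_hints(example_tool)
params = {name: hints[name].__name__
          for name in inspect.signature(example_tool).parameters}

# First docstring line, which serves as a short description of the tool.
summary = inspect.getdoc(example_tool).splitlines()[0]

print(params)   # {'audio_path': 'str', 'reduction_db': 'int'}
print(summary)  # Reduce noise in the audio file by a specified dB amount.
```

This is why accurate type hints and a clear docstring matter: they are the only description of the function the model receives.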

#### `get_noise_floor(audio_path: str) -> dict`

*   **Purpose:** Estimates the background noise level (noise floor) of an audio file.
*   **Input:** `audio_path` (string) - The absolute path to the audio file.
*   **Process:**
    1.  Checks if the provided `audio_path` actually exists. Returns an error dictionary if not.
    2.  Loads the audio file using `librosa.load(audio_path, sr=None)`. `sr=None` preserves the original sample rate. Librosa may use `soundfile` or fall back to `audioread` (which may need FFmpeg).
    3.  Handles cases where the audio file is empty or contains only silence.
    4.  Estimates the noise floor amplitude: It calculates the 10th percentile of the *absolute* values of the audio samples. This is a simple heuristic assuming that the quietest 10% of the signal largely represents background noise.
    5.  Converts the noise amplitude to decibels relative to full scale, where an amplitude of 1.0 corresponds to 0 dBFS. The formula used is $\text{dB} = 10 \log_{10}(\text{amplitude}^2 + 10^{-10})$, which is equivalent to $20 \log_{10}(\text{amplitude})$ for amplitudes well above $10^{-5}$. The small constant $10^{-10}$ is added before the logarithm for numerical stability: it prevents $\log_{10}(0)$ and bounds the result at $-100$ dB.
*   **Output:** A dictionary containing either the calculated noise floor (`{'noise_floor_db': float_value}`) or an error message (`{'noise_floor_db': None, 'error': '...'}`).
*   **Gemini Integration:** The function's docstring and type hints allow the Gemini SDK to automatically create a schema. Gemini will call this function when the user asks a question like "What is the noise floor?".
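
As a quick worked example of the decibel conversion in step 5, the sketch below applies the same formula to a few amplitudes using only the standard library. The helper name `amplitude_to_dbfs` is illustrative and does not appear in the notebook.

```python
import math

EPS = 1e-10  # same stabilizing constant as in the formula above

def amplitude_to_dbfs(amplitude: float) -> float:
    """Convert a linear amplitude (1.0 = full scale) to dB."""
    return 10 * math.log10(amplitude ** 2 + EPS)

for a in (1.0, 0.1, 0.01, 0.0):
    print(f"amplitude={a:<5} -> {amplitude_to_dbfs(a):8.2f} dB")
```

Amplitudes of 0.1 and 0.01 map to about -20 dB and -40 dB respectively, and an amplitude of exactly 0 maps to -100 dB because of the added constant.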

#### `reduce_noise_by_db(audio_path: str, reduction_db: int) -> dict`

*   **Purpose:** Reduces background noise in an audio file using a spectral gating algorithm provided by the `noisereduce` library.
*   **Inputs:**
    *   `audio_path` (string): The absolute path to the input audio file.
    *   `reduction_db` (integer): The desired amount of noise reduction in decibels (dB). A higher value means more aggressive reduction.
*   **Process:**
    1.  Checks if the input `audio_path` exists.
    2.  Loads the audio file using `librosa.load`.
    3.  Handles empty audio files.
    4.  Estimates the noise profile: It takes the first 0.5 seconds of the audio (`noise_clip`) as representative of the background noise. Handles cases where the audio is shorter than 0.5 seconds or if the noise clip contains non-finite values (e.g., `NaN`, `inf`).
    5.  Applies noise reduction using `noisereduce.reduce_noise`:
        *   `y`: The full audio signal array.
        *   `sr`: The audio sample rate.
        *   `y_noise`: The noise profile estimated from `noise_clip`.
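        *   `prop_decrease`: Controls the aggressiveness of the noise reduction. The `noisereduce` library expects a proportion (0 to 1), so the input `reduction_db` is mapped with the heuristic `reduction_db / 20` and clamped to the range 0 to 1. A request of 5 dB therefore becomes 0.25, and anything at or above 20 dB becomes 1.0. This is a convenience mapping, not a calibrated attenuation in decibels.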
    6.  Generates a unique output filename using a timestamp (`denoised_{timestamp}.wav`) and constructs its absolute path in the current working directory.
    7.  Saves the noise-reduced audio (`reduced`) to the new file using `soundfile.write`.
    8.  Verifies that the output file was created successfully and is not empty using `os.path.exists` and `os.path.getsize`.
*   **Output:** A dictionary containing either the absolute path to the denoised file (`{'denoised_path': '/path/to/denoised_....wav'}`) or an error message (`{'denoised_path': None, 'error': '...'}`).
*   **Gemini Integration:** Gemini calls this function when the user asks to "remove noise", "denoise", etc., inferring the `reduction_db` amount from the query.
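
The two small calculations in the process above, the noise-clip length and the dB-to-proportion mapping, can be checked in isolation. The helper names below are hypothetical; the arithmetic mirrors what the function does inline.

```python
def db_to_prop_decrease(reduction_db: float) -> float:
    """Map a requested dB value to noisereduce's 0-1 `prop_decrease` scale."""
    return min(max(reduction_db / 20.0, 0.0), 1.0)

def noise_clip_length(n_samples: int, sr: int, seconds: float = 0.5) -> int:
    """Number of leading samples used as the noise profile."""
    return min(n_samples, int(sr * seconds))

for db in (-3, 5, 10, 20, 30):
    print(f"{db:>3} dB -> prop_decrease={db_to_prop_decrease(db)}")

print(noise_clip_length(3 * 44100, 44100))  # 3 s of audio at 44.1 kHz
print(noise_clip_length(1000, 44100))       # clip shorter than 0.5 s
```

Negative requests clamp to 0.0 (no reduction) and requests above 20 dB saturate at 1.0, so "remove noise by 30 dB" and "by 20 dB" produce the same result under this mapping.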


In [None]:
# --- Audio Analysis Functions (for Gemini Function Calling) ---

def get_noise_floor(audio_path: str) -> dict:
    """
    Calculate the noise floor (in dB) of the given audio file.
    The noise floor is estimated based on the 10th percentile amplitude.

    Args:
        audio_path: Absolute path to the audio file (wav recommended).

    Returns:
        Dictionary with 'noise_floor_db' as float, or 'error' on failure.
    """
    try:
        # Check if the file exists before trying to load
        if not os.path.exists(audio_path):
            print(f"[get_noise_floor] Error: File not found at specified path: {audio_path}")
            return {"noise_floor_db": None, "error": f"File not found: {audio_path}"}

        # Load audio using librosa (sr=None preserves original sample rate)
        y, sr = librosa.load(audio_path, sr=None)

        # Handle empty or silent audio
        if y.size == 0 or np.all(y == 0):
            print(f"[get_noise_floor] Warning: Audio file is empty or silent: {audio_path}")
            return {"noise_floor_db": -np.inf} # Represent silence as negative infinity dB

        # Estimate noise floor amplitude (10th percentile of absolute signal)
        noise_amplitude = np.percentile(np.abs(y), 10)

        # Convert amplitude to dB (relative to 1.0)
        # Add epsilon (1e-10) for numerical stability to avoid log10(0)
        noise_floor_db = 10 * np.log10(noise_amplitude**2 + 1e-10)

        # print(f"[get_noise_floor] Calculated noise floor for {os.path.basename(audio_path)}: {noise_floor_db:.2f} dB") # Optional debug log
        return {"noise_floor_db": float(noise_floor_db)}
    except Exception as e:
        # Log and return error if any step fails
        print(f"[get_noise_floor] ERROR processing {os.path.basename(audio_path)}: {e}")
        return {"noise_floor_db": None, "error": str(e)}

def reduce_noise_by_db(audio_path: str, reduction_db: int) -> dict:
    """
    Reduce noise in the audio file by a specified dB amount using spectral gating.

    Args:
        audio_path: Absolute path to the input audio file (wav recommended).
        reduction_db: Amount of noise reduction desired in dB (e.g., 5, 10).

    Returns:
        Dictionary with 'denoised_path' (absolute path to output wav file), or 'error'.
    """
    try:
        # Check if input file exists
        if not os.path.exists(audio_path):
            print(f"[reduce_noise_by_db] Error: Input file not found: {audio_path}")
            return {"denoised_path": None, "error": f"Input file not found: {audio_path}"}

        # Load audio
        y, sr = librosa.load(audio_path, sr=None)

        # Handle empty audio
        if y.size == 0:
            print(f"[reduce_noise_by_db] Warning: Input audio is empty: {audio_path}")
            return {"denoised_path": None, "error": "Input audio is empty"}

        # Estimate noise profile from the beginning of the audio (e.g., first 0.5 seconds)
        noise_clip_len = min(len(y), int(sr*0.5)) # Use min to handle short audio
        if noise_clip_len == 0:
            print(f"[reduce_noise_by_db] Warning: Audio too short for noise profiling: {audio_path}")
            # Return None for the path so callers do not mistake the original file for a denoised one
            return {"denoised_path": None, "error": "Audio too short for noise profiling"}
        noise_clip = y[:noise_clip_len]

        # Ensure noise profile contains valid numbers
        if not np.all(np.isfinite(noise_clip)):
             print(f"[reduce_noise_by_db] Error: Non-finite values detected in noise clip for {audio_path}")
             return {"denoised_path": None, "error": "Non-finite values detected in noise clip"}

        # Perform noise reduction using noisereduce library
        # `prop_decrease` controls reduction amount (0-1 scale), map from dB and clamp
        prop_decrease = min(max(reduction_db / 20.0, 0.0), 1.0) # Clamp between 0 and 1
        print(f"[reduce_noise_by_db] Applying noise reduction with prop_decrease={prop_decrease:.2f} (from {reduction_db} dB)")
        reduced_audio = nr.reduce_noise(y=y, sr=sr, y_noise=noise_clip, prop_decrease=prop_decrease)

        # Generate unique output filename and absolute path in the current directory
        timestamp = int(time.time())
        out_filename = f"denoised_{timestamp}.wav"
        out_path = os.path.join(os.getcwd(), out_filename) # Use absolute path

        # Save the denoised audio
        sf.write(out_path, reduced_audio, sr)

        # Verify file was saved successfully
        if os.path.exists(out_path) and os.path.getsize(out_path) > 0:
            print(f"[reduce_noise_by_db] Noise reduction successful. Saved to: {out_path} (Size: {os.path.getsize(out_path)} bytes)")
            return {"denoised_path": out_path} # Return the absolute path
        else:
            # Handle file saving errors
            error_msg = f"Failed to write or created empty denoised file: {out_path}"
            print(f"[reduce_noise_by_db] ERROR: {error_msg}")
            return {"denoised_path": None, "error": error_msg}
    except Exception as e:
        # Log and return error if any step fails
        print(f"[reduce_noise_by_db] ERROR processing {os.path.basename(audio_path)}: {e}")
        # traceback.print_exc() # Uncomment for full traceback during debugging
        return {"denoised_path": None, "error": str(e)}


## 5. Define Spectrogram Plotting Function

*   **`plot_spectrogram(audio_path, title)`**:
    *   Takes an audio file path and a title string.
    *   Checks if the path is valid and the file exists and is not empty.
    *   Loads the audio using `librosa.load`.
    *   Calculates the Short-Time Fourier Transform (STFT) using `librosa.stft`.
    *   Converts the STFT magnitude to decibels using `librosa.amplitude_to_db`.
    *   Uses `librosa.display.specshow` to create a spectrogram plot (frequency vs. time, color intensity represents dB level).
    *   Adds a title and color bar.
    *   Uses `plt.close(fig)` to prevent the plot from displaying directly in the notebook output (Gradio will handle displaying it).
    *   Returns the `matplotlib.figure.Figure` object for Gradio to display, or `None` if an error occurred.
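
To make the STFT-to-decibel step concrete, here is a NumPy-only sketch that approximates what `librosa.stft` followed by `librosa.amplitude_to_db(..., ref=np.max)` produces. The helper `stft_db`, its frame sizes, and the 80 dB floor are assumptions chosen for illustration. librosa's own framing and padding differ in detail.

```python
import numpy as np

def stft_db(y: np.ndarray, n_fft: int = 256, hop: int = 128,
            top_db: float = 80.0) -> np.ndarray:
    """Magnitude spectrogram in dB relative to its own peak (peak = 0 dB).

    Assumes `y` is non-silent and at least `n_fft` samples long.
    """
    window = np.hanning(n_fft)
    starts = range(0, len(y) - n_fft + 1, hop)
    frames = np.stack([y[s:s + n_fft] * window for s in starts], axis=1)
    magnitude = np.abs(np.fft.rfft(frames, axis=0))  # (n_fft // 2 + 1, n_frames)
    db = 20 * np.log10(np.maximum(magnitude, 1e-10) / magnitude.max())
    return np.maximum(db, -top_db)                   # limit the dynamic range

sr = 8000
t = np.arange(sr) / sr                      # one second of audio
tone = 0.5 * np.sin(2 * np.pi * 1000 * t)   # 1 kHz test tone
D = stft_db(tone)

peak_bin = int(D.mean(axis=1).argmax())
print(D.shape, peak_bin * sr / 256)         # strongest bin sits at 1000 Hz
```

Referencing the peak means the loudest time-frequency cell is always 0 dB and everything else is negative, which is why the color bar in the plot runs from 0 dB downward.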


In [None]:
# --- Spectrogram Plotting Function ---

def plot_spectrogram(audio_path, title="Spectrogram"):
    """
    Generates a spectrogram plot for the given audio file.

    Args:
        audio_path: Absolute path to the audio file.
        title: Title for the plot.

    Returns:
        A matplotlib Figure object containing the plot, or None on error.
    """
    try:
        # Validate input path and file existence/size
        if not audio_path or not isinstance(audio_path, str) or not os.path.exists(audio_path):
             print(f"[plot_spectrogram] Skipped: File not found or path invalid: {audio_path}")
             return None
        # Check file size to avoid errors with empty files
        if os.path.getsize(audio_path) == 0:
            print(f"[plot_spectrogram] Skipped: Audio file is empty: {audio_path}")
            return None

        # print(f"[plot_spectrogram] Plotting: {audio_path}") # Optional debug log
        # Load audio
        y, sr = librosa.load(audio_path, sr=None)

        # Handle empty loaded audio data (should be caught by size check, but belt-and-suspenders)
        if y.size == 0:
            print(f"[plot_spectrogram] Skipped: Loaded audio data is empty after load: {audio_path}")
            return None

        # Compute STFT and convert to dB scale (log magnitude)
        D = librosa.amplitude_to_db(np.abs(librosa.stft(y)), ref=np.max)

        # Create the plot using matplotlib
        fig, ax = plt.subplots(figsize=(8, 3)) # Adjust figsize as needed
        # Display spectrogram with log frequency axis
        img = librosa.display.specshow(D, sr=sr, x_axis='time', y_axis='log', ax=ax)
        ax.set(title=title) # Set plot title
        fig.colorbar(img, ax=ax, format="%+2.0f dB") # Add color bar showing dB scale
        fig.tight_layout() # Adjust layout on this figure explicitly

        # IMPORTANT: Close the plot figure object to prevent duplicate display
        plt.close(fig)

        # print(f"[plot_spectrogram] Success for: {audio_path}") # Optional debug log
        return fig # Return the figure object for Gradio
    except Exception as e:
        # Log errors during plotting
        print(f"[plot_spectrogram] ERROR plotting spectrogram for {os.path.basename(audio_path)}: {e}")
        # traceback.print_exc() # Uncomment for detailed traceback during debugging
        return None # Return None if plotting fails


## 6. Define Main Agent Logic

*   **`agent(audio, user_query)`**: This is the core function that Gradio calls when the user clicks "Submit".
    *   **Inputs:**
        *   `audio`: The audio data uploaded by the user (as a tuple `(sample_rate, numpy_array)` because `type="numpy"` is used in `gr.Audio`).
        *   `user_query`: The text entered by the user.
    *   **Process:**
        1.  Performs initial checks: verifies the Gemini client is initialized and the audio input is valid.
        2.  Generates a unique input filename and saves the uploaded audio data to an absolute path using `soundfile.write`. Checks if saving was successful.
        3.  Defines the list of available Python tool functions (`tools = [get_noise_floor, reduce_noise_by_db]`).
        4.  Defines the `system_instruction` to guide the Gemini model on how to behave and use the tools.
        5.  Constructs the prompt for Gemini, explicitly including the absolute path to the saved input audio file.
        6.  Prepares the `GenerateContentConfig` object, passing the `tools` list and the `system_instruction`.
        7.  Calls the Gemini API using `client.models.generate_content`, providing the model name, the user prompt (`contents`), and the `config`. This triggers the automatic function calling process if needed.
        8.  Extracts the final text generated by the model (after any function calls) from `response.candidates` and uses it as the initial `output_text`, falling back to "Processing complete." when the model returns no text.
        9.  Plots the original spectrogram from the absolute input path.
        10. **Parses Function Call Results from History:** Iterates through `response.automatic_function_calling_history`, which records the sequence of model turns and tool executions, looking for parts that carry a `function_response`. The SDK adds these back, usually under the `user` role.
        11. For each `function_response` found, extracts the nested `result` dictionary, which is the dictionary returned by the Python function (e.g., `{'denoised_path': '...'}`).
        12. Based on the function name (`func_name`), appends the function result to `output_text`, assigns `denoised_audio_path` from the absolute path returned by the function, and calls `plot_spectrogram` for the denoised file when that path exists on disk.
        13. Includes robust error handling using a `try...except` block around the entire process.
    *   **Outputs:** Returns a tuple containing the values needed to update the Gradio output components: `(output_text, orig_spec_plot, denoised_spec_plot, denoised_audio_path)`.
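
The nested shape that the history-parsing steps walk through can be illustrated without calling the API. In the sketch below, `SimpleNamespace` objects are hypothetical stand-ins for the SDK's content and part objects, and `extract_tool_results` is an illustrative helper rather than code from the notebook.

```python
from types import SimpleNamespace as NS

def extract_tool_results(history):
    """Collect (function_name, result_dict) pairs from function-response parts."""
    results = []
    for content in history or []:
        if content.role != "user" or not content.parts:
            continue
        for part in content.parts:
            fr = getattr(part, "function_response", None)
            if fr is None or not isinstance(fr.response, dict):
                continue
            # The tool's own return value is nested under the 'result' key.
            result = fr.response.get("result")
            if isinstance(result, dict):
                results.append((fr.name, result))
    return results

# Mock history: a model turn, then the tool result fed back under the 'user' role.
history = [
    NS(role="model", parts=[NS(function_response=None)]),
    NS(role="user", parts=[NS(function_response=NS(
        name="get_noise_floor",
        response={"result": {"noise_floor_db": -42.5}},
    ))]),
]

print(extract_tool_results(history))
```

Flattening the history into `(name, result)` pairs first would also keep the per-function handling in `agent` much shallower than the nested `if` chain.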


In [None]:
# --- Gradio + Gemini Conversational Agent Logic ---

def agent(audio, user_query):
    """
    Handles user interaction via Gradio: saves audio, calls Gemini with tools,
    parses results, generates plots, and returns outputs for the UI.
    """
    # --- 1. Input Validation & Setup ---
    if client is None:
        # Check if Gemini client failed to initialize
        return "ERROR: Gemini client not initialized. Please check API Key and restart.", None, None, None
    if audio is None or not isinstance(audio, tuple) or len(audio) != 2:
        return "ERROR: Invalid audio input. Please upload a valid audio file.", None, None, None

    sample_rate, audio_data = audio
    if audio_data is None or audio_data.size == 0:
        return "ERROR: Audio data is empty.", None, None, None

    # Generate unique absolute path for the input file
    timestamp = int(time.time())
    input_audio_filename = f"input_{timestamp}.wav"
    input_audio_path = os.path.join(os.getcwd(), input_audio_filename) # Use absolute path

    # Initialize output variables
    output_text = "Processing..."
    orig_spec_plot = None
    denoised_spec_plot = None
    denoised_audio_path = None # Store path to the final denoised audio

    try:
        # --- 2. Save Uploaded Audio ---
        print(f"[agent] Saving uploaded audio to: {input_audio_path}")
        sf.write(input_audio_path, audio_data, sample_rate)
        # Verify save operation
        if not os.path.exists(input_audio_path) or os.path.getsize(input_audio_path) == 0:
             print(f"[agent] ERROR: Failed to save uploaded audio file to {input_audio_path}")
             return "[agent] Error: Failed to save uploaded audio file.", None, None, None
        print(f"[agent] Successfully saved input file.")

        # --- 3. Prepare Gemini API Call ---
        tools = [get_noise_floor, reduce_noise_by_db]
        system_instruction_text = (
            "You are an audio analysis assistant. Always use the provided tools to answer questions about noise floor or to denoise audio. "
            "Do not attempt to answer directly—always invoke the relevant function. The audio file path is specified in the user prompt."
        )
        config = types.GenerateContentConfig(
            tools=tools,
            system_instruction=system_instruction_text
        )
        # Pass the absolute path to Gemini in the prompt
        prompt_for_gemini = f"{user_query}\n\nAudio path: '{input_audio_path}'"
        contents = [{"role": "user", "parts": [{"text": prompt_for_gemini}]}]

        # --- 4. Call Gemini API ---
        print(f"[agent] Sending request to Gemini for file: {input_audio_path}")
        response = client.models.generate_content(
            model="gemini-1.5-flash-latest", # Use a model supporting function calling
            contents=contents,
            config=config
        )
        print(f"[agent] Received response from Gemini.")

        # --- 5. Process Gemini Response ---
        # Get the final text response generated by the model (after function calls)
        final_text_response = ""
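        if response.candidates and response.candidates[0].content and response.candidates[0].content.parts:
            # `.text` can be None for non-text parts (e.g. function calls), so filter on its value
            final_text_response = "".join(part.text for part in response.candidates[0].content.parts if getattr(part, 'text', None))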
        output_text = final_text_response if final_text_response else "Processing complete."

        # Plot original spectrogram immediately (using absolute path)
        orig_spec_plot = plot_spectrogram(input_audio_path, "Original Spectrogram")

        # Parse function call results from the automatic history
        # The attribute may be present but None or empty, so test its value rather than its existence
        history = getattr(response, 'automatic_function_calling_history', None)
        if history:
            print("[agent] Parsing automatic_function_calling_history...")
            # Iterate through history to find the SDK-added FunctionResponse
            for content in reversed(history): # Look from end
                 if content.role == 'user' and content.parts: # SDK adds result as 'user' role
                     for part in content.parts:
                         if hasattr(part, "function_response") and part.function_response is not None:
                            func_name = part.function_response.name
                            func_response_data = part.function_response.response # Outer dict: {'result': {...}}
                            print(f"[agent]   Found FunctionResponse in history for: {func_name}")

                            # Check the expected nested structure: {'result': {actual_dict}}
                            if isinstance(func_response_data, dict) and 'result' in func_response_data:
                                actual_result = func_response_data.get('result')

                                if isinstance(actual_result, dict):
                                    # --- Handle get_noise_floor result ---
                                    if func_name == "get_noise_floor":
                                        if 'noise_floor_db' in actual_result and actual_result.get('noise_floor_db') is not None and np.isfinite(actual_result.get('noise_floor_db', np.nan)):
                                            noise_db = actual_result['noise_floor_db']
                                            output_text += f"\n\n[Function Result]: Noise floor is {noise_db:.2f} dB"
                                        else:
                                             error_msg = actual_result.get('error', 'invalid value')
                                             output_text += f"\n\n[Function Error]: Could not get noise floor - {error_msg}"

                                    # --- Handle reduce_noise_by_db result ---
                                    elif func_name == "reduce_noise_by_db":
                                        if 'denoised_path' in actual_result:
                                            path_value = actual_result.get('denoised_path')
                                            # Check if path is valid (non-None string) AND file exists
                                            if path_value and isinstance(path_value, str) and os.path.exists(path_value):
                                                denoised_audio_path = path_value # Assign the absolute path
                                                print(f"[agent]     Assigned denoised path: {denoised_audio_path}")
                                                if "Noise reduction successful" not in output_text: # Append if not already said by model
                                                     output_text += f"\n\n[Function Result]: Noise reduction successful. Output path: {os.path.basename(denoised_audio_path)}"
                                                # Plot the denoised spectrogram
                                                denoised_spec_plot = plot_spectrogram(denoised_audio_path, "Denoised Spectrogram")
                                            else:
                                                # Prefer the error reported by the function itself, if any
                                                error_msg = actual_result.get('error') or f"Path '{path_value}' from function invalid or file missing."
                                                print(f"[agent]     ERROR: {error_msg}")
                                                if "Noise reduction failed" not in output_text: output_text += f"\n\n[Function Error]: Noise reduction failed - {error_msg}"
                                        else:
                                             error_msg = actual_result.get('error', "'denoised_path' key missing")
                                             print(f"[agent]     ERROR: {error_msg}")
                                             if "Noise reduction failed" not in output_text: output_text += f"\n\n[Function Error]: Noise reduction failed - {error_msg}"
                                    # If only one function call expected, can break history loop here
                                    # break
                                # else: print error if actual_result wasn't a dict
                                # else: print error if 'result' key missing or not dict
        else:
             print("[agent] No automatic_function_calling_history found or parsed.")

        # --- 6. Final Output Preparation ---
        print("-" * 20)
        print("[agent] Values FINALIZED for Gradio:")
        print(f"[agent] Output Text: {output_text[:500]}...")
        print(f"[agent] Original Plot Type: {type(orig_spec_plot)}")
        print(f"[agent] Denoised Plot Type: {type(denoised_spec_plot)}")
        print(f"[agent] Denoised Audio Path: {denoised_audio_path}")
        print("-" * 20)

        # Return values in the order expected by Gradio outputs
        return output_text, orig_spec_plot, denoised_spec_plot, denoised_audio_path

    # --- 7. Error Handling ---
    except Exception as e:
        # Catch any unexpected errors during the agent execution
        error_details = traceback.format_exc()
        print(f"[agent] FATAL ERROR in agent function: {error_details}")
        error_message = f"An unexpected error occurred: {str(e)}"
        # Return error message to the user
        return f"ERROR: {error_message}\n\n(Details logged server-side)", None, None, None
    # --- 8. Cleanup (Optional) ---
    finally:
        # Example: Delete the temporary input file after processing
        try:
            if input_audio_path and os.path.exists(input_audio_path):
                # os.remove(input_audio_path)
                # print(f"[agent] Cleaned up input file: {input_audio_path}")
                pass # Keeping files for now for inspection
        except Exception as clean_e:
            print(f"[agent] Error during cleanup: {clean_e}")
        # Avoid deleting denoised_audio_path here as Gradio needs it
        pass


## 7. Define Gradio User Interface

*   Uses `gr.Blocks` for a custom UI layout.
*   `gr.Markdown`: Displays introductory text.
*   `gr.Row`, `gr.Column`: Organizes components horizontally and vertically.
*   `gr.Audio`:
    *   Input (`audio_input`): Allows users to upload or record audio. `type="numpy"` makes the callback function (`agent`) receive the audio as a tuple `(sample_rate, numpy_array)`.
    *   Output (`denoised_audio`): Displays the processed audio file. `type="filepath"` means it expects an absolute file path string from the `agent` function. `interactive=False` prevents user input on this output component.
*   `gr.Textbox`:
    *   Input (`user_query`): For the user's text request.
    *   Output (`output_text`): Displays the text response from the agent. `interactive=False`.
*   `gr.Plot`: Displays the spectrogram images (`orig_spec`, `denoised_spec`) generated by `plot_spectrogram`.
*   `gr.Button`: The "Submit" button.
*   `.click()`: Connects the button click event to the `agent` function, mapping UI inputs (`audio_input`, `user_query`) to the function's arguments and the function's return values to the UI outputs (`output_text`, `orig_spec`, `denoised_spec`, `denoised_audio`) in the specified order.


In [None]:
# --- Gradio User Interface Definition ---

# Use gr.Blocks for more layout control
with gr.Blocks() as demo:
    # Add a title and description using Markdown
    gr.Markdown("# Conversational Audio Restoration Agent 🎤")
    gr.Markdown("Upload audio, ask questions (e.g., 'What is the noise floor?', 'Remove noise by 5 dB'), and see the results.")

    # Arrange components in rows and columns
    with gr.Row():
        # Left column for user inputs
        with gr.Column(scale=1):
            # Component for audio upload/recording
            audio_input = gr.Audio(
                label="Upload Audio (WAV recommended)",
                type="numpy" # Provides (sample_rate, data_array) to the backend function
            )
            # Textbox for the user's natural language query
            user_query = gr.Textbox(
                label="Ask the agent",
                placeholder="e.g. 'What is the noise floor?', 'Remove noise by 5 dB'"
            )
            # Button to trigger the agent function
            btn = gr.Button("Submit", variant="primary") # 'primary' makes it stand out

        # Right column for displaying outputs
        with gr.Column(scale=2):
            # Textbox to show the agent's text response
            output_text = gr.Textbox(
                label="Agent Response",
                lines=5, # Allow multiple lines for longer responses
                interactive=False # Output only
            )
            # Row specifically for the two plots side-by-side
            with gr.Row():
                 # Placeholder for the original audio spectrogram plot
                 orig_spec = gr.Plot(label="Original Spectrogram")
                 # Placeholder for the denoised audio spectrogram plot
                 denoised_spec = gr.Plot(label="Denoised Spectrogram")
            # Component to play back the denoised audio file
            denoised_audio = gr.Audio(
                label="Denoised Audio Output",
                type="filepath", # Expects a file path from the backend function
                interactive=False # Output only
            )

    # Define the action when the button is clicked
    btn.click(
        fn=agent,                           # The Python function to execute
        inputs=[audio_input, user_query],   # Components providing input to the function
        outputs=[output_text, orig_spec, denoised_spec, denoised_audio] # Components to update with function's return values
    )


## 8. Launch Gradio App

*   Checks if the `client` object was successfully initialized in Step 3.
*   If the client is ready, it calls `demo.launch(debug=True)`.
    *   `demo.launch()` starts the Gradio web server, making the UI accessible via a local URL (or a public one if `share=True` is used, though be cautious with API keys).
    *   `debug=True` keeps the cell running (it blocks the main thread) and prints errors raised inside Gradio or the callback function in the cell output, which is helpful during development.
*   If the client initialization failed, it prints an error message instead of launching the app.


In [None]:
# --- Launch Gradio App ---

# Check if the Gemini client was initialized successfully before launching the web UI
if __name__ == "__main__":
    # This check prevents running the server if the API key is invalid, for example
    if client:
        print("Gemini client initialized. Launching Gradio app...")
        # Start the Gradio web server interface
        # debug=True provides helpful error messages during development
        # share=True can create a temporary public link (use with caution regarding API keys/data)
        demo.launch(debug=True)
    else:
        # Inform the user if the app cannot launch due to client initialization failure
        print("ERROR: Gradio app cannot launch because Gemini client failed to initialize. Please check API key and restart.")


In [None]:
demo.close()
print("Gradio app closed.")

---
---

# Anomaly Detection with Gemini API

This notebook demonstrates how to detect anomalies in text data using **embeddings** generated by the Gemini API.

## 💡 Capabilities of this Notebook
- Runs in **Kaggle or Google Colab** seamlessly
- Loads and preprocesses newsgroup text data from multiple domains
- Generates text embeddings using Google's Gemini API
- Defines a subset of text (**science newsgroups**) as 'normal' data
- Introduces anomalies by **mixing unrelated categories into the test data**
- Calculates semantic distances using embeddings
- **Identifies anomalous texts in the test data**


## 🔧 Possible Enhancements
- Try different distance metrics (cosine, Mahalanobis)
- Use alternative embedding models (e.g., BERT, OpenAI)
- Introduce real-world noisy or domain-shifted data
- Save and reuse embeddings to avoid repeated API calls
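
The last enhancement (saving and reusing embeddings) could look roughly like the sketch below. It is not part of the notebook: `cached_embed`, the JSON cache file, and `fake_embed` are hypothetical names used for illustration, and `fake_embed` stands in for a real embedding call so the example runs without an API key.

```python
import hashlib
import json
import tempfile
from pathlib import Path


def cached_embed(text, embed, cache_path):
    """Return the embedding for `text`, calling `embed` only on a cache miss."""
    path = Path(cache_path)
    # Load the existing cache, or start with an empty one
    cache = json.loads(path.read_text()) if path.exists() else {}
    # Key entries by a hash of the text so long documents make short keys
    key = hashlib.sha256(text.encode("utf-8")).hexdigest()
    if key not in cache:
        cache[key] = list(embed(text))
        path.write_text(json.dumps(cache))
    return cache[key]


# Stand-in for an API-backed embedding function (hypothetical)
calls = []

def fake_embed(text):
    calls.append(text)
    return [float(len(text)), 0.0]


with tempfile.TemporaryDirectory() as tmp:
    cache_file = Path(tmp) / "embeddings.json"
    first = cached_embed("hello", fake_embed, cache_file)
    second = cached_embed("hello", fake_embed, cache_file)  # served from disk
```

In this sketch the second call never reaches `fake_embed`. Rewriting the whole JSON file on every miss is fine for a few hundred texts but would be slow for large datasets.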

In [None]:
# Remove unused conflicting packages
#!pip uninstall -qqy jupyterlab kfp 2>/dev/null
# Install specific google-genai version used in the original notebook
!pip install -U -q "google-genai==1.7.0"

## 📚 Import Necessary Libraries

We import the required Python modules for:
- Data loading and preprocessing
- Using the Gemini API to embed text
- Distance computation for anomaly scoring


In [None]:
# Import the Gemini SDK and the retry helpers
from google import genai
from google.genai import types
from google.api_core import retry
import google.api_core.exceptions # Often needed for specific error types

print(f"Using google-genai version: {genai.__version__}")

# Import necessary libraries
import numpy as np
import pandas as pd
import tensorflow as tf # Used only to seed TensorFlow's random number generator below
import time
import concurrent.futures
from tqdm.rich import tqdm as tqdmr
import warnings
import email # Standard Python library for parsing email messages
import re # Regular expressions for pattern matching and text cleaning
from sklearn.datasets import fetch_20newsgroups
from sklearn.metrics.pairwise import cosine_distances # For distance calculation
from sklearn.utils import Bunch # To help manage data sourcing workaround


# Seed the NumPy and TensorFlow random number generators for reproducibility.
np.random.seed(42)
tf.random.set_seed(42) # Seeds TensorFlow only; NumPy is seeded on the line above

# Define the retry predicate: retry only on rate limits (429) or transient server errors (503, 504).
# Ensure this runs before functions using the @retry decorator.
# Note: the embedding cell later redefines is_retriable with a narrower check.
is_retriable = lambda e: isinstance(e, (
    genai.errors.APIError, # google-genai API errors; the HTTP status is in .code
    google.api_core.exceptions.ResourceExhausted, # 429
    google.api_core.exceptions.DeadlineExceeded, # 504
    google.api_core.exceptions.ServiceUnavailable # 503
)) and (not hasattr(e, 'code') or e.code in {429, 503, 504}) # 504 included so DeadlineExceeded can match

print("Imports and retry logic set up.")

## 🔐 Set Up Gemini API Key

To use the Gemini API for generating embeddings, an API key is required.

### Options:
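- **Kaggle**: The key is read from Kaggle secrets via `UserSecretsClient`.
- **Colab**: The key is read from Colab secrets via `google.colab.userdata`. Alternatively, enter it manually with `getpass`.
- **Other environments**: The key is read from the `GOOGLE_API_KEY` environment variable.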

> 💡 Make sure your API key has access to the embedding endpoint.
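
For environments without a secrets store, a fallback might read the key from the environment and prompt with `getpass` only when it is missing. The sketch below is an assumption about how that could be written. `resolve_api_key` is a hypothetical helper, not part of the notebook, and the demo passes stub values so that nothing is prompted.

```python
import getpass
import os


def resolve_api_key(env=None, prompt=getpass.getpass):
    """Return the API key from the environment, or ask the user for it."""
    env = os.environ if env is None else env
    key = env.get("GOOGLE_API_KEY", "").strip()
    if key:
        return key
    # getpass hides the typed key so it does not appear in the cell output
    return prompt("Enter your Gemini API key: ").strip()


# Demo with stubbed inputs (no real key, no interactive prompt)
key_from_env = resolve_api_key(env={"GOOGLE_API_KEY": "abc123"}, prompt=lambda _: "unused")
key_from_prompt = resolve_api_key(env={}, prompt=lambda _: " typed-key ")
```

In an interactive session, calling `resolve_api_key()` with no arguments would use the real environment and the real `getpass` prompt.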

In [None]:
# Authenticate and create embedding model client
import os
# genai must already be imported from the google-genai SDK (see the imports cell):
# from google import genai

# --- Auto-detect Environment and Get API Key ---
client = None # Initialize client to None
GOOGLE_API_KEY = None
environment = "unknown"

print("Attempting to detect environment and configure Google GenAI client...")

# --- Environment Detection using Environment Variables ---
if 'COLAB_GPU' in os.environ:
    print("Detected Colab environment via COLAB_GPU.")
    environment = "colab"
elif 'KAGGLE_KERNEL_RUN_TYPE' in os.environ:
    print("Detected Kaggle environment via KAGGLE_KERNEL_RUN_TYPE.")
    environment = "kaggle"
else:
    # Fallback check using imports if variables aren't definitive
    try:
        from google.colab import userdata
        print("Detected Colab environment via import.")
        environment = "colab"
    except ImportError:
        try:
            from kaggle_secrets import UserSecretsClient
            print("Detected Kaggle environment via import.")
            environment = "kaggle"
        except ImportError:
             print("Could not detect Colab or Kaggle environment via variables or imports.")
             environment = "other"


# --- Get API Key based on detected environment ---
if environment == "colab":
    try:
        from google.colab import userdata # Import again just in case
        GOOGLE_API_KEY = userdata.get('GOOGLE_API_KEY')
        print("Successfully retrieved GOOGLE_API_KEY from Colab secrets.")
    except userdata.SecretNotFoundError:
        print("Secret 'GOOGLE_API_KEY' not found in Colab secrets.")
    except Exception as e:
        print(f"An error occurred retrieving Colab secret: {type(e).__name__}: {e}")

elif environment == "kaggle":
    try:
        from kaggle_secrets import UserSecretsClient # Import again just in case
        GOOGLE_API_KEY = UserSecretsClient().get_secret("GOOGLE_API_KEY")
        print("Successfully retrieved GOOGLE_API_KEY from Kaggle secrets.")
    except Exception as e: # Catch potential errors during secret retrieval
         print(f"An error occurred retrieving Kaggle secret: {type(e).__name__}: {e}")
         # Check if it's specifically the secret not found error if possible
         if "Secret not found" in str(e): # Simple string check
              print("Secret 'GOOGLE_API_KEY' not found in Kaggle secrets.")
         else:
              print("Please ensure the secret 'GOOGLE_API_KEY' is added to this notebook.")

elif environment == "other":
     # Try environment variable as a last resort
     print("Trying OS environment variable 'GOOGLE_API_KEY'.")
     GOOGLE_API_KEY = os.getenv('GOOGLE_API_KEY')
     if GOOGLE_API_KEY:
          print("Found GOOGLE_API_KEY in OS environment variables.")
     else:
          print("GOOGLE_API_KEY not found as OS environment variable.")
else: # Should not happen, but handle unknown case
     print("Environment detection resulted in an unexpected state.")


# --- Initialize Client ---
if GOOGLE_API_KEY:
    try:
        # Ensure genai.Client is available
        if hasattr(genai, 'Client'):
             client = genai.Client(api_key=GOOGLE_API_KEY)
             print("Successfully configured Google GenAI client.")
             # Optional: Test client
             # try:
             #      client.models.list()
             #      print("Client connection test successful.")
             # except Exception as test_e:
             #      print(f"Client connection test failed: {test_e}")
             #      client = None # Reset client if test fails
        else:
             print("Error: genai.Client class not found. Was the library imported correctly?")
             client = None

    except Exception as client_e:
        print(f"\n--- ERROR: Failed to configure client ---")
        print(f"An error occurred during client configuration: {type(client_e).__name__}: {client_e}")
        client = None
else:
    print("\n--- WARNING ---")
    print("GOOGLE_API_KEY could not be retrieved.")
    print("GenAI Client could not be configured.")
    print("API calls to Gemini will FAIL.")
    print("--- END WARNING ---")




## Dataset

* The [20 Newsgroups Text Dataset](https://scikit-learn.org/0.19/datasets/twenty_newsgroups.html) is used as the source for our 'normal' data.
* We load the raw data, preprocess it, and then sample subsets from 'sci.*' categories to define our 'normal' data group.

In [None]:
# Load the full 20 Newsgroups train/test splits (all categories).
# The raw data is kept in these variables because later cells reuse it.
print("Loading initial train/test splits...")
# Use try-except to handle potential network errors during fetch
try:
    newsgroups_train_raw = fetch_20newsgroups(subset="train")
    newsgroups_test_raw = fetch_20newsgroups(subset="test")
    print(f"Raw train posts: {len(newsgroups_train_raw.data)}")
    print(f"Raw test posts: {len(newsgroups_test_raw.data)}")
    print(f"All categories: {newsgroups_train_raw.target_names}")
except Exception as e:
    print(f"ERROR loading dataset: {e}")
    print("Please ensure internet is enabled for the notebook and try again.")
    # Re-raise to stop execution, since every later cell depends on this data
    raise

## Prepare the Dataset

### **Objective**

* **Preprocessing**: Clean the raw newsgroup posts.
* **Normalization**: Format the text to resemble standard prose.

### **Processing Steps**

* **Extract**: Use email headers (e.g., "Subject") and the message payload.
* **Remove**: Eliminate email addresses and common headers/footers.
* **Truncate**: Limit the text length (e.g., 5000 characters).
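
The three steps can be tried on a toy message first. This is a simplified sketch using only the standard `email` and `re` modules. The sample message and its addresses are made up, and it skips the multipart handling and header stripping that a full implementation would need.

```python
import email
import re

# A made-up newsgroup-style post
raw = (
    "From: alice@example.com\n"
    "Subject: Orbit question\n"
    "\n"
    "Contact me at bob@example.org about the launch.\n"
)

# Extract: take the Subject header and the message body
msg = email.message_from_string(raw)
subject = msg["Subject"] or ""
body = msg.get_payload()
text = f"{subject}\n\n{body}"

# Remove: strip anything that looks like an email address
text = re.sub(r"[\w\.-]+@[\w\.-]+", "", text)

# Truncate: cap the cleaned text at 5,000 characters
cleaned = text.strip()[:5000]
print(cleaned)
```

Because only the subject and payload are kept, the `From:` header never reaches the output; the address inside the body is removed by the regular expression.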

In [None]:
def preprocess_newsgroup_row(data):
    """
    Processes a single email/newsgroup entry:
    - Extracts the subject and body.
    - Removes email addresses and common clutter.
    - Truncates text to 5,000 characters.

    Args:
        data (str): Raw email message as a string.

    Returns:
        str: Cleaned and truncated text.
    """
    # Parse the email message from the raw string format
    try:
        msg = email.message_from_string(data)
        # Extract subject and body text
        subject = msg['Subject'] if msg['Subject'] else ""
        payload = msg.get_payload() if msg.get_payload() else ""
        # Ensure payload is a string
        if isinstance(payload, list): # Handle multipart messages simply
            # Decode parts if necessary, assuming utf-8 or latin-1
            payload_parts = []
            for part in payload:
                try:
                    if hasattr(part, 'get_payload'):
                        p_content = part.get_payload(decode=True)
                        if p_content:
                             try:
                                  payload_parts.append(p_content.decode('utf-8'))
                             except UnicodeDecodeError:
                                  try:
                                       payload_parts.append(p_content.decode('latin-1'))
                                  except UnicodeDecodeError:
                                       payload_parts.append("[Undecodable Content]") # Placeholder
                        else:
                            payload_parts.append(str(part)) # Fallback for non-payload parts
                    else:
                         payload_parts.append(str(part)) # Fallback for non-message parts
                except Exception as part_e:
                     payload_parts.append(f"[Error processing part: {part_e}]")
            payload = "\n".join(payload_parts)

        elif isinstance(payload, bytes):
             try:
                   payload = payload.decode('utf-8')
             except UnicodeDecodeError:
                   try:
                        payload = payload.decode('latin-1')
                   except UnicodeDecodeError:
                        payload = "[Undecodable Bytes Payload]"


        text = f"{subject}\n\n{str(payload)}" # Ensure payload is string

    except Exception as e:
        # Handle potential parsing errors on malformed messages
        # print(f"Warning: Error parsing email data: {e}. Using raw data snippet.")
        text = str(data)[:5000] # Use raw data as fallback

    # Remove email addresses from the text
    text = re.sub(r"[\w\.-]+@[\w\.-]+", "", text)
    # Remove common header/footer lines (simple examples)
    text = re.sub(r'^\s*Lines: \d+\s*$', '', text, flags=re.MULTILINE)
    text = re.sub(r'^\s*Organization: .*\s*$', '', text, flags=re.MULTILINE)
    text = re.sub(r'^\s*From: .*\s*$', '', text, flags=re.MULTILINE)
    text = re.sub(r'^\s*Subject: .*\s*$', '', text, flags=re.MULTILINE)
    text = re.sub(r'^\s*Nntp-Posting-Host: .*\s*$', '', text, flags=re.MULTILINE)
    text = re.sub(r'^\s*Article-I\.D\.: .*\s*$', '', text, flags=re.MULTILINE)
    text = re.sub(r'^\s*Keywords: .*\s*$', '', text, flags=re.MULTILINE)
    text = re.sub(r'^\s*In article <.*> you write:\s*$', '', text, flags=re.MULTILINE)
    text = re.sub(r'wrote:$', '', text, flags=re.MULTILINE) # Common quote intro


    # Truncate text to 5,000 characters to limit processing size
    text = text.strip()[:5000]

    return text


def preprocess_newsgroup_data(newsgroup_dataset, apply_clean):
    """
    Converts the newsgroup dataset into a structured DataFrame:
    - Stores text and labels.
    - Cleans text using preprocess_newsgroup_row() if apply_clean is True.
    - Maps numeric labels to category names.

    Args:
        newsgroup_dataset (sklearn.utils.Bunch): Newsgroup dataset object.
        apply_clean (boolean) : Apply row cleaning

    Returns:
        pd.DataFrame: Preprocessed dataset with 'Text', 'Label', 'Class Name'.
    """
    # Convert dataset into a pandas DataFrame
    df = pd.DataFrame(
        {"Text": newsgroup_dataset.data, "Label": newsgroup_dataset.target}
    )
    if apply_clean:
        print(f"Applying text cleaning...")
        # Apply text cleaning to each entry
        df["Text"] = df["Text"].apply(preprocess_newsgroup_row)
        # Remove rows where text became empty after cleaning
        df = df[df["Text"].str.strip().astype(bool)]
        print(f"DataFrame shape after cleaning: {df.shape}")


    # Convert numerical labels to category names
    target_names = newsgroup_dataset.target_names
    if target_names:
         # Ensure label exists in target_names mapping
         df["Class Name"] = df["Label"].apply(lambda l: target_names[l] if 0 <= l < len(target_names) else "Unknown")
    else:
         df["Class Name"] = "Unknown"


    return df.reset_index(drop=True) # Reset index after potential row removal

#### Create pandas DataFrames from the training and test datasets

In [None]:
# Create pandas dataframes from training and test datasets *with* preprocessing
# Check if raw data was loaded successfully
if 'newsgroups_train_raw' in locals() and 'newsgroups_test_raw' in locals():
    print("Preprocessing raw train data...")
    df_train_full = preprocess_newsgroup_data(newsgroups_train_raw, apply_clean=True)
    print("\nPreprocessing raw test data...")
    df_test_full = preprocess_newsgroup_data(newsgroups_test_raw, apply_clean=True)

    print("\nFull Train DataFrame head:")
    print(df_train_full.head())
    print(f"\nFull Train DataFrame shape: {df_train_full.shape}")
    print(f"Full Test DataFrame shape: {df_test_full.shape}")

else:
    print("ERROR: Raw data not loaded earlier: Cannot preprocess.")
    # Create empty dataframes to avoid subsequent errors, but notebook won't work
    df_train_full = pd.DataFrame(columns=['Text', 'Label', 'Class Name'])
    df_test_full = pd.DataFrame(columns=['Text', 'Label', 'Class Name'])

#### Sampling Data to Define the "Normal" Class (to Differentiate from Anomalies)

* We sample a subset of the data to represent our "normal" dataset for anomaly detection.
* Here, we choose the `sci.*` categories.
* We also sample the test set in the same way to provide a baseline of expected "normal" test points.
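
The sampling idea (keep only matching classes, then take at most a fixed number of rows per class) can be sketched without pandas. `sample_per_class` and the toy rows below are hypothetical and only illustrate the logic.

```python
import random
from collections import defaultdict


def sample_per_class(rows, max_per_class, pattern, seed=42):
    """Keep rows whose class name contains `pattern`, capped per class."""
    rng = random.Random(seed)  # fixed seed for reproducible samples
    groups = defaultdict(list)
    for text, class_name in rows:
        if pattern in class_name:
            groups[class_name].append((text, class_name))
    sampled = []
    for class_name in sorted(groups):
        members = groups[class_name]
        # min() handles classes with fewer rows than requested
        sampled.extend(rng.sample(members, min(max_per_class, len(members))))
    return sampled


rows = [
    ("a", "sci.space"), ("b", "sci.space"), ("c", "sci.space"),
    ("d", "sci.med"),
    ("e", "rec.autos"),
]
picked = sample_per_class(rows, max_per_class=2, pattern="sci")

# Count how many rows were kept per class
counts = {}
for _, name in picked:
    counts[name] = counts.get(name, 0) + 1
```

Here `sci.space` is capped at two rows, `sci.med` keeps its single row, and `rec.autos` is filtered out.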

In [None]:
# Function to sample data
def sample_data(df, num_samples_per_class, classes_to_keep_pattern):
    """
    Samples rows from the dataset based on the specified number of samples per label
    and filters the dataset to keep only specified categories matching a pattern. Handles cases where classes have fewer samples than requested.

    Args:
        df (pd.DataFrame): Input dataframe containing the data ('Text', 'Label', 'Class Name').
        num_samples_per_class (int): Max number of samples to take per label.
        classes_to_keep_pattern (str): Substring pattern to filter class names.

    Returns:
        pd.DataFrame: Filtered and sampled dataframe. Returns empty if no matching classes or input df is empty.
    """
    if df.empty:
         # print("Debug: Input DataFrame to sample_data is empty.") # Removed Debug
         return df

    # Filter rows based on the pattern first
    df['Class Name'] = df['Class Name'].astype(str)
    try:
        filter_mask = df["Class Name"].str.contains(classes_to_keep_pattern, na=False, regex=False)
        df_filtered = df[filter_mask].copy()
    except Exception as filter_e:
        print(f"Error filtering DataFrame by pattern '{classes_to_keep_pattern}': {filter_e}")
        return pd.DataFrame(columns=df.columns) # Return empty on error


    if df_filtered.empty:
        print(f"Warning: No classes found containing pattern '{classes_to_keep_pattern}' AFTER filtering.")
        return df_filtered

    print(f"Found classes containing '{classes_to_keep_pattern}': {df_filtered['Class Name'].unique().tolist()}")
    print(f"Number of samples before sampling (after filtering): {len(df_filtered)}")


    # Sample rows, selecting num_samples_per_class of each remaining label
    try:
        # Ensure the sampling function handles empty groups if any arise
        df_sampled = (
            df_filtered.groupby("Class Name", group_keys=False)
            # Handle potential deprecation warning by explicitly selecting columns if needed,
            # but standard apply should work. Add random_state for reproducibility.
            .apply(lambda x: x.sample(min(num_samples_per_class, x.shape[0]), random_state=42) if not x.empty else None)
        )
        # Remove potential None results if a group was empty
        if df_sampled is not None:
             # Check if df_sampled is a DataFrame before calling dropna
             if isinstance(df_sampled, pd.DataFrame):
                  df_sampled.dropna(inplace=True)
             else: # If apply returned something else (like Series if only one group)
                  print(f"Warning: Unexpected result type from groupby.apply: {type(df_sampled)}")
                  # Attempt to convert back to DataFrame or handle appropriately
                  # For simplicity, return empty if structure is unexpected
                  df_sampled = pd.DataFrame(columns=df_filtered.columns)

        else: # Handle case where apply returns None (e.g., only one empty group)
             df_sampled = pd.DataFrame(columns=df_filtered.columns)


    except Exception as e:
         print(f"Error during sampling: {e}")
         return pd.DataFrame(columns=df.columns) # Return empty df on error

    print(f"Number of samples after sampling: {len(df_sampled)}")
    return df_sampled.reset_index(drop=True) # Reset index after sampling

# --- Sampling constants ---
TRAIN_NUM_SAMPLES_NORMAL = 100
TEST_NUM_SAMPLES_NORMAL = 25
NORMAL_CLASSES_PATTERN = "sci" # Keep classes containing 'sci' (sci.crypt, sci.electronics, sci.med, sci.space)

# Initialize empty DataFrames for results
df_train = pd.DataFrame(columns=['Text', 'Label', 'Class Name'])
df_test = pd.DataFrame(columns=['Text', 'Label', 'Class Name'])

# --- Sample 'normal' training data ---
print("\n--- Preparing to sample TRAINING data ---")
if 'df_train_full' in locals() and not df_train_full.empty:
    print(f"\nSampling 'normal' training data (pattern: {NORMAL_CLASSES_PATTERN})...")
    df_train = sample_data(df_train_full, TRAIN_NUM_SAMPLES_NORMAL, NORMAL_CLASSES_PATTERN)
    if not df_train.empty:
         print(f"\n'Normal' Training samples per class:\n{df_train['Class Name'].value_counts()}")
    else:
         print("Resulting 'normal' training DataFrame (df_train) is empty after sampling.")
else:
    print("Skipping training data sampling as df_train_full is empty or not defined.")


# --- Sample 'normal' test data ---
print("\n--- Preparing to sample TEST data ---")
if 'df_test_full' in locals() and not df_test_full.empty:
    print(f"\nSampling 'normal' test data (pattern: {NORMAL_CLASSES_PATTERN})...")
    df_test = sample_data(df_test_full, TEST_NUM_SAMPLES_NORMAL, NORMAL_CLASSES_PATTERN)
    if not df_test.empty:
         print(f"\n'Normal' Test samples per class:\n{df_test['Class Name'].value_counts()}")
    else:
         print("Resulting 'normal' test DataFrame (df_test) is empty after sampling.")

else:
     print("Skipping test data sampling as df_test_full is empty or not defined.")



## Create the Embeddings

Generate embeddings for each piece of text using the Gemini API embeddings endpoint. 

### Task types

The `text-embedding-004` model accepts a `task_type` parameter that tailors the embeddings to a specific task. For topic-based similarity or anomaly detection, `RETRIEVAL_DOCUMENT` or `CLUSTERING` might also be suitable. This notebook uses `CLASSIFICATION`, on the assumption that it captures general semantic meaning well enough to separate topics.

In [None]:
from google.api_core import retry  # Import retry mechanism for handling API errors
import tqdm  # Import tqdm for progress bars
from tqdm.rich import tqdm as tqdmr  # Rich progress bars for better visualization
import warnings  # Suppress warnings where necessary

# Add tqdm to Pandas for progress tracking in DataFrame operations
tqdmr.pandas()

# Suppress experimental warnings from tqdm library
warnings.filterwarnings("ignore", category=tqdm.TqdmExperimentalWarning)

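# Predicate for the retry decorator: retry only when the API reports a rate limit (429) or service unavailable (503).
# This replaces the broader is_retriable predicate defined in the imports cell.
is_retriable = lambda e: (isinstance(e, genai.errors.APIError) and e.code in {429, 503})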

@retry.Retry(predicate=is_retriable, timeout=300)  # Retry on specific API errors with a timeout of 300 seconds
def embed_fn(text: str) -> list[float]:
    """
    Generates embeddings for a given text using an embedding model.

    Args:
        text (str): Input text to generate embeddings for.

    Returns:
        list[float]: Embedding vector as a list of floats.
    """
    response = client.models.embed_content(
        model="models/text-embedding-004",  # Specify the embedding model to use
        contents=text,  # Input text content from DF text column
        config=types.EmbedContentConfig(
            task_type="classification",  # Specify task type (e.g., classification)
        ),
    )

    return response.embeddings[0].values  # Return the embedding vector

def create_embeddings(df):
    """
    Adds an 'Embeddings' column to the DataFrame by generating embeddings from text.

    Args:
        df (pd.DataFrame): Input DataFrame with a 'Text' column.

    Returns:
        pd.DataFrame: DataFrame with an additional 'Embeddings' column.
    """
    df["Embeddings"] = df["Text"].progress_apply(embed_fn)  # Apply embedding generation with progress tracking
    return df


In [None]:
df_train = create_embeddings(df_train)
df_test = create_embeddings(df_test)

## Anomaly Detection Setup

Now we set up the anomaly detection task:
1.  **Create Synthetic Anomalies**: Define text examples representing categories completely different from the 'normal' (`sci.*`) newsgroup data, such as financial transactions and spam emails.
2.  **Embed Anomalies**: Generate embeddings for these synthetic samples using the `create_embeddings` function.
3.  **Combine**: Add the synthetic anomalous samples to the 'normal' test set (`df_test`).
4.  **Calculate Distances**: Compute the distance (e.g., cosine distance) of each point in the combined test set from the centroid (average embedding) of the 'normal' training set (`df_train`).
5.  **Identify Outliers**: Flag points with distances exceeding a threshold as anomalies. These should ideally be our synthetic examples.
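
Steps 4 and 5 can be illustrated on toy 2-D vectors using only the standard library. This is a sketch under stated assumptions: `centroid`, `cosine_distance`, and `percentile` are hypothetical helpers written for this example, the vectors are made up, and the 80th percentile is chosen only because the toy test set has five points.

```python
import math


def centroid(vectors):
    """Component-wise mean of a list of equal-length vectors."""
    n = len(vectors)
    return [sum(col) / n for col in zip(*vectors)]


def cosine_distance(a, b):
    """1 minus the cosine similarity of two vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)


def percentile(values, q):
    """Percentile with linear interpolation between the two nearest ranks."""
    ordered = sorted(values)
    pos = (len(ordered) - 1) * q / 100.0
    lo, hi = math.floor(pos), math.ceil(pos)
    return ordered[lo] + (ordered[hi] - ordered[lo]) * (pos - lo)


# 'Normal' training vectors all point in roughly the same direction
train = [[1.0, 0.1], [0.9, 0.2], [1.0, 0.0]]
# Four on-topic test vectors and one off-topic vector (the last one)
test = [[1.0, 0.15], [0.95, 0.1], [0.9, 0.05], [1.0, 0.2], [0.0, 1.0]]

train_centroid = centroid(train)
distances = [cosine_distance(v, train_centroid) for v in test]
threshold = percentile(distances, 80)
flags = [d > threshold for d in distances]
print(flags)
```

Only the off-topic vector exceeds the threshold. Note that a percentile threshold is computed from the test distances themselves, so it flags roughly the same fraction of points whether or not real anomalies are present.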

In [None]:
# Ensure df_train and df_test are not empty before proceeding
if df_train.empty or df_test.empty:
     print("ERROR: 'Normal' train or test DataFrame is empty. Cannot proceed with anomaly detection.")
else:
     # --- 1. Create Synthetic Anomalous Samples ---
     print("Creating synthetic anomaly samples...")

     # Define synthetic texts for different categories
     # 5 Financial examples
     financial_texts = [
          "URGENT: Your account statement for March is ready. Payment due April 15th. Click here to view details.",
          "Stock Alert: ACME Corp (ACME) up 5% pre-market trading following positive earnings report.",
          "Transaction Confirmation: $150.75 paid to 'OnlineRetailer'. Your new balance is $1,234.56.",
          "Loan Application Update: Your mortgage pre-approval has been processed. A loan officer will contact you shortly.",
          "Investment Opportunity: Learn about our new high-yield savings account with competitive APY rates."
     ]
     # 5 Spam examples
     spam_texts = [
          "Congratulations! You've won a FREE iPhone 15! Click HERE to claim your prize NOW!",
          "VIAGRA special offer!! Cheap pills online, discrete shipping guaranteed. Limited time only!",
          "Urgent account security warning! Verify your login details immediately by clicking this link: http://totally-not-a-scam-site.com",
          "Meet hot singles in your area tonight! Join free now, easy registration, instant matches!",
          "Make $1000s working from home! Easy online job opportunity, no experience needed. Apply today!"
     ]

     synthetic_texts = financial_texts + spam_texts
     synthetic_labels = (['Financial'] * 5) + (['Spam'] * 5)
     num_anomalies_to_sample = len(synthetic_texts) # Should be 10

     # Create DataFrame for anomalies
     df_anomalies = pd.DataFrame({
          'Text': synthetic_texts,
          'Class Name': synthetic_labels,
          # Add dummy Label if needed by downstream code, though not strictly necessary for detection
          'Label': [-1] * num_anomalies_to_sample # Assign a dummy label like -1
     })

     print(f"Created {num_anomalies_to_sample} synthetic anomaly samples.")
     print(f"Synthetic classes: {df_anomalies['Class Name'].unique().tolist()}")


     # --- 2. Generate Embeddings for Anomalies ---
     print("\nGenerating embeddings for synthetic anomalies...")
     df_anomalies = create_embeddings(df_anomalies) # Uses 'Text' column by default

     # Check if embeddings were generated successfully for anomalies
     if 'Embeddings' not in df_anomalies.columns or df_anomalies.empty:
          print("Error: Failed to generate embeddings for synthetic anomaly samples. Cannot proceed.")
          # Handle error
     else:
          print(f"Successfully generated embeddings for {len(df_anomalies)} synthetic anomaly samples.")

          # --- 3. Combine Anomalies with Test Set ---
          # Add a flag to distinguish anomalies
          df_anomalies['Is_Anomaly'] = True
          # The original test set contains only 'normal' samples, so set (or reset) its flag to False
          df_test['Is_Anomaly'] = False


          # Combine the original test set with the new anomalies
          df_test_combined = pd.concat([df_test, df_anomalies], ignore_index=True)
          print(f"\nCombined test set size: {len(df_test_combined)} rows")

          # --- 4. Detect Anomalies using Distance from Training Centroid ---
          print("Calculating training centroid and distances...")

          # Ensure training embeddings are ready
          if 'Embeddings' not in df_train.columns or df_train.empty:
               print("Error: Training embeddings not available. Cannot calculate centroid.")
          else:
                x_train_embeddings = np.stack(df_train['Embeddings'].values)
                # Ensure combined test embeddings are ready (check after concat might be needed if anomalies failed)
                if 'Embeddings' not in df_test_combined.columns or df_test_combined['Embeddings'].isnull().any():
                     print("Warning: Missing embeddings in combined test set after concat. Dropping rows with missing embeddings.")
                     df_test_combined.dropna(subset=['Embeddings'], inplace=True)
                     print(f"Proceeding with {len(df_test_combined)} rows in combined test set.")


                if not df_test_combined.empty: # Check if still have data after potential drop
                     x_test_combined_embeddings = np.stack(df_test_combined['Embeddings'].values)

                     # Calculate the centroid (mean embedding) of the 'normal' training data
                     train_centroid = np.mean(x_train_embeddings, axis=0)

                     # Calculate cosine distance from each point in the combined test set to the training centroid
                     distances = cosine_distances(x_test_combined_embeddings, train_centroid.reshape(1, -1))
                     df_test_combined['Distance_to_Centroid'] = distances.flatten()

                     # --- 5. Identify & Save Anomalies ---
                     # Determine a threshold (e.g., 90th percentile of distances)
                     valid_distances = df_test_combined['Distance_to_Centroid'].dropna()
                     if not valid_distances.empty:
                          # Calculate percentile based on expected number of anomalies
                          # num_total = len(df_test_combined)
                          # anomaly_percentile = (1 - (num_anomalies_to_sample / num_total)) * 100 if num_total > 0 else 90
                          # Using fixed 90th percentile for simplicity
                          anomaly_percentile = 90
                          distance_threshold = np.percentile(valid_distances, anomaly_percentile)
                          print(f"Using distance threshold ({anomaly_percentile:.0f}th percentile): {distance_threshold:.4f}")

                          # Flag potential anomalies based on the threshold
                          df_test_combined['Detected_Anomaly'] = df_test_combined['Distance_to_Centroid'] > distance_threshold

                          # Separate the detected anomalies
                          detected_anomalies_df = df_test_combined[df_test_combined['Detected_Anomaly'] == True].copy() # Explicit check for True

                          print("\n--- Detected Anomalies ---")
                          if not detected_anomalies_df.empty:
                               # Display info about detected anomalies
                               print(detected_anomalies_df[['Text', 'Class Name', 'Is_Anomaly', 'Distance_to_Centroid']].head())

                               # Compare actual injected vs detected
                               # Recalculate num_anomalies_present in the potentially filtered df_test_combined
                               num_injected_present = df_test_combined['Is_Anomaly'].sum()
                               correctly_detected = detected_anomalies_df['Is_Anomaly'].sum()
                               false_positives = len(detected_anomalies_df) - correctly_detected
                               print(f"\nTotal detected: {len(detected_anomalies_df)}")
                               print(f"Correctly identified injected anomalies: {correctly_detected}/{num_injected_present} (Expected {num_anomalies_to_sample} injected initially)")
                               print(f"Incorrectly identified (false positives): {false_positives}")

                               # Save the detected anomalies to a CSV
                               try:
                                    anomaly_filename = 'detected_synthetic_anomalies.csv'
                                    # Select columns to save
                                    columns_to_save = ['Text', 'Class Name', 'Is_Anomaly', 'Distance_to_Centroid', 'Detected_Anomaly', 'Embeddings']
                                    detected_anomalies_df[columns_to_save].to_csv(anomaly_filename, index=False)
                                    print(f"\nDetected anomalies saved to {anomaly_filename}")
                               except Exception as e:
                                    print(f"\nError saving anomalies to CSV: {e}")
                          else:
                               print("No anomalies detected above the threshold.")
                     else:
                          print("Error: No valid distances found to calculate threshold.")

                     # Display the head of the combined test df showing new columns
                     print("\n--- Head of Combined Test Set with Anomaly Info ---")
                     print(df_test_combined[['Text', 'Class Name', 'Is_Anomaly', 'Distance_to_Centroid', 'Detected_Anomaly']].head())
                else:
                    print("Combined test set is empty after handling missing embeddings.")

---
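The centroid-distance approach used for anomaly detection can be reproduced on toy data. The sketch below is a minimal illustration, not the notebook's pipeline: the 2-D vectors stand in for real embeddings, `cosine_distance_to_centroid` is a hypothetical helper name, and the 90th-percentile cutoff simply mirrors the fixed value chosen for simplicity.

```python
import numpy as np

def cosine_distance_to_centroid(train, test):
    """Return the cosine distance from each test row to the mean of the training rows."""
    centroid = train.mean(axis=0)
    # Cosine distance = 1 - cosine similarity
    similarity = (test @ centroid) / (
        np.linalg.norm(test, axis=1) * np.linalg.norm(centroid)
    )
    return 1.0 - similarity

# Toy "embeddings": normal points cluster around the direction (1, 0)
train = np.array([[1.0, 0.0], [0.9, 0.1], [1.0, -0.1]])
test = np.array([[1.0, 0.05], [0.95, 0.0], [0.0, 1.0]])  # last row points elsewhere

distances = cosine_distance_to_centroid(train, test)
threshold = np.percentile(distances, 90)  # fixed percentile cutoff (assumption)
flagged = distances > threshold
print(flagged)
```

Because the threshold is a percentile of the scored set itself, roughly 10% of rows are flagged no matter how unusual they are, so the cutoff is worth tuning against the expected anomaly rate.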
---

## Jupyter Notebook to PDF/HTML Converter with Gradio

This project demonstrates a practical application combining:
* Gradio for creating a user-friendly UI.
* Python's `nbconvert` library, invoked through `subprocess`, for converting notebooks to PDF or HTML format.

#### Key Features:
* Converts single notebooks or entire directories.
* Supports **PDF** (local default) and **HTML** (Kaggle default/local option) output formats.
* Intuitive **form-based interface**.
* **Environment-aware processing** (adapts output format and path for Kaggle vs local environments).
* **Safe file handling** using temporary copies to prevent conversion errors.
* Automatic dependency checking and installation attempt for `nbconvert`.
* Configurable output directory and overwrite options.

#### Dependencies:
* `gradio`
* `google-genai` (required only for the API key check; the Gemini chat feature has been removed)
* `jupyter` and `nbconvert[webpdf]` (Installs necessary components for PDF/HTML)
* `python-dotenv` (for local API key loading)

#### 1. Install necessary software


In [None]:
!pip install -U -q "google-genai==1.7.0"
!pip install python-dotenv gradio

In [None]:
import os # Ensure os is imported
def is_running_on_kaggle():
    """Check if the code is running on Kaggle by looking for a specific environment variable."""
    return 'KAGGLE_KERNEL_RUN_TYPE' in os.environ


In [None]:
if is_running_on_kaggle():
    base_op_folder_path = os.path.join(os.getcwd(), "GooglCapstone")
    # Output format for Kaggle
    output_format = "html"
    # Define the specific output path for HTML files on Kaggle
    html_output_path = os.path.join(base_op_folder_path, f"converted_{output_format}")
    # Create the specific output directory
    os.makedirs(html_output_path, exist_ok=True)
    # Confirm the specific HTML folder exists
    if os.path.exists(html_output_path):
        print(f'Kaggle HTML output folder ready at: {html_output_path}')
    else:
        print(f'Error: Failed to create Kaggle HTML output folder at {html_output_path}')
    # check and display input path
    all_files_recursive = []
    try:
        for dirpath, dirnames, filenames in os.walk("/kaggle/input"):
            # To get full paths, join the current directory path with the filename
            for filename in filenames:
                full_path = os.path.join(dirpath, filename)
                all_files_recursive.append(full_path)
    
        print("All files found recursively under /kaggle/input:")
        # Print each file path on a new line for readability
        for f in all_files_recursive:
            print(f)
    
    except Exception as e:
        # os.walk itself doesn't raise FileNotFoundError if the top dir is missing;
        # it just yields nothing. This catches access or other unexpected errors.
        print(f"An error occurred during os.walk: {e}")

In [None]:
import gradio as gr
import inspect  # Used by get_current_notebook_name() to inspect stack frames
import os
import re
import shutil
import subprocess
import sys
import tempfile  # Used for safe copies
import pkg_resources
from google import genai
# Used for the modern Gradio chat format
from gradio import ChatMessage

#### 2. Set up a retry helper.
* This lets you use "Run all" without worrying about the per-minute quota.
* The `is_retriable` predicate treats HTTP 429 (rate limit exceeded) and the server errors 500, 502, and 503 as retriable, since these usually indicate temporary conditions that a retry is likely to resolve.
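The retry-with-predicate idea can be tried without any API calls. The sketch below is a minimal stand-in, not the `google.api_core.retry` implementation: `FakeAPIError`, `retry_call`, and the delay values are hypothetical names and numbers chosen for illustration.

```python
import time

RETRIABLE_CODES = {429, 500, 502, 503}

class FakeAPIError(Exception):
    """Hypothetical stand-in for an API error that carries an HTTP status code."""
    def __init__(self, code):
        super().__init__(f"HTTP {code}")
        self.code = code

def is_retriable(exc):
    """Return True only for errors that a retry is likely to resolve."""
    return isinstance(exc, FakeAPIError) and exc.code in RETRIABLE_CODES

def retry_call(func, predicate, max_attempts=4, initial_delay=0.01):
    """Call func(), retrying with exponential backoff while predicate(exc) holds."""
    delay = initial_delay
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except Exception as exc:
            # Give up on the last attempt or on a non-retriable error
            if attempt == max_attempts or not predicate(exc):
                raise
            time.sleep(delay)
            delay *= 2

calls = {"count": 0}

def flaky():
    """Fail twice with a rate-limit error, then succeed."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise FakeAPIError(429)
    return "ok"

result = retry_call(flaky, is_retriable)
print(result, calls["count"])
```

A non-retriable error such as HTTP 400 is re-raised on the first attempt, which keeps genuine request bugs from being hidden behind repeated retries.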

In [None]:
from google.api_core import retry
is_retriable = lambda e: isinstance(e, genai.errors.APIError) and e.code in {429, 500, 502, 503}
genai.models.Models.generate_content = retry.Retry(predicate=is_retriable)(genai.models.Models.generate_content)


#### 3. API Key Management and Google Client Initialization
This code handles the secure retrieval of the Google API key needed for Gemini API access. It works in both Kaggle and local environments:

* **Purpose**: Securely load the Google API key without hardcoding it
* **Environment Detection**: Automatically detects whether running on Kaggle or locally
* **Key Features**:
  * Uses Kaggle Secrets when running in Kaggle notebooks
  * Falls back to environment variables when running locally
  * Provides clear error messages if the key is missing
  
##### How It Works:

* The `get_api_key()` function:
  * Checks whether code is running on Kaggle by looking for 'KAGGLE_KERNEL_RUN_TYPE' in environment variables
  * If on Kaggle: Uses Kaggle's UserSecretsClient to securely access the stored API key
  * If running locally: 
    * Loads variables from .env file using dotenv
    * Executes a batch script (`set_api_key.bat`) in Windows environments to set the `GOOGLE_API_KEY` environment variable. Alternatively, set `GOOGLE_API_KEY` as a system environment variable.
    * Retrieves key from environment variables
    * Raises helpful error messages if key isn't found

##### Security Best Practices:

* Never hardcodes API keys in the notebook
* Uses platform-specific secure storage methods
* Allows for flexible deployment across environments

This pattern is essential for any project using external APIs that require authentication while maintaining security.
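The local lookup can also be written so that it is easy to test and never echoes the secret. This is a minimal sketch under stated assumptions: `resolve_api_key` and `mask` are hypothetical helper names, and the demo value is a placeholder, not a real key.

```python
import os

def resolve_api_key(environ=None, name="GOOGLE_API_KEY"):
    """Return the key stored under `name`, or raise if it is missing or blank."""
    environ = os.environ if environ is None else environ
    key = (environ.get(name) or "").strip()
    if not key:
        raise EnvironmentError(f"{name} is not set. Set it before running locally.")
    return key

def mask(key, visible=4):
    """Return a log-safe form of the key that shows only the last few characters."""
    if len(key) <= visible:
        return "*" * len(key)
    return "*" * (len(key) - visible) + key[-visible:]

# Passing a plain dict instead of os.environ keeps the demo self-contained
demo_env = {"GOOGLE_API_KEY": "example-key-1234"}
masked = mask(resolve_api_key(demo_env))
print(masked)
```

Logging only the masked form confirms which key was loaded without exposing it in notebook output.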

In [None]:
import os, sys, subprocess, pkg_resources
def get_api_key():
    """
    Retrieves the Google API key, attempting to get it from Kaggle Secrets
    if running on Kaggle, otherwise from an environment variable.
    """
    if 'KAGGLE_KERNEL_RUN_TYPE' in os.environ:
        # Running on Kaggle
        from kaggle_secrets import UserSecretsClient
        user_secrets = UserSecretsClient()
        api_key = user_secrets.get_secret("GOOGLE_API_KEY")
        print("Running on Kaggle, API key loaded from Kaggle Secrets.")
        return api_key
    else:

        # Running locally
        from dotenv import load_dotenv
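        load_dotenv()  # Load variables from a local .env file, if present
        # Run the optional batch script only on Windows and only if it exists.
        # Variables set inside a child process are not inherited by this process,
        # so the key itself must come from .env or the system environment.
        if os.name == 'nt' and os.path.exists('set_api_key.bat'):
            subprocess.call(['set_api_key.bat'])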
        # Now load the environment variable
        api_key = os.environ.get('GOOGLE_API_KEY')
        # Never hardcode or print the key; only confirm whether it was found.
        if api_key:
            print("Running locally, API key loaded from environment variable.")
            return api_key
        else:
            raise EnvironmentError(
                "GOOGLE_API_KEY environment variable not set. "
                "Please set it before running locally."
            )
# Get the API key
GOOGLE_API_KEY = get_api_key()
# Initialize the generative AI client


In [None]:
# setup the Client with the API key
client = genai.Client(api_key=GOOGLE_API_KEY)

#### 4. Automatic nbconvert Installation

* A key challenge for notebook converters is ensuring that all dependencies are properly installed.
* The function below (`check_and_install_dependencies`) checks whether `nbconvert` is installed and, if it is missing, attempts to install it with the `webpdf` extras.
* The `webpdf` extras include dependencies required for PDF conversion (like Playwright/Pyppeteer), although this app primarily uses HTML conversion on Kaggle.

This function demonstrates **automation and error handling** for better user experience.
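The presence check can also be done with the standard library's `importlib.metadata`, which avoids depending on `pkg_resources` (deprecated by setuptools). The following is a small sketch of that alternative, not the function used in this notebook; `installed_version` is a hypothetical helper name.

```python
from importlib import metadata

def installed_version(package_name):
    """Return the installed version string of a distribution, or None if it is absent."""
    try:
        return metadata.version(package_name)
    except metadata.PackageNotFoundError:
        return None

version = installed_version("nbconvert")
print(f"nbconvert: {version or 'not installed'}")
```

Returning `None` rather than raising lets the caller decide whether to attempt an installation or simply report the missing dependency.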

In [None]:
def check_and_install_dependencies():
    """
    Checks if nbconvert[webpdf] is installed. If not, attempts to install it.
    
    Returns:
        bool: True if dependencies are met or installed successfully, False otherwise.
    
    This function performs several steps:
    1. Checks if nbconvert is already installed
    2. If not, attempts to install it using pip
    3. Verifies successful installation
    4. Handles potential errors during installation
    """
    try:
        # Check if nbconvert is already installed
        pkg_resources.get_distribution('nbconvert')
        print("✓ nbconvert is already installed.")
        return True
    except pkg_resources.DistributionNotFound as e:
        # nbconvert not found, attempt installation
        print(f"Dependency missing: {e}. Attempting installation...")
        try:
            # Use sys.executable to ensure pip runs in the correct Python environment
            install_command = [sys.executable, "-m", "pip", "install", "nbconvert[webpdf]"]
            print(f"Running: {' '.join(install_command)}")
            
            # Execute the installation command
            result = subprocess.run(install_command, check=True, capture_output=True, text=True)
            print("✓ Installation successful!")
            print(result.stdout)
            
            # Verify installation was successful
            pkg_resources.get_distribution('nbconvert') 
            return True
        except subprocess.CalledProcessError as install_error:
            # Installation failed - provide detailed error info
            print("-------------------- Installation Failed --------------------")
            print(f"Error installing nbconvert[webpdf]: {install_error}")
            print("STDERR:")
            print(install_error.stderr)
            print("STDOUT:")
            print(install_error.stdout)
            print("-------------------------------------------------------------")
            print("Please try installing manually: pip install nbconvert[webpdf]")
            return False
        except pkg_resources.DistributionNotFound:
            # Installation seemed to succeed but verification failed
            print("Verification failed after installation attempt.")
            return False
        except Exception as general_error:
            # Catch any other unexpected errors
            print(f"An unexpected error occurred during installation: {general_error}")
            return False
    except Exception as check_error:
        # Catch any other errors during the initial check
        print(f"Error checking dependencies: {check_error}")
        return False

#### 5. Safe Copy Handling for Notebooks

If the conversion is launched from the same notebook file that is being converted, `nbconvert` may read the file while it is still being written, which can cause conversion errors.

To mitigate this, these functions copy the target notebook(s) before conversion, either into a **temporary system directory** (`use_temp_dir=True`, the setting the conversion functions use) or into an `ip_folder` subdirectory next to the original. The conversion then operates on a separate, static copy, and the conversion functions remove the temporary directory afterwards.

* `create_safe_copy`: Handles single files.
* `create_safe_copies_from_dir`: Handles all `.ipynb` files in a directory.
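When the copy is only needed for the duration of one operation, `tempfile.TemporaryDirectory` can handle the cleanup on its own. The sketch below illustrates that pattern under stated assumptions: `with_temp_copy` and the `action` callback are hypothetical names, and a throwaway file stands in for a real notebook.

```python
import os
import shutil
import tempfile

def with_temp_copy(file_path, action):
    """Copy file_path into a fresh temp directory, run action(copy_path), then clean up."""
    with tempfile.TemporaryDirectory() as temp_dir:
        copy_path = os.path.join(temp_dir, os.path.basename(file_path))
        shutil.copy2(file_path, copy_path)
        # The directory and the copy are deleted when this block exits,
        # even if action() raises an exception.
        return action(copy_path)

seen = {}

def record(copy_path):
    """Stand-in for a conversion step: note the copy's path and return its size."""
    seen["path"] = copy_path
    seen["existed"] = os.path.exists(copy_path)
    return os.path.getsize(copy_path)

with tempfile.TemporaryDirectory() as work_dir:
    source = os.path.join(work_dir, "demo.ipynb")
    with open(source, "w", encoding="utf-8") as f:
        f.write("{}")
    size = with_temp_copy(source, record)

print(size, seen["existed"], os.path.exists(seen["path"]))
```

The context manager removes the need for a manual `shutil.rmtree` call and for checks on the directory name before deleting it.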

In [None]:
def create_safe_copy(file_path, use_temp_dir=False):
    """
    Creates a copy of a notebook file in a safe location to prevent pipe errors.
    
    Args:
        file_path (str): Path to the original notebook file
        use_temp_dir (bool): If True, use system temp directory, otherwise use 'ip_folder'
        
    Returns:
        str: Path to the copied file
    """
    if not os.path.exists(file_path) or not file_path.lower().endswith('.ipynb'):
        return file_path  # Return original if not a valid notebook
        
    # Get directory and filename
    original_dir = os.path.dirname(file_path)
    filename = os.path.basename(file_path)
    
    if use_temp_dir:
        # Use system temp directory
        import tempfile
        temp_dir = tempfile.mkdtemp()
        copy_path = os.path.join(temp_dir, filename)
    else:
        # Create ip_folder in the same directory as the original file
        copy_dir = os.path.join(original_dir, "ip_folder")
        os.makedirs(copy_dir, exist_ok=True)
        copy_path = os.path.join(copy_dir, filename)
    
    # Copy the file
    import shutil
    shutil.copy2(file_path, copy_path)
    print(f"Created copy of {file_path} at {copy_path}")
    
    return copy_path

def create_safe_copies_from_dir(dir_path, use_temp_dir=False):
    """
    Creates copies of all notebook files in a directory in a safe location.
    
    Args:
        dir_path (str): Path to the directory containing notebooks
        use_temp_dir (bool): If True, use system temp directory, otherwise use 'ip_folder'
        
    Returns:
        tuple: (directory path of copies, list of copied file paths)
    """
    if not os.path.isdir(dir_path):
        return dir_path, []  # Return original if not a valid directory
    
    if use_temp_dir:
        # Use system temp directory
        import tempfile
        copy_dir = tempfile.mkdtemp()
    else:
        # Create ip_folder in the same directory
        copy_dir = os.path.join(dir_path, "ip_folder")
        os.makedirs(copy_dir, exist_ok=True)
    
    # Copy all notebook files
    import shutil
    copied_files = []
    
    for filename in os.listdir(dir_path):
        if filename.lower().endswith('.ipynb'):
            original_path = os.path.join(dir_path, filename)
            copy_path = os.path.join(copy_dir, filename)
            shutil.copy2(original_path, copy_path)
            copied_files.append(copy_path)
    
    print(f"Copied {len(copied_files)} notebook files to {copy_dir}")
    
    return copy_dir, copied_files


#### 6. Core Notebook Conversion Functions

These functions handle the actual conversion from Jupyter notebooks to **PDF or HTML** format.

They are designed to:
* Convert single notebooks (`convert_notebook_to_pdf`, `convert_notebook_to_html`) or all notebooks within a directory (`convert_all_notebooks_in_dir`).
* Be called by the UI handler (`handle_conversion_request`), which automatically determines the correct format: **HTML on Kaggle**, or **PDF/HTML locally** based on user selection.
* Utilize the **safe copy** functions to prevent errors when converting.
* Manage output directories, placing converted files in a structured way (e.g., in `/kaggle/working/` on Kaggle or relative to input/specified path locally).
* Provide clear status messages for success, failure, or skipped files (attempting to avoid self-conversion using `get_current_notebook_name`).
* Support overwriting existing output files.
* For HTML, support excluding code input cells via the `include_input` parameter.
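The command that these functions hand to `subprocess` can be assembled and inspected separately from running it. This is a minimal sketch: `build_nbconvert_command` is a hypothetical helper, and applying `--no-input` to both formats is an assumption made here for brevity.

```python
import sys

def build_nbconvert_command(notebook_path, output_dir, to_format="html", include_input=True):
    """Assemble the argument list for `python -m jupyter nbconvert`."""
    if to_format not in {"html", "webpdf"}:
        raise ValueError(f"Unsupported format: {to_format}")
    command = [sys.executable, "-m", "jupyter", "nbconvert", "--to", to_format]
    if to_format == "webpdf":
        # Lets nbconvert fetch Chromium if no browser is available for rendering
        command.append("--allow-chromium-download")
    if not include_input:
        # Omit code input cells from the rendered output
        command.append("--no-input")
    command += ["--output-dir", output_dir, notebook_path]
    return command

cmd = build_nbconvert_command("demo.ipynb", "out", to_format="html", include_input=False)
print(cmd[4:])
```

Passing the arguments as a list, rather than a single shell string, avoids quoting problems with paths that contain spaces.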

In [None]:
def get_current_notebook_name():
    """
    Attempts to determine the filename of the currently running notebook.
    Returns None if it cannot be determined.
    (Based on the function in project1-goog-capstone.ipynb)
    """
    try:
        # Method 1: Try to get from IPython kernel information
        from IPython import get_ipython
        ipython = get_ipython()
        if ipython and hasattr(ipython, 'kernel') and hasattr(ipython.kernel, 'session') and hasattr(ipython.kernel.session, 'notebook_path'):
             # Path might be relative or absolute depending on environment
             notebook_path = ipython.kernel.session.notebook_path
             # We just need the filename part
             current_file = os.path.basename(notebook_path)
             # Basic check if it looks like a notebook file
             if current_file.endswith('.ipynb'):
                  print(f"DEBUG: Found current notebook name (method 1 - IPython kernel): {current_file}")
                  return current_file

        # Method 2: Inspect stack frames (less reliable in notebooks, but worth trying)
        # Look for a frame filename ending in .ipynb
        for frame_info in inspect.stack():
            # FrameInfo exposes the source file as .filename; fall back to None if absent
            filename = getattr(frame_info, 'filename', None)
            if filename and filename.lower().endswith('.ipynb'):
                current_file = os.path.basename(filename)
                print(f"DEBUG: Found potential notebook name (method 2 - inspect stack): {current_file}")
                # This might pick up internal library files sometimes, add checks if needed
                # For now, return the first one found
                return current_file

        # Method 3: Check sys.argv[0] (often works when running as script, less so in kernels)
        if sys.argv[0].lower().endswith('.ipynb'):
            current_file = os.path.basename(sys.argv[0])
            print(f"DEBUG: Found current notebook name (method 3 - sys.argv): {current_file}")
            return current_file

        # Method 4: Check environment variables (might be set in some platforms)
        # Example: Check for VS Code interactive window variable
        vscode_nb_file = os.environ.get('VSCODE_NOTEBOOK_FILE')
        if vscode_nb_file and vscode_nb_file.lower().endswith('.ipynb'):
             current_file = os.path.basename(vscode_nb_file)
             print(f"DEBUG: Found current notebook name (method 4 - VSCODE_NOTEBOOK_FILE): {current_file}")
             return current_file
        # Add checks for other platform-specific variables if needed

        print("DEBUG: Could not determine current notebook name using available methods.")

    except Exception as e:
        # Catch potential errors during introspection (e.g., permissions, environment issues)
        print(f"Note: An error occurred while trying to determine the current notebook name: {e}")

    # If none of the methods worked, return None
    return None


def convert_notebook_to_pdf(notebook_path, final_output_dir, overwrite=False, use_safe_copy=True):
    """
    Converts a single notebook to PDF in the specified output directory.

    Args:
        notebook_path (str): Full path to the .ipynb file to convert
        final_output_dir (str): Directory where the PDF will be saved
        overwrite (bool, optional): Whether to overwrite existing PDFs. Defaults to False.
        use_safe_copy (bool, optional): Whether to create a safe copy to prevent pipe errors. Defaults to True.

    Returns:
        str: Status message indicating success or failure
    """
    # Get the current notebook name to avoid attempting self-conversion issues
    try:
        current_notebook = get_current_notebook_name() # Assumes this helper exists
        notebook_basename = os.path.basename(notebook_path)
        if current_notebook and notebook_basename == current_notebook:
            return f"Skipped PDF conversion (matches currently running notebook): {notebook_path}"
    except NameError:
        print("Note: get_current_notebook_name() helper not found, skipping self-conversion check for PDF.")

    # Validate notebook path
    if not notebook_path or not os.path.exists(notebook_path) or not notebook_path.lower().endswith('.ipynb'):
        return f"Error: Invalid or non-existent notebook file path for PDF conversion: {notebook_path}"

    # Track original path for result message
    original_path = notebook_path

    # Create a safe copy if requested (helps prevent pipe errors)
    temp_dir = None # Initialize temp_dir
    if use_safe_copy:
        try:
            # Assumes create_safe_copy helper exists
            notebook_path = create_safe_copy(notebook_path, use_temp_dir=True)
            temp_dir = os.path.dirname(notebook_path) # Store the temp dir path
            print(f"Using safe copy for PDF conversion at: {notebook_path}")
        except NameError:
             print("Warning: create_safe_copy() helper not found. Using original file for PDF conversion.")
             use_safe_copy = False # Disable cleanup logic if copy wasn't made
        except Exception as e:
            print(f"Warning: Could not create safe copy for PDF conversion: {e}")
            use_safe_copy = False # Continue with original file, disable cleanup

    # Ensure the output directory exists
    try:
        os.makedirs(final_output_dir, exist_ok=True)
    except OSError as e:
         # Clean up temp dir if creation failed early
        if temp_dir and use_safe_copy and ('temp' in temp_dir.lower() or 'tmp' in temp_dir.lower()):
            try: shutil.rmtree(temp_dir)
            except Exception as clean_e: print(f"Warning: Error cleaning up temp dir {temp_dir} after failed output dir creation: {clean_e}")
        return f"Error: Could not create output directory {final_output_dir}: {e}"

    # Determine output PDF path and check for existing files
    output_pdf_path = os.path.join(final_output_dir, os.path.splitext(os.path.basename(original_path))[0] + '.pdf')

    if not overwrite and os.path.exists(output_pdf_path):
        # Clean up temp dir if skipping
        if temp_dir and use_safe_copy and ('temp' in temp_dir.lower() or 'tmp' in temp_dir.lower()):
            try: shutil.rmtree(temp_dir)
            except Exception as clean_e: print(f"Warning: Error cleaning up temp dir {temp_dir} when skipping existing file: {clean_e}")
        return f"Skipped (already exists): {output_pdf_path}"

    # Prepare the nbconvert command
    command = [
        sys.executable, "-m", "jupyter", "nbconvert",
        "--to", "webpdf", "--allow-chromium-download", # webpdf format with Chromium download permission
        "--output-dir", final_output_dir,
        notebook_path # Use the (potentially copied) path
    ]

    try:
        # Execute the conversion command
        print(f"Running command: {' '.join(command)}")
        # Increased timeout for potentially long PDF rendering
        result = subprocess.run(command, capture_output=True, text=True, check=True, timeout=600) # e.g., 10 minutes

        # Log outputs for debugging
        print(f"nbconvert STDOUT:\n{result.stdout}")
        if result.stderr:
            print(f"nbconvert STDERR:\n{result.stderr}")

        # Verify successful conversion
        if os.path.exists(output_pdf_path):
            return f"Success: Converted {original_path} to {output_pdf_path}"
        else:
            # Check specific errors like Chromium issues
            stderr_lower = result.stderr.lower() if result.stderr else ""
            if "chromium" in stderr_lower and ("download" in stderr_lower or "executable" in stderr_lower or "timeout" in stderr_lower):
                return f"Warning: Command ran but output PDF not found. Chromium setup/download/timeout might have failed for {original_path}. Check logs."
            elif "timeout" in stderr_lower:
                 return f"Error: PDF conversion process timed out internally for {original_path}. Check logs."
            return f"Warning: Command ran but output PDF not found for {original_path}. Check logs."
    except subprocess.CalledProcessError as e:
        # Command execution failed
        print(f"nbconvert Error Output: {e.stderr}")
        # Check for common errors in stderr
        stderr_lower = e.stderr.lower() if e.stderr else ""
        if "pyppeteer" in stderr_lower or "playwright" in stderr_lower:
             return f"Error converting {original_path}: Missing PDF dependency (Pyppeteer/Playwright). Ensure `nbconvert[webpdf]` is installed. Details: {e.stderr[:300]}..."
        elif "timeout" in stderr_lower:
             return f"Error converting {original_path}: Process timed out. Details: {e.stderr[:300]}..."
        return f"Error converting {original_path}: nbconvert failed. Details: {e.stderr[:500]}..."
    except subprocess.TimeoutExpired:
        # Outer command timeout
        return f"Error: nbconvert command timed out (over 600s) for {original_path}."
    except FileNotFoundError:
        # Jupyter command not found
        return "Error: 'jupyter' command not found. Is Jupyter installed and in PATH?"
    except Exception as e:
        # Catch any other exceptions
        return f"An unexpected error occurred during PDF conversion: {e}"
    finally:
        # Clean up temporary directory if one was created
        if temp_dir and use_safe_copy and ('temp' in temp_dir.lower() or 'tmp' in temp_dir.lower()):
            try:
                shutil.rmtree(temp_dir)
                print(f"Cleaned up temporary directory: {temp_dir}")
            except Exception as e:
                print(f"Warning: Could not clean up temporary directory {temp_dir}: {e}")


def convert_notebook_to_html(notebook_path, final_output_dir, overwrite=False, use_safe_copy=True, include_input=True):
    """
    Converts a single notebook to HTML in the specified output directory.

    Args:
        notebook_path (str): Full path to the .ipynb file to convert
        final_output_dir (str): Directory where the HTML will be saved
        overwrite (bool, optional): Whether to overwrite existing HTML files. Defaults to False.
        use_safe_copy (bool, optional): Whether to create a safe copy to prevent pipe errors. Defaults to True.
        include_input (bool, optional): Whether to include code cells in the output. Defaults to True.

    Returns:
        str: Status message indicating success or failure
    """
    # Get the current notebook name to avoid attempting self-conversion issues
    try:
        current_notebook = get_current_notebook_name() # Assumes this helper exists
        notebook_basename = os.path.basename(notebook_path)
        if current_notebook and notebook_basename == current_notebook:
            return f"Skipped HTML conversion (matches currently running notebook): {notebook_path}"
    except NameError:
        print("Note: get_current_notebook_name() helper not found, skipping self-conversion check for HTML.")

    # Validate notebook path
    if not notebook_path or not os.path.exists(notebook_path) or not notebook_path.lower().endswith('.ipynb'):
        return f"Error: Invalid or non-existent notebook file path for HTML conversion: {notebook_path}"

    # Track original path for result message
    original_path = notebook_path

    # Create a safe copy if requested
    temp_dir = None # Initialize temp_dir
    if use_safe_copy:
        try:
            # Assumes create_safe_copy helper exists
            notebook_path = create_safe_copy(notebook_path, use_temp_dir=True)
            temp_dir = os.path.dirname(notebook_path) # Store the temp dir path
            print(f"Using safe copy for HTML conversion at: {notebook_path}")
        except NameError:
             print("Warning: create_safe_copy() helper not found. Using original file for HTML conversion.")
             use_safe_copy = False # Disable cleanup logic
        except Exception as e:
            print(f"Warning: Could not create safe copy for HTML conversion: {e}")
            use_safe_copy = False # Continue with original file, disable cleanup

    # Ensure the output directory exists
    try:
        os.makedirs(final_output_dir, exist_ok=True)
    except OSError as e:
        # Clean up temp dir if creation failed early
        if temp_dir and use_safe_copy and ('temp' in temp_dir.lower() or 'tmp' in temp_dir.lower()):
            try: shutil.rmtree(temp_dir)
            except Exception as clean_e: print(f"Warning: Error cleaning up temp dir {temp_dir} after failed output dir creation: {clean_e}")
        return f"Error: Could not create output directory {final_output_dir}: {e}"

    # Determine output HTML path and check for existing files
    output_html_path = os.path.join(final_output_dir, os.path.splitext(os.path.basename(original_path))[0] + '.html')

    if not overwrite and os.path.exists(output_html_path):
        # Clean up temp dir if skipping
        if temp_dir and use_safe_copy and ('temp' in temp_dir.lower() or 'tmp' in temp_dir.lower()):
            try: shutil.rmtree(temp_dir)
            except Exception as clean_e: print(f"Warning: Error cleaning up temp dir {temp_dir} when skipping existing file: {clean_e}")
        return f"Skipped (already exists): {output_html_path}"

    # Prepare the nbconvert command for HTML
    command = [
        sys.executable, "-m", "jupyter", "nbconvert",
        "--to", "html",
        # Add option to exclude input cells if requested
        *(["--no-input"] if not include_input else []), # Nicer way to add optional arg
        "--output-dir", final_output_dir,
        notebook_path # Use the (potentially copied) path
    ]

    try:
        # Execute the conversion command
        print(f"Running command: {' '.join(command)}")
        # Shorter timeout usually sufficient for HTML
        result = subprocess.run(command, capture_output=True, text=True, check=True, timeout=180)

        # Log outputs for debugging
        print(f"nbconvert STDOUT:\n{result.stdout}")
        if result.stderr:
            print(f"nbconvert STDERR:\n{result.stderr}")

        # Verify successful conversion by checking expected output path
        if os.path.exists(output_html_path):
            return f"Success: Converted {original_path} to {output_html_path}"
        else:
            # Check if nbconvert output indicates a different filename (less likely for HTML).
            # nbconvert writes its log messages to stderr, so search both streams.
            stdout_output_match = re.search(r"Writing .+ bytes to (.*\.html)", f"{result.stdout}\n{result.stderr}")
            if stdout_output_match and os.path.exists(stdout_output_match.group(1)):
                 return f"Success: Converted {original_path} to {stdout_output_match.group(1)}"
            else:
                 return f"Warning: Command ran but expected output HTML not found for {original_path}. Check logs."

    except subprocess.CalledProcessError as e:
        # Command execution failed
        print(f"nbconvert Error Output: {e.stderr}")
        return f"Error converting {original_path} to HTML: {e.stderr[:500]}..."
    except subprocess.TimeoutExpired:
        # Command took too long
        return f"Error: HTML conversion command timed out for {original_path}."
    except FileNotFoundError:
        # Jupyter command not found
        return "Error: 'jupyter' command not found. Is Jupyter installed and in PATH?"
    except Exception as e:
        # Catch any other exceptions
        return f"An unexpected error occurred during HTML conversion: {e}"
    finally:
        # Clean up temporary directory if one was created via safe copy
        if temp_dir and use_safe_copy and ('temp' in temp_dir.lower() or 'tmp' in temp_dir.lower()):
            try:
                shutil.rmtree(temp_dir)
                print(f"Cleaned up temporary directory: {temp_dir}")
            except Exception as e:
                print(f"Warning: Could not clean up temporary directory {temp_dir}: {e}")


def convert_all_notebooks_in_dir(input_dir, final_output_dir, overwrite=False, use_safe_copy=True, format="pdf", include_input=True):
    """
    Converts all notebooks in a directory to the specified format (PDF or HTML).

    Args:
        input_dir (str): Directory containing notebooks to convert
        final_output_dir (str): Directory where output files will be saved
        overwrite (bool, optional): Whether to overwrite existing files. Defaults to False.
        use_safe_copy (bool, optional): Whether to use safe copies for conversion. Defaults to True.
        format (str, optional): Output format, either "pdf" or "html". Defaults to "pdf".
        include_input (bool, optional): Whether to include code cells in HTML output. Defaults to True.

    Returns:
        list: List of status messages for each notebook conversion
    """
    results = []
    # Get the current notebook name to avoid attempting self-conversion issues
    try:
        current_notebook = get_current_notebook_name() # Assumes this helper exists
    except NameError:
        print("Note: get_current_notebook_name() helper not found, skipping self-conversion check.")
        current_notebook = None

    # Validate input directory
    if not input_dir or not os.path.isdir(input_dir):
        return [f"Error: Invalid input directory: {input_dir}"]

    # Ensure output directory exists (create if needed)
    try:
        os.makedirs(final_output_dir, exist_ok=True)
    except OSError as e:
        return [f"Error: Could not create output directory {final_output_dir}: {e}"]

    # Option 1: Create copies of all notebooks first (might use more temp space)
    temp_dir_for_all = None
    copied_notebooks_map = {} # Map copied path -> original path
    if use_safe_copy:
        try:
            # Assumes create_safe_copies_from_dir helper exists
            temp_dir_for_all, copied_paths = create_safe_copies_from_dir(input_dir, use_temp_dir=True)
            if copied_paths:
                # Create mapping from temp path back to original path for messages
                for cp_path in copied_paths:
                    original_filename = os.path.basename(cp_path)
                    copied_notebooks_map[cp_path] = os.path.join(input_dir, original_filename)

                print(f"Processing {len(copied_notebooks_map)} notebooks from temporary copies in {temp_dir_for_all}")
                items_to_process = list(copied_notebooks_map.keys()) # Process the copied paths
                process_with_individual_copies = False # Don't make another copy
            else:
                print("No notebooks found to copy. Processing originals.")
                items_to_process = os.listdir(input_dir) # Process original filenames
                process_with_individual_copies = True # Need individual copies if processing originals

        except NameError:
            print("Warning: create_safe_copies_from_dir() helper not found. Will process original files with individual copies.")
            items_to_process = os.listdir(input_dir)
            use_safe_copy = True # Ensure individual copy logic runs
            process_with_individual_copies = True
        except Exception as e:
            print(f"Warning: Could not create safe copies for directory: {e}. Will try processing originals with individual copies.")
            items_to_process = os.listdir(input_dir)
            use_safe_copy = True # Ensure individual copy logic runs
            process_with_individual_copies = True
    else:
         # Not using safe copies at all
         print("Processing original files without safe copies.")
         items_to_process = os.listdir(input_dir)
         process_with_individual_copies = False # No copies needed


    # Process items (either copied paths or original filenames)
    notebook_found_count = 0
    for item_name in items_to_process:
        # If processing originals, construct full path; otherwise, item_name is already full copied path
        if process_with_individual_copies:
             notebook_path = os.path.join(input_dir, item_name)
             original_path_for_msg = notebook_path # Original path is the one being processed
        else:
             # item_name is the copied path, get original path for messages
             notebook_path = item_name
             original_path_for_msg = copied_notebooks_map.get(notebook_path, notebook_path) # Fallback to copied path if map fails

        # Check if it's a notebook file
        if notebook_path.lower().endswith(".ipynb"):
             notebook_found_count += 1

             # Skip if this is the currently running notebook
             if current_notebook and os.path.basename(original_path_for_msg) == current_notebook:
                 results.append(f"Skipped (matches currently running notebook): {original_path_for_msg}")
                 continue

             try:
                 # Process each notebook individually
                 if format.lower() == "html":
                     # Pass process_with_individual_copies to decide if inner copy needed
                     result = convert_notebook_to_html(notebook_path, final_output_dir, overwrite,
                                                      use_safe_copy=process_with_individual_copies,
                                                      include_input=include_input)
                 else: # Default to PDF
                     result = convert_notebook_to_pdf(notebook_path, final_output_dir, overwrite,
                                                     use_safe_copy=process_with_individual_copies)

                 # If processing copies, replace temp path in result with original path for clarity
                 if not process_with_individual_copies and notebook_path in result:
                      result = result.replace(notebook_path, original_path_for_msg)

                 results.append(result)

             except Exception as e:
                 # Log the error but continue with other notebooks
                 results.append(f"Error processing {os.path.basename(original_path_for_msg)}: {str(e)}")


    if notebook_found_count == 0:
        results.append(f"No .ipynb files found in directory: {input_dir}")

    # Clean up the single temporary directory if one was created for all copies
    if temp_dir_for_all and use_safe_copy and ('temp' in temp_dir_for_all.lower() or 'tmp' in temp_dir_for_all.lower()):
         try:
             shutil.rmtree(temp_dir_for_all)
             print(f"Cleaned up temporary directory for all copies: {temp_dir_for_all}")
         except Exception as e:
             print(f"Warning: Could not clean up directory {temp_dir_for_all}: {e}")

    return results

# --- End of Core Conversion Functions ---

#### 8. Form-Based UI Handler (`handle_conversion_request`)

This function processes requests from the Gradio form interface. It acts as the bridge between the UI inputs and the core conversion functions.

It handles:
* Single file uploads or directory path inputs.
* Determining the correct output format (HTML on Kaggle, PDF/HTML locally).
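* Constructing the output directory path: a `converted_<format>` folder under the user-supplied base directory if one is given, otherwise under the uploaded file's folder, the input directory, or the current working directory (`/kaggle/working/` on Kaggle).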
* Passing user selections (overwrite, include code) to the conversion functions.
* Formatting the results for display in the UI log, including separators for readability.
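
The format and output-path rules listed above can be sketched in isolation. This is a minimal illustration, not the handler itself: `pick_format` and `resolve_output_dir` are hypothetical helper names introduced here, and the Kaggle check is passed in as a plain boolean instead of calling `is_running_on_kaggle()`.

```python
import os


def pick_format(requested_format, on_kaggle):
    """Return the format to use; PDF is downgraded to HTML on Kaggle."""
    fmt = requested_format.lower()
    return "html" if on_kaggle and fmt == "pdf" else fmt


def resolve_output_dir(output_dir_base, fallback_base, final_format):
    """Return the folder converted files would be written to.

    A user-supplied base directory wins; otherwise the fallback base
    (for example the input directory or the working directory) is used.
    """
    base = output_dir_base or fallback_base
    return os.path.join(base, f"converted_{final_format}")


fmt = pick_format("PDF", on_kaggle=True)
print(fmt, resolve_output_dir("", "/kaggle/working", fmt))
```

Keeping these two decisions in small pure functions would make them easy to check without launching the UI or running nbconvert.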

In [None]:

def handle_conversion_request(input_path_type, single_file, input_dir, output_dir_base, overwrite, format="pdf", include_input=True):
    """
    Handles conversion requests from the form-based UI, adapting format for Kaggle,
    and adding log separators for directory processing.

    Args:
        input_path_type (str): "Single File" or "Directory"
        single_file (file object/temp file path): Uploaded file object (for Single File mode)
        input_dir (str): Directory path containing notebooks (for Directory mode)
        output_dir_base (str): Optional custom output directory path (base path)
        overwrite (bool): Whether to overwrite existing files
        format (str, optional): Preferred output format ("pdf" or "html"). Defaults to "pdf".
        include_input (bool, optional): Whether to include code cells in HTML output. Defaults to True.

    Returns:
        str: Conversion results as a formatted string, with newlines for readability.
    """
    results = []
    final_output_dir = None # Initialize
    try: # Wrap core logic in try/except for unexpected errors
        # 1. Check Core Dependencies (nbconvert)
        # Assumes check_and_install_dependencies() is defined elsewhere
        if not check_and_install_dependencies():
            return "Dependency check/installation failed (nbconvert). Cannot proceed."

        # 2. Determine Final Format based on Environment
        final_format = format.lower()
        # Assumes is_running_on_kaggle() is defined elsewhere
        if is_running_on_kaggle() and final_format == "pdf":
            final_format = "html"
            results.append("Running on Kaggle - defaulting to HTML format.")

        # 3. Determine Final Output Directory Path
        # Default base path: the current working directory (/kaggle/working on Kaggle)
        base_path_for_output = os.getcwd()

        # Determine base path more intelligently
        if input_path_type == "Single File" and single_file:
            if hasattr(single_file, 'name') and isinstance(single_file.name, str):
                 try:
                     temp_dir = os.path.dirname(single_file.name)
                     if os.path.isdir(temp_dir):
                          base_path_for_output = temp_dir
                 except Exception as e:
                     print(f"Could not use temp file dir '{single_file.name}' as base: {e}. Defaulting to CWD.")
        elif input_path_type == "Directory" and input_dir and os.path.isdir(input_dir):
             # Use input dir as base *only if not Kaggle input*
            if not (is_running_on_kaggle() and input_dir.startswith("/kaggle/input")):
                 base_path_for_output = input_dir

        # Construct final path
        if output_dir_base: # User specified a base directory for the output folder
            final_output_dir = os.path.join(output_dir_base, f"converted_{final_format}")
        else: # Default: use determined base path
            final_output_dir = os.path.join(base_path_for_output, f"converted_{final_format}")

        results.append(f"Output directory set to: {final_output_dir}")
        os.makedirs(final_output_dir, exist_ok=True) # Ensure it exists

        # 4. Process Conversion based on Input Type and Format
        if input_path_type == "Single File":
            if single_file and hasattr(single_file, 'name') and isinstance(single_file.name, str):
                filepath = single_file.name # Path to the temporary uploaded file
                results.append(f"Processing file: {filepath}")
                # Assumes conversion functions are defined elsewhere
                if final_format == "html":
                    result = convert_notebook_to_html(filepath, final_output_dir, overwrite, use_safe_copy=True, include_input=include_input)
                else:
                    result = convert_notebook_to_pdf(filepath, final_output_dir, overwrite, use_safe_copy=True)
                results.append(result)
            else:
                results.append("Error: No file uploaded or file path unavailable.")

        elif input_path_type == "Directory":
            if input_dir and os.path.isdir(input_dir):
                results.append(f"Processing directory: {input_dir}")

                # --- Add separator line for readability ---
                results.append("\n--- File Conversion Status ---")
                # ------------------------------------------

                # Assumes conversion function is defined elsewhere
                conversion_results = convert_all_notebooks_in_dir(
                    input_dir, final_output_dir, overwrite, use_safe_copy=True,
                    format=final_format, include_input=include_input
                )
                # conversion_results is expected to be a list of strings

                results.extend(conversion_results) # Add the list of individual results

            else:
                results.append(f"Error: Invalid input directory path: {input_dir}")

    # --- Error Handling ---
    except OSError as e:
         results.append(f"Error creating output directory '{final_output_dir}': {e}")
    except NameError as e:
         results.append(f"Error: A required helper function might be missing: {e}")
         print(f"NameError in handle_conversion_request: {e}")
         traceback.print_exc()
    except Exception as e:
         results.append(f"An unexpected error occurred: {e}")
         print(f"Unexpected Error in handle_conversion_request: {e}")
         traceback.print_exc() # Print full traceback to console

    # --- Return ---
    # Join all accumulated messages (including the separator) with newlines
    return "\n".join(results)

#### 9. Chat Interface Message Processing (`process_chat_message`)

*(This function handles the logic for the chat interface. It is defined here but the Gradio UI below **does not currently include the chat tab**, so this function is inactive unless the UI code is modified to add the chat components back.)*

This function represents the core of the AI-powered chat interface:

* Parses user messages to detect conversion intent and extract details (paths, options).
* Determines the correct output format (HTML on Kaggle, PDF/HTML locally).
* Constructs output paths as a `converted_<format>` folder inside the given directory, or next to the given notebook file.
* Calls the relevant conversion functions (`convert_notebook_to_...`, `convert_all_notebooks_in_dir`).
* Manages conversation state (`chat_state`) to handle multi-turn interactions (like asking for a path).
* Interacts with the Gemini AI model, through the chat session created by `init_chat()` and stored in `chat_state["gemini_chat"]`, for non-conversion-related messages or general assistance.
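
The keyword-based parsing described in the first bullets can be sketched as a standalone function. This is a simplified illustration under stated assumptions: `parse_conversion_options` is a hypothetical name, the Kaggle check is a plain boolean argument, and path extraction is left out.

```python
def parse_conversion_options(message, on_kaggle=False):
    """Derive conversion options from a chat message using keyword checks."""
    text = message.lower()

    # Intent: the word "convert" plus something that looks like a notebook reference
    wants_conversion = "convert" in text and any(
        keyword in text for keyword in ("notebook", ".ipynb", "file")
    )

    # Format: PDF unless HTML is mentioned; PDF falls back to HTML on Kaggle
    fmt = "html" if "html" in text else "pdf"
    if on_kaggle and fmt == "pdf":
        fmt = "html"

    return {
        "conversion_intent": wants_conversion,
        "format": fmt,
        "overwrite": "overwrite" in text or "replace" in text,
        "include_input": not any(
            keyword in text for keyword in ("no-input", "hide code", "without code")
        ),
    }


options = parse_conversion_options(
    "Convert notebook at /data/a.ipynb to html without code and overwrite"
)
print(options)
```

Substring checks like these are easy to follow but can misfire (for example, "profile" contains "file"), so a stricter tokenizer may be worth considering if the chat tab is re-enabled.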

In [None]:
def process_chat_message(message, chat_history, chat_state):
    """
    Handles messages from the chat interface, processes conversion requests with environment awareness.

    Args:
        message (str): The user's message text.
        chat_history (list): Current chat history (list of ChatMessage objects).
        chat_state (dict): State dictionary containing context and the Gemini chat object.

    Returns:
        tuple: (updated chat_history, updated chat_state)
    """
    # 1. Initialize chat state if needed
    if chat_state is None:
        chat_state = {"gemini_chat": init_chat(), "context": {}}

    # 2. Skip empty messages
    if not message.strip():
        return chat_history, chat_state

    user_message = message.strip()

    # 3. Add user message to Gradio chat history (modern format)
    chat_history.append(ChatMessage(role="user", content=user_message))

    # 4. --- Determine Conversion Parameters ---
    conversion_intent = (
        "convert" in user_message.lower() and
        ("notebook" in user_message.lower() or ".ipynb" in user_message.lower() or "file" in user_message.lower())
    )
    overwrite = "overwrite" in user_message.lower() or "replace" in user_message.lower()

    # Determine format preference (default pdf, check message, override for Kaggle)
    final_format = "pdf" # Default format
    if "html" in user_message.lower():
        final_format = "html"

    # Check if running on Kaggle and override to HTML if PDF was intended/defaulted
    is_kaggle = is_running_on_kaggle()
    if is_kaggle and final_format == "pdf":
        final_format = "html"
        # Optional: Add a message to history about the format change?
        # chat_history.append(ChatMessage(role="assistant", content="Note: Using HTML format on Kaggle."))

    # Determine code inclusion for HTML
    include_input = not ("no-input" in user_message.lower() or "hide code" in user_message.lower() or "without code" in user_message.lower())

    # --- End Parameter Determination ---


    # 5. Extract Path (if provided)
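    # Regex for Windows/Linux paths, including quoted paths
    # (\s inside the character classes excludes whitespace; internal spaces are allowed by the explicit ' ' alternative)
    path_pattern = r'(?:in|at|from|path|is|=|\"|\')?[ \t]*([a-zA-Z]:[\\/](?:[^\"\'\s\\/]| |[\\/])+|[/](?:[^\"\'\s\\/]| |[\\/])+)(?:\"|\'|\s|$)'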
    path_match = re.search(path_pattern, user_message)
    provided_path = None
    if path_match:
        raw_path = path_match.group(1).strip() # Get the matched path
        try:
            provided_path = os.path.normpath(raw_path)
            print(f"Detected path: {provided_path}")
        except Exception as e:
            print(f"Could not normalize detected path '{raw_path}': {e}")
            provided_path = raw_path # Use raw path if normalization fails


    # 6. --- Handle Conversion Intent ---
    if conversion_intent:

        # 6a. Check Dependencies before proceeding
        if not check_and_install_dependencies():
             chat_history.append(ChatMessage(role="assistant", content="Dependency check/installation failed. Cannot proceed with conversion."))
             return chat_history, chat_state

        # 6b. Handle "convert more files" request
        if ("more" in user_message.lower() or "again" in user_message.lower()):
            if "last_directory" in chat_state["context"]:
                previous_dir = chat_state["context"]["last_directory"]
                chat_history.append(ChatMessage(role="assistant", content=f"Converting more notebooks from: {previous_dir} to {final_format.upper()}"))
                output_dir = os.path.join(previous_dir, f"converted_{final_format}")
                # Call directory conversion, passing determined format/options
                results = convert_all_notebooks_in_dir(
                    previous_dir, output_dir, overwrite, use_safe_copy=True,
                    format=final_format, include_input=include_input
                )
                chat_history.append(ChatMessage(role="assistant", content="\n".join(results)))
            else:
                chat_history.append(ChatMessage(role="assistant", content="I don't remember a previous directory. Please specify one."))
                chat_state["context"]["waiting_for"] = "directory_path" # Ask for path
                # Store current preferences in context for when user provides path
                chat_state["context"]["action"] = "convert_all"
                chat_state["context"]["overwrite"] = overwrite
                chat_state["context"]["format"] = final_format
                chat_state["context"]["include_input"] = include_input
            return chat_history, chat_state

        # 6c. Determine if request is for Directory or Single File
        is_directory_request = False
        if ("all" in user_message.lower() or "directory" in user_message.lower() or "folder" in user_message.lower()):
            is_directory_request = True
        if provided_path and os.path.isdir(provided_path):
             # If a valid directory path was provided, treat as directory request
             is_directory_request = True
        elif provided_path and provided_path.lower().endswith('.ipynb') and os.path.exists(provided_path):
             # If a valid file path was provided, treat as single file request
             is_directory_request = False
        # Note: Ambiguity remains if no path provided and no keyword used. Defaults to asking.

        # 6d. Handle Directory Conversion Flow
        if is_directory_request:
            if provided_path and os.path.isdir(provided_path):
                # Path provided and valid: Convert directly
                chat_history.append(ChatMessage(role="assistant",
                    content=f"Converting notebooks in directory: {provided_path} to {final_format.upper()}{' (with overwrite)' if overwrite else ''}"))
                output_dir = os.path.join(provided_path, f"converted_{final_format}")
                results = convert_all_notebooks_in_dir(
                    provided_path, output_dir, overwrite, use_safe_copy=True,
                    format=final_format, include_input=include_input
                )
                chat_history.append(ChatMessage(role="assistant", content="\n".join(results)))
                chat_state["context"]["last_directory"] = provided_path # Remember for "more files"
                chat_state["context"]["waiting_for"] = None # Clear state

            elif chat_state["context"].get("waiting_for") == "directory_path":
                # User is providing path after being asked
                path_from_user = os.path.normpath(user_message) # Normalize the input message as path
                context_overwrite = chat_state["context"].get("overwrite", overwrite) # Use stored or current
                context_format = chat_state["context"].get("format", final_format)
                context_include_input = chat_state["context"].get("include_input", include_input)

                if not os.path.isdir(path_from_user):
                    chat_history.append(ChatMessage(role="assistant", content=f"Error: '{path_from_user}' is not a valid directory. Please provide a valid directory path."))
                    # Keep waiting_for state active
                else:
                    chat_history.append(ChatMessage(role="assistant",
                        content=f"Converting notebooks in directory: {path_from_user} to {context_format.upper()}{' (with overwrite)' if context_overwrite else ''}"))
                    output_dir = os.path.join(path_from_user, f"converted_{context_format}")
                    results = convert_all_notebooks_in_dir(
                        path_from_user, output_dir, context_overwrite, use_safe_copy=True,
                        format=context_format, include_input=context_include_input
                    )
                    chat_history.append(ChatMessage(role="assistant", content="\n".join(results)))
                    chat_state["context"]["last_directory"] = path_from_user
                    # Clear context after successful conversion
                    chat_state["context"] = {"last_directory": path_from_user} # Reset context but keep last_directory

            else:
                # Ask for directory path
                chat_history.append(ChatMessage(role="assistant", content="Okay, please provide the path to the directory containing the notebooks:"))
                chat_state["context"]["waiting_for"] = "directory_path"
                chat_state["context"]["action"] = "convert_all"
                chat_state["context"]["overwrite"] = overwrite
                chat_state["context"]["format"] = final_format
                chat_state["context"]["include_input"] = include_input

        # 6e. Handle Single File Conversion Flow
        else: # Not a directory request
            if provided_path and provided_path.lower().endswith('.ipynb') and os.path.exists(provided_path):
                # Path provided and valid: Convert directly
                chat_history.append(ChatMessage(role="assistant",
                    content=f"Converting notebook: {provided_path} to {final_format.upper()}{' (with overwrite)' if overwrite else ''}"))
                output_dir = os.path.join(os.path.dirname(provided_path), f"converted_{final_format}")

                if final_format == "html":
                    result = convert_notebook_to_html(provided_path, output_dir, overwrite, use_safe_copy=True, include_input=include_input)
                else:
                    result = convert_notebook_to_pdf(provided_path, output_dir, overwrite, use_safe_copy=True)

                chat_history.append(ChatMessage(role="assistant", content=result))
                chat_state["context"]["last_directory"] = os.path.dirname(provided_path) # Remember parent dir
                chat_state["context"]["waiting_for"] = None # Clear state

            elif chat_state["context"].get("waiting_for") == "file_path":
                 # User is providing path after being asked
                path_from_user = os.path.normpath(user_message) # Normalize the input message as path
                context_overwrite = chat_state["context"].get("overwrite", overwrite) # Use stored or current
                context_format = chat_state["context"].get("format", final_format)
                context_include_input = chat_state["context"].get("include_input", include_input)

                if not os.path.exists(path_from_user):
                     chat_history.append(ChatMessage(role="assistant", content=f"Error: File not found at '{path_from_user}'. Please provide a valid path."))
                     # Keep waiting_for state active
                elif not path_from_user.lower().endswith('.ipynb'):
                     chat_history.append(ChatMessage(role="assistant", content=f"Error: '{path_from_user}' is not a Jupyter notebook (.ipynb) file."))
                     # Keep waiting_for state active
                else:
                    chat_history.append(ChatMessage(role="assistant",
                        content=f"Converting notebook: {path_from_user} to {context_format.upper()}{' (with overwrite)' if context_overwrite else ''}"))
                    output_dir = os.path.join(os.path.dirname(path_from_user), f"converted_{context_format}")

                    if context_format == "html":
                        result = convert_notebook_to_html(path_from_user, output_dir, context_overwrite, use_safe_copy=True, include_input=context_include_input)
                    else:
                        result = convert_notebook_to_pdf(path_from_user, output_dir, context_overwrite, use_safe_copy=True)

                    chat_history.append(ChatMessage(role="assistant", content=result))
                    chat_state["context"]["last_directory"] = os.path.dirname(path_from_user)
                    # Clear context after successful conversion
                    chat_state["context"] = {"last_directory": os.path.dirname(path_from_user)} # Reset context

            else:
                 # Ask for file path
                chat_history.append(ChatMessage(role="assistant", content="Okay, please provide the path to the notebook file (.ipynb) you want to convert:"))
                chat_state["context"]["waiting_for"] = "file_path"
                chat_state["context"]["action"] = "convert_single"
                chat_state["context"]["overwrite"] = overwrite
                chat_state["context"]["format"] = final_format
                chat_state["context"]["include_input"] = include_input

    # 7. --- Handle General Chat (No Conversion Intent) ---
    else:
        try:
            # Prepare prompt for Gemini
            prompt = (
                f"User message: '{user_message}'\n\n"
                "You are a helpful assistant for a notebook-to-PDF/HTML converter tool.\n"
                "You can help users convert Jupyter notebooks (.ipynb) to PDF or HTML format.\n"
                "If the user seems to be asking generally about converting notebooks, remind them of the commands:\n"
                "- 'Convert notebook at [path/to/file.ipynb]' (or '... in ...', '... from ...')\n"
                "- 'Convert all notebooks in [/path/to/directory]' (or '... folder ...')\n"
                "- 'Convert more files' (if you previously converted a directory)\n"
                "- You can specify 'to html' (default on Kaggle) or 'to pdf' (default locally).\n"
                "- You can add 'without code' or 'no-input' for HTML format.\n"
                "- You can add 'and overwrite' to replace existing output files.\n"
                "Keep your response conversational and helpful. If the user asks something unrelated to notebook conversion, politely decline and redirect."
            )

            # Send to Gemini using the chat session stored in state
            response = chat_state["gemini_chat"].send_message(prompt)
            chat_history.append(ChatMessage(role="assistant", content=response.text))
            # Clear any pending action state if Gemini handles the message
            if "waiting_for" in chat_state["context"]:
                 del chat_state["context"]["waiting_for"]
            if "action" in chat_state["context"]:
                 del chat_state["context"]["action"]


        except Exception as e:
            # Handle potential API errors
            error_message = f"Sorry, I encountered an error trying to process that: {str(e)}"
            chat_history.append(ChatMessage(role="assistant", content=error_message))
            print(f"Gemini API Error: {e}") # Log error to console

    # 8. Return updated history and state
    return chat_history, chat_state

#### 10. Gradio User Interface (Form Interface Only)

This section builds the user interface using Gradio (`gr.Blocks`). This version displays only the **form-based interface** for converting notebooks.

* **UI Layout:** Defines the visual components for the form, including:
    * Radio buttons to select input type ('Single File' or 'Directory').
    * Conditional display of either a file upload component or a directory path textbox.
    * Radio buttons for selecting output format (PDF/HTML).
    * Checkboxes for options like 'Overwrite' and 'Include Code Cells'.
    * An optional textbox for specifying a base output directory.
    * A 'Convert' button to trigger the process.
    * A `gr.Textbox` to display the conversion log output.
* **Event Handling:** Connects the UI elements to their Python handler functions:
    * The 'Input Type' radio button is linked to `update_visibility` (defined in this cell) to show or hide the matching input component.
    * The 'Convert Notebook(s)' button is linked to `handle_conversion_request` (defined in a previous cell) to perform the conversion.
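
The click handler receives both the uploaded file and the directory textbox value, whichever panel is visible, so it has to pick the relevant one from the 'Input Type' selection. A minimal sketch of that step follows. It assumes a hypothetical helper named `resolve_input_path`, which is not a function from this notebook, and `handle_conversion_request` may well do this differently. The sketch does not check that an uploaded file exists on disk.

```python
from pathlib import Path
from typing import Optional, Tuple


def resolve_input_path(
    input_type: str,
    uploaded_file,
    directory_text: Optional[str],
) -> Tuple[Optional[Path], str]:
    """Hypothetical helper: return (path, message); path is None if the input is unusable."""
    if input_type == "Single File":
        if uploaded_file is None:
            return None, "Please upload a .ipynb file."
        # Depending on the Gradio version, gr.File may pass a path string
        # or a file-like object exposing the path as `.name`.
        raw = uploaded_file if isinstance(uploaded_file, str) else getattr(uploaded_file, "name", None)
        if not raw:
            return None, "Could not determine the uploaded file's path."
        path = Path(raw)
        if path.suffix.lower() != ".ipynb":
            return None, f"Not a notebook file: {path.name}"
        return path, "ok"

    if input_type == "Directory":
        text = (directory_text or "").strip()
        if not text:
            return None, "Please enter a directory path."
        path = Path(text)
        if not path.is_dir():
            return None, f"Directory not found: {path}"
        return path, "ok"

    return None, f"Unknown input type: {input_type}"
```

Returning a message alongside the path lets the caller write a readable line to the 'Conversion Log' textbox instead of raising an exception.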


In [None]:
# --- GRADIO APP DEFINITION (FORM ONLY) ---

with gr.Blocks(theme=gr.themes.Default()) as demo: # Using Default theme

    # --- UI Definition ---
    gr.Markdown("# Jupyter Notebook to PDF/HTML Converter")
    gr.Markdown("## Convert notebooks using the form below")

    with gr.Row():
        input_path_type_form = gr.Radio(
            ["Single File", "Directory"],
            label="Input Type",
            value="Single File"
        )
        overwrite_checkbox_form = gr.Checkbox(
            label="Overwrite existing output file(s)",
            value=False
        )

    # Input panels - controlled by radio button
    with gr.Column(visible=True) as single_file_panel:
        single_file_input_form = gr.File(
            label="Upload Notebook (.ipynb)",
            file_types=[".ipynb"],
            file_count="single"
        )

    with gr.Column(visible=False) as directory_panel:
        input_dir_textbox_form = gr.Textbox(
            label="Input Directory Path",
            placeholder="On Kaggle: Use /kaggle/input/... path after uploading data. Locally: Use local path.",
            info="Path to the folder containing notebooks. Must be accessible by the environment.",
            lines=1
        )

    # Format selection
    with gr.Row():
        format_radio = gr.Radio(
            ["PDF", "HTML"],
            label="Preferred Output Format",
            value="HTML" if is_running_on_kaggle() else "PDF" # Default based on env
        )
        include_code_checkbox = gr.Checkbox(
            label="Include Code Cells (HTML only)",
            value=True,
        )

    # Output directory selection
    with gr.Row():
        output_dir_textbox_form = gr.Textbox(
            label="Output Base Directory (Optional)",
            placeholder="Default: /kaggle/working/ or relative to input",
            info="Specify a base path where the 'converted_...' folder will be created."
        )

    # Action button and log
    convert_button = gr.Button("Convert Notebook(s)", variant="primary")
    output_log = gr.Textbox(label="Conversion Log", lines=15, interactive=False, autoscroll=True)


    # --- Event Handlers ---

    # Toggle the input panels when the 'Input Type' radio button changes
    def update_visibility(choice):
        """Updates UI visibility based on input type selection."""
        return {
            single_file_panel: gr.update(visible=(choice == "Single File")),
            directory_panel: gr.update(visible=(choice == "Directory"))
        }

    input_path_type_form.change(
        fn=update_visibility,
        inputs=input_path_type_form,
        outputs=[single_file_panel, directory_panel]
    )

    # Run the conversion when the 'Convert Notebook(s)' button is clicked
    # Assumes handle_conversion_request is defined in a previous cell
    convert_button.click(
        fn=handle_conversion_request, # Function defined in a previous cell
        inputs=[
            input_path_type_form,
            single_file_input_form,
            input_dir_textbox_form,
            output_dir_textbox_form, # Base output dir
            overwrite_checkbox_form,
            format_radio,            # Pass format selection
            include_code_checkbox    # Pass code inclusion preference
        ],
        outputs=output_log
    )

# --- End of Gradio App Definition ---

#### 11. Launch the app
Launch the Gradio app. This version displays only the form interface.

In [None]:
# Launch the app
# share=True creates a public link reachable from outside Kaggle/your local network (use with caution)
# Add debug=True for more detailed logs in the console if errors occur
demo.launch(share=True)

In [None]:
demo.close()
print("Gradio app closed.")

#### Project Summary and Next Steps

This project provides a functional tool for converting Jupyter Notebooks through a Gradio web interface. The notebook defines both a form-based workflow and a chat-based handler (the chat relies on Gemini and requires API key setup); the UI built in section 10 displays only the form.

##### Key Functionality Delivered:
* **Dual Interface:** The notebook implements conversion via a structured form and via a conversational chat handler; the UI in section 10 exposes only the form.
* **Form-Based Conversion:** Convert single `.ipynb` files or entire directories using the form.
* **Chat-Based Conversion:** Use natural-language commands to convert files or directories when the chat interface is enabled (handled by `process_chat_message`, which passes messages to a Gemini chat session).
* **Format Support:** Converts to **HTML** (especially suitable for Kaggle) or **PDF** (primarily for local use).
* **Environment Awareness:** Automatically adapts output format (HTML on Kaggle) and output paths (uses `/kaggle/working/`) based on the execution environment.
* **Robust Conversion:** Uses temporary copies to avoid self-conversion issues and handles common errors.
* **Dependency Management:** Checks for `nbconvert` and attempts installation.
* **Secure API Key Handling:** Loads Google API keys securely (needed for Gemini chat and client initialization).
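
The "temporary copies" idea from the list above can be sketched as follows. This is an illustration under assumptions, not the notebook's actual implementation: `temporary_notebook_copy` is a hypothetical name, and the real code may manage its temporary files differently.

```python
import shutil
import tempfile
from contextlib import contextmanager
from pathlib import Path


@contextmanager
def temporary_notebook_copy(notebook_path):
    """Hypothetical helper: yield a throwaway copy of the notebook, then clean up."""
    source = Path(notebook_path)
    with tempfile.TemporaryDirectory(prefix="nbconvert_") as tmp_dir:
        copy_path = Path(tmp_dir) / source.name
        shutil.copy2(source, copy_path)  # copy file contents and metadata
        yield copy_path
    # On exit, the temporary directory and the copy are deleted;
    # the original notebook is never touched.
```

A converter would then run against the copy, for example `with temporary_notebook_copy("analysis.ipynb") as nb_copy: ...`, so that converting the notebook that is currently running does not read a file that is being written to.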

##### Potential Enhancements:
* Improve error handling and feedback consistency between Form and Chat.
* Add more specific examples to the Chat interface description.
* Allow conversion to other formats (e.g., Markdown, Python script).
* Implement asynchronous conversion for large directories/files.
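
As a rough illustration of the "other formats" enhancement, the sketch below builds a `jupyter nbconvert` command line for several targets. It assumes the conversion is driven through the `nbconvert` command-line interface; `build_nbconvert_command` and `SUPPORTED_FORMATS` are hypothetical names, not part of this notebook. `--to`, `--output-dir`, and `--no-input` are existing `nbconvert` options.

```python
from pathlib import Path
from typing import List

# UI label -> nbconvert exporter name (the last two are the proposed additions)
SUPPORTED_FORMATS = {
    "HTML": "html",
    "PDF": "pdf",
    "Markdown": "markdown",
    "Python script": "script",
}


def build_nbconvert_command(
    notebook,
    output_dir,
    output_format: str = "HTML",
    include_code: bool = True,
) -> List[str]:
    """Hypothetical helper: return the argument list for a `jupyter nbconvert` call."""
    try:
        target = SUPPORTED_FORMATS[output_format]
    except KeyError:
        raise ValueError(f"Unsupported output format: {output_format!r}") from None

    cmd = ["jupyter", "nbconvert", "--to", target, "--output-dir", str(Path(output_dir))]
    # Mirror the form's "Include Code Cells (HTML only)" option:
    # --no-input omits the input (code) cells from the output.
    if not include_code and target == "html":
        cmd.append("--no-input")
    cmd.append(str(notebook))
    return cmd
```

The returned list can be passed to `subprocess.run(cmd, check=True)`. Building the command separately from running it keeps the format mapping easy to test without invoking `nbconvert`.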

##### How to Use:
1.  **Setup:** Ensure your Google API key is configured (via Kaggle Secrets or local environment variables/.env file). This is required for the Gemini chat functionality.
2.  **Run All Cells:** Execute all cells in the notebook sequentially.
3.  **Launch:** Run the `demo.launch()` cell; run the `demo.close()` cell when you are finished.
4.  **Use the Form (or Chat, if enabled):**
    * **Form:** Select the input type, provide the file or path, choose options, and click 'Convert Notebook(s)'.
    * **Chat:** Type conversion commands or ask for help. The UI defined in section 10 does not display the chat interface.
    * **Paths on Kaggle:** Remember to use `/kaggle/input/...` paths for directories after uploading data via '+ Add Data'.
5.  **Find Output:** Check the 'Conversion Log' (Form) or chat responses for status. Converted files appear in the determined output directory (e.g., `/kaggle/working/..._converted_html` on Kaggle).