In [4]:
import gradio as gr
import google.generativeai as genai
import PyPDF2
import logging
import os
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

In [5]:
# Send INFO-and-above records through the root logger, prefixing every line
# with a timestamp and the severity name.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)


In [6]:
# Module-level state shared by the functions in this notebook.
model = None                 # generative model handle; assigned in initialize_chatbot()
training_data = None         # training examples; assigned in load_resources()
reference_vectors = dict()   # source name -> reference text; filled in load_resources()
chat_history = list()        # NOTE(review): not read by any function visible in this notebook
vectorizer = TfidfVectorizer(stop_words="english")  # fitted in load_resources()

In [7]:
def process_training_data(csv_path: str = 'training_data.csv'):
    """Load the training CSV and prepare its text columns.

    The file must contain ``Context`` and ``Response`` columns. Missing values
    in either column become empty strings, and both columns are coerced to
    ``str`` so that cells parsed as numbers cannot break the concatenation
    that builds ``combined_text`` (``Context + ' ' + Response``).

    Args:
        csv_path: Path of the CSV file to read.

    Returns:
        The prepared ``pandas.DataFrame``, or ``None`` when the file is
        missing, a required column is absent, or any other error occurs while
        reading/processing (the reason is logged).
    """
    required_columns = ('Context', 'Response')
    try:
        df = pd.read_csv(csv_path)

        # Verify required columns exist
        if any(column not in df.columns for column in required_columns):
            logging.error("Training data missing required columns 'Context' and 'Response'")
            return None

        # Clean the data: blank out missing cells, then force text dtype so a
        # column that pandas inferred as numeric still concatenates cleanly.
        for column in required_columns:
            df[column] = df[column].fillna('').astype(str)

        # Combine context and response for vectorization
        df['combined_text'] = df['Context'] + ' ' + df['Response']

        logging.info(f"Successfully loaded {len(df)} training examples")
        return df
    except FileNotFoundError:
        logging.error(f"Training data file not found: {csv_path}")
        return None
    except Exception as e:
        logging.error(f"Error processing training data: {e}")
        return None

In [8]:
def extract_pdf_text(pdf_path: str) -> str:
    """Return the text of a PDF as one whitespace-normalized string.

    The text of every page is joined, then all whitespace runs are collapsed
    to single spaces. Any exception raised while opening or reading the file
    is logged and reported to the caller as an empty string.
    """
    try:
        with open(pdf_path, "rb") as pdf_file:
            pages = PyPDF2.PdfReader(pdf_file).pages
            raw_text = "\n".join(page.extract_text() for page in pages)
    except Exception as e:
        logging.error(f"PDF extraction error - {pdf_path}: {str(e)}")
        return ""

    # str.split() without arguments splits on every whitespace run (newlines
    # and carriage returns included), so a single split/join normalizes all.
    return " ".join(raw_text.split())


In [9]:
def load_resources():
    """Rebuild the reference corpus and fit the shared TF-IDF vectorizer.

    Clears ``reference_vectors``, then stores the extracted text of each
    reference PDF that exists and yields text, followed by the concatenated
    training examples (when training data loads and is non-empty). The
    module-level ``vectorizer`` is fitted on all stored texts.

    Returns:
        True when at least one text was loaded and the vectorizer was fitted,
        False otherwise.
    """
    global reference_vectors, training_data
    reference_vectors.clear()

    # Load specific resource files
    resources = {
        'Ethics Code': 'ethics-code-2017.pdf',
        'Treatment Principles': 'principles.pdf',
        'Ethical Foundations': 'EthicalFoundationsofPsychology.pdf'
    }

    for source, pdf_name in resources.items():
        if not os.path.exists(pdf_name):
            logging.error(f"Resource file not found: {pdf_name}")
            continue
        extracted = extract_pdf_text(pdf_name)
        if extracted:
            reference_vectors[source] = extracted
            logging.info(f"Successfully loaded: {pdf_name}")

    # Load and process training data
    training_data = process_training_data()
    has_examples = training_data is not None and not training_data.empty
    if has_examples:
        contexts = training_data['Context'].astype(str)
        responses = training_data['Response'].astype(str)
        joined_examples = " ".join(contexts + " " + responses)
        # Normalize whitespace
        reference_vectors['Training Examples'] = ' '.join(joined_examples.split())
        logging.info("Successfully integrated training data")

    # reference_vectors was emptied above, so its values (in insertion order)
    # are exactly the texts loaded during this call.
    corpus = list(reference_vectors.values())
    if not corpus:
        logging.error("No reference materials loaded")
        return False

    vectorizer.fit(corpus)
    logging.info("Vectorizer fitted successfully")
    return True

In [10]:
def calculate_resource_usage(response: str) -> str:
    """Report how closely a response resembles each loaded reference text.

    Each reference text and the response are projected with the shared
    vectorizer; the cosine similarity is expressed as a percentage rounded to
    one decimal. Sources scoring above 2% are listed one per line, highest
    first.

    Returns:
        The formatted listing, or a short explanatory message when the input
        is not a string, no references are loaded, nothing passes the
        threshold, or the computation raises.
    """
    if not isinstance(response, str):
        logging.error("Response is not a string. Cannot calculate usage.")
        return "Invalid response type"

    if not reference_vectors:
        logging.error("No reference materials loaded")
        return "No reference materials available"

    try:
        response_vector = vectorizer.transform([response])

        scored_sources = []
        for source, text in reference_vectors.items():
            source_vector = vectorizer.transform([text])
            score = float(cosine_similarity(source_vector, response_vector)[0][0])
            pct = round(score * 100, 1)
            # Lower threshold to catch more subtle matches
            if pct > 2:
                scored_sources.append((source, pct))

        if not scored_sources:
            return "No significant resource usage detected"

        scored_sources.sort(key=lambda pair: pair[1], reverse=True)
        return "\n".join(f"{source}: {pct}%" for source, pct in scored_sources)

    except Exception as e:
        logging.error(f"Resource calculation error: {str(e)}")
        return "Unable to calculate resource usage"

In [12]:
def find_similar_examples(message: str, n=2):
    """Find the training examples whose context is most similar to a message.

    Args:
        message: The user message to compare against the training contexts.
        n: Maximum number of examples to return. Values of zero or less (or
            ``None``) yield an empty list.

    Returns:
        A list of at most ``n`` dicts with keys ``context``, ``response`` and
        ``similarity``, ordered from most to least similar. Only examples with
        cosine similarity above 0.1 are included. An empty list is returned
        when no training data is loaded or when the computation fails (the
        error is logged).
    """
    # Guard first: with n == 0 the slice below ([-0:]) would select every row,
    # and a negative n would select nearly every row.
    if n is None or n <= 0:
        return []

    if training_data is None or training_data.empty:
        return []

    try:
        message_vector = vectorizer.transform([message])
        context_vectors = vectorizer.transform(training_data['Context'])
        similarities = cosine_similarity(message_vector, context_vectors)[0]
        # argsort is ascending: take the last n indices and reverse them so
        # the best match comes first.
        top_indices = similarities.argsort()[-n:][::-1]

        return [
            {
                'context': training_data.iloc[i]['Context'],
                'response': training_data.iloc[i]['Response'],
                'similarity': similarities[i]
            }
            for i in top_indices if similarities[i] > 0.1
        ]
    except Exception as e:
        logging.error(f"Error finding similar examples: {e}")
        return []

In [13]:
def detect_crisis(text: str) -> bool:
    """Detect potential crisis situations by keyword matching.

    The text is lower-cased once, hyphen/dash/underscore characters are
    treated as spaces, and whitespace runs are collapsed, so variants such as
    "self-harm" or "want  to\\ndie" match the same phrases as their plainly
    spaced forms. Matching is by substring.

    Args:
        text: The user message to inspect.

    Returns:
        True when any crisis phrase occurs in the normalized text; False
        otherwise, including for empty or non-string input.
    """
    if not isinstance(text, str) or not text:
        return False

    crisis_keywords = (
        'suicide', 'kill myself', 'end it all', 'want to die', 'suicidal',
        'self harm', 'hopeless', 'worthless', 'no reason to live'
    )

    # Map separator characters to spaces so hyphenated spellings line up with
    # the space-separated keywords above.
    separators = str.maketrans({ch: ' ' for ch in '-_\u2010\u2011\u2012\u2013\u2014'})
    normalized = ' '.join(text.lower().translate(separators).split())

    return any(keyword in normalized for keyword in crisis_keywords)

In [14]:
def get_crisis_response() -> tuple:
    """Build the fixed reply used when a crisis is detected.

    The message is assembled from explicit lines so that no source-code
    indentation leaks into the user-facing text (lines indented by four
    spaces after a blank line are rendered as a code block by Markdown
    renderers).

    Returns:
        A ``(response, usage)`` tuple: the crisis message and the fixed usage
        label ``"Crisis Response: 100%"``.
    """
    message_lines = [
        "I'm very concerned about what you're sharing and want you to know you're not alone.",
        "",
        "Please reach out for immediate help:",
        "",
        "• Emergency: Call 911 (US) or your local emergency number",
        "• 24/7 Crisis Hotline: 988 (US)",
        "• Crisis Text Line: Text HOME to 741741",
        "",
        "Would you like help finding mental health resources in your area?",
    ]
    response = "\n".join(message_lines)

    return response, "Crisis Response: 100%"

In [15]:
def chat_response(message: str, history: list) -> tuple:
    """Generate a chat reply and a resource-usage summary for one user turn.

    Flow:
      1. Blank messages are ignored and the conversation is returned as-is.
      2. Messages flagged by ``detect_crisis`` receive the fixed crisis reply.
      3. Otherwise a prompt is built from the last three exchanges plus
         similar training examples and sent to the module-level ``model``.

    Args:
        message: The text submitted by the user.
        history: Prior ``(user, assistant)`` exchanges; ``None`` is treated as
            an empty conversation. New exchanges are appended to this list.

    Returns:
        A ``(history, usage)`` tuple: the updated conversation and a usage
        string. If generation fails, an apology is appended to the history
        and a fixed error string is returned as usage.
    """
    # Tolerate a missing history so the slicing/appending below cannot fail.
    if history is None:
        history = []

    # The returned history replaces what the chatbot component displays, so a
    # blank submission must hand back the existing conversation, not [].
    if not message or not message.strip():
        return history, ""

    # Check for crisis
    if detect_crisis(message):
        response, usage = get_crisis_response()
        history.append((message, response))
        return history, usage

    try:
        # Find similar examples from training data
        similar_examples = find_similar_examples(message)
        
        # Build prompt using history and similar examples
        history_text = "\n".join([f"User: {h[0]}\nAssistant: {h[1]}" for h in history[-3:]])
        examples_text = "\n".join([
            f"Similar situation:\nUser: {ex['context']}\nResponse: {ex['response']}"
            for ex in similar_examples
        ])

        prompt = f"""You are a mental health support assistant. Your role is to provide empathetic, practical guidance while following these guidelines (which should not appear in your response):

        Previous conversation:
        {history_text}

        Reference examples:
        {examples_text}

        User message: {message}

        Respond in a natural, conversational way that:
        - Validates and acknowledges emotions first
        - Provides specific, evidence-based suggestions
        - Uses warm, supportive language
        - Includes practical coping strategies
        - Keeps responses clear and concise

        Important: Do not include these guidelines or any headers/sections in your response. Write naturally as if having a conversation."""

        model_output = model.generate_content(prompt)
        # Fall back to the object's string form when no .text attribute exists.
        response = model_output.text if hasattr(model_output, 'text') else str(model_output)
        
        usage = calculate_resource_usage(response)
        history.append((message, response))
        return history, usage

    except Exception as e:
        logging.error(f"Response generation error: {str(e)}")
        history.append((message, "I apologize, but I'm having trouble right now. Could you rephrase your message?"))
        return history, "Error calculating resource usage"



In [16]:
def create_interface():
    """Assemble the Gradio Blocks UI and return it (without launching).

    Layout: a title, a row holding the chatbot and a read-only "Resource
    Usage" box, a message textbox, "Send" and "Clear conversation" buttons,
    and a disclaimer. Pressing Enter in the textbox or clicking "Send" calls
    ``chat_response``; "Clear conversation" empties both the chatbot and the
    usage box.
    """
    custom_css = """
        .chatbot { height: 70vh; overflow-y: auto }
        .usage-box { background: #f6f6f6; padding: 10px; border-radius: 5px }
    """

    with gr.Blocks(css=custom_css) as interface:
        gr.Markdown("""# 🤖 Mental Health Support Assistant""")

        with gr.Row():
            chatbot = gr.Chatbot(value=[], elem_classes="chatbot", bubble_full_width=True)
            with gr.Column():
                usage_display = gr.Textbox(
                    label="Resource Usage",
                    elem_classes="usage-box",
                    lines=4,
                    interactive=False,
                )

        msg = gr.Textbox(show_label=False, placeholder="Type your message here...")
        submit = gr.Button("Send", variant="primary")
        clear = gr.Button("Clear conversation")

        gr.Markdown("""**Disclaimer**: This AI assistant is not a substitute for professional mental health care.""")

        # Both ways of sending a message share the same handler and wiring.
        for register_send in (msg.submit, submit.click):
            register_send(chat_response, [msg, chatbot], [chatbot, usage_display], queue=False)
        clear.click(lambda: ([], ""), None, [chatbot, usage_display], queue=False)

    return interface


In [17]:
def initialize_chatbot(api_key: str):
    """Configure the generative-AI client, create the model, load resources.

    Assigns the module-level ``model`` and then calls ``load_resources``.

    Args:
        api_key: Key passed to ``genai.configure``.

    Returns:
        True when resources loaded successfully; False when loading reported
        failure or any exception was raised (the error is logged).
    """
    global model
    try:
        genai.configure(api_key=api_key)
        model = genai.GenerativeModel("gemini-pro")

        if load_resources():
            logging.info("Chatbot initialized successfully")
            return True

        logging.error("Failed to load resources")
        return False
    except Exception as init_error:
        logging.error(f"Initialization error: {str(init_error)}")
        return False

In [18]:
def main():
    """Initialize the chatbot and launch the Gradio interface.

    The API key is read from the ``GOOGLE_API_KEY`` environment variable so
    the secret does not have to be written into the notebook. When the
    variable is unset or empty, the in-file placeholder is used as before.
    If initialization fails, an error message is printed and nothing is
    launched.
    """
    # Prefer the environment; fall back to the placeholder (replace it or
    # export GOOGLE_API_KEY before running).
    api_key = os.environ.get("GOOGLE_API_KEY") or "API_KEY"

    if initialize_chatbot(api_key):
        interface = create_interface()
        interface.launch(share=False)
    else:
        print("Error initializing chatbot. Please check your resources and API key.")

if __name__ == "__main__":
    main()

2025-02-17 18:59:22,599 - INFO - Successfully loaded: ethics-code-2017.pdf
2025-02-17 18:59:25,157 - INFO - Successfully loaded: principles.pdf
2025-02-17 18:59:36,350 - INFO - Successfully loaded: EthicalFoundationsofPsychology.pdf
2025-02-17 18:59:36,498 - INFO - Successfully loaded 3512 training examples
2025-02-17 18:59:36,819 - INFO - Successfully integrated training data
2025-02-17 18:59:38,370 - INFO - Vectorizer fitted successfully
2025-02-17 18:59:38,377 - INFO - Chatbot initialized successfully
2025-02-17 18:59:38,862 - INFO - HTTP Request: GET http://127.0.0.1:7863/startup-events "HTTP/1.1 200 OK"
2025-02-17 18:59:38,911 - INFO - HTTP Request: HEAD http://127.0.0.1:7863/ "HTTP/1.1 200 OK"


Running on local URL:  http://127.0.0.1:7863

To create a public link, set `share=True` in `launch()`.


2025-02-17 18:59:42,102 - INFO - HTTP Request: GET https://api.gradio.app/pkg-version "HTTP/1.1 200 OK"
