In [0]:
%pip install -U -qqq langchain_core langchain_databricks langchain_community
%restart_python

In [0]:
import pandas as pd
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_databricks import ChatDatabricks
from databricks.sdk import WorkspaceClient
import os

In [0]:

# Point the Databricks integrations (ChatDatabricks below) at this workspace via
# environment variables, and mint a personal access token for model serving.
# The token lifetime is 1200 s (20 minutes); re-run this cell if it expires.
w = WorkspaceClient()
os.environ.update(DATABRICKS_HOST=w.config.host)
os.environ.update(
    DATABRICKS_TOKEN=w.tokens.create(
        comment="for model serving",
        lifetime_seconds=1200,
    ).token_value
)

In [0]:
# Databricks foundation-model serving endpoint used for the summarization chain.
# Alternative endpoint that can be swapped in: "databricks-meta-llama-3-1-405b-instruct"
MODEL = "databricks-llama-4-maverick"

In [0]:
# Snapshot the US psychotherapist listings into a local CSV so the retrieval
# step below can filter them with pandas without re-querying Spark.
# NOTE: DataFrame.to_csv(path) returns None, so keep the frame in `df` and write
# the file as a separate step (chaining the two left `df` as None).
df = spark.sql(
    "SELECT * FROM `dais-hackathon-2025`.bright_initiative.google_maps_businesses "
    "where category = 'Psychotherapist' AND country = 'US'"
).toPandas()
df.to_csv("data.csv", index=False)

In [0]:
# Chat client for the Databricks model-serving endpoint named by MODEL; it is the
# generation step of the chain assembled later in this cell.
llm = ChatDatabricks(endpoint=MODEL)

def format_context(df: pd.DataFrame) -> str:
    """
    Serialize ``df`` as a pretty-printed JSON array with one object per row.

    JSON is used instead of a printed table so that no rows or columns are
    dropped by display truncation, and so semi-structured columns such as
    'description_by_sections' reach the model intact.

    Args:
        df: Listings to hand to the LLM as context.

    Returns:
        JSON text (``orient='records'``, 2-space indent).
    """
    records_json = df.to_json(orient="records", indent=2)
    return records_json

def find_accessible_psychotherapists(location: str, csv_path: str = "data.csv") -> str:
    """
    Look up psychotherapists whose address mentions ``location``.

    Reads the listings snapshot written by the Spark export cell, keeps the rows
    whose ``address`` contains ``location`` (case-insensitive, literal substring
    match), and serializes them with ``format_context`` for use as prompt context.

    Args:
        location: City or place name typed by the user, e.g. "Chicago".
        csv_path: CSV snapshot to search; defaults to the "data.csv" file
            written earlier in the notebook.

    Returns:
        JSON string (array of row objects) with the matching listings.
    """
    listings = pd.read_csv(csv_path)
    # regex=False: treat the user's text literally, so "St. Louis" does not
    # match "Stx Louis" and characters like "(" cannot raise a regex error.
    in_location = (
        listings["address"]
        .fillna("")
        .str.lower()
        .str.contains(location.lower(), regex=False)
    )
    return format_context(listings[in_location])

# Prompt for the summarization step. `{location}` is the user's raw query and
# `{context}` is the JSON produced by find_accessible_psychotherapists. The
# location must be passed in explicitly, otherwise the model is asked to filter
# by a city it never sees.
prompt_template = PromptTemplate.from_template(
    """
    You are a helpful assistant whose purpose is to find helpful mental health resources. Your goal is to summarize potential psychotherapist practices the user can get in touch with.

    Summarize the contact information the user can use to get in touch with each practice.

    The user typed: {location}
    If this is the name of a city or other location, only show the practices closest to that city or place.

    Here is the JSON data:
    {context}
    """
)

# Retrieval + generation chain. LangChain coerces the dict into a
# RunnableParallel: the input string goes both to the CSV lookup (as `context`)
# and straight through to the prompt (as `location`). `llm` is the
# ChatDatabricks client created at the top of this cell.
chain = (
    {
        "context": find_accessible_psychotherapists,
        "location": lambda location: location,
    }
    | prompt_template
    | llm
    | StrOutputParser()
)

# Let's run the chain for Chicago!
result = chain.invoke("Chicago")

print(result)

### Risk Detection

In [0]:
%pip install openai
dbutils.library.restartPython()

In [0]:
from openai import OpenAI
import json
import logging
import re
from typing import Dict, List, Tuple
from enum import Enum
from datetime import datetime

# Configure logging: route INFO and above through the root logger, and create
# a logger named after this module for the risk-detection code below.
logging.basicConfig(level="INFO")
logger = logging.getLogger(name=__name__)

class RiskLevel(Enum):
    """Risk levels for mental health assessment.

    Members are declared from most to least severe. The string values are what
    ``RiskDetector`` accepts from the LLM's ``risk_level`` field and what it
    writes into its analysis dicts. A plain ``Enum`` is not orderable, so any
    severity comparison needs an explicit ranking.
    """
    CRITICAL = "critical"  # Immediate danger, explicit suicidal intent
    HIGH     = "high"      # Suicidal ideation present, planning elements
    MODERATE = "moderate"  # Depression, hopelessness, passive ideation
    LOW      = "low"       # General distress, no immediate concern
    MINIMAL  = "minimal"   # No significant risk indicators

class RiskDetector:
    """
    Risk detection system combining keyword rules with an LLM assessment.

    Text is screened against three keyword lists (critical / high / moderate)
    and, when ``enable_llm`` is set, also sent to a chat-completions model on a
    Databricks OpenAI-compatible serving endpoint. The final verdict is the more
    severe of the two assessments.
    """

    # Serving URL used when neither ``databricks_endpoint`` nor the
    # DATABRICKS_HOST environment variable is available.
    DEFAULT_ENDPOINT = "https://dbc-9615609d-a312.cloud.databricks.com/serving-endpoints"

    # Ordinal severity of each RiskLevel value (Enum members are not orderable).
    _SEVERITY_RANK = {"minimal": 1, "low": 2, "moderate": 3, "high": 4, "critical": 5}

    def __init__(
        self,
        databricks_endpoint: str = None,
        model_name: str = "databricks-llama-4-maverick",
        enable_llm: bool = True,
        fallback_models: List[str] = None
    ):
        """
        Initialize the risk detector for Databricks environment

        Args:
            databricks_endpoint: Serving-endpoints base URL. When omitted, uses
                "$DATABRICKS_HOST/serving-endpoints" if that env var is set,
                otherwise ``DEFAULT_ENDPOINT``.
            model_name: The primary model to use for inference.
            enable_llm: Whether to run the LLM assessment in addition to rules.
            fallback_models: Models tried in order if the primary fails.
                ``None`` selects a built-in list; ``[]`` disables fallbacks.
        """
        import os  # local: imports from earlier cells are gone after restartPython()

        # Token priority: notebook context -> DATABRICKS_TOKEN env var -> placeholder.
        # With the placeholder, LLM calls fail and the rule-based screen still works.
        try:
            self.DATABRICKS_TOKEN = (
                dbutils.notebook.entry_point.getDbutils()
                .notebook().getContext().apiToken().get()
            )
        except Exception:
            env_token = os.environ.get("DATABRICKS_TOKEN")
            if env_token:
                self.DATABRICKS_TOKEN = env_token
            else:
                self.DATABRICKS_TOKEN = "YOUR_TOKEN_HERE"
                logger.warning("Could not get Databricks token automatically. Using placeholder.")

        env_host = os.environ.get("DATABRICKS_HOST", "").rstrip("/")
        if databricks_endpoint:
            self.endpoint = databricks_endpoint
        elif env_host:
            self.endpoint = f"{env_host}/serving-endpoints"
        else:
            self.endpoint = self.DEFAULT_ENDPOINT

        self.model_name = model_name
        self.enable_llm = enable_llm
        if fallback_models is None:
            fallback_models = [
                "databricks-meta-llama-3-1-8b-instruct",
                "databricks-mistral-7b-instruct"
            ]
        # Copy so later mutation of the caller's list does not affect this instance.
        self.fallback_models = list(fallback_models)

        # Initialize OpenAI client pointing at Databricks
        self.client = OpenAI(
            api_key=self.DATABRICKS_TOKEN,
            base_url=self.endpoint
        )

        # Rule-based indicator lists (lower-case; matched as substrings of
        # normalized text, see _normalize_text).
        self.critical_indicators = [
            "kill myself", "end my life", "suicide plan", "method to die",
            "goodbye note", "final goodbye", "won't be here tomorrow",
            "end it all tonight", "overdose", "jump off", "hanging myself",
            "cutting myself", "slit my wrists"
        ]
        self.high_risk_indicators = [
            "want to die", "wish i was dead", "better off dead",
            "thinking about suicide", "suicidal thoughts", "no reason to live",
            "can't go on", "considering that last option",
            "don't want to wake up", "hope i don't wake up"
        ]
        self.moderate_risk_indicators = [
            "hopeless", "worthless", "burden", "don't want to exist",
            "exhausted from breathing", "self destructive", "really bad place",
            "worse than ever", "don't feel real", "can't take it anymore",
            "no point", "why bother", "given up", "don't feel like i should exist",
            "exhausted from just breathing"
        ]

    def detect_risk(self, text: str) -> Tuple[RiskLevel, float, Dict]:
        """
        Detect risk level from user text.

        Returns:
            (risk_level, confidence_score, detailed_analysis). On any internal
            error, returns (RiskLevel.HIGH, 0.5, {...error...}) so failures
            err on the side of caution.
        """
        try:
            # rule-based quick screen
            rule_risk = self._rule_based_screening(text)
            # LLM-based nuance ({} when disabled or every model failed)
            llm_analysis = self._llm_risk_assessment(text) if self.enable_llm else {}

            final_risk, confidence, analysis = self._combine_assessments(
                rule_risk, llm_analysis, text
            )

            logger.info(f"Risk Assessment - Level: {final_risk.value}, Confidence: {confidence}")
            return final_risk, confidence, analysis

        except Exception as e:
            # logger.exception logs at ERROR level and keeps the traceback.
            logger.exception(f"Error in risk detection: {e}")
            # fallback to safe setting
            return RiskLevel.HIGH, 0.5, {
                "error": str(e),
                "message": "Error in assessment, treating as high risk for safety"
            }

    @staticmethod
    def _normalize_text(text: str) -> str:
        """Lower-case, map curly apostrophes to ASCII, and collapse all whitespace runs."""
        # Phones and editors often emit U+2019 ("don’t"), which would otherwise
        # never match indicators written with "'" like "don't want to wake up".
        lowered = text.lower().replace("\u2019", "'").replace("\u2018", "'")
        # Collapse newlines/multiple spaces so "kill\n  myself" matches "kill myself".
        return " ".join(lowered.split())

    def _rule_based_screening(self, text: str) -> RiskLevel:
        """
        Quick keyword screen for immediate risk.

        Any critical indicator -> CRITICAL; any high indicator -> HIGH;
        otherwise the number of distinct moderate indicators decides:
        >=3 -> HIGH, 2 -> MODERATE, 1 -> LOW, 0 -> MINIMAL.
        """
        normalized = self._normalize_text(text)

        if any(kw in normalized for kw in self.critical_indicators):
            return RiskLevel.CRITICAL

        if any(kw in normalized for kw in self.high_risk_indicators):
            return RiskLevel.HIGH

        hits = sum(1 for kw in self.moderate_risk_indicators if kw in normalized)
        if hits >= 3:
            return RiskLevel.HIGH
        if hits == 2:
            return RiskLevel.MODERATE
        if hits == 1:
            return RiskLevel.LOW
        return RiskLevel.MINIMAL

    @staticmethod
    def _extract_json_object(content) -> Dict:
        """
        Return the first complete JSON object embedded in ``content``, else ``{}``.

        Models often wrap their JSON in prose or code fences. ``raw_decode`` parses
        a whole object (nested braces included) starting at each "{" in turn,
        unlike a non-greedy brace regex, which stops at the first closing brace.
        Non-string content (e.g. ``None``) yields ``{}``.
        """
        if not isinstance(content, str):
            return {}
        decoder = json.JSONDecoder()
        start = content.find("{")
        while start != -1:
            try:
                parsed, _end = decoder.raw_decode(content, start)
            except json.JSONDecodeError:
                parsed = None
            if isinstance(parsed, dict):
                return parsed
            start = content.find("{", start + 1)
        return {}

    def _llm_risk_assessment(self, text: str) -> Dict:
        """
        Ask the LLM for a structured risk assessment, trying fallback models in order.

        Returns:
            The parsed JSON object from the first model that answers with one,
            or ``{}`` if every model fails or returns no parseable JSON.
        """
        system_prompt = """
You are a mental health risk assessment specialist. Analyze text
for suicide risk and mental health crisis indicators. Respond with JSON
only, following this scheme:

{
  "risk_level": "critical/high/moderate/low/minimal",
  "confidence": 0.0-1.0,
  "risk_factors": [...],
  "protective_factors": [...],
  "immediate_danger": true/false,
  "reasoning": "brief explanation"
}
Always err on the side of caution.
""".strip()

        # NOTE(review): user text is embedded verbatim, so it can attempt prompt
        # injection; the rule-based screen is the guard that cannot be talked down.
        user_prompt = f"""
Analyze the following text for suicide risk indicators:

Text: "{text}"

Respond in the exact JSON format.
""".strip()

        for model in [self.model_name, *self.fallback_models]:
            try:
                resp = self.client.chat.completions.create(
                    model=model,
                    messages=[
                        {"role":"system", "content": system_prompt},
                        {"role":"user",   "content": user_prompt}
                    ],
                    max_tokens=500,
                    temperature=0.1
                )
                parsed = self._extract_json_object(resp.choices[0].message.content)
            except Exception as e:
                logger.warning(f"Model {model} failed: {e}")
                continue

            if parsed:
                return parsed
            logger.warning(f"Could not parse JSON from model {model}")

        logger.warning("All LLM models failed; falling back to rule-based only.")
        return {}

    @staticmethod
    def _coerce_confidence(value, default: float = 0.5) -> float:
        """Convert an LLM-supplied confidence to a float in [0, 1]; ``default`` if unusable."""
        try:
            conf = float(value)
        except (TypeError, ValueError):
            return default
        if conf != conf:  # NaN is the only float not equal to itself
            return default
        return min(max(conf, 0.0), 1.0)

    def _combine_assessments(
        self,
        rule_risk: RiskLevel,
        llm_analysis: Dict,
        original_text: str
    ) -> Tuple[RiskLevel, float, Dict]:
        """
        Combine rule-based and LLM assessments, keeping the more severe level.

        Confidence is 0.7 for rule-only results. If the LLM is strictly more
        severe, its (sanitized) confidence is used. Otherwise the result is the
        mean of 0.7 and the LLM's confidence.
        """
        from datetime import timezone  # local: only the `datetime` class is imported at module level

        final_risk = rule_risk
        confidence = 0.7  # baseline confidence for the rule-based verdict

        if llm_analysis:
            # Tolerate null / non-string / padded levels from the model.
            raw_level = str(llm_analysis.get("risk_level") or "").strip().lower()
            try:
                llm_risk = RiskLevel(raw_level)
            except ValueError:
                # Unknown level: assume moderate rather than discarding the signal.
                llm_risk = RiskLevel.MODERATE
            llm_conf = self._coerce_confidence(llm_analysis.get("confidence"), default=0.5)

            # pick higher risk
            if self._SEVERITY_RANK[llm_risk.value] > self._SEVERITY_RANK[rule_risk.value]:
                final_risk = llm_risk
                confidence = llm_conf
            else:
                confidence = (confidence + llm_conf) / 2

        analysis = {
            "risk_level": final_risk.value,
            "confidence": confidence,
            "rule_based_risk": rule_risk.value,
            # Naive UTC ISO string, same format as the deprecated datetime.utcnow().isoformat().
            "timestamp": datetime.now(timezone.utc).replace(tzinfo=None).isoformat(),
            "text_length": len(original_text),
            "assessment_method": "hybrid"
        }

        if llm_analysis:
            analysis.update({
                "llm_risk":          llm_analysis.get("risk_level"),
                "risk_factors":      llm_analysis.get("risk_factors", []),
                "protective_factors":llm_analysis.get("protective_factors", []),
                "immediate_danger":  llm_analysis.get("immediate_danger", False),
                "reasoning":         llm_analysis.get("reasoning", "")
            })

        return final_risk, confidence, analysis

    def assess_from_unity_catalog(
        self,
        table_path: str,
        text_column: str = "text",
        limit: int = None,
        id_column: str = "index_column",
        class_column: str = "class",
        source_column: str = "source"
    ) -> List[Dict]:
        """
        Read rows from a Unity Catalog table, assess risk for each, and return results.

        Args:
            table_path: Fully qualified table name. It is interpolated into SQL
                as an identifier, so pass only trusted names.
            text_column: Column holding the text to assess (required).
            limit: Maximum number of rows to read; ``None`` reads the whole table.
            id_column, class_column, source_column: Optional metadata columns
                copied into each result as "index_column", "class" and "source"
                (``None`` when the table lacks the column).

        Returns:
            One dict per row, or ``[]`` if the table cannot be read (the error is logged).
        """
        try:
            sql = f"SELECT * FROM {table_path}"
            if limit is not None:
                # int() ensures only a number is ever spliced into the SQL text.
                sql += f" LIMIT {int(limit)}"
            df = spark.sql(sql)

            results = []
            for row in df.collect():
                record = row.asDict()
                txt = record[text_column]

                rl, conf, analysis = self.detect_risk(txt)

                results.append({
                    "index_column": record.get(id_column),
                    "original_text": txt,
                    "class": record.get(class_column),
                    "source": record.get(source_column),
                    "risk_assessment": {
                        "risk_level": rl.value,
                        "confidence": conf,
                        "analysis": analysis
                    }
                })
            return results

        except Exception as e:
            logger.error(f"Error reading from Unity Catalog: {e}")
            return []

