In [1]:
# Run the other component notebooks to bring in their definitions.
# The AssessmentTools class below reads HAZARD_CLASSIFICATION, VULNERABILITY_RISK_LEVELS,
# CAT_MODELING_GEO_FACTORS, ASSESSMENT_WEIGHTS and GENERAL_UNDERWRITING_CRITERIA without
# defining them in this notebook, so they have to be in the namespace before the class is used
# (presumably they come from assessment_criteria.ipynb -- TODO confirm).
%run "./assessment_criteria.ipynb"
%run "./guidelines_helper.ipynb"

Assessment criteria loaded successfully.
Guidelines retriever loaded successfully.


In [2]:
"""
Assessment tools for insurance underwriting evaluation.
This module provides functions to assess hazard, vulnerability, and catastrophe risks
using structured criteria and guidelines retrieval.
"""

import logging
import re
from typing import Dict, List, Any, Optional, Tuple
from datetime import datetime

# Named logger used by the async assessment methods below; no handlers or levels are configured in this cell.
logger = logging.getLogger("InsuranceAgentSystem")

class AssessmentTools:
    """
    Provides assessment functions for evaluating insurance submissions
    against structured criteria and guidelines.
    """
    
    def __init__(self, guidelines_retriever):
        """
        Store the guidelines retriever used by the assessment methods.
        
        Args:
            guidelines_retriever: A configured GuidelinesRetriever instance. The
                async methods of this class await its get_hazard_guidelines,
                get_vulnerability_guidelines, get_cat_modeling_guidelines and
                get_decision_guidelines methods.
        """
        self.guidelines_retriever = guidelines_retriever

    # # Add this function to assessment_tools.ipynb under the AssessmentTools class
    # async def check_eligibility(self, submission_data: Dict[str, Any]) -> Tuple[bool, List[str]]:
    #     """
    #     Performs explicit eligibility checks based on underwriting guidelines.
        
    #     Args:
    #         submission_data: Dictionary containing submission details
            
    #     Returns:
    #         Tuple containing (is_eligible, list_of_eligibility_issues)
    #     """
    #     eligibility_issues = []
    #     property_details = submission_data.get("property_details", {})
        
    #     # Check building age
    #     year_built = property_details.get("year_built", 0)
    #     building_age = datetime.now().year - year_built if year_built else 0
        
    #     if building_age > 35:
    #         eligibility_issues.append(f"Building age ({building_age} years) exceeds maximum guideline of 35 years")
        
    #     # Check building type and occupancy
    #     building_type = property_details.get("building_type", "").lower()
    #     occupancy = property_details.get("occupancy", "").lower()
        
    #     # List of ineligible occupancies from the guidelines (Item 35 in General Underwriting Criteria)
    #     INELIGIBLE_OCCUPANCIES = {
    #         "amusement", "attorney", "auto filing", "auto parking", "bank", "credit union",
    #         "bar", "tavern", "bowling", "camp", "carpet stock", "car wash", "church", "synagogue",
    #         "civic", "fraternal", "club", "hall", "collection", "credit", "loan", "contractor",
    #         "drive-in", "feed mill", "fraternity", "sorority", "fruit packing", "government",
    #         "greenhouse", "lodge", "lumber yard", "manufacturing", "nightclub", "newspaper",
    #         "packing house", "political", "pool hall", "restaurant", "school", "union", "ymca", "ywca",
    #         "welfare", "woodworker"
    #     }
        
    #     # Check for ineligible occupancies
    #     for ineligible in INELIGIBLE_OCCUPANCIES:
    #         if ineligible in building_type or ineligible in occupancy:
    #             eligibility_issues.append(f"Occupancy type '{occupancy}' or building type '{building_type}' matches ineligible category: '{ineligible}'")
    #             break
        
    #     # Check construction type
    #     construction = property_details.get("construction", "").lower()
    #     if "wood shake" in construction or "shake roof" in construction:
    #         eligibility_issues.append("Buildings with wood shake roofs are ineligible for coverage")
        
    #     # Check number of stories
    #     stories = property_details.get("stories", 0)
    #     has_sprinklers = property_details.get("sprinklers", False)
        
    #     if stories > 3:
    #         eligibility_issues.append(f"Buildings exceeding 3 stories are ineligible for coverage")
    #     elif stories > 2 and not has_sprinklers:
    #         eligibility_issues.append(f"Buildings over 2 stories must be fully sprinklered")
        
    #     # Check vacancy
    #     vacancy = property_details.get("vacancy_percentage", 0)
    #     if vacancy > 25:
    #         eligibility_issues.append(f"Buildings with more than 25% vacancy are ineligible for coverage")
        
    #     # Check protection class
    #     protection_class = property_details.get("protection_class", 0)
    #     if protection_class > 6:
    #         eligibility_issues.append(f"Property must be within Protection Classes 1-6")
        
    #     # Return eligibility status
    #     is_eligible = len(eligibility_issues) == 0
        
    #     return is_eligible, eligibility_issues
    
    async def evaluate_hazard(self, property_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Evaluates the hazard classification of a property based on building type,
        construction materials, and occupancy.
        
        Args:
            property_data: Dictionary containing property details
            
        Returns:
            A dictionary with hazard assessment results
        """
        logger.info("Starting hazard classification assessment")
        
        # Extract property characteristics
        building_type = property_data.get("building_type", "")
        construction = property_data.get("construction", "")
        occupancy = property_data.get("occupancy", "")
        year_built = property_data.get("year_built", 0)
        building_age = datetime.now().year - year_built if year_built > 0 else 30  # Assume 30 years if unknown
        
        logger.info(f"Analyzing property: {building_type} building with {construction} construction")
        
        # Retrieve relevant guidelines
        guidelines = await self.guidelines_retriever.get_hazard_guidelines(building_type, construction)
        
        # Calculate building type score
        bt_score = 3.0  # Default score
        for bt, score in HAZARD_CLASSIFICATION["physical"]["factors"]["construction_type"].items():
            if bt.lower() in building_type.lower():
                bt_score = score
                break
        
        # Calculate construction score
        const_score = 3.0  # Default score
        for material, score in HAZARD_CLASSIFICATION["physical"]["factors"]["construction_type"].items():
            if material.lower() in construction.lower():
                const_score = score
                break
        
        # Calculate age score
        age_score = 3.0  # Default score
        if building_age <= 10:
            age_score = HAZARD_CLASSIFICATION["physical"]["factors"]["building_age"]["0-10 years"]
        elif building_age <= 20:
            age_score = HAZARD_CLASSIFICATION["physical"]["factors"]["building_age"]["11-20 years"]
        elif building_age <= 35:
            age_score = HAZARD_CLASSIFICATION["physical"]["factors"]["building_age"]["21-35 years"]
        else:
            age_score = HAZARD_CLASSIFICATION["physical"]["factors"]["building_age"]["Over 35 years"]
        
        # Calculate occupancy score
        occ_score = 3.0  # Default score
        for occ_type, score in HAZARD_CLASSIFICATION["physical"]["factors"]["occupancy"].items():
            if occ_type.lower() in occupancy.lower():
                occ_score = score
                break
        
        # Calculate overall hazard score (weighted average)
        hazard_score = (bt_score * 0.3 + const_score * 0.3 + age_score * 0.2 + occ_score * 0.2)
        logger.info(f"Calculated hazard score: {hazard_score}")
        
        # Prepare assessment results
        assessment_results = {
            "score": hazard_score,
            "building_type_assessment": f"{building_type}: Risk level {bt_score}",
            "construction_assessment": f"{construction}: Risk level {const_score}",
            "age_assessment": f"Building age {building_age} years: Risk level {age_score}",
            "occupancy_assessment": f"{occupancy}: Risk level {occ_score}",
            "guidelines_referenced": guidelines[:200] + "..." if len(guidelines) > 200 else guidelines
        }
        
        return assessment_results
    
    async def evaluate_vulnerability(self, security_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Evaluates the vulnerability of a property based on security systems,
        protective measures, and other safety features.
        
        Args:
            security_data: Dictionary containing security details
            
        Returns:
            A dictionary with vulnerability assessment results
        """
        logger.info("Starting vulnerability assessment")
        
        # Extract security characteristics
        has_sprinklers = security_data.get("sprinklers", False)
        alarm_system = security_data.get("alarm_system", "None")
        building_type = security_data.get("building_type", "")
        building_age = datetime.now().year - security_data.get("year_built", 0) if security_data.get("year_built", 0) > 0 else 30
        
        logger.info(f"Security features: Sprinklers={has_sprinklers}, Alarm={alarm_system}")
        
        # Retrieve relevant guidelines
        guidelines = await self.guidelines_retriever.get_vulnerability_guidelines(building_type, building_age)
        
        # Calculate sprinkler score
        sprinkler_score = 1.0 if has_sprinklers else 4.0
        
        # Calculate alarm score
        alarm_scores = {
            "None": 5.0,
            "Local": 3.0,
            "Monitored": 2.0,
            "Grade A - 24hr Monitored": 1.0
        }
        alarm_score = alarm_scores.get(alarm_system, 3.0)
        
        # Count identified vulnerability hazards
        hazards_count = 0
        if not has_sprinklers:
            hazards_count += 1
        if alarm_system == "None":
            hazards_count += 1
        if building_age > 35:
            hazards_count += 1
        
        # Calculate overall vulnerability score (weighted average)
        vulnerability_score = (sprinkler_score * 0.6 + alarm_score * 0.4)
        logger.info(f"Calculated vulnerability score: {vulnerability_score}")
        
        # Determine risk level
        risk_level = "moderate"
        for level, criteria in VULNERABILITY_RISK_LEVELS.items():
            min_score, max_score = criteria["score_range"]
            if min_score <= vulnerability_score <= max_score:
                risk_level = level
                break
        
        # Prepare assessment results
        assessment_results = {
            "score": vulnerability_score,
            "risk_level": risk_level,
            "hazards_count": hazards_count,
            "sprinkler_assessment": f"Sprinklers: {'Present' if has_sprinklers else 'Absent'}, Risk level {sprinkler_score}",
            "alarm_assessment": f"Alarm: {alarm_system}, Risk level {alarm_score}",
            "guidelines_referenced": guidelines[:200] + "..." if len(guidelines) > 200 else guidelines
        }
        
        return assessment_results
    
    async def evaluate_cat_modeling(self, location_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Evaluates catastrophe risks based on geographical location and environmental factors.
        
        Args:
            location_data: Dictionary containing location details
            
        Returns:
            A dictionary with CAT modeling assessment results
        """
        logger.info("Starting CAT modeling assessment")
        
        # Extract location information
        address = location_data.get("address", "")
        logger.info(f"Analyzing location: {address}")
        
        # Retrieve relevant guidelines
        guidelines = await self.guidelines_retriever.get_cat_modeling_guidelines(address)
        
        # Evaluate flood risk
        flood_risk = 2.0  # Default moderate risk
        flood_risk_rationale = "Standard flood risk assessment"
        
        for keyword in CAT_MODELING_GEO_FACTORS["flood_risk"]["keywords"]:
            if keyword.lower() in address.lower():
                flood_risk = 4.0
                flood_risk_rationale = f"Potential high flood risk area (keyword: {keyword})"
                break
                
        for area in CAT_MODELING_GEO_FACTORS["flood_risk"]["high_risk_areas"]:
            if area.lower() in address.lower():
                flood_risk = 4.5
                flood_risk_rationale = f"Known high flood risk area: {area}"
                break
        
        # Evaluate earthquake/geographic risk
        earthquake_risk = 1.0  # Default low risk
        earthquake_risk_rationale = "Standard geographic risk assessment"
        
        # Check address against known regions with earthquake risk
        for region, risk in CAT_MODELING_GEO_FACTORS["earthquake_risk"]["regions"].items():
            if region.lower() in address.lower():
                earthquake_risk = risk
                earthquake_risk_rationale = f"Known region with earthquake risk: {region}"
                break
        
        # Check for hurricane risk in coastal areas
        hurricane_risk = 1.0  # Default low risk
        for region, risk in CAT_MODELING_GEO_FACTORS["hurricane_risk"]["regions"].items():
            if region.lower() in address.lower():
                hurricane_risk = risk
                earthquake_risk_rationale = f"Area with hurricane risk: {region}"
                break
        
        # Check for wildfire risk
        wildfire_risk = 1.0  # Default low risk
        for region, risk in CAT_MODELING_GEO_FACTORS["wildfire_risk"]["regions"].items():
            if region.lower() in address.lower():
                wildfire_risk = risk
                earthquake_risk_rationale = f"Area with wildfire risk: {region}"
                break
        
        # Calculate overall CAT score (taking worst-case scenario)
        cat_risks = [flood_risk, earthquake_risk, hurricane_risk, wildfire_risk]
        cat_score = max(cat_risks)
        logger.info(f"Calculated CAT score: {cat_score} (flood_risk: {flood_risk}, earthquake_risk: {earthquake_risk}, hurricane_risk: {hurricane_risk}, wildfire_risk: {wildfire_risk})")
        
        # Prepare assessment results
        assessment_results = {
            "score": cat_score,
            "flood_risk_assessment": f"Flood risk level: {flood_risk:.1f}/5.0 - {flood_risk_rationale}",
            "earthquake_risk_assessment": f"Earthquake risk: {earthquake_risk:.1f}/5.0 - {earthquake_risk_rationale}",
            "hurricane_risk_assessment": f"Hurricane risk: {hurricane_risk:.1f}/5.0",
            "wildfire_risk_assessment": f"Wildfire risk: {wildfire_risk:.1f}/5.0",
            "location_analyzed": address,
            "guidelines_referenced": guidelines[:200] + "..." if len(guidelines) > 200 else guidelines
        }
        
        return assessment_results
    
    async def make_underwriting_decision(self, assessment_results: Dict[str, Dict[str, Any]], submission_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Analyzes assessment results and makes a final underwriting decision.
        
        Args:
            assessment_results: Dictionary containing hazard, vulnerability and CAT assessment results
            submission_data: Dictionary containing the original submission data
            
        Returns:
            A dictionary with the decision outcome, reason, and confidence score
        """
        logger.info("Starting decision making process")
        
        # Retrieve relevant guidelines
        guidelines = await self.guidelines_retriever.get_decision_guidelines(submission_data)
        
        # Extract scores from assessments
        hazard_score = assessment_results.get("hazard", {}).get("score", 3.0)
        vulnerability_score = assessment_results.get("vulnerability", {}).get("score", 3.0)
        cat_score = assessment_results.get("cat_modeling", {}).get("score", 3.0)
        
        logger.info(f"Assessment scores - Hazard: {hazard_score}, Vulnerability: {vulnerability_score}, CAT: {cat_score}")
        
        # Calculate weighted composite risk score
        composite_score = (
            hazard_score * ASSESSMENT_WEIGHTS["hazard"] +
            vulnerability_score * ASSESSMENT_WEIGHTS["vulnerability"] +
            cat_score * ASSESSMENT_WEIGHTS["cat"]
        )
        
        logger.info(f"Calculated composite risk score: {composite_score}")
        
        # Decision logic based on thresholds from the guidelines
        thresholds = GENERAL_UNDERWRITING_CRITERIA["decision_thresholds"]
        
        if composite_score <= thresholds["proceed_to_quotation"]["max_composite_score"]:
            decision = "PROCEED_TO_QUOTATION"
            reason = "Risk profile is within acceptable parameters"
            confidence = 1.0 - (composite_score / 5.0)  # Higher when score is lower
        elif composite_score <= thresholds["recommend_surveyor"]["max_composite_score"]:
            decision = "RECOMMEND_SURVEYOR"
            reason = "Risk profile requires additional assessment"
            confidence = thresholds["recommend_surveyor"]["confidence"]
        else:
            decision = "REJECT"
            reason = "Risk profile exceeds acceptable parameters"
            confidence = min(1.0, max(0.0, composite_score / 5.0 - 0.2))  # Higher when score is higher
        
        logger.info(f"Initial decision: {decision}, Reason: {reason}, Confidence: {confidence:.2f}")
        
        # Check for eligibility issues
        eligibility_issues = []
        
        # Check building age
        building_age = datetime.now().year - submission_data.get("property_details", {}).get("year_built", 0)
        if building_age > GENERAL_UNDERWRITING_CRITERIA["building_eligibility"]["age"]["max_age"]:
            eligibility_issues.append(f"Building age ({building_age} years) exceeds maximum guideline of 35 years")
            
        # Check stories
        stories = submission_data.get("property_details", {}).get("stories", 0)
        has_sprinklers = submission_data.get("property_details", {}).get("sprinklers", False)
        max_stories = GENERAL_UNDERWRITING_CRITERIA["building_eligibility"]["stories"]["max_stories"]["sprinklered"] if has_sprinklers else GENERAL_UNDERWRITING_CRITERIA["building_eligibility"]["stories"]["max_stories"]["default"]
        
        if stories > max_stories:
            eligibility_issues.append(f"Building stories ({stories}) exceeds maximum guideline of {max_stories} stories")
            
        # Check roof type
        construction = submission_data.get("property_details", {}).get("construction", "").lower()
        for ineligible_roof in GENERAL_UNDERWRITING_CRITERIA["building_eligibility"]["roofs"]["ineligible"]:
            if ineligible_roof.lower() in construction:
                eligibility_issues.append(f"Building has ineligible roof type: {ineligible_roof}")
        
        # Modify decision if there are eligibility issues
        if eligibility_issues and decision != "REJECT":
            decision = "RECOMMEND_SURVEYOR" if decision == "PROCEED_TO_QUOTATION" else decision
            reason = f"Eligibility concerns: {'; '.join(eligibility_issues[:2])}"
            confidence = min(confidence, 0.7)  # Lower confidence due to eligibility issues
        
        # Determine if human review is needed
        requires_human_review = (
            confidence < GENERAL_UNDERWRITING_CRITERIA["human_review_triggers"]["confidence_threshold"] or
            decision in GENERAL_UNDERWRITING_CRITERIA["human_review_triggers"]["decision_types"]
        )
        
        # Create structured decision object
        decision_object = {
            "outcome": decision,
            "reason": reason,
            "composite_score": composite_score,
            "confidence": confidence,
            "requires_human_review": requires_human_review,
            "eligibility_issues": eligibility_issues,
            "assessment_summary": {
                "hazard_score": hazard_score,
                "vulnerability_score": vulnerability_score,
                "cat_score": cat_score
            },
            "guidelines_referenced": guidelines[:200] + "..." if len(guidelines) > 200 else guidelines,
            "human_reviewed": False  # Will be updated after human review
        }
        
        return decision_object
    
    def format_notification(self, decision: Dict[str, Any], submission_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Formats a notification based on the underwriting decision.
        
        Args:
            decision: Dictionary containing the decision outcome and details
            submission_data: Dictionary containing the original submission data
            
        Returns:
            A dictionary with the notification content
        """
        # Get decision outcome
        outcome = decision.get("outcome", "UNKNOWN")
        reason = decision.get("reason", "No reason provided")
        confidence = decision.get("confidence", 0.0)
        assessment_summary = decision.get("assessment_summary", {})
        
        # Basic email templates based on decision outcome
        email_templates = {
            "PROCEED_TO_QUOTATION": f"""
Subject: Submission {submission_data.get('submission_id', 'Unknown')} Approved for Quotation

Dear Distribution Team,

The submission for {submission_data.get('insured_name', 'Unknown')} (ID: {submission_data.get('submission_id', 'Unknown')}) has been reviewed and approved to proceed to quotation.

Risk Assessment Summary:
- Hazard Score: {assessment_summary.get('hazard_score', 0):.1f}/5.0
- Vulnerability Score: {assessment_summary.get('vulnerability_score', 0):.1f}/5.0
- CAT Risk Score: {assessment_summary.get('cat_score', 0):.1f}/5.0

Decision: Proceed to Quotation
Confidence: {confidence:.0%}

Please proceed with the quotation process.

Regards,
Underwriting AI Assistant
""",
            "RECOMMEND_SURVEYOR": f"""
Subject: Submission {submission_data.get('submission_id', 'Unknown')} Requires Surveyor Assessment

Dear Distribution Team,

The submission for {submission_data.get('insured_name', 'Unknown')} (ID: {submission_data.get('submission_id', 'Unknown')}) has been reviewed and requires a surveyor assessment before proceeding.

Risk Assessment Summary:
- Hazard Score: {assessment_summary.get('hazard_score', 0):.1f}/5.0
- Vulnerability Score: {assessment_summary.get('vulnerability_score', 0):.1f}/5.0
- CAT Risk Score: {assessment_summary.get('cat_score', 0):.1f}/5.0

Reason for surveyor recommendation: {reason}
Confidence: {confidence:.0%}

Please arrange for a risk assessment survey.

Regards,
Underwriting AI Assistant
""",
            "REJECT": f"""
Subject: Submission {submission_data.get('submission_id', 'Unknown')} Rejected

Dear Distribution Team,

The submission for {submission_data.get('insured_name', 'Unknown')} (ID: {submission_data.get('submission_id', 'Unknown')}) has been reviewed and cannot proceed.

Risk Assessment Summary:
- Hazard Score: {assessment_summary.get('hazard_score', 0):.1f}/5.0
- Vulnerability Score: {assessment_summary.get('vulnerability_score', 0):.1f}/5.0
- CAT Risk Score: {assessment_summary.get('cat_score', 0):.1f}/5.0

Reason for rejection: {reason}
Confidence: {confidence:.0%}

If you have additional information that might change this assessment, please provide it.

Regards,
Underwriting AI Assistant
"""
        }
        
        # Get template for current decision
        template = email_templates.get(outcome, "Unknown decision type")
        
        # Format notification object
        notification = {
            "recipient": "distribution_team@company.com",
            "content": template,
            "sent": False,
            "timestamp": datetime.now().isoformat()
        }
        
        return notification
    
# Print confirmation that this notebook was loaded, so a notebook that pulls it in
# via %run shows the same kind of "... loaded successfully." line as the ones in cell 1.
print("Assessment tools loaded successfully.")

Assessment tools loaded successfully.
