# ==========================================
# Sanvia - 3 - Feature Extraction & Report Generation
# ==========================================

In [None]:
import os
import json
import numpy as np
import pandas as pd
import tensorflow as tf
from pathlib import Path
from typing import Dict, List, Tuple, Optional
import cv2
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime
import requests
import base64
from io import BytesIO
from PIL import Image
import sys
sys.path.append("/content/drive/MyDrive/models")

from sanvia_layers import (
    EfficientCrossAttention,
    TabularEncoder,
    GatedFusionLayer,
    FocalLoss
)


# ==========================================
# 1. ENVIRONMENT SETUP
# ==========================================


In [None]:
# Fix the random seeds so repeated runs are reproducible
SEED = 42
np.random.seed(SEED)
tf.random.set_seed(SEED)

# API configuration.
# SECURITY: credentials are read from the environment and must never be
# written into the notebook. A token was previously pasted on this line; it
# has been exposed and should be revoked by its owner.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY", "")


# Vision-language provider used for report generation: "openai" or "gemini"
VLM_API_PROVIDER = "openai"

# Dataset root and the artefacts stored under its output directory
DATA_DIR = Path('/content/drive/MyDrive/VnDir_Mammo')
OUTPUT_DIR = DATA_DIR / 'sanvia_outputs'
CONFIG_PATH = OUTPUT_DIR / 'config.json'
# NOTE: MODEL_PATH is reassigned later in the notebook before the model is loaded
MODEL_PATH = OUTPUT_DIR / 'best_sanvia_model.h5'
VIEW_MAPPING_PATH = OUTPUT_DIR / 'view_mapping_final.csv'
IMAGE_CACHE_PATH = OUTPUT_DIR / 'image_paths_cache.json'

# ==========================================
# 2. LOAD CONFIGURATION & TRAINED MODEL
# ==========================================

In [None]:
# Read the configuration stored at CONFIG_PATH
with open(CONFIG_PATH, 'r') as config_file:
    CONFIG = json.load(config_file)

# JSON stores paths as plain strings, so turn them back into Path objects.
# The three directory entries are mandatory; the two file entries are optional.
for _required_key in ('data_dir', 'images_dir', 'output_dir'):
    CONFIG[_required_key] = Path(CONFIG[_required_key])
for _optional_key in ('scaler_path', 'class_weights_path'):
    if _optional_key in CONFIG:
        CONFIG[_optional_key] = Path(CONFIG[_optional_key])

# Per-study table of view/image identifiers
view_mapping_df = pd.read_csv(VIEW_MAPPING_PATH)

# Lookup from image file name to its location on disk
with open(IMAGE_CACHE_PATH, 'r') as cache_file:
    IMAGE_PATH_CACHE = json.load(cache_file)

print("‚úÖ Configuration and data loaded successfully")
# Load the trained model from notebook 2
def load_trained_model(model_path: Path):
    """Load the complete trained Keras model from disk without rebuilding it.

    Args:
        model_path: Location of the saved model. A plain string is accepted
            as well and is converted to a ``Path`` first.

    Returns:
        The object returned by ``tf.keras.models.load_model``.

    Raises:
        FileNotFoundError: If nothing exists at ``model_path``.
    """
    # A plain string has no ``exists`` method; coerce so both str and Path
    # arguments reach the existence check below.
    model_path = Path(model_path)
    if not model_path.exists():
        raise FileNotFoundError(f"Model not found at {model_path}")

    # Load the complete model directly
    sanvia_model = tf.keras.models.load_model(str(model_path))
    print(f"‚úÖ Complete model loaded from: {model_path}")
    print(f"   - Model name: {sanvia_model.name}")
    print(f"   - Number of layers: {len(sanvia_model.layers)}")
    return sanvia_model

# Use the path the model was actually saved to (replaces the MODEL_PATH set earlier)
MODEL_PATH = Path('/content/drive/MyDrive/models/Sanvia.keras')  # or .h5

# Load the model directly, without rebuilding it
sanvia_model = load_trained_model(MODEL_PATH)

print(f"‚úÖ Model loaded successfully!")
print(f"   - Total parameters: {sanvia_model.count_params():,}")

AttributeError: 'str' object has no attribute 'exists'

In [None]:
# Force the image size entry of the loaded configuration to 768x768
CONFIG['img_size'] = (768, 768)


# ==========================================
# 3. FEATURE EXTRACTION PIPELINE
# ==========================================

In [None]:
def extract_features_from_study(study_id: str, view_df: pd.DataFrame, model_path: Path) -> Dict:
    """Run the model on one study and collect predictions and feature tensors.

    Args:
        study_id: Value matched against the ``study_id`` column of ``view_df``.
        view_df: Table with one row per study holding ``image_id_<view>``,
            ``age_norm`` and ``age_missing_flag`` columns.
        model_path: Either the path (``str`` or ``Path``) of a saved model,
            which is then loaded here, or an already loaded model object,
            which is used as-is so it does not have to be reloaded per study.

    Returns:
        Dict with ``study_id``, ``visual_features``, ``birads_features``,
        ``density_features``, ``tabular_data`` and ``predictions`` (itself a
        dict with ``birads`` and ``density`` arrays).
    """
    if isinstance(model_path, (str, Path)):
        # Deserialize with the real custom layer classes imported at the top
        # of the file; empty placeholder Layer subclasses cannot reproduce
        # the layers' behaviour.
        model = tf.keras.models.load_model(
            str(model_path),
            custom_objects={
                'TabularEncoder': TabularEncoder,
                'EfficientCrossAttention': EfficientCrossAttention,
                'GatedFusionLayer': GatedFusionLayer,
            },
            compile=False
        )
    else:
        model = model_path

    study_data = view_df[view_df['study_id'] == study_id].iloc[0]

    # Load and preprocess the four views; absent views use the placeholder image
    image_arrays = []
    for view in ['L_CC', 'L_MLO', 'R_CC', 'R_MLO']:
        img_id = study_data[f'image_id_{view}']
        if pd.isna(img_id) or img_id == 'missing.png':
            img_path = str(CONFIG['images_dir'] / "missing.png")
        else:
            if not img_id.endswith('.png'):
                img_id += '.png'
            img_path = IMAGE_PATH_CACHE.get(img_id, str(CONFIG['images_dir'] / img_id))

        img = tf.io.read_file(img_path)
        img = tf.io.decode_png(img, channels=CONFIG['num_channels'])
        img = tf.image.convert_image_dtype(img, tf.float32)
        img = tf.image.resize_with_pad(img, CONFIG['img_size'][0], CONFIG['img_size'][1])
        image_arrays.append(img)

    # Tabular input with a leading batch axis: [[age_norm, age_missing_flag]]
    tabular = tf.stack([
        float(study_data['age_norm']),
        float(study_data['age_missing_flag'])
    ], axis=0)[tf.newaxis, ...]

    batched_views = [img[tf.newaxis, ...] for img in image_arrays]
    inputs_dict = {
        'L_CC': batched_views[0],
        'L_MLO': batched_views[1],
        'R_CC': batched_views[2],
        'R_MLO': batched_views[3],
        'tabular': tabular
    }

    # Try named inputs first and fall back to positional inputs. Remember
    # which form worked so the feature extractor below is fed the same way.
    try:
        predictions = model(inputs_dict)
        model_inputs = inputs_dict
        print("‚úÖ Model prediction successful")
    except Exception as e:
        print(f"‚ùå Model prediction failed: {e}")
        model_inputs = batched_views + [tabular]
        predictions = model(model_inputs)

    # Collect the outputs of the layers used as feature taps.
    # NOTE(review): 'dense_10'/'dense_11' look like auto-generated layer
    # names -- confirm they exist in the saved model.
    layer_names = ['efficientnetb4', 'dense_10', 'dense_11']
    outputs = []
    for name in layer_names:
        try:
            outputs.append(model.get_layer(name).output)
        except Exception:
            print(f"Warning: Layer {name} not found")

    features = None
    if len(outputs) == len(layer_names):
        try:
            feature_extractor = tf.keras.Model(
                inputs=model.input,
                outputs=outputs
            )
            features = feature_extractor(model_inputs)
        except (KeyError, ValueError) as e:
            # Building or calling the sub-model can fail when a tapped
            # tensor is not reachable from the model inputs; use the same
            # fallback as for missing layers instead of aborting the study.
            print(f"Warning: intermediate feature extraction failed ({e}); "
                  f"using predictions as features")
    if features is None:
        # Fallback - use predictions as features
        features = [predictions[0], predictions[1]]

    def _to_numpy(value):
        # Return a NumPy array for tensors; leave other values unchanged
        return value.numpy() if hasattr(value, 'numpy') else value

    return {
        'study_id': study_id,
        'visual_features': _to_numpy(features[0]),
        'birads_features': _to_numpy(features[1]),
        'density_features': _to_numpy(features[2] if len(features) > 2 else features[1]),
        'tabular_data': tabular.numpy(),
        'predictions': {
            'birads': predictions[0].numpy(),
            'density': predictions[1].numpy()
        }
    }

# ==========================================
# 4. EXPLAINABLE AI - GRAD-CAM
# ==========================================

In [None]:
def generate_gradcam_heatmap(model, img_array, layer_name='efficientnetb4'):
    """Compute a Grad-CAM heatmap for the top class of the first model output.

    Args:
        model: Keras model whose ``output`` is a sequence; the first element
            is treated as the BI-RADS prediction.
        img_array: Input(s) for ``model`` in the structure of
            ``model.inputs`` (a single tensor for a single-input model, a
            list of tensors for a multi-input model).
        layer_name: Name of the layer whose output is used as the feature
            map. NOTE(review): the pooling over axes (0, 1, 2) assumes this
            output is 4-D -- confirm for the chosen layer.

    Returns:
        2-D ``numpy.ndarray`` with non-negative values, scaled so the maximum
        is 1 (all zeros if there is no positive activation).

    Raises:
        ValueError: If the selected score has no gradient with respect to
            the chosen layer's output.
    """
    target_layer = model.get_layer(layer_name)

    # Sub-model returning both the feature map and the predictions.
    # ``model.inputs`` is already a list; wrapping it again would add a
    # nesting level that the call below would then have to mirror.
    grad_model = tf.keras.Model(
        model.inputs,
        [target_layer.output, model.output]
    )

    with tf.GradientTape() as tape:
        conv_outputs, predictions = grad_model(img_array)
        # Score of the most likely class of the first output (BI-RADS)
        birads_scores = predictions[0]
        loss = birads_scores[:, tf.argmax(birads_scores[0])]

    grads = tape.gradient(loss, conv_outputs)
    if grads is None:
        raise ValueError(
            f"No gradient between the prediction and layer '{layer_name}'"
        )

    # Channel weights: gradients averaged over batch and spatial axes
    pooled_grads = tf.reduce_mean(grads, axis=(0, 1, 2))

    # Weighted combination of the first sample's feature maps
    feature_maps = conv_outputs[0]
    heatmap = tf.reduce_mean(
        tf.multiply(pooled_grads, feature_maps),
        axis=-1
    )

    # Convert once to NumPy, keep positive evidence only, scale to [0, 1].
    # The peak check avoids a 0/0 division when nothing is positive.
    heatmap = np.maximum(heatmap.numpy(), 0)
    peak = heatmap.max()
    if peak > 0:
        heatmap = heatmap / peak

    return heatmap

def overlay_heatmap_on_image(img_path, heatmap, alpha=0.4):
    """Blend a heatmap over the image at ``img_path``.

    The image is scaled with its aspect ratio preserved and centred on a
    black canvas of ``CONFIG['img_size']`` (height, width), mirroring the
    ``tf.image.resize_with_pad`` preprocessing used for the model inputs so
    the heatmap lines up with the pixels the model saw.

    Args:
        img_path: Path of the image file to read with OpenCV.
        heatmap: 2-D array with values expected in [0, 1].
        alpha: Weight of the heatmap in the blend; the image gets ``1 - alpha``.

    Returns:
        ``uint8`` RGB array of shape (height, width, 3).

    Raises:
        FileNotFoundError: If OpenCV cannot read ``img_path``.
    """
    img = cv2.imread(img_path)
    if img is None:
        # cv2.imread signals failure by returning None rather than raising
        raise FileNotFoundError(f"Could not read image: {img_path}")
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

    # img_size is (height, width); OpenCV sizes are (width, height)
    target_h, target_w = (int(v) for v in CONFIG['img_size'])

    # Letterbox the image: uniform scale, then centre on a zero canvas
    src_h, src_w = img.shape[:2]
    scale = min(target_w / src_w, target_h / src_h)
    new_w = min(target_w, max(1, int(round(src_w * scale))))
    new_h = min(target_h, max(1, int(round(src_h * scale))))
    resized = cv2.resize(img, (new_w, new_h))
    top = (target_h - new_h) // 2
    left = (target_w - new_w) // 2
    canvas = np.zeros((target_h, target_w, 3), dtype=np.uint8)
    canvas[top:top + new_h, left:left + new_w] = resized

    # Bring the heatmap to the same size; guard against NaN/out-of-range
    # values before the cast to uint8
    heatmap = np.nan_to_num(np.asarray(heatmap, dtype=np.float32))
    heatmap = cv2.resize(heatmap, (target_w, target_h))
    heatmap = np.uint8(255 * np.clip(heatmap, 0.0, 1.0))

    # Colour-map the heatmap (OpenCV returns BGR) and convert to RGB
    heatmap = cv2.applyColorMap(heatmap, cv2.COLORMAP_JET)
    heatmap = cv2.cvtColor(heatmap, cv2.COLOR_BGR2RGB)

    superimposed_img = heatmap * alpha + canvas * (1 - alpha)
    return np.uint8(superimposed_img)

# ==========================================
# 5. VISION-LANGUAGE MODEL API INTEGRATION
# ==========================================

In [None]:
def encode_image_to_base64(image_path: str) -> str:
    """Read the file at ``image_path`` and return its bytes as base64 text."""
    with open(image_path, "rb") as handle:
        raw_bytes = handle.read()
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode('utf-8')

def prepare_images_for_vlm(study_id: str, view_df: pd.DataFrame) -> Dict[str, str]:
    """Return the four views of a study as base64 strings keyed by view name.

    The first row of ``view_df`` whose ``study_id`` matches is used. A view
    whose image id is missing (NaN or ``'missing.png'``) is replaced by the
    ``missing.png`` placeholder inside ``CONFIG['images_dir']``.
    """
    row = view_df[view_df['study_id'] == study_id].iloc[0]

    def _resolve_path(view_name):
        # Map one view to the file that should be sent for it
        image_id = row[f'image_id_{view_name}']
        if pd.isna(image_id) or image_id == 'missing.png':
            return str(CONFIG['images_dir'] / "missing.png")
        if not image_id.endswith('.png'):
            image_id += '.png'
        # Prefer the cached location, else look in the images directory
        return IMAGE_PATH_CACHE.get(image_id, str(CONFIG['images_dir'] / image_id))

    return {
        view_name: encode_image_to_base64(_resolve_path(view_name))
        for view_name in ('L_CC', 'L_MLO', 'R_CC', 'R_MLO')
    }

def generate_medical_report_via_api(
    study_id: str,
    view_df: pd.DataFrame,
    features: Dict,
    api_provider: str = VLM_API_PROVIDER
) -> str:
    """Ask a vision-language API to draft a report for one study.

    The predicted BI-RADS category and density class are derived from
    ``features['predictions']``, embedded in a text prompt together with the
    patient age, and sent along with the study's four encoded images to the
    provider named by ``api_provider`` ("openai" or "gemini").

    Raises:
        ValueError: If ``api_provider`` is neither "openai" nor "gemini".
    """
    study_data = view_df[view_df['study_id'] == study_id].iloc[0]

    # Arg-max class indices of the first sample
    predicted = features['predictions']
    birads_pred = np.argmax(predicted['birads'][0])
    density_pred = np.argmax(predicted['density'][0])

    # Index 0 corresponds to BI-RADS 1; density indices map to letters A-D
    birads_class = birads_pred + 1
    density_class = dict(enumerate('ABCD')).get(density_pred, 'Unknown')

    # Age is shown as an integer, or as a fixed text when it is not available
    raw_age = study_data.get('age')
    age_text = 'Not specified' if pd.isna(raw_age) else int(raw_age)

    clinical_info = f"""
    Patient Age: {age_text}
    Laterality: Left and Right Breast
    Views: CC and MLO
    Predicted BI-RADS Category: {birads_class}
    Predicted Breast Density: Category {density_class}
    """

    prompt = f"""
    You are a radiologist assistant analyzing mammogram images. Based on the provided mammogram images (CC and MLO views for both left and right breasts) and the following clinical predictions, generate a structured medical report.

    CLINICAL PREDICTIONS:
    {clinical_info}

    Please provide:
    1. Overall impression
    2. Key findings (if any abnormalities detected)
    3. BI-RADS classification justification
    4. Recommendations for follow-up

    Report should be professional, concise, and suitable for clinical documentation.
    """

    # Images are encoded before the provider is checked
    encoded_images = prepare_images_for_vlm(study_id, view_df)

    if api_provider == "gemini":
        return _call_gemini_api(prompt, encoded_images)
    if api_provider != "openai":
        raise ValueError(f"Unsupported API provider: {api_provider}")
    return _call_openai_api(prompt, encoded_images)

def _call_openai_api(prompt: str, encoded_images: Dict[str, str], model: str = "gpt-4o") -> str:
    """Send the prompt and images to the OpenAI chat completions endpoint.

    Args:
        prompt: Instruction text placed first in the user message.
        encoded_images: Mapping of view name to base64-encoded PNG data.
        model: Vision-capable model name. The previously hard-coded
            ``gpt-4-vision-preview`` has been retired by the provider, so a
            current model is the default and the name is configurable.

    Returns:
        The text of the first choice, or a string starting with
        ``"Error generating report:"`` if the request or response parsing
        fails (this function does not raise for those failures).
    """
    if not OPENAI_API_KEY:
        message = "OPENAI_API_KEY is not set"
        print(f"‚ùå API Error: {message}")
        return f"Error generating report: {message}"

    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {OPENAI_API_KEY}"
    }

    # One user message: the prompt, then each image preceded by a short
    # label so the model can tell which view it is looking at.
    content = [{"type": "text", "text": prompt}]
    for view_name, encoded_image in encoded_images.items():
        content.append({"type": "text", "text": f"View: {view_name}"})
        content.append({
            "type": "image_url",
            "image_url": {
                "url": f"data:image/png;base64,{encoded_image}",
                "detail": "high"
            }
        })

    payload = {
        "model": model,
        "messages": [{"role": "user", "content": content}],
        "max_tokens": 1000,
        "temperature": 0.3  # Low temperature for consistent medical reports
    }

    try:
        response = requests.post(
            "https://api.openai.com/v1/chat/completions",
            headers=headers,
            json=payload,
            timeout=60
        )
        response.raise_for_status()
        return response.json()['choices'][0]['message']['content']
    except (requests.RequestException, KeyError, IndexError, TypeError, ValueError) as e:
        # Transport/HTTP failures and unexpected response shapes are turned
        # into an error string, as callers expect a string back.
        print(f"‚ùå API Error: {e}")
        return f"Error generating report: {str(e)}"

def _call_gemini_api(prompt: str, encoded_images: Dict[str, str], model: str = "gemini-2.5-flash") -> str:
    """Send the prompt and images to the Gemini ``generateContent`` endpoint.

    Args:
        prompt: Instruction text placed first in the request parts.
        encoded_images: Mapping of view name to base64-encoded PNG data.
        model: Vision-capable model name. The previously hard-coded
            ``gemini-pro-vision`` has been retired by the provider, so a
            current model is the default and the name is configurable.

    Returns:
        The text of the first candidate, or a string starting with
        ``"Error generating report:"`` if the request or response parsing
        fails (this function does not raise for those failures).
    """
    # Use a module-level GEMINI_API_KEY when one exists, else the environment
    api_key = globals().get("GEMINI_API_KEY") or os.environ.get("GEMINI_API_KEY", "")
    if not api_key:
        message = "GEMINI_API_KEY is not set"
        print(f"‚ùå API Error: {message}")
        return f"Error generating report: {message}"

    # The key travels in a header, not in the URL: request exceptions quote
    # the URL, and their text is printed and returned as the report body.
    headers = {
        "Content-Type": "application/json",
        "x-goog-api-key": api_key
    }

    # Prompt first, then each image preceded by a short label naming its view
    parts = [{"text": prompt}]
    for view_name, encoded_image in encoded_images.items():
        parts.append({"text": f"View: {view_name}"})
        parts.append({
            "inline_data": {
                "mime_type": "image/png",
                "data": encoded_image
            }
        })

    payload = {
        "contents": [{
            "parts": parts
        }],
        "generation_config": {
            "maxOutputTokens": 1000,
            "temperature": 0.3
        }
    }

    try:
        response = requests.post(
            f"https://generativelanguage.googleapis.com/v1beta/models/{model}:generateContent",
            headers=headers,
            json=payload,
            timeout=60
        )
        response.raise_for_status()
        return response.json()['candidates'][0]['content']['parts'][0]['text']
    except (requests.RequestException, KeyError, IndexError, TypeError, ValueError) as e:
        # Transport/HTTP failures and unexpected response shapes are turned
        # into an error string, as callers expect a string back.
        print(f"‚ùå API Error: {e}")
        return f"Error generating report: {str(e)}"


# ==========================================
# 6. COMPREHENSIVE REPORT GENERATION
# ==========================================

In [None]:
def generate_comprehensive_report(
    study_id: str,
    view_df: pd.DataFrame,
    model,
    include_xai: bool = True,
    save_report: bool = True
) -> Dict:
    """Build the full report for one study.

    Steps: feature extraction, optional Grad-CAM overlays per available
    view, the VLM-written text, and assembly (plus optional saving) of the
    report dictionary.

    Args:
        study_id: Value matched against the ``study_id`` column of ``view_df``.
        view_df: Table with one row per study.
        model: Loaded model object, or the path (``str``/``Path``) of a
            saved model. It is passed unchanged to
            ``extract_features_from_study``; for Grad-CAM a path is loaded
            with ``load_trained_model`` first.
        include_xai: Whether to compute Grad-CAM overlays.
        save_report: Whether to write the report with ``save_report_to_file``.

    Returns:
        The report dictionary.
    """
    print(f"\n" + "="*60)
    print(f"Generating Report for Study: {study_id}")
    print("="*60)

    # Step 1: Extract features
    print("üìä Step 1: Extracting deep features...")
    features = extract_features_from_study(study_id, view_df, model)

    # Looked up unconditionally: the patient info below needs it even when
    # include_xai is False.
    study_data = view_df[view_df['study_id'] == study_id].iloc[0]

    # Step 2: Generate XAI visualizations
    xai_images = {}
    if include_xai:
        print("üî• Step 2: Generating XAI heatmaps...")

        # Grad-CAM needs a model object, not a path
        if isinstance(model, (str, Path)):
            xai_model = load_trained_model(Path(model))
        else:
            xai_model = model

        # Same tabular input as used for feature extraction
        tabular = tf.constant(
            [[float(study_data['age_norm']), float(study_data['age_missing_flag'])]],
            dtype=tf.float32
        )

        for view in ['L_CC', 'L_MLO', 'R_CC', 'R_MLO']:
            img_id = study_data[f'image_id_{view}']
            if pd.isna(img_id) or img_id == 'missing.png':
                continue

            if not img_id.endswith('.png'):
                img_id += '.png'
            img_path = IMAGE_PATH_CACHE.get(img_id, str(CONFIG['images_dir'] / img_id))

            # Decode and letterbox the view once, with a leading batch axis
            view_tensor = tf.image.resize_with_pad(
                tf.image.convert_image_dtype(
                    tf.io.decode_png(tf.io.read_file(img_path), channels=3),
                    tf.float32
                ),
                CONFIG['img_size'][0], CONFIG['img_size'][1]
            )[tf.newaxis, ...]

            # The model takes four image inputs plus the tabular input, in
            # this order. NOTE(review): every image slot is filled with the
            # current view, keeping the four-fold replication this code did
            # before -- confirm this is the intended per-view Grad-CAM input.
            gradcam_inputs = [view_tensor, view_tensor, view_tensor, view_tensor, tabular]

            heatmap = generate_gradcam_heatmap(
                xai_model,
                gradcam_inputs,
                'efficientnetb4'
            )

            # Overlay on image
            xai_images[view] = overlay_heatmap_on_image(img_path, heatmap)

    # Step 3: Generate VLM report
    print("ü§ñ Step 3: Generating AI report via VLM API...")
    vlm_report = generate_medical_report_via_api(study_id, view_df, features)

    # Step 4: Compile comprehensive report
    print("üìÑ Step 4: Compiling final report...")

    # Arg-max classes of the first sample; BI-RADS index 0 is category 1
    birads_scores = features['predictions']['birads'][0]
    density_scores = features['predictions']['density'][0]
    birads_pred = np.argmax(birads_scores) + 1
    density_pred = np.argmax(density_scores)
    density_map = {0: 'A', 1: 'B', 2: 'C', 3: 'D'}

    report = {
        'study_id': study_id,
        'timestamp': datetime.now().isoformat(),
        'patient_info': {
            'age': study_data.get('age', 'Not specified'),
        },
        'clinical_predictions': {
            'bi_rads_category': birads_pred,
            'breast_density': density_map.get(density_pred, 'Unknown'),
            'confidence_scores': {
                'bi_rads': float(np.max(birads_scores)),
                'density': float(np.max(density_scores))
            }
        },
        'xai_visualizations': xai_images if include_xai else None,
        'ai_generated_report': vlm_report,
        'feature_vectors': {
            'visual_features_shape': features['visual_features'].shape,
            'birads_features_shape': features['birads_features'].shape,
            'density_features_shape': features['density_features'].shape
        }
    }

    # Step 5: Save report
    if save_report:
        save_report_to_file(report)

    print("‚úÖ Report generation completed successfully!")
    return report

def save_report_to_file(report: Dict, format: str = 'pdf'):
    """Write the report to a PDF under ``OUTPUT_DIR / 'generated_reports'``.

    Page 1 holds the text summary; each entry of
    ``report['xai_visualizations']`` (if any) gets a page of its own.

    Args:
        report: Report dictionary as built by ``generate_comprehensive_report``.
        format: Currently not used; the output is always a PDF.

    Returns:
        Path of the written PDF file.
    """
    import textwrap
    import matplotlib.pyplot as plt
    from matplotlib.backends.backend_pdf import PdfPages

    report_dir = OUTPUT_DIR / 'generated_reports'
    report_dir.mkdir(parents=True, exist_ok=True)

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    report_path = report_dir / f"report_{report['study_id']}_{timestamp}.pdf"

    # Matplotlib does not wrap text, so long lines of the generated report
    # would run off the page. Wrap each line and indent continuation lines
    # like the rest of the block. '$' is escaped because matplotlib would
    # otherwise try to render the enclosed text as math.
    wrapped_lines = []
    for paragraph in str(report['ai_generated_report']).splitlines():
        wrapped_lines.extend(textwrap.wrap(paragraph, width=70) or [''])
    ai_text = "\n        ".join(wrapped_lines).replace('$', r'\$')

    with PdfPages(report_path) as pdf:
        # Page 1: Summary
        fig, ax = plt.subplots(figsize=(8.5, 11))
        ax.axis('off')

        report_text = f"""
        MAMMOGRAPHY AI REPORT
        =====================

        Study ID: {report['study_id']}
        Generated: {report['timestamp']}

        PATIENT INFORMATION:
        - Age: {report['patient_info']['age']}

        AI PREDICTIONS:
        - BI-RADS Category: {report['clinical_predictions']['bi_rads_category']}
        - Breast Density: Category {report['clinical_predictions']['breast_density']}
        - BI-RADS Confidence: {report['clinical_predictions']['confidence_scores']['bi_rads']:.2%}
        - Density Confidence: {report['clinical_predictions']['confidence_scores']['density']:.2%}

        AI GENERATED REPORT:
        --------------------
        {ai_text}
        """

        ax.text(0.05, 0.95, report_text, transform=ax.transAxes,
                fontsize=11, verticalalignment='top', fontfamily='monospace')

        plt.tight_layout()
        pdf.savefig(fig)
        # Close exactly the figure created above
        plt.close(fig)

        # Following pages: one XAI overlay per view
        if report['xai_visualizations']:
            for view, overlay_img in report['xai_visualizations'].items():
                fig, ax = plt.subplots(figsize=(8.5, 11))
                ax.imshow(overlay_img)
                ax.set_title(f'XAI Heatmap - {view} View', fontsize=14, pad=20)
                ax.axis('off')
                plt.tight_layout()
                pdf.savefig(fig)
                plt.close(fig)

    print(f"üìÑ Report saved to: {report_path}")
    return report_path

# ==========================================
# 7. BATCH REPORT GENERATION
# ==========================================

In [None]:
def generate_reports_for_studies(
    study_ids: List[str],
    view_df: pd.DataFrame,
    model,
    max_reports: int = 10
) -> List[Dict]:
    """Build reports for at most ``max_reports`` of the given studies.

    A study whose report raises is logged and skipped, so the returned list
    contains only the reports that were produced.
    """
    selected_ids = study_ids[:max_reports]
    total = len(selected_ids)
    reports = []

    for position, study_id in enumerate(selected_ids, start=1):
        print(f"\nProcessing study {position}/{total}")
        try:
            reports.append(generate_comprehensive_report(study_id, view_df, model))
        except Exception as e:
            # Best effort: one failing study must not stop the batch
            print(f"‚ùå Error processing study {study_id}: {e}")

    return reports

# ==========================================
# 8. MAIN EXECUTION EXAMPLE
# ==========================================

In [None]:
# Inspect the model inputs; the loaded model is bound to `sanvia_model`
sanvia_model.input

NameError: name 'sanvia' is not defined

In [None]:
# ==========================================
# 8. MAIN EXECUTION EXAMPLE
# ==========================================

if __name__ == "__main__":
    # Smoke test: the first three studies of the validation split
    is_validation = view_mapping_df['split_final'] == 'val'
    test_studies = view_mapping_df[is_validation]['study_id'].tolist()[:3]
    total_studies = len(test_studies)

    print(f"Testing report generation on {total_studies} studies...")

    # MODEL_PATH (not a model object) is handed to the report generator
    generated_reports = []
    for position, study_id in enumerate(test_studies, start=1):
        print(f"\nProcessing study {position}/{total_studies}")
        try:
            generated_reports.append(
                generate_comprehensive_report(study_id, view_mapping_df, MODEL_PATH)
            )
        except Exception as e:
            # Keep going with the remaining studies
            print(f"‚ùå Error: {e}")

    # Summary
    separator = "=" * 60
    print("\n" + separator)
    print("REPORT GENERATION SUMMARY")
    print(separator)
    print(f"‚úÖ Successfully generated {len(generated_reports)} reports")
    print(f"üìÅ Reports saved in: {OUTPUT_DIR / 'generated_reports'}")

Testing report generation on 3 studies...

Processing study 1/3

Generating Report for Study: 00568c6b2c47f99e0156c9bca84c3963
üìä Step 1: Extracting deep features...
‚úÖ Model prediction successful
‚ùå Error: "Exception encountered when calling Functional.call().\n\n\x1b[1m138492903705824\x1b[0m\n\nArguments received by Functional.call():\n  ‚Ä¢ inputs={'L_CC': 'tf.Tensor(shape=(1, 768, 768, 3), dtype=float32)', 'L_MLO': 'tf.Tensor(shape=(1, 768, 768, 3), dtype=float32)', 'R_CC': 'tf.Tensor(shape=(1, 768, 768, 3), dtype=float32)', 'R_MLO': 'tf.Tensor(shape=(1, 768, 768, 3), dtype=float32)', 'tabular': 'tf.Tensor(shape=(1, 2), dtype=float32)'}\n  ‚Ä¢ training=None\n  ‚Ä¢ mask={'L_CC': 'None', 'L_MLO': 'None', 'R_CC': 'None', 'R_MLO': 'None', 'tabular': 'None'}\n  ‚Ä¢ kwargs=<class 'inspect._empty'>"

Processing study 2/3

Generating Report for Study: 00dfcde5aaf6cd0aab3c3a0435632b3f
üìä Step 1: Extracting deep features...
‚úÖ Model prediction successful
‚ùå Error: "Exception encounte