In [None]:
# NER: target entity / attribute types to extract (definitions TODO)
# - severity:
# - absence:
# - presence:
# - anatomy:
# - observation:
# - anatomical
# - relations

In [None]:
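# References:
# - CheXpert dataset / competition: https://stanfordmlgroup.github.io/competitions/chexpert/
# - RaTE-NER DeBERTa model: https://huggingface.co/Angelakeke/RaTE-NER-Deberta
# - Bio_ClinicalBERT model: https://huggingface.co/emilyalsentzer/Bio_ClinicalBERT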

In [4]:
import sparknlp
# Start a Spark session with the Spark NLP package available (the log below
# shows the spark-nlp_2.12 dependency being resolved).
# NOTE(review): `spark` is not referenced by the visible cells —
# ClinicalNERPipeline calls sparknlp.start() on its own.
spark = sparknlp.start()

:: loading settings :: url = jar:file:/Users/JanayeCheong/Documents/DATATHON/nusdatathon2024/.venv/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/JanayeCheong/.ivy2/cache
The jars for the packages stored in: /Users/JanayeCheong/.ivy2/jars
com.johnsnowlabs.nlp#spark-nlp_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-b6ecee71-9670-4781-95fe-b8ec145b38a1;1.0
	confs: [default]
	found com.johnsnowlabs.nlp#spark-nlp_2.12;5.5.2 in central
	found com.typesafe#config;1.4.2 in central
	found org.rocksdb#rocksdbjni;6.29.5 in central
	found com.amazonaws#aws-java-sdk-s3;1.12.500 in central
	found com.amazonaws#aws-java-sdk-kms;1.12.500 in central
	found com.amazonaws#aws-java-sdk-core;1.12.500 in central
	found commons-logging#commons-logging;1.1.3 in central
	found commons-codec#commons-codec;1.15 in central
	found org.apache.httpcomponents#httpclient;4.5.13 in central
	found org.apache.httpcomponents#httpcore;4.4.13 in central
	found software.amazon.ion#ion-java;1.0.2 in central
	found joda-time#joda-time;2.8.1 in central
	found com.amazonaws#jmespath-java;1.12

In [5]:
# Import the required modules and classes
from sparknlp.base import DocumentAssembler, Pipeline
from sparknlp.annotator import (
    Tokenizer,
    SentenceDetector,
    BertEmbeddings
)

In [None]:
from transformers import AutoTokenizer, AutoModelForTokenClassification, pipeline
import pandas as pd

class MedicalNERPipeline:
    """Thin wrapper around a Hugging Face "ner" (token-classification) pipeline.

    NOTE(review): the default checkpoint, Bio_ClinicalBERT, is loaded through
    AutoModelForTokenClassification. If that checkpoint does not ship a trained
    token-classification head, transformers initialises one randomly and the
    predicted labels carry no meaning — confirm before trusting the output.
    """

    def __init__(self, model_name="emilyalsentzer/Bio_ClinicalBERT"):
        """Load the tokenizer and model for ``model_name`` and build the pipeline.

        Args:
            model_name: Hugging Face Hub id (or local path) of the checkpoint.
        """
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForTokenClassification.from_pretrained(model_name)
        # "simple" aggregation returns grouped entities (dicts with an
        # 'entity_group' key), which is what process_radiology_reports reads.
        pipeline_kwargs = dict(
            model=self.model,
            tokenizer=self.tokenizer,
            aggregation_strategy="simple",
        )
        self.nlp = pipeline("ner", **pipeline_kwargs)

    def process_text(self, text):
        """Run a single text through the NER pipeline and return its entities."""
        return self.nlp(text)

    def process_batch(self, texts):
        """Run each text through ``process_text``; return a list of entity lists in input order."""
        return [self.process_text(single_text) for single_text in texts]

def process_radiology_reports(reports, ner_pipeline=None):
    """
    Run NER over a collection of radiology reports and collect the entities.

    Args:
        reports: Iterable of report strings. It is materialised into a list
            once, so one-shot iterators (e.g. generators) work correctly.
        ner_pipeline: Optional object exposing ``process_batch(texts)``, e.g. an
            existing ``MedicalNERPipeline``. When omitted, a new
            ``MedicalNERPipeline`` (Bio_ClinicalBERT by default) is created,
            which reloads the model on every call — pass one in to reuse it.

    Returns:
        One dict per report, in input order:
        ``{'report': <text>, 'findings': [{'text', 'label', 'score', 'start', 'end'}, ...]}``.
        An empty input yields ``[]`` without loading any model.
    """
    # The reports are iterated twice (inference, then pairing with results);
    # a generator would be exhausted by the first pass and zip() would then
    # silently produce nothing, so materialise it once up front.
    reports = list(reports)
    if not reports:
        return []

    if ner_pipeline is None:
        ner_pipeline = MedicalNERPipeline()

    results = ner_pipeline.process_batch(reports)

    organized_results = []
    for report, entities in zip(reports, results):
        # Keys below are those produced by an aggregated ("simple") HF NER pipeline.
        findings = [
            {
                'text': entity['word'],
                'label': entity['entity_group'],
                'score': entity['score'],
                'start': entity['start'],
                'end': entity['end'],
            }
            for entity in entities
        ]
        organized_results.append({'report': report, 'findings': findings})

    return organized_results

if __name__ == "__main__":
    # Two toy chest X-ray reports: one with findings, one normal.
    reports = [
        """Chest radiograph demonstrates bilateral lower lobe infiltrates 
        with small pleural effusions. No pneumothorax identified.""",
        """Heart size is normal. Lungs are clear without focal consolidation. 
        No pleural effusion or pneumothorax."""
    ]

    results = process_radiology_reports(reports)

    # Show a short preview of each report followed by its extracted entities.
    for result in results:
        preview = result['report'][:50]
        print("\nReport:", preview, "...")
        print("Findings:")
        for finding in result['findings']:
            entity_text, entity_label, entity_score = finding['text'], finding['label'], finding['score']
            print(f"- {entity_text} ({entity_label}: {entity_score:.3f})")

In [None]:
import os
import logging
from typing import Optional, Tuple

import sparknlp
from sparknlp.base import DocumentAssembler, Pipeline
from sparknlp.annotator import (
    BertEmbeddings,
    Tokenizer,
    SentenceDetector,
    NerDLApproach,
    NerDLModel
)
from sparknlp.training import CoNLL
import pyspark.sql.functions as F
from pyspark.sql import DataFrame, SparkSession

class ClinicalNERPipeline:
    """
    A pipeline for training and using NER models on clinical text using Spark NLP.

    Typical flow: ``load_data`` -> ``train`` -> ``predict`` / ``save_model``.
    A model written by ``save_model`` can be restored later with ``load_model``.
    """

    def __init__(
        self,
        model_name: str = "biobert_pubmed_base_cased",
        max_epochs: int = 10,
        learning_rate: float = 0.003,
        batch_size: int = 32,
        validation_split: float = 0.2
    ):
        """
        Initialize the NER pipeline with configurable parameters.

        Args:
            model_name: Name of the pre-trained BERT embeddings to use
            max_epochs: Number of training epochs
            learning_rate: Learning rate for training
            batch_size: Batch size for training
            validation_split: Fraction of data to use for validation
        """
        self.spark = self._initialize_spark()
        self.model_name = model_name
        self.max_epochs = max_epochs
        self.learning_rate = learning_rate
        self.batch_size = batch_size
        self.validation_split = validation_split
        # Unfitted Pipeline; built in train().
        self.pipeline = None
        # Fitted pipeline model; set by train() or load_model().
        self.trained_model = None

    def _initialize_spark(self) -> SparkSession:
        """Initialize and return a Spark session with Spark NLP."""
        logging.info("Initializing Spark session with Spark NLP...")
        return sparknlp.start()

    def _create_pipeline(self) -> Pipeline:
        """
        Create and return the (unfitted) NER pipeline:
        text -> document -> sentence -> token -> embeddings -> ner.

        NOTE(review): CoNLL data from ``load_data(format="conll")`` already
        carries document/sentence/token columns aligned with ``label``; the
        first three stages recompute them from ``text``. Confirm the re-tokenised
        tokens line up with the CoNLL labels.
        """
        logging.info(f"Creating pipeline with {self.model_name}...")

        document_assembler = (
            DocumentAssembler()
            .setInputCol("text")
            .setOutputCol("document")
        )

        sentence_detector = (
            SentenceDetector()
            .setInputCols(["document"])
            .setOutputCol("sentence")
        )

        tokenizer = (
            Tokenizer()
            .setInputCols(["sentence"])
            .setOutputCol("token")
        )

        # Loads the pretrained embeddings named by self.model_name.
        embeddings = (
            BertEmbeddings.pretrained(self.model_name)
            .setInputCols(["sentence", "token"])
            .setOutputCol("embeddings")
        )

        ner_model = (
            NerDLApproach()
            .setInputCols(["sentence", "token", "embeddings"])
            .setOutputCol("ner")
            .setLabelColumn("label")
            .setMaxEpochs(self.max_epochs)
            .setLr(self.learning_rate)
            .setBatchSize(self.batch_size)
            .setValidationSplit(self.validation_split)
            .setEvaluationLogExtended(True)
            .setIncludeConfidence(True)
            .setEnableOutputLogs(True)
            .setGraphFolder("./ner_graphs")
        )

        return Pipeline(stages=[
            document_assembler,
            sentence_detector,
            tokenizer,
            embeddings,
            ner_model
        ])

    def load_data(
        self,
        data_path: str,
        format: str = "text",
        limit: Optional[int] = None
    ) -> DataFrame:
        """
        Load data from a file into a Spark DataFrame.

        Args:
            data_path: Path to the data file
            format: 'text' (one row per line, exposed as a ``text`` column, the
                input the pipeline's DocumentAssembler reads) or 'conll'
            limit: Optional maximum number of rows to load (``None`` = all)

        Returns:
            Loaded DataFrame

        Raises:
            ValueError: If ``format`` is not 'text' or 'conll'.
        """
        logging.info(f"Loading data from {data_path}...")

        if format == "text":
            # spark.read.text produces a single column named "value"; rename it
            # so the DocumentAssembler (input column "text") can consume it.
            df = self.spark.read.text(data_path).withColumnRenamed("value", "text")
        elif format == "conll":
            df = CoNLL().readDataset(self.spark, data_path)
        else:
            raise ValueError(f"Unsupported format: {format}")

        # Compare against None so an explicit limit of 0 is honoured rather
        # than being treated as "no limit".
        if limit is not None:
            df = df.limit(limit)

        return df

    def train(self, training_data: DataFrame) -> None:
        """
        Build the pipeline and fit it on the provided training data.

        Args:
            training_data: DataFrame with a ``text`` column and a ``label``
                annotation column (e.g. from ``load_data(format="conll")``)
        """
        logging.info("Creating and training NER pipeline...")
        self.pipeline = self._create_pipeline()
        self.trained_model = self.pipeline.fit(training_data)
        logging.info("Training complete!")

    def predict(self, text_data: DataFrame) -> DataFrame:
        """
        Make predictions on new text data.

        Args:
            text_data: DataFrame with a ``text`` column to analyze

        Returns:
            DataFrame with one row per token: columns ``token`` and ``prediction``

        Raises:
            RuntimeError: If no model has been trained or loaded.
        """
        if self.trained_model is None:
            raise RuntimeError("Model must be trained before making predictions")

        logging.info("Making predictions...")
        predictions = self.trained_model.transform(text_data)

        # Pair each token with its NER tag, one row per token.
        # NOTE(review): relies on arrays_zip naming the struct fields '0'/'1'
        # for these unnamed field-extraction inputs — confirm on the Spark
        # version in use.
        result = predictions.select(
            F.explode(F.arrays_zip(
                predictions.token.result,
                predictions.ner.result
            )).alias("cols")
        ).select(
            F.expr("cols['0']").alias("token"),
            F.expr("cols['1']").alias("prediction")
        )

        return result

    def save_model(self, path: str) -> None:
        """
        Save the whole fitted pipeline to disk, overwriting ``path``.

        Args:
            path: Path where the model should be saved

        Raises:
            RuntimeError: If no model has been trained or loaded.
        """
        if self.trained_model is None:
            raise RuntimeError("No trained model to save")

        logging.info(f"Saving model to {path}...")
        self.trained_model.write().overwrite().save(path)

    def load_model(self, path: str) -> None:
        """
        Load a model previously written by ``save_model``.

        ``save_model`` persists the whole fitted pipeline (document assembler
        through NER tagger), so it must be restored as a ``PipelineModel``.
        Reading it back as a bare ``NerDLModel`` fails the metadata class
        check, and a lone NER stage could not run in ``predict`` on raw text
        because nothing would produce its sentence/token/embeddings inputs.

        Args:
            path: Path to the saved model
        """
        # Local import: PipelineModel is not imported at file level.
        from pyspark.ml import PipelineModel

        logging.info(f"Loading model from {path}...")
        self.trained_model = PipelineModel.load(path)

def main(
    training_path: str = "path_to_labeled_dataset.conll",
    reports_path: str = "path_to_mimic_cxr_reports.txt",
    model_dir: str = "ner_mimic_model",
    training_limit: Optional[int] = 5000,
    rows_to_show: int = 30,
):
    """
    Example usage of the ClinicalNERPipeline: train, persist, then tag reports.

    The default paths are placeholders; point them at real files before running.

    Args:
        training_path: CoNLL-formatted labelled training data.
        reports_path: Plain-text reports file; each line becomes one row.
        model_dir: Directory the fitted pipeline is saved to (overwritten).
        training_limit: Maximum number of training rows to load.
        rows_to_show: Number of (token, prediction) rows to display.
    """
    ner_pipeline = ClinicalNERPipeline(
        model_name="biobert_pubmed_base_cased",
        max_epochs=10,
        batch_size=32
    )

    training_data = ner_pipeline.load_data(
        training_path,
        format="conll",
        limit=training_limit
    )
    ner_pipeline.train(training_data)

    # Persist immediately after the (expensive) training step so that a later
    # failure while loading or tagging the reports does not discard the model.
    ner_pipeline.save_model(model_dir)

    reports = ner_pipeline.load_data(reports_path, format="text")
    predictions = ner_pipeline.predict(reports)
    predictions.show(rows_to_show, truncate=False)

if __name__ == "__main__":
    # force=True (Python 3.8+) replaces any handlers already attached to the
    # root logger. Without it basicConfig silently does nothing when the root
    # logger is already configured (as can happen inside a Jupyter kernel),
    # and the pipeline's INFO progress messages would never appear.
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        force=True,
    )
    main()