# In [None]:

import pandas as pd
import torch
from torch import nn
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, accuracy_score, recall_score
from transformers import BertTokenizer, BertModel
import numpy as np
from scipy import stats
import spacy

# Load the dataset
# NOTE: machine-specific absolute path; point DATA_PATH at your copy of the CSV.
DATA_PATH = "D:/Downloads/newData.csv"
df = pd.read_csv(DATA_PATH)

# Optional text preprocessing. Leave this False to train on the raw 'Report'
# column (the original behaviour); set it to True to train on the cleaned
# text produced by preprocess() below.
USE_PREPROCESSING = False

# load english language model and create nlp object from it
nlp = spacy.load("en_core_web_sm")


def preprocess(text):
    """Return *text* with stop words and punctuation dropped and tokens lemmatised.

    Args:
        text: Raw text to clean.

    Returns:
        The lemmas of the remaining tokens, joined by single spaces.
    """
    return " ".join(
        token.lemma_
        for token in nlp(text)
        if not (token.is_stop or token.is_punct)
    )


if USE_PREPROCESSING:
    df['preprocessed_txt'] = df['Report'].apply(preprocess)
    TEXT_COLUMN = 'preprocessed_txt'
else:
    TEXT_COLUMN = 'Report'

# Split data into train and test sets (80/20, stratified on the BIRADS label
# so both splits keep the same class proportions; fixed seed for repeatability)
X_train, X_test, y_train, y_test = train_test_split(
    df[TEXT_COLUMN],
    df['BIRADS'],
    test_size=0.2,
    random_state=2022,
    stratify=df['BIRADS'],
)

# ---- Run configuration ----
# Pretrained checkpoint shared by the tokenizer and the encoder
BERT_MODEL_NAME = "bert-base-uncased"
# Every tokenized sequence is padded/truncated to this many tokens
MAX_LEN = 128
# Number of output classes (labels in the data run from 1 to 5)
NUM_CLASSES = 5
# Optimisation schedule
EPOCHS = 30
BATCH_SIZE = 16

# Tokenizer matching the chosen checkpoint
tokenizer = BertTokenizer.from_pretrained(BERT_MODEL_NAME)

# Prepare custom Dataset
class TextDataset(Dataset):
    """Map-style dataset that tokenizes one text per item on access.

    Labels are stored as given (1-based in this script) and shifted down by
    one when an item is fetched, so the model sees 0-based class indices.
    """

    def __init__(self, texts, labels, tokenizer, max_len):
        """Keep references to the samples and the tokenization settings.

        Args:
            texts: Indexable sequence of raw text strings.
            labels: Indexable sequence of labels aligned with ``texts``.
            tokenizer: Object exposing an ``encode_plus`` method.
            max_len: Length each sequence is padded/truncated to.
        """
        self.texts, self.labels = texts, labels
        self.tokenizer, self.max_len = tokenizer, max_len

    def __len__(self):
        """Return the number of samples."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Tokenize sample ``idx`` and return its tensors.

        Returns:
            Dict with ``input_ids`` and ``attention_mask`` (both flattened to
            1-D) and ``label`` (a ``torch.long`` scalar, shifted to 0-based).
        """
        sample_text = self.texts[idx]
        # Shift 1..5 labels down to 0..4 class indices
        zero_based_label = self.labels[idx] - 1

        tokenizer_options = dict(
            add_special_tokens=True,
            max_length=self.max_len,
            return_token_type_ids=False,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt',
        )
        encoded = self.tokenizer.encode_plus(sample_text, **tokenizer_options)

        # return_tensors='pt' adds a leading batch axis; flatten drops it
        item = {
            field: encoded[field].flatten()
            for field in ('input_ids', 'attention_mask')
        }
        item['label'] = torch.tensor(zero_based_label, dtype=torch.long)
        return item

# Wrap both splits in TextDataset; the pandas Series are converted to plain
# lists first, so TextDataset indexes them by position.
train_dataset, test_dataset = (
    TextDataset(texts.tolist(), targets.tolist(), tokenizer, MAX_LEN)
    for texts, targets in ((X_train, y_train), (X_test, y_test))
)

# Batch iterators: only the training data is shuffled
train_dataloader = DataLoader(dataset=train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_dataloader = DataLoader(dataset=test_dataset, batch_size=BATCH_SIZE)

# Define the BERT-based model
class BERTClassifier(nn.Module):
    """BERT encoder followed by dropout and a single linear classification layer."""

    def __init__(self, bert_model_name, num_classes):
        """Load the pretrained encoder and build the classification head.

        Args:
            bert_model_name: Checkpoint name passed to ``BertModel.from_pretrained``.
            num_classes: Number of logits produced by the final linear layer.
        """
        super().__init__()
        self.bert = BertModel.from_pretrained(bert_model_name)
        self.dropout = nn.Dropout(0.3)
        encoder_width = self.bert.config.hidden_size
        self.fc = nn.Linear(encoder_width, num_classes)

    def forward(self, input_ids, attention_mask):
        """Return the class logits for a batch of tokenized inputs."""
        encoder_outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        # Element 1 of the encoder output is used as the pooled representation
        pooled = encoder_outputs[1]
        return self.fc(self.dropout(pooled))

# Run on the GPU when one is available, otherwise on the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Build the classifier and move its parameters to the chosen device
model = BERTClassifier(bert_model_name=BERT_MODEL_NAME, num_classes=NUM_CLASSES).to(device)

# Weighted cross-entropy: one hand-set weight per class, in class-index order.
# Update these based on the class distribution.
class_weights = torch.tensor(
    [1.0, 0.5, 1.5, 2.0, 3.0],
    dtype=torch.float,
    device=device,
)
loss_fn = nn.CrossEntropyLoss(weight=class_weights)

# AdamW over every model parameter (encoder and head)
optimizer = torch.optim.AdamW(params=model.parameters(), lr=2e-5)

# Fine-tune for EPOCHS passes over the training data, printing the mean batch
# loss after each pass
for epoch in range(EPOCHS):
    model.train()
    total_loss = 0
    for batch in train_dataloader:
        # Clear gradients left over from the previous step
        optimizer.zero_grad()

        # Move this batch's tensors onto the model's device
        input_ids, attention_mask, labels = (
            batch[field].to(device)
            for field in ('input_ids', 'attention_mask', 'label')
        )

        # Forward pass, loss, backward pass, parameter update
        outputs = model(input_ids=input_ids, attention_mask=attention_mask)
        loss = loss_fn(outputs, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
    print(f"Epoch {epoch+1}/{EPOCHS}, Loss: {total_loss/len(train_dataloader)}")

# Evaluation
model.eval()  # switch to inference mode
predictions, true_labels = [], []

with torch.no_grad():
    for batch in test_dataloader:
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        # Labels are only collected for scoring, so they are not moved to the
        # device.
        labels = batch['label']
        outputs = model(input_ids=input_ids, attention_mask=attention_mask)
        # Predicted class = index of the largest logit in each row
        preds = torch.argmax(outputs, dim=1)
        predictions.extend(preds.cpu().tolist())
        true_labels.extend(labels.tolist())

# Convert labels back from 0-4 to 1-5
predictions = [p + 1 for p in predictions]
true_labels = [t + 1 for t in true_labels]

# Classification report
# Pass the full label set explicitly. Without `labels`, classification_report
# infers the classes from the data and raises ValueError when that count does
# not match the length of `target_names` (i.e. when a class is missing from
# both the true and predicted test labels).
# zero_division=0 reports 0 for classes with no samples without warning.
report_labels = list(range(1, NUM_CLASSES + 1))
print(classification_report(
    true_labels,
    predictions,
    labels=report_labels,
    target_names=[str(label) for label in report_labels],
    zero_division=0,
))

# Bootstrapping to calculate confidence intervals
def bootstrap_confidence_interval(
    metric_func,
    true_labels,
    predictions,
    num_bootstrap=1000,
    alpha=0.05,
    random_state=None,
):
    """Estimate a percentile-bootstrap confidence interval for a metric.

    Sample indices are drawn with replacement ``num_bootstrap`` times.
    ``metric_func`` is evaluated on each resample, and the ``alpha / 2`` and
    ``1 - alpha / 2`` percentiles of the resulting scores are returned.
    Resamples whose true labels contain fewer than two distinct classes are
    skipped.

    Args:
        metric_func: Callable ``metric_func(y_true, y_pred)`` returning a number.
        true_labels: Sequence of ground-truth labels.
        predictions: Sequence of predicted labels, same length as ``true_labels``.
        num_bootstrap: Number of resamples to draw.
        alpha: Significance level; the interval covers ``1 - alpha``.
        random_state: Optional seed for reproducible resampling. When ``None``
            (the default) NumPy's global random state is used, exactly as
            before this parameter existed.

    Returns:
        Tuple ``(lower_bound, upper_bound)``.

    Raises:
        ValueError: If the inputs are empty, differ in length, or no resample
            contained at least two classes.
    """
    # Convert once, outside the loop, instead of on every iteration
    y_true = np.asarray(true_labels)
    y_pred = np.asarray(predictions)
    n_samples = len(y_true)

    if n_samples != len(y_pred):
        raise ValueError(
            f"true_labels and predictions differ in length: {n_samples} != {len(y_pred)}"
        )
    if n_samples == 0:
        raise ValueError("Cannot bootstrap an empty set of labels")

    # np.random (global state) and RandomState both expose randint with the
    # same signature, so the default path draws exactly as the original did.
    rng = np.random if random_state is None else np.random.RandomState(random_state)

    bootstrapped_scores = []
    for _ in range(num_bootstrap):
        indices = rng.randint(0, n_samples, n_samples)
        resampled_true = y_true[indices]
        if len(np.unique(resampled_true)) < 2:
            # Skip resamples that don't have at least two classes present
            continue
        bootstrapped_scores.append(metric_func(resampled_true, y_pred[indices]))

    if not bootstrapped_scores:
        raise ValueError(
            "No bootstrap resample contained at least two classes; "
            "cannot compute a confidence interval"
        )

    # np.percentile does not need pre-sorted input
    lower_bound = np.percentile(bootstrapped_scores, 100 * alpha / 2)
    upper_bound = np.percentile(bootstrapped_scores, 100 * (1 - alpha / 2))
    return lower_bound, upper_bound

# Accuracy on the test set, with a bootstrap 95% confidence interval
accuracy = accuracy_score(true_labels, predictions)
accuracy_ci = bootstrap_confidence_interval(accuracy_score, true_labels, predictions)
print(f"Accuracy: {accuracy:.4f}, 95% CI: [{accuracy_ci[0]:.4f}, {accuracy_ci[1]:.4f}]")


def _macro_recall(y_true, y_pred):
    """Return recall_score computed with average='macro'."""
    return recall_score(y_true, y_pred, average='macro')


# Macro-average recall on the test set, with a bootstrap 95% confidence interval
macro_recall = _macro_recall(true_labels, predictions)
macro_recall_ci = bootstrap_confidence_interval(_macro_recall, true_labels, predictions)
print(f"Macro Average Recall: {macro_recall:.4f}, 95% CI: [{macro_recall_ci[0]:.4f}, {macro_recall_ci[1]:.4f}]")
