<a href="https://www.kaggle.com/code/debbiechu/medical-transcriptions-nlp?scriptVersionId=174770820" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

## Purpose

Classify medical transcriptions into their correct medical specialties (an imbalanced multiclass problem) using **transformer embeddings and neural networks**.

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import os
import torch

%matplotlib inline

## Dataset

In [None]:
# Read the raw samples, report (rows, columns), then preview the first rows
df = pd.read_csv(filepath_or_buffer="mtsamples.csv")
print(df.shape)
df.head()

In [None]:
# Missing values per column (`isna` is the canonical spelling of `isnull`)
df.isna().sum()

In [None]:
# Drop rows where transcription is NA: they carry no text to classify.
# The index is rebuilt afterwards so that it is contiguous again; otherwise a
# label lookup such as `df.transcription[0]` would raise KeyError whenever the
# row labelled 0 happened to be one of the dropped rows.
df = df.dropna(subset=['transcription']).reset_index(drop=True)
df.isnull().sum()

In [None]:
# Show the first remaining description. `.iloc[0]` is positional, so it keeps
# working even if the row labelled 0 was removed by an earlier filter
# (a plain `[0]` on an integer index is a label lookup and would raise KeyError).
df['description'].iloc[0]

In [None]:
# Show the first remaining transcription. `.iloc[0]` is positional, so it keeps
# working even if the row labelled 0 was removed by an earlier filter
# (a plain `[0]` on an integer index is a label lookup and would raise KeyError).
df['transcription'].iloc[0]

We will use only the `transcription` column as input, because the information in `description` is already contained in the transcription text.

In [None]:
# Peek at the target column
df['medical_specialty'].head()

In [None]:
# number of distinct specialties (a missing value, if any, counts as one, as with len(unique()))
df['medical_specialty'].nunique(dropna=False)

In [None]:
# Encode every specialty name as an integer class id via factorization

df['medical_specialty_code'] = df['medical_specialty'].factorize()[0]
df['medical_specialty_code'].head()

In [None]:
# Confirm the encoded labels are stored as integers
df['medical_specialty_code'].dtype

In [None]:
# Check class balance: one bar per specialty, ordered by frequency
counts = df['medical_specialty'].value_counts()

# Explicit figure/axes objects instead of the implicit pyplot state machine
fig, ax = plt.subplots(figsize=(20, 10))
counts.plot(kind='bar', color='skyblue', edgecolor='black', ax=ax)
ax.set_title('Medical Specialty Counts')
ax.set_xlabel('Medical Specialty')
ax.set_ylabel('Count')
ax.tick_params(axis='x', labelrotation=90)
fig.tight_layout()
plt.show()

The classes are heavily imbalanced. This is addressed later with random oversampling.

In [None]:
# Keep only the model input (text) and the encoded target
df = df.loc[:, ['transcription', 'medical_specialty_code']]

## Transcriptions Classification

### RoBERTa + Oversampling + MLP

Use `RoBERTa` to generate embeddings as features, `RandomOverSampler` to oversample the minority classes, and an `MLP` for the classification task.

In [None]:
# Prefer the GPU when PyTorch can see one, otherwise stay on the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
device

Generate embeddings

In [None]:
from transformers import RobertaModel, RobertaTokenizer
import torch

# Model inputs (raw text) and integer targets as plain Python lists
X = df['transcription'].tolist()
y = df['medical_specialty_code'].tolist()

# Pretrained RoBERTa tokenizer and encoder; the encoder is moved to the selected device
tokenizer = RobertaTokenizer.from_pretrained('roberta-base')
model = RobertaModel.from_pretrained('roberta-base')
model = model.to(device)

# Tokenize the whole corpus at once: truncate to 256 tokens and pad to a common
# length so the result is a single rectangular tensor per field
inputs = tokenizer(
    X,
    max_length=256,
    truncation=True,
    padding=True,
    return_tensors="pt",
)

# generate embeddings in batches
def generate_embeddings(model, inputs, batch_size=16):
    """Encode tokenized texts into one fixed-size vector per text.

    Each text is represented by the mean of the encoder's last hidden states,
    averaged over its real tokens only. Padding positions (attention mask 0)
    are excluded; a plain ``mean(dim=1)`` would mix padding vectors into the
    embedding and make it depend on how much padding the text received.

    Parameters
    ----------
    model : torch.nn.Module
        Encoder called as ``model(input_ids, attention_mask=...)`` whose output
        exposes ``last_hidden_state`` of shape (batch, seq_len, hidden).
    inputs : mapping
        Tokenizer output holding ``'input_ids'`` and ``'attention_mask'``
        tensors of shape (n_texts, seq_len).
    batch_size : int, default 16
        Number of texts pushed through the encoder at a time.

    Returns
    -------
    numpy.ndarray
        Array of shape (n_texts, hidden) with the pooled embeddings.

    Raises
    ------
    ValueError
        If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")

    model.eval()

    # Run on whichever device the model already lives on, so the function does
    # not depend on a notebook-level `device` variable being defined.
    try:
        model_device = next(model.parameters()).device
    except StopIteration:
        model_device = torch.device("cpu")

    input_ids = inputs['input_ids']
    attention_mask = inputs['attention_mask']

    pooled_batches = []
    with torch.no_grad():
        for start in range(0, input_ids.size(0), batch_size):
            stop = start + batch_size
            batch_input_ids = input_ids[start:stop].to(model_device)
            batch_attention_mask = attention_mask[start:stop].to(model_device)

            batch_outputs = model(batch_input_ids, attention_mask=batch_attention_mask)
            hidden_states = batch_outputs.last_hidden_state

            # Masked mean: zero out padding positions, then divide by the number
            # of real tokens (clamped to avoid dividing by zero on an empty row).
            weights = batch_attention_mask.unsqueeze(-1).to(hidden_states.dtype)
            token_sum = (hidden_states * weights).sum(dim=1)
            token_count = weights.sum(dim=1).clamp(min=1)
            pooled_batches.append((token_sum / token_count).cpu().numpy())

    if not pooled_batches:
        # No texts at all: return an empty (0, hidden) array instead of letting
        # np.concatenate fail on an empty list.
        hidden_size = getattr(getattr(model, "config", None), "hidden_size", 0)
        return np.empty((0, hidden_size), dtype=np.float32)

    return np.concatenate(pooled_batches, axis=0)

# Encode the full corpus, 16 texts per forward pass
embeddings = generate_embeddings(model=model, inputs=inputs, batch_size=16)

Oversampling

In [None]:
# Balance the classes by randomly oversampling the under-represented ones.
# NOTE(review): this resamples the full dataset, so any train/test split taken
# from the resampled arrays can contain the same row on both sides.
from imblearn.over_sampling import RandomOverSampler
ros = RandomOverSampler(random_state=42)
resampled_pair = ros.fit_resample(embeddings, y)
embeddings_resampled, labels_resampled = resampled_pair

Classification

In [None]:
# train / validation / test split (60% / 20% / 20%)
from sklearn.model_selection import train_test_split

# Split the ORIGINAL embeddings, not the oversampled ones. RandomOverSampler
# works by repeating minority-class rows, so splitting after oversampling puts
# the same row in train, validation and test at once, which leaks information
# and inflates every reported score.
X_train, X_test, y_train, y_test = train_test_split(embeddings, y, test_size=0.2, random_state=42)
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.25, random_state=42)

# Balance the training portion only; validation and test keep the natural
# class distribution so their metrics reflect real-world performance.
X_train, y_train = RandomOverSampler(random_state=42).fit_resample(X_train, y_train)

from torch import nn, optim
import torch.nn.functional as F

# Feed-forward classification head
class SimpleClassifier(nn.Module):
    """Multi-layer perceptron with two hidden ReLU layers and a linear output.

    Maps feature vectors of size ``input_dim`` to ``num_classes`` raw logits
    (no softmax is applied). The layers are exposed as ``fc1``, ``fc2`` and
    ``fc3``; those names also form the keys of the module's ``state_dict``.
    """

    def __init__(self, input_dim, hidden_dim, num_classes):
        super().__init__()
        self.fc1 = nn.Linear(in_features=input_dim, out_features=hidden_dim)
        self.fc2 = nn.Linear(in_features=hidden_dim, out_features=hidden_dim)
        self.fc3 = nn.Linear(in_features=hidden_dim, out_features=num_classes)

    def forward(self, x):
        """Return logits of shape (..., num_classes) for input of shape (..., input_dim)."""
        # Both hidden layers are followed by a ReLU; the output layer is left linear
        for hidden_layer in (self.fc1, self.fc2):
            x = F.relu(hidden_layer(x))
        return self.fc3(x)

# Initialize the classifier, loss criterion, and optimizer
hidden_dim = 128

# Derive the layer sizes from the data instead of hard-coding 768 / 40, so the
# model still fits if the encoder or the set of specialties changes.
# CrossEntropyLoss needs every label in [0, num_classes - 1], hence max + 1.
input_dim = np.asarray(X_train).shape[1]
num_classes = int(df['medical_specialty_code'].max()) + 1

mlp_model = SimpleClassifier(input_dim=input_dim, hidden_dim=hidden_dim, num_classes=num_classes).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(mlp_model.parameters(), lr=0.001)

# Prepare data loaders
from torch.utils.data import DataLoader, TensorDataset
batch_size = 8

# Features become float32 and labels int64 (the dtype CrossEntropyLoss expects);
# both are placed on the training device up front.
train_data = TensorDataset(torch.tensor(X_train, dtype=torch.float32).to(device), torch.tensor(y_train, dtype=torch.long).to(device))
val_data = TensorDataset(torch.tensor(X_val, dtype=torch.float32).to(device), torch.tensor(y_val, dtype=torch.long).to(device))

# Reshuffle the training set every epoch so mini-batches differ between epochs;
# the seeded generator keeps the shuffling order reproducible across runs.
train_loader = DataLoader(
    train_data,
    batch_size=batch_size,
    shuffle=True,
    generator=torch.Generator().manual_seed(42),
)
# Validation order does not affect the loss, so it is left unshuffled.
val_loader = DataLoader(val_data, batch_size=batch_size)

In [None]:
# Training loop: optimise the MLP and remember the weights of the epoch with
# the lowest validation loss.
best_val_loss = float('inf')
best_model_state = None

epochs = 30
for epoch in range(epochs):
    # ---- training pass ----
    mlp_model.train()
    train_loss = 0
    for inputs, labels in train_loader:
        optimizer.zero_grad()
        outputs = mlp_model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
    # Mean loss per batch
    train_loss /= len(train_loader)

    # ---- validation pass (no gradients, no weight updates) ----
    mlp_model.eval()
    val_loss = 0
    with torch.no_grad():
        for inputs, labels in val_loader:
            outputs = mlp_model(inputs)
            loss = criterion(outputs, labels)
            val_loss += loss.item()
    val_loss /= len(val_loader)

    print(f'Epoch {epoch+1}, Training Loss: {train_loss:.4f}, Validation Loss: {val_loss:.4f}')

    # Check if it's the lowest validation loss
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        # state_dict() returns references to the live parameter tensors, which
        # the optimizer keeps updating in place. Without cloning, the "best"
        # snapshot would silently turn into the weights of the final epoch.
        best_model_state = {
            name: tensor.detach().clone()
            for name, tensor in mlp_model.state_dict().items()
        }

In [None]:
# Persist the checkpoint selected during training
torch.save(best_model_state, 'simple_classifier.pth')

# Drop the references to the training-time tensors and loaders, then hand the
# cached GPU memory back to the driver
del inputs, labels, outputs
del train_data, val_data
del train_loader, val_loader
torch.cuda.empty_cache()

In [None]:
from sklearn.metrics import classification_report

# Load the saved model parameters. `map_location` lets a checkpoint written on
# a GPU machine be restored on whatever device this session is using.
saved_state = torch.load('simple_classifier.pth', map_location=device)

# Rebuild the network with the layer sizes stored in the checkpoint itself
# rather than repeating hard-coded numbers that could drift out of sync.
loaded_model = SimpleClassifier(
    input_dim=saved_state['fc1.weight'].shape[1],
    hidden_dim=saved_state['fc1.weight'].shape[0],
    num_classes=saved_state['fc3.weight'].shape[0],
).to(device)
loaded_model.load_state_dict(saved_state)

# Ensure the model is in evaluation mode
loaded_model.eval()

# Test dataset preparation
test_data = TensorDataset(torch.tensor(X_test, dtype=torch.float32).to(device), torch.tensor(y_test, dtype=torch.long).to(device))
test_loader = DataLoader(test_data, batch_size=8)

# Evaluate on the test set using the loaded model
test_preds, test_labels = [], []
with torch.no_grad():
    for inputs, labels in test_loader:
        outputs = loaded_model(inputs)
        # Predicted class = index of the largest logit
        preds = torch.argmax(outputs, dim=1)
        test_preds.extend(preds.cpu().numpy())
        test_labels.extend(labels.cpu().numpy())

# zero_division=0 scores a class that is never predicted as 0 (the same value
# as the default) without emitting an undefined-metric warning for each one.
print("Classification Report on Test Set (Using Loaded Model):")
print(classification_report(test_labels, test_preds, zero_division=0))
report = classification_report(test_labels, test_preds, output_dict=True, zero_division=0)
macro_avg_f1 = report['macro avg']['f1-score']
weighted_avg_f1 = report['weighted avg']['f1-score']
print(f"Macro Average F1 Score: {macro_avg_f1}")
print(f"Weighted Average F1 Score: {weighted_avg_f1}")

### RoBERTa + Oversampling + RF

Generate embeddings

In [None]:
from transformers import RobertaModel, RobertaTokenizer
import torch

# define X and y
X = df['transcription'].tolist()
y = df['medical_specialty_code'].tolist()

# The RoBERTa embeddings do not depend on the downstream classifier, so the
# `embeddings` array computed in the MLP section is reused here. This avoids
# defining `generate_embeddings` a second time and repeating the tokenization
# and the full encoder pass over the corpus.
assert len(embeddings) == len(y), (
    "embeddings are out of sync with the labels; re-run the embedding cell of the MLP section"
)

Oversampling

In [None]:
# Balance the classes by randomly oversampling the under-represented ones.
# NOTE(review): this resamples the full dataset, so any train/test split taken
# from the resampled arrays can contain the same row on both sides.
from imblearn.over_sampling import RandomOverSampler
ros = RandomOverSampler(random_state=42)
resampled_pair = ros.fit_resample(embeddings, y)
embeddings_resampled, labels_resampled = resampled_pair

Classification

In [None]:
from joblib import dump, load

In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV, train_test_split
import numpy as np
from sklearn.metrics import classification_report

# An imbalanced-learn pipeline applies its sampler during `fit` only, never
# during `predict`, which is what keeps resampled rows out of the evaluation data.
from imblearn.pipeline import Pipeline as ImbPipeline

# train test split on the ORIGINAL embeddings. Splitting the oversampled arrays
# would place repeated copies of the same row in both train and test and
# inflate the test scores.
X_train, X_test, y_train, y_test = train_test_split(embeddings, y, test_size=0.2, random_state=42)

# RF, preceded by the oversampler. Inside GridSearchCV the pipeline is refit
# on every fold, so only that fold's training part is oversampled and the
# fold's validation part stays untouched.
rf = RandomForestClassifier(random_state=42)
rf_pipeline = ImbPipeline(steps=[
    ('oversample', RandomOverSampler(random_state=42)),
    ('rf', rf),
])

# Param grid (the `rf__` prefix routes each parameter to the forest step)
param_grid = {
    'rf__n_estimators': [100, 200, 300],
    'rf__max_depth': [None, 10, 20, 30],
    'rf__min_samples_split': [2, 5, 10],
    'rf__min_samples_leaf': [1, 2, 4]
}

# Initialize and fit GridSearchCV
grid_search = GridSearchCV(estimator=rf_pipeline, param_grid=param_grid, cv=5, n_jobs=-1, verbose=2, scoring='f1_weighted')
grid_search.fit(X_train, y_train)

print("Best parameters found: ", grid_search.best_params_)
print("Best weighted F1 score found: ", grid_search.best_score_)

# Persist the fitted search so evaluation can run without refitting
dump(grid_search, 'grid_search_rf.joblib')

In [None]:
from sklearn.metrics import classification_report

# Restore the fitted grid search written to disk by the training cell
grid_search = load('grid_search_rf.joblib')

# Predict the held-out test rows with the restored search object
y_pred = grid_search.predict(X_test)

# Text report for reading, dict report for pulling out the averaged F1 scores
print("Classification Report on Test Set:")
print(classification_report(y_test, y_pred))
report = classification_report(y_test, y_pred, output_dict=True)
macro_row = report['macro avg']
weighted_row = report['weighted avg']
macro_avg_f1 = macro_row['f1-score']
weighted_avg_f1 = weighted_row['f1-score']
print(f"Macro Average F1 Score: {macro_avg_f1}")
print(f"Weighted Average F1 Score: {weighted_avg_f1}")

The random forest performs slightly better than the MLP.