In [1]:
import torch
import pandas as pd
from torch.nn import Module
from torch.utils.data import Dataset
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from transformers import BertTokenizer, BertForSequenceClassification, Trainer, TrainingArguments

2024-05-10 21:32:29.721907: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-05-10 21:32:29.721966: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-05-10 21:32:29.723908: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-05-10 21:32:29.733336: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [None]:
class FocalLoss(Module):
    """Focal loss computed from element-wise binary cross-entropy.

    Every element's BCE term is multiplied by ``alpha * (1 - p_t) ** gamma``,
    where ``p_t = exp(-BCE)``.

    Args:
        alpha: Constant multiplier applied to every element.
        gamma: Exponent of the ``(1 - p_t)`` modulating factor.
        logits: If true, ``inputs`` are passed to the with-logits BCE variant;
            otherwise to the plain BCE variant.
        reduce: If true, ``forward`` returns the mean over all elements;
            otherwise the element-wise loss tensor.
    """

    def __init__(self, alpha=0.5, gamma=2.0, logits=True, reduce=True):
        super().__init__()
        self.alpha, self.gamma = alpha, gamma
        self.logits, self.reduce = logits, reduce

    def forward(self, inputs, targets):
        """Return the focal loss of ``inputs`` against ``targets``."""
        functional = torch.nn.functional
        bce_fn = (
            functional.binary_cross_entropy_with_logits
            if self.logits
            else functional.binary_cross_entropy
        )
        elementwise_bce = bce_fn(inputs, targets, reduction='none')

        # exp(-BCE) recovers the probability assigned to the target value.
        prob_of_target = torch.exp(-elementwise_bce)
        focal = self.alpha * (1 - prob_of_target) ** self.gamma * elementwise_bce

        return torch.mean(focal) if self.reduce else focal

# Custom Dataset Class
class CustomDataset(Dataset):
    """Map-style dataset pairing tokenizer encodings with per-example labels.

    Args:
        encodings: Mapping from feature name to an indexable collection with
            one entry per example (lists or tensors both work).
        labels: Indexable collection of per-example label vectors.
    """

    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    @staticmethod
    def _as_new_tensor(value, dtype=None):
        """Return an independent tensor copy of ``value``.

        ``torch.tensor(existing_tensor)`` emits a copy-construct UserWarning,
        so tensors are copied with ``clone().detach()`` instead.
        """
        if isinstance(value, torch.Tensor):
            copied = value.clone().detach()
            return copied if dtype is None else copied.to(dtype)
        return torch.tensor(value, dtype=dtype)

    def __getitem__(self, idx):
        """Return the features of example ``idx`` plus a float ``labels`` tensor."""
        item = {key: self._as_new_tensor(val[idx]) for key, val in self.encodings.items()}
        item['labels'] = self._as_new_tensor(self.labels[idx], dtype=torch.float)
        return item

    def __len__(self):
        """Number of examples, taken from the label collection."""
        return len(self.labels)

# Preprocessing Data
def preprocess_data(csv_file):
    """Load the label CSV, attach each row's report text and binarise the labels.

    Args:
        csv_file: Path to a CSV with a ``filename`` column and one count
            column per seizure type.

    Returns:
        ``(data, seizure_types)`` where ``data`` holds only rows whose text file
        could be read, with a ``text`` column and 0/1 label columns, and
        ``seizure_types`` is the list of label column names.
    """
    data = pd.read_csv(csv_file)
    text_data = []
    for file_path in data['filename']:
        file_path = file_path.replace('\\', '/')
        try:
            try:
                with open(file_path, 'r', encoding='utf-8') as file:
                    text = file.read()
            except UnicodeDecodeError:
                # Fall back to a single-byte encoding that accepts any byte.
                with open(file_path, 'r', encoding='ISO-8859-1') as file:
                    text = file.read()
        except FileNotFoundError:
            print(f"File not found: {file_path}")
            # Keep the list aligned with the frame; the row is dropped below.
            text = None
        text_data.append(text)

    data['text'] = text_data
    seizure_types = ['bckg', 'cpsz', 'gnsz', 'fnsz', 'absz', 'tnsz', 'tcsz', 'spsz', 'mysz']
    # Copy so the label assignment below does not write into a view.
    data = data.dropna(subset=['text']).copy()
    # Vectorised equivalent of "1 if x > 0 else 0" (replaces deprecated applymap).
    data[seizure_types] = (data[seizure_types] > 0).astype(int)
    return data, seizure_types

def split_data(data, test_size=0.2, random_state=42):
    """Split ``data`` into train and validation parts.

    Args:
        data: Collection accepted by ``train_test_split``.
        test_size: Share of rows held out for validation (default ``0.2``).
        random_state: Seed forwarded to ``train_test_split`` (default ``42``).

    Returns:
        Whatever ``train_test_split`` returns for a single input: the train
        part followed by the validation part.
    """
    return train_test_split(data, test_size=test_size, random_state=random_state)

# Compute Metrics for Evaluation
def compute_metrics(eval_pred):
    """Score thresholded logits against the reference labels.

    Args:
        eval_pred: Pair ``(logits, labels)``; a logit ``>= 0`` counts as a
            positive prediction.

    Returns:
        Dict with ``accuracy`` plus micro-averaged ``precision``, ``recall``
        and ``f1``.
    """
    raw_scores, y_true = eval_pred
    y_pred = (raw_scores >= 0).astype(int)

    metrics = {"accuracy": accuracy_score(y_true, y_pred)}
    micro_scorers = (
        ("precision", precision_score),
        ("recall", recall_score),
        ("f1", f1_score),
    )
    for metric_name, scorer in micro_scorers:
        metrics[metric_name] = scorer(y_true, y_pred, average='micro')
    return metrics

# Training the Model with Custom Trainer
class CustomTrainer(Trainer):
    """``Trainer`` subclass that scores batches with ``FocalLoss``."""

    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
        """Compute the focal loss for one batch.

        Args:
            model: Model called as ``model(**inputs)``; its output must expose
                ``.logits``.
            inputs: Batch dict; the ``"labels"`` entry is popped before the
                forward pass.
            return_outputs: When true, return ``(loss, outputs)`` instead of
                only the loss.
            num_items_in_batch: Accepted for signature compatibility with newer
                ``Trainer`` releases, which pass it as a keyword; unused here.

        Returns:
            The loss, or ``(loss, outputs)`` when ``return_outputs`` is true.
        """
        labels = inputs.pop("labels")
        outputs = model(**inputs)
        logits = outputs.logits
        loss_fct = FocalLoss()
        # Flatten both tensors to (N, num_labels) so they line up element-wise.
        num_labels = self.model.config.num_labels
        loss = loss_fct(logits.view(-1, num_labels), labels.float().view(-1, num_labels))
        return (loss, outputs) if return_outputs else loss

def fine_tune_model(train_data, val_data, seizure_types):
    """Fine-tune ``bert-base-uncased`` on the given splits with ``CustomTrainer``.

    Args:
        train_data: Table indexed as ``['text']`` and ``[seizure_types].values``
            (presumably a pandas DataFrame); used for training.
        val_data: Table with the same columns; evaluated once per epoch.
        seizure_types: Label column names; their count sets ``num_labels``.

    Returns:
        The model object that was passed to the trainer, after training.
    """
    checkpoint = 'bert-base-uncased'
    tokenizer = BertTokenizer.from_pretrained(checkpoint)
    tokenize_kwargs = dict(truncation=True, padding=True, max_length=512, return_tensors='pt')

    train_encodings = tokenizer(list(train_data['text']), **tokenize_kwargs)
    val_encodings = tokenizer(list(val_data['text']), **tokenize_kwargs)

    train_dataset = CustomDataset(train_encodings, train_data[seizure_types].values)
    val_dataset = CustomDataset(val_encodings, val_data[seizure_types].values)

    # Evaluation and checkpointing both happen once per epoch.
    training_args = TrainingArguments(**{
        'output_dir': './results',
        'num_train_epochs': 3,
        'per_device_train_batch_size': 8,
        'per_device_eval_batch_size': 16,
        'warmup_steps': 500,
        'weight_decay': 0.01,
        'logging_dir': './logs',
        'logging_steps': 10,
        'evaluation_strategy': "epoch",
        'save_strategy': "epoch",
    })

    model = BertForSequenceClassification.from_pretrained(checkpoint, num_labels=len(seizure_types))

    trainer = CustomTrainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=val_dataset,
        compute_metrics=compute_metrics,
    )
    trainer.train()

    return model

# Example usage:
# Driver: load and binarise the data, split it, then fine-tune.
# The names bound here (data, seizure_types, train_data, val_data) are
# notebook-level variables that other cells read.
if __name__ == "__main__":
    data, seizure_types = preprocess_data("seizure_counts.csv")
    train_data, val_data = split_data(data)
    # The returned model is not kept here.
    fine_tune_model(train_data, val_data, seizure_types)


In [2]:
import torch
import pandas as pd
from torch.nn import Module
from torch.utils.data import Dataset
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from transformers import BertTokenizer, BertForSequenceClassification, Trainer, TrainingArguments
import torch.nn as nn
import math

class FocalLoss(Module):
    """Focal loss computed from element-wise binary cross-entropy.

    Every element's BCE term is multiplied by ``alpha * (1 - p_t) ** gamma``,
    where ``p_t = exp(-BCE)``.

    Args:
        alpha: Constant multiplier applied to every element.
        gamma: Exponent of the ``(1 - p_t)`` modulating factor.
        logits: If true, ``inputs`` are passed to the with-logits BCE variant;
            otherwise to the plain BCE variant.
        reduce: If true, ``forward`` returns the mean over all elements;
            otherwise the element-wise loss tensor.
    """

    def __init__(self, alpha=0.5, gamma=2.0, logits=True, reduce=True):
        super().__init__()
        self.alpha, self.gamma = alpha, gamma
        self.logits, self.reduce = logits, reduce

    def forward(self, inputs, targets):
        """Return the focal loss of ``inputs`` against ``targets``."""
        functional = torch.nn.functional
        bce_fn = (
            functional.binary_cross_entropy_with_logits
            if self.logits
            else functional.binary_cross_entropy
        )
        elementwise_bce = bce_fn(inputs, targets, reduction='none')

        # exp(-BCE) recovers the probability assigned to the target value.
        prob_of_target = torch.exp(-elementwise_bce)
        focal = self.alpha * (1 - prob_of_target) ** self.gamma * elementwise_bce

        return torch.mean(focal) if self.reduce else focal

# Custom Dataset Class
class CustomDataset(Dataset):
    """Map-style dataset pairing tokenizer encodings with per-example labels.

    Args:
        encodings: Mapping from feature name to an indexable collection with
            one entry per example (lists or tensors both work).
        labels: Indexable collection of per-example label vectors.
    """

    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    @staticmethod
    def _as_new_tensor(value, dtype=None):
        """Return an independent tensor copy of ``value``.

        ``torch.tensor(existing_tensor)`` emits a copy-construct UserWarning,
        so tensors are copied with ``clone().detach()`` instead.
        """
        if isinstance(value, torch.Tensor):
            copied = value.clone().detach()
            return copied if dtype is None else copied.to(dtype)
        return torch.tensor(value, dtype=dtype)

    def __getitem__(self, idx):
        """Return the features of example ``idx`` plus a float ``labels`` tensor."""
        item = {key: self._as_new_tensor(val[idx]) for key, val in self.encodings.items()}
        item['labels'] = self._as_new_tensor(self.labels[idx], dtype=torch.float)
        return item

    def __len__(self):
        """Number of examples, taken from the label collection."""
        return len(self.labels)

# Preprocessing Data
def preprocess_data(csv_file):
    """Load the label CSV, attach each row's report text and binarise the labels.

    Args:
        csv_file: Path to a CSV with a ``filename`` column and one count
            column per seizure type.

    Returns:
        ``(data, seizure_types)`` where ``data`` holds only rows whose text file
        could be read, with a ``text`` column and 0/1 label columns, and
        ``seizure_types`` is the list of label column names.
    """
    data = pd.read_csv(csv_file)
    text_data = []
    for file_path in data['filename']:
        file_path = file_path.replace('\\', '/')
        try:
            try:
                with open(file_path, 'r', encoding='utf-8') as file:
                    text = file.read()
            except UnicodeDecodeError:
                # Fall back to a single-byte encoding that accepts any byte.
                with open(file_path, 'r', encoding='ISO-8859-1') as file:
                    text = file.read()
        except FileNotFoundError:
            print(f"File not found: {file_path}")
            # Keep the list aligned with the frame; the row is dropped below.
            text = None
        text_data.append(text)

    data['text'] = text_data
    seizure_types = ['bckg', 'cpsz', 'gnsz', 'fnsz', 'absz', 'tnsz', 'tcsz', 'spsz', 'mysz']
    # Copy so the label assignment below does not write into a view.
    data = data.dropna(subset=['text']).copy()
    # Vectorised equivalent of "1 if x > 0 else 0" (replaces deprecated applymap).
    data[seizure_types] = (data[seizure_types] > 0).astype(int)
    return data, seizure_types

def split_data(data, test_size=0.2, random_state=42):
    """Split ``data`` into train and validation parts.

    Args:
        data: Collection accepted by ``train_test_split``.
        test_size: Share of rows held out for validation (default ``0.2``).
        random_state: Seed forwarded to ``train_test_split`` (default ``42``).

    Returns:
        Whatever ``train_test_split`` returns for a single input: the train
        part followed by the validation part.
    """
    return train_test_split(data, test_size=test_size, random_state=random_state)

# Compute Metrics for Evaluation
from sklearn.metrics import confusion_matrix

# Compute Metrics for Evaluation
def compute_metrics(eval_pred):
    """Score thresholded logits against the reference labels.

    Args:
        eval_pred: Pair ``(logits, labels)``; a logit ``>= 0`` counts as a
            positive prediction.

    Returns:
        Dict with ``accuracy`` plus micro-averaged ``precision``, ``recall``
        and ``f1``.
    """
    raw_scores, y_true = eval_pred
    y_pred = (raw_scores >= 0).astype(int)

    metrics = {"accuracy": accuracy_score(y_true, y_pred)}
    micro_scorers = (
        ("precision", precision_score),
        ("recall", recall_score),
        ("f1", f1_score),
    )
    for metric_name, scorer in micro_scorers:
        metrics[metric_name] = scorer(y_true, y_pred, average='micro')
    return metrics

# Training the Model with Custom Trainer
class CustomTrainer(Trainer):
    """``Trainer`` subclass that scores batches with ``FocalLoss``."""

    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
        """Compute the focal loss for one batch.

        Args:
            model: Model called as ``model(**inputs)``; its output must expose
                ``.logits``.
            inputs: Batch dict; the ``"labels"`` entry is popped before the
                forward pass.
            return_outputs: When true, return ``(loss, outputs)`` instead of
                only the loss.
            num_items_in_batch: Accepted for signature compatibility with newer
                ``Trainer`` releases, which pass it as a keyword; unused here.

        Returns:
            The loss, or ``(loss, outputs)`` when ``return_outputs`` is true.
        """
        labels = inputs.pop("labels")
        outputs = model(**inputs)
        logits = outputs.logits
        loss_fct = FocalLoss()
        # Flatten both tensors to (N, num_labels) so they line up element-wise.
        num_labels = self.model.config.num_labels
        loss = loss_fct(logits.view(-1, num_labels), labels.float().view(-1, num_labels))
        return (loss, outputs) if return_outputs else loss

def fine_tune_model(train_data, val_data, seizure_types):
    """Fine-tune ``bert-base-uncased`` for two epochs, then predict on validation.

    Args:
        train_data: Table indexed as ``['text']`` and ``[seizure_types].values``
            (presumably a pandas DataFrame); used for training.
        val_data: Table with the same columns; evaluated once per epoch and
            passed to ``trainer.predict`` after training.
        seizure_types: Label column names; their count sets ``num_labels``.

    Returns:
        The model object that was passed to the trainer, after training.
    """
    checkpoint = 'bert-base-uncased'
    tokenizer = BertTokenizer.from_pretrained(checkpoint)
    tokenize_kwargs = dict(truncation=True, padding=True, max_length=512, return_tensors='pt')

    train_encodings = tokenizer(list(train_data['text']), **tokenize_kwargs)
    val_encodings = tokenizer(list(val_data['text']), **tokenize_kwargs)

    train_dataset = CustomDataset(train_encodings, train_data[seizure_types].values)
    print(train_dataset)
    val_dataset = CustomDataset(val_encodings, val_data[seizure_types].values)

    # Evaluation and checkpointing both happen once per epoch.
    training_args = TrainingArguments(**{
        'output_dir': './results',
        'num_train_epochs': 2,
        'per_device_train_batch_size': 8,
        'per_device_eval_batch_size': 16,
        'warmup_steps': 500,
        'weight_decay': 0.01,
        'logging_dir': './logs',
        'logging_steps': 10,
        'evaluation_strategy': "epoch",
        'save_strategy': "epoch",
    })

    model = BertForSequenceClassification.from_pretrained(checkpoint, num_labels=len(seizure_types))

    trainer = CustomTrainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=val_dataset,
        compute_metrics=compute_metrics,
    )
    trainer.train()

    # First element of the predict() result; only the disabled snippet below uses it.
    predictions = trainer.predict(val_dataset)[0]

#     # Optional inspection: compare one validation example with its prediction
#     idx = 3  # Change this index to print a different example
#     y_pred = torch.sigmoid(torch.tensor(predictions[idx])).detach().cpu().numpy()
#     y_actual = val_data[seizure_types].values[idx]
#     print("Predicted Label:", y_pred)
#     print("Actual Label:", y_actual)

    return model

# Transformer Encoder model
# class Embeddings(nn.Module):
#     def __init__(self, d_model, vocab_size):
#         super(Embeddings, self).__init__()
#         self.emb = nn.Embedding(vocab_size, d_model)
#         self.d_model = d_model

#     def forward(self, x):
#         return self.emb(x) * math.sqrt(self.d_model)

# class PositionalEncoding(nn.Module):
#     def __init__(self, d_model, vocab_size=5000, dropout=0.1):
#         super().__init__()
#         self.dropout = nn.Dropout(p=dropout)

#         pe = torch.zeros(vocab_size, d_model)
#         position = torch.arange(0, vocab_size, dtype=torch.float).unsqueeze(1)
#         div_term = torch.exp(
#             torch.arange(0, d_model, 2).float()
#             * (-math.log(10000.0) / d_model)
#         )

#         pe[:, 0::2] = torch.sin(position * div_term)
#         pe[:, 1::2] = torch.cos(position * div_term)
#         pe = pe.unsqueeze(0)
#         self.register_buffer("pe", pe)

#     def forward(self, x):
#         x = x + self.pe[:, : x.size(1), :]
#         return self.dropout(x)

# class SingleHeadAttention(nn.Module):
#     def __init__(self, d_model, d_head_size):
#         super().__init__()
#         self.lin_key = nn.Linear(d_model, d_head_size, bias=False)
#         self.lin_query = nn.Linear(d_model, d_head_size, bias=False)
#         self.lin_value = nn.Linear(d_model, d_head_size, bias=False)
#         self.d_model = d_model

#     def forward(self, x):
#         query = self.lin_query(x)
#         key = self.lin_key(x)
#         value = self.lin_value(x)

#         scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(self.d_model)
#         p_attn = scores.softmax(dim=-1)
#         x = torch.matmul(p_attn, value)

#         return x

# class MultiHeadAttention(nn.Module):
#     def __init__(self, h, d_model, dropout=0.1):
#         super().__init__()
#         assert d_model % h == 0
#         d_k = d_model // h
#         self.multi_head = nn.ModuleList([SingleHeadAttention(d_model, d_k) for _ in range(h)])
#         self.lin_agg = nn.Linear(d_model, d_model)

#     def forward(self, x):
#         x = torch.cat([head(x) for head in self.multi_head], dim=-1)
#         return self.lin_agg(x)

# class LayerNorm(nn.Module):
#     def __init__(self, d_model, eps=1e-6):
#         super(LayerNorm, self).__init__()
#         self.a_2 = nn.Parameter(torch.ones(d_model))
#         self.b_2 = nn.Parameter(torch.zeros(d_model))
#         self.eps = eps

#     def forward(self, x):
#         mean = x.mean(-1, keepdim=True)
#         std = x.std(-1, keepdim=True)
#         return self.a_2 * (x - mean) / (std + self.eps) + self.b_2

# class ResidualConnection(nn.Module):
#     def __init__(self, d_model, dropout=0.1):
#         super().__init__()
#         self.norm = LayerNorm(d_model)
#         self.dropout = nn.Dropout(dropout)

#     def forward(self, x1, x2):
#         return self.dropout(self.norm(x1 + x2))

# class FeedForward(nn.Module):
#     def __init__(self, d_model, d_ff, dropout=0.1):
#         super().__init__()
#         self.w_1 = nn.Linear(d_model, d_ff)
#         self.w_2 = nn.Linear(d_ff, d_model)
#         self.dropout = nn.Dropout(dropout)

#     def forward(self, x):
#         return self.w_2(self.dropout(self.w_1(x).relu()))

# class SingleEncoder(nn.Module):
#     def __init__(self, d_model, self_attn, feed_forward, dropout):
#         super().__init__()
#         self.self_attn = self_attn
#         self.feed_forward = feed_forward
#         self.res_1 = ResidualConnection(d_model, dropout)
#         self.res_2 = ResidualConnection(d_model, dropout)

#         self.d_model = d_model

#     def forward(self, x):
#         x_attn = self.self_attn(x)
#         x_res_1 = self.res_1(x, x_attn)
#         x_ff = self.feed_forward(x_res_1)
#         x_res_2 = self.res_2(x_res_1, x_ff)

#         return x_res_2

# class EncoderBlocks(nn.Module):
#     def __init__(self, layer, N):
#         super().__init__()
#         self.layers = nn.ModuleList([layer for _ in range(N)])
#         self.norm = LayerNorm(layer.d_model)

#     def forward(self, x):
#         for layer in self.layers:
#             x = layer(x)
#         return self.norm(x)

# class TransformerEncoderModel(nn.Module):
#     def __init__(self, vocab_size, d_model, nhead, d_ff, N,
#                 dropout=0.1):
#         super().__init__()
#         assert d_model % nhead == 0, "nheads must divide evenly into d_model"

#         self.emb = Embeddings(d_model, vocab_size)
#         self.pos_encoder = PositionalEncoding(d_model=d_model, vocab_size=vocab_size)

#         attn = MultiHeadAttention(nhead, d_model)
#         ff = FeedForward(d_model, d_ff, dropout)
#         self.transformer_encoder = EncoderBlocks(SingleEncoder(d_model, attn, ff, dropout), N)
#         self.classifier = nn.Linear(d_model, 2)
#         self.d_model = d_model

#     def forward(self, x):
#         x = self.emb(x) * math.sqrt(self.d_model)
#         x = self.pos_encoder(x)
#         x = self.transformer_encoder(x)
#         x = x.mean(dim=1)
#         x = self.classifier(x)
#         return x

# Main execution
# Driver: load and binarise the data, split it, then fine-tune.
# The names bound here (data, seizure_types, train_data, val_data) are
# notebook-level variables that other cells read.
if __name__ == "__main__":
    data, seizure_types = preprocess_data("seizure_counts.csv")
    train_data, val_data = split_data(data)
    # The returned model is not kept here.
    fine_tune_model(train_data, val_data, seizure_types)


  data[seizure_types] = data[seizure_types].applymap(lambda x: 1 if x > 0 else 0)


<__main__.CustomDataset object at 0x75ae44d49a80>


Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
[34m[1mwandb[0m: Currently logged in as: [33mprachi-parakh[0m ([33mperks[0m). Use [1m`wandb login --relogin`[0m to force relogin


  item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}


Epoch,Training Loss,Validation Loss,Accuracy,Precision,Recall,F1
1,0.0207,0.021862,0.642105,1.0,0.703704,0.826087
2,0.0137,0.019109,0.642105,0.996516,0.706173,0.82659


  item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
  item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}


Predicted Label: [0.86922014 0.13121882 0.20300347 0.29096717 0.14290977 0.1340278
 0.15388453 0.1441897  0.12161492]
Actual Label: [1 0 0 1 0 0 0 0 0]


In [None]:
    # Rebuild the notebook-level frames without training: load and preprocess
    # the CSV, then split it with split_data.
    data, seizure_types = preprocess_data("seizure_counts.csv")
    train_data, val_data = split_data(data)

In [None]:
# Show the label vector at position 40 of the training split (relies on
# train_data and seizure_types bound by another cell).
train_data[seizure_types].values[40]

In [None]:
import os

def remove_seizure_words_in_directory(input_directory, output_directory, keyword="seizure", ignore_case=False):
    """Copy every ``.txt`` file to ``output_directory`` with ``keyword`` removed.

    Args:
        input_directory: Directory whose direct ``.txt`` children are processed.
        output_directory: Destination directory; created (with parents) if
            missing.
        keyword: Substring to delete from each file's content.
        ignore_case: When true, also delete occurrences that differ only in
            letter case (e.g. ``"Seizure"``). Defaults to the original
            case-sensitive behaviour.

    Files are read as UTF-8, falling back to ISO-8859-1 when decoding fails,
    and are always written as UTF-8.
    """
    import re  # local import: the surrounding cell only imports ``os``

    # exist_ok avoids the check-then-create race of an explicit exists() test.
    os.makedirs(output_directory, exist_ok=True)

    for file_name in os.listdir(input_directory):
        input_file_path = os.path.join(input_directory, file_name)
        output_file_path = os.path.join(output_directory, file_name)

        if not (os.path.isfile(input_file_path) and file_name.endswith('.txt')):
            continue

        try:
            with open(input_file_path, 'r', encoding='utf-8') as input_file:
                content = input_file.read()
        except UnicodeDecodeError:
            # Reopen with a single-byte encoding that accepts any byte.
            with open(input_file_path, 'r', encoding='ISO-8859-1') as alt_input_file:
                content = alt_input_file.read()

        if ignore_case:
            cleaned_content = re.sub(re.escape(keyword), "", content, flags=re.IGNORECASE)
        else:
            cleaned_content = content.replace(keyword, "")

        with open(output_file_path, 'w', encoding='utf-8') as output_file:
            output_file.write(cleaned_content)

# Example usage
# Source directory is read only; cleaned copies are written to the second one.
input_directory = "brain_old"
output_directory = "brain"

# Uses the default keyword ("seizure").
remove_seizure_words_in_directory(input_directory, output_directory)


In [None]:
import torch
import pandas as pd
from torch.nn import Module
from torch.utils.data import Dataset
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from transformers import DistilBertTokenizer, DistilBertForSequenceClassification, Trainer, TrainingArguments

# Custom Dataset Class
class CustomDataset(Dataset):
    """Map-style dataset pairing tokenizer encodings with per-example labels.

    Args:
        encodings: Mapping from feature name to an indexable collection with
            one entry per example (lists or tensors both work).
        labels: Indexable collection of per-example label vectors.
    """

    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    @staticmethod
    def _as_new_tensor(value, dtype=None):
        """Return an independent tensor copy of ``value``.

        ``torch.tensor(existing_tensor)`` emits a copy-construct UserWarning,
        so tensors are copied with ``clone().detach()`` instead.
        """
        if isinstance(value, torch.Tensor):
            copied = value.clone().detach()
            return copied if dtype is None else copied.to(dtype)
        return torch.tensor(value, dtype=dtype)

    def __getitem__(self, idx):
        """Return the features of example ``idx`` plus a float ``labels`` tensor."""
        item = {key: self._as_new_tensor(val[idx]) for key, val in self.encodings.items()}
        item['labels'] = self._as_new_tensor(self.labels[idx], dtype=torch.float)
        return item

    def __len__(self):
        """Number of examples, taken from the label collection."""
        return len(self.labels)

# Preprocessing Data
def preprocess_data_with_augmentation(csv_file):
    """Load the label CSV and attach an augmented copy of each row's text.

    Each readable file's content is passed through ``augment_text`` before
    being stored. Label columns are returned as read from the CSV (the
    binarisation step is disabled in this variant).

    NOTE(review): ``augment_text`` is not defined in the visible notebook cells;
    it must be provided before this function is called.

    Args:
        csv_file: Path to a CSV with a ``filename`` column and one column per
            seizure type.

    Returns:
        ``(data, seizure_types)`` where ``data`` holds only rows whose text file
        could be read, and ``seizure_types`` is the list of label column names.
    """
    data = pd.read_csv(csv_file)
    text_data = []
    for file_path in data['filename']:
        file_path = file_path.replace('\\', '/')
        try:
            try:
                with open(file_path, 'r', encoding='utf-8') as file:
                    text = file.read()
            except UnicodeDecodeError:
                # Fall back to a single-byte encoding that accepts any byte.
                with open(file_path, 'r', encoding='ISO-8859-1') as file:
                    text = file.read()
        except FileNotFoundError:
            print(f"File not found: {file_path}")
            # Keep the list aligned with the frame; the row is dropped below.
            text_data.append(None)
            continue
        text_data.append(augment_text(text))

    data['text'] = text_data
    seizure_types = ['bckg', 'cpsz', 'gnsz', 'fnsz', 'absz', 'tnsz', 'tcsz', 'spsz', 'mysz']
    data = data.dropna(subset=['text']).copy()
    return data, seizure_types

def split_data(data, test_size=0.2, random_state=42):
    """Split ``data`` into train and validation parts.

    Args:
        data: Collection accepted by ``train_test_split``.
        test_size: Share of rows held out for validation (default ``0.2``).
        random_state: Seed forwarded to ``train_test_split`` (default ``42``).

    Returns:
        Whatever ``train_test_split`` returns for a single input: the train
        part followed by the validation part.
    """
    return train_test_split(data, test_size=test_size, random_state=random_state)

# Compute Metrics for Evaluation
def compute_metrics(eval_pred):
    """Score thresholded logits against the reference labels.

    Args:
        eval_pred: Pair ``(logits, labels)``; a logit ``>= 0`` counts as a
            positive prediction.

    Returns:
        Dict with ``accuracy`` plus micro-averaged ``precision``, ``recall``
        and ``f1``.
    """
    raw_scores, y_true = eval_pred
    y_pred = (raw_scores >= 0).astype(int)

    metrics = {"accuracy": accuracy_score(y_true, y_pred)}
    micro_scorers = (
        ("precision", precision_score),
        ("recall", recall_score),
        ("f1", f1_score),
    )
    for metric_name, scorer in micro_scorers:
        metrics[metric_name] = scorer(y_true, y_pred, average='micro')
    return metrics

# Training the Model with Custom Trainer
class CustomTrainer(Trainer):
    """``Trainer`` subclass that scores batches with binary cross-entropy on logits."""

    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
        """Compute the BCE-with-logits loss for one batch.

        Args:
            model: Model called as ``model(**inputs)``; its output must expose
                ``.logits``.
            inputs: Batch dict; the ``"labels"`` entry is popped before the
                forward pass.
            return_outputs: When true, return ``(loss, outputs)`` instead of
                only the loss.
            num_items_in_batch: Accepted for signature compatibility with newer
                ``Trainer`` releases, which pass it as a keyword; unused here.

        Returns:
            The loss, or ``(loss, outputs)`` when ``return_outputs`` is true.
        """
        labels = inputs.pop("labels")
        outputs = model(**inputs)
        logits = outputs.logits
        loss_fct = torch.nn.functional.binary_cross_entropy_with_logits
        # Flatten both tensors to (N, num_labels) so they line up element-wise.
        num_labels = self.model.config.num_labels
        loss = loss_fct(logits.view(-1, num_labels), labels.float().view(-1, num_labels))
        return (loss, outputs) if return_outputs else loss

def fine_tune_model(train_data, val_data, seizure_types):
    """Fine-tune ``distilbert-base-uncased`` on the given splits with ``CustomTrainer``.

    Args:
        train_data: Table indexed as ``['text']`` and ``[seizure_types].values``
            (presumably a pandas DataFrame); used for training.
        val_data: Table with the same columns; evaluated once per epoch.
        seizure_types: Label column names; their count sets ``num_labels``.

    Returns:
        The model object that was passed to the trainer, after training.
    """
    checkpoint = 'distilbert-base-uncased'
    tokenizer = DistilBertTokenizer.from_pretrained(checkpoint)
    tokenize_kwargs = dict(truncation=True, padding=True, max_length=512, return_tensors='pt')

    train_encodings = tokenizer(list(train_data['text']), **tokenize_kwargs)
    val_encodings = tokenizer(list(val_data['text']), **tokenize_kwargs)

    train_dataset = CustomDataset(train_encodings, train_data[seizure_types].values)
    val_dataset = CustomDataset(val_encodings, val_data[seizure_types].values)

    # Evaluation and checkpointing both happen once per epoch.
    training_args = TrainingArguments(**{
        'output_dir': './results',
        'num_train_epochs': 3,
        'per_device_train_batch_size': 8,
        'per_device_eval_batch_size': 16,
        'warmup_steps': 500,
        'weight_decay': 0.01,
        'logging_dir': './logs',
        'logging_steps': 10,
        'evaluation_strategy': "epoch",
        'save_strategy': "epoch",
    })

    model = DistilBertForSequenceClassification.from_pretrained(checkpoint, num_labels=len(seizure_types))

    trainer = CustomTrainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=val_dataset,
        compute_metrics=compute_metrics,
    )
    trainer.train()

    return model

# Example usage:
# Driver for the DistilBERT variant: load, split, fine-tune.
# NOTE(review): this calls preprocess_data, not the
# preprocess_data_with_augmentation defined in this cell, so it depends on a
# preprocess_data defined by another cell - confirm which one is intended.
if __name__ == "__main__":
    data, seizure_types = preprocess_data("seizure_counts.csv")
    train_data, val_data = split_data(data)
    # The returned model is not kept here.
    fine_tune_model(train_data, val_data, seizure_types)


In [None]:
!pip install transformers torch torchvision
!pip install accelerate>=0.21.0


In [None]:
# data = pd.read_csv('seizure_counts.csv')
# count_columns = data.columns[1:]
# data[count_columns] = np.log1p(data[count_columns])
# scaler = StandardScaler()
# data[count_columns] = scaler.fit_transform(data[count_columns])

# def load_text(filename):
#     corrected_filename = filename.replace('\\', os.sep)
#     file_path = os.path.join(os.getcwd(), corrected_filename)  # Replace backslash with forward slash for file path
#     try:
#         # Try reading with UTF-8 encoding
#         with open(file_path, 'r', encoding='utf-8') as file:
#             return file.read()
#     except UnicodeDecodeError:
#         # If UTF-8 fails, try a different encoding such as ISO-8859-1
#         with open(file_path, 'r', encoding='ISO-8859-1') as file:
#             return file.read()
#     except FileNotFoundError:
#         print(f"File not found: {file_path}")
#         return ""
    
# data['text'] = data['filename'].apply(load_text)
# train_data, val_data = train_test_split(data, test_size=0.2, random_state=42)


In [None]:
class CustomDataset(Dataset):
    """Map-style dataset pairing tokenizer encodings with per-example labels.

    Args:
        encodings: Mapping from feature name to an indexable collection with
            one entry per example (lists or tensors both work).
        labels: Indexable collection of per-example label vectors.
    """

    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    @staticmethod
    def _as_new_tensor(value, dtype=None):
        """Return an independent tensor copy of ``value``.

        ``torch.tensor(existing_tensor)`` emits a copy-construct UserWarning,
        so tensors are copied with ``clone().detach()`` instead.
        """
        if isinstance(value, torch.Tensor):
            copied = value.clone().detach()
            return copied if dtype is None else copied.to(dtype)
        return torch.tensor(value, dtype=dtype)

    def __getitem__(self, idx):
        """Return the features of example ``idx`` plus a float ``labels`` tensor."""
        item = {key: self._as_new_tensor(val[idx]) for key, val in self.encodings.items()}
        item['labels'] = self._as_new_tensor(self.labels[idx], dtype=torch.float)
        return item

    def __len__(self):
        """Number of examples, taken from the label collection."""
        return len(self.labels)

def preprocess_data(csv_file):
    """Load the label CSV, attach each row's report text and binarise the labels.

    Args:
        csv_file: Path to a CSV with a ``filename`` column and one count
            column per seizure type.

    Returns:
        ``(data, seizure_types)`` where ``data`` holds only rows whose text file
        could be read, with a ``text`` column and 0/1 label columns, and
        ``seizure_types`` is the list of label column names.
    """
    data = pd.read_csv(csv_file)
    text_data = []
    for file_path in data['filename']:
        file_path = file_path.replace('\\', '/')
        try:
            try:
                with open(file_path, 'r', encoding='utf-8') as file:
                    text = file.read()
            except UnicodeDecodeError:
                # Fall back to a single-byte encoding that accepts any byte.
                with open(file_path, 'r', encoding='ISO-8859-1') as file:
                    text = file.read()
        except FileNotFoundError:
            print(f"File not found: {file_path}")
            # Keep the list aligned with the frame; the row is dropped below.
            text = None
        text_data.append(text)

    data['text'] = text_data
    seizure_types = ['bckg', 'cpsz', 'gnsz', 'fnsz', 'absz', 'tnsz', 'tcsz', 'spsz', 'mysz']
    # Rows without text cannot be tokenised, so they are removed here.
    data = data.dropna(subset=['text']).copy()
    # Vectorised equivalent of the per-column "1 if x > 0 else 0".
    data[seizure_types] = (data[seizure_types] > 0).astype(int)
    return data, seizure_types

def split_data(data, test_size=0.2, random_state=42):
    """Split ``data`` into train and validation parts.

    Args:
        data: Collection accepted by ``train_test_split``.
        test_size: Share of rows held out for validation (default ``0.2``).
        random_state: Seed forwarded to ``train_test_split`` (default ``42``).

    Returns:
        Whatever ``train_test_split`` returns for a single input: the train
        part followed by the validation part.
    """
    return train_test_split(data, test_size=test_size, random_state=random_state)

def compute_metrics(eval_pred):
    """Score thresholded logits against the reference labels.

    Args:
        eval_pred: Pair ``(logits, labels)``; a logit ``>= 0`` counts as a
            positive prediction.

    Returns:
        Dict with ``accuracy`` plus micro-averaged ``precision``, ``recall``
        and ``f1``.
    """
    raw_scores, y_true = eval_pred
    y_pred = (raw_scores >= 0).astype(int)

    metrics = {"accuracy": accuracy_score(y_true, y_pred)}
    micro_scorers = (
        ("precision", precision_score),
        ("recall", recall_score),
        ("f1", f1_score),
    )
    for metric_name, scorer in micro_scorers:
        metrics[metric_name] = scorer(y_true, y_pred, average='micro')
    return metrics

def fine_tune_model(train_data, val_data, seizure_types):
    """Fine-tune ``bert-base-uncased`` with the stock ``Trainer``.

    The best checkpoint by ``f1`` is reloaded when training ends
    (``load_best_model_at_end=True``).

    Args:
        train_data: Table indexed as ``['text']`` and ``[seizure_types].values``
            (presumably a pandas DataFrame); used for training.
        val_data: Table with the same columns; evaluated once per epoch.
        seizure_types: Label column names; their count sets ``num_labels``.

    Returns:
        The model object that was passed to the trainer, after training.
    """
    checkpoint = 'bert-base-uncased'
    tokenizer = BertTokenizer.from_pretrained(checkpoint)
    tokenize_kwargs = dict(truncation=True, padding=True, return_tensors='pt')

    train_encodings = tokenizer(list(train_data['text']), **tokenize_kwargs)
    val_encodings = tokenizer(list(val_data['text']), **tokenize_kwargs)

    train_dataset = CustomDataset(train_encodings, train_data[seizure_types].values)
    val_dataset = CustomDataset(val_encodings, val_data[seizure_types].values)

    # Evaluation and save strategies are kept identical on purpose.
    training_args = TrainingArguments(**{
        'output_dir': './results',
        'num_train_epochs': 3,
        'per_device_train_batch_size': 8,
        'per_device_eval_batch_size': 16,
        'warmup_steps': 500,
        'weight_decay': 0.01,
        'logging_dir': './logs',
        'logging_steps': 10,
        'evaluation_strategy': "epoch",
        'save_strategy': "epoch",
        'load_best_model_at_end': True,
        'metric_for_best_model': "f1",
    })

    model = BertForSequenceClassification.from_pretrained(checkpoint, num_labels=len(seizure_types))

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=val_dataset,
        compute_metrics=compute_metrics,
    )
    trainer.train()

    return model

# Load CSV and text files
# Path of the label CSV; its 'filename' column points at the text files.
csv_file = 'seizure_counts.csv'

# Read the CSV, attach text, binarise the label columns.
data, seizure_types = preprocess_data(csv_file)

# Hold out part of the rows for validation (see split_data).
train_data, val_data = split_data(data)

# Train; runs immediately when this cell executes.
model = fine_tune_model(train_data, val_data, seizure_types)

# Write the model to ./fine_tuned_model.
# NOTE(review): another cell saves to the same directory name - confirm the
# overwrite is intended.
model.save_pretrained("fine_tuned_model")

In [None]:
import pandas as pd
import torch
from torch.utils.data import Dataset
from torch.nn import BCEWithLogitsLoss
from transformers import BertTokenizer, BertForSequenceClassification, Trainer, TrainingArguments
from sklearn.model_selection import train_test_split

class CustomDataset(Dataset):
    """Map-style dataset pairing tokenizer encodings with per-example labels.

    Args:
        encodings: Mapping from feature name to an indexable collection with
            one entry per example (lists or tensors both work).
        labels: Indexable collection of per-example label vectors.
    """

    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    @staticmethod
    def _as_new_tensor(value, dtype=None):
        """Return an independent tensor copy of ``value``.

        ``torch.tensor(existing_tensor)`` emits a copy-construct UserWarning,
        so tensors are copied with ``clone().detach()`` instead.
        """
        if isinstance(value, torch.Tensor):
            copied = value.clone().detach()
            return copied if dtype is None else copied.to(dtype)
        return torch.tensor(value, dtype=dtype)

    def __getitem__(self, idx):
        """Return the features of example ``idx`` plus a float ``labels`` tensor."""
        item = {key: self._as_new_tensor(val[idx]) for key, val in self.encodings.items()}
        item['labels'] = self._as_new_tensor(self.labels[idx], dtype=torch.float)
        return item

    def __len__(self):
        """Number of examples, taken from the label collection."""
        return len(self.labels)

def preprocess_data(csv_file):
    """Load the label CSV, attach each row's report text and binarise the labels.

    Rows whose file is missing are kept with an empty ``text`` so the frame
    keeps one row per CSV line.

    Args:
        csv_file: Path to a CSV with a ``filename`` column and one count
            column per seizure type.

    Returns:
        ``(data, seizure_types)`` where ``data`` has a ``text`` column and 0/1
        label columns, and ``seizure_types`` is the list of label column names.
    """
    data = pd.read_csv(csv_file)
    text_data = []
    for file_path in data['filename']:
        file_path = file_path.replace('\\', '/')
        try:
            try:
                with open(file_path, 'r', encoding='utf-8') as file:
                    text = file.read()
            except UnicodeDecodeError:
                # Fall back to a single-byte encoding that accepts any byte.
                with open(file_path, 'r', encoding='ISO-8859-1') as file:
                    text = file.read()
        except FileNotFoundError:
            print(f"File not found: {file_path}")
            # Append empty string for missing files so lengths stay aligned.
            text = ""
        text_data.append(text)
    data['text'] = text_data

    seizure_types = ['bckg', 'cpsz', 'gnsz', 'fnsz', 'absz', 'tnsz', 'tcsz', 'spsz', 'mysz']
    # Vectorised equivalent of the per-column "1 if x > 0 else 0".
    data[seizure_types] = (data[seizure_types] > 0).astype(int)

    return data, seizure_types

class CustomTrainer(Trainer):
    """``Trainer`` subclass that scores batches with a caller-supplied loss.

    Args:
        *args: Forwarded to ``Trainer``.
        loss_fn: Callable invoked as ``loss_fn(logits, labels)``. When ``None``,
            the parent class's loss computation is used.
        **kwargs: Forwarded to ``Trainer``.
    """

    def __init__(self, *args, loss_fn=None, **kwargs):
        super().__init__(*args, **kwargs)
        self.loss_fn = loss_fn

    def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
        """Compute the loss for one batch.

        Args:
            model: Model called as ``model(**inputs)``; its output must expose
                ``.logits``.
            inputs: Batch dict; the ``"labels"`` entry is popped before the
                forward pass.
            return_outputs: When true, return ``(loss, outputs)`` instead of
                only the loss.
            **kwargs: Extra keywords passed by newer ``Trainer`` releases
                (e.g. ``num_items_in_batch``); ignored by the custom loss and
                forwarded when falling back to the parent implementation.

        Returns:
            The loss, or ``(loss, outputs)`` when ``return_outputs`` is true.
        """
        if self.loss_fn is None:
            # No custom loss configured: defer to the stock implementation.
            return super().compute_loss(model, inputs, return_outputs=return_outputs, **kwargs)

        labels = inputs.pop("labels")
        outputs = model(**inputs)
        logits = outputs.logits

        loss_fn = self.loss_fn
        if isinstance(loss_fn, torch.nn.Module):
            # Buffers such as pos_weight are created on CPU; move them next to
            # the logits to avoid a device-mismatch error on GPU runs.
            loss_fn = loss_fn.to(logits.device)

        loss = loss_fn(logits, labels)
        return (loss, outputs) if return_outputs else loss

def train_model(train_data, val_data, seizure_types):
    """Fine-tune ``bert-base-uncased`` with a positively-weighted BCE loss.

    The first label gets ``pos_weight`` 0.1 and every other label 1.0.

    Args:
        train_data: Table indexed as ``['text']`` and ``[seizure_types].values``
            (presumably a pandas DataFrame); used for training.
        val_data: Table with the same columns; evaluated once per epoch.
        seizure_types: Label column names; their count sets ``num_labels``.

    Returns:
        The model object that was passed to the trainer, after training.
    """
    checkpoint = 'bert-base-uncased'
    tokenizer = BertTokenizer.from_pretrained(checkpoint)
    tokenize_kwargs = dict(truncation=True, padding=True, max_length=512, return_tensors='pt')

    train_encodings = tokenizer(list(train_data['text']), **tokenize_kwargs)
    val_encodings = tokenizer(list(val_data['text']), **tokenize_kwargs)

    train_dataset = CustomDataset(train_encodings, train_data[seizure_types].values)
    val_dataset = CustomDataset(val_encodings, val_data[seizure_types].values)

    # Down-weight positives of the first label only.
    remaining_labels = len(seizure_types) - 1
    pos_weights = torch.tensor([0.1] + [1.0] * remaining_labels)
    loss_fn = BCEWithLogitsLoss(pos_weight=pos_weights)

    training_args = TrainingArguments(**{
        'output_dir': './results',
        'num_train_epochs': 3,
        'per_device_train_batch_size': 8,
        'per_device_eval_batch_size': 16,
        'warmup_steps': 500,
        'weight_decay': 0.01,
        'logging_dir': './logs',
        'logging_steps': 10,
        'evaluation_strategy': "epoch",
    })

    model = BertForSequenceClassification.from_pretrained(checkpoint, num_labels=len(seizure_types))

    trainer = CustomTrainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=val_dataset,
        loss_fn=loss_fn,
    )
    trainer.train()

    return model

# Load CSV and text files
# Path of the label CSV; its 'filename' column points at the text files.
csv_file = 'seizure_counts.csv'

# Read the CSV, attach text, binarise the label columns.
data, seizure_types = preprocess_data(csv_file)

# 80/20 split with a fixed seed.
train_data, val_data = train_test_split(data, test_size=0.2, random_state=42)

# Train with the weighted BCE loss; runs immediately when this cell executes.
model = train_model(train_data, val_data, seizure_types)

# Write the model to ./fine_tuned_model.
# NOTE(review): another cell saves to the same directory name - confirm the
# overwrite is intended.
model.save_pretrained("fine_tuned_model")


In [None]:
class SeizureDataset(Dataset):
    """Map-style dataset pairing tokenized texts with their label rows.

    Tokenization happens once, at construction time, through the module-level
    ``tokenizer``, which therefore has to exist before an instance is created.
    """

    def __init__(self, texts, labels):
        """Tokenize ``texts`` (truncated/padded, at most 512 tokens) and keep ``labels``."""
        self.labels = labels
        self.encodings = tokenizer(texts, max_length=512, padding=True, truncation=True)

    def __len__(self):
        """Number of samples, taken from the label container."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return one sample as a dict of tensors, including a ``'labels'`` entry."""
        sample = {}
        for field, values in self.encodings.items():
            sample[field] = torch.tensor(values[idx])
        sample['labels'] = torch.tensor(self.labels[idx])
        return sample

In [None]:
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
# One output per count column; layer/head counts are pinned to 12.
config = BertConfig.from_pretrained('bert-base-uncased', num_hidden_layers=12, num_attention_heads=12, num_labels=len(count_columns))
# Constructing the model directly from a config yields randomly initialised
# weights. from_pretrained loads the pretrained encoder so the training below
# is a fine-tune rather than training from scratch (only the classification
# head starts untrained).
model = BertForSequenceClassification.from_pretrained('bert-base-uncased', config=config)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

In [None]:
# Wrap each split: texts come from the 'text' column, targets from the count_columns values
train_dataset = SeizureDataset(train_data['text'].tolist(), train_data[count_columns].values)
val_dataset = SeizureDataset(val_data['text'].tolist(), val_data[count_columns].values)
# Training batches are reshuffled every epoch; validation keeps the row order of val_data
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=4, shuffle=False)

In [None]:
class MSLELoss(nn.Module):
    """Mean squared logarithmic error.

    Predictions are rectified with ReLU before ``log1p`` is applied; targets
    are passed to ``log1p`` unchanged. The result is the mean over all elements.
    """

    def forward(self, predicted, actual):
        """Return the scalar MSLE between ``predicted`` and ``actual``."""
        # Negative predictions are clipped to zero so log1p stays defined
        non_negative = torch.relu(predicted)
        log_gap = torch.log1p(non_negative) - torch.log1p(actual)
        return log_gap.pow(2).mean()

# Instantiate the MSLE loss used by the training loop and place it on the same device as the model
msle_loss = MSLELoss().to(device)

In [None]:
optimizer = AdamW(model.parameters(), lr=5e-5)
model.train()
num_epochs = 3
for epoch in range(num_epochs):
    total_loss = 0.0
    counted_batches = 0   # batches that actually contributed to total_loss
    skipped_batches = 0   # batches dropped because the loss was NaN/inf
    for batch in train_loader:
        optimizer.zero_grad()
        inputs = {k: v.to(device) for k, v in batch.items() if k != 'labels'}
        labels = batch['labels'].to(device)
        outputs = model(**inputs)
        loss = msle_loss(outputs.logits, labels)
        # A NaN or infinite loss would poison the weights through backward(),
        # so such batches are skipped, but counted so the skip is visible.
        if not torch.isfinite(loss):
            skipped_batches += 1
            continue
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
        counted_batches += 1
    # Average only over the batches that were really accumulated; dividing by
    # len(train_loader) would under-report the loss whenever batches are skipped.
    mean_loss = total_loss / counted_batches if counted_batches else float('nan')
    print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {mean_loss}")
    if skipped_batches:
        print(f"  Skipped {skipped_batches} batch(es) with a non-finite loss")

# Evaluation phase: predictions and labels are flattened across label columns
model.eval()
all_preds = []
all_labels = []
with torch.no_grad():
    for batch in val_loader:
        inputs = {k: v.to(device) for k, v in batch.items() if k != 'labels'}
        labels = batch['labels'].to(device)
        outputs = model(**inputs)
        # The training loss rectifies the logits before comparing them with the
        # targets, so the same non-negative prediction is what gets evaluated.
        preds = torch.relu(outputs.logits)
        all_preds.extend(preds.view(-1).cpu().numpy())
        all_labels.extend(labels.view(-1).cpu().numpy())

# Calculate and print MSE
all_preds = np.array(all_preds)
all_labels = np.array(all_labels)
mse = mean_squared_error(all_labels, all_preds)
print(f"Validation Mean Squared Error: {mse}")

# Save the trained model together with its tokenizer
model_path = "seizure_model"
model.save_pretrained(model_path)
tokenizer.save_pretrained(model_path)

print("Training complete!")

In [None]:
import matplotlib.pyplot as plt

# Row-wise total over the numeric columns of `data` (the first column is left out)
numeric_data = data.iloc[:, 1:].select_dtypes(include=[np.number])
seizure_counts = numeric_data.sum(axis=1)

# Histogram of the per-row totals
plt.figure(figsize=(10, 6))
plt.hist(seizure_counts, bins=30, alpha=0.7, color='blue')
plt.gca().set(
    title='Distribution of Seizure Counts',
    xlabel='Seizure Counts',
    ylabel='Frequency',
)
plt.show()


In [None]:
# Heading and summary statistics, one per line
print("Descriptive Statistics of Seizure Counts:", seizure_counts.describe(), sep="\n")


In [None]:
# Uses the `all_preds` / `all_labels` arrays produced by the validation step
residuals = all_labels - all_preds

# Residuals against the actual values, with a dashed zero-error reference line
plt.figure(figsize=(10, 6))
plt.scatter(all_labels, residuals, alpha=0.5)
plt.axhline(y=0, color='r', linestyle='--')
plt.gca().set(
    title='Residual Plot',
    xlabel='Actual Seizure Counts',
    ylabel='Residuals (Actual - Predicted)',
)
plt.show()


In [None]:
# Convert residuals to absolute errors for easier interpretation
absolute_errors = np.abs(residuals)
sorted_indices = np.argsort(absolute_errors)[::-1]  # Indices of the errors sorted from largest to smallest

# all_labels / all_preds hold the validation set flattened across label columns
# (val_loader does not shuffle), so a flat index has to be mapped back to its
# validation row before looking up the text. Indexing `data` with the flat
# index would show unrelated rows.
labels_per_row = max(1, len(all_labels) // max(1, len(val_data)))

# Print the texts with the largest errors
print("Texts with Largest Errors:")
for i in sorted_indices[:5]:  # Change 5 to the number of examples you want to review
    row = i // labels_per_row
    print(f"\nText:\n{val_data.iloc[row]['text']}")
    print(f"Actual Count: {all_labels[i]}, Predicted Count: {all_preds[i]}, Error: {absolute_errors[i]}")
