In [1]:
# Install the required package first if not already installed
# !pip install git+https://github.com/YoSTEALTH/Liburing.git

import os

# Expose only GPU index 5 to this process. The assignment deliberately sits
# before `import torch` below.
# NOTE(review): the index is machine-specific; the recorded run reports
# "Using device: cpu", so confirm a GPU with this index exists on the host.
os.environ["CUDA_VISIBLE_DEVICES"] = "5"

import torch
import numpy as np
from datasets import load_dataset
from transformers import BertTokenizer, BertForSequenceClassification, Trainer, TrainingArguments
from transformers import DataCollatorWithPadding
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
import asyncio
import hashlib
import pickle
import tempfile
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
# import liburing  # This is the YoSTEALTH/Liburing implementation
from liburing import (
    io_uring, io_uring_queue_init, io_uring_prep_write, io_uring_get_sqe,
    io_uring_prep_read, io_uring_submit, io_uring_wait_cqe, io_uring_cqe,
    io_uring_cqe_seen, io_uring_queue_exit
)

# Pick the compute device: CUDA when PyTorch reports one, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

class YoStealthUringDataLoader:
    """Disk-backed batch cache whose file reads and writes go through io_uring.

    Each batch is pickled into its own file inside ``cache_dir``. One io_uring
    instance is shared by all operations; a lock serialises access to it
    because several pool threads (and the caller's thread) may use it.

    NOTE(review): the liburing call signatures used here -- ``io_uring()``
    without arguments, ``io_uring_queue_init(entries, ring, flags)`` and
    ``io_uring_wait_cqe(ring, cqe)`` -- follow the YoSTEALTH/Liburing README;
    confirm them against the installed version.

    SECURITY: cache files are read back with ``pickle.loads``. Only point
    ``cache_dir`` at a directory that untrusted parties cannot write to.
    """

    # Upper bound for a single read/write request; larger payloads are split.
    _MAX_IO_BYTES = 1 << 30

    def __init__(self, cache_dir=None, queue_depth=32):
        """Create the cache directory, the io_uring instance and the thread pool.

        Args:
            cache_dir: Directory for the cache files. A fresh temporary
                directory is created when this is ``None`` (or empty).
            queue_depth: Number of submission-queue entries for the ring.
        """
        # Set first so close()/__del__ stay safe if anything below raises.
        self._ring_ready = False
        self.executor = None
        self._ring_lock = threading.Lock()

        self.cache_dir = cache_dir or Path(tempfile.mkdtemp(prefix="iouring_cache_"))
        os.makedirs(self.cache_dir, exist_ok=True)

        self.ring = io_uring()
        io_uring_queue_init(queue_depth, self.ring, 0)
        self._ring_ready = True

        self.executor = ThreadPoolExecutor(max_workers=4)
        print(f"YoStealthUringDataLoader initialized with cache directory: {self.cache_dir}")

    def close(self):
        """Stop the thread pool and tear down the ring. Safe to call repeatedly."""
        executor = getattr(self, "executor", None)
        if executor is not None:
            self.executor = None
            executor.shutdown(wait=True)
        if getattr(self, "_ring_ready", False):
            self._ring_ready = False
            io_uring_queue_exit(self.ring)

    def __del__(self):
        # Best-effort cleanup for callers that never call close() explicitly.
        self.close()

    def _get_cache_path(self, key, prefix="data"):
        """Return the cache file path for ``key``.

        The name is derived from a SHA-256 digest rather than the built-in
        ``hash()``: string hashing is salted per interpreter process, so
        ``hash()`` would give different file names in another process or run.
        """
        digest = hashlib.sha256(str(key).encode("utf-8")).hexdigest()[:16]
        return Path(self.cache_dir) / f"{prefix}_{digest}.pkl"

    def _ring_op(self, prepare, op_name, fd, view, offset):
        """Submit one read or write request and wait for its completion.

        Args:
            prepare: ``io_uring_prep_read`` or ``io_uring_prep_write``.
            op_name: ``"read"`` or ``"write"``; used in error messages only.
            fd: Open file descriptor to operate on.
            view: Writable, contiguous buffer for the transfer.
            offset: Absolute file offset of the transfer.

        Returns:
            Number of bytes transferred (may be fewer than ``len(view)``).

        Raises:
            OSError: If the completion carries a negative (errno) result.
        """
        # The ring's queues are shared state, so the whole
        # prepare/submit/wait/seen sequence runs under the lock.
        with self._ring_lock:
            sqe = io_uring_get_sqe(self.ring)
            prepare(sqe, fd, view, len(view), offset)
            io_uring_submit(self.ring)

            cqe = io_uring_cqe()
            io_uring_wait_cqe(self.ring, cqe)
            result = cqe.res
            io_uring_cqe_seen(self.ring, cqe)

        if result < 0:
            raise OSError(-result, f"io_uring {op_name} error: {os.strerror(-result)}")
        return result

    def _write_all(self, fd, data):
        """Write all of ``data`` to ``fd`` from offset 0, retrying short writes."""
        view = memoryview(bytearray(data))
        total = len(view)
        written = 0
        while written < total:
            chunk = view[written:written + self._MAX_IO_BYTES]
            count = self._ring_op(io_uring_prep_write, "write", fd, chunk, written)
            if count == 0:
                raise OSError(f"io_uring write error: wrote {written} of {total} bytes")
            written += count

    def _read_exact(self, fd, size):
        """Read exactly ``size`` bytes from the start of ``fd``.

        Raises:
            IOError: If end-of-file is reached before ``size`` bytes were read.
        """
        buffer = bytearray(size)
        view = memoryview(buffer)
        received = 0
        while received < size:
            chunk = view[received:received + self._MAX_IO_BYTES]
            count = self._ring_op(io_uring_prep_read, "read", fd, chunk, received)
            if count == 0:
                raise IOError(f"Incomplete read: {received} of {size} bytes")
            received += count
        return buffer

    async def save_batch(self, batch_idx, data):
        """Pickle ``data`` and write it to the cache file for ``batch_idx``.

        The blocking work runs on the loader's thread pool.

        Returns:
            Path of the written cache file.
        """
        path = self._get_cache_path(batch_idx)
        serialized_data = pickle.dumps(data)
        future = self.executor.submit(self._save_batch_sync, path, serialized_data)
        return await asyncio.wrap_future(future)

    def _save_batch_sync(self, path, data):
        """Write the bytes ``data`` to ``path`` through io_uring; return ``path``."""
        with open(path, 'wb') as f:
            self._write_all(f.fileno(), data)
        return path

    async def load_batch(self, batch_idx):
        """Load the batch cached under ``batch_idx``.

        Returns:
            The unpickled batch, or ``None`` when no cache file exists.
        """
        path = self._get_cache_path(batch_idx)
        if not path.exists():
            return None
        future = self.executor.submit(self._load_batch_sync, path)
        return await asyncio.wrap_future(future)

    def _load_batch_sync(self, path):
        """Read ``path`` through io_uring and return the unpickled content.

        Raises:
            OSError: If io_uring reports a read error.
            IOError: If fewer bytes than the file size could be read.
        """
        file_size = path.stat().st_size
        with open(path, 'rb') as f:
            buffer = self._read_exact(f.fileno(), file_size)
        # SECURITY: pickle executes arbitrary code on load; see class docstring.
        return pickle.loads(buffer)

    @staticmethod
    def _run_coroutine(coroutine_factory):
        """Run ``coroutine_factory()`` to completion from synchronous code.

        When the calling thread already has a running event loop (as in a
        Jupyter kernel) the coroutine runs on a private loop in a helper
        thread, because a running loop cannot be re-entered. Otherwise
        ``asyncio.run`` is used directly, which also closes its loop.
        """
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            return asyncio.run(coroutine_factory())
        with ThreadPoolExecutor(max_workers=1) as runner:
            return runner.submit(lambda: asyncio.run(coroutine_factory())).result()

    def cache_dataset(self, dataset, batch_size=16):
        """Split ``dataset`` into slices of ``batch_size`` and cache each one.

        Args:
            dataset: Object supporting ``len()`` and slice indexing.
            batch_size: Number of samples per cache file; must be positive.

        Returns:
            The number of batches written.

        Raises:
            ValueError: If ``batch_size`` is not positive.
        """
        if batch_size <= 0:
            raise ValueError(f"batch_size must be positive, got {batch_size}")

        start_time = time.time()
        total_samples = len(dataset)
        num_batches = (total_samples + batch_size - 1) // batch_size

        async def cache_all_batches():
            tasks = [
                self.save_batch(i, dataset[i * batch_size:min((i + 1) * batch_size, total_samples)])
                for i in range(num_batches)
            ]
            return await asyncio.gather(*tasks)

        paths = self._run_coroutine(cache_all_batches)

        elapsed = time.time() - start_time
        print(f"Cached {total_samples} samples in {num_batches} batches to {len(paths)} files in {elapsed:.2f} seconds")

        return num_batches

    def get_batch(self, batch_idx):
        """Synchronously load the batch cached under ``batch_idx``.

        The read runs in the calling thread, so no event loop is needed and
        this works both inside and outside a running loop.

        Returns:
            The unpickled batch, or ``None`` when no cache file exists.
        """
        path = self._get_cache_path(batch_idx)
        if not path.exists():
            return None
        return self._load_batch_sync(path)

class IOUringDataset(torch.utils.data.Dataset):
    """Map-style dataset that serves samples from an io_uring-backed disk cache.

    On construction the wrapped dataset is written to disk in slices of
    ``batch_size`` samples by ``YoStealthUringDataLoader``. Each item access
    then loads the slice containing the requested index and extracts one row.
    """

    def __init__(self, original_dataset, batch_size=16, cache_dir=None):
        """Cache ``original_dataset`` to disk.

        Args:
            original_dataset: Dataset supporting ``len()`` and slice indexing.
            batch_size: Number of samples stored per cache file.
            cache_dir: Cache directory handed to the loader (``None`` lets the
                loader pick a temporary directory).
        """
        self.original_dataset = original_dataset
        self.batch_size = batch_size
        self.total_samples = len(original_dataset)

        # Writing the cache happens eagerly, here in the constructor.
        self.loader = YoStealthUringDataLoader(cache_dir=cache_dir)
        self.num_batches = self.loader.cache_dataset(original_dataset, batch_size)

        # Remember the source dataset's format description when it has one.
        self.format = getattr(original_dataset, "format", None)

    def __len__(self):
        """Return the number of samples in the wrapped dataset."""
        return self.total_samples

    @staticmethod
    def _select_row(batch, row, idx):
        """Extract sample ``row`` from a cached ``batch``.

        A cached batch is whatever slicing the source dataset produced.
        Hugging Face datasets return a dict mapping column name to a sequence
        of values, so a dict is indexed column by column; any other batch is
        treated as a plain sequence of samples.
        """
        if isinstance(batch, dict):
            rows = len(next(iter(batch.values()))) if batch else 0
            if row >= rows:
                raise IndexError(f"Index {idx} out of bounds for batch of size {rows}")
            return {name: column[row] for name, column in batch.items()}

        if row >= len(batch):
            raise IndexError(f"Index {idx} out of bounds for batch of size {len(batch)}")
        return batch[row]

    def __getitem__(self, idx):
        """Return one sample, or a list of samples for a slice.

        Negative indices and negative slice bounds count from the end.

        Raises:
            IndexError: If ``idx`` is out of range or its batch is not cached.
        """
        positions = range(len(self))
        if isinstance(idx, slice):
            return [self[i] for i in positions[idx]]

        # Indexing a range normalises negative values and rejects
        # out-of-range ones, mirroring list indexing.
        try:
            position = positions[idx]
        except IndexError:
            raise IndexError(
                f"Index {idx} out of range for dataset of size {len(self)}"
            ) from None

        batch_idx, idx_in_batch = divmod(position, self.batch_size)

        batch = self.loader.get_batch(batch_idx)
        if batch is None:
            raise IndexError(f"Failed to load batch {batch_idx} for index {idx}")

        return self._select_row(batch, idx_in_batch, idx)

# Define a cell execution wrapper for Jupyter notebook
def run_training():
    """Fine-tune BERT on a tiny IMDB subset served from io_uring-backed caches.

    Steps: load IMDB, tokenize the first 10 train and 10 test examples, cache
    both splits on disk through ``IOUringDataset``, train
    ``bert-base-uncased`` for one epoch with the Hugging Face ``Trainer``,
    evaluate, and print the metrics.

    Returns:
        Tuple ``(model, tokenizer)`` after training.
    """
    # Load the dataset
    dataset = load_dataset("imdb")

    # Load the tokenizer
    tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")

    # Select only the first 10 training and testing examples (smoke-test size)
    train_subset = dataset["train"].select(range(10))
    test_subset = dataset["test"].select(range(10))

    # Tokenization function: every text is padded/truncated to 512 tokens
    def preprocess_data(examples):
        return tokenizer(examples["text"], truncation=True, padding="max_length", max_length=512)

    # Tokenize examples
    tokenized_train = train_subset.map(preprocess_data, batched=True)
    tokenized_test = test_subset.map(preprocess_data, batched=True)

    # The model's forward pass expects the target column to be named "labels"
    tokenized_train = tokenized_train.rename_column("label", "labels")
    tokenized_test = tokenized_test.rename_column("label", "labels")

    # Convert to PyTorch format, keeping only the columns the model consumes
    tokenized_train.set_format("torch", columns=["input_ids", "attention_mask", "labels"])
    tokenized_test.set_format("torch", columns=["input_ids", "attention_mask", "labels"])

    # Create io_uring backed datasets
    print("Initializing io_uring for training data...")
    train_cache_dir = Path("./iouring_cache/train")
    io_uring_train = IOUringDataset(tokenized_train, batch_size=10, cache_dir=train_cache_dir)

    print("Initializing io_uring for test data...")
    test_cache_dir = Path("./iouring_cache/test")
    io_uring_test = IOUringDataset(tokenized_test, batch_size=10, cache_dir=test_cache_dir)

    # Load pre-trained BERT with classification head
    model = BertForSequenceClassification.from_pretrained("bert-base-uncased", num_labels=2)

    # Move model to GPU (if available)
    model.to(device)

    # Data collator (handles dynamic padding)
    data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

    # Compute metrics for evaluation
    def compute_metrics(eval_pred):
        logits, labels = eval_pred
        predictions = np.argmax(logits, axis=-1)
        acc = accuracy_score(labels, predictions)
        precision, recall, f1, _ = precision_recall_fscore_support(labels, predictions, average="binary")
        return {"accuracy": acc, "precision": precision, "recall": recall, "f1": f1}

    # Training arguments with GPU acceleration
    # NOTE(review): newer transformers releases rename `evaluation_strategy`
    # to `eval_strategy`; confirm against the installed version.
    training_args = TrainingArguments(
        output_dir="./bert_sentiment",
        evaluation_strategy="epoch",
        save_strategy="epoch",
        per_device_train_batch_size=8,  # Adjust if running out of memory
        per_device_eval_batch_size=8,
        num_train_epochs=1,
        weight_decay=0.01,
        logging_dir="./logs",
        logging_steps=500,
        load_best_model_at_end=True,
        save_total_limit=2,
        report_to="none",  # Disable reporting to Weights & Biases (optional)
        push_to_hub=False,  # Set True if you want to push to Hugging Face Hub
        # Load data in the main process: the datasets own an io_uring instance
        # and a thread pool, which cannot be shared safely with DataLoader
        # worker processes.
        dataloader_num_workers=0,
    )

    # Trainer with GPU support using our io_uring dataset
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=io_uring_train,
        eval_dataset=io_uring_test,
        tokenizer=tokenizer,
        data_collator=data_collator,
        compute_metrics=compute_metrics,
    )

    # Fine-tune the model
    trainer.train()

    # Evaluate the model
    eval_results = trainer.evaluate()
    print("Evaluation Results:", eval_results)

    return model, tokenizer

def _ring_write_all(ring, fd, chunk, offset):
    """Write every byte of ``chunk`` to ``fd`` at ``offset`` through ``ring``.

    Short writes are retried until the chunk is complete.

    Raises:
        OSError: If io_uring reports an error or makes no progress.
    """
    view = memoryview(bytearray(chunk))
    written = 0
    while written < len(view):
        sqe = io_uring_get_sqe(ring)
        remaining = view[written:]
        io_uring_prep_write(sqe, fd, remaining, len(remaining), offset + written)
        io_uring_submit(ring)

        cqe = io_uring_cqe()
        io_uring_wait_cqe(ring, cqe)
        result = cqe.res
        io_uring_cqe_seen(ring, cqe)

        if result < 0:
            raise OSError(-result, f"io_uring write error: {os.strerror(-result)}")
        if result == 0:
            raise OSError("io_uring write error: no bytes written")
        written += result


def _copy_file_with_ring(ring, src_path, dst_path, chunk_size=8 * 1024 * 1024):
    """Copy ``src_path`` to ``dst_path``, writing the destination via io_uring.

    The source is read in ``chunk_size`` pieces so that large weight files are
    never held in memory as a whole.
    """
    with open(src_path, 'rb') as src, open(dst_path, 'wb') as dst:
        fd = dst.fileno()
        offset = 0
        while chunk := src.read(chunk_size):
            _ring_write_all(ring, fd, chunk, offset)
            offset += len(chunk)


# Define a function to save the model using io_uring
def save_model_with_iouring(model, tokenizer, save_dir="./bert_sentiment_model"):
    """Save ``model`` and ``tokenizer`` into ``save_dir``, writing via io_uring.

    Both objects are first exported with ``save_pretrained`` into a temporary
    directory, which is removed afterwards. Every file in it is then copied
    to ``save_dir`` with the writes issued through an io_uring instance.

    NOTE(review): the liburing call signatures (``io_uring()``,
    ``io_uring_queue_init(entries, ring, flags)``,
    ``io_uring_wait_cqe(ring, cqe)``) follow the YoSTEALTH/Liburing README;
    confirm them against the installed version.

    Args:
        model: Object exposing ``save_pretrained(directory)``.
        tokenizer: Object exposing ``save_pretrained(directory)``.
        save_dir: Destination directory; created when missing.

    Returns:
        ``save_dir`` as passed in.

    Raises:
        OSError: If any write fails. A failure is raised rather than printed
            so that a partially written model is never reported as saved.
    """
    print(f"Saving model to {save_dir} using io_uring...")
    os.makedirs(save_dir, exist_ok=True)

    # Create an io_uring instance for saving (small queue: one write in flight)
    ring = io_uring()
    io_uring_queue_init(8, ring, 0)

    try:
        # The context manager removes the staging directory even on failure.
        with tempfile.TemporaryDirectory(prefix="model_save_") as staging:
            temp_model_dir = Path(staging)
            model.save_pretrained(temp_model_dir)
            tokenizer.save_pretrained(temp_model_dir)

            # Use io_uring to copy files from the staging dir to the final location
            for src_path in temp_model_dir.glob("**/*"):
                if not src_path.is_file():
                    continue

                # Mirror the file's relative location under save_dir
                dst_path = Path(save_dir) / src_path.relative_to(temp_model_dir)
                os.makedirs(dst_path.parent, exist_ok=True)

                _copy_file_with_ring(ring, src_path, dst_path)
    finally:
        io_uring_queue_exit(ring)

    print(f"Model saved to {save_dir}")
    return save_dir

# Notebook usage -- run each step in its own cell:
#   Cell 1, train the model:               model, tokenizer = run_training()
#   Cell 2, save the model using io_uring: save_model_with_iouring(model, tokenizer)
#
# The guard below keeps training from starting as a side effect of importing
# this file; in a notebook or when run as a script it still executes.
if __name__ == "__main__":
    model, tokenizer = run_training()

Using device: cpu


Map:   0%|          | 0/10 [00:00<?, ? examples/s]

Map:   0%|          | 0/10 [00:00<?, ? examples/s]

: 