# 🚀 Neural Network-Based Text Compression

#### 🖋️ Authors
- Feidnand Eide
- Seran Shanmugathas


## 📚 Install Libraries
We will need the following libraries (installed by the cell below):
- `numpy`
- `pandas`
- `torch` (PyTorch)
- `pytorch-lightning`

In [1]:
%pip install numpy pandas torch pytorch-lightning --quiet

Note: you may need to restart the kernel to use updated packages.


## 📌 Import Dependencies
The following libraries are imported in the next cell:
- Standard library: `enum`
- PyTorch and PyTorch Lightning for model building and training
- Pandas for data handling

Note: `ast` and Hugging Face Transformers are sometimes listed for this project, but the import cell below does not import them.

In [2]:
from enum import Enum
import pandas as pd
import pytorch_lightning as pl
import torch
from torch import nn
import torch.nn.functional as F
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import Dataset, DataLoader

## 🔧 Configuration
Set up the configuration for the model training.

In [3]:
# Central settings for this notebook.
# Keys read by the visible cells: data_path, save_path, batch_size, vocab_size,
# hidden_dim, num_layers and max_epochs.
# NOTE(review): max_length, embedding_dim, dropout_rate and gpus are not
# referenced by any visible cell — confirm whether they are still needed.
config: dict = {
    # CSV file loaded with pandas in the training cell.
    "data_path": "data/uncompressed_and_compressed.csv",
    # Destination of the serialized model (see save_model / load_model).
    "save_path": "models/model.pth",
    "batch_size": 32,
    "max_length": 512,
    # Size of the embedding table and of the output layer of the model.
    "vocab_size": 256,
    "embedding_dim": 128,
    # Width of the embedding vectors and of the LSTM hidden state.
    "hidden_dim": 256,
    # Number of stacked LSTM layers.
    "num_layers": 2,
    "dropout_rate": 0.5,
    # Passed to pl.Trainer(max_epochs=...).
    "max_epochs": 1,
    "gpus": 0
}

## 🗂️ Load and Preprocess the Dataset
We define a custom dataset class for handling our text compression data.

In [4]:
class Columns(Enum):
    """
    Column names of the dataset.

    Each member's ``value`` is the label used to index the pandas DataFrame,
    e.g. ``df[Columns.UNCOMPRESSED.value]``.
    """

    # Column holding the original (uncompressed) text.
    UNCOMPRESSED = "uncompressed"
    # Column holding the compressed counterpart of the text.
    COMPRESSED = "compressed"


class TextCompressionDataset(Dataset):
    """
    Dataset of (uncompressed text, compressed payload) pairs.

    Each item is a pair of 1-D ``torch.long`` tensors: the code points of the
    uncompressed text and the byte values of the compressed payload.

    Parameters
    ----------
    uncompressed_texts : sequence of str
        The original texts, indexable by integer position.
    compressed_texts : sequence of str or bytes
        The compressed payloads, aligned with ``uncompressed_texts``. A string
        that is a Python bytes literal (``"b'...'"``, as produced when bytes
        objects are written to CSV) is parsed back into the bytes it
        represents; any other string is encoded as latin-1.

    Notes
    -----
    The uncompressed side uses ``ord`` and is therefore not limited to 0-255.
    NOTE(review): a model with a 256-entry embedding will reject characters
    above U+00FF — confirm the data only contains such characters.
    """

    def __init__(self, uncompressed_texts, compressed_texts):
        self.uncompressed_texts = uncompressed_texts
        self.compressed_texts = compressed_texts

    def __len__(self):
        """Return the number of (uncompressed, compressed) pairs."""
        return len(self.uncompressed_texts)

    @staticmethod
    def _to_bytes(compressed_text):
        """Return the byte sequence represented by ``compressed_text``."""
        # Local import keeps this class self-contained; the notebook's import
        # cell does not import ``ast``.
        import ast

        if not isinstance(compressed_text, str):
            # Already bytes (or another iterable of ints): use it unchanged.
            return compressed_text

        stripped = compressed_text.strip()
        if stripped[:2] in ("b'", 'b"'):
            # The string is the repr of a bytes object. Encoding it directly
            # would yield the characters of the repr (``b``, quotes,
            # backslashes, ...) instead of the payload, so parse it.
            try:
                parsed = ast.literal_eval(stripped)
            except (ValueError, SyntaxError):
                parsed = None
            if isinstance(parsed, bytes):
                return parsed

        # Plain string: map every character to the byte with the same value.
        return bytes(compressed_text, encoding="latin1")

    def __getitem__(self, idx):
        """
        Return the pair at position ``idx`` as two ``torch.long`` tensors.

        Returns
        -------
        tuple of torch.Tensor
            ``(uncompressed_text_tensor, compressed_text_tensor)``.
        """
        # Convert each character in uncompressed text to its corresponding integer value
        uncompressed_text = [ord(c) for c in self.uncompressed_texts[idx]]
        uncompressed_text_tensor = torch.tensor(uncompressed_text, dtype=torch.long)

        # Iterating bytes yields integers in 0-255.
        compressed_text = list(self._to_bytes(self.compressed_texts[idx]))
        compressed_text_tensor = torch.tensor(compressed_text, dtype=torch.long)
        return uncompressed_text_tensor, compressed_text_tensor

In [5]:
def collate_batch(batch):
    """
    Collate (uncompressed, compressed) tensor pairs into two padded batches.

    Every sequence is cut to a common limit — the smaller of the longest
    uncompressed length and the longest compressed length in the batch — and
    shorter sequences are right-padded with 0, so both returned tensors have
    the same width.

    Parameters
    ----------
    batch : list of tuple
        Pairs of 1-D tensors as produced by the dataset.

    Returns
    -------
    tuple of torch.Tensor
        ``(uncompressed_padded, compressed_padded)``, each batch-first.
    """
    sources, targets = zip(*batch)

    longest_source = max(map(len, sources))
    longest_target = max(map(len, targets))
    limit = min(longest_source, longest_target)

    def _clip_and_pad(sequences):
        # Truncate first, then pad whatever is still shorter than the limit.
        clipped = [seq[:limit] for seq in sequences]
        return pad_sequence(clipped, batch_first=True, padding_value=0)

    return _clip_and_pad(sources), _clip_and_pad(targets)

In [6]:
class TextDataModule(pl.LightningDataModule):
    """
    Lightning data module that serves one dataset as the training set.

    Parameters
    ----------
    dataset : Dataset
        The dataset to serve.
    batch_size : int, default 32
        Number of items per batch.
    collate_fn : callable, optional
        Function used by the ``DataLoader`` to merge items into a batch.
    """

    def __init__(self, dataset, batch_size=32, collate_fn=None):
        super().__init__()
        self.dataset, self.batch_size, self.collate_fn = (
            dataset,
            batch_size,
            collate_fn,
        )

    def setup(self, stage=None):
        """Assign the whole dataset to ``self.train``; no split is performed."""
        # A train/val/test split (e.g. with torch's random_split) would go
        # here; matching val_dataloader / test_dataloader methods would then
        # mirror train_dataloader with shuffle=False.
        self.train = self.dataset

    def train_dataloader(self):
        """Return a shuffling ``DataLoader`` over ``self.train``."""
        loader_options = {
            "batch_size": self.batch_size,
            "shuffle": True,
            "collate_fn": self.collate_fn,
        }
        return DataLoader(self.train, **loader_options)

## 🤖 The Model
Here we define our LSTM-based compression model.

In [7]:
# Preferred device for code that places tensors/models manually (the
# evaluation cell moves the reloaded model to it).
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


class LSTMTextCompressor(pl.LightningModule):
    """
    Token-wise sequence model: embedding -> stacked LSTM -> linear projection.

    For every input position the model emits ``vocab_size`` logits, which
    ``training_step`` compares against the compressed sequence with
    cross-entropy.

    The module is device-agnostic: its layers are never pinned to a device at
    construction time, and inputs are moved to wherever the parameters
    currently live. This keeps all layers on one device whether the model is
    moved by the Lightning trainer or by an explicit ``.to(...)`` call.

    Parameters
    ----------
    vocab_size : int
        Number of distinct token ids; size of the embedding table and of the
        output layer.
    hidden_dim : int
        Width of the embedding vectors and of the LSTM hidden state.
    num_layers : int
        Number of stacked LSTM layers.
    """

    def __init__(self, vocab_size, hidden_dim, num_layers):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.vocab_size = vocab_size

        # No per-layer ``.to(device)`` here: moving only some layers would
        # leave the module split across devices until it is moved as a whole.
        self.embedding = nn.Embedding(vocab_size, hidden_dim)
        self.lstm = nn.LSTM(hidden_dim, hidden_dim, num_layers, batch_first=True)
        self.linear = nn.Linear(hidden_dim, vocab_size)

    def forward(self, x):
        """
        Compute per-position logits for a batch of token ids.

        Parameters
        ----------
        x : torch.Tensor
            Integer token ids, batch-first (the LSTM is built with
            ``batch_first=True``).

        Returns
        -------
        torch.Tensor
            Logits with a trailing dimension of size ``vocab_size``.
        """
        # Follow the parameters rather than a global device so the input
        # always lands on the same device as the weights.
        x = x.to(self.embedding.weight.device)
        embedded = self.embedding(x)
        lstm_out, _ = self.lstm(embedded)
        return self.linear(lstm_out)

    def training_step(self, batch, batch_idx):
        """
        Return the cross-entropy loss for one batch.

        ``batch`` is an ``(uncompressed_text, compressed_text)`` pair of
        equally shaped integer tensors, as produced by ``collate_batch``.
        """
        uncompressed_text, compressed_text = batch
        output = self(uncompressed_text)
        # Keep targets next to the logits when this method is called outside
        # a trainer.
        compressed_text = compressed_text.to(output.device)

        # Flatten batch and sequence dimensions so each position counts as
        # one classification example.
        loss = F.cross_entropy(
            output.reshape(-1, self.vocab_size), compressed_text.reshape(-1)
        )
        return loss

    def configure_optimizers(self):
        """Return an Adam optimizer over all parameters (lr=0.001)."""
        return torch.optim.Adam(self.parameters(), lr=0.001)

    def train_dataloader(self, dataset):
        """
        Build a shuffling ``DataLoader`` (batch size 32) over ``dataset``.

        NOTE(review): this takes a ``dataset`` argument, so it can only be
        called manually; the training cell supplies data through a data
        module instead. Confirm whether this helper is still needed.
        """
        return DataLoader(
            dataset, batch_size=32, shuffle=True, collate_fn=collate_batch
        )

In [8]:
def save_model(model: LSTMTextCompressor, file_path: str) -> None:
    """
    Serialize a trained model to disk.

    The complete model object (not just its state dict) is handed to
    ``torch.save``.

    Parameters
    ----------
    model : LSTMTextCompressor
        The trained model.
    file_path : str
        Destination path of the serialized model.
    """
    torch.save(obj=model, f=file_path)

def load_model(file_path: str, map_location=None) -> LSTMTextCompressor:
    """
    Load a trained model from a file.

    Parameters
    ----------
    file_path : str
        The path to load the model from.
    map_location : optional
        Forwarded to ``torch.load`` to remap the storages of the saved
        tensors (e.g. ``"cpu"`` to open a model saved on a GPU on a CPU-only
        machine). ``None`` keeps the locations recorded in the file.

    Returns
    -------
    LSTMTextCompressor
        The loaded model.

    Notes
    -----
    The file holds a fully pickled module, so it must be loaded with
    ``weights_only=False``; newer PyTorch releases default to
    ``weights_only=True`` and refuse such files.
    SECURITY: unpickling can execute arbitrary code — only load files you
    created yourself or otherwise trust.
    """
    return torch.load(file_path, map_location=map_location, weights_only=False)

## 🏋️‍♂️ Training
Setting up the training environment and initiating the training process.

In [9]:
# Load the semicolon-separated CSV holding the text pairs.
df = pd.read_csv(config["data_path"], delimiter=";")

# Column names come from the Columns enum and the batch size from config,
# matching the evaluation cell, instead of repeating the literals here.
dataset = TextCompressionDataset(
    uncompressed_texts=df[Columns.UNCOMPRESSED.value].values,
    compressed_texts=df[Columns.COMPRESSED.value].values,
)
text_datamodule = TextDataModule(
    dataset, batch_size=config["batch_size"], collate_fn=collate_batch
)

model = LSTMTextCompressor(
    vocab_size=config["vocab_size"],
    hidden_dim=config["hidden_dim"],
    num_layers=config["num_layers"],
)

# Train on a CUDA GPU when one is available, otherwise on the CPU.
trainer = pl.Trainer(
    max_epochs=config["max_epochs"],
    enable_progress_bar=True,
    accelerator="gpu" if torch.cuda.is_available() else "cpu",
    devices="auto"
)

trainer.fit(model, text_datamodule)

GPU available: True (mps), used: False
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
/Users/seranshanmugathas/anaconda3/lib/python3.11/site-packages/pytorch_lightning/trainer/setup.py:187: GPU available but not used. You can set it by doing `Trainer(accelerator='gpu')`.

  | Name      | Type      | Params
----------------------------------------
0 | embedding | Embedding | 65.5 K
1 | lstm      | LSTM      | 1.1 M 
2 | linear    | Linear    | 65.8 K
----------------------------------------
1.2 M     Trainable params
0         Non-trainable params
1.2 M     Total params
4.736     Total estimated model params size (MB)
/Users/seranshanmugathas/anaconda3/lib/python3.11/site-packages/pytorch_lightning/trainer/connectors/data_connector.py:441: The 'train_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=11` in the `DataLoader` to impr

Training: |          | 0/? [00:00<?, ?it/s]

`Trainer.fit` stopped: `max_epochs=1` reached.


In [10]:
# Persist the trained model at the configured path and report where it went.
save_model(model, config["save_path"])
print(f"Model saved to {config['save_path']}")

Model saved to models/model.pth


## 📈 Evaluation
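Evaluating the model. Note that this notebook does not hold out a separate test split: the evaluation below reuses the DataFrame that was loaded for training.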

In [13]:
def evaluate_model(model: LSTMTextCompressor, dataloader: DataLoader) -> dict:
    """
    Evaluate the model on a given dataset.

    Parameters
    ----------
    model : LSTMTextCompressor
        The model to evaluate. It is switched to eval mode.
    dataloader : DataLoader
        The dataloader to use for evaluation; it must yield
        ``(uncompressed_text, compressed_text)`` batches.

    Returns
    -------
    dict
        ``{"avg_loss": ..., "accuracy": ...}`` where ``avg_loss`` is the mean
        of the per-batch cross-entropy losses and ``accuracy`` is the number
        reported by ``calculate_correct_predictions`` divided by the number
        of evaluated samples. Both are 0 when the dataloader yields nothing.
    """
    model.eval()
    total_loss = 0.0
    total_correct = 0
    total_samples = 0
    num_batches = 0

    with torch.no_grad():
        for uncompressed_text, compressed_text in dataloader:
            output = model(uncompressed_text)
            # The model may live on another device than the batch; the
            # targets must sit next to the logits for the loss and for the
            # comparison in calculate_correct_predictions.
            compressed_text = compressed_text.to(output.device)

            loss = F.cross_entropy(
                output.reshape(-1, model.vocab_size), compressed_text.reshape(-1)
            )
            total_loss += loss.item()
            total_correct += calculate_correct_predictions(output, compressed_text)
            total_samples += uncompressed_text.size(0)
            num_batches += 1

    # Counting batches (instead of len(dataloader)) avoids a division by
    # zero on an empty loader.
    avg_loss = total_loss / num_batches if num_batches > 0 else 0.0
    accuracy = total_correct / total_samples if total_samples > 0 else 0

    return {"avg_loss": avg_loss, "accuracy": accuracy}


def calculate_correct_predictions(output: torch.Tensor, targets: torch.Tensor) -> int:
    """
    Count the sequences that were predicted without a single mistake.

    The predicted token at each position is the index of the largest value
    along dimension 2 of ``output``. A sequence counts as correct only when
    every one of its positions matches ``targets``.

    Parameters
    ----------
    output : torch.Tensor
        The model output, with the class scores on dimension 2.
    targets : torch.Tensor
        The target token ids, one row per sequence.

    Returns
    -------
    int
        The number of fully correct sequences.
    """
    predicted_tokens = output.argmax(dim=2)
    sequence_is_exact = torch.eq(predicted_tokens, targets).all(dim=1)
    return sequence_is_exact.sum().item()


# NOTE(review): the "test" dataset is built from the same DataFrame `df` that
# the training cell loaded, so the metrics below are measured on the training
# data — confirm whether a held-out split was intended.
test_dataset = TextCompressionDataset(
    uncompressed_texts=df[Columns.UNCOMPRESSED.value].values,
    compressed_texts=df[Columns.COMPRESSED.value].values,
)
test_datamodule = TextDataModule(
    test_dataset, batch_size=config["batch_size"], collate_fn=collate_batch
)
test_datamodule.setup()
# TextDataModule only defines train_dataloader, so it is reused here; that
# loader shuffles the batches.
test_dataloader = test_datamodule.train_dataloader()

# Reload the model from the configured path and move it to `device`.
trained_model = load_model(config["save_path"])
trained_model.to(device)

evaluation_results = evaluate_model(trained_model, test_dataloader)
print(f"Evaluation Results: {evaluation_results}")