# 🚀 GRN-Based Next-POI Recommendation

#### 🖋️ Authors

- Feidnand Eide
- Seran Shanmugathas


## 📚 Install Libraries

We will need the following libraries:

- `pytorch`
- `pytorch-lightning`
- `torch_geometric`
- `scikit-learn`
- `pandas`
- `numpy`


In [None]:
%pip install numpy pandas torch torch_geometric pytorch-lightning --quiet

## 📌 Import Dependencies

The following libraries are used in this project:

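- Standard libraries: `os`, `enum`
- PyTorch and PyTorch Lightning for model building and training
- PyTorch Geometric for graph data and graph convolutions
- Pandas and NumPy for data handling
- scikit-learn for feature scaling and label encoding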


In [None]:
# Standard libraries
import os
from enum import Enum

# Pandas
import pandas as pd

# Numpy
import numpy as np

# Scikit-learn
from sklearn import preprocessing

# PyTorch Lightning
import pytorch_lightning as pl

# PyTorch
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset

# PyTorch geometric
import torch_geometric
import torch_geometric.data as geom_data
import torch_geometric.nn as geom_nn
from torch_geometric.nn import GraphConv

# PL callbacks
from pytorch_lightning.callbacks import ModelCheckpoint
from torch import Tensor

## 🔧 Configuration

Set up the configuration for the model training.


In [None]:
# Settings shared by the cells below. Both device-dependent entries are
# derived from whether CUDA is available when this cell runs.
config: dict = {
    # Locations on disk
    "data_path": "dataset_tsmc2014/dataset_TSMC2014_NYC.txt",
    "save_path": "models/model.pth",
    # Batching
    "batch_size": 32,
    "max_length": 512,
    # Network dimensions
    "input_channels": 2,
    "output_channels": 2,
    "num_layers": 2,
    "dropout_rate": 0.5,
    # Optimisation
    "max_epochs": 1,
    "learning_rate": 1e-3,
    # Data loading and logging
    "num_workers": 11,
    "log_every_n_steps": 20,
    # Hardware
    "pin_memory": torch.cuda.is_available(),
    "accelerator": "cuda" if torch.cuda.is_available() else "cpu",
}

## 🗂️ Load and Preprocess the Dataset


> Enum for defining columns in the dataset


In [None]:
class Columns(Enum):
    """
    Column names of the check-in table.

    Each member's value is the column label used in the DataFrame.
    """

    # --- Columns read from the dataset file -------------------------------
    USER_ID = "User ID"
    VENUE_ID = "Venue ID"
    VENUE_CATEGORY_ID = "Venue category ID"
    VENUE_CATEGORY_NAME = "Venue category name"
    LATITUDE = "Latitude"
    LONGITUDE = "Longitude"
    TIMEZONE = "Timezone"
    UTC_TIME = "UTC time"

    # --- Columns added during preprocessing (absent from the file) --------
    HOUR = "Hour"
    WEEKDAY = "Weekday"
    IS_WEEKEND = "Is weekend"
    NEXT_VENUE_ID = "Next venue ID"

In [None]:
def load_data(path: str) -> pd.DataFrame:
    """
    Load the raw check-in file as a dataframe

    Only the columns that exist in the file are named. The ``Columns``
    members that are added during preprocessing are left out, so the result
    does not contain empty placeholder columns for them.

    Parameters
    ----------
    path: str
        The path to the tab-separated, header-less dataset file

    Returns
    -------
    pd.DataFrame
        The dataset, one row per check-in
    """
    # Columns produced later by preprocessing; they are not part of the file.
    derived_columns = {
        Columns.HOUR,
        Columns.WEEKDAY,
        Columns.IS_WEEKEND,
        Columns.NEXT_VENUE_ID,
    }
    raw_columns = [
        column.value for column in Columns if column not in derived_columns
    ]
    return pd.read_csv(path, sep="\t", encoding="latin-1", names=raw_columns)


# Load the check-in table from the configured path for inspection below.
df = load_data(path=config["data_path"])

In [None]:
# Preview the first rows of the loaded check-in table.
df.head()

> Create dataset class and data module


In [None]:
class TMSCDataset(Dataset):
    def __init__(self, data: pd.DataFrame, transform=None):
        """
        Map-style dataset over the rows of a TSMC dataframe

        Parameters
        ----------
        data: pd.DataFrame
            The rows served by this dataset
        transform: Callable
            Optional callable applied to every selected item before it is
            returned
        """
        self.data = data
        self.transform = transform

    def __len__(self):
        """Number of rows in the wrapped dataframe."""
        return len(self.data)

    def __getitem__(self, idx):
        """
        Select an item by position

        Parameters
        ----------
        idx: int
            The position of the item; a tensor index is converted with
            ``tolist()`` first

        Returns
        -------
        object
            The result of ``data.iloc[idx]``, passed through ``transform``
            when one is set
        """
        position = idx.tolist() if torch.is_tensor(idx) else idx
        row = self.data.iloc[position]
        return self.transform(row) if self.transform else row

In [None]:
class TMSCDataModule(pl.LightningDataModule):
    # Label given to a check-in that has no following check-in by the same
    # user. It equals the default ``ignore_index`` of
    # ``torch.nn.functional.cross_entropy``, so such rows do not contribute
    # to a cross-entropy loss computed with default arguments.
    IGNORE_LABEL = -100

    def __init__(self, batch_size: int, data_path: str):
        """
        Data module for the TSMC dataset

        Parameters
        ----------
        batch_size: int
            The batch size
        data_path: str
            The path to the dataset
        """
        super().__init__()
        self.batch_size = batch_size
        self.data_path = data_path
        self.train_dataset = None
        self.val_dataset = None
        self.test_dataset = None
        # Graph built from the whole preprocessed table; filled in by `setup`.
        self.graph_data = None

    def prepare_data(self) -> None:
        """
        Download and unzip the dataset when ``data_path`` does not exist

        Raises
        ------
        subprocess.CalledProcessError
            If ``wget`` or ``unzip`` exits with a non-zero status
        """
        if os.path.exists(self.data_path):
            return

        # Local import: nothing else in this class needs subprocess.
        import subprocess

        # `check=True` surfaces a failed download or extraction here instead
        # of as a confusing missing-file error later on.
        subprocess.run(
            ["wget", "http://www-public.tem-tsp.eu/~zhang_da/pub/dataset_tsmc2014.zip"],
            check=True,
        )
        # Extract into a directory named 'dataset_tsmc2014'
        subprocess.run(
            ["unzip", "dataset_tsmc2014.zip", "-d", "dataset_tsmc2014"],
            check=True,
        )

    def setup(self, stage: str = None) -> None:
        """
        Load, preprocess and split the dataset

        The graph is built from the full table and kept on ``graph_data``;
        the table itself is shuffled with a fixed seed and split 60/20/20
        into the train, validation and test datasets.

        Parameters
        ----------
        stage: str
            The stage of the training
        """
        frame = self.preprocess_data(load_data(self.data_path))
        # Keep the graph separate from the table: the split below needs the
        # DataFrame, which is also what TMSCDataset wraps.
        self.graph_data = self.create_graph_data(frame)

        shuffled = frame.sample(frac=1, random_state=42)
        train_end = int(0.6 * len(shuffled))
        val_end = int(0.8 * len(shuffled))

        # NOTE(review): these datasets yield DataFrame rows; confirm that the
        # loaders and the model's step functions can consume them.
        self.train_dataset = TMSCDataset(shuffled.iloc[:train_end])
        self.val_dataset = TMSCDataset(shuffled.iloc[train_end:val_end])
        self.test_dataset = TMSCDataset(shuffled.iloc[val_end:])

    def preprocess_data(self, data: pd.DataFrame) -> pd.DataFrame:
        """
        Preprocess the data

        Adds the hour / weekday / weekend columns, min-max scales latitude
        and longitude, and label-encodes the user, venue and venue-category
        ids. The dataframe is modified in place and also returned.

        Parameters
        ----------
        data: pd.DataFrame
            The data to preprocess

        Returns
        -------
        pd.DataFrame
            The preprocessed data (the same object that was passed in)
        """
        time_col = Columns.UTC_TIME.value
        weekday_col = Columns.WEEKDAY.value
        geo_cols = [Columns.LATITUDE.value, Columns.LONGITUDE.value]

        # Parse timestamps and create temporal features
        data[time_col] = pd.to_datetime(
            data[time_col], format="%a %b %d %H:%M:%S +0000 %Y"
        )
        data[Columns.HOUR.value] = data[time_col].dt.hour
        data[weekday_col] = data[time_col].dt.weekday
        # Weekday 5 and 6 are Saturday and Sunday
        data[Columns.IS_WEEKEND.value] = data[weekday_col].isin([5, 6]).astype(int)

        # Normalize geospatial features
        data[geo_cols] = preprocessing.MinMaxScaler().fit_transform(data[geo_cols])

        # Encode categorical features, one independent encoder per column
        for column in (
            Columns.USER_ID,
            Columns.VENUE_ID,
            Columns.VENUE_CATEGORY_ID,
        ):
            data[column.value] = preprocessing.LabelEncoder().fit_transform(
                data[column.value]
            )

        return data

    @staticmethod
    def _build_edge_index(data: pd.DataFrame) -> Tensor:
        """
        Build one directed edge per pair of consecutive check-ins of a user

        Users are visited in order of first appearance and their check-ins
        in row order.

        Parameters
        ----------
        data: pd.DataFrame
            The data with integer user and venue ids

        Returns
        -------
        Tensor
            The edge index of shape (2, number of edges)
        """
        user_col = Columns.USER_ID.value
        venue_col = Columns.VENUE_ID.value

        # NOTE(review): assumes each user's rows are already in chronological
        # order — confirm against the dataset file.
        per_user_edges = []
        for _, venues in data.groupby(user_col, sort=False)[venue_col]:
            visited = venues.to_numpy()
            if len(visited) > 1:
                per_user_edges.append(np.stack((visited[:-1], visited[1:])))

        if not per_user_edges:
            # No user has two check-ins: return a well-formed empty edge index.
            return torch.empty((2, 0), dtype=torch.long)

        edges = np.concatenate(per_user_edges, axis=1)
        return torch.as_tensor(edges, dtype=torch.long).contiguous()

    def create_graph_data(self, data: pd.DataFrame) -> geom_data.Data:
        """
        Create the graph data

        Also writes the "next venue" label column into ``data``. A check-in
        with no later check-in by the same user gets ``IGNORE_LABEL``.

        Parameters
        ----------
        data: pd.DataFrame
            The data to create the graph data from

        Returns
        -------
        geom_data.Data
            The graph data
        """
        user_col = Columns.USER_ID.value
        venue_col = Columns.VENUE_ID.value
        next_col = Columns.NEXT_VENUE_ID.value

        # Encoding categorical features to integers
        data[user_col] = preprocessing.LabelEncoder().fit_transform(data[user_col])
        data[venue_col] = preprocessing.LabelEncoder().fit_transform(data[venue_col])

        edge_index = self._build_edge_index(data)

        # NOTE(review): `x` has one row per check-in while `edge_index` holds
        # venue ids — confirm that nodes are meant to be check-ins.
        node_features = torch.tensor(
            data[[Columns.LATITUDE.value, Columns.LONGITUDE.value]].values,
            dtype=torch.float,
        )

        # Shift within each user so a label never crosses into another
        # user's history; the resulting gaps become IGNORE_LABEL, not NaN.
        next_venue = data.groupby(user_col, sort=False)[venue_col].shift(-1)
        data[next_col] = next_venue.fillna(self.IGNORE_LABEL).astype("int64")
        labels = torch.tensor(data[next_col].values, dtype=torch.long)

        return geom_data.Data(x=node_features, edge_index=edge_index, y=labels)

    def _make_loader(self, dataset, shuffle: bool) -> geom_data.DataLoader:
        """Build a loader for ``dataset`` with the shared loader settings."""
        return geom_data.DataLoader(
            dataset,
            batch_size=self.batch_size,
            shuffle=shuffle,
            num_workers=config["num_workers"],
            pin_memory=config["pin_memory"],
        )

    def train_dataloader(self) -> geom_data.DataLoader:
        """
        Get the training dataloader
        """
        return self._make_loader(self.train_dataset, shuffle=True)

    def val_dataloader(self) -> geom_data.DataLoader:
        """
        Get the validation dataloader
        """
        return self._make_loader(self.val_dataset, shuffle=False)

    def test_dataloader(self) -> geom_data.DataLoader:
        """
        Get the testing dataloader
        """
        return self._make_loader(self.test_dataset, shuffle=False)

## 🤖 The Model

> Here is the implementation of our GRN-based recommendation system, which uses a hand-written LSTM under the hood.


In [None]:
class GRN(nn.Module):
    def __init__(self, input_channels: int, hidden_channels: int):
        """
        Two graph convolutions followed by an LSTM

        Parameters
        ----------
        input_channels: int
            The size of the features fed to the first graph convolution
        hidden_channels: int
            The output size of both graph convolutions and of the LSTM
        """
        super().__init__()
        self.graph_conv1 = GraphConv(input_channels, hidden_channels)
        self.graph_conv2 = GraphConv(hidden_channels, hidden_channels)

        self.lstm = LSTM(hidden_channels, hidden_channels)

    def forward(self, x: Tensor, edge_idx: Tensor) -> Tensor:
        """
        Run the graph convolutions and then the LSTM

        Parameters
        ----------
        x: Tensor
            The input features
        edge_idx: Tensor
            The edge indices passed to both graph convolutions

        Returns
        -------
        Tensor
            The LSTM output sequence; the final LSTM state is discarded
        """
        # Each convolution is followed by a ReLU.
        for conv in (self.graph_conv1, self.graph_conv2):
            x = F.relu(conv(x, edge_idx))

        sequence_output, _ = self.lstm(x)
        return sequence_output


class LSTM(nn.Module):
    def __init__(self, input_size: int, hidden_size: int):
        """
        Single-layer LSTM written out gate by gate

        Parameters
        ----------
        input_size: int
            The size of each input vector
        hidden_size: int
            The size of the hidden and cell states
        """
        super().__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size

        # Gate definitions; each one reads the concatenation
        # [hidden_state, input] and is therefore input_size + hidden_size wide.
        self.input_gate = nn.Linear(input_size + hidden_size, hidden_size)
        self.forget_gate = nn.Linear(input_size + hidden_size, hidden_size)
        self.output_gate = nn.Linear(input_size + hidden_size, hidden_size)
        self.cell_gate = nn.Linear(input_size + hidden_size, hidden_size)

    def forward(self, input_seq: Tensor, inital_state: tuple = None) -> tuple:
        """
        Forward pass of the LSTM

        Parameters
        ----------
        input_seq: Tensor
            The input sequence of shape (batch, sequence length, input_size)
        inital_state: tuple
            The initial ``(hidden_state, cell_state)``, each of shape
            (batch, hidden_size). Zeros are used when omitted.

        Returns
        -------
        tuple
            ``(hidden_sequence, (hidden_state, cell_state))`` where
            ``hidden_sequence`` has shape (batch, sequence length,
            hidden_size) and the state is the one after the last time step.
            For a zero-length sequence the hidden sequence is empty and the
            initial state is returned unchanged.
        """
        batch_size, seq_len, _ = input_seq.size()

        if inital_state is None:
            # new_zeros follows the input's device and dtype.
            hidden_state = input_seq.new_zeros(batch_size, self.hidden_size)
            cell_state = input_seq.new_zeros(batch_size, self.hidden_size)
        else:
            hidden_state, cell_state = inital_state

        if seq_len == 0:
            # Nothing to stack: return an empty sequence instead of failing.
            empty_sequence = input_seq.new_zeros(batch_size, 0, self.hidden_size)
            return empty_sequence, (hidden_state, cell_state)

        hidden_states_over_time = []

        for input_at_time_step in input_seq.unbind(dim=1):
            combined_input = torch.cat((hidden_state, input_at_time_step), dim=1)

            input_gate = torch.sigmoid(self.input_gate(combined_input))
            forget_gate = torch.sigmoid(self.forget_gate(combined_input))
            output_gate = torch.sigmoid(self.output_gate(combined_input))
            cell_state_candidate = torch.tanh(self.cell_gate(combined_input))

            cell_state = forget_gate * cell_state + input_gate * cell_state_candidate
            hidden_state = output_gate * torch.tanh(cell_state)

            hidden_states_over_time.append(hidden_state)

        # Stack along a new time axis to get (batch, sequence length, hidden).
        hidden_sequence = torch.stack(hidden_states_over_time, dim=1)

        return hidden_sequence, (hidden_state, cell_state)

In [None]:
class HMTGRNModel(pl.LightningModule):
    def __init__(self, input_channels: int, hidden_channels: int, num_classes: int):
        """
        HMTGRN model

        Parameters
        ----------
        input_channels: int
            The number of input channels
        hidden_channels: int
            The number of hidden channels
        num_classes: int
            The number of classes
        """
        super().__init__()
        # Store the constructor arguments in checkpoints so that
        # `load_from_checkpoint` can rebuild the model from the file alone.
        self.save_hyperparameters()
        self.grn = GRN(input_channels, hidden_channels)
        self.classifier = nn.Linear(hidden_channels, num_classes)

    def forward(self, x: Tensor, edge_idx: Tensor) -> Tensor:
        """
        Forward pass of the HMTGRN

        Parameters
        ----------
        x: Tensor
            The input tensor
        edge_idx: Tensor
            The edge indices

        Returns
        -------
        Tensor
            The class scores computed from the last step of the GRN output
        """
        x = self.grn(x, edge_idx)
        x = self.classifier(x[:, -1, :])  # Get the last output
        return x

    @staticmethod
    def _unpack_batch(batch) -> tuple:
        """
        Split a batch into ``(x, edge_idx, y)``

        Accepts either a 3-item sequence or an object that exposes ``x``,
        ``edge_index`` and ``y`` attributes.
        """
        if hasattr(batch, "edge_index"):
            return batch.x, batch.edge_index, batch.y
        x, edge_idx, y = batch
        return x, edge_idx, y

    def _shared_step(self, batch, stage: str) -> Tensor:
        """Compute the cross-entropy loss and log it as ``<stage>_loss``."""
        x, edge_idx, y = self._unpack_batch(batch)
        loss = F.cross_entropy(self(x, edge_idx), y)
        self.log(f"{stage}_loss", loss)
        return loss

    def training_step(self, batch: tuple, batch_idx: int) -> Tensor:
        """
        Training step

        Parameters
        ----------
        batch: tuple
            The batch
        batch_idx: int
            The index of the batch

        Returns
        -------
        Tensor
            The loss, logged as "train_loss"
        """
        return self._shared_step(batch, "train")

    def validation_step(self, batch: tuple, batch_idx: int) -> Tensor:
        """
        Validation step

        Parameters
        ----------
        batch: tuple
            The batch
        batch_idx: int
            The index of the batch

        Returns
        -------
        Tensor
            The loss, logged as "val_loss"
        """
        return self._shared_step(batch, "val")

    def test_step(self, batch: tuple, batch_idx: int) -> Tensor:
        """
        Test step

        Parameters
        ----------
        batch: tuple
            The batch
        batch_idx: int
            The index of the batch

        Returns
        -------
        Tensor
            The loss, logged as "test_loss"
        """
        return self._shared_step(batch, "test")

    def configure_optimizers(self) -> optim.Optimizer:
        """
        Configure the optimizer

        Returns
        -------
        optim.Optimizer
            An Adam optimizer using the learning rate from the global config
        """
        return optim.Adam(self.parameters(), lr=config["learning_rate"])

## 🏋️‍♂️ Training

Setting up the training environment and initiating the training process.


> Initialize Data Module and Model

In [None]:
data_module = TMSCDataModule(
    batch_size=config["batch_size"], data_path=config["data_path"]
)
# When `config` has no "hidden_channels" / "num_classes" entry, fall back to a
# hidden width of 64 and to one class per distinct venue in the loaded table.
model = HMTGRNModel(
    input_channels=config["input_channels"],
    hidden_channels=config.get("hidden_channels", 64),
    num_classes=config.get("num_classes", df[Columns.VENUE_ID.value].nunique()),
)

> Set Up Training Callbacks

In [None]:
# Keep only the single best checkpoint, judged by the lowest "val_loss".
checkpoint_callback = ModelCheckpoint(
    dirpath="checkpoints",
    monitor="val_loss",
    mode="min",
    save_top_k=1,
    verbose=True,
)

> Configure Trainer and Start Training

In [None]:
# Build the trainer from the shared config and attach the checkpoint callback.
trainer = pl.Trainer(
    callbacks=[checkpoint_callback],
    accelerator=config["accelerator"],
    max_epochs=config["max_epochs"],
)

# Train the model on the data module's loaders.
trainer.fit(model, datamodule=data_module)

## 📈 Evaluation

Evaluating the model on the test set.


In [None]:
# Reload the best checkpoint recorded by the callback and run the test loop.
model = HMTGRNModel.load_from_checkpoint(
    checkpoint_path=checkpoint_callback.best_model_path
)
trainer.test(model=model, datamodule=data_module)