<center><img src="img/torch.png" alt="drawing" width="300"/></center>

# Recursive Neural Networks

In [1]:
import warnings
warnings.filterwarnings('ignore')
import requests
from datetime import datetime
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
plt.style.use('fivethirtyeight')
plt.rcParams.update({'figure.figsize':(6,3), 'legend.loc':"best", 'lines.linewidth':1.5, 'lines.marker':".", "image.cmap":"tab10", 'axes.prop_cycle':plt.cycler(color=plt.cm.tab10.colors), 'axes.formatter.useoffset':False, 'axes.titlesize': 12,'axes.labelsize': 10,'ytick.labelsize':8,'xtick.labelsize':8,'legend.fontsize': 10})
from sklearn.model_selection import train_test_split
import torch
from torch import nn
import torchmetrics
from torchinfo import summary
import mlflow
from helper_functions import train_val_loss_plot

## Twitter Sentiment Analysis

In the first project we will be using the *Real or Not?* dataset from Kaggle which contains text-based Tweets about natural disasters. Out task is to train a sentiment classifier able to figure out if a message is about a disaster or not.

### Data

In [2]:
def get_data():
    # load dataframe
    df = pd.read_csv("data/twitter.csv").sample(frac=1, random_state=42)
    
    print(f"- Complete Dataset Shape: {df.shape}")
    print(f"- Complete Dataset Targets: {str(df.target.value_counts().to_dict())}")
    print(25*"-")
    
    # split train and test dataset
    X,y = df.text, df.target
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1, random_state=42, stratify=y)
    
    print(f"- Train/Test Shapes: X_train: {X_train.shape} | X_test: {X_test.shape} | y_train: {y_train.shape} | y_test: {y_test.shape}")
    print(f"- Train/Test Targets: y_train: {str(y_train.value_counts().to_dict())} | y_test: {str(y_test.value_counts().to_dict())}")
    
    return X_train, X_test, y_train, y_test

X_train, X_test, y_train, y_test = get_data()

- Complete Dataset Shape: (7613, 5)
- Complete Dataset Targets: {0: 4342, 1: 3271}
-------------------------
- Train/Test Shapes: X_train: (6851,) | X_test: (762,) | y_train: (6851,) | y_test: (762,)
- Train/Test Targets: y_train: {0: 3907, 1: 2944} | y_test: {0: 435, 1: 327}


> **Tokenization**: A direct mapping from either word (every word in a sequence considered a single token), or character (every word in a sequence considered a single token), or sub-word (smaller parts of invidual words into tokens), to a numerical value.

In [16]:
def tokenize_data(X_train, X_test, token):
    tweets_list = []
    X_train.apply(lambda x: tweets_list.append(x))
    X_test.apply(lambda x: tweets_list.append(x))
    
    if token == "word":
        unique_tokens = set([word for sentence in tweets_list for word in sentence.split(" ")])
    elif token == "character":
        unique_tokens = set([character for sentence in tweets_list for character in sentence])
    else:
        return ("Error: token must be either word or character")
    
    token_to_int_dict = {token:num+1 for num,token in enumerate(unique_tokens)}
    
    print(f"Number of unique {token}s: {len(token_to_int_dict)}")
    print(25*"-")
    
    if token == "word":
        X_train_tokenized = X_train.apply(lambda x: np.array([token_to_int_dict[token] for token in x.split(" ")]))
        X_test_tokenized = X_test.apply(lambda x: np.array([token_to_int_dict[token] for token in x.split(" ")]))
    elif token == "character":
        X_train_tokenized = X_train.apply(lambda x: np.array([token_to_int_dict[token] for token in x]))
        X_test_tokenized = X_test.apply(lambda x: np.array([token_to_int_dict[token] for token in x]))
        
    # fill in missing dimensions
    max_ndim = max(X_train_tokenized.apply(lambda x: len(x)).max(), X_train_tokenized.apply(lambda x: len(x)).max())
    
    X_train_tokenized = X_train_tokenized.apply(lambda x: np.pad(x, (0,max_ndim - x.shape[0]), mode='constant'))
    X_test_tokenized = X_test_tokenized.apply(lambda x: np.pad(x, (0,max_ndim - x.shape[0]), mode='constant'))

    # print a random sample
    random_int = np.random.randint(X_train.shape[0])
    print(f"- Random Sample: {X_train.reset_index().text[random_int]} -> {y_train.reset_index().target[random_int]}")
    print(f"- Random Sample Encoded: {X_train_tokenized.reset_index().text[random_int]} -> {y_train.reset_index().target[random_int]}")
    
    return X_train_tokenized, X_test_tokenized, token_to_int_dict

X_train_tokenized, X_test_tokenized, token_to_int_dict = tokenize_data(X_train, X_test, token="character")

Number of unique characters: 122
-------------------------
- Random Sample: @Allahsfinest12 ...death to muslims -> 1
- Random Sample Encoded: [ 27  55  28  28  54 120  75  29  92  68 118  75 100  57  48  35 106 106
 106  86 118  54 100 120  35 100  36  35  34  13  75  28  92  34  75   0
   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0
   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0
   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0
   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0
   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0
   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0
   0   0   0   0   0   0   0   0   0   0   0   0   0] -> 1


> **Embedding**: A representation of natural language in the form of a feature vector which can be learned. One can either create her own embedding or reuse a prelearned embedding.

In [None]:
def embed_data(X_train_tokenized, X_test_tokenized, token_to_int_dict):
    num_embeddings = len(token_to_int_dict)
    torch.nn.Embedding(num_embeddings=num_embeddings, embedding_dim=15)
    

### Models

In [None]:
class Classifier(nn.Module):
    def __init__(self, in_features: int, out_features: int):
        super().__init__()
       
        self.input_layer = nn.Linear(in_features=in_features, out_features=1)
        self.output_layer = nn.Sigmoid()
    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.output_layer(self.input_layer(x))
    
    def model_summary(self, input_size):
        return summary(self, input_size=input_size, col_names=["input_size", "output_size", "num_params", "trainable"], col_width=15, row_settings=["var_names"])

classifier = Classifier(in_features=2, out_features=1)

In [None]:
def train(
    experiment_name: str,
    model: nn.Module,
    data: (torch.utils.data.DataLoader, torch.utils.data.DataLoader),
    loss_fn: nn.Module,
    optimizer: torch.optim.Optimizer,
    metrics: list,
    epochs: int,
    description: str = None
):
    mlflow.set_experiment(experiment_name=experiment_name)    
    
    with mlflow.start_run(
        run_name=f"{model.__class__.__name__} {datetime.now().strftime('%Y-%m-%d-%H:%M:%S')}",
        description=description):
 
        train_dataloader, test_dataloader = data
        
        for epoch in range(epochs):   
            model.train()
            
            train_loss = 0
            validation_loss = 0
            metrics_dict = {}   
            for metric in metrics:
                metrics_dict[f"train_{metric.__class__.__name__}"] = 0
                metrics_dict[f"validation_{metric.__class__.__name__}"] = 0

            # train in batches
            for batch, (X_train, y_train) in enumerate(train_dataloader):
                y_logits = model(X_train)
                loss = loss_fn(y_logits, y_train)
                train_loss += loss.item() / len(train_dataloader) # divide by length of train_dataloader to get average per batch
                y_pred = torch.argmax(torch.softmax(y_logits, dim=1), dim=1)
                
                for metric in metrics:
                    # divide by length of train_dataloader to get average per batch
                    metrics_dict[f"train_{metric.__class__.__name__}"] += metric(y_pred, y_train).item() / len(train_dataloader)

                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
            
            model.eval()
            with torch.inference_mode():
                for batch, (X_test, y_test) in enumerate(test_dataloader): 
                    y_logits = model(X_test)
                    loss = loss_fn(y_logits, y_test)
                    validation_loss += loss.item() / len(test_dataloader) # divide by length of test_dataloader to get average per batch
                    y_pred = torch.argmax(torch.softmax(y_logits, dim=1), dim=1)
                    
                    for metric in metrics:
                        # divide by length of test_dataloader to get average per batch
                        metrics_dict[f"validation_{metric.__class__.__name__}"] += metric(y_pred, y_test).item() / len(test_dataloader)

            if epoch % (epochs/10) == 0:
                print(f"Epoch: {epoch} | Train Loss: {train_loss:.3f} | Validation Loss: {validation_loss:.3f}")
            
            mlflow.log_metrics({
                "train_loss": train_loss,
                "validation_loss": validation_loss
            }, step=epoch)
            
            for metric_name, metric_value in metrics_dict.items():
                mlflow.log_metric(key=metric_name, value=metric_value, step=epoch)

        mlflow.pytorch.log_model(model, "model")
        mlflow.log_params({
            "epochs": epochs,
            "optimizer": optimizer.__class__.__name__,
            "lr": optimizer.param_groups[0]["lr"]
        })
        fig = train_val_loss_plot(run_id= mlflow.active_run().info.run_id, plot=True)
        mlflow.log_figure(fig, "plots/train_validation_loss_curves.png")

## Generate New Text

### Data

In this notebook we will develop a model that accepts as input a text document, and can generate new text that is similar in style to the input document. More specificaly, we will use the book "The Mysterious Island", by Jules Verne in plain text format.

In character-level language modeling, the input is broken down into a sequence of characters that are fed into our network one character at a time. The network will process each new character in conjunction with the memory of the previously seen characters to predict the next one.

<center><img src="img/torch_04_01.png" alt="drawing" width="500"/></center>


In [None]:
def download_text():
    text = requests.get("https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt").text
    print(f"Dataset Downloaded")  
    
    vocabulary = sorted(set(text))
    print(f"- Length of dataset in characters: {len(text)} \n- Unique number of characters: {len(vocabulary)}")  
    print(f"- Unique characters:{''.join(vocabulary[1:])}")
    print(f"- Sample:\n\n{text[:1000]}")

    return text
    
text = download_text()

In [None]:
def make_dataloader(text_dir, seq_length, batch_size):
    
    # unique characters
    char_set = sorted(set(text))
    
    # convert text into a numeric format
    char_to_int_dict = {c:i for i,c in enumerate(char_set)}
    text_encoded = [char_to_int_dict[c] for c in text]
    assert len(text) == len(text_encoded)
    
    # create chunks
    text_chunks = [
        text_encoded[i:(i + seq_length) + 1] for i in range(len(text_encoded) - (seq_length+1))
    ]
    
    # create dataset object
    X,y = torch.Tensor(text_chunks[:-1]).long(), torch.Tensor(text_chunks[1:]).long()
    dataset = torch.utils.data.dataset.TensorDataset(X, y)
    
    # create dataloader object
    dataloader = DataLoader(dataset=dataset, batch_size=batch_size, shuffle=True, drop_last=True)
    
    # log some information
    print(f'Total Text Length: {len(text)} | Unique Characters: {len(char_set)} | Dataloader: {len(dataloader)} Batches Of Size {batch_size}')
    
    # create decoder for when the model is done
    int_to_char = {i:c for c,i in char_to_int_dict.items()}
    def decoder(encoding):
        return "".join([int_to_char[i] for i in encoding.numpy()])
    dataloader.decoder = decoder

    return dataloader

dataloader = make_dataloader(text_dir='data/the_mysterious_island.txt', seq_length=40, batch_size=64)

In [None]:
# a random (decoded) example
X,y = dataloader.dataset[np.random.randint(0, len(dataloader.dataset))]
print(f'Input: "{dataloader.decoder(X)}" -> Target: "{dataloader.decoder(y)}"')

### Model

In [None]:
class RNN(nn.Module):
    
    def __init__(self, vocab_size, embed_dim, rnn_hidden_size):
        super().__init__()
        self.embedding = nn.Embedding(
            num_embeddings=vocab_size, 
            embedding_dim=embed_dim)
        self.rnn_hidden_size = rnn_hidden_size
        self.rnn = nn.LSTM(embed_dim, rnn_hidden_size, batch_first=True)
        self.fc = nn.Linear(in_features=rnn_hidden_size, out_features=vocab_size)

    def forward(self, x, hidden, cell):
        out = self.embedding(x).unsqueeze(1)
        out, (hidden, cell) = self.rnn(out, (hidden, cell))
        out = self.fc(out).reshape(out.size(0), -1)
        return out, hidden, cell

    def init_hidden(self, batch_size):
        hidden = torch.zeros(1, batch_size, self.rnn_hidden_size)
        cell = torch.zeros(1, batch_size, self.rnn_hidden_size)
        return hidden, cell
    
model = RNN(vocab_size=80, embed_dim=256, rnn_hidden_size=512)