In [3]:
import wget, os, gzip, pickle, random, re, sys, importlib, tqdm, math, os, gzip, re, string

from tqdm import trange
from collections import Counter

import torch

import random
from random import choice

# Download location and local filename of the pickled IMDb data. The `{}` placeholder is filled with
# 'char' or 'word' by load_imdb().
IMDB_URL = 'http://dlvu.github.io/data/imdb.{}.pkl.gz'
IMDB_FILE = 'imdb.{}.pkl.gz'
# Download location of the gzipped enwik8 data used by load_wp().
WP_DATA = 'https://codeberg.org/pbm/former/raw/branch/master/data/enwik8.gz'

# Special tokens. The vocabularies built in this file place them at indices 0-3, in this order.
PAD, START, END, UNK = '.pad', '.start', '.end', '.unk'

# Toy grammar used by gen_sentence(). Symbols starting with an underscore are nonterminals: each maps to
# the list of strings it may be rewritten to (one is picked at random). Everything else, including the
# parentheses, is emitted literally. SENT is the start symbol.
SENT = '_s'
TOY = {
    '_s': ['_s _adv', '_np _vp', '_np _vp _prep _np', '_np _vp ( _prep _np )', '_np _vp _con _s','_np _vp ( _con _s )'],
    '_adv': ['briefly', 'quickly', 'impatiently'],
    '_np': ['a _noun', 'the _noun', 'a _adj _noun', 'the _adj _noun'],
    '_prep': ['on', 'with', 'to', 'for', 'at'],
    '_con': ['while', 'but'],
    '_noun': ['mouse', 'bunny', 'cat', 'dog', 'man', 'woman', 'person', 'bear', 'koala', 'judge', 'businessman',
        'businesswoman', 'lawyer', 'teacher', 'engineer'],
    '_vp': ['walked', 'walks', 'ran', 'runs', 'goes', 'went', 'hiked'],
    '_adj': ['short', 'quick', 'busy', 'nice', 'gorgeous', 'spectacular', 'reluctant', 'systematic', 'willowy', 'engaged', 'synthetic']
}

# Byte values of the ASCII characters that can be printed as-is.
PRINTABLE = set(ord(c) for c in (string.digits + string.ascii_letters + string.punctuation + string.whitespace))

def cas(i):
    """
    Character-as-string. Filters out the ascii codes that aren't safe to print.

    :param i: Byte value, 0 <= i < 256.
    :return: The character with code `i` if it is in PRINTABLE, otherwise the single placeholder
    character U+25A1 (white square).
    """
    assert 0 <= i < 256
    # -- The placeholder is written as an escape so that it cannot be corrupted again if this file is
    #    re-saved with the wrong encoding (it had turned into a three-character mojibake sequence).
    return str(chr(i)) if i in PRINTABLE else '\u25a1'

def t(blist):
    """
    Converts a sequence of small integers to a torch vector.

    :param blist: Iterable of values accepted by `int()` (for instance a bytes object or a list of ints).
    :return: One-dimensional tensor of dtype torch.uint8.
    """
    values = list(map(int, blist))
    return torch.tensor(values, dtype=torch.uint8)

def gen_sentence(sent=SENT, g=TOY):
    """
    Generates a random sentence by repeatedly rewriting grammar symbols.

    The leftmost symbol (an underscore followed by lowercase letters) is replaced by a randomly chosen
    expansion from `g`, until no symbols remain.

    :param sent: The string to expand (by default the start symbol).
    :param g: Grammar: a dict mapping each symbol to a list of possible expansions.
    :return: The fully expanded string.
    """
    pattern = '_[a-z]*'

    found = re.search(pattern, sent)
    while found is not None:
        start, end = found.span()
        expansion = random.choice(g[found.group()])
        sent = sent[:start] + expansion + sent[end:]

        found = re.search(pattern, sent)

    return sent

def load_toy(ntrain=100_000, ntest=20_000, to_torch=True, final=False, seed=0):
    """
    Generates language from a toy grammar.

    The data is character-level: sentences are generated with gen_sentence(), joined with ' . ' and every
    character is mapped to an integer index.

    :param ntrain: Minimum number of characters of training data. Whole sentences are appended until this
    length is reached, so the result may be slightly longer.
    :param ntest: Minimum number of characters of test/validation data (same remark).
    :param to_torch: Whether to return torch tensors (if false, returns python lists)
    :param final: Whether to return the test set or the validation set (True for test)
    :param seed: Base random seed. The training data uses `seed`, the validation data `seed + 1` and the
    test data `seed + 2`.
    :return: A pair `(train, test), (i2t, t2i)`, where i2t is the list of tokens (special tokens first,
    then characters by decreasing frequency) and t2i the inverse mapping.
    """

    def _generate(nchars):
        # Appends whole sentences until at least `nchars` characters have been produced.
        parts, total = [], 0
        while total < nchars:
            piece = gen_sentence() + ' . '
            parts.append(piece)
            total += len(piece)
        return ''.join(parts)

    random.seed(seed)
    train = _generate(ntrain)

    random.seed(seed + 2 if final else seed + 1)
    # -- change the seed so we get different test/val sets depending on `final`. Neither may reuse
    #    `seed` itself: re-seeding with the training seed replays the training sentences, which would
    #    make the held-out data a copy of the start of the training data.
    test = _generate(ntest)

    ctr = Counter(train + test)
    i2t = [PAD, START, END, UNK] + [tok for tok, _ in ctr.most_common()]
    t2i = {tok: i for i, tok in enumerate(i2t)}

    train = [t2i[c] for c in train]
    test = [t2i[c] for c in test]

    if to_torch:
        return (t(train), t(test)), (i2t, t2i) # Torch vectors (this takes a few seconds)

    return (train, test), (i2t, t2i)

def load_wp(fname='enwik8.gz', split=(90, 5, 5), to_torch=True, final=False):
    """
    Load the enwik8 dataset from the Hutter challenge as a list or vector of bytes.

    :param fname: Filename for the downloaded data. If no such file exists, the data is downloaded to
    it first. Files ending in '.gz' are decompressed on reading.
    :param split: Relative sizes of the train/val/test split (normalized by their sum). The test part
    receives whatever remains after the train and validation parts.
    :param to_torch: Whether to return torch tensors (True) or the raw bytes objects (False)
    :param final: If False, returns train/val; if True, returns train/test with the validation data
    appended to the training data.
    :return: A pair `(train, heldout), (i2t, t2i)`. Here i2t is a dict from each byte value that occurs
    to its printable string, and t2i a dict from each such byte value to its rank by frequency.
    """

    if not os.path.exists(fname):
        # Nothing on disk yet: fetch the data
        print('Downloading')
        wget.download(WP_DATA, out=fname)

    opener = gzip.open(fname, 'r') if fname.endswith('.gz') else open(fname, 'rb')
    with opener as file:
        data = file.read()

    counts = Counter(data)
    i2t = {token : cas(token) for token, _ in counts.most_common()}
    t2i = {w : i for i, w in enumerate(i2t)}

    # Turn the relative split sizes into absolute numbers of bytes
    total = sum(split)
    sizes = [int(part / total * len(data)) for part in split]

    val_start, test_start = sizes[0], sizes[0] + sizes[1]
    train, val, test = data[:val_start], data[val_start:test_start], data[test_start:]

    if final:
        train, heldout = train + val, test
    else:
        heldout = val

    result = (t(train), t(heldout)) if to_torch else (train, heldout)
    return result, (i2t, t2i)


def load_xor(ntrain=25_000, ntest=25_000, seed=0):
    """
    Generates a synthetic classification dataset of two-token sequences.

    Each instance consists of two tokens, drawn independently from 'true' and 'false'. The label is 1 if
    the two tokens are equal and 0 if they differ.

    :param ntrain: Number of training instances.
    :param ntest: Number of test instances.
    :param seed: Random seed.
    :return: `(x_train, y_train), (x_test, y_test), (i2w, w2i), 2`, where the last element is the number
    of classes.
    """

    random.seed(seed)

    i2w = [PAD, START, END, UNK, 'true', 'false']
    w2i = {word : index for index, word in enumerate(i2w)}

    options = (i2w[4], i2w[5])

    dataset, labels = [], []
    for _ in range(ntrain + ntest):
        first = choice(options)
        second = choice(options)

        # -- both tokens come from the same two options, so they agree exactly when they are equal
        labels.append(1 if first == second else 0)
        dataset.append([w2i[first], w2i[second]])

    train = (dataset[:ntrain], labels[:ntrain])
    test = (dataset[ntrain:], labels[ntrain:])

    return train, test, (i2w, w2i), 2

def load_imdb_synth(ntrain=25_000, ntest=25_000, seed=0):
    """
    Synthetic IMDb dataset

    Every instance is a six-word sentence of the form "this <adjective> <noun> <verb> <not|very>
    <great|terrible>". Only the last two words determine the label: 1 for "very great" and
    "not terrible", 0 for "not great" and "very terrible".

    :param ntrain: Number of training instances.
    :param ntest: Number of test instances.
    :param seed: Random seed.
    :return: `(x_train, y_train), (x_test, y_test), (i2w, w2i), 2`, where the last element is the number
    of classes.
    """

    random.seed(seed)

    adjectives = ['classic', 'silent', 'modern', 'vintage', 'independent', 'foreign', 'animated', 'documentary',
    'epic', 'dramatic', 'romantic', 'comic', 'thrilling', 'mysterious', 'gritty', 'stylized', 'iconic', 'acclaimed',
    'popular', 'forgettable', 'unreleased', 'awardwinning', 'blockbuster', 'lowbudget', 'highbudget', 'experimental',
    'mainstream', 'cult', 'notable', 'original']
    nouns = ['movie', 'film', 'motion-picture', 'feature', 'featurette', 'picture', 'flick', 'cinema', 'screenplay',
    'blockbuster', 'talkie', 'silent', 'biopic', 'short', 'docudrama', 'documentary', 'animation', 'cartoon',
    'anime', 'telefilm', 'miniseries', 'drama', 'comedy', 'thriller', 'western', 'musical', 'noir']
    verbs = ['was', 'is', 'became', 'becomes', 'seemed', 'seems']

    i2w = [PAD, START, END, UNK, 'this', 'not', 'very', 'great','terrible'] + verbs + adjectives + nouns
    # -- a few words are both adjective and noun; for those, w2i keeps the last (noun) index
    w2i = {word : index for index, word in enumerate(i2w)}

    this, negation, intensifier, positive, negative = i2w[4:9]

    dataset, labels = [], []
    for _ in range(ntrain + ntest):
        adjective = choice(adjectives)
        noun = choice(nouns)
        verb = choice(verbs)
        modifier = choice((negation, intensifier))
        verdict = choice((positive, negative))

        sentence = [this, adjective, noun, verb, modifier, verdict]

        # -- the modifier and the verdict are the only meaningful features
        is_very, is_great = (modifier == intensifier), (verdict == positive)

        labels.append(int(is_very == is_great))
        dataset.append([w2i[word] for word in sentence])

    train = (dataset[:ntrain], labels[:ntrain])
    test = (dataset[ntrain:], labels[ntrain:])

    return train, test, (i2w, w2i), 2


def load_imdb(final=False, val=5000, seed=0, voc=None, char=False):
    """
    Loads the IMDb data, downloading it first if the file is not found in the working directory.

    :param final: If True, returns the train and test splits stored in the file. If False, `val`
    instances are held out from the training data and returned as validation data instead.
    :param val: Number of training instances to hold out for validation (only used if `final` is False).
    :param seed: Random seed for choosing the validation instances.
    :param voc: If not None and smaller than the vocabulary, only the first `voc` tokens are kept and
    all other token indices are replaced by the index of '.unk'.
    :param char: Whether to load the character-level (True) or the word-level (False) version.
    :return: `(x_train, y_train), (x_heldout, y_heldout), (i2w, w2i), 2`
    """

    level = 'char' if char else 'word'
    imdb_url, imdb_file = IMDB_URL.format(level), IMDB_FILE.format(level)

    if not os.path.exists(imdb_file):
        wget.download(imdb_url)

    # NOTE(review): unpickling can execute arbitrary code, and this file is fetched over plain http.
    # Only use this with a source you trust.
    with gzip.open(imdb_file) as file:
        sequences, labels, i2w, w2i = pickle.load(file)

    if voc is not None and voc < len(i2w):
        # Shrink the vocabulary and map every token that falls outside it to the unknown token
        i2w = i2w[:voc]
        w2i = {w: i for i, w in enumerate(i2w)}

        unk = w2i['.unk']
        sequences = {
            key: [[s if s < voc else unk for s in seq] for seq in seqs]
            for key, seqs in sequences.items()
        }

    if final:
        return (sequences['train'], labels['train']), (sequences['test'], labels['test']), (i2w, w2i), 2

    # Hold out a random subset of the training data for validation
    random.seed(seed)
    heldout = set(random.sample(range(len(sequences['train'])), k=val))

    x_train, y_train, x_val, y_val = [], [], [], []
    for i, (s, l) in enumerate(zip(sequences['train'], labels['train'])):
        xs, ys = (x_val, y_val) if i in heldout else (x_train, y_train)
        xs.append(s)
        ys.append(l)

    return (x_train, y_train), (x_val, y_val), (i2w, w2i), 2

  cpu = _conversion_method_template(device=torch.device("cpu"))


In [None]:
# Load the IMDb dataset
# final=False: the second pair is a validation split held out from the training data, not the test set.
# numcls receives the number of classes (the fourth return value of load_imdb).
(x_train, y_train), (x_val, y_val), (i2w, w2i), numcls = load_imdb(final=False)

In [7]:
# NOTE: x_train[:5] is a list of sequences, so each `w` here is itself a list of token indices;
# indexing i2w with a list raises the TypeError recorded below. Decoding needs one lookup per token,
# i.e. a nested comprehension over the tokens of each sequence.
[i2w[w] for w in x_train[:5]]


TypeError: list indices must be integers or slices, not list

In [None]:
# Inspect the first five training sequences (lists of token indices) together with their labels.
x_train[:5], y_train[:5] 

([[14, 19, 9, 379, 22, 11, 50, 52, 53, 290],
  [13, 574, 25, 809, 14, 32, 63, 26, 2722, 2231, 312],
  [10721, 4, 10956, 129, 6, 124, 88114, 5, 6, 19, 93, 4118],
  [198, 351, 17697, 116, 31, 13, 80, 40, 1240, 8, 69, 272, 883, 1749],
  [60, 913, 366, 19, 118, 836, 44, 431, 902, 60, 286, 35, 34, 1834, 11]],
 [1, 1, 1, 1, 1])

### Question 1

To-Do:
1. Select a batch of the IMDB dataset.
2. Determine the longest sequence in the batch.
3. Pad all other sequences in the batch with the pad token index (w2i[".pad"]).
4. Convert the result to a tensor of type torch.long.

In [5]:
import torch

def prepare_batch(sequences, labels, batch_size, w2i):
    """
    Generate padded mini-batches from variable-length sequences.

    The data is cut into consecutive slices of `batch_size` items (the last slice may be shorter).
    Within each slice, every sequence is right-padded with the padding index up to the length of the
    longest sequence in that slice.

    Args:
        sequences: List of sequences (each sequence is a list of token indices)
        labels: List of labels, aligned with `sequences`
        batch_size: Number of sequences per batch
        w2i: Word to index dictionary (must contain the padding token)

    Yields:
        Tuple of (padded_batch, labels_batch), both tensors of dtype torch.long; padded_batch has
        shape (batch, longest sequence in the batch)
    """
    pad_idx = w2i[PAD]

    for start in range(0, len(sequences), batch_size):
        stop = start + batch_size
        chunk, chunk_labels = sequences[start:stop], labels[start:stop]

        # Padding is per batch: only as much as the longest sequence in this chunk requires
        longest = max(map(len, chunk))
        padded = [seq + [pad_idx] * (longest - len(seq)) for seq in chunk]

        yield (
            torch.tensor(padded, dtype=torch.long),
            torch.tensor(chunk_labels, dtype=torch.long),
        )


# Example usage: create a batch
# Only the first 100 training instances are batched here; the generator is materialized into a list
# so that individual batches can be indexed.
batch_size = 32
batches = list(prepare_batch(x_train[:100], y_train[:100], batch_size, w2i))

# Show first batch
first_batch, first_labels = batches[0]
print(f"Batch shape: {first_batch.shape}")
print(f"Labels shape: {first_labels.shape}")
print(f"Padding token index: {w2i[PAD]}")
print(f"\nFirst sequence in batch (first 20 tokens):")
print(first_batch[0][:20])
# Map the indices back to words; .item() converts each 0-dim tensor to a plain int for list indexing.
print(f"\nWords: {[i2w[idx.item()] for idx in first_batch[0][:20]]}")

Batch shape: torch.Size([32, 30])
Labels shape: torch.Size([32])
Padding token index: 0

First sequence in batch (first 20 tokens):
tensor([ 14,  19,   9, 379,  22,  11,  50,  52,  53, 290,   0,   0,   0,   0,
          0,   0,   0,   0,   0,   0])

Words: ['this', 'movie', 'is', 'terrible', 'but', 'it', 'has', 'some', 'good', 'effects', '.pad', '.pad', '.pad', '.pad', '.pad', '.pad', '.pad', '.pad', '.pad', '.pad']


### Question 2

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F


class SimpleClassifier(nn.Module):
    """
    Simple classification model with embedding layer and global pooling.

    Architecture:
    1. Embedding layer: converts token indices to embedding vectors
    2. Global pooling: reduces the time dimension to a single vector
    3. Linear layer: projects to number of classes

    Note: the pooling runs over all time steps of the input, including any padding positions.
    """

    # Supported values for the `pooling` argument.
    _POOLING_MODES = ("max", "mean", "sum", "select")

    def __init__(self, vocab_size, emb_size, num_classes, pooling="max"):
        """
        Args:
            vocab_size: Number of tokens in vocabulary
            emb_size: Size of embedding vectors
            num_classes: Number of output classes
            pooling: Type of pooling: 'max', 'mean' or 'sum' over the time dimension, or 'select'
                to use the embedding of the first time step only

        Raises:
            ValueError: If `pooling` is not one of the supported types.
        """
        super().__init__()

        # Fail at construction time rather than at the first forward pass
        if pooling not in self._POOLING_MODES:
            raise ValueError(f"Unknown pooling type: {pooling}")

        self.embedding = nn.Embedding(vocab_size, emb_size)
        self.linear = nn.Linear(emb_size, num_classes)
        self.pooling = pooling

    def forward(self, x):
        """
        Forward pass.

        Args:
            x: Input tensor of shape (batch, time) with dtype=torch.long

        Returns:
            Output tensor of shape (batch, num_classes) containing unnormalized class scores (logits)

        Raises:
            ValueError: If `self.pooling` was changed to an unsupported type after construction.
        """
        # Step 1: Embedding layer
        # Input: (batch, time) -> Output: (batch, time, emb)
        embedded = self.embedding(x)

        # Step 2: Global pooling along time dimension (dim=1)
        # Input: (batch, time, emb) -> Output: (batch, emb)
        if self.pooling == "max":
            # torch.max over a dim returns (values, indices); only the values are needed
            pooled = torch.max(embedded, dim=1).values
        elif self.pooling == "mean":
            pooled = torch.mean(embedded, dim=1)
        elif self.pooling == "sum":
            pooled = torch.sum(embedded, dim=1)
        elif self.pooling == "select":
            # Use the first time step as the representation of the whole sequence
            pooled = embedded[:, 0, :]
        else:
            raise ValueError(f"Unknown pooling type: {self.pooling}")

        # Step 3: Linear projection to number of classes
        # Input: (batch, emb) -> Output: (batch, num_classes)
        # Note: We don't apply softmax here - it's included in cross_entropy loss
        return self.linear(pooled)


# Model hyperparameters
emb_size = 300
vocab_size = len(i2w)  # Get vocabulary size from i2w
num_classes = numcls  # Number of classes (2 for binary classification)

# Create model
model = SimpleClassifier(vocab_size, emb_size, num_classes, pooling="mean")

print(f"Model created:")
print(f"  Vocabulary size: {vocab_size}")
print(f"  Embedding size: {emb_size}")
print(f"  Number of classes: {num_classes}")
print(f"\nModel architecture:")
print(model)

# Test with a small batch
# prepare_batch is a generator, so next() produces only the first batch (up to 500 sequences).
test_batch, test_labels = next(
    prepare_batch(x_train, y_train, batch_size=500, w2i=w2i)
)
print(f"\nTest batch shape: {test_batch.shape}")
output = model(test_batch)
print(f"Output shape: {output.shape}")
print(f"Output dtype: {output.dtype}")

# Calculate loss using cross_entropy (includes softmax)
# NOTE: no training step has been run in this cell, so the loss and accuracy below only check that
# the pipeline runs end to end; they describe a freshly initialized model.
loss = F.cross_entropy(output, test_labels)
print(f"\nLoss: {loss.item():.4f}")

# Calculate accuracy
predictions = torch.argmax(output, dim=1)
accuracy = (predictions == test_labels).float().mean()
print(f"Accuracy: {accuracy.item():.4f}")