In [None]:
import re, math, random, argparse, json, os, numpy as np, torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence
from collections import Counter
from sklearn.metrics import accuracy_score, f1_score

In [None]:
def set_seed(seed=42):
    """Seed every random number generator this notebook touches.

    Applies the same seed, in order, to Python's ``random`` module, NumPy's
    global RNG, PyTorch's default generator, and all CUDA generators.

    Args:
        seed: Integer seed handed to each seeding function.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for seeder in seeders:
        seeder(seed)
def get_device():
    """Choose the compute device for the model and batches.

    Returns:
        torch.device: ``cuda`` when ``torch.cuda.is_available()`` reports
        True, otherwise ``cpu``.
    """
    if torch.cuda.is_available():
        return torch.device("cuda")
    return torch.device("cpu")
def save_ckpt(path, model, opt, epoch, best_metric):
    """Serialize a training checkpoint to ``path`` with ``torch.save``.

    Args:
        path: Destination file for the checkpoint.
        model: Module whose ``state_dict()`` is stored under ``"model"``.
        opt: Optimizer whose ``state_dict()`` is stored under ``"opt"``.
        epoch: Value stored under ``"epoch"``.
        best_metric: Value stored under ``"best"``.
    """
    checkpoint = {
        "model": model.state_dict(),
        "opt": opt.state_dict(),
        "epoch": epoch,
        "best": best_metric,
    }
    torch.save(checkpoint, path)
def load_ckpt(path, model, opt=None, map_location="cpu"):
    """Restore model (and optionally optimizer) state from a checkpoint file.

    The file is expected to hold a dict with a ``"model"`` entry, plus an
    ``"opt"`` entry when ``opt`` is supplied.

    Args:
        path: Checkpoint file to read.
        model: Module that receives ``ckpt["model"]`` via ``load_state_dict``.
        opt: Optional optimizer; when not None it receives ``ckpt["opt"]``.
        map_location: Forwarded to ``torch.load``.

    Returns:
        tuple: ``(epoch, best)`` read from the checkpoint, defaulting to
        ``0`` and ``None`` when the keys are absent.

    Raises:
        KeyError: If ``"model"`` (or ``"opt"`` when ``opt`` is given) is
            missing from the checkpoint.
    """
    # NOTE(security): torch.load is pickle-based; depending on the installed
    # PyTorch version's ``weights_only`` default it can execute code embedded
    # in the file. Only load checkpoints from a trusted source.
    ckpt = torch.load(path, map_location=map_location)
    model.load_state_dict(ckpt["model"])
    # Compare against None explicitly so a provided optimizer is never skipped
    # because of its truthiness.
    if opt is not None:
        opt.load_state_dict(ckpt["opt"])
    return ckpt.get("epoch", 0), ckpt.get("best", None)

In [None]:
# Matches maximal runs of ASCII letters, digits, and apostrophes.
_tok_re = re.compile(r"[A-Za-z0-9']+")


def tokenize(text: str):
    """Split ``text`` into lower-cased word tokens.

    The text is lower-cased first, then every non-overlapping match of
    ``_tok_re`` becomes one token; all other characters act as separators.

    Args:
        text: Raw input string.

    Returns:
        list[str]: Tokens in order of appearance (empty list if none match).
    """
    lowered = text.lower()
    return [match.group(0) for match in _tok_re.finditer(lowered)]
def build_vocab(texts, min_freq=2, max_size=50000):
    """Build a frequency-ordered vocabulary from raw texts.

    Every text is tokenized with ``tokenize`` and token counts are pooled.
    The vocabulary starts with ``"<pad>"`` (index 0) and ``"<unk>"``
    (index 1), followed by tokens in descending frequency order.

    Args:
        texts: Iterable of raw strings.
        min_freq: Tokens seen fewer than this many times are excluded.
        max_size: Upper bound on vocabulary length, special tokens included.

    Returns:
        tuple: ``(vocab, stoi, itos)`` — the token list, a token-to-index
        dict, and the inverse index-to-token dict.
    """
    freqs = Counter()
    for text in texts:
        freqs.update(tokenize(text))

    vocab = ["<pad>", "<unk>"]
    for word, count in freqs.most_common():
        # most_common() is ordered by descending count, so the first token
        # below min_freq means every remaining token is too rare as well.
        if count < min_freq or len(vocab) >= max_size:
            break
        vocab.append(word)

    stoi = dict(zip(vocab, range(len(vocab))))
    itos = dict(zip(stoi.values(), stoi.keys()))
    return vocab, stoi, itos
def numericalize(tokens, stoi, unk_idx=1):
    """Translate tokens into integer ids.

    Args:
        tokens: Iterable of token strings.
        stoi: Mapping from token to index.
        unk_idx: Id substituted for any token missing from ``stoi``.

    Returns:
        list: One id per input token, in the same order.
    """
    ids = []
    for token in tokens:
        ids.append(stoi.get(token, unk_idx))
    return ids

In [None]:
class IMDBDataset(Dataset):
    """Map-style dataset that turns raw review text into id tensors on access.

    Tokenization and id lookup happen lazily inside ``__getitem__``; nothing
    is precomputed at construction time.
    """

    def __init__(self, texts, labels, stoi, max_len=256):
        """Store the raw data and encoding settings.

        Args:
            texts: Indexable collection of raw strings.
            labels: Indexable collection of labels aligned with ``texts``.
            stoi: Token-to-index mapping used to encode tokens.
            max_len: Sequences longer than this are truncated to this length.
        """
        self.texts = texts
        self.labels = labels
        self.stoi = stoi
        self.max_len = max_len

    def __len__(self):
        """Return the number of texts."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Encode example ``idx``.

        Returns:
            tuple: ``(ids, label)`` — a 1-D ``torch.long`` tensor of token ids
            (truncated to ``max_len``) and the label as a ``torch.long`` tensor.
        """
        token_ids = numericalize(tokenize(self.texts[idx]), self.stoi)
        too_long = len(token_ids) > self.max_len
        if too_long:
            token_ids = token_ids[:self.max_len]
        ids_tensor = torch.tensor(token_ids, dtype=torch.long)
        label_tensor = torch.tensor(self.labels[idx], dtype=torch.long)
        return ids_tensor, label_tensor
def collate_pad(batch, pad_idx=0):
    """Collate ``(sequence, label)`` pairs into one right-padded batch.

    Args:
        batch: Non-empty iterable of ``(ids, label)`` pairs, where ``ids`` is
            a 1-D integer tensor and ``label`` is a tensor.
        pad_idx: Fill value for positions past the end of each sequence.

    Returns:
        tuple: ``(padded, lengths, labels)`` where ``padded`` is a
        ``torch.long`` tensor of shape ``(len(batch), max_len)``, ``lengths``
        is a ``torch.long`` tensor with one entry per sequence, and ``labels``
        is ``torch.stack`` of the input labels.

    Raises:
        ValueError: If ``batch`` is empty.
    """
    batch = list(batch)
    if not batch:
        raise ValueError("collate_pad received an empty batch")
    seqs, labels = zip(*batch)
    # pack_padded_sequence rejects zero-length entries, so an empty sequence is
    # reported with length 1: its row is a single pad token.
    lengths = torch.tensor([max(len(s), 1) for s in seqs], dtype=torch.long)
    max_len = int(lengths.max())
    padded = torch.full((len(seqs), max_len), pad_idx, dtype=torch.long)
    for row, seq in enumerate(seqs):
        padded[row, :len(seq)] = seq
    return padded, lengths, torch.stack(labels)

In [None]:
class LSTMClassifier(nn.Module):
    """Embedding -> (bi)LSTM -> dropout -> linear head producing 2 logits."""

    def __init__(self, vocab_size, embed_dim=200, hidden=256, num_layers=1, bidir=True, dropout=0.2, pad_idx=0):
        """Build the layers.

        Args:
            vocab_size: Number of rows in the embedding table.
            embed_dim: Embedding width, also the LSTM input size.
            hidden: LSTM hidden size per direction.
            num_layers: Number of stacked LSTM layers.
            bidir: Whether the LSTM is bidirectional.
            dropout: Dropout probability for the head, and for the LSTM when
                more than one layer is stacked.
            pad_idx: Index passed to the embedding as ``padding_idx``.
        """
        super().__init__()
        # The LSTM receives the dropout value only when layers are stacked.
        lstm_dropout = dropout if num_layers > 1 else 0.0
        num_directions = 2 if bidir else 1
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=pad_idx)
        self.lstm = nn.LSTM(
            input_size=embed_dim,
            hidden_size=hidden,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=bidir,
            dropout=lstm_dropout,
        )
        self.fc = nn.Sequential(
            nn.Dropout(dropout),
            nn.Linear(hidden * num_directions, 2),
        )

    def forward(self, x, lengths):
        """Classify a padded batch.

        Args:
            x: Batch-first tensor of token ids.
            lengths: Per-sequence lengths; moved to CPU before packing.

        Returns:
            Logits produced by the linear head (2 per example).
        """
        embedded = self.embedding(x)
        packed = pack_padded_sequence(embedded, lengths.cpu(), batch_first=True, enforce_sorted=False)
        _, (h_n, _) = self.lstm(packed)
        # Summarise each sequence with the final hidden state(s): the last two
        # entries of h_n concatenated when bidirectional, else the last entry.
        if not self.lstm.bidirectional:
            summary = h_n[-1]
        else:
            summary = torch.cat((h_n[-2], h_n[-1]), dim=1)
        return self.fc(summary)

In [None]:
def run_epoch(model, loader, opt, device, train=True, amp=True, clip=1.0):
    """Run one full pass over ``loader`` in training or evaluation mode.

    Args:
        model: Module called as ``model(xb, lengths)`` and returning logits.
        loader: Iterable yielding ``(xb, lengths, yb)`` batches.
        opt: Optimizer; only used when ``train`` is True.
        device: ``torch.device`` that batches are moved to.
        train: If True, put the model in train mode and update parameters;
            otherwise switch to eval mode and disable gradient tracking.
        amp: Enable autocast (and, when training, loss scaling). Only takes
            effect when ``device.type == "cuda"``.
        clip: Max gradient norm; falsy values disable clipping.

    Returns:
        tuple: ``(avg_loss, acc, f1)`` — mean per-sample cross-entropy loss
        over the samples actually processed, accuracy, and F1 score.
    """
    use_amp = amp and device.type == "cuda"
    crit = nn.CrossEntropyLoss()
    scaler = torch.cuda.amp.GradScaler(enabled=(use_amp and train))
    total_loss = 0.0
    n_seen = 0
    all_y = []
    all_p = []
    model.train(train)
    for xb, lengths, yb in loader:
        xb, lengths, yb = xb.to(device), lengths.to(device), yb.to(device)
        if train:
            opt.zero_grad(set_to_none=True)
        with torch.cuda.amp.autocast(enabled=use_amp), torch.set_grad_enabled(train):
            logits = model(xb, lengths)
            loss = crit(logits, yb)
        if train:
            scaler.scale(loss).backward()
            if clip:
                # Gradients are still multiplied by the loss scale here; they
                # must be unscaled first or the clip threshold is compared
                # against scaled norms. unscale_ is a no-op when the scaler is
                # disabled.
                scaler.unscale_(opt)
                nn.utils.clip_grad_norm_(model.parameters(), clip)
            scaler.step(opt)
            scaler.update()
        batch_size = xb.size(0)
        total_loss += loss.item() * batch_size
        n_seen += batch_size
        all_p.extend(logits.argmax(1).detach().cpu().tolist())
        all_y.extend(yb.detach().cpu().tolist())
    # Average over the samples actually seen so the value stays correct when
    # the loader drops or subsamples examples; max() avoids dividing by zero.
    avg_loss = total_loss / max(n_seen, 1)
    acc = accuracy_score(all_y, all_p)
    f1 = f1_score(all_y, all_p)
    return avg_loss, acc, f1

In [None]:
def main(args):
    """Train the LSTM classifier with early stopping and report the best model.

    Builds the vocabulary and data loaders, trains for up to ``args.epochs``
    epochs, checkpoints whenever validation F1 improves, stops after
    ``args.patience`` epochs without improvement, then reloads the best
    checkpoint and prints its validation metrics.

    Args:
        args: Namespace providing ``seed``, ``min_freq``, ``vocab_size``,
            ``max_len``, ``batch_size``, ``embed_dim``, ``hidden``,
            ``num_layers``, ``unidirectional``, ``dropout``, ``lr``,
            ``weight_decay``, ``epochs`` and ``patience``.
    """
    # Local import keeps this block self-contained; functools is stdlib.
    from functools import partial

    set_seed(args.seed)
    device = get_device()
    # TODO: Load IMDB data into lists of texts and labels below.
    # texts_train, labels_train = [...]
    # texts_val, labels_val = [...]
    # For interview: you can simulate small samples or describe downloading via HuggingFace datasets.
    # Example (pseudo):
    # from datasets import load_dataset
    # ds = load_dataset("imdb")
    # texts_train = [x["text"] for x in ds["train"]]; labels_train = [x["label"] for x in ds["train"]]
    # texts_val = [x["text"] for x in ds["test"]]; labels_val = [x["label"] for x in ds["test"]]

    # Flip to True to refuse running on the mock data below.
    raise_if_placeholder = False
    if raise_if_placeholder:
        raise RuntimeError("Replace the TODO with real IMDB loading code.")

    # For offline demo, here's a tiny mock (replace in real run)
    texts_train = ["i love this movie it is great","terrible film waste of time","fantastic acting and plot","bad script and worse ending"]*256
    labels_train = [1,0,1,0]*256
    texts_val = ["great movie","worst movie ever","it was okay not great","absolutely loved it"]*64
    labels_val = [1,0,0,1]*64

    vocab, stoi, _ = build_vocab(texts_train, min_freq=args.min_freq, max_size=args.vocab_size)
    pad_idx = 0
    train_ds = IMDBDataset(texts_train, labels_train, stoi, max_len=args.max_len)
    val_ds = IMDBDataset(texts_val, labels_val, stoi, max_len=args.max_len)

    # A partial of a module-level function can be pickled for worker processes,
    # whereas a lambda cannot (this matters under the "spawn" start method).
    collate = partial(collate_pad, pad_idx=pad_idx)
    # Pinned host memory only helps host-to-GPU copies; skip it on CPU.
    pin = device.type == "cuda"
    train_loader = DataLoader(train_ds, batch_size=args.batch_size, shuffle=True, num_workers=4, pin_memory=pin, collate_fn=collate)
    val_loader = DataLoader(val_ds, batch_size=args.batch_size, shuffle=False, num_workers=4, pin_memory=pin, collate_fn=collate)

    model = LSTMClassifier(
        vocab_size=len(vocab),
        embed_dim=args.embed_dim,
        hidden=args.hidden,
        num_layers=args.num_layers,
        bidir=not args.unidirectional,
        dropout=args.dropout,
        pad_idx=pad_idx,
    ).to(device)
    opt = torch.optim.AdamW(model.parameters(), lr=args.lr, weight_decay=args.weight_decay)

    ckpt_path = "best_imdb_lstm.pt"
    best_f1 = -1.0
    bad = 0
    saved_ckpt = False  # True once this run has written ckpt_path
    for epoch in range(1, args.epochs+1):
        tr_loss, tr_acc, tr_f1 = run_epoch(model, train_loader, opt, device, train=True, amp=True, clip=1.0)
        va_loss, va_acc, va_f1 = run_epoch(model, val_loader, opt, device, train=False, amp=False)
        print(f"epoch={epoch} train_loss={tr_loss:.4f} acc={tr_acc:.4f} f1={tr_f1:.4f} val_loss={va_loss:.4f} acc={va_acc:.4f} f1={va_f1:.4f}")
        if va_f1 > best_f1:
            best_f1 = va_f1
            bad = 0
            save_ckpt(ckpt_path, model, opt, epoch, best_f1)
            saved_ckpt = True
        else:
            bad += 1
            if bad >= args.patience:
                print("early stopping")
                break

    # Without a checkpoint from this run (e.g. epochs < 1) there is nothing
    # valid to reload; a leftover file from an earlier run must not be used.
    if not saved_ckpt:
        print("no checkpoint was saved; skipping best-model evaluation")
        return

    # Load best and report
    load_ckpt(ckpt_path, model, None, map_location=device)
    va_loss, va_acc, va_f1 = run_epoch(model, val_loader, opt, device, train=False, amp=False)
    print(f"best_eval val_loss={va_loss:.4f} val_acc={va_acc:.4f} val_f1={va_f1:.4f}")