# NLP Assignment 1 — Enhancing Figurative Language Recognition Using POS Tagging

Author: Tohuto Sema (Roll: 21CS10072)

This notebook contains step-by-step code for:
- Task 1: Implementing an HMM POS tagger (Viterbi) from scratch using NLTK Treebank.
- Task 2: Baseline figurative-language classifier (text-only features).
- Task 3: Improved classifier that augments text features with POS tag sequences from Task 1.
- Task 4: Reporting and an interactive prediction utility.

Run the cells in order. The package-installation cell can be skipped if the dependencies are already installed.

## 1) Install dependencies

Run this cell once in the notebook environment. If you manage packages with conda or a venv, you can skip it and install the same packages from a terminal instead.


In [1]:
# Install required packages (run once)
!pip install --upgrade pip
!pip install --quiet nltk scikit-learn datasets huggingface_hub
# Note: 'datasets' will be used to download ColumbiaNLP/V-FLUTE if you have access.


## 2) Imports and NLTK data download
Download the Treebank corpus and the universal tagset mapping (both are required for Task 1).

In [2]:
import os
print('cwd =', os.getcwd())
from collections import Counter
import math
import random
from dataclasses import dataclass
from typing import List, Tuple, Dict, Iterable, Optional

import nltk
nltk.download('treebank')
nltk.download('universal_tagset')
from nltk.corpus import treebank
print('Treebank sentences:', len(treebank.tagged_sents(tagset='universal')))


cwd = c:\Users\Benjamin Doley\Documents\NLP_Assignment_1_21CS10072


[nltk_data] Downloading package treebank to C:\Users\Benjamin
[nltk_data]     Doley\AppData\Roaming\nltk_data...
[nltk_data]   Package treebank is already up-to-date!
[nltk_data] Downloading package universal_tagset to C:\Users\Benjamin
[nltk_data]     Doley\AppData\Roaming\nltk_data...
[nltk_data]   Package universal_tagset is already up-to-date!


Treebank sentences: 3914


## 3) HMM + Viterbi implementation (Task 1)
This cell defines the `HMMParams` dataclass, the training function, the Viterbi decoder, and a token-level accuracy helper.
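
For reference, the quantities this cell computes can be written out as follows. This is a summary read off the code, not an independent derivation; the symbols are chosen here for illustration: $c(\cdot)$ denotes training counts, $T$ the number of tags, $V$ the vocabulary size including `<unk>`, $\pi$ and $\sigma$ the `log_start` and `log_stop` tables, $a$ and $b$ the transition and emission tables, and $\delta$ the Viterbi table `dp`.

```latex
\begin{align*}
\log \pi_t &= \log\frac{c_{\text{start}}(t) + 1}{\sum_{t'} c_{\text{start}}(t') + T}, &
\log \sigma_t &= \log\frac{c_{\text{stop}}(t) + 1}{\sum_{t'} c_{\text{stop}}(t') + T}, \\
\log a_{s,t} &= \log\frac{c(s \to t) + 1}{\sum_{t'} c(s \to t') + T}, &
\log b_t(w) &= \log\frac{c(t, w) + 1}{\sum_{w'} c(t, w') + V}, \\[4pt]
\delta_1(t) &= \log \pi_t + \log b_t(w_1), \\
\delta_i(t) &= \max_{s}\bigl[\delta_{i-1}(s) + \log a_{s,t}\bigr] + \log b_t(w_i), \quad 2 \le i \le n, \\
\hat{t}_n &= \arg\max_{t}\bigl[\delta_n(t) + \log \sigma_t\bigr].
\end{align*}
```

The remaining tags are recovered by following the back-pointers `bp` from $\hat{t}_n$ down to position 1. Note that $\sigma$ as implemented is a smoothed distribution over which tag ends a sentence, normalized over all sentence-final tags.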


In [5]:
START = "<s>"
STOP = "</s>"
UNK = "<unk>"
@dataclass
class HMMParams:
    tag_to_id: Dict[str, int]
    id_to_tag: List[str]
    word_to_id: Dict[str, int]
    id_to_word: List[str]
    log_trans: List[List[float]]
    log_emit: List[Dict[int, float]]
    log_start: List[float]
    log_stop: List[float]

def add1_log_prob(count: int, total: int, vocab: int) -> float:
    return math.log(count + 1) - math.log(total + vocab)

def build_vocab(tagged_sents: List[List[Tuple[str,str]]], min_freq:int=1):
    wc = Counter(w.lower() for sent in tagged_sents for (w,_) in sent)
    words = [w for w,c in wc.items() if c >= min_freq]
    word_to_id = {UNK:0, **{w:i+1 for i,w in enumerate(sorted(words))}}
    id_to_word = [None]*len(word_to_id)
    for w,i in word_to_id.items():
        id_to_word[i] = w
    tags = sorted({t for sent in tagged_sents for (_,t) in sent})
    tag_to_id = {t:i for i,t in enumerate(tags)}
    id_to_tag = tags[:]
    return word_to_id, id_to_word, tag_to_id, id_to_tag

def train_hmm(tagged_train: List[List[Tuple[str,str]]], tagged_dev: Optional[List[List[Tuple[str,str]]]]=None) -> HMMParams:
    word_to_id, id_to_word, tag_to_id, id_to_tag = build_vocab(tagged_train)
    T = len(tag_to_id)
    trans = [[0 for _ in range(T)] for _ in range(T)]
    emit: List[Counter] = [Counter() for _ in range(T)]
    start = [0 for _ in range(T)]
    stop = [0 for _ in range(T)]
    for sent in tagged_train:
        if not sent: continue
        t0 = tag_to_id[sent[0][1]]
        start[t0] += 1
        for i, (w,t) in enumerate(sent):
            tid = tag_to_id[t]
            wid = word_to_id.get(w.lower(), word_to_id[UNK])
            emit[tid][wid] += 1
            if i+1 < len(sent):
                tid_next = tag_to_id[sent[i+1][1]]
                trans[tid][tid_next] += 1
            else:
                stop[tid] += 1
    total_start = sum(start)
    log_start = [add1_log_prob(start[i], total_start, T) for i in range(T)]
    total_stop = sum(stop)
    log_stop = [add1_log_prob(stop[i], total_stop, T) for i in range(T)]
    log_trans = []
    for i in range(T):
        row_total = sum(trans[i])
        log_row = [add1_log_prob(trans[i][j], row_total, T) for j in range(T)]
        log_trans.append(log_row)
    log_emit: List[Dict[int,float]] = []
    for tid in range(T):
        total = sum(emit[tid].values())
        V = len(word_to_id)
        tag_emit = {}
        for wid, c in emit[tid].items():
            tag_emit[wid] = add1_log_prob(c, total, V)
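        # Give <unk> the zero-count smoothed mass only if it was not already counted
        # above (it can be when build_vocab is called with min_freq > 1).
        tag_emit.setdefault(word_to_id[UNK], add1_log_prob(0, total, V))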
        log_emit.append(tag_emit)
    return HMMParams(tag_to_id, id_to_tag, word_to_id, id_to_word, log_trans, log_emit, log_start, log_stop)

def viterbi_decode(words: List[str], params: HMMParams) -> List[str]:
    T = len(params.id_to_tag)
    n = len(words)
    if n == 0:
        return []  # nothing to tag; avoids indexing words[0] below
    dp = [[-1e18]*T for _ in range(n)]
    bp = [[-1]*T for _ in range(n)]
    def emit_log(tid:int, wid:int) -> float:
        d = params.log_emit[tid]
        return d.get(wid, d[params.word_to_id[UNK]])
    wid0 = params.word_to_id.get(words[0].lower(), params.word_to_id[UNK])
    for t in range(T):
        dp[0][t] = params.log_start[t] + emit_log(t, wid0)
        bp[0][t] = -1
    for i in range(1, n):
        wid = params.word_to_id.get(words[i].lower(), params.word_to_id[UNK])
        for t in range(T):
            best = -1e18
            arg = -1
            e = emit_log(t, wid)
            for tprev in range(T):
                score = dp[i-1][tprev] + params.log_trans[tprev][t] + e
                if score > best:
                    best, arg = score, tprev
            dp[i][t] = best
            bp[i][t] = arg
    best_final = -1e18
    last_tag = -1
    for t in range(T):
        score = dp[n-1][t] + params.log_stop[t]
        if score > best_final:
            best_final, last_tag = score, t
    tags_idx = [0]*n
    tags_idx[-1] = last_tag
    for i in range(n-2, -1, -1):
        tags_idx[i] = bp[i+1][tags_idx[i+1]]
    return [params.id_to_tag[t] for t in tags_idx]

def tag_sentence(sentence: List[str], params: HMMParams) -> List[Tuple[str,str]]:
    tags = viterbi_decode(sentence, params)
    return list(zip(sentence, tags))

def accuracy(tagged_gold: List[List[Tuple[str,str]]], params: HMMParams) -> float:
    correct = total = 0
    for sent in tagged_gold:
        words = [w for w,_ in sent]
        gold = [t for _,t in sent]
        pred = viterbi_decode(words, params)
        for g,p in zip(gold, pred):
            total += 1
            if g == p: correct += 1
    return correct / max(1,total)
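
As a quick sanity check on the add-one smoothing used above, the sketch below applies the same formula to a toy emission table for a single tag. The counts and the four-word vocabulary are made up for illustration and are not Treebank data. It shows that the smoothed probabilities sum to 1 over the vocabulary, and that a known word never seen with this tag gets the same probability as `<unk>`, which is the fallback `emit_log` relies on.

```python
import math
from collections import Counter

def add1_log_prob(count: int, total: int, vocab: int) -> float:
    # Same formula as the notebook's helper: log((count + 1) / (total + vocab)).
    return math.log(count + 1) - math.log(total + vocab)

# Hypothetical emission counts for one tag (toy numbers).
counts = Counter({"the": 3, "a": 1})
vocab = ["<unk>", "a", "dog", "the"]  # every word the model knows, including <unk>
total = sum(counts.values())

# Counter returns 0 for words never seen with this tag ("dog", "<unk>").
probs = {w: math.exp(add1_log_prob(counts[w], total, len(vocab))) for w in vocab}

for w in vocab:
    print(f"{w:>6}: {probs[w]:.3f}")
print("sum:", round(sum(probs.values()), 6))
```

With 4 observed tokens and 4 vocabulary entries, every denominator is 8, so "the" gets 4/8 and both "dog" and `<unk>` get 1/8.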


## 4) Train the HMM on NLTK Treebank and evaluate
This cell loads Treebank, splits it 80/10/10 into train/dev/test sets in corpus order, trains the HMM, and prints dev and test accuracy.

In [6]:
# Load Treebank tagged sentences (universal tagset)
tagged_sents = treebank.tagged_sents(tagset='universal')
n = len(tagged_sents)
train_sents = tagged_sents[:int(0.8*n)]
dev_sents = tagged_sents[int(0.8*n):int(0.9*n)]
test_sents = tagged_sents[int(0.9*n):]
print('Train sents:', len(train_sents), 'Dev sents:', len(dev_sents), 'Test sents:', len(test_sents))
print('Training HMM... (this may take a few seconds)')
hmm = train_hmm(train_sents)
print('Evaluating...')
dev_acc = accuracy(dev_sents, hmm)
test_acc = accuracy(test_sents, hmm)
print(f'Dev Accuracy: {dev_acc:.4f}')
print(f'Test Accuracy: {test_acc:.4f}')


Train sents: 3131 Dev sents: 391 Test sents: 392
Training HMM... (this may take a few seconds)
Evaluating...
Dev Accuracy: 0.8912
Test Accuracy: 0.8834
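
The split sizes printed above follow directly from the `int(0.8*n)` and `int(0.9*n)` slice boundaries. The short sketch below reproduces that arithmetic for the 3914 Treebank sentences; `split_sizes` is a hypothetical helper introduced only for this illustration.

```python
def split_sizes(n: int, train_end_frac: float = 0.8, dev_end_frac: float = 0.9):
    """Return (train, dev, test) sizes for contiguous slices cut with int() truncation."""
    train_end = int(train_end_frac * n)  # end of the training slice
    dev_end = int(dev_end_frac * n)      # end of the dev slice
    return train_end, dev_end - train_end, n - dev_end

sizes = split_sizes(3914)
print("train/dev/test:", sizes)
```

Because the slices are contiguous, the three sets follow corpus order. Shuffling with a fixed seed before slicing is a common alternative, but it is not what the notebook does.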


## 5) Baseline classifier (Task 2) and helper functions
This cell defines the dataset loader (V-FLUTE from Hugging Face, or a local CSV) and the baseline classifier training function.

In [7]:
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.svm import LinearSVC
from sklearn.naive_bayes import MultinomialNB
from sklearn.pipeline import Pipeline

try:
    from datasets import load_dataset
except Exception:
    load_dataset = None

def load_flute(split: str = 'train', use_hf: bool = True, local_csv: Optional[str] = None):
    if use_hf and load_dataset is not None:
        ds = load_dataset('ColumbiaNLP/V-FLUTE', split=split)
        texts = [(rec['claim'] or '') + ' ' + (rec['explanation'] or '') for rec in ds]
        labels = [rec['phenomenon'] for rec in ds]
        label_names = sorted(set(labels))
        name_to_id = {n:i for i,n in enumerate(label_names)}
        y = [name_to_id[l] for l in labels]
        return texts, y, label_names
    else:
        if local_csv is None:
            raise RuntimeError('Provide local_csv path if not using HF datasets.')
        import csv
        texts, labels = [], []
        with open(local_csv, newline='', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            for r in reader:
                # DictReader yields None for missing trailing fields, so guard before concatenating.
                texts.append(((r.get('claim') or '') + ' ' + (r.get('explanation') or '')).strip())
                labels.append(r['phenomenon'])
        label_names = sorted(set(labels))
        name_to_id = {n:i for i,n in enumerate(label_names)}
        y = [name_to_id[l] for l in labels]
        return texts, y, label_names

def train_baseline_classifier(texts: List[str], y: List[int], cls: str = 'svm'):
    if cls == 'svm':
        model = Pipeline([
            ('tfidf', TfidfVectorizer(max_features=50000, ngram_range=(1,2))),
            ('clf', LinearSVC())
        ])
    else:
        model = Pipeline([
            ('tfidf', TfidfVectorizer(max_features=50000, ngram_range=(1,2))),
            ('clf', MultinomialNB())
        ])
    X_train, X_test, y_train, y_test = train_test_split(texts, y, test_size=0.2, random_state=42, stratify=y)
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    return model, (y_test, y_pred)




## 6) Improved classifier (Task 3) that uses POS sequences
This cell defines a helper that produces POS tag sequences and a function that trains the combined text + POS pipeline.

In [9]:
from sklearn.preprocessing import FunctionTransformer
from sklearn.pipeline import FeatureUnion

def gen_pos_sequence(text: str, hmm: HMMParams) -> str:
    words = [w for w in text.split() if w.strip()]
    if not words:
        return ''
    tags = viterbi_decode(words, hmm)
    return ' '.join(tags)

def train_improved_classifier(texts: List[str], y: List[int], hmm: HMMParams, cls: str = 'svm'):
    def pos_seq_transform(batch: Iterable[str]):
        return [gen_pos_sequence(t, hmm) for t in batch]

    text_ft = ('text_ft', Pipeline([
        ('id', FunctionTransformer(lambda x: x, validate=False)),
        ('tfidf', TfidfVectorizer(max_features=50000, ngram_range=(1,2)))
    ]))
    pos_ft = ('pos_ft', Pipeline([
        ('pos', FunctionTransformer(pos_seq_transform, validate=False)),
        ('tfidf', TfidfVectorizer(ngram_range=(1,3)))
    ]))
    features = FeatureUnion([text_ft, pos_ft])
    clf = LinearSVC() if cls == 'svm' else MultinomialNB()
    model = Pipeline([
        ('feats', features),
        ('clf', clf)
    ])
    X_train, X_test, y_train, y_test = train_test_split(texts, y, test_size=0.2, random_state=42, stratify=y)
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    return model, (y_test, y_pred)
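
One detail worth checking is how the POS string is tokenized before n-grams are built. The POS branch uses `TfidfVectorizer(ngram_range=(1,3))` with default tokenization, which lowercases the input and keeps only tokens of two or more word characters. The sketch below imitates that behaviour with plain `re`, using scikit-learn's documented default `token_pattern`; the helper names and the tag sequence are made up for illustration. It suggests that the universal tags `X` and `.` would be dropped before n-grams are formed.

```python
import re

# scikit-learn's documented default token_pattern for TfidfVectorizer.
DEFAULT_TOKEN_PATTERN = r"(?u)\b\w\w+\b"

def default_tokens(text: str):
    # Lowercase first (the vectorizer default), then keep tokens of 2+ word characters.
    return re.findall(DEFAULT_TOKEN_PATTERN, text.lower())

def ngrams(tokens, lo: int, hi: int):
    # Word n-grams of length lo..hi over the already-filtered token list.
    return [" ".join(tokens[i:i + n])
            for n in range(lo, hi + 1)
            for i in range(len(tokens) - n + 1)]

pos_seq = "DET NOUN VERB X ."  # hypothetical tagger output
toks = default_tokens(pos_seq)
grams = ngrams(toks, 1, 3)
print(toks)
print(grams)

# Whitespace tokenization keeps every tag, including "x" and ".".
kept_all = re.findall(r"\S+", pos_seq.lower())
print(kept_all)
```

If punctuation and `X` tags should count as features, passing `token_pattern=r"\S+"` to the POS-branch vectorizer is one option. Whether that changes the results reported here has not been tested.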


## 7) Train classifiers (if FLUTE available)
This cell attempts to load FLUTE from Hugging Face. If the dataset is gated, you need to accept the terms on its Hugging Face dataset page and log in first (see notes below).


In [10]:
texts = y = label_names = None
try:
    texts, y, label_names = load_flute(split='train', use_hf=True)
    print('Loaded FLUTE examples:', len(texts), 'labels:', len(label_names))
except Exception as e:
    print('[WARN] Could not load FLUTE from HF:', e)
    print('If you have the dataset CSVs locally, call load_flute(use_hf=False, local_csv="path.csv")')

if texts:
    base_model, (y_true_base, y_pred_base) = train_baseline_classifier(texts, y, cls='svm')
    print('\n=== Baseline (No POS) ===')
    print(classification_report(y_true_base, y_pred_base, target_names=label_names))
    print('Confusion Matrix:\n', confusion_matrix(y_true_base, y_pred_base))
    if hmm is not None:
        pos_model, (y_true_pos, y_pred_pos) = train_improved_classifier(texts, y, hmm, cls='svm')
        print('\n=== Improved (With POS Features) ===')
        print(classification_report(y_true_pos, y_pred_pos, target_names=label_names))
        print('Confusion Matrix:\n', confusion_matrix(y_true_pos, y_pred_pos))
else:
    print('[INFO] FLUTE dataset not available. Skipping classifiers.')


Loaded FLUTE examples: 4578 labels: 5

=== Baseline (No POS) ===
              precision    recall  f1-score   support

       humor       0.99      1.00      1.00       404
       idiom       1.00      1.00      1.00        34
    metaphor       1.00      0.98      0.99       229
     sarcasm       0.99      0.99      0.99       166
      simile       1.00      1.00      1.00        83

    accuracy                           0.99       916
   macro avg       1.00      1.00      1.00       916
weighted avg       0.99      0.99      0.99       916

Confusion Matrix:
 [[404   0   0   0   0]
 [  0  34   0   0   0]
 [  3   0 225   1   0]
 [  1   0   0 165   0]
 [  0   0   0   0  83]]

=== Improved (With POS Features) ===
              precision    recall  f1-score   support

       humor       0.99      1.00      0.99       404
       idiom       1.00      1.00      1.00        34
    metaphor       1.00      0.98      0.99       229
     sarcasm       0.99      0.99      0.99       166
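
The baseline figures in the report can be re-derived from the baseline confusion matrix printed above. The sketch below copies that matrix (rows are true labels, columns are predicted labels, following the scikit-learn convention) and recomputes accuracy, per-class precision and per-class recall. It is a worked check of the arithmetic, not an additional experiment.

```python
labels = ["humor", "idiom", "metaphor", "sarcasm", "simile"]

# Baseline confusion matrix copied from the output above.
cm = [
    [404, 0, 0, 0, 0],
    [0, 34, 0, 0, 0],
    [3, 0, 225, 1, 0],
    [1, 0, 0, 165, 0],
    [0, 0, 0, 0, 83],
]

total = sum(sum(row) for row in cm)
correct = sum(cm[i][i] for i in range(len(cm)))
accuracy = correct / total

precision, recall = {}, {}
for i, name in enumerate(labels):
    predicted_as_i = sum(row[i] for row in cm)  # column sum
    truly_i = sum(cm[i])                        # row sum (support)
    precision[name] = cm[i][i] / predicted_as_i
    recall[name] = cm[i][i] / truly_i
    print(f"{name:>9}  precision={precision[name]:.2f}  recall={recall[name]:.2f}  support={truly_i}")

print(f"accuracy = {correct}/{total} = {accuracy:.4f}")
```

Only 5 of the 916 held-out examples are misclassified: three metaphors and one sarcasm example predicted as humor, and one metaphor predicted as sarcasm.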
  

## 8) Interactive prediction helper
This cell defines `predict_sentence(sentence, use_pos=False)`. With `use_pos=True` it uses the POS-augmented model when one has been trained, and otherwise falls back to the baseline model.

In [12]:
def predict_sentence(sentence: str, use_pos: bool = False):
    if texts is None:
        print('[WARN] No FLUTE dataset/model available. Train or load models first.')
        return None
    model = pos_model if use_pos and 'pos_model' in globals() else base_model
    pred_id = model.predict([sentence])[0]
    pred_label = label_names[pred_id]
    print('Predicted class:', pred_label)
    return pred_label


## Notes & Hugging Face access
- The ColumbiaNLP/V-FLUTE dataset on Hugging Face may be gated. Visit the dataset page and click **Accept** to gain access.
- To authenticate in a terminal, run `huggingface-cli login` and provide your token, or set `HF_TOKEN` as described in the Hugging Face docs.
- If you cannot access HF, export the dataset to CSVs (train/validation/test) and provide the `local_csv` path to `load_flute(..., use_hf=False, local_csv=...)`.
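
For the local CSV route, the file needs `claim`, `explanation` and `phenomenon` columns, since those are the fields `load_flute` reads. The sketch below writes a two-row toy file with that layout and reads it back the same way the loader does. The rows, the file name and the temporary directory are invented for illustration and are not V-FLUTE data.

```python
import csv
import os
import tempfile

# Hypothetical rows using the column names load_flute expects.
rows = [
    {"claim": "He is a lion in battle.", "explanation": "He fights bravely.", "phenomenon": "metaphor"},
    {"claim": "Great, another Monday.", "explanation": "", "phenomenon": "sarcasm"},
]

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "flute_toy.csv")

    # Write the CSV with a header row.
    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=["claim", "explanation", "phenomenon"])
        writer.writeheader()
        writer.writerows(rows)

    # Read it back: concatenate claim and explanation, keep the label string.
    texts, labels = [], []
    with open(path, newline="", encoding="utf-8") as f:
        for r in csv.DictReader(f):
            texts.append(((r.get("claim") or "") + " " + (r.get("explanation") or "")).strip())
            labels.append(r["phenomenon"])

print(texts)
print(labels)
```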


### How to use this notebook
1. Run the cells in order.
2. If FLUTE loading fails, provide a local CSV and call `load_flute(use_hf=False, local_csv='data/flute_train.csv')`.
3. Use `predict_sentence('your text', use_pos=True)` to test the POS-improved model (if trained).
