# FNLP: Lab Session 6
# Pointwise Mutual Information - Finding Collocations

## Aim

The aims of this lab session are to
  1. familiarize you with pointwise mutual information (PMI);
  2. show how to apply PMI for the task of finding word collocations;
  3. identify shortcomings of this approach.

By the end of this lab session, you should be able to:
* Compute the PMI.
* Apply PMI to find word collocations in a corpus.

# Introduction

In this lab we consider the task of identifying word collocations in a corpus to demonstrate the use of Pointwise
Mutual Information (PMI).

$PMI(x_i, y_j) = \log\frac{P(X=x_i, Y=y_j)}{P(X=x_i)\,P(Y=y_j)}$
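
To get a feel for the definition before touching the corpus, the sketch below evaluates it on a few made-up probabilities. The helper name `pmi` and the numbers are illustrative assumptions, not part of the lab code; base-2 logarithms are used so the result is in bits.

```python
from math import log2

def pmi(p_xy, p_x, p_y):
    """PMI in bits from a joint probability and its two marginals."""
    return log2(p_xy / (p_x * p_y))

# Made-up probabilities (powers of two, so the arithmetic is exact)
independent = pmi(0.0625, 0.25, 0.25)   # joint equals product of marginals
attracted = pmi(0.25, 0.25, 0.25)       # co-occur more often than chance
repelled = pmi(0.03125, 0.25, 0.25)     # co-occur less often than chance

print(independent, attracted, repelled)  # 0.0 2.0 -1.0
```

A PMI of zero means the two events look independent, a positive value means they co-occur more often than independence would predict, and a negative value means less often.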

In [None]:
import operator

import nltk

from nltk.probability import FreqDist, MLEProbDist

from nltk.corpus import gutenberg
from nltk.corpus import stopwords
from math import log
from pprint import pprint

class FasterMLEProbDist(MLEProbDist):
    '''Speed up prob lookup for large sample sizes'''
    def __init__(self,freqdist):
        '''
        :param freqdist: samples and their counts
        :type freqdist: nltk.probability.FreqDist
        :return: A (slightly) faster probability distribution
        :rtype: FasterMLEProbDist
        '''
        self._N = freqdist.N()
        if self._N == 0:
            self._empty = True
        else:
            self._empty = False
            self._pq=float(self._N)
            MLEProbDist.__init__(self, freqdist)

    def prob(self, sample):
        '''Faster version of MLEProbDist.prob, using cached quotient for division
        
        :param sample: The sample to look up
        :type sample: [anything]
        :return: MLE estimate of probability of sample given distribution
        :rtype: float
        '''
        if self._empty:
            return 0
        else:
            return float(self._freqdist[sample]) / self._pq

STOPWORDS = stopwords.words('english')

def Filter1(word):
    '''Test for all-alpha string
    
    :param word: word to check
    :type word: str
    :return: True iff no non-alpha chars in word
    :rtype: bool
    '''
    return word.isalpha()

def Filter2(word):
    '''Test for all-alpha string not in stopwords list
    
    :param word: word to check, should be all lowercase
    :type word: str
    :return: True iff no non-alpha chars in word and not an nltk English stopword
    :rtype: bool
    '''
    return (word.isalpha() and word not in STOPWORDS)

The data set consists of bigrams extracted from Herman Melville's novel _Moby Dick_.

In [None]:
sentences = gutenberg.sents('melville-moby_dick.txt')
sentences

# Estimating the Joint and Marginal Probability Distributions

In order to compute the PMI we need the joint probability $P(X = x, Y = y)$ and the marginal probabilities $P(X = x)$ and $P(Y = y)$. In our case $P(X = x)$ and $P(Y = y)$ will be the unigram probabilities of the two
words that are considered to form a collocation, and $P(X = x, Y = y)$ will be the bigram probability.

### Exercise 1:

In this exercise we will compute the joint and marginal probability distributions for the word bigrams. You will
have to fill in two functions to achieve this:
* The function `BuildData` receives as parameters a list of sentences and a filter function, and should return a list of bigrams and a list of unigrams extracted from the sentences. Two filter functions are already defined: `Filter1` eliminates just non-alphabetic tokens, and `Filter2` eliminates stop-words as well.
* The function `my_prob_distributions` receives those two lists and should return the corresponding probability distributions.

Specifically:

1. Build the two data structures in the `BuildData` function. Lowercase the words and eliminate unigrams and bigrams that do not pass the filter.
  Remember, help is your friend:
    `help(nltk.bigrams)`
2. The function `my_prob_distributions` receives as parameters a list of bigrams and a list of unigrams and returns the corresponding probability distributions. Construct a `FreqDist` for each of the two lists. Transform each `FreqDist` into a probability distribution using the `FasterMLEProbDist` estimator.
  Again, help is your friend:
    `help(FasterMLEProbDist)`
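
The shape of the data these steps produce can be sketched on a toy corpus with the standard library alone. Everything here is an illustrative assumption: `toy_filter` stands in for `Filter1`, `collections.Counter` stands in for `FreqDist`, and a bigram is taken to pass the filter only when both of its words pass. It is not the expected solution, which should use the NLTK classes named above.

```python
from collections import Counter

def toy_filter(word):
    # Hypothetical stand-in for Filter1: keep purely alphabetic tokens
    return word.isalpha()

# Tiny made-up corpus in the same list-of-token-lists shape as gutenberg.sents
toy_sentences = [["The", "whale", ",", "the", "whale", "!"],
                 ["The", "sea", "."]]

unigrams, bigrams = [], []
for sent in toy_sentences:
    tokens = [w.lower() for w in sent]
    unigrams.extend(w for w in tokens if toy_filter(w))
    # Adjacent pairs within one sentence; assumption: both words must pass
    bigrams.extend((a, b) for a, b in zip(tokens, tokens[1:])
                   if toy_filter(a) and toy_filter(b))

unigram_counts = Counter(unigrams)
bigram_counts = Counter(bigrams)

# Maximum likelihood estimate: count divided by total number of samples
unigram_probs = {w: c / len(unigrams) for w, c in unigram_counts.items()}
bigram_probs = {b: c / len(bigrams) for b, c in bigram_counts.items()}

print(unigram_counts)
print(bigram_counts)
```

Note that the unigram and bigram distributions are normalized by different totals, which is also how the two `FasterMLEProbDist` instances behave.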

In [None]:
def BuildData(sentences, filter):
    '''Tabulate token unigrams and bigrams lowercased from sentences, filtered
    
    :param sentences: corpus of sentences
    :type sentences: list(list(str))
    :param filter: filter to test tokens for inclusion
    :type filter: function(str)==>bool
    
    :return: unigrams and bigrams, lowercased, filtered, from sentences
    :rtype: tuple(iterable(tuple(str,str)),iterable(str))'''
    # TODO: build the lists of unigrams and bigrams from the sentences
    unigrams = []
    bigrams = []
    return bigrams, unigrams

In [None]:
# TODO: using the data build the probability distribution over bigrams and unigrams using FasterMLEProbDist
def my_prob_distributions(bigrams, unigrams):
    '''Build probability distributions from bigram and unigram sequences
    
    :param bigrams: sequence of pairs of tokens
    :type bigrams: iterable(tuple(str,str))
    :param unigrams: sequence of tokens
    :type unigrams: iterable(str)
    
    :return: MLE probability distributions for the two inputs
    :rtype: tuple(FasterMLEProbDist,FasterMLEProbDist)'''
    # TODO build the frequency distribution over bigrams and unigrams
    bigramFreqDist = None
    unigramFreqDist = None

    # TODO build the probability distribution from the above frequency distributions using the FasterMLEProbDist estimator
    bigramProbDist = None
    unigramProbDist = None

    return bigramProbDist, unigramProbDist

Now try the test code. 

In [None]:
def test_prob_distributions():
    '''Test ex1'''
    bigrams, unigrams = BuildData(sentences, Filter1)

    bigramProbDist1, unigramProbist1 = my_prob_distributions(bigrams, unigrams)
    print("Unigram Probability Distribution (type): %s"%type(unigramProbist1))
    print("Bigram Probability Distribution (type): %s"%type(bigramProbDist1))
    
    MLESorted = bigramProbDist1.freqdist().most_common(30)
    print("Bigrams sorted by MLE (type): %s"%type(MLESorted))
    print()
    print("Using filter 1:")
    pprint(MLESorted)

    bigrams, unigrams = BuildData(sentences, Filter2)
    bigramProbDist, unigramProbist = my_prob_distributions(bigrams, unigrams)
    MLESorted = bigramProbDist.freqdist().most_common(30)
    print("Using filter 2:")
    pprint(MLESorted)

    return bigramProbDist1, unigramProbist1

# TEST EXERCISE 1 - return values will be used for exercise 2
bigramProbDist, unigramProbDist = test_prob_distributions()

Compare the top 30 most frequent bigrams arising from the two different filters.

Most of the former are made up of closed-class words. If we eliminate stopwords some interesting bigrams over content words show up.

# Computing the Pointwise Mutual Information

In the previous section we estimated the joint and marginal probability distributions from the data. In this
section we use these distributions to compute the PMI for a given sample. In order to avoid multiplication of
small floating point numbers (probabilities), we can rewrite the formula for PMI as:
$PMI(x_i, y_j) = \log P(x_i, y_j) - (\log P(x_i) + \log P(y_j))$
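
The following sketch illustrates why the log form is safer. The probabilities are deliberately exaggerated, made-up values (far smaller than anything in this corpus) chosen so that the product of the marginals underflows to zero in floating point, while the sum of their logarithms does not.

```python
from math import log2

# Exaggerated, made-up probabilities chosen so that the product underflows
p_xy, p_x, p_y = 1e-300, 1e-200, 1e-200

try:
    ratio_form = log2(p_xy / (p_x * p_y))
except ZeroDivisionError:
    ratio_form = None  # p_x * p_y underflowed to 0.0

# Same quantity computed from logarithms only
log_form = log2(p_xy) - (log2(p_x) + log2(p_y))

print(ratio_form, log_form)
```

For probabilities of ordinary size the two forms agree up to rounding error.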

### Exercise 2

The template of the function that you have to implement takes two parameters: the bigram and unigram
probability distributions.
1. Create a list of pairs of samples in the distribution and their PMI, using `FasterMLEProbDist.logprob` to get values for unigrams and bigrams from the two FasterMLEProbDist instances.
2. Make a dictionary from that list.
3. Sort the list of pairs in descending order of PMI.

In [None]:
def ComputePMI(bpd, upd):
    '''Compute PMI for bigrams from a corpus, sorted in descending order
    
    :param bpd: Bigram probability distribution
    :type bpd: nltk.probability.MLEProbDist
    :param upd: Unigram probability distribution
    :type upd: nltk.probability.MLEProbDist
    :return: samples and their PMIs
    :rtype: tuple(dict,list(tuple(tuple(str,str),float)))
    '''
    
    PMIs = [] # TODO: compute list of (sample,PMI) pair for every bigram in bpd

    PMIsorted = sorted(PMIs,key=operator.itemgetter(1), reverse=True)
    return dict(PMIsorted), PMIsorted

Now run the test code.

In [None]:
def test_compute_pmi(bpd, upd):
    '''Test ex2 with values from test1'''
  
    print("bpd type: %s"%type(bpd))
    print("upd type: %s"%type(upd))

    PMIs, PMIsorted = ComputePMI(bpd, upd)
    print("PMIs type: %s"%type(PMIs))
    print("PMIsorted type: %s"%type(PMIsorted))

    print("Some illustrative pairs:\nsperm whale %0.2f" % PMIs[("sperm","whale")])
    print("of the %0.2f" % PMIs[("of","the")])
    print("old man %0.2f" % PMIs[("old","man")])
    print("one side %0.2f" % PMIs[("one","side")])
    
    bcount = bpd.freqdist()
    print("\nTop 10 by PMI")
    print("%s\t%s\t%s"%('PMI','n','pair'))
    for pair in PMIsorted[:10]:
        print("%0.2f\t%d\t%s" % (pair[1], bcount[pair[0]], pair[0]))

    return PMIsorted

# TEST EXERCISE 2 - return values will be used for exercise 3
PMIsorted = test_compute_pmi(bigramProbDist, unigramProbDist)

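Look at the PMI and bigrams above. We can see that the PMI for _sperm whale_ is 5 binary orders of magnitude greater than the PMI for _of the_ (`logprob` uses base-2 logarithms, so a difference of 5 in PMI corresponds to a factor of $2^5$ in the probability ratio). Can you think of some reasons why the PMI for _old man_ is not as low as we would expect?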

What can you observe by looking at the top 10 bigrams according to the PMI? How do low counts affect the PMI?
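
As a hint for the second question, the sketch below compares a pair seen once, whose words are each seen once, with a pair seen many times. The corpus sizes and counts are made up for illustration, and `pmi_from_counts` is a hypothetical helper, not part of the lab code.

```python
from math import log2

def pmi_from_counts(c_xy, c_x, c_y, n_bigrams, n_unigrams):
    """PMI in bits using MLE estimates built from raw counts."""
    return (log2(c_xy / n_bigrams)
            - (log2(c_x / n_unigrams) + log2(c_y / n_unigrams)))

# Made-up corpus sizes
N_UNI, N_BI = 200_000, 180_000

# A pair seen once, each of whose words is also seen only once
rare = pmi_from_counts(1, 1, 1, N_BI, N_UNI)
# A pair seen 200 times, built from reasonably common words
frequent = pmi_from_counts(200, 250, 1000, N_BI, N_UNI)

print("rare: %0.2f  frequent: %0.2f" % (rare, frequent))
```

With MLE estimates, a single co-occurrence of two words that appear nowhere else is treated as perfect association, so such pairs score far higher than well-attested ones.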

### Exercise 3

In the previous exercise we found that PMI is very sensitive to data sparsity: bigrams composed of low-frequency
words are ranked higher than bigrams composed of high-frequency words. One way to fix
this issue is to set a threshold on bigram frequency.
Edit the `filtered_pmi` function to do this:
1. Filter the full list of bigrams and their corresponding PMI to include only bigrams with frequency greater than 30.

In [None]:
def filtered_pmi(samplesAndPMIs, bpd):
    '''Filter a list of sample and PMIs to include only those samples with frequency over 30
    
    :param samplesAndPMIs: samples and their PMIs
    :type samplesAndPMIs: list(tuple(tuple(str,str),float))
    :param bpd: probability distribution for the samples
    :type bpd: nltk.probability.MLEProbDist
    :return: filtered list of samples, PMIs and the sample frequency
    :rtype: list(tuple(tuple(str,str),float,int))'''
    
    bcount = None # help(MLEProbDist) is your friend
    return [(t,pmi,bcount[t]) for t,pmi in samplesAndPMIs if bcount[t]>30]

In [None]:
def test_filtered_pmi(PMIsorted, bpd):
    '''Test ex3 with values from test1 and test2'''

    high_freq = filtered_pmi(PMIsorted, bpd)

    print("\nTop 20 by PMI where pair count>30")
    print("%s\t%s\t%s"%('PMI','n','pair'))
    for t,pmi,n in high_freq[:20]:
        print("%0.2f\t%d\t%s" % (pmi, n, t))

    print("\nBottom 20 by PMI where pair count>30")
    
    for t,pmi,n in high_freq[-20:]:
        print("%0.2f\t%d\t%s" % (pmi, n, t))

# TEST EXERCISE 3
test_filtered_pmi(PMIsorted, bigramProbDist)

What does a negative score say about a bigram?

**Optionally** you can eliminate stop-words from the corpus by applying the second filter function, `Filter2`, then
recompute the PMI and investigate the top and bottom bigrams.