Text Analytics I HWS 22/23

# Home Assignment 2 (25 pts)



Submit your solution via Ilias by 23:59 on Tuesday, October 25th. Late submissions are accepted until 10:15am on the following day (start of the exercise), with 1/4 of the total possible points deducted from the score.

Submit your solutions in teams of 3-4 students. Unless explicitly agreed otherwise in advance, **submissions from teams with more or fewer members will NOT be graded**.
List all members of the team with their student ID and full name in the cell below, and submit only one notebook per team. Only submit a notebook, do not submit the dataset(s) you used or image files that you have created - these have to be created from your notebook. Also, do NOT compress/zip your submission!

You may use the code from the exercises and basic functionalities that are explained in the official documentation of Python packages without citing; __all other sources must be cited__. In case of plagiarism (copying solutions from other teams or from the internet) ALL team members will be expelled from the course without warning.

#### General guidelines:
* Make sure that your code is executable; any task for which the code does not directly run on our machine will be graded with 0 points.
* Use only packages that are automatically installed along with Anaconda, plus some additional packages that have been introduced in the context of this class.
* Ensure that the notebook does not rely on the current notebook or system state!
  * Use `Kernel --> Restart & Run All` to see if you are using any definitions, variables etc. that 
    are not in scope anymore.
  * Do not rename any of the datasets you use, and load them from the same directory in which your ipynb-notebook is located, i.e., your working directory.
* Make sure you clean up your code before submission, e.g., properly align your code, and delete every line of code that you do not need anymore, even if you may have experimented with it. Minimize usage of global variables. Do not reuse variable names multiple times!
* Ensure your code/notebook terminates in reasonable time.
* Feel free to use comments in the code. While we do not require them to get full marks, they may help us in case your code has minor errors.
* For questions that require a textual answer, please do not write the answer as a comment in a code cell, but in a Markdown cell below the code. Always remember to provide sufficient justification for all answers.
* You may create as many additional cells as you want, just make sure that the solutions to the individual tasks can be found near the corresponding assignment.
* If you have any general question regarding the understanding of some task, do not hesitate to post in the student forum in Ilias, so we can clear up such questions for all students in the course.

In [1]:
# credentials of all team members
team_members = [
    {
        'first_name': 'Pavani',
        'last_name': 'Rajula',
        'student_id': 1870208
    },
    {
        'first_name': 'Yves',
        'last_name': 'Staudenmaier',
        'student_id': 1754294
    },
    {
        'first_name': 'Zeynep',
        'last_name': 'Eroglu',
        'student_id': 1834472
    },
    {
        'first_name': 'Priyanka',
        'last_name': 'Roy',
        'student_id': 1933097
    }
]

In [2]:
from typing import List, Union, Dict, Set, Tuple, Sequence

### Task 1: WordNet word similarity (9 points)

In this task, we want to implement the similarity between two words in WordNet (https://www.nltk.org/api/nltk.corpus.reader.wordnet.html) using the NLTK package. The word similarity between two words is given by
$$
\frac{1}{1+d}
$$
where $d$ is the distance of the shortest path in the hypernym/hyponym hierarchy tree in WordNet between any pair of synsets that are associated with the two input words.

From NLTK you should __only__ use the `synsets`, `hypernyms` and `instance_hypernyms` functions.

The following subtasks build on each other, i.e. the functions of the preceding subtasks can be used for the current subtask.

_Note: the distance of a synset to itself is 0, the distance to a direct hypernym is 1, ..._

In [3]:
from nltk.corpus import wordnet as wn
from nltk.corpus.reader.wordnet import Synset
import nltk
nltk.download('wordnet')
nltk.download('omw-1.4')

[nltk_data] Downloading package wordnet to
[nltk_data]     /Users/pavanirajula/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package omw-1.4 to
[nltk_data]     /Users/pavanirajula/nltk_data...
[nltk_data]   Package omw-1.4 is already up-to-date!


True

__a)__ Write a function ``shortest_paths_to`` that takes a synset as input and computes the shortest paths to all nodes on the way to the root in the hypernym/hyponym hierarchy tree of WordNet. The function should return a dictionary that matches all visited hypernyms on the way(s) to the root to the distance of the shortest path from the input synset. Consider that a synset might have multiple paths to the root and that some nodes might appear in multiple paths. However, we only want to store the shortest distances. Moreover, keep in mind that the input synset might be an instance. __(3 pts)__

Use the signature in the cell below.

__Example:__ _Calling_ ``shortest_paths_to(s)`` _on the synset_ ``s = wn.synset('calculator.n.01')`` _should yield the following result:_

``
{Synset('calculator.n.01'): 0,
 Synset('expert.n.01'): 1,
 Synset('person.n.01'): 2,
 Synset('causal_agent.n.01'): 3,
 Synset('organism.n.01'): 3,
 Synset('physical_entity.n.01'): 4,
 Synset('living_thing.n.01'): 4,
 Synset('entity.n.01'): 5,
 Synset('whole.n.02'): 5,
 Synset('object.n.01'): 6}
``

In [4]:
from collections import deque

def shortest_paths_to(start_syn: Synset) -> Dict[Synset, int]:
    """Compute the shortest distance to all nodes on paths to the root.
    :param start_syn: synset to which we want to compute the shortest distances
    :return: dict that matches all visited hypernyms to their distance to the input synset  
    """ 
    # your code here
    # breadth-first search upwards: the first time a synset is dequeued,
    # it has been reached via a shortest path
    not_visited = deque([(start_syn, 0)])
    path = {}

    while not_visited:
        syn, depth = not_visited.popleft()
        if syn in path:
            continue
        path[syn] = depth

        # instance synsets are linked upwards via instance_hypernyms()
        for hyp in syn.hypernyms() + syn.instance_hypernyms():
            not_visited.append((hyp, depth + 1))
    return path

In [5]:
s = wn.synset('weather.n.01')
shortest_paths_to(s)

{Synset('weather.n.01'): 0,
 Synset('atmospheric_phenomenon.n.01'): 1,
 Synset('physical_phenomenon.n.01'): 2,
 Synset('natural_phenomenon.n.01'): 3,
 Synset('phenomenon.n.01'): 4,
 Synset('process.n.06'): 5,
 Synset('physical_entity.n.01'): 6,
 Synset('entity.n.01'): 7}
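
The requirement from __a)__ that only the shortest distance is stored when a node lies on several paths can be illustrated without WordNet. The following is a minimal sketch on a made-up hierarchy; `TOY_HYPERNYMS` and `toy_distances` are hypothetical names and the graph is not real WordNet data. A breadth-first search visits nodes in order of increasing distance, so the first distance recorded for a node is already the shortest one.

```python
from collections import deque
from typing import Dict, List

# Hypothetical toy hierarchy (not WordNet data): node -> direct hypernyms.
TOY_HYPERNYMS: Dict[str, List[str]] = {
    "poodle": ["dog"],
    "dog": ["canine", "pet"],
    "canine": ["animal"],
    "pet": ["animal"],
    "animal": [],
}

def toy_distances(start: str) -> Dict[str, int]:
    """Breadth-first search towards the root of the toy hierarchy."""
    dist = {start: 0}
    queue = deque([start])
    while queue:
        node = queue.popleft()
        for parent in TOY_HYPERNYMS[node]:
            # "animal" is reachable via "canine" and via "pet";
            # it is only recorded the first time it is seen
            if parent not in dist:
                dist[parent] = dist[node] + 1
                queue.append(parent)
    return dist

print(toy_distances("poodle"))
# {'poodle': 0, 'dog': 1, 'canine': 2, 'pet': 2, 'animal': 3}
```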

__b)__ Write a function ``merge_paths`` that gets two dictionaries that map synsets to shortest distances (you can assume they were created by the function from __a)__) and merges them. The function should return a dictionary that includes all synsets and distances that appear in any of the input dictionaries. If a synset appears in both input dictionaries, we want to keep the shorter distance. Leave the input dictionaries unaltered. __(1.5 pts)__

Use the signature in the cell below.

In [6]:
def merge_paths(p1: Dict[Synset, int], p2: Dict[Synset, int]) -> Dict[Synset, int]:
    """Merge two paths keeping the shorter distance for synsets that appear more than once.
    :param p1: first dict that maps synsets to their shortest distances
    :param p2: second dict that maps synsets to their shortest distances
    :return: merged dict
    """
    # your code here
    # copy p1 so that neither input dictionary is modified
    merged_dict = dict(p1)
    for synset, d2 in p2.items():
        # keep the shorter distance if the synset appears in both inputs
        merged_dict[synset] = min(merged_dict.get(synset, d2), d2)

    return merged_dict


In [7]:
s1 = wn.synset('calculator.n.01')
s2 = wn.synset('person.n.01')
p1 = shortest_paths_to(s1)
p2 = shortest_paths_to(s2)
merge_paths(p1,p2)

{Synset('calculator.n.01'): 0,
 Synset('expert.n.01'): 1,
 Synset('person.n.01'): 0,
 Synset('causal_agent.n.01'): 1,
 Synset('organism.n.01'): 1,
 Synset('physical_entity.n.01'): 2,
 Synset('living_thing.n.01'): 2,
 Synset('entity.n.01'): 3,
 Synset('whole.n.02'): 3,
 Synset('object.n.01'): 4}
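
The requirement in __b)__ to leave the inputs unaltered is easy to violate, because assigning a dictionary to a new name does not copy it. The sketch below uses plain string keys instead of synsets; `merge_min` is a hypothetical helper for illustration, not part of the assignment interface.

```python
from typing import Dict

def merge_min(a: Dict[str, int], b: Dict[str, int]) -> Dict[str, int]:
    """Merge two distance dicts, keeping the smaller value per key."""
    merged = dict(a)  # shallow copy; `merged = a` would alias and mutate `a`
    for key, dist in b.items():
        merged[key] = min(dist, merged.get(key, dist))
    return merged

left = {"x": 0, "y": 2}
right = {"y": 1, "z": 3}
merged = merge_min(left, right)

print(merged)  # {'x': 0, 'y': 1, 'z': 3}
print(left)    # {'x': 0, 'y': 2}, unchanged
```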

__c)__ Write a function ``all_hypernym_paths`` that gets a word as input and returns a dictionary that maps all hypernyms that are reachable from the set of synsets associated with the word to the shortest distance leading there. __(1.5 pts)__

Use the signature in the cell below.

In [8]:
def all_hypernym_paths(word: str) -> Dict[Synset, int]:
    """Get all hypernyms of all synsets associated with the input word and compute the shortest distance leading there.
    :param word: input word
    :return: dict that matches all reachable hypernyms to their shortest distance 
    """
    # your code here
    # merge the shortest-path dictionaries of all synsets of the word;
    # merge_paths keeps the shorter distance for synsets reached more than once
    distances = {}
    for syn in wn.synsets(word):
        distances = merge_paths(distances, shortest_paths_to(syn))
    return distances

In [9]:
all_hypernym_paths('machine')

{Synset('machine.n.01'): 0,
 Synset('device.n.01'): 1,
 Synset('instrumentality.n.03'): 2,
 Synset('artifact.n.01'): 3,
 Synset('whole.n.02'): 4,
 Synset('object.n.01'): 5,
 Synset('physical_entity.n.01'): 3,
 Synset('entity.n.01'): 4,
 Synset('machine.n.02'): 0,
 Synset('person.n.01'): 1,
 Synset('causal_agent.n.01'): 2,
 Synset('organism.n.01'): 2,
 Synset('living_thing.n.01'): 3,
 Synset('machine.n.03'): 0,
 Synset('organization.n.01'): 1,
 Synset('social_group.n.01'): 2,
 Synset('group.n.01'): 3,
 Synset('abstraction.n.06'): 4,
 Synset('machine.n.05'): 0,
 Synset('car.n.01'): 0,
 Synset('motor_vehicle.n.01'): 1,
 Synset('self-propelled_vehicle.n.01'): 2,
 Synset('wheeled_vehicle.n.01'): 3,
 Synset('container.n.01'): 4,
 Synset('vehicle.n.01'): 4,
 Synset('conveyance.n.03'): 5,
 Synset('machine.v.01'): 0,
 Synset('shape.v.02'): 1,
 Synset('create_from_raw_material.v.01'): 2,
 Synset('make.v.03'): 3,
 Synset('machine.v.02'): 0,
 Synset('produce.v.02'): 1,
 Synset('machine.n.04'): 0,


__d)__  Write a function ``get_dist`` that returns the word similarity between two input words, according to the formula given in the task description at the beginning.  __(3 pts)__

Use the signature in the cell below.

In [10]:
def get_dist(w1 : str, w2 : str) -> float:
    """Compute the similarity between two input words in the WordNet hierarchy tree.
    :param w1: first input word
    :param w2: second input word
    :return: word similarity
    """
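    # your code here
    p1 = all_hypernym_paths(w1)
    p2 = all_hypernym_paths(w2)
    # the shortest path between any two synsets of the words runs through a
    # synset that is reachable from both, so minimise the summed distances
    common = p1.keys() & p2.keys()
    if not common:
        # assumption: no connecting path means infinite distance, i.e. similarity 0
        return 0.0
    distance = min(p1[syn] + p2[syn] for syn in common)
    return 1.0 / (distance + 1)

In [11]:
get_dist('calculator', 'person')

0.3333333333333333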
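
To make the role of $d$ in the similarity formula concrete, here is a small sketch that works on two hand-written distance dictionaries. The string keys are stand-ins for synsets, the values are illustrative, and `closest_common_node` is a hypothetical helper. It assumes that the shortest connecting path passes through a node that is reachable from both words.

```python
from typing import Dict, Optional, Tuple

def closest_common_node(p1: Dict[str, int], p2: Dict[str, int]) -> Optional[Tuple[str, int]]:
    """Return the shared node with the smallest summed distance, or None."""
    common = p1.keys() & p2.keys()
    if not common:
        return None
    best = min(common, key=lambda node: p1[node] + p2[node])
    return best, p1[best] + p2[best]

# Stand-in distance dicts (string keys instead of synsets).
paths_a = {"calculator": 0, "expert": 1, "person": 2, "organism": 3}
paths_b = {"person": 0, "organism": 1}

node, d = closest_common_node(paths_a, paths_b)
similarity = 1 / (1 + d)
print(node, d, similarity)
```

Here the two dictionaries meet at `"person"` with $d = 2 + 0 = 2$, which gives a similarity of $1/3$.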

### Task 2: Lesk algorithm (4 points)

In this task we want to implement a simple version of the Lesk algorithm, a knowledge-based method for word sense disambiguation. Given a target word $w$ and a context, the algorithm finds the word sense that is most fitting in the context. To achieve this, the Lesk algorithm computes the number of overlapping words between the context sentence and the definitions of the WordNet synsets associated with $w$.

Write a function ``lesk`` that takes a word and a context string (e.g. a sentence) and returns the most fitting sense from the synsets associated with the word and the corresponding context overlap. The most fitting sense is the one whose definition shares the most words with the context string. Before matching tokens, make sure to 

* only include valid tokens (cf. HA 1, task 2a)
* remove stopwords
* only match stems of words (e.g. consider the ``PorterStemmer`` from ``nltk``)

When computing the context overlap, count each stemmed word only once, even if it occurs multiple times. If there is no fitting synset, i.e. the context overlap between the context and the synset definitions is 0, return None instead.

Use the signature in the cell below.

In [12]:
# HA 1, task 2a)
from nltk.corpus.reader.util import StreamBackedCorpusView 
from nltk.corpus import stopwords
import re
import string
from itertools import chain
from nltk.stem import PorterStemmer
  
ps = PorterStemmer()

def get_valid_tokens(tokens: Union[List[str], StreamBackedCorpusView], remove_stopwords: bool=False) -> List[str]:
    """
    :param tokens: list of tokens that should be cleaned
    :param remove_stopwords: bool indicating if stopwords should be removed 
                             False by default
    :return: list of valid tokens
    """
    valid = []
    punct = string.punctuation
    stop = stopwords.words('english')
    digit = re.compile(r"\d+")
    
    for t in tokens:
        if t in punct:
            continue
        if remove_stopwords and t.lower() in stop:
            continue
        if re.fullmatch(digit, t):
            continue
        valid.append(t.lower())
    return valid

In [13]:
def lesk(word: str, context: str) -> (Synset, int):
    '''
    Compute the most probable sense of a word in the given context.
    :param word: ambiguous word
    :param context: context in which the word appears
    :returns: 
        - synset with the most likely word sense
        - context overlap of synset and context          
    '''
    # your code here
    best_sense = None
    max_overlap = 0
    # sets ensure that every stem is counted only once
    context_stems = set(ps.stem(token) for token in get_valid_tokens(context.split(), True))
    for syn in wn.synsets(word):
        signature = set(ps.stem(token) for token in get_valid_tokens(syn.definition().split(), True))
        overlap = len(context_stems & signature)
        # on ties, the synset that is listed last wins
        if overlap >= max_overlap:
            best_sense = syn
            max_overlap = overlap
    # no overlap at all (this also covers words without any synset)
    if max_overlap == 0:
        return None
    return best_sense, max_overlap

In [14]:
print(lesk("bank","I went to the bank to withdraw some money from my savings account and went back home"))
print(lesk("bank","I sat on the bank of the river and enjoyed the water currents striking the land slope"))

(Synset('deposit.v.02'), 2)
(Synset('bank.n.01'), 2)
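
The overlap counting described in this task can also be traced by hand on a toy example. The sketch below does not use NLTK: the two glosses in `TOY_SENSES`, the stopword list and the crude suffix stripper `toy_stem` are all made up for illustration and only stand in for WordNet definitions, the NLTK stopword list and the ``PorterStemmer``. Unlike the cell above, ties here go to the first sense.

```python
from typing import Optional, Set, Tuple

# Hypothetical stand-ins for the NLTK stopword list and WordNet glosses.
STOPWORDS = {"a", "the", "of", "to", "and", "that", "on", "i"}
TOY_SENSES = {
    "bank.money": "a financial institution that accepts deposits of money",
    "bank.river": "sloping land beside a body of water such as a river",
}

def toy_stem(token: str) -> str:
    """Crude suffix stripper, only a placeholder for a real stemmer."""
    for suffix in ("ing", "ed", "es", "s"):
        if token.endswith(suffix) and len(token) > len(suffix) + 2:
            return token[: -len(suffix)]
    return token

def stem_set(text: str) -> Set[str]:
    """Lowercase, drop stopwords, stem; the set counts each stem once."""
    return {toy_stem(t) for t in text.lower().split() if t not in STOPWORDS}

def toy_lesk(context: str) -> Optional[Tuple[str, int]]:
    context_stems = stem_set(context)
    best, best_overlap = None, 0
    for sense, gloss in TOY_SENSES.items():
        overlap = len(context_stems & stem_set(gloss))
        if overlap > best_overlap:
            best, best_overlap = sense, overlap
    return (best, best_overlap) if best is not None else None

print(toy_lesk("I deposited money at the bank"))
print(toy_lesk("We sat on the bank of the river near the water"))
print(toy_lesk("Completely unrelated words"))
```

In the first call, "deposited" and "deposits" only match because both are reduced to the same stem, which is the reason for stemming before computing the overlap.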


### Task 3: Markov chains (12 points)

In this task we want to create a language model by using the independence assumption of Markov. We therefore assume that the probability of a word depends only on a fixed number of preceding words. For example, by restricting the number of preceding words to $n$, we can approximate the probability of a word $w_{i}$ following a sequence of words $w_1, ..., w_{i-1}$ by:

$P(w_{i}|w_1, ..., w_{i-1}) \approx P(w_{i}|w_{i-n}, ..., w_{i-1})$

We will first train our model on a given corpus and then use it to automatically generate text.

Throughout this task we will define a single class with different functions. If you're unsure how to access class methods and attributes, take a look at the documentation (https://docs.python.org/3/tutorial/classes.html).

__a) Collecting the counts (3 pts)__

Write a function `process_corpus` that takes a corpus of text (as a sequence of tokens) as input and counts how often an n-gram of length $n$ (``order=n``) is followed by a certain word (the n-grams should __not__ be padded). The function should return a dictionary that maps every n-gram that is observed in the corpus to an inner dictionary. The inner dictionary maps each word to a number that indicates how often the word succeeds the n-gram in the given corpus. We will need these counts to compute the conditional probabilities $P(w_{i}|w_{i-n}, ..., w_{i-1})$.
Additionally, also return the entire vocabulary $V$ (i.e. the set of all unique tokens that appear in the corpus).

Make sure your implementation is efficient by using e.g. ``Counter`` and ``defaultdict`` from the package ``collections``.

_Note: In this task, the variable_ ``order`` _does not relate to the order of the Markov Model but to the length of the n-gram preceding some word $w_i$. The order of a Markov Model that predicts a word $w_i$ from its $n$ preceding words would be $n+1$._
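
As an illustration of the counting scheme in __a)__, the following sketch collects follower counts on a tiny token list. It is one possible way to combine ``defaultdict`` and ``Counter``, not the required implementation; `follower_counts` and `toy_tokens` are hypothetical names.

```python
from collections import Counter, defaultdict
from typing import Dict, Sequence, Tuple

def follower_counts(tokens: Sequence[str], n: int) -> Dict[Tuple[str, ...], Counter]:
    """Count which word follows each n-gram, without padding."""
    counts: Dict[Tuple[str, ...], Counter] = defaultdict(Counter)
    # the last n-gram of the corpus has no successor, so stop n tokens early
    for i in range(len(tokens) - n):
        counts[tuple(tokens[i:i + n])][tokens[i + n]] += 1
    return counts

toy_tokens = "a b a b c a b".split()
toy_counts = follower_counts(toy_tokens, 2)
toy_vocab = set(toy_tokens)

for ngram, followers in toy_counts.items():
    print(ngram, dict(followers))
```

The bigram `('a', 'b')` occurs three times in the toy corpus, but only its first two occurrences are followed by a word, so its inner counter sums to 2.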

__b) Conditional probabilities (3 pts)__

Write a function `transition_prob` that takes an n-gram $(w_{i-n}, ..., w_{i-1})$ and a word $w_{i}$ of the vocabulary $V$ as input and returns the conditional probability that the given n-gram is followed by the input word $w_{i}$. Recall that this conditional probability can be computed as follows:

$P(w_{i}|w_{i-n}, ..., w_{i-1}) = \frac{\text{Count}(w_{i-n}, ..., w_{i-1}, w_{i})}{\sum_{w \in V}\text{Count}(w_{i-n}, ..., w_{i-1}, w)}$

If the n-gram has never been observed, return $\frac{1}{|V|}$.
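
The formula and its fallback can be checked on a small hand-written count table. The numbers in `TOY_COUNTS` and the vocabulary below are invented for illustration, and `toy_transition_prob` is a hypothetical stand-alone function. Note that a word that never follows an observed n-gram gets probability 0, while only an unobserved n-gram triggers the uniform fallback $\frac{1}{|V|}$.

```python
from typing import Dict, Set, Tuple

# Invented counts: the bigram ("the", "king") was followed 3x by "of", 1x by "said".
TOY_COUNTS: Dict[Tuple[str, ...], Dict[str, int]] = {
    ("the", "king"): {"of": 3, "said": 1},
}
TOY_VOCAB: Set[str] = {"the", "king", "of", "said", "babylon"}

def toy_transition_prob(ngram: Tuple[str, ...], word: str) -> float:
    followers = TOY_COUNTS.get(ngram)
    if not followers:
        # n-gram never observed: uniform distribution over the vocabulary
        return 1 / len(TOY_VOCAB)
    return followers.get(word, 0) / sum(followers.values())

print(toy_transition_prob(("the", "king"), "of"))       # 0.75
print(toy_transition_prob(("the", "king"), "babylon"))  # 0.0
print(toy_transition_prob(("a", "queen"), "of"))        # 0.2
```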

__c) Most likely word (3 pts)__

Write a function `most_likely_word` that gets an n-gram as input and returns the word that is most likely to succeed the given n-gram. In case there are multiple words that are equally likely to follow the given n-gram, return all of them.
Note that you should **not** loop over the **entire** vocabulary to obtain the most likely word.
In case the given n-gram has never been observed, return the entire vocabulary.
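
One way to avoid looping over the entire vocabulary is to look only at the words that were actually observed after the n-gram. Since all of them share the same denominator, comparing raw counts is equivalent to comparing probabilities. A minimal sketch, with `top_followers` as a hypothetical helper operating on a single inner counter:

```python
from collections import Counter
from typing import Set

def top_followers(followers: Counter) -> Set[str]:
    """Return every word that reaches the maximum count."""
    if not followers:
        return set()
    best = max(followers.values())
    return {word for word, count in followers.items() if count == best}

print(top_followers(Counter({"good": 2, "happy": 1, "nice": 2})))
```

For this counter, both `'good'` and `'nice'` are returned because they are tied at the maximum count of 2.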

__d) Generating text (2 pts)__

Write a function `generate_text` that generates a text sequence of length `k`, given a starting sequence of words (`ngram`). The function should choose the most likely next word in every step; in case there are multiple equally likely words, randomly choose one. You should return a list of ``k`` words, that includes the starting sequence and is the most probable continuation. 
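
The generation loop can be sketched independently of the class. The example below uses an invented count table `TOY_COUNTS` and a seeded random generator so that runs are reproducible; `toy_generate` and its `seed` parameter are assumptions made for this illustration and are not part of the required interface.

```python
import random
from typing import Dict, List, Tuple

# Invented counts for illustration.
TOY_COUNTS: Dict[Tuple[str, ...], Dict[str, int]] = {
    ("a", "b"): {"c": 2, "a": 1},
    ("b", "c"): {"a": 1},
    ("c", "a"): {"b": 1},
}
TOY_VOCAB: List[str] = ["a", "b", "c"]

def toy_generate(start: Tuple[str, ...], k: int, seed: int = 0) -> List[str]:
    """Greedily extend `start` until the sequence has k words in total."""
    rng = random.Random(seed)
    n = len(start)
    text = list(start)
    while len(text) < k:
        followers = TOY_COUNTS.get(tuple(text[-n:]))
        if followers:
            best = max(followers.values())
            candidates = sorted(w for w, c in followers.items() if c == best)
        else:
            # unseen n-gram: every word of the vocabulary is equally likely
            candidates = TOY_VOCAB
        text.append(rng.choice(candidates))
    return text[:k]

print(toy_generate(("a", "b"), 6))  # ['a', 'b', 'c', 'a', 'b', 'c']
```

The starting sequence counts towards `k`, so the loop adds only `k - len(start)` words.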


Please do not implement other functions for the MarkovModel class.

Use the function signatures in the cell below.

In [15]:
from collections import defaultdict, Counter
from nltk.util import ngrams
import random

class MarkovModel():
    '''Markov model for generating text.'''
    
    def __init__(self, tokens: Sequence[str], order: int):
        '''
        :param tokens: text corpus on which the model is trained, as a sequence of tokens
        :param order: length of the n-gram 
        '''
        self.order = order 
        self.counts, self.v = self.process_corpus(tokens)
        
    def process_corpus(self, tokens: Sequence[str]) -> (Dict[str, Dict[str, int]], Set):
        '''Training method of the model, counts the occurrences of each word after each observed n-gram.
        :param tokens: text corpus on which the model is trained, as a sequence of tokens
        :returns: 
            - nested dict that maps each n-gram to the counts of the words succeeding it
            - the whole vocabulary as a set
        '''
        n = self.order
        tokens = list(tokens)
        counts = defaultdict(lambda: defaultdict(int))
        # slide a window over the corpus without padding or wrapping around:
        # only n-grams that are followed by a word contribute a count
        for i in range(len(tokens) - n):
            key = tuple(tokens[i:i+n])
            next_word = tokens[i+n]
            counts[key][next_word] += 1
        return counts, set(tokens)
        
        
    def transition_prob(self, ngram: Tuple[str, ...], word: str) -> float:
        '''Compute the conditional probability that the input word follows the given n-gram.
        :param ngram: string tuple that represents an n-gram
        :param word: input word
        :return: probability that the n-gram is followed by the input word
        '''
        # your code here
        # .get() avoids inserting an unseen n-gram into the defaultdict
        followers = self.counts.get(ngram)
        if not followers:
            # n-gram never observed: uniform distribution over the vocabulary
            return 1 / len(self.v)
        return followers.get(word, 0) / sum(followers.values())
        
    
    def most_likely_word(self, ngram) -> Set[str]:
        '''Computes which word is most likely to follow a given n-gram.
        :param ngram: n-gram we are interested in
        return: set of words that are most likely to follow the n-gram
        '''
        # your code here
        followers = self.counts.get(ngram)
        if not followers:
            return self.v
        # only the observed successors are inspected, not the entire vocabulary
        max_count = max(followers.values())
        return {word for word, count in followers.items() if count == max_count}
    
    def generate_text(self, ngram: Tuple[str, ...], k: int) -> List[str]:
        '''Generates a text sequence of length k, given a starting sequence.
        :param ngram: starting sequence
        :param k: total number of words in the generated sequence
        :return: sequence of generated words, including the starting sequence
        '''
        # your code here
        text = list(ngram)
        # the starting sequence counts towards the k words
        while len(text) < k:
            # the last `order` words form the n-gram that conditions the next word
            candidates = self.most_likely_word(tuple(text[-self.order:]))
            # break ties between equally likely words at random
            text.append(random.choice(list(candidates)))
        return text[:k]

In [23]:
model = MarkovModel(['good','happy', 'sad', 'good', 'nice', 'fun', 'good','happy','sad', 'happy', 'sad', 'good'],2)
print(model.counts)
print(model.generate_text(('happy', 'sad'), 5))

defaultdict(<function MarkovModel.process_corpus.<locals>.<lambda> at 0x7f857f12b4c0>, {('good', 'happy'): defaultdict(<class 'int'>, {'sad': 2}), ('happy', 'sad'): defaultdict(<class 'int'>, {'good': 2, 'happy': 1}), ('sad', 'good'): defaultdict(<class 'int'>, {'nice': 1}), ('good', 'nice'): defaultdict(<class 'int'>, {'fun': 1}), ('nice', 'fun'): defaultdict(<class 'int'>, {'good': 1}), ('fun', 'good'): defaultdict(<class 'int'>, {'happy': 1}), ('sad', 'happy'): defaultdict(<class 'int'>, {'sad': 1})})
['happy', 'sad', 'good', 'nice', 'fun']


__e) Apply the model to a corpus (2 pts)__

Finally, we want to apply our functions to the King James Bible (`'bible-kjv.txt'`) that is part of the ``gutenberg`` corpus. Use the function from HA 1, task 2a) to obtain a list of valid tokens (do not remove stopwords) from the King James Bible.

Initialize the MarkovModel with the list of valid tokens and ``order=3`` and answer the following subtasks:

i) What is the probability that the word ``babylon`` follows the sequence ``the king of``? 

ii) What are the most likely words to follow the sequence ``the world is``? 

iii) Generate a sequence of length 20 that starts with ``mother mary was``.


In [24]:
nltk.download("gutenberg")
from nltk.corpus import gutenberg
tokens = nltk.corpus.gutenberg.words('bible-kjv.txt')
valid_tokens = get_valid_tokens(tokens, False)
model = MarkovModel(valid_tokens, 3)

[nltk_data] Downloading package gutenberg to
[nltk_data]     /Users/pavanirajula/nltk_data...
[nltk_data]   Package gutenberg is already up-to-date!


In [25]:
model.transition_prob(('the', 'king', 'of'), 'babylon')

0.1906779661016949

In [26]:
model.most_likely_word(('the', 'world','is'))

{'crucified', 'enmity', 'gone', 'mine', 'the'}

In [27]:
model.generate_text(('mother', 'mary', 'was'), 20)

['mother',
 'mary',
 'was',
 'espoused',
 'to',
 'joseph',
 'before',
 'they',
 'came',
 'together',
 'she',
 'was',
 'found',
 'with',
 'child',
 'of',
 'the',
 'devil',
 'for',
 'the']