In [157]:
import os
from nltk.tokenize import word_tokenize
from nltk.stem.wordnet import WordNetLemmatizer
from collections import Counter, defaultdict
import nltk
from nltk.corpus import wordnet as wn
from enum import Enum
import re
import math
from statistics import mean

from typing import AnyStr, List, Set, Dict, Tuple, Callable

In [158]:
file_name = 'Trump-wall.txt'
reduction = 10

with open(f'data/docs/{file_name}', encoding='utf-8') as f:
    doc = f.readlines()
    
    doc = list(filter(lambda x: x and not x.startswith('#'), (line.rstrip() for line in doc)))
    doc = [doc[0]] + [line for line in doc[1:] if len(line) > 50] # remove sub-titles

In [159]:
with open('data/stop_words_FULL.txt') as f:
    stop_words = {line for line in f.read().splitlines()}

# Bonus words allow us to understand that important things are about to be said
with open('data/bonus_words.txt') as f:
    bonus_words = {line for line in f.read().splitlines()}

# Stigma words allow us to understand that unimportant things are about to be said
with open('data/stigma_words.txt') as f:
    stigma_words = {line for line in f.read().splitlines()}

nasari = dict()
with open('data/dd-small-nasari-15.txt', encoding='utf-8') as f:
    for line in f.readlines():
        splits = line.replace('\n', '').split(";")
        items = list()
        for item in splits[2:]:
            if '_' in item:
                word, score = item.split("_")
                items.append((word, score))
        nasari[splits[1].lower()] = items

### Pre-Processing

In [160]:
def bag_of_words(sentence: AnyStr) -> Set[AnyStr]:
    return set(remove_stopwords(tokenize_sentence(remove_punctuation(sentence))))


def remove_stopwords(words: List[AnyStr]) -> List[AnyStr]:
    return [value for value in words if value not in stop_words]


# Get tokens from sentence
def tokenize_sentence(sentence: AnyStr) -> List[AnyStr]:
    words = []
    lmtzr = WordNetLemmatizer()
    for tag in nltk.pos_tag(word_tokenize(sentence)):
        words.append(lmtzr.lemmatize(tag[0]).lower())
    return words


# Remove punctuation and multiple spaces
def remove_punctuation(sentence: AnyStr) -> AnyStr:
    return re.sub('\s\s+', ' ', re.sub(r'[^\w\s]', '', sentence))

### Weighted Overlap between Nasari vectors

In [161]:
def max_similarity(context: List[Tuple], topic: List[Tuple]) -> int:
    overlaps = [math.sqrt(weighted_overlap(context, topic))]
    return max(overlaps)


def rank(word: AnyStr, vector: List[Tuple]) -> int:
    for index, (elem, _) in enumerate(vector):
        if word == elem: return index + 1


def weighted_overlap(vector1: List[Tuple], vector2: List[Tuple]) -> float:
    overlap = 0
    if common_words := {v[0] for v in vector1} & {v[0] for v in vector2}:
        numerator = 0
        for w in common_words:
            numerator += (1 / (rank(w, vector1) + rank(w, vector2)))

        denominator = 0
        for i in range(len(common_words)):
            denominator += 1 / (2 * (i+1))
        
        overlap = numerator / denominator
    return overlap

In [162]:
def nasari_vectors_for_bow(bow: List[AnyStr]) -> Dict[AnyStr, List[Tuple]]:
    nasari_vectors = dict()
    for word in bow:
        if nasari_entry := nasari.get(word, None):
            nasari_vectors[word] = nasari_entry
    return nasari_vectors


def get_cue_score(paragraph: AnyStr) -> int:
    word_list = tokenize_sentence(remove_punctuation(paragraph))
    score = 0
    # score = len(set(word_list) & bonus_words) - len(set(word_list) & stigma_words)
    for word in word_list:
        if word in bonus_words: 
            score += 1
        elif word in stigma_words: 
            score -= 1
    return score
    

# We extract the nasari vectors from the title of the document
def get_title_topics(doc: List[AnyStr]) -> Dict[AnyStr, List[Tuple]]:
    title = doc[0]
    return nasari_vectors_for_bow(bag_of_words(title))


# We extract the nasari vectors from the most important paragraph of the document
# according to the cue score (+1 for each bonus_words present, -1 for each stigma_words present)
def get_topics(doc: List[AnyStr]) -> Dict[AnyStr, List[Tuple]]:
    scores = [(paragraph, get_cue_score(paragraph)) for paragraph in doc]
    most_important_paragraph = max(scores, key=lambda x: x[1])[0] # paragraph with maximum score associated
    return nasari_vectors_for_bow(bag_of_words(most_important_paragraph))


def summarization(doc: List[AnyStr], reduction: int, relevance_criteria: Callable) -> List[AnyStr]:
    topics = relevance_criteria(doc)

    paragraphs_overlap = list()
    for paragraph in doc[1:]: # excluding title
        paragraph_context = nasari_vectors_for_bow(bag_of_words(paragraph))
        total_overlap = 0
        average_paragraph_overlap = 0
        overlaps_count = 0

        # compute average paragraph overlap with the topics found before
        for context in paragraph_context.values():
            for topic in topics.values():
                total_overlap += max_similarity(context, topic)
                overlaps_count += 1
        
        if overlaps_count > 0:
            average_paragraph_overlap = total_overlap / overlaps_count
            paragraphs_overlap.append((paragraph, average_paragraph_overlap))
    
    # number of paragraphs to mantain after reduction
    paragraphs_num = len(paragraphs_overlap) - int(round((reduction / 100) * len(paragraphs_overlap), 0))

    # we mantain only the first 'paragraphs_num' paragraphs in order of importance (i.e. average overlap)
    reduced_paragraphs = sorted(paragraphs_overlap, key=lambda x: x[1], reverse=True)[:paragraphs_num]
    reduced_paragraphs = [p[0] for p in reduced_paragraphs] # remove scores associated

    summary = list()
    summary.append(doc[0]) # append title
    for paragraph in reduced_paragraphs:
        summary.append(paragraph) # append reduced paragraphs

    return summary

### Evaluation Metrics

In [163]:
# compute Term Frequencies for each paragraph of the document (tf)
def compute_tf(doc: List[AnyStr]) -> Dict[AnyStr, List[float]]:
    tf = defaultdict(list)

    for paragraph in doc[1:]:
        paragraph_bow = remove_stopwords(tokenize_sentence(remove_punctuation(paragraph)))
        counter_dict = Counter(paragraph_bow)

        for term, term_count in counter_dict.items():
            tf[term].append(term_count / len(paragraph_bow))
    
    return dict(tf)


# compute Inverse Document Frequency (idf)
def compute_idf(doc: List[AnyStr], tf: Dict[AnyStr, List[float]]) -> Dict[AnyStr, float]:
    idf = dict()
    paragraphs_count = len(doc[1:])

    for term, frequencies in tf.items():
        count_paragraphs_term_is_present = len(frequencies)
        idf[term] = math.log(paragraphs_count / count_paragraphs_term_is_present)

    return idf

# couple tf and idf
def get_tf_idf(tf: Dict[AnyStr, List[float]], idf: Dict[AnyStr, float]) -> Dict[AnyStr, float]:
    tf_idf = dict()
    for term, freqs in tf.items():
        tf_idf[term] = mean([freq * idf[term] for freq in freqs])
    return tf_idf


def get_important_words(doc: List[AnyStr], reduction: int) -> Set[AnyStr]:
    tf = compute_tf(doc) # term frequencies
    idf = compute_idf(doc, tf) # inverse document frequency
    tf_idf = get_tf_idf(tf, idf) # coupled

    # we keep only the first (100 - reduction)% terms with highest score
    percentage = (100 - reduction) / 100
    threshold = int(round(len(tf_idf) * percentage))
    sorted_tf_idf = sorted(tf_idf.items(), key=lambda x: x[1], reverse=True)[:threshold]

    important_words = set([item[0] for item in sorted_tf_idf])
    return important_words


def get_words(summary: List[AnyStr]) -> Set[AnyStr]:
    words = set()
    for paragraph in summary[1:]:
        words = words | (bag_of_words(paragraph))
    return words


# We compute precision for the most important terms
def blue_evaluation(important_words, candidate_words) -> float:
    return len(important_words & candidate_words) / len(candidate_words)


# We compute recall for the most important terms
def rouge_evaluation(important_words, candidate_words) -> float:
    return len(important_words & candidate_words) / len(important_words)

### Execution

In [164]:
class RelevanceCriteria(Enum):
    title = get_title_topics
    topic = get_topics

relevance_criteria = RelevanceCriteria.title

summary = summarization(doc, reduction, relevance_criteria)
print(f'\n\nCOMPUTED SUMMARY')
print(','.join(summary))

paragraphs_overlap: 45
paragraphs num: 41


COMPUTED SUMMARY
The Trump wall, commonly referred to as "The Wall", was an expansion of the Mexico–United States barrier during the U.S. presidency of Donald Trump. Throughout his 2016 presidential campaign, Trump called for the construction of a border wall. He said that, if elected, he would "build the wall and make Mexico pay for it". Then-Mexican president Enrique Peña Nieto said that Mexico would not pay for the wall.,Further information: Donald Trump 2016 presidential campaign,Wall prototypes being presented to President Trump in San Diego, March 2018,The Mexico–United States barrier is a series of vertical barriers along the Mexico–United States border aimed at preventing illegal crossings from Mexico into the United States. The barrier is not one contiguous structure, but a discontinuous series of physical obstructions variously classified as "fences" or "walls".,The Build the Wall, Enforce the Law Act of 2018 was introduced on Octob

### Evaluation

In [165]:
important_words = get_important_words(doc, reduction)
candidate_summary_words = get_words(summary)

precision = blue_evaluation(important_words, candidate_summary_words)
recall = rouge_evaluation(important_words, candidate_summary_words)

print(f'Precision: {precision}')
print(f'Recall: {recall}')

Precision: 0.892512077294686
Recall: 0.9272271016311167
