- [A Position-Biased PageRank Algorithm for Keyphrase Extraction](https://www.semanticscholar.org/paper/A-Position-Biased-PageRank-Algorithm-for-Keyphrase-Florescu-Caragea/407c61430a924fe4d260aa411523c1276f14f0e5)
- https://github.com/DerwenAI/pytextrank/issues/78

In [14]:
from collections import Counter
import math

import spacy
import networkx as nx

In [2]:
nlp = spacy.load("en_core_web_sm")

In [223]:
text = """
Chelsea 'opted against' signing Salomon Rondón on deadline day.

Chelsea reportedly opted against signing Salomón Rondón on deadline day despite their long search for a new centre forward.
With Olivier Giroud expected to leave, the Blues targeted Edinson Cavani, Dries Mertens and Moussa Dembele – only to end up with none of them.
According to Telegraph Sport, Dalian Yifang offered Rondón to Chelsea only for them to prefer keeping Giroud at the club.
Manchester United were also linked with the Venezuela international before agreeing a deal for Shanghai Shenhua striker Odion Ighalo.
Manager Frank Lampard made no secret of his transfer window frustration, hinting that to secure top four football he ‘needed’ signings.
Their draw against Leicester on Saturday means they have won just four of the last 13 Premier League matches.
"""
nlp = spacy.load("en_core_web_sm")
doc = nlp(text)

In [227]:
from collections import defaultdict, Counter
from dataclasses import dataclass
import itertools
import math
from typing import List, Tuple, Dict, Iterable, Set, Optional

import networkx as nx
from spacy.tokens import Doc, Span


@dataclass
class Phrase:
    text: str
    rank: float
    count: int
    phrase_list: List[Span]


Node = Tuple[str, str]  # (lemma, pos)


class BaseTextRank:
    _EDGE_WEIGHT = 1.0
    _POS_KEPT = ["ADJ", "NOUN", "PROPN", "VERB"]
    _TOKEN_LOOKBACK = 3

    def __init__(
        self,
        edge_weight=_EDGE_WEIGHT,
        pos_kept=_POS_KEPT,
        scrubber=str.strip,
        token_lookback=_TOKEN_LOOKBACK,
    ):
        self.edge_weight = edge_weight
        self.pos_kept = pos_kept
        self.scrubber = scrubber
        self.token_lookback = token_lookback

        self.doc = None

    def __call__(self, doc: Doc) -> Doc:
        """Call the inner EntityRuler."""
        self.doc = doc
        Doc.set_extension("phrases", force=True, default=[])
        Doc.set_extension("textrank", force=True, default=self)
        doc._.phrases = self.calc_textrank()
        return doc

    def calc_textrank(self) -> List[Phrase]:
        lemma_graph = self.construct_graph()

        pagerank_personalization = self.get_personalization()

        ranks: Dict[str, float] = nx.pagerank(
            lemma_graph, personalization=pagerank_personalization
        )

        nc_phrases = self.collect_phrases(self.doc.noun_chunks, ranks)
        ent_phrases = self.collect_phrases(self.doc.ents, ranks)
        all_phrases = {**nc_phrases, **ent_phrases}

        phrase_list: List[Phrase] = self.get_min_phrases(all_phrases)

        return sorted(phrase_list, key=lambda p: p.rank, reverse=True)

    def construct_graph(self) -> nx.Graph:
        G = nx.Graph()
        # add nodes made of (lemma, pos)
        G.add_nodes_from(self.node_list)
        # add edges between nodes co-occuring within a window, weighted by the count
        G.add_edges_from(self.edge_list)
        return G

    @property
    def node_list(self) -> List[Tuple[str, str]]:
        nodes = [
            (token.lemma_, token.pos_)
            for token in self.doc
            if token.pos_ in self.pos_kept
        ]
        return nodes

    @property
    def edge_list(self) -> List[Tuple[Node, Node, Dict[str, float]]]:
        edges = []
        for sent in self.doc.sents:
            H = [
                (token.lemma_, token.pos_)
                for token in sent
                if token.pos_ in self.pos_kept
            ]
            for hop in range(self.token_lookback):
                for idx, node in enumerate(H[: -1 - hop]):
                    nbor = H[hop + idx + 1]
                    edges.append((node, nbor))

        # Include weight on the edge: (2, 3, {'weight': 3.1415})
        edges = [
            (*n, dict(weight=w * self.edge_weight)) for n, w in Counter(edges).items()
        ]
        return edges

    def get_personalization(self) -> Optional[Dict[Node, float]]:
        return None

    def collect_phrases(
        self, spans: Iterable[Span], ranks: Dict[str, float]
    ) -> Dict[Span, float]:
        phrases = {
            span: sum(
                ranks[(token.lemma_, token.pos_)]
                for token in span
                if token.pos_ in self.pos_kept
            )
            for span in spans
        }
        return {
            span: self._calc_discounted_normalised_rank(span, sum_rank)
            for span, sum_rank in phrases.items()
        }

    def _calc_discounted_normalised_rank(self, span: Span, sum_rank: float) -> float:
        non_lemma = len([tok for tok in span if tok.pos_ not in self.pos_kept])

        # although the noun chunking is greedy, we discount the ranks using a
        # point estimate based on the number of non-lemma tokens within a phrase
        non_lemma_discount = len(span) / (len(span) + (2.0 * non_lemma) + 1.0)

        # use root mean square (RMS) to normalize the contributions of all the tokens
        phrase_rank = math.sqrt(sum_rank / (len(span) + non_lemma))

        return phrase_rank * non_lemma_discount

    def get_min_phrases(self, all_phrases=Dict[Span, float]) -> List[Phrase]:
        data = [
            (self.scrubber(span.text), rank, span) for span, rank in all_phrases.items()
        ]

        keyfunc = lambda x: x[0]
        applyfunc = lambda g: list((rank, spans) for text, rank, spans in g)
        phrases: List[Tuple[str, List[Tuple[float, Span]]]] = groupby_apply(
            data, keyfunc, applyfunc
        )

        phrase_list = [
            Phrase(
                text=p[0],
                rank=max(rank for rank, span in p[1]),
                count=len(p[1]),
                phrase_list=list(span for rank, span in p[1]),
            )
            for p in phrases
        ]
        return phrase_list


def groupby_apply(data, keyfunc, applyfunc):
    """Groupby a key and sum without pandas dependency.

    Arguments:
        data: iterable
        keyfunc: callable to define the key by which you want to group
        applyfunc: callable to apply to the group

    Returns:
        Iterable with accumulated values.

    Ref: https://docs.python.org/3/library/itertools.html#itertools.groupby
    """
    data = sorted(data, key=keyfunc)
    return [(k, applyfunc(g)) for k, g in itertools.groupby(data, keyfunc)]


In [228]:
import operator
from typing import Optional, Dict, List, Tuple



class PositionRank(BaseTextRank):
    def get_personalization(self) -> Optional[Dict[Node, float]]:
        """
        Specifically, we propose to assign a higher probability to a word
        found on the 2nd position as compared with a word found on the
        50th position in the same document. The weight of each candidate
        word is equal to its inverse position in the document.
        If the same word appears multiple times in the target document,
        then we sum all its position weights.
        For example, a word v_i occurring in the following positions:
        2nd, 5th and 10th, has a weight p(v_i) = 1/2 + 1/5 + 1/10 = 4/5 = 0.8
        The weights of words are normalized before they are used in the
        position-biased PageRank.
        """
        weighted_tokens: List[Tuple[str, float]] = [
            (tok, 1 / (i + 1))
            for i, tok in enumerate(
                token.lemma_ for token in self.doc if token.pos_ in self.pos_kept
            )
        ]

        keyfunc = lambda x: x[0]
        applyfunc = lambda g: sum(w for text, w in g)
        accumulated_weighted_tokens: List[Tuple[str, float]] = groupby_apply(
            weighted_tokens, keyfunc, applyfunc
        )
        accumulated_weighted_tokens = sorted(
            accumulated_weighted_tokens, key=lambda x: x[1]
        )

        norm_weighted_tokens = {
            k: w / sum(w_ for _, w_ in accumulated_weighted_tokens)
            for k, w in accumulated_weighted_tokens
        }

        weighted_nodes = {
            (token.lemma_, token.pos_): norm_weighted_tokens[token.lemma_]
            for token in self.doc
            if token.pos_ in self.pos_kept
        }
        return weighted_nodes

In [229]:
position_rank = PositionRank()
processed_doc = position_rank(doc)
processed_doc._.phrases[:10]

[Phrase(text='Salomon Rondón', rank=0.1701999924977688, count=2, phrase_list=[Salomon Rondón, Salomon Rondón]),
 Phrase(text='Salomón Rondón', rank=0.16702503586296494, count=2, phrase_list=[Salomón Rondón, Salomón Rondón]),
 Phrase(text='Chelsea', rank=0.15139173066873274, count=5, phrase_list=[
 Chelsea, Chelsea, Chelsea, Chelsea, Chelsea]),
 Phrase(text='deadline day', rank=0.15057992229816541, count=2, phrase_list=[deadline day, deadline day]),
 Phrase(text='Rondón', rank=0.15013300784484088, count=2, phrase_list=[Rondón, Rondón]),
 Phrase(text='Dalian Yifang', rank=0.0977676631859978, count=2, phrase_list=[Dalian Yifang, Dalian Yifang]),
 Phrase(text='Olivier Giroud', rank=0.08862796221453886, count=2, phrase_list=[Olivier Giroud, Olivier Giroud]),
 Phrase(text='Giroud', rank=0.08205056885278611, count=2, phrase_list=[Giroud, Giroud]),
 Phrase(text='Telegraph Sport', rank=0.07833387901020238, count=2, phrase_list=[Telegraph Sport, Telegraph Sport]),
 Phrase(text='Edinson Cavani', 

In [230]:
base_text_rank = BaseTextRank()
comparison_doc = base_text_rank(doc)
comparison_doc._.phrases[:10]

[Phrase(text='Salomon Rondón', rank=0.09871585315384829, count=2, phrase_list=[Salomon Rondón, Salomon Rondón]),
 Phrase(text='Salomón Rondón', rank=0.09871585315384829, count=2, phrase_list=[Salomón Rondón, Salomón Rondón]),
 Phrase(text='Shanghai Shenhua striker Odion Ighalo', rank=0.09814252545640166, count=1, phrase_list=[Shanghai Shenhua striker Odion Ighalo]),
 Phrase(text='deadline day', rank=0.09102501388558137, count=2, phrase_list=[deadline day, deadline day]),
 Phrase(text='Rondón', rank=0.0893689802029807, count=2, phrase_list=[Rondón, Rondón]),
 Phrase(text='Shanghai Shenhua', rank=0.08608180019168989, count=1, phrase_list=[Shanghai Shenhua]),
 Phrase(text='Dries Mertens', rank=0.08353691638443825, count=2, phrase_list=[Dries Mertens, Dries Mertens]),
 Phrase(text='Edinson Cavani', rank=0.08231821728419356, count=2, phrase_list=[Edinson Cavani, Edinson Cavani]),
 Phrase(text='Moussa Dembele', rank=0.08208705978960228, count=2, phrase_list=[Moussa Dembele, Moussa Dembele]),