In [62]:
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

import pandas as pd

***

Запускать командой Run All Cells!

Получаем три последние новости по тикеру с сайта Финам, с помощью ML-модели выделяем главное, отображаем результат.

***

## Токенизация

In [63]:

import razdel
import spacy

# Part-of-speech tags that we do not want to treat as meaningful.
# The list was picked by eye.
# NOTE(review): "PREP", "NPRO", "CONJ", "PRCL", "NUMR" and "PRED" look like
# pymorphy2/OpenCorpora tags rather than the Universal POS tags that spaCy
# exposes through `token.pos_` — confirm whether they can ever match.
BAD_POS = ("PREP", "NPRO", "CONJ", "PRCL", "NUMR", "PRED", "INTJ", "PUNCT", "CCONJ", "ADP", "DET", "ADV")

# Load the spaCy pipeline used for part-of-speech tagging and lemmatisation.
spacy_model = spacy.load("ru_core_news_md")


# Split a text into sentences.
def sentenize(text):
    """Return the sentences of `text` as a list of strings.

    Sentence boundaries are found by `razdel.sentenize`; only the `.text`
    of every item it yields is kept.
    """
    sentences = []
    for span in razdel.sentenize(text):
        sentences.append(span.text)
    return sentences


# Tokenise a single sentence.
def tokenize_sentence(sentence):
    """Return the significant lemmas of one sentence.

    The sentence is run through `spacy_model`; tokens whose part of speech is
    listed in `BAD_POS` are dropped, and so are lemmas of two characters or
    fewer.

    Args:
        sentence: the sentence text.

    Returns:
        A list of lemma strings, in the order they appear in the sentence.
    """
    # A non-breaking space separates words exactly like a regular space does.
    # Deleting it would glue the neighbouring words into one bogus token
    # ("в\xa0Москве" -> "вМоскве"), so it is replaced with a plain space.
    sentence = sentence.replace("\xa0", " ").strip()
    lemmas = (token.lemma_ for token in spacy_model(sentence) if token.pos_ not in BAD_POS)
    return [lemma for lemma in lemmas if len(lemma) > 2]


# Tokenise a whole text.
def tokenize_text(text):
    """Return the tokens of every sentence of `text` as one flat list.

    The text is split with `sentenize`, each sentence is tokenised with
    `tokenize_sentence`, and the results are concatenated in sentence order.
    """
    return [
        token
        for sentence in sentenize(text)
        for token in tokenize_sentence(sentence)
    ]


## Text Rank

In [64]:
import math
import numpy as np
import networkx as nx
from scipy.linalg import eig
from tokenization import *

def text_rank_preprocessing(sentence):
    """Default TextRank preprocessing: turn a sentence into its token list.

    Delegates to `tokenize_sentence`.
    """
    tokens = tokenize_sentence(sentence)
    return tokens

def text_rank_similarity(tokens1, tokens2):
    """Similarity of two tokenised sentences used as the TextRank edge weight.

    The overlap is the sum, over every token of `tokens1`, of how many times
    that token occurs in `tokens2`. It is divided by
    `log(len(tokens1)) + log(len(tokens2))`.

    Returns:
        0.0 when the sentences share no tokens; the raw overlap when both
        sentences have at most one token (the log-normaliser would be zero);
        otherwise the normalised overlap.
    """
    overlap = 0
    for word in tokens1:
        overlap += tokens2.count(word)
    if not overlap:
        return 0.0

    size1, size2 = len(tokens1), len(tokens2)
    if size1 <= 1 and size2 <= 1:
        # log(1) + log(1) == 0, so the overlap is returned without normalising.
        return overlap

    assert size1 > 0 and size2 > 0
    return overlap / (math.log(size1) + math.log(size2))


class TextRankSummarizer:
    """
    Extractive summarizer based on TextRank.
    Based on: https://github.com/miso-belica/sumy/blob/main/sumy/summarizers/text_rank.py
    Original paper: https://web.eecs.umich.edu/~mihalcea/papers/mihalcea.emnlp04.pdf

    Calling an instance with a text and a sentence count returns the
    top-ranked sentences of the text, joined with spaces in their original
    order.
    """

    def __init__(
            self,
            damping=0.85,
            epsilon=1e-4,
            niter=100,
            preprocessing_function=text_rank_preprocessing,
            similarity_function=text_rank_similarity,
            verbose=False
    ):
        """
        Args:
            damping: PageRank damping factor.
            epsilon: the iteration stops once the L2 norm of the change of the
                rank vector is no larger than this value.
            niter: maximum number of iterations.
            preprocessing_function: maps a sentence string to the
                representation passed to `similarity_function`.
            similarity_function: maps two preprocessed sentences to a weight.
            verbose: when true, print the ranks and cross-check them against
                NetworkX, a plain power method and an eigen-decomposition.
        """
        self.damping = damping
        self.epsilon = epsilon
        self.niter = niter
        self.preprocessing_function = preprocessing_function
        self.similarity_function = similarity_function
        self.threshold = None
        # Honour the constructor argument (it used to be forced to True).
        self.verbose = verbose

    def __call__(self, text, target_sentences_count):
        """Return the `target_sentences_count` best sentences of `text`.

        Returns an empty string when `text` contains no sentences.
        """
        original_sentences = sentenize(text)
        if not original_sentences:
            # Nothing to rank; an empty graph cannot be normalised or iterated.
            return ""
        sentences = [self.preprocessing_function(s) for s in original_sentences]

        graph = self._create_graph(sentences)
        graph = self._apply_threshold(graph)
        # For debugging, `graph` (the sentence similarity matrix) can be shown
        # as a heatmap here; that needs matplotlib/seaborn, which are not imported.
        norm_graph = self._norm_graph(graph)
        ranks = self._iterate(norm_graph)

        if self.verbose:
            print("Значимости: {}".format(ranks))
            self._verify_ranks(graph, norm_graph, ranks)

        # Pick the best-ranked sentences, then restore the document order.
        indices = list(range(len(sentences)))
        indices = [idx for _, idx in sorted(zip(ranks, indices), reverse=True)]
        indices = indices[:target_sentences_count]
        indices.sort()
        return " ".join([original_sentences[idx] for idx in indices])

    def _verify_ranks(self, graph, norm_graph, ranks):
        """Cross-check `ranks` with three independent computations (verbose mode)."""
        # PageRank can be computed with library routines.
        # Run on the original graph it must give the same result.
        nx_graph = nx.from_numpy_array(graph)
        nx_ranks_by_node = nx.pagerank(nx_graph, alpha=self.damping)
        # Compare the NetworkX values themselves (previously `ranks` was
        # compared with a copy of itself, so the check could never fail).
        nx_ranks = [nx_ranks_by_node[i] for i in range(len(graph))]
        # The two solvers use different stopping rules, hence the tolerance.
        assert np.all(np.isclose(nx_ranks, ranks, atol=self.epsilon * 100))
        print("Проверка через NetworkX в порядке!")

        # The ranks can also be computed with a plain power method over the
        # modified matrix. The result must be the same.
        random_transitions = np.full(graph.shape, 1.0 / len(graph))
        full_matrix = (1.0 - self.damping) * random_transitions + self.damping * norm_graph
        pm_ranks = self._power_method(full_matrix)
        assert np.all(np.isclose(pm_ranks, ranks))
        assert np.all(np.isclose(np.dot(full_matrix.T, pm_ranks), pm_ranks, atol=self.epsilon))
        print("Проверка через метод степенных итераций в порядке!")

        # And also through eigenvectors.
        # They may differ by a constant factor because of normalisation.
        vals, vecs = eig(full_matrix.T, left=False, right=True)
        eig_ranks = vecs[:, vals.argmax()]
        assert np.all(np.isclose(np.dot(full_matrix.T, eig_ranks), eig_ranks, atol=self.epsilon))
        multiplier = ranks[0] / eig_ranks[0]
        eig_ranks *= multiplier
        assert np.all(np.isclose(eig_ranks, ranks, atol=self.epsilon * 100))
        print("Проверка через собственные вектора в порядке!")

    def set_sim_function(self, func):
        """Replace the sentence similarity function."""
        self.similarity_function = func

    def set_preprocessing_function(self, func):
        """Replace the sentence preprocessing function."""
        self.preprocessing_function = func

    def set_threshold(self, threshold):
        """Set the similarity threshold below which edges are dropped."""
        self.threshold = threshold

    def _create_graph(self, sentences):
        """ Build the initial symmetric similarity graph (diagonal included). """
        sentences_count = len(sentences)
        graph = np.zeros((sentences_count, sentences_count))
        for sentence_num1, sentence1 in enumerate(sentences):
            for sentence_num2 in range(sentence_num1, sentences_count):
                sentence2 = sentences[sentence_num2]
                sim = self.similarity_function(sentence1, sentence2)
                graph[sentence_num1, sentence_num2] = sim
                graph[sentence_num2, sentence_num1] = sim
        return graph

    def _apply_threshold(self, graph):
        """ Cut the graph by the threshold (in place); needed for LexRank. """
        if self.threshold is None:
            return graph
        graph[graph < self.threshold] = 0.0
        return graph

    def _norm_graph(self, graph):
        """
        Normalise by rows, because p_vector below is a vector, not a column.
        If p_vector were a column, the normalisation would have to be by columns.

        A row that sums to zero (a sentence similar to nothing, e.g. one with
        no tokens left after preprocessing) is replaced with a uniform row,
        so the result is always row-stochastic.
        """
        row_sums = graph.sum(axis=1)[:, np.newaxis]
        norm_graph = graph / (row_sums + 1e-7)
        dangling = np.isclose(row_sums[:, 0], 0.0)
        if dangling.any():
            norm_graph[dangling, :] = 1.0 / graph.shape[1]
        assert np.all(np.isclose(norm_graph.sum(axis=1), np.ones((graph.shape[0],))))
        return norm_graph

    def _iterate(self, matrix):
        """PageRank iterations with damping over a row-normalised matrix."""
        sentences_count = len(matrix)
        iteration = 0
        lambda_val = 0.1
        p_vector = np.full((sentences_count,), 1.0 / sentences_count)
        random_transitions = np.full((sentences_count,), 1.0 / sentences_count)

        transposed_matrix = matrix.T
        while iteration < self.niter and lambda_val > self.epsilon:
            next_p = (1.0 - self.damping) * random_transitions + self.damping * np.dot(transposed_matrix, p_vector)
            lambda_val = np.linalg.norm(np.subtract(next_p, p_vector))
            p_vector = next_p
            iteration += 1
        return p_vector

    def _power_method(self, matrix):
        """Plain power iterations `p <- matrix.T @ p` with the same stopping rule."""
        sentences_count = len(matrix)
        iteration = 0
        lambda_val = 0.1
        p_vector = np.full((sentences_count,), 1.0 / sentences_count)

        transposed_matrix = matrix.T
        while iteration < self.niter and lambda_val > self.epsilon:
            next_p = np.dot(transposed_matrix, p_vector)
            lambda_val = np.linalg.norm(np.subtract(next_p, p_vector))
            p_vector = next_p
            iteration += 1
        return p_vector


## Получение новостей с сайта Финам

In [65]:
class Article:
    """One publication taken from a link element of the news list.

    The link, date and author are read from the element immediately, so the
    object stays usable after the browser navigates away from the list page.
    """

    def __init__(self, elem) -> None:
        """
        Args:
            elem: the `<a>` web element of the publication; its `<span>`
                children are read as date (first) and author (second).
        """
        self.link = elem.get_attribute('href')
        span_elems = elem.find_elements(By.TAG_NAME, 'span')
        # Either span may be missing; fall back to an empty string instead of
        # raising IndexError for a link without spans.
        self.date = span_elems[0].text if span_elems else ''
        self.author = span_elems[1].text if len(span_elems) > 1 else ''
        self.text = ''
        self.title = ''
        # Filled in later by the summarisation step.
        self.summary = ''

class FinamNewsParser:
    """Collects company publications from finam.ru with a headless Chrome driver.

    The instance owns a browser process: call `close()` when done, or use the
    parser as a context manager (`with FinamNewsParser() as parser: ...`).
    """

    def __init__(self) -> None:
        user_agent = 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.50 Safari/537.36'

        options = webdriver.ChromeOptions()
        options.add_argument('headless')
        options.add_argument(f'user-agent={user_agent}')

        self.driver = webdriver.Chrome(options=options)

    def close(self) -> None:
        """Quit the browser and release the driver process."""
        self.driver.quit()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.close()

    def collect_news(self, ticker, start = None, end = None, maxCount = None):
        """Return the publications listed for `ticker`.

        Args:
            ticker: ticker substituted into the publications URL.
            start, end: optional period bounds appended to the URL; the period
                filter is used only when both are given, and then the list is
                expanded with "load more" until it stops growing.
            maxCount: optional limit on the number of articles returned;
                `None` or a negative value means no limit.

        Returns:
            A list of `Article` objects with title and text filled in where
            the article page could be parsed. The list is empty when the
            page has no publications section.
        """
        template_url = 'https://www.finam.ru/quote/moex/{}/publications/'
        url = template_url.format(ticker)

        if not start or not end:
            self.driver.get(url)
        else:
            url +=  "{}/{}/{}".format('date',  start, end)
            self.driver.get(url)
            self._load_all_pages()

        print("Getting news from:  {}".format(url))
        # find_elements returns an empty list instead of raising when the
        # section is absent, so "no news" reaches the caller as [].
        sections = self.driver.find_elements(By.ID, 'finfin-local-plugin-block-item-publication-list-filter-date-content')
        if not sections:
            return []
        a_elems = sections[0].find_elements(By.TAG_NAME, 'a')

        # Apply the limit before touching the elements: every Article costs
        # several round trips to the browser.
        if maxCount is not None and maxCount >= 0:
            a_elems = a_elems[:maxCount]

        # All Article objects are built before navigating away, while the
        # list elements are still attached to the page.
        articles = [Article(elem) for elem in a_elems]
        for article in articles:
            self._fill_article(article)

        return articles

    def _load_all_pages(self) -> None:
        """Click "load more" until the whole period is loaded."""
        load_more_locator = (By.XPATH, '//span[(starts-with(@class, "pointer")) and (contains(@class, "cl-blue"))]')
        while True:
            try:
                WebDriverWait(self.driver, 20).until(EC.element_to_be_clickable(load_more_locator))
                self.driver.execute_script("finfin.local.plugin_block_item_publication_list_filter_date.loadMore(this);")
            except Exception:
                # The button did not become clickable (or the script failed):
                # treat it as the end of the list. Unlike a bare `except`,
                # this lets KeyboardInterrupt through.
                break

    def _fill_article(self, article) -> None:
        """Open the article page and fill `article.title` / `article.text` (best effort)."""
        self.driver.get(article.link)
        try:
            title_section = self.driver.find_element(By.TAG_NAME, 'h1')
            article.title = title_section.text

            text_section = self.driver.find_element(
                By.XPATH,
                '//div[(starts-with(@class, "finfin-local-plugin-publication-item-item-")) and (contains(@class, "-text"))]'
            )

            paragraphs = [p_elem.text for p_elem in text_section.find_elements(By.TAG_NAME, 'p')]
            if paragraphs:
                article.text = ' '.join(paragraphs)
        except Exception:
            print('Couldnt parse article from href: {}'.format(article.link))


## Обработка новостей

In [None]:
SUMMARY_SENTENCES = 3  # number of sentences kept in every summary

parser = FinamNewsParser()
try:
    news = parser.collect_news('sber', maxCount=3)
finally:
    # Always release the headless Chrome process, even if scraping failed.
    parser.driver.quit()

if len(news) == 0:
    print("Мы не смогли найти новости по вашей компании. Давайте поробуем другую.")
else:
    print("Краткий пересказ последних новостей компании, которые нам удалось найти.")

# One summarizer is enough: it keeps no per-text state between calls.
text_rank = TextRankSummarizer()
for article in news:
    # An article whose page could not be parsed has empty text; there is
    # nothing to rank, so its summary stays empty.
    article.summary = text_rank(article.text, SUMMARY_SENTENCES) if article.text else ''

pd.set_option('display.max_colwidth', None)

df = pd.DataFrame([vars(article) for article in news])

# reindex (unlike df[[...]]) also works for the empty frame obtained when no
# news were found, so the cell never ends with a KeyError.
df.reindex(columns=['title', 'text', 'summary'])