# core

> Main codebase for fetching and saving RSS feeds

In [None]:
# | default_exp core

In [None]:
# | hide
from nbdev.showdoc import *

In [None]:
# | export

import json
import logging
from pathlib import Path
from typing import *

import fire
import reader
import rich
from bs4 import BeautifulSoup
from rich import print
from rich.console import Console
from rich.panel import Panel
from rich.text import Text

console = Console()

logger = logging.getLogger(__name__)


def strip_html(s: Optional[str]) -> str:
    """Return the plain text contained in an HTML fragment.

    Args:
        s: HTML markup. ``None`` and the empty string are accepted because
            feed entries do not always carry a summary.

    Returns:
        The text of ``s`` with all tags removed, or ``""`` when ``s`` is
        empty or ``None``.
    """
    # Nothing to parse: BeautifulSoup raises on None, and "" yields "" anyway.
    if not s:
        return ""
    return BeautifulSoup(s, "html.parser").text

In [None]:
# | export


class Feed:
    """An RSS feed: its URL, an optional display name and a list of tags."""

    def __init__(
        self,
        url: str,
        name: Optional[str] = None,
        tags: Optional[List[str]] = None,
    ):
        """Create a feed description.

        Args:
            url: URL of the feed.
            name: Custom display name. Defaults to None (no custom name).
            tags: Initial tags. Defaults to None, meaning no tags.
        """
        self.url = url
        self.name = name
        # Every instance gets its own list: a `[]` default would be shared by
        # all feeds, and copying keeps add_tag/remove_tag from mutating the
        # caller's list.
        self.tags = [] if tags is None else list(tags)

    def add_tag(self, tag: str):
        """Add `tag` to the feed unless it is already present."""
        if tag not in self.tags:
            self.tags.append(tag)

    def remove_tag(self, tag: str):
        """Remove `tag` from the feed; do nothing when it is not present."""
        if tag in self.tags:
            self.tags.remove(tag)

In [None]:
# | export


class PyNewsReader:
    """RSS reader front end built on top of the `reader` library.

    All state lives in ``~/.cache/pynewsreader``: the `reader` SQLite database
    (``db.sqlite``) plus three JSON files holding the custom feed names, the
    title blacklist and the title whitelist.
    """

    def __init__(self, feeds: Optional[List[Union[Feed, str]]] = None):
        """Load the persisted settings and open the underlying reader.

        Args:
            feeds: Feeds (``Feed`` objects or URL strings) to register right
                away. Defaults to None, which registers nothing.
        """
        self.dbfolder = Path.home() / ".cache/pynewsreader"
        self.dbfolder.mkdir(parents=True, exist_ok=True)

        # Persisted settings; a missing file means nothing was configured yet.
        self._feed_names = self._load_json("feed_names.json", {})
        self._title_blacklist = self._load_json("title_blacklist.json", [])
        self._title_whitelist = self._load_json("title_whitelist.json", [])

        # Create the reader exactly once so no extra reader object is left
        # behind unclosed.
        self._reader = reader.make_reader(
            self.dbfolder / "db.sqlite",
            plugins=["reader.enclosure_dedupe", "reader.entry_dedupe"],
        )
        self._reader.enable_search()

        if feeds is not None:
            # A single feed (e.g. one URL given on the command line) is
            # treated as a one-element list instead of being iterated.
            if isinstance(feeds, (Feed, str)):
                feeds = [feeds]
            for feed in feeds:
                self.add_feed(feed)

    def _load_json(self, filename: str, default):
        """Return the parsed JSON file `filename` from the data folder, or `default` if it does not exist."""
        path = self.dbfolder / filename
        if not path.exists():
            return default
        with open(path, "rt", encoding="utf-8") as myfile:
            return json.load(myfile)

    def _save_json(self, filename: str, data):
        """Write `data` as JSON to the file `filename` in the data folder."""
        with open(self.dbfolder / filename, "wt", encoding="utf-8") as myfile:
            json.dump(data, myfile)

    @staticmethod
    def _title_matches(entry, match_strings: Iterable[str]) -> bool:
        """Return True if the entry title contains any of `match_strings` (an entry without a title never matches)."""
        title = entry.title or ""
        return any(match_string in title for match_string in match_strings)

    def _print_entries(
        self,
        entries: Iterable[reader.Entry],
        mark_as_read: bool = True,
        limit: int = 10,
    ):
        """Pretty print entries, one panel per entry.

        Entries whose link was already displayed in this call are not shown
        again; they are marked as read instead.

        Args:
            entries: Entries to display. ``None`` items are skipped.
            mark_as_read: Mark displayed entries as read. Defaults to True.
            limit: Maximum number of entries to display; ``None`` means no
                limit. Defaults to 10.
        """
        if limit is not None and limit <= 0:
            return

        displayed = set()
        for e in entries:
            if e is None:
                # A search result whose entry could not be looked up.
                continue

            # Entries without a link are told apart by their (feed, id) pair
            # so that they are not all treated as duplicates of each other.
            key = e.link or (e.feed_url, e.id)
            if key in displayed:
                # Don't display duplicates
                self._reader.mark_entry_as_read(e)
                continue
            displayed.add(key)

            if mark_as_read:
                self._reader.mark_entry_as_read(e)

            if e.published:
                published_date = "Date: " + e.published.isoformat()[:10]
            else:
                published_date = "Date: Unknown"

            feed_title = f"[bold]{self._get_feed_title(e.original_feed_url)}[/bold]"

            panel_body = ":exclamation_mark:" if e.important else ""
            panel_body += f"Title: [bold]{e.title}[/bold]" + "\n"
            panel_body += published_date + "\n\n"
            # The summary is optional on an entry.
            summary = strip_html(e.summary).strip() if e.summary else ""
            panel_body += summary + "\n"

            subtitle = f"[link={e.link}]{e.link}[/link]" if e.link else None
            console.print(Panel(panel_body, title=feed_title, subtitle=subtitle))
            console.print()

            if limit is not None and len(displayed) >= limit:
                return

    def _search_to_entry(self, search_result):
        """Return the full entry for a search result, or None if it no longer exists."""
        # Direct lookup by (feed URL, entry id) instead of scanning all entries.
        return self._reader.get_entry(
            (search_result.feed_url, search_result.id), None
        )

    def _get_feed_title(self, url: str):
        """Get display title for pynewsreader feed

        The custom name wins, then the title reported by the feed itself,
        then the feed URL.

        Args:
            url (str): URL of pynewsreader feed

        Returns:
            str: Display title
        """
        custom_name = self._feed_names.get(url)
        if custom_name is not None:
            return custom_name
        feed = self._reader.get_feed(url)
        return feed.title or feed.url

    def _mark_matching_entries_as_read(self, match_strings: List[str]):
        """Mark every unread entry whose title contains one of `match_strings` as read."""
        for entry in self._reader.get_entries(read=False):
            if self._title_matches(entry, match_strings):
                print(f"Marking entry as read: {entry.title}")
                self._reader.mark_entry_as_read(entry)

    def _mark_matching_entries_as_important(self, match_strings: List[str]):
        """Mark every unread entry whose title contains one of `match_strings` as important."""
        for entry in self._reader.get_entries(read=False):
            if self._title_matches(entry, match_strings):
                print(f"Marking entry as important: {entry.title}")
                self._reader.mark_entry_as_important(entry)

    def update(self):
        """Update feeds and search, then apply the title blacklist and whitelist"""
        self._reader.update_feeds()
        self._reader.update_search()
        if self._title_blacklist:
            self._mark_matching_entries_as_read(self._title_blacklist)
        if self._title_whitelist:
            self._mark_matching_entries_as_important(self._title_whitelist)

    def _get_entries(
        self,
        important: Optional[bool] = None,
        read: Optional[bool] = None,
        limit: Optional[int] = 10,
    ):
        """Get entries in reader.Entry format

        Args:
            important (bool, optional): Filter on `important` status (None, True, False). Defaults to None.
            read (bool, optional): Filter on `read` status (None, True, False). Defaults to None.
            limit (int, optional): Number of entries to return; None returns all. Defaults to 10.

        Returns:
            Iterable[reader.Entry]: Matching entries
        """
        return self._reader.get_entries(read=read, limit=limit, important=important)

    def _get_tags(self, entry: reader.Entry):
        """Get the tag keys (without their values) for a given entry"""
        return [key for key, _ in self._reader.get_tags(entry)]

    def add_feed(self, feed: Union[Feed, str]):
        """Add feed to pynewsreader

        Args:
            feed (Feed | str): pynewsreader Feed, or plain feed URL, to add

        Raises:
            TypeError: If `feed` is neither a Feed nor a str
        """
        if isinstance(feed, Feed):
            self._feed_names[feed.url] = feed.name
            url = feed.url
        elif isinstance(feed, str):
            url = feed
        else:
            raise TypeError("Must be str or Feed type to add")
        self._reader.add_feed(url, exist_ok=True)

        # Save names to file
        self._save_json("feed_names.json", self._feed_names)

    def remove_feed(self, feed: Union[Feed, str]):
        """Remove feed from pynewsreader instance

        Args:
            feed (Feed | str): Feed, or plain feed URL, to remove
        """
        # Accept the same inputs as add_feed so a URL string works as well.
        url = feed.url if isinstance(feed, Feed) else feed
        self._reader.delete_feed(url)

    def _add_to_blacklist(self, blacklist_string: str):
        """Add a string to the title blacklist and persist the list."""
        if blacklist_string not in self._title_blacklist:
            self._title_blacklist.append(blacklist_string)
            self._save_json("title_blacklist.json", self._title_blacklist)

    def _remove_from_blacklist(self, blacklist_string: str):
        """Remove a string from the title blacklist and persist the list."""
        if blacklist_string in self._title_blacklist:
            self._title_blacklist.remove(blacklist_string)
            self._save_json("title_blacklist.json", self._title_blacklist)

    def _add_to_whitelist(self, whitelist_string: str):
        """Add a string to the title whitelist, persist it, and mark matching entries as important."""
        if whitelist_string not in self._title_whitelist:
            self._title_whitelist.append(whitelist_string)
            self._save_json("title_whitelist.json", self._title_whitelist)
            # limit=None: check every stored entry, not only the first ten.
            for entry in self._get_entries(limit=None):
                if self._title_matches(entry, [whitelist_string]):
                    self._reader.mark_entry_as_important(entry)

    def _remove_from_whitelist(self, whitelist_string: str):
        """Remove a string from the title whitelist, persist it, and mark matching entries as unimportant."""
        if whitelist_string in self._title_whitelist:
            self._title_whitelist.remove(whitelist_string)
            self._save_json("title_whitelist.json", self._title_whitelist)
            # limit=None: check every stored entry, not only the first ten.
            for entry in self._get_entries(limit=None):
                if self._title_matches(entry, [whitelist_string]):
                    self._reader.mark_entry_as_unimportant(entry)

    def feeds(self):
        """List pynewsreader feeds

        Returns:
            List[str]: List of names of current pynewsreader feeds
        """
        return [self._get_feed_title(i.url) for i in self._reader.get_feeds()]

    def show(
        self,
        limit: int = 6,
        read: bool = False,
        important: bool = None,
        mark_as_read: bool = True,
    ):
        """Pretty print entries

        Args:
            limit (int, optional): Number of entries to show. Defaults to 6.
            read (bool, optional): Show read entries (True), unread entries (False), or all entries (None). Defaults to False.
            important (bool, optional): Show important entries (True), unimportant entries (False), or all entries (None). Defaults to None.
            mark_as_read (bool, optional): Mark displayed entries as read. Defaults to True.
        """
        # Fetch twice the limit so that skipped duplicates can be replaced.
        self._print_entries(
            self._get_entries(read=read, important=important, limit=limit * 2),
            limit=limit,
            mark_as_read=mark_as_read,
        )

    def search(self, query: str, mark_as_read: bool = True, limit: int = 10):
        """Search entries and pretty print results

        Args:
            query (str): Search query
            mark_as_read (bool, optional): Mark results as read? Defaults to True.
            limit (int, optional): Number of results to show. Defaults to 10.
        """
        # Lazy on purpose: lookups stop as soon as `limit` entries were shown.
        found = (
            self._search_to_entry(result)
            for result in self._reader.search_entries(query)
        )
        self._print_entries(
            (entry for entry in found if entry is not None),
            mark_as_read=mark_as_read,
            limit=limit,
        )

    def _mark_important(self, entry: reader.Entry = None):
        """Mark entry as important

        Args:
            entry (reader.Entry): Entry to mark as important
        """
        if entry is not None:
            self._reader.mark_entry_as_important(entry)

    def _mark_unimportant(self, entry: reader.Entry = None):
        """Mark entry as unimportant

        Args:
            entry (reader.Entry): Entry to mark as unimportant
        """
        if entry is not None:
            self._reader.mark_entry_as_unimportant(entry)

    def _add_tag(self, entry: reader.Entry, tag_key: str, tag_value: Dict = None):
        """Add tag to entry

        Args:
            entry (reader.Entry): Entry to tag
            tag_key (str): Key of tag
            tag_value (Dict, optional): Value of tag. Defaults to None.
        """
        self._reader.set_tag(entry, tag_key, tag_value)

    def _remove_tag(self, entry: reader.Entry, tag_key: str):
        """Remove tag from entry

        Args:
            entry (reader.Entry): Entry to remove the tag from
            tag_key (str): Key of tag
        """
        self._reader.delete_tag(entry, tag_key)

In [None]:
# Create the reader; this loads the JSON settings and opens the database
# stored under ~/.cache/pynewsreader (the folder is created if missing).
r = PyNewsReader()

## Add Feeds

In [None]:
# Feeds registered under a custom display name, as (url, name) pairs
named_feeds = [
    ("https://ricochet.media/en/feed", "Richochet Media"),
    ("https://thetyee.ca/rss2.xml", "The Tyee"),
    (
        "https://www.thestar.com/content/thestar/feed.RSSManagerServlet.articles.topstories.rss",
        "Toronto Star | Top Stories",
    ),
    (
        "https://www.thestar.com/content/thestar/feed.RSSManagerServlet.articles.news.investigations.rss",
        "Toronto Star | Investigations",
    ),
    (
        "https://www.thestar.com/content/thestar/feed.RSSManagerServlet.articles.opinion.editorials.rss",
        "Toronto Star | Editorials",
    ),
    ("https://www.macleans.ca/feed/", "Macleans"),
]
for feed_url, display_name in named_feeds:
    r.add_feed(Feed(url=feed_url, name=display_name))

# Feeds without a custom name: the title reported by the feed itself is shown
unnamed_feed_urls = [
    "https://rss.cbc.ca/lineup/topstories.xml",
    "https://rss.cbc.ca/lineup/world.xml",
    "https://rss.cbc.ca/lineup/canada.xml",
    "https://rss.cbc.ca/lineup/business.xml",
    "https://rss.cbc.ca/lineup/technology.xml",
    "https://www.cbc.ca/cmlink/rss-canada-ottawa",
    "https://thenarwhal.ca/feed/",
]
for feed_url in unnamed_feed_urls:
    r.add_feed(Feed(feed_url))

## Update Feeds

In [None]:
# Update all feeds and the search index, then apply the title blacklist
# and whitelist to the unread entries.
r.update()

## List Feeds

In [None]:
# Display titles of all feeds: the custom name if one was given, otherwise
# the feed's own title, otherwise its URL.
r.feeds()

['Richochet Media',
 'CBC | Business News',
 'CBC | Canada News',
 'CBC | Ottawa News',
 'CBC | Technology News',
 'CBC | Top Stories News',
 'CBC | World News',
 'Macleans',
 'The Narwhal',
 'The Tyee',
 'Toronto Star | Investigations',
 'Toronto Star | Editorials',
 'Toronto Star | Top Stories']

## Display Entries

`read` = None shows all entries

`read` = False shows only unread entries

`read` = True shows only read entries

In [None]:
# Print up to 5 entries regardless of read status, without marking them as read
r.show(limit=5, read=None, mark_as_read=False)

## Search Entries

In [None]:
# Search the entries for "Tyson" and print the results, leaving them unread
r.search("Tyson", mark_as_read=False)

## Tag Entry

In [None]:
# Materialize all entries into a list so single entries can be picked by index
test = list(r._reader.get_entries())

In [None]:
# Tag the first entry with the key "foobar" (no value is given)
r._reader.set_tag(test[0], "foobar")

In [None]:
# The underlying reader returns tags as (key, value) pairs
list(r._reader.get_tags(test[0]))

[('foobar', None)]

In [None]:
# PyNewsReader._get_tags returns only the tag keys, without their values:
r._get_tags(test[0])

['foobar']

## Remove tag from entry

In [None]:
# Delete the "foobar" tag from the first entry again
r._reader.delete_tag(test[0], "foobar")

In [None]:
# Confirm the tag is gone (_get_tags already returns a list; list() only copies it)
list(r._get_tags(test[0]))

[]

## Mark as Important/Unimportant

In [None]:
# Mark the first entry as important, then as unimportant again
r._reader.mark_entry_as_important(test[0])
r._reader.mark_entry_as_unimportant(test[0])

## Mark as Read / Unread

In [None]:
# Mark the first entry as read, then as unread again
r._reader.mark_entry_as_read(test[0])
r._reader.mark_entry_as_unread(test[0])

## Automatically mark entries as read

When feeds are updated, unread entries whose titles contain any of these strings are automatically marked as read.

In [None]:
# Title substrings to blacklist; each new one is saved to title_blacklist.json
blacklisted_terms = [
    "Musk",
    "Apple",
    "Bezos",
    "Google",
    "Samsung",
    "iPhone",
    "iPad",
]
for term in blacklisted_terms:
    r._add_to_blacklist(term)

In [None]:
# Inspect the blacklist. It is loaded from title_blacklist.json on start-up,
# so it can also contain strings that were added in an earlier session.
r._title_blacklist

['Musk', 'Apple', 'Bezos', 'Google', 'Samsung', 'iPhone', 'iPad', 'Canada']

In [None]:
# Updating applies the blacklist: unread entries with a matching title are marked as read
r.update()

## Automatically mark entries as important

In [None]:
# Title substrings to whitelist; each new one is saved to title_whitelist.json
whitelisted_terms = ["interest rate", "Bank of Canada", "housing market"]
for term in whitelisted_terms:
    r._add_to_whitelist(term)

In [None]:
# Updating applies the whitelist: unread entries with a matching title are marked as important
r.update()

## CLI

In [None]:
# | export


def main():
    """Command-line entry point: hand the PyNewsReader class to Fire."""
    fire.Fire(component=PyNewsReader)

In [None]:
# | hide

# To Do:
# * User interface

In [None]:
# | hide
# Run nbdev's export step (the cells marked `# | export` make up the `core`
# module named by `default_exp` at the top of this notebook).
import nbdev

nbdev.nbdev_export()