In [1]:
import json
from pathlib import Path
from typing import Union, List
import requests

from bs4 import BeautifulSoup
from weaviate.util import generate_uuid5
import weaviate

# Querying and parsing

In [17]:
def get_links(filepath: str = "../data/links.txt") -> list:
    with open(filepath, "r", encoding="utf-8") as f:
        links = [line.strip() for line in f.readlines()]
    return links

In [18]:
def request_article(link: str):
    response = requests.get(link)
    html = response.text
    return html

In [19]:
def write_html_to_file(filepath: str, html) -> None:
    with open(filepath, "w", encoding="utf-8") as f:
        f.write(html)

def read_html_from_file(filepath):
    with open(filepath, "r", encoding="utf-8") as f:
        html = f.read()
    return html

In [84]:
def parse_paragraphs(html) -> List[str]:
    soup = BeautifulSoup(html)
    results = soup.article.find_all("p")
    paragraphs = [p.text for p in results]
    return paragraphs

def parse_summary(html) -> str:
    def _description_match_func(tag):
        if tag.name == "meta":
            if name := tag.get("name"):
                if name == "og:description":
                    return True
                elif name == "description":
                    return True
            elif tag.get("property") == "og:description":
                return True
        else:
            return False

    soup = BeautifulSoup(html)
    results = soup.head.find_all(_description_match_func)
    summary = results[0]["content"]
    return summary

def parse_title(html) -> str:
    def _title_match_func(tag):
        if tag.name == "meta":
            if name := tag.get("name"):
                if name == "og:title":
                    return True
                elif name == "description":
                    return True
            elif tag.get("property") == "og:title":
                return True
        # elif tag.name == "title":
        #     return True
        else:
            return False

    soup = BeautifulSoup(html)
    results = soup.head.find_all(_title_match_func)
    title = results[0]["content"]
    return title

def parse_source(html) -> str:
    def _site_match_func(tag):
        if tag.name == "meta":
            if tag.get("name") == "og:site_name":
                return True
            elif tag.get("property") == "og:site_name":
                return True
        else:
            return False

    soup = BeautifulSoup(html)
    if results := soup.head.find_all(_site_match_func):
        source = results[0]["content"]
    else:
        source = []
    return source

In [29]:
def download_articles(links: List[str]) -> None:
    for url in links:
        html = request_article(url)
        write_html_to_file(f"../data/articles/{url[-10:-1]}.html", html)

In [26]:
def read_articles(dirpath: str) -> str:
    dirpath = Path(dirpath)
    for file in dirpath.iterdir():
        if file.suffix == ".html":
            html = read_html_from_file(file)
            yield html

In [27]:
def create_article_obj(html: str) -> dict:
    article_obj = dict(
        title=parse_title(html),
        summary=parse_summary(html),
        source=parse_source(html)
    )
    return article_obj

In [30]:
download_articles(get_links())

In [85]:
all_html = [h for h in read_articles("../data/articles")]
articles = []
for idx, article_html in enumerate(all_html):
    print(idx)
    articles.append(create_article_obj(article_html))

0


KeyError: 'content'

In [82]:
soup = BeautifulSoup(all_html[3])
soup.head.find_all()

[<meta charset="utf-8"/>,
 <meta content="IE=edge" http-equiv="X-UA-Compatible"/>,
 <meta content="width=device-width, initial-scale=1" name="viewport"/>,
 <title>Introduction to streaming for data scientists</title>,
 <meta content="As machine learning moves towards real-time, streaming technology is becoming increasingly important for data scientists. Like many people coming from a mach..." name="description"/>,
 <link href="/assets/main.css" rel="stylesheet"/>,
 <link href="https://huyenchip.com/2022/08/03/stream-processing-for-data-scientists.html" rel="canonical"/>,
 <link href="/feed.xml" rel="alternate" title="Chip Huyen" type="application/rss+xml"/>,
 <script>
   (function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){
   (i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o),
   m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m)
   })(window,document,'script','https://www.google-analytics.com/analytics.js'

# Weaviate

In [34]:
def load_schema_classes(schema_path) -> dict:
    schema_path = Path(schema_path)

    schema_classes = dict(classes=[])
    for file in schema_path.iterdir():
        if file.suffix == ".json":
            with open(file) as f:
                schema = json.load(f)
                schema_classes["classes"].append(schema)

    return schema_classes

In [35]:
def load_article(client, html) -> None:
    with client.batch() as batch:
        article_obj = create_article_obj(html)
        article_uuid = generate_uuid5(article_obj, "Article")

        batch.add_data_object(data_object=article_obj, class_name="Article", uuid=article_uuid)

        for idx, paragraph in enumerate(parse_paragraphs(html)):
            paragraph_obj = dict(index=idx, content=paragraph)
            paragraph_uuid = generate_uuid5(paragraph_obj, "Paragraph")
            batch.add_data_object(data_object=paragraph_obj, class_name="Paragraph", uuid=paragraph_uuid)

            batch.add_reference(article_uuid, "Article", "hasParagraphs", paragraph_uuid, "Paragraph")
            batch.add_reference(article_uuid, "Paragraph", "fromArticle", article_uuid, "Article")

In [36]:
client = weaviate.client.Client(url="http://localhost:8080")

client.schema.delete_all()
schemas = load_schema_classes("../schema")
client.schema.create(schemas)

In [37]:
data_path = Path("../data/articles")
for file in data_path.iterdir():
    if file.name.endswith(".html"):
        html = read_html_from_file(file)
        load_article(client, html)