In [None]:
from xml.dom import minidom
from typing import Dict, List, Tuple
import sqlite3
from pathlib import Path

import requests
from IPython.display import display, HTML
from bs4 import BeautifulSoup

from secrets.config import config

In [None]:
# Debug switch: when True, the API helper functions pretty-print each raw XML
# response before parsing it.
debug = False

In [None]:
def pretty_print_xml(xml: str) -> None:
    """Print an indented, human-readable rendering of an XML document.

    Args:
        xml: The XML document to parse and print.
    """
    document = minidom.parseString(xml)
    rendered = document.toprettyxml()
    print(rendered)

In [None]:
def get_url(url: str, timeout: float = 30.0) -> str:
    """Fetch ``url`` with an HTTP GET and return the body decoded as UTF-8.

    Args:
        url: Address to request.
        timeout: Seconds passed to ``requests.get`` as its timeout. Without
            one, a stalled connection would block the notebook indefinitely.

    Returns:
        The response body decoded as UTF-8 text.

    Raises:
        requests.HTTPError: If ``response.ok`` is false. The failing response
            is available as the ``response`` attribute of the exception.
        requests.RequestException: Propagated unchanged from ``requests.get``
            (connection errors, timeouts, ...).
    """
    response = requests.get(url, timeout=timeout)
    if not response.ok:
        # Keep the URL out of the message: callers in this file put
        # credentials and tokens into the query string.
        raise requests.HTTPError(
            f'invalid response code {response.status_code}', response=response)
    return response.content.decode('utf-8')

def get_token(url: str='https://commerce.reuters.com/rmd/rest/xml/login?username={username}&password={password}',
              config: Dict[str, str] = None) -> str:
    """Log in and return the authentication token from the login response.

    Args:
        url: Login URL template with ``{username}`` and ``{password}``
            placeholders.
        config: Mapping holding the credentials under the keys ``'user'`` and
            ``'password'``. Required, despite the ``None`` default. The values
            must be the raw credentials: they are percent-encoded here, so
            already-encoded values would be encoded twice.

    Returns:
        Text of the ``authtoken`` element found in the login response.

    Raises:
        TypeError: If ``config`` is not supplied.
        KeyError: If ``config`` lacks ``'user'`` or ``'password'``.
        ValueError: If the login response has no ``authtoken`` element.
    """
    from urllib.parse import quote

    if config is None:
        raise TypeError(
            "get_token() requires a config mapping with 'user' and 'password' keys")

    # Percent-encode the credentials so characters such as '&', '#', '+' or
    # spaces cannot truncate or corrupt the query string.
    url = url.format(username=quote(str(config['user']), safe=''),
                     password=quote(str(config['password']), safe=''))
    content = get_url(url)
    soup = BeautifulSoup(content, 'lxml')

    token_tag = soup.find('authtoken')
    if token_tag is None:
        raise ValueError('login response did not contain an authtoken element')
    return token_tag.text

# Authenticate once with the imported credentials; the resulting token is
# passed to the API calls in the cells below.
token = get_token(config=config)

In [None]:
def get_channels(token: str,
                 url: str='https://rmb.reuters.com/rmd/rest/xml/channels?&token={token}',
                 category_id: str='OLR'
                ) -> List[Tuple[str, str]]:
    """Fetch the channel list and keep the channels of one category.

    Args:
        token: Authentication token inserted into ``url``.
        url: Channels URL template with a ``{token}`` placeholder.
        category_id: Only channels whose ``category`` element has this ``id``
            attribute are returned. Defaults to ``'OLR'``, the value that was
            previously hard-coded.

    Returns:
        ``(alias, description)`` text pairs, one per matching
        ``channelinformation`` element, in document order.
    """
    url = url.format(token=token)
    channels_raw = get_url(url)

    if debug:
        pretty_print_xml(channels_raw)

    soup = BeautifulSoup(channels_raw, 'lxml')
    channels = []
    for info in soup.find_all('channelinformation'):
        category = info.find('category')
        # A channel without a category element cannot match the filter; skip
        # it rather than failing the whole listing on a None lookup.
        if category is None or category.get('id') != category_id:
            continue
        channels.append((info.find('alias').text, info.find('description').text))
    return channels

# Fetch the channel list and preview the first five (alias, description) pairs.
channels = get_channels(token)
channels[:5]

In [None]:
def _filter_versions(items: List) -> List:
    tmp = {}
    for item in items:
        guid = item[2]
        if guid not in tmp:
            tmp[guid] = item[3]
        else:
            version = tmp[guid]
            if version < item[3]:
                tmp[guid] = item[3]

    items = [(item[0], item[1])
             for item in items
             if tmp[item[2]] == item[3]]
    return items

def get_items(token: str, channel_id: str, limit: int=100, max_age: str='7D',
              media_type: str='T', language: str='en',
              url: str='https://rmb.reuters.com/rmd/rest/xml/items?channel={channel_id}&limit={limit}&maxAge={max_age}&mediaType={media_type}&language={language}&token={token}'
             ) -> List:
    """Fetch the items of a channel and reduce them with ``_filter_versions``.

    Args:
        token: Authentication token inserted into ``url``.
        channel_id: Value for the ``channel`` query parameter.
        limit: Value for the ``limit`` query parameter.
        max_age: Value for the ``maxAge`` query parameter.
        media_type: Value for the ``mediaType`` query parameter.
        language: Value for the ``language`` query parameter.
        url: Items URL template containing the placeholders above.

    Returns:
        Whatever ``_filter_versions`` returns for the
        ``(id, headline, guid, version)`` text tuples read from each
        ``result`` element of the response.
    """
    query = dict(token=token, channel_id=channel_id, limit=str(limit),
                 max_age=max_age, media_type=media_type, language=language)
    items_raw = get_url(url.format(**query))

    if debug:
        pretty_print_xml(items_raw)

    results = BeautifulSoup(items_raw, 'lxml').find_all('result')
    candidates = []
    for result in results:
        candidates.append((result.find('id').text,
                           result.find('headline').text,
                           result.find('guid').text,
                           result.find('version').text))
    return _filter_versions(candidates)


# Query the first channel (by its alias) and preview the first five
# (id, headline) pairs that remain after version filtering.
items = get_items(token, channels[0][0])
print(f'items filtered: {len(items)}')
items[:5]

In [None]:
# Id of the first entry in `items` (first element of its (id, headline) tuple).
item_id = items[0][0]

def get_item(token: str, item_id: str,
              url: str='https://rmb.reuters.com/rmd/rest/xml/item?id={item_id}&token={token}'
             ) -> List:
    """Fetch a single item and return the text of its paragraphs.

    Args:
        token: Authentication token inserted into ``url``.
        item_id: Value for the ``id`` query parameter.
        url: Item URL template with ``{item_id}`` and ``{token}`` placeholders.

    Returns:
        The text of every ``p`` element found inside the response's
        ``inlinexml`` element, in document order.
    """
    item_raw = get_url(url.format(token=token, item_id=item_id))

    if debug:
        pretty_print_xml(item_raw)

    inline_xml = BeautifulSoup(item_raw, 'lxml').find('inlinexml')
    paragraphs = []
    for paragraph in inline_xml.find_all('p'):
        paragraphs.append(paragraph.text)
    return paragraphs

# Download the paragraphs of the item selected at the top of this cell and
# preview the first five.
content = get_item(token, item_id)
content[:5]

In [None]:
# Directory (relative to the working directory) that holds the SQLite database
# file and the SQL script; it is created if it does not exist yet.
path_data = Path('sql')
path_data.mkdir(exist_ok=True)

db_file = path_data / 'db.sqlite'
create_tables_file = path_data / 'create_tables.sql'
# NOTE(review): sqlite3's connection context manager commits or rolls back on
# exit but does not close the connection, so `con` stays open after this block
# -- confirm whether later cells rely on that before adding an explicit close.
with sqlite3.connect(str(db_file)) as con:
    # create_tables.sql must already exist in `path_data`; open() raises otherwise.
    with open(create_tables_file, 'r') as f:
        script = f.read()
    con.executescript(script)
    con.commit()
    cur = con.cursor()
    # Print the number of rows currently in the `article` table.
    cur.execute("select count(*) from article")
    print(cur.fetchone()[0])