In [None]:
from IPython.core.interactiveshell import InteractiveShell
# Display the value of every top-level expression in a cell, not only the last one.
# The demo cells below rely on this to show several responses per cell.
InteractiveShell.ast_node_interactivity = "all"

## requests

In [None]:
import requests as r
import json

In [None]:
# Sending a GET request to http://localhost:5000
# `.json()` decodes the response body from JSON; the decoded value is the cell output.
r.get('http://localhost:5000').json()

In [None]:
# Creating a shopping list via the /shoppingList endpoint on http://localhost:5000
# Every call below is a top-level expression, so each response is displayed in the cell output.

# DELETE method without any data passed will drop the whole shopping list
r.delete('http://localhost:5000/shoppingList')

# GET method retrieves the (as of now empty) shopping list
r.get('http://localhost:5000/shoppingList').json()

# POST method creates a new entry in the shopping list
# (a dict passed as `data` is sent form-encoded)
r.post('http://localhost:5000/shoppingList', data={'entry': 'milk'})
r.post('http://localhost:5000/shoppingList', data={'entry': 'tea'})
r.post('http://localhost:5000/shoppingList', data={'entry': 'wine'})

# PUT method changes an entry in the shopping list
# Unlike the calls above, the payload is serialized to JSON by hand and the content type is set explicitly.
# Presumably the nested dict maps the old entry ('wine') to its new value ('beer') -- confirm against the server.
r.put(
    'http://localhost:5000/shoppingList', 
    data=json.dumps({'entry': {'wine': 'beer'}}),
    headers={'content-type': 'application/json'}
)

r.get('http://localhost:5000/shoppingList').json()

# DELETE method removes an entry
r.delete('http://localhost:5000/shoppingList', data={'entry': 'beer'})

r.get('http://localhost:5000/shoppingList').json()

In [None]:
# The `params` argument
# The dict is encoded into the query string; `.url` shows the final URL that was requested.
r.get('http://localhost:5000/shoppingList', params={'a': 'b', 'c': 'd'}).url

## Dealing with API limitations

In [None]:
# API keys are read from a local credentials.json file rather than being written into the notebook.
# The file must contain the keys CLOCKIFY_API_KEY, ZENHUB_API_KEY and GITHUB_API_KEY.
with open('credentials.json') as f:
    creds = json.load(f)

# Clockify: key is sent in the X-API-KEY header; all requests are scoped to one workspace.
CLOCKIFY_API_KEY = creds['CLOCKIFY_API_KEY']
CLOCKIFY_WORKSPACE = '5d6d305c8c5e57633d851e45'
CLOCKIFY_ENDPOINT = 'https://api.clockify.me/api/v1'
CLOCKIFY_HEADERS = {
    'Content-type': 'application/json',
    'Accept': 'application/json',
    'X-API-KEY': CLOCKIFY_API_KEY
}

# ZenHub: key is sent in the X-Authentication-Token header.
ZENHUB_API_KEY = creds['ZENHUB_API_KEY']
ZENHUB_ENDPOINT = 'https://api.zenhub.com'
ZENHUB_HEADERS = {
    'Content-type': 'application/json',
    'Accept': 'application/json',
    'X-Authentication-Token': ZENHUB_API_KEY
}

# GitHub: key is sent as "token <key>" in the Authorization header.
GITHUB_API_KEY = creds['GITHUB_API_KEY']
GITHUB_ENDPOINT = 'https://api.github.com'
GITHUB_HEADERS = {
    'Content-type': 'application/json',
    'Accept': 'application/json',
    'Authorization': f'token {GITHUB_API_KEY}'
}

### Dealing with paging

In [None]:
## Retrieve all results from Github API for given url
#
# Requests the results page by page and puts them into a single list, which is then returned.
# Paging stops at the first page that comes back empty.
# @param url The request url excluding the common prefix of the endpoint
# @param arg Possible additional arguments as a string, prefixed with an ampersand, for example '&state=all';
#            None (the default) means no additional arguments
# @return A list of dicts, each dict containing one result for given url
def get_github(url, arg = None):
    # The page helper splices `arg` straight into the query string, so a missing
    # argument has to be an empty string: None would be rendered as the literal
    # text "None" at the end of the URL (`...&per_page=100None`).
    extra = '' if arg is None else arg

    res = []
    page = 1
    while True:
        ret = get_github_page(url, page, extra)
        if len(ret) == 0:
            return res
        res += ret
        page += 1

## Retrieve one page of results from Github API for given url
#
# Github returns 30 results per page by default; this helper asks for 100 per page
# to reduce the number of requests.
# @param url The request url excluding the common prefix of the endpoint
# @param page Number of the page to retrieve
# @param arg Possible additional arguments as a string, prefixed with an ampersand, for example '&state=all';
#            None is treated as "no additional arguments"
# @return The decoded JSON body of the response (a list of dicts, one per result, for list endpoints)
def get_github_page(url, page, arg):
    # An f-string would render None as the text "None" and corrupt the per_page value.
    suffix = '' if arg is None else arg
    return r.get(
        f'{GITHUB_ENDPOINT}/{url}?page={page}&per_page=100{suffix}',
        headers = GITHUB_HEADERS
    ).json()

## Send one GET request to the Zenhub API
#
# @param url The request url excluding the common prefix of the endpoint
# @param json Boolean flag: True returns the decoded JSON body, False returns the raw response object
#             (useful when the caller needs the status code or headers).
#             Note that inside this function the name shadows the `json` module.
# @return Either the decoded JSON body or the response to the request
def get_zenhub(url, json = True):
    response = r.get(f'{ZENHUB_ENDPOINT}/{url}', headers=ZENHUB_HEADERS)
    if json:
        return response.json()
    return response

## Retrieve all results from Clockify API for given url
#
# Requests the results page by page and puts them into a single list, which is then returned.
# Paging stops at the first page that comes back empty.
# @param url The request url excluding the common prefix of the endpoint and workspace
# @return A list of dicts, each dict containing one result for given url
def get_clockify(url):
    # Imported locally so the function does not depend on a later notebook cell having run.
    import time

    res = []
    page = 1
    while True:
        ret = get_clockify_page(url, page)
        # Clockify does not provide any information on when we can send more requests and disregards any requests in the meantime
        # therefore we have to play it safe by strictly limiting it to 10 requests/second by sleeping for 0.1s after every request.
        # The pause also follows the final (empty) page, so that calling this function
        # repeatedly in a loop never sends two requests back to back.
        time.sleep(0.1)
        if len(ret) == 0:
            return res
        res += ret
        page += 1

## Fetch a single page of results from the Clockify API
#
# The page size is left at the API default (50 according to the original notes).
# @param url The request url excluding the common prefix of the endpoint and workspace
# @param page Number of the page to retrieve
# @return The decoded JSON body of the response (a list of dicts, one per result)
def get_clockify_page(url, page):
    endpoint = f'{CLOCKIFY_ENDPOINT}/workspaces/{CLOCKIFY_WORKSPACE}/{url}?page={page}'
    response = r.get(endpoint, headers=CLOCKIFY_HEADERS)
    return response.json()


### Dealing with limited number of requests per given time

In [None]:
import json
import requests as r
import re
import pandas as pd
from treelib import Node, Tree
import time

In [None]:
## Raw user records from Clockify, fetched once and reused for every column below
users_tmp = get_clockify('users')

## A pandas.DataFrame with id and name of all users
users = pd.DataFrame({
    column: [record[column] for record in users_tmp]
    for column in ('id', 'name')
})

In [None]:
## A pandas.DataFrame with all time entries
#
# The rows are collected first and the frame is built once at the end:
# DataFrame.append copied the whole frame on every call and no longer exists in pandas 2.x.
# Passing `columns` keeps the expected columns present even when no entries are returned.
time_entries = pd.DataFrame(
    [
        {
            'userId': user_id,
            'id': entry['id'],
            'description': entry['description'],
            'projectId': entry['projectId'],
            'billable': entry['billable'],
            'start': entry['timeInterval']['start'],
            'end': entry['timeInterval']['end']
        }
        for user_id in users['id']
        for entry in get_clockify(f'user/{user_id}/time-entries')
    ],
    columns=['userId', 'id', 'description', 'projectId', 'billable', 'start', 'end']
)


In [None]:
## Raw repository records of the organisation, fetched once from Github
repos_json = get_github('orgs/aivero/repos')

## A pandas.DataFrame containing all Github repositories (one row per repo: id, name)
repos = pd.DataFrame(
    data=[[entry['id'], entry['name']] for entry in repos_json],
    columns=['id', 'name']
)


In [None]:
## A pandas.DataFrame containing all issues
#
# Issues of every repository (open and closed, via '&state=all') are gathered into a list of
# records and converted to a frame once. DataFrame.append copied the whole frame per row and
# no longer exists in pandas 2.x.
issue_records = []

# Retrieve issues from all repos
# (iterating over the two columns avoids a loop variable named `id`, which would shadow the builtin)
for repo_id, repo in zip(repos['id'], repos['name']):
    issues_arr = get_github(f'repos/aivero/{repo}/issues', arg="&state=all")

    for issue in issues_arr:
        issue_records.append(
            {
                'repo': repo,
                'repo_id': repo_id,
                'description': issue['title'],
                'issue_number': issue['number'],
                'repository_url': issue['repository_url'],
                # All label names of the issue joined into one comma-separated string
                'labels': ','.join([label['name'] for label in issue['labels']]),
                'state': issue['state'],
                'closed_at': issue['closed_at']
            })

# Listing the columns explicitly keeps them present (and ordered) even if no issue was found
issues = pd.DataFrame(
    issue_records,
    columns=['repo', 'repo_id', 'issue_number', 'description', 'repository_url',
             'labels', 'state', 'closed_at']
)
issues['closed_at'] = pd.to_datetime(issues['closed_at'])

In [None]:
# Determine whether an issue has children issues (is_epic = True) for all epics.
# ZenHub does not allow more than 100 requests per minute,
# therefore this script controls the number of requests per minute
# and in case 100 requests per minute is reached,
# the script sleeps until it is allowed to send a request again.
is_epic = []
for _, x in issues.iterrows():
    # json=False returns the raw response so that the rate-limit headers can be inspected
    rq = get_zenhub('p1/repositories/{}/issues/{}'.format(x['repo_id'], x['issue_number']), json=False)

    # Zenhub API returns the information about the limit, so we can check when we are allowed to send requests again
    # The check fires one request before the limit is actually exhausted.
    if int(rq.headers['X-RateLimit-Used']) >= int(rq.headers['X-RateLimit-Limit']) - 1:
        # X-RateLimit-Reset is compared with time.time(), i.e. it is treated as a Unix timestamp in seconds
        diff = int(rq.headers['X-RateLimit-Reset']) - time.time()
        if diff >= 0:
            # Show how long the loop is going to pause
            print(diff)
            time.sleep(diff)

    # NOTE(review): assumes the response body is JSON with an 'is_epic' key -- an error response would fail here
    is_epic.append(rq.json()['is_epic'])

# One flag per row of `issues`, in iteration order
issues['is_epic'] = is_epic

### Dealing with limited number of requests without headers

In [None]:
# Variant for APIs that do not report their rate limit in the response headers:
# the request is simply repeated for as long as it is rejected with status 403.
is_epic = []
for _, x in issues.iterrows():
    endpoint = 'p1/repositories/{}/issues/{}'.format(x['repo_id'], x['issue_number'])
    rq = get_zenhub(endpoint, json=False)

    while rq.status_code == 403:
        # Pause briefly between attempts instead of hammering an API that is already refusing requests
        time.sleep(1)
        rq = get_zenhub(endpoint, json=False)
    print(rq.status_code)

    # Record the flag for this issue; without this the list stays empty and the
    # column assignment below fails with a length mismatch
    is_epic.append(rq.json()['is_epic'])

# One flag per row of `issues`, in iteration order
issues['is_epic'] = is_epic

## API wrappers

In [None]:
# PyGithub wrapper for Github API
# The wrapper replaces the hand-written request helpers above: it is given the API key
# and exposes the repositories of the authenticated user as objects.

from github import Github

g = Github(f"{GITHUB_API_KEY}")
# Print the name of every repository returned for the authenticated user
for repo in g.get_user().get_repos():
    print(repo.name)

In [None]:
# yfinance wrapper for Yahoo Finance API
# Downloads price history for three tickers in one call; the returned value is the cell output.

import yfinance as yf

yf.download(
    tickers = "AAPL GOOG MSFT",  # space-separated list of symbols
    period = 'ytd',              # presumably "year to date" -- see the yfinance documentation
    group_by = 'ticker'          # group the result by ticker
)

In [None]:
# yahoo_finance_api2 wrapper for Yahoo Finance API
# For each ticker, fetch recent history and print the last 'open' value.

from yahoo_finance_api2 import share

tickers = ['GOOG', 'AAPL', 'MSFT']

for ticker in tickers:
    t = share.Share(ticker)
    # Judging by the constant names: a period of 10 days at a frequency of 5 minutes
    # NOTE: `t` is rebound from the Share object to the returned history
    t = t.get_historical(share.PERIOD_TYPE_DAY, 10, share.FREQUENCY_TYPE_MINUTE, 5)
    # Most recent entry of the 'open' series
    print(t['open'][-1])

In [None]:
# wbpy wrapper: request one indicator for a list of countries and a date range
import wbpy

api = wbpy.IndicatorAPI()
iso_country_codes = ["GB", "FR", "JP"]
# Indicator code passed to the API; the variable name suggests it is total population
total_population = "SP.POP.TOTL"

dataset = api.get_dataset(total_population, iso_country_codes, date="2010:2012")
# The dict form of the dataset is the cell output
dataset.as_dict()

## Web Scraping

In [None]:
import requests as r

# Download the raw HTML of the Wikipedia article as one string
content = r.get('http://en.wikipedia.org/wiki/Python_(programming_language)').text
# Display the raw HTML to show how unwieldy it is to work with directly
content

In [None]:
# Extract a value from the infobox by plain string slicing.
# Every step throws away everything before the next marker, so the statements are strictly
# order-dependent and re-running this cell without re-fetching `content` gives a different result.
# NOTE: the hard-coded counts (8 and 2) are tied to the page layout at the time of writing.

# Jump to the infobox table
content = content[content.find('infobox vevent'):]

# Skip past eight occurrences of 'tr'
for i in range(8):
    content = content[content.find('tr')+2:]
    
# Move just past the next 'td'
content = content[content.find('td')+2:]

# Skip past two '>' characters to reach the text
for i in range(2):
    content = content[content.find('>')+1:]
    
# The value is everything up to the next opening tag
author = content[0:content.find('<')]

author

## BeautifulSoup4

In [None]:
from bs4 import BeautifulSoup, Comment

In [None]:
# Fetch the same article again, this time keeping the response object
rq = r.get('http://en.wikipedia.org/wiki/Python_(programming_language)')

# Parse the HTML with Python's built-in parser
soup = BeautifulSoup(rq.text, 'html.parser')

In [None]:
# With .prettify, the code becomes much easier to read
# (print is used so that the line breaks are rendered instead of shown as escapes)
print(soup.prettify())

In [None]:
# The same lookup as the manual slicing above, as a single expression:
# find the infobox table, collect its <td> cells selected by text=True, and show the text of the first one
soup.find('table', class_=['infobox', 'vevent']).findAll('td', text=True)[0].text

### More practical example with BS4

In [None]:
import requests as r
import re

# Read the current degree from the official page
rq = r.get('https://onemocneni-aktualne.mzcr.cz/pes')

soup = BeautifulSoup(rq.text, 'html.parser')

# The element may be missing (page redesign, error page); fall back to an empty
# text instead of failing with AttributeError on None
degree_tag = soup.find('span', id='pes-current-degree')
text = degree_tag.get_text() if degree_tag is not None else ''

# Raw string: '\d' in a normal string literal is an invalid escape sequence.
# The leading '.*' is greedy, so the group captures the last digit on the line.
match = re.match(r'.*(\d).*', text)
if match is not None:
    print(f'Current degree is {match.groups()[0]}')

### Troubles with dynamic pages

In [None]:
# Request a product page with a plain HTTP client and print the HTML that comes back.
# This section illustrates that such a response may differ from what a browser shows.
rq = r.get('https://www.amazon.com/Apple-MacBook-13-inch-Storage-Keyboard/dp/B0882JG168/ref=sr_1_3?dchild=1&keywords=macbook+13+2020&qid=1606917636&sr=8-3')

print(BeautifulSoup(rq.text, 'html.parser').prettify())

In [None]:
# Second example of the same problem: print the HTML a plain request receives from this page
rq = r.get('https://coronavirus.app/tracking/czechia')

print(BeautifulSoup(rq.text, 'html.parser').prettify())

## Web crawling

In [None]:
from queue import Queue
import urllib3
import re
import requests
from urllib.parse import urljoin, urlsplit
from url_normalize import url_normalize

In [None]:
# Disable InsecureRequestWarning
# (the crawler below sends requests with verify=False, which would otherwise warn on every call)
urllib3.disable_warnings()

# Simple URL normalization
# URLs relative to the domain root (starting with '/') are joined with `base` and lowercased,
# then url-normalize is applied; any other URL is passed to url-normalize unchanged.
# See (https://pypi.org/project/url-normalize/)
# @param url The URL to normalize; may be an empty string
# @param base The scheme and host the relative URLs are resolved against
# @return The normalized URL
def norm_url(url, base):
    # startswith() instead of url[0]: indexing raises IndexError on an empty string,
    # which robots.txt processing produces for a bare "Disallow:" line
    if url.startswith('/'):
        return url_normalize(urljoin(base, url).lower())
    return url_normalize(url)

# Reduce a URL to its scheme and host, e.g. 'https://host/path?x=1' -> 'https://host/'
# @param url Any URL string
# @return '<scheme>://<netloc>/' built from the parsed parts
def get_base_url(url):
    parts = urlsplit(url)
    return f'{parts.scheme}://{parts.netloc}/'


# Collect the outgoing links of a saved HTML page
# The page's own URL is expected as the first comment of the document (the crawler inserts it);
# it provides the base for resolving links relative to the domain.
# @param soup Parsed page
# @return List of normalized URLs, in document order
def find_outlinks(soup):
    comments = soup.findAll(text = lambda node: isinstance(node, Comment))
    base = get_base_url(comments[0])

    # Keep only links that are relative to the domain ('/x', but not '//x')
    # or that start with 'h' (absolute http/https links)
    def _is_candidate(href):
        if len(href) <= 1:
            return False
        if href[0] == 'h':
            return True
        return href[0] == '/' and href[1] != '/'

    hrefs = [anchor.get('href') for anchor in soup.find_all('a', href=True)]

    return [norm_url(href, base) for href in hrefs if _is_candidate(href)]

# Finds robots.txt and processes its rules
# Only the first group introduced by the exact line 'User-agent: *' is considered, and only
# its Disallow rules; everything else in the file is ignored.
# @param url Any URL of the site; robots.txt is looked up at the root of its host
# @return List of normalized absolute URLs that must not be crawled (empty if there are no rules)
def process_robots(url):
    base = get_base_url(url)
    robots_file = base + 'robots.txt'
    req = requests.get(robots_file)

    # If robots.txt cannot be processed, no rules are applied
    if req.status_code != 200:
        return []

    # splitlines() + strip() also copes with CRLF line endings and indented lines,
    # which a plain split('\n') leaves attached to every line
    robots = [line.strip() for line in req.text.splitlines()]

    # Only check for 'User-agent: *'
    if 'User-agent: *' not in robots:
        return []

    # Retrieve all Disallow rules up to the start of the next group
    rules = []
    for line in robots[robots.index('User-agent: *') + 1:]:
        if line.startswith('User-agent: '):
            break
        if line.startswith('Disallow:'):
            # Drop a trailing '# comment' and surrounding whitespace
            path = line[len('Disallow:'):].split('#', 1)[0].strip()
            # An empty Disallow means "nothing is disallowed" and yields no rule
            if path:
                rules.append(path)

    # Normalize all URLs in the rules
    return [norm_url(rule, base) for rule in rules]

# Check for visible text only
# @param element A text node of a parsed page
# @return False for text inside non-rendered tags, for HTML comments and for
#         nodes that are a lone newline or space; True otherwise
def visible(element):
    if element.parent.name in ['style', 'script', '[document]', 'head', 'title']:
        return False
    # Comments are detected by node type. Matching '<!--...-->' against the text cannot work:
    # the previous check ran on str(bytes), which always starts with "b'", so it never matched
    # and comment text ended up in the extracted words.
    if isinstance(element, Comment):
        return False
    if element == '\n' or element == ' ':
        return False
    return True

# Build the set of word n-grams ("shingles") of the visible text of a page
# @param soup Parsed page
# @param n Number of consecutive words per shingle
# @return Set of n-tuples of words; empty if the page has fewer than n visible words
def extract_shingles(soup, n=4):
    # Walk over every text node, keep the visible ones and split them into words
    words = []
    for node in soup.findAll(text=True):
        if visible(node):
            words.extend(str(node).strip().split())

    # Slide a window of n words over the text
    return {tuple(words[start:start + n]) for start in range(len(words) - n + 1)}

# Decide whether two pages are near duplicates by the Jaccard similarity of their shingles
# @param shingles1 Shingles of the first page (any iterable of hashable items)
# @param shingles2 Shingles of the second page
# @param threshold Similarity that has to be exceeded for the pages to count as duplicates
# @return True if |intersection| / |union| is greater than the threshold
def near_duplicate(shingles1, shingles2, threshold=0.8):
    # Converting to sets makes the counts correct even if a caller passes lists with repeats
    first, second = set(shingles1), set(shingles2)
    union = len(first | second)

    # Two pages without any shingles (fewer words than the shingle size) are treated as
    # identical; without this guard the division below raises ZeroDivisionError
    if union == 0:
        return 1.0 > threshold

    overlap = len(first & second)
    return overlap/union > threshold

In [None]:
import os

# Number of the page being collected (also used as its file name) and how many pages to collect
i = 1
n_pages = 10

q = Queue()
seed_url = "https://en.wikipedia.org/wiki/Alexandria_Ocasio-Cortez"

q.put(seed_url)

# Save texts of processed pages for near-duplication detection
processed = []
# URLs already taken from the queue, so that the same address is never requested twice
visited = set()

# The pages are written into this directory; create it if this is the first run
os.makedirs('Pages', exist_ok=True)

while i <= n_pages:
    # Nothing left to crawl: stop, because a blocking q.get() on an empty queue waits forever
    if q.empty():
        break

    # Get URL from queue
    url = q.get()
    if url in visited:
        continue
    visited.add(url)

    # Make a request to URL; pages that cannot be fetched are skipped
    try:
        req = requests.get(url, verify=False, timeout=5)
    except Exception:
        continue

    # Check if page exists and is accessible
    if req.status_code != 200:
        continue

    # The parser is named explicitly so the result does not depend on which parsers are installed
    soup = BeautifulSoup(req.text, 'html.parser')

    # Extract 4-shingles out of textual data
    shingles = extract_shingles(soup, n=4)

    # Check for near duplicates among processed files
    if any(near_duplicate(shingles, processed_file, threshold=0.8) for processed_file in processed):
        continue

    # Save shingles for near-duplicate detection
    processed.append(shingles)

    # Insert the URL as a comment on the first line of the created file
    # (find_outlinks reads it back to resolve relative links)
    soup.insert(0, '\n')
    soup.insert(0, Comment(url))

    # Save the HTML of retrieved file
    with open('Pages/{}'.format('{}.html'.format(i)), 'wb+') as file:
        file.write(soup.prettify('utf-8'))

    # Find all outlinks in the file
    urls = find_outlinks(soup)
    # Retrieve rules from robots.txt
    rules = process_robots(url)

    # Use only outlinks that aren't contradictory to rules from robots.txt:
    # a link is dropped when it starts with any disallowed prefix.
    # (The previous nested loop tested rule.startswith(u) and its `continue` only
    # affected the inner loop, so no link was ever filtered out.)
    urls_to_add = [u for u in urls if not any(u.startswith(rule) for rule in rules)]

    # Put filtered outlinks to queue
    for u in urls_to_add:
        q.put(u)

    i += 1

In [None]:
from glob import glob

# Open one of the saved pages and print its title.
# The crawler writes the files as UTF-8 bytes, so they are read back as UTF-8 explicitly
# instead of with the platform's default encoding; the parser is named for the same
# reason it is named elsewhere in this notebook.
with open(glob('Pages/*')[0], encoding='utf-8') as f:
    soup = BeautifulSoup(f, 'html.parser')
    print(soup.find('title').text)

In [None]:
# `webdriver` has to be imported before it is used; on a fresh kernel this cell
# runs before any other cell that imports it
from selenium import webdriver

driver = webdriver.Chrome('./chromedriver')

In [None]:
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys

# Load the Chromedriver and open a browser
# NOTE(review): passing the driver path positionally is only accepted by older Selenium
# releases -- confirm against the installed version
driver = webdriver.Chrome('./chromedriver')

# go to a page
driver.get('https://www.amazon.com/')

# Elements are located with find_element(By.<strategy>, value); the find_element_by_*
# shortcuts are not available in current Selenium releases, while this form works in
# both old and new ones.

# Find the search box and write text to it
driver.find_element(By.ID, 'twotabsearchtextbox').send_keys('macbook pro 2020')

# Find the search box and press enter
driver.find_element(By.ID, 'twotabsearchtextbox').send_keys(Keys.ENTER)

# Locate the first product's image and click on it
driver.find_elements(By.CLASS_NAME, 's-image')[0].click()

# Finally, retrieve the price
driver.find_element(By.ID, 'priceblock_ourprice').text