In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
# default_exp runners

# Process

In [3]:
# export
from __future__ import annotations
from dataclasses import dataclass
from typing import Callable



In [4]:
# export
import logging
from tqdm.auto import tqdm

@dataclass
class Process:
    queries: list[Callable]
    steps: list[Callable]
    filter: Callable

# Simple in Memory Runner

In [5]:
# export

class RunnerMemory():
    def __init__(self, process: Process, progress_bar: bool = True):
        self.process = process
        self.progress_bar = progress_bar
        
    def query(self):
        for query in tqdm(self.process.queries, desc='query', disable=not self.progress_bar):
            for record in query.query():
                yield record

    def prepare(self, records):
        return self.process.filter(tqdm(records, desc='filter', disable=not self.progress_bar))

    def fetch(self, records):
        for record in tqdm(records, desc='fetch', disable=not self.progress_bar):
            yield (record.content, record)

    def transform(self, content_records):
        for content, record in tqdm(content_records, desc='transform', disable=not self.progress_bar):
            error = False
            for step in self.process.steps:
                try:
                    content = step(content, record)
                except Exception as e:
                    error = True
                    logging.error('Error processing %s at step %s: %s' % (record, step.__name__, e))
                    break
            if not error:
                yield content

    def run(self):
        records = self.prepare(self.query())
        content_records = self.fetch(records)
        return self.transform(content_records) 

In [6]:
# export
import itertools
from pathlib import Path
from sqlitedict import SqliteDict

class RunnerCached():
    def __init__(self, process: Process, path: Union[str, Path], progress_bar: bool = True):
        self.process = process
        self.progress_bar = progress_bar
        
        self.path = Path(path)
        
        self._query = SqliteDict(path, tablename='query', autocommit=True)
        self._fetch = SqliteDict(path, tablename='fetch', autocommit=False)
        
    def query(self):
        # TODO: Don't cache WaybackQuery or FileQuery
        for query in tqdm(self.process.queries, desc='query', disable=not self.progress_bar):
            key = repr(query)
            if key not in self._query:
                self._query[key] = list(query.query())
                
        # TODO: Merge
        for query in self.process.queries:
            for record in self._query[key]:
                yield record
                
    def prepare(self, records):
        return self.process.filter(tqdm(records, desc='filter', disable=not self.progress_bar))

    def fetch(self, records):
        records = list(records)
        unfetched_records = [r for r in records if r.digest not in self._fetch]
        
        for cls, record_group in itertools.groupby(unfetched_records, key=type):
            record_group = list(record_group)
            for content, record in zip(cls.fetch_parallel(record_group), record_group):
                assert record.digest is not None
                self._fetch[record.digest] = content
            self._fetch.commit()
        
        for record in records:
            yield (self._fetch[record.digest], record)
                

    def transform(self, content_records):
        for content, record in tqdm(content_records, desc='transform', disable=not self.progress_bar):
            error = False
            for step in self.process.steps:
                try:
                    content = step(content, record)
                except Exception as e:
                    error = True
                    logging.error('Error processing %s at step %s: %s' % (record, step.__name__, e))
                    break
            if not error:
                yield content

    def run(self):
        records = self.prepare(self.query())
        content_records = self.fetch(records)
        return self.transform(content_records) 

## Simple test process

In practice we'd use a something like parsel, beautifulsoup or selectolax.
However for a simple demo this has no external dependencies.

In [7]:
from html.parser import HTMLParser

class SkeptricHTMLParser(HTMLParser):
    def __init__(self):
        super().__init__()
        self.extract = {}
        self.field = None
        
    def handle_starttag(self, tag, attrs):
        if dict(attrs).get('class') == 'post-full-title':
            self.field = 'title'
        if dict(attrs).get('class') == 'byline-meta-date':
            self.field = 'date'

    def handle_endtag(self, tag):
        self.field = None

    def handle_data(self, data):
        if self.field is not None:
            self.extract[self.field] = data

def skeptric_filter(records):
    for r in records:
        if r.mime == 'text/html' and r.status == 200:
            yield r
        
def skeptric_extract(content, metadata):
    parser = SkeptricHTMLParser()
    html = content.decode('utf-8')
    parser.feed(html)
    data = parser.extract
    data['url'] = metadata.url
    data['timestamp'] = metadata.timestamp
    return data

def skeptric_verify_extract(content, metadata):
    if not content.get('title'):
        raise ValueError('Missing title')
    if not content.get('date'):
        raise ValueError('Missing date')
    return content

from datetime import datetime
def skeptric_normalise(content, metadata):
    content = content.copy()
    content['date'] = datetime.strptime(content['date'], '%d %B %Y')
    return content

from webrefine.query import WarcFileQuery
test_data = '../resources/test/skeptric.warc.gz'

skeptric_query = WarcFileQuery(test_data)

# Test Memory Runner

In [8]:
skeptric_process = Process(queries=[skeptric_query],
                     filter=skeptric_filter,
                     steps=[skeptric_extract, skeptric_verify_extract, skeptric_normalise])

In [9]:
data = list(RunnerMemory(skeptric_process).run())
data

filter: 0it [00:00, ?it/s]

transform: 0it [00:00, ?it/s]

fetch: 0it [00:00, ?it/s]

query:   0%|          | 0/1 [00:00<?, ?it/s]

ERROR:root:Error processing WarcFileRecord(url='https://skeptric.com/', timestamp=datetime.datetime(2021, 11, 26, 11, 28, 36), mime='text/html', status=200, path=PosixPath('../resources/test/skeptric.warc.gz'), offset=17122, digest='JJVB3MQERHRZJCHOJNKS5VDOODXPZAV2') at step skeptric_verify_extract: Missing title
ERROR:root:Error processing WarcFileRecord(url='https://skeptric.com/tags/data/', timestamp=datetime.datetime(2021, 11, 26, 11, 28, 38), mime='text/html', status=200, path=PosixPath('../resources/test/skeptric.warc.gz'), offset=130269, digest='R7CLAACFU5L7T5LKI5G53RZSMCNUNV6F') at step skeptric_verify_extract: Missing title


[{'title': "Pagination in Internet Archive's Wayback Machine with CDX",
  'date': datetime.datetime(2021, 11, 23, 0, 0),
  'url': 'https://skeptric.com/pagination-wayback-cdx/',
  'timestamp': datetime.datetime(2021, 11, 26, 11, 28, 34)},
 {'title': 'About Skeptric',
  'date': datetime.datetime(2021, 10, 18, 0, 0),
  'url': 'https://skeptric.com/about/',
  'timestamp': datetime.datetime(2021, 11, 26, 11, 28, 37)},
 {'title': 'Searching 100 Billion Webpages Pages With Capture Index',
  'date': datetime.datetime(2020, 6, 11, 0, 0),
  'url': 'https://skeptric.com/searching-100b-pages-cdx/',
  'timestamp': datetime.datetime(2021, 11, 26, 11, 28, 39)},
 {'title': 'Fast Web Dataset Extraction Worfklow',
  'date': datetime.datetime(2021, 11, 21, 0, 0),
  'url': 'https://skeptric.com/fast-web-data-workflow/',
  'timestamp': datetime.datetime(2021, 11, 26, 11, 28, 39)},
 {'title': 'Unique Key for Web Captures',
  'date': datetime.datetime(2021, 11, 19, 0, 0),
  'url': 'https://skeptric.com/key-

We can always look up an error

Would be nicer if everything was a string so we didn't have to handle the imports...

In [10]:
from webrefine.query import WarcFileRecord
from pathlib import PosixPath
WarcFileRecord(url='https://skeptric.com/', timestamp=datetime(2021, 11, 26, 11, 28, 36), mime='text/html', status=200, path=PosixPath('../resources/test/skeptric.warc.gz'), offset=17122, digest='JJVB3MQERHRZJCHOJNKS5VDOODXPZAV2')

WarcFileRecord(url='https://skeptric.com/', timestamp=datetime.datetime(2021, 11, 26, 11, 28, 36), mime='text/html', status=200, path=PosixPath('../resources/test/skeptric.warc.gz'), offset=17122, digest='JJVB3MQERHRZJCHOJNKS5VDOODXPZAV2')

# Test Persistent Runner

In [11]:
%%time
from pathlib import Path
test_cache_path = Path('./test_skeptric_cache.sqlite')
data_cached = list(RunnerCached(skeptric_process, test_cache_path).run())

filter: 0it [00:00, ?it/s]

transform: 0it [00:00, ?it/s]

query:   0%|          | 0/1 [00:00<?, ?it/s]

ERROR:root:Error processing WarcFileRecord(url='https://skeptric.com/', timestamp=datetime.datetime(2021, 11, 26, 11, 28, 36), mime='text/html', status=200, path=PosixPath('../resources/test/skeptric.warc.gz'), offset=17122, digest='JJVB3MQERHRZJCHOJNKS5VDOODXPZAV2') at step skeptric_verify_extract: Missing title
ERROR:root:Error processing WarcFileRecord(url='https://skeptric.com/tags/data/', timestamp=datetime.datetime(2021, 11, 26, 11, 28, 38), mime='text/html', status=200, path=PosixPath('../resources/test/skeptric.warc.gz'), offset=130269, digest='R7CLAACFU5L7T5LKI5G53RZSMCNUNV6F') at step skeptric_verify_extract: Missing title


CPU times: user 652 ms, sys: 28.9 ms, total: 681 ms
Wall time: 794 ms


In [12]:
assert data_cached == data

In [13]:
#slow
from webrefine.query import CommonCrawlQuery, WaybackQuery
from datetime import datetime

skeptric_cc = CommonCrawlQuery('skeptric.com/*', apis=['CC-MAIN-2021-43'])

skeptric_wb = WaybackQuery('skeptric.com/*', start='202103', end='202111')

def skeptric_filter_strict(records):
    for r in records:
        if r.mime != 'text/html' or r.status != 200:
            continue
        if '/tags/' in r.url or '/notebooks/' in r.url or r.url.endswith('skeptric.com/'):
            continue
        yield r

skeptric_process_all = Process(queries=[skeptric_query, skeptric_cc, skeptric_wb],
                     filter=skeptric_filter_strict,
                     steps=[skeptric_extract, skeptric_verify_extract, skeptric_normalise])

The cached runner has to evaluate everything the first time and so is slow.

TODO: We need to fix the fetch progress bar (callbacks?)

In [14]:
# slow
%time data_all = list(RunnerCached(skeptric_process_all, test_cache_path).run())
len(data_all)

filter: 0it [00:00, ?it/s]

transform: 0it [00:00, ?it/s]

query:   0%|          | 0/3 [00:00<?, ?it/s]

CPU times: user 8.17 s, sys: 343 ms, total: 8.51 s
Wall time: 8.49 s


882

Cache size in MB

In [26]:
test_cache_path.stat().st_size / 1024**2

6.1953125

It runs much faster the second time

In [27]:
# slow
%time data_all_2 = list(RunnerCached(skeptric_process_all, test_cache_path).run())
assert data_all == data_all_2

filter: 0it [00:00, ?it/s]

transform: 0it [00:00, ?it/s]

query:   0%|          | 0/3 [00:00<?, ?it/s]

CPU times: user 8 s, sys: 350 ms, total: 8.35 s
Wall time: 8.29 s


In [28]:
test_cache_path.unlink()