In [2]:
! pip install --quiet s3fs
! pip install -e 'git://github.com/nandanrao/embed-software.git#egg=embed_software'

In [31]:
from os.path import join
import json
from time import sleep
from urllib.parse import urlparse, urlunparse, parse_qs, urlencode
from multiprocessing import Pool, Queue, Process, Manager
from itertools import takewhile, islice, count
from sqlite3 import connect, OperationalError

import logging
# Grab the root logger (getLogger() with no name) and lower its threshold to
# INFO so that INFO-level records are no longer filtered out.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def chunk(n, it):
    """Lazily split an iterable into consecutive lists of at most ``n`` items.

    Args:
        n: Maximum number of items per batch.
        it: Any iterable; it is consumed incrementally, one batch at a time.

    Returns:
        An iterator yielding lists. Every list holds ``n`` items except
        possibly the last one, and iteration ends as soon as an empty batch
        would be produced (so an empty input, or ``n == 0``, yields nothing).
    """
    source = iter(it)

    def next_batch():
        # Pull up to n further items from the shared iterator.
        return list(islice(source, n))

    # Two-argument iter(): keep calling next_batch until it returns [].
    return iter(next_batch, [])

def remove_qs(url):
    """Strip the query string from ``url``, keeping only its ``jk`` parameter.

    Args:
        url: The URL to clean.

    Returns:
        The same URL with every query parameter removed except ``jk``
        (all of its values are kept if it occurs more than once). If the
        URL has no ``jk`` parameter the query string is dropped entirely.
        Other URL components, including the fragment, are left as parsed.
    """
    parts = urlparse(url)
    jk_values = parse_qs(parts.query).get('jk')
    # doseq=True expands the list of values into repeated jk=... pairs.
    kept_query = urlencode({'jk': jk_values}, True) if jk_values else None
    return urlunparse(parts._replace(query=kept_query))

class ParallelProcessor():
    """Parse line-delimited JSON files in a worker pool and store rows in SQLite.

    Worker processes read the input files, turn every JSON line into a row and
    push batches of rows onto a shared queue. A single dedicated writer process
    drains that queue into the ``processed`` table, so only one process ever
    writes to the database file.
    """

    def __init__(self, inpath, fs, outpath, string_processor, threads, **kwargs):
        """Store the configuration; no I/O happens here.

        Args:
            inpath: Directory (as understood by ``fs``) holding the input files.
            fs: Filesystem-like object; only its ``ls(path)`` and ``open(path)``
                methods are used.
            outpath: Path of the SQLite database file to write.
            string_processor: Callable applied to each record's
                ``'description'`` value to produce the stored ``content``.
            threads: Number of worker processes in the pool.
            **kwargs: Forwarded to ``_process`` for every record; it must
                therefore contain ``keys``.
        """
        self.inpath = inpath
        self.outpath = outpath
        self.threads = threads
        self.fs = fs
        self.kwargs = kwargs
        self.string_processor = string_processor
        self.processed = 0

    def _create_table(self, outpath=None):
        """Create the ``processed`` table unless it already exists.

        Args:
            outpath: Database file to initialise; defaults to ``self.outpath``.
        """
        con = connect(outpath or self.outpath)
        try:
            cur = con.cursor()
            # IF NOT EXISTS keeps re-runs against an existing database working;
            # duplicates are already handled by the UNIQUE url + "insert or ignore".
            cur.execute("""CREATE TABLE IF NOT EXISTS processed(id INTEGER PRIMARY KEY, 
                                              url VARCHAR UNIQUE, 
                                              title VARCHAR, 
                                              content VARCHAR,
        				      category VARCHAR,
                                              section VARCHAR,
                                              timestamp DATETIME)
                                              """)
            con.commit()
            cur.close()
        finally:
            con.close()

    def _insert_rows(self, cur, li, tries=0):
        """Insert a batch of rows, retrying on ``OperationalError``.

        Rows whose ``url`` is already present are skipped by the
        ``insert or ignore`` statement. After the initial attempt plus ten
        retries the batch is given up on: the failure is logged and the
        method returns normally (best effort, no exception is raised).

        Args:
            cur: Open SQLite cursor.
            li: List of 6-item rows ordered as
                (url, title, content, category, section, timestamp).
            tries: Number of retries already spent; used by the recursion.
        """
        try:
            cur.executemany("insert or ignore into \
            processed(url,title,content,category,section,timestamp) \
            values (?,?,?,?,?,?)", li)
        except OperationalError as e:
            if tries >= 10:
                # Out of retries: record the cause instead of printing it.
                logging.error('failed to insert!', exc_info=e)
                return
            # Back off briefly before trying the same batch again.
            sleep(.5)
            self._insert_rows(cur, li, tries=tries + 1)

    def _process(self, d, keys):
        """Turn one decoded JSON record into a row.

        The record is modified in place: ``categories`` is unpacked into
        ``category`` and ``section`` (so it must hold exactly two items),
        ``content`` is set to the processed ``description`` and ``url`` is
        reduced to its ``jk`` parameter.

        Args:
            d: Dict decoded from one input line.
            keys: Names of the fields to extract, in column order.

        Returns:
            ``[d[k] for k in keys]``, or ``None`` when the processed content
            is empty/falsy.
        """
        d['category'], d['section'] = d['categories']
        d['content'] = self.string_processor(d['description'])
        d['url'] = remove_qs(d['url'])
        if not d['content']:
            return None
        return [d[k] for k in keys]

    def _get_files(self):
        """List the input files as full paths under ``self.inpath``."""
        names = (f.split('/')[-1] for f in self.fs.ls(self.inpath))
        # Entries ending in '/' leave an empty basename and are skipped.
        return [join(self.inpath, name) for name in names if name]

    def process(self, filename):
        """Parse one input file and enqueue its rows in batches.

        The file is read in chunks of 1000 lines; each line is decoded as
        JSON and converted with ``_process``. Requires ``self.q`` to be set
        (``process_all`` does this).

        Args:
            filename: Path of the file to open through ``self.fs``.
        """
        with self.fs.open(filename) as f:
            for c in chunk(1000, f):
                rows = [self._process(json.loads(l), **self.kwargs) for l in c]
                rows = [d for d in rows if d]
                # Nothing to insert for this chunk: don't wake the writer.
                if rows:
                    self.q.put(rows)

    def write(self):
        """Drain ``self.q`` into the database until a ``None`` sentinel arrives.

        Each batch is committed on its own. The connection is closed even if
        an unexpected error interrupts the loop.
        """
        con = connect(self.outpath)
        try:
            cur = con.cursor()
            while True:
                rows = self.q.get()
                if rows is None:
                    break
                self._insert_rows(cur, rows)
                con.commit()
            cur.close()
        finally:
            con.close()

    def process_all(self, files=None):
        """Process every input file and write the results to the database.

        Args:
            files: Paths to process; defaults to everything returned by
                ``_get_files()``.

        Raises:
            Whatever ``Pool.map`` re-raises from a failed worker. The writer
            process is still stopped and joined in that case.
        """
        self._create_table(self.outpath)
        if files is None:
            files = self._get_files()

        # The context manager shuts the manager's server process down on exit.
        with Manager() as m:
            self.q = m.Queue()
            p = Process(target = self.write)
            p.start()
            try:
                with Pool(self.threads) as pool:
                    pool.map(self.process, files)
                print('done processing')
            finally:
                # Always send the sentinel; otherwise a worker failure would
                # leave the writer blocked on q.get() forever.
                self.q.put(None)
                p.join()

In [None]:
from embed_software.preprocess import claims_processor, readme_processor, Preprocessor
import s3fs

def preprocess_country(COUNTRY, threads=8):
    """Preprocess the scraped postings of one country into a SQLite database.

    Reads every file under ``oecd-scraping/indeed-<COUNTRY>`` on S3 and writes
    the result to ``data/<COUNTRY>/processed.db``.

    Args:
        COUNTRY: Country code used in both the input and the output path.
        threads: Number of worker processes handed to ``ParallelProcessor``.
    """
    import os  # local import: only os.path.join is imported at the top of the notebook

    fs = s3fs.S3FileSystem()
    inpath = f'oecd-scraping/indeed-{COUNTRY}'
    outpath = f'data/{COUNTRY}/processed.db'
    # sqlite3 creates the database file but not its parent directory.
    os.makedirs(os.path.dirname(outpath), exist_ok=True)
    # Order matters: rows are inserted positionally into
    # (url, title, content, category, section, timestamp).
    keys = ['url', 'title', 'content', 'category', 'section', 'scrapeTimestamp']
    # NOTE(review): the meaning of the second Preprocessor argument (4) is not
    # visible here -- confirm against embed_software.preprocess.
    string_processor = Preprocessor(readme_processor, 4).process
    p = ParallelProcessor(inpath, fs, outpath, string_processor, threads=threads, keys=keys)
    p.process_all()

done processing


In [None]:
# Run the pipeline for the 'uk' dataset: input is read from
# oecd-scraping/indeed-uk and rows are written to data/uk/processed.db.
preprocess_country('uk')