In [1]:
# ! pip install --quiet tqdm
# ! pip install --quiet gcsfs
# ! pip install --quiet s3fs
! pip install --quiet dataset

In [2]:
import pandas as pd
import os
from lib.preprocess import *
from tqdm import tnrange, tqdm_notebook
from multiprocessing import Pool
import threading 

In [9]:
import dataset
import gcsfs
import s3fs
from os.path import join
import re


class ParallelProcessor():
    """Preprocess text files from a (cloud) filesystem in parallel and store them in SQLite.

    Every file directly under ``inpath`` is read into a DataFrame (``*.csv`` via
    ``pd.read_csv``, ``*.jl`` via ``pd.read_json(lines=True)``), the ``content_key``
    column is mapped through ``string_processor``, and the resulting
    ``(id_key, content)`` pairs are inserted into the ``processed`` table of the
    SQLite database at ``outpath``.
    """

    # Raw strings: '\.' inside a plain string literal is an invalid escape sequence
    # (DeprecationWarning, and SyntaxWarning since Python 3.12).
    jl_pattern = re.compile(r'.+\.jl$')
    csv_pattern = re.compile(r'.+\.csv$')

    # GCP project used to build the default filesystem when no `fs` is passed.
    default_gcs_project = 'open-source-software'

    def __init__(self,
                 string_processor,
                 inpath,
                 outpath,
                 content_key,
                 id_key,
                 pandas_kwargs=None,
                 cores=None,
                 fs=None):
        """
        Args:
            string_processor: callable applied to each non-null value of the
                ``content_key`` column. It travels with ``self`` to the
                multiprocessing workers, so it must be picklable.
            inpath: directory on ``fs`` whose files are processed.
            outpath: path of the SQLite database that ``process_all`` writes.
            content_key: name of the column holding the raw text.
            id_key: name of the column used as the row identifier.
            pandas_kwargs: extra keyword arguments for the pandas reader
                (e.g. ``{'compression': 'gzip'}``). Defaults to none.
            cores: number of worker processes; ``None`` lets ``Pool`` decide.
            fs: fsspec-style filesystem exposing ``open`` and ``ls``. Defaults to a
                ``gcsfs.GCSFileSystem`` for ``default_gcs_project``.
        """
        self.string_processor = string_processor
        self.inpath = inpath
        self.outpath = outpath
        self.id_key = id_key
        self.content_key = content_key
        # Copy, so instances never share one dict (the old `{}` default was shared).
        self.pandas_kwargs = dict(pandas_kwargs) if pandas_kwargs else {}
        self.cores = cores
        # Build the default lazily. The old default argument created a GCS client
        # once, at class-definition time, even when a different `fs` was passed.
        self.fs = fs if fs is not None else gcsfs.GCSFileSystem(project=self.default_gcs_project)

    def _read_df(self, filename, file):
        """Parse an open file object into a DataFrame, choosing the reader by extension.

        Raises:
            TypeError: if ``filename`` ends in neither ``.csv`` nor ``.jl``.
        """
        if re.match(self.csv_pattern, filename):
            return pd.read_csv(file, **self.pandas_kwargs)
        elif re.match(self.jl_pattern, filename):
            return pd.read_json(file, lines=True, **self.pandas_kwargs)
        else:
            raise TypeError('Cannot parse file: {}'.format(filename))

    def _read(self, filename):
        """Open ``filename`` (relative to ``inpath``) on ``self.fs`` and parse it."""
        with self.fs.open(join(self.inpath, filename)) as fi:
            return self._read_df(filename, fi)

    def _get_files(self):
        """Return the base names of the entries listed under ``inpath``.

        Empty names (e.g. from entries ending in '/') are dropped.
        """
        files = self.fs.ls(self.inpath)
        files = [f.split('/')[-1] for f in files]
        return [f for f in files if f]

    def process(self, f, compression='gzip'):
        """Read one input file and return its processed rows.

        Args:
            f: file name relative to ``inpath``.
            compression: unused; kept only for backward compatibility. Pass
                compression through ``pandas_kwargs`` instead.

        Returns:
            A DataFrame with the original columns minus ``content_key``, plus a
            ``content`` column holding ``string_processor`` output. Rows are dropped
            when the raw content, the processed content, or the id is null.
        """
        key = self.content_key
        df = self._read(f)
        df = df[df[key].notna()].reset_index(drop=True)
        df['content'] = df[key].map(self.string_processor)
        # drop(columns=...) rather than drop(key, 1): passing `axis` positionally was
        # deprecated in pandas 1.x and removed in pandas 2.0.
        return (df[df['content'].notna() & df[self.id_key].notna()]
                .reset_index(drop=True)
                .drop(columns=key))

    def process_all(self):
        """Process every file under ``inpath`` in a worker pool and store the results.

        Each ``(id_key, content)`` pair goes into the ``processed`` table via
        dataset's ``insert_ignore`` keyed on ``id_key``, so ids already present in
        the table are not inserted again.
        """
        files = self._get_files()
        conn = dataset.connect('sqlite:///{}'.format(self.outpath))
        table = conn['processed']
        # Leaving the `with` block calls terminate(), so workers are cleaned up even if
        # an exception escapes the loop. On success, close() + join() first lets them
        # exit normally.
        with Pool(self.cores) as pool:
            for df in tqdm_notebook(pool.imap(self.process, files), total=len(files)):
                # One transaction per file instead of an autocommit for every row.
                with conn:
                    for i, c in zip(df[self.id_key], df['content']):
                        table.insert_ignore({self.id_key: i, 'content': c}, [self.id_key])
            pool.close()
            pool.join()
        

In [None]:
# Process the files under oecd-scraping/indeed-uk (read from S3) into the
# SQLite database jobs-lookup.db, keyed by the 'url' column.
inpath = 'oecd-scraping/indeed-uk'
string_processor = Preprocessor(claims_processor, 4).process
p = ParallelProcessor(string_processor,
                      inpath,
                      'jobs-lookup.db',
                      content_key='description',
                      id_key='url',
                      cores=2,
                      fs=s3fs.S3FileSystem())
p.process_all()

HBox(children=(IntProgress(value=0, max=2), HTML(value='')))

In [5]:
# Process the gzip-compressed patent-description files under this GCS prefix,
# using the default GCS filesystem of ParallelProcessor.
# FIXME(review): this call omits the required `content_key` and `id_key` arguments
# of ParallelProcessor.__init__, so it raises TypeError as written. The
# commented-out preprocess(...) call below used 'application_number_formatted' as
# its id column; the content column name is not visible here — confirm both.
# NOTE(review): process_all() writes a SQLite database to `outpath`, so the
# '.csv' file name is misleading.
inpath = 'gs://oss_bigquery_exports/patent-descriptions'
string_processor = Preprocessor(claims_processor, 4).process
p = ParallelProcessor(string_processor, inpath, 'patent-lookup.csv', pandas_kwargs = {'compression':'gzip'}, cores=24)
p.process_all()

HBox(children=(IntProgress(value=0, max=586), HTML(value='')))




In [None]:
# preprocess('claims_lookup.csv',  'oss_bigquery_exports/patent-descriptions', 
#                                                    process_claims, 
#                                                    'application_number_formatted', 'gzip')

In [None]:
# preprocess('readme_lookup.csv', 'oss_bigquery_exports/readmes', process_readmes, 'id')

HBox(children=(IntProgress(value=0, max=16), HTML(value='')))

### Treat each repo owner like a user in collaborative filtering?

In [101]:
# One document per repo owner: all of that owner's `content` strings joined with ' \t '.
# NOTE(review): `df` is not defined by any top-level cell visible in this notebook
# (only inside ParallelProcessor methods). It comes from hidden kernel state and is
# assumed to have 'repo_owner' and 'content' columns — confirm before a fresh run.
# Select the column before aggregating. This avoids GroupBy.apply operating on the
# grouping column (deprecated in pandas 2.2), and a lambda parameter shadowing `df`.
by_owner = df.groupby('repo_owner')['content'].agg(' \t '.join)

In [102]:
# Write one line per repo owner, in the order of `by_owner`.
# Explicit UTF-8: the default text encoding is platform-dependent and may fail to
# encode arbitrary text content.
# NOTE(review): a newline inside a content string would split that owner across lines.
with open('by_owner.txt', 'w', encoding='utf-8') as f:
    f.writelines(c + '\n' for c in by_owner)