# <div style="text-align:center">World Bank Documents and Reports Cleaner</div>

This notebook implements the cleaner classes for data retrieved from the **Documents and Reports API**. The cleaner module also provides spelling-correction functionality.

In [1]:
# # Requirements:
# # Please install spacy library and the `en` model
# !~/anaconda3/bin/pip install spacy
# !~/anaconda3/bin/python -m spacy download en
# !~/anaconda3/bin/pip install contexttimer

### Installing pattern as alternative to pyenchant
#### Note, the pattern module's spell checking function is quite slow!


First, clone the development branch of the repository (the [PyPI version is outdated](https://github.com/clips/pattern/issues/217)):
- `git clone -b development https://github.com/clips/pattern`
- `cd pattern/`
- Commenting out `"mysqlclient"` inside the `setup.py` file may be necessary if errors are encountered in the next step.
- `pip install .`

Make sure that the `pip` that you use corresponds to the python installation that you will use to run the notebook.

In [1]:
import import_ipynb
from acronyms.AcronymModule import AcronymMapper

importing Jupyter notebook from /home/avsolatorio/WBG/wb-nlp/SCRIPTS/acronyms/AcronymModule.ipynb


In [2]:
import os
import re
import numpy as np
import pandas as pd
import nltk

# Add an extra directory to the list of locations NLTK searches for its data files.
# NOTE(review): hard-coded absolute path - adjust (or make configurable) for your environment.
nltk.data.path.append("/R/nltk_data")

from nltk.corpus import words
from nltk.corpus import PlaintextCorpusReader, wordnet
from nltk.stem import WordNetLemmatizer
from nltk import word_tokenize, pos_tag

from langdetect import detect, detect_langs

from joblib import Parallel, delayed
import multiprocessing as mp
import multiprocessing

from contexttimer import Timer

In [4]:
from phoenix.cleaner.cleaner import Cleaner

# Spelling correction module

In [6]:
class CorpusCleaner(Cleaner):
    """Cleans batches of ``.txt`` documents through a shared joblib worker pool.

    This class uses ``num_workers``, ``input_folder``, ``output_folder``,
    ``logger`` and ``clean_text``, none of which are defined here; they are
    presumably supplied by ``Cleaner`` (TODO confirm).
    """

    @staticmethod
    def _new_output_log():
        """Return an empty batch log with one dict per section, in the published key order."""
        sections = ('lang', 'tokens', 'text', 'spell_errors', 'skipped', 'exception', 'write_status')
        return {section: {} for section in sections}

    @staticmethod
    def _merge_doc_output(output_log, doc_output, collect_text_log, collect_spell_errors):
        """Fold the log of a single ``clean_doc`` call into the batch-level ``output_log``."""
        for section in ('lang', 'tokens', 'skipped', 'exception', 'write_status'):
            output_log[section].update(doc_output[section])

        if collect_text_log:
            # Don't do this if you're processing a lot of docs
            output_log['text'].update(doc_output['text'])
        if collect_spell_errors:
            output_log['spell_errors'].update(doc_output['spell_errors'])

    def _clean_batch(self, pool, batch, save_docs, output_log, collect_text_log, collect_spell_errors):
        """Run ``clean_doc`` over every path in ``batch`` on ``pool`` and merge the resulting logs."""
        doc_outputs = pool(delayed(self.clean_doc)(fln, save_doc=save_docs) for fln in batch)
        for doc_output in doc_outputs:
            self._merge_doc_output(output_log, doc_output, collect_text_log, collect_spell_errors)

    # Clean documents using spell checker
    def batch_clean_docs(self, doclist, batch_size=None, save_docs=False, collect_text_log=False, collect_spell_errors=False, skip_existing=True, default_docs_per_worker=20):
        """Clean every eligible document named in ``doclist`` and return the merged logs.

        Args:
            doclist: Iterable of file names, resolved against ``self.input_folder``.
                Names that do not end in ``.txt`` are ignored.
            batch_size: Number of documents submitted to the pool at once. Defaults
                to ``default_docs_per_worker * self.num_workers``.
            save_docs: Forwarded to ``clean_doc`` as ``save_doc``.
            collect_text_log: When true, keep the cleaned text of every document in
                the returned log (memory hungry on large corpora).
            collect_spell_errors: When true, keep the per-document spell errors.
            skip_existing: When true, documents whose output file already exists in
                ``self.output_folder`` are not processed.
            default_docs_per_worker: Multiplier used to derive the default batch size.

        Returns:
            dict with the keys ``lang``, ``tokens``, ``text``, ``spell_errors``,
            ``skipped``, ``exception`` and ``write_status``; each value is a dict
            keyed by document id.
        """
        if batch_size is None:
            # Use a multiplier for efficient usage of workers
            batch_size = default_docs_per_worker * self.num_workers

        log_interval = batch_size
        output_log = self._new_output_log()

        # One pool is kept open for all batches instead of being re-created per batch.
        with Parallel(n_jobs=self.num_workers, backend='multiprocessing') as pool:
            batch = []
            for ix, fileid in enumerate(doclist):
                if ix % log_interval == 0:
                    self.logger(f'Docset {ix}')

                if not fileid.endswith('.txt'):    # text files only
                    continue

                filen = os.path.join(self.input_folder, fileid)     # input file
                newfile = os.path.join(self.output_folder, fileid)  # output file

                if not os.path.isfile(filen):
                    self.logger(f"No input file: {fileid}")
                    continue

                # Skip if output file already exists
                if skip_existing and os.path.isfile(newfile):
                    continue

                # Always queue the document first and flush afterwards. Flushing
                # *instead of* queueing would silently drop the document that
                # arrives while the batch is full.
                batch.append(filen)

                if len(batch) == batch_size:
                    with Timer() as timer:
                        self._clean_batch(pool, batch, save_docs, output_log, collect_text_log, collect_spell_errors)
                    batch = []

                    self.logger(f'Set {ix}: {log_interval} items for {timer.elapsed:.2f} seconds.')

            # Process whatever is left over after the last full batch.
            if batch:
                self._clean_batch(pool, batch, save_docs, output_log, collect_text_log, collect_spell_errors)

        return output_log

    # Clean a single document using spell checker
    def clean_doc(self, filepath, save_doc=False):
        """Clean one document and return its per-document logs.

        The file is read as bytes and decoded as UTF-8 (undecodable bytes are
        dropped), passed to ``self.clean_text``, and - when ``save_doc`` is true
        and the cleaner reports a truthy ``write_status`` - written to
        ``self.output_folder`` under the same file name.

        Args:
            filepath: Path of the input text file.
            save_doc: Whether to write the cleaned text to disk. ``None`` is
                treated as ``False``.

        Returns:
            dict with the keys ``lang``, ``tokens``, ``text``, ``spell_errors``,
            ``skipped``, ``exception`` and ``write_status``; each value is a
            one-entry dict keyed by the document id (file name without ``.txt``).
        """
        if save_doc is None:
            save_doc = False

        filename = os.path.basename(filepath)

        # Remove only a trailing '.txt'. str.strip('.txt') treats its argument as
        # a character set and would also eat leading/trailing 't', 'x' and '.'.
        fileid = filename[:-len('.txt')] if filename.endswith('.txt') else filename

        with open(filepath, 'rb') as fl:
            # Use context so that the file will be closed automatically upon exit from the context.
            text = fl.read().decode('utf-8', errors='ignore')

        cleaning_output = self.clean_text(text, filen=fileid)
        text = cleaning_output['text']

        # Note: clean_text reports the token info under 'token', the log uses 'tokens'.
        output_log = {
            'lang': {fileid: cleaning_output['lang']},
            'tokens': {fileid: cleaning_output['token']},
            'text': {fileid: text},
            'spell_errors': {fileid: cleaning_output['spell_errors']},
            'skipped': {fileid: cleaning_output['skipped']},
            'exception': {fileid: cleaning_output['exception']},
            'write_status': {fileid: cleaning_output['write_status']},
        }

        if save_doc and cleaning_output['write_status']:
            # The input was decoded as UTF-8, so write UTF-8 back rather than
            # relying on the platform's default encoding.
            with open(os.path.join(self.output_folder, filename), 'w', encoding='utf-8') as fl:
                fl.write(text)

        return output_log

In [7]:
class ParallelCorpusCleaner(Cleaner):
    """Cleans ``.txt`` documents using one ``multiprocessing.Process`` per document.

    At most ``batch_size`` worker processes run at any time; each worker stores
    its result in a manager-backed dict that is merged once all workers are done.

    This class uses ``num_workers``, ``input_folder``, ``output_folder``,
    ``logger``, ``clean_text`` and ``load_existing_and_extract_metadata``, none
    of which are defined here; they are presumably supplied by ``Cleaner``
    (TODO confirm).
    """

    @staticmethod
    def _merge_doc_output(output_log, doc_output, collect_text_log, collect_spell_errors):
        """Fold the log produced by one worker into the batch-level ``output_log``."""
        for section in ('lang', 'tokens', 'skipped', 'exception', 'write_status'):
            output_log[section].update(doc_output[section])

        if collect_text_log:
            # Don't do this if you're processing a lot of docs
            output_log['text'].update(doc_output['text'])
        if collect_spell_errors:
            output_log['spell_errors'].update(doc_output['spell_errors'])

    def _make_worker(self, fileid, save_docs, skip_existing, process_output_dict):
        """Build (but do not start) the worker process for ``fileid``; ``None`` if it must be skipped."""
        if not fileid.endswith('.txt'):    # text files only
            return None

        filen = os.path.join(self.input_folder, fileid)     # input file
        newfile = os.path.join(self.output_folder, fileid)  # output file

        if not os.path.isfile(filen):
            self.logger(f"No input file: {fileid}")
            return None

        if skip_existing and os.path.isfile(newfile):
            # Output already exists: extract metadata from it instead of re-cleaning.
            target, path = self.load_existing_and_extract_metadata, newfile
        else:
            target, path = self.clean_doc, filen

        return multiprocessing.Process(target=target, args=(fileid, path, save_docs, process_output_dict))

    def _reap_finished(self, running):
        """Block until a process in ``running`` has exited, then join and remove every exited one.

        ``running`` is a non-empty list of ``(fileid, process)`` pairs and is
        updated in place. Liveness is judged by the process itself rather than by
        the presence of a result, so a worker that crashed still frees its slot.
        """
        # Local import keeps this helper self-contained.
        from multiprocessing.connection import wait

        # Sleeps until at least one sentinel is ready, i.e. a process has ended.
        wait([proc.sentinel for _, proc in running])

        still_running = []
        for fileid, proc in running:
            if proc.is_alive():
                still_running.append((fileid, proc))
                continue

            proc.join()
            if proc.exitcode != 0:
                self.logger(f'Worker for {fileid} exited with code {proc.exitcode}.')

        running[:] = still_running

    # Clean documents using spell checker
    def batch_clean_docs(self, doclist, batch_size=None, save_docs=False, collect_text_log=False, collect_spell_errors=False, skip_existing=True):
        """Clean every eligible document named in ``doclist`` and return the merged logs.

        Args:
            doclist: Iterable of file names, resolved against ``self.input_folder``.
                It is only iterated, never modified. Names that do not end in
                ``.txt`` are ignored.
            batch_size: Maximum number of concurrently running worker processes.
                Defaults to ``4 * self.num_workers``.
            save_docs: Forwarded to the worker as ``save_doc``.
            collect_text_log: When true, keep the cleaned text of every document in
                the returned log (memory hungry on large corpora).
            collect_spell_errors: When true, keep the per-document spell errors.
            skip_existing: When true, documents whose output file already exists are
                handed to ``load_existing_and_extract_metadata`` instead of being
                cleaned again.

        Returns:
            dict with the keys ``lang``, ``tokens``, ``text``, ``spell_errors``,
            ``skipped``, ``exception`` and ``write_status``; each value is a dict
            keyed by document id.

        Raises:
            ValueError: If the effective ``batch_size`` is smaller than 1 (no
                worker could ever be started).
        """
        if batch_size is None:
            # Use a multiplier for efficient usage of workers
            batch_size = 4 * self.num_workers

        if batch_size < 1:
            raise ValueError(f'batch_size must be a positive integer, got {batch_size}')

        log_interval = batch_size

        sections = ('lang', 'tokens', 'text', 'spell_errors', 'skipped', 'exception', 'write_status')
        output_log = {section: {} for section in sections}

        # The context manager shuts the manager process down once results are copied out.
        with multiprocessing.Manager() as process_output_manager:
            process_output_dict = process_output_manager.dict()
            running = []  # (fileid, process) pairs of workers that have been started

            try:
                for ix, fileid in enumerate(doclist, start=1):
                    if ix % log_interval == 0:
                        self.logger(f'Docset {ix}')

                    worker = self._make_worker(fileid, save_docs, skip_existing, process_output_dict)
                    if worker is None:
                        continue

                    # Wait for a free slot before launching another process.
                    while len(running) >= batch_size:
                        self._reap_finished(running)

                    # Register only after a successful start so cleanup never
                    # touches a process that was not started.
                    worker.start()
                    running.append((fileid, worker))

            except Exception as e:
                # Best effort: stop submitting, but still collect what has finished.
                print(f'Exception received: {e}')

            # Let every worker that was started run to completion.
            while running:
                self._reap_finished(running)

            # Copy the results out of the proxy while the manager is still alive.
            doc_outputs = process_output_dict.values()

        for doc_output in doc_outputs:
            self._merge_doc_output(output_log, doc_output, collect_text_log, collect_spell_errors)

        return output_log

    # Clean a single document using spell checker
    def clean_doc(self, fileid, filepath, save_doc=False, process_output_dict=None):
        """Clean one document and publish or return its per-document logs.

        The file is read as bytes, decoded as UTF-8 (undecodable bytes are
        dropped), lower-cased and passed to ``self.clean_text``. When
        ``save_doc`` is true and the cleaner reports a truthy ``write_status``,
        the cleaned text is written to ``self.output_folder`` under the same
        file name.

        Args:
            fileid: Key under which the result is stored in ``process_output_dict``.
            filepath: Path of the input text file.
            save_doc: Whether to write the cleaned text to disk. ``None`` is
                treated as ``False``.
            process_output_dict: Optional mapping shared with the parent process.

        Returns:
            ``None`` when ``process_output_dict`` is given (the log is stored in
            it under ``fileid``); otherwise the log dict itself. The log has the
            keys ``lang``, ``tokens``, ``text``, ``spell_errors``, ``skipped``,
            ``exception`` and ``write_status``; each value is a one-entry dict
            keyed by the file name without ``.txt``.
        """
        proc_fileid = fileid

        if save_doc is None:
            save_doc = False

        filename = os.path.basename(filepath)

        # Remove only a trailing '.txt'. str.strip('.txt') treats its argument as
        # a character set and would also eat leading/trailing 't', 'x' and '.'.
        fileid = filename[:-len('.txt')] if filename.endswith('.txt') else filename

        with open(filepath, 'rb') as fl:
            # Use context so that the file will be closed automatically upon exit from the context.
            text = fl.read().decode('utf-8', errors='ignore').lower()

        cleaning_output = self.clean_text(text, filen=fileid)
        text = cleaning_output['text']

        # Note: clean_text reports the token info under 'token', the log uses 'tokens'.
        output_log = {
            'lang': {fileid: cleaning_output['lang']},
            'tokens': {fileid: cleaning_output['token']},
            'text': {fileid: text},
            'spell_errors': {fileid: cleaning_output['spell_errors']},
            'skipped': {fileid: cleaning_output['skipped']},
            'exception': {fileid: cleaning_output['exception']},
            'write_status': {fileid: cleaning_output['write_status']},
        }

        if save_doc and cleaning_output['write_status']:
            # The input was decoded as UTF-8, so write UTF-8 back rather than
            # relying on the platform's default encoding.
            with open(os.path.join(self.output_folder, filename), 'w', encoding='utf-8') as fl:
                fl.write(text)

        if process_output_dict is not None:
            process_output_dict[proc_fileid] = output_log
        else:
            return output_log

In [8]:
# payload = dict(
#     lang=lang_log,
#     token=token_log,
#     text=text_log,
#     skipped=skipped_log,
#     spell_errors=spell_errors,
#     exception=exp,
#     write_status=write_status,
# )

# payload = {k: v if not isinstance(v, set) else list(v) for k, v in payload.items()}