In [None]:
import pickle
import os
import spacy

import multiprocessing as mp
from functools import partial
from joblib import Parallel, delayed
from tqdm import tqdm
from spacy.util import minibatch
from spacy.attrs import LEMMA

# Load the large English model with nothing disabled. The functions in this
# notebook read lemma_, dep_, doc.noun_chunks and doc.ents, so the components
# that could be listed in `disable` ("tagger", "parser", "ner") are all kept on.
nlp = spacy.load("en_core_web_lg", disable=[])

In [None]:
# "__file__" is passed as a string literal (notebooks do not define __file__),
# so abspath() resolves it against the working directory and dirname() strips
# it again: current_path is the directory the kernel was started in.
current_path = os.path.dirname(os.path.abspath("__file__"))

# Pickled train/test splits of the raw 20 Newsgroups data.
newsgroups_train_data_loc, newsgroups_test_data_loc = (
    f"{current_path}/../data/raw/20_newsgroups/{split}_data.pkl"
    for split in ("train", "test")
)

In [None]:
def parallel_apply_list(a_list, a_function, n_jobs=mp.cpu_count(), func_param=None, n_threads=None, **kwargs):
    """
    Maps a_function over the elements of a_list with joblib and returns the results as a list.

    Each element is handed to a_function either positionally or, when func_param is given,
    as the keyword argument named by func_param. Any extra **kwargs are bound to a_function
    once (via functools.partial) and are therefore identical for every call.

    If the elements of a_list bundle several inputs (tuples, lists, ...), wrap a_function so
    that it accepts the whole bundle as a single argument.

    Parameters
    ----------
    a_list : iterable
        Elements to process; it is wrapped in tqdm for progress reporting.
    a_function : callable
    n_jobs : int
        Number of worker processes. A truthy value selects the "multiprocessing" backend.
    func_param : None or str
        Name of the a_function parameter each element should fill; None means the element
        is passed as the first positional argument.
    n_threads : int or None
        Number of worker threads; only used when n_jobs is falsy (0 or None), in which case
        the "threading" backend is selected.
    kwargs : static keyword arguments given to every call of a_function

    Returns
    -------
    result : list
        Whatever the joblib executor returns for the submitted tasks.
    """
    # Bind the static keyword arguments once, then let joblib wrap the bound callable.
    make_task = delayed(partial(a_function, **kwargs))

    # Processes take precedence; threads are the fallback when no process count is given.
    if n_jobs:
        pool_options = dict(n_jobs=n_jobs, backend="multiprocessing", prefer="processes")
    else:
        pool_options = dict(n_jobs=n_threads, backend="threading", prefer="threads")
    runner = Parallel(**pool_options)

    def task_for(element):
        # Route the element to the requested keyword, or pass it positionally.
        if func_param:
            return make_task(**{func_param: element})
        return make_task(element)

    return runner(task_for(element) for element in tqdm(a_list))


def _merge_noun_phrases_and_entities(doc, use_lemma):
    """
    Merges multi-token noun chunks and multi-token named entities of doc into single tokens.

    doc is retokenized IN PLACE. Every merged token gets its LEMMA attribute set to the
    lower-cased, underscore-joined lemmas (use_lemma=True) or texts (use_lemma=False) of the
    tokens it replaces.

    Parameters
    ----------
    doc : spaCy Doc (needs noun_chunks, ents, retokenize() and vocab)
    use_lemma : bool
        True joins token lemmas, False joins token texts.

    Returns
    -------
    None
    """
    def joined_form(span):
        pieces = (tok.lemma_ if use_lemma else tok.text for tok in span)
        return '_'.join(piece.lower() for piece in pieces)

    def queue_merge(retokenizer, span):
        merged_form = joined_form(span)
        # Look the string up in the vocab first, as the original code did, so that the
        # joined form is known to the vocab before its id is used as the lemma.
        doc.vocab[merged_form]
        retokenizer.merge(span, attrs={LEMMA: doc.vocab.strings[merged_form]})

    for chunk in doc.noun_chunks:
        # Drop leading tokens (determiners etc.) until the chunk starts with a modifier
        # that belongs to the phrase proper, or only the head is left.
        while len(chunk) > 1 and chunk[0].dep_ not in ('amod', 'compound'):
            chunk = chunk[1:]
        if len(chunk) > 1:
            with doc.retokenize() as retokenizer:
                queue_merge(retokenizer, chunk)

    # Entities are handled once, after all noun chunks, and it is the entity itself that is
    # merged. (Previously this loop sat inside the noun-chunk loop and re-merged the chunk,
    # so entities were never merged.) doc.ents is read after the chunk merges so the spans
    # match the current tokenization; all entity merges share one retokenize context.
    multi_token_ents = [ent for ent in doc.ents if len(ent) > 1]
    if multi_token_ents:
        with doc.retokenize() as retokenizer:
            for ent in multi_token_ents:
                queue_merge(retokenizer, ent)


def spacy_np_lemma_no_stop(doc):
    """
    Returns the lower-cased lemmas of doc with noun phrases / entities merged into
    underscore-joined tokens, skipping stop words, whitespace and punctuation.

    doc is retokenized in place, so any spacy_np_* function called on the same doc
    afterwards sees the already merged tokens.

    Parameters
    ----------
    doc : spaCy Doc

    Returns
    -------
    tokenized_doc : list of str (empty strings removed)
    """
    _merge_noun_phrases_and_entities(doc, use_lemma=True)
    tokenized_doc = [tok.lemma_.lower().replace(' ', '_')
                     for tok in doc
                     if not (tok.is_stop or tok.is_space or tok.is_punct)]
    return [token for token in tokenized_doc if token]


def spacy_np_lemma(doc):
    """
    Returns the lower-cased lemmas of doc with noun phrases / entities merged into
    underscore-joined tokens, skipping whitespace and punctuation (stop words are kept).

    doc is retokenized in place, so any spacy_np_* function called on the same doc
    afterwards sees the already merged tokens.

    Parameters
    ----------
    doc : spaCy Doc

    Returns
    -------
    tokenized_doc : list of str (empty strings removed)
    """
    _merge_noun_phrases_and_entities(doc, use_lemma=True)
    tokenized_doc = [tok.lemma_.lower().replace(' ', '_')
                     for tok in doc
                     if not (tok.is_space or tok.is_punct)]
    return [token for token in tokenized_doc if token]


def spacy_np_no_stop(doc):
    """
    Returns the lower-cased surface forms of doc with noun phrases / entities merged into
    underscore-joined tokens, skipping stop words, whitespace and punctuation.

    doc is retokenized in place, and merged tokens get a text-joined (not lemmatized)
    LEMMA. Call the lemma-based variants first if they are needed for the same doc.

    Parameters
    ----------
    doc : spaCy Doc

    Returns
    -------
    tokenized_doc : list of str (empty strings removed)
    """
    _merge_noun_phrases_and_entities(doc, use_lemma=False)
    tokenized_doc = [tok.text.lower().strip().replace(' ', '_')
                     for tok in doc
                     if not (tok.is_stop or tok.is_space or tok.is_punct)]
    return [token for token in tokenized_doc if token]


def spacy_process_save_text_batch(batch, data_name, data_type, current_path, return_obj):
    """
    Runs one batch through the module-level ``nlp`` pipeline and appends ten tokenized
    variants of every document to text files (one document per line, tokens separated
    by a single space).

    The files live in ``{current_path}/../data/processed/{data_name}/`` and are named
    ``{data_type}_{variant}.txt`` for these variants, in this order:

    norm, lemma, no_stop, lemma_no_stop, np_no_stop, np_lemma, np_lemma_no_stop,
    np_no_stop_only, np_lemma_only, np_lemma_no_stop_only

    The ``*_only`` variants keep just the tokens of the matching ``np_*`` variant that
    contain an underscore.

    Notes
    -----
    * Files are always opened in append mode ('a' creates a missing file). The previous
      exists-check that chose between 'w' and 'a' could let parallel workers truncate or
      overwrite each other's output. Re-running over the same data appends duplicate lines.
    * Each file receives the whole batch in a single write, which makes it less likely
      that lines from concurrently running workers interleave mid-line.
      NOTE(review): the relative order of *batches* inside a file (and across the ten
      files) is still whatever order the workers happen to write in.
    * The spacy_np_* helpers retokenize ``doc`` in place, so the lemma-based variants are
      computed before the text-based one (see the comment in the loop).

    Parameters
    ----------
    batch : iterable passed to nlp.pipe (the texts of one batch)
    data_name : str (sub-directory of data/processed)
    data_type : str (file name prefix, e.g. 'train' or 'test')
    current_path : str (base directory the relative output path starts from)
    return_obj : bool (whether to return the tokenized documents)

    Returns
    -------
    result : None, or a tuple of ten lists in the variant order above; each list holds one
        token list per document of the batch. (Previously only the token lists of the
        last document were returned, and an empty batch raised NameError.)
    """
    variants = ("norm", "lemma", "no_stop", "lemma_no_stop",
                "np_no_stop", "np_lemma", "np_lemma_no_stop",
                "np_no_stop_only", "np_lemma_only", "np_lemma_no_stop_only")
    out_paths = [f"{current_path}/../data/processed/{data_name}/{data_type}_{variant}.txt"
                 for variant in variants]
    # tokenized[i] collects, per document, the token list of variants[i].
    tokenized = [[] for _ in variants]

    for doc in nlp.pipe(batch):
        # The four plain variants must be read before any spacy_np_* call, because those
        # calls merge tokens of this very doc.
        words = [tok for tok in doc if (not tok.is_space) and (not tok.is_punct)]
        content_words = [tok for tok in words if not tok.is_stop]
        doc_norm = [tok.text.lower() for tok in words]
        doc_lemma = [tok.lemma_.lower() for tok in words]
        doc_no_stop = [tok.text.lower() for tok in content_words]
        doc_lemma_no_stop = [tok.lemma_.lower() for tok in content_words]

        # Lemma variants first: the first spacy_np_* call fixes the merged tokens' LEMMA.
        # spacy_np_no_stop reads token text only, so it is unaffected by running last,
        # whereas running it first would leave the lemma variants with text-joined
        # (unlemmatized) phrases.
        doc_np_lemma = spacy_np_lemma(doc)
        doc_np_lemma_no_stop = spacy_np_lemma_no_stop(doc)
        doc_np_no_stop = spacy_np_no_stop(doc)

        per_doc = (
            doc_norm,
            doc_lemma,
            doc_no_stop,
            doc_lemma_no_stop,
            doc_np_no_stop,
            doc_np_lemma,
            doc_np_lemma_no_stop,
            [token for token in doc_np_no_stop if '_' in token],
            [token for token in doc_np_lemma if '_' in token],
            [token for token in doc_np_lemma_no_stop if '_' in token],
        )
        for collected, tokens in zip(tokenized, per_doc):
            collected.append(tokens)

    # All ten files share one directory, so it only has to be created once.
    os.makedirs(os.path.dirname(out_paths[0]), exist_ok=True)
    for out_path, docs in zip(out_paths, tokenized):
        # `with` closes the handle even if the write fails.
        with open(out_path, 'a') as out_file:
            out_file.write("".join(" ".join(tokens) + "\n" for tokens in docs))

    if return_obj:
        return tuple(tokenized)
    return None


def multiprocess_spacy_nlp(texts, batch_size=1000, n_jobs=mp.cpu_count(), data_name='20_newsgroups', data_type='train', current_path=os.path.dirname(os.path.abspath("__file__")), return_obj=False):
    """
    Splits texts into batches and runs spacy_process_save_text_batch on every batch
    through parallel_apply_list.

    Parameters
    ----------
    texts : sequence of texts to process
    batch_size : int (number of texts per batch handed to one task)
    n_jobs : int (forwarded to parallel_apply_list)
    data_name : str (forwarded to spacy_process_save_text_batch)
    data_type : str (forwarded to spacy_process_save_text_batch)
    current_path : str (forwarded to spacy_process_save_text_batch; the default is
        evaluated once, when this function is defined)
    return_obj : bool (forwarded to spacy_process_save_text_batch; also decides whether
        anything is returned here)

    Returns
    -------
    result : None when return_obj is falsy; otherwise a tuple of ten lists. List i is the
        concatenation, over all batches in order, of entry i of each batch result.
    """
    batch_results = parallel_apply_list(minibatch(texts, size=batch_size),
                                        spacy_process_save_text_batch,
                                        data_name=data_name,
                                        data_type=data_type,
                                        current_path=current_path,
                                        return_obj=return_obj,
                                        n_jobs=n_jobs)
    if not return_obj:
        return None

    # Every batch result carries ten entries; gather entry `position` from all batches
    # and flatten it by one level.
    n_outputs = 10
    return tuple(
        [item for batch_result in batch_results for item in batch_result[position]]
        for position in range(n_outputs)
    )

In [None]:
# Load the pickled train/test splits. The files are opened with `with` so the handles
# are closed as soon as each pickle has been read.
# NOTE: pickle.load can execute arbitrary code from the file; only load pickles that
# were produced by a trusted source.
with open(newsgroups_train_data_loc, "rb") as train_file:
    newgroups_train_data = pickle.load(train_file)
with open(newsgroups_test_data_loc, "rb") as test_file:
    newgroups_test_data = pickle.load(test_file)

In [None]:
# Tokenize the training split in batches of 300 with n_jobs=11. With
# return_obj=False nothing is returned; the results only go to the output files.
multiprocess_spacy_nlp(
    texts=newgroups_train_data,
    batch_size=300,
    n_jobs=11,
    data_name='20_newsgroups',
    data_type='train',
    current_path=os.path.dirname(os.path.abspath("__file__")),
    return_obj=False,
)

In [None]:
# Tokenize the test split with the same settings as the training split. With
# return_obj=False nothing is returned; the results only go to the output files.
multiprocess_spacy_nlp(
    texts=newgroups_test_data,
    batch_size=300,
    n_jobs=11,
    data_name='20_newsgroups',
    data_type='test',
    current_path=os.path.dirname(os.path.abspath("__file__")),
    return_obj=False,
)