## Preprocess review & summary texts in the Amazon dataset

In [1]:
import pandas as pd
import gzip
import glob
import spacy
from pathlib import Path
import concurrent.futures

# Module-level spaCy pipeline. process_data_with_spacy_df reads this global
# when it processes the review and summary columns.
nlp = spacy.load('en_core_web_sm')

In [2]:
def read_2_dataframe(path):
    """Load a gzip-compressed JSON-lines file into a DataFrame.

    Args:
        path: Location of the ``.gz`` file; each line holds one JSON record.

    Returns:
        A ``pandas.DataFrame`` with one row per JSON line.
    """
    return pd.read_json(path, lines=True, compression='gzip')

In [3]:
def custom_tokenizer(nlp):
    """Build a tokenizer callable that uses custom infix rules.

    Prefix and suffix rules come from ``nlp.Defaults``; the infix rules are
    replaced by the patterns listed below.

    Args:
        nlp: A loaded spaCy language object; its ``vocab`` and ``Defaults``
            are used to configure the tokenizer.

    Returns:
        A callable taking a text string and returning the tokenizer's result.
    """
    prefix_re = spacy.util.compile_prefix_regex(nlp.Defaults.prefixes)
    suffix_re = spacy.util.compile_suffix_regex(nlp.Defaults.suffixes)

    # Raw strings: "\." inside a plain literal is an invalid escape sequence
    # and triggers a warning on current Python versions. The resulting
    # pattern text is unchanged.
    custom_infixes = [
        r'\.\.\.+',               # three or more consecutive dots
        r'(?<=[0-9])-(?=[0-9])',  # hyphen between two digits
        r'[!&:,()]',
    ]
    infix_re = spacy.util.compile_infix_regex(custom_infixes)

    tokenizer = spacy.tokenizer.Tokenizer(
        nlp.vocab,
        nlp.Defaults.tokenizer_exceptions,
        prefix_re.search,
        suffix_re.search,
        infix_re.finditer,
        token_match=None,
    )
    return lambda text: tokenizer(text)

def process_data_with_spacy(review_data):
    """Convert every text in ``review_data`` to a list of normalised tokens.

    A fresh ``en_core_web_sm`` pipeline is loaded on each call and passed to
    ``text_to_seq`` for every element.

    Args:
        review_data: Iterable of texts.

    Returns:
        A list with one token list per input text, in input order.
    """
    pipeline = spacy.load('en_core_web_sm')
    sequences = []
    for text in review_data:
        sequences.append(text_to_seq(text, pipeline))
    return sequences

def text_to_seq(s, nlp):
    """Turn a text into a list of lower-cased, lemmatised tokens.

    Stop words, punctuation, URL-like and e-mail-like tokens are skipped.
    Each remaining token contributes its lower-cased, stripped lemma; tokens
    whose lemma is the placeholder ``'-PRON-'`` contribute their lower-cased
    surface form instead.

    Tokens that are empty after stripping (e.g. whitespace-only tokens such
    as newlines) are dropped, so the result never contains ``''`` entries.

    Args:
        s: The text to process.
        nlp: Callable that maps the text to an iterable of token objects
            exposing ``is_stop``, ``is_punct``, ``like_url``, ``like_email``,
            ``lemma_`` and ``lower_``.

    Returns:
        The list of normalised token strings, in document order.
    """
    tokens = []
    for tok in nlp(s):
        if tok.is_stop or tok.is_punct or tok.like_url or tok.like_email:
            continue
        if tok.lemma_ == '-PRON-':
            normalized = tok.lower_.strip()
        else:
            normalized = tok.lemma_.lower().strip()
        # Whitespace-only tokens strip down to '' and would otherwise show up
        # as empty entries (and doubled spaces once joined).
        if normalized:
            tokens.append(normalized)
    return tokens

def text_to_text(s, nlp):
    """Return the normalised tokens of ``s`` joined by single spaces.

    Args:
        s: The text to process.
        nlp: Pipeline callable forwarded to ``text_to_seq``.

    Returns:
        A single string built from the token list of ``text_to_seq``.
    """
    tokens = text_to_seq(s, nlp)
    return ' '.join(tokens)

def process_data_with_spacy_df(df):
    """Add processed versions of the review and summary columns to ``df``.

    ``reviewText`` is processed into ``reviewTextProc`` and ``summary`` into
    ``summaryProc`` using ``text_to_text`` with the module-level ``nlp``
    pipeline. The frame is modified in place and also returned.

    Args:
        df: DataFrame with ``reviewText`` and ``summary`` columns.

    Returns:
        The same DataFrame object, with the two new columns added.
    """
    column_pairs = (
        ('reviewText', 'reviewTextProc'),
        ('summary', 'summaryProc'),
    )
    for source_col, target_col in column_pairs:
        # Map over the single column instead of apply(axis=1): no per-row
        # Series has to be built, and the result is always a Series.
        df[target_col] = df[source_col].map(lambda text: text_to_text(text, nlp))
    return df


In [4]:
# Directory holding the gzipped review dumps, and directory for processed output.
ds_gzip_path = r'D:\Datasets\amazon_reviews\gzips'
ds_proc_path = r'D:\Datasets\amazon_reviews\processed'

# All .gz files directly inside the source directory, in reversed glob order.
files = [Path(f) for f in reversed(glob.glob(ds_gzip_path + r"\*.gz", recursive=False))]

In [6]:
import numpy as np
from multiprocessing import cpu_count, Pool
 
# Leave two cores free for the rest of the system, but never go below one
# worker: with one or two cores ``cpu_count() - 2`` is <= 0, which makes both
# the partitioning and the executor below raise.
cores = max(1, cpu_count() - 2)
partitions = cores  # Define as many partitions as you want


def parallelize(data, func):
    """Split ``data`` into ``partitions`` chunks, apply ``func``, concatenate.

    Chunk sizes follow ``numpy.array_split``: the first ``len(data) %
    partitions`` chunks get one extra row. The chunks are processed by a
    thread pool with ``cores`` workers and the results are concatenated in
    chunk order.

    NOTE(review): the workers are threads, so pure-Python work inside
    ``func`` is presumably still serialised by the GIL. Switching to
    processes would require ``func`` and the chunks to be picklable; confirm
    before changing.

    Args:
        data: The object to partition, typically a DataFrame.
        func: Callable applied to each chunk; must return a pandas object
            that ``pd.concat`` accepts.

    Returns:
        The concatenation of ``func``'s results.
    """
    if hasattr(data, 'iloc'):
        # Slice pandas objects positionally rather than via np.array_split,
        # which goes through the deprecated DataFrame.swapaxes.
        base, extra = divmod(len(data), partitions)
        data_split = []
        start = 0
        for i in range(partitions):
            stop = start + base + (1 if i < extra else 0)
            data_split.append(data.iloc[start:stop])
            start = stop
    else:
        data_split = np.array_split(data, partitions)

    print('DF is splitted to {} partitions'.format(partitions))
    with concurrent.futures.ThreadPoolExecutor(max_workers=cores) as executor:
        return pd.concat(executor.map(func, data_split))

In [11]:
# Restrict this run to two categories; this replaces any previously built
# ``files`` list.
files = [
    Path(ds_gzip_path) / 'reviews_Kindle_Store_5.json.gz',
    Path(ds_gzip_path) / 'reviews_Home_and_Kitchen_5.json.gz',
]

# Create the output directory up front so a missing directory is caught
# before the long processing step rather than at write time.
out_dir = Path(ds_proc_path)
out_dir.mkdir(parents=True, exist_ok=True)

for f in files:
    # ``f.stem`` drops only the final ``.gz`` suffix, so the output file name
    # keeps its ``.json`` ending.
    print(f"Start processing {f.stem}")

    df = read_2_dataframe(str(f))
    print(f"Shape of DF: {df.shape}")

    df_proc = parallelize(df, process_data_with_spacy_df)

    print(f"Shape of processed DF: {df_proc.shape}")
    df_proc.to_json(out_dir / f.stem)

    print(f"Processing of {f.stem} is finished")

Start processing reviews_Kindle_Store_5.json
Shape of DF: (982619, 9)
DF is splitted to 10 partitions
Shape of processed DF: (982619, 11)
Processing of reviews_Kindle_Store_5.json is finished
Start processing reviews_Home_and_Kitchen_5.json
Shape of DF: (551682, 9)
DF is splitted to 10 partitions
Shape of processed DF: (551682, 11)
Processing of reviews_Home_and_Kitchen_5.json is finished
