<p>DATA PREPROCESSING WITH GOOGLE COLAB </p>
<a href="https://colab.research.google.com/drive/15zB3v1PRyaLupEHQoKzSXA-b4Pvp0g4P?usp=sharing">Colab Version</a>

In [15]:
# !pip install nltk bs4 wget elasticsearch symspellpy==6.7.6 

In [16]:
# dependency modules for NLP
from nltk.tag.perceptron import PerceptronTagger
from nltk.stem.porter import PorterStemmer
from nltk.stem import WordNetLemmatizer
from nltk.tokenize import word_tokenize  
from nltk.corpus import stopwords
from nltk.corpus import wordnet
import nltk
import re

# dependency modules for elastic search & data preprocess
from pandas.api.types import is_list_like,is_string_dtype
from elasticsearch import Elasticsearch
from urllib.request import urlopen
from elasticsearch import helpers
from functools import lru_cache
from bs4 import BeautifulSoup
import pandas as pd
import numpy as np
import itertools
import pickle
import wget
import gzip
import json
import time
import gc
import os

# dependency module for Spell Checker
from symspellpy import SymSpell, Verbosity

In [17]:
# nltk.download('averaged_perceptron_tagger')
# nltk.download('stopwords')
# nltk.download('wordnet')
# nltk.download('punkt')

In [18]:
class Data_Preprocessing:
    
    def __init__(self,):
        self.all_text = '' 
        self.brands = []
        self.main_cats = [] 
        self.my_symspell = SymSpell()
        self.wnl = WordNetLemmatizer()
        self.tagger = PerceptronTagger() 
        self.p_stemmer = PorterStemmer()
        self.counter = itertools.count(1)
        self.stemmer = lru_cache(maxsize=50000)(self.p_stemmer.stem)
        self.lemmatize = lru_cache(maxsize=50000)(self.wnl.lemmatize)
        self.date_dict = {'january':'01','february':'02','march':'03','april':'04','may':'05','june':'06','july':'07','august':'08',
                          'september':'09','october':'10','november':'11','december':'12'}
    
    # functions for text processing
    def db_rm_punctuations(self,text):
        temp = []
        punctuations = re.compile(r'[!"#%&\'()*+,-./:;<=>?$@\[\\\]^_`{|}~]')
        for words in text.split():
            word = punctuations.sub(" ",words)
            temp.append(word)
        return " ".join(temp)
    
    def db_rm_stopwords(self,text):
        temp = []
        nltk_stop_words = set(stopwords.words("english"))
        for word in text.split():
            if word.lower() not in nltk_stop_words:  
                temp.append(word)
        return " ".join(temp)

    def db_rm_duplicates(self,text):
        words = text.split()
        return " ".join(sorted(set(words), key=words.index))

    def db_iso_nums(self,text):
        pattern = re.compile(r'$\d+\W+|\b\d+\b|\W+\d+$') 
        new_word = pattern.sub("",text)
        if len(new_word) > 1:
            return True
        return False

    def db_get_wordnet_pos(self,tag):
        if tag[0]=='J':
            return wordnet.ADJ
        elif  tag[0]=='V':
            return wordnet.VERB
        elif  tag[0]=='N':
            return wordnet.NOUN
        elif tag[0]=='R':
            return wordnet.ADV
        else:
            return ''
        
    def db_lemmatization(self,text,lemmatizer=None,tagger=None):
        if lemmatizer==None:
            lemmatizer=self.lemmatize
        if tagger==None:
            tagger = self.tagger
        temp = []
        for word in text.split():
            wn_pos = self.db_get_wordnet_pos(tagger.tag([word])[0][1])
            if wn_pos != '':
                temp.append(lemmatizer(word,pos=wn_pos))
            else:
                temp.append(word)
        return " ".join(temp)

    def db_stemming(self,text,stemmer=None):
        if stemmer==None:
            stemmer=self.stemmer
        temp = []
        for word in text.split():
            temp.append(stemmer(word))
        return " ".join(temp)

    def db_lower_case(self,text):
        return text.lower()
    
    def db_cvt2num(self,text):
        text = text.replace('$','')
        text = text.replace(',','')
        if text.find('-'):
            text = text.split('-')[0]
        return float(text)

    def db_rm_html_tags(self,text):
        if type(text)==float:
            return ""
        soup = BeautifulSoup(text)
        return soup.get_text()
    
    def db_transdate(self,text):
        try:
            date=text.split()
            if date!=[]:
                date[1]=date[1].replace(',','')
                if len(date[1])<2:
                    date[1]='0'+date[1]
                return date[2]+'-'+self.date_dict.get(date[0])+'-'+date[1]
            else:
                return '1994-07-05'
        except:
            return '1994-07-05'
    
    def db_getrank(self,text):
        if text!='' and text!='[]':
            rank = re.findall(r"[-+]?(?:\d*\.\d+|\d+)",text.split('in')[0])
            if not rank:
              return 5000000
            rank = ''.join(rank)
            return int(rank)
        else:
            return 5000000

    def db_dict2string(self,text):
        return re.sub(' +',' ',str(text).strip('{}').replace('\n','').replace('\\n',''))

    # functions for creating custom dictionary & bigrams (spell checking & autocomplete)
    def create_dict(self,seq,new=False):
        if new==True:
            self.my_symspell = SymSpell()
        for word in seq:
            self.my_symspell.create_dictionary_entry(word,1)
        return True      
    
    def save_dict(self,filename="custom_dictionary.txt"):
        self.my_symspell.save_pickle(filename)
        return True

    def load_dict(self,filename="custom_dictionary.txt"):
        self.my_symspell.load_pickle(filename)
        return True
    
    def save_text(self,filename="compile_text.txt"):
        with open(filename, 'wb') as f:
            pickle.dump(self.all_text, f)

    def load_text(self,filename="compile_text.txt"):
        with open(filename, 'rb') as f:
            self.all_text = pickle.load(f) 

    def compile_text(self,df,include_columns=['title']):
        words = ''
        for i in include_columns:
            for j in df[i]:
                words += j
        self.all_text+=words
    

    def custom_dictionary(self,df,include_columns=['description','title','brand','main_cat']):
        # extract vocabulary in dataset
        custom_vocab = []
        for i in include_columns:
            for j in df[i]:
                filt_words = [x for x in j.split() if self.db_iso_nums(x)]
                custom_vocab.extend(filt_words)
        unique_vocab = list(set(custom_vocab))
        
        # create dictionary
        self.create_dict(seq=unique_vocab)

    def build_ngrams(self,source="compile_text.txt",dest="ngrams_freq.txt",min_len=1,max_len=3):
        with open(source, 'rb') as f:
            words = pickle.load(f)

        # tokenize words
        tokens = nltk.word_tokenize(words)
        # create ngrams
        ngrams = nltk.everygrams(tokens,max_len=max_len,min_len=min_len)
        # compute frequency distribution for all ngrams
        fdist = nltk.FreqDist(ngrams)

        # save ngrams frequency
        with open(dest, 'wb') as f:
            pickle.dump(fdist, f)
        return True

    ########## Text Preprocessing #############
    def custom_text_process(self,df):
        # keeping necessary attributes
        df.drop_duplicates(subset=['asin'], keep='first',inplace=True)
        df['price'].replace("", np.nan, inplace=True)
        df=df.loc[df['price'].str.find('$') == 0]
        df=df[df['imageURLHighRes'].notna()]
        df=df.loc[df['imageURLHighRes'].map(len) > 0]
        df.drop(columns=['category', 'tech1','tech2','feature','fit','similar_item'],axis=1,errors='ignore',inplace=True)

        # convert list columns to str
        for i in df.columns:
            if (is_list_like(df[i].iloc[0]) or i in ['description','also_buy','also_view','imageURL','imageURLHighRes']) and i != 'details':
                df[i] = [' '.join(map(str, [""])) if type(l)!=list else ' '.join(map(str, l)) for l in df[i]]
        
        # remove html tags
        for i in ['description','title','main_cat','date','brand']:
            df[i] = df[i].apply(self.db_rm_html_tags)

        # text preprocessing
        for i in df.columns:
            if type(df[i].iloc[0])==str or i=='rank':
                df[i] = df[i].apply(str)
                if i not in ['imageURL','imageURLHighRes']:
                    df[i] = df[i].apply(self.db_lower_case)
            if i in ['description','title','brand']:
                if i == 'description':
                    df['orig_desc'] = df[i]
                elif i == 'title':
                    df['orig_title'] = df[i]
                elif i == 'brand':
                    df['orig_brand'] = df[i]
                df[i] = df[i].apply(self.db_rm_punctuations)
                df[i] = df[i].apply(self.db_rm_stopwords)
                df[i] = df[i].apply(self.db_rm_duplicates)
            if i == 'main_cat':
                df[i] = df[i].apply(self.db_rm_punctuations)
                df['orig_mcat'] = df[i]
            if i == 'rank':
                df[i] = df[i].apply(self.db_getrank)
            if i == 'price':
                df[i] = df[i].apply(self.db_cvt2num)
            if i == 'details':
                df[i] = df[i].apply(self.db_dict2string)
            if i == 'date':
                df[i] = df[i].apply(self.db_transdate)
        
        # create custom dictionary
        self.custom_dictionary(df)
        
        # compile all text
        self.compile_text(df)

        # extract main categories
        for i in df['main_cat'].unique():
            for j in i.split():
                if len(j)>1:
                    self.main_cats.append(j)
        self.main_cats = list(set(self.main_cats))
        
        # extract brands
        for i in df['brand'].unique():
            for j in i.split():
                if len(j)>1:
                    self.brands.append(j)
        self.brands = list(set(self.brands))
        
        # additional text preprocessing
        for i in ['description','title','brand','main_cat']:
            df[i] = df[i].apply(self.db_stemming)
            df[i] = df[i].apply(self.db_lemmatization)
            df[i] = df[i].apply(self.db_rm_duplicates)

        return df
    
    # indexing values to database
    def df_to_db_eltk(self,df,es,idxname,chunk_size=250):
        df_list = df.values.tolist()
        docs =  [{"_index":idxname,"_type":"product","_id":next(self.counter),"_source":{'desc':row[0],'title':row[1],
                'also_buy':row[2],'brand':row[3],'rank':row[4],'rating':row[5],'also_view':row[6],'details':row[7],'main_cat':row[8],
                'date':row[9],'price':row[10],'asin':row[11],'imageURL':row[12],'imageURLHighRes':row[13],
                'orig_desc':row[14],'orig_title':row[15],'orig_brand':row[16],'orig_mcat':row[17]}
            }for row in df_list]
        helpers.bulk(client=es, actions=docs, chunk_size=chunk_size, request_timeout=600)
        return True

<h6>NOTE: DATA PREPROCESSING MAY TAKE SEVERAL HOURS, LEAVING ASIDE THE DOWNLOADING TIME OF THE DATASET</h6>
<br>

In [19]:
base_dir = "C:/Users/user/Python Codes/Project Codes (THESIS)/Data Preprocess Small Complete"
meta_data_names = ['meta_Appliances.json.gz','meta_Cell_Phones_and_Accessories.json.gz','meta_Luxury_Beauty.json.gz','meta_Video_Games.json.gz']
meta_data_names.extend(['meta_Clothing_Shoes_and_Jewelry.json.gz','meta_Home_and_Kitchen.json.gz','meta_Electronics.json.gz'])
dictionary_dir = base_dir+"resources/"+"custom_dictionary.txt"
main_cats_dir = base_dir+"resources/"+"main_cats.txt"
ngrams_dir = base_dir+"resources/"+"ngrams_freq.txt"
text_dir = base_dir+"resources/"+"compile_text.txt"
brands_dir = base_dir+"resources/"+"brands.txt"
my_db_class = None

# for name in meta_data_names:
#     # resources temp storage
#     main_cats,brands = [],[]

#     # create class instance (reset class data)
#     my_db_class = Data_Preprocessing()
    
#     # load metadata
#     data,lmt,prt,pure = [],0,1,name.split('.')[0]
#     if os.path.exists("/content/{}".format(name)) != True:
#       wget.download("http://deepyeti.ucsd.edu/jianmo/amazon/metaFiles2/{}".format(name))
    
#     # load resources if exist (dictionary, main cats, brands)
#     if os.path.isdir(base_dir+"resources") != True:
#       os.mkdir(base_dir+"resources")
#     # dictionary
#     if os.path.exists(dictionary_dir):
#       my_db_class.load_dict(filename=dictionary_dir)
#     # main cats
#     if os.path.exists(main_cats_dir):
#       with open(main_cats_dir, 'rb') as f:
#         main_cats = pickle.load(f)
#     # brands
#     if os.path.exists(brands_dir):
#       with open(brands_dir, 'rb') as f:
#         brands = pickle.load(f)
        
#     # Preprocessing
#     with gzip.open(name) as f:
#         if os.path.isdir(base_dir+pure) != True:
#             os.mkdir(base_dir+pure)
#         for l in f:
#             if lmt==50000:
#                 df = pd.DataFrame.from_dict(data)
#                 st = time.time()
#                 # load text
#                 if os.path.exists(text_dir):
#                     my_db_class.load_text(text_dir)
#                 df = my_db_class.custom_text_process(df)
#                 # save text
#                 my_db_class.save_text(text_dir)
#                 # save df
#                 df.to_pickle(base_dir+pure+"/"+name+"_"+str(prt))
#                 end = time.time()
#                 print("Elapsed Time in Processing "+name+"_"+str(prt)+":",end-st)
#                 data,lmt = [],0
#                 prt+=1
#             data.append(json.loads(l.strip()))
#             lmt+=1
#         if lmt!=0:
#             df = pd.DataFrame.from_dict(data)
#             st = time.time()
#             # load text
#             if os.path.exists(text_dir):
#                 my_db_class.load_text(text_dir)
#             df = my_db_class.custom_text_process(df)
#             # save text
#             my_db_class.save_text(text_dir)
#             # save df
#             df.to_pickle(base_dir+pure+"/"+name+"_"+str(prt))
#             end = time.time()
#             print("Elapsed Time in Processing "+name+"_"+str(prt)+":",end-st)
            
#         # save/update the dictionary
#         my_db_class.save_dict(filename=dictionary_dir)

#         # save/update main categories
#         with open(main_cats_dir, 'wb') as f:
#             main_cats.extend(my_db_class.main_cats)
#             main_cats=list(set(main_cats))
#             pickle.dump(main_cats, f)

#         # save/update brands
#         with open(brands_dir, 'wb') as f:
#             brands.extend(my_db_class.brands)
#             brands=list(set(brands))
#             pickle.dump(brands, f)

# print('Data Preprocessing Completed!')
# print('Now building ngrams...')
# my_db_class = Data_Preprocessing()
# st = time.time()
# my_db_class.build_ngrams(source=text_dir,dest=ngrams_dir,max_len=3)
# end = time.time()
# print("Elapsed Time in building ngrams:",end-st)

Adding Amazon Mobile Phones Metadata from Kaggle

In [20]:
# # download dataset from https://www.kaggle.com/grikomsn/amazon-cell-phones-reviews
# df = pd.read_csv('C:/Users/user/Python Codes/Project Codes (THESIS)/20191226-items.csv')
# dictionary_dir = base_dir+"resources/"+"custom_dictionary.txt"

# # load dictionary
# my_db_class = Data_Preprocessing()
# my_db_class.load_dict(filename=dictionary_dir)

# df = pd.read_csv('20191226-items.csv')

# if os.path.isdir(base_dir+"meta_Mobile_Phones") != True:
#     os.mkdir(base_dir+"meta_Mobile_Phones")

# nums = [i for i in range(1,len(df['title'])+1)]
# random.shuffle(nums)

# df.rename(columns={'image': 'imageURLHighRes'}, inplace=True)
# df = df[['asin','brand','title','imageURLHighRes','price']]
# avg = sum(df['price'])/len(df['price'])
# df['main_cat'] = 'mobile phone smartphone'
# df['date'] = '2020-01-01'
# df['also_view'] = ''
# df['also_buy'] = ''
# df['imageURL'] = ''
# df['details'] = ''
# df['rank'] = nums

# for i in df.columns:
#     if i in ['asin','brand','title']:
#         df[i] = df[i].apply(str)
#         df[i] = df[i].apply(my_db_class.db_lower_case)
#     if i in ['brand','title']:
#         if i == 'title':
#             df['orig_desc'] = df[i]
#             df['orig_title'] = df[i]
#         elif i == 'brand':
#             df['orig_brand'] = df[i]
#         df[i] = df[i].apply(my_db_class.db_rm_punctuations)
#         df[i] = df[i].apply(my_db_class.db_rm_stopwords)
#         df[i] = df[i].apply(my_db_class.db_rm_duplicates)
#         if i == 'title':
#             df['description'] = df[i]
#     if i == 'price':
#         df[i]=[x if x!=0.00 else round(avg,2) for x in df[i]]
#     if i == 'main_cat':
#         df['orig_mcat'] = df[i]

# my_db_class.custom_dictionary(df)

# for i in df.columns:
#     if i in ['brand','title','main_cat']:
#         df[i] = df[i].apply(my_db_class.db_check_pos_tag)
#         df[i] = df[i].apply(my_db_class.db_stemming)
#         df[i] = df[i].apply(my_db_class.db_lemmatization)
#         df[i] = df[i].apply(my_db_class.db_rm_duplicates)
        
# df.to_pickle(base_dir+"meta_Mobile_Phones/meta_Mobile_Phones.json.gz_1")

In [21]:
# # add custom vocabs
# my_db_class.create_dict(seq=['greater','higher','highest','lower','lowest','less', 'beyond', 'never', 'except', 'dollar', 'january', 
#                               'february', 'march', 'april', 'may', 'june', 'july', 'august', 'september', 'october', 'november', 'december'])
# my_db_class.create_dict(seq=stopwords.words("english"))
# # save the updated dictionary
# my_db_class.save_dict(filename=dictionary_dir)

<h6>RUN THIS CODE ON LOCAL MACHINE AFTER DOWNLOADING ELASTIC SEARCH (link available below)</h6>
<a href="https://www.elastic.co/downloads/elasticsearch">Elastic Search Link</a>

<p>Step 1: download the elastic search window zip file</p>
<p>Step 2: extract the zip file to your desired path e.g. "C:\Users\your_user\ElasticSearch"</p>
<p>Step 3: after extraction, navigate to the bin file path of elastic search folder  e.g. "C:\Users\user\ElasticSearch\elasticsearch-7.17.0\bin"</p>
<p>Step 4: add the path e.g. ("C:\Users\user\ElasticSearch\elasticsearch-7.17.0\bin") to your system enviroment variables.
to do that you can search and click 'edit system environment variables' on your computer
then go to advance tab, click environment variables, in the System variables click 'path' and then click edit,
click new then copy paste the path, click ok </p>
<p>Step 5: run the elastic search server using cmd, open cmd and type 'elasticsearch' then enter </p>
<p>Step 6: wait for the server to run, you can check if the server is up by opening a browser and navigate to <a href=http://localhost:9200>http://localhost:9200</a>
the server is up when you see some information on the given url address</p>

In [22]:
es = Elasticsearch(HOST="http://localhost", PORT=9200)
my_db_class = Data_Preprocessing()

In [None]:
es.indices.delete(index='amazon')

In [25]:
for meta_name in meta_data_names:
    pure_name = meta_name.split(".")[0]
    for name in os.listdir(base_dir+"/"+pure_name):
        df = pd.read_pickle(base_dir+"/"+pure_name+"/"+name)
        df = df[['description','title','also_buy','brand','rank','rating','also_view','details','main_cat','date','price','asin',
                'imageURL','imageURLHighRes','orig_desc','orig_title','orig_brand','orig_mcat']]
        print("Indexing:",name)
        st = time.time()
        my_db_class.df_to_db_eltk(df=df,es=es,idxname='amazon',chunk_size=500)
        end = time.time()
        print("Elapsed Time in Indexing "+name+":",end-st)

Indexing: meta_Appliances.json.gz_1




Elapsed Time in Indexing meta_Appliances.json.gz_1: 33.59723997116089
Indexing: meta_Cell_Phones_and_Accessories.json.gz_1




Elapsed Time in Indexing meta_Cell_Phones_and_Accessories.json.gz_1: 4.336954116821289
Indexing: meta_Cell_Phones_and_Accessories.json.gz_10




Elapsed Time in Indexing meta_Cell_Phones_and_Accessories.json.gz_10: 17.57241129875183
Indexing: meta_Cell_Phones_and_Accessories.json.gz_11




Elapsed Time in Indexing meta_Cell_Phones_and_Accessories.json.gz_11: 13.471562147140503
Indexing: meta_Cell_Phones_and_Accessories.json.gz_12




Elapsed Time in Indexing meta_Cell_Phones_and_Accessories.json.gz_12: 70.69651460647583
Indexing: meta_Cell_Phones_and_Accessories.json.gz_2




Elapsed Time in Indexing meta_Cell_Phones_and_Accessories.json.gz_2: 11.7816002368927
Indexing: meta_Cell_Phones_and_Accessories.json.gz_3




Elapsed Time in Indexing meta_Cell_Phones_and_Accessories.json.gz_3: 9.186009168624878
Indexing: meta_Cell_Phones_and_Accessories.json.gz_4




Elapsed Time in Indexing meta_Cell_Phones_and_Accessories.json.gz_4: 12.364008903503418
Indexing: meta_Cell_Phones_and_Accessories.json.gz_5




Elapsed Time in Indexing meta_Cell_Phones_and_Accessories.json.gz_5: 9.738965272903442
Indexing: meta_Cell_Phones_and_Accessories.json.gz_6




Elapsed Time in Indexing meta_Cell_Phones_and_Accessories.json.gz_6: 27.450361967086792
Indexing: meta_Cell_Phones_and_Accessories.json.gz_7




Elapsed Time in Indexing meta_Cell_Phones_and_Accessories.json.gz_7: 19.834563970565796
Indexing: meta_Cell_Phones_and_Accessories.json.gz_8




Elapsed Time in Indexing meta_Cell_Phones_and_Accessories.json.gz_8: 22.526806592941284
Indexing: meta_Cell_Phones_and_Accessories.json.gz_9




Elapsed Time in Indexing meta_Cell_Phones_and_Accessories.json.gz_9: 25.642383098602295
Indexing: meta_Luxury_Beauty.json.gz_1




Elapsed Time in Indexing meta_Luxury_Beauty.json.gz_1: 14.865371942520142
Indexing: meta_Video_Games.json.gz_1




Elapsed Time in Indexing meta_Video_Games.json.gz_1: 6.214976072311401
Indexing: meta_Video_Games.json.gz_2




Elapsed Time in Indexing meta_Video_Games.json.gz_2: 6.206044435501099


FileNotFoundError: [WinError 3] The system cannot find the path specified: 'C:/Users/user/Python Codes/Project Codes (THESIS)/Data Preprocess Small Complete/meta_Clothing_Shoes_and_Jewelry'

In [26]:
!curl http://localhost:9200/_cat/indices?v

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current

health status index            uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open   amazon           GeOFGe11SQSpbvT2bmSvrw   1   1      74080            0    195.1mb        195.1mb
green  open   .geoip_databases NXIVcwW9Q-6DPwcZMpBrJg   1   0         41            3     38.7mb         38.7mb
yellow open   sample           lMw5E725RfCWkIaTpxYONg   1   1          3            0        5kb            5kb



                                 Dload  Upload   Total   Spent    Left  Speed

  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0

In [28]:
es.indices.get_mapping(index='amazon')

{'amazon': {'mappings': {'properties': {'also_buy': {'type': 'text',
     'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}}},
    'also_view': {'type': 'text',
     'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}}},
    'asin': {'type': 'text',
     'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}}},
    'brand': {'type': 'text',
     'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}}},
    'date': {'type': 'date'},
    'desc': {'type': 'text',
     'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}}},
    'details': {'type': 'text',
     'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}}},
    'imageURL': {'type': 'text',
     'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}}},
    'imageURLHighRes': {'type': 'text',
     'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}}},
    'main_cat': {'type': 'text',
     'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}}},
  