In [3]:
import pandas as pd
import numpy  as np
import os
import re
import psutil
import datetime
import pytz
import copy
from collections import Counter
from emoji   import UNICODE_EMOJI
from functools   import reduce
import sys
sys.path.append('/home/handeully/')
import bigquery_etl as bq
import operator
import time
from IPython.display import display
# NLP Env.
import nltk
from nltk import FreqDist
from nltk.corpus   import stopwords
from nltk.tokenize import regexp_tokenize
from nltk.stem import WordNetLemmatizer,PorterStemmer,LancasterStemmer
from nltk.corpus   import wordnet
from nltk.corpus   import sentiwordnet as swn
from nltk import sent_tokenize,word_tokenize,pos_tag
from tqdm.notebook import tqdm
# GCP Env.
import google.auth
from google.cloud import bigquery

# Vis Env.
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline

# Coding Env.
import warnings
# Resolve Google credentials and the associated project id for this environment,
# requesting the broad cloud-platform OAuth scope.
credentials, project_id = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

# BigQuery client bound to the credentials/project resolved above.
client = bigquery.Client(credentials=credentials, project=project_id )
# Display settings: show every column, and up to 1000 rows, when rendering frames.
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', 1000)
# Silence DeprecationWarning messages for the rest of the session.
warnings.filterwarnings('ignore', category=DeprecationWarning)

#initialization ray
import ray

# Data Load

In [4]:
# Load data from local CSV files under temp/ (presumably exported from BigQuery beforehand -- TODO confirm)
def convert_lowercase(df):
    """Return a new DataFrame in which every object-dtype column is lower-cased.

    Object columns are cast to ``str`` first and then lower-cased, so every
    value in them (including missing ones) goes through the string cast.
    Columns of any other dtype are passed through unchanged.
    """
    def _lower_if_object(column):
        # Only object-dtype columns are treated as text.
        if column.dtype == 'object':
            return column.astype(str).str.lower()
        return column

    return df.apply(_lower_if_object)

# Read the three input tables (reviews, complaint factors, stopwords) in one
# pass; the files are read in the order listed.
df1, cpl_factor, stopword_df = map(
    pd.read_csv,
    ('temp/ray_reviews.csv', 'temp/ray_cpl_factor.csv', 'temp/ray_stopword.csv'),
)
# Plain Python list of the stopword column, used later to filter tokens.
stop_words = stopword_df['stopword'].tolist()

# Complain Factor

In [5]:
# Distinct values found in any of the four factor columns (order is arbitrary
# because the values pass through a set).
cmpl_fc_list = list(
    set().union(
        *(
            cpl_factor[column].unique().tolist()
            for column in ('cmpl_fc1', 'cmpl_fc2', 'synonym', 'lemma')
        )
    )
)
# Entries made of more than one space-separated word.
multi_express = [expr for expr in cmpl_fc_list if len(expr.split(' ')) > 1]
# Same entries as word tuples, the form expected by the MWE tokenizer.
mwe = [tuple(expr.split(' ')) for expr in multi_express]
print('complain factor    : ' ,cpl_factor.shape)
print(f'all complain factor list  : {len(cmpl_fc_list)}')
print(f'multi_express lenth : {len(mwe)}')

complain factor    :  (4597, 6)
all complain factor list  : 1058
multi_express lenth : 407


# Tokenized ( Make corpus & word count )

In [10]:
# POS tags (see nltk.help.upenn_tagset()) that are NOT kept; not referenced by
# the code visible in this cell.
N_POS_TAG = [
    'CC', 'CD', 'DT', 'EX', 'FW', 'LS', 'PDT',
    'POS', 'PRP', 'PRP$', 'TO', 'WDT', 'WP', 'WRB',
]
# POS tags whose tokens are kept when building the corpus.
Y_POS_TAG = [
    'JJ', 'JJR', 'JJS', 'MD', 'IN',
    'NN', 'NNS', 'NNP', 'NNPS',
    'RB', 'RBR', 'RBS', 'RP', 'UH',
    'VB', 'VBG', 'VBD', 'VBN', 'VBP', 'VBZ',
]
    
def tokenized_corpus(df):
    """Build a per-review list of filtered tokens (single-process version).

    Each ``review_text`` is split into word tokens, POS-tagged with NLTK and
    reduced to the distinct tokens that carry a tag listed in ``Y_POS_TAG``,
    are longer than one character, are pure ASCII and are not in
    ``stop_words``.

    Rows whose ``review_text`` is not a string (for example NaN) are skipped
    entirely, so the two output columns always have the same length. Any other
    error (such as a missing NLTK resource) is raised instead of being
    silently swallowed.

    Parameters
    ----------
    df : pandas.DataFrame
        Must provide ``review_text`` and ``reviewId`` columns.

    Returns
    -------
    pandas.DataFrame
        Columns ``corpus_list`` (list of tokens, in arbitrary order because it
        is derived from a set) and ``reviewId`` (``str``), one row per
        processed review.
    """
    # Hoisted out of the loop: both are invariant across reviews.
    allowed_tags = set(Y_POS_TAG)
    excluded = set(stop_words)

    corpus = []
    reviewid = []
    # For the meaning of the POS tags see nltk.help.upenn_tagset()
    rows = zip(df['reviewId'], df['review_text'])
    for rid, text in tqdm(rows, total=len(df)):
        if not isinstance(text, str):
            # Nothing to tokenize; skip the id as well to keep columns aligned.
            continue
        words = {
            token
            for token, tag in pos_tag(regexp_tokenize(text, r"[\w']+"))
            if tag in allowed_tags and len(token) > 1 and token.isascii()
        }
        # Append to both lists together so they can never drift apart.
        corpus.append(list(words - excluded))
        reviewid.append(str(rid))
    return pd.DataFrame({'corpus_list': corpus, 'reviewId': reviewid})
    
# ray step 0) Define the worker function and decorate it with @ray.remote; a function declared this way must be invoked through Ray (func.remote(...)) so it runs as a parallel task.
@ray.remote
def tokenized_ray_corpus(df):
    """Ray task: build a per-review list of filtered, MWE-merged tokens.

    Each ``review_text`` is split into word tokens and POS-tagged; tokens are
    kept when their tag is in the allowed list below, they are longer than one
    character and they are pure ASCII. The kept tokens are then passed through
    an ``MWETokenizer`` built from the module-level ``mwe`` tuples (joined
    with a single space), de-duplicated, and stripped of ``stop_words``.

    Rows whose ``review_text`` is not a string (for example NaN) are skipped
    entirely, so the two output columns always have the same length. An empty
    input chunk yields an empty DataFrame. Any other error is raised to the
    caller (surfacing through ``ray.get``) instead of being silently swallowed.

    Parameters
    ----------
    df : pandas.DataFrame
        Chunk of reviews; must provide ``review_text`` and ``reviewId``.

    Returns
    -------
    pandas.DataFrame
        Columns ``corpus_list`` (list of tokens, arbitrary order) and
        ``reviewId`` (``str``), one row per processed review.
    """
    # For the meaning of the POS tags see nltk.help.upenn_tagset()
    allowed_tags = frozenset([
        'JJ', 'JJR', 'JJS', 'MD', 'IN', 'NN', 'NNS', 'NNP', 'NNPS',
        'RB', 'RBR', 'RBS', 'RP', 'UH',
        'VB', 'VBG', 'VBD', 'VBN', 'VBP', 'VBZ',
    ])
    excluded = set(stop_words)
    # Built once per task: the tokenizer only depends on `mwe`, not on the row.
    mwe_tokenizer = nltk.tokenize.MWETokenizer(mwe, separator=' ')

    corpus = []
    reviewid = []
    for rid, text in zip(df['reviewId'], df['review_text']):
        if not isinstance(text, str):
            # Nothing to tokenize; skip the id as well to keep columns aligned.
            continue
        words = [
            token
            for token, tag in pos_tag(regexp_tokenize(text, r"[\w']+"))
            if tag in allowed_tags and len(token) > 1 and token.isascii()
        ]
        merged = mwe_tokenizer.tokenize(words)
        # Append to both lists together so they can never drift apart.
        corpus.append(list(set(merged) - excluded))
        reviewid.append(str(rid))
    # Build the frame once, after the loop (also covers the empty-chunk case).
    return pd.DataFrame({'corpus_list': corpus, 'reviewId': reviewid})
    

#check multiprocessing progress 
def to_iterator(obj_ids):
    """Yield task results one at a time as the given Ray references finish.

    Each pass asks ``ray.wait`` for the ready/pending split of the remaining
    references and yields the fetched value of the first ready one; this
    relies on ``ray.wait`` handing back a single ready reference per call.
    Iteration ends when no references remain.
    """
    pending = obj_ids
    while pending:
        finished, pending = ray.wait(pending)
        yield ray.get(finished[0])

def ray_multiprocessing_progress(ray_df):
    """Show a progress bar while the Ray tasks finish, then combine the results.

    ``ray_df`` is the list of references returned by the ``.remote(...)``
    calls. The progress loop only drives the bar; afterwards all results are
    fetched in submission order and concatenated into one DataFrame.
    """
    progress = tqdm(to_iterator(ray_df), total=len(ray_df))
    for _ in progress:
        continue
    chunks = ray.get(ray_df)
    return pd.concat(chunks)


#==> (single-core example) kept to compare against the ray version below
# make corpus df (ex. ['word','word2','word3'])
print('================ Make Corpus ===============')
# The reviews are loaded as `df1`; no `df` is defined in the visible cells, so
# calling tokenized_corpus(df) fails with NameError on a fresh kernel. Using
# `df1` also means both runs process the same data.
corpus_df    = tokenized_corpus(df1)
print('============================================')

## ==> make corpus using ray multiprocessing
## ray step 1) choose the number of CPU cores; psutil.cpu_count() lets ray use every logical core
num_logit_cpus = psutil.cpu_count()
## ray step 2) ray must be initialised with ray.init(num_cpus=n) before any task is submitted
ray.init(ignore_reinit_error=True,num_cpus=num_logit_cpus)
print('================ Count Word DataFrame ===============')
## ray step 3) submit one task per brand (the brand acts as the chunk key)
count_df = [tokenized_ray_corpus.remote(df1[df1['brand']==i]) for i in df1['brand'].unique()]
count_df = ray_multiprocessing_progress(count_df)
print('=====================================================')

#ray.shutdown()

2021-06-02 07:50:45,299	INFO worker.py:664 -- Calling ray.init() again after it has already been called.




  0%|          | 0/386 [00:00<?, ?it/s]

