In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pandas as pd
from posgres_conn import get_engine_from_settings
from textblob import TextBlob
from pyspark.ml.feature import Tokenizer
from googletrans import Translator
import time
import six
from google.cloud import translate_v2 as translate

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("Sentiment Analysis") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

22/03/07 07:25:05 WARN Utils: Your hostname, MacBook-Air-Mufida.local resolves to a loopback address: 127.0.0.1; using 192.168.1.31 instead (on interface en0)
22/03/07 07:25:05 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/03/07 07:25:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
engine = get_engine_from_settings()

#extract from postgresql
query = """SELECT review_id, content 
            FROM reviews r
            WHERE language='id'
        """
pdf = pd.read_sql_query(query, engine)

In [4]:
df = spark.createDataFrame(pdf)
df = df.repartition(4)
df.write.mode('overwrite').parquet('reviews/')

22/03/07 07:25:21 WARN TaskSetManager: Stage 0 contains a task of very large size (8580 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [5]:
df = spark.read.parquet('reviews/')

In [6]:
df = df.withColumn("content", lower(col("content")))

In [7]:
df = df.withColumn("content", regexp_replace("content", r"[^a-zA-Z0-9\s]", ""))
df = df.withColumn("content", regexp_replace("content", r"\n", ""))

In [8]:
df = df.withColumn("content", regexp_replace("content", r"^([0-9]*)$", ""))
df = df.withColumn("content", \
       when(col("content")=="" ,None) \
          .otherwise(col("content")))
df = df.withColumn("content", regexp_replace("content", r"^\s*$", ""))
df = df.withColumn("content", \
       when(col("content")=="" ,None) \
          .otherwise(col("content")))
df = df.na.drop(subset=["content"])

In [9]:
def translate_text(target, text):
    """Translates text into the target language.

    Target must be an ISO 639-1 language code.
    See https://g.co/cloud/translate/v2/translate-reference#supported_languages
    """
    translate_client = translate.Client()

    if isinstance(text, six.binary_type):
        text = text.decode("utf-8")

    # Text can also be a sequence of strings, in which case this method
    # will return a sequence of results for each text.
    result = translate_client.translate(text, target_language=target)
    return result["translatedText"]

In [10]:
translator_en_udf = udf(lambda text: translate_text('en', text), StringType())
translator_id_udf = udf(lambda text: translate_text('id', text), StringType())

In [11]:
df = df.withColumn("trans_en", translator_en_udf(col("content")))
df = df.withColumn("trans_id", translator_id_udf(col("trans_en")))

In [12]:
tokenizer = Tokenizer(inputCol="trans_id", outputCol="words")
wordsData = tokenizer.transform(df)

In [37]:
wordsData.show()

[Stage 11:>                                                         (0 + 1) / 1]

+--------------------+--------------------+--------------------+--------------------+--------------------+
|           review_id|             content|            trans_en|            trans_id|               words|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|AOqpTOF3gbzJqhFpv...|                woww|                 wow|                 Wow|               [wow]|
|AOqpTOFJcbTXR5dTE...|cacat kode yg dik...|defective code se...|kode rusak yang d...|[kode, rusak, yan...|
|AOqpTOEMHREZ2NBDv...|              senang|                like|                Suka|              [suka]|
|AOqpTOGDop3IjUrpt...|      anyaaq kimocii|      anyaaq kimocii|      anyaaq kimocii|   [anyaaq, kimocii]|
|AOqpTOHbebKt44ltZ...|     menamba wawasan|        gain insight|    mendapat wawasan| [mendapat, wawasan]|
|AOqpTOHgs4Xd36iwN...|min kenapa tak bi...|Min, why can&#39;...|Min kok kemaren g...|[min, kok, kemare...|
|AOqpTOGdzJrLbeFfz...|     wetv kesuk

                                                                                

In [36]:
words_pd = wordsData.toPandas()

                                                                                

AttributeError: 'str' object has no attribute 'conf'

In [27]:
# def translator_en(text):
#   new_text = GoogleTranslator(source='id', target='en').translate(text)#.translate_batch(list(small_test.loc[:,"Yoruba"]))
#   return new_text

# def translator_id(text):
#   new_text = GoogleTranslator(source='en', target='id').translate(text)
#   return new_text

# translator_en_udf = udf(lambda text: translator_en(text), StringType())
# translator_id_udf = udf(lambda text: translator_id(text), StringType())

In [28]:
# df = df.withColumn("trans_en", translator_en_udf(col("content")))
# df = df.withColumn("trans_id", translator_id_udf(col("trans_en")))

In [29]:
# def trans_en(text):
#     translator = Translator()
#     text = translator.translate(text, src='id', dest='en')
#     time.sleep(1)
#     return text.text
    
# def trans_id(text):
#     translator = Translator()
#     text = translator.translate(text, src='en', dest='id')
#     time.sleep(1)
#     return text.text

In [12]:
# trans_en_udf = udf(lambda text: trans_en(text), StringType())
# #trans_id_udf = udf(lambda text: trans_id(text), StringType())

In [13]:
# df = df.withColumn("trans_en", trans_en_udf(col("content")))
# #df = df.withColumn("trans_id", trans_id_udf(col("trans_en")))

In [14]:
#df.show()

In [15]:
# trans_id_udf = udf(lambda text: trans_id(text), StringType())
# df = df.withColumn("trans_id", trans_id_udf(col("trans_en")))

In [16]:
#df.show()

In [17]:
# tokenizer = Tokenizer(inputCol="trans_id", outputCol="words")
# wordsData = tokenizer.transform(df)

In [36]:
#wordsData.show()

In [57]:
import re

In [11]:
content = df.select('content')

In [12]:
tokenizer = Tokenizer(inputCol="content", outputCol="words")
wordsData = tokenizer.transform(content)

In [61]:
wordsData.show()

+--------------------+--------------------+
|             content|               words|
+--------------------+--------------------+
|kenapa sekarang s...|[kenapa, sekarang...|
|susah daftarnya u...|[susah, daftarnya...|
|duh nyesel bgt ba...|[duh, nyesel, bgt...|
|ntah apa laherugi...|[ntah, apa, laher...|
|                good|              [good]|
|                    |                  []|
|gak tau knp eror ...|[gak, tau, knp, e...|
|            good job|         [good, job]|
|nonton drama kore...|[nonton, drama, k...|
|karna bagus parah...|[karna, bagus, pa...|
|kok ribet bnget y...|[kok, ribet, bnge...|
|aplikasi yang san...|[aplikasi, yang, ...|
|                  ok|                [ok]|
|buat siapapun yg ...|[buat, siapapun, ...|
|maaf kami tidak d...|[maaf, kami, tida...|
|bgs sy suka hampi...|[bgs, sy, suka, h...|
|                  gi|                [gi]|
|seneng nonton di ...|[seneng, nonton, ...|
|semoga bisa makin...|[semoga, bisa, ma...|
|belum masuk apk n...|[belum, ma

In [13]:
words_list = wordsData.select('words').rdd.flatMap(lambda x: x).collect()

                                                                                

In [14]:
from itertools import chain
flatten_list = list(chain.from_iterable(words_list))

In [15]:
len(flatten_list)

4369167

In [17]:
import nltk
from nltk.corpus import stopwords

In [19]:
stop_words = set(stopwords.words('indonesian'))
filtered_sentence = [w for w in flatten_list if not w in stop_words]

In [20]:
len(filtered_sentence)

3051969

In [34]:
word_freq = pd.Series(filtered_sentence).value_counts()

In [99]:
uniqWords = sorted(set(filtered_sentence))

In [100]:
len(uniqWords)

140612

In [101]:
def remove_multiple_char(word):
    if re.search(r"(.)\1{1,}", word):
        new_word = re.split(r"(.)\1{1,}", word)
        new_word = ''.join(new_word)
    else:
        new_word = word
    return new_word

In [102]:
uniq_word = []
for word in uniqWords:
    new_word = remove_multiple_char(word)
    uniq_word.append(new_word)

In [103]:
uniq_word= sorted(set(uniq_word))

In [104]:
len(uniq_word)

120420

In [105]:
r = re.compile(".*[0-9].*")
uniq_word = [ s for s in uniq_word if not r.match(s) ]

In [106]:
len(uniq_word)

112142

In [113]:
uniq_word[110000:]

['wtch',
 'wtefak',
 'wtf',
 'wtfdfd',
 'wth',
 'wththe',
 'wtp',
 'wts',
 'wtv',
 'wtvapl',
 'wtvnonton',
 'wtvsayang',
 'wtwo',
 'wu',
 'wuah',
 'wuahaplikasi',
 'wuality',
 'wudih',
 'wuf',
 'wuh',
 'wuhu',
 'wuhwbwcvwhwiwhehjs',
 'wuhz',
 'wuih',
 'wuji',
 'wujud',
 'wujudkan',
 'wuk',
 'wukong',
 'wulan',
 'wulandari',
 'wulasan',
 'wuo',
 'wuos',
 'wuruk',
 'wusbshs',
 'wushjadi',
 'wuw',
 'wuwuwu',
 'wuxia',
 'wuxiahe',
 'wuxian',
 'wuxiaseperti',
 'wuxiawushu',
 'wuy',
 'wuz',
 'wv',
 'wviusyeye',
 'wvtv',
 'wvwuw',
 'wxcn',
 'wxnx',
 'wy',
 'wyh',
 'wynona',
 'wyws',
 'wz',
 'wza',
 'wzs',
 'wzwaw',
 'x',
 'xa',
 'xak',
 'xalma',
 'xan',
 'xaparah',
 'xape',
 'xavier',
 'xawal',
 'xb',
 'xblin',
 'xbokepv',
 'xboleh',
 'xbox',
 'xbugil',
 'xbukan',
 'xbx',
 'xc',
 'xcelent',
 'xcerita',
 'xcgjm',
 'xcm',
 'xcon',
 'xcontinuex',
 'xcv',
 'xcx',
 'xd',
 'xdansex',
 'xde',
 'xdhfetwhfkgkfk',
 'xdi',
 'xdisney',
 'xdpt',
 'xdxz',
 'xean',
 'xebelin',
 'xec',
 'xediain',
 'xentar',

In [88]:
# from Sastrawi.Stemmer.StemmerFactory import StemmerFactory

# # create stemmer
# factory = StemmerFactory()
# stemmer = factory.create_stemmer()

# # stemming process
# documents = [stemmer.stem(word) for word in filtered_sentence]