In [1]:
###################################### Prepare data for training the model #####################################

In [2]:
import pyspark
from nltk.tokenize import word_tokenize

# Load the labeled review dataset (parquet) that the rest of the notebook scores.
Lreviewdata_full = spark.read.format("parquet").load("/mnt/etl/parquet/labeled/Lreviewdata")

In [3]:
# Keep only the columns the sentiment step needs.
Lreviewdata = Lreviewdata_full.select("Text", "label")
# Expose the projection to SQL as "DF". createOrReplaceTempView replaces the
# deprecated registerTempTable; the view is not queried anywhere in this
# notebook, so it is kept only for ad-hoc SQL / other consumers.
Lreviewdata.createOrReplaceTempView("DF")

# Drop anything cached by earlier runs of this notebook, then spread the rows
# over 8 partitions and cache that frame for reuse by the later cells.
# DataFrame.repartition keeps the existing schema; the previous
# .rdd -> spark.createDataFrame round-trip pushed every row through Python and
# re-inferred the schema. The former Lreviewdata.cache() was dropped because
# the clearCache() call that followed it immediately discarded that cache.
spark.catalog.clearCache()
rdataset = Lreviewdata.repartition(8).cache()
rdataset

In [4]:
# Peek at the first three rows of the raw input.
Lreviewdata_full.show(n=3)

In [5]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.ml import Pipeline

# Two-stage text pipeline: split "Text" into "tokens", then write a
# stop-word-filtered copy of those tokens to "stp_tokens".
tokenizer = Tokenizer(outputCol="tokens", inputCol="Text")
remover = StopWordsRemover(outputCol="stp_tokens", inputCol="tokens")
pipeline = Pipeline(stages=[tokenizer,
                            remover])

In [6]:
# Fit the pipeline on the repartitioned reviews and apply it to the same data,
# adding the "tokens" and "stp_tokens" columns.
pipe_model = pipeline.fit(dataset=rdataset)
tok_dataset = pipe_model.transform(dataset=rdataset)

In [7]:
# Cache the tokenized frame: later cells run several actions on it
# (show, count, the scoring map, and the final labeling step).
tok_dataset.cache()

In [8]:
import nltk
# Fetch the NLTK resources used below (downloaded, in this order, on the
# machine running this cell).
# NOTE(review): get_score runs inside Spark tasks; on a multi-node cluster the
# executors presumably need these corpora too — confirm they are installed there.
for _nltk_pkg in ('averaged_perceptron_tagger', 'sentiwordnet', 'wordnet'):
  nltk.download(_nltk_pkg)
from nltk.tag import pos_tag
from nltk.corpus import sentiwordnet as swn

In [9]:
# Inspect one tokenized row (Text, label, tokens, stp_tokens).
tok_dataset.show(n=1)

In [10]:
def get_score(word):
  """Return the first positive SentiWordNet positivity score for ``word``.

  Synsets are scanned in the order ``swn.senti_synsets`` yields them and the
  first one whose ``pos_score()`` is greater than zero decides the result.
  Negative scores are never consulted.

  Returns:
    The positive score, or ``None`` when no synset has a positive score
    (including when the word has no synsets at all).
  """
  positives = (synset.pos_score()
               for synset in swn.senti_synsets(word)
               if synset.pos_score() > 0)
  return next(positives, None)
    
def custom_sum(list):
  """Return the mean of the non-None values in ``list``.

  Despite its name this is an average, not a sum: ``None`` entries (words
  ``get_score`` could not score) are skipped and the rest are averaged.

  Args:
    list: iterable of numbers and/or ``None``. The name shadows the builtin
      but is kept so existing keyword callers keep working.

  Returns:
    float: the mean of the non-None values, or ``0.0`` when there are none.
  """
  scores = [value for value in list if value is not None]
  # Always return a float. Returning int 0 for the empty case mixes integer and
  # float values in the column Spark infers from these results via toDF().
  return sum(scores) / len(scores) if scores else 0.0

In [11]:
# Mean SentiWordNet positivity per review. Each Row carries the single field
# "tokens", so every output record is a one-element list and toDF() names the
# resulting column "_1".
# NOTE(review): the stop-word-filtered "stp_tokens" column built by the
# pipeline is not used here; confirm whether scoring should use it instead.
tok_df = (
    tok_dataset
    .select("tokens")
    .rdd
    .map(lambda row: [custom_sum([get_score(word) for word in row.tokens])])
    .toDF()
)

In [12]:
tok_df.show()

In [13]:
from pyspark.sql import functions as f
# Bucket the mean positivity score: above 0.5 -> '1', exactly 0.5 -> '0',
# anything else -> '-1'. "Anything else" includes reviews with no scorable
# words, which custom_sum scores as 0.
rate_D_df = tok_df.withColumn(
    'Sentiment',
    f.when(f.col('_1') > 0.5, '1')
     .when(f.col('_1') == 0.5, '0')
     .otherwise('-1'),
)

In [14]:
# Row count of the scored frame; it should equal the number of tokenized reviews.
rate_D_df.count()

In [15]:
# Row count of the tokenized reviews, for comparison with the scored frame.
tok_dataset.count()

In [16]:
from pyspark.sql import functions as f
from pyspark.sql.types import DoubleType

# Score each review in the same row as its Text/label instead of re-attaching
# scores afterwards. The previous version tagged rate_D_df and tok_dataset
# separately with monotonically_increasing_id() and joined on it. Those ids
# depend on partition layout and in-partition row order, so they are not
# guaranteed to match across two independently computed DataFrames, and the
# inner join could silently pair a score with the wrong review or drop rows.
# float() guarantees a Python float for the DoubleType UDF (a Python int would
# come back as null).
_score_udf = f.udf(
    lambda tokens: None if tokens is None
    else float(custom_sum([get_score(word) for word in tokens])),
    DoubleType(),
)

# Same bucketing as rate_D_df: > 0.5 -> '1', == 0.5 -> '0', otherwise '-1'.
df = (
    tok_dataset
    .withColumn('_1', _score_udf('tokens'))
    .withColumn('Sentiment',
                f.when(f.col('_1') > 0.5, '1')
                 .when(f.col('_1') == 0.5, '0')
                 .otherwise('-1'))
    .select("Sentiment", "Text", "label")
)

In [17]:
# Preview the labeled output (first 20 rows).
df.show(n=20)

In [18]:
# Persist the labeled reviews, replacing any previous output at this path.
# NOTE(review): this path lies inside the input directory read at the top of the
# notebook ("/mnt/etl/parquet/labeled/Lreviewdata"); a later read of that input
# directory may pick up or fail on these files — confirm the output location.
df.write.mode('overwrite').parquet("/mnt/etl/parquet/labeled/Lreviewdata/Lreviewdata_sentiment/sentiment_analysis.parquet")

In [19]:
# Sanity check: True means the labeled frame has no rows at all.
len(df.take(1)) == 0

In [20]:
# Read the written output back to verify it round-trips.
Lreviewdata_full_n = spark.read.format("parquet").load("/mnt/etl/parquet/labeled/Lreviewdata/Lreviewdata_sentiment/sentiment_analysis.parquet")

In [21]:
# Show the first 20 rows of the re-read sentiment output.
Lreviewdata_full_n.show(n=20)