### Do the normal setup to get Spark context.

In [None]:
# Notebook setup: make the course helper module importable, start Spark, and
# import the NLP / plotting libraries used by the cells below.
import sys

rootpath = '/home/student/ROI/SparkProgram/'
datapath = f'{rootpath}datasets/'
# pyspark_helpers lives under rootpath, so rootpath must be on sys.path before the import.
sys.path.append(rootpath)
import pyspark_helpers as pyh
from pyspark_helpers import *
# initspark() comes from the course helper module; judging by the names it returns the
# SparkContext, SparkSession and Spark config — confirm in pyspark_helpers.
sc, spark, conf = initspark()

import pandas as pd
import matplotlib as mp
import numpy
from matplotlib import pyplot as plt

# NOTE(review): several imports below repeat earlier ones (display, pandas, pyplot);
# harmless, but they could be trimmed.
from pyspark_helpers import display
from pyspark.sql.types import *
from pyspark.sql.functions import col, lit
from functools import reduce
import nltk
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
import matplotlib.pyplot as plt
from wordcloud import WordCloud 
import pandas as pd
import re
import string


### Read the Alexa reviews sample dataset from Kaggle.
#### https://www.kaggle.com/sid321axn/amazon-alexa-reviews


In [None]:
# Explicit schema for the tab-separated Kaggle file. Because a schema is supplied,
# Spark does no type inference, so no inferSchema option is needed.
schema = StructType([
    StructField("rating", IntegerType(), True),
    StructField("date", StringType(), True),
    StructField("variation", StringType(), True),
    StructField("verified_reviews", StringType(), True),
    StructField("feedback", IntegerType(), True)])
# The file's first line holds the column names; header=True skips it instead of
# loading it as a data row whose review text is the literal 'verified_reviews'.
df = (spark.read
      .option("delimiter", "\t")
      .option("header", True)
      .schema(schema)
      .csv("amazon_alexa.tsv"))
display(df)
# Register the frame so later cells can query it with SQL under the name 'alexa'.
df.createOrReplaceTempView('alexa')


### Dealing with unstructured data works better at the RDD level because we will need to use a lot of custom functions using the map method. So let's just select the reviews column. 

### The flatMap returning the object itself is a trick to turn the RDD from containing Row objects into regular string objects.

In [None]:
# Lower-case every review, skipping nulls and any row whose text is the header 'verified_reviews'.
reviewsQuery = ("select lower(verified_reviews) from alexa "
                "where verified_reviews is not null and verified_reviews <> 'verified_reviews'")
df = spark.sql(reviewsQuery)

# At this point every element of df.rdd is still a Row object.
print(df.rdd.take(1))

# flatMap over a one-column Row yields its single value, leaving an RDD of plain strings.
# With two or more columns the values of each row would be interleaved into one stream.
reviewsRDD = df.rdd.flatMap(lambda row: row)
print(df.count(), reviewsRDD.count())
print(reviewsRDD.take(10))

### nltk has a function to split text up into sentences.

In [None]:
import nltk
# Punkt sentence-tokenizer models: download at least once, and never inside a loop,
# because calling it repeatedly slows everything down.
nltk.download('punkt')
# Break each review into a list of sentences.
sentenceTokenizeRDD = reviewsRDD.map(lambda review: nltk.sent_tokenize(review))
sentenceTokenizeRDD.take(10)

### Once the reviews are broken into sentences, let's break each sentence into a list of words.

In [None]:
# Flatten each review's sentences into one list of whitespace-separated words.
wordTokenizeRDD = sentenceTokenizeRDD.map(
    lambda sentences: [token for sentence in sentences for token in sentence.split()])
wordTokenizeRDD.take(10)

### Next, remove stop words, punctuation, and empty spaces.

In [None]:
# NLTK stop-word lists: download at least once, and keep this call out of any loop.
nltk.download('stopwords')
# string.punctuation provides the characters that removePunctuations strips.
import string


def removePunctuations(x):
    """Strip ASCII punctuation from every token and drop tokens left empty.

    Args:
        x: iterable of token strings.

    Returns:
        list of the tokens, in input order, with every ``string.punctuation``
        character removed. Tokens made only of punctuation (e.g. '!!') are
        dropped rather than kept as empty strings.
    """
    # One C-level pass per token instead of a per-character membership scan.
    table = str.maketrans('', '', string.punctuation)
    stripped = (token.translate(table) for token in x)
    # Filter out tokens that became '' so they never reach later stages.
    return [token for token in stripped if token]

def removeStopWords(x, language = 'english', additional_words = frozenset()):
    """Remove stop words from a sequence of tokens.

    Args:
        x: iterable of token strings.
        language: name of the NLTK stop-word list to load (e.g. 'english').
        additional_words: extra words to treat as stop words. The default is an
            immutable empty set, so no mutable default is shared between calls.

    Returns:
        list of the tokens in ``x`` that are not stop words, in input order.
    """
    from nltk.corpus import stopwords
    stop_words = set(stopwords.words(language)).union(additional_words)
    return [w for w in x if w not in stop_words]

# Strip punctuation from every token list.
nopunctRDD = wordTokenizeRDD.map(removePunctuations)

# A few extra stop words, to show how easily the standard list can be extended.
extraStopWords = {'u', 'r', 'im', 'ive'}
stopwordsRDD = nopunctRDD.map(lambda tokens: removeStopWords(tokens, 'english', extraStopWords))
stopwordsRDD.take(10)


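### WordNet is a lexical database of English. NLTK's `WordNetLemmatizer` uses it to reduce each word to its dictionary base form (lemma). Called with its default part of speech (noun), as below, it mostly turns plurals into singulars, e.g. 'speakers' → 'speaker'.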

In [None]:
# WordNet corpus used by WordNetLemmatizer: download at least once, but not inside a loop.
nltk.download('wordnet')

def lemmatize(x):
    """Return a new list with the WordNet lemma of every token in ``x``.

    Each token goes through ``WordNetLemmatizer.lemmatize`` with that
    method's default part of speech.
    """
    from nltk.stem import WordNetLemmatizer
    lemmatizeToken = WordNetLemmatizer().lemmatize
    return list(map(lemmatizeToken, x))

# Lemmatize every cleaned token list.
lemwordsRDD = stopwordsRDD.map(lemmatize)
lemwordsRDD.take(10)

### Now that the individual words are cleaned up, join each review's words back into a single string.

In [None]:
def joinTokensFunct(x):
    """Join a sequence of tokens into one space-separated string.

    Args:
        x: iterable of token strings.

    Returns:
        the tokens joined by single spaces ('' for an empty input).
    """
    return " ".join(x)
# One space-separated string per review, ready for phrase extraction.
joinedTokens = lemwordsRDD.map(joinTokensFunct)
joinedTokens.take(10)

### The averaged perceptron tagger labels each word with its part of speech (noun, adjective, preposition, and so on).

### Finds combinations of words that belong together to be treated as a phrase instead of individually.

In [None]:
# POS-tagger model used by nltk.tag.pos_tag in extractPhrase; download once, outside any loop.
nltk.download('averaged_perceptron_tagger')

def extractPhrase(x):
    """Extract noun phrases from one string.

    The text is tokenized with a regular expression, POS-tagged, and chunked
    with a small grammar (NBAR = adjectives/nouns ending in a noun; NP built
    from NBAR). English stop words are removed from every phrase.

    Args:
        x: text to analyse.

    Returns:
        list of non-empty noun-phrase strings, words joined by single spaces.
    """
    from nltk.corpus import stopwords
    stop_words = set(stopwords.words('english'))

    def leaves(tree):
        """Yield the leaves of every NP (noun phrase) subtree of a chunk tree."""
        for subtree in tree.subtrees(filter = lambda t: t.label() == 'NP'):
            yield subtree.leaves()

    def get_terms(tree):
        """Yield the words of each NP, minus English stop words."""
        for leaf in leaves(tree):
            yield [w for w, t in leaf if w not in stop_words]

    # Token pattern: abbreviations (U.S.A.), words with internal hyphens,
    # currency/percentages ($12.40, 82%), ellipses, single punctuation marks.
    # Dots are escaped so they only match a literal '.', and '-' sits last in the
    # character class so it is literal rather than a ':'-to-'_' range.
    sentence_re = r'(?:[A-Z](?:\.[A-Z])+\.?)|(?:\w+(?:-\w+)*)|(?:\$?\d+(?:\.\d+)?%?)|(?:\.\.\.)|(?:[][.,;"\'?():_`-])'
    # NOTE(review): the {<NBAR>} rule runs first, so the <NBAR><IN><NBAR> rule
    # probably never matches — confirm if multi-part phrases are wanted.
    grammar = r"""
    NBAR:
        {<NN.*|JJ>*<NN.*>}  # Nouns and Adjectives, terminated with Nouns

    NP:
        {<NBAR>}
        {<NBAR><IN><NBAR>}  # Above, connected with in/of/etc...
    """
    chunker = nltk.RegexpParser(grammar)
    tokens = nltk.regexp_tokenize(x, sentence_re)
    postoks = nltk.tag.pos_tag(tokens)  # part-of-speech tagging
    tree = chunker.parse(postoks)  # chunking
    # NPs made up only of stop words produce an empty term and are skipped.
    return [' '.join(term) for term in get_terms(tree) if term]

# Noun phrases of every cleaned review.
extractphraseRDD = joinedTokens.map(extractPhrase)
extractphraseRDD.take(10)

### Let's do a quick analysis of what are the most common phrases.
### Could have also done this on individual words instead by skipping the previous step.

In [None]:
# Total phrase counts across all reviews: FreqDist yields per-review (phrase, count)
# pairs, reduceByKey sums them, and sortBy puts the most frequent first.
freqDistRDD = (extractphraseRDD
               .flatMap(lambda phrases: nltk.FreqDist(phrases).most_common())
               .reduceByKey(lambda a, b: a + b)
               .sortBy(lambda pair: pair[1], ascending = False))
print(freqDistRDD.take(20))

# Same counting on the individual lemmatized words.
print('top ten words')
print(lemwordsRDD
      .flatMap(lambda words: nltk.FreqDist(words).most_common())
      .reduceByKey(lambda a, b: a + b)
      .sortBy(lambda pair: pair[1], ascending = False)
      .take(10))



### Let's take the phrase counts and turn them into visualizations by bringing these small sets of calculated results to pandas.

In [None]:
%matplotlib inline
freqDistDF = freqDistRDD.toDF() # RDD of (phrase, count) tuples -> Spark DataFrame with columns _1, _2
# The temp view lets SQL rename the tuple columns and keep the first 20 rows
# (freqDistRDD is already sorted by count, highest first).
freqDistDF.createOrReplaceTempView("myTable") 
df2 = spark.sql("SELECT _1 AS Keywords, _2 as Frequency from myTable limit 20")
# Only 20 rows, so it is cheap to bring them to pandas for plotting.
pandD = df2.toPandas()
pandD.plot.barh(x='Keywords', y='Frequency', rot=1, figsize=(10,8))

### Even better, make it into a word cloud.

In [None]:
# To install the package once, remove the leading '#' on the next line.
#! pip3 install WordCloud
%matplotlib inline 
from wordcloud import WordCloud
# set_index + transpose + to_dict('records') gives a one-element list holding a
# {keyword: frequency} dict; dict(*...) unpacks that single dict.
wordcloudConvertDF = pandD.set_index('Keywords').T.to_dict('records')
wordcloud = WordCloud(width=800, height=500, random_state=21, max_font_size=100, relative_scaling=0.5, colormap='Dark2').generate_from_frequencies(dict(*wordcloudConvertDF))
plt.figure(figsize=(14, 10))    
plt.imshow(wordcloud, interpolation="bilinear")
plt.axis('off')  # hide the axes around the image
plt.show()

### Word sentiment classifies each phrase as Positive, Negative, or Neutral using NLTK's VADER sentiment lexicon. A compound score below 0 is Negative, exactly 0 is Neutral, and above 0 is Positive.

In [None]:
# VADER sentiment lexicon used by wordSentiment.
# Download this once and not inside a loop
nltk.download('vader_lexicon')

def wordSentiment(x):
    """Label each phrase in ``x`` as "Positive", "Negative" or "Neutral".

    Each phrase is scored with NLTK's VADER ``SentimentIntensityAnalyzer``.
    Its ``compound`` score then picks the label: < 0 -> "Negative",
    == 0 -> "Neutral", anything else -> "Positive".

    Args:
        x: iterable of phrases. Each item goes through ``''.join`` before
            scoring: a string is used as-is, and a list of strings is
            concatenated without separators.

    Returns:
        list of ``(phrase, label)`` tuples in input order.
    """
    from nltk.sentiment.vader import SentimentIntensityAnalyzer
    analyzer = SentimentIntensityAnalyzer()
    sentiment_list = []
    for item in x:
        phrase = ''.join(item)
        compound = analyzer.polarity_scores(phrase)['compound']
        if compound < 0.0:
            label = "Negative"
        elif compound == 0.0:
            label = "Neutral"
        else:
            label = "Positive"
        sentiment_list.append((phrase, label))
    return sentiment_list

# Sample input: the noun phrases of the first review.
print(extractphraseRDD.take(1))
# Score every phrase of every review.
sentimentRDD = extractphraseRDD.map(wordSentiment)
print(sentimentRDD.take(10))



### The `wordSentiment` function we wrote takes a list of phrases and returns a list of (phrase, sentiment) tuples. We could write another function that takes a single string, but it's easier to wrap each string in a one-element list and pass that in.

In [None]:
print(joinedTokens.take(1))
# Wrap each whole review in a one-element list so wordSentiment scores it as a single phrase.
sentencesentimentRDD = joinedTokens.map(lambda review: wordSentiment([review]))
print(sentencesentimentRDD.take(10))

In [None]:
#sentencesentimentRDD.filter(lambda x : x[0][1] == 'Negative').take(3)

### Do an old school reduceByKey to see how many items we have of each sentiment.

In [None]:
def sentimentCount(x):
    """Count how many (text, label) pairs carry each sentiment label.

    ``x`` is an RDD whose elements are lists of ``(text, label)`` tuples, such
    as ``sentimentRDD``. Returns an RDD of ``(label, count)`` pairs.
    """
    pairs = x.flatMap(lambda tuples: tuples)
    ones = pairs.map(lambda pair: (pair[1], 1))
    return ones.reduceByKey(lambda left, right: left + right)

# One count per extracted phrase.
print('phrase count')
print(sentimentCount(sentimentRDD).collect())
# One count per review (each review was scored as a single string).
print('review count')
print(sentimentCount(sentencesentimentRDD).collect())

### Wrap it all up in a convenient helper function.

In [None]:
# Make sure every NLTK resource used by processText is available locally.
for nltkResource in ('punkt', 'stopwords', 'wordnet', 'averaged_perceptron_tagger'):
    nltk.download(nltkResource)

def processText(df, language = 'english', additionalWords = frozenset({'u', 'r', 'im', 'ive'})):
    """Clean review text and extract the noun phrases of every row.

    Per row: split into sentences, split sentences into whitespace-separated
    words, strip punctuation (dropping tokens that become empty), remove stop
    words, lemmatize, re-join into one string, then chunk out noun phrases.

    Args:
        df: Spark DataFrame with a single string column. Rows are flattened
            with ``flatMap``, so a second column would be mixed into the same
            stream of texts.
        language: NLTK stop-word list used by the stop-word step. (Phrase
            extraction always filters English stop words.)
        additionalWords: extra words to treat as stop words. The default is
            immutable, so it cannot be changed accidentally between calls.

    Returns:
        RDD holding one list of noun-phrase strings per input row.
    """
    import nltk
    import string

    def removePunctuations(x):
        """Strip punctuation from each token; drop tokens that become empty."""
        table = str.maketrans('', '', string.punctuation)
        stripped = (token.translate(table) for token in x)
        return [token for token in stripped if token]

    def removeStopWords(x, language = 'english', additional_words = frozenset()):
        """Keep only tokens that are not in the NLTK list or additional_words."""
        from nltk.corpus import stopwords
        stop_words = set(stopwords.words(language)).union(additional_words)
        return [w for w in x if w not in stop_words]

    def lemmatize(x):
        """Return the WordNet lemma of every token."""
        from nltk.stem import WordNetLemmatizer
        lemmatizer = WordNetLemmatizer()
        return [lemmatizer.lemmatize(s) for s in x]

    def joinTokensFunct(x):
        """Join tokens with single spaces."""
        return " ".join(x)

    def extractPhrase(x):
        """Return the noun phrases of one string, minus English stop words."""
        from nltk.corpus import stopwords
        stop_words = set(stopwords.words('english'))

        def leaves(tree):
            """Finds NP (nounphrase) leaf nodes of a chunk tree."""
            for subtree in tree.subtrees(filter = lambda t: t.label() == 'NP'):
                yield subtree.leaves()

        def get_terms(tree):
            for leaf in leaves(tree):
                yield [w for w, t in leaf if w not in stop_words]

        # Abbreviations, hyphenated words, currency/percentages, ellipses and
        # single punctuation marks. Dots are escaped, and '-' is last in the
        # character class so it is literal rather than a ':'-to-'_' range.
        sentence_re = r'(?:[A-Z](?:\.[A-Z])+\.?)|(?:\w+(?:-\w+)*)|(?:\$?\d+(?:\.\d+)?%?)|(?:\.\.\.)|(?:[][.,;"\'?():_`-])'
        grammar = r"""
        NBAR:
            {<NN.*|JJ>*<NN.*>}  # Nouns and Adjectives, terminated with Nouns

        NP:
            {<NBAR>}
            {<NBAR><IN><NBAR>}  # Above, connected with in/of/etc...
        """
        chunker = nltk.RegexpParser(grammar)
        tokens = nltk.regexp_tokenize(x, sentence_re)
        postoks = nltk.tag.pos_tag(tokens)  # part-of-speech tagging
        tree = chunker.parse(postoks)  # chunking
        return [' '.join(term) for term in get_terms(tree) if term]

    rdd = df.rdd.flatMap(lambda row: row)
    sentence = rdd.map(lambda text: nltk.sent_tokenize(text))
    word = sentence.map(lambda sentences: [w for line in sentences for w in line.split()])
    nopunct = word.map(removePunctuations)
    # Named noStopwords so it does not shadow the nltk `stopwords` corpus name.
    noStopwords = nopunct.map(lambda tokens: removeStopWords(tokens, language, additionalWords))
    lemwords = noStopwords.map(lemmatize)
    joinedTokens = lemwords.map(joinTokensFunct)
    return joinedTokens.map(extractPhrase)

def frequencyDistribution(x, plot = False):
    """Total the item counts across all rows of an RDD of lists.

    Args:
        x: RDD whose elements are lists of hashable items, for example the
            phrase lists returned by ``processText``.
        plot: if True, also draw a horizontal bar chart of the first 20 rows
            (the 20 most frequent items). This registers the result as the
            temp view ``myTable``.

    Returns:
        Spark DataFrame with columns ``_1`` (item) and ``_2`` (total count),
        sorted by count, highest first.
    """
    # Per-row FreqDist pairs, summed per item, most frequent first.
    counts = (x.flatMap(lambda items: nltk.FreqDist(items).most_common())
               .reduceByKey(lambda a, b: a + b)
               .sortBy(lambda pair: pair[1], ascending = False))
    df = counts.toDF()
    if plot:
        # SQL renames the default tuple columns for the plot labels.
        df.createOrReplaceTempView("myTable")
        df2 = spark.sql("SELECT _1 AS Keywords, _2 as Frequency from myTable limit 20")
        pandD = df2.toPandas()
        pandD.plot.barh(x='Keywords', y='Frequency', rot=1, figsize=(10,8))
    return df

def wordCloud(x):
    """Render a word cloud from the first 20 rows of a frequency DataFrame.

    ``x`` needs ``_1`` (word/phrase) and ``_2`` (count) columns, as returned by
    ``frequencyDistribution``. As a side effect, ``x`` is registered as the
    temp view ``myTable``.
    """
    from wordcloud import WordCloud
    x.createOrReplaceTempView("myTable")
    top = spark.sql("SELECT _1 AS Keywords, _2 as Frequency from myTable limit 20").toPandas()
    # keyword -> frequency mapping expected by generate_from_frequencies.
    frequencies = dict(zip(top['Keywords'], top['Frequency']))
    cloud = WordCloud(width=800, height=500, random_state=21, max_font_size=100,
                      relative_scaling=0.5, colormap='Dark2')
    image = cloud.generate_from_frequencies(frequencies)
    plt.figure(figsize=(14, 10))
    plt.imshow(image, interpolation="bilinear")
    plt.axis('off')
    plt.show()
    

### Using the helper functions, we can just select and fix the reviews from the original dataframe using spark sql in one line.

In [None]:
%matplotlib inline
# Same cleaning and phrase extraction as the step-by-step cells, done by the helper functions.
x = processText(spark.sql("select lower(verified_reviews) from alexa where verified_reviews is not null and verified_reviews <> 'verified_reviews'"))
# plot=False: only compute the counts; the word cloud below visualizes them.
freq = frequencyDistribution(x, False)
print(freq.take(20))

wordCloud(freq)

### Using the cleaned, re-joined reviews from the step-by-step pipeline (`joinedTokens`) and the `wordSentiment` function, make a dataframe of the reviews and their sentiments.

In [None]:
# Score each cleaned review as a whole. wordSentiment takes a list of phrases, so wrap
# the review in a one-element list and keep the single (review, sentiment) tuple.
# NOTE(review): this uses joinedTokens from the step-by-step pipeline, not processText.
sentiment2RDD = joinedTokens.map(lambda review: wordSentiment([review])[0])

print(sentiment2RDD.take(10))
sentimentDF = spark.createDataFrame(sentiment2RDD, schema='review:string, sentiment:string')
display(sentimentDF)


### Use a pipeline to convert the sentiment label into a numeric index and to tokenize and vectorize the review text, producing an ML-ready dataframe.

In [None]:
# OneHotEncoderEstimator is not imported: it was unused here, and Spark 3.x
# renamed it to OneHotEncoder, so importing it raises ImportError there.
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, StringIndexer
from pyspark.ml.pipeline import Pipeline

# StringIndexer: sentiment text -> numeric 'label'
# Tokenizer:     review text -> list of words
# HashingTF:     words -> term-frequency vector with 20 hashed buckets
# IDF:           reweight those term frequencies into 'features'
# NOTE(review): 20 buckets means many hash collisions for a real vocabulary;
# consider a larger numFeatures if accuracy matters.
stages = [ StringIndexer(inputCol = 'sentiment', outputCol = 'label')
         , Tokenizer(inputCol="review", outputCol="words")
         , HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
         , IDF(inputCol="rawFeatures", outputCol="features")
         ]

pipeline = Pipeline(stages = stages)
# The fitted pipeline model is kept so new reviews can be featurized the same way.
dfMLFitted = pipeline.fit(sentimentDF)
dfML = dfMLFitted.transform(sentimentDF)
#display(dfML) 

### Let's explore the monitoring page by going to localhost:4040

### There are a lot of steps involved, and we may want to use this ML set many times. To avoid recalculating the same transformations repeatedly, we can persist a copy of the dataframe in memory for the rest of this Spark session, or until we decide to unpersist it.

#### There are many options for persisting to memory or disk or both.

p = pyspark.StorageLevel(useDisk = True, useMemory = True, useOffHeap = False, deserialized = True, replication = 1)

DISK_ONLY
StorageLevel(True, False, False, False, 1)

DISK_ONLY_2
StorageLevel(True, False, False, False, 2)

MEMORY_AND_DISK
StorageLevel(True, True, False, False, 1)

MEMORY_AND_DISK_2
StorageLevel(True, True, False, False, 2)

MEMORY_AND_DISK_SER
StorageLevel(True, True, False, False, 1)

MEMORY_AND_DISK_SER_2
StorageLevel(True, True, False, False, 2)

MEMORY_ONLY
StorageLevel(False, True, False, False, 1)

MEMORY_ONLY_2
StorageLevel(False, True, False, False, 2)

MEMORY_ONLY_SER
StorageLevel(False, True, False, False, 1)

MEMORY_ONLY_SER_2
StorageLevel(False, True, False, False, 2)

OFF_HEAP
StorageLevel(True, True, True, False, 1)

In [None]:
import pyspark
# Custom storage level: keep partitions in memory and spill to disk when memory
# runs out, without off-heap storage, with a single replica.
p = pyspark.StorageLevel(useDisk = True, useMemory = True, useOffHeap = False, deserialized = True, replication = 1)
#dfML.persist(pyspark.StorageLevel.MEMORY_AND_DISK)
dfML.persist(p)
# persist is lazy; this action is what actually fills the cache.
display(dfML)
# 70/30 train/test split; the fixed seed makes the split reproducible.
train, test = dfML.randomSplit([.7,.3], seed = 100)
train.take(1)

### Switch to the browser and take a look now at the storage page.

### Let's just do a simple DecisionTreeClassifier on the ML dataset to predict which of the three labeled sentiments each review is.

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier
# Decision tree on the IDF features, predicting the indexed sentiment label.
dt = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'label', maxDepth = 6)
%time dtModel = dt.fit(train)
# pyh.predict_and_evaluate is a course helper; presumably it returns the prediction
# DataFrame and an evaluation log — confirm in pyspark_helpers.
%time dtPredictions, dtLog = pyh.predict_and_evaluate(dtModel, test)
print(dtLog)


### Take a look at a sample of each prediction.

In [None]:
# One sample review per predicted label index. StringIndexer's default ordering gives
# index 0.0 to the most frequent sentiment. The 1.0 sample skips empty reviews.
for predictionFilter in ('prediction = 0.0', "prediction = 1.0 and review<>''", 'prediction = 2.0'):
    print(dtPredictions.where(predictionFilter).select('review').take(1))

### We could save the trained model and then make a prediction one at a time using a helper function.

In [None]:
def predictSentiment(trainedModel, transformModel, x):
    """Run one review string through the feature pipeline and a trained model.

    Args:
        trainedModel: fitted classifier model (e.g. ``dtModel``) that reads a
            ``features`` column.
        transformModel: fitted feature pipeline (e.g. ``dfMLFitted``) that
            produces ``features`` from a ``review`` column.
        x: review text.

    Returns:
        the DataFrame returned by ``trainedModel.transform`` for that single review.
    """
    reviewDF = spark.createDataFrame(sc.parallelize([(x,)]), schema='review:string')
    featuresDF = transformModel.transform(reviewDF).select('features')
    return trainedModel.transform(featuresDF)

# predictSentiment returns a DataFrame; collect() brings its rows to the driver for printing.
prediction = predictSentiment(dtModel, dfMLFitted, 'I really love love love love my alexa' )
print (prediction.collect())


In [None]:
# Print only the predicted label index for each sample review.
for review in ['I really love love love love my alexa', 'sent 85 year old dad talk constantly', 'device interact home filled apple device disappointing']:
    print(predictSentiment(dtModel, dfMLFitted, review).select('prediction').collect())




### The cached dataframe will disappear automatically when a session ends, but you can unpersist it whenever you want. Run the following and flip back to the Storage tab in the browser and see that it is gone.

In [None]:
# Release the cached copy of dfML now instead of waiting for the session to end.
dfML.unpersist()

### Create a simple RDD to demonstrate accumulators.

In [None]:
# A small RDD holding 0..9 (reused by the counter demos below) and its doubled values.
x0 = sc.parallelize(range(10))
doubled = lambda value: value * 2
x1 = x0.map(doubled)
print (x1.collect())


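### This won't work. `foreach` runs `fun1` on the workers, and each worker updates its own copy of `counter`, so the driver's `counter` still prints 0. We need another way to do it.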

In [None]:
# Naive attempt: a plain Python global updated from inside foreach.
counter = 0
def fun1(x):
    # Runs on the workers, so it updates each worker's copy of counter, not the driver's.
    global counter
    counter += x
    
x0.foreach(fun1)
# Still prints 0: the driver's counter is never modified.
print (counter)

### Use an accumulator instead. It is a shared variable that workers can only add to; the driver reads the combined total through `.value`.

In [None]:
# Accumulator: tasks add to it with +=, and the driver reads the total through .value.
counter = sc.accumulator(0)
def fun2(x):
    # `global` is still needed because += rebinds the name; here it adds x to the accumulator.
    global counter
    counter += x
    
x0.foreach(fun2)
# Sum of the values 0..9 in x0, gathered from all tasks.
print (counter.value)
    