In [1]:
# %%

from __future__ import print_function

import re
import sys
from re import sub

import pandas as pd
import nltk
import numpy as np
from pyspark import SparkContext
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StopWordsRemover
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import concat, col, lit
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.ml.linalg import Vectors

# spark = SparkSession.builder.appName('python word count').getOrCreate()
# sc = SparkContext(appName="A7")

# lines = sc.textFile(sys.argv[1], 1)
# stripped = lines.map(lambda x: sub("<[^>]+>", "", x))

nltk.download('stopwords')

# Define a list of stop words or use default list

spark = SparkSession.builder.appName('python').getOrCreate()
spark.catalog.clearCache()
sc = spark.sparkContext
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

file = pd.read_csv("DisneylandReviews.csv")
print(file)


def freqArray(listOfIndices, numberofwords):
    returnVal = np.zeros(5000)
    for index in listOfIndices:
        returnVal[index] = returnVal[index] + 1
    returnVal = np.divide(returnVal, numberofwords)
    return returnVal


def buildArray(listOfIndices):
    returnVal = np.zeros(5000)
    for index in listOfIndices:
        returnVal[index] = returnVal[index] + 1
    mysum = np.sum(returnVal)
    returnVal = np.divide(returnVal, mysum)
    return returnVal


schema = StructType([
    StructField("Review_ID", IntegerType()),
    StructField("Rating", IntegerType(), True),
    StructField("Year_Month", StringType(), True),
    StructField("Reviewer_Location", StringType(), True),
    StructField("Review_Text", StringType(), True),
    StructField("Branch", StringType(), True)
])

dfWithSchema = spark.read.schema(schema).csv('DisneylandReviews.csv')

print(dfWithSchema.printSchema())
dfWithSchema = dfWithSchema.select(concat(col("Review_ID"), lit("-"), col("Branch")), col("Review_Text"))
print(dfWithSchema.show())
dfWithSchema = dfWithSchema.withColumnRenamed("concat(Review_ID, -, Branch)", "Branch")
print(dfWithSchema.show())
print(1)
dfWithSchema = dfWithSchema.select(col("Branch"), col("Review_Text"))
print(dfWithSchema.show())
rdd = dfWithSchema.rdd.map(tuple)
numberOfDocs = rdd.count()
print(numberOfDocs)
print(rdd.take(5))
regex = re.compile('[^a-zA-Z]')
d_keyAndListOfWords = rdd.map(lambda x: (str(x[0]), regex.sub(' ', x[1]).lower().split()))
print(d_keyAndListOfWords.take(4))
remover = StopWordsRemover()
stopwords = remover.getStopWords()
stopwords = stopwords + ['hong', 'kong', 'california', 'paris']
print(stopwords)

[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/shezalpadani/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


       Review_ID  Rating Year_Month     Reviewer_Location  \
0      670772142       4     2019-4             Australia   
1      670682799       4     2019-5           Philippines   
2      670623270       4     2019-4  United Arab Emirates   
3      670607911       4     2019-4             Australia   
4      670607296       4     2019-4        United Kingdom   
...          ...     ...        ...                   ...   
42651    1765031       5    missing        United Kingdom   
42652    1659553       5    missing                Canada   
42653    1645894       5    missing          South Africa   
42654    1618637       4    missing         United States   
42655    1536786       4    missing        United Kingdom   

                                             Review_Text               Branch  
0      If you've ever been to Disneyland anywhere you...  Disneyland_HongKong  
1      Its been a while since d last time we visit HK...  Disneyland_HongKong  
2      Thanks God it wasn  

In [2]:
dfWithSchema = spark.createDataFrame(d_keyAndListOfWords).toDF("branch", "Review_Text")
a = StopWordsRemover(inputCol="Review_Text", outputCol="Filtered_Reviews", stopWords=stopwords)
b = a.transform(dfWithSchema)
df = b.select(col("Branch"), col("Filtered_Reviews"))
d_keyAndListOfWords = df.rdd.map(tuple)
print(d_keyAndListOfWords.take(3))

allWords = d_keyAndListOfWords.flatMap(lambda x: ((j, 1) for j in x[1]))
print(allWords.take(1))
allCounts = allWords.reduceByKey(lambda a, b: a + b)
print(allCounts.take(1))
# # # Get the top 20,000 words in a local array in a sorted format based on frequency
# # # If you want to run it on your laptio, it may a longer time for top 20k words.
topWords = allCounts.top(5000, lambda x: x[1])
print("Top Words in Corpus:", allCounts.top(10, key=lambda x: x[1]))
topWordsK = sc.parallelize(range(5000))
print(topWordsK.take(10))
dictionary = topWordsK.map(lambda x: (topWords[x][0], x))
print("Word Postions in our Feature Matrix. Last 20 words in 20k positions: ", dictionary.top(20, lambda x: x[1]))
allWordsWithDocID = d_keyAndListOfWords.flatMap(lambda x: ((j, x[0]) for j in x[1]))
allDictionaryWords = dictionary.join(allWordsWithDocID)
justDocAndPos = allDictionaryWords.map(lambda x: (x[1][1], x[1][0]))
allDictionaryWordsInEachDoc = justDocAndPos.groupByKey()
allDocsAsNumpyArrays = allDictionaryWordsInEachDoc.map(lambda x: (x[0], buildArray(x[1])))
zeroOrOne = allDocsAsNumpyArrays.map(lambda x: (x[0], np.clip(np.multiply(x[1], 9e9), 0, 1)))
dfArray = zeroOrOne.reduce(lambda x1, x2: ("", np.add(x1[1], x2[1])))[1]
multiplier = np.full(5000, numberOfDocs)
idfArray = np.log(np.divide(np.full(5000, numberOfDocs), dfArray))
allDocsAsNumpyArrays = allDocsAsNumpyArrays.map(lambda x: (x[0], np.multiply(x[1], idfArray)))

[('None', ['review', 'text']), ('670772142-Disneyland_HongKong', ['ve', 'ever', 'disneyland', 'anywhere', 'll', 'find', 'disneyland', 'similar', 'layout', 'walk', 'main', 'street', 'familiar', 'feel', 'one', 'rides', 'small', 'world', 'absolutely', 'fabulous', 'worth', 'day', 'visited', 'fairly', 'hot', 'relatively', 'busy', 'queues', 'moved', 'fairly', 'well']), ('670682799-Disneyland_HongKong', ['since', 'd', 'last', 'time', 'visit', 'hk', 'disneyland', 'yet', 'time', 'stay', 'tomorrowland', 'aka', 'marvel', 'land', 'iron', 'man', 'experience', 'n', 'd', 'newly', 'open', 'ant', 'man', 'n', 'd', 'wasp', 'ironman', 'great', 'feature', 'n', 'exciting', 'especially', 'd', 'whole', 'scenery', 'hk', 'hk', 'central', 'area', 'kowloon', 'antman', 'changed', 'previous', 'buzz', 'lightyear', 'less', 'd', 'm', 'expecting', 'something', 'however', 'boys', 'like', 'space', 'mountain', 'turns', 'star', 'wars', 'great', 'cast', 'members', 'staffs', 'felt', 'bit', 'minus', 'point', 'dun', 'feel', 'l

In [3]:
def convert(value):
    if 'HongKong' in value:
        return 0.0
    elif 'California' in value:
        return 1.0
    else:
        return 2.0


# convert AU to 1 and wiki docs to 0
myRDD = allDocsAsNumpyArrays
test = myRDD.map(lambda x: x[0])
new_RDD = myRDD.map(lambda x: (convert(x[0]), x[1]))
# new_RDD = new_RDD.map(lambda x: LabeledPoint(x[0], x[1]))
# new_RDD.cache()

print(new_RDD.count())
trainingData, testData = new_RDD.randomSplit([.5, .5])
print(testData.take(5))
new_RDD = new_RDD.map(lambda x: (x[0], Vectors.dense([float(i) for i in x[1]])))
data = spark.createDataFrame(new_RDD)
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="_1", featuresCol="_2", numTrees=10)
model = rf.fit(trainingData)
# Make predictions.
predictions = model.transform(testData)

42637
[(0.0, array([0.01564438, 0.01840228, 0.0164562 , ..., 0.        , 0.        ,
       0.        ])), (0.0, array([0., 0., 0., ..., 0., 0., 0.])), (0.0, array([0., 0., 0., ..., 0., 0., 0.])), (0.0, array([0.00809827, 0.00952589, 0.0085185 , ..., 0.        , 0.        ,
       0.        ])), (0.0, array([0.02503102, 0.        , 0.        , ..., 0.        , 0.        ,
       0.        ]))]


In [None]:
#### GET MODEL RESULTS
# Select example rows to display.
predictions.select("_1", "_2", "prediction", "probability").show(5)
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="_1", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))
evaluate_importance = model.featureImportances
print(evaluate_importance)
importance = model.featureImportances
# summarize feature importance
features = []
values = []
for i, v in enumerate(importance):
    print('Feature: %0d, Score: %.5f' % (i, v))
    features.append(i)
    values.append(v)
final = sorted(zip(features, values), key=lambda x: x[1], reverse=True)
print(final)

top_words = final[0:50]
print(top_words)


words = dictionary.collect()

print(words)

for word in top_words:
    get_index = int(word[0])
    print(words[get_index][0])

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

eval_accuracy = MulticlassClassificationEvaluator(labelCol="_1", predictionCol="prediction", metricName="accuracy")
eval_precision = MulticlassClassificationEvaluator(labelCol="_1", predictionCol="prediction", metricName="precisionByLabel")
eval_recall = MulticlassClassificationEvaluator(labelCol="_1", predictionCol="prediction", metricName="recallByLabel")
eval_f1 = MulticlassClassificationEvaluator(labelCol="_1", predictionCol="prediction", metricName="f1")
eval_auc = BinaryClassificationEvaluator(labelCol="_1", rawPredictionCol="prediction")

accuracy = eval_accuracy.evaluate(predictions)
precision = eval_precision.evaluate(predictions)
recall = eval_recall.evaluate(predictions)
f1score = eval_f1.evaluate(predictions)
auc = eval_accuracy.evaluate(predictions)

print('accuracy', accuracy)
print('precision', precision)
print('recall', recall)
print('flscore', f1score)
print('auc', auc)

#calculate results per label
actual_predicted = predictions.select("_1", "prediction")
by_label = actual_predicted.groupBy("_1", "prediction").count()
print(by_label.show())

+---+--------------------+----------+--------------------+
| _1|                  _2|prediction|         probability|
+---+--------------------+----------+--------------------+
|0.0|[0.0,0.0,0.0,0.0,...|       1.0|[0.22638019822517...|
|0.0|[0.0,0.0,0.0,0.0,...|       1.0|[0.22112818623166...|
|0.0|[0.0,0.0,0.0,0.0,...|       1.0|[0.24585791238234...|
|0.0|[0.0,0.0,0.0,0.0,...|       1.0|[0.24585791238234...|
|0.0|[0.0,0.0,0.0,0.0,...|       1.0|[0.22638019822517...|
+---+--------------------+----------+--------------------+
only showing top 5 rows

Test Error = 0.448268
(5000,[1,2,3,4,6,11,22,23,25,26,30,40,43,44,47,50,60,61,71,74,78,81,82,85,102,106,112,113,120,124,127,128,133,150,163,166,182,188,189,195,203,209,216,240,244,268,291,295,307,308,311,316,317,324,332,334,340,346,352,355,407,413,414,422,437,444,451,453,457,464,473,479,486,489,504,512,515,529,534,545,547,553,573,591,592,599,602,604,605,609,637,639,641,671,698,714,717,720,739,743,750,767,779,788,806,807,822,832,842,849,870,

Feature: 1434, Score: 0.00000
Feature: 1435, Score: 0.00000
Feature: 1436, Score: 0.00000
Feature: 1437, Score: 0.00000
Feature: 1438, Score: 0.00000
Feature: 1439, Score: 0.00000
Feature: 1440, Score: 0.00000
Feature: 1441, Score: 0.00000
Feature: 1442, Score: 0.00000
Feature: 1443, Score: 0.00000
Feature: 1444, Score: 0.00000
Feature: 1445, Score: 0.00000
Feature: 1446, Score: 0.00000
Feature: 1447, Score: 0.00000
Feature: 1448, Score: 0.00000
Feature: 1449, Score: 0.00000
Feature: 1450, Score: 0.00000
Feature: 1451, Score: 0.00000
Feature: 1452, Score: 0.00000
Feature: 1453, Score: 0.00000
Feature: 1454, Score: 0.00000
Feature: 1455, Score: 0.00000
Feature: 1456, Score: 0.00000
Feature: 1457, Score: 0.00000
Feature: 1458, Score: 0.00000
Feature: 1459, Score: 0.00000
Feature: 1460, Score: 0.00000
Feature: 1461, Score: 0.00000
Feature: 1462, Score: 0.00000
Feature: 1463, Score: 0.00000
Feature: 1464, Score: 0.00000
Feature: 1465, Score: 0.00000
Feature: 1466, Score: 0.00000
Feature: 1

Feature: 2934, Score: 0.00000
Feature: 2935, Score: 0.00000
Feature: 2936, Score: 0.00000
Feature: 2937, Score: 0.00000
Feature: 2938, Score: 0.00000
Feature: 2939, Score: 0.00000
Feature: 2940, Score: 0.00000
Feature: 2941, Score: 0.00000
Feature: 2942, Score: 0.00000
Feature: 2943, Score: 0.00000
Feature: 2944, Score: 0.00000
Feature: 2945, Score: 0.00000
Feature: 2946, Score: 0.00000
Feature: 2947, Score: 0.00000
Feature: 2948, Score: 0.00000
Feature: 2949, Score: 0.00000
Feature: 2950, Score: 0.00000
Feature: 2951, Score: 0.00000
Feature: 2952, Score: 0.00000
Feature: 2953, Score: 0.00000
Feature: 2954, Score: 0.00000
Feature: 2955, Score: 0.00000
Feature: 2956, Score: 0.00000
Feature: 2957, Score: 0.00000
Feature: 2958, Score: 0.00000
Feature: 2959, Score: 0.00000
Feature: 2960, Score: 0.00000
Feature: 2961, Score: 0.00000
Feature: 2962, Score: 0.00000
Feature: 2963, Score: 0.00000
Feature: 2964, Score: 0.00000
Feature: 2965, Score: 0.00000
Feature: 2966, Score: 0.00000
Feature: 2

Feature: 4433, Score: 0.00000
Feature: 4434, Score: 0.00000
Feature: 4435, Score: 0.00000
Feature: 4436, Score: 0.00000
Feature: 4437, Score: 0.00000
Feature: 4438, Score: 0.00000
Feature: 4439, Score: 0.00000
Feature: 4440, Score: 0.00000
Feature: 4441, Score: 0.00000
Feature: 4442, Score: 0.00000
Feature: 4443, Score: 0.00000
Feature: 4444, Score: 0.00000
Feature: 4445, Score: 0.00000
Feature: 4446, Score: 0.00000
Feature: 4447, Score: 0.00000
Feature: 4448, Score: 0.00000
Feature: 4449, Score: 0.00000
Feature: 4450, Score: 0.00000
Feature: 4451, Score: 0.00000
Feature: 4452, Score: 0.00000
Feature: 4453, Score: 0.00000
Feature: 4454, Score: 0.00000
Feature: 4455, Score: 0.00000
Feature: 4456, Score: 0.00000
Feature: 4457, Score: 0.00000
Feature: 4458, Score: 0.00000
Feature: 4459, Score: 0.00000
Feature: 4460, Score: 0.00000
Feature: 4461, Score: 0.00000
Feature: 4462, Score: 0.00000
Feature: 4463, Score: 0.00000
Feature: 4464, Score: 0.00000
Feature: 4465, Score: 0.00000
Feature: 4

queue
queues
hk
hotel
king
disneyland
staff
mystic
disney
village
restaurants
euro
mtr
children
manor
fast
closed
food
half
hkd
also
meet
studio
always
hongkong
parades
staying
crush
splash
coffee
shows
winnie
train
poor
grizzly
ocean
haunted
dreams
rer
pirates
love
buzz
smaller
chips
board
klook
wait
windows
station
downtown
