# Read in Data

In [1]:
# start spark
spark

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1619746266595_0001,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<pyspark.sql.session.SparkSession object at 0x7efc15663510>

In [2]:
# read in data from S3
df = spark.read.json('s3://sagemaker-bda-project/electronics/Electronics.json')
df.printSchema()
df.show(n=3, truncate=False, vertical=True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- _id: struct (nullable = true)
 |    |-- $oid: string (nullable = true)
 |-- asin: string (nullable = true)
 |-- category: string (nullable = true)
 |-- class: double (nullable = true)
 |-- helpful: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)

-RECORD 0------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [24]:
# Clean the complex fields
from pyspark.sql import Row
from pyspark.sql.functions import from_unixtime
from pyspark.sql.functions import col, expr, when

temp = df.rdd.map(lambda x: Row(
    prod_id = x['_id'][0],
    prod_name = 'prod_'+str(x['_id'][0][:5]),
    asin = x['asin'],
    category = x['category'],
    helpful_0 = x['helpful'][0],
    helpful_1 = x['helpful'][1],
    overall = x['overall'],
    reviewText = str(x['reviewText']),
    unixreviewTime = x['unixReviewTime'],
    summary = str(x['summary']),
    label=x['class']
))
data = spark.createDataFrame(temp)
data = data.withColumn('reviewtime', from_unixtime(data.unixreviewTime))

data.show(n=3, truncate=False, vertical=True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

-RECORD 0--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 asin           | 0439886341                                                                                                                                                                                                

In [25]:
data.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- asin: string (nullable = true)
 |-- category: string (nullable = true)
 |-- helpful_0: long (nullable = true)
 |-- helpful_1: long (nullable = true)
 |-- label: double (nullable = true)
 |-- overall: double (nullable = true)
 |-- prod_id: string (nullable = true)
 |-- prod_name: string (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixreviewTime: long (nullable = true)
 |-- reviewtime: string (nullable = true)

# Inspect the data

In [4]:
## Not able to use matplot lib.... lol

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [26]:
data.columns

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

['asin', 'category', 'helpful_0', 'helpful_1', 'label', 'overall', 'prod_id', 'prod_name', 'reviewText', 'summary', 'unixreviewTime', 'reviewtime']

In [7]:
data.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

7574169

In [8]:
data = data.dropna()
data.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

7574169

# Build the model to identify spam

In [32]:
# split training testing data
train, test = data.randomSplit([0.7, 0.3])

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [33]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

tokenizer_text = Tokenizer(inputCol="reviewText", outputCol="words")
hashingTF_text = HashingTF(inputCol="words", outputCol="rawFeaturesText", numFeatures=50)
idf_text = IDF(inputCol="rawFeaturesText", outputCol="featuresText")

tokenizer_summary = Tokenizer(inputCol="summary", outputCol="words_summary")
hashingTF_summary = HashingTF(inputCol="words_summary", outputCol="rawFeaturesSummary", numFeatures=10)
idf_summary = IDF(inputCol="rawFeaturesSummary", outputCol="featuresSummary")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [34]:
from pyspark.ml.feature import VectorAssembler
va = VectorAssembler(inputCols=['helpful_0', 'helpful_1', 'overall', 'featuresText',"featuresSummary"],
                     outputCol="features")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [35]:
from pyspark.ml.classification import GBTClassifier

gbt = GBTClassifier()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [36]:
from pyspark.ml import Pipeline

pipeline_gbt = Pipeline(stages= [tokenizer_text, hashingTF_text, idf_text, tokenizer_summary, hashingTF_summary, idf_summary,va, gbt])

gbt_fitted = pipeline_gbt.fit(train)
results_cv = gbt_fitted.transform(data)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [37]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

def get_performance(pred_col, label_col, df):
    roc = BinaryClassificationEvaluator(rawPredictionCol=pred_col, labelCol=label_col, metricName='areaUnderROC').evaluate(df)
    f1 = MulticlassClassificationEvaluator(predictionCol=pred_col, labelCol=label_col, metricName='f1').evaluate(df)
    accuracy = MulticlassClassificationEvaluator(predictionCol=pred_col, labelCol=label_col, metricName='accuracy').evaluate(df)
    performance = [roc, f1, accuracy]
    return "ROC: {0}, F1: {1}, Accuracy: {2}".format(roc, f1, accuracy)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [38]:
get_performance(pred_col = 'prediction', label_col = 'label', df = results_cv)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

'ROC: 1.0, F1: 1.0, Accuracy: 1.0'

In [39]:
results_cv.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

7574169

In [18]:
# final_result = results_cv.filter("prediction == 0")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [40]:
final_result = results_cv.withColumn('label_new', expr("IF(prediction==0, 'True', 'Fake')"))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [41]:
final_result.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

7574169

# LDA

In [42]:
from collections import defaultdict
from pyspark import SparkContext
from pyspark.mllib.linalg import Vector, Vectors
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.sql import SQLContext
import re

num_of_stop_words = 50      # Number of most common words to remove, trying to eliminate stop words
num_topics = 10	            # Number of topics we are looking for
num_words_per_topic = 10    # Number of words to display for each topic
max_iterations = 35         # Max number of times to iterate before finishing

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [54]:
# readJSON = spark.read.json('s3://msba6330team2bucket/lda/part.json')
readJSON = final_result
data = readJSON.rdd.map(lambda x: x[8])

# Create list of stop words
text = spark.read.text('s3://sagemaker-bda-project/part/english.txt')
stop_words = ['a', 'about', 'above', 'after', 'again', 'against', 'all', 'am', 'an', 'and', 'any', 'are', "aren't",
              'as', 'at', 'be', 'because', 'been', 'before', 'being', 'below', 'between', 'both', 'but', 'by', "can't",
              'cannot', 'could', "couldn't", 'did', "didn't", 'do', 'does', "doesn't", 'doing', "don't", 'down',
              'during', 'each', 'few', 'for', 'from', 'further', 'had', "hadn't", 'has', "hasn't", 'have', "haven't",
              'having', 'he', "he'd", "he'll", "he's", 'her', 'here', "here's", 'hers', 'herself', 'him', 'himself', 'his',
              'how', "how's", 'i', "i'd", "i'll", "i'm", "i've", 'if', 'in', 'into', 'is', "isn't", 'it', "it's", 'its', 'itself', "let's",
              'me', 'more', 'most', "mustn't", 'my', 'myself', 'no', 'nor', 'not', 'of', 'off', 'on', 'once', 'only', 'or',
              'other', 'ought', 'our', 'ours', 'ourselves', 'out', 'over', 'own', 'same', "shan't", 'she', "she'd",
              "she'll", "she's", 'should', "shouldn't", 'so', 'some', 'such', 'than', 'that', "that's", 'the', 'their',
              'theirs', 'them', 'themselves', 'then', 'there', "there's", 'these', 'they', "they'd", "they'll", "they're",
              "they've", 'this', 'those', 'through', 'to', 'too', 'under', 'until', 'up', 'very', 'was', "wasn't", 'we',
              "we'd", "we'll", "we're", "we've", 'were', "weren't", 'what', "what's", 'when', "when's", 'where',
              "where's", 'which', 'while', 'who', "who's", 'whom', 'why', "why's", 'with', "won't", 'would',
              "wouldn't", 'you', "you'd", "you'll", "you're", "you've", 'your', 'yours', 'yourself', 'yourselves']

tokens = data \
    .map( lambda document: document.strip().lower()) \
    .map( lambda document: re.split("[\s;,#]", document)) \
    .map( lambda word: [x for x in word if x.isalpha()]) \
    .map( lambda word: [x for x in word if len(x) > 3] ) \
    .map( lambda word: [x for x in word if not x in stop_words] )

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [55]:
# Get our vocabulary
# 1. Flat map the tokens -> Put all the words in one giant list instead of a list per document
# 2. Map each word to a tuple containing the word, and the number 1, signifying a count of 1 for that word
# 3. Reduce the tuples by key, i.e.: Merge all the tuples together by the word, summing up the counts
# 4. Reverse the tuple so that the count is first...
# 5. ...which will allow us to sort by the word count

termCounts = tokens \
    .flatMap(lambda document: document) \
    .map(lambda word: (word, 1)) \
    .reduceByKey( lambda x,y: x + y) \
    .map(lambda tuple: (tuple[1], tuple[0])) \
    .sortByKey(False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [56]:
# Identify a threshold to remove the top words, in an effort to remove stop words
threshold_value = termCounts.take(num_of_stop_words)[num_of_stop_words - 1][0]

# Only keep words with a count less than the threshold identified above, 
# and then index each one and collect them into a map
vocabulary = termCounts \
    .map(lambda x: x[1]) \
    .zipWithIndex() \
    .collectAsMap()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [57]:
# Convert the given document into a vector of word counts
def document_vector(document):
    id = document[1]
    counts = defaultdict(int)
    for token in document[0]:
        if token in vocabulary:
            token_id = vocabulary[token]
            counts[token_id] += 1
    counts = sorted(counts.items())
    keys = [x[0] for x in counts]
    values = [x[1] for x in counts]
    return (id, Vectors.sparse(len(vocabulary), keys, values))

# Process all of the documents into word vectors using the 
# `document_vector` function defined previously
documents = tokens.zipWithIndex().map(document_vector).map(list)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [58]:
# Get an inverted vocabulary, so we can look up the word by it's index value
inv_voc = {value: key for (key, value) in vocabulary.items()}

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [59]:
lda_model = LDA.train(documents, k=num_topics, maxIterations=max_iterations)
topic_indices = lda_model.describeTopics(maxTermsPerTopic=num_words_per_topic)

topic_list = []
word_list = []

# Print topics, showing the top-weighted 10 terms for each topic
for i in range(len(topic_indices)):
    print("Topic #{0}\n".format(i + 1))
    for j in range(len(topic_indices[i][0])):
        print("{0}\t{1}\n".format(inv_voc[topic_indices[i][0][j]].encode('utf-8'),
                                  topic_indices[i][1][j]))
        word_list.append(inv_voc[topic_indices[i][0][j]].encode('utf-8'))
    
    topic_list.append(word_list)
    word_list= []

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

An error was encountered:
Invalid status code '400' from http://172.31.18.174:8998/sessions/0/statements/59 with error payload: {"msg":"requirement failed: Session isn't active."}


In [60]:
# convert to string
string_topic_list = []
string_word_list = []

for x in topic_list:
    for y in x:
        word = y.decode()
        string_word_list.append(word)
    
    string_topic_list.append(string_word_list)
    string_word_list=[]

string_topic_list

An error was encountered:
Invalid status code '404' from http://172.31.18.174:8998/sessions/0 with error payload: {"msg":"Session '0' not found."}


In [61]:
from pyspark.sql import Row

R = Row("0", "1", "2", '3', '4', '5', '6', '7', '8', '9')
lda_result_df = sc.parallelize([R(*r) for r in zip(*string_topic_list)]).toDF()

An error was encountered:
Invalid status code '404' from http://172.31.18.174:8998/sessions/0 with error payload: {"msg":"Session '0' not found."}


# Save result to cloud

In [73]:
final_result.write.format("json").save('s3://bda-project-updated/electronics-result/electronics')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [74]:
lda_result_df.write.format("csv").save('s3://bda-project-updated/electronics-result/electronics-lda')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…