In [6]:
import sys
import itertools
import logging
from math import sqrt
from operator import add
from os.path import join, isfile, dirname

from pyspark import SparkConf, SparkContext
from pyspark.mllib.recommendation import ALS
from pyspark.sql import SparkSession

import json
from pyspark.mllib.feature import HashingTF, IDF
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import NaiveBayes

# from pycorenlp import StanfordCoreNLP
# nlp = StanfordCoreNLP('http://localhost:9000')

In [7]:
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
print("Running Spark Version %s" % (spark.version))

#### load business data ###
# path_business ="yizhan/Desktop/cs181/dataset/buiness.json"
path_business = "dataset/business.json"
df_business_raw = spark.read.json(path_business)
# df_business_raw.first()

businessDF = df_business_raw.select(df_business_raw["business_id"], df_business_raw["name"], df_business_raw["categories"])
print(businessDF.first())
businessDF.printSchema()
businessDF.createOrReplaceTempView("business")

# path_review="yizhan/Desktop/CS181/yelp dataset/review.json"
path_review="dataset/review.json"
df_review_raw = spark.read.json(path_review)
# df_review_raw.first()

reviewDF= df_review_raw.select(df_review_raw["review_id"], df_review_raw["user_id"], df_review_raw["business_id"], df_review_raw["text"])
print(reviewDF.first())
reviewDF.printSchema()
reviewDF.createOrReplaceTempView("review")

# join reviews with busines on businnes_id 
merged_review = spark.sql("SELECT * FROM review left outer join business on review.business_id == business.business_id")

Running Spark Version 2.2.0
Row(business_id='YDf95gJZaq05wvo7hTQbbQ', name='Richmond Town Square', categories=['Shopping', 'Shopping Centers'])
root
 |-- business_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- categories: array (nullable = true)
 |    |-- element: string (containsNull = true)

Row(review_id='VfBHSwC5Vz_pbFluy07i9Q', user_id='cjpdDjZyprfyDG3RlkVG3w', business_id='uYHaNptLzDLoV_JZ_MuzUA', text='My girlfriend and I stayed here for 3 nights and loved it. The location of this hotel and very decent price makes this an amazing deal. When you walk out the front door Scott Monument and Princes street are right in front of you, Edinburgh Castle and the Royal Mile is a 2 minute walk via a close right around the corner, and there are so many hidden gems nearby including Calton Hill and the newly opened Arches that made this location incredible.\n\nThe hotel itself was also very nice with a reasonably priced bar, very considerate staff, and small but comforta

In [8]:
merged_review.rdd

MapPartitionsRDD[26] at javaToPython at NativeMethodAccessorImpl.java:0

In [9]:
sample = merged_review.take(1000)

In [10]:
len(sample)
type(sample)
from pyspark.ml.feature import Tokenizer,StopWordsRemover

In [16]:
# duplicate data with multiple features
list = []
label_to_num_dict = {}
num_to_label_dict = {}
index = 0 
for line in sample:
    labels = line["categories"]
    text = line["text"]
    # print(text)
    for label in labels:
        if label not in label_to_num_dict:
            # print(index, " represents ",label)
            label_to_num_dict[label] = index
            num_to_label_dict[index] = label 
            index += 1  
        # covert the category label to numerical value
        num_label = label_dict[label]      
        list.append([num_label,text])
        
# print(list)

In [45]:
from pyspark.mllib.regression import LabeledPoint
from pyspark.sql.types import *
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

# convert to rdd 
sample_with_dup = spark.sparkContext.parallelize(list)
label = sample_with_dup.map(lambda item: item[0])
text = sample_with_dup.map(lambda item: item[1])

'''
sampleDF = spark.createDataFrame(list)
sampleDF.printSchema()

'''
schemaString = "label text"

fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD.
sampleDF = spark.createDataFrame(sample_with_dup, schema)
sampleDF.printSchema()

tokenizer = Tokenizer(inputCol="text", outputCol="words")
wordsData = tokenizer.transform(sampleDF)

hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=30)
featurizedData = hashingTF.transform(wordsData)
# alternatively, CountVectorizer can also be used to get term frequency vectors

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

'''
#labeled_points = sample_with_dup.map(lambda item : LabeledPoint([item[0],item[1]]))

tokenizer = Tokenizer(inputCol="text", outputCol="words")
wordsData = tokenizer.transform(sampleDF)

remover = StopWordsRemover(inputCol="words", outputCol="filtered")
remover.transform(wordsData) #.show(truncate=False) 

hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=1000) 
featurizedData = hashingTF.transform(remover)

#hashingTF = HashingTF(numFeatures = 1000)
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData) '''
'''
tf = hashingTF.transform()
tf.cache()
idf = IDF().fit(tf)
tfidf = idf.transform(tf)
'''
sampleDF.printSchema()


root
 |-- label: string (nullable = true)
 |-- text: string (nullable = true)

root
 |-- label: string (nullable = true)
 |-- text: string (nullable = true)



In [56]:
# display the generated tfidf 
print("tfidf:")
count = 0
for each in rescaledData.collect():    
    print(each)
    count += 1
    if count > 10:
        break

tfidf:
Row(label='0', text="What can I say.. Wowzers! Probably one of the best steak houses I've been too. Service was absolutely flawless and dinner was excellent . Ordered seafood tower, wedge, wagyu filet, chateaubriand, bacon grits and sautéed  mushrooms Will definitely be back!", words=['what', 'can', 'i', 'say..', 'wowzers!', 'probably', 'one', 'of', 'the', 'best', 'steak', 'houses', "i've", 'been', 'too.', 'service', 'was', 'absolutely', 'flawless', 'and', 'dinner', 'was', 'excellent', '.', 'ordered', 'seafood', 'tower,', 'wedge,', 'wagyu', 'filet,', 'chateaubriand,', 'bacon', 'grits', 'and', 'sautéed', '', 'mushrooms', 'will', 'definitely', 'be', 'back!'], rawFeatures=SparseVector(30, {0: 1.0, 1: 1.0, 2: 1.0, 3: 4.0, 4: 2.0, 8: 2.0, 9: 1.0, 10: 1.0, 11: 1.0, 12: 3.0, 13: 3.0, 14: 3.0, 15: 3.0, 16: 2.0, 17: 1.0, 19: 1.0, 20: 1.0, 21: 1.0, 23: 1.0, 24: 2.0, 26: 3.0, 28: 1.0, 29: 2.0}), features=SparseVector(30, {0: 0.0483, 1: 0.1144, 2: 0.3031, 3: 0.4545, 4: 0.5479, 8: 0.5063, 9:

In [120]:
def getLabeledPoint(line):
    return LabeledPoint(line[0],line[1])

In [57]:
print(type(rescaledData))
data = rescaledData.rdd.map(lambda item : LabeledPoint(item.label,item.features))
training, test = data.randomSplit([0.6, 0.4],seed = 42)

model = NaiveBayes.train(training, 1.0)

# Make prediction and test accuracy.
predictionAndLabel = test.map(lambda p: [model.predict(p.features), p.label])
accuracy = 1.0 * predictionAndLabel.filter(lambda item: item[0] == item[1]).count() / test.count()

# Save and load model
model.save(sc, "target/tmp/myNaiveBayesModel")
sameModel = NaiveBayesModel.load(sc, "target/tmp/myNaiveBayesModel")

<class 'pyspark.sql.dataframe.DataFrame'>


Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 29.0 failed 1 times, most recent failure: Lost task 0.0 in stage 29.0 (TID 120, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/Wenonah/anaconda3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 177, in main
    process()
  File "/Users/Wenonah/anaconda3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 172, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/Users/Wenonah/anaconda3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 268, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/Users/Wenonah/anaconda3/lib/python3.6/site-packages/pyspark/rdd.py", line 1339, in takeUpToNumLeft
    yield next(iterator)
  File "/Users/Wenonah/anaconda3/lib/python3.6/site-packages/pyspark/rddsampler.py", line 95, in func
    for obj in iterator:
  File "<ipython-input-57-5558219f68f2>", line 2, in <lambda>
  File "/Users/Wenonah/anaconda3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/mllib/regression.py", line 54, in __init__
    self.features = _convert_to_vector(features)
  File "/Users/Wenonah/anaconda3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/mllib/linalg/__init__.py", line 83, in _convert_to_vector
    raise TypeError("Cannot convert type %s into Vector" % type(l))
TypeError: Cannot convert type <class 'pyspark.ml.linalg.SparseVector'> into Vector

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:446)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/Wenonah/anaconda3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 177, in main
    process()
  File "/Users/Wenonah/anaconda3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 172, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/Users/Wenonah/anaconda3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 268, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/Users/Wenonah/anaconda3/lib/python3.6/site-packages/pyspark/rdd.py", line 1339, in takeUpToNumLeft
    yield next(iterator)
  File "/Users/Wenonah/anaconda3/lib/python3.6/site-packages/pyspark/rddsampler.py", line 95, in func
    for obj in iterator:
  File "<ipython-input-57-5558219f68f2>", line 2, in <lambda>
  File "/Users/Wenonah/anaconda3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/mllib/regression.py", line 54, in __init__
    self.features = _convert_to_vector(features)
  File "/Users/Wenonah/anaconda3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/mllib/linalg/__init__.py", line 83, in _convert_to_vector
    raise TypeError("Cannot convert type %s into Vector" % type(l))
TypeError: Cannot convert type <class 'pyspark.ml.linalg.SparseVector'> into Vector

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more


In [None]:
# groupby cat

# get one 

# dup the data 
tokenizer = Tokenizer(inputCol="text", outputCol="words")
wordsData = tokenizer.transform(merged_review)

remover = StopWordsRemover(inputCol="words", outputCol="filtered")
remover.transform(wordsData).show(truncate=False)

hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=100)
featurizedData = hashingTF.transform(wordsData)

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)


'''
loop through the cat   
    groupby(cat).tf

# tf idf
hashingTF = HashingTF(100)
tf = hashingTF.transform(review.map(extrat the text line ))
# get it back to the data?


# While applying HashingTF only needs a single pass to the data, applying IDF needs two passes:
# First to compute the IDF vector and second to scale the term frequencies by IDF.
tf.cache()
idf = IDF().fit(tf)
tfidf = idf.transform(tf)

# spark.mllib's IDF implementation provides an option for ignoring terms
# which occur in less than a minimum number of documents.
# In such cases, the IDF for these terms is set to 0.
# This feature can be used by passing the minDocFreq value to the IDF constructor.
idfIgnore = IDF(minDocFreq=2).fit(tf)
tfidfIgnore = idfIgnore.transform(tf)
'''

# Split positive and negative data 60/40 into training and test data sets
train, test = review.randomSplit([0.6, 0.4])

# Train a Naive Bayes model on the training data
model = NaiveBayes.train(train)

# Compare predicted labels to actual labels
prediction_and_labels = test.map(lambda point: (model.predict(point.features), point.label))

# Filter to only correct predictions
#correct = prediction_and_labels.filter(lambda(predicted, actual): predicted == actual)

# Calculate and print accuracy rate
accuracy = correct.count() / float(testh.count())

print("Classifier correctly predicted category " + str(accuracy * 100) + " percent of the time