In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
from pyspark.ml.classification import DecisionTreeClassifier,NaiveBayes,LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

import nltk
from nltk.corpus import stopwords

from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

ModuleNotFoundError: ignored

In [None]:
# Fetch the NLTK stop-word corpus so that stopwords.words('english') can be
# used when the feature pipeline stages are built.
nltk.download(info_or_id='stopwords')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [None]:
# Create (or reuse) the SparkSession used by every cell below.
# NOTE(review): the master is pinned to "local" here; confirm that is intended
# for the environment this notebook runs in.
spark = (
    SparkSession.builder
    .appName("text_classification_trainer")
    .master("local")
    .getOrCreate()
)

In [None]:
# Load the Yelp review training set (JSON) from the storage bucket.
df = spark.read.json(
    path="gs://finalprojectbdl2021/yelp_train.json",
)

In [None]:
# Preview the first five raw reviews (long values are truncated for display).
df.show(n=5, truncate=True)

+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|         business_id|cool|               date|funny|           review_id|stars|                text|useful|             user_id|
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|--FnvijzY20d1nk9H...|   0|2019-10-09 17:11:51|    0|ha9TgGOiBr1l7Mi9D...|  5.0|Great Mexican Foo...|     0|czIk4xBskNcWieyWI...|
|--SrzpvFLwP_YFwB_...|   1|2011-02-28 21:07:48|    0|a5DGTpucUmVYT-lyP...|  4.0|Keung's is one of...|     1|HFItzRohDHZvcKDrM...|
|--cZ6Hhc9F7VkKXxH...|   0|2008-08-24 16:35:22|    0|0sRfH3GTUXqqxtkKc...|  4.0|The food is great...|     1|DQIt5Uv87fdS54b2o...|
|--cZ6Hhc9F7VkKXxH...|   0|2010-01-18 18:50:25|    0|bgA8LHJ8yQ6h0PrpO...|  4.0|Saw this restaura...|     2|NnMCDFCsaiJ3OgvzZ...|
|--cZ6Hhc9F7VkKXxH...|   0|2011-02-16 13:57:00|    0|-Ud_XVfiL4CAF4fAe...|  5.0|Delicious 

In [None]:
# Print the column names and types of the loaded review DataFrame.
df.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- cool: long (nullable = true)
 |-- date: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)



In [None]:
# Keep only the review text and its star rating, and discard any row that has
# a null in either of those two columns.
data = (
    df
    .select('text', 'stars')
    .dropna()
)

In [None]:
# Preview the first five (text, stars) rows.
data.show(n=5, truncate=True)

+--------------------+-----+
|                text|stars|
+--------------------+-----+
|Great Mexican Foo...|  5.0|
|Keung's is one of...|  4.0|
|The food is great...|  4.0|
|Saw this restaura...|  4.0|
|Delicious rotisse...|  5.0|
+--------------------+-----+
only showing top 5 rows



In [None]:
# Stage 1: split the review text into tokens on non-word characters.
regexTokenizer = RegexTokenizer(
    inputCol="text",
    outputCol="words",
    pattern="\\W",
)

# Stage 2: drop English stop words (list taken from the NLTK corpus).
add_stopwords = stopwords.words('english')
stopwordsRemover = StopWordsRemover(
    inputCol="words",
    outputCol="filtered",
    stopWords=add_stopwords,
)

# Stage 3: bag-of-words term counts, limited to a 1000-term vocabulary of
# terms that occur in at least 5 documents.
countVectors = CountVectorizer(
    inputCol="filtered",
    outputCol="features",
    vocabSize=1000,
    minDF=5,
)

# Label indexing is left disabled; no 'label' column is produced by these stages.
#label_stringIdx = StringIndexer(inputCol = "stars", outputCol = "label")

In [None]:
# Chain tokenizing, stop-word removal and count vectorizing into one Pipeline.
pipeline = Pipeline(
    stages=[
        regexTokenizer,
        stopwordsRemover,
        countVectors,
    ]
)

In [None]:
# Split BEFORE fitting the feature pipeline. CountVectorizer learns its
# vocabulary (and applies its minDF filter) during fit(); fitting on the full
# data set would let test rows influence the features the models are trained
# on, which leaks test information into training.
#
# Re-selecting the two raw columns keeps this cell re-runnable: `data` is
# overwritten below with the transformed frame, and fitting the pipeline on a
# frame that already contains the "words" output column would fail.
rawData = data.select("text", "stars")
(rawTrain, rawTest) = rawData.randomSplit([0.8, 0.2], seed=0)

# Learn the vocabulary from the training split only ...
pipelineFit = pipeline.fit(rawTrain)

# ... then apply the same fitted stages to both splits.
trainingData = pipelineFit.transform(rawTrain)
testData = pipelineFit.transform(rawTest)

# Transformed view of the whole data set, kept under the original name so the
# inspection cell that follows (data.show) still displays the feature columns.
data = pipelineFit.transform(rawData)

In [None]:
# Optional sanity check of the training-split size (disabled; uncomment to run): trainingData.count()

In [None]:
# Inspect the first ten rows after the feature pipeline has been applied.
data.show(n=10, truncate=True)

+--------------------+-----+--------------------+--------------------+--------------------+-----+
|                text|stars|               words|            filtered|            features|label|
+--------------------+-----+--------------------+--------------------+--------------------+-----+
|Great Mexican Foo...|  5.0|[great, mexican, ...|[great, mexican, ...|(1000,[0,1,3,19,2...|  0.0|
|Keung's is one of...|  4.0|[keung, s, is, on...|[keung, one, fave...|(1000,[0,8,11,18,...|  1.0|
|The food is great...|  4.0|[the, food, is, g...|[food, great, 5, ...|(1000,[0,1,3,5,7,...|  1.0|
|Saw this restaura...|  4.0|[saw, this, resta...|[saw, restaurant,...|(1000,[1,3,9,10,1...|  1.0|
|Delicious rotisse...|  5.0|[delicious, rotis...|[delicious, rotis...|(1000,[8,11,36,44...|  0.0|
|If your looking f...|  3.0|[if, your, lookin...|[looking, somethi...|(1000,[3,10,23,79...|  3.0|
|I like this place...|  3.0|[i, like, this, p...|[like, place, alt...|(1000,[0,1,2,4,6,...|  3.0|
|Living in Hunters..

In [None]:
# Logistic regression on the term-count features, using the star rating as
# the label; predictions are produced for the held-out split.
lr = LogisticRegression(
    featuresCol="features",
    labelCol="stars",
    maxIter=15,
    regParam=0.1,
)
lrModel = lr.fit(trainingData)
predictions = lrModel.transform(testData)

In [None]:
# Accuracy of the logistic-regression predictions against the true star rating.
evaluator = MulticlassClassificationEvaluator(
    labelCol="stars",
    predictionCol='prediction',
    metricName="accuracy",
)
accuracy = evaluator.evaluate(predictions)
print(accuracy)

0.5871770599898263


In [None]:
# Multinomial naive Bayes on the same features and label; predictions are
# produced for the held-out split.
nb = NaiveBayes(
    featuresCol="features",
    labelCol="stars",
    modelType="multinomial",
)
nbModel = nb.fit(trainingData)
nb_predictions = nbModel.transform(testData)


In [None]:
# Accuracy of the naive Bayes predictions against the true star rating.
evaluator = MulticlassClassificationEvaluator(
    predictionCol="prediction",
    labelCol="stars",
    metricName="accuracy",
)
nb_accuracy = evaluator.evaluate(nb_predictions)
nb_report = "Accuracy of NaiveBayes is = %g" % nb_accuracy
print(nb_report)

Accuracy of NaiveBayes is = 0.60583


In [None]:
# Persisting the trained NaiveBayes model is disabled; uncomment to save it: nbModel.save('gs://project-final/modelpath/')

In [None]:
# Decision tree (depth limited to 10) on the same features and label;
# predictions are produced for the held-out split.
dt = DecisionTreeClassifier(
    labelCol='stars',
    featuresCol='features',
    maxDepth=10,
)
dtModel = dt.fit(trainingData)
dtPreds = dtModel.transform(testData)


In [None]:
# Accuracy of the decision-tree predictions against the true star rating.
evaluator = MulticlassClassificationEvaluator(
    predictionCol="prediction",
    labelCol="stars",
    metricName="accuracy",
)
dt_accuracy = evaluator.evaluate(dtPreds)
dt_report = "Accuracy of Decision Trees is = %g" % dt_accuracy
print(dt_report)

Accuracy of Decision Trees is = 0.523514
