In [None]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

# Create (or reuse) the Spark session through the builder so that the master
# and application name are applied when the underlying SparkContext is
# created. Constructing SparkContext("yarn") directly first would fail when no
# app name is configured (or when a context already exists), and any later
# .appName(...) on the builder would be ignored because the context already
# exists.
spark = (
    SparkSession.builder
    .master("yarn")
    .appName("Yelp-ML")
    .getOrCreate()
)
# Keep the `sc` handle available for code that expects a SparkContext.
sc = spark.sparkContext


In [None]:
# Load the raw Yelp reviews.
# Free-form review text may contain line breaks and embedded quotes. With
# Spark's CSV defaults (one record per physical line, backslash as the escape
# character), such records get split into several malformed rows. multiLine
# lets quoted fields span line breaks. escape='"' treats a doubled quote ("")
# inside a quoted field as a literal quote.
# NOTE(review): multiLine parsing reads each file as a whole, so a single large
# CSV loads with less parallelism — acceptable for correctness here.
Main = spark.read.csv(
    "yelp_review.csv",
    header=True,
    inferSchema=True,
    multiLine=True,
    escape='"',
)


In [None]:
# Derive a numeric `label` from the star rating, drop rows that lack either a
# label or review text, keep just the two columns used downstream, and
# restrict ratings to the interval (0, 5].
Main = (
    Main
    .withColumn("label", Main["stars"].cast("double"))
    .dropna(subset=['label', 'text'])
    .select('text', 'label')
)
Main = Main.filter((Main['label'] > 0.0) & (Main['label'] <= 5.0))

# randomSplit normalises the weights, so `data` is a ~9.7% random subsample of
# the cleaned rows (seeded for repeatability). `X` holds the remaining rows and
# is not referenced by the cells shown here.
(X, data) = Main.randomSplit([0.903, 0.097], seed=100)


In [None]:
from pyspark.sql.functions import length
# Character count of each review, later assembled as a feature next to TF-IDF.
data = data.withColumn('length', length('text'))

In [None]:
from pyspark.ml.feature import Tokenizer,StopWordsRemover, CountVectorizer,IDF,StringIndexer

# Text featurisation stages, applied in this order by the pipeline below:
#   text -> word tokens -> tokens without stop words -> term counts -> TF-IDF
tokenizer = Tokenizer(outputCol="token_text", inputCol="text")
stopremove = StopWordsRemover(outputCol="stop_tokens", inputCol="token_text")
count_vec = CountVectorizer(outputCol="c_vec", inputCol="stop_tokens")
idf = IDF(outputCol="tf_idf", inputCol="c_vec")


In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector
# Concatenate the TF-IDF vector and the review length into the single
# `features` vector column that the classifiers consume.
clean_up = VectorAssembler(outputCol='features', inputCols=['tf_idf', 'length'])

In [None]:
from pyspark.ml.classification import NaiveBayes
# Naive Bayes classifier on the assembled features. The column names below are
# the estimator's defaults, spelled out for readability. Naive Bayes requires
# non-negative feature values; TF-IDF weights and text lengths satisfy that.
nb = NaiveBayes(featuresCol="features", labelCol="label")

In [None]:
from pyspark.ml import Pipeline
# Chain the featurisation stages, learn their fitted state (CountVectorizer
# vocabulary, IDF weights), then append the derived columns to every row.
# NOTE(review): the pipeline is fit on all of `data`, before the train/test
# split that is later taken from `clean_data`. Test rows therefore influence the
# vocabulary and IDF weights (mild leakage). Consider splitting first and
# fitting on the training rows only.
data_prep_pipe = Pipeline().setStages([tokenizer, stopremove, count_vec, idf, clean_up])
cleaner = data_prep_pipe.fit(data)
clean_data = cleaner.transform(data)

In [None]:
# Keep only the columns the classifiers need: the target and the feature vector.
clean_data = clean_data.select('label', 'features')

In [None]:
# 70/30 train/test split. Seeded (same seed as the subsample drawn earlier) so
# the split — and therefore every metric reported below — is reproducible
# across runs instead of changing each time the notebook executes.
(training, testing) = clean_data.randomSplit([0.7, 0.3], seed=100)

In [None]:
# Train the Naive Bayes model on the training split.
label_predictor = nb.fit(dataset=training)

In [None]:
# Score the held-out split; adds prediction columns to each test row.
test_results = label_predictor.transform(dataset=testing)

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Evaluate Naive Bayes on the test split.
# MulticlassClassificationEvaluator defaults to metricName="f1". Request
# "accuracy" explicitly so `acc` matches the "Accuracy ..." label printed in
# the summary and is comparable with the RF / GBT cells, which use accuracy.
acc_eval = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)
acc = acc_eval.evaluate(test_results)

In [None]:
# Random forest
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# seed fixes the random bootstrap sampling and feature-subset selection, so
# the reported accuracy is reproducible from run to run.
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=20, seed=100)
model = rf.fit(training)
predictions = model.transform(testing)

# Fraction of test rows whose predicted label equals the true label.
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
acc2 = evaluator.evaluate(predictions)

In [None]:
# Gradient Boosted Trees
from pyspark.ml.classification import GBTClassifier, OneVsRest
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Spark's GBTClassifier only supports binary labels {0, 1}. `label` holds star
# ratings (0 < label <= 5 after the cleanup filters), so fitting GBT directly
# fails. OneVsRest trains one binary GBT per class and predicts in the original
# label space, keeping the result comparable with the NB / RF accuracies.
# NOTE(review): OneVsRest presumably infers the class count as max(label) + 1,
# so it also fits a trivial model for the unused class 0 — confirm the extra
# training cost is acceptable.
gbt = GBTClassifier(maxIter=10, seed=100)
ovr = OneVsRest(classifier=gbt, labelCol="label", featuresCol="features")

# Train the one-vs-rest GBT ensemble.
model = ovr.fit(training)

# Make predictions on the held-out split.
predictions = model.transform(testing)

# Fraction of test rows whose predicted label equals the true label.
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
acc3 = evaluator.evaluate(predictions)

In [None]:
# Report each model's test-set score, one line per model.
for message, score in [
    ("Accuracy of model nvb at predicting label was: {}", acc),
    ("Accuracy of model Random Forest at predicting label was: {}", acc2),
    ("Accuracy of model Gradient Boosted Trees at predicting label was: {}", acc3),
]:
    print(message.format(score))