### Importing the libraries

In [None]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

from pyspark.ml.feature import StopWordsRemover,Tokenizer, HashingTF, Word2Vec, IDF,NGram
from pyspark.ml import Pipeline
import pyspark.sql.functions as f
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import time


### Creating the Spark session

In [8]:
# Obtain the Spark session for this notebook: getOrCreate() returns an already
# active session if there is one, otherwise it builds a new one named "yelp".
spark = (
    SparkSession.builder
    .appName("yelp")
    .getOrCreate()
)

### Reading the data

In [13]:
path = "gs://bdl2021_final_project/yelp_train.json"  # path of train data file

# Declaring the schema up front (same columns/types the inferred schema produced)
# skips Spark's schema-inference pass over the entire JSON file, and only these
# three fields are parsed from each record.
yelp_schema = "review_id STRING, text STRING, stars DOUBLE"

yelpDF = spark.read.schema(yelp_schema).json(path).select('review_id', 'text', 'stars')
yelpDF.printSchema()  # confirm column names and types

root
 |-- review_id: string (nullable = true)
 |-- text: string (nullable = true)
 |-- stars: double (nullable = true)



In [14]:
# Total number of reviews loaded; the bare name on the last line is the cell output.
review_count = yelpDF.count()
review_count

7863924

### Pre-processing

In [15]:
# Keep only ASCII letters, digits, whitespace and apostrophes (so contractions such
# as "don't" stay one token); every other character -- including '+' -- is removed.
# Raw strings hand the regex to Spark unchanged and avoid Python's invalid-escape
# warning for "\s".
yelpDF = yelpDF.withColumn("text_raw", f.regexp_replace("text", r"[^a-zA-Z0-9\s']", ""))
# Trim the cleaned text and split it on runs of whitespace into an array of words.
yelpDF = yelpDF.withColumn("text_split", f.split(f.trim("text_raw"), r"\s+"))

In [16]:
# Show the schema after pre-processing: the three loaded columns plus the derived
# text_raw (cleaned string) and text_split (array of words) columns.
yelpDF.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- text: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- text_raw: string (nullable = true)
 |-- text_split: array (nullable = true)
 |    |-- element: string (containsNull = true)



### Defining the pipeline stages and the train/test split

In [18]:
# Tunable settings for the feature pipeline and the split.
NUM_HASH_FEATURES = 10000  # dimensionality of the hashed term-frequency vectors
MIN_DOC_FREQ = 5           # terms seen in fewer training documents get IDF weight 0
SPLIT_SEED = 42            # fixes the train/test split so reported metrics are reproducible

remover = StopWordsRemover(inputCol="text_split", outputCol="clean_words")  # drop default (English) stop words
hashingTF = HashingTF(inputCol="clean_words", outputCol="rawFeatures", numFeatures=NUM_HASH_FEATURES)  # term frequencies via hashing
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=MIN_DOC_FREQ)  # re-weight TF by inverse document frequency
# NOTE(review): stars are presumably 1.0-5.0; LogisticRegression infers numClasses
# as max label + 1, so class 0 would never be used -- confirm, or shift labels to 0-4.
lr = LogisticRegression(labelCol="stars", featuresCol="features")  # logistic regression classifier

# 80/20 split. Without an explicit seed, every run draws a different split and
# the test metrics change between runs.
(trainingData, testData) = yelpDF.randomSplit([0.8, 0.2], seed=SPLIT_SEED)


### Training, prediction and evaluation

In [19]:
# Chain the four stages defined above:
# stop-word removal -> term hashing -> IDF -> logistic regression.
pipeline = Pipeline(stages=[remover, hashingTF, idf, lr])

# Fit every stage on the training split; time it so readers know the cost of
# re-running this cell.
fit_start = time.time()
pipeline_model = pipeline.fit(trainingData)
print("Pipeline fit time = %.1f s" % (time.time() - fit_start))

# Each evaluate() call below is a separate Spark job; caching the predictions
# stops both of them from re-running the whole pipeline transform on the test split.
predictions = pipeline_model.transform(testData).cache()

evaluator = MulticlassClassificationEvaluator(
    labelCol="stars", predictionCol="prediction")

# The metricName override is applied per call, so one evaluator serves both metrics.
accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
print("Test Accuracy = %g" % (accuracy))

# Spark's "f1" metric is the class-weighted F-measure.
f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})
print("Test F1 = %g" % (f1))

Test Accuracy = 0.683716
Test F1 = 0.66193
