In [1]:
import os
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import pyspark.pandas as ps

Matplotlib created a temporary cache directory at /tmp/matplotlib-82va9ino because the default path (/home/jovyan/.cache/matplotlib) is not a writable directory; it is highly recommended to set the MPLCONFIGDIR environment variable to a writable directory, in particular to speed up the import of Matplotlib and to better support multiprocessing.


In [2]:
import sys
import site
sys.path.append(site.getusersitepackages())
!{sys.executable} -m pip install plotly --upgrade

[0mDefaulting to user installation because normal site-packages is not writeable


In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("Yelp Data Analysis") \
    .config("spark.driver.memory", "16g") \
    .config("spark.executor.memory", "16g") \
    .config("spark.executor.cores", 2) \
    .config("spark.executor.instances", 8) \
    .config("spark.sql.shuffle.partitions", 32) \
    .config("spark.memory.fraction", 0.6) \
    .getOrCreate()


In [4]:
review_df = spark.read.json('yelp_academic_dataset_review.json')

# Data Cleaning and Preprocessing

### Select features

In [5]:
review_df = review_df.select('text','stars')
print('Total Count: ' + str(review_df.count()))
review_df.show(5)

Total Count: 3358049
+--------------------+-----+
|                text|stars|
+--------------------+-----+
|If you decide to ...|  3.0|
|I've taken a lot ...|  5.0|
|Family diner. Had...|  3.0|
|Wow!  Yummy, diff...|  5.0|
|Cute interior and...|  4.0|
+--------------------+-----+
only showing top 5 rows



### Remove Missing Values

In [6]:
#dropping rows with any null values
review_df = review_df.dropna()
review_df.count()

3358048

### Add feature: 'review_length'

In [9]:
# from pyspark.sql.functions import col, count, when, length

# review_df = review_df.withColumn('review_length', length('text'))
# review_df.show(5)

+--------------------+-----+-------------+
|                text|stars|review_length|
+--------------------+-----+-------------+
|If you decide to ...|  3.0|          513|
|I've taken a lot ...|  5.0|          829|
|Family diner. Had...|  3.0|          339|
|Wow!  Yummy, diff...|  5.0|          243|
|Cute interior and...|  4.0|          534|
+--------------------+-----+-------------+
only showing top 5 rows



### Tokenization and Stop Words Removal

In [7]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover

tokenizer = Tokenizer(inputCol="text", outputCol="words")
tokenized_df = tokenizer.transform(review_df)

remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
filtered_df = remover.transform(tokenized_df)

filtered_df.show(5)

+--------------------+-----+--------------------+--------------------+
|                text|stars|               words|      filtered_words|
+--------------------+-----+--------------------+--------------------+
|If you decide to ...|  3.0|[if, you, decide,...|[decide, eat, her...|
|I've taken a lot ...|  5.0|[i've, taken, a, ...|[taken, lot, spin...|
|Family diner. Had...|  3.0|[family, diner., ...|[family, diner., ...|
|Wow!  Yummy, diff...|  5.0|[wow!, , yummy,, ...|[wow!, , yummy,, ...|
|Cute interior and...|  4.0|[cute, interior, ...|[cute, interior, ...|
+--------------------+-----+--------------------+--------------------+
only showing top 5 rows



### Stemming

In [8]:
!pip install nltk

[0mDefaulting to user installation because normal site-packages is not writeable


In [12]:
# from nltk.stem import PorterStemmer
# from pyspark.sql.functions import udf, col
# from pyspark.sql.types import ArrayType, StringType, FloatType

# stemmer=PorterStemmer()

# def stem_words(words):
#     return [stemmer.stem(word) for word in words]

# stem_udf = udf(stem_words, ArrayType(StringType()))

# stemmed_df = filtered_df.withColumn("stemmed_words", stem_udf(col("filtered_words")))
# stemmed_df.show(5)

+--------------------+-----+-------------+--------------------+--------------------+--------------------+
|                text|stars|review_length|               words|      filtered_words|       stemmed_words|
+--------------------+-----+-------------+--------------------+--------------------+--------------------+
|If you decide to ...|  3.0|          513|[if, you, decide,...|[decide, eat, her...|[decid, eat, here...|
|I've taken a lot ...|  5.0|          829|[i've, taken, a, ...|[taken, lot, spin...|[taken, lot, spin...|
|Family diner. Had...|  3.0|          339|[family, diner., ...|[family, diner., ...|[famili, diner., ...|
|Wow!  Yummy, diff...|  5.0|          243|[wow!, , yummy,, ...|[wow!, , yummy,, ...|[wow!, , yummy,, ...|
|Cute interior and...|  4.0|          534|[cute, interior, ...|[cute, interior, ...|[cute, interior, ...|
+--------------------+-----+-------------+--------------------+--------------------+--------------------+
only showing top 5 rows



### Sentiment Polarity Scores

In [15]:
import nltk
from nltk.sentiment import SentimentIntensityAnalyzer

#initialize the Sentiment Intensity Analyzer
sia = SentimentIntensityAnalyzer()

#define a function to calculate sentiment scores
def sentiment_scores(text):
    scores = sia.polarity_scores(text)
    return float(scores['compound'])

In [19]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, StringType, FloatType

sentiment_udf = udf(sentiment_scores, FloatType())

polarity_df = filtered_df.withColumn('sentiment_polarity', sentiment_udf(col('text')))
polarity_df.show(5)

+--------------------+-----+--------------------+--------------------+------------------+
|                text|stars|               words|      filtered_words|sentiment_polarity|
+--------------------+-----+--------------------+--------------------+------------------+
|If you decide to ...|  3.0|[if, you, decide,...|[decide, eat, her...|            0.8597|
|I've taken a lot ...|  5.0|[i've, taken, a, ...|[taken, lot, spin...|            0.9858|
|Family diner. Had...|  3.0|[family, diner., ...|[family, diner., ...|            0.9201|
|Wow!  Yummy, diff...|  5.0|[wow!, , yummy,, ...|[wow!, , yummy,, ...|            0.9588|
|Cute interior and...|  4.0|[cute, interior, ...|[cute, interior, ...|            0.9804|
+--------------------+-----+--------------------+--------------------+------------------+
only showing top 5 rows



### TF-IDF

In [21]:
from pyspark.ml.feature import HashingTF, IDF

# Apply TF-IDF
hashing_tf = HashingTF(inputCol="filtered_words", outputCol="hashing_tf")
tf_df = hashing_tf.transform(polarity_df)
idf = IDF(inputCol="hashing_tf", outputCol="TF_IDF")
idf_model = idf.fit(tf_df)
tfidf_df = idf_model.transform(tf_df)

tfidf_df.show(5)

+--------------------+-----+--------------------+--------------------+------------------+--------------------+--------------------+
|                text|stars|               words|      filtered_words|sentiment_polarity|          hashing_tf|              TF_IDF|
+--------------------+-----+--------------------+--------------------+------------------+--------------------+--------------------+
|If you decide to ...|  3.0|[if, you, decide,...|[decide, eat, her...|            0.8597|(262144,[12524,24...|(262144,[12524,24...|
|I've taken a lot ...|  5.0|[i've, taken, a, ...|[taken, lot, spin...|            0.9858|(262144,[18176,29...|(262144,[18176,29...|
|Family diner. Had...|  3.0|[family, diner., ...|[family, diner., ...|            0.9201|(262144,[578,1261...|(262144,[578,1261...|
|Wow!  Yummy, diff...|  5.0|[wow!, , yummy,, ...|[wow!, , yummy,, ...|            0.9588|(262144,[30899,45...|(262144,[30899,45...|
|Cute interior and...|  4.0|[cute, interior, ...|[cute, interior, ...|      

### Prepare data for Modeling

In [22]:
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml import Pipeline

In [23]:
#Assembling features
assembler = VectorAssembler(inputCols=['sentiment_polarity', 'TF_IDF'], outputCol='combined_features')
final_df = assembler.transform(tfidf_df)

### Logistic Regression Model

In [24]:
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression

#select features
select_df = final_df.select('combined_features', 'stars')
select_df.show(5)

+--------------------+-----+
|   combined_features|stars|
+--------------------+-----+
|(262145,[0,12525,...|  3.0|
|(262145,[0,18177,...|  5.0|
|(262145,[0,579,12...|  3.0|
|(262145,[0,30900,...|  5.0|
|(262145,[0,1690,4...|  4.0|
+--------------------+-----+
only showing top 5 rows



In [20]:
# from sklearn.model_selection import train_test_split
# from sklearn.linear_model import LogisticRegression

# X = select_df['combined_features']
# y = select_df['stars']

# X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

In [None]:
# model = LogisticRegression(max_iter=10000)

# model.fit(X_train, y_train)

# accuracy = model.score(X_test, y_test)
# print(f'Accuracy: {accuracy})


In [25]:
from pyspark.ml.classification import LogisticRegression

#split data into training and test sets
train_df, test_df = select_df.randomSplit([0.8,0.2], seed=42)

#define logistic regression model
lr = LogisticRegression(featuresCol='combined_features', labelCol='stars', family="multinomial")

#train the model
lr_model = lr.fit(train_df)

# #make predictions
predictions = lr_model.transform(test_df)



### Evaluate the Model

In [29]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

#evaluate the model
evaluator = MulticlassClassificationEvaluator(labelCol='stars', predictionCol='prediction', metricName='accuracy')
accuracy = evaluator.evaluate(predictions)

print(f"Test Accuracy = {accuracy}")

Test Accuracy = 0.6257126027989651


### Overfitting/Underfitting Graph

In [31]:
error = 1 - accuracy
#model_complexity = []
#model_complexity.append('Logistic Regression')
#input_rows = model_complexity

#rows_list = []
#for row in input_rows:
    #dict1 = {}
    #dict1.update({row:error}) 
    #rows_list.append(dict1)

#df_fitting_graph = px.scatter(rows_list, labels={'index':'count'})
#df_fitting_graph.show()

df_fitting_graph = pd.DataFrame([['Logistic Regression',error]], columns= ['Model','loss'])
new_dict = {"Model": list(df_fitting_graph['Model']), "error": list(df_fitting_graph['loss'])}

df_fitting_graph = ps.scatter(df_fitting_graph, x='Model',y='loss',title='Fitting Graph')
df_fitting_graph.show()

AttributeError: module 'pyspark.pandas' has no attribute 'scatter'