In [12]:
import os
import pandas as pd
os.environ['JAVA_HOME'] = 'C:\java\jdk'
os.environ['SPARK_HOME'] = 'C:\spark-3.5.1-bin-hadoop3\spark-3.5.1-bin-hadoop3'
import findspark
findspark.init()
from pyspark import SparkContext

sc = SparkContext.getOrCreate()
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("YourAppName") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.sql.shuffle.partitions", "100") \
    .config("spark.default.parallelism", "100") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.kryoserializer.buffer.max", "2000m") \
    .getOrCreate()

sc.setLogLevel("DEBUG")  # Set log level to DEBUG


In [13]:
from pyspark.sql.functions import to_timestamp,col
reviews_df = spark.read.json('yelp_dataset\yelp_academic_dataset_review.json').withColumn('Date',to_timestamp(col('date'),'yyyy-MM-dd HH:mm:ss'))
business_df = spark.read.json('yelp_dataset\yelp_academic_dataset_business.json')

In [14]:
business_df = business_df.drop('attributes','hours')
from pyspark.sql.functions import lower
business_df = business_df.where(lower(col('Categories')).rlike('.*restaurant.*'))
business_id_df = business_df.select(col('business_id')).filter(col('is_open') == 1)


In [15]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

def iqr_outliers(dataframe, column, factor=1.5):
    # Calculate quartiles
    window = Window.orderBy(column)
    df_with_quartiles = dataframe.withColumn(
        "row_number", F.row_number().over(window)
    ).withColumn(
        "total_rows", F.count("*").over(Window.partitionBy())
    )
    
    q1_row = df_with_quartiles.filter(
        F.col("row_number") == (F.col("total_rows") * 0.25).cast("int")
    ).select(column).collect()[0][0]
    
    q3_row = df_with_quartiles.filter(
        F.col("row_number") == (F.col("total_rows") * 0.75).cast("int")
    ).select(column).collect()[0][0]
    
    # Calculate IQR and bounds
    iqr = q3_row - q1_row
    lower_bound = q1_row - factor * iqr
    upper_bound = q3_row + factor * iqr
    
    # Filter outliers
    result = dataframe.filter(
        (F.col(column) >= lower_bound) & 
        (F.col(column) <= upper_bound)
    )
    
    return result

# Usage
business_df = iqr_outliers(business_df.filter(F.col('is_open') == 1), 'review_count')

In [16]:
business_review_df = business_df.join(reviews_df,reviews_df.business_id ==  business_df.business_id,"left")

In [7]:
business_review_df.show(5)

+----------------+--------------------+--------------------+--------+-------+----------+-----------+---------------+-----------+------------+-----+-----+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|         address|         business_id|          categories|    city|is_open|  latitude|  longitude|           name|postal_code|review_count|stars|state|         business_id|cool|               Date|funny|           review_id|stars|                text|useful|             user_id|
+----------------+--------------------+--------------------+--------+-------+----------+-----------+---------------+-----------+------------+-----+-----+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|8456 Highway 100|G4G7D3_L-dHeq6SXs...|Comfort Food, Foo...|Bellevue|      1|36.0350177|-86.9729834|Biscuit Kitchen|      37221|           8|  3.0|   TN|G

In [17]:
from textblob import TextBlob
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

# Define a UDF (User Defined Function) to calculate sentiment polarity
def get_sentiment_polarity(text):
    return TextBlob(text).sentiment.polarity

# Register the UDF
sentiment_udf = udf(get_sentiment_polarity, FloatType())

# Apply the UDF to the 'text' column
review_sentiment_df = business_review_df.withColumn('sentiment_polarity', sentiment_udf(F.col('text')))

In [22]:
review_sentiment_df.show(5)

+----------------+--------------------+--------------------+--------+-------+----------+-----------+---------------+-----------+------------+-----+-----+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+------------------+
|         address|         business_id|          categories|    city|is_open|  latitude|  longitude|           name|postal_code|review_count|stars|state|         business_id|cool|               Date|funny|           review_id|stars|                text|useful|             user_id|sentiment_polarity|
+----------------+--------------------+--------------------+--------+-------+----------+-----------+---------------+-----------+------------+-----+-----+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+------------------+
|8456 Highway 100|G4G7D3_L-dHeq6SXs...|Comfort Food, Foo...|Bellevue|      1|36.0350177|-86.97298

Side project, use this review text to predict if the review is useful or not, will use LogisticRegression , Decision Tress Classifier and Random Forrest to compare the results!!

In [7]:
review_sentiment_df.count()

1833326

In [None]:
print(review_sentiment_df.filter(col('text').isNull()).count())

0


In [26]:
from pyspark.sql.functions import col

review_sentiment_df.printSchema()



root
 |-- address: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- categories: string (nullable = true)
 |-- city: string (nullable = true)
 |-- is_open: long (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- name: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- review_count: long (nullable = true)
 |-- stars: double (nullable = true)
 |-- state: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- cool: long (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- funny: long (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- sentiment_polarity: float (nullable = true)



In [18]:
# Only taking dependant and independant variables
model_df = review_sentiment_df.select(col('sentiment_polarity'),col('text'))

model_df.printSchema()

root
 |-- sentiment_polarity: float (nullable = true)
 |-- text: string (nullable = true)



In [19]:
from pyspark.sql.functions import when

model_df = model_df.withColumn("label", when(model_df["sentiment_polarity"] > 0, 1).otherwise(0)) 

In [None]:
value_counts = model_df.groupBy("label").count()


In [20]:
model_df_samp = model_df.sample(withReplacement=False, fraction=0.1, seed=42)

In [21]:
num_partitions = 100  # Adjust based on your cluster resources
model_df_samp = model_df_samp.repartition(num_partitions)

In [None]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, StopWordsRemover
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator


# Split the data
train_data, test_data = model_df_samp.randomSplit([0.8, 0.2], seed=42)
train_data, val_data = train_data.randomSplit([0.8, 0.2], seed=42)

train_data = train_data.cache()

# Create a pipeline
tokenizer = Tokenizer(inputCol="text", outputCol="words")
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=10000)
idf = IDF(inputCol="rawFeatures", outputCol="features")
lr = LogisticRegression(featuresCol="features", labelCol="label",maxIter=20, tol=1e-4)

pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, idf, lr])

# Fit the pipeline
model = pipeline.fit(train_data)

# Make predictions
predictions = model.transform(val_data)

# Evaluate the model
evaluator = BinaryClassificationEvaluator(labelCol="label")
accuracy = evaluator.evaluate(predictions)

print(f"Test set accuracy = {accuracy}")

In [None]:
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.functions import expr

# Select (prediction, true label) and compute test error
predictionAndLabels = predictions.select("prediction", "label")

# Compute confusion matrix
confusion_matrix = predictionAndLabels.groupBy("prediction", "label").count().orderBy("prediction", "label")

# Display confusion matrix
print("Confusion Matrix:")
confusion_matrix.show()

# Compute metrics
metrics = MulticlassMetrics(predictionAndLabels.rdd.map(tuple))

# Overall statistics
precision = metrics.precision()
recall = metrics.recall()
f1Score = metrics.fMeasure()
print("Summary Stats")
print(f"Precision = {precision}" )
print(f"Recall = {recall}")
print(f"F1 Score = {f1Score}")

Now analyzing Decision Trees

In [None]:
from pyspark.sql.types import FloatType
model_df = model_df.withColumn("sentiment_polarity", model_df["sentiment_polarity"].cast(FloatType()))

In [10]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, StopWordsRemover
from pyspark.ml.classification import DecisionTreeClassifier  
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator


# Split the data
train_data, test_data = model_df.sample(withReplacement=False, fraction=0.1, seed=42).randomSplit([0.8, 0.2], seed=42)
train_data, val_data = train_data.randomSplit([0.8, 0.2], seed=42)

tokenizer = Tokenizer(inputCol="text", outputCol="words")
tokenized = tokenizer.transform(train_data)
tokenized.head()


# Create a pipeline
# tokenizer = Tokenizer(inputCol="text", outputCol="words")
# remover = StopWordsRemover(inputCol="words", outputCol="filtered")
# hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=1000)
# idf = IDF(inputCol="rawFeatures", outputCol="features")
# dt = DecisionTreeClassifier(maxDepth=4, impurity='gini',featuresCol="features", labelCol="useful")

# pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, idf, dt])

# # Fit the pipeline
# model = pipeline.fit(train_data)

# # Make predictions
# predictions = model.transform(val_data)

# # Evaluate the model
# evaluator = BinaryClassificationEvaluator(labelCol="useful")
# accuracy = evaluator.evaluate(predictions)

# 



Row(useful=0, text='"Toto, we ain\'t in Cali anymore." The girl in the red slippers doesn\'t actually say that and no one from California actually calls it by the name. But this pizza place is the bomb! Light on cheese and not overly breaddy or burnt or soggy. Cooked just the right amount! They also serve other things but I only had their pizza and it was bomb diggity for sure. I highly recommend to anyone swinging thru the area.', words=['"toto,', 'we', "ain't", 'in', 'cali', 'anymore."', 'the', 'girl', 'in', 'the', 'red', 'slippers', "doesn't", 'actually', 'say', 'that', 'and', 'no', 'one', 'from', 'california', 'actually', 'calls', 'it', 'by', 'the', 'name.', 'but', 'this', 'pizza', 'place', 'is', 'the', 'bomb!', 'light', 'on', 'cheese', 'and', 'not', 'overly', 'breaddy', 'or', 'burnt', 'or', 'soggy.', 'cooked', 'just', 'the', 'right', 'amount!', 'they', 'also', 'serve', 'other', 'things', 'but', 'i', 'only', 'had', 'their', 'pizza', 'and', 'it', 'was', 'bomb', 'diggity', 'for', 'su

In [11]:
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
removed = remover.transform(tokenized)
removed.head()

Row(useful=0, text='"Toto, we ain\'t in Cali anymore." The girl in the red slippers doesn\'t actually say that and no one from California actually calls it by the name. But this pizza place is the bomb! Light on cheese and not overly breaddy or burnt or soggy. Cooked just the right amount! They also serve other things but I only had their pizza and it was bomb diggity for sure. I highly recommend to anyone swinging thru the area.', words=['"toto,', 'we', "ain't", 'in', 'cali', 'anymore."', 'the', 'girl', 'in', 'the', 'red', 'slippers', "doesn't", 'actually', 'say', 'that', 'and', 'no', 'one', 'from', 'california', 'actually', 'calls', 'it', 'by', 'the', 'name.', 'but', 'this', 'pizza', 'place', 'is', 'the', 'bomb!', 'light', 'on', 'cheese', 'and', 'not', 'overly', 'breaddy', 'or', 'burnt', 'or', 'soggy.', 'cooked', 'just', 'the', 'right', 'amount!', 'they', 'also', 'serve', 'other', 'things', 'but', 'i', 'only', 'had', 'their', 'pizza', 'and', 'it', 'was', 'bomb', 'diggity', 'for', 'su

In [None]:
# hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=1000)
# hashedTF = hashingTF.transform(removed)
# hashedTF.head()

In [None]:
# import pyspark.sql.functions as F
# hashedTF.printSchema()
# hashedTF.select("words", "filtered", "rawFeatures").show(truncate=False)


In [12]:
from pyspark.ml.feature import CountVectorizer

# Initialize CountVectorizer
cv = CountVectorizer(inputCol="filtered", outputCol="rawFeatures")

# Fit the CountVectorizer to create the vocabulary
cv_model = cv.fit(removed)

# Transform the data to create vector representations
vectorized = cv_model.transform(removed)


In [13]:
from pyspark.ml.feature import IDF

# Initialize IDF
idf = IDF(inputCol="rawFeatures", outputCol="features")

# Fit the IDF model
idf_model = idf.fit(vectorized)

# Transform the data to create TF-IDF vectors
tfidf_data = idf_model.transform(vectorized)

In [14]:
tfidf_data.printSchema()

root
 |-- useful: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- filtered: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- rawFeatures: vector (nullable = true)
 |-- features: vector (nullable = true)



In [15]:
value_counts = tfidf_data.groupBy("useful").count()

# Show the results
value_counts.show()

+------+-----+
|useful|count|
+------+-----+
|    12|  128|
|    22|   27|
|     1|27160|
|    13|  105|
|     6|  836|
|    16|   60|
|     3| 5110|
|     5| 1429|
|    15|   69|
|     9|  280|
|    17|   37|
|     4| 2583|
|     8|  389|
|    23|   22|
|     7|  559|
|    10|  201|
|    25|   14|
|    24|   16|
|    21|   31|
|    11|  155|
+------+-----+
only showing top 20 rows



In [None]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql.functions import when

dt = DecisionTreeClassifier(maxDepth=4, impurity='gini',featuresCol="features", labelCol="useful")

tfidf_data = tfidf_data.withColumn("useful", when(tfidf_data["useful"] > 0, 1).otherwise(0))

model = dt.fit(tfidf_data)

predictions = model.transform(tfidf_data)

evaluator = BinaryClassificationEvaluator(labelCol="useful")
accuracy = evaluator.evaluate(predictions)

print(f"Test set accuracy = {accuracy}")


In [17]:
train_data.count()

117842

In [None]:
spark.stop()