In [1]:
from pyspark.sql import SparkSession

# Build (or reuse) a Spark session running in local mode on all cores.
# Driver/executor memory and the driver result-size cap are set explicitly.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Local VADER Sentiment Analysis")
    .config("spark.driver.memory", "8g")
    .config("spark.executor.memory", "4g")
    .config("spark.driver.maxResultSize", "2g")
    .getOrCreate()
)

# Read the Yelp business dump; the path is relative to the working directory.
business_path = "yelp_academic_dataset_business.json"
business_df = spark.read.json(business_path)

KeyboardInterrupt: 

In [2]:
# Sanity check: number of business records that were loaded.
business_df.count()

150346

In [3]:
# Preview the business records with column values printed in full (no truncation).
business_df.show(truncate=False)

+------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------+----------------------------------------------------------------------------------------------------------+--------------+------------------------------------------------------------------------------------+-------+-------------+--------------+------------------------+-----------+------------+-----+-----+
|address                             

In [4]:
# Two-letter postal abbreviations of the 50 US states, ten per row.
# DC and the territories are not part of this list.
usa_state_codes = (
    "AL AK AZ AR CA CO CT DE FL GA "
    "HI ID IL IN IA KS KY LA ME MD "
    "MA MI MN MS MO MT NE NV NH NJ "
    "NM NY NC ND OH OK OR PA RI SC "
    "SD TN TX UT VT VA WA WV WI WY"
).split()

In [5]:
from pyspark.sql.functions import col

# Keep only the businesses whose `state` value is one of the US state codes.
is_us_state = col("state").isin(usa_state_codes)
filtered_df = business_df.where(is_us_state)

In [6]:
# Number of businesses left after the US-state filter.
filtered_df.count()

144771

In [7]:
# Preview the filtered businesses (long values are truncated in this view).
filtered_df.show()

+--------------------+--------------------+--------------------+--------------------+--------------+--------------------+-------+-------------+--------------+--------------------+-----------+------------+-----+-----+
|             address|          attributes|         business_id|          categories|          city|               hours|is_open|     latitude|     longitude|                name|postal_code|review_count|stars|state|
+--------------------+--------------------+--------------------+--------------------+--------------+--------------------+-------+-------------+--------------+--------------------+-----------+------------+-----+-----+
|1616 Chapala St, ...|{NULL, NULL, NULL...|Pns2l4eNsfO8kk83d...|Doctors, Traditio...| Santa Barbara|                NULL|      0|   34.4266787|  -119.7111968|Abby Rappoport, L...|      93101|           7|  5.0|   CA|
|87 Grasso Plaza S...|{NULL, NULL, NULL...|mpf3x-BjTdTEA3yCZ...|Shipping Centers,...|        Affton|{8:0-18:30, 0:0-0...|      1|   

In [8]:
# Read the Yelp review dump; the path is relative to the working directory.
reviews_path = "yelp_academic_dataset_review.json"
reviews_df = spark.read.json(reviews_path)

In [9]:
# Total number of review records that were loaded.
reviews_df.count()

6990280

In [10]:
# Preview the raw reviews (long values are truncated in this view).
reviews_df.show()

+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|         business_id|cool|               date|funny|           review_id|stars|                text|useful|             user_id|
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|XQfwVwDr-v0ZS3_Cb...|   0|2018-07-07 22:09:11|    0|KU_O5udG6zpxOg-Vc...|  3.0|If you decide to ...|     0|mh_-eMZ6K5RLWhZyI...|
|7ATYjTIgM3jUlt4UM...|   1|2012-01-03 15:28:18|    0|BiTunyQ73aT9WBnpR...|  5.0|I've taken a lot ...|     1|OyoGAe7OKpv6SyGZT...|
|YjUWPpI6HXG530lwP...|   0|2014-02-05 20:30:30|    0|saUsX_uimxRlCVr67...|  3.0|Family diner. Had...|     0|8g_iMtfSiwikVnbP2...|
|kxX2SOes4o-D3ZQBk...|   1|2015-01-04 00:01:03|    0|AqPFMleE6RsU23_au...|  5.0|Wow!  Yummy, diff...|     1|_7bHUi9Uuf5__HHc_...|
|e4Vwtrqf-wpJfwesg...|   1|2017-01-14 20:54:15|    0|Sx8TMOWLNuJBWer-0...|  4.0|Cute inter

In [11]:
# Keep only reviews that belong to a US business by inner-joining on the
# shared `business_id` key. Both inputs carry a `stars` column, so the joined
# frame contains two columns with that name.
filtered_reviews = reviews_df.join(filtered_df, "business_id", "inner")

In [12]:
# Preview 10 joined rows (review columns first, then the business columns).
filtered_reviews.show(10)

+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+----------------+--------------------+--------------------+---------------+--------------------+-------+----------+-----------+-----------------+-----------+------------+-----+-----+
|         business_id|cool|               date|funny|           review_id|stars|                text|useful|             user_id|         address|          attributes|          categories|           city|               hours|is_open|  latitude|  longitude|             name|postal_code|review_count|stars|state|
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+----------------+--------------------+--------------------+---------------+--------------------+-------+----------+-----------+-----------------+-----------+------------+-----+-----+
|---kPU91CF4Lq2-Wl...|   0|2020-08-09 19:19:16|    0|hoLKem4XpXG

In [13]:
# Remove the `stars` column that came from the business frame so that only
# the review-side `stars` column remains in the joined data.
business_stars = filtered_df["stars"]
filtered_reviews = filtered_reviews.drop(business_stars)

In [14]:
# Number of reviews that matched a US business in the join.
filtered_reviews.count()

6880828

In [15]:
# Preview joined rows with full, untruncated values (the output is very wide
# because complete review texts are printed).
filtered_reviews.show(truncate=False)

+----------------------+----+-------------------+-----+----------------------+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
# Inspect the column names and types of the joined frame.
filtered_reviews.printSchema()

In [17]:
# Prerequisite (run once in the kernel's environment): %pip install vaderSentiment

In [16]:
# Narrow the joined data down to the columns the sentiment step needs.
review_columns = ["business_id", "review_id", "text"]
selected_reviews = filtered_reviews.select(*review_columns)


In [17]:
# Preview 10 rows of the trimmed review frame.
selected_reviews.show(10)

+--------------------+--------------------+--------------------+
|         business_id|           review_id|                text|
+--------------------+--------------------+--------------------+
|XQfwVwDr-v0ZS3_Cb...|KU_O5udG6zpxOg-Vc...|If you decide to ...|
|7ATYjTIgM3jUlt4UM...|BiTunyQ73aT9WBnpR...|I've taken a lot ...|
|YjUWPpI6HXG530lwP...|saUsX_uimxRlCVr67...|Family diner. Had...|
|kxX2SOes4o-D3ZQBk...|AqPFMleE6RsU23_au...|Wow!  Yummy, diff...|
|e4Vwtrqf-wpJfwesg...|Sx8TMOWLNuJBWer-0...|Cute interior and...|
|04UD14gamNjLY0IDY...|JrIxlS1TzJ-iCu79u...|I am a long term ...|
|gmjsEdUsKpj9Xxu6p...|6AxgBCNX_PNTOxmbR...|Loved this tour! ...|
|LHSTtnW3YHCeUkRDG...|_ZeMknuYdlQcUqng_...|Amazingly amazing...|
|B5XSoSG3SfvQGtKEG...|ZKvDG2sBvHVdF5oBN...|This easter inste...|
|gebiRewfieSdtt17P...|pUycOfUwM8vqX7KjR...|Had a party of 6 ...|
+--------------------+--------------------+--------------------+
only showing top 10 rows



In [20]:
# pip install databricks

In [None]:
# filtered_reviews.printSchema()

In [18]:
from pyspark.sql import functions as F

# Step 1: Count the reviews available for each business
reviews_per_business = F.count("review_id").alias("review_count")
review_counts = selected_reviews.groupBy("business_id").agg(reviews_per_business)

# Step 2: Keep only the businesses that have at least 10 reviews
has_enough_reviews = F.col("review_count") >= 10
valid_businesses = review_counts.where(has_enough_reviews)

# Step 3: Inner-join the surviving business ids back onto the reviews so that
# only reviews of those businesses remain
valid_business_ids = valid_businesses.select("business_id")
valid_reviews = selected_reviews.join(valid_business_ids, on="business_id", how="inner")

In [None]:


# Maximum number of reviews retained per business before sentiment scoring.
# NOTE(review): a previous version of these comments said 50 while the code
# sliced 20; the value the code actually used is kept — confirm the intended cap.
MAX_REVIEWS_PER_BUSINESS = 20

# Step 4: Gather every (review_id, text) pair of a business into one array
result = (
    valid_reviews
    .groupBy("business_id")
    .agg(F.collect_list(F.struct("review_id", "text")).alias("reviews"))
)

# Step 5: Keep at most MAX_REVIEWS_PER_BUSINESS entries of each array, then
# explode back to one row per review. collect_list gives no ordering
# guarantee, so which reviews survive the cap may differ between runs.
final_result = (
    result
    .withColumn("reviews", F.slice("reviews", 1, MAX_REVIEWS_PER_BUSINESS))
    .select("business_id", F.explode("reviews").alias("review"))
)

# Step 6: Flatten the struct back into plain review_id / text columns
final_df = final_result.select("business_id", "review.review_id", "review.text")

# Show the final DataFrame
final_df.show()


In [None]:
# Number of reviews kept after the per-business cap.
final_df.count()

In [25]:
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
from pyspark.sql.functions import udf, avg
from pyspark.sql.types import FloatType

In [26]:
# Initialize VADER SentimentIntensityAnalyzer
analyzer = SentimentIntensityAnalyzer()


def get_sentiment_score(text):
    """Return VADER's compound sentiment score for one review text.

    Args:
        text: The review text, or None when the source row has no text.

    Returns:
        The ``compound`` entry of ``analyzer.polarity_scores(text)`` as a
        float, or None when ``text`` is None.
    """
    # VADER expects a string. A null text would otherwise raise inside the
    # UDF and fail the whole Spark job; returning None yields a null score
    # that the aggregation and null filtering downstream can deal with.
    if text is None:
        return None
    sentiment_dict = analyzer.polarity_scores(text)
    return float(sentiment_dict['compound'])  # Using the compound score


# Wrap the function as a Spark UDF returning a float column
sentiment_udf = udf(get_sentiment_score, FloatType())


In [None]:
# Score every review: add a `sentiment_score` column computed by the VADER UDF
review_text = final_df["text"]
df_with_sentiment = final_df.withColumn("sentiment_score", sentiment_udf(review_text))

# Preview the scored reviews
df_with_sentiment.show()


In [28]:
# Average the per-review scores into one `average_sentiment_score` per business.
mean_score = avg("sentiment_score").alias("average_sentiment_score")
average_sentiment_df = df_with_sentiment.groupBy("business_id").agg(mean_score)

In [None]:
# Preview the per-business average sentiment scores.
average_sentiment_df.show()


In [None]:
# Attach each business's average sentiment to its business record. The inner
# join keeps only businesses that appear in both frames.
joined_df = filtered_df.join(average_sentiment_df, 'business_id', 'inner')

# Preview the joined result
joined_df.show()

In [None]:
from pyspark.sql.functions import col

# Column holding the per-business average; both filters below test it.
avg_score = col("average_sentiment_score")

# Rows whose average sentiment is null, kept separately for inspection
# (uncomment the next line to display them).
null_rows = joined_df.where(avg_score.isNull())
# null_rows.show()

# Rows that do have an average sentiment; this is the cleaned output.
cleaned_df = joined_df.where(avg_score.isNotNull())

# Preview the cleaned DataFrame
cleaned_df.show()


In [None]:
# Number of businesses in the cleaned output.
cleaned_df.count()

In [16]:
# Write the cleaned DataFrame as JSON. Spark creates a directory at this path
# holding part files, not a single .json file.
# DataFrameWriter.json() accepts no `header` argument (that option belongs to
# CSV output); passing it raises TypeError before anything is written.
# This cell needs `cleaned_df` to exist in the current kernel, so the cleaning
# cell must have been run first.
# NOTE(review): the destination is a machine-specific absolute path; consider
# moving it to a configurable output directory.
cleaned_df.write.json(r"C:\Users\admin\Desktop\Exploratory\cleaned_data.json")


NameError: name 'cleaned_df' is not defined