In [0]:
%spark.pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, when, explode, desc
from pyspark.sql.types import ArrayType, StringType

In [1]:
%spark.pyspark
# Start (or reuse) the Spark session used by every cell below
spark = (
    SparkSession.builder
    .appName("ReviewAnalysis")
    .getOrCreate()
)

# Load the emotion-scored reviews (about 30000 rows, one per review).
# NOTE: hard-coded path under a personal user directory — adjust before reuse.
path = "/user/tl4151_nyu_edu/emotion_analysis_result_1209"
rating_df = spark.read.parquet(path)

total_rows = rating_df.count()
print("Number of total rows:", total_rows)
unique_titles = rating_df.select("Title").distinct().count()
print("Number of unique books:", unique_titles)
rating_df.show(n=20)

In [2]:
%spark.pyspark
# Keep only the top (highest-scoring) emotion for each review
from pyspark.sql.functions import greatest, col, lit, when

# Per-emotion score columns. Order matters: on a tie for the top score the emotion
# listed first wins, because the when() chain below is evaluated in this order.
emotion_columns = ["sadness", "joy", "love", "anger", "fear", "surprise"]

# Row-wise maximum over all emotion scores, built once and reused by every branch.
# greatest() skips nulls; if every score is null, no branch matches and the label is null.
top_emotion_score = greatest(*[col(c) for c in emotion_columns])

# Build when(sadness == max, "sadness").when(joy == max, "joy")... from emotion_columns
# so the labels cannot drift out of sync with the score list. There is no otherwise(),
# so a row where no branch matches gets a null emotion.
emotion_label = None
for emotion_name in emotion_columns:
    is_top = col(emotion_name) == top_emotion_score
    if emotion_label is None:
        emotion_label = when(is_top, emotion_name)
    else:
        emotion_label = emotion_label.when(is_top, emotion_name)

rating_df = rating_df.withColumn("emotion", emotion_label)

# Keep only the columns used downstream and spot-check the new "emotion" column
rating_df = rating_df.select("row_id", "Title", "review/score", "review/text", "emotion")
rating_df.show(5)


## Top k keywords

### Unigram

In [5]:
%spark.pyspark
from pyspark.sql.functions import explode, split, col, lower, regexp_replace


# Normalize text: drop everything except ASCII letters, digits and whitespace, then
# lowercase. Apostrophes are removed too ("don't" -> "dont"), which the stop-word
# list relies on.
rating_df = rating_df.withColumn(
    "cleaned_text",
    lower(regexp_replace(col("review/text"), "[^a-zA-Z0-9\\s]", ""))  # Keep only letters, numbers, and spaces
)

# Tokenize on runs of whitespace, one row per word.
# split() yields an empty-string token when the cleaned text is empty (e.g. a
# punctuation-only review) or starts/ends with whitespace. "" is not a stop word, so
# without the filter it would be counted as a keyword downstream.
tokenized_df = (
    rating_df.select(
        col("row_id"),
        col("cleaned_text"),
        explode(split(col("cleaned_text"), "\\s+")).alias("word"),
    )
    .filter(col("word") != "")
)

print(tokenized_df.count())
tokenized_df.show(20)

In [6]:
%spark.pyspark

# Filter out stopwords
# Single-word stop words removed before counting keywords / building n-grams.
# Besides generic English function words, the list includes many review-specific
# words ('book', 'read', 'story', 'author', 'great', ...) and a few numeric tokens.
# Entries are lowercase with apostrophes removed ('dont', 'im', 'ive') to match the
# cleaned_text normalization. 'quotthe' looks like an HTML-escaped quote (&quot;)
# left over after punctuation stripping — NOTE(review): confirm against the raw text.
# NOTE(review): the list contains duplicates (e.g. 'book', 'read', 'story', 'it');
# they are harmless for membership tests but could be removed.
uni_stop_words = ['a', 'an', 'the', 'is', 'in', 'at', 'of', 'on', 'and', 'to', 'for', 'with', 'this', 'that', 'it', 'was', 'as', 'by', 'are', 'from', 'be', 'has', 'had', 'will', 'would', 'can', 'could', 'should', 'you', 'your', 'we', 'they', 'their', 'but', 'not', 'or', 'if', 'which', 'when', 'what', 'how', 'where', 'if', 'i', 'book', 'have', 'read', 'he', 'his', 'my', 'one', 'all', 'about', 'so', 'who', 'like', 'her', 'more', 'very', 'just', 'some', 'out', 'there', 'me', 'she', 'books', 'really', 'many', 'up', 'because', 'into', 'much', 'no', 'than', 'most', 'were', 'been', 'do', 'also', 'it', 'dont', 'then', 'its', 'book', 'its', 'story', 'great', 'first', 'good', 'reading', 'time', 'only', 'get', 'other', 'people', 'even', 'think', 'book', 'after', 'any', 'through', 'way', 'these', 'well', 'its', 'know', 'two', 'am', 'them', 'make', 'little', 'find', 'best', 'never', 'found', 'still', 'out', 'every', 'does', 'did', 'see', 'want', 'years', 'ever', 'written', 'too', 'while', 'being', 'each', 'author', 'him', 'such', 'us', 'im', 'few', 'now', 'world', 'say', 'own', 'things', 'end', 'work', 'ive', 'made', 'must', 'writing', 'better', 'back', 'put', 'before', 'makes', 'last', 'read', 'lot', 'go', 'going', 'another', 'something', 'may', 'reader', 'thought', 'feel', 'didnt', 'plot', 'always', 'same', 'give', 'pages', 'long', 'youre', 'characters', 'new', 'series', 'those', 'our', 'over', 'character', 'anyone', 'old', 'interesting', 'got', 'nothing', 'take', 'why', 'man', 'real', 'since', 'both', 'next', 'cant', 'between', 'different', 'stories', 'need', 'however', 'though', 'down', 'without', 'part', 'come', 'easy', 'hard', 'use', 'almost', 'doesnt', 'seems', 'times', 'help', 'quite', 'main', 'until', 'around', 'yet', 'used', 'highly', 'might', 'off', 'worth', 'once', 'keep', 'thing', 'three', 'enough', 'far', 'looking', 'rather', 'story', 'life', 'time', 'both', 'trying', 'reading', 'it', 'enjoyed', 'loved', 'buy', 'actually', 'understand', 'everything', 'everyone', 
'although', 'having', 'become', 'look', 'excellent', 'enjoy', 'bit', 'point', 'learn', 'right', 'human', 'takes', 'true', 'day', 'whole', 'able', 'short', 'school', 'mr', 'liked', 'left', 'place', 'seem', 'books', 'recommend', 'wonderful', 'favorite', 'high', 'probably', 'bought', 'full', 'important', 'felt', 'year', 'during', 'gives', 'here', 'simply', 'thats', 'getting', 'set', 'write', 'second', 'sure', 'start', 'bad', 'tell', 'fact', 'couldnt', 'along', 'away', 'me', 'words', 'less', 'wanted', '5', 'series', 'live', 'often', 'done', 'kind', 'read', 'try', 'big', 'believe', 'lives', 'already', 'came', 'them', 'said', 'hope', 'myself', 'goes', 'isnt', 'information', 'someone', 'them', 'came', 'several', 'readers', 'page', 'i', 'truly', 'especially', 'least', 'anything', 'comes', 'again', 'sense', 'money', 'novel', 'novel', 'one', 'told', '2', 'others', 'took', 'story', 'books', 'wont', 'wait', 'youll', 'together', 'lost', 'job', 'novels', 'care', 'review', 'small', 'gets', 'given', 'english', 'novel', 'making', 'early', 'let', 'want', 'doing', 'style', 'chapter', 'reviews', 'went', 'movie', 'hes', 'shows', 'mind', 'using', 'interested', 'definitely', 'id', 'time', '3', 'quotthe', 'series', 'me', 'gave', 'wasnt', 'and', 'ago',
    'class', 'works', 'reason', 'writer', 'past', 'age', 'kids', 'home', 'kind', 'himself', 'fun', 'friend', 'course', 'action', '10', 'woman', 'person', 'authors', 'beginning', 'women', 'men', 'side', 'mother', 'pretty', 'today', 'parents', 'events',
    'instead', 'ending', 'detail', 'later', 'son', 'seen', 'within', 'version', 'absolutely', 'five', 'either', 'major', 'daughter', 'knew', 'case', 'girl', 'original', 'wants', 'yourself', 'throughout', 'wrote', 'view', 'half', 'cover', 'ones', 'days',
    'called', 'next', 'thinking', 'text', 'yes', 'says'
    ]

# Drop stop words from the token stream.
# Column.isin() embeds its values as literals in the query plan, so a broadcast
# variable buys nothing here: the previous broadcast was only ever read back on the
# driver via `.value`. Deduplicate (and sort, for a stable plan) to keep the IN-list small.
uni_stop_word_list = sorted(set(uni_stop_words))

filtered_df = tokenized_df.filter(~col("word").isin(uni_stop_word_list))

print(filtered_df.count())
filtered_df.show(20)

In [7]:
%spark.pyspark

# Rank the remaining (non-stop-word) unigrams by how often they occur across all reviews
keyword_counts = filtered_df.groupBy("word").count()
sorted_keywords = keyword_counts.sort(desc("count"))
sorted_keywords.show(100)

### Bigram

In [9]:
%spark.pyspark
# First remove stop words from the sentence before finding bigrams.
# Broadcast a frozenset rather than the raw list: the UDF below tests membership for
# every word of every review, which is O(1) on a set but a linear scan over several
# hundred entries (duplicates included) on the list.
broadcast_uni_stop_words = spark.sparkContext.broadcast(frozenset(uni_stop_words))

def remove_single_stopwords(sentence):
    """Return `sentence` with every single-word stop word removed.

    The sentence is split on whitespace and the surviving words are re-joined with
    single spaces. None, "" and whitespace-only input are returned unchanged.
    """
    if sentence is None or not sentence.strip():
        return sentence
    stop_words = broadcast_uni_stop_words.value
    return " ".join(token for token in sentence.split() if token not in stop_words)

# Spark wrapper: string column in, string column out
remove_single_stopwords_udf = udf(remove_single_stopwords, StringType())


# Plain-Python n-gram builder; wrapped as Spark UDFs for bigrams and trigrams below
def generate_ngrams(text, n):
    """Return the contiguous word n-grams of `text`, each joined by a single space.

    Args:
        text: string tokenized on whitespace; None, "" and whitespace-only input
            yield no n-grams.
        n: n-gram size; must be >= 1.

    Returns:
        List of n-gram strings in order of appearance; empty when `text` has
        fewer than `n` words.

    Raises:
        ValueError: if n < 1. Without this check, n == 0 would silently produce
            len(words) + 1 empty strings.
    """
    if n < 1:
        raise ValueError(f"n must be >= 1, got {n}")
    if not text or not text.strip():  # Handle None or empty strings
        return []
    words = text.split()
    return [" ".join(words[i:i + n]) for i in range(len(words) - n + 1)]
    
# Spark UDF: string -> array<string> of bigrams
bigrams_udf = udf(lambda text: generate_ngrams(text, 2), ArrayType(StringType()))


# Strip single-word stop words once; the trigram cell reuses this column
rating_df = rating_df.withColumn(
    "cleaned_text_new",
    remove_single_stopwords_udf(col("cleaned_text")),
)

# Generate bigrams from the stop-word-free text.
# NOTE: because stop words are removed first, a bigram can join two words that were
# not adjacent in the original review.
bigrams = rating_df.withColumn("bigrams", bigrams_udf(col("cleaned_text_new")))

# One output row per (review, bigram)
bigrams_df = bigrams.select(
    "row_id",
    "cleaned_text",
    explode("bigrams").alias("bigram"),
)

print(bigrams_df.count())
bigrams_df.show(5)

In [10]:
%spark.pyspark
# Most frequent bigrams across all reviews
bigram_counts = bigrams_df.groupBy("bigram").count()
sorted_bigrams = bigram_counts.sort(desc("count"))
sorted_bigrams.show(100, truncate=False)

### Trigrams

In [12]:
%spark.pyspark

# Reuses generate_ngrams and the stop-word-free "cleaned_text_new" column built in the bigram cell
trigram_udf = udf(lambda text: generate_ngrams(text, 3), ArrayType(StringType()))

# One output row per (review, trigram)
trigrams = rating_df.withColumn("trigrams", trigram_udf(col("cleaned_text_new")))
trigrams_df = trigrams.select(
    "row_id",
    "cleaned_text_new",
    explode("trigrams").alias("trigram"),
)

# Most frequent trigrams across all reviews
trigram_counts = (
    trigrams_df.groupBy("trigram")
    .count()
    .orderBy("count", ascending=False)
)
trigram_counts.show(100, truncate=False)

### Top keywords
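Keywords selected from the unigram, bigram, and trigram counts above. Multi-word entries appear in their cleaned form (lowercase, punctuation and stop words removed), e.g. "lord rings" for *The Lord of the Rings* and "se hinton" for S. E. Hinton.

1. Themes/genres: family, history, war, spiritual, entertaining, mystery, death, vampire, gang, religion, art, fairy tale, love
2. Authors/characters/titles: robert jordan, se hinton, stephen king, F. Scott Fitzgerald, anne rice, dale carnegie, helen fielding, kurt vonnegut, lemony snicket, lord rings
3. Literary periods/movements: american dream, 20th century, american literature, civil war, 19th century
4. Sentiment words and phrases: boring, disapointed (misspelling kept as-is, since keywords are matched literally against the review text), 4 stars, blah blah blah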

#### Partition by top keywords

In [15]:
%spark.pyspark
import re

from pyspark.sql.functions import col, array, when, lit, explode, expr, concat

# Step 1: Define the list of keywords. These strings are also the labels written to
# the output "keyword" column, so they are kept exactly as chosen above.
keywords = [
    "family", "history", "war", "spiritual", "entertaining", "mystery", "death", "vampire", "gang", "religion",
    "art", "fairy tale", "love", "robert jordan", "se hinton", "stephen king", "F. Scott Fitzgerald", "anne rice",
    "dale carnegie", "helen fielding", "kurt vonnegut", "lemony snicket", "american dream", "20th century",
    "american literature", "civil war", "19th century", "lord rings", "boring", "disapointed", "4 stars", "blah blah blah"
]


def _normalize_keyword(keyword):
    """Normalize a keyword the same way review text was cleaned into cleaned_text.

    Drops every character except ASCII letters, digits and whitespace, then
    lowercases, so e.g. "F. Scott Fitzgerald" becomes "f scott fitzgerald" and can
    actually occur in the cleaned review text.
    """
    return re.sub(r"[^a-zA-Z0-9\s]", "", keyword).lower()


# Step 2: Create a column containing an array of matched keywords (null = no match).
# Match whole words only. cleaned_text_new is re-joined with single spaces, so padding
# both the text and the keyword with one space keeps "war" from matching "award" or
# "toward", and "art" from matching "heart" or "smart".
padded_text = concat(lit(" "), col("cleaned_text_new"), lit(" "))
keywords_df = rating_df.withColumn(
    "matched_keywords",
    array(*[
        when(padded_text.contains(f" {_normalize_keyword(keyword)} "), lit(keyword))
        for keyword in keywords
    ])
)

# Step 3: Remove nulls (keywords that did not match) from the `matched_keywords` array
keywords_df = keywords_df.withColumn(
    "matched_keywords",
    expr("filter(matched_keywords, x -> x IS NOT NULL)")
)

print("Count of rows before exploding by keywords and drop rows w/o keywords", keywords_df.count())

# Step 4: Explode the `matched_keywords` column to create one row per keyword.
# explode() emits no rows for an empty array, so reviews without any keyword drop out here.
keywords_df = keywords_df.withColumn("keyword", explode(col("matched_keywords")))

print("Count of rows after exploding by keywords", keywords_df.count())
print("Number of unique books with review containing top keywords", keywords_df.select("Title").distinct().count())
keywords_df.show(10)

In [16]:
%spark.pyspark
# Keep one row per (Title, review/score, emotion, keyword) and only those columns.
# NOTE(review): the key does not include row_id. Each review contributes at most one
# row per keyword, so this dedup merges *different* reviews of the same book that share
# score, emotion and keyword. The distributions below then count these combinations,
# not individual reviews. Confirm this is the intended unit of analysis.
dedup_columns = ["Title", "review/score", "emotion", "keyword"]
result_df = keywords_df.dropDuplicates(dedup_columns).select(*dedup_columns)

print("Count of rows after droping duplicates:", result_df.count())
print("Number of unique books with review containing top keywords", result_df.select("Title").distinct().count())
result_df.show(20)

In [17]:
%spark.pyspark
# Cast review/score to float; values that cannot be parsed are marked with -1.0
score_as_float = col("review/score").cast("float")
result_df = result_df.withColumn(
    "review/score",
    when(score_as_float.isNull(), -1.0).otherwise(score_as_float),
)

# Persist the result, partitioned by emotion / keyword / score.
# NOTE: hard-coded path under a personal user directory — adjust before reuse.
output_path = "/user/tl4151_nyu_edu/books_result_1209"
(
    result_df.write
    .mode("overwrite")
    .partitionBy("emotion", "keyword", "review/score")
    .parquet(output_path)
)

### Emotion distribution

In [19]:
%spark.pyspark
# Number of reviews per dominant emotion, most common first
emotion_distribution = rating_df.groupBy("emotion").count().sort(desc("count"))
emotion_distribution.show()

### Review/Score Distribution

In [21]:
%spark.pyspark
from pyspark.sql.functions import col, when

# Convert review/score (string) to float; anything that fails to parse becomes -1.0
parsed_score = col("review/score").cast("float")
rating_df = rating_df.withColumn(
    "review/score",
    when(parsed_score.isNull(), -1.0).otherwise(parsed_score),
)

# Preview the converted frame
rating_df.show()

In [22]:
%spark.pyspark
# Number of reviews per rating value, most frequent first
review_score_distribution = rating_df.groupBy("review/score").count().sort(desc("count"))
review_score_distribution.show()

### Emotion vs Rating Distribution

#### 1. Distribution of Rating for each Emotion

In [25]:
%spark.pyspark
# Count rows per (emotion, rating) pair
rating_distribution_by_emotion = (
    result_df.groupBy("emotion", "review/score")
    .count()
    .orderBy("emotion", "review/score")
)

# Print one table per emotion. distinct().collect() returns values in no particular
# order, so sort them for stable output. The emotion label can be null (it is built
# without an otherwise() branch); such a group is listed last.
emotions = sorted(
    (row["emotion"] for row in result_df.select("emotion").distinct().collect()),
    key=lambda value: (value is None, value or ""),
)
for emotion in emotions:
    print(f"Distribution of Ratings for Emotion: {emotion}")
    # eqNullSafe: a plain `== None` comparison is never true in Spark and would show an empty table
    rating_distribution_by_emotion.filter(col("emotion").eqNullSafe(emotion)).show()

#### 2. Distribution of Emotion for each Rating

In [27]:
%spark.pyspark
# Count rows per (rating, emotion) pair
emotion_distribution_by_rating = (
    result_df.groupBy("review/score", "emotion")
    .count()
    .orderBy("review/score", "emotion")
)

# Print one table per rating. distinct().collect() returns values in no particular
# order, so sort them for stable, readable output (a null rating, if any, goes last).
ratings = sorted(
    (row["review/score"] for row in result_df.select("review/score").distinct().collect()),
    key=lambda value: (value is None, value if value is not None else 0),
)
for rating in ratings:
    print(f"Distribution of Emotions for Rating: {rating}")
    # eqNullSafe so rows with a null rating are not silently dropped by `== None`
    emotion_distribution_by_rating.filter(col("review/score").eqNullSafe(rating)).show()

#### 3. Most Frequent Top Keywords in each Emotion

In [29]:
%spark.pyspark
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Count how often each keyword occurs with each emotion
keyword_distribution = result_df.groupBy("emotion", "keyword").count()

# Rank keywords within each emotion by count. row_number() orders tied rows
# arbitrarily unless the window ordering is total, so keyword is a secondary sort key
# to make the top-10 cut reproducible between runs.
window_spec = Window.partitionBy("emotion").orderBy(col("count").desc(), col("keyword").asc())

# Add a rank column and keep the top 10 keywords per emotion
most_frequent_keywords = (
    keyword_distribution
    .withColumn("rank", row_number().over(window_spec))
    .filter(col("rank") <= 10)
)

# Show the top 10 keywords for each emotion.
# 6 emotions x 10 = 60 rows, plus up to 10 more for a null-emotion group (sorted first),
# which would otherwise push the last emotion's rows out of the display.
print("Top 10 Keywords for Each Emotion")
most_frequent_keywords.orderBy("emotion", "rank").show(70, truncate=False)

In [30]:
%spark.pyspark
# Build the table behind the keyword x rating heatmap: for every top keyword, the
# share (in %) of its rows that fall into each review score.
from pyspark.sql.functions import col, count, sum as _sum

# Same keyword list that produced result_df; reuse it instead of maintaining a copy
top_keywords = list(keywords)

# Dedicated name on purpose: reusing `filtered_df` would clobber the stop-word-filtered
# token DataFrame that the unigram-count cell reads.
heatmap_input_df = result_df.filter(col("keyword").isin(top_keywords))

# Rows per (keyword, review score)
grouped_df = heatmap_input_df.groupBy("keyword", "review/score").agg(count("*").alias("count"))

# Total rows per keyword (denominator for the percentages)
total_counts_df = grouped_df.groupBy("keyword").agg(_sum("count").alias("total_count"))

percentage_df = (
    grouped_df.join(total_counts_df, "keyword")
    .withColumn("percentage", (col("count") / col("total_count")) * 100)
)

# Spark writes a directory of part files at this path; the ".csv" suffix is only a name.
percentage_df.write.mode("overwrite").csv("/user/tl4151_nyu_edu/heatmap_data_new.csv", header=True)

In [31]:
%spark.pyspark
percentage_df.show(n=5)  # preview the first rows of the heatmap table

In [32]:
%spark.pyspark
