# Name: Jackson Dawson
# Group: 9

# Setup

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Obtain the notebook's Spark session (getOrCreate returns an existing one if present).
spark = SparkSession.builder.getOrCreate()

# Load both review samples with identical reader settings. multiLine=False
# tells the JSON reader to treat every line as its own record.
vg_df, b_df = (
    spark.read.json(sample_path, multiLine=False)
    for sample_path in (
        "../../data_samples/Video_Games_SAMPLE.jsonl",
        "../../data_samples/Books_SAMPLE.jsonl",
    )
)

# RDD views of the two raw frames.
vg_rdd, b_rdd = vg_df.rdd, b_df.rdd

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/11/21 13:58:14 WARN Utils: Your hostname, MacBook-Pro-2.local, resolves to a loopback address: 127.0.0.1; using 10.41.254.17 instead (on interface en0)
25/11/21 13:58:14 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/21 13:58:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

# Analysis
**Question:** How does consumer sentiment differ between video-game products and book products?

## Merge Datasets

In [2]:
# Tag every row with the dataset it came from so the two sources remain
# distinguishable once they are stacked into a single frame.
vg_df, b_df = (
    frame.withColumn("category", F.lit(label))
    for frame, label in ((vg_df, "video_game"), (b_df, "book"))
)

In [3]:
# Stack the two review sets into one frame. unionByName aligns columns by
# their names; plain union() aligns purely by position and would silently
# pair up the wrong columns if the two inferred JSON schemas ever differed in
# column order. A genuine name mismatch now fails loudly instead.
df = vg_df.unionByName(b_df)

In [4]:
# Derive character-length columns for the review body and the review title.
df = (
    df
    .withColumn("text_length", F.length(F.col("text")))
    .withColumn("title_length", F.length(F.col("title")))
)

# RDD view of the enriched frame, used by the RDD-based analysis cells.
rdd = df.rdd

## Side-by-Side Comparison Statistics

In [5]:
from pyspark.sql import Row
import math

# ---- 1. Map each record into (category, value_dict) ----
def _review_metrics(row):
    """Turn one review row into a ``(category, contributions)`` pair.

    ``contributions`` holds this single review's share of every running
    total: a count of 1, the rating and text length together with their
    squares (needed later for the standard deviations), the title length,
    the helpful votes, a 0/1 verified-purchase flag and the number of
    attached images.
    """
    category = row['category']
    rating = row['rating']
    text_len = row['text_length']
    contributions = {
        "count": 1,
        "rating_sum": rating,
        "rating_sq_sum": rating**2,
        "text_sum": text_len,
        "text_sq_sum": text_len**2,
        "title_sum": row['title_length'],
        "helpful_sum": row['helpful_vote'],
        "verified_sum": 1 if row['verified_purchase'] else 0,
        "image_count_sum": len(row['images'])
    }
    return (category, contributions)


pair_rdd = rdd.map(_review_metrics)

# ---- 2. Combine values by category ----
def combiner(v):
    """Create the initial accumulator for a key from its first value.

    The value is returned as-is (no copy), so the first per-record dict seen
    for a key becomes that key's starting accumulator.
    """
    return v

def merger(a, b):
    """Add two accumulator dicts field by field and return a new dict.

    Neither input is modified. Only the nine known metric fields are carried
    over; any other key present in ``a`` or ``b`` is dropped.
    """
    summed_fields = (
        "count",
        "rating_sum",
        "rating_sq_sum",
        "text_sum",
        "text_sq_sum",
        "title_sum",
        "helpful_sum",
        "verified_sum",
        "image_count_sum",
    )
    return {field: a[field] + b[field] for field in summed_fields}

# Collapse all records into one accumulator dict per category. The same
# merger serves both merge steps because per-record values and accumulators
# share one dict shape.
agg = pair_rdd.combineByKey(
    createCombiner=combiner,
    mergeValue=merger,
    mergeCombiners=merger,
)

# ---- 3. Compute final statistics ----
def _population_stddev(total, total_sq, n):
    """Population standard deviation from a sum, a sum of squares and a count.

    Uses sqrt(E[x^2] - E[x]^2). Floating-point rounding can push that
    difference a hair below zero when the values are (nearly) identical,
    which would make math.sqrt raise ValueError, so the variance is clamped
    at 0 before taking the root.
    """
    mean = total / n
    variance = (total_sq / n) - mean**2
    return math.sqrt(max(variance, 0.0))


def finalize(category, m):
    """Convert one category's accumulated totals into a Row of statistics.

    Args:
        category: the category label used as the aggregation key.
        m: accumulator dict with the keys "count", "rating_sum",
            "rating_sq_sum", "text_sum", "text_sq_sum", "title_sum",
            "helpful_sum", "verified_sum" and "image_count_sum".

    Returns:
        A Row holding the review count, per-review averages, population
        standard deviations (divide-by-n) for rating and review length, and
        the raw helpful-vote and verified-purchase totals.
    """
    n = m["count"]
    return Row(
        category=category,
        total_reviews=n,
        avg_rating=m["rating_sum"] / n,
        stddev_rating=_population_stddev(m["rating_sum"], m["rating_sq_sum"], n),
        avg_review_length=m["text_sum"] / n,
        stddev_review_length=_population_stddev(m["text_sum"], m["text_sq_sum"], n),
        avg_title_length=m["title_sum"] / n,
        avg_helpful_votes=m["helpful_sum"] / n,
        total_helpful_votes=m["helpful_sum"],
        verified_purchases=m["verified_sum"],
        avg_images_per_review=m["image_count_sum"] / n
    )

# Each aggregated element is a (category, totals) pair; unpack it straight
# into finalize's two parameters.
final_rdd = agg.map(lambda kv: finalize(*kv))

# ---- 4. Display results from RDD ----
# Sort by category name so the report order is stable across runs.
results = final_rdd.sortBy(lambda stats_row: stats_row.category).collect()
for result in results:
    report_lines = [
        f"\nCategory: {result.category}",
        f"  Total Reviews: {result.total_reviews}",
        f"  Avg Rating: {result.avg_rating:.2f}",
        f"  Stddev Rating: {result.stddev_rating:.2f}",
        f"  Avg Review Length: {result.avg_review_length:.2f}",
        f"  Stddev Review Length: {result.stddev_review_length:.2f}",
        f"  Avg Title Length: {result.avg_title_length:.2f}",
        f"  Avg Helpful Votes: {result.avg_helpful_votes:.2f}",
        f"  Total Helpful Votes: {result.total_helpful_votes}",
        f"  Verified Purchases: {result.verified_purchases}",
        f"  Avg Images Per Review: {result.avg_images_per_review:.2f}",
    ]
    print("\n".join(report_lines))

                                                                                


Category: book
  Total Reviews: 50000
  Avg Rating: 4.41
  Stddev Rating: 1.07
  Avg Review Length: 421.19
  Stddev Review Length: 722.17
  Avg Title Length: 25.29
  Avg Helpful Votes: 1.84
  Total Helpful Votes: 91850
  Verified Purchases: 34746
  Avg Images Per Review: 0.03

Category: video_game
  Total Reviews: 50000
  Avg Rating: 4.05
  Stddev Rating: 1.43
  Avg Review Length: 310.42
  Stddev Review Length: 659.13
  Avg Title Length: 22.89
  Avg Helpful Votes: 1.26
  Total Helpful Votes: 63152
  Verified Purchases: 43058
  Avg Images Per Review: 0.08


In [6]:
# Additional detailed breakdowns 

print("\nRATING DISTRIBUTION COMPARISON")
print("="*80)
# Count reviews per (category, rating) pair; only the small table of counts
# is brought back to the driver.
rating_counts = (
    rdd.map(lambda row: ((row['category'], row['rating']), 1))
    .reduceByKey(lambda left, right: left + right)
    .collect()
)

# Pivot to rating -> {category: count}; a category with no reviews at a given
# rating keeps its default of 0.
from collections import defaultdict
rating_by_category = defaultdict(lambda: {'book': 0, 'video_game': 0})
for (category, rating), count in rating_counts:
    rating_by_category[rating][category] = count

print(f"{'rating':<10} {'book':<15} {'video_game':<15}")
print("-" * 40)
# One table row per rating value, lowest rating first.
for rating, counts in sorted(rating_by_category.items(), key=lambda item: item[0]):
    print(f"{rating:<10} {counts['book']:<15} {counts['video_game']:<15}")


print("\n\nREVIEW LENGTH QUARTILES")
print("="*80)
# Gather every review length per category on the driver as a plain list.
category_lengths = (
    rdd.map(lambda row: (row['category'], row['text_length']))
    .groupByKey()
    .mapValues(list)
    .collect()
)

for category, lengths in sorted(category_lengths):
    sorted_lengths = sorted(lengths)
    n = len(sorted_lengths)
    # Quartiles are read directly at floor(n/4), floor(n/2) and floor(3n/4)
    # of the sorted list; no interpolation between neighbours is done.
    q1_idx, q2_idx, q3_idx = n // 4, n // 2, 3 * n // 4

    quartile_lines = [
        f"\nCategory: {category}",
        f"  Min Length: {min(sorted_lengths)}",
        f"  25th Percentile: {sorted_lengths[q1_idx]}",
        f"  Median: {sorted_lengths[q2_idx]}",
        f"  75th Percentile: {sorted_lengths[q3_idx]}",
        f"  Max Length: {max(sorted_lengths)}",
    ]
    print("\n".join(quartile_lines))


print("\n\nHELPFULNESS STATISTICS")
print("="*80)


def _helpful_seed(row):
    """Per-review starting values: its vote count and a 0/1 'has any votes' flag."""
    category = row['category']
    votes = row['helpful_vote']
    return (category, {'max_votes': votes, 'has_votes': 1 if votes > 0 else 0})


def _helpful_merge(a, b):
    """Keep the larger vote count and add up the 'has any votes' flags."""
    return {
        'max_votes': max(a['max_votes'], b['max_votes']),
        'has_votes': a['has_votes'] + b['has_votes']
    }


# Per category: the largest helpful-vote count on any single review, and how
# many reviews received at least one helpful vote.
helpful_stats = rdd.map(_helpful_seed).reduceByKey(_helpful_merge).collect()

for category, stats in sorted(helpful_stats):
    helpful_lines = [
        f"\nCategory: {category}",
        f"  Max Helpful Votes: {stats['max_votes']}",
        f"  Reviews with Helpful Votes: {stats['has_votes']}",
    ]
    print("\n".join(helpful_lines))


print("\n\nVERIFIED PURCHASE ANALYSIS")
print("="*80)


def _verification_seed(row):
    """Per-review starting values: one review, plus a 0/1 verified-purchase flag."""
    category = row['category']
    return (category, {'total': 1, 'verified': 1 if row['verified_purchase'] else 0})


def _verification_merge(a, b):
    """Add the review counts and the verified counts of two partial tallies."""
    return {
        'total': a['total'] + b['total'],
        'verified': a['verified'] + b['verified']
    }


# Per category: the number of reviews and how many of them are verified purchases.
verification = rdd.map(_verification_seed).reduceByKey(_verification_merge).collect()

for category, stats in sorted(verification):
    # Share of this category's reviews that are flagged as verified purchases.
    percentage = (stats['verified'] / stats['total']) * 100
    verification_lines = [
        f"\nCategory: {category}",
        f"  Total Reviews: {stats['total']}",
        f"  Verified Count: {stats['verified']}",
        f"  Verified Percentage: {percentage:.2f}%",
    ]
    print("\n".join(verification_lines))


RATING DISTRIBUTION COMPARISON
rating     book            video_game     
----------------------------------------
1.0        2263            6409           
2.0        1802            2721           
3.0        3566            3585           
4.0        7979            6695           
5.0        34390           30590          


REVIEW LENGTH QUARTILES

Category: book
  Min Length: 1
  25th Percentile: 75
  Median: 176
  75th Percentile: 443
  Max Length: 15405

Category: video_game
  Min Length: 0
  25th Percentile: 48
  Median: 129
  75th Percentile: 307
  Max Length: 31302


HELPFULNESS STATISTICS

Category: book
  Max Helpful Votes: 3401
  Reviews with Helpful Votes: 18157

Category: video_game
  Max Helpful Votes: 1857
  Reviews with Helpful Votes: 12714


VERIFIED PURCHASE ANALYSIS

Category: book
  Total Reviews: 50000
  Verified Count: 34746
  Verified Percentage: 69.49%

Category: video_game
  Total Reviews: 50000
  Verified Count: 43058
  Verified Percentage: 86.12%
