In [1]:
import os

# List the mounted workspace and its data folder so the JSON paths used below can be checked.
for heading, folder in (("Workspace:", "/workspace"), ("Data:", "/workspace/data")):
    print(heading, os.listdir(folder))

Workspace: ['data', 'notebooks']
Data: ['categories.json', 'podcasts.json', 'reviews.json']


In [2]:
from pyspark.sql import SparkSession

# Local Spark session on all cores with a 4 GB driver. The Spark NLP package is
# requested via spark.jars.packages and resolved from Maven at startup.
# NOTE(review): no visible cell uses Spark NLP (sentiment uses HF transformers);
# confirm it is still needed before keeping the download.
SPARK_SETTINGS = {
    "spark.driver.memory": "4g",
    "spark.jars.packages": "com.johnsnowlabs.nlp:spark-nlp_2.12:6.2.0",
}

session_builder = SparkSession.builder.appName("PodcastPopularity").master("local[*]")
for conf_key, conf_value in SPARK_SETTINGS.items():
    session_builder = session_builder.config(conf_key, conf_value)
spark = session_builder.getOrCreate()

# Smoke test: report the version and push a trivial job through the executors.
print("Spark version:", spark.version)
smoke_rdd = spark.sparkContext.parallelize(range(10))
print("Test RDD count:", smoke_rdd.count())

/opt/spark/bin/load-spark-env.sh: line 68: ps: command not found


:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
com.johnsnowlabs.nlp#spark-nlp_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-50e39681-7d96-4823-afaf-f35bf26efdae;1.0
	confs: [default]
	found com.johnsnowlabs.nlp#spark-nlp_2.12;6.2.0 in central
	found com.typesafe#config;1.4.2 in central
	found org.rocksdb#rocksdbjni;6.29.5 in central
	found com.amazonaws#aws-java-sdk-s3;1.12.500 in central
	found com.amazonaws#aws-java-sdk-kms;1.12.500 in central
	found com.amazonaws#aws-java-sdk-core;1.12.500 in central
	found commons-logging#commons-logging;1.1.3 in central
	found commons-codec#commons-codec;1.15 in central
	found org.apache.httpcomponents#httpclient;4.5.13 in central
	found org.apache.httpcomponents#httpcore;4.4.13 in central
	found software.amazon.ion#ion-java;1.0.2 in central
	found joda-time#joda-time;2.8.1 in central
	found com.amazonaws#jmespath-java;1.12.500 in central
	found com.g

Spark version: 3.5.0


[Stage 0:>                                                          (0 + 8) / 8]

Test RDD count: 10


                                                                                

In [3]:
# Load the three raw JSON-lines datasets.
# The schemas are given explicitly (as DDL strings) with exactly the types Spark
# inferred on the first run of this notebook. Without a schema, spark.read.json
# first scans each whole file just to infer its columns, which doubles the I/O
# on these multi-million-row inputs.
DATA_DIR = "/workspace/data"

REVIEWS_SCHEMA = (
    "author_id STRING, content STRING, created_at STRING, "
    "podcast_id STRING, rating BIGINT, title STRING"
)
PODCASTS_SCHEMA = (
    "author STRING, average_rating DOUBLE, description STRING, itunes_id STRING, "
    "itunes_url STRING, podcast_id STRING, ratings_count STRING, scraped_at STRING, "
    "slug STRING, title STRING"
)
CATEGORIES_SCHEMA = "category STRING, itunes_id STRING, podcast_id STRING"

reviews_df    = spark.read.json(f"{DATA_DIR}/reviews.json", schema=REVIEWS_SCHEMA)
podcasts_df   = spark.read.json(f"{DATA_DIR}/podcasts.json", schema=PODCASTS_SCHEMA)
categories_df = spark.read.json(f"{DATA_DIR}/categories.json", schema=CATEGORIES_SCHEMA)

print("Reviews rows:", reviews_df.count())
print("Podcasts rows:", podcasts_df.count())
print("Categories rows:", categories_df.count())

25/11/25 04:45:14 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
                                                                                

Reviews rows: 5607021


                                                                                

Podcasts rows: 2077665




Categories rows: 3706368


                                                                                

In [4]:
# Print the column names and types of each raw dataset.
for raw_frame in (reviews_df, podcasts_df, categories_df):
    raw_frame.printSchema()

root
 |-- author_id: string (nullable = true)
 |-- content: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- podcast_id: string (nullable = true)
 |-- rating: long (nullable = true)
 |-- title: string (nullable = true)

root
 |-- author: string (nullable = true)
 |-- average_rating: double (nullable = true)
 |-- description: string (nullable = true)
 |-- itunes_id: string (nullable = true)
 |-- itunes_url: string (nullable = true)
 |-- podcast_id: string (nullable = true)
 |-- ratings_count: string (nullable = true)
 |-- scraped_at: string (nullable = true)
 |-- slug: string (nullable = true)
 |-- title: string (nullable = true)

root
 |-- category: string (nullable = true)
 |-- itunes_id: string (nullable = true)
 |-- podcast_id: string (nullable = true)



In [5]:
from pyspark.sql import functions as F
import pandas as pd

# Make Pandas tables easier to read
pd.set_option("display.max_colwidth", 120)

# -------------------------------------------
# 1. Aggregate reviews at podcast level
# -------------------------------------------
# One row per podcast_id that appears in reviews_df.
podcast_reviews = (
    reviews_df
    .groupBy("podcast_id")
    .agg(
        F.count("*").alias("num_reviews"),
        F.avg("rating").alias("avg_review_rating_from_reviews")
    )
)

# -------------------------------------------
# 2. Core podcast info
# -------------------------------------------
# ratings_count is read as a string. Under Spark's default (non-ANSI) cast, values
# it cannot parse become NULL, and those rows are dropped in step 4.
podcast_core = (
    podcasts_df
    .select(
        "podcast_id",
        "title",
        "description",
        F.col("average_rating").alias("avg_rating_platform"),
        F.col("ratings_count").cast("long").alias("rating_count")
    )
)

# -------------------------------------------
# 3. Join podcasts with review aggregates
# -------------------------------------------
# Podcasts without reviews get num_reviews = 0; their review-average stays NULL.
# NOTE(review): the podcast_id coverage check later in this notebook reported
# 303,880 of 303,911 distinct review podcast_ids missing from podcasts_df, so
# num_reviews is 0 for almost every podcast here. Verify the id mapping before
# drawing conclusions from review-based metrics.
podcast_with_engagement = (
    podcast_core
    .join(podcast_reviews, on="podcast_id", how="left")
    .fillna({"num_reviews": 0})
)

# -------------------------------------------
# 4. Clean subset: only podcasts with ratings,
#    plus a non-null description and its length
# -------------------------------------------
# Cached: this frame is shown, counted, joined below and counted again in a
# later cell. Each uncached action would otherwise re-read the reviews and
# podcasts JSON files from disk.
podcast_clean = (
    podcast_with_engagement
    .where(
        F.col("avg_rating_platform").isNotNull() &
        F.col("rating_count").isNotNull()
    )
    .withColumn("description", F.coalesce("description", F.lit("")))
    .withColumn("description_length", F.length(F.col("description")))
    .cache()
)

print("Sample of clean podcasts:")
podcast_clean.show(5, truncate=False)
print("Clean podcasts:", podcast_clean.count())

# -------------------------------------------
# 5. Attach category information
# -------------------------------------------
# One row per (podcast_id, category_name). A podcast listed under several
# categories therefore appears several times in podcast_final.
category_lookup = (
    categories_df
    .select(
        "podcast_id",
        F.col("category").alias("category_name"),
        F.col("itunes_id").alias("category_itunes_id")
    )
    .dropDuplicates(["podcast_id", "category_name"])
)

# Cached for the same reason as podcast_clean: it is shown, counted and
# aggregated here, then reused by several later cells.
podcast_final = (
    podcast_clean
    .join(category_lookup, on="podcast_id", how="left")
    .cache()
)

print("Sample of podcast_final:")
podcast_final.show(5, truncate=False)
print("Podcast_final rows:", podcast_final.count())

# -------------------------------------------
# 6. Category-level engagement summary
# -------------------------------------------
# Within a category each podcast appears once (see dropDuplicates above), so
# the sums below do not double count. avg_platform_rating is an unweighted
# mean of per-podcast platform ratings.
category_engagement = (
    podcast_final
    .groupBy("category_name")
    .agg(
        F.countDistinct("podcast_id").alias("num_podcasts"),
        F.sum("num_reviews").alias("total_reviews"),
        F.sum("rating_count").alias("total_ratings_recorded"),
        F.avg("avg_rating_platform").alias("avg_platform_rating")
    )
    .orderBy(F.desc("total_reviews"))
)


# -------------------------------------------
# 7. Tidy category view: drop nulls and
#    require at least 5 podcasts per category
# -------------------------------------------
category_engagement_clean = (
    category_engagement
    .where(F.col("category_name").isNotNull())
)

category_engagement_filtered = (
    category_engagement_clean
    .where(F.col("num_podcasts") >= 5)
    .orderBy(F.desc("total_reviews"))
)

print("Filtered category_engagement (top 20):")
category_engagement_filtered.show(20, truncate=False)

# -------------------------------------------
# 8. Pretty Pandas table for top 20 categories
# -------------------------------------------
cat_top = category_engagement_filtered.limit(20).toPandas()
cat_top


Sample of clean podcasts:


                                                                                

+--------------------------------+---------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+------------+-----------+------------------------------+----

                                                                                

Clean podcasts: 81842
Sample of podcast_final:


                                                                                

+--------------------------------+---------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+------------+-----------+------------------------------+----

                                                                                

Podcast_final rows: 171009
Filtered category_engagement (top 20):


                                                                                

+---------------------------------+------------+-------------+----------------------+-------------------+
|category_name                    |num_podcasts|total_reviews|total_ratings_recorded|avg_platform_rating|
+---------------------------------+------------+-------------+----------------------+-------------------+
|business                         |10535       |119          |131304                |4.787271001423824  |
|education                        |10148       |108          |123105                |4.632035869136777  |
|health-fitness                   |7243        |59           |102974                |4.695015877398876  |
|education-self-improvement       |1333        |59           |24694                 |4.840585146286569  |
|arts-visual-arts                 |1815        |55           |22980                 |4.728815426997244  |
|business-investing               |1925        |42           |27285                 |4.746025974025973  |
|education-how-to                 |709        

                                                                                

Unnamed: 0,category_name,num_podcasts,total_reviews,total_ratings_recorded,avg_platform_rating
0,business,10535,119,131304,4.787271
1,education,10148,108,123105,4.632036
2,health-fitness,7243,59,102974,4.695016
3,education-self-improvement,1333,59,24694,4.840585
4,arts-visual-arts,1815,55,22980,4.728815
5,business-investing,1925,42,27285,4.746026
6,education-how-to,709,42,10877,4.707757
7,society-culture,12652,39,163138,4.750277
8,comedy,6449,29,82668,4.814808
9,sports,5789,25,77852,4.751961


In [6]:
# Sanity check: the LEFT join onto categories may add rows per podcast but must
# neither drop nor invent podcast_ids.
clean_podcast_total = podcast_clean.count()
print("podcast_clean:", clean_podcast_total)
final_distinct_ids = podcast_final.select("podcast_id").distinct().count()
print("distinct podcast_ids in podcast_final:", final_distinct_ids)


                                                                                

podcast_clean: 81842




distinct podcast_ids in podcast_final: 81842


                                                                                

In [9]:
from pyspark.sql import functions as F

# ---------- 1. Basic null / distinct stats per file ----------

def podcast_id_stats(df, name):
    """Print row counts and ``podcast_id`` null / distinct statistics for ``df``.

    All figures come from a single aggregation job. The previous version ran three
    separate scans (full count, NULL-filtered count, distinct count), which is
    costly when ``df`` is read straight from a multi-million-row JSON file.

    Args:
        df: Spark DataFrame with a ``podcast_id`` column.
        name: label printed in the section header.

    Returns:
        None. Results are printed. When NULL ids exist, up to five such rows are shown.
    """
    id_col = F.col("podcast_id")
    # A global aggregation always yields exactly one row, even for an empty df.
    stats = df.agg(
        F.count("*").alias("total"),
        F.count(id_col).alias("non_null"),                  # count(col) skips NULLs
        F.countDistinct(id_col).alias("distinct_non_null"),
    ).first()

    total = stats["total"]
    non_null = stats["non_null"]
    nulls = total - non_null
    # DataFrame.distinct().count() counts NULL as one extra value while
    # countDistinct ignores it; add it back so the reported figure is unchanged.
    distinct_ids = stats["distinct_non_null"] + (1 if nulls > 0 else 0)

    print(f"\n==== {name} ====")
    print(f"Total rows:                {total}")
    print(f"Rows with NULL podcast_id: {nulls}")
    print(f"Rows with NON-NULL id:     {non_null}")
    print(f"Distinct podcast_id values:{distinct_ids}")

    if nulls > 0:
        print(f"\nSample rows with NULL podcast_id in {name}:")
        df.filter(id_col.isNull()).show(5, truncate=False)

# run stats on each dataframe
podcast_id_stats(reviews_df,    "reviews_df")
podcast_id_stats(podcasts_df,   "podcasts_df")
podcast_id_stats(categories_df, "categories_df")


# ---------- 2. Check cross-dataset coverage of podcast_id ----------
# The recorded run of this section aborted with a local-filesystem
# java.io.IOException while Spark was re-reading a source JSON file. The id
# sets below are cached, so they are computed once and reused. Without the
# cache, every count() and show() on an anti-join would re-scan the JSON.

def _distinct_ids(df):
    """Return the distinct, non-NULL ``podcast_id`` values of ``df`` as a cached DataFrame."""
    return (
        df.select("podcast_id")
        .where(F.col("podcast_id").isNotNull())
        .distinct()
        .cache()
    )


def _report_missing(left_ids, right_ids, heading):
    """Print how many ids in ``left_ids`` are absent from ``right_ids``, show up to 10, and return them."""
    missing = left_ids.join(right_ids, on="podcast_id", how="left_anti")
    print(heading, missing.count())
    missing.show(10, truncate=False)
    return missing


# distinct ids in each
reviews_ids    = _distinct_ids(reviews_df)
podcasts_ids   = _distinct_ids(podcasts_df)
categories_ids = _distinct_ids(categories_df)

# reviews whose podcast_id is NOT present in podcasts
# NOTE(review): the recorded output reports 303,880 of 303,911 distinct review
# podcast_ids missing here, so the two files barely share keys.
missing_in_podcasts = _report_missing(
    reviews_ids, podcasts_ids, "\nReviews podcast_ids NOT found in podcasts_df:"
)

# reviews whose podcast_id is NOT present in categories
missing_in_categories = _report_missing(
    reviews_ids, categories_ids, "\nReviews podcast_ids NOT found in categories_df:"
)

# (optional) podcasts with no categories
podcasts_without_category = _report_missing(
    podcasts_ids, categories_ids, "\nPodcasts with NO entry in categories_df:"
)


                                                                                


==== reviews_df ====
Total rows:                5607021
Rows with NULL podcast_id: 0
Rows with NON-NULL id:     5607021
Distinct podcast_id values:303911


                                                                                


==== podcasts_df ====
Total rows:                2077665
Rows with NULL podcast_id: 0
Rows with NON-NULL id:     2077665
Distinct podcast_id values:2077665


                                                                                


==== categories_df ====
Total rows:                3706368
Rows with NULL podcast_id: 0
Rows with NON-NULL id:     3706368
Distinct podcast_id values:2118882


                                                                                


Reviews podcast_ids NOT found in podcasts_df: 303880


                                                                                

+--------------------------------+
|podcast_id                      |
+--------------------------------+
|00017292b6ce7c58baf5e4b9f39a8065|
|00043fd68e8b12730c9e8be75038faa6|
|0006a430c1982e4fc338b1ab5cf011f3|
|000b7305c3495e64673b19f705ecbcaa|
|000b8cf1ab280459f30f81dcba1ada09|
|000dd6d7a4fec86c442e2ce2c5cb43ac|
|000fd1f44f43585d918f4da347c14ae1|
|0012386a5c043c3c8dea1a6249c8f281|
|0013008c799bc93c91d9973dc24a1e44|
|0014daae263a6294826c640e00fcdcc8|
+--------------------------------+
only showing top 10 rows



                                                                                


Reviews podcast_ids NOT found in categories_df: 303847


25/11/25 05:00:22 ERROR Executor: Exception in task 4.0 in stage 162.0 (TID 634)
org.apache.hadoop.fs.FSError: java.io.IOException: Input/output error
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileInputStream.read(RawLocalFileSystem.java:211)
	at java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:345)
	at java.base/java.io.BufferedInputStream.implRead(BufferedInputStream.java:420)
	at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:405)
	at java.base/java.io.DataInputStream.read(DataInputStream.java:158)
	at org.apache.hadoop.fs.FSInputChecker.readFully(FSInputChecker.java:460)
	at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.readChunk(ChecksumFileSystem.java:272)
	at org.apache.hadoop.fs.FSInputChecker.readChecksumChunk(FSInputChecker.java:300)
	at org.apache.hadoop.fs.FSInputChecker.read1(FSInputChecker.java:252)
	at org.apache.hadoop.fs.FSInputChecker.read(FSInputChecker.java:197)
	at java.base/java.io.DataInputStream.

Py4JJavaError: An error occurred while calling o240.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 162.0 failed 1 times, most recent failure: Lost task 0.0 in stage 162.0 (TID 630) (22a73d08c08d executor driver): org.apache.hadoop.fs.FSError: java.io.IOException: Input/output error
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileInputStream.read(RawLocalFileSystem.java:211)
	at java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:345)
	at java.base/java.io.BufferedInputStream.implRead(BufferedInputStream.java:420)
	at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:405)
	at java.base/java.io.DataInputStream.read(DataInputStream.java:158)
	at org.apache.hadoop.fs.FSInputChecker.readFully(FSInputChecker.java:460)
	at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.readChunk(ChecksumFileSystem.java:272)
	at org.apache.hadoop.fs.FSInputChecker.readChecksumChunk(FSInputChecker.java:300)
	at org.apache.hadoop.fs.FSInputChecker.read1(FSInputChecker.java:252)
	at org.apache.hadoop.fs.FSInputChecker.read(FSInputChecker.java:197)
	at java.base/java.io.DataInputStream.read(DataInputStream.java:158)
	at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.fillBuffer(UncompressedSplitLineReader.java:62)
	at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:227)
	at org.apache.hadoop.util.LineReader.readLine(LineReader.java:185)
	at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.readLine(UncompressedSplitLineReader.java:94)
	at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:200)
	at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
	at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.hasNext(HadoopFileLinesReader.scala:67)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:129)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithKeys_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.io.IOException: Input/output error
	at java.base/java.io.FileInputStream.readBytes(Native Method)
	at java.base/java.io.FileInputStream.read(FileInputStream.java:287)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileInputStream.read(RawLocalFileSystem.java:202)
	... 40 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: org.apache.hadoop.fs.FSError: java.io.IOException: Input/output error
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileInputStream.read(RawLocalFileSystem.java:211)
	at java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:345)
	at java.base/java.io.BufferedInputStream.implRead(BufferedInputStream.java:420)
	at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:405)
	at java.base/java.io.DataInputStream.read(DataInputStream.java:158)
	at org.apache.hadoop.fs.FSInputChecker.readFully(FSInputChecker.java:460)
	at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.readChunk(ChecksumFileSystem.java:272)
	at org.apache.hadoop.fs.FSInputChecker.readChecksumChunk(FSInputChecker.java:300)
	at org.apache.hadoop.fs.FSInputChecker.read1(FSInputChecker.java:252)
	at org.apache.hadoop.fs.FSInputChecker.read(FSInputChecker.java:197)
	at java.base/java.io.DataInputStream.read(DataInputStream.java:158)
	at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.fillBuffer(UncompressedSplitLineReader.java:62)
	at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:227)
	at org.apache.hadoop.util.LineReader.readLine(LineReader.java:185)
	at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.readLine(UncompressedSplitLineReader.java:94)
	at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:200)
	at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
	at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.hasNext(HadoopFileLinesReader.scala:67)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:129)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithKeys_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.io.IOException: Input/output error
	at java.base/java.io.FileInputStream.readBytes(Native Method)
	at java.base/java.io.FileInputStream.read(FileInputStream.java:287)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileInputStream.read(RawLocalFileSystem.java:202)
	... 40 more


In [None]:
from pyspark.sql import functions as F
from transformers import pipeline
import torch
import pandas as pd

pd.set_option("display.max_colwidth", 120)

# Make Spark a bit lighter for laptop
spark.conf.set("spark.sql.shuffle.partitions", "16")

# -------------------------------------------
# 1. Prepare reviews for NLP (RANDOM SAMPLE)
# -------------------------------------------
# Reviews that have text, keeping only the columns the sentiment step needs.
BASE_REVIEWS = (
    reviews_df
    .filter(F.col("content").isNotNull())
    .select(
        "podcast_id",
        "rating",
        F.col("content").alias("text")
    )
)

MAX_REVIEWS = 2000   # you can increase to e.g. 40000 if it runs smoothly
SAMPLE_SEED = 42     # fixed seed: same sample on every run (given unchanged input partitioning)

sample_reviews_spark = (
    BASE_REVIEWS
    .orderBy(F.rand(seed=SAMPLE_SEED))
    .limit(MAX_REVIEWS)
)

total_reviews = BASE_REVIEWS.count()

# Pull sample to Pandas (only the sample, not the full dataset). The sample size
# is simply its length, which avoids another Spark job over all reviews.
sample_reviews = sample_reviews_spark.toPandas()
sample_reviews_count = len(sample_reviews)

print(f"Total reviews in dataset:   {total_reviews}")
print(f"Sampled reviews for BERT:   {sample_reviews_count}")

# -------------------------------------------
# 2. Load DistilBERT sentiment model
# -------------------------------------------
device = 0 if torch.cuda.is_available() else -1  # GPU if available, else CPU
sentiment_pipe = pipeline(
    "sentiment-analysis",
    model="distilbert-base-uncased-finetuned-sst-2-english",
    device=device
)

# -------------------------------------------
# 3. Run model in batches and build sentiment scores
# -------------------------------------------
texts = sample_reviews["text"].tolist()
batch_size = 16   # lower this (e.g. to 8) if you hit memory issues

labels = []
scores = []

for i in range(0, len(texts), batch_size):
    batch = texts[i:i + batch_size]
    outputs = sentiment_pipe(
        batch,
        truncation=True,
        max_length=256
    )
    for out in outputs:
        labels.append(out["label"])   # POSITIVE / NEGATIVE
        scores.append(out["score"])   # 0..1 confidence

sample_reviews["hf_label"] = labels
sample_reviews["hf_score_raw"] = scores


def signed_score(row):
    """Return the model confidence signed by label: +score if POSITIVE, else -score."""
    return row["hf_score_raw"] if row["hf_label"] == "POSITIVE" else -row["hf_score_raw"]


# Vectorized equivalent of sample_reviews.apply(signed_score, axis=1), without a
# Python call per row.
sample_reviews["sentiment_score"] = sample_reviews["hf_score_raw"].where(
    sample_reviews["hf_label"] == "POSITIVE",
    -sample_reviews["hf_score_raw"],
)

print("Example scored reviews:")
display(sample_reviews[["rating", "hf_label", "sentiment_score", "text"]].head(10))


display(sample_reviews[["podcast_id", "rating", "sentiment_score"]].head(10))

# -------------------------------------------
# 4. Back to Spark and aggregate sentiment per podcast
# -------------------------------------------
sample_scored_spark = spark.createDataFrame(
    sample_reviews[["podcast_id", "rating", "sentiment_score"]]
)

# One row per sampled podcast_id.
podcast_sentiment = (
    sample_scored_spark
    .groupBy("podcast_id")
    .agg(
        F.avg("sentiment_score").alias("avg_sentiment"),
        F.count("*").alias("num_reviews_with_sentiment"),
        F.avg("rating").alias("avg_rating_from_text_reviews")
    )
)

podcast_sentiment = podcast_sentiment.cache()
print("podcast_sentiment rows (sample-based):", podcast_sentiment.count())
podcast_sentiment.show(5, truncate=False)

# -------------------------------------------
# 5. Join into main podcast table
#     (podcast_final must already exist from previous cell)
# -------------------------------------------
# podcast_final has one row per (podcast, category), so a podcast's sentiment
# is repeated on each of its category rows.
# NOTE(review): the id coverage check above found almost no review podcast_ids
# in podcasts_df, so expect very few podcasts to receive sentiment here.
podcast_final_with_sentiment = (
    podcast_final
    .join(podcast_sentiment, on="podcast_id", how="left")
)

podcast_final_with_sentiment = podcast_final_with_sentiment.cache()

print("Sample podcasts with sentiment + engagement:")
(
    podcast_final_with_sentiment
    .select(
        "title",
        "category_name",
        "avg_rating_platform",
        "num_reviews",
        "avg_sentiment",
        "num_reviews_with_sentiment"
    )
    .orderBy(F.desc("num_reviews_with_sentiment"))
    .limit(10)
    .show(truncate=80)
)


In [None]:
from pyspark.sql import functions as F

# podcast_final_with_sentiment can hold several rows per podcast (one per
# category, inherited from podcast_final). Counting rows would overstate the
# number of podcasts, so count distinct podcast_ids instead.
sentiment_coverage = podcast_final_with_sentiment.select(
    "podcast_id", "num_reviews_with_sentiment"
)

print("Joined rows (podcast x category):", podcast_final_with_sentiment.count())
print("Total podcasts:", sentiment_coverage.select("podcast_id").distinct().count())
print(
    "Podcasts with sentiment:",
    sentiment_coverage
        .where(F.col("num_reviews_with_sentiment").isNotNull())
        .select("podcast_id")
        .distinct()
        .count()
)


In [None]:
# Inspect the columns and types of the per-podcast sentiment aggregate
# (podcast_id plus the avg/count columns built in the sentiment cell).
podcast_sentiment.printSchema()

In [None]:
# Inspect the joined table's columns: podcast_final's columns followed by the
# sentiment columns added by the LEFT join.
podcast_final_with_sentiment.printSchema()

In [None]:
from pyspark.sql import functions as F

# Debug why sentiment rows do not attach to podcast_final:
# first compare the join-key types, then probe one sentiment id in podcast_final.
print("podcast_final schema podcast_id:", podcast_final.schema["podcast_id"].dataType)
print("podcast_sentiment schema podcast_id:", podcast_sentiment.schema["podcast_id"].dataType)

# Take one example ID from podcast_sentiment. first() returns None for an empty
# frame; the previous collect()[0][0] raised IndexError in that case.
first_row = podcast_sentiment.select("podcast_id").first()
if first_row is None:
    print("podcast_sentiment is empty - nothing to probe.")
else:
    example_id = first_row["podcast_id"]
    print("Example podcast_id from sentiment:", example_id)

    # Check if that ID exists in podcast_final.
    # NOTE(review): the id coverage check found almost no review podcast_ids in
    # podcasts_df, so 0 here most likely reflects the data, not a type mismatch.
    exists_in_final = podcast_final.where(F.col("podcast_id") == example_id).count()
    print("Rows with that id in podcast_final:", exists_in_final)


In [None]:
from pyspark.sql import functions as F

# =====================================================
# 1. Core podcast info (from podcasts_df)
# =====================================================
podcast_core = (
    podcasts_df
    .select(
        "podcast_id",
        "title",
        "description",
        F.col("average_rating").alias("avg_rating_platform"),
        F.col("ratings_count").cast("long").alias("rating_count")
    )
)

# =====================================================
# 2. Engagement from reviews (count + avg rating)
# =====================================================
podcast_reviews = (
    reviews_df
    .groupBy("podcast_id")
    .agg(
        F.count("*").alias("num_reviews"),
        F.avg("rating").alias("avg_review_rating_from_reviews")
    )
)

# =====================================================
# 3. Category lookup (1 row per podcast/category)
# =====================================================
category_lookup = (
    categories_df
    .select(
        "podcast_id",
        F.col("category").alias("category_name"),
        F.col("itunes_id").alias("category_itunes_id")
    )
    .dropDuplicates(["podcast_id", "category_name"])
)

# quick sanity check
print("podcast_core rows:    ", podcast_core.count())
print("podcast_reviews rows: ", podcast_reviews.count())
print("category_lookup rows: ", category_lookup.count())
print("podcast_sentiment rows:", podcast_sentiment.count())

# =====================================================
# 4. Build final analytic table with LEFT joins
#    (this bypasses the old podcast_final completely)
# =====================================================
podcast_analytic = (
    podcast_core
    .join(podcast_reviews, on="podcast_id", how="left")
    .join(podcast_sentiment, on="podcast_id", how="left")     # BERT sentiment
    .join(category_lookup, on="podcast_id", how="left")       # 1 row per category
)

# text feature
podcast_analytic = (
    podcast_analytic
    .withColumn("description", F.coalesce("description", F.lit("")))
    .withColumn("description_length", F.length("description"))
    .cache()
)

# category_lookup has one row per (podcast, category), so the analytic table can
# hold several rows per podcast: report rows and distinct podcasts separately.
print("Rows in analytic table (podcast x category):", podcast_analytic.count())
print(
    "Total podcasts in analytic table:",
    podcast_analytic.select("podcast_id").distinct().count()
)

# How many distinct podcasts actually have BERT sentiment?
podcasts_with_sent = (
    podcast_analytic
    .where(F.col("num_reviews_with_sentiment").isNotNull())
    .select("podcast_id")
    .distinct()
    .count()
)
print("Podcasts with sentiment (after joins):", podcasts_with_sent)

# Show only podcasts that DO have sentiment (one row per category)
(
    podcast_analytic
    .where(F.col("num_reviews_with_sentiment").isNotNull())
    .select(
        "title",
        "category_name",
        "avg_rating_platform",
        "num_reviews",
        "avg_sentiment",
        "num_reviews_with_sentiment"
    )
    .orderBy(F.desc("num_reviews_with_sentiment"))
    .limit(10)
    .show(truncate=80)
)


In [None]:
from pyspark.sql import functions as F

# Pearson correlation between sampled review sentiment and the platform rating.
# The F.corr aggregate ignores pairs where either value is null, so podcasts
# without a sentiment score (left-join nulls) are excluded explicitly.
# NOTE(review): podcast_final_restricted is not created in this part of the
# notebook -- confirm an earlier cell defines it before running this one.
corr = (
    podcast_final_restricted
    .agg(F.corr("avg_sentiment", "avg_rating_platform").alias("corr"))
    .first()["corr"]
)

print("Correlation between sentiment and platform rating:", corr)


In [None]:
from pyspark.sql import functions as F
from transformers import pipeline
import torch
import pandas as pd

pd.set_option("display.max_colwidth", 120)

# Fixed seed so the random review sample (and everything derived from it)
# is reproducible across Restart & Run All.
SAMPLE_SEED = 42

# =====================================================
# 0. Create a unified string key: podcast_key
# =====================================================

# Reviews with unified key (reviews without text cannot be scored)
reviews_keyed = (
    reviews_df
    .select(
        F.col("podcast_id").cast("string").alias("podcast_key"),
        "rating",
        F.col("content").alias("text")
    )
    .filter(F.col("text").isNotNull())
)

# Core podcast info with unified key
podcasts_keyed = (
    podcasts_df
    .select(
        F.col("podcast_id").cast("string").alias("podcast_key"),
        "title",
        "description",
        F.col("average_rating").alias("avg_rating_platform"),
        F.col("ratings_count").cast("long").alias("rating_count")
    )
)

# Categories with unified key (1 row per podcast/category)
categories_keyed = (
    categories_df
    .select(
        F.col("podcast_id").cast("string").alias("podcast_key"),
        F.col("category").alias("category_name"),
        F.col("itunes_id").alias("category_itunes_id")
    )
    .dropDuplicates(["podcast_key", "category_name"])
)

print("reviews_keyed rows:   ", reviews_keyed.count())
print("podcasts_keyed rows:  ", podcasts_keyed.count())
print("categories_keyed rows:", categories_keyed.count())

# =====================================================
# 1. BERT sentiment on a RANDOM SAMPLE of reviews_keyed
# =====================================================

TOTAL_REVIEWS = reviews_keyed.count()
MAX_REVIEWS = 2000   # adjust upward if your laptop handles it

sample_reviews_spark = (
    reviews_keyed
    .orderBy(F.rand(SAMPLE_SEED))   # seeded shuffle -> reproducible sample
    .limit(MAX_REVIEWS)
)

sample_reviews = sample_reviews_spark.toPandas()

print(f"Total reviews in dataset:  {TOTAL_REVIEWS}")
print(f"Sampled reviews for BERT:  {len(sample_reviews)}")

device = 0 if torch.cuda.is_available() else -1
print("Device set to use", "gpu" if device == 0 else "cpu")

sentiment_pipe = pipeline(
    "sentiment-analysis",
    model="distilbert-base-uncased-finetuned-sst-2-english",
    device=device
)

texts = sample_reviews["text"].tolist()
batch_size = 16

labels = []
scores = []

for i in range(0, len(texts), batch_size):
    batch = texts[i:i + batch_size]
    outputs = sentiment_pipe(
        batch,
        truncation=True,
        max_length=256
    )
    for out in outputs:
        labels.append(out["label"])
        scores.append(out["score"])

sample_reviews["hf_label"] = labels
sample_reviews["hf_score_raw"] = scores

def signed_score(row):
    """Return +score for a POSITIVE prediction and -score for any other label."""
    return row["hf_score_raw"] if row["hf_label"] == "POSITIVE" else -row["hf_score_raw"]

sample_reviews["sentiment_score"] = sample_reviews.apply(signed_score, axis=1)

print("Example scored reviews:")
display(sample_reviews[["rating", "hf_label", "sentiment_score", "text"]].head(10))

# =====================================================
# 2. Back to Spark: sentiment per podcast_key
# =====================================================

sample_scored_spark = spark.createDataFrame(
    sample_reviews[["podcast_key", "rating", "sentiment_score"]]
)

podcast_sentiment_keyed = (
    sample_scored_spark
    .groupBy("podcast_key")
    .agg(
        F.avg("sentiment_score").alias("avg_sentiment"),
        F.count("*").alias("num_reviews_with_sentiment"),
        F.avg("rating").alias("avg_rating_from_text_reviews")
    )
).cache()

print("podcast_sentiment_keyed rows:", podcast_sentiment_keyed.count())
podcast_sentiment_keyed.show(5, truncate=False)

# =====================================================
# 3. Engagement from all reviews (using same key)
# =====================================================

podcast_reviews_keyed = (
    reviews_keyed
    .groupBy("podcast_key")
    .agg(
        F.count("*").alias("num_reviews"),
        F.avg("rating").alias("avg_review_rating_from_reviews")
    )
)

print("podcast_reviews_keyed rows:", podcast_reviews_keyed.count())

# =====================================================
# 4. Final analytic table (ALL joins on podcast_key)
# =====================================================

podcast_analytic = (
    podcasts_keyed
    .join(podcast_reviews_keyed, on="podcast_key", how="left")
    .join(podcast_sentiment_keyed, on="podcast_key", how="left")
    .join(categories_keyed, on="podcast_key", how="left")
)

podcast_analytic = (
    podcast_analytic
    .withColumn("description", F.coalesce("description", F.lit("")))
    .withColumn("description_length", F.length("description"))
    .cache()
)

# categories_keyed has one row per (podcast, category), so the analytic table can
# hold several rows per podcast: report rows and distinct podcasts separately.
print("Rows in analytic table (podcast x category):", podcast_analytic.count())
print(
    "Total podcasts in analytic table:",
    podcast_analytic.select("podcast_key").distinct().count()
)

podcasts_with_sent = (
    podcast_analytic
    .where(F.col("num_reviews_with_sentiment").isNotNull())
    .select("podcast_key")
    .distinct()
    .count()
)
print("Podcasts with sentiment (after joins):", podcasts_with_sent)

# show only rows that actually HAVE sentiment (one row per category)
(
    podcast_analytic
    .where(F.col("num_reviews_with_sentiment").isNotNull())
    .select(
        "title",
        "category_name",
        "avg_rating_platform",
        "num_reviews",
        "avg_sentiment",
        "num_reviews_with_sentiment"
    )
    .orderBy(F.desc("num_reviews_with_sentiment"))
    .limit(10)
    .show(truncate=80)
)


In [None]:
from pyspark.sql import functions as F
from transformers import pipeline
import torch
import pandas as pd

pd.set_option("display.max_colwidth", 120)

# Fixed seed so the random review sample is reproducible across runs.
SAMPLE_SEED = 42

# =====================================================
# 1. Build a "reviews WITH metadata" DataFrame
#    (joins on podcast_key; categories collapsed per podcast)
# =====================================================

# categories_keyed has one row per (podcast, category). Joining it directly would
# repeat each review once per category. That would inflate TOTAL_META_REVIEWS,
# allow the same review to be sampled more than once, and inflate
# num_reviews_with_sentiment. Collapse the categories into one comma-separated
# string per podcast instead.
podcast_category_names = (
    categories_keyed
    .groupBy("podcast_key")
    .agg(
        F.concat_ws(", ", F.sort_array(F.collect_set("category_name")))
        .alias("category_name")
    )
)

# Inner join keeps only reviews whose podcast has a metadata row. reviews_keyed
# already drops reviews with null text. Per-category iTunes ids are not carried
# over because they do not reduce to a single value per review.
reviews_with_meta = (
    reviews_keyed
    .join(
        podcasts_keyed.select(
            "podcast_key",
            "title",
            "description",
            "avg_rating_platform",
            "rating_count"
        ),
        on="podcast_key",
        how="inner"
    )
    .join(podcast_category_names, on="podcast_key", how="left")
)

TOTAL_META_REVIEWS = reviews_with_meta.count()
print("Total reviews WITH metadata:", TOTAL_META_REVIEWS)

# =====================================================
# 2. SAMPLE ONLY FROM reviews_with_meta, then run BERT
# =====================================================

MAX_REVIEWS = 2000  # adjust if your machine can handle more

sample_reviews_spark = (
    reviews_with_meta
    .orderBy(F.rand(SAMPLE_SEED))   # seeded random order over the meta subset
    .limit(MAX_REVIEWS)
)

# keep all useful columns (podcast_key, rating, text, plus meta)
sample_reviews = sample_reviews_spark.toPandas()

print(f"Sampled reviews for BERT (with metadata): {len(sample_reviews)}")

device = 0 if torch.cuda.is_available() else -1
print("Device set to use", "gpu" if device == 0 else "cpu")

sentiment_pipe = pipeline(
    "sentiment-analysis",
    model="distilbert-base-uncased-finetuned-sst-2-english",
    device=device
)

texts = sample_reviews["text"].tolist()
batch_size = 16

labels = []
scores = []

for i in range(0, len(texts), batch_size):
    batch = texts[i:i + batch_size]
    outputs = sentiment_pipe(
        batch,
        truncation=True,
        max_length=256
    )
    for out in outputs:
        labels.append(out["label"])
        scores.append(out["score"])

sample_reviews["hf_label"] = labels
sample_reviews["hf_score_raw"] = scores

def signed_score(row):
    """Return +score for a POSITIVE prediction and -score for any other label."""
    return row["hf_score_raw"] if row["hf_label"] == "POSITIVE" else -row["hf_score_raw"]

sample_reviews["sentiment_score"] = sample_reviews.apply(signed_score, axis=1)

print("Example scored reviews (with metadata):")
display(sample_reviews[[
    "podcast_key",
    "title",
    "category_name",
    "rating",
    "hf_label",
    "sentiment_score",
    "text"
]].head(10))

# =====================================================
# 3. Back to Spark: sentiment per podcast_key
# =====================================================

sample_scored_spark = spark.createDataFrame(
    sample_reviews[["podcast_key", "rating", "sentiment_score"]]
)

podcast_sentiment_keyed = (
    sample_scored_spark
    .groupBy("podcast_key")
    .agg(
        F.avg("sentiment_score").alias("avg_sentiment"),
        F.count("*").alias("num_reviews_with_sentiment"),
        F.avg("rating").alias("avg_rating_from_text_reviews")
    )
).cache()

print("podcast_sentiment_keyed rows:", podcast_sentiment_keyed.count())
podcast_sentiment_keyed.show(5, truncate=False)


In [None]:
from pyspark.sql import functions as F

# =====================================================
# Rebuild final analytic table: BASE = podcasts that
# actually have reviews (podcast_reviews_keyed)
# =====================================================

podcast_analytic = (
    podcast_reviews_keyed                                           # base: has at least 1 review
    .join(podcast_sentiment_keyed, on="podcast_key", how="left")   # BERT sentiment (sample)
    .join(podcasts_keyed, on="podcast_key", how="left")            # metadata (title, description, ratings)
    .join(categories_keyed, on="podcast_key", how="left")          # categories: 1 row per category
)

podcast_analytic = (
    podcast_analytic
    .withColumn("description", F.coalesce("description", F.lit("")))
    .withColumn("description_length", F.length("description"))
    .cache()
)

print("Rows in analytic table (reviews-based):", podcast_analytic.count())
print("Distinct podcasts with any review:    ", podcast_reviews_keyed.count())

# Rows are per (podcast, category), so count distinct podcasts, not rows.
podcasts_with_sent = (
    podcast_analytic
    .where(F.col("num_reviews_with_sentiment").isNotNull())
    .select("podcast_key")
    .distinct()
    .count()
)
print("Podcasts with sentiment (after joins):", podcasts_with_sent)

# Show ONLY podcasts that actually have sentiment
(
    podcast_analytic
    .where(F.col("num_reviews_with_sentiment").isNotNull())
    .select(
        "title",
        "category_name",
        "avg_rating_platform",
        "num_reviews",
        "avg_sentiment",
        "num_reviews_with_sentiment"
    )
    .orderBy(F.desc("num_reviews_with_sentiment"))
    .limit(10)
    .show(truncate=80)
)


In [None]:
from pyspark.sql import functions as F

# ---------- Base reviews with a unified key (text-less reviews dropped) ----------
reviews_keyed = (
    reviews_df
    .select(
        F.col("podcast_id").cast("string").alias("podcast_key"),
        "rating",
        F.col("content").alias("text")
    )
    .filter(F.col("text").isNotNull())
)

# ---------- Podcast metadata with unified key ----------
podcasts_keyed = (
    podcasts_df
    .select(
        F.col("podcast_id").cast("string").alias("podcast_key"),
        "title",
        "description",
        F.col("average_rating").alias("avg_rating_platform"),
        F.col("ratings_count").cast("long").alias("rating_count")
    )
)

# ---------- Categories with unified key (1 row per podcast/category) ----------
categories_keyed = (
    categories_df
    .select(
        F.col("podcast_id").cast("string").alias("podcast_key"),
        F.col("category").alias("category_name"),
        F.col("itunes_id").alias("category_itunes_id")
    )
    .dropDuplicates(["podcast_key", "category_name"])
)

# ---------- Keep ONLY reviews belonging to podcasts that have metadata ----------
valid_podcasts = podcasts_keyed.select("podcast_key").dropDuplicates()

reviews_for_sentiment = (
    reviews_keyed
    .join(valid_podcasts, on="podcast_key", how="inner")
)

print("Total reviews (all):               ", reviews_keyed.count())
print("Reviews with podcast metadata:     ", reviews_for_sentiment.count())
print("Distinct podcasts with metadata:   ", valid_podcasts.count())


In [None]:
from pyspark.sql import functions as F
from transformers import pipeline
import torch
import pandas as pd

pd.set_option("display.max_colwidth", 120)

# Fixed seed so the sampled reviews (and the per-podcast sentiment built from
# them) are reproducible across runs.
SAMPLE_SEED = 42

TOTAL_REVIEWS_META = reviews_for_sentiment.count()
MAX_REVIEWS = 2000   # adjust down if your laptop struggles

# Random sample drawn only from reviews whose podcast has metadata.
sample_reviews_spark = (
    reviews_for_sentiment
    .orderBy(F.rand(SAMPLE_SEED))
    .limit(MAX_REVIEWS)
)

sample_reviews = sample_reviews_spark.toPandas()

print(f"Total reviews with metadata: {TOTAL_REVIEWS_META}")
print(f"Sampled reviews for BERT:    {len(sample_reviews)}")

device = 0 if torch.cuda.is_available() else -1
print("Device set to:", "gpu" if device == 0 else "cpu")

sentiment_pipe = pipeline(
    "sentiment-analysis",
    model="distilbert-base-uncased-finetuned-sst-2-english",
    device=device
)

texts = sample_reviews["text"].tolist()
batch_size = 16

labels = []
scores = []

for i in range(0, len(texts), batch_size):
    batch = texts[i:i + batch_size]
    outputs = sentiment_pipe(
        batch,
        truncation=True,
        max_length=256
    )
    for out in outputs:
        labels.append(out["label"])
        scores.append(out["score"])

sample_reviews["hf_label"] = labels
sample_reviews["hf_score_raw"] = scores

def signed_score(row):
    """Return +score for a POSITIVE prediction and -score for any other label."""
    return row["hf_score_raw"] if row["hf_label"] == "POSITIVE" else -row["hf_score_raw"]

sample_reviews["sentiment_score"] = sample_reviews.apply(signed_score, axis=1)

print("Example scored reviews:")
display(sample_reviews[["rating", "hf_label", "sentiment_score", "text"]].head(10))

# Back to Spark: aggregate per podcast_key
sample_scored_spark = spark.createDataFrame(
    sample_reviews[["podcast_key", "rating", "sentiment_score"]]
)

podcast_sentiment_keyed = (
    sample_scored_spark
    .groupBy("podcast_key")
    .agg(
        F.avg("sentiment_score").alias("avg_sentiment"),
        F.count("*").alias("num_reviews_with_sentiment"),
        F.avg("rating").alias("avg_rating_from_text_reviews")
    )
).cache()

print("podcast_sentiment_keyed rows:", podcast_sentiment_keyed.count())
podcast_sentiment_keyed.show(5, truncate=False)


In [None]:
from pyspark.sql import functions as F

# Aggregate all reviews (for engagement)
podcast_reviews_keyed = (
    reviews_keyed
    .groupBy("podcast_key")
    .agg(
        F.count("*").alias("num_reviews"),
        F.avg("rating").alias("avg_review_rating_from_reviews")
    )
)

# Build final analytic table – BASE = podcasts with review stats.
# categories_keyed has one row per (podcast, category), so a podcast can span several rows.
podcast_analytic = (
    podcast_reviews_keyed
    .join(podcast_sentiment_keyed, on="podcast_key", how="left")   # BERT sentiment (subset)
    .join(podcasts_keyed, on="podcast_key", how="left")            # metadata
    .join(categories_keyed, on="podcast_key", how="left")          # categories
)

podcast_analytic = (
    podcast_analytic
    .withColumn("description", F.coalesce("description", F.lit("")))
    .withColumn("description_length", F.length("description"))
    .cache()
)

print("Rows in analytic table (reviews-based):", podcast_analytic.count())

# Count distinct podcasts, not (podcast, category) rows.
podcasts_with_sent = (
    podcast_analytic
    .where(F.col("num_reviews_with_sentiment").isNotNull())
    .select("podcast_key")
    .distinct()
    .count()
)
print("Podcasts with sentiment (after joins):", podcasts_with_sent)

(
    podcast_analytic
    .where(F.col("num_reviews_with_sentiment").isNotNull())
    .select(
        "title",
        "category_name",
        "avg_rating_platform",
        "num_reviews",
        "avg_sentiment",
        "num_reviews_with_sentiment"
    )
    .orderBy(F.desc("num_reviews_with_sentiment"))
    .limit(10)
    .show(truncate=80)
)


In [None]:
from pyspark.sql import functions as F

corr_cols = ["avg_rating_platform", "num_reviews", "rating_count", "description_length"]

# Use only podcasts that actually have sentiment. podcast_analytic holds one row
# per (podcast, category), so collapse to distinct podcasts first. Otherwise
# multi-category podcasts would be counted several times in the correlation.
podcasts_with_sent = (
    podcast_analytic
    .where(F.col("avg_sentiment").isNotNull())
    .select("podcast_key", "avg_sentiment", *corr_cols)
    .distinct()
    .drop("podcast_key")
)

print("Rows with sentiment:", podcasts_with_sent.count())

# Pearson correlations computed in one pass. F.corr skips pairs where either
# value is null (e.g. podcasts with no platform rating from the left join) and
# returns null when there is not enough data.
corr_row = podcasts_with_sent.agg(
    *[F.corr("avg_sentiment", c).alias(c) for c in corr_cols]
).first()

for feature in corr_cols:
    value = corr_row[feature]
    shown = "n/a" if value is None else f"{value:.4f}"
    print(f"corr(avg_sentiment, {feature}) = {shown}")


In [None]:
from pyspark.sql import functions as F

# Per-category engagement summary. Rows without a category are excluded before
# aggregating, so no NULL-category group appears in the result.
category_engagement = (
    podcast_analytic
    .where(F.col("category_name").isNotNull())
    .groupBy("category_name")
    .agg(
        F.countDistinct("podcast_key").alias("num_podcasts"),
        F.sum("num_reviews").alias("total_reviews"),
        F.sum("rating_count").alias("total_ratings_recorded"),
        F.avg("avg_rating_platform").alias("avg_platform_rating"),
        F.avg("avg_sentiment").alias("avg_sentiment_sampled"),
    )
)

print("Distinct categories:", category_engagement.count())

# Top 20 categories by absolute review volume.
category_engagement.orderBy(F.col("total_reviews").desc()).show(20, truncate=80)

# Engagement normalised by catalogue size: reviews per podcast in the category.
(
    category_engagement
    .withColumn("reviews_per_podcast", F.col("total_reviews") / F.col("num_podcasts"))
    .orderBy(F.col("reviews_per_podcast").desc())
    .show(20, truncate=80)
)


In [None]:
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# A podcast is labelled "high performing" when its platform rating is >= this value.
HIGH_RATING_THRESHOLD = 4.5
SPLIT_SEED = 42

feature_cols = ["avg_sentiment", "num_reviews", "rating_count", "description_length"]

# The label source and every feature must be present. VectorAssembler rejects
# null feature values by default (handleInvalid="error"), and rating_count can be
# null after the left join to podcast metadata.
required_cols = ["avg_rating_platform"] + feature_cols

# podcast_analytic holds one row per (podcast, category). Collapse to one row per
# podcast so the same podcast cannot land in both the train and the test split.
# The podcast-level columns presumably repeat identically across a podcast's
# category rows (they come from per-podcast tables); F.first relies on that.
model_data = (
    podcast_analytic
    .dropna(subset=required_cols)
    .groupBy("podcast_key")
    .agg(
        F.first("title").alias("title"),
        F.concat_ws(", ", F.sort_array(F.collect_set("category_name"))).alias("category_name"),
        *[F.first(c).alias(c) for c in required_cols]
    )
)

# Label: 1 = high performing, 0 = otherwise
model_data = model_data.withColumn(
    "label",
    (F.col("avg_rating_platform") >= F.lit(HIGH_RATING_THRESHOLD)).cast("int")
)

assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features"
)

# Cache before randomSplit. The split is re-evaluated by each action (count, fit,
# transform), and caching keeps its input stable across those evaluations.
model_data_assembled = assembler.transform(model_data).select("features", "label").cache()

train_df, test_df = model_data_assembled.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

print("Train rows:", train_df.count(), " Test rows:", test_df.count())

lr = LogisticRegression(featuresCol="features", labelCol="label")
lr_model = lr.fit(train_df)

preds = lr_model.transform(test_df)

evaluator = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")
auc = evaluator.evaluate(preds)
print("Test AUC:", auc)

# Look at coefficients (rough idea of feature importance)
print("Feature order:", feature_cols)
print("Coefficients:", lr_model.coefficients)
print("Intercept:", lr_model.intercept)


VADER sentiment: lexicon-based scoring on a larger random sample of reviews

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

analyzer = SentimentIntensityAnalyzer()

# Python UDF for VADER compound score
@F.udf(DoubleType())
def vader_compound(text):
    """Spark UDF: VADER's "compound" polarity score for ``text`` as a float.

    Returns None for null text so nulls propagate instead of raising.
    """
    if text is None:
        return None
    return float(analyzer.polarity_scores(text)["compound"])

# ---- choose how big a sample you want ----
# e.g. 50_000 reviews here vs. the 2_000-review BERT sample (MAX_REVIEWS)
VADER_SAMPLE_SIZE = 50000

base_reviews_vader = (
    reviews_df
      .filter(F.col("content").isNotNull())
      .select("podcast_id", "rating", "content")
      .orderBy(F.rand(42))
      .limit(VADER_SAMPLE_SIZE)
      .cache()
)

print("VADER review sample rows:", base_reviews_vader.count())

# Cache the scored frame so the Python UDF is not re-run by every downstream
# action (show, the per-podcast aggregation and its count/show, later joins).
vader_scored_reviews = (
    base_reviews_vader
      .withColumn("sentiment_vader", vader_compound("content"))
      .cache()
)

vader_scored_reviews.show(5, truncate=80)


In [None]:
from pyspark.sql import functions as F

# Build the same string key the other keyed tables use (podcast_id cast to
# string), so joins on podcast_key line up and null ids stay null.
vader_keyed = vader_scored_reviews.withColumn(
    "podcast_key", F.col("podcast_id").cast("string")
)

podcast_sentiment_vader = (
    vader_keyed
      .groupBy("podcast_key")
      .agg(
          F.avg("sentiment_vader").alias("avg_sentiment_vader"),
          F.count("*").alias("num_reviews_with_sentiment_vader"),
          F.avg("rating").alias("avg_rating_from_text_reviews_vader"),
      )
)

print("podcast_sentiment_vader rows:", podcast_sentiment_vader.count())
podcast_sentiment_vader.show(5, truncate=80)


In [None]:
# Attach the per-podcast VADER aggregates to the analytic table. The join is a
# left join, so podcasts without any VADER-scored review keep null VADER columns.
podcast_analytic_vader = podcast_analytic.join(
    podcast_sentiment_vader, on="podcast_key", how="left"
).cache()

print("Rows in analytic table (reviews-based):", podcast_analytic_vader.count())

# The table can hold several rows per podcast (one per category), so count
# distinct podcast keys rather than rows.
podcasts_with_vader = (
    podcast_analytic_vader
    .filter(F.col("avg_sentiment_vader").isNotNull())
    .select("podcast_key")
    .distinct()
    .count()
)

print("Distinct podcasts with VADER sentiment:", podcasts_with_vader)

(
    podcast_analytic_vader
    .select(
        "title",
        "category_name",
        "avg_rating_platform",
        "num_reviews",
        "avg_sentiment_vader",
        "num_reviews_with_sentiment_vader",
    )
    .orderBy(F.col("num_reviews_with_sentiment_vader").desc())
    .show(10, truncate=80)
)


In [None]:
from pyspark.sql import functions as F

# Minimum number of VADER-scored reviews required per podcast
MIN_VADER_REVIEWS = 2

# Filter to podcasts that actually have VADER sentiment and enough reviews
podcasts_vader_filtered = (
    podcast_analytic_vader
        .where(F.col("avg_sentiment_vader").isNotNull())
        .where(F.col("num_reviews_with_sentiment_vader") >= MIN_VADER_REVIEWS)
        .where(F.col("avg_rating_platform").isNotNull())
        .where(F.col("category_name").isNotNull())
)

# The table holds one row per (podcast, category), so report distinct podcasts
# and rows separately.
print("Total rows in analytic table (reviews-based):",
      podcast_analytic_vader.count())
print("Podcasts with VADER sentiment & at least",
      MIN_VADER_REVIEWS, "scored reviews:",
      podcasts_vader_filtered.select("podcast_key").distinct().count())
print("Rows (podcast x category) in filtered set:",
      podcasts_vader_filtered.count())

# Take a look at the filtered set
(
    podcasts_vader_filtered
    .select(
        "title",
        "category_name",
        "avg_rating_platform",
        "num_reviews",
        "avg_sentiment_vader",
        "num_reviews_with_sentiment_vader"
    )
    .orderBy(F.desc("num_reviews_with_sentiment_vader"))
    .show(10, truncate=80)
)
