# Setup

In [1]:
# pyspark imports
from pyspark.sql import Window
from pyspark.sql import functions
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer

In [2]:
# regular imports
import os

In [3]:
spark = SparkSession.builder.appName('ccbd-demo').master('local').getOrCreate()
spark

24/04/26 08:47:40 WARN Utils: Your hostname, cie-B760M-DS3H-DDR4 resolves to a loopback address: 127.0.1.1; using 172.16.171.63 instead (on interface enxac15a2afb0c0)
24/04/26 08:47:40 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/26 08:47:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# folder paths
data_folder = "data/"
output_folder = "outputs/"
graphs_folder = "graphs/"

# file paths
dataset_path = data_folder + "anime_data.csv"

if not os.path.exists(dataset_path):
  raise Exception(f"{dataset_path} does not exist!")

In [5]:
df = spark.read.csv(dataset_path, header = True)

# Data Preprocessing

**DATASET DESCRIPTION**

| Column     | Description                                                                         |
|------------|-------------------------------------------------------------------------------------|
| username   | Username of the user who interacted with the anime (might be null if not logged in) |
| anime_id   | Unique identifier for the anime entry                                               |
| my_score   | User's personal score for the anime (might be null if not rated)                    |
| user_id    | Unique identifier for the user profile (might be null if not logged in)             |
| gender     | User's reported gender (might be null if not provided) (Female/Non-Binary/Male)     |
| title      | Official title of the anime                                                         |
| type       | Type of anime (e.g., TV, Movie, OVA, etc.)                                          |
| source     | Source material of the anime (e.g., Manga, Light Novel, etc.)                       |
| score      | Average score for the anime from all users (might be null if not enough ratings)    |
| scored_by  | Number of users who rated the anime (might be null)                                 |
| rank       | Anime's ranking based on either score or popularity (might be null)                 |
| popularity | Anime's popularity ranking on MyAnimeList (might be null)                           |
| genre      | Genre(s) of the anime (might be a single string or comma-separated list)            |


In [None]:
# get summary + peek dataset
print("Dataset size:", df.count())
df.head(10)

## 1. NULL value inspection
- There is a problem in the 'score' column
- Before casting there are no null values
- After casting to float there are ~8000 null values
- This is due to strings like "Game" or "Visual Novel" being placed in the 'score' column
- Null values MUST be calculated after casting

In [None]:
# inspect rows where 'score' is null (before casting to float)
score_null = df.filter(df['score'].isNull())
score_null.show()

In [None]:
df.select('score').distinct().write.csv(output_folder + "distinct_scores.csv", header=True)

In [None]:
value_to_check = "Visual novel"
problem_df = df.filter(df['score'] == value_to_check)
count = problem_df.count()
print("Number of times '{}' appears in the 'score' column: {}".format(value_to_check, count))

## 2. Dealing with NULL values

**NULL VALUE COUNTS**

| Column        | Count  |
|---------------|--------|
| username      | 256    |
| anime_id      | 0      |
| my_score      | 0      |
| user_id       | 0      |
| gender        | 0      |
| title         | 0      |
| type          | 0      |
| score         | 8160   |
| scored_by     | 0      |
| rank          | 751600 |
| popularity    | 370    |
| genre         | 2267   |

In [6]:
# get the dataframe columns in their correct datatypes
df = df.withColumn("anime_id", functions.col("anime_id").cast("int")) \
		.withColumn("my_score", functions.col("my_score").cast("int")) \
  		.withColumn("user_id", functions.col("user_id").cast("int")) \
      	.withColumn("score", functions.col("score").cast("float")) \
		.withColumn("scored_by", functions.col("scored_by").cast("int")) \
		.withColumn("rank", functions.col("rank").cast("int")) \
		.withColumn("popularity", functions.col("popularity").cast("int"))

# round the 'score' column to 2 decimal places
df = df.withColumn("score", functions.round(functions.col("score"), 2))

df.head(10)

[Row(username='karthiga', anime_id=21, my_score=9, user_id=2255153, gender='Female', title='One Piece', type='TV', source='Manga', score=8.539999961853027, scored_by=423868, rank=91, popularity=35, genre='Action, Adventure, Comedy, Super Power, Drama, Fantasy, Shounen'),
 Row(username='karthiga', anime_id=59, my_score=7, user_id=2255153, gender='Female', title='Chobits', type='TV', source='Manga', score=7.53000020980835, scored_by=175388, rank=1546, popularity=188, genre='Sci-Fi, Comedy, Drama, Romance, Ecchi, Seinen'),
 Row(username='karthiga', anime_id=74, my_score=7, user_id=2255153, gender='Female', title='Gakuen Alice', type='TV', source='Manga', score=7.769999980926514, scored_by=33244, rank=941, popularity=1291, genre='Comedy, School, Shoujo, Super Power'),
 Row(username='karthiga', anime_id=120, my_score=7, user_id=2255153, gender='Female', title='Fruits Basket', type='TV', source='Manga', score=7.769999980926514, scored_by=167968, rank=939, popularity=222, genre='Slice of Life

In [None]:
# count the number of null values in each column
null_counts = {}

for col in df.columns:
	null_counts[col] = df.filter(functions.col(col).isNull()).count()

print(null_counts)

- Despite the 'score' column having ~8000 null values
- We will not drop records due to that
- Because no EDA task requires us to use the average score

In [None]:
# inspect rows where 'score' is null
score_null = df.filter(df['score'].isNull())
score_null.show()

In [7]:
# drop rows with null values in any column (other than 'rank')
clean_df = df.na.drop(subset = ["username", "popularity", "genre"])

In [None]:
# check if and how many titles were lost due to above
unique_animes_before = df.select("anime_id").distinct().count()
unique_animes_after = clean_df.select("anime_id").distinct().count()
unique_percentage_change = ((unique_animes_before - unique_animes_after) / unique_animes_before) * 100

print(f"Total unique anime titles: {unique_animes_before}")
print(f"Number of unique animes lost after NA drop: {unique_animes_before - unique_animes_after}")
print(f"Percentage change in unique titles after NA drop: {unique_percentage_change}")

# Exploratory Data Analysis

## 1. The most popular animes according to users

In [9]:
task_1_path = output_folder + "task_1.csv"

In [10]:
# get the top 50 popular anime titles based on their 'popularity'
# explanation:
#   - select rows where 'popularity' > 0
#   - remove duplicate rows based on 'title'
#   - sort in ascending order

top_50_popular = clean_df.filter(functions.col("popularity") > 0).dropDuplicates(["title"]).orderBy("popularity").limit(50)
top_50_popular = top_50_popular.select("anime_id", "title", "score", "popularity")

In [11]:
top_50_popular = top_50_popular.coalesce(1)
top_50_popular.write.csv(task_1_path, header=True)

                                                                                

## 2. The most popular anime for every genre

In [10]:
task_2_path = output_folder + "task_2.csv"

In [13]:
exploded_genres_df = clean_df.withColumn("genre", F.explode(F.split("genre", ", ")))
no_distinct_genres = exploded_genres_df.select("genre").distinct().count()

print("No. of distinct genres:", no_distinct_genres)



No. of distinct genres: 44


                                                                                

In [8]:
# find the most popular anime for each genre
# explanation:
#     - split the 'genre' column into multiple rows (using explode)
#     - partition the df by genre and order by popularity
#     - assign rank to each partition (rank = 1 => highest popularity for that genre)

genre_exploded = clean_df.withColumn("genre", F.explode(F.split("genre", ", "))).select("genre", "popularity", "title")

window = Window.partitionBy("genre").orderBy(F.col("popularity"))
ranked = genre_exploded.withColumn("rank", F.dense_rank().over(window))
max_popularity_by_genre = ranked.filter(F.col("rank") == 1).select("genre", "popularity", "title")

most_popular_by_genre = max_popularity_by_genre.dropDuplicates()

most_popular_by_genre.show()




+------------+----------+--------------------+
|       genre|popularity|               title|
+------------+----------+--------------------+
|      Seinen|         5|       One Punch Man|
|    Dementia|        48|Neon Genesis Evan...|
|     Romance|         3|    Sword Art Online|
|       Magic|         4|Fullmetal Alchemi...|
|       Josei|       256|          Usagi Drop|
|    Thriller|         1|          Death Note|
|   Adventure|         3|    Sword Art Online|
| Super Power|         2|  Shingeki no Kyojin|
|Martial Arts|        10|              Naruto|
|      Sports|        85|    Kuroko no Basket|
|      Shoujo|        69|Ouran Koukou Host...|
|       Drama|         2|  Shingeki no Kyojin|
|   Shoujo Ai|       380|           Yuru Yuri|
|      School|         7|        Angel Beats!|
|      Hentai|      1057|        Boku no Pico|
|    Military|         2|  Shingeki no Kyojin|
|     Samurai|        67|    Samurai Champloo|
|        Yaoi|      1057|        Boku no Pico|
|     Fantasy

                                                                                

In [14]:
most_popular_by_genre = most_popular_by_genre.coalesce(1)
most_popular_by_genre.write.csv(task_2_path, header=True)

                                                                                

## 3. The most popular anime genres for every gender

In [16]:
task_3_path = output_folder + "task_3.csv"

In [17]:
# split genre column and put each genre in its own row
genre_gender_exploded = clean_df.withColumn("genre", F.explode(F.split("genre", ", "))).select("genre", "gender")

# count instances of each (genre, gender) combination
genre_gender_counts = genre_gender_exploded.groupBy("genre", "gender").agg(F.count("*").alias("count"))

In [18]:
# for each gender, find top 5 genres
window = Window.partitionBy("gender").orderBy(F.desc("count"))
ranked = genre_gender_counts.withColumn("rank", F.dense_rank().over(window))
top_genres_by_gender = ranked.filter(F.col("rank") <= 5) \
                              .select("gender", "genre", "count") \
                              .orderBy("gender", "rank")

# display top 5 genres for Female, Male & Non-Binary
top_genres_by_gender.show(truncate=False)



+----------+-------+--------+
|gender    |genre  |count   |
+----------+-------+--------+
|Female    |Comedy |4671909 |
|Female    |Action |3482950 |
|Female    |Drama  |3102596 |
|Female    |Romance|3058903 |
|Female    |Fantasy|2477082 |
|Male      |Comedy |12438080|
|Male      |Action |10506368|
|Male      |Romance|7422895 |
|Male      |Drama  |6713172 |
|Male      |Fantasy|6232376 |
|Non-Binary|Comedy |112258  |
|Non-Binary|Action |86008   |
|Non-Binary|Drama  |65602   |
|Non-Binary|Romance|62128   |
|Non-Binary|School |57705   |
+----------+-------+--------+



                                                                                

In [19]:
top_genres_by_gender = top_genres_by_gender.coalesce(1)
top_genres_by_gender.write.csv(task_3_path, header=True)

                                                                                

## 4. Power users (users with a high number of ratings)

In [20]:
task_4_path = output_folder + "task_4.csv"

In [21]:
# no. of ratings for each user
# get top 50 power users (using limit)
user_rating_counts = clean_df.select("user_id", "username") \
                       .groupBy("user_id", "username") \
                       .agg(F.count("*").alias("rating_count")) \
                       .orderBy(F.desc("rating_count")).limit(50)

# show results (top 50)
user_rating_counts.show()



+-------+----------------+------------+
|user_id|        username|rating_count|
+-------+----------------+------------+
|   8669|     spacecowboy|        8739|
|1245229|      TsukasaKei|        8739|
|4561255|         uemmega|        8588|
|3979333|          Exxorn|        8558|
|1237755|    DeadlyKizuna|        8153|
|1381655|          xbhrjd|        8078|
|1636745|      JakCooper2|        7894|
|2063865|         De_Baer|        7412|
| 291713|      Dedzapadlo|        7235|
| 805623|   DesireDestiny|        7010|
|4042345|       ComfyLoli|        6705|
| 132251|            canc|        6605|
|1283539|      Dragonflyk|        6535|
| 337383|VincentHarkonnen|        6511|
|4328447|     KanaenuYume|        6423|
| 186731|     NightTerror|        6407|
|  98846|        coty9090|        6292|
| 216713|           Cafer|        6285|
|2476641|         Tsutaee|        6281|
|1287643|      AngelShiva|        6049|
+-------+----------------+------------+
only showing top 20 rows



                                                                                

In [22]:
user_rating_counts = user_rating_counts.coalesce(1)
user_rating_counts.write.csv(task_4_path, header=True)

                                                                                

## 5. Anime titles with the highest number of user ratings

In [23]:
task_5_path = output_folder + "task_5.csv"

In [24]:
top_50_by_user_ratings = clean_df.filter(functions.col("scored_by") > 0).dropDuplicates(["title"]).orderBy(functions.col("scored_by").desc()).limit(50)
top_50_by_user_ratings = top_50_by_user_ratings.select("anime_id", "title", "scored_by")
top_50_by_user_ratings.show()



+--------+--------------------+---------+
|anime_id|               title|scored_by|
+--------+--------------------+---------+
|    1535|          Death Note|  1009477|
|   16498|  Shingeki no Kyojin|   940211|
|   11757|    Sword Art Online|   915986|
|    5114|Fullmetal Alchemi...|   733592|
|   30276|       One Punch Man|   691845|
|   22319|         Tokyo Ghoul|   659308|
|      20|              Naruto|   648605|
|    6547|        Angel Beats!|   641851|
|    1575|Code Geass: Hangy...|   627740|
|   19815|     No Game No Life|   623227|
|   10620|    Mirai Nikki (TV)|   592994|
|    9253|         Steins;Gate|   563857|
|    4224|           Toradora!|   557898|
|    2904|Code Geass: Hangy...|   543904|
|   21881| Sword Art Online II|   531486|
|    9919|      Ao no Exorcist|   521881|
|     226|          Elfen Lied|   514656|
|   20507|            Noragami|   502213|
|     199|Sen to Chihiro no...|   498602|
|   31964|Boku no Hero Acad...|   494037|
+--------+--------------------+---

                                                                                

In [25]:
top_50_by_user_ratings = top_50_by_user_ratings.coalesce(1)
top_50_by_user_ratings.write.csv(task_5_path, header=True)

                                                                                

## 6. Checking for a relationship between an anime's source (e.g., manga, light novel original) and its score

### Why use eta value here?
- Our objective is to investigate if an anime's source is related to its overall score, so apart from a scatterplot, it would be nice to have some sort of a statistical measure for the strength of the relationship between these variables.
- Since we want to find the correlation between the 'source' column (independent nominal variable) and the 'score' column (dependent scale variable), we can not use a traditional correlation metric like Pearson's coefficient.
- A suitable candidate to measure this nominal-by-interval association would be eta correlation. Eta is a coefficient of nonlinear association, and requires that the dependent variable be interval in level, and the independent variable be categorical (nominal, ordinal, or grouped interval).

| Eta value       | Association       |
|-----------------|-------------------|
| < 0.20          | No association    |
| 0.21 - 0.40     | Weak association  |
| 0.41 - 0.70     | Medium association|
| > 0.70          | Strong association|


In [26]:
# convert anime source -> index (number)
string_indexer = StringIndexer(inputCol="source", outputCol="source_indexed")
indexed_df = string_indexer.fit(clean_df).transform(clean_df)

# group means and grand mean
group_means = indexed_df.groupBy("source_indexed").agg(F.mean("score").alias("group_mean"))
grand_mean = indexed_df.agg(F.mean("score")).collect()[0][0]

# sum of suqared differences (ssw)
ssw_per_group = indexed_df.join(group_means, "source_indexed") \
                          .withColumn("deviation", F.col("score") - F.col("group_mean")) \
                          .groupBy("source_indexed") \
                          .agg(F.sum(F.pow("deviation", 2)).alias("ssw"))
ssw = ssw_per_group.agg(F.sum("ssw")).collect()[0][0]

# sum of squared differences bw groups (ssb)
ssb = group_means.join(indexed_df, "source_indexed") \
                  .groupBy("source_indexed") \
                  .agg((F.pow(F.first("group_mean") - grand_mean, 2) * F.count("*")).alias("ssb")) \
                  .agg(F.sum("ssb").alias("ssb_sum")).collect()[0]["ssb_sum"]

# eta correlation = sqrt(ssb / (ssb + ssw))
eta_correlation = (ssb / (ssb + ssw)) ** 0.5
print(f"Eta correlation between anime source and average user score: {eta_correlation}")

[Stage 79:>                                                         (0 + 1) / 1]

Eta correlation between anime source and average user score: 0.31289410349397284


                                                                                

According to the table above, we can conclude that an anime's source and its average score are **WEAKLY CORRELATED**.