In [0]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.types import ArrayType, DoubleType, BooleanType
from pyspark.sql.functions import col,array_contains

In [0]:
# getOrCreate() hands back the already-active session when one exists,
# otherwise it builds a new one with this application name.
spark = (
    SparkSession.builder
    .appName("Read CSV")
    .getOrCreate()
)

In [0]:
# Load every column of the three source tables from the `default` database.
df_animes, df_profiles, df_reviews = (
    spark.sql(query)
    for query in (
        "SELECT * FROM default.animes",
        "SELECT * FROM default.profiles",
        "SELECT * FROM default.reviews",
    )
)

In [0]:
# Drop columns the analysis does not use and rename columns whose names would
# otherwise clash after joining (animes.uid vs reviews.uid, animes.score vs
# reviews.score). Rows containing any null are discarded.
df_animes = (
    df_animes
    .drop("img_url", "link")
    .withColumnRenamed("uid", "anime_uid")
    .na.drop()
)
df_profiles = (
    df_profiles
    .drop("link")
    .na.drop()
)
df_reviews = (
    df_reviews
    .drop("link")
    .withColumnRenamed("score", "user_score")
    .withColumnRenamed("scores", "user_individual_scores")
    .na.drop()
)


In [0]:
# Preview the first rows of each cleaned table.
for frame in (df_animes, df_profiles, df_reviews):
    frame.show()

+---------+--------------------+--------------------+--------------------+------------+---------+----------+------+-----+
|anime_uid|               title|            synopsis|               genre|  aired from| aired to|popularity|ranked|score|
+---------+--------------------+--------------------+--------------------+------------+---------+----------+------+-----+
|    28891|Haikyuu!! Second ...|Following their p...|['Comedy', 'Sport...| Oct 4, 2015|27-Mar-16|       141|    25| 8.82|
|    23273|Shigatsu wa Kimi ...|Music accompanies...|['Drama', 'Music'...|Oct 10, 2014|20-Mar-15|        28|    24| 8.83|
|    34599|       Made in Abyss|The Abyss—a gapin...|['Sci-Fi', 'Adven...| Jul 7, 2017|29-Sep-17|        98|    23| 8.83|
|     5114|Fullmetal Alchemi...|"In order for som...|['Action', 'Milit...| Apr 5, 2009|04-Jul-10|         4|     1| 9.23|
|    37510|   Mob Psycho 100 II|Shigeo "Mob" Kage...|['Action', 'Slice...| Jan 7, 2019|01-Apr-19|       176|    21| 8.89|
|    38000|    Kimetsu n

I want to build a model to find the users' favorite genres based on age group.
For this I'm going to use the profiles table, which has users' dates of birth and the uids of their favorite animes.
And I'm going to use the animes table, which has each anime's uid and the genres that the anime falls under.

So let's start by preprocessing the profiles table.
I'm going to start by converting the birthday column to a date type and extracting the year of birth.

In [0]:
from pyspark.sql.functions import to_date, year, current_date, col, when

# Parse the birthday strings (pattern 'MMM d, yyyy', e.g. "Oct 2, 1994") into a
# DateType column and derive the year of birth from it. Strings that do not
# match the pattern typically become null (non-ANSI mode).
df_profiles = (
    df_profiles
    .withColumn('birthday', to_date(col('birthday'), 'MMM d, yyyy'))
    .withColumn('birth_year', year(col('birthday')))
)

# Treat implausible birth years as missing: more than 100 years ago, or in the
# future (which would otherwise yield a negative age and an "Unknown" age group).
# na.drop() then removes those rows together with any whose birthday failed to
# parse or which have a null in another column.
current_year = year(current_date())
df_profiles = (
    df_profiles
    .withColumn(
        'birth_year',
        when(
            (col('birth_year') < current_year - 100) | (col('birth_year') > current_year),
            None,
        ).otherwise(col('birth_year')),
    )
    .na.drop()
)

In [0]:
# Inspect the parsed birthdays and derived birth years (20 rows is show()'s default).
df_profiles.show(n=20)

+---------------+------+----------+--------------------+----------+
|        profile|gender|  birthday|     favorites_anime|birth_year|
+---------------+------+----------+--------------------+----------+
| DesolatePsyche|  Male|1994-10-02|['33352', '25013'...|      1994|
|      baekbeans|Female|2000-11-10|['11061', '31964'...|      2000|
|aManOfCulture99|  Male|1999-10-30|['4181', '7791', ...|      1999|
|   NIGGER_BONER|  Male|1985-01-01|['11061', '30', '...|      1985|
|         jchang|  Male|1992-07-29|['846', '2904', '...|      1992|
|   angelsreview|Female|1989-05-26|['534', '71', '77...|      1989|
| CalebTheMenace|  Male|2004-06-29|['4181', '11617',...|      2004|
|          Eanki|  Male|1995-12-03|['27989', '28851'...|      1995|
|      NekoKyupi|  Male|2001-01-08|['34599', '4181',...|      2001|
|  OVERPOWERED99|  Male|2000-11-16|['5680', '15051',...|      2000|
|    JoJo_Stalin|  Male|2000-08-04|['1050', '227', '...|      2000|
|        Kaishuu|  Male|1999-06-10|['27899', '15

Next, I'm going to create a new column, age, that holds each user's age in years.
Then I'm going to create another column, age_group, that groups users by age range into children, teenagers, young adults, adults, middle-aged adults, senior adults and the elderly.

In [0]:
from pyspark.sql.functions import floor, months_between

# Age in completed years. months_between(today, birthday) / 12 follows the
# calendar, so the age increments exactly on the birthday. The previous form,
# a day count divided by 365.25, is off by one around birthdays (for example,
# 2000-03-01 -> 2023-03-01 gives 22), and it relied on casting a date-minus-date
# interval to int, which not every Spark version supports.
# `birthday` is already a DateType column, so it is not re-parsed here.
df_profiles = df_profiles.withColumn(
    "age", floor(months_between(current_date(), col("birthday")) / 12)
)

# Bucket ages into labelled groups. An age that matches no range (negative,
# or null) falls through to "Unknown".
df_profiles = df_profiles.withColumn(
    "age_group",
    when(col("age").between(0, 12), "Child")
    .when(col("age").between(13, 17), "Teenager")
    .when(col("age").between(18, 24), "Young Adult")
    .when(col("age").between(25, 34), "Adult")
    .when(col("age").between(35, 44), "Middle-aged Adult")
    .when(col("age").between(45, 54), "Senior Adult")
    .when(col("age") >= 55, "Elderly")
    .otherwise("Unknown"),
)

Now we have users' ages and their age groups.
Next, I'm going to convert the favorites_anime column from a string to an array of integers, because anime_uid in the animes table is an integer.

In [0]:
from pyspark.sql.functions import split, regexp_replace, col, when

# favorites_anime holds a Python-style list literal such as "['33352', '25013']".
# Strip the brackets and quotes, then split on commas. The split also absorbs
# the surrounding spaces, so the int cast does not depend on Spark trimming
# whitespace (Spark 3 does; Spark 2 returned null instead).
# Raw strings stop Python from treating "\[" as an (invalid) string escape.
# An empty list "[]" yields a single null id in non-ANSI mode.
df_profiles = df_profiles.withColumn(
    "favorites_anime",
    split(regexp_replace(col("favorites_anime"), r"[\[\]']", ""), r"\s*,\s*"),
)
# Cast ids to int so they can be joined against the integer animes.anime_uid.
df_profiles = df_profiles.withColumn("favorites_anime", col("favorites_anime").cast("array<int>"))

df_profiles.show()

+---------------+------+----------+--------------------+----------+---+-----------------+
|        profile|gender|  birthday|     favorites_anime|birth_year|age|        age_group|
+---------------+------+----------+--------------------+----------+---+-----------------+
| DesolatePsyche|  Male|1994-10-02|[33352, 25013, 55...|      1994| 28|            Adult|
|      baekbeans|Female|2000-11-10|[11061, 31964, 85...|      2000| 22|      Young Adult|
|aManOfCulture99|  Male|1999-10-30|[4181, 7791, 9617...|      1999| 23|      Young Adult|
|   NIGGER_BONER|  Male|1985-01-01|[11061, 30, 6594,...|      1985| 38|Middle-aged Adult|
|         jchang|  Male|1992-07-29|[846, 2904, 5114,...|      1992| 30|            Adult|
|   angelsreview|Female|1989-05-26|[534, 71, 7724, 8...|      1989| 33|            Adult|
| CalebTheMenace|  Male|2004-06-29|[4181, 11617, 173...|      2004| 18|      Young Adult|
|          Eanki|  Male|1995-12-03|[27989, 28851, 17...|      1995| 27|            Adult|
|      Nek

Next, we are going to preprocess the animes table, repeating the same process that I used for the favorites_anime column in the profiles table.
I'm going to convert the genre column from a string to an array of strings.

In [0]:
# Split values in genre column by comma
df_animes = df_animes.withColumn("genre", split(regexp_replace(col("genre"), "[\[\]']", ""), ","))
# Convert values in genre column to string
df_animes = df_animes.withColumn("genre", col("genre").cast("array<string>"))

df_animes.show()

+---------+--------------------+--------------------+--------------------+------------+---------+----------+------+-----+
|anime_uid|               title|            synopsis|               genre|  aired from| aired to|popularity|ranked|score|
+---------+--------------------+--------------------+--------------------+------------+---------+----------+------+-----+
|    28891|Haikyuu!! Second ...|Following their p...|[Comedy,  Sports,...| Oct 4, 2015|27-Mar-16|       141|    25| 8.82|
|    23273|Shigatsu wa Kimi ...|Music accompanies...|[Drama,  Music,  ...|Oct 10, 2014|20-Mar-15|        28|    24| 8.83|
|    34599|       Made in Abyss|The Abyss—a gapin...|[Sci-Fi,  Adventu...| Jul 7, 2017|29-Sep-17|        98|    23| 8.83|
|     5114|Fullmetal Alchemi...|"In order for som...|[Action,  Militar...| Apr 5, 2009|04-Jul-10|         4|     1| 9.23|
|    37510|   Mob Psycho 100 II|Shigeo "Mob" Kage...|[Action,  Slice o...| Jan 7, 2019|01-Apr-19|       176|    21| 8.89|
|    38000|    Kimetsu n

Next, I'm going to join the tables: I join the profiles table with the reviews table and then join the merged table with the animes table.

In [0]:
# Attach each profile's reviews, then the reviewed anime's metadata, and remove
# rows that are exact duplicates across all columns.
# NOTE(review): df_joined is reassigned in the next cell before it is used, so
# this merged table is only displayed here.
df_joined = (
    df_profiles
    .join(df_reviews, "profile")
    .join(df_animes, "anime_uid")
    .distinct()
)

df_joined.show()

+---------+---------------+------+----------+--------------------+----------+---+-----------------+------+--------------------+----------+----------------------+--------------------+--------------------+--------------------+------------+---------+----------+------+-----+
|anime_uid|        profile|gender|  birthday|     favorites_anime|birth_year|age|        age_group|   uid|                text|user_score|user_individual_scores|               title|            synopsis|               genre|  aired from| aired to|popularity|ranked|score|
+---------+---------------+------+----------+--------------------+----------+---+-----------------+------+--------------------+----------+----------------------+--------------------+--------------------+--------------------+------------+---------+----------+------+-----+
|     1425|      BowlingJD|  Male|1988-02-09|[21, 235, 19, 302...|      1988| 35|Middle-aged Adult|189368|\n           \n  ...|         6|  {'Overall': '6', ...|  Lupin III: Part II|Lu

Next, starting again from the profiles table, I'm going to split the values of the favorites_anime column into separate rows for each user so that I can look up the genres each of these animes falls under. I'm only going to select the columns that we need for our model: the profile name, the age group it falls under, the favorite anime_uid and that anime's genres.

In [0]:
from pyspark.sql.functions import explode, count

# One row per (profile, favorite anime): explode the favorites array, look up
# each anime's genre list with an inner join (which drops unknown or null ids),
# and keep a single row per profile / age group / anime.
df_joined = (
    df_profiles
    .select("profile", "age_group", explode("favorites_anime").alias("anime_uid"))
    .join(df_animes.select("anime_uid", "genre"), on="anime_uid", how="inner")
    .select("profile", "age_group", "anime_uid", "genre")
    .dropDuplicates(subset=["profile", "age_group", "anime_uid"])
)
df_joined.show()

+---------------+-----------+---------+--------------------+
|        profile|  age_group|anime_uid|               genre|
+---------------+-----------+---------+--------------------+
|       --Mizu--|      Adult|       21|[Action,  Adventu...|
|       --Mizu--|      Adult|      177|[Action,  Adventu...|
|       --Mizu--|      Adult|     4081|[Slice of Life,  ...|
|       --Mizu--|      Adult|     5678|[Comedy,  Drama, ...|
|       --Mizu--|      Adult|     6864|[Mystery,  Supern...|
|       --Mizu--|      Adult|    23289|[Comedy,  Romance...|
|--animeislife--|      Adult|      249|[Action,  Adventu...|
|--animeislife--|      Adult|     2167|[Comedy,  Drama, ...|
|--animeislife--|      Adult|     9989|[Slice of Life,  ...|
|--animeislife--|      Adult|    10793|[Action,  Sci-Fi,...|
|--animeislife--|      Adult|    13601|[Action,  Police,...|
|--animeislife--|      Adult|    14467|[Action,  Super P...|
|--animeislife--|      Adult|    16498|[Action,  Militar...|
|          --d41|Young A

In [0]:
# Distinct raw genre tokens across all rows. Explode and distinct run in Spark,
# so only the tokens themselves reach the driver (not every distinct genre
# array). Sorting makes the printed list stable across runs, whereas set
# iteration order is not. Tokens that differ only by a leading space (" Drama"
# vs "Drama") are still counted separately here; that is what this cell shows.
unique_genres = sorted(
    row["genre_token"]
    for row in df_joined.select(explode("genre").alias("genre_token")).distinct().collect()
)
print(unique_genres)
print(len(unique_genres))

[' Police', 'Sports', 'Sci-Fi', 'Adventure', 'Shounen', ' Shounen Ai', ' Ecchi', 'Psychological', 'Slice of Life', 'Mecha', ' Psychological', ' Demons', 'Parody', 'Seinen', ' Drama', 'Romance', 'Cars', ' Super Power', ' Magic', ' Vampire', ' Adventure', 'Martial Arts', ' Shounen', ' Music', ' Slice of Life', 'Demons', ' Game', ' School', 'Drama', ' Comedy', ' Josei', 'School', ' Romance', ' Horror', ' Historical', ' Kids', 'Thriller', 'Military', ' Space', ' Thriller', 'Fantasy', ' Dementia', ' Martial Arts', 'Horror', 'Josei', 'Action', 'Police', ' Seinen', 'Historical', ' Shoujo Ai', ' Parody', 'Ecchi', 'Kids', ' Samurai', ' Mystery', 'Dementia', 'Comedy', ' Sports', 'Mystery', 'Magic', ' Mecha', ' Fantasy', ' Supernatural', 'Supernatural', 'Harem', 'Game', ' Military', ' Shoujo', ' Cars', 'Space', ' Sci-Fi', ' Harem', 'Music']
73


In [0]:
from pyspark.sql.functions import expr, trim

# Strip the whitespace the comma split left around each genre, using Spark's
# higher-order transform() so each array element is trimmed in place.
df_trimmed = df_joined.withColumn(
    "genre",
    expr("transform(genre, x -> trim(x))"),
)

df_trimmed.show()

+---------------+-----------+---------+--------------------+
|        profile|  age_group|anime_uid|               genre|
+---------------+-----------+---------+--------------------+
|       --Mizu--|      Adult|       21|[Action, Adventur...|
|       --Mizu--|      Adult|      177|[Action, Adventur...|
|       --Mizu--|      Adult|     4081|[Slice of Life, D...|
|       --Mizu--|      Adult|     5678|[Comedy, Drama, R...|
|       --Mizu--|      Adult|     6864|[Mystery, Superna...|
|       --Mizu--|      Adult|    23289|[Comedy, Romance,...|
|--animeislife--|      Adult|      249|[Action, Adventur...|
|--animeislife--|      Adult|     2167|[Comedy, Drama, R...|
|--animeislife--|      Adult|     9989|[Slice of Life, S...|
|--animeislife--|      Adult|    10793|[Action, Sci-Fi, ...|
|--animeislife--|      Adult|    13601|[Action, Police, ...|
|--animeislife--|      Adult|    14467|[Action, Super Po...|
|--animeislife--|      Adult|    16498|[Action, Military...|
|          --d41|Young A

In [0]:
# Distinct genre tokens after trimming. Explode and distinct run in Spark, so
# only the tokens reach the driver. The list is sorted for a stable,
# comparable printout.
unique_trimmed_genres = sorted(
    row["genre_token"]
    for row in df_trimmed.select(explode("genre").alias("genre_token")).distinct().collect()
)
print(unique_trimmed_genres)
print(len(unique_trimmed_genres))

['Sports', 'Sci-Fi', 'Adventure', 'Shounen', 'Mecha', 'Parody', 'Psychological', 'Slice of Life', 'Seinen', 'Romance', 'Cars', 'Martial Arts', 'Demons', 'Samurai', 'Shoujo Ai', 'Drama', 'School', 'Thriller', 'Shoujo', 'Military', 'Fantasy', 'Horror', 'Josei', 'Action', 'Police', 'Historical', 'Ecchi', 'Kids', 'Dementia', 'Comedy', 'Mystery', 'Magic', 'Supernatural', 'Harem', 'Shounen Ai', 'Space', 'Game', 'Vampire', 'Super Power', 'Music']
40


In [0]:
# Collapse the 40 trimmed genre tokens into 16 broader groups.
# Key: group name. Value: the trimmed genre tokens belonging to that group.
# Insertion order is kept; it would only matter if a token appeared under more
# than one group, which none currently does.
# NOTE(review): "Shounen" also absorbs "Shoujo" / "Shoujo Ai" (different target
# demographics). Confirm that this grouping is intended.
genre_mapping = dict([
    ("Action", ["Action", "Martial Arts", "Police", "Super Power", "Military"]),
    ("Adventure", ["Adventure", "Game"]),
    ("Kids", ["Kids"]),
    ("Fantasy", ["Fantasy", "Vampire", "Demons", "Magic", "Supernatural"]),
    ("Comedy", ["Comedy", "Parody"]),
    ("Drama", ["Drama", "Romance", "Slice of Life"]),
    ("Ecchi", ["Ecchi", "Harem"]),
    ("Music", ["Music"]),
    ("Thriller", ["Mystery", "Thriller", "Horror"]),
    ("Psychological", ["Psychological", "Dementia"]),
    ("Sci-Fi", ["Sci-Fi", "Cars", "Space", "Mecha"]),
    ("Shounen", ["Shounen", "Shounen Ai", "Shoujo Ai", "Shoujo"]),
    ("School", ["School"]),
    ("Seinen", ["Seinen", "Josei"]),
    ("Sports", ["Sports"]),
    ("Historical", ["Historical", "Samurai"]),
])

In [0]:
from pyspark.sql.functions import expr, udf
from pyspark.sql.types import ArrayType, StringType


def _build_genre_lookup(mapping):
    """Invert a {group: [genre, ...]} mapping into {genre: group}.

    setdefault keeps the FIRST group that lists a genre, which matches scanning
    the groups in insertion order.
    """
    lookup = {}
    for group, members in mapping.items():
        for member in members:
            lookup.setdefault(member, group)
    return lookup


# Built once, so each lookup is a dict access rather than a scan of every group.
_genre_to_group = _build_genre_lookup(genre_mapping)


def map_genre(genre):
    """Return the broad group for one trimmed genre token, or None if it is unmapped."""
    return _genre_to_group.get(genre)


def _map_genre_array(genres):
    """Map each genre in ``genres`` to its group, dropping unmapped tokens.

    Unmapped tokens are dropped instead of being emitted as null, so no null
    term reaches the CountVectorizer that consumes ``mapped_genres``.
    A null input array maps to null instead of raising inside the UDF.
    """
    if genres is None:
        return None
    return [group for group in map(map_genre, genres) if group is not None]


# UDF: array<string> of raw genres -> array<string> of broad groups.
map_genre_udf = udf(_map_genre_array, ArrayType(StringType()))

# Trim each token first (the comma split leaves leading spaces), then map it.
df_mapped_genres = df_joined.withColumn(
    "mapped_genres", map_genre_udf(expr("transform(genre, x -> trim(x))"))
)

df_mapped_genres.show()

+----------------+-----------+---------+--------------------+--------------------+
|         profile|  age_group|anime_uid|               genre|       mapped_genres|
+----------------+-----------+---------+--------------------+--------------------+
|        --Mizu--|      Adult|      177|[Action,  Adventu...|[Action, Adventur...|
|           --d41|Young Adult|       21|[Action,  Adventu...|[Action, Adventur...|
|      --mimika--|Young Adult|    32995|   [Comedy,  Sports]|    [Comedy, Sports]|
|       -Ariadne-|      Adult|    13601|[Action,  Police,...|[Action, Action, ...|
|      -Arlequin-|      Adult|     5114|[Action,  Militar...|[Action, Action, ...|
|       -Bizarro-|      Adult|       20|[Action,  Adventu...|[Action, Adventur...|
|         -CHAOS-|      Adult|      384|[Action,  Sci-Fi,...|[Action, Sci-Fi, ...|
|         -CHAOS-|      Adult|      849|[Comedy,  Mystery...|[Comedy, Thriller...|
|         -CHAOS-|      Adult|     1535|[Mystery,  Police...|[Thriller, Action...|
|   

In [0]:
from pyspark.sql.functions import array_distinct

# Several raw genres can map to the same group (e.g. Action + Military -> Action),
# so keep each group once per row, then drop the raw genre column.
df_unique_genres = (
    df_mapped_genres
    .withColumn("mapped_genres", array_distinct("mapped_genres"))
    .drop("genre")
)

df_unique_genres.show()

+---------------+-----------+---------+--------------------+
|        profile|  age_group|anime_uid|       mapped_genres|
+---------------+-----------+---------+--------------------+
|       --Mizu--|      Adult|       21|[Action, Adventur...|
|       --Mizu--|      Adult|      177|[Action, Adventur...|
|       --Mizu--|      Adult|     4081|[Drama, Fantasy, ...|
|       --Mizu--|      Adult|     5678|[Comedy, Drama, F...|
|       --Mizu--|      Adult|     6864| [Thriller, Fantasy]|
|       --Mizu--|      Adult|    23289|[Comedy, Drama, S...|
|--animeislife--|      Adult|      249|[Action, Adventur...|
|--animeislife--|      Adult|     2167|[Comedy, Drama, S...|
|--animeislife--|      Adult|     9989|    [Drama, Fantasy]|
|--animeislife--|      Adult|    10793|[Action, Sci-Fi, ...|
|--animeislife--|      Adult|    13601|[Action, Psycholo...|
|--animeislife--|      Adult|    14467|   [Action, Fantasy]|
|--animeislife--|      Adult|    16498|[Action, Thriller...|
|          --d41|Young A

Now, for the last preprocessing step before I create my model, I'm going to vectorize the mapped_genres column and index the age_group column,
because the genres are an array of strings and the age groups are strings too.
We need numeric values to use them in our model.

In [0]:
from pyspark.ml.feature import CountVectorizer, StringIndexer

# NOTE(review): both transformers below are fit on the full dataset before the
# train/test split, so the vocabulary and label index also see test rows.
# Fitting them in a Pipeline on the training split would avoid this leakage.

# Features: a sparse count vector over the genre groups in each row.
vectorizer = CountVectorizer(outputCol="features", inputCol="mapped_genres")
vectorizer_model = vectorizer.fit(df_unique_genres)
df_vectorized = vectorizer_model.transform(df_unique_genres)

# Label: age_group encoded as a numeric index. StringIndexer's default order is
# by descending frequency, so 0.0 is the most common age group.
indexer = StringIndexer(outputCol="label", inputCol="age_group")
indexer_model = indexer.fit(df_vectorized)
df_indexed = indexer_model.transform(df_vectorized)

df_indexed.show()

+----------------+-----------+---------+--------------------+--------------------+-----+
|         profile|  age_group|anime_uid|       mapped_genres|            features|label|
+----------------+-----------+---------+--------------------+--------------------+-----+
|        --Mizu--|      Adult|      177|[Action, Adventur...|(16,[0,1,3,4,5],[...|  0.0|
|           --d41|Young Adult|       21|[Action, Adventur...|(16,[0,1,2,3,4,5]...|  1.0|
|      --mimika--|Young Adult|    32995|    [Comedy, Sports]|(16,[2,13],[1.0,1...|  1.0|
|       -Ariadne-|      Adult|    13601|[Action, Psycholo...|(16,[1,7,9],[1.0,...|  0.0|
|      -Arlequin-|      Adult|     5114|[Action, Adventur...|(16,[0,1,2,3,4,5]...|  0.0|
|       -Bizarro-|      Adult|       20|[Action, Adventur...|(16,[1,2,4,5],[1....|  0.0|
|         -CHAOS-|      Adult|      384|[Action, Sci-Fi, ...|(16,[0,1,3,6,7,9,...|  0.0|
|         -CHAOS-|      Adult|      849|[Comedy, Thriller...|(16,[0,2,6,7,8],[...|  0.0|
|         -CHAOS-|   

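Now that I have my labels and feature vectors, I continue to build my model. Note that the model is trained to predict a user's age group from the genres of one of their favorite animes (genre groups are the features and age group is the label), rather than predicting a genre directly from an age group.
I start by creating my train and test datasets.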

In [0]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline

# Fixed seed, so the 80/20 split (and therefore the reported accuracy) is
# reproducible across runs given the same input partitioning.
SPLIT_SEED = 42

# Split the data into training and test sets (80% for training and 20% for testing)
(training_data, test_data) = df_indexed.randomSplit([0.8, 0.2], seed=SPLIT_SEED)
training_data.show()

+----------------+-----------+---------+--------------------+--------------------+-----+
|         profile|  age_group|anime_uid|       mapped_genres|            features|label|
+----------------+-----------+---------+--------------------+--------------------+-----+
|        --Mizu--|      Adult|      177|[Action, Adventur...|(16,[0,1,3,4,5],[...|  0.0|
|           --d41|Young Adult|       21|[Action, Adventur...|(16,[0,1,2,3,4,5]...|  1.0|
|      --mimika--|Young Adult|    32995|    [Comedy, Sports]|(16,[2,13],[1.0,1...|  1.0|
|       -Ariadne-|      Adult|    13601|[Action, Psycholo...|(16,[1,7,9],[1.0,...|  0.0|
|      -Arlequin-|      Adult|     5114|[Action, Adventur...|(16,[0,1,2,3,4,5]...|  0.0|
|       -Bizarro-|      Adult|       20|[Action, Adventur...|(16,[1,2,4,5],[1....|  0.0|
|         -CHAOS-|      Adult|      384|[Action, Sci-Fi, ...|(16,[0,1,3,6,7,9,...|  0.0|
|         -CHAOS-|      Adult|      849|[Comedy, Thriller...|(16,[0,2,6,7,8],[...|  0.0|
|      -Crucival-|   

Training the model

In [0]:
# Logistic regression from genre-group features to the age-group label. With
# the default family="auto", Spark uses the multinomial form when the label
# has more than two classes.
lr = LogisticRegression(labelCol="label", featuresCol="features")
lr_model = lr.fit(dataset=training_data)

In [0]:
# Make predictions on the test data
predictions = lr_model.transform(test_data)

In [0]:
# Evaluate the model's performance on the test data
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy:", accuracy)

Accuracy: 0.6233024304685543
