In [None]:
!pip install pyspark



In [None]:
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.getOrCreate()

In [None]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:

df = spark.read.csv(
    "anime-dataset-2023.csv",
    header=True,
    inferSchema=True,
    escape='"',
    multiLine=True
)

print("Verifikasi data setelah perbaikan:")
df.show(5)

print("Membagi data menjadi 80% training dan 20% testing...")
df_train, df_test = df.randomSplit([0.8, 0.2], seed=42)

total_count = df.count()
train_count = df_train.count()
test_count = df_test.count()

print(f"\nTotal data keseluruhan: {total_count}")
print(f"Jumlah data training: {train_count} (~{train_count/total_count:.2%})")
print(f"Jumlah data testing: {test_count} (~{test_count/total_count:.2%})")

print("\nContoh data training:")
df_train.show(5)

Verifikasi data setelah perbaikan:
+--------+--------------------+--------------------+---------------------------+-----+--------------------+--------------------+-----+--------+--------------------+-----------+---------------+--------------------+--------------------+--------------+--------+-------------+--------------------+------+----------+---------+---------+-------+--------------------+
|anime_id|                Name|        English name|                 Other name|Score|              Genres|            Synopsis| Type|Episodes|               Aired|  Premiered|         Status|           Producers|           Licensors|       Studios|  Source|     Duration|              Rating|  Rank|Popularity|Favorites|Scored By|Members|           Image URL|
+--------+--------------------+--------------------+---------------------------+-----+--------------------+--------------------+-----+--------+--------------------+-----------+---------------+--------------------+--------------------+---------

In [None]:
from pyspark.sql.functions import mean, col, when

df = df.withColumn("Score", when(col("Score") == "UNKNOWN", None).otherwise(col("Score")).cast("float"))
mean_score = df.select(mean(col("Score"))).first()[0]
df_imputed = df.fillna(mean_score, subset=["Score"])

df_imputed = df_imputed.fillna("Missing", subset=["Studios"])

df_imputed.show(5)

+--------+--------------------+--------------------+---------------------------+-----+--------------------+--------------------+-----+--------+--------------------+-----------+---------------+--------------------+--------------------+--------------+--------+-------------+--------------------+------+----------+---------+---------+-------+--------------------+
|anime_id|                Name|        English name|                 Other name|Score|              Genres|            Synopsis| Type|Episodes|               Aired|  Premiered|         Status|           Producers|           Licensors|       Studios|  Source|     Duration|              Rating|  Rank|Popularity|Favorites|Scored By|Members|           Image URL|
+--------+--------------------+--------------------+---------------------------+-----+--------------------+--------------------+-----+--------+--------------------+-----------+---------------+--------------------+--------------------+--------------+--------+-------------+------

In [None]:
df_train.show()

+--------+--------------------+--------------------+--------------------------------------+-----+--------------------+--------------------+-----+--------+--------------------+-----------+----------------+--------------------+--------------------+--------------------+-----------+-------------+--------------------+------+----------+---------+---------+-------+--------------------+
|anime_id|                Name|        English name|                            Other name|Score|              Genres|            Synopsis| Type|Episodes|               Aired|  Premiered|          Status|           Producers|           Licensors|             Studios|     Source|     Duration|              Rating|  Rank|Popularity|Favorites|Scored By|Members|           Image URL|
+--------+--------------------+--------------------+--------------------------------------+-----+--------------------+--------------------+-----+--------+--------------------+-----------+----------------+--------------------+-----------

In [None]:
columns_to_drop = [
    "Synopsis",
    "Other name",
    "Producers",
    "Licensors",
    "Aired",
    "Premiered",
    "Rank",
    "Scored By",
    "Members"
]

df_train = df_train.drop(*columns_to_drop)
df_test = df_test.drop(*columns_to_drop)

print("Columns in df_train after dropping:")
print(df_train.columns)

print("\nColumns in df_test after dropping:")
print(df_test.columns)

Columns in df_train after dropping:
['anime_id', 'Name', 'English name', 'Score', 'Genres', 'Type', 'Episodes', 'Status', 'Studios', 'Source', 'Duration', 'Rating', 'Popularity', 'Favorites', 'Image URL']

Columns in df_test after dropping:
['anime_id', 'Name', 'English name', 'Score', 'Genres', 'Type', 'Episodes', 'Status', 'Studios', 'Source', 'Duration', 'Rating', 'Popularity', 'Favorites', 'Image URL']


In [None]:
df_train.show()

+--------+--------------------+--------------------+-----+--------------------+-----+--------+----------------+--------------------+-----------+-------------+--------------------+----------+---------+--------------------+
|anime_id|                Name|        English name|Score|              Genres| Type|Episodes|          Status|             Studios|     Source|     Duration|              Rating|Popularity|Favorites|           Image URL|
+--------+--------------------+--------------------+-----+--------------------+-----+--------+----------------+--------------------+-----------+-------------+--------------------+----------+---------+--------------------+
|       1|        Cowboy Bebop|        Cowboy Bebop| 8.75|Action, Award Win...|   TV|    26.0| Finished Airing|             Sunrise|   Original|24 min per ep|R - 17+ (violence...|        43|    78525|https://cdn.myani...|
|       5|Cowboy Bebop: Ten...|Cowboy Bebop: The...| 8.38|      Action, Sci-Fi|Movie|     1.0| Finished Airing| 

In [None]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="Source", outputCol="source_encoded")
indexer_model = indexer.fit(df_train)

df_train = indexer_model.transform(df_train)
df_test = indexer_model.transform(df_test)

df_train.show(5)
df_test.show(5)

+--------+--------------------+--------------------+-----+--------------------+-----+--------+---------------+--------------+--------+-------------+--------------------+----------+---------+--------------------+--------------+
|anime_id|                Name|        English name|Score|              Genres| Type|Episodes|         Status|       Studios|  Source|     Duration|              Rating|Popularity|Favorites|           Image URL|source_encoded|
+--------+--------------------+--------------------+-----+--------------------+-----+--------+---------------+--------------+--------+-------------+--------------------+----------+---------+--------------------+--------------+
|       1|        Cowboy Bebop|        Cowboy Bebop| 8.75|Action, Award Win...|   TV|    26.0|Finished Airing|       Sunrise|Original|24 min per ep|R - 17+ (violence...|        43|    78525|https://cdn.myani...|           0.0|
|       5|Cowboy Bebop: Ten...|Cowboy Bebop: The...| 8.38|      Action, Sci-Fi|Movie|     1.

In [None]:

excluded_sources = [
    "4-koma manga",
    "Book",
    "Card game",
    "Game",
    "Light novel",
    "Manga",
    "Mixed media",
    "Music",
    "Novel",
    "Original",
    "Other",
    "Picture book",
    "Radio",
    "Unknown",
    "Visual novel",
    "Web manga",
    "Web novel"
]

anomalies_df = df_train.filter(~df_train["Source"].isin(excluded_sources))

print("Dataframe with sources outside the specified 17 types:")
anomalies_df.show()

Dataframe with sources outside the specified 17 types:
+--------+----+------------+-----+------+----+--------+------+-------+------+--------+------+----------+---------+---------+--------------+
|anime_id|Name|English name|Score|Genres|Type|Episodes|Status|Studios|Source|Duration|Rating|Popularity|Favorites|Image URL|source_encoded|
+--------+----+------------+-----+------+----+--------+------+-------+------+--------+------+----------+---------+---------+--------------+
+--------+----+------------+-----+------+----+--------+------+-------+------+--------+------+----------+---------+---------+--------------+



In [None]:

allowed_sources = [
    "4-koma manga",
    "Book",
    "Card game",
    "Game",
    "Light novel",
    "Manga",
    "Mixed media",
    "Music",
    "Novel",
    "Original",
    "Other",
    "Picture book",
    "Radio",
    "Unknown",
    "Visual novel",
    "Web manga",
    "Web novel"
]

df_train = df_train.filter(df_train["Source"].isin(allowed_sources))
df_test = df_test.filter(df_test["Source"].isin(allowed_sources))

print("Number of rows in df_train after filtering:", df_train.count())
print("Number of rows in df_test after filtering:", df_test.count())

Number of rows in df_train after filtering: 19946
Number of rows in df_test after filtering: 4959


In [None]:
df_train.show(5)
df_test.show(5)

+--------+--------------------+--------------------+-----+--------------------+-----+--------+---------------+--------------+--------+-------------+--------------------+----------+---------+--------------------+--------------+
|anime_id|                Name|        English name|Score|              Genres| Type|Episodes|         Status|       Studios|  Source|     Duration|              Rating|Popularity|Favorites|           Image URL|source_encoded|
+--------+--------------------+--------------------+-----+--------------------+-----+--------+---------------+--------------+--------+-------------+--------------------+----------+---------+--------------------+--------------+
|       1|        Cowboy Bebop|        Cowboy Bebop| 8.75|Action, Award Win...|   TV|    26.0|Finished Airing|       Sunrise|Original|24 min per ep|R - 17+ (violence...|        43|    78525|https://cdn.myani...|           0.0|
|       5|Cowboy Bebop: Ten...|Cowboy Bebop: The...| 8.38|      Action, Sci-Fi|Movie|     1.

In [None]:

df_train = df_train.drop("source_encoded")
df_test = df_test.drop("source_encoded")

indexer = StringIndexer(inputCol="Source", outputCol="source_encoded")
indexer_model = indexer.fit(df_train)

df_train = indexer_model.transform(df_train)
df_test = indexer_model.transform(df_test)

df_train.show(5)
df_test.show(5)

+--------+--------------------+--------------------+-----+--------------------+-----+--------+---------------+--------------+--------+-------------+--------------------+----------+---------+--------------------+--------------+
|anime_id|                Name|        English name|Score|              Genres| Type|Episodes|         Status|       Studios|  Source|     Duration|              Rating|Popularity|Favorites|           Image URL|source_encoded|
+--------+--------------------+--------------------+-----+--------------------+-----+--------+---------------+--------------+--------+-------------+--------------------+----------+---------+--------------------+--------------+
|       1|        Cowboy Bebop|        Cowboy Bebop| 8.75|Action, Award Win...|   TV|    26.0|Finished Airing|       Sunrise|Original|24 min per ep|R - 17+ (violence...|        43|    78525|https://cdn.myani...|           0.0|
|       5|Cowboy Bebop: Ten...|Cowboy Bebop: The...| 8.38|      Action, Sci-Fi|Movie|     1.

In [None]:
from pyspark.ml.feature import StringIndexer

allowed_types = [
    "Movie",
    "Music",
    "ONA",
    "OVA",
    "Special",
    "TV"
]

df_train = df_train.filter(df_train["Type"].isin(allowed_types))
df_test = df_test.filter(df_test["Type"].isin(allowed_types))

indexer_type = StringIndexer(inputCol="Type", outputCol="type_encoded")
indexer_type_model = indexer_type.fit(df_train)

df_train = indexer_type_model.transform(df_train)
df_test = indexer_type_model.transform(df_test)


print("Number of rows in df_train after filtering by Type:", df_train.count())
print("Number of rows in df_test after filtering by Type:", df_test.count())

df_train.show(5)
df_test.show(5)

Number of rows in df_train after filtering by Type: 19886
Number of rows in df_test after filtering by Type: 4945
+--------+--------------------+--------------------+-----+--------------------+-----+--------+---------------+--------------+--------+-------------+--------------------+----------+---------+--------------------+--------------+------------+
|anime_id|                Name|        English name|Score|              Genres| Type|Episodes|         Status|       Studios|  Source|     Duration|              Rating|Popularity|Favorites|           Image URL|source_encoded|type_encoded|
+--------+--------------------+--------------------+-----+--------------------+-----+--------+---------------+--------------+--------+-------------+--------------------+----------+---------+--------------------+--------------+------------+
|       1|        Cowboy Bebop|        Cowboy Bebop| 8.75|Action, Award Win...|   TV|    26.0|Finished Airing|       Sunrise|Original|24 min per ep|R - 17+ (violence.

In [None]:
from pyspark.ml.feature import StringIndexer

allowed_statuses = [
    "Currently Airing",
    "Finished Airing",
    "Not yet aired"
]

df_train = df_train.filter(df_train["Status"].isin(allowed_statuses))
df_test = df_test.filter(df_test["Status"].isin(allowed_statuses))

indexer_status = StringIndexer(inputCol="Status", outputCol="status_encoded")
indexer_status_model = indexer_status.fit(df_train)

df_train = indexer_status_model.transform(df_train)
df_test = indexer_status_model.transform(df_test)

print("Number of rows in df_train after filtering by Status:", df_train.count())
print("Number of rows in df_test after filtering by Status:", df_test.count())

df_train.show(5)
df_test.show(5)

Number of rows in df_train after filtering by Status: 19886
Number of rows in df_test after filtering by Status: 4945
+--------+--------------------+--------------------+-----+--------------------+-----+--------+---------------+--------------+--------+-------------+--------------------+----------+---------+--------------------+--------------+------------+--------------+
|anime_id|                Name|        English name|Score|              Genres| Type|Episodes|         Status|       Studios|  Source|     Duration|              Rating|Popularity|Favorites|           Image URL|source_encoded|type_encoded|status_encoded|
+--------+--------------------+--------------------+-----+--------------------+-----+--------+---------------+--------------+--------+-------------+--------------------+----------+---------+--------------------+--------------+------------+--------------+
|       1|        Cowboy Bebop|        Cowboy Bebop| 8.75|Action, Award Win...|   TV|    26.0|Finished Airing|       

In [None]:
from pyspark.ml.feature import StringIndexer

allowed_ratings = [
    "G - All Ages",
    "PG - Children",
    "PG-13 - Teens 13 or older",
    "R - 17+ (violence & profanity)",
    "R+ - Mild Nudity",
    "UNKNOWN"
]

df_train = df_train.filter(df_train["Rating"].isin(allowed_ratings))
df_test = df_test.filter(df_test["Rating"].isin(allowed_ratings))

indexer_rating = StringIndexer(inputCol="Rating", outputCol="rating_encoded")
indexer_rating_model = indexer_rating.fit(df_train)

df_train = indexer_rating_model.transform(df_train)
df_test = indexer_rating_model.transform(df_test)

print("Number of rows in df_train after filtering by Rating:", df_train.count())
print("Number of rows in df_test after filtering by Rating:", df_test.count())

df_train.show(5)
df_test.show(5)

Number of rows in df_train after filtering by Rating: 18702
Number of rows in df_test after filtering by Rating: 4654
+--------+--------------------+--------------------+-----+--------------------+-----+--------+---------------+--------------+--------+-------------+--------------------+----------+---------+--------------------+--------------+------------+--------------+--------------+
|anime_id|                Name|        English name|Score|              Genres| Type|Episodes|         Status|       Studios|  Source|     Duration|              Rating|Popularity|Favorites|           Image URL|source_encoded|type_encoded|status_encoded|rating_encoded|
+--------+--------------------+--------------------+-----+--------------------+-----+--------+---------------+--------------+--------+-------------+--------------------+----------+---------+--------------------+--------------+------------+--------------+--------------+
|       1|        Cowboy Bebop|        Cowboy Bebop| 8.75|Action, Award 

In [None]:
from pyspark.ml.feature import CountVectorizer

df_train = df_train.withColumn("genres_array", split(col("Genres"), ", "))
df_test = df_test.withColumn("genres_array", split(col("Genres"), ", "))

cv = CountVectorizer(inputCol="genres_array", outputCol="genres_vector")
cv_model = cv.fit(df_train)

df_train = cv_model.transform(df_train)
df_test = cv_model.transform(df_test)

print("Schema after CountVectorizer:")
df_train.printSchema()

df_train.show(5)
df_test.show(5)

Schema after CountVectorizer:
root
 |-- anime_id: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- English name: string (nullable = true)
 |-- Score: string (nullable = true)
 |-- Genres: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Episodes: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Studios: string (nullable = true)
 |-- Source: string (nullable = true)
 |-- Duration: string (nullable = true)
 |-- Rating: string (nullable = true)
 |-- Popularity: integer (nullable = true)
 |-- Favorites: integer (nullable = true)
 |-- Image URL: string (nullable = true)
 |-- source_encoded: double (nullable = false)
 |-- type_encoded: double (nullable = false)
 |-- status_encoded: double (nullable = false)
 |-- rating_encoded: double (nullable = false)
 |-- genres_array: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- genres_vector: vector (nullable = true)

+--------+--------------------+-------------------

In [None]:
print("Number of rows in df_train after filtering by Rating:", df_train.count())
print("Number of rows in df_test after filtering by Rating:", df_test.count())

Number of rows in df_train after filtering by Rating: 18702
Number of rows in df_test after filtering by Rating: 4654


In [None]:
df_train.show()
df_test.show()

+--------+--------------------+--------------------+-----+--------------------+-----+--------+----------------+--------------------+-----------+-------------+--------------------+----------+---------+--------------------+--------------+------------+--------------+--------------+--------------------+--------------------+
|anime_id|                Name|        English name|Score|              Genres| Type|Episodes|          Status|             Studios|     Source|     Duration|              Rating|Popularity|Favorites|           Image URL|source_encoded|type_encoded|status_encoded|rating_encoded|        genres_array|       genres_vector|
+--------+--------------------+--------------------+-----+--------------------+-----+--------+----------------+--------------------+-----------+-------------+--------------------+----------+---------+--------------------+--------------+------------+--------------+--------------+--------------------+--------------------+
|       1|        Cowboy Bebop|   

In [None]:

df_train = df_train.drop("Score")
df_test = df_test.drop("Score")

print("Columns in df_train after dropping Score:")
print(df_train.columns)

print("\nColumns in df_test after dropping Score:")
print(df_test.columns)

Columns in df_train after dropping Score:
['anime_id', 'Name', 'English name', 'Genres', 'Type', 'Episodes', 'Status', 'Studios', 'Source', 'Duration', 'Rating', 'Popularity', 'Favorites', 'Image URL', 'source_encoded', 'type_encoded', 'status_encoded', 'rating_encoded', 'genres_array', 'genres_vector']

Columns in df_test after dropping Score:
['anime_id', 'Name', 'English name', 'Genres', 'Type', 'Episodes', 'Status', 'Studios', 'Source', 'Duration', 'Rating', 'Popularity', 'Favorites', 'Image URL', 'source_encoded', 'type_encoded', 'status_encoded', 'rating_encoded', 'genres_array', 'genres_vector']


In [None]:
from pyspark.sql.functions import col, when, regexp_extract
from pyspark.sql.types import DoubleType

df_train = df_train.withColumn("hours", regexp_extract(col("Duration"), r"(\d+)\s*hr", 1).cast("int"))
df_train = df_train.withColumn("minutes", regexp_extract(col("Duration"), r"(\d+)\s*min", 1).cast("int"))
df_test = df_test.withColumn("hours", regexp_extract(col("Duration"), r"(\d+)\s*hr", 1).cast("int"))
df_test = df_test.withColumn("minutes", regexp_extract(col("Duration"), r"(\d+)\s*min", 1).cast("int"))

df_train = df_train.fillna(0, subset=["hours", "minutes"])
df_test = df_test.fillna(0, subset=["hours", "minutes"])

df_train = df_train.withColumn("duration_minutes", (col("hours") * 60) + col("minutes"))
df_test = df_test.withColumn("duration_minutes", (col("hours") * 60) + col("minutes"))

df_train = df_train.withColumn("Episodes_clean", col("Episodes").cast(DoubleType()))
df_test = df_test.withColumn("Episodes_clean", col("Episodes").cast(DoubleType()))
df_train = df_train.fillna(1.0, subset=["Episodes_clean"])
df_test = df_test.fillna(1.0, subset=["Episodes_clean"])

df_train = df_train.withColumn(
    "duration_minutes",
    when(
        col("Duration").contains("per ep"),
        col("minutes") * col("Episodes_clean")
    ).otherwise(col("duration_minutes"))
)
df_test = df_test.withColumn(
    "duration_minutes",
    when(
        col("Duration").contains("per ep"),
        col("minutes") * col("Episodes_clean")
    ).otherwise(col("duration_minutes"))
)

columns_to_drop = ["Duration", "hours", "minutes", "Episodes_clean"]
df_train = df_train.drop(*columns_to_drop)
df_test = df_test.drop(*columns_to_drop)

print("Verifikasi hasil akhir pada df_train:")
df_train.select("Name", "duration_minutes").show(10, truncate=False)

Verifikasi hasil akhir pada df_train:
+-------------------------------+----------------+
|Name                           |duration_minutes|
+-------------------------------+----------------+
|Cowboy Bebop                   |624.0           |
|Cowboy Bebop: Tengoku no Tobira|115.0           |
|Witch Hunter Robin             |650.0           |
|Bouken Ou Beet                 |1196.0          |
|Eyeshield 21                   |3335.0          |
|Hungry Heart: Wild Striker     |1196.0          |
|Monster                        |1776.0          |
|Naruto                         |5060.0          |
|One Piece                      |24.0            |
|Tennis no Ouji-sama            |3916.0          |
+-------------------------------+----------------+
only showing top 10 rows



**Reasoning**:
Drop the specified columns from both dataframes and display the columns to verify the changes.



In [None]:
columns_to_drop_final = ["Duration", "duration_value", "duration_unit"]

df_train = df_train.drop(*columns_to_drop_final)
df_test = df_test.drop(*columns_to_drop_final)

print("Columns in df_train after dropping:")
print(df_train.columns)

print("\nColumns in df_test after dropping:")
print(df_test.columns)

Columns in df_train after dropping:
['anime_id', 'Name', 'English name', 'Genres', 'Type', 'Episodes', 'Status', 'Studios', 'Source', 'Rating', 'Popularity', 'Favorites', 'Image URL', 'source_encoded', 'type_encoded', 'status_encoded', 'rating_encoded', 'genres_array', 'genres_vector', 'duration_minutes']

Columns in df_test after dropping:
['anime_id', 'Name', 'English name', 'Genres', 'Type', 'Episodes', 'Status', 'Studios', 'Source', 'Rating', 'Popularity', 'Favorites', 'Image URL', 'source_encoded', 'type_encoded', 'status_encoded', 'rating_encoded', 'genres_array', 'genres_vector', 'duration_minutes']


## Summary:

### Data Analysis Key Findings
*   The numerical value and unit of duration were successfully extracted into separate columns, `duration_value` and `duration_unit`, in both `df_train` and `df_test` dataframes.
*   Durations specified in hours (`hr`) were correctly converted to minutes by multiplying by 60.
*   Durations specified "per ep" were successfully converted to total minutes by multiplying the per-episode duration by the value in the `Episodes` column.
*   The `Episodes` column in both dataframes was successfully cast to a double type to enable numerical operations.
*   Rows with null values in the `Episodes` or `duration_minutes` columns were removed from both `df_train` and `df_test` dataframes, resulting in zero null counts for these columns.
*   The original `Duration` column, along with the intermediate `duration_value` and `duration_unit` columns, were successfully dropped from both dataframes.

### Insights or Next Steps
*   The dataframes now have a standardized `duration_minutes` column, facilitating direct comparison and analysis of content lengths regardless of their original format.
*   The cleaned dataframes with numeric duration and episode information are ready for subsequent feature engineering or model training steps.


In [None]:
df_train.show()
df_test.show()
print("Number of rows in df_train after cleaning:", df_train.count())
print("Number of rows in df_test after cleaning:", df_test.count())

+--------+--------------------+--------------------+--------------------+-----+--------+----------------+--------------------+-----------+--------------------+----------+---------+--------------------+--------------+------------+--------------+--------------+--------------------+--------------------+----------------+
|anime_id|                Name|        English name|              Genres| Type|Episodes|          Status|             Studios|     Source|              Rating|Popularity|Favorites|           Image URL|source_encoded|type_encoded|status_encoded|rating_encoded|        genres_array|       genres_vector|duration_minutes|
+--------+--------------------+--------------------+--------------------+-----+--------+----------------+--------------------+-----------+--------------------+----------+---------+--------------------+--------------+------------+--------------+--------------+--------------------+--------------------+----------------+
|       1|        Cowboy Bebop|        Cowb

In [None]:
df_train.printSchema()

root
 |-- anime_id: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- English name: string (nullable = true)
 |-- Genres: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Episodes: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Studios: string (nullable = true)
 |-- Source: string (nullable = true)
 |-- Rating: string (nullable = true)
 |-- Popularity: integer (nullable = true)
 |-- Favorites: integer (nullable = true)
 |-- Image URL: string (nullable = true)
 |-- source_encoded: double (nullable = false)
 |-- type_encoded: double (nullable = false)
 |-- status_encoded: double (nullable = false)
 |-- rating_encoded: double (nullable = false)
 |-- genres_array: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- genres_vector: vector (nullable = true)
 |-- duration_minutes: double (nullable = true)



In [None]:
from pyspark.ml import Pipeline
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler, StandardScaler

df_train = df_train.withColumn("Popularity", col("Popularity").cast("int"))
df_test = df_test.withColumn("Popularity", col("Popularity").cast("int"))
df_train = df_train.withColumn("Favorites", col("Favorites").cast("int"))
df_test = df_test.withColumn("Favorites", col("Favorites").cast("int"))

df_train = df_train.withColumn("anime_id_int", col("anime_id").cast("int"))
df_test = df_test.withColumn("anime_id_int", col("anime_id").cast("int"))

df_train = df_train.filter(col("anime_id_int").isNotNull())
df_test = df_test.filter(col("anime_id_int").isNotNull())


numeric_features = ["Popularity", "Favorites", "duration_minutes"]
numeric_assembler = VectorAssembler(inputCols=numeric_features, outputCol="numeric_features_vector")

scaler = StandardScaler(inputCol="numeric_features_vector", outputCol="scaled_numeric_features_vector",
                        withStd=True, withMean=False)

encoded_features = ["source_encoded", "type_encoded", "status_encoded", "rating_encoded", "genres_vector"]
final_assembler = VectorAssembler(inputCols=["scaled_numeric_features_vector"] + encoded_features, outputCol="features")

pipeline = Pipeline(stages=[numeric_assembler, scaler, final_assembler])

pipeline_model = pipeline.fit(df_train)

df_train_featured = pipeline_model.transform(df_train)
df_test_featured = pipeline_model.transform(df_test)

print("Schema after final feature engineering:")
df_train_featured.printSchema()

print("\nDataframe df_train_featured with 'features' column:")
df_train_featured.select("features").show(5, truncate=False)

print("\nDataframe df_test_featured with 'features' column:")
df_test_featured.select("features").show(5, truncate=False)

Schema after final feature engineering:
root
 |-- anime_id: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- English name: string (nullable = true)
 |-- Genres: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Episodes: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Studios: string (nullable = true)
 |-- Source: string (nullable = true)
 |-- Rating: string (nullable = true)
 |-- Popularity: integer (nullable = true)
 |-- Favorites: integer (nullable = true)
 |-- Image URL: string (nullable = true)
 |-- source_encoded: double (nullable = false)
 |-- type_encoded: double (nullable = false)
 |-- status_encoded: double (nullable = false)
 |-- rating_encoded: double (nullable = false)
 |-- genres_array: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- genres_vector: vector (nullable = true)
 |-- duration_minutes: double (nullable = true)
 |-- anime_id_int: integer (nullable = true)
 |-- numeric_features_vec

In [None]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

kmeans = KMeans(featuresCol='features', k=1000, seed=42)

print("Melatih model K-Means...")
model = kmeans.fit(df_train_featured)

print("Mengevaluasi model pada data test...")
predictions = model.transform(df_test_featured)

evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print(f"Silhouette Score (on test data): {silhouette}")

print("\nHasil prediksi cluster untuk data test:")
predictions.select("Name", "Genres", "prediction").show(20)

Melatih model K-Means...
Mengevaluasi model pada data test...
Silhouette Score (on test data): 0.26549338247315646

Hasil prediksi cluster untuk data test:
+--------------------+--------------------+----------+
|                Name|              Genres|prediction|
+--------------------+--------------------+----------+
|              Trigun|Action, Adventure...|       966|
|Hachimitsu to Clover|Comedy, Drama, Ro...|       885|
|Initial D Fourth ...|       Action, Drama|        50|
|    Ring ni Kakero 1|      Action, Sports|       123|
|              Zipang|Action, Drama, Sc...|        50|
|Kenpuu Denki Berserk|Action, Adventure...|       122|
|         .hack//Sign|Adventure, Fantas...|       666|
|   Appleseed (Movie)|Action, Drama, Sc...|       750|
|        Rozen Maiden|Action, Comedy, D...|        50|
|Rozen Maiden: Trä...|Action, Comedy, D...|        50|
|Azumanga Daiou Th...|Comedy, Slice of ...|       611|
|      Black Cat (TV)|Action, Adventure...|       837|
|   Full Metal Pani

In [None]:
from pyspark.sql.functions import col

def get_recommendations(anime_name: str, full_dataset, model):
    """
    Fungsi untuk mencari anime yang mirip berdasarkan cluster K-Means.

    Args:
        anime_name (str): Nama anime yang ingin dicari (bisa sebagian).
        full_dataset: DataFrame gabungan yang berisi semua data.
        model: Model K-Means yang sudah dilatih.
    """
    print("-" * 50)
    print(f"Mencari rekomendasi untuk: '{anime_name}'")

    possible_matches = full_dataset.filter(col("Name").like(f"%{anime_name}%")).select("Name").distinct()

    if possible_matches.count() == 0:
        print(f"Hasil: Tidak ada anime yang cocok dengan '{anime_name}' ditemukan.")
        return

    exact_name = possible_matches.first()["Name"]
    target_anime_features = full_dataset.filter(col("Name") == exact_name).select("features").first()

    if not target_anime_features:
        print(f"Tidak dapat menemukan fitur untuk '{exact_name}'.")
        return

    target_df = spark.createDataFrame([target_anime_features])
    target_cluster_id = model.transform(target_df).select("prediction").first()[0]

    print(f"Anime ditemukan: '{exact_name}' (Cluster: {target_cluster_id})")

    df_train_predictions = model.transform(df_train_featured)
    df_test_predictions = model.transform(df_test_featured)
    all_predictions = df_train_predictions.unionByName(df_test_predictions)

    recommendations = all_predictions.filter(
        (col("prediction") == target_cluster_id) &
        (col("Name") != exact_name)
    ).select("Name", "Genres")

    if recommendations.count() == 0:
        print("Hasil: Tidak ada rekomendasi lain yang ditemukan di cluster ini.")
    else:
        print("\nRekomendasi:")
        recommendations.show(truncate=False)

In [None]:

df_combined = df_train_featured.unionByName(df_test_featured)

get_recommendations("High School DxD", df_combined, model)

--------------------------------------------------
Mencari rekomendasi untuk: 'High School DxD'
Anime ditemukan: 'High School DxD' (Cluster: 544)

Rekomendasi:
+-------------------------------------------------------+---------------------------------------+
|Name                                                   |Genres                                 |
+-------------------------------------------------------+---------------------------------------+
|Baccano!                                               |Action, Comedy, Mystery, Supernatural  |
|Yojouhan Shinwa Taikei                                 |Award Winning, Comedy, Mystery, Romance|
|Youjo Senki                                            |Action, Fantasy                        |
|Re:Zero kara Hajimeru Isekai Seikatsu 2nd Season       |Drama, Fantasy, Suspense               |
|86                                                     |Action, Drama, Sci-Fi                  |
|Re:Zero kara Hajimeru Isekai Seikatsu 2nd Season Part 2

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

genre_assembler = VectorAssembler(inputCols=["genres_vector"], outputCol="genre_features")

df_train_genre_featured = genre_assembler.transform(df_train_featured)
df_test_genre_featured = genre_assembler.transform(df_test_featured)

kmeans_genre = KMeans(featuresCol='genre_features', k=250, seed=42)
model_genre = kmeans_genre.fit(df_train_genre_featured)


In [None]:
from pyspark.ml.feature import Normalizer
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col, udf
from pyspark.sql.types import FloatType

data_for_similarity = genre_assembler.transform(df_combined)
normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=2.0)
data_normalized = normalizer.transform(data_for_similarity)

target_vector = data_normalized.filter(col("Name") == "Date A Live").select("normFeatures").first()[0]

dot_product_udf = udf(lambda x: float(x.dot(target_vector)), FloatType())

similarity_df = data_normalized.withColumn("similarity", dot_product_udf(col("normFeatures")))

print("Rekomendasi berdasarkan Cosine Similarity (lebih spesifik):")
similarity_df.orderBy("similarity", ascending=False).select("Name", "Genres", "similarity").show(truncate=False)

Rekomendasi berdasarkan Cosine Similarity (lebih spesifik):
+---------------------------------------------------------------------------------------------------------+---------------------------------------+----------+
|Name                                                                                                     |Genres                                 |similarity|
+---------------------------------------------------------------------------------------------------------+---------------------------------------+----------+
|Date A Live                                                                                              |Romance, Sci-Fi                        |1.0       |
|Date A Live IV                                                                                           |Action, Romance, Sci-Fi                |0.96977186|
|Katanagatari                                                                                             |Action, Adventure, Romance            

In [None]:
model_genre.save("./model_kmeans_genre")
genre_assembler.save("./assembler_genre")
df_combined.select("Name", "Genres").write.parquet("./anime_data.parquet")