In [None]:
import duckdb
import plotly.express as px

# Read the SQL text up front; the file handle is closed before any database work.
with open("../queries/count_score.sql") as query_file:
    query = query_file.read()

# Run the query against the DuckDB file and pull the result into a DataFrame.
with duckdb.connect("../data/anime_data.duckdb") as conn:
    df = conn.execute(query).fetchdf()

# Plain-text dump of the result, followed by the sum of the "count" column.
print(df)
print(df["count"].sum())

# Bar chart of "count" against "score".
fig = px.bar(
    df,
    x="score",
    y="count",
    title="Score Count",
)
fig.show()

In [None]:
import duckdb
import plotly.express as px

# Read the SQL text up front; the file handle is closed before any database work.
with open("../queries/count_score_by_top_genre.sql") as query_file:
    query = query_file.read()

# Run the query against the DuckDB file and pull the result into a DataFrame.
with duckdb.connect("../data/anime_data.duckdb") as conn:
    df = conn.execute(query).fetchdf()

# Plain-text dump of the result.
print(df)

# Bar chart of "count" against "score", coloured by the "genre" column.
fig = px.bar(
    df,
    x="score",
    y="count",
    color="genre",
    title="Score Count by Top Genre",
)
fig.show()

In [None]:
import duckdb
import plotly.express as px

# Read the SQL text up front; the file handle is closed before any database work.
with open("../queries/count_score_by_top_tag.sql") as query_file:
    query = query_file.read()

# Run the query against the DuckDB file and pull the result into a DataFrame.
with duckdb.connect("../data/anime_data.duckdb") as conn:
    df = conn.execute(query).fetchdf()

# Plain-text dump of the result.
print(df)

# Bar chart of "count" against "score", coloured by the "tag" column.
fig = px.bar(
    df,
    x="score",
    y="count",
    color="tag",
    title="Score Count by Top Tag",
)
fig.show()

In [None]:
import duckdb

# Export the contents of dbt.anime_scores to a Parquet file in the same
# data directory as the DuckDB database, so it can be read outside DuckDB.
export_statement = "COPY (FROM dbt.anime_scores) TO '../data/anime_scores.parquet' (FORMAT parquet)"

with duckdb.connect("../data/anime_data.duckdb") as conn:
    conn.sql(export_statement)

In [None]:
from pyspark.sql import SparkSession as session
from pyspark.sql.functions import count_distinct, desc

# Create (or reuse) the Spark session for this notebook, with off-heap
# memory enabled and its size set to 2g.
spark = (
    session.builder
    .appName("Anime Data Pipeline - Spark")
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", "2g")
    .getOrCreate()
)

In [None]:

# Load the Parquet export of the anime scores into a Spark DataFrame.
df = spark.read.parquet("../data/anime_scores.parquet")

# Number of distinct media_id values per score, keeping only scores above
# zero, ordered by that count and then by score (both descending).
aggs = (
    df.groupBy("score")
    .agg(count_distinct("media_id").alias("count"))
    .filter(df["score"] > 0.0)
    .orderBy(desc("count"), desc("score"))
)
aggs.show()

In [None]:
# Register the DataFrame as a temporary view so it can be queried with
# Spark SQL. createOrReplaceTempView returns None, so its result is not
# bound to a name.
df.createOrReplaceTempView("anime_scores")

# Same aggregation as the DataFrame-API version, expressed in SQL: distinct
# media_id count per score, scores above zero only, ordered by count and
# then score (both descending).
query = """SELECT
  score,
  COUNT(DISTINCT(media_id)) AS count
FROM
  anime_scores
WHERE
  score > 0.0
GROUP BY
  score
ORDER BY
  count DESC, score DESC"""
result = spark.sql(query)
result.show()

In [None]:
# Row predicates: keep only rows that actually carry a tag / a genre.
has_tag = df["tag"].isNotNull()
has_genre = df["genre"].isNotNull()

with_tags = df.select("media_id", "score", "tag").filter(has_tag)
with_genres = df.select("media_id", "score", "genre").filter(has_genre)

# Pair every tag with every genre of the same (media_id, score) and drop
# duplicate rows from the result.
joined = with_tags.join(with_genres, on=["media_id", "score"]).distinct()
joined.show()

In [None]:
from pyspark.ml.feature import VectorAssembler, StandardScaler, StringIndexer, OneHotEncoder

# Map the "tag" and "genre" columns to numeric indices in new
# "tag_indexed" / "genre_indexed" columns.
indexer = StringIndexer(
    inputCols=["tag", "genre"],
    outputCols=["tag_indexed", "genre_indexed"],
)
indexer_model = indexer.fit(joined)
indexed_df = indexer_model.transform(joined)
indexed_df.show()

In [None]:

# Tag feature pipeline: one-hot encode the tag index, combine it with the
# score into one feature vector, then standardise that vector.

# 1. One-hot encode "tag_indexed" into "tag_one_hot".
encoder_tag = OneHotEncoder(
    inputCols=["tag_indexed"],
    outputCols=["tag_one_hot"],
)
encoder_tag_model = encoder_tag.fit(indexed_df)
encoded_tag_df = encoder_tag_model.transform(indexed_df)

# 2. Assemble score + one-hot tag into a single "features" vector.
assembler_tag = VectorAssembler(
    inputCols=["score", "tag_one_hot"],
    outputCol="features",
)
assembled_tag_df = assembler_tag.transform(encoded_tag_df)

# 3. Scale "features" into "standardized_tag".
scaler_tag = StandardScaler(
    inputCol="features",
    outputCol="standardized_tag",
)
scaler_tag_model = scaler_tag.fit(assembled_tag_df)
scaled_tag_df = scaler_tag_model.transform(assembled_tag_df)

scaled_tag_df.select("standardized_tag").show(truncate=False)

In [None]:

# Genre feature pipeline: one-hot encode the genre index, combine it with
# the score into one feature vector, then standardise that vector.

# 1. One-hot encode "genre_indexed" into "genre_one_hot".
encoder_genre = OneHotEncoder(
    inputCols=["genre_indexed"],
    outputCols=["genre_one_hot"],
)
encoder_genre_model = encoder_genre.fit(indexed_df)
encoded_genre_df = encoder_genre_model.transform(indexed_df)

# 2. Assemble score + one-hot genre into a single "features" vector.
assembler_genre = VectorAssembler(
    inputCols=["score", "genre_one_hot"],
    outputCol="features",
)
assembled_genre_df = assembler_genre.transform(encoded_genre_df)

# 3. Scale "features" into "standardized".
scaler_genre = StandardScaler(
    inputCol="features",
    outputCol="standardized",
)
scaler_genre_model = scaler_genre.fit(assembled_genre_df)
scaled_genre_df = scaler_genre_model.transform(assembled_genre_df)

scaled_genre_df.select("standardized").show(truncate=False)

In [None]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
import numpy as np

# Exclusive upper bound of the sweep: k runs over 2 .. max_clusters - 1.
max_clusters = 100

# cost[k] holds the training cost of the model fitted with k clusters.
# Slots 0 and 1 stay at zero because the sweep starts at k=2.
cost = np.zeros(max_clusters)

# NOTE(review): this evaluator is configured but never applied in this cell;
# it is kept in case a later cell relies on it.
evaluator = ClusteringEvaluator(predictionCol="prediction", featuresCol="standardized", metricName="silhouette", distanceMeasure="squaredEuclidean")

for i in range(2, max_clusters):
    # K-means initialisation is randomised; pin the seed so the cost curve
    # (and any k chosen from it) is reproducible across kernel restarts.
    KMeans_algo = KMeans(featuresCol="standardized", k=i, seed=42)
    KMeans_fit = KMeans_algo.fit(scaled_genre_df)
    # NOTE(review): `output` is not read in this cell; kept for any later use.
    output = KMeans_fit.transform(scaled_genre_df)
    cost[i] = KMeans_fit.summary.trainingCost

cost

In [None]:
import pandas as pd
import plotly.express as px

# Pair each cluster count with its training cost. The first two slots of
# `cost` are skipped so the values line up with k = 2 .. max_clusters - 1.
new_col = range(2, max_clusters)
cost_df = pd.DataFrame({"cluster": new_col, "cost": cost[2:]})

# Line chart of cost against cluster count.
fig = px.line(
    cost_df,
    x="cluster",
    y="cost",
    title="Cluster vs Cost",
)
fig.show()

In [None]:
# Fit the final model with k=18 and attach a cluster "prediction" to every row.
# NOTE(review): k=18 is hard-coded — presumably chosen from the cost curve; confirm.
# K-means initialisation is randomised, so the seed is pinned to make the
# cluster assignments reproducible across kernel restarts.
KMeans_algo = KMeans(featuresCol='standardized', k=18, seed=42)
predictions = KMeans_algo.fit(scaled_genre_df).transform(scaled_genre_df)
predictions.show()

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

# Bring only the columns needed for plotting into pandas.
viz_df = predictions.select("score", "genre", "prediction").toPandas()

# Columns to plot against the cluster assignment, one figure each.
list1 = ["score", "genre"]

# NOTE(review): "genre" is string-indexed earlier in the notebook, so it is
# presumably non-numeric; confirm that a bar plot with it on the y axis
# shows what is intended.
for column_name in list1:
    sns.barplot(data=viz_df, x="prediction", y=column_name)
    plt.show()