### Start Spark session and load initial data

In [21]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import col, split, explode, count, percentile_approx, dense_rank, avg
from pyspark.sql.window import Window

In [2]:
# Create the Spark session for this notebook, or reuse one that is already running.
spark_session = (
    SparkSession.builder
    .appName('PerDateMetrics')
    .getOrCreate()
)

In [3]:
# Locations of the raw inputs, relative to the notebook's working directory.
# The anime catalogue is a single file; the reviews are matched with a glob
# pattern, so every file matching 'user_anime*.csv' is read.
animes_raw_data_path = '../data/unziped/anime.csv'
review_raw_data_path = '../data/unziped/user_anime*.csv'

In [4]:
# Explicit schema for the user/anime review files.
# Every field is declared nullable: the loaded frame reports all columns as
# nullable regardless of what is declared here, and rows with a missing
# review_date are filtered out later, so the data does contain nulls.
# NOTE(review): anime_id is read as a string here but as an integer in
# animes_schema; the two are joined on this column.
reviews_schema = StructType([
    StructField('user_id', StringType(), True),
    StructField('anime_id', StringType(), True),
    StructField('favorite', BooleanType(), True),
    StructField('review_id', IntegerType(), True),
    StructField('review_date', DateType(), True),
    StructField('review_num_useful', IntegerType(), True),
    StructField('review_score', FloatType(), True),
    StructField('review_story_score', FloatType(), True),
    StructField('review_animation_score', FloatType(), True),
    StructField('review_sound_score', FloatType(), True),
    StructField('review_character_score', FloatType(), True),
    StructField('review_enjoyment_score', FloatType(), True),
    StructField('score', FloatType(), True),
    StructField('status', StringType(), True),
    StructField('progress', IntegerType(), True),
    StructField('last_interaction_date', DateType(), True)
])

In [5]:
# Explicit schema for the anime catalogue file, expressed as an ordered table
# of (column name, Spark type) pairs. Every column is nullable.
# NOTE(review): score_01_count is typed as a string while the other
# score_XX_count columns are integers -- confirm whether this is intentional.
_anime_columns = [
    ('anime_id', IntegerType),
    ('anime_url', StringType),
    ('title', StringType),
    ('synopsis', StringType),
    ('main_pic', StringType),
    ('type', StringType),
    ('source_type', StringType),
    ('num_episodes', IntegerType),
    ('status', StringType),
    ('start_date', DateType),
    ('end_date', DateType),
    ('season', StringType),
    ('studios', StringType),
    ('genres', StringType),
    ('score', FloatType),
    ('score_count', IntegerType),
    ('score_rank', IntegerType),
    ('popularity_rank', IntegerType),
    ('members_count', IntegerType),
    ('favorite_count', IntegerType),
    ('watching_count', IntegerType),
    ('completed_count', IntegerType),
    ('on_hold_count', IntegerType),
    ('dropped_count', IntegerType),
    ('plan_to_watch_count', IntegerType),
    ('total_count', IntegerType),
    ('score_10_count', IntegerType),
    ('score_09_count', IntegerType),
    ('score_08_count', IntegerType),
    ('score_07_count', IntegerType),
    ('score_06_count', IntegerType),
    ('score_05_count', IntegerType),
    ('score_04_count', IntegerType),
    ('score_03_count', IntegerType),
    ('score_02_count', IntegerType),
    ('score_01_count', StringType),
    ('clubs', StringType),
    ('pics', StringType),
]
animes_schema = StructType([
    StructField(column_name, column_type(), True)
    for column_name, column_type in _anime_columns
])

In [6]:
# Load the tab-separated review files (first line is a header) using the explicit schema.
df_reviews = (
    spark_session.read
    .option('header', True)
    .option('delimiter', '\t')
    .schema(reviews_schema)
    .csv(review_raw_data_path)
)

In [7]:
# Load the tab-separated anime catalogue (first line is a header) using the explicit schema.
df_animes = (
    spark_session.read
    .option('header', True)
    .option('delimiter', '\t')
    .schema(animes_schema)
    .csv(animes_raw_data_path)
)

### Clean Anime Data

In [8]:
# Genres and studios arrive as pipe-delimited strings; turn each into an array.
# split() takes a regular expression, so the pipe has to be escaped. A raw
# string keeps the backslash literal instead of relying on Python passing an
# invalid escape sequence ('\|') through unchanged, which emits a warning.
df_animes = df_animes.withColumn('genres', split(col('genres'), r'\|'))
df_animes = df_animes.withColumn('studios', split(col('studios'), r'\|'))
# Peek at two rows to confirm both columns are now arrays (or null when missing).
df_animes.select([col('genres'), col('studios')]).take(2)

[Row(genres=['Supernatural'], studios=['J.C.Staff']),
 Row(genres=['Action', 'Adventure'], studios=None)]

In [9]:
# Keep only the join key and the two array columns needed downstream.
df_animes = df_animes.select('anime_id', 'genres', 'studios')

### Clean reviews data

In [10]:
# Narrow the reviews to the columns used below and keep only rows that
# actually carry a review date.
df_reviews = (
    df_reviews
    .select('status', 'user_id', 'anime_id', 'review_date', 'score')
    .where(col('review_date').isNotNull())
)

### Join reviews data and anime data

In [11]:
# Inspect the column names and types of the cleaned reviews frame before joining.
df_reviews.printSchema()

root
 |-- status: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- anime_id: string (nullable = true)
 |-- review_date: date (nullable = true)
 |-- score: float (nullable = true)



In [12]:
# anime_id is read as a string in the reviews data but as an integer in the
# anime data. Cast it explicitly so both sides of the join key share a type
# instead of depending on Spark's implicit string/integer coercion.
df_reviews = df_reviews.withColumn('anime_id', col('anime_id').cast('int'))
# Inner join: reviews whose anime_id has no match in the anime data are dropped.
df_reviews = df_reviews.join(df_animes, on='anime_id')
# One output row per (review, genre); the original array column is then removed.
df_reviews_genres = df_reviews.withColumn('genre', explode(col('genres')))
df_reviews_genres = df_reviews_genres.drop(col('genres'))

### Compute review counts and median scores per genre and date, then rank genres within each date

In [13]:
# Collapse to one row per (genre, review_date): how many reviews there were
# (rows with a non-null anime_id) and the approximate median of their scores.
df_reviews_genres = (
    df_reviews_genres
    .groupBy('genre', 'review_date')
    .agg(
        count('anime_id').alias('reviews_per_date'),
        percentile_approx('score', 0.5).alias('median_review_score'),
    )
)

In [16]:
# Confirm the columns produced by the per-genre, per-date aggregation.
df_reviews_genres.printSchema()

root
 |-- genre: string (nullable = true)
 |-- review_date: date (nullable = true)
 |-- reviews_per_date: long (nullable = false)
 |-- median_review_score: float (nullable = true)



In [25]:
# Rank genres within each review date by number of reviews, busiest first.
# dense_rank gives tied genres the same rank and leaves no gaps afterwards.
genre_rank_window = Window.partitionBy('review_date').orderBy(col('reviews_per_date').desc())
df_reviews_genres = df_reviews_genres.withColumn('rank', dense_rank().over(genre_rank_window))

In [29]:
# Spot-check the ranking on dates from 2022-01-01 onwards, ordered by date and rank.
(
    df_reviews_genres
    .where(col('review_date') >= '2022-01-01')
    .select('genre', 'review_date', 'reviews_per_date', 'rank')
    .orderBy('review_date', 'rank')
    .show()
)

+-------------+-----------+----------------+----+
|        genre|review_date|reviews_per_date|rank|
+-------------+-----------+----------------+----+
|       Action| 2022-01-01|              28|   1|
|       Comedy| 2022-01-01|              27|   2|
|        Drama| 2022-01-01|              26|   3|
|       School| 2022-01-01|              26|   3|
|      Fantasy| 2022-01-01|              23|   4|
| Supernatural| 2022-01-01|              22|   5|
|Slice of Life| 2022-01-01|              17|   6|
|      Shounen| 2022-01-01|              17|   6|
|       Sci-Fi| 2022-01-01|              16|   7|
|    Adventure| 2022-01-01|              13|   8|
|      Romance| 2022-01-01|              12|   9|
|      Mystery| 2022-01-01|              12|   9|
|  Super Power| 2022-01-01|              11|  10|
|     Military| 2022-01-01|               8|  11|
|       Seinen| 2022-01-01|               8|  11|
|        Mecha| 2022-01-01|               7|  12|
|Psychological| 2022-01-01|               7|  12|


### Write output to parquet

In [32]:
output_file = '../data/refined/genres_review_stats/'
# Overwrite any previous result so the notebook can be re-run end to end;
# with the default save mode the write fails once the output path exists.
df_reviews_genres.write.mode('overwrite').parquet(output_file)