In [4]:
df = spark.read.csv("gs://mit805_data_bucket/data_raw/final_animedataset.csv", header = True, inferSchema=True)


                                                                                

In [2]:
display(df)

DataFrame[username: string, anime_id: int, my_score: int, user_id: int, gender: string, title: string, type: string, source: string, score: string, scored_by: double, rank: double, popularity: double, genre: string]

In [5]:
# 2. Basic Statistics
df.describe().show()

                                                                                

+-------+-------------+------------------+------------------+------------------+----------+------------------+--------------------+------------+------------------+------------------+------------------+-----------------+------------------+
|summary|     username|          anime_id|          my_score|           user_id|    gender|             title|                type|      source|             score|         scored_by|              rank|       popularity|             genre|
+-------+-------------+------------------+------------------+------------------+----------+------------------+--------------------+------------+------------------+------------------+------------------+-----------------+------------------+
|  count|     35305439|          35305695|          35305695|          35305695|  35305695|          35305695|            35305695|    35305695|          35305695|          35305695|          34554095|         35305325|          35303428|
|   mean|     Infinity|11481.590444006271| 4

## 3. Checking for Missing Values
Determine the count of missing values in each column:

In [7]:
from pyspark.sql.functions import isnan, when, count, col

missing_data = df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns])
missing_data.show()




+--------+--------+--------+-------+------+-----+----+------+-----+---------+------+----------+-----+
|username|anime_id|my_score|user_id|gender|title|type|source|score|scored_by|  rank|popularity|genre|
+--------+--------+--------+-------+------+-----+----+------+-----+---------+------+----------+-----+
|     256|       0|       0|      0|     0|    0|   0|     0|    0|        0|751600|       370| 2267|
+--------+--------+--------+-------+------+-----+----+------+-----+---------+------+----------+-----+



                                                                                

In [7]:
#Determine number of unique Anime Titles
unique_titles =df.select("title").distinct().rdd.flatMap(lambda x: x).collect()
len(unique_titles)

                                                                                

8746

In [6]:
8746

8746

In [10]:
# Count the number of unique titles that do not have a rank value in the "rank" column
missing_rank_count = df.filter(df["rank"].isNull()).select("title").distinct().count()
missing_rank_count

                                                                                

1107

## 4. Handling Missing Values

In [6]:
# 3. Checking for Missing Values
for col in df.columns:
    print(col, ":", df.filter(df[col].isNull()).count())

                                                                                

username : 256




anime_id : 0




my_score : 0


                                                                                

user_id : 0


                                                                                

gender : 0


                                                                                

title : 0


                                                                                

type : 0


                                                                                

source : 0


                                                                                

score : 0


                                                                                

scored_by : 0




rank : 751600


                                                                                

popularity : 370




genre : 2267


                                                                                

In [None]:
#Drop rows where any of the specified columns have missing values
cleaned_data = data.dropna(subset=["genre", "popularity", "username"])

In [8]:
# 4. Distribution of User Scores
df.groupBy("my_score").count().orderBy("my_score").show()

[Stage 50:>                                                         (0 + 1) / 1]

+--------+--------+
|my_score|   count|
+--------+--------+
|       0|13743594|
|       1|  137481|
|       2|  165239|
|       3|  277507|
|       4|  586871|
|       5| 1318123|
|       6| 2514152|
|       7| 4822985|
|       8| 5329087|
|       9| 3721491|
|      10| 2689165|
+--------+--------+



                                                                                

In [2]:
# 5. Top Rated Animes
top_rated = df.select("title", "score", "scored_by").distinct().orderBy("score", ascending=False)
top_rated.show(10)


                                                                                

+--------------------+------------+---------+
|               title|       score|scored_by|
+--------------------+------------+---------+
|"Marriage Blue: "...|Visual novel|     6.42|
|"Gyakuten Saiban:...|        Game|     6.44|
|Fullmetal Alchemi...|        9.25| 733592.0|
|      Kimi no Na wa.|        9.19| 471398.0|
|       Steins;Gate 0|        9.15|  26739.0|
|            Gintama°|        9.15|  71751.0|
|         Steins;Gate|        9.14| 563857.0|
|Hunter x Hunter (...|        9.11| 403377.0|
|Ginga Eiyuu Densetsu|        9.11|  29036.0|
|       Gintama&#039;|        9.11|  92025.0|
+--------------------+------------+---------+
only showing top 10 rows



                                                                                

In [4]:
top_rated.write.save("gs://mit805_data_bucket/Processed/top_rated.csv",format="csv")

                                                                                

In [10]:
# 6. Most Active Users
df.groupBy("username").count().orderBy("count", ascending=False).show(10)




+-------------+-----+
|     username|count|
+-------------+-----+
|  spacecowboy| 8745|
|   TsukasaKei| 8745|
|      uemmega| 8594|
|       Exxorn| 8562|
| DeadlyKizuna| 8159|
|       xbhrjd| 8084|
|   JakCooper2| 7897|
|      De_Baer| 7414|
|   Dedzapadlo| 7239|
|DesireDestiny| 7012|
+-------------+-----+
only showing top 10 rows



                                                                                

In [11]:
# 7. Distribution by Type and Source
df.groupBy("type").count().show()




+--------------------+--------+
|                type|   count|
+--------------------+--------+
|                  TV|23696077|
|           Igi Ari!"|    7790|
| Doushite Konna O...|     370|
|             Special| 2576837|
|                 OVA| 3952868|
|               Music|  176054|
|               Movie| 4270626|
|                 ONA|  625073|
+--------------------+--------+



                                                                                

In [12]:
df.groupBy("source").count().show()




+-------------+--------+
|       source|   count|
+-------------+--------+
|           TV|    7790|
| Visual novel| 2462047|
|Digital manga|   17600|
|     Original| 6400588|
|        Novel| 1320645|
| Picture book|   12522|
|         Book|   68975|
|      Unknown| 1402138|
|        Other|  413565|
|        Radio|    2882|
|        Manga|14655746|
| 4-koma manga| 1008246|
|          OVA|     370|
|        Music|   96945|
|         Game| 1270683|
|    Web manga|  509959|
|    Card game|   88883|
|  Light novel| 5566111|
+-------------+--------+



ERROR:root:Exception while sending command.                                     
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/lib/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
