In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("onepiece").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/09/23 20:23:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
file = './onepieceanime/ONE PIECE.csv'
df = spark.read.csv(file,header=True, inferSchema=True)

                                                                                

In [3]:
df.show(10)

+---+------+-----+------+-------+--------------------+-----+-----------+--------------+
|_c0|  rank|trend|season|episode|                name|start|total_votes|average_rating|
+---+------+-----+------+-------+--------------------+-----+-----------+--------------+
|  0|24,129|   18|     1|      1|I'm Luffy! The Ma...| 1999|        647|           7.6|
|  1|29,290|   11|     1|      2|The Great Swordsm...| 1999|        473|           7.8|
|  2|32,043|    7|     1|      3|Morgan vs. Luffy!...| 1999|        428|           7.7|
|  3|28,818|    8|     1|      4|Luffy's Past! The...| 1999|        449|           8.1|
|  4|37,113|    4|     1|      5|Fear, Mysterious ...| 1999|        370|           7.5|
|  5|36,209|    4|     1|      6|Desperate Situati...| 1999|        364|           7.7|
|  6|37,648|    4|     1|      7|Sozetsu Ketto! Ke...| 1999|        344|           7.7|
|  7|38,371|    6|     1|      8|Shousha wa docchi...| 1999|        335|           7.7|
|  8|42,249|    5|     1|      9

23/09/23 20:23:14 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , rank, trend, season, episode, name, start, total_votes, average_rating
 Schema: _c0, rank, trend, season, episode, name, start, total_votes, average_rating
Expected: _c0 but found: 
CSV file: file:///Users/sukumarsubudhi/Downloads/Learning/personal_projects/one-piece-de-project/onepieceanime/ONE%20PIECE.csv


In [4]:
df.count()

958

In [5]:
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- rank: string (nullable = true)
 |-- trend: string (nullable = true)
 |-- season: integer (nullable = true)
 |-- episode: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- start: integer (nullable = true)
 |-- total_votes: string (nullable = true)
 |-- average_rating: double (nullable = true)



We observe that the columns like rank, trend, total_votes are imported as string
We need to convert them to integer columns

In [6]:
df_copy = df.cache()

In [7]:
# Cleaning the 3 columns to int
from pyspark.sql import functions as F

for colname in ["rank", "trend", "total_votes"]:
    df = df.withColumn(colname, F.regexp_replace(F.col(colname), ",", ""))
    df = df.withColumn(colname, df[colname].cast("int"))

In [8]:
df.filter(F.col("total_votes").isNull()).show()

23/09/23 20:23:39 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , rank, trend, season, episode, name, start, total_votes, average_rating
 Schema: _c0, rank, trend, season, episode, name, start, total_votes, average_rating
Expected: _c0 but found: 
CSV file: file:///Users/sukumarsubudhi/Downloads/Learning/personal_projects/one-piece-de-project/onepieceanime/ONE%20PIECE.csv


+---+----+-----+------+-------+----+-----+-----------+--------------+
|_c0|rank|trend|season|episode|name|start|total_votes|average_rating|
+---+----+-----+------+-------+----+-----+-----------+--------------+
+---+----+-----+------+-------+----+-----+-----------+--------------+



[Stage 6:>                                                          (0 + 1) / 1]                                                                                

In [9]:
df.filter(F.col("rank").isNull()).show()

+---+----+-----+------+-------+----+-----+-----------+--------------+
|_c0|rank|trend|season|episode|name|start|total_votes|average_rating|
+---+----+-----+------+-------+----+-----+-----------+--------------+
+---+----+-----+------+-------+----+-----+-----------+--------------+



In [10]:
df.filter(F.col("trend").isNull()).show()

+---+-----+-----+------+-------+--------------------+-----+-----------+--------------+
|_c0| rank|trend|season|episode|                name|start|total_votes|average_rating|
+---+-----+-----+------+-------+--------------------+-----+-----------+--------------+
|141|82792| null|     1|    142|Ransen Hisshi! We...| 2003|        145|           6.8|
|142|71522| null|     1|    143|Soshite Densetsu ...| 2003|        150|           7.5|
|195|77786| null|     1|    196|Hijoujitai Hatsur...| 2004|        143|           7.2|
|196|75763| null|     1|    197|Ryourinin Sanji! ...| 2004|        141|           7.4|
|203|77161| null|     1|    204|Ougon Dakkan Saku...| 2004|        137|           7.4|
|206|74726| null|     1|    207|Long Ring Long La...| 2004|        144|           7.4|
|207|74069| null|     1|    208|Foxy Kaizoku Dan ...| 2004|        146|           7.4|
|208|73811| null|     1|    209|Dai Ikkaisen! Gur...| 2004|        143|           7.5|
|209|74363| null|     1|    210|Gingitsune 

In [11]:
df.filter(F.col("trend").isNull()).count()

374

In [12]:
df_copy.filter(F.col("trend")=="-").count()

374

In [13]:
df.filter(F.col("total_votes").isNull()).count()

0

The values in trend column as "-" have been converted to null, which is okay

In [21]:
df.agg(F.min("rank")).collect()[0][0]

2940

In [22]:
df.agg(F.max("rank")).collect()[0][0]

126450

The column _rank_ is not very meaningful

In [26]:
# Output1 - Season-wise
df.groupBy("season").agg({"episode" : "count"}).collect()

[Row(season=1, count(episode)=958)]

In [30]:
df.select("season").distinct().show()

+------+
|season|
+------+
|     1|
+------+



Looks like _season_ column is not very meaningful either. 
We cant use it for analysis

In [32]:
# Yearwise summary
df.createOrReplaceTempView("df_sql")
query = """
select 
    start,
    count(episode) as no_episodes,
    sum(total_votes) as total_votes,
    sum(total_votes) / count(episode) as avg_votes_per_episode,
    avg(average_rating) as avg_rating
from df_sql
group by start
"""
spark.sql(query).show(10,False)

+-----+-----------+-----------+---------------------+-----------------+
|start|no_episodes|total_votes|avg_votes_per_episode|avg_rating       |
+-----+-----------+-----------+---------------------+-----------------+
|2003 |37         |5784       |156.32432432432432   |7.64054054054054 |
|2007 |45         |6707       |149.04444444444445   |7.600000000000002|
|2018 |47         |4433       |94.31914893617021    |7.919148936170211|
|2015 |48         |6291       |131.0625             |8.202083333333334|
|2006 |38         |5666       |149.10526315789474   |7.894736842105263|
|2013 |48         |5704       |118.83333333333333   |7.416666666666667|
|2014 |50         |6161       |123.22               |7.828000000000003|
|2019 |49         |7514       |153.3469387755102    |7.908163265306124|
|2004 |39         |5866       |150.4102564102564    |7.753846153846155|
|2020 |41         |6483       |158.1219512195122    |7.887804878048779|
+-----+-----------+-----------+---------------------+-----------

In [33]:
output_file = "./outputs/year_wise_summary"
spark.sql(query).write.mode("overwrite").parquet(output_file)

                                                                                

In [35]:
raw_partitioned_file = "./outputs/raw_data"
df.write.partitionBy("start").mode("overwrite").parquet(raw_partitioned_file)

                                                                                