###Creating the Spark session to perform all operations

In [0]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
import requests

spark = SparkSession.builder.getOrCreate()

###Reverting to the legacy Spark time parsing to handle the inconsistent publish date formats

In [0]:
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

###Reading the JSON document straight out of S3 using inbuilt PySpark functions and checking the record count

In [0]:
df = spark.read.json("s3a://csparkdata/ol_cdump.json")

In [0]:
df.count()

Out[7]: 148163

In [0]:
df.printSchema()

root
 |-- alternate_names: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- authors: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- author: struct (nullable = true)
 |    |    |    |-- key: string (nullable = true)
 |    |    |-- key: string (nullable = true)
 |    |    |-- type: string (nullable = true)
 |-- bio: string (nullable = true)
 |-- birth_date: string (nullable = true)
 |-- by_statement: string (nullable = true)
 |-- contributions: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- contributors: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- role: string (nullable = true)
 |-- copyright_date: string (nullable = true)
 |-- covers: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- created: struct (nullable = true)
 |    |-- type: string (nullable = true)
 |    |-- value: stri

###Checking how many author names are not populated

In [0]:
spark.sql('''
select 
  case when name is null then "null"
  else "not null"
  end as author_name_status,
  count(*)
  from df
  group by 1
''').show()

+------------------+--------+
|author_name_status|count(1)|
+------------------+--------+
|          not null|     339|
|              null|  147824|
+------------------+--------+



###Parsing the inconsistent publish dates

In [0]:
df = df.withColumn("publish_date", f.regexp_replace(f.col("publish_date"), "([0-9])th", "$1"))

In [0]:
df = df.withColumn("publish_date_formatted", f.expr('''
case
    when lower(publish_date) like "%u" then null
    when to_date(publish_date, "dd MMM yyyy") is not null and to_date(publish_date, "dd MMM yyyy") <= to_date(current_timestamp()) then to_date(publish_date, "dd MMM yyyy")
    when to_date(publish_date, "dd/MM/yyyy") is not null and to_date(publish_date, "dd/MM/yyyy") <= to_date(current_timestamp()) then to_date(publish_date, "dd/MM/yyyy")
    when to_date(publish_date, "MM/dd/yyyy") is not null and to_date(publish_date, "MM/dd/yyyy") <= to_date(current_timestamp()) then to_date(publish_date, "MM/dd/yyyy")
    when to_date(publish_date, "MM-dd-yyyy") is not null and to_date(publish_date, "MM-dd-yyyy") <= to_date(current_timestamp()) then to_date(publish_date, "MM-dd-yyyy")
    when to_date(publish_date, "MM.dd.yyyy") is not null and to_date(publish_date, "MM.dd.yyyy") <= to_date(current_timestamp()) then to_date(publish_date, "MM.dd.yyyy")
    when to_date(publish_date, "MM/dd/yy") is not null and to_date(publish_date, "MM/dd/yy") <= to_date(current_timestamp()) then to_date(publish_date, "MM/dd/yy")
    when to_date(publish_date, "MM-dd-yy") is not null and to_date(publish_date, "MM-dd-yy") <= to_date(current_timestamp()) then to_date(publish_date, "MM-dd-yy")
    when to_date(publish_date, "MM/yyyy") is not null and to_date(publish_date, "MM/yyyy") <= to_date(current_timestamp()) then to_date(publish_date, "MM/yyyy")
    when to_date(publish_date, "MMMM d, yyyy") is not null and to_date(publish_date, "MMMM d, yyyy") <= to_date(current_timestamp()) then to_date(publish_date, "MMMM d, yyyy")
    when to_date(publish_date, "MMMM d,yy") is not null and to_date(publish_date, "MMMM d,yy") <= to_date(current_timestamp()) then to_date(publish_date, "MMMM d,yy")
    when to_date(publish_date, "MMMM yyyy") is not null and to_date(publish_date, "MMMM yyyy") <= to_date(current_timestamp()) then to_date(publish_date, "MMMM yyyy")
    when to_date(publish_date, "MMMM, yyyy") is not null and to_date(publish_date, "MMMM, yyyy") <= to_date(current_timestamp()) then to_date(publish_date, "MMMM, yyyy")
    when to_date(publish_date, "yyyy") is not null and to_date(publish_date, "yyyy") <= to_date(current_timestamp()) then to_date(publish_date, "yyyy")
    else null
end
'''))

In [0]:
df.createOrReplaceTempView("df")

###Creating publish_year column

In [0]:
df_w_pub_year = spark.sql('''
  select *,
  case 
    when publish_date_formatted is not null then year(publish_date_formatted) 
    else 0000
  end as publish_year
  from df
''')

In [0]:
df_w_pub_year.createOrReplaceTempView("df_pub_year")

###Applying data cleanup filters

In [0]:
df_cleaned = spark.sql('''
  select * from df_pub_year 
  where number_of_pages > 20
  and publish_year >= 1950
  and publish_date_formatted is not null
  and key is not null
  and authors is not null
''')

In [0]:
df_cleaned.count()

Out[22]: 77817

###Creating a udf to parse the author keys present in the data as a list of tuples, in a more readable format

In [0]:
def parse_author_keys(author_keys):
  return ",".join([x[1].split("/")[2] for x in author_keys])

parse_author_keys_udf = f.udf(parse_author_keys)
spark.udf.register("parse_author_keys_udf", parse_author_keys)

Out[23]: <function __main__.parse_author_keys(author_keys)>

###Creating a udf to parse the book keys present in the data in a more readable format

In [0]:
def parse_book_key(book_key):
  return book_key.split("/")[2]

parse_book_key_udf = f.udf(parse_book_key)
spark.udf.register("parse_book_key_udf", parse_book_key)

Out[24]: <function __main__.parse_book_key(book_key)>

###Creating a udf to make a REST call to the OpenLibrary API to fetch the correct author names for the corresponding author keys

In [0]:
def get_author_name(author_key):
  url = f"https://openlibrary.org/authors/{author_key}.json"
  try:
    r = requests.get(url = url)
    author_name = r.json()["name"]
    return author_name
  except:
    return None
  
get_author_name_udf = f.udf(get_author_name)
spark.udf.register("get_author_name_udf", get_author_name)

Out[25]: <function __main__.get_author_name(author_key)>

In [0]:
df_cleaned = df_cleaned.withColumn("author_keys", parse_author_keys_udf(f.col("authors")))

In [0]:
df_cleaned = df_cleaned.withColumn("book_key", parse_book_key_udf(f.col("key")))

In [0]:
df_cleaned.createOrReplaceTempView("data")

###Q1: Get the book with the most pages

In [0]:
ques1 = spark.sql('''
  select distinct book_key, title as book_title, number_of_pages from
  (select *, dense_rank() over (order by number_of_pages desc) as page_rank from data)
  where page_rank = 1
''').toPandas()

In [0]:
ques1

Unnamed: 0,book_key,book_title,number_of_pages
0,OL22855337M,Nihon shokuminchi kenchikuron,48418


###Q2: Find the top 5 genres with most books

In [0]:
ques2_inter = spark.sql('''
  select *, explode(genres) as genre from data
''')

ques2_inter = ques2_inter.withColumn("genre", f.regexp_replace(f.col("genre"), "\.$", ""))

ques2_inter.createOrReplaceTempView("ques2_inter")

ques2 = spark.sql('''
  select * from (
  select rank() over (order by books desc) as genre_rank, genre, books from
  (select genre, count(distinct key) as books from ques2_inter group by 1)
  )
  order by genre_rank
''').toPandas()

In [0]:
ques2[ques2.genre_rank <= 5]

Unnamed: 0,genre_rank,genre,books
0,1,Fiction,3415
1,2,Biography,2746
2,3,Juvenile literature,1677
3,4,Exhibitions,1150
4,5,Juvenile fiction,693


###Q3: Retrieve the top 5 authors who (co-)authored the most books

In [0]:
ques3_inter = spark.sql('''
  select *, explode(split(author_keys, ",")) as author_key from data
''')

ques3_inter.createOrReplaceTempView("ques3_inter")

ques3 = spark.sql('''
  select author_rank, author_key, author_name_data, get_author_name_udf(author_key) as author_name_site, books from (
  select *, rank() over (order by books desc) as author_rank from
  (select author_key, name as author_name_data, count(distinct key) as books from ques3_inter group by 1,2)
  )
  where author_rank <= 5
  order by author_rank
''').toPandas()

In [0]:
ques3

Unnamed: 0,author_rank,author_key,author_name_data,author_name_site,books
0,1,OL1224818A,,California. Dept. of Water Resources.,236
1,2,OL4283462A,,Jirō Akagawa,116
2,3,OL785848A,,John Harold Haynes,106
3,4,OL539875A,,Philip M. Parker,90
4,5,OL1926829A,,San Francisco (Calif.). Dept. of City Planning.,80


###Per publish year, get the number of authors that published at least one book

In [0]:
ques4 = spark.sql('''
  select publish_year, count(distinct author_key) as authors from
  (select *, explode(split(author_keys, ",")) as author_key from data)
  group by 1
  order by 1
''').toPandas()

In [0]:
pd.set_option("display.max_rows", None)
ques4

Unnamed: 0,publish_year,authors
0,1950,614
1,1951,648
2,1952,601
3,1953,601
4,1954,645
5,1955,589
6,1956,638
7,1957,706
8,1958,755
9,1959,830


###Q5: Find the number of authors and number of books published per month for years between 1950 and 1970

In [0]:
ques5 = spark.sql('''
  select date_format(publish_date_formatted, "yyyy-MM") as publish_year_month, count(distinct author_key) as authors, count(distinct key) as books from
  (select *, explode(split(author_keys, ",")) as author_key from data)
  where publish_year >= 1950 and publish_year <= 1970
  group by 1
  order by 1
''').toPandas()

In [0]:
ques5

Unnamed: 0,publish_year_month,authors,books
0,1950-01,614,647
1,1951-01,648,676
2,1952-01,601,633
3,1953-01,600,622
4,1953-10,1,1
5,1954-01,644,666
6,1954-06,1,1
7,1955-01,588,611
8,1955-06,1,1
9,1956-01,638,664
