In [39]:
import pyspark

In [40]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, count, to_timestamp, year, regexp_extract, coalesce, from_json, col, when, explode, length, size, lit, current_date, trim, dense_rank, regexp_replace, split, countDistinct, to_date, concat_ws
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.window import Window

In [41]:
spark = SparkSession.builder.appName('Adidas_CaseStudy').master('local[*]').getOrCreate()

In [42]:
spark

Reading Dataset

In [43]:
base_df = spark.read.json('/tmp/ol_cdump.json')

Number of rows in raw data set

In [44]:
print('Number of rows in raw data set : '+ str(base_df.count()))

Number of rows in raw data set : 148163


Schema

In [45]:
base_df.printSchema()

root
 |-- alternate_names: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- authors: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- author: struct (nullable = true)
 |    |    |    |-- key: string (nullable = true)
 |    |    |-- key: string (nullable = true)
 |    |    |-- type: string (nullable = true)
 |-- bio: string (nullable = true)
 |-- birth_date: string (nullable = true)
 |-- by_statement: string (nullable = true)
 |-- contributions: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- contributors: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- role: string (nullable = true)
 |-- copyright_date: string (nullable = true)
 |-- covers: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- created: struct (nullable = true)
 |    |-- type: string (nullable = true)
 |    |-- value: stri

# Data profiling

In [46]:
## Merging Similar columns including authors which will get used further.
base_df = base_df.withColumn('dewey_number',coalesce('dewey_decimal_class','dewey_number')).\
withColumn('oclc_number',coalesce('oclc_number','oclc_numbers')).\
withColumn('subject_time',coalesce('subject_time','subject_times')).\
withColumn('work_title',coalesce('work_title','work_titles')).\
withColumn('publish_date',coalesce('publish_date','first_publish_date')).\
withColumn('authors', coalesce('authors.key','authors.author.key')).\
drop('dewey_decimal_class','oclc_numbers','subject_times','work_titles','first_publish_date')

In [47]:
### Trimming Title to remove extra spaces
### Extracting date value for last_modified_datetime
### Extracting publish year which will be used to filter.
base_df = base_df.withColumn('title',trim('title')).\
withColumn('last_modified_datetime',to_timestamp(base_df['last_modified.value'])).drop('last_modified').\
withColumn('publish_year',regexp_extract('publish_date', r'(\d{4})', 1))
#withColumn('authors', regexp_replace('authors', '/authors/', '')).\
#withColumn('authors',split(col('authors'),','))

In [48]:
schema = StructType([StructField("type", StringType(), True),
                    StructField("value", StringType(), True)]
                    )

In [49]:
##Extracting values from key, value data strings.
base_df = base_df.withColumn('description',when(col('description').startswith('{"type":'),from_json(base_df['description'],schema)['value']).otherwise(col('description'))).\
withColumn('first_sentence',when(col('first_sentence').startswith('{"type":'),from_json(base_df['first_sentence'],schema)['value']).otherwise(col('first_sentence'))).\
withColumn('notes',when(col('notes').startswith('{"type":'),from_json(base_df['notes'],schema)['value']).otherwise(col('notes'))).\
withColumn('bio',when(col('bio').startswith('{"type":'),from_json(base_df['bio'],schema)['value']).otherwise(col('bio')))

In [50]:
###All these fields mostly have Null values (Not Required). 
base_df = base_df.drop('full_title','fuller_name','ia_box_id', 'ia_loaded_id', 'isbn_invalid', 'isbn_odd_length', 'website')

In [51]:
###Filtering data publish year from 1950 to current year.
base_df = base_df.filter((col('publish_year') <= lit(year(current_date()))) & (col('publish_year') >= lit('1950')))

In [52]:
print('Number of rows in filtered data set : '+ str(base_df.count()))

Number of rows in filtered data set : 99693


## Book with Most Pages

In [53]:
w = Window.orderBy(col('number_of_pages').desc())
first_df = base_df.select('title','number_of_pages',dense_rank().over(w).alias('page_rank'))

In [54]:
first_df.filter(col('page_rank') == 1).select('title','number_of_pages').show(9,False)

+-----------------------------+---------------+
|title                        |number_of_pages|
+-----------------------------+---------------+
|Nihon shokuminchi kenchikuron|48418          |
+-----------------------------+---------------+



# Top 5 genres

In [55]:
second_df = base_df.select('title',explode('genres').alias('genres'))

In [56]:
second_df = second_df.select('title',trim(regexp_replace('genres', '[^A-Za-z0-9, ]','')).alias('genres'))

In [57]:
second_df.groupBy('genres').\
agg(count('title').alias('book_count')).\
orderBy(col('book_count').desc()).\
show(5,False)

+-------------------+----------+
|genres             |book_count|
+-------------------+----------+
|Fiction            |4319      |
|Biography          |3145      |
|Juvenile literature|2403      |
|Exhibitions        |1652      |
|Juvenile fiction   |912       |
+-------------------+----------+
only showing top 5 rows



# Top 5 authors who (co-)authored  most books

In [58]:
third_df = base_df.select('title','authors')

In [59]:
##This df will have list of all the authors who have co-authored
third_df = third_df.filter(size('authors') > 1)

In [60]:
third_df = third_df.select('title',explode('authors').alias('authors'))

In [61]:
third_df.groupBy('authors').\
agg(count('title').alias('book_count')).\
orderBy(col('book_count').desc()).\
show(5,False)

+-------------------+----------+
|authors            |book_count|
+-------------------+----------+
|/authors/OL3246797A|47        |
|/authors/OL2756622A|13        |
|/authors/OL1901196A|12        |
|/authors/OL1660476A|10        |
|/authors/OL2838553A|7         |
+-------------------+----------+
only showing top 5 rows



# Number of authors that published at least one book. (By Publish Year)

In [62]:
fourth_df = base_df.select('publish_year',explode('authors').alias('authors')).distinct()

In [63]:
fourth_df.groupBy('publish_year').\
agg(count('authors').alias('no_of_authors')).\
orderBy(col('publish_year').desc()).\
show(9999,False)

+------------+-------------+
|publish_year|no_of_authors|
+------------+-------------+
|2013        |2            |
|2011        |1            |
|2010        |295          |
|2009        |2664         |
|2008        |4198         |
|2007        |2549         |
|2006        |1425         |
|2005        |1479         |
|2004        |1562         |
|2003        |1598         |
|2002        |1189         |
|2001        |979          |
|2000        |1194         |
|1999        |1458         |
|1998        |2203         |
|1997        |2821         |
|1996        |2401         |
|1995        |2786         |
|1994        |2494         |
|1993        |2235         |
|1992        |2192         |
|1991        |1952         |
|1990        |2000         |
|1989        |1760         |
|1988        |1595         |
|1987        |1469         |
|1986        |1418         |
|1985        |1339         |
|1984        |1307         |
|1983        |1215         |
|1982        |1145         |
|1981        |

# Number of authors and number of books published per month for years between 1950 and 1970

In [64]:
fifth_df = base_df.filter((col('publish_year') >= lit('1950')) & (col('publish_year') <= lit('1970'))).\
select(explode('authors').alias('authors'),'title','publish_year','publish_date')

In [65]:
fifth_df = fifth_df.withColumn('publish_month', regexp_extract('publish_date',r'([A-Z][a-z]+)', 1))

In [66]:
fifth_df = fifth_df.filter(~(col('publish_month') == lit('')))

In [67]:
fifth_df.groupBy(concat_ws(' ','publish_month','publish_year').alias('month_year')).\
agg(countDistinct('authors').alias('authors'),countDistinct('title').alias('books')).\
orderBy(concat_ws(' ','publish_month','publish_year')).show(99,False)

+---------------+-------+-----+
|month_year     |authors|books|
+---------------+-------+-----+
|December 1958  |0      |1    |
|December 1962  |2      |1    |
|December 1968  |1      |1    |
|January 1960   |1      |1    |
|January 1969   |1      |1    |
|January 1970   |1      |1    |
|July 1963      |1      |1    |
|July 1968      |1      |1    |
|June 1954      |1      |1    |
|June 1955      |1      |1    |
|June 1958      |2      |1    |
|June 1960      |2      |2    |
|June 1962      |2      |1    |
|June 1964      |3      |3    |
|June 1965      |4      |2    |
|June 1967      |1      |1    |
|June 1968      |5      |4    |
|June 1969      |3      |2    |
|June 1970      |3      |2    |
|March 1970     |1      |1    |
|May 1964       |1      |1    |
|May 1966       |1      |1    |
|May 1968       |1      |1    |
|November 1966  |1      |1    |
|October 1953   |1      |1    |
|October 1961   |1      |1    |
|October 1969   |2      |2    |
|September 1957 |1      |1    |
|Septemb

In [None]:
#fifth_df.filter((col('publish_year') == lit('1965')) & (col('publish_month') == lit('June'))).show(4,False)