<a href="https://colab.research.google.com/github/nb2195/open_lib_analysis_case_study/blob/main/Adidas_Case_Study_Pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### The steps below are optional. The code snippet installs the pyspark module on the host and mounts an external storage location.

In [None]:
!pip install pyspark

from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## Importing the necessary PySpark libraries

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import types as T
import pyspark.sql.functions as F
from pyspark.sql.window import Window

### Initializing the Spark session

In [None]:
# Get the active SparkSession, or create one whose application name is 'Adidas'.
spark = (
    SparkSession.builder
    .appName('Adidas')
    .getOrCreate()
)

In [None]:
# Location of the JSON dump on the mounted drive.
path = '/content/drive/MyDrive/Adidas/ol_cdump.json'

# Load the dump into a DataFrame, reading the file as UTF-8.
inp = (
    spark.read
    .option('encoding', 'utf-8')
    .json(path)
)

In [None]:
# inp.printSchema()

### The flatten method resolves complex data types such as ArrayType and StructType in order to un-nest the columns and 'flatten' the JSON file. This makes the dataset queryable for insight generation.

In [None]:
def _complex_fields(df):
    """Return ``{column name: data type}`` for each top-level array or struct column of ``df``."""
    return {
        field.name: field.dataType
        for field in df.schema.fields
        if isinstance(field.dataType, (T.ArrayType, T.StructType))
    }


def flatten(df):
    """Un-nest every array and struct column of ``df`` into plain columns.

    Struct columns are replaced by one column per sub-field, named
    ``<struct>_<field>``. Array columns are exploded with ``explode_outer``,
    producing one row per element (rows with a null or empty array are kept
    with a null value). The process repeats until no array or struct column
    is left, so nested combinations are resolved as well.

    Finally, the text ``<first complex column>_`` is removed from every column
    name in which it occurs.
    NOTE(review): the removal applies anywhere in the name, not only as a
    prefix, and only for the first complex column found - confirm this is
    the intended naming rule.

    Args:
        df: The (possibly nested) Spark DataFrame to flatten.

    Returns:
        A DataFrame without array or struct columns. If ``df`` has none to
        begin with, it is returned unchanged.
    """
    complex_fields = _complex_fields(df)

    # Nothing to un-nest. Without this guard, looking up the first complex
    # column below would raise IndexError/StopIteration on an already-flat frame.
    if not complex_fields:
        return df

    qualify = next(iter(complex_fields)) + "_"

    while complex_fields:
        col_name = next(iter(complex_fields))
        col_type = complex_fields[col_name]

        if isinstance(col_type, T.StructType):
            # Promote every sub-field to a top-level column, then drop the struct.
            expanded = [
                F.col(col_name + '.' + sub_field.name).alias(col_name + '_' + sub_field.name)
                for sub_field in col_type
            ]
            df = df.select("*", *expanded).drop(col_name)
        elif isinstance(col_type, T.ArrayType):
            # One output row per array element; the element may itself be complex
            # and is picked up again on the next pass.
            df = df.withColumn(col_name, F.explode_outer(col_name))

        # The schema changed, so recompute what is still nested.
        complex_fields = _complex_fields(df)

    for df_col_name in df.columns:
        stripped_name = df_col_name.replace(qualify, "")
        # Only touch columns whose name actually changes.
        if stripped_name != df_col_name:
            df = df.withColumnRenamed(df_col_name, stripped_name)

    return df

In [None]:
# Un-nest the array/struct columns of the raw dump so it can be queried as flat columns.
flattened_df = flatten(inp)
# flattened_df.printSchema()

## Cleaning the dataset by applying the necessary filters:
1. Title should not be null
2. The Number of pages should be greater than 20
3. The publish date should be later than 1950

Fetching records which satisfy the above criteria

In [None]:
# Keep only rows with a title, more than 20 pages and a publish date later than 1950.
filtered_inp = (
    flattened_df
    .filter('title IS NOT NULL')
    .filter('number_of_pages > 20')
    .filter('publish_date > 1950')
)

## Ques 1: Select all "Harry Potter" books

The code below fetches records where 'Harry Potter' is spelled correctly in the title or description column. Records in which 'Harry Potter' is misspelled are not captured; catching those would require a spell check, which can be achieved with the Autocorrect Python package by looking for strings that resemble 'Harry Potter'.

In [None]:
# Case-insensitive search for "HARRY ... POTTER" in either the title or the description.
(
    filtered_inp
    .filter('upper(title) like "%HARRY%POTTER%" OR upper(description) like "%HARRY%POTTER%"')
    .show(truncate=False)
)

DataFrame[alternate_names: string, bio: string, birth_date: string, by_statement: string, contributions: string, copyright_date: string, covers: bigint, death_date: string, description: string, dewey_decimal_class: string, dewey_number: string, download_url: string, edition_name: string, first_publish_date: string, first_sentence: string, full_title: string, fuller_name: string, genres: string, ia_box_id: string, ia_loaded_id: string, isbn_10: string, isbn_13: string, isbn_invalid: string, isbn_odd_length: string, key: string, latest_revision: bigint, lc_classifications: string, lccn: string, location: string, name: string, notes: string, number_of_pages: bigint, ocaid: string, oclc_number: string, oclc_numbers: string, other_titles: string, pagination: string, personal_name: string, photos: bigint, physical_dimensions: string, physical_format: string, publish_country: string, publish_date: string, publish_places: string, publishers: string, purchase_url: string, revision: bigint, seri

## Ques 2: Get the book with the most pages

In [None]:
# Largest page count in the cleaned dataset. `F.max` is the Spark aggregate;
# the bare name `max` would resolve to the Python builtin and fail on `.alias`.
max_pg = filtered_inp.agg(F.max('number_of_pages').alias('max_num_pages')).head()[0]

# Show every distinct book that has that page count (there may be ties).
(
    filtered_inp
    .where(F.col('number_of_pages') == max_pg)
    .select('full_title', 'title', 'number_of_pages')
    .distinct()
    .show(truncate=False)
)

# filtered_inp.createOrReplaceTempView('filtered_inp')

# spark.sql('''
#     select distinct full_title, title, number_of_pages from filtered_inp where number_of_pages = (select max(number_of_pages) from filtered_inp)
# ''').show(truncate=False)

DataFrame[full_title: string, title: string, number_of_pages: bigint]

## Ques 3: Find the top 5 authors with the most written books (assuming the author is in the first position of the array, in the "key" field, and each row is a different book)

In [None]:
# Spark functions are referenced through the `F` alias; the bare names
# (coalesce, count, split, col) are not defined on a fresh kernel.
# NOTE(review): flatten() explodes array columns, so a single book may span
# several rows; count() tallies rows with a non-null ISBN, not distinct books -
# confirm this matches the "each row is a different book" assumption.
(
    filtered_inp
    .withColumn('isbn_comb', F.coalesce('isbn_10', 'isbn_13'))
    .where('authors_key IS NOT NULL')
    .groupBy('authors_key')
    .agg(F.count('isbn_comb').alias('cnt_isbn_comb'))
    .orderBy('cnt_isbn_comb', ascending=False)
    .limit(5)
    # Keep only the third '/'-separated segment of the key (the author identifier).
    .withColumn('authors_key', F.split(F.col('authors_key'), '/').getItem(2))
    .show(truncate=False)
)

+-----------+-------------+
|authors_key|cnt_isbn_comb|
+-----------+-------------+
|OL6551378A |51675        |
|OL6382015A |35400        |
|OL1056984A |10404        |
|OL6538270A |6432         |
|OL2695836A |5832         |
+-----------+-------------+



## Ques 4: Find the Top 5 genres with most books

In [None]:
# Spark functions are referenced through the `F` alias; the bare names
# (coalesce, count, col) are not defined on a fresh kernel.
(
    filtered_inp
    .withColumn('isbn_comb', F.coalesce('isbn_10', 'isbn_13'))
    .where('genres IS NOT NULL')
    .groupBy('genres')
    # Rows with a non-null ISBN (10 or 13) per genre.
    .agg(F.count(F.col('isbn_comb')).alias('cnt_isbn_comb'))
    .orderBy(F.col('cnt_isbn_comb'), ascending=False)
    .limit(5)
    .show(truncate=False)
)

## Ques 5: Get the avg. number of pages

In [None]:
# Average page count over the cleaned dataset, rounded to 3 decimals.
# `F.round` / `F.avg` are the Spark column functions; the bare `round` is the
# Python builtin and `avg` is not defined at all.
(
    filtered_inp
    .agg(F.round(F.avg('number_of_pages'), 3).alias('avg_num_pages'))
    .show(truncate=False)
)

+-------------+
|avg_num_pages|
+-------------+
|280.142      |
+-------------+



## Ques 6: Per publish year, get the number of authors that published at least one book

In [None]:
#confirming that publish date column has clean data

# Rows whose publish_date is exactly four characters long vs. all rows.
# Both numbers are displayed so they can be compared; previously only the
# total was shown because the first count was discarded.
four_char_date_rows = filtered_inp.where('length(publish_date) = 4').count()
total_rows = filtered_inp.count()
(four_char_date_rows, total_rows)

661841

In [None]:
# Spark functions are referenced through the `F` alias; the bare names
# (coalesce, col, count) are not defined on a fresh kernel.
(
    filtered_inp
    .withColumn('isbn_comb', F.coalesce('isbn_10', 'isbn_13'))
    # Explicit alias so the cast column keeps the name used by the groupBy calls below.
    .select(F.col('publish_date').cast('int').alias('publish_date'), 'authors_key', 'isbn_comb')
    # Step 1: per (year, author), count rows that carry an ISBN ...
    .groupBy('publish_date', 'authors_key')
    .agg(F.count('isbn_comb').alias('cnt_isbn_comb'))
    # ... and keep only authors with at least one such row in that year.
    .where('cnt_isbn_comb > 0')
    # Step 2: per year, count the remaining (non-null) authors.
    .groupBy('publish_date')
    .agg(F.count('authors_key').alias('cnt_authors'))
    .show(truncate=False)
)