
#### General Instructions <br /><br />

- All relevant data sets are pre-loaded in the Jupyter Notebook environment.
- The data set for the challenge will be provided when you start the assessment. Load them as required.
- All files are in CSV format. Some files have headers and some files do not have headers
- The files do not have headers
  - CSV files: catgories.csv, customers.csv, departments.csv, orders.csv, order_items.csv, products.csv, customers.csv

#### Question 1

**Fetch the top 10 categories with highest percentage of 'pending orders' in the year 2014**

- Datasets to be used to solve this task are
  - order_items.csv, orders.csv, products.csv, categories.csv
- An order is considered as pending order if the order status is either 'PENDING' or 'PENDING_PAYMENT' (in orders.csv)
- Columns to be fetched are: **category, total_orders, pending_orders, percentage_pending_orders**
- Round the **percentage_pending_orders** to one decimal place
- Sort the data in the DESCENDING order of **percentage_pending_orders**
- The output should have 10 rows, excluding the header
- Save the output as a single CSV file with header in **question1** directory

In [105]:
#Write your import
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType,StringType, DoubleType, DateType
from pyspark.sql.functions import col,year,when, count, sum, round, row_number,countDistinct
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("question1").getOrCreate()

# File paths 
categories_file = "categories.csv"
customers_file = "customers.csv"
departments_file = "departments.csv"
order_items_file = "order_items.csv"
orders_file = "orders.csv"
products_file = "products.csv"

#Write your code

#Defining schema
orderSchema = StructType([
    StructField("order_id", IntegerType(), True),
    StructField("order_date", DateType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("order_status", StringType(), True)
])

orderItemSchema = StructType([
    StructField("order_item_id", IntegerType(), True),
    StructField("order_id", IntegerType(), True),
    StructField("product_id", IntegerType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("total_price", DoubleType(), True),
    StructField("unit_price", DoubleType(), True),
])

productSchema = StructType([
    StructField("product_id", IntegerType(), True),
    StructField("category_id", IntegerType(), True),
    StructField("product", StringType(), True),
    StructField("type", StringType(), True),
    StructField("unit_price", DoubleType(), True)
])

categoriesSchema = StructType([
    StructField("category_id", IntegerType(), True),
    StructField("department_id", IntegerType(), True),
    StructField("category", StringType(), True)
])



#Reading the csv files
orders = spark.read.csv(orders_file,schema = orderSchema)
order_items = spark.read.csv(order_items_file,schema = orderItemSchema)
products = spark.read.csv(products_file,schema = productSchema)
categories = spark.read.csv(categories_file,schema = categoriesSchema)

orders_2014 = orders.filter(year(col('order_date'))==2014)

df_joined = orders_2014.join(order_items,'order_id').join(products,'product_id').join(categories,'category_id')
                                 
result_1 = df_joined.groupBy('category').agg(countDistinct('order_id').alias('total_orders'),
                                           sum(when(col('order_status').isin('PENDING_PAYMENT','PENDING'),1).otherwise(0)).alias('pending_orders')).withColumn('percentage_pending_orders',round((col('pending_orders') / col('total_orders'))*100,1)).orderBy(col('percentage_pending_orders').desc()).limit(10)
# result_1.show()

#saving the output 
result_1.coalesce(1).write.mode('overwrite').option('header','true').csv('question1')

                                                                                


#### Question 2

**Fetch two products with lowest 'total_sale_value' in 'Basketball', 'Football' and 'Soccer' categories in 'Sports' department in the year 2014. Consider orders with status as ‘COMPLETE’ only.** <br/><br/>

- Datasets to be used to solve this task are
  - order_items.csv, orders.csv, products.csv, departments.csv, categories.csv
- Analyze and understand the data in all the datasets as per the ER diagram given above
- ***total_sale_value*** is computed as the ***sum of total_price of all items*** in ***order_items*** dataset. 
- Consider only orders with ***COMPLETE*** as status in orders dataset.
- Round the *total_sale_value* to nearest integer value. 
- Columns to be fetched are: **year, category, product, total_quantity, total_sale_value**
- Sort the data in the ASCENDING order of category and ASCENDING order of total_sale_value.
- The output should have 6 rows, excluding the header
- Save the output as a single CSV file with header in **question2** directory

In [96]:
#Write your code
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType,StringType, DoubleType, DateType
from pyspark.sql.functions import col,year,when, countDistinct, sum, round, row_number
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("question2").getOrCreate()

# Dataset to be used order_items.csv, orders.csv, products.csv, departments.csv, categories.csv
orderSchema = StructType([
    StructField("order_id", IntegerType(), True),
    StructField("order_date", DateType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("order_status", StringType(), True)
])

orderItemSchema = StructType([
    StructField("order_item_id", IntegerType(), True),
    StructField("order_id", IntegerType(), True),
    StructField("product_id", IntegerType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("total_price", DoubleType(), True),
    StructField("unit_price", DoubleType(), True),
])

productSchema = StructType([
    StructField("product_id", IntegerType(), True),
    StructField("category_id", IntegerType(), True),
    StructField("product", StringType(), True),
    StructField("type", StringType(), True),
    StructField("unit_price", DoubleType(), True)
])

categoriesSchema = StructType([
    StructField("category_id", IntegerType(), True),
    StructField("department_id", IntegerType(), True),
    StructField("category", StringType(), True)
])

departmentSchema = StructType([
    StructField("department_id", IntegerType(), True),
    StructField("department", StringType(), True)
])

#Reading the csv files
orders = spark.read.csv(orders_file,schema = orderSchema)
order_items = spark.read.csv(order_items_file,schema = orderItemSchema)
products = spark.read.csv(products_file,schema = productSchema)
categories = spark.read.csv(categories_file,schema = categoriesSchema)
departments = spark.read.csv(departments_file,schema = departmentSchema)

# Filtering the order with completed status

completed_orders = orders.filter((year(col('order_date'))=='2014') & (col('order_status')=="COMPLETE"))
# completed_orders.show()

# joining other dataframes
df_joined = completed_orders.join(order_items, 'order_id') \
    .join(products,'product_id') \
    .join(categories, 'category_id') \
    .join(departments, 'department_id')

#Filtering the data
df_filter = df_joined.filter((col('department') =='Sports') & 
                  (col('category').isin('Basketball', 'Football', 'Soccer')))

#Aggregation 
agg_df = df_filter.groupBy(year(col('order_date')).alias('year'),
                             col('category'),
                             col('product')
                          ).agg(sum('quantity').alias('total_quantity'),
                                     round(sum('total_price')).cast('int').alias('total_sale_value'))
windSpec = Window.partitionBy('category').orderBy(col('total_sale_value').asc())

result_2 = agg_df.withColumn('rn', row_number().over(windSpec)).filter(col('rn') <=2).drop('rn')\
.orderBy(col('category').asc(), col('total_sale_value').asc())

#Saving the result to question2 path
result_2.coalesce(1).write.mode('overwrite').option('header','true').csv('question2')

                                                                                

#### Question 3

**Find top two worst performing states which recorded highest percentage of drop in 'total sales' in the year 2014 compared to year 2013 in 'Sports', 'Footwear', 'Fitness' and 'Golf' departments. Consider orders with status as ‘COMPLETE’ only.** <br /><br />

- Datasets to be used to solve this task are
  - order_items.csv, orders.csv, products.csv, departments.csv, categories.csv, customers.csv
- Analyze and understand the data in all the datasets as per the ER diagram given above 
- **total_sales** is computed as **sum of total_price** in **order_items** dataset for that product.
- Consider only COMPLETED orders (order_status in orders should be COMPLETE only)
- Fetch the data related to top 2 states which recorder maximum percentage drop in total sale value in 2014 compared to 2013 in the specified departments.
- Discard the data if there are no sales in either 2013 or 2014 in any state for any department
- Round the **total_sales** values to nearest integer value
- Round the **drop%** to two decimal places
- Columns to be fetched are: **department, state, 2014_sales, 2013_sales, drop%**
- The output should have 8 rows, excluding the header
- Save the output as a single CSV file with header in **question3** directory


In [109]:
#Write your code
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType,StringType, DoubleType, DateType
from pyspark.sql.functions import col,year,when, countDistinct, sum, round, row_number, first
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("question3").getOrCreate()

# Dataset to be used order_items.csv, orders.csv, products.csv, departments.csv, categories.csv, customers.csv
orderSchema = StructType([
    StructField("order_id", IntegerType(), True),
    StructField("order_date", DateType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("order_status", StringType(), True)
])

orderItemSchema = StructType([
    StructField("order_item_id", IntegerType(), True),
    StructField("order_id", IntegerType(), True),
    StructField("product_id", IntegerType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("total_price", DoubleType(), True),
    StructField("unit_price", DoubleType(), True),
])

productSchema = StructType([
    StructField("product_id", IntegerType(), True),
    StructField("category_id", IntegerType(), True),
    StructField("product", StringType(), True),
    StructField("type", StringType(), True),
    StructField("unit_price", DoubleType(), True)
])

categoriesSchema = StructType([
    StructField("category_id", IntegerType(), True),
    StructField("department_id", IntegerType(), True),
    StructField("category", StringType(), True)
])

departmentSchema = StructType([
    StructField("department_id", IntegerType(), True),
    StructField("department", StringType(), True)
])

customerSchema =  StructType([
    StructField("customer_id", IntegerType(), True),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("email", StringType(), True),
    StructField("phone", StringType(), True),
    StructField("address", StringType(), True),
    StructField("city", StringType(), True),
    StructField("state", StringType(), True),
    StructField("zip", StringType(), True),
])


#Reading the csv files
orders = spark.read.csv(orders_file,schema = orderSchema)
order_items = spark.read.csv(order_items_file,schema = orderItemSchema)
products = spark.read.csv(products_file,schema = productSchema)
categories = spark.read.csv(categories_file,schema = categoriesSchema)
departments = spark.read.csv(departments_file,schema = departmentSchema)
customers = spark.read.csv(customers_file,schema = customerSchema)

completed_orders = orders.filter(col('order_status')=="COMPLETE")

df = completed_orders.join(order_items, 'order_id') 
df_join = df.join(products, 'product_id') \
                    .join(categories, 'category_id') \
                    .join(departments, 'department_id') \
                    

target_dept = ['Sports','Footwear','Fitness','Golf']
filter_dpet = df_join.filter(col('department').isin(target_dept))

df_join = filter_dpet.join(customers, 'customer_id')

sales_df = df_join.withColumn('year',year('order_date'))

sales_agg = sales_df.groupBy('department','state','year').agg(round(sum('total_price')).alias('total_sales'))

sales_pivot = sales_agg.groupBy('department','state').pivot('year',[2013, 2014]).agg(first('total_sales')).withColumnRenamed('2013','2013_sales').withColumnRenamed('2014','2014_sales')


sales_drop = sales_pivot.filter(col('2013_sales').isNotNull() & col('2014_sales').isNotNull()) \
.withColumn("drop%", round((col('2013_sales')-col('2014_sales'))/col('2013_sales')*100,2))

windspec = Window.partitionBy('department').orderBy(col('drop%').desc())


result_3 = sales_drop.withColumn('rn', row_number().over(windspec)).filter(col('rn')<=2) \
            .select('department','state',col('sales_2014').alias('2014_sales'),col('sales_2013').alias('2013_sales'),col('drop%'))
result_3.show()
            

# # Total sales as per state,department,year
# sales_df = df.groupBy(col('department'), col('state'), year(col('order_date')).alias('year')).agg(round(sum('total_price')).alias('total_sales'))


# # Filter 2013 sales & 2014 sales
# sales_2013 = sales_df.filter(col('year')=='2013').select('department','state', col('total_sales').alias('sales_2013'))
# sales_2014 = sales_df.filter(col('year')=='2014').select('department','state', col('total_sales').alias('sales_2014'))

# df_joined = sales_2013.join(sales_2014, ['department','state'],'inner')

# result_df = df_joined.withColumn('drop%', round(((col('sales_2013')-col('sales_2014'))/col('sales_2013'))*100, 2) )

# windspec = Window.partitionBy('department').orderBy(col('drop%').desc())

# result_3 = result_df.withColumn('rn', row_number().over(windspec)).filter(col('rn')<=2) \
#             .select('department','state',col('sales_2014').alias('2014_sales'),col('sales_2013').alias('2013_sales'),col('drop%'))\
# .orderBy(col('department'), col("drop%").desc())

# #Saving the result in question3 path

# result_3.coalesce(1).write.mode('overwrite').option('header','true').csv('question3')

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `sales_2014` cannot be resolved. Did you mean one of the following? [`state`, `drop%`, `rn`, `2013_sales`, `2014_sales`].;
'Project [department#14780, state#14790, 'sales_2014 AS 2014_sales#14988, 'sales_2013 AS 2013_sales#14989, drop%#14974]
+- Filter (rn#14981 <= 2)
   +- Project [department#14780, state#14790, 2013_sales#14964, 2014_sales#14969, drop%#14974, rn#14981]
      +- Project [department#14780, state#14790, 2013_sales#14964, 2014_sales#14969, drop%#14974, rn#14981, rn#14981]
         +- Window [row_number() windowspecdefinition(department#14780, drop%#14974 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#14981], [department#14780], [drop%#14974 DESC NULLS LAST]
            +- Project [department#14780, state#14790, 2013_sales#14964, 2014_sales#14969, drop%#14974]
               +- Project [department#14780, state#14790, 2013_sales#14964, 2014_sales#14969, round((((2013_sales#14964 - 2014_sales#14969) / 2013_sales#14964) * cast(100 as double)), 2) AS drop%#14974]
                  +- Filter (isnotnull(2013_sales#14964) AND isnotnull(2014_sales#14969))
                     +- Project [department#14780, state#14790, 2013_sales#14964, 2014#14955 AS 2014_sales#14969]
                        +- Project [department#14780, state#14790, 2013#14954 AS 2013_sales#14964, 2014#14955]
                           +- Project [department#14780, state#14790, __pivot_first(total_sales) AS `first(total_sales)`#14953[0] AS 2013#14954, __pivot_first(total_sales) AS `first(total_sales)`#14953[1] AS 2014#14955]
                              +- Aggregate [department#14780, state#14790], [department#14780, state#14790, pivotfirst(year#14884, first(total_sales)#14947, 2013, 2014, 0, 0) AS __pivot_first(total_sales) AS `first(total_sales)`#14953]
                                 +- Aggregate [department#14780, state#14790, year#14884], [department#14780, state#14790, year#14884, first(total_sales#14936, false) AS first(total_sales)#14947]
                                    +- Aggregate [department#14780, state#14790, year#14884], [department#14780, state#14790, year#14884, round(sum(total_price#14755), 0) AS total_sales#14936]
                                       +- Project [customer_id#14745, department_id#14774, category_id#14764, product_id#14753, order_id#14743, order_date#14744, order_status#14746, order_item_id#14751, quantity#14754, total_price#14755, unit_price#14756, product#14765, type#14766, unit_price#14767, category#14775, department#14780, first_name#14784, last_name#14785, email#14786, phone#14787, address#14788, city#14789, state#14790, zip#14791, year(order_date#14744) AS year#14884]
                                          +- Project [customer_id#14745, department_id#14774, category_id#14764, product_id#14753, order_id#14743, order_date#14744, order_status#14746, order_item_id#14751, quantity#14754, total_price#14755, unit_price#14756, product#14765, type#14766, unit_price#14767, category#14775, department#14780, first_name#14784, last_name#14785, email#14786, phone#14787, address#14788, city#14789, state#14790, zip#14791]
                                             +- Join Inner, (customer_id#14745 = customer_id#14783)
                                                :- Filter department#14780 IN (Sports,Footwear,Fitness,Golf)
                                                :  +- Project [department_id#14774, category_id#14764, product_id#14753, order_id#14743, order_date#14744, customer_id#14745, order_status#14746, order_item_id#14751, quantity#14754, total_price#14755, unit_price#14756, product#14765, type#14766, unit_price#14767, category#14775, department#14780]
                                                :     +- Join Inner, (department_id#14774 = department_id#14779)
                                                :        :- Project [category_id#14764, product_id#14753, order_id#14743, order_date#14744, customer_id#14745, order_status#14746, order_item_id#14751, quantity#14754, total_price#14755, unit_price#14756, product#14765, type#14766, unit_price#14767, department_id#14774, category#14775]
                                                :        :  +- Join Inner, (category_id#14764 = category_id#14773)
                                                :        :     :- Project [product_id#14753, order_id#14743, order_date#14744, customer_id#14745, order_status#14746, order_item_id#14751, quantity#14754, total_price#14755, unit_price#14756, category_id#14764, product#14765, type#14766, unit_price#14767]
                                                :        :     :  +- Join Inner, (product_id#14753 = product_id#14763)
                                                :        :     :     :- Project [order_id#14743, order_date#14744, customer_id#14745, order_status#14746, order_item_id#14751, product_id#14753, quantity#14754, total_price#14755, unit_price#14756]
                                                :        :     :     :  +- Join Inner, (order_id#14743 = order_id#14752)
                                                :        :     :     :     :- Filter (order_status#14746 = COMPLETE)
                                                :        :     :     :     :  +- Relation [order_id#14743,order_date#14744,customer_id#14745,order_status#14746] csv
                                                :        :     :     :     +- Relation [order_item_id#14751,order_id#14752,product_id#14753,quantity#14754,total_price#14755,unit_price#14756] csv
                                                :        :     :     +- Relation [product_id#14763,category_id#14764,product#14765,type#14766,unit_price#14767] csv
                                                :        :     +- Relation [category_id#14773,department_id#14774,category#14775] csv
                                                :        +- Relation [department_id#14779,department#14780] csv
                                                +- Relation [customer_id#14783,first_name#14784,last_name#14785,email#14786,phone#14787,address#14788,city#14789,state#14790,zip#14791] csv


In [110]:
# 3. Filter only COMPLETED orders
orders_filtered = orders.filter(col("order_status") == "COMPLETE")

# 4. Join orders with order_items
order_items_joined = orders_filtered.join(order_items, "order_id")

# 5. Join with products → categories → departments
products_joined = order_items_joined.join(products, "product_id") \
    .join(categories, "category_id") \
    .join(departments, "department_id")

# 6. Filter target departments
target_departments = ["Sports", "Footwear", "Fitness", "Golf"]
filtered_depts = products_joined.filter(col("department").isin(target_departments))

# 7. Join with customers to get state
sales_with_state = filtered_depts.join(customers, "customer_id")

# 8. Extract year from order_date
sales_with_year = sales_with_state.withColumn("year", year("order_date"))

# 9. Aggregate sales per department, state, year
sales_agg = sales_with_year.groupBy("department_name", "state", "year") \
    .agg(round(sum("total_price")).alias("total_sales"))

# 10. Pivot to compare 2013 vs 2014
sales_pivot = sales_agg.groupBy("department", "state") \
    .pivot("year", [2013, 2014]) \
    .agg(first("total_sales")) \
    .withColumnRenamed("2013", "2013_sales") \
    .withColumnRenamed("2014", "2014_sales")

# 11. Calculate drop %
sales_drop = sales_pivot.filter(col("2013_sales").isNotNull() & col("2014_sales").isNotNull()) \
    .withColumn("drop%", round((col("2013_sales") - col("2014_sales")) / col("2013_sales") * 100, 2))

# 12. Rank states by drop% per department and select top 2
window_spec = Window.partitionBy("department").orderBy(col("drop%").desc())

top2_states = sales_drop.withColumn("rank", row_number().over(window_spec)) \
    .filter(col("rank") <= 2) \
    .select("department", "state", "2014_sales", "2013_sales", "drop%")

# 13. Save output as CSV
# top2_states.coalesce(1).write.csv("question3/output.csv", header=True, mode="overwrite")

# 14. Show result in console
top2_states.show(20, truncate=False)

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `department_name` cannot be resolved. Did you mean one of the following? [`department_id`, `department`, `first_name`, `last_name`, `order_date`].;
'Aggregate ['department_name, state#14790, year#15067], ['department_name, state#14790, year#15067, round(sum(total_price#14755), 0) AS total_sales#15119]
+- Project [customer_id#14745, department_id#14774, category_id#14764, product_id#14753, order_id#14743, order_date#14744, order_status#14746, order_item_id#14751, quantity#14754, total_price#14755, unit_price#14756, product#14765, type#14766, unit_price#14767, category#14775, department#14780, first_name#14784, last_name#14785, email#14786, phone#14787, address#14788, city#14789, state#14790, zip#14791, year(order_date#14744) AS year#15067]
   +- Project [customer_id#14745, department_id#14774, category_id#14764, product_id#14753, order_id#14743, order_date#14744, order_status#14746, order_item_id#14751, quantity#14754, total_price#14755, unit_price#14756, product#14765, type#14766, unit_price#14767, category#14775, department#14780, first_name#14784, last_name#14785, email#14786, phone#14787, address#14788, city#14789, state#14790, zip#14791]
      +- Join Inner, (customer_id#14745 = customer_id#14783)
         :- Filter department#14780 IN (Sports,Footwear,Fitness,Golf)
         :  +- Project [department_id#14774, category_id#14764, product_id#14753, order_id#14743, order_date#14744, customer_id#14745, order_status#14746, order_item_id#14751, quantity#14754, total_price#14755, unit_price#14756, product#14765, type#14766, unit_price#14767, category#14775, department#14780]
         :     +- Join Inner, (department_id#14774 = department_id#14779)
         :        :- Project [category_id#14764, product_id#14753, order_id#14743, order_date#14744, customer_id#14745, order_status#14746, order_item_id#14751, quantity#14754, total_price#14755, unit_price#14756, product#14765, type#14766, unit_price#14767, department_id#14774, category#14775]
         :        :  +- Join Inner, (category_id#14764 = category_id#14773)
         :        :     :- Project [product_id#14753, order_id#14743, order_date#14744, customer_id#14745, order_status#14746, order_item_id#14751, quantity#14754, total_price#14755, unit_price#14756, category_id#14764, product#14765, type#14766, unit_price#14767]
         :        :     :  +- Join Inner, (product_id#14753 = product_id#14763)
         :        :     :     :- Project [order_id#14743, order_date#14744, customer_id#14745, order_status#14746, order_item_id#14751, product_id#14753, quantity#14754, total_price#14755, unit_price#14756]
         :        :     :     :  +- Join Inner, (order_id#14743 = order_id#14752)
         :        :     :     :     :- Filter (order_status#14746 = COMPLETE)
         :        :     :     :     :  +- Relation [order_id#14743,order_date#14744,customer_id#14745,order_status#14746] csv
         :        :     :     :     +- Relation [order_item_id#14751,order_id#14752,product_id#14753,quantity#14754,total_price#14755,unit_price#14756] csv
         :        :     :     +- Relation [product_id#14763,category_id#14764,product#14765,type#14766,unit_price#14767] csv
         :        :     +- Relation [category_id#14773,department_id#14774,category#14775] csv
         :        +- Relation [department_id#14779,department#14780] csv
         +- Relation [customer_id#14783,first_name#14784,last_name#14785,email#14786,phone#14787,address#14788,city#14789,state#14790,zip#14791] csv


#### Question 4

**Find out all the products that recorded more than 30% drop in total number of units sold (based on ‘quantity’ column in ‘order_items’ dataset) in year 2014 compared to year 2013 in 'Sports' and 'Fitness' departments. Consider orders with status as ‘COMPLETE’ only.** <br /><br />

- Datasets to be used to solve this task are
  - order_items.csv, orders.csv, products.csv, departments.csv, categories.csv
- Quantity of each product is computed as **sum of quantity** in order_items datasets for that product
- Columns to be fetched are: **department, product, 2013_qty, 2014_qty, drop%**
    - 2013_qty: total quantity of the product in the year 2013
    - 2014_qty: total quantity of the product in the year 2014
    - drop%: drop in 2014 over 2013
- Sort the data in the **ASCENDING order of department** and in the **DESCENDING order of drop%**.
- Round the drop% to nearest integer value.
- The output should have 4 rows, excluding the header
- Save the output as a single CSV file with header in **question4** directory

In [94]:
#Write your code
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType,StringType, DoubleType, DateType
from pyspark.sql.functions import col,year,when, countDistinct, sum, round, row_number
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("question4").getOrCreate()

# Dataset to be used order_items.csv, orders.csv, products.csv, departments.csv, categories.csv
orderSchema = StructType([
    StructField("order_id", IntegerType(), True),
    StructField("order_date", DateType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("order_status", StringType(), True)
])

orderItemSchema = StructType([
    StructField("order_item_id", IntegerType(), True),
    StructField("order_id", IntegerType(), True),
    StructField("product_id", IntegerType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("total_price", DoubleType(), True),
    StructField("unit_price", DoubleType(), True),
])

productSchema = StructType([
    StructField("product_id", IntegerType(), True),
    StructField("category_id", IntegerType(), True),
    StructField("product", StringType(), True),
    StructField("type", StringType(), True),
    StructField("unit_price", DoubleType(), True)
])

categoriesSchema = StructType([
    StructField("category_id", IntegerType(), True),
    StructField("department_id", IntegerType(), True),
    StructField("category", StringType(), True)
])

departmentSchema = StructType([
    StructField("department_id", IntegerType(), True),
    StructField("department", StringType(), True)
])




#Reading the csv files
orders = spark.read.csv(orders_file,schema = orderSchema)
order_items = spark.read.csv(order_items_file,schema = orderItemSchema)
products = spark.read.csv(products_file,schema = productSchema)
categories = spark.read.csv(categories_file,schema = categoriesSchema)
departments = spark.read.csv(departments_file,schema = departmentSchema)

#completed orders
completed_orders = orders.filter(col('order_status')=="COMPLETE")

#Joining dataframes
df = completed_orders.join(order_items, 'order_id') \
                    .join(products, 'product_id') \
                    .join(categories, 'category_id') \
                    .join(departments, 'department_id') 
df = df.filter(col('department').isin('Sports','Fitness'))

#Aggregating the dataframe
df_agg = df.groupBy('department', 'product',year('order_date').alias('year')).agg(sum('quantity').alias('total_qty'))

# Split 2013 & 2014 records

qty_2013 = df_agg.filter(col('year')== 2013).select('department','product',col('total_qty').alias('2013_qty'))
qty_2014 = df_agg.filter(col('year')== 2014).select('department','product',col('total_qty').alias('2014_qty'))

df_joined = qty_2013.join(qty_2014, ['department','product'],'inner')

# Filtering drop %
drop_df = df_joined.withColumn('drop%',
                               round(((col('2013_qty')-col('2014_qty'))/col('2013_qty'))*100).cast('int'))
result_4 = drop_df.filter(col('drop%') > 30).orderBy(col('department').asc(), col('drop%').desc())

result_4.coalesce(1).write.mode('overwrite').option('header','true').csv('question4')

                                                                                