# **Week-3 PySpark**

In [29]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# **Initializing the Spark session**

In [6]:
# Create the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("week-3")
    .getOrCreate()
)
spark

# **Loading large datasets**
---
These are the same datasets as in `week-2`, but with more records and minor alterations.




In [56]:
# Every CSV is read the same way: first row is the header, column types are inferred.
csv_options = {"header": True, "inferSchema": True}

dfEmp = spark.read.csv("/content/employees.csv", **csv_options)
dfProd = spark.read.csv("/content/products.csv", **csv_options)
dfSales = spark.read.csv("/content/sales.csv", **csv_options)
dfStores = spark.read.csv("/content/stores.csv", **csv_options)

In [16]:
# Print the inferred column names and types of the employees DataFrame.
dfEmp.printSchema()

root
 |-- employeeID: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- storeID: integer (nullable = true)
 |-- role: string (nullable = true)
 |-- hireDate: date (nullable = true)



In [22]:
# Print the inferred column names and types of the products DataFrame.
dfProd.printSchema()

root
 |-- productID: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- price: double (nullable = true)
 |-- sellprice: double (nullable = true)
 |-- cost: double (nullable = true)
 |-- dateAdded: date (nullable = true)
 |-- Margin: double (nullable = true)
 |-- Margin %: double (nullable = true)



In [18]:
# Print the inferred column names and types of the sales DataFrame.
dfSales.printSchema()

root
 |-- saleID: integer (nullable = true)
 |-- productID: integer (nullable = true)
 |-- storeID: integer (nullable = true)
 |-- employeeID: integer (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- saleDate: date (nullable = true)



In [19]:
# Print the inferred column names and types of the stores DataFrame.
dfStores.printSchema()

root
 |-- storeID: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- region: string (nullable = true)
 |-- address: string (nullable = true)
 |-- createdAt: date (nullable = true)



# **Filtering data for underperforming products**

In [45]:
# Pair each product with its sales rows; products without a matching sale
# (and sales without a matching product) are dropped by the inner join.
dfJoined = dfProd.join(
    dfSales,
    on="productID",
    how="inner",
)

In [35]:
# Average "Margin %" across the rows of dfJoined, pulled back to the driver as a
# plain Python value. Because dfJoined is products joined to sales, a product
# contributes one value per matching sale row.
marginMean = dfJoined.agg(F.avg("Margin %")).first()[0]

1.   Filter Margin % < average margin.
2.   Filter days since the product was added (`dateAdded`) < 408 days.
3.   Add a `marginRevenue` column (quantity * Margin).
4.   Group by productID, summing quantity and marginRevenue; the result is sorted by marginRevenue in ascending order.
5.   Join with the products table to display the product name.



In [50]:
# Absolute number of days between the product's dateAdded and today.
days_since_added = F.abs(F.date_diff(F.col("dateAdded"), F.current_date()))

# Row-level conditions: margin below the overall mean, and added less than 408 days ago.
below_avg_margin = F.col("Margin %") < marginMean
recently_added = F.col("tillDate") < 408

# Per-product totals of units sold and of quantity * Margin over the filtered rows.
per_product_totals = (
    dfJoined
    .withColumn("tillDate", days_since_added)
    .filter(below_avg_margin & recently_added)
    .withColumn("marginRevenue", F.col("quantity") * F.col("Margin"))
    .groupby("productID")
    .agg(
        F.sum("quantity").alias("totalSold"),
        F.sum("marginRevenue").alias("marginRevenue"),
    )
)

# Bring the product name back in and order from lowest to highest marginRevenue.
product_names = dfProd.select(["productID", "name"])
underPerforming = (
    per_product_totals
    .join(product_names, on="productID", how="inner")
    .sort("marginRevenue")
)
underPerforming.select("name", "totalSold", "marginRevenue").show()

+--------------+---------+-------------+
|          name|totalSold|marginRevenue|
+--------------+---------+-------------+
|  Smart TV 42"|        1|         50.0|
|Microwave Oven|        2|         60.0|
|Wireless Mouse|       12|         60.0|
|    Headphones|        5|         75.0|
|   Office Desk|        2|        140.0|
+--------------+---------+-------------+



# **Calculating monthly average**

In [62]:
# Attach store and product attributes to each sale; the inner joins drop sales
# whose storeID or productID has no match.
sales_with_dims = (
    dfSales
    .join(dfStores, on="storeID", how="inner")
    .join(dfProd, on="productID", how="inner")
)

# saleMonth is the month number of saleDate; amount is quantity * sellprice.
df_sales = (
    sales_with_dims
    .withColumn("saleMonth", F.month("saleDate"))
    .withColumn("amount", F.col("quantity") * F.col("sellprice"))
)

In [74]:
# Mean sale amount for each (storeID, saleMonth) pair, stored as monthlyRevenue.
avg_amount_by_store_month = (
    df_sales
    .groupBy(["storeID", "saleMonth"])
    .agg(F.mean("amount").alias("monthlyRevenue"))
)

# Swap storeID for the store name and order the rows by month.
store_names = dfStores.select("storeID", "name")
storeSummary = (
    avg_amount_by_store_month
    .join(store_names, on="storeID")
    .sort(["saleMonth"])
    .select(["name", "saleMonth", "monthlyRevenue"])
)

storeSummary.show()

+-----------------+---------+--------------+
|             name|saleMonth|monthlyRevenue|
+-----------------+---------+--------------+
|BudgetBazaar - FL|        6|         360.0|
|    MegaMart - IL|        6|         187.5|
|  FreshStore - TX|        6|         675.0|
|  Urban Mart - NY|        6|         831.0|
|   SuperSave - LA|        6|         400.0|
|    MegaMart - IL|        7|         450.0|
|  FreshStore - TX|        7|         280.0|
|  Urban Mart - NY|        8|         240.0|
|BudgetBazaar - FL|        8|         280.0|
|   SuperSave - LA|        9|          40.5|
|    MegaMart - IL|       11|         540.0|
|  FreshStore - TX|       11|          90.0|
|BudgetBazaar - FL|       11|         350.0|
+-----------------+---------+--------------+



# **Deliverables**

*   PySpark script
*   Output files for the underperforming-products and store-summary results



In [71]:
# 1. The PySpark script is attached in .ipynb format in the git repo

In [77]:
# 2. Output files for underperforming products and store summary
# NOTE(review): the targets are under /content/drive/MyDrive and no Drive mount step
# is visible in this notebook -- presumably Drive is mounted beforehand; confirm.
# header=True writes the column names as the first row of each CSV, so the
# numeric columns (e.g. totalSold vs. marginRevenue) stay identifiable.
# mode("overwrite") replaces any existing output at the same path.
underPerforming.write.mode("overwrite").csv("/content/drive/MyDrive/underperforming", header=True)

storeSummary.write.mode("overwrite").csv("/content/drive/MyDrive/storeSummary", header=True)