### Importing packages

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from functools import reduce


### Creating Spark Session

In [0]:
spark = SparkSession.builder.appName('DMart Analytics').getOrCreate()
spark.conf.get("spark.sql.caseSensitive")

Out[2]: 'false'

### Loading Data

In [0]:
##Product_df = spark.read.format("csv").option("header", "true").option("inferSchema", "True").load("dbfs:/FileStore/shared_uploads/venkateshprabhus27@gmail.com/Product.csv")
#Product_df = spark.read.csv("s3://dmart-data/Product.csv", header="true", inferSchema="true")
Product_df = spark.read.format("csv").option("header","true").option("inferSchema","true").load("s3://dmart-data/Product.csv")
Customer_df = spark.read.format("csv").option("header","true").option("inferSchema","true").load("s3://dmart-data/Customer.csv")
Sales_df = spark.read.format("csv").option("header","true").option("inferSchema","true").load("s3://dmart-data/Sales.csv")

### Data Cleaning - Column Renaming

In [0]:
Product_df.show(10)
Product_df.printSchema()
Product_df.columns

+---------------+---------------+------------+--------------------+
|     product_id|       category|sub_category|        product_name|
+---------------+---------------+------------+--------------------+
|FUR-BO-10001798|      Furniture|   Bookcases|Bush Somerset Col...|
|FUR-CH-10000454|      Furniture|      Chairs|Hon Deluxe Fabric...|
|OFF-LA-10000240|Office Supplies|      Labels|Self-Adhesive Add...|
|FUR-TA-10000577|      Furniture|      Tables|Bretford CR4500 S...|
|OFF-ST-10000760|Office Supplies|     Storage|Eldon Fold N Roll...|
|FUR-FU-10001487|      Furniture| Furnishings|Eldon Expressions...|
|OFF-AR-10002833|Office Supplies|         Art|          Newell 322|
|TEC-PH-10002275|     Technology|      Phones|Mitel 5320 IP Pho...|
|OFF-BI-10003910|Office Supplies|     Binders|DXL Angle-View Bi...|
|OFF-AP-10002892|Office Supplies|  Appliances|Belkin F5C206VTEL...|
+---------------+---------------+------------+--------------------+
only showing top 10 rows

root
 |-- product_id: 

In [0]:
Product_df = Product_df.withColumnRenamed('Product ID','product_id').withColumnRenamed('Category', 'category').withColumnRenamed('Sub-Category','sub_category').withColumnRenamed('Product Name','product_name')
Product_df.printSchema()
Product_df.columns

root
 |-- product_id: string (nullable = true)
 |-- category: string (nullable = true)
 |-- sub_category: string (nullable = true)
 |-- product_name: string (nullable = true)

Out[5]: ['product_id', 'category', 'sub_category', 'product_name']

In [0]:
Customer_df.show(10)
Customer_df.printSchema()
Customer_df.columns

+-----------+------------------+-----------+---+-------------+---------------+--------------+-----------+-------+
|customer_id|     customer_name|    segment|age|      country|           city|         state|postal_code| region|
+-----------+------------------+-----------+---+-------------+---------------+--------------+-----------+-------+
|   CG-12520|       Claire Gute|   Consumer| 67|United States|      Henderson|      Kentucky|      42420|  South|
|   DV-13045|   Darrin Van Huff|  Corporate| 31|United States|    Los Angeles|    California|      90036|   West|
|   SO-20335|    Sean O'Donnell|   Consumer| 65|United States|Fort Lauderdale|       Florida|      33311|  South|
|   BH-11710|   Brosina Hoffman|   Consumer| 20|United States|    Los Angeles|    California|      90032|   West|
|   AA-10480|      Andrew Allen|   Consumer| 50|United States|        Concord|North Carolina|      28027|  South|
|   IM-15070|      Irene Maddox|   Consumer| 66|United States|        Seattle|    Washin

In [0]:
Customer_df = Customer_df.withColumnRenamed('Customer ID','customer_id') \
    .withColumnRenamed('Customer Name','customer_name') \
        .withColumnRenamed('Segment','segment') \
            .withColumnRenamed('Age','age') \
                .withColumnRenamed('Country','country') \
                    .withColumnRenamed('City','city') \
                        .withColumnRenamed('State','state') \
                            .withColumnRenamed('Postal Code','postal_code') \
                                .withColumnRenamed('Region','region')

Customer_df.printSchema()
Customer_df.columns

root
 |-- customer_id: string (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- segment: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- postal_code: integer (nullable = true)
 |-- region: string (nullable = true)

Out[7]: ['customer_id',
 'customer_name',
 'segment',
 'age',
 'country',
 'city',
 'state',
 'postal_code',
 'region']

In [0]:
Sales_df.show(10)
Sales_df.printSchema()
Sales_df.columns

+----------+--------------+----------+----------+--------------+-----------+---------------+--------+--------+--------+--------+
|Order Line|      Order ID|Order Date| Ship Date|     Ship Mode|Customer ID|     Product ID|   Sales|Quantity|Discount|  Profit|
+----------+--------------+----------+----------+--------------+-----------+---------------+--------+--------+--------+--------+
|         1|CA-2016-152156|2016-11-08|2016-11-11|  Second Class|   CG-12520|FUR-BO-10001798|  261.96|       2|     0.0| 41.9136|
|         2|CA-2016-152156|2016-11-08|2016-11-11|  Second Class|   CG-12520|FUR-CH-10000454|  731.94|       3|     0.0| 219.582|
|         3|CA-2016-138688|2016-06-12|2016-06-16|  Second Class|   DV-13045|OFF-LA-10000240|   14.62|       2|     0.0|  6.8714|
|         4|US-2015-108966|2015-10-11|2015-10-18|Standard Class|   SO-20335|FUR-TA-10000577|957.5775|       5|    0.45|-383.031|
|         5|US-2015-108966|2015-10-11|2015-10-18|Standard Class|   SO-20335|OFF-ST-10000760|  22.

In [0]:
Sales_df = Sales_df.withColumnRenamed('Order Line','order_line')\
    .withColumnRenamed('Order ID','order_id')\
        .withColumnRenamed('OrderID','order_id')\
            .withColumnRenamed('Order Date','order_date')\
                .withColumnRenamed('Ship Date','ship_date')\
                    .withColumnRenamed('Ship Mode','ship_mode')\
                        .withColumnRenamed('Customer ID','customer_id')\
                            .withColumnRenamed('Product ID','product_id')\
                                .withColumnRenamed('Sales','sales')\
                                    .withColumnRenamed('Quantity','quantity')\
                                        .withColumnRenamed('Discount','discount')\
                                            .withColumnRenamed('Profit','profit')

Sales_df.printSchema()
Sales_df.columns

root
 |-- order_line: integer (nullable = true)
 |-- order_id: string (nullable = true)
 |-- order_date: date (nullable = true)
 |-- ship_date: date (nullable = true)
 |-- ship_mode: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- sales: double (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- discount: double (nullable = true)
 |-- profit: double (nullable = true)

Out[9]: ['order_line',
 'order_id',
 'order_date',
 'ship_date',
 'ship_mode',
 'customer_id',
 'product_id',
 'sales',
 'quantity',
 'discount',
 'profit']

### Data Cleaning - Finding Duplicates


In [0]:
Product_df.groupBy(Product_df.columns).count().filter('count>1').show()
Customer_df.groupBy(Customer_df.columns).count().filter('count>1').show()
Sales_df.groupBy(Sales_df.columns).count().filter('count>1').show() 

+----------+--------+------------+------------+-----+
|product_id|category|sub_category|product_name|count|
+----------+--------+------------+------------+-----+
+----------+--------+------------+------------+-----+

+-----------+-------------+-------+---+-------+----+-----+-----------+------+-----+
|customer_id|customer_name|segment|age|country|city|state|postal_code|region|count|
+-----------+-------------+-------+---+-------+----+-----+-----------+------+-----+
+-----------+-------------+-------+---+-------+----+-----+-----------+------+-----+

+----------+--------+----------+---------+---------+-----------+----------+-----+--------+--------+------+-----+
|order_line|order_id|order_date|ship_date|ship_mode|customer_id|product_id|sales|quantity|discount|profit|count|
+----------+--------+----------+---------+---------+-----------+----------+-----+--------+--------+------+-----+
+----------+--------+----------+---------+---------+-----------+----------+-----+--------+--------+------+-

### Data Cleaning - Finding Null Values

In [0]:
### Product_df.filter(F.col('product_id').isNull()).show() ---> Column wise Null values
Product_df.filter(reduce(lambda x, y: x | y, (F.col(z).isNull() for z in Product_df.columns))).show()
Customer_df.filter(reduce(lambda x,y: x|y, (F.col(c).isNull() for c in Customer_df.columns))).show()
Sales_df.filter(reduce(lambda x,y: x|y, (F.col(s).isNull() for s in Sales_df.columns))).show()

+----------+--------+------------+------------+
|product_id|category|sub_category|product_name|
+----------+--------+------------+------------+
+----------+--------+------------+------------+

+-----------+-------------+-------+---+-------+----+-----+-----------+------+
|customer_id|customer_name|segment|age|country|city|state|postal_code|region|
+-----------+-------------+-------+---+-------+----+-----+-----------+------+
+-----------+-------------+-------+---+-------+----+-----+-----------+------+

+----------+--------+----------+---------+---------+-----------+----------+-----+--------+--------+------+
|order_line|order_id|order_date|ship_date|ship_mode|customer_id|product_id|sales|quantity|discount|profit|
+----------+--------+----------+---------+---------+-----------+----------+-----+--------+--------+------+
+----------+--------+----------+---------+---------+-----------+----------+-----+--------+--------+------+



In [0]:
Sales_df.count()

Out[55]: 9994

In [0]:
## Joining all the dataframes
SCP_df = Sales_df.join(Customer_df, on = 'customer_id', how='inner').join(Product_df, on='product_id', how='inner')
## Cross checking for null values in the joined df
SCP_df.filter(reduce(lambda x,y: x|y, (F.col(j).isNull() for j in SCP_df.columns))).count()
## Count of records and schema
SCP_df.count()
SCP_df.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_line: integer (nullable = true)
 |-- order_id: string (nullable = true)
 |-- order_date: date (nullable = true)
 |-- ship_date: date (nullable = true)
 |-- ship_mode: string (nullable = true)
 |-- sales: double (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- discount: double (nullable = true)
 |-- profit: double (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- segment: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- postal_code: integer (nullable = true)
 |-- region: string (nullable = true)
 |-- category: string (nullable = true)
 |-- sub_category: string (nullable = true)
 |-- product_name: string (nullable = true)



### Data Analysis and Querying

1. What is the total sales for each product category?

In [0]:
##SCP_df.groupBy('category').sum('sales').show()
##SCP_df.select('category','sales').groupBy('category').sum('sales').show()
SCP_df.groupBy('category').agg(F.round(F.sum('sales'),2).alias('sum_of_sales')).orderBy(F.col('sum_of_sales'),ascending=False).show()

+---------------+------------+
|       category|sum_of_sales|
+---------------+------------+
|     Technology|   836154.03|
|      Furniture|    741999.8|
|Office Supplies|   719047.03|
+---------------+------------+



2. Which customer has made the highest number of purchases?

In [0]:

##SCP_df.select('customer_id', 'customer_name', 'order_id').groupBy('customer_id', 'customer_name').agg(F.count('order_id').alias('count_of_purchases')).orderBy(F.col('count_of_purchases').desc()).limit(1).show()
spec = Window.orderBy(F.col('count_of_purchases').desc())
SCP_df.groupBy('customer_id', 'customer_name').agg(F.count('order_id').alias('count_of_purchases')).withColumn('dense_rank', F.dense_rank().over(spec)).filter(F.col('dense_rank')==1).show()

+-----------+-------------+------------------+----------+
|customer_id|customer_name|count_of_purchases|dense_rank|
+-----------+-------------+------------------+----------+
|   WB-21850|William Brown|                37|         1|
+-----------+-------------+------------------+----------+



3. What is the average discount given on sales across all products?

In [0]:
SCP_df.groupBy('product_name').agg(F.round(F.avg('discount'),2).alias('avg_discount')).orderBy(F.col('avg_discount').desc()).show()

+--------------------+------------+
|        product_name|avg_discount|
+--------------------+------------+
|Eureka Disposable...|         0.8|
|GBC VeloBinder El...|        0.73|
|GBC Plasticlear B...|        0.73|
|Lexmark MarkNet N...|         0.7|
|Epson Perfection ...|         0.7|
|Plantronics Singl...|         0.7|
|Brother MFC-9340C...|         0.7|
|Okidata B401 Printer|         0.7|
|Zebra GK420t Dire...|         0.7|
|Bush Westfield Co...|         0.7|
|Hewlett-Packard D...|         0.7|
|Cisco 8961 IP Pho...|         0.7|
|Premier Elliptica...|        0.64|
|Wilson Jones Cust...|        0.63|
|Avery Durable Sla...|        0.62|
|Acco D-Ring Binde...|        0.62|
|Avery Triangle Sh...|        0.62|
|Avery Hidden Tab ...|        0.61|
|Eldon Executive W...|         0.6|
|Acco Economy Flex...|         0.6|
+--------------------+------------+
only showing top 20 rows



4. How many unique products were sold in each region?

In [0]:
SCP_df.groupBy('region').agg(F.countDistinct('product_id').alias('count_of_products')).show()

+-------+-----------------+
| region|count_of_products|
+-------+-----------------+
|  South|             1059|
|Central|             1316|
|   East|             1408|
|   West|             1536|
+-------+-----------------+



5. What is the total profit generated in each state?

In [0]:
SCP_df.groupBy('State').agg(F.round(F.sum('profit'),2).alias('Total_profit')).orderBy(F.col('Total_profit').desc()).show()

+-------------+------------+
|        State|Total_profit|
+-------------+------------+
|   California|    59398.31|
|     New York|    58177.83|
|   Washington|     24405.8|
|        Texas|    20528.91|
| Pennsylvania|    13604.94|
|      Georgia|    12781.34|
|      Arizona|      9563.2|
|     Illinois|     9560.15|
|    Wisconsin|     8569.87|
|     Michigan|      7752.3|
|    Minnesota|     7202.52|
|     Virginia|     6940.11|
|         Ohio|     5985.89|
|Massachusetts|     5905.54|
|     Kentucky|     4513.31|
|    Tennessee|     3434.28|
|     Delaware|     3336.38|
|      Alabama|     2845.06|
|      Indiana|     2707.35|
|    Louisiana|     2659.24|
+-------------+------------+
only showing top 20 rows



6. Which product sub-category has the highest sales?

In [0]:
spec = Window.orderBy(F.col('Total_sales').desc())
SCP_df.groupBy('sub_category').agg(F.sum('sales').alias('Total_sales')).withColumn('dense_rank', F.dense_rank().over(spec)).filter(F.col('dense_rank')==1).show()

+------------+-----------------+----------+
|sub_category|      Total_sales|dense_rank|
+------------+-----------------+----------+
|      Phones|330007.0540000001|         1|
+------------+-----------------+----------+



7. What is the average age of customers in each segment?

In [0]:
SCP_df.groupBy('segment').agg(F.round(F.avg('age'),2).alias('average_age')).show()

+-----------+-----------+
|    segment|average_age|
+-----------+-----------+
|   Consumer|      44.61|
|Home Office|      43.28|
|  Corporate|      44.82|
+-----------+-----------+



8. How many orders were shipped in each shipping mode?

In [0]:
SCP_df.groupBy('ship_mode').agg(F.count('order_id').alias('total_order')).show()

+--------------+-----------+
|     ship_mode|total_order|
+--------------+-----------+
|   First Class|       1538|
|      Same Day|        543|
|  Second Class|       1945|
|Standard Class|       5968|
+--------------+-----------+



9. What is the total quantity of products sold in each city?

In [0]:
SCP_df.groupBy('city').agg(F.count('product_id').alias('total_products')).orderBy(F.col('total_products').desc()).show()

+-------------+--------------+
|         city|total_products|
+-------------+--------------+
|New York City|           848|
|  Los Angeles|           728|
| Philadelphia|           585|
|San Francisco|           474|
|      Seattle|           386|
|      Houston|           379|
|      Chicago|           310|
|     Columbus|           236|
|       Dallas|           164|
|       Aurora|           164|
|    San Diego|           163|
| Jacksonville|            93|
|      Detroit|            85|
|    Rochester|            74|
|    Charlotte|            73|
|     Pasadena|            71|
|      Phoenix|            69|
|        Dover|            69|
|  Springfield|            68|
|      Jackson|            68|
+-------------+--------------+
only showing top 20 rows



10. Which customer segment has the highest profit margin?

In [0]:
SCP_df.groupBy('segment').agg(F.sum('profit').alias('profit_margin')).orderBy(F.col('profit_margin').desc()).limit(1).show()

+--------+------------------+
| segment|     profit_margin|
+--------+------------------+
|Consumer|134119.20919999972|
+--------+------------------+

