In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,to_timestamp, month, year,concat_ws

In [2]:
spark = SparkSession.builder.appName('learning').master('local').getOrCreate()
sc = spark.sparkContext

**PROBLEM 1**: Fetch the product(s) - with product description consisting of "Universal fit, well-vented" and meet the condition c1

------
condition c1 - product had been ordered more than five times within a month "continously" over months

continously means "month after month" this condition c1 had been met

Eg. IF (in jan 2016 that particular product had been ordered more than five times, in feb 2016 too, in mar 2016 too, however, in apr 2016 this product had not been ordered more than five times, no matter again in may 2016 it had been again ordered more than five times) --- condition c1 stands failed 

In [3]:
sales_df = spark.read.format('csv')\
          .option('header', 'True') \
          .option('inferSchema', 'True') \
          .load('file:///home/saif/LFS/datasets/datasets_pavan/sales/sales.csv')
products_df = spark.read.format('csv')\
          .option('header', 'True') \
          .option('inferSchema', 'True') \
          .load('file:///home/saif/LFS/datasets/datasets_pavan/sales/products.csv')

In [4]:
sales_df.show(5)

+----------+----------+-----------+----------+-----------+------------+-------------+-------------+
| OrderDate| StockDate|OrderNumber|ProductKey|CustomerKey|TerritoryKey|OrderLineItem|OrderQuantity|
+----------+----------+-----------+----------+-----------+------------+-------------+-------------+
|01-01-2015| 9/21/2001|    SO45080|       332|      14657|           1|            1|            1|
|01-01-2015|12-05-2001|    SO45079|       312|      29255|           4|            1|            1|
|01-01-2015|10/29/2001|    SO45082|       350|      11455|           9|            1|            1|
|01-01-2015|11/16/2001|    SO45081|       338|      26782|           6|            1|            1|
|01-02-2015|12/15/2001|    SO45083|       312|      14947|          10|            1|            1|
+----------+----------+-----------+----------+-----------+------------+-------------+-------------+
only showing top 5 rows



In [5]:
products_df.show(5)

+----------+---------------------+----------+--------------------+-------------------+--------------------+------------+-----------+------------+-----------+------------+
|ProductKey|ProductSubcategoryKey|ProductSKU|         ProductName|          ModelName|  ProductDescription|ProductColor|ProductSize|ProductStyle|ProductCost|ProductPrice|
+----------+---------------------+----------+--------------------+-------------------+--------------------+------------+-----------+------------+-----------+------------+
|       214|                   31| HL-U509-R|Sport-100 Helmet,...|          Sport-100|Universal fit, we...|         Red|          0|           0|    13.0863|       34.99|
|       215|                   31|   HL-U509|Sport-100 Helmet,...|          Sport-100|Universal fit, we...|       Black|          0|           0|    12.0278|     33.6442|
|       218|                   23| SO-B909-M|Mountain Bike Soc...|Mountain Bike Socks|Combination of na...|       White|          M|           U|

In [20]:
sales_df.count()

56046

In [None]:
from pyspark.sql.functions import to_timestamp,month,year

In [36]:
## the Logic which I am trying to Implement

## WITH ordered_monthly AS (
##   SELECT 
##     product_id,
##     DATE_TRUNC('month', order_date) AS month,
##     COUNT(*) AS order_count
##   FROM orders
##   GROUP BY product_id, DATE_TRUNC('month', order_date)
##   HAVING COUNT(*) > 5
## ),
## continuous_orders AS (
##   SELECT 
##     product_id,
##     month,
##     LAG(month) OVER (PARTITION BY product_id ORDER BY month) AS prev_month
##   FROM ordered_monthly
## )
## SELECT DISTINCT o.product_id
## FROM ordered_monthly o
## JOIN continuous_orders co ON o.product_id = co.product_id
##                          AND o.month = co.prev_month;

In [18]:
ordered_monthly = sales_df.select( col("ProductKey"), 
                 year(to_timestamp(col('OrderDate'),'MM-dd-yyyy')).alias('OrderYear'),
                 month(to_timestamp(col('OrderDate'),'MM-dd-yyyy')).alias('OrderMonth')
               ).groupby(col('OrderYear'),col('OrderMonth'),col('ProductKey'))\
               .count()\
               .where(col('count')>5)# .sort(col('OrderYear').asc(),col('OrderMonth').asc())\
               


ordered_monthly.show(5)

+---------+----------+----------+-----+
|OrderYear|OrderMonth|ProductKey|count|
+---------+----------+----------+-----+
|     2015|         1|       311|    7|
|     2017|         3|       489|   12|
|     2017|         4|       606|   22|
|     null|      null|       471|   87|
|     2016|        11|       358|   10|
+---------+----------+----------+-----+
only showing top 5 rows



In [21]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lag

windowSpec = Window.partitionBy("ProductKey").orderBy(col('OrderYear'),col('OrderMonth'))

In [23]:
continuous_orders = ordered_monthly.select(
                        col("ProductKey"),
                        col("OrderYear"),
                        col("OrderMonth"),
                        lag(col("OrderMonth")).over(windowSpec).alias("prev_month")
                    )
continuous_orders.show(5)

+----------+---------+----------+----------+
|ProductKey|OrderYear|OrderMonth|prev_month|
+----------+---------+----------+----------+
|       471|     null|      null|      null|
|       471|     2016|        11|      null|
|       471|     2016|        12|        11|
|       471|     2017|         2|        12|
|       471|     2017|         4|         2|
+----------+---------+----------+----------+
only showing top 5 rows



In [33]:
c1_df = ordered_monthly.join(continuous_orders, (ordered_monthly.ProductKey == continuous_orders.ProductKey) & 
                                        (ordered_monthly.OrderMonth == continuous_orders.prev_month)   ,'inner')\
                        .select(ordered_monthly.ProductKey)
c1_df.show(5)

+----------+
|ProductKey|
+----------+
|       311|
|       489|
|       606|
|       358|
|       313|
+----------+
only showing top 5 rows



In [34]:
a1_df = products_df.where(col('ProductDescription').like('%Universal fit, well-vented%'))
a1_df.show(5)

+----------+---------------------+----------+--------------------+---------+--------------------+------------+-----------+------------+-----------+------------+
|ProductKey|ProductSubcategoryKey|ProductSKU|         ProductName|ModelName|  ProductDescription|ProductColor|ProductSize|ProductStyle|ProductCost|ProductPrice|
+----------+---------------------+----------+--------------------+---------+--------------------+------------+-----------+------------+-----------+------------+
|       214|                   31| HL-U509-R|Sport-100 Helmet,...|Sport-100|Universal fit, we...|         Red|          0|           0|    13.0863|       34.99|
|       215|                   31|   HL-U509|Sport-100 Helmet,...|Sport-100|Universal fit, we...|       Black|          0|           0|    12.0278|     33.6442|
|       220|                   31| HL-U509-B|Sport-100 Helmet,...|Sport-100|Universal fit, we...|        Blue|          0|           0|    12.0278|     33.6442|
+----------+---------------------+

In [35]:
result_df = c1_df.alias("t1").join(a1_df.alias("t2"), col("t2.ProductKey") == col("t1.ProductKey"), 'inner')\
                             .select(col("t2.ProductKey"),col("t2.ProductSubcategoryKey"),
                                     col("t2.ProductName"),col("t2.ModelName"))
result_df.show(5)

+----------+---------------------+--------------------+---------+
|ProductKey|ProductSubcategoryKey|         ProductName|ModelName|
+----------+---------------------+--------------------+---------+
|       220|                   31|Sport-100 Helmet,...|Sport-100|
|       220|                   31|Sport-100 Helmet,...|Sport-100|
|       220|                   31|Sport-100 Helmet,...|Sport-100|
|       220|                   31|Sport-100 Helmet,...|Sport-100|
|       220|                   31|Sport-100 Helmet,...|Sport-100|
+----------+---------------------+--------------------+---------+
only showing top 5 rows

