In [1]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.storagelevel import StorageLevel
from pyspark.sql.types import *

In [2]:
df= spark.read.csv('/FileStore/tables/timetravel1.csv', inferSchema=True,header=True) #read sample data from location

In [3]:
df.printSchema() #verify schema

In [4]:
df= df.withColumn("Sale1", F.col("Sale").cast("float")).drop("Sale")\
      .withColumn("Month",F.month("Date"))\
      .withColumn("Year",F.date_format(F.col("Date"),'yyyy'))\
      .withColumn("Day",F.dayofmonth("Date"))\
      .persist(StorageLevel.MEMORY_AND_DISK) #changed Sale column from integer to float. New column: Sale1. AND persisted to memory for faster in-memory computations
# extracted month,day,year to new columns for use in engineering window columns for year to date calculations in next cell.

#16 new columns will be added to provide a flexible and sliding historical context with granularity

In [6]:
days = lambda i: i * 86400
w1 = (Window().partitionBy(F.col("Type"),F.col("Coupon")).orderBy(F.col("Date").cast('long')).rangeBetween(-days(27), 0))
w2 = (Window().partitionBy(F.col("Type"),F.col("Coupon")).orderBy(F.col("Date").cast('long')).rangeBetween(-days(90), 0))
w3 = (Window().partitionBy(F.col("Type"),F.col("Coupon")).orderBy(F.col("Date").cast('long')).rangeBetween(-days(364), 0))
w4 = (Window().orderBy(F.col("Date").cast('long')).rangeBetween(-days(27), 0))
w5 = (Window().orderBy(F.col("Date").cast('long')).rangeBetween(-days(90), 0))
w6 = (Window().orderBy(F.col("Date").cast('long')).rangeBetween(-days(364), 0))
w7 = (Window().orderBy(F.col("Date").cast('long')).rangeBetween(-days(27), days(27)))
w8 = (Window().orderBy(F.col("Date").cast('long')).rangeBetween(-days(90), days(90)))
w9 = (Window().orderBy(F.col("Date").cast('long')).rangeBetween(-days(364), days(364)))
w10= (Window().orderBy(F.col("Date").cast('long')).rangeBetween(-days(49), days(0)))
w11= (Window().partitionBy(F.col("Year")).orderBy(F.col("Month"),F.col("Day")))
w12= (Window().partitionBy(F.col("Year")).orderBy(F.col("Month"),F.col("Day")))
w13= (Window().partitionBy(F.col("Year")).orderBy(F.col("Month"),F.col("Day")))
df= df.withColumn("Sales-Last4weeks-by-Coupon-Type", F.sum("Sale1").over(w1))\
      .withColumn("Sales-Last13weeks-by-Coupon-Type", F.sum("Sale1").over(w2))\
      .withColumn("Sales-Last52weeks-by-Coupon-Type", F.sum("Sale1").over(w3))\
      .withColumn("Sales-Last4weeks-by-TOTALSale", F.sum("Sale1").over(w4))\
      .withColumn("Sales-Last13weeks-by-TOTALSale", F.sum("Sale1").over(w5))\
      .withColumn("Sales-Last4andFuture4weeks", F.sum("Sale1").over(w7))\
      .withColumn("Sales-Last13andFuture13weeks", F.sum("Sale1").over(w8))\
      .withColumn("Sales-Last52andFuture52weeks", F.sum("Sale1").over(w9))\
      .withColumn("Count-of-Yearly-Sales-last50days", F.count(F.when(F.col('Type') == 'Yearly', F.col('Type'))).over(w10))\
      .withColumn("Count-of-Monthly-Sales-Last50days", F.count(F.when(F.col('Type')=='Monthly',F.col('Type'))).over(w10))\
      .withColumn("YearToDate-TotalSales",F.sum("Sale1").over(w11))\
      .withColumn("YearToDate-Sales-by-Type-Yearly", F.sum(F.when(F.col('Type') == 'Yearly', F.col('Sale1'))).over(w12))\
      .withColumn("YearToDate-Sales-by-Type-Monthly", F.sum(F.when(F.col('Type') == 'Monthly', F.col('Sale1'))).over(w12))\
      .withColumn("YearToDate-Sales-by-Coupon-Yes", F.sum(F.when(F.col('Coupon') == 'Yes', F.col('Sale1'))).over(w12))\
      .withColumn("YearToDate-Sales-by-Coupon-No", F.sum(F.when(F.col('Coupon') == 'No', F.col('Sale1'))).over(w12))\
      .withColumn("Sales-Last52weeks-TOTAL", F.sum("Sale1").over(w6)).orderBy(df.Date.desc()).drop("Month","Year","Day")
display(df)

ID,Name,Date,Type,Coupon,Sale1,Sales-Last4weeks-by-Coupon-Type,Sales-Last13weeks-by-Coupon-Type,Sales-Last52weeks-by-Coupon-Type,Sales-Last4weeks-by-TOTALSale,Sales-Last13weeks-by-TOTALSale,Sales-Last4andFuture4weeks,Sales-Last13andFuture13weeks,Sales-Last52andFuture52weeks,Count-of-Yearly-Sales-last50days,Count-of-Monthly-Sales-Last50days,YearToDate-TotalSales,YearToDate-Sales-by-Type-Yearly,YearToDate-Sales-by-Type-Monthly,YearToDate-Sales-by-Coupon-Yes,YearToDate-Sales-by-Coupon-No,Sales-Last52weeks-TOTAL
1,ETE,2019-08-29T00:00:00.000+0000,Yearly,Yes,95.0,475.0,570.0,570.0,475.0,742.0,475.0,742.0,1584.0,6,2,1184.0,1070.0,114.0,654.0,530.0,1584.0
2,TYT,2019-08-23T00:00:00.000+0000,Yearly,Yes,95.0,380.0,475.0,475.0,380.0,647.0,475.0,742.0,1584.0,5,2,1089.0,975.0,114.0,559.0,530.0,1489.0
3,YH,2019-08-23T00:00:00.000+0000,Yearly,Yes,95.0,380.0,475.0,475.0,380.0,647.0,475.0,742.0,1584.0,5,2,1089.0,975.0,114.0,559.0,530.0,1489.0
5,FFF,2019-08-12T00:00:00.000+0000,Yearly,Yes,95.0,285.0,285.0,285.0,285.0,472.0,570.0,757.0,1684.0,3,2,899.0,785.0,114.0,369.0,530.0,1399.0
4,FG,2019-08-12T00:00:00.000+0000,Yearly,Yes,95.0,285.0,285.0,285.0,285.0,472.0,570.0,757.0,1684.0,3,2,899.0,785.0,114.0,369.0,530.0,1399.0
6,GGG,2019-07-24T00:00:00.000+0000,Yearly,Yes,95.0,95.0,95.0,95.0,119.0,297.0,309.0,772.0,1684.0,2,5,709.0,595.0,114.0,179.0,530.0,1209.0
7,GUMU,2019-07-12T00:00:00.000+0000,Monthly,Yes,12.0,48.0,84.0,84.0,148.0,214.0,243.0,784.0,1684.0,1,6,614.0,500.0,114.0,84.0,530.0,1114.0
8,GHU,2019-07-11T00:00:00.000+0000,Monthly,Yes,12.0,36.0,72.0,72.0,136.0,202.0,243.0,784.0,1684.0,1,5,602.0,500.0,102.0,72.0,530.0,1102.0
9,GHG,2019-06-21T00:00:00.000+0000,Monthly,Yes,12.0,48.0,60.0,60.0,148.0,190.0,172.0,784.0,1684.0,1,5,590.0,500.0,90.0,60.0,530.0,1090.0
16,RTG,2019-06-21T00:00:00.000+0000,Yearly,No,100.0,100.0,100.0,1000.0,148.0,190.0,172.0,784.0,1684.0,1,5,590.0,500.0,90.0,60.0,530.0,1090.0


In [7]:
display(df.select("Date","Count-of-Yearly-Sales-last50days","Count-of-Monthly-Sales-Last50days","Type"))

Date,Count-of-Yearly-Sales-last50days,Count-of-Monthly-Sales-Last50days,Type
2019-08-29T00:00:00.000+0000,6,2,Yearly
2019-08-23T00:00:00.000+0000,5,2,Yearly
2019-08-23T00:00:00.000+0000,5,2,Yearly
2019-08-12T00:00:00.000+0000,3,2,Yearly
2019-08-12T00:00:00.000+0000,3,2,Yearly
2019-07-24T00:00:00.000+0000,2,5,Yearly
2019-07-12T00:00:00.000+0000,1,6,Monthly
2019-07-11T00:00:00.000+0000,1,5,Monthly
2019-06-21T00:00:00.000+0000,1,5,Monthly
2019-06-21T00:00:00.000+0000,1,5,Yearly


In [8]:
df.createOrReplaceTempView("timetravel")

#Using our transformed data, we will query between '2019-03-01' and '2019-08-22'

In [10]:
df2=spark.sql("Select * from timetravel where Date Between '2019-03-01' and '2019-08-22'")
display(df2)

ID,Name,Date,Type,Coupon,Sale1,Sales-Last4weeks-by-Coupon-Type,Sales-Last13weeks-by-Coupon-Type,Sales-Last52weeks-by-Coupon-Type,Sales-Last4weeks-by-TOTALSale,Sales-Last13weeks-by-TOTALSale,Sales-Last4andFuture4weeks,Sales-Last13andFuture13weeks,Sales-Last52andFuture52weeks,Count-of-Yearly-Sales-last50days,Count-of-Monthly-Sales-Last50days,YearToDate-TotalSales,YearToDate-Sales-by-Type-Yearly,YearToDate-Sales-by-Type-Monthly,YearToDate-Sales-by-Coupon-Yes,YearToDate-Sales-by-Coupon-No,Sales-Last52weeks-TOTAL
5,FFF,2019-08-12T00:00:00.000+0000,Yearly,Yes,95.0,285.0,285.0,285.0,285.0,472.0,570.0,757.0,1684.0,3,2,899.0,785.0,114.0,369.0,530.0,1399.0
4,FG,2019-08-12T00:00:00.000+0000,Yearly,Yes,95.0,285.0,285.0,285.0,285.0,472.0,570.0,757.0,1684.0,3,2,899.0,785.0,114.0,369.0,530.0,1399.0
6,GGG,2019-07-24T00:00:00.000+0000,Yearly,Yes,95.0,95.0,95.0,95.0,119.0,297.0,309.0,772.0,1684.0,2,5,709.0,595.0,114.0,179.0,530.0,1209.0
7,GUMU,2019-07-12T00:00:00.000+0000,Monthly,Yes,12.0,48.0,84.0,84.0,148.0,214.0,243.0,784.0,1684.0,1,6,614.0,500.0,114.0,84.0,530.0,1114.0
8,GHU,2019-07-11T00:00:00.000+0000,Monthly,Yes,12.0,36.0,72.0,72.0,136.0,202.0,243.0,784.0,1684.0,1,5,602.0,500.0,102.0,72.0,530.0,1102.0
9,GHG,2019-06-21T00:00:00.000+0000,Monthly,Yes,12.0,48.0,60.0,60.0,148.0,190.0,172.0,784.0,1684.0,1,5,590.0,500.0,90.0,60.0,530.0,1090.0
16,RTG,2019-06-21T00:00:00.000+0000,Yearly,No,100.0,100.0,100.0,1000.0,148.0,190.0,172.0,784.0,1684.0,1,5,590.0,500.0,90.0,60.0,530.0,1090.0
10,HOOH,2019-06-20T00:00:00.000+0000,Monthly,Yes,12.0,36.0,48.0,48.0,36.0,78.0,172.0,784.0,1684.0,0,4,478.0,400.0,78.0,48.0,430.0,978.0
11,PQ,2019-06-05T00:00:00.000+0000,Monthly,Yes,12.0,24.0,36.0,36.0,39.0,166.0,163.0,884.0,1684.0,0,5,466.0,400.0,66.0,36.0,430.0,966.0
12,WE,2019-06-01T00:00:00.000+0000,Monthly,Yes,12.0,12.0,24.0,24.0,27.0,154.0,163.0,884.0,1684.0,0,4,454.0,400.0,54.0,24.0,430.0,954.0
