# Other Shopper Attributes

#### Importing Yocuda data from GCS

In [3]:
data = (spark.read.format("csv").option("header", "true").option("inferSchema", "true").load('gs://westfield-tom/datalab/Yocuda_clean_data_Nov15_Nov17_20171214_v01_*.csv'))

In [4]:
from pyspark.sql.functions import *

#### Recency

In [5]:
data.registerTempTable("data")
df1 =data.selectExpr("identifier","cat1","cat2","timestamp", "1 as ind","item_total","transaction_id","brandName as brand","item_quantity")  \
    .filter((to_date(col("timestamp"))>'2016-11-30') & \
                  (to_date(col("timestamp"))<'2017-12-01') & \
                  (col("identifier").isNotNull()) & \
                  (col("item_total")>0) & \
                  (col("retailer_name")=='Argos'))\

df2 =data.selectExpr("cat1","cat2","timestamp ", "1 as ind").filter((to_date(col("timestamp"))>'2016-11-30') & \
                  (to_date(col("timestamp"))<'2017-12-01') & \
                  (col("identifier").isNotNull()) & \
                  (col("item_total")>0) & \
                  (col("retailer_name")=='Argos'))\
.groupBy(col("cat1"),col("cat2"),col("ind")).agg(max("timestamp").alias("timestamp_mx_data"))

df3 = df1.join(df2,["cat1","cat2","ind"],"left")

df4 =data.selectExpr("identifier","cat1","cat2","timestamp").filter((to_date(col("timestamp"))>'2016-11-30') & \
                  (to_date(col("timestamp"))<'2017-12-01') & \
                  (col("identifier").isNotNull()) & \
                  (col("item_total")>0) & \
                  (col("retailer_name")=='Argos'))\
.groupBy("identifier","cat1","cat2").agg(max("timestamp").alias("timestamp_mx_user"))

df5 = df3.join(df4,["identifier","cat1","cat2"],"left")

df6 = df5.selectExpr("identifier","cat1","cat2","timestamp_mx_data","timestamp_mx_user") \
         .withColumn('recency',datediff('timestamp_mx_data','timestamp_mx_user'))

df6.show(5)

+--------------------+--------------------+--------------------+--------------------+--------------------+-------+
|          identifier|                cat1|                cat2|   timestamp_mx_data|   timestamp_mx_user|recency|
+--------------------+--------------------+--------------------+--------------------+--------------------+-------+
|    ,10euw@gmail.com|Patio, Lawn & Gar...| Grills & Outdoor...|2017-11-30 21:08:...|2017-02-06 13:58:...|    297|
|,mark.paoace12@ho...|     Home & Kitchen |   Small Appliances |2017-11-30 22:01:...|2017-11-13 16:42:...|     17|
|          .@mail.com|        Electronics |     Camera & Photo |2017-11-30 22:46:...|2017-07-17 18:00:...|    136|
|          .@mail.com|        Electronics |     Camera & Photo |2017-11-30 22:46:...|2017-07-17 18:00:...|    136|
|          .@mail.com|        Electronics |     Camera & Photo |2017-11-30 22:46:...|2017-07-17 18:00:...|    136|
+--------------------+--------------------+--------------------+----------------

#### Calculating metrics

In [6]:
total_spend = df5.selectExpr("identifier","cat1","cat2", "item_total")\
.groupBy(col("identifier"),col("cat1"),col("cat2")).agg(sum("item_total").alias("total_spend"))

frequency = df5.selectExpr("identifier","cat1","cat2", "transaction_id","timestamp")\
.groupBy(col("identifier"),col("cat1"),col("cat2")).agg(countDistinct(to_date("timestamp")).alias("frequency"))

total_spend.show(5)
frequency.show(5)

+--------------------+--------------------+--------------------+-----------+
|          identifier|                cat1|                cat2|total_spend|
+--------------------+--------------------+--------------------+-----------+
|    ,10euw@gmail.com|Patio, Lawn & Gar...| Grills & Outdoor...|       5.99|
|,mark.paoace12@ho...|     Home & Kitchen |   Small Appliances |      29.99|
|          .@mail.com|        Electronics |     Camera & Photo |     109.83|
|          .@mail.com|     Home & Kitchen |           Cookware |      39.99|
|          .@mail.com|    Office Products | Tape Adhesives &...|      19.99|
+--------------------+--------------------+--------------------+-----------+
only showing top 5 rows

+--------------------+--------------------+--------------------+---------+
|          identifier|                cat1|                cat2|frequency|
+--------------------+--------------------+--------------------+---------+
|    ,10euw@gmail.com|Patio, Lawn & Gar...| Grills & Outd

#### Combining data

In [7]:
overall = data.filter((to_date(col("timestamp"))>'2016-11-30') & (to_date(col("timestamp"))<'2017-12-01')).select("identifier","cat1","cat2").distinct()

combined = overall.join(total_spend,["identifier","cat1","cat2"],"left") \
                        .join(frequency,["identifier","cat1","cat2"],"left") 
  
final_combined = combined.selectExpr("identifier","cat1","cat2","total_spend","frequency") 
                         
final_combined.show(5)

+--------------------+--------------------+----------------+-----------+---------+
|          identifier|                cat1|            cat2|total_spend|frequency|
+--------------------+--------------------+----------------+-----------+---------+
|                null|Health & Personal...|    Bath & Body |       null|     null|
| "d.hives""@sky.com"|                null|            null|       null|     null|
|                 #-@|                null|            null|       null|     null|
|$$joeflinter@hotm...|                null|            null|       null|     null|
|        ''@gmail.com|Clothing & Access...| Luggage & Bags |       null|     null|
+--------------------+--------------------+----------------+-----------+---------+
only showing top 5 rows



#### Percentile bins - Spend & Frequency Decile Buckets

In [8]:
from pyspark.sql.window import Window
xx1=final_combined.groupBy("cat1","cat2") \
             .agg(count("*").alias("count"))  

xx=final_combined.join(xx1,["cat1","cat2"],"left")\
                 .withColumn("spend_bin", 11-ceil(10*((row_number().over(Window.partitionBy("cat1","cat2")\
                                                                   .orderBy(desc("total_spend"))))/col('count')))) \
                 .withColumn("frequency", 11-ceil(10*((row_number().over(Window.partitionBy("cat1","cat2")\
                                                                   .orderBy(desc("frequency"))))/col('count')))) 
                 
xx.show(5)

+--------------------+-------------+--------------------+------------------+---------+-----+---------+
|                cat1|         cat2|          identifier|       total_spend|frequency|count|spend_bin|
+--------------------+-------------+--------------------+------------------+---------+-----+---------+
|Health & Personal...| Bath & Body |strood.businessce...|122.93999999999998|       10|38727|       10|
|Health & Personal...| Bath & Body | peter@shopdirect.gi|103.44999999999999|       10|38727|       10|
|Health & Personal...| Bath & Body |notsosweet99@yaho...|50.940000000000005|       10|38727|       10|
|Health & Personal...| Bath & Body |malinaqvi@hotmail...|119.93999999999998|       10|38727|       10|
|Health & Personal...| Bath & Body |clarke178@btinter...| 49.93000000000001|       10|38727|       10|
+--------------------+-------------+--------------------+------------------+---------+-----+---------+
only showing top 5 rows



#### Top preferred brand

In [None]:
total_spend_by_brand = df5.selectExpr("identifier","cat1","cat2","brand", "item_total")\
.groupBy(col("identifier"),col("cat1"),col("cat2"),col("brand")).agg(sum("item_total").alias("total_spend")).orderBy("identifier","brand")

row_num = total_spend_by_brand.withColumn("rownum",row_number().over(Window.partitionBy("identifier","cat1","cat2")\
                                                                   .orderBy(desc("total_spend"))))
top_preferred_brand = row_num.filter(col("rownum")==1).selectExpr("identifier","cat1","cat2","total_spend",
                                                                  "brand as top_preferred_brand")
top_preferred_brand.show(5)

+--------------------+--------------------+--------------------+-----------+-------------------+
|          identifier|                cat1|                cat2|total_spend|top_preferred_brand|
+--------------------+--------------------+--------------------+-----------+-------------------+
|    ,10euw@gmail.com|Patio, Lawn & Gar...| Grills & Outdoor...|       5.99|       Other Brands|
|,mark.paoace12@ho...|     Home & Kitchen |   Small Appliances |      29.99|    Morphy Richards|
|          .@mail.com|        Electronics |     Camera & Photo |       79.9|             amazon|
|          .@mail.com|     Home & Kitchen |           Cookware |      39.99|      Russell Hobbs|
|          .@mail.com|    Office Products | Tape Adhesives &...|      19.99|       Other Brands|
+--------------------+--------------------+--------------------+-----------+-------------------+
only showing top 5 rows



#### Category source of volume - Sub category level spend

In [None]:
total_spend = df5.selectExpr("identifier","cat1","cat2", "item_total")\
.groupBy(col("identifier"),col("cat1"),col("cat2")).agg(sum("item_total").alias("total_spend"))

total_spend.show(5)

#### Shopper - day of week spend

In [None]:
xx=df5.selectExpr("identifier","to_date(timestamp) as date","item_total")\
.groupBy("identifier","date").agg(sum("item_total").alias("spend"))
yy=xx.withColumn("Day_of_week",date_format("date","E")).groupBy("identifier","Day_of_week").agg(sum("spend").alias("spend"))
abc=yy.withColumn("rownum",row_number().over(Window.partitionBy("identifier","Day_of_week")\
                                                                   .orderBy(desc("spend"))))
top_preferred_day = abc.filter(col("rownum")==1).selectExpr("identifier","spend",
                                                                  "Day_of_week as top_preferred_day")
top_preferred_day.show(5)

#### Time of day - Shopper spends

In [None]:
xx=df5.selectExpr("identifier","timestamp","item_total").withColumn("Timestamp",to_timestamp("timestamp", 'yyyy-MM-dd HH:mm:ss'))\
.groupBy("identifier","Timestamp").agg(sum("item_total").alias("spend"))

yy=xx.withColumn("Hour_of_day",hour("Timestamp")).groupBy("identifier","Hour_of_day").agg(sum("spend").alias("spend"))

zz=yy.withColumn("Hour_bucket", when(((col("Hour_of_day")>=8) & (col("Hour_of_day")<12)),"8AM-12 noon")\
                                .when(((col("Hour_of_day")>=12) & (col("Hour_of_day")<17)),"12 noon-4PM")\
                                .when(((col("Hour_of_day")>=17) & (col("Hour_of_day")<22)),"5PM-10PM"))\
.groupBy("identifier","Hour_bucket").agg(sum("spend").alias("spend"))
abc=zz.withColumn("rownum",row_number().over(Window.partitionBy("identifier","Hour_bucket")\
                                                                   .orderBy(desc("spend"))))
top_preferred_hour_bucket = abc.filter(col("rownum")==1).selectExpr("identifier","spend",
                                                                  "Hour_bucket as top_preferred_hour_bucket")
top_preferred_hour_bucket.show(5)