# Retail Channel Management - Take home assignment

## Step 1 : Create Spark session

##### Using Spark SQL

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

import datetime
import pandas as pd

In [2]:
# Build (or reuse, if one is already running) the Spark session used by every cell below.
session_builder = SparkSession.builder.appName("Spark SQL for Retail Management")
spark = session_builder.getOrCreate()

## Step 2: Read from the input file

#### Note: 
          For this assignment, the input file was renamed to "channel_dataset.csv".
          The file is kept in a local directory called 'input'; please change the filename & path before use.

In [3]:
# Location of the raw events file, relative to the notebook's working directory.
path = "input/channel_dataset.csv"

# The first row of the CSV carries the column names; no schema is supplied or
# inferred, so every column is loaded as a string.
csv_reader = spark.read.option("header", True)
input_retail = csv_reader.csv(path)

Checking schema and sample records

In [5]:
# Print the column names and types Spark assigned to the loaded CSV.
# The disabled line below is a leftover: no `user_log` is defined in the cells shown here.
#user_log.take(1)
input_retail.printSchema()

root
 |-- timestamp: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- event: string (nullable = true)
 |-- flyer_id: string (nullable = true)
 |-- merchant_id: string (nullable = true)



## Step 3: Create a Spark Table from file

#### Note: 
          Table Name: input_retail_table

In [6]:
# Register the DataFrame as a temporary view named "input_retail_table" so the
# following cells can query it through spark.sql(); re-running replaces the view.
input_retail.createOrReplaceTempView("input_retail_table")

In [7]:
# Peek at two rows of the registered view to confirm it is queryable.
preview_rows = spark.sql("SELECT * FROM input_retail_table LIMIT 2")
preview_rows.show()

+--------------------+--------------------+------------------+--------+-----------+
|           timestamp|             user_id|             event|flyer_id|merchant_id|
+--------------------+--------------------+------------------+--------+-----------+
|2018-10-01T13:54:...|9ea672779feb1e088...|shopping_list_open|    null|       null|
|2018-10-01T13:34:...|01ca5536abc5e0992...|shopping_list_open|    null|       null|
+--------------------+--------------------+------------------+--------+-----------+



## Step 4: Write SQL queries to extract the results

Extract Date from timestamp

In [8]:
#spark.udf.register("get_hour", lambda x: int(datetime.datetime.fromtimestamp(x / 1000.0).hour))

# Show the calendar date derived from each event timestamp next to the raw value,
# restricted to rows that reference a flyer.
flyer_events_query = '''
        SELECT timestamp, to_date(timestamp), user_id, event, flyer_id, merchant_id 
        from input_retail_table where flyer_id is NOT NULL
'''
flyer_events = spark.sql(flyer_events_query)
flyer_events.show(n=10, truncate=True)

+--------------------+---------------------------------------+--------------------+----------+--------+-----------+
|           timestamp|to_date(input_retail_table.`timestamp`)|             user_id|     event|flyer_id|merchant_id|
+--------------------+---------------------------------------+--------------------+----------+--------+-----------+
|2018-10-01T13:56:...|                             2018-10-01|1c1231e7a41a1bee1...|flyer_open| 2016315|       2268|
|2018-10-01T13:34:...|                             2018-10-01|7c3e5dadd6c0d7170...|flyer_open| 1993325|       2188|
|2018-10-01T13:31:...|                             2018-10-01|9ea672779feb1e088...|flyer_open| 2002542|       3383|
|2018-10-01T13:39:...|                             2018-10-01|4997a2ca5d6f3a8e2...|flyer_open| 2031695|       2148|
|2018-10-01T13:31:...|                             2018-10-01|71e01a216fd6d973c...|flyer_open| 1996644|       2694|
|2018-10-01T13:31:...|                             2018-10-01|87c2f342e4

Exploring the table with a few test queries

In [9]:
"""
spark.sql('''
    select flyer_id, count(*) from input_retail_table
       where flyer_id IS NOT NULL
       group by flyer_id
''').show(10)

spark.sql('''
    select distinct(event) from input_retail_table       
''').show(10,False)
"""
spark.sql('''
    select timestamp, event, flyer_id, merchant_id from input_retail_table as lrt
    where lrt.user_id = '1c1231e7a41a1bee17dd8e8111ebaef941525995f330199844fd9a0293edf9aa'
    and lrt.event = 'flyer_open'
    order by timestamp desc
    ''').show(100,False)

+-------------------------+----------+--------+-----------+
|timestamp                |event     |flyer_id|merchant_id|
+-------------------------+----------+--------+-----------+
|2018-10-01T19:45:44-04:00|flyer_open|2015666 |1344       |
|2018-10-01T16:29:25-04:00|flyer_open|2026399 |2549       |
|2018-10-01T16:19:19-04:00|flyer_open|2026386 |5265       |
|2018-10-01T16:16:59-04:00|flyer_open|1982865 |246        |
|2018-10-01T13:56:36-04:00|flyer_open|2016315 |2268       |
|2018-10-01T11:39:36-04:00|flyer_open|2018389 |2123       |
|2018-10-01T11:24:14-04:00|flyer_open|1983425 |2366       |
|2018-10-01T08:53:11-04:00|flyer_open|1990059 |249        |
|2018-10-01T08:49:24-04:00|flyer_open|2009425 |2631       |
|2018-10-01T08:45:56-04:00|flyer_open|1990059 |249        |
|2018-10-01T06:22:48-04:00|flyer_open|2016095 |2694       |
|2018-10-01T06:19:52-04:00|flyer_open|2016095 |2694       |
+-------------------------+----------+--------+-----------+



Checking the desired output for a single user

In [10]:
# Sanity check for one user: average time per flyer_open/item_open event, computed as
# (latest event - earliest event) / number of events, both as an interval (time_diff)
# and as a number of seconds (time_diff_in_Sec).
#
# The `timestamp` column is an ISO-8601 *string* (see the printed schema), so each value
# is converted with to_timestamp() BEFORE it is aggregated. Aggregating the strings first
# (to_timestamp(max(timestamp))) compares them lexicographically, which selects the wrong
# instant as soon as two values carry different UTC offsets (e.g. around a DST change),
# and yields NULL if the lexicographically extreme string cannot be parsed.
# max_by()/min_by() keep maxT/minT displayed as the original strings while still choosing
# them by their parsed, chronological value.
#
# An earlier attempt registered a Python UDF (`timestamp_diff`) for the subtraction; the
# built-in SQL functions below are used instead.
#
# Disabled scratch query that lists the raw flyer_open events for the same user:
#   select timestamp, event, flyer_id, merchant_id from input_retail_table as lrt
#   where lrt.user_id = '0017345b89958a1d8cae79020dbbf6e2f687124ae8bf937fa6ed729e66a13f91'
#   and lrt.event = 'flyer_open'
#   order by timestamp desc
df = spark.sql("""
    select max_by(timestamp, to_timestamp(timestamp)) as maxT,
           min_by(timestamp, to_timestamp(timestamp)) as minT,
         ((( max(to_timestamp(timestamp)) - min(to_timestamp(timestamp)) ) / count(timestamp))) as time_diff,
         ((( unix_timestamp(max(to_timestamp(timestamp))) - unix_timestamp(min(to_timestamp(timestamp))) )
                 / count(timestamp))) as time_diff_in_Sec
           from input_retail_table as lrt
           where lrt.user_id = '0017345b89958a1d8cae79020dbbf6e2f687124ae8bf937fa6ed729e66a13f91'
             and (lrt.event = 'flyer_open' or lrt.event = 'item_open')
    """)

df.show(10,False)

+-------------------------+-------------------------+---------------------------+-----------------+
|maxT                     |minT                     |time_diff                  |time_diff_in_Sec |
+-------------------------+-------------------------+---------------------------+-----------------+
|2018-10-01T12:03:52-04:00|2018-10-01T11:30:48-04:00|1 minutes 22.666667 seconds|82.66666666666667|
+-------------------------+-------------------------+---------------------------+-----------------+



In [11]:
""" Not  Needed
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import to_timestamp, current_timestamp
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType

#time1 = datetime.datetime.now()
#time2 = datetime.datetime.strptime("2020-11-09 13:54:19.541035","%Y-%m-%d %H:%M:%S.%f")
#timediff = time2 - time1
#print(time1, time2, datetime.timedelta(time2 - time1).total_seconds() )

df.withColumn("max_timestamp", to_timestamp(col("maxT"))) \
  .withColumn("min_timestamp", to_timestamp(col("minT"))) \
  .withColumn("DiffInSeconds", col("time_diff").cast(LongType)) \
  .show(false)
  """

' Not  Needed\nimport pyspark\nfrom pyspark.sql import SparkSession\nfrom pyspark.sql.functions import col\nfrom pyspark.sql.functions import to_timestamp, current_timestamp\nfrom pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType\n\n#time1 = datetime.datetime.now()\n#time2 = datetime.datetime.strptime("2020-11-09 13:54:19.541035","%Y-%m-%d %H:%M:%S.%f")\n#timediff = time2 - time1\n#print(time1, time2, datetime.timedelta(time2 - time1).total_seconds() )\n\ndf.withColumn("max_timestamp", to_timestamp(col("maxT")))   .withColumn("min_timestamp", to_timestamp(col("minT")))   .withColumn("DiffInSeconds", col("time_diff").cast(LongType))   .show(false)\n  '

## Step 5: Store the output result in csv file to be used for BI

Output file: result_dataset.csv 
##### Note:
          Change the output file name and path before running; a local path is used here for testing purposes.

In [12]:
# Preview of the final result: for every (user, merchant, date), the average time per
# flyer_open/item_open event, i.e. (latest event - earliest event) / number of events,
# as an interval (avg_time) and in seconds (avg_time_in_Sec).
#
# `timestamp` is an ISO-8601 string, so it is converted with to_timestamp() BEFORE
# max()/min() are applied. Aggregating the raw strings compares them lexicographically,
# which picks the wrong instant when UTC offsets differ within a group and returns NULL
# when the extreme string is unparseable.
spark.sql('''
    select user_id, merchant_id, to_date(timestamp) as tdate,
         ( (max(to_timestamp(timestamp)) - min(to_timestamp(timestamp))) / count(timestamp)) as avg_time,
         ( (( unix_timestamp(max(to_timestamp(timestamp))) - unix_timestamp(min(to_timestamp(timestamp))) )
                 / count(timestamp))) as avg_time_in_Sec
      from input_retail_table as lrt
      where (lrt.event = 'flyer_open' or lrt.event = 'item_open')
      group by user_id, merchant_id, tdate
    ''').show(50,True)

+--------------------+-----------+----------+--------------------+---------------+
|             user_id|merchant_id|     tdate|            avg_time|avg_time_in_Sec|
+--------------------+-----------+----------+--------------------+---------------+
|00f4a877b16a98a20...|       2345|2018-10-01|1 hours 37 minute...|         5861.5|
|031eb2968048a6b02...|       2609|2018-10-01|           0 seconds|            0.0|
|07ee047fa6b71db13...|       2268|2018-10-01|1 minutes 52 seconds|          112.0|
|0cdc3cd22bb3acce6...|       2163|2018-10-01|           0 seconds|            0.0|
|15e3005b2d8fe8c74...|       2046|2018-10-01|           0 seconds|            0.0|
|1679d463324e9c02b...|       2366|2018-10-01|4 hours 24 minute...|        15870.0|
|24c0f98368d52f3a8...|       2944|2018-10-01|           0 seconds|            0.0|
|3378aa355a65ccc88...|       1991|2018-10-01|           0 seconds|            0.0|
|365a011424432621d...|       2345|2018-10-01|           0 seconds|            0.0|
|3d7

In [13]:
#Change the output path name before running
out_path = "C:/Users/saurabh/input/result/result_dataset.csv"

df = spark.sql("""
    select user_id, merchant_id, to_date(timestamp) as tdate, 
         string( (to_timestamp(max(timestamp)) - to_timestamp(min(timestamp))) / count(timestamp)) as avg_time,
               ( (( unix_timestamp(to_timestamp(max(timestamp))) - unix_timestamp(to_timestamp(min(timestamp))) ) 
                 / count(timestamp))) as avg_time_in_Sec
      from input_retail_table as lrt
      where (lrt.event = 'flyer_open' or lrt.event = 'item_open')
      group by user_id, merchant_id, tdate
    """)

#df.show()

# In case of large data set, where partition is required, we use 1 of the following:

#df.coalesce(1).write.csv(out_path)
#df.repartition(1).write.format("com.databricks.spark.csv").option("header","true").save(out_path)

#df.write.format("com.databricks.spark.csv").option("header","false").mode("overwrite").save(out_path)
#df.write.format("com.databricks.spark.csv").option("header","true").save(out_path)

# For smaller dataset, we can use the following
df.toPandas().to_csv(out_path, sep=',', header=True, index=False)

