# McDonald's Basket Data Merge - v5: Ingenico card tokens
## April 2018
### Dr Jose M Albornoz

This notebook performs basket data and card data merge for March 2018, accounting for all 7 stores in the Reading area. Only Ingenico tokens are being considered. Merge is performed using timestamps only

# 1.- Import necessary modules, define SQLContext

In [1]:
# Import required modules
from pyspark.context import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql.functions import *
from pyspark.sql.functions import col
from pyspark.sql.functions import row_number
from pyspark.sql.types import *
from pyspark.sql.functions import unix_timestamp
import math
from pyspark.sql.window import *

In [2]:
# Define SQLContext
sqlContext = SQLContext(sc)

# 2.- Generic functions to load data from a text-based file

In [3]:
# a function to load a colon-separated value file
def load_data_colon(filename, schema, columns = None):
    df = sqlContext.read.format('com.databricks.spark.csv').option("delimiter", ";").options(header='true'). \
    load(filename, schema = schema)
    if columns is None:
        # If no columns are given, then select all
        columns = schema.names
    return df.select(columns)

In [4]:
# a function to load a pipe-separated value file
def load_data_pipe(filename, schema, columns = None):
    df = sqlContext.read.format('com.databricks.spark.csv').option("delimiter", "|").options(header='false'). \
    load(filename, schema = schema)
    if columns is None:
        # If no columns are given, then select all
        columns = schema.names
    return df.select(columns)

# 3.- Schema for card data files

In [5]:
schema_card = StructType([ 
    StructField('store_number', IntegerType(), True), 
    StructField('terminal_number', IntegerType(), True), 
    StructField('transaction_date', StringType(), True), 
    StructField('transaction_time', IntegerType(), True), 
    StructField('transaction_amount', FloatType(), True),
    StructField('card_scheme', StringType(), True),
    StructField('card_provider', StringType(), True),
    StructField('pan_token', StringType(), True)
])

# 4.- Schema for basket data files

In [6]:
schema_basket = StructType([ 
    StructField('store_number', StringType(), True), 
    StructField('time_period', StringType(), True), 
    StructField('time_of_day', StringType(), True), 
    StructField('business_date', StringType(), True), 
    StructField('pos_code', StringType(), True),
    StructField('c6', StringType(), True),
    StructField('total_cost', StringType(), True),
    StructField('unit_cost', StringType(), True),
    StructField('quantity', StringType(), True), 
    StructField('food_cost', StringType(), True), 
    StructField('paper_cost', StringType(), True), 
    StructField('hour', StringType(), True), 
    StructField('transaction_time', StringType(), True), 
    StructField('transaction_date', StringType(), True),
    StructField('pos_id', StringType(), True),
    StructField('unique_till_code', StringType(), True),
    StructField('sale_number', StringType(), True),
    StructField('menu_item_id', StringType(), True), 
    StructField('till_key', StringType(), True), 
    StructField('channel', StringType(), True), 
    StructField('c20', StringType(), True), 
    StructField('sale_or_refund', StringType(), True),
    StructField('eatin_eatout', StringType(), True),
    StructField('eat_in_eatout_str', StringType(), True),
    StructField('payment_type_id', StringType(), True),
    StructField('payment_type', StringType(), True) 
])

# 5.- Basket data preprocessing

## 5.1.- Load basket data

In [7]:
df_basket = load_data_pipe("BasketDataReading7Mar18_not15/000", schema_basket)

In [8]:
df_basket.show(10)

+------------+-----------+-----------+-------------+--------+----+----------+---------+--------+---------+----------+----+-------------------+----------------+-----------------+----------------+-----------+------------+--------+-------------+---+--------------+------------+-----------------+---------------+------------+
|store_number|time_period|time_of_day|business_date|pos_code|  c6|total_cost|unit_cost|quantity|food_cost|paper_cost|hour|   transaction_time|transaction_date|           pos_id|unique_till_code|sale_number|menu_item_id|till_key|      channel|c20|sale_or_refund|eatin_eatout|eat_in_eatout_str|payment_type_id|payment_type|
+------------+-----------+-----------+-------------+--------+----+----------+---------+--------+---------+----------+----+-------------------+----------------+-----------------+----------------+-----------+------------+--------+-------------+---+--------------+------------+-----------------+---------------+------------+
|         787|          1|  Breakf

## 5.2.- Cast basket data columns to the correct types

In [9]:
df_basket = df_basket.withColumn("store_number", df_basket["store_number"].cast(IntegerType()))        
df_basket = df_basket.withColumn("time_period", df_basket["time_period"].cast(IntegerType()))
df_basket = df_basket.withColumn("time_of_day", df_basket["time_of_day"].cast(StringType()))
df_basket = df_basket.withColumn("business_date", df_basket["business_date"].cast(DateType()))
df_basket = df_basket.withColumn("pos_code", df_basket["pos_code"].cast(IntegerType()))
df_basket = df_basket.withColumn("c6", df_basket["c6"].cast(StringType()))
df_basket = df_basket.withColumn("total_cost", df_basket["total_cost"].cast(FloatType()))
df_basket = df_basket.withColumn("unit_cost", df_basket["unit_cost"].cast(FloatType()))
df_basket = df_basket.withColumn("quantity", df_basket["quantity"].cast(IntegerType()))
df_basket = df_basket.withColumn("food_cost", df_basket["food_cost"].cast(FloatType()))
df_basket = df_basket.withColumn("paper_cost", df_basket["paper_cost"].cast(FloatType()))
df_basket = df_basket.withColumn("hour", df_basket["hour"].cast(IntegerType()))
df_basket = df_basket.withColumn("transaction_time", df_basket["transaction_time"].cast(StringType()))
df_basket = df_basket.withColumn("transaction_date", df_basket["transaction_date"].cast(StringType()))
df_basket = df_basket.withColumn("pos_id", df_basket["pos_id"].cast(StringType()))
df_basket = df_basket.withColumn("unique_till_code", df_basket["unique_till_code"].cast(IntegerType()))
df_basket = df_basket.withColumn("sale_number", df_basket["sale_number"].cast(IntegerType()))
df_basket = df_basket.withColumn("menu_item_id", df_basket["menu_item_id"].cast(IntegerType()))
df_basket = df_basket.withColumn("till_key", df_basket["till_key"].cast(IntegerType()))
df_basket = df_basket.withColumn("channel", df_basket["channel"].cast(StringType()))
df_basket = df_basket.withColumn("c20", df_basket["c20"].cast(StringType()))
df_basket = df_basket.withColumn("sale_or_refund", df_basket["sale_or_refund"].cast(StringType()))
df_basket = df_basket.withColumn("eatin_eatout", df_basket["eatin_eatout"].cast(IntegerType()))
df_basket = df_basket.withColumn("eat_in_eatout_str", df_basket["eat_in_eatout_str"].cast(StringType()))
df_basket = df_basket.withColumn("payment_type_id", df_basket["payment_type_id"].cast(IntegerType()))
df_basket = df_basket.withColumn("payment_type", df_basket["payment_type"].cast(StringType()))

In [10]:
all_transactions = df_basket.count()

In [11]:
all_transactions

1500705

## 5.3.- Only 'Sales' and not 'Refunds' will be considered

In [12]:
[i.sale_or_refund for i in df_basket.select('sale_or_refund').distinct().collect()]

['Sale']

In [13]:
df_basket = df_basket.filter(df_basket['sale_or_refund'] == 'Sale')

In [14]:
df_basket.count()

1500705

## 5.4.- Convert transaction date and time into a timestamp, filter so that only data for February 2018 is considered

In [15]:
col = to_timestamp(df_basket['transaction_time'], 'yyyy-MM-dd HH:mm:ss')

In [16]:
df_basket = df_basket.withColumn('timestamp_basket', col)

In [17]:
df_basket = df_basket.drop('transaction_date', 'transaction_time', 'hour').orderBy('timestamp_basket')

In [18]:
from pyspark.sql.functions import col
df_basket = df_basket.filter(col('timestamp_basket') >= '2018-03-01 00:00:00')

In [19]:
df_basket.count()

1500491

## 5.5.- Select relevant columns from basket data

In [20]:
df_basket_reduced = df_basket.select('store_number', 'timestamp_basket', 'pos_code', 'pos_id', 'sale_number', 
                                     'total_cost', 'menu_item_id', 'payment_type_id', 'channel')

In [21]:
df_basket_reduced.show(5)

+------------+-------------------+--------+-----------------+-----------+----------+------------+---------------+-------------+
|store_number|   timestamp_basket|pos_code|           pos_id|sale_number|total_cost|menu_item_id|payment_type_id|      channel|
+------------+-------------------+--------+-----------------+-----------+----------+------------+---------------+-------------+
|         102|2018-03-01 00:00:06|       3|POS0003:392555973|          5|       0.0|        4600|              2|FRONT COUNTER|
|         102|2018-03-01 00:00:06|       3|POS0003:392555973|          1|       0.0|        4834|              2|FRONT COUNTER|
|         102|2018-03-01 00:00:06|       3|POS0003:392555973|          3|      1.06|        3080|              2|FRONT COUNTER|
|         102|2018-03-01 00:00:06|       3|POS0003:392555973|          4|       3.1|        6160|              2|FRONT COUNTER|
|         102|2018-03-01 00:00:06|       3|POS0003:392555973|          7|      2.91|        1410|       

## 5.7.- Split basket data into 'cash' and 'cashless' transaction, sort by store number, timestamp, till ID and sale number

In [22]:
df_basket_cash = df_basket_reduced.filter(df_basket["payment_type_id"] == 1)

In [23]:
df_basket_cash = df_basket_cash.drop('payment_type_id')

In [24]:
df_basket_cash = df_basket_cash.orderBy("store_number", "pos_code", "pos_id", "timestamp_basket", "sale_number")

In [25]:
df_basket_cashless = df_basket_reduced.filter(df_basket["payment_type_id"] == 2)

In [26]:
df_basket_cashless = df_basket_cashless.drop('payment_type_id')

In [27]:
df_basket_cashless = df_basket_cashless.orderBy("store_number", "pos_code", "pos_id", "timestamp_basket", "sale_number")

In [28]:
df_basket_cash.show(20)

+------------+-------------------+--------+-----------------+-----------+----------+------------+-------------+
|store_number|   timestamp_basket|pos_code|           pos_id|sale_number|total_cost|menu_item_id|      channel|
+------------+-------------------+--------+-----------------+-----------+----------+------------+-------------+
|         102|2018-03-01 11:44:02|       1|POS0001:216506957|          1|      1.24|        3531|FRONT COUNTER|
|         102|2018-03-01 11:45:53|       1|POS0001:216506958|          1|      2.49|        2286|FRONT COUNTER|
|         102|2018-03-01 11:45:53|       1|POS0001:216506958|          2|      0.83|        1010|FRONT COUNTER|
|         102|2018-03-01 11:45:53|       1|POS0001:216506958|          3|      1.41|        3521|FRONT COUNTER|
|         102|2018-03-01 11:45:53|       1|POS0001:216506958|          4|      1.07|        3608|FRONT COUNTER|
|         102|2018-03-01 11:46:46|       1|POS0001:216506959|          1|      0.83|        1442|FRONT C

In [29]:
df_basket_cashless.show(20)

+------------+-------------------+--------+-----------------+-----------+----------+------------+-------------+
|store_number|   timestamp_basket|pos_code|           pos_id|sale_number|total_cost|menu_item_id|      channel|
+------------+-------------------+--------+-----------------+-----------+----------+------------+-------------+
|         102|2018-03-01 12:13:19|       1|POS0001:216506961|          1|      1.66|        6976|FRONT COUNTER|
|         102|2018-03-01 12:13:19|       1|POS0001:216506961|          2|       0.0|        4833|FRONT COUNTER|
|         102|2018-03-01 12:14:40|       1|POS0001:216506962|          1|      1.33|        1508|FRONT COUNTER|
|         102|2018-03-01 12:14:40|       1|POS0001:216506962|          2|     -0.76|        1370|FRONT COUNTER|
|         102|2018-03-01 12:14:40|       1|POS0001:216506962|          3|      0.82|        3425|FRONT COUNTER|
|         102|2018-03-01 12:14:40|       1|POS0001:216506962|          4|      4.09|        7451|FRONT C

In [30]:
cash_transactions = df_basket_cash.count()

In [31]:
cashless_transactions = df_basket_cashless.count()

In [32]:
cash_transactions

441897

In [33]:
cashless_transactions

1046946

## 5.8.- Compute percentage of 'cash' and 'cashless' transaction

In [34]:
cash_transactions*100/all_transactions

29.44596039861265

In [35]:
cashless_transactions*100/all_transactions

69.7636111027817

## 5.9.- Get Reading store numbers

In [36]:
stores_list = [i.store_number for i in df_basket.select('store_number').distinct().collect()]

In [37]:
sorted(stores_list)

[102, 787, 960, 980, 1036, 1262, 1339]

# 6.- Card data preprocessing

## 6.1.- Load and filter card data by store number, drop irrelevant fields

In [38]:
df_Mar2018 = sqlContext.read.csv("March2018.csv/part-00000-3a074bc4-5c49-46cb-8aec-19e298d4713a-c000.csv", \
                                 header=False, mode="DROPMALFORMED", schema=schema_card)

In [39]:
df_cards = df_Mar2018.filter(df_Mar2018['store_number'].isin(stores_list))

In [40]:
df_cards = df_cards.orderBy("store_number", "transaction_date", "transaction_time")

In [41]:
df_cards = df_cards.drop('card_provider', 'card_scheme')

In [42]:
df_cards.show(5)

+------------+---------------+----------------+----------------+------------------+-------------------+
|store_number|terminal_number|transaction_date|transaction_time|transaction_amount|          pan_token|
+------------+---------------+----------------+----------------+------------------+-------------------+
|         102|              3|      2018/03/01|             108|            1066.0|4751421607812972106|
|         102|             24|      2018/03/01|             145|             198.0|4751298883781058303|
|         102|             21|      2018/03/01|             154|             149.0|4921815233995504418|
|         102|              3|      2018/03/01|             220|             218.0|4751292925303368468|
|         102|             21|      2018/03/01|             447|              99.0|4921816127599479812|
+------------+---------------+----------------+----------------+------------------+-------------------+
only showing top 5 rows



In [43]:
num_cards = df_cards.count()

In [44]:
num_cards

207062

## 6.2.- Count of Toshiba and Ingenico tokens

In [45]:
token_type = when(length(col("pan_token")) == 64 , 'T')\
            .when(length(col("pan_token")) == 19, 'I')\
            .otherwise('U')         

In [46]:
df_data = df_cards.withColumn("token_type", token_type)

In [47]:
df_data.count()

207062

### 6.2.1.- Ingenico tokens

In [48]:
Ingenico_tokens = df_data.filter(df_data['token_type'] == 'I').count()

In [49]:
Ingenico_tokens 

206654

### 6.2.2.- Toshiba tokens

In [50]:
Toshiba_tokens = df_data.filter(df_data['token_type'] == 'T').count()

In [51]:
Toshiba_tokens

0

### 6.2.3.- Unknown tokens

In [52]:
df_data.filter(df_data['token_type'] == 'U').show(5)

+------------+---------------+----------------+----------------+------------------+---------+----------+
|store_number|terminal_number|transaction_date|transaction_time|transaction_amount|pan_token|token_type|
+------------+---------------+----------------+----------------+------------------+---------+----------+
|         102|              1|      2018/03/03|          210237|             129.0|     null|         U|
|         102|             24|      2018/03/06|           21845|             975.0|     null|         U|
|         102|              1|      2018/03/07|          193226|             458.0|     null|         U|
|         102|             23|      2018/03/13|          194207|             459.0|     null|         U|
|         102|              1|      2018/03/13|          194851|             149.0|     null|         U|
+------------+---------------+----------------+----------------+------------------+---------+----------+
only showing top 5 rows



In [53]:
unknown_tokens = df_data.filter(df_data['token_type'] == 'U').count()

In [54]:
unknown_tokens

408

### 6.2.4.- Token proportion

In [55]:
Ingenico_proportion = Ingenico_tokens*100/num_cards

In [56]:
Ingenico_proportion 

99.80295756826457

In [57]:
Toshiba_proportion = Toshiba_tokens*100/num_cards

In [58]:
Toshiba_proportion

0.0

In [59]:
unknown_proportion = unknown_tokens*100/num_cards

In [60]:
unknown_proportion

0.19704243173542224

In [61]:
df_data.unpersist()

DataFrame[store_number: int, terminal_number: int, transaction_date: string, transaction_time: int, transaction_amount: float, pan_token: string, token_type: string]

## 6.3.- Convert transaction amount to pounds

In [62]:
def get_amount_pounds(amount_pence):
    amount_pounds = amount_pence/100.0
    return amount_pounds

get_amount_pounds_udf = udf(get_amount_pounds, FloatType())

In [63]:
df_cards = df_cards.withColumn("transaction_amount", get_amount_pounds_udf(df_cards['transaction_amount']))

In [64]:
df_cards.show(5)

+------------+---------------+----------------+----------------+------------------+-------------------+
|store_number|terminal_number|transaction_date|transaction_time|transaction_amount|          pan_token|
+------------+---------------+----------------+----------------+------------------+-------------------+
|         102|              3|      2018/03/01|             108|             10.66|4751421607812972106|
|         102|             24|      2018/03/01|             145|              1.98|4751298883781058303|
|         102|             21|      2018/03/01|             154|              1.49|4921815233995504418|
|         102|              3|      2018/03/01|             220|              2.18|4751292925303368468|
|         102|             21|      2018/03/01|             447|              0.99|4921816127599479812|
+------------+---------------+----------------+----------------+------------------+-------------------+
only showing top 5 rows



## 6.4.- Convert transaction time to hour, minutes and seconds

In [65]:
def convert_time(time_int):
    
    # computes seconds
    z1 = math.modf(time_int/99.99999999)
    seconds = str(int(z1[0]*100)) 
    
    # computes minutes
    z2 = math.modf(z1[1]/99.99999999)
    minutes = str(int(z2[0]*100))
    
    # computes hour
    hour = str(int(z2[1]))  
    
    if len(seconds) == 1:
        seconds = seconds.zfill(2)
    if len(minutes) == 1:
        minutes = minutes.zfill(2)
    if len(hour) == 1:
        hour = hour.zfill(2)
    
    time_str = hour + ':' + minutes + ':' + seconds
    return time_str

convert_time_udf = udf(convert_time, StringType())

In [66]:
df_cards = df_cards.withColumn("transaction_time", convert_time_udf(df_cards['transaction_time']))

In [67]:
df_cards.show(5)

+------------+---------------+----------------+----------------+------------------+-------------------+
|store_number|terminal_number|transaction_date|transaction_time|transaction_amount|          pan_token|
+------------+---------------+----------------+----------------+------------------+-------------------+
|         102|              3|      2018/03/01|        00:01:08|             10.66|4751421607812972106|
|         102|             24|      2018/03/01|        00:01:45|              1.98|4751298883781058303|
|         102|             21|      2018/03/01|        00:01:54|              1.49|4921815233995504418|
|         102|              3|      2018/03/01|        00:02:20|              2.18|4751292925303368468|
|         102|             21|      2018/03/01|        00:04:47|              0.99|4921816127599479812|
+------------+---------------+----------------+----------------+------------------+-------------------+
only showing top 5 rows



## 6.5.- Combine transaction date and time into a single timestamp, drop transaction_date and transaction_time

In [68]:
df_cards = df_cards.withColumn('timestamp_tmp', concat(df_cards["transaction_date"], lit(" "), \
                                                       df_cards["transaction_time"]))

In [69]:
col = to_timestamp(df_cards['timestamp_tmp'], 'yyyy/MM/dd HH:mm:ss')

In [70]:
df_cards = df_cards.withColumn('timestamp_cards', col)

In [71]:
df_cards = df_cards.drop('transaction_date', 'transaction_time', 'timestamp_tmp')

In [72]:
df_cards = df_cards.orderBy('store_number', 'terminal_number', 'timestamp_cards')

In [73]:
df_cards.show(20)

+------------+---------------+------------------+-------------------+-------------------+
|store_number|terminal_number|transaction_amount|          pan_token|    timestamp_cards|
+------------+---------------+------------------+-------------------+-------------------+
|         102|              1|              1.99|5573611294275381348|2018-03-01 12:14:19|
|         102|              1|              7.48|5573611294275381348|2018-03-01 12:15:10|
|         102|              1|              5.99|3770647320610855465|2018-03-01 13:25:13|
|         102|              1|              0.89|4658598774717247027|2018-03-01 13:32:12|
|         102|              1|              4.79|4832043329918528320|2018-03-01 13:42:27|
|         102|              1|              6.07|4462721520674372218|2018-03-01 13:45:35|
|         102|              1|              6.39|4757147527171760525|2018-03-01 13:49:09|
|         102|              1|              6.48|4762484728867996199|2018-03-01 13:49:49|
|         

# 7.- Remove unnecessary data from memory

In [74]:
df_Mar2018.unpersist()

DataFrame[store_number: int, terminal_number: int, transaction_date: string, transaction_time: int, transaction_amount: float, card_scheme: string, card_provider: string, pan_token: string]

# 8.- Linkage

## 8.1.- Compute aggregated basket with sale total plus VAT

In [75]:
df_basket_cashless_aggregated = df_basket_cashless.groupBy("store_number", "timestamp_basket", "pos_code", \
                                                           "pos_id", "channel").agg(sum("total_cost").\
                                                            alias("total_cost")). \
                                                            orderBy("store_number", "pos_code", "timestamp_basket")

In [76]:
df_basket_cashless_aggregated.show(20)

+------------+-------------------+--------+-----------------+-------------+------------------+
|store_number|   timestamp_basket|pos_code|           pos_id|      channel|        total_cost|
+------------+-------------------+--------+-----------------+-------------+------------------+
|         102|2018-03-01 12:13:19|       1|POS0001:216506961|FRONT COUNTER| 1.659999966621399|
|         102|2018-03-01 12:14:40|       1|POS0001:216506962|FRONT COUNTER| 5.480000197887421|
|         102|2018-03-01 13:24:55|       1|POS0001:216506988|FRONT COUNTER| 4.990000009536743|
|         102|2018-03-01 13:31:58|       1|POS0001:216506993|FRONT COUNTER|0.7400000095367432|
|         102|2018-03-01 13:41:34|       1|POS0001:216507006|FRONT COUNTER| 3.480000004172325|
|         102|2018-03-01 13:45:02|       1|POS0001:216507008|FRONT COUNTER| 5.049999952316284|
|         102|2018-03-01 13:48:46|       1|POS0001:216507013|FRONT COUNTER| 5.330000221729279|
|         102|2018-03-01 13:49:23|       1|POS0001

In [77]:
# calculate total cost plus vat = sale_total
from pyspark.sql.functions import col
df_basket_cashless_aggregated = df_basket_cashless_aggregated.withColumn('sale_total', col('total_cost')*1.2)

In [78]:
# round amounts to two decimal digits
df_basket_cashless_aggregated = df_basket_cashless_aggregated.\
                                withColumn("total_cost", round(df_basket_cashless_aggregated["total_cost"], 2))
    
df_basket_cashless_aggregated = df_basket_cashless_aggregated.\
                                withColumn("sale_total", round(df_basket_cashless_aggregated['sale_total'], 2))    

In [79]:
df_basket_cashless_aggregated.show(20)

+------------+-------------------+--------+-----------------+-------------+----------+----------+
|store_number|   timestamp_basket|pos_code|           pos_id|      channel|total_cost|sale_total|
+------------+-------------------+--------+-----------------+-------------+----------+----------+
|         102|2018-03-01 12:13:19|       1|POS0001:216506961|FRONT COUNTER|      1.66|      1.99|
|         102|2018-03-01 12:14:40|       1|POS0001:216506962|FRONT COUNTER|      5.48|      6.58|
|         102|2018-03-01 13:24:55|       1|POS0001:216506988|FRONT COUNTER|      4.99|      5.99|
|         102|2018-03-01 13:31:58|       1|POS0001:216506993|FRONT COUNTER|      0.74|      0.89|
|         102|2018-03-01 13:41:34|       1|POS0001:216507006|FRONT COUNTER|      3.48|      4.18|
|         102|2018-03-01 13:45:02|       1|POS0001:216507008|FRONT COUNTER|      5.05|      6.06|
|         102|2018-03-01 13:48:46|       1|POS0001:216507013|FRONT COUNTER|      5.33|       6.4|
|         102|2018-0

In [80]:
# select relevant fields
df_basket_cashless_aggregated = df_basket_cashless_aggregated.select("store_number", "timestamp_basket", \
                                                           "pos_code", "pos_id", "channel", "sale_total")

In [81]:
df_basket_cashless_aggregated.show(20)

+------------+-------------------+--------+-----------------+-------------+----------+
|store_number|   timestamp_basket|pos_code|           pos_id|      channel|sale_total|
+------------+-------------------+--------+-----------------+-------------+----------+
|         102|2018-03-01 12:13:19|       1|POS0001:216506961|FRONT COUNTER|      1.99|
|         102|2018-03-01 12:14:40|       1|POS0001:216506962|FRONT COUNTER|      6.58|
|         102|2018-03-01 13:24:55|       1|POS0001:216506988|FRONT COUNTER|      5.99|
|         102|2018-03-01 13:31:58|       1|POS0001:216506993|FRONT COUNTER|      0.89|
|         102|2018-03-01 13:41:34|       1|POS0001:216507006|FRONT COUNTER|      4.18|
|         102|2018-03-01 13:45:02|       1|POS0001:216507008|FRONT COUNTER|      6.06|
|         102|2018-03-01 13:48:46|       1|POS0001:216507013|FRONT COUNTER|       6.4|
|         102|2018-03-01 13:49:23|       1|POS0001:216507014|FRONT COUNTER|      5.82|
|         102|2018-03-01 13:59:07|       1|

In [82]:
df_basket_cashless_aggregated.count()

210294

## 8.2.- Adds a column with row number to aggregated basket data

In [83]:
# df_basket_cashless_aggregated = df_basket_cashless_aggregated.withColumn("row_num", row_number().\
#                                                                          over(Window.orderBy("store_number", \
#                                                                                  "pos_code", "pos_id", \
#                                                                                  "timestamp_basket")))

In [84]:
# df_basket_cashless_aggregated.show(20)

## 8.3.- Adds a column with next basket timestamp to aggregated basket data

In [85]:
# w = Window().partitionBy().orderBy(col("row_num"))
w = Window().partitionBy(col('store_number'), col('pos_code')).orderBy(col("store_number"), col('pos_code'), \
                                                                       col('timestamp_basket'))
df_basket_cashless_aggregated = df_basket_cashless_aggregated.select("*", lead("timestamp_basket"). \
                                                                     over(w).alias("next_timestamp")). \
                                                                     orderBy("store_number", \
                                                                             "pos_code", "pos_id", \
                                                                             "timestamp_basket")

In [87]:
df_basket_cashless_aggregated.show(10000)

+------------+-------------------+--------+-----------------+-------------+----------+-------------------+
|store_number|   timestamp_basket|pos_code|           pos_id|      channel|sale_total|     next_timestamp|
+------------+-------------------+--------+-----------------+-------------+----------+-------------------+
|         102|2018-03-01 12:13:19|       1|POS0001:216506961|FRONT COUNTER|      1.99|2018-03-01 12:14:40|
|         102|2018-03-01 12:14:40|       1|POS0001:216506962|FRONT COUNTER|      6.58|2018-03-01 13:24:55|
|         102|2018-03-01 13:24:55|       1|POS0001:216506988|FRONT COUNTER|      5.99|2018-03-01 13:31:58|
|         102|2018-03-01 13:31:58|       1|POS0001:216506993|FRONT COUNTER|      0.89|2018-03-01 13:41:34|
|         102|2018-03-01 13:41:34|       1|POS0001:216507006|FRONT COUNTER|      4.18|2018-03-01 13:45:02|
|         102|2018-03-01 13:45:02|       1|POS0001:216507008|FRONT COUNTER|      6.06|2018-03-01 13:48:46|
|         102|2018-03-01 13:48:46|   

### 8.3.1- Filter out last row of aggregated basket dataframe

In [88]:
df_basket_cashless_aggregated = df_basket_cashless_aggregated.where(col('next_timestamp').isNotNull())

In [89]:
# df_basket_cashless_aggregated.count()

In [90]:
# df_basket_cashless_aggregated = df_basket_cashless_aggregated.filter(col('row_num') != 210294) 

In [91]:
# df_basket_cashless_aggregated.count()

## 8.4.- Performs a first join of aggregated basket with card data using timestamps 

In [92]:
df_basket_cashless_aggregated.registerTempTable('basket_aggregated')
df_cards.registerTempTable('cards')

In [93]:
df_joined_basket0 = sqlContext.sql("SELECT basket_aggregated.store_number, basket_aggregated.pos_code, \
                             basket_aggregated.pos_id, basket_aggregated.sale_total, \
                             cards.transaction_amount, cards.pan_token, basket_aggregated.timestamp_basket, \
                             cards.timestamp_cards, basket_aggregated.next_timestamp, basket_aggregated.channel \
                             FROM basket_aggregated \
                             INNER JOIN cards ON \
                             basket_aggregated.store_number = cards.store_number AND \
                             basket_aggregated.pos_code = cards.terminal_number AND \
                             (basket_aggregated.timestamp_basket < cards.timestamp_cards AND \
                             basket_aggregated.next_timestamp > cards.timestamp_cards) \
                             ORDER BY basket_aggregated.store_number, basket_aggregated.pos_code, \
                             basket_aggregated.timestamp_basket")

In [94]:
df_joined_basket0.show(100)

+------------+--------+-----------------+----------+------------------+-------------------+-------------------+-------------------+-------------------+-------------+
|store_number|pos_code|           pos_id|sale_total|transaction_amount|          pan_token|   timestamp_basket|    timestamp_cards|     next_timestamp|      channel|
+------------+--------+-----------------+----------+------------------+-------------------+-------------------+-------------------+-------------------+-------------+
|         102|       1|POS0001:216506961|      1.99|              1.99|5573611294275381348|2018-03-01 12:13:19|2018-03-01 12:14:19|2018-03-01 12:14:40|FRONT COUNTER|
|         102|       1|POS0001:216506962|      6.58|              7.48|5573611294275381348|2018-03-01 12:14:40|2018-03-01 12:15:10|2018-03-01 13:24:55|FRONT COUNTER|
|         102|       1|POS0001:216506988|      5.99|              5.99|3770647320610855465|2018-03-01 13:24:55|2018-03-01 13:25:13|2018-03-01 13:31:58|FRONT COUNTER|
|   

In [95]:
df_joined_basket1 = df_joined_basket0.select('store_number', 'pos_code', 'pos_id', 'pan_token', 'channel', \
                                             'timestamp_basket', 'timestamp_cards', 'next_timestamp'). \
                                             orderBy('store_number', 'pos_code', 'timestamp_basket')

In [96]:
df_joined_basket1.show()

+------------+--------+-----------------+-------------------+-------------+-------------------+-------------------+-------------------+
|store_number|pos_code|           pos_id|          pan_token|      channel|   timestamp_basket|    timestamp_cards|     next_timestamp|
+------------+--------+-----------------+-------------------+-------------+-------------------+-------------------+-------------------+
|         102|       1|POS0001:216506961|5573611294275381348|FRONT COUNTER|2018-03-01 12:13:19|2018-03-01 12:14:19|2018-03-01 12:14:40|
|         102|       1|POS0001:216506962|5573611294275381348|FRONT COUNTER|2018-03-01 12:14:40|2018-03-01 12:15:10|2018-03-01 13:24:55|
|         102|       1|POS0001:216506988|3770647320610855465|FRONT COUNTER|2018-03-01 13:24:55|2018-03-01 13:25:13|2018-03-01 13:31:58|
|         102|       1|POS0001:216506993|4658598774717247027|FRONT COUNTER|2018-03-01 13:31:58|2018-03-01 13:32:12|2018-03-01 13:41:34|
|         102|       1|POS0001:216507006|4832043

In [97]:
df_joined_basket1.count()

202481

### 8.4.1.- Examine problematic transactions

In [None]:
df_basket_cashless_aggregated.filter(col('pos_id') == 'POS0023:35435297').show()

In [None]:
# df_basket_cashless_aggregated.filter((col('row_num') >= 5725) & (col('row_num') <= 5729)).show()

## 8.5.- Merge joined basket data with card data 

In [None]:
df_basket_cashless.registerTempTable('basket')
df_joined_basket1.registerTempTable('joined_basket1')

In [None]:
df_joined_basket2 = sqlContext.sql("SELECT basket.*, joined_basket1.pan_token, \
                             joined_basket1.timestamp_cards, joined_basket1.next_timestamp \
                             FROM basket \
                             INNER JOIN joined_basket1 ON \
                             basket.store_number = joined_basket1.store_number AND \
                             basket.pos_code = joined_basket1.pos_code AND \
                             basket.timestamp_basket = joined_basket1.timestamp_basket \
                             ORDER BY basket.store_number, basket.pos_code, \
                             basket.timestamp_basket, basket.sale_number")

In [None]:
df_joined_basket2.show(100)

In [None]:
df_joined_basket2.count()

In [None]:
[i.channel for i in df_joined_basket2.select('channel').distinct().collect()]

## 8.6.- Save joined basket data to disk 

In [None]:
df_joined_basket2.repartition(1).write.format('com.databricks.spark.csv').save('joined_basket_ReadingMarch2018.csv', header = 'true')