In [116]:
import org.apache.spark.sql.functions.{to_date, to_timestamp}
import org.apache.spark.sql.functions.{regexp_replace}
import org.apache.spark.sql.functions.{year}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{rank}

val receipts = spark.read
                  .option("header", "true")
                  .option("inferschema", "true")
                  .csv("/home/jovyan/work/rewards_receipts_lat_v3.csv")
                  .withColumn("RECEIPT_PURCHASE_DATE", to_timestamp(col("RECEIPT_PURCHASE_DATE"), "yyyy-MM-dd HH:mm:ss.SSS"))

val items = spark.read
                  .option("header", "true")
                  .option("inferschema", "true")
                  .csv("/home/jovyan/work/rewards_receipts_item_lat_v2.csv")

import org.apache.spark.sql.functions.{to_date, to_timestamp}
receipts: org.apache.spark.sql.DataFrame = [RECEIPT_ID: string, STORE_NAME: string ... 14 more fields]
items: org.apache.spark.sql.DataFrame = [REWARDS_RECEIPT_ID: string, ITEM_INDEX: int ... 11 more fields]


In [117]:
receipts.show(2)
receipts.createOrReplaceTempView("receipts")

+--------------------+----------+--------------------+-----------+-----------+---------+------------+------------+--------------------+--------------------+---------------------+-------------+------------------+--------------------+--------------------+---------------+
|          RECEIPT_ID|STORE_NAME|       STORE_ADDRESS| STORE_CITY|STORE_STATE|STORE_ZIP| STORE_PHONE|STORE_NUMBER|             USER_ID|           SCAN_DATE|RECEIPT_PURCHASE_DATE|RECEIPT_TOTAL|RECEIPT_ITEM_COUNT| CONSUMER_USER_AGENT|         MODIFY_DATE|DIGITAL_RECEIPT|
+--------------------+----------+--------------------+-----------+-----------+---------+------------+------------+--------------------+--------------------+---------------------+-------------+------------------+--------------------+--------------------+---------------+
|5fe8a743f513262b9...|     GIANT|5463 Wisconsin Av...|Chevy Chase|         MD|    20815|240-497-6100|         312|94048e4f27ef9fdc9...|2019-05-02 18:47:...|  2019-05-01 21:52:00|        39.8

In [91]:
items.show(2)
items.createOrReplaceTempView("items")

+--------------------+----------+--------------------+------------+------------+--------+----------+----------------+------+-------------+--------------+--------------------+--------------------+
|  REWARDS_RECEIPT_ID|ITEM_INDEX| RECEIPT_DESCRIPTION|BARCODE_ORIG|     BARCODE|QUANTITY|ITEM_PRICE|DISCOUNTED_PRICE|WEIGHT|REWARDS_GROUP|         BRAND|            CATEGORY|        PRODUCT_NAME|
+--------------------+----------+--------------------+------------+------------+--------+----------+----------------+------+-------------+--------------+--------------------+--------------------+
|027a0c049debc76cb...|        26|Sb Basil, Garlic ...|        null|        null|       1|      1.09|            1.09|  null|         null|            Sb|Grocery|Canned & ...|Sb Basil, Garlic ...|
|05ea1810aec3a5567...|         9|Back to Nature Cr...|819898010288|819898010288|       1|       1.0|             1.0|  null|         null|Back to Nature|Grocery|Snacks|Cr...|Back to Nature Sp...|
+-------------------

In [19]:
val joined_df = receipts.join(items, receipts("RECEIPT_ID") === items("REWARDS_RECEIPT_ID"))

joined_df: org.apache.spark.sql.DataFrame = [RECEIPT_ID: string, STORE_NAME: string ... 27 more fields]


In [20]:
joined_df.count()

res10: Long = 14066


In [71]:
// In some cases a store_number or address can identify a store. There are also instances
// where both of these fields are null but there is a store_phone.
// I have also noticed garbage values in store_city and store_state, so all columns are suspect.
// There does not seem a way to use store_phone to fill in missing data.

// total dollar amount per store by date
spark.sql("""
    with filtered_stores as (
        select COALESCE(store_address, store_number, store_phone) as store_id, store_name, RECEIPT_TOTAL, RECEIPT_PURCHASE_DATE
        from receipts as r
        where not (store_address is null and store_number is null and store_phone is null)
    ),
    sums as (
        select store_id, store_name, sum(RECEIPT_TOTAL) as total from filtered_stores group by store_id, store_name
    ),
    top_ten as (
        select store_id from sums order by total desc limit 10
    )
    select f.store_id, date(RECEIPT_PURCHASE_DATE), sum(RECEIPT_TOTAL) as daily_total from filtered_stores f
    join top_ten t on f.store_id = t.store_id
    group by 1, 2
    
    
""").show()


+--------------------+---------------------+-----------+
|            store_id|RECEIPT_PURCHASE_DATE|daily_total|
+--------------------+---------------------+-----------+
|36 East West Newe...|           2019-11-04|     409.17|
|8101 W Judge Pere...|           2019-04-12|     207.45|
|  533 Middle Neck Rd|           2019-06-24|    1886.03|
|8101 W Judge Pere...|           2019-05-11|     315.37|
|   2425 Longfibre Rd|           2019-09-10|     456.74|
|1430 NW Garden Va...|           2019-03-31|     758.96|
|      820 Market St.|           2020-05-03|      768.6|
|    7250 Pacific Ave|           2019-08-01|     512.88|
|      11 Main Street|           2019-12-17|      589.6|
|  1457 FAIRFIELD AVE|           2020-06-11|     555.27|
|       111 104th Ave|           2020-01-31|     406.18|
+--------------------+---------------------+-----------+



In [172]:
spark.sql("""
    with result as (
        select STORE_ADDRESS from receipts group by 1
    )
    select count(*) from result
""").show()

+--------+
|count(1)|
+--------+
|     887|
+--------+



In [135]:
// category popularity by store and day
val windowSpec = Window.partitionBy("store_id", "purchase_date").orderBy(col("count").desc)

spark
    .sql("""
        with filtered_receipts as (
            select COALESCE(store_address, store_number, store_phone) as store_id,RECEIPT_ID, store_name, RECEIPT_TOTAL, date(RECEIPT_PURCHASE_DATE) as purchase_date
            from receipts as r
            where not (store_address is null and store_number is null and store_phone is null)
        )
        select * from filtered_receipts r
        join items i on r.RECEIPT_ID = i.REWARDS_RECEIPT_ID
    """)
    .withColumn("category_array", split(col("CATEGORY"), "\\|"))
    .withColumn("last_elm", element_at(col("category_array"), -1))
    .select(col("store_id"), col("last_elm"), col("purchase_date"))
    .filter("last_elm is not null")
    .groupBy("store_id", "last_elm", "purchase_date")
    .count()
    .withColumn("rank", rank().over(windowSpec))
    .orderBy(col("store_id"), col("purchase_date"), col("rank").asc)
    .show(50)
    

+-----------+-------------------+-------------+-----+----+
|   store_id|           last_elm|purchase_date|count|rank|
+-----------+-------------------+-------------+-----+----+
|1 Kent Road|             Yogurt|   2019-08-04|    4|   1|
|1 Kent Road|   Latin Seasonings|   2019-08-04|    4|   1|
|1 Kent Road|  Cottage & Ricotta|   2019-08-04|    4|   1|
|1 Kent Road|            Berries|   2019-08-04|    4|   1|
|1 Kent Road|       Buns & Rolls|   2019-08-04|    3|   5|
|1 Kent Road|            Poultry|   2019-08-04|    3|   5|
|1 Kent Road|     Meat & Seafood|   2019-08-04|    3|   5|
|1 Kent Road|     Poultry Pieces|   2019-08-04|    2|   8|
|1 Kent Road|             Salami|   2019-08-04|    2|   8|
|1 Kent Road|            Produce|   2019-08-04|    2|   8|
|1 Kent Road|         Sandwiches|   2019-08-04|    2|   8|
|1 Kent Road| Salads-Prepackaged|   2019-08-04|    2|   8|
|1 Kent Road|     Chips - Potato|   2019-08-04|    2|   8|
|1 Kent Road|              Dairy|   2019-08-04|    2|   

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.rank
windowSpec: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@2be91a3c


In [156]:
// device count by year

spark.sql("select distinct RECEIPT_PURCHASE_DATE, CONSUMER_USER_AGENT from receipts")
.withColumn("device", regexp_replace(col("CONSUMER_USER_AGENT"), "Fetch.*\\(", ""))
.withColumn("device", regexp_replace(col("device"), ";.*", ""))
.withColumn("year", year(col("RECEIPT_PURCHASE_DATE")))
.groupBy("year", "device").count()
.orderBy(col("count").desc)
.show()

+----+-------------+-----+
|year|       device|count|
+----+-------------+-----+
|2020|   iPhone12,1|   49|
|2020|iPhone 8 Plus|   43|
|2019|iPhone 7 Plus|   40|
|2020|    iPhone XR|   39|
|2019|     iPhone 7|   38|
|2019|iPhone 8 Plus|   35|
|2019|     iPhone 8|   31|
|2019|    iPhone XR|   27|
|2020|     iPhone 7|   27|
|2020|iPhone 7 Plus|   24|
|2020|     iPhone X|   22|
|2020|     iPhone 8|   22|
|2020|    iPhone 6s|   22|
|2019|     iPhone X|   21|
|2019|    iPhone 6s|   21|
|2020|iPhone Xs Max|   19|
|2019|iPhone Xs Max|   18|
|2019|     SM-G950U|   15|
|2020|     SM-G950U|   15|
|2019|     iPhone 6|   13|
+----+-------------+-----+
only showing top 20 rows



import org.apache.spark.sql.functions.regexp_replace
import org.apache.spark.sql.functions.year
