
## Playing with Large Datasets

In this notebook we explore eCommerce behavior data from a store that sells products across multiple categories. The data is available through Kaggle and covers two months of user behavior (October and November 2019).

Individual-level transactional data like this is becoming much more common as logging and data-gathering operations become more prevalent. Such data can also offer unique perspectives on consumer demand and related economic questions.

The data can be found [here](https://www.kaggle.com/mkechinov/ecommerce-behavior-data-from-multi-category-store).

### Data

You can either set up access with the Kaggle package or download the data directly from the website. If you use the Kaggle package, you will need to set up an account and download your API credentials; instructions can be found [here](https://github.com/Kaggle/kaggle-api). In short, download the `kaggle.json` credential file and save it to the `~/.kaggle/` folder (inside the Docker container, if you use one), or point `KAGGLE_CONFIG_DIR` at the folder that contains it, as the cell below does on Colab. Note: the full dataset is about 14 GB, spread across two files. This notebook works mainly with `2019-Oct.csv` (5.3 GB) and reads `2019-Nov.csv` later for a cross-month comparison.
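If you prefer to place the credentials from Python rather than the shell, here is a minimal sketch. It assumes you have already downloaded `kaggle.json`; the helper name `install_kaggle_credentials` is hypothetical, not part of the Kaggle package. The Kaggle client warns when the key file is readable by other users, which is why the sketch applies the same owner-only permissions as `chmod 600`.

```python
import os
import shutil
import stat
from pathlib import Path


def install_kaggle_credentials(src: str, dest_dir: str = "~/.kaggle") -> Path:
    """Hypothetical helper: copy kaggle.json into dest_dir and chmod it to 600."""
    dest = Path(dest_dir).expanduser()
    dest.mkdir(parents=True, exist_ok=True)  # create ~/.kaggle if it does not exist
    target = dest / "kaggle.json"
    shutil.copyfile(src, target)
    # Owner read/write only, equivalent to `chmod 600`
    os.chmod(target, stat.S_IRUSR | stat.S_IWUSR)
    return target
```

For example, `install_kaggle_credentials("/content/kaggle.json")` would copy an uploaded Colab file into `~/.kaggle/`, after which `KAGGLE_CONFIG_DIR` no longer needs to be set.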

In [1]:
# Widen the notebook container to the full browser width
from IPython.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

In [2]:
# Establish a path for our dataset
download_path = "ecommerce"

# The location for our parquet file
parquet_path = "ecommerce.parquet"

In [3]:
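import os
# Tell the Kaggle client to look for kaggle.json in /content/ (Colab's working directory)
os.environ["KAGGLE_CONFIG_DIR"] = "/content/"
# Restrict the credentials file to owner read/write, as the Kaggle client expects
!chmod 600 "/content/kaggle.json"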

##### Register a Kaggle account and grab your API key

In [4]:
!pip install kaggle
import kaggle
kaggle.api.authenticate()



In [5]:
dataset_name = 'mkechinov/ecommerce-behavior-data-from-multi-category-store'
kaggle.api.dataset_download_files(dataset_name, path=download_path, unzip=True)


### Spark ETL

#### Creating/retrieving the SparkSession

We create our SparkSession and set some memory limits. Here we run in local mode, so there is only one executor and it lives inside the driver process; as a result, `spark.driver.memory` is the setting that effectively bounds memory, while `spark.executor.memory` has no real effect. The master option `local[n]` tells Spark to run locally with n threads, and `local[*]` uses as many threads as your machine has logical cores.
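To make the `local[n]` convention concrete, here is a small pure-Python helper that maps a master string to the number of worker threads Spark would use. The function `local_thread_count` is hypothetical (not part of PySpark), and it approximates `local[*]` with `os.cpu_count()`; Spark itself asks the JVM for the available processor count, which may differ inside containers.

```python
import os
import re
from typing import Optional


def local_thread_count(master: str) -> Optional[int]:
    """Return the thread count implied by a local-mode master URL, else None.

    Handles the common forms: "local" (1 thread), "local[K]" (K threads),
    and "local[*]" (one thread per logical core).
    """
    if master == "local":
        return 1
    m = re.fullmatch(r"local\[(\*|\d+)\]", master)
    if m is None:
        # e.g. "yarn", "spark://host:7077", or forms such as "local[K,F]"
        return None
    if m.group(1) == "*":
        return os.cpu_count()
    return int(m.group(1))
```

With this helper, `local_thread_count("local[4]")` gives 4, and the `local[*]` used in the session below resolves to the machine's logical core count.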

In [7]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
Collecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=4462c132bd095855784b478d42bc11e62edfef047a60e8bd6720e569b5837eed
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [8]:
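from pyspark.sql import SparkSession

MAX_MEMORY = "16g"
# local[*]: run Spark in-process with one worker thread per logical core.
# In local mode the executor runs inside the driver JVM, so spark.driver.memory
# is the limit that matters; spark.executor.memory is effectively ignored.
spark = SparkSession \
    .builder \
    .appName("EcommerceAnalysis") \
    .config("spark.executor.memory", MAX_MEMORY) \
    .config("spark.driver.memory", MAX_MEMORY) \
    .config("spark.ui.port", "4050") \
    .master("local[*]") \
    .getOrCreate()
spark  # display the session summary in the notebook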

#### Read in CSV files to dataframe

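This reads the October CSV file into a DataFrame. Reading is lazy in the sense that the rows are not loaded until an action is taken on the DataFrame. However, `inferSchema` makes Spark scan the file once up front to work out the column types, so this cell can still take a while on a 5.3 GB file.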
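As a rough pure-Python analogy (not Spark code), a generator pipeline also records transformations without running them: nothing executes until something consumes the result, much as nothing runs in Spark until an action such as `count()` or `show()`. The names below are illustrative.

```python
calls = {"parse": 0}


def parse(line: str) -> list:
    """Split a CSV-like line, counting how many times it actually runs."""
    calls["parse"] += 1
    return line.split(",")


lines = ["a,1", "b,2", "c,3"]

# "Transformations": building the pipeline does not call parse() yet.
rows = (parse(line) for line in lines)
big = (r for r in rows if int(r[1]) > 1)
print(calls["parse"])  # 0: nothing has run yet

# "Action": consuming the pipeline finally does the work.
result = list(big)
print(calls["parse"])  # 3
```

Unlike a generator, a DataFrame can be consumed by many actions; each action recomputes the lineage unless the DataFrame has been cached, for example with `persist()`.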

In [9]:
df = spark \
    .read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv(f"{download_path}/2019-Oct.csv")

#### Convert to Parquet

View progress in the Spark Web UI at http://127.0.0.1:4050/jobs/ (the port set by `spark.ui.port` above).
This action reads the CSV and writes it out as gzip-compressed Parquet. Look at the output folder: `ecommerce.parquet` is a directory containing multiple `part-*` files, one per partition of the DataFrame.

In [10]:
df.write \
    .mode("overwrite") \
    .option("compression", "gzip")\
    .parquet(parquet_path)

#### Read from Parquet

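This reads the data back in and reassigns `df`, effectively discarding the original DAG built from the CSV and starting a new lineage from the Parquet files. `persist()` only marks the DataFrame for caching; the data is actually cached the first time an action runs on it.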

In [11]:
df = spark.read.parquet(parquet_path).persist()

In [12]:
# Also load the November file; it is registered as a table later for a cross-month query
dfNov = spark \
    .read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv(f"{download_path}/2019-Nov.csv")

### Basic Statistics

#### Column schema

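A Spark DataFrame imposes column names and a schema on the data. Because the CSV was read with `inferSchema`, the types below were inferred by Spark and carried over into the Parquet files. Note that `event_time` was left as a string rather than a timestamp.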

In [None]:
df.printSchema()

root
 |-- event_time: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)



In [None]:
df.count()

42448764

In [None]:
df.head(1)

[Row(event_time='2019-10-13 06:25:46 UTC', event_type='view', product_id=1002544, category_id=2053013555631882655, category_code='electronics.smartphone', brand='apple', price=460.51, user_id=518958788, user_session='e7e27c5c-1e78-4812-9f55-cdc658bb40fe')]

In [None]:
df.show(1)

+--------------------+----------+----------+-------------------+--------------------+-----+------+---------+--------------------+
|          event_time|event_type|product_id|        category_id|       category_code|brand| price|  user_id|        user_session|
+--------------------+----------+----------+-------------------+--------------------+-----+------+---------+--------------------+
|2019-10-13 06:25:...|      view|   1002544|2053013555631882655|electronics.smart...|apple|460.51|518958788|e7e27c5c-1e78-481...|
+--------------------+----------+----------+-------------------+--------------------+-----+------+---------+--------------------+
only showing top 1 row



`describe()` computes the count, mean, standard deviation, min, and max of each column. This requires a full pass over every column of the dataset, so it takes a while to run: on my machine, about 4 minutes.

In [None]:
df.describe().show()

+-------+--------------------+----------+--------------------+--------------------+-------------------+--------+-----------------+-------------------+--------------------+
|summary|          event_time|event_type|          product_id|         category_id|      category_code|   brand|            price|            user_id|        user_session|
+-------+--------------------+----------+--------------------+--------------------+-------------------+--------+-----------------+-------------------+--------------------+
|  count|            42448764|  42448764|            42448764|            42448764|           28933155|36335756|         42448764|           42448764|            42448762|
|   mean|                null|      null|1.0549932375842676E7|2.057404237884572...|               null|     NaN|290.3236606850655|5.335371475081686E8|                null|
| stddev|                null|      null|1.1881906970608277E7|1.843926466140400...|               null|     NaN|358.2691553394025|1.85237381

In [None]:
df.describe("price").show()

+-------+-----------------+
|summary|            price|
+-------+-----------------+
|  count|         42448764|
|   mean|290.3236606850655|
| stddev|358.2691553394025|
|    min|              0.0|
|    max|          2574.07|
+-------+-----------------+
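In the full `describe()` output above, `category_code` and `brand` report lower counts than the 42,448,764 rows because nulls are excluded from the count. The standard deviation Spark reports is the sample standard deviation. Here is a single-machine sketch of the same per-column statistics using only the standard library; `describe_column` is a hypothetical helper, not a Spark API.

```python
import statistics
from typing import Optional, Sequence


def describe_column(values: Sequence[Optional[float]]) -> dict:
    """Summarize one numeric column along the lines of Spark's describe().

    Nulls (None) are skipped, so `count` is the number of non-null values.
    `stddev` is the sample standard deviation (n - 1 denominator).
    """
    present = [v for v in values if v is not None]
    return {
        "count": len(present),
        "mean": statistics.fmean(present) if present else None,
        "stddev": statistics.stdev(present) if len(present) > 1 else None,
        "min": min(present) if present else None,
        "max": max(present) if present else None,
    }
```

For example, `describe_column([1.0, 2.0, None, 3.0, 4.0])` counts 4 values, skipping the `None`, just as Spark's count skips nulls.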



### Data Grammar

#### Select - select()

In [None]:
df \
    .select("event_time","category_code","price") \
    .show(5)

+--------------------+--------------------+-------+
|          event_time|       category_code|  price|
+--------------------+--------------------+-------+
|2019-10-13 06:25:...|electronics.smart...| 460.51|
|2019-10-13 06:25:...|appliances.enviro...| 120.93|
|2019-10-13 06:25:...|                null|  45.05|
|2019-10-13 06:25:...|computers.periphe...|  12.56|
|2019-10-13 06:25:...|  computers.notebook|1801.82|
+--------------------+--------------------+-------+
only showing top 5 rows



#### Filter - filter()

In [None]:
df \
    .select("event_time","category_code","price") \
    .filter("price < 100.00 and category_code == 'electronics.smartphone'") \
    .show(5)

+--------------------+--------------------+-----+
|          event_time|       category_code|price|
+--------------------+--------------------+-----+
|2019-10-13 06:25:...|electronics.smart...|88.26|
|2019-10-13 06:25:...|electronics.smart...|98.51|
|2019-10-13 06:25:...|electronics.smart...|92.14|
|2019-10-13 06:25:...|electronics.smart...|65.61|
|2019-10-13 06:25:...|electronics.smart...|98.51|
+--------------------+--------------------+-----+
only showing top 5 rows



#### Mutate - withColumn()

In [None]:
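from pyspark.sql.types import FloatType
from pyspark.sql.functions import udf

# Python UDF converting USD to CAD at a fixed rate of 1.38.
# FloatType is 32-bit, so each result is rounded to single precision.
usd_cad_udf = udf(lambda usd: usd * 1.38, FloatType())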

df \
    .select("event_time","category_code","price") \
    .filter("price < 100 and category_code == 'electronics.smartphone'") \
    .withColumn("cad_price", usd_cad_udf(df.price)) \
    .show(5)

+--------------------+--------------------+-----+---------+
|          event_time|       category_code|price|cad_price|
+--------------------+--------------------+-----+---------+
|2019-10-13 06:25:...|electronics.smart...|88.26| 121.7988|
|2019-10-13 06:25:...|electronics.smart...|98.51| 135.9438|
|2019-10-13 06:25:...|electronics.smart...|92.14| 127.1532|
|2019-10-13 06:25:...|electronics.smart...|65.61|  90.5418|
|2019-10-13 06:25:...|electronics.smart...|98.51| 135.9438|
+--------------------+--------------------+-----+---------+
only showing top 5 rows
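`usd_cad_udf` declares `FloatType`, a 32-bit float, while `price` is a 64-bit `double`, so each converted value is narrowed to single precision. The pure-Python sketch below mimics that narrowing with `struct`; `to_float32` is a hypothetical helper. Separately, a built-in column expression such as `df.price * 1.38` avoids shipping rows to a Python worker and keeps the result as a `double`; the UDF here mainly illustrates the API.

```python
import struct


def to_float32(x: float) -> float:
    """Round a 64-bit Python float to the nearest 32-bit float, like a FloatType value."""
    return struct.unpack("f", struct.pack("f", x))[0]


usd = 88.26                         # a price from the output above
cad_double = usd * 1.38             # what a DoubleType result would hold
cad_float = to_float32(cad_double)  # what the FloatType UDF result holds

print(cad_double, cad_float)
print(abs(cad_float - cad_double))  # small but nonzero rounding error
```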



#### Arrange - orderBy()

In [None]:
from pyspark.sql.types import FloatType
from pyspark.sql.functions import udf, desc

usd_cad_udf = udf(lambda usd: usd * 1.38, FloatType())

df \
    .select("event_time","category_code","price") \
    .filter("price < 100 and category_code == 'electronics.smartphone'") \
    .withColumn("cad_price", usd_cad_udf(df.price)) \
    .orderBy(desc("cad_price")) \
    .show(5)

+--------------------+--------------------+-----+---------+
|          event_time|       category_code|price|cad_price|
+--------------------+--------------------+-----+---------+
|2019-10-01 15:08:...|electronics.smart...|99.87| 137.8206|
|2019-10-01 15:07:...|electronics.smart...|99.87| 137.8206|
|2019-10-01 15:07:...|electronics.smart...|99.87| 137.8206|
|2019-10-01 15:04:...|electronics.smart...|99.87| 137.8206|
|2019-10-01 15:08:...|electronics.smart...|99.87| 137.8206|
+--------------------+--------------------+-----+---------+
only showing top 5 rows



#### Group By and Summarize - groupBy() and agg()

In [None]:
from pyspark.sql.types import FloatType
from pyspark.sql.functions import udf, desc

usd_cad_udf = udf(lambda usd: usd * 1.38, FloatType())

# The orderBy step does not change the group means; it only shows the verbs chaining
df \
    .select("event_time","category_code","price") \
    .filter("price < 100 and category_code == 'electronics.smartphone'") \
    .withColumn("cad_price", usd_cad_udf(df.price)) \
    .orderBy(desc("cad_price")) \
    .groupBy("category_code") \
    .agg({"price": "mean"}) \
    .show()

+--------------------+-----------------+
|       category_code|       avg(price)|
+--------------------+-----------------+
|electronics.smart...|84.48657257277912|
+--------------------+-----------------+
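As a rough single-machine analogy (plain Python, not Spark), the same filter, mutate, arrange, and group/summarize flow can be written over a list of tuples. The rows are copied from outputs shown earlier in this notebook, so the resulting mean covers only those five smartphone rows and differs from the full-dataset result above. As in the Spark version, the sort step has no effect on the group mean.

```python
from collections import defaultdict
from statistics import fmean

# (category_code, price) pairs copied from outputs shown earlier in this notebook.
rows = [
    ("electronics.smartphone", 460.51),
    ("computers.notebook", 1801.82),
    ("electronics.smartphone", 88.26),
    ("electronics.smartphone", 98.51),
    ("electronics.smartphone", 92.14),
    ("electronics.smartphone", 65.61),
    ("electronics.smartphone", 98.51),
]

# filter: price < 100 and category_code == 'electronics.smartphone'
kept = [(c, p) for c, p in rows if p < 100 and c == "electronics.smartphone"]

# mutate: add a CAD price using the same 1.38 rate as usd_cad_udf
with_cad = [(c, p, p * 1.38) for c, p in kept]

# arrange: sort by CAD price, descending
ordered = sorted(with_cad, key=lambda r: r[2], reverse=True)

# group by category_code and summarize with the mean price
groups = defaultdict(list)
for c, p, _cad in ordered:
    groups[c].append(p)
summary = {c: fmean(ps) for c, ps in groups.items()}
print(summary)
```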



In [None]:
df \
    .select("event_time","category_code","price") \
    .filter("price < 100 and category_code == 'electronics.smartphone'") \
    .withColumn("cad_price", usd_cad_udf(df.price)) \
    .orderBy(desc("cad_price")) \
    .groupBy("category_code") \
    .agg({"price": "mean"}) \
    .explain(extended=True)

== Parsed Logical Plan ==
'Aggregate ['category_code], ['category_code, 'avg(price#58) AS avg(price)#3320]
+- Sort [cad_price#3311 DESC NULLS LAST], true
   +- Project [event_time#52, category_code#56, price#58, <lambda>(price#58) AS cad_price#3311]
      +- Filter ((price#58 < cast(100 as double)) AND (category_code#56 = electronics.smartphone))
         +- Project [event_time#52, category_code#56, price#58]
            +- Relation [event_time#52,event_type#53,product_id#54,category_id#55L,category_code#56,brand#57,price#58,user_id#59,user_session#60] parquet

== Analyzed Logical Plan ==
category_code: string, avg(price): double
Aggregate [category_code#56], [category_code#56, avg(price#58) AS avg(price)#3320]
+- Sort [cad_price#3311 DESC NULLS LAST], true
   +- Project [event_time#52, category_code#56, price#58, <lambda>(price#58) AS cad_price#3311]
      +- Filter ((price#58 < cast(100 as double)) AND (category_code#56 = electronics.smartphone))
         +- Project [event_time#52, c

## SparkSQL

We can register any DataFrame as a temporary view and query it with SQL by calling `createOrReplaceTempView` on the DataFrame with a view name. (Older code uses `registerTempTable`, which has been deprecated since Spark 2.0.)

In [None]:
# createOrReplaceTempView replaces the deprecated registerTempTable
df.createOrReplaceTempView("ecommerce")



* What does our data look like?

In [None]:
spark.sql("select * from ecommerce limit 5").show()

+--------------------+----------+----------+-------------------+--------------------+--------+-------+---------+--------------------+
|          event_time|event_type|product_id|        category_id|       category_code|   brand|  price|  user_id|        user_session|
+--------------------+----------+----------+-------------------+--------------------+--------+-------+---------+--------------------+
|2019-10-13 06:25:...|      view|   1002544|2053013555631882655|electronics.smart...|   apple| 460.51|518958788|e7e27c5c-1e78-481...|
|2019-10-13 06:25:...|      view|   3700301|2053013565983425517|appliances.enviro...|   vitek| 120.93|557977070|7afc206c-7259-4be...|
|2019-10-13 06:25:...|      view|  49100004|2127425375913902544|                null|    null|  45.05|514456508|9d6837a5-40df-49d...|
|2019-10-13 06:25:...|      view|   9200409|2053013552913973497|computers.periphe...|defender|  12.56|512530774|df2d048d-c1ae-41b...|
|2019-10-13 06:25:...|      view|   1306558|205301355892021719

* What are some of the brands in this dataset?

In [None]:
spark.sql("select distinct brand from ecommerce limit 5").show()

+--------+
|   brand|
+--------+
|yokohama|
| tuffoni|
|   welss|
|    tega|
| edifier|
+--------+



* Out of all of the events, which brand had the highest average price?

In [None]:
import timeit
t0 = timeit.default_timer()
spark.sql("""
    with avg_prices as (
        select brand, avg(price) as avg_price 
        from ecommerce group by brand),
        
      max_price as (
        select max(avg_price) as max_price from avg_prices)
    
    select 
      *
    from 
      avg_prices a
      inner join max_price m
        on a.avg_price = m.max_price

""").show()
t1 = timeit.default_timer()
print(t1-t0)

+---------+-----------------+-----------------+
|    brand|        avg_price|        max_price|
+---------+-----------------+-----------------+
|climadiff|2393.170979020979|2393.170979020979|
+---------+-----------------+-----------------+

6.663189763999981


In [None]:
t0 = timeit.default_timer()
spark.sql("""
    with avg_prices as (
        select brand, avg(price) as avg_price 
        from ecommerce group by brand)
    
    select
      *
    from
      avg_prices
    order by avg_price desc
    limit 1

""").show()
t1 = timeit.default_timer()
print(t1-t0)

+---------+-----------------+
|    brand|        avg_price|
+---------+-----------------+
|climadiff|2393.170979020979|
+---------+-----------------+

4.663765392000187
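The two queries agree here, but they are not equivalent in general. Joining on `max(avg_price)` returns every brand tied for the top average, while `order by ... limit 1` returns exactly one row and makes no promise about which tied brand it picks. The elapsed times above come from single runs, so they are only a rough comparison. Here is a pure-Python sketch of the difference; the brand names and prices are hypothetical.

```python
# Hypothetical average prices per brand; "b" and "c" tie for the maximum.
avg_prices = {"a": 10.0, "b": 25.0, "c": 25.0}

# Strategy 1: compute the max, then keep every brand whose average equals it (join on max).
max_price = max(avg_prices.values())
join_on_max = sorted(b for b, p in avg_prices.items() if p == max_price)

# Strategy 2: sort descending and take the first row (order by ... limit 1).
limit_one = sorted(avg_prices.items(), key=lambda kv: kv[1], reverse=True)[:1]

print(join_on_max)  # ['b', 'c']
print(limit_one)    # one row only; Python's stable sort keeps 'b', Spark makes no such promise
```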


In [None]:
spark.sql("""
    with 
    
      avg_prices as (
        select brand, avg(price) as avg_price 
        from ecommerce group by brand),
        
      max_price as (
        select max(avg_price) as max_price from avg_prices)
    
    select 
      *
    from 
      avg_prices a
      inner join max_price m
        on a.avg_price = m.max_price

""").explain(extended=True)

== Parsed Logical Plan ==
CTE [avg_prices, max_price]
:  :- 'SubqueryAlias avg_prices
:  :  +- 'Aggregate ['brand], ['brand, 'avg('price) AS avg_price#4570]
:  :     +- 'UnresolvedRelation [ecommerce], [], false
:  +- 'SubqueryAlias max_price
:     +- 'Project ['max('avg_price) AS max_price#4571]
:        +- 'UnresolvedRelation [avg_prices], [], false
+- 'Project [*]
   +- 'Join Inner, ('a.avg_price = 'm.max_price)
      :- 'SubqueryAlias a
      :  +- 'UnresolvedRelation [avg_prices], [], false
      +- 'SubqueryAlias m
         +- 'UnresolvedRelation [max_price], [], false

== Analyzed Logical Plan ==
brand: string, avg_price: double, max_price: double
WithCTE
:- CTERelationDef 3
:  +- SubqueryAlias avg_prices
:     +- Aggregate [brand#57], [brand#57, avg(price#58) AS avg_price#4570]
:        +- SubqueryAlias ecommerce
:           +- View (`ecommerce`, [event_time#52,event_type#53,product_id#54,category_id#55L,category_code#56,brand#57,price#58,user_id#59,user_session#60])
:         

In [None]:
spark.sql("""
    with avg_prices as (
        select brand, avg(price) as avg_price 
        from ecommerce group by brand)
    
    select
      *
    from
      avg_prices
    order by avg_price desc
    limit 1

""").explain(extended=True)

== Parsed Logical Plan ==
CTE [avg_prices]
:  +- 'SubqueryAlias avg_prices
:     +- 'Aggregate ['brand], ['brand, 'avg('price) AS avg_price#4763]
:        +- 'UnresolvedRelation [ecommerce], [], false
+- 'GlobalLimit 1
   +- 'LocalLimit 1
      +- 'Sort ['avg_price DESC NULLS LAST], true
         +- 'Project [*]
            +- 'UnresolvedRelation [avg_prices], [], false

== Analyzed Logical Plan ==
brand: string, avg_price: double
WithCTE
:- CTERelationDef 5
:  +- SubqueryAlias avg_prices
:     +- Aggregate [brand#57], [brand#57, avg(price#58) AS avg_price#4763]
:        +- SubqueryAlias ecommerce
:           +- View (`ecommerce`, [event_time#52,event_type#53,product_id#54,category_id#55L,category_code#56,brand#57,price#58,user_id#59,user_session#60])
:              +- Relation [event_time#52,event_type#53,product_id#54,category_id#55L,category_code#56,brand#57,price#58,user_id#59,user_session#60] parquet
+- GlobalLimit 1
   +- LocalLimit 1
      +- Sort [avg_price#4763 DESC NULLS LAST

In [None]:
spark.sql("select distinct category_code from ecommerce").show()

+--------------------+
|       category_code|
+--------------------+
|    computers.ebooks|
|apparel.shoes.sli...|
|computers.periphe...|
|electronics.video...|
|appliances.kitche...|
|     sport.snowboard|
|electronics.camer...|
|       apparel.shirt|
|electronics.audio...|
|appliances.kitche...|
|appliances.kitche...|
|appliances.kitche...|
|  electronics.tablet|
|auto.accessories....|
|apparel.shoes.moc...|
|       apparel.jeans|
|computers.periphe...|
|furniture.living_...|
| stationery.cartrige|
|furniture.kitchen...|
+--------------------+
only showing top 20 rows



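#### Windowing functions

The `window()` function groups rows into fixed-length time buckets (time bucketing, not SQL `OVER (...)` window functions); the cells below count events per hour.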

In [None]:
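# Workaround for the stricter datetime parser introduced in Spark 3.0. The legacy
# policy restores the Spark 2.x parsing behaviour, presumably needed here because
# event_time carries a trailing " UTC" that the pattern below does not cover.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")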

In [None]:
from pyspark.sql.functions import to_timestamp, window

ts = to_timestamp(df.event_time, "yyyy-MM-dd HH:mm:ss")
df_ts = df.withColumn("time", ts)
grouped_counts = df_ts \
    .groupBy(window(df_ts.time, "1 hour")) \
    .count()

In [None]:
grouped_counts.show()

+--------------------+------+
|              window| count|
+--------------------+------+
|{2019-10-11 08:00...| 84183|
|{2019-10-08 13:00...| 79332|
|{2019-10-15 16:00...|111779|
|{2019-10-11 06:00...| 83501|
|{2019-10-13 15:00...|104344|
|{2019-10-11 14:00...| 99806|
|{2019-10-13 07:00...| 97515|
|{2019-10-11 09:00...| 85433|
|{2019-10-13 09:00...|112539|
|{2019-10-15 05:00...| 74147|
|{2019-10-08 17:00...| 88401|
|{2019-10-13 14:00...| 94476|
|{2019-10-15 14:00...|102932|
|{2019-10-11 17:00...| 90579|
|{2019-10-11 13:00...| 86093|
|{2019-10-11 11:00...| 77983|
|{2019-10-13 16:00...|108415|
|{2019-10-15 11:00...| 83196|
|{2019-10-13 08:00...|108000|
|{2019-10-08 07:00...| 79633|
+--------------------+------+
only showing top 20 rows
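`window(time, "1 hour")` defines tumbling (non-overlapping) one-hour windows, so for hour-long windows, grouping by it amounts to truncating each timestamp to the start of its hour and counting. The output above is not in time order; adding `.orderBy("window.start")` before `.show()` would sort it. Here is a pure-Python sketch of the bucketing. The first timestamp is the first row of the dataset, and the others are illustrative values in the same format, treated as naive UTC times.

```python
from collections import Counter
from datetime import datetime

# event_time strings in the dataset's format (first from the data, the rest illustrative).
event_times = [
    "2019-10-13 06:25:46 UTC",
    "2019-10-13 06:59:59 UTC",
    "2019-10-13 07:00:00 UTC",
    "2019-10-13 07:30:12 UTC",
    "2019-10-13 07:45:00 UTC",
]


def hour_window(ts: str) -> datetime:
    """Parse the timestamp (literal ' UTC' suffix) and truncate it to the start of its hour."""
    t = datetime.strptime(ts, "%Y-%m-%d %H:%M:%S UTC")
    return t.replace(minute=0, second=0, microsecond=0)


counts = Counter(hour_window(ts) for ts in event_times)
for start in sorted(counts):  # sorted by window start, unlike the Spark output above
    print(start, counts[start])
```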



In [None]:
# createOrReplaceTempView replaces the deprecated registerTempTable
dfNov.createOrReplaceTempView("ecommerce_Nov")



In [None]:
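# Reset the timer for this cell. In the recorded run t0 was not reset here, so the
# elapsed time printed below also counts everything since the last timed cell.
# Note: these totals sum `price` over every event (views included), so they are not
# purchase totals; restricting to purchases would require a filter on event_type.
t0 = timeit.default_timer()
spark.sql("""
    with user_spending_Oct as (
        select user_id, sum(price) as total_spending_Oct
        from ecommerce group by user_id),

        user_spending_Nov as (
        select user_id, sum(price) as total_spending_Nov 
        from ecommerce_Nov group by user_id)
    
    select
      *, total_spending_Oct + total_spending_Nov as total
    from
      user_spending_Oct
      join user_spending_Nov using (user_id)
      order by total_spending_Oct + total_spending_Nov desc
""").show(5)
t1 = timeit.default_timer()
print(t1-t0)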

+---------+------------------+------------------+------------------+
|  user_id|total_spending_Oct|total_spending_Nov|             total|
+---------+------------------+------------------+------------------+
|512365995|        1499595.88|         2263807.2|        3763403.08|
|512845454|        1116256.84|        1537491.58|        2653748.42|
|563459593|1993636.4100000006| 657277.2100000001|2650913.6200000006|
|536399452|        1692370.92| 369216.9399999999|2061587.8599999999|
|513558661|399091.07999999996|        1557656.67|        1956747.75|
+---------+------------------+------------------+------------------+
only showing top 5 rows

1640.866041848
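`join ... using (user_id)` is an inner join, so only users with events in both October and November appear in the result; users active in a single month are dropped. To keep them, a `full outer join` with `coalesce(..., 0)` around each monthly total would be one option. Here is a pure-Python sketch of the aggregate-then-inner-join behaviour, with hypothetical user IDs and amounts.

```python
from collections import defaultdict

# Hypothetical (user_id, price) events for each month.
oct_events = [(1, 10.0), (1, 5.0), (2, 7.0), (3, 4.0)]
nov_events = [(1, 2.0), (3, 6.0), (4, 9.0)]


def total_by_user(events):
    """Sum price per user, like `select user_id, sum(price) ... group by user_id`."""
    totals = defaultdict(float)
    for user_id, price in events:
        totals[user_id] += price
    return dict(totals)


oct_tot = total_by_user(oct_events)
nov_tot = total_by_user(nov_events)

# Inner join on user_id: keep only users present in both months.
joined = {
    u: (oct_tot[u], nov_tot[u], oct_tot[u] + nov_tot[u])
    for u in oct_tot.keys() & nov_tot.keys()
}

# Order by the combined total, descending.
for u, (o, n, total) in sorted(joined.items(), key=lambda kv: kv[1][2], reverse=True):
    print(u, o, n, total)
```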


In [None]:
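# Draft query, printed but not executed. As written, `price` exists in both tables and
# would be ambiguous after the join, and joining raw event rows on user_id pairs every
# October row of a user with every November row of that user.
s = """
    select user_id, avg(price) as avg_price 
        from ecommerce
        join ecommerce_Nov using(user_id)
        group by user_id
"""
print(s)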


    select user_id, avg(price) as avg_price 
        from ecommerce
        join ecommerce_Nov using(user_id)
        group by user_id
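Joining the raw event tables before aggregating pairs every October event of a user with every November event of the same user. A user with m October rows and n November rows therefore contributes m × n joined rows, whereas aggregating each month first, as the spending query above does, keeps one row per user per side. Here is a pure-Python sketch that counts the joined rows, with hypothetical per-user event lists.

```python
from collections import Counter

# Hypothetical user_id lists: one entry per event row in each month.
oct_users = [1, 1, 1, 2, 3, 3]
nov_users = [1, 1, 3, 4]

oct_counts = Counter(oct_users)
nov_counts = Counter(nov_users)

# An equi-join on user_id yields m * n rows for a user with m and n rows per side.
joined_rows = sum(oct_counts[u] * nov_counts[u] for u in oct_counts.keys() & nov_counts.keys())
print(joined_rows)  # 3*2 + 2*1 = 8
```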



In [None]:
#spark.stop()