In [None]:
import configparser

from datetime import datetime
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, monotonically_increasing_id
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek, minute, second
from pyspark.sql.functions import split
    
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, StringType, TimestampType
from pyspark.sql.functions import col, when, coalesce, desc
import functools

In [None]:
# Read the AWS access key from the environment so that no credential is ever
# saved inside the notebook; falls back to the previous empty-string default.
AWS_ACCESS_KEY_ID = os.environ.get("AWS_ACCESS_KEY_ID", "")

In [None]:
# Read the AWS secret key from the environment so that no credential is ever
# saved inside the notebook; falls back to the previous empty-string default.
AWS_SECRET_ACCESS_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY", "")

In [None]:
# S3 URI prefix of the data set (a path prefix, not a service endpoint, despite the name).
# Table names are appended to it with plain string concatenation, so the trailing
# slash is required.
AWS_S3_ENDPOINT = "s3://e-commerce-sellout-data/nov/"

In [None]:
# Reader format name passed to loadData for the raw CSV data.
CSV_FORMAT = "csv"

In [None]:
# Reader format name passed to loadData for the derived dimension tables.
PARQUET_FORMAT = "parquet"
# Backward-compatible alias: the original (misspelled) name is still referenced
# by the cells that load the product, event and user tables.
PARQUET_FORAMT = PARQUET_FORMAT

In [None]:
# Sub-path, appended to AWS_S3_ENDPOINT, where the product table is stored.
PRODUCT_TABLE = "product"

In [None]:
# Sub-path, appended to AWS_S3_ENDPOINT, where the event table is stored.
EVENT_TABLE = "event"

In [None]:
# Sub-path, appended to AWS_S3_ENDPOINT, where the user table is stored.
USER_TABLE = "user"

In [None]:
def sparkSession():
    """
    function : build the SparkSession used by the notebook, or reuse the existing one
    @return
    the object returned by the builder's getOrCreate() (it is also printed)
    """
    # Request the hadoop-aws package through the spark.jars.packages setting.
    builder = SparkSession.builder.config(
        'spark.jars.packages', 'org.apache.hadoop:hadoop-aws:2.7.0'
    )
    session = builder.getOrCreate()

    # Echo the session so the cell output shows which one is in use.
    print(session)
    return session

In [None]:
# Notebook-wide session handle; sparkSession() already prints the session it returns.
spark = sparkSession()

In [None]:
def loadData(spark, formatType, path):
    """
    function : load a data set from the selected storage
    @params
    spark: sparkSession
    formatType: reader format name handed to spark.read.format (e.g. 'csv', 'parquet')
    path: the selected storage
    @return
    whatever the reader's load(path) call returns
    """
    # Echo the call arguments so every load is traceable in the cell output.
    print(spark, formatType, path)

    reader = spark.read.format(formatType)
    # The header option is always set, whatever formatType is.
    reader = reader.option("header", 'true')
    return reader.load(path)

In [None]:
def getColumnInformation(dataFrame, field):
    """
    function : print the null count and the distinct-value count of one column
    @params
    dataFrame: the DataFrame to profile
    field: name of the column to inspect
    @return
    None; the summary line is printed. Two count() calls are made per invocation.
    """
    nullValues = dataFrame.filter(col(field).isNull()).count()
    # Profile the DataFrame that was passed in rather than the notebook-global
    # rawData, so the result is correct for any frame and the function does not
    # depend on a variable defined in a later cell.
    uniqueValues = dataFrame.select(field).drop_duplicates().count()

    print(f"Data check by Field : {field} : Null counts : {nullValues} , Unique Values : {uniqueValues}")

# 2.Explore and Assess the Data

### Purpose: Check data quality and identify missing values
#### 1) Identify data size, columns
#### 2) Null Check
#### 3) brief information

## Main Data

- This is e-commerce sales data for November 2019, covering multiple product categories with continuous timestamps for online transactions
- This is the main data set from which the dimensional tables are created
- Total rows are 67501979
- There are null values in three columns: brand, category_code and user_session<br>
  brand : 9218235 nulls  
  category_code : 21898171 nulls  
  user_session : 10 nulls
- The column with the most unique values is user_session (13776051); event_time has 2549559<br>
- The column with the fewest unique values is event_type, with only 3 values

In [20]:
# Read the main data set as CSV from the S3 prefix; the later exploration cells use this frame.
rawData = loadData(spark, CSV_FORMAT, AWS_S3_ENDPOINT)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<pyspark.sql.session.SparkSession object at 0x7f5daf7c81d0> csv s3://e-commerce-sellout-data/nov/

### Data dictionary

In [25]:
# Size of the main data: column count first (metadata only), then the row count.
colums = len(rawData.columns)
totalCount = rawData.count()
print(f"Main data = Total rows {totalCount} : colums {colums}")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Main data = Total rows 67501979 : colums 9

In [41]:
# Every column is reported as string in the recorded output; loadData sets only the
# header option and passes no schema.
rawData.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- event_time: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- user_session: string (nullable = true)

In [31]:
# Preview the first 10 rows.
# NOTE(review): the saved output of this cell is an HTTP retry error, not rows — re-run before sharing.
rawData.show(10)

An error was encountered:
Error sending http request and maximum retry encountered.


### Column : event_time

In [65]:
getColumnInformation(rawData, 'event_time')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Data check by Field : event_time : Null counts : 0 , Unique Values : 2549559

### Column : event_type

In [66]:
getColumnInformation(rawData, 'event_type')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Data check by Field : event_type : Null counts : 0 , Unique Values : 3

### Column : product_id

In [67]:
getColumnInformation(rawData, 'product_id')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Data check by Field : product_id : Null counts : 0 , Unique Values : 190662

### Column : category_id

In [68]:
getColumnInformation(rawData, 'category_id')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Data check by Field : category_id : Null counts : 0 , Unique Values : 684

### Column : category_code

In [69]:
getColumnInformation(rawData, 'category_code')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Data check by Field : category_code : Null counts : 21898171 , Unique Values : 130

### Column : brand

In [71]:
getColumnInformation(rawData, 'brand')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Data check by Field : brand : Null counts : 9218235 , Unique Values : 4202

### Column : price

In [72]:
getColumnInformation(rawData, 'price')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Data check by Field : price : Null counts : 0 , Unique Values : 60435

### Column : user_id

In [73]:
getColumnInformation(rawData, 'user_id')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Data check by Field : user_id : Null counts : 0 , Unique Values : 3696117

### Column : user_session

In [74]:
getColumnInformation(rawData, 'user_session')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Data check by Field : user_session : Null counts : 10 , Unique Values : 13776051

## Product Data

- This is a product dimensional table derived from the main data<br>
- ETL : null values are filled when creating the dimensional table <br> 
  : default brand value is 'brand'  
  : default category_code value is 'category1.category2.category3'
- This table includes the detailed category code field and also the brand<br>
  - Electronics items make up the largest share of the product data

In [26]:
# The product table is stored as parquet under the "product" sub-path of the prefix.
productPath = f"{AWS_S3_ENDPOINT}{PRODUCT_TABLE}"
productData = loadData(spark, PARQUET_FORAMT, productPath)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<pyspark.sql.session.SparkSession object at 0x7f5daf7c81d0> parquet s3://e-commerce-sellout-data/nov/product

#### Data dictionary

In [27]:
# Size of the product table: column count first (metadata only), then the row count.
colums = len(productData.columns)
totalCount = productData.count()
print(f"Product data = Total rows {totalCount} : colums {colums}")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Product data = Total rows 379314 : colums 5

In [25]:
# In the recorded output price is a double here, unlike the all-string schema of the raw CSV.
productData.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- product_id: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)

#### Information

In [33]:
# Number of product rows per brand, excluding the 'none' value.
# NOTE(review): the notes for this section give the default brand as 'brand',
# while this filter excludes 'none' — confirm which placeholder the ETL writes.
brandRank = (
    productData
    .filter(col('brand') != 'none')
    .groupby('brand')
    .count()
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [34]:
# Brands ordered by descending row count; the recorded output shows only the top 20 rows.
brandRank.sort( desc('count') ).show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------+-----+
|    brand|count|
+---------+-----+
|  samsung| 6676|
|   xiaomi| 5697|
|    bosch| 5310|
|    apple| 4785|
|       hp| 3831|
|   makita| 3545|
|     sony| 3357|
|   febest| 2912|
|   gipfel| 2551|
|     asus| 2507|
|       lg| 2448|
|    casio| 2374|
|  sokolov| 2310|
|     smeg| 2201|
|  respect| 2135|
|milavitsa| 2095|
|  polaris| 2077|
|    canon| 2054|
|     lego| 1866|
|  philips| 1802|
+---------+-----+
only showing top 20 rows

In [47]:
# Number of product rows per category_code, excluding the placeholder
# 'category1.category2.category3'.
codeRank = (
    productData
    .filter(col("category_code") != 'category1.category2.category3')
    .groupby('category_code')
    .count()
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [48]:
# Category codes ordered by descending row count; truncate=False prints the codes in full.
codeRank.sort( desc('count') ).show(truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------------------+-----+
|category_code                   |count|
+--------------------------------+-----+
|electronics.smartphone          |12277|
|electronics.clocks              |11870|
|apparel.shoes                   |9556 |
|electronics.audio.headphone     |6499 |
|appliances.kitchen.refrigerators|5697 |
|construction.components.faucet  |5510 |
|construction.tools.drill        |5305 |
|kids.carriage                   |4568 |
|accessories.bag                 |3908 |
|appliances.kitchen.hood         |3889 |
|computers.notebook              |3636 |
|appliances.kitchen.hob          |3551 |
|kids.toys                       |3472 |
|computers.peripherals.mouse     |3267 |
|computers.desktop               |3036 |
|appliances.kitchen.oven         |2967 |
|appliances.environment.vacuum   |2822 |
|apparel.shoes.keds              |2768 |
|electronics.video.tv            |2668 |
|computers.peripherals.monitor   |2663 |
+--------------------------------+-----+
only showing top

## Event Data

- This is an event dimensional table derived from the main data<br>
- There are two columns to track selling pattern<br>
    - event_time: a timestamp value that can be split into time units<br>
    - event_type: there are 3 types: view, purchase, cart<br>
        purchase is the true purchase transaction<br>
    
- This table contains the event_time, event_type, product_id and user_session fields<br>
  

In [28]:
# The event table is stored as parquet under the "event" sub-path of the prefix.
eventPath = f"{AWS_S3_ENDPOINT}{EVENT_TABLE}"
eventData = loadData(spark, PARQUET_FORAMT, eventPath)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<pyspark.sql.session.SparkSession object at 0x7f5daf7c81d0> parquet s3://e-commerce-sellout-data/nov/event

#### Data dictionary

In [29]:
# Size of the event table: column count first (metadata only), then the row count.
colums = len(eventData.columns)
totalCount = eventData.count()
print(f"Event data = Total rows {totalCount} : colums {colums}")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Event data = Total rows 67401449 : colums 4

In [52]:
# Inspect the event table schema; the recorded output lists four string columns,
# including event_time.
eventData.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- event_time: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- user_session: string (nullable = true)

#### Information

In [53]:
# Derive year / month / day from the part of event_time before the first space,
# then keep the original event columns alongside the new calendar fields
# (the helper column yyyyMmDd is not selected).
timeUnit = (
    eventData
    .withColumn("yyyyMmDd", split(col("event_time"), ' ').getItem(0))
    .withColumn("year", year("yyyyMmDd"))
    .withColumn("month", month("yyyyMmDd"))
    .withColumn("day", dayofmonth("yyyyMmDd"))
    .select('event_time', 'year', 'month', 'day',
            'event_type', 'product_id', 'user_session')
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [55]:
# Events per calendar day, busiest days first.
(
    timeUnit
    .groupby('year', 'month', 'day')
    .count()
    .sort(desc('count'))
    .show(truncate=False)
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+-----+---+-------+
|year|month|day|count  |
+----+-----+---+-------+
|2019|11   |16 |6488921|
|2019|11   |17 |6379913|
|2019|11   |15 |6205340|
|2019|11   |14 |3064736|
|2019|11   |18 |2018957|
|2019|11   |13 |2016711|
|2019|11   |11 |2006897|
|2019|11   |12 |1985382|
|2019|11   |10 |1938354|
|2019|11   |8  |1893950|
|2019|11   |9  |1875586|
|2019|11   |29 |1850824|
|2019|11   |7  |1796237|
|2019|11   |4  |1792508|
|2019|11   |30 |1752607|
|2019|11   |19 |1726197|
|2019|11   |5  |1716759|
|2019|11   |20 |1697436|
|2019|11   |6  |1694224|
|2019|11   |21 |1674077|
+----+-----+---+-------+
only showing top 20 rows

In [57]:
# Row count per event_type, ordered by the type name in descending order.
eventTypes = (
    timeUnit
    .select('event_type')
    .groupBy('event_type')
    .count()
    .sort(desc('event_type'))
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [58]:
# Rows appear ordered by event_type descending, matching the sort applied when eventTypes was built.
eventTypes.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+--------+
|event_type|   count|
+----------+--------+
|      view|63554512|
|  purchase|  916930|
|      cart| 2930007|
+----------+--------+

## User Data

- This is a user dimensional table derived from the main data<br>
- There are only two columns


In [32]:
# The user table is stored as parquet under the "user" sub-path of the prefix.
# NOTE(review): the saved output of this cell is an HTTP retry error, so the
# load was not captured in the recorded run.
userPath = f"{AWS_S3_ENDPOINT}{USER_TABLE}"
userData = loadData(spark, PARQUET_FORAMT, userPath)

An error was encountered:
Error sending http request and maximum retry encountered.
