## Dataset

E-Commerce Data: https://www.kaggle.com/carrie1/ecommerce-data

## 1. Spark Initialization

In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession

# Create Spark Session
spark = SparkSession \
    .builder \
    .appName("Python Spark Frequent Itemsets Example") \
    .getOrCreate()

print(spark)

<pyspark.sql.session.SparkSession object at 0x0000014DA413BE10>


In [3]:
df = spark.read.csv("D:/KULIAH/SMT-6_BIG DATA/ecommerce-data/data.csv", header=True)

In [4]:
df.show  # without parentheses this only returns the bound method; call df.show() to print rows

<bound method DataFrame.show of DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: string, InvoiceDate: string, UnitPrice: string, CustomerID: string, Country: string]>

In [5]:
df.schema

StructType(List(StructField(InvoiceNo,StringType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,StringType,true),StructField(InvoiceDate,StringType,true),StructField(UnitPrice,StringType,true),StructField(CustomerID,StringType,true),StructField(Country,StringType,true)))

In [6]:
df.show()

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/2010 8:26|     7.65|     17850|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6|12/1/2010 8:26|     4.

In [7]:
df.createOrReplaceTempView("ecommerce")

In [8]:
df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: string (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- Country: string (nullable = true)



## 2. Data Grouping 

Group the rows by invoice and collect the stock codes of each invoice into a set, so that every invoice ID maps to one list of items with no duplicates.

In [10]:
# Import collect_set, which gathers the values of a group into a set without duplicates
from pyspark.sql.functions import collect_set

In [11]:
# Query that returns one row per invoice: its id and the set of stock codes it contains
query = spark.sql("SELECT DISTINCT `InvoiceNo` as id , collect_set(`StockCode`) as items \
                  FROM ecommerce \
                  GROUP BY `InvoiceNo`")

In [12]:
query.show()

+-------+--------------------+
|     id|               items|
+-------+--------------------+
| 557040|[23286, 21533, 85...|
| 576093|[21159, 22915, 21...|
| 555075|            [85123A]|
| 562965|[21984, 21889, 23...|
| 536550|             [85044]|
|C551180|[22418, 21212, 84...|
| 539353|[20983, 21430, 21...|
| 556095|[21974, 21931, 22...|
| 581132|[22297, 22470, 22...|
| 574948|[40001, 21922, 22...|
| 579187|[22755, 37448, 21...|
| 572097|[23208, 84375, 22...|
| 547551|[22663, 21931, 22...|
| 553155|             [21621]|
| 547225|             [72819]|
| 576539|      [23313, 22776]|
|C545728|[21232, 22423, 22...|
|C558469|             [23243]|
| 538687|[22961, 22665, 22...|
| 574251|[22131, 23400, 23...|
+-------+--------------------+
only showing top 20 rows
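
As a rough illustration of what the grouping step does, the same idea can be sketched in plain Python. The rows below are made up for the example and are not taken from the dataset; using a set mirrors `collect_set`, which matters because Spark's FP-Growth expects the items within one transaction to be unique.

```python
from collections import defaultdict

# Hypothetical (InvoiceNo, StockCode) rows; the values are illustrative only.
rows = [
    ("A1", "85123A"),
    ("A1", "71053"),
    ("A1", "85123A"),  # the same code listed twice on one invoice
    ("A2", "22752"),
]

# Group stock codes by invoice; a set drops repeated codes, like collect_set.
baskets = defaultdict(set)
for invoice_no, stock_code in rows:
    baskets[invoice_no].add(stock_code)

for invoice_no, items in sorted(baskets.items()):
    print(invoice_no, sorted(items))
```

Each invoice ends up with one entry, and the duplicated code on invoice `A1` appears only once.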



## 3. FP Growth Algorithm Training

In [13]:
# Import the FP-Growth implementation from Spark ML
from pyspark.ml.fpm import FPGrowth

In [14]:
fpGrowth = FPGrowth(itemsCol="items", minSupport=0.5, minConfidence=0.5)
model=fpGrowth.fit(query)

In [15]:
model.freqItemsets.show()

+-----+----+
|items|freq|
+-----+----+
+-----+----+



No itemset appears in at least 50% of the invoices, so the result is empty. Lower the minimum support and confidence.
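
To make the role of `minSupport` concrete, here is a minimal sketch of how support can be computed by hand. The baskets and item names are placeholders, and `support` and `min_count` are hypothetical helpers written for this illustration, not part of the Spark API.

```python
import math

# Hypothetical baskets; item names are placeholders, not codes from the dataset.
baskets = [
    {"a", "b"},
    {"a", "c"},
    {"a"},
    {"b", "c"},
]

def support(itemset, baskets):
    """Fraction of baskets that contain every item in `itemset`."""
    hits = sum(1 for basket in baskets if itemset <= basket)
    return hits / len(baskets)

def min_count(min_support, n_baskets):
    """Smallest number of baskets an itemset needs to reach `min_support`."""
    return math.ceil(min_support * n_baskets)

print(support({"a"}, baskets))       # 3 of 4 baskets
print(support({"a", "b"}, baskets))  # 1 of 4 baskets
print(min_count(0.5, len(baskets)))
```

With a threshold of 0.5, an item has to appear in half of all baskets. In a catalogue with thousands of products that is a very high bar, which is consistent with the empty table above.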

In [18]:
fpGrowth = FPGrowth(itemsCol="items", minSupport=0.05, minConfidence=0.05)
model2=fpGrowth.fit(query)

In [19]:
model2.freqItemsets.show()

+--------+----+
|   items|freq|
+--------+----+
|[85123A]|2246|
| [22423]|2172|
|[85099B]|2135|
| [47566]|1706|
| [20725]|1608|
| [84879]|1468|
| [22720]|1462|
| [22197]|1442|
| [21212]|1334|
| [22383]|1306|
| [20727]|1295|
+--------+----+



In [20]:
model2.associationRules.show()

+----------+----------+----------+
|antecedent|consequent|confidence|
+----------+----------+----------+
+----------+----------+----------+



Frequent itemsets are found, but each contains only a single item, so no association rule can be derived from them. Lower the thresholds again.

In [111]:
fpGrowth = FPGrowth(itemsCol="items", minSupport=0.025, minConfidence=0.025)
model3=fpGrowth.fit(query)

In [112]:
model3.freqItemsets.show()

+---------------+----+
|          items|freq|
+---------------+----+
|       [85123A]|2246|
|        [22423]|2172|
|       [85099B]|2135|
|        [47566]|1706|
|        [20725]|1608|
|        [84879]|1468|
|        [22720]|1462|
|        [22197]|1442|
|        [21212]|1334|
|        [22383]|1306|
| [22383, 20725]| 663|
|        [20727]|1295|
| [20727, 20725]| 648|
|        [22457]|1266|
|         [POST]|1254|
|        [23203]|1249|
|        [22386]|1231|
|[22386, 85099B]| 833|
|        [22960]|1220|
|        [22469]|1214|
+---------------+----+
only showing top 20 rows



In [113]:
model3.associationRules.show()

+----------+----------+-------------------+
|antecedent|consequent|         confidence|
+----------+----------+-------------------+
|   [22699]|   [22697]|                0.7|
|   [22386]|  [85099B]| 0.6766856214459789|
|   [20727]|   [20725]| 0.5003861003861004|
|   [20725]|   [22383]| 0.4123134328358209|
|   [20725]|   [20727]|0.40298507462686567|
|  [85099B]|   [22386]| 0.3901639344262295|
|  [85099B]|   [21931]| 0.3433255269320843|
|  [85099B]|   [22411]| 0.3199063231850117|
|   [22411]|  [85099B]| 0.5754001684919966|
|   [22383]|   [20725]| 0.5076569678407351|
|   [21931]|  [85099B]| 0.6103247293921732|
|   [22697]|   [22699]| 0.7417218543046358|
+----------+----------+-------------------+
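
The confidence values can be checked by hand from the counts in the `model3.freqItemsets` output shown earlier: the confidence of a rule A -> B is the count of baskets containing both A and B divided by the count of baskets containing A. The sketch below uses a hypothetical `confidence` helper and three counts copied from that output. It also suggests why the previous model produced no rules: with only single-item itemsets there is no joint count to divide.

```python
# Counts copied from the model3.freqItemsets output shown earlier.
freq = {
    frozenset(["22386"]): 1231,
    frozenset(["85099B"]): 2135,
    frozenset(["22386", "85099B"]): 833,
}

def confidence(antecedent, consequent, freq):
    """confidence(A -> B) = freq(A and B together) / freq(A)."""
    both = frozenset(antecedent) | frozenset(consequent)
    return freq[both] / freq[frozenset(antecedent)]

# Compare with the [22386] -> [85099B] row of the rules table.
print(confidence(["22386"], ["85099B"], freq))
# Compare with the [85099B] -> [22386] row of the rules table.
print(confidence(["85099B"], ["22386"], freq))
```

The two directions give different values because the denominators differ: 85099B is bought far more often on its own than 22386.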



In [114]:
model3.transform(query).show()

+-------+--------------------+--------------------+
|     id|               items|          prediction|
+-------+--------------------+--------------------+
| 557040|[23286, 21533, 85...|                  []|
| 576093|[21159, 22915, 21...|                  []|
| 555075|            [85123A]|                  []|
| 562965|[21984, 21889, 23...|            [85099B]|
| 536550|             [85044]|                  []|
|C551180|[22418, 21212, 84...|                  []|
| 539353|[20983, 21430, 21...|                  []|
| 556095|[21974, 21931, 22...|             [22697]|
| 581132|[22297, 22470, 22...|                  []|
| 574948|[40001, 21922, 22...|                  []|
| 579187|[22755, 37448, 21...|[22383, 22386, 22...|
| 572097|[23208, 84375, 22...|             [20725]|
| 547551|[22663, 21931, 22...|             [20725]|
| 553155|             [21621]|                  []|
| 547225|             [72819]|                  []|
| 576539|      [23313, 22776]|                  []|
|C545728|[21

In [160]:
fpGrowth = FPGrowth(itemsCol="items", minSupport=0.01, minConfidence=0.01)
model4=fpGrowth.fit(query)

In [161]:
model4.freqItemsets.show()

+----------------+----+
|           items|freq|
+----------------+----+
|         [22633]| 487|
|         [23236]| 344|
|        [85123A]|2246|
|         [22423]|2172|
| [22423, 85123A]| 355|
|         [22667]| 486|
|         [22579]| 343|
|  [22579, 22578]| 282|
|        [85099B]|2135|
| [85099B, 22423]| 288|
|[85099B, 85123A]| 404|
|         [22620]| 486|
|        [84536A]| 342|
|         [71053]| 342|
|         [47566]|1706|
| [47566, 85099B]| 332|
|  [47566, 22423]| 398|
| [47566, 85123A]| 391|
|         [85150]| 483|
|         [20725]|1608|
+----------------+----+
only showing top 20 rows



In [162]:
model4.associationRules.show()

+--------------+----------+-------------------+
|    antecedent|consequent|         confidence|
+--------------+----------+-------------------+
|       [22554]|   [22551]| 0.4823695345557123|
|       [22554]|   [22556]| 0.3991537376586742|
|       [22960]|   [21212]|0.21885245901639344|
|       [22960]|  [85099B]|0.23688524590163934|
|       [22960]|   [22423]|0.23852459016393443|
|       [22960]|   [22720]| 0.3155737704918033|
|       [22960]|   [22961]|0.38934426229508196|
|       [22960]|   [22666]|0.28032786885245903|
|       [22960]|   [22993]| 0.2540983606557377|
|       [22960]|   [22697]|0.21475409836065573|
|       [22960]|   [22722]|0.22131147540983606|
|[20726, 22382]|   [20728]|  0.546583850931677|
|[20726, 22382]|   [20725]| 0.6356107660455487|
|[20726, 22382]|   [20727]| 0.5445134575569358|
|[20726, 22382]|   [22383]| 0.5403726708074534|
|       [21977]|   [21212]| 0.4948571428571429|
|       [21977]|   [84991]| 0.4045714285714286|
|       [22699]|   [22423]|0.47946428571

## 4. Get Prediction

#### 4.1 Get prediction from model 3

In [130]:
# Build a one-row DataFrame that holds a single example basket
df_compare1=spark.createDataFrame([
    ('0',['22699'])
],['id','items'])

In [131]:
df_compare1.show()

+---+-------+
| id|  items|
+---+-------+
|  0|[22699]|
+---+-------+



In [132]:
model3.transform(df_compare1).show()

+---+-------+----------+
| id|  items|prediction|
+---+-------+----------+
|  0|[22699]|   [22697]|
+---+-------+----------+
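
The way `transform` arrives at this prediction can be approximated in plain Python: every rule whose antecedent is fully contained in the basket contributes its consequent, and items already in the basket are left out. This is a simplified sketch, not Spark's implementation; `predict` is a hypothetical helper, and the rules are copied from the `model3.associationRules` output shown earlier.

```python
# Rules copied from the model3.associationRules output shown earlier,
# as (antecedent, consequent) pairs; confidences are omitted for brevity.
rules = [
    (["22699"], ["22697"]),
    (["22386"], ["85099B"]),
    (["20727"], ["20725"]),
    (["20725"], ["22383"]),
    (["20725"], ["20727"]),
    (["85099B"], ["22386"]),
    (["85099B"], ["21931"]),
    (["85099B"], ["22411"]),
    (["22411"], ["85099B"]),
    (["22383"], ["20725"]),
    (["21931"], ["85099B"]),
    (["22697"], ["22699"]),
]

def predict(items, rules):
    """Collect consequents of every rule whose antecedent is contained in `items`."""
    basket = set(items)
    prediction = []
    for antecedent, consequent in rules:
        if set(antecedent) <= basket:
            for item in consequent:
                # Skip items already in the basket or already predicted.
                if item not in basket and item not in prediction:
                    prediction.append(item)
    return prediction

print(predict(["22699"], rules))
```

For the basket `['22699']` only the first rule applies, which matches the single predicted code in the table above.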



In [133]:
# Collect the prediction column of the first (and only) row as a Python list
predict1 = model3.transform(df_compare1).select('prediction').collect()[0][0]

In [140]:
predict1 = (', '.join(predict1))

In [141]:
print(predict1)

22697


In [147]:
query_awal = spark.sql("SELECT DISTINCT Description \
                        FROM ecommerce WHERE StockCode = '22699'")
query_awal.show()

+--------------------+
|         Description|
+--------------------+
|ROSES REGENCY TEA...|
+--------------------+



In [142]:
query_predict1 = spark.sql("SELECT DISTINCT Description \
                    FROM ecommerce \
                    WHERE StockCode IN ({})".format(predict1))
query_predict1.show()

+--------------------+
|         Description|
+--------------------+
|GREEN REGENCY TEA...|
+--------------------+
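
The `format` call above inserts the predicted codes into the SQL text without quotes. That works here because the predicted code consists only of digits, but a code containing letters (for example `85099B`) would no longer be a numeric literal. A possible safeguard, sketched under the assumption that stock codes are purely alphanumeric, is to quote each code before building the `IN (...)` list. `sql_in_list` is a hypothetical helper, not part of Spark.

```python
def sql_in_list(codes):
    """Render stock codes as quoted SQL string literals for an IN (...) clause."""
    for code in codes:
        # Assumption: stock codes are purely alphanumeric; reject anything else
        # rather than trying to escape it.
        if not code.isalnum():
            raise ValueError("unexpected characters in stock code: {!r}".format(code))
    return ", ".join("'{}'".format(code) for code in codes)

codes = ["22386", "21931", "85099B"]  # illustrative list of predicted codes
sql = "SELECT DISTINCT Description FROM ecommerce WHERE StockCode IN ({})".format(
    sql_in_list(codes)
)
print(sql)
```

The resulting string could be passed to `spark.sql` in the same way as the query above, with the list returned by `collect()` used directly instead of the comma-joined string.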



#### 4.2 Get a second prediction from model 3

In [163]:
# Build a one-row DataFrame that holds a single example basket
df_compare2=spark.createDataFrame([
    ('0',['85099B'])
],['id','items'])

In [167]:
df_compare2.show()

+---+--------+
| id|   items|
+---+--------+
|  0|[85099B]|
+---+--------+



In [164]:
model3.transform(df_compare2).show()

+---+--------+--------------------+
| id|   items|          prediction|
+---+--------+--------------------+
|  0|[85099B]|[22386, 21931, 22...|
+---+--------+--------------------+



In [165]:
# collect the prediction
predict2 = model3.transform(df_compare2).select('prediction').collect()[0][0]

In [166]:
predict2 = (', '.join(predict2))
print(predict2)

22386, 21931, 22411


In [168]:
query_awal = spark.sql("SELECT DISTINCT Description \
                        FROM ecommerce WHERE StockCode = '85099B'")
query_awal.show()

+--------------------+
|         Description|
+--------------------+
|JUMBO BAG RED RET...|
+--------------------+



In [169]:
query_predict2 = spark.sql("SELECT DISTINCT Description \
                    FROM ecommerce \
                    WHERE StockCode IN ({})".format(predict2))
query_predict2.show()

+--------------------+
|         Description|
+--------------------+
|JUMBO SHOPPER VIN...|
|JUMBO STORAGE BAG...|
|JUMBO BAG PINK PO...|
+--------------------+



## Conclusion

> (from model 3, prediction 1) Customers who buy Roses Regency Tea are likely to also buy Green Regency Tea (rule confidence 0.7).

> (from model 3, prediction 2) Customers who buy Jumbo Bag Red Retrospot are likely to also buy one or more of:
- Jumbo Shopper Vintage Red Paisley
- Jumbo Storage Bag Suki
- Jumbo Bag Pink Polkadot

## Reference

- https://spark.apache.org/docs/2.3.0/ml-frequent-pattern-mining.html
- https://www.npntraining.com/blog/difference-between-collect_set-and-collect_list-functions-in-hive/
- https://stackoverflow.com/questions/37284216/spark-sql-passing-a-variable