## Khởi tạo môi trường

In [1]:
import os 
os.environ["PYSPARK_PYTHON"]="python3.7"
os.environ["PYSPARK_DRIVER_PYTHON"]="python3.7"

## Tạo phiên spark

In [2]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.context import SparkContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml import PipelineModel
MAX_MEMORY = '4G'
sc = SparkContext('local')
spark = SparkSession(sc)
# spark = SparkSession 
#         .builder \
#         .master("spark://spark-master:7077")\
#         .appName("FP-Growth") \
#         .config("spark.executor.memory", MAX_MEMORY)\
#         .config("spark.driver.memoryOverhead",'5G')\
#         .config("spark.driver.memory", MAX_MEMORY) \
#         .getOrCreate()       

## Đọc dữ liệu từ HDFS

In [3]:
df = spark.read.format('csv').option("inferSchema", "true").load("hdfs://namenode/user/taipt/data_clustered")

In [4]:
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- _c1: integer (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: integer (nullable = true)



In [5]:
df = df.withColumn("CustomerID", col("_c0")).withColumn("InvoiceNo", col("_c1")).withColumn("Description", col("_c2")).withColumn("Cluster", col("_c3"))
df = df.select("CustomerID", "InvoiceNo", "Description", "Cluster")
df_customer = df

In [6]:
df.printSchema()

root
 |-- CustomerID: integer (nullable = true)
 |-- InvoiceNo: integer (nullable = true)
 |-- Description: string (nullable = true)
 |-- Cluster: integer (nullable = true)



## Tiền xử lý dữ liệu

In [7]:
df = df.groupBy("CustomerID", "InvoiceNo", "Cluster").agg(collect_set("Description").alias("items"))

In [8]:
df1=df
df2=df

# Tìm luật kết hợp cho từng cụm 

# Tìm luật kết hợp cho cụm 0

In [9]:
df = df.filter((df.Cluster=="0"))

In [10]:
df.printSchema()

root
 |-- CustomerID: integer (nullable = true)
 |-- InvoiceNo: integer (nullable = true)
 |-- Cluster: integer (nullable = true)
 |-- items: array (nullable = true)
 |    |-- element: string (containsNull = false)



## FP-Growth

In [11]:
from pyspark.ml.fpm import FPGrowth

In [12]:
df = df.select("CustomerID", "InvoiceNo", "items")

In [13]:
fpGrowth = FPGrowth(itemsCol="items", minSupport=0.01, minConfidence=0.2)

In [14]:
model = fpGrowth.fit(df)

### Display frequent itemsets

In [15]:
model.freqItemsets.show(truncate=False)

+------------------------------------------------------------------------+----+
|items                                                                   |freq|
+------------------------------------------------------------------------+----+
|[HOT BATHS METAL SIGN]                                                  |133 |
|[LOVEBIRD HANGING DECORATION WHITE]                                     |93  |
|[SWALLOWS GREETING CARD]                                                |70  |
|[WHITE HANGING HEART T-LIGHT HOLDER]                                    |738 |
|[ASSORTED COLOUR BIRD ORNAMENT]                                         |571 |
|[ASSORTED COLOUR BIRD ORNAMENT, WHITE HANGING HEART T-LIGHT HOLDER]     |103 |
|[SET OF 3 WOODEN STOCKING DECORATION]                                   |133 |
|[SET OF 3 WOODEN STOCKING DECORATION, SET OF 3 WOODEN HEART DECORATIONS]|81  |
|[MONSTERS STENCIL CRAFT]                                                |93  |
|[MONSTERS STENCIL CRAFT, HAPPY STENCIL 

### Display generated association rules

In [16]:
df_lift = model.associationRules.orderBy(col('lift').desc())

In [17]:
df_lift.show(truncate=False)

+-------------------------------------------+----------------------+------------------+-----------------+
|antecedent                                 |consequent            |confidence        |lift             |
+-------------------------------------------+----------------------+------------------+-----------------+
|[HERB MARKER ROSEMARY, HERB MARKER PARSLEY]|[HERB MARKER THYME]   |0.9705882352941176|84.03999999999999|
|[HERB MARKER ROSEMARY]                     |[HERB MARKER MINT]    |0.9305555555555556|82.78120243531203|
|[HERB MARKER MINT]                         |[HERB MARKER ROSEMARY]|0.9178082191780822|82.78120243531203|
|[HERB MARKER THYME, HERB MARKER PARSLEY]   |[HERB MARKER ROSEMARY]|0.9166666666666666|82.67824074074075|
|[HERB MARKER MINT, HERB MARKER THYME]      |[HERB MARKER PARSLEY] |0.9705882352941176|81.85714285714285|
|[HERB MARKER ROSEMARY, HERB MARKER THYME]  |[HERB MARKER PARSLEY] |0.9705882352941176|81.85714285714285|
|[HERB MARKER THYME]                        |[

In [18]:
fp = df_lift.select("antecedent", "consequent").collect()

# Tìm luật kết hợp cho cụm 1

In [19]:
df1 = df1.filter((df1.Cluster=="1"))
df1.printSchema()

root
 |-- CustomerID: integer (nullable = true)
 |-- InvoiceNo: integer (nullable = true)
 |-- Cluster: integer (nullable = true)
 |-- items: array (nullable = true)
 |    |-- element: string (containsNull = false)



In [20]:

df1 = df1.select("CustomerID", "InvoiceNo", "items")
model = fpGrowth.fit(df1)
model.freqItemsets.show(truncate=False)
df_lift1 = model.associationRules.orderBy(col('lift').desc())
fp1 = df_lift1.select("antecedent", "consequent").collect()

+--------------------------------------------------------------+----+
|items                                                         |freq|
+--------------------------------------------------------------+----+
|[SAVE THE PLANET MUG]                                         |186 |
|[HEART IVORY TRELLIS LARGE]                                   |119 |
|[WHITE HANGING HEART T-LIGHT HOLDER]                          |959 |
|[JUMBO BAG RED RETROSPOT]                                     |943 |
|[JUMBO BAG RED RETROSPOT, WHITE HANGING HEART T-LIGHT HOLDER] |150 |
|[CHRISTMAS CRAFT LITTLE FRIENDS]                              |186 |
|[CLASSIC METAL BIRDCAGE PLANT HOLDER]                         |118 |
|[LUNCH BAG RED RETROSPOT]                                     |762 |
|[LUNCH BAG RED RETROSPOT, JUMBO BAG RED RETROSPOT]            |290 |
|[LUNCH BAG RED RETROSPOT, WHITE HANGING HEART T-LIGHT HOLDER] |161 |
|[ROUND SNACK BOXES SET OF 4 FRUITS]                           |184 |
|[CLASSIC CAFE SUGAR

# Tìm luật kết hợp cho cụm 2

In [21]:
df2 = df2.filter((df2.Cluster=="2"))
df2.printSchema()

root
 |-- CustomerID: integer (nullable = true)
 |-- InvoiceNo: integer (nullable = true)
 |-- Cluster: integer (nullable = true)
 |-- items: array (nullable = true)
 |    |-- element: string (containsNull = false)



In [22]:

df2 = df2.select("CustomerID", "InvoiceNo", "items")

In [23]:
model = fpGrowth.fit(df2)

In [24]:
model.freqItemsets.show(truncate=False)

+-------------------------------------------------------------------+----+
|items                                                              |freq|
+-------------------------------------------------------------------+----+
|[RED RETROSPOT UMBRELLA]                                           |25  |
|[RECYCLED ACAPULCO MAT PINK]                                       |17  |
|[WHITE HANGING HEART T-LIGHT HOLDER]                               |187 |
|[REGENCY CAKESTAND 3 TIER]                                         |162 |
|[REGENCY CAKESTAND 3 TIER, WHITE HANGING HEART T-LIGHT HOLDER]     |23  |
|[SET 2 TEA TOWELS I LOVE LONDON]                                   |25  |
|[RED SPOT CERAMIC DRAWER KNOB]                                     |17  |
|[PARTY BUNTING]                                                    |132 |
|[PARTY BUNTING, REGENCY CAKESTAND 3 TIER]                          |24  |
|[PARTY BUNTING, WHITE HANGING HEART T-LIGHT HOLDER]                |22  |
|[RED RETROSPOT MINI CASE

In [25]:
df_lift2= model.associationRules.orderBy(col('lift').desc())

In [26]:
df_lift2.show()

+--------------------+--------------------+------------------+------------------+
|          antecedent|          consequent|        confidence|              lift|
+--------------------+--------------------+------------------+------------------+
|[HERB MARKER ROSE...| [HERB MARKER THYME]|               1.0|              94.6|
| [HERB MARKER THYME]|[HERB MARKER ROSE...|               1.0|              94.6|
|[CHILDRENS CUTLER...|[CHILDRENS CUTLER...|              0.85| 57.43571428571429|
|[CHILDRENS CUTLER...|[CHILDRENS CUTLER...|0.8095238095238095| 57.43571428571428|
|[CHILDS GARDEN FO...|[CHILDS GARDEN FO...|0.7272727272727273| 54.31578947368421|
|[CHILDS GARDEN FO...|[CHILDS GARDEN FO...|0.8421052631578947|54.315789473684205|
|[SET/6 PINK  BUTT...|[SET/6 TURQUOISE ...|0.7142857142857143| 50.67857142857143|
|[SET/6 TURQUOISE ...|[SET/6 PINK  BUTT...|              0.75| 50.67857142857143|
|[POPPY'S PLAYHOUS...|[POPPY'S PLAYHOUS...|0.7272727272727273| 44.86956521739131|
|[POPPY'S PLAYHO

In [27]:
fp2 = df_lift2.select("antecedent", "consequent").collect()

In [28]:
fp_final={"0":fp, "1":fp1,"2":fp2}

### Helper recommend

In [29]:
def addToSuggest(suggest, consequent, items):
    for c in consequent:
        if c not in items:
            suggest.add(c)
    
    return suggest

In [30]:
def inArray(antecedent, items):
    flag = [1 if a in items else 0 for a in antecedent]
    sumFlag = 0
    
    for f in flag:
        sumFlag += f
    
    return sumFlag == len(flag)

In [31]:
def recommend(customer, fp, id):
    for (cID, items) in customer:
        if id == cID:
            suggest = set()
            for (antecedent, consequent) in fp:
                if inArray(antecedent, items):
                    suggest = addToSuggest(suggest, consequent,items)
            
            print(suggest)

### Recommend

In [137]:
CUSTOMERID = 17227

In [138]:
CLUSTER = df_customer.select("Cluster").filter((df_customer.CustomerID==CUSTOMERID)).collect()[0][0]

In [139]:
df_new_customer = df_customer.filter((df_customer.Cluster==CLUSTER))

In [140]:
df_new_customer.printSchema()

root
 |-- CustomerID: integer (nullable = true)
 |-- InvoiceNo: integer (nullable = true)
 |-- Description: string (nullable = true)
 |-- Cluster: integer (nullable = true)



In [141]:
df_new_customer=df_new_customer.select(col('CustomerID'), col('Description'))
#df_new_customer=df_new_customer.na.drop().dropDuplicates()

In [142]:
df_new_customer=df_new_customer.groupBy("CustomerID").agg(collect_set("Description").alias("items"))

In [143]:
customer = df_new_customer.collect()

# Dự đoán sản phẩm cho khách hàng 

In [144]:
suggest = recommend(customer,fp_final[str(CLUSTER)], CUSTOMERID)

{"PAPER CHAIN KIT 50'S CHRISTMAS", 'PAPER CHAIN KIT VINTAGE CHRISTMAS', 'PACK OF 72 RETROSPOT CAKE CASES', '72 SWEETHEART FAIRY CAKE CASES', 'PACK OF 60 DINOSAUR CAKE CASES', 'PACK OF 60 PINK PAISLEY CAKE CASES', "ROLL WRAP 50'S RED CHRISTMAS"}


### Transform examines the input items against all the association rules and summarize the consequents as prediction


In [40]:
df_predic = model.transform(df)

In [41]:
df_predic.show()

+----------+---------+--------------------+--------------------+
|CustomerID|InvoiceNo|               items|          prediction|
+----------+---------+--------------------+--------------------+
|     12828|   569577|[RETROSPOT TEA SE...|[BAKING SET SPACE...|
|     14409|   537410|[HAND WARMER BABU...|                  []|
|     16612|   574744|[JUMBO BAG APPLES...|[RED HANGING HEAR...|
|     17135|   571641|[SWISS CHALET TRE...|                  []|
|     18189|   568051|[LAUREL HEART ANT...|[JAM MAKING SET W...|
|     13451|   546670|[PING MICROWAVE A...|[LUNCH BAG RED RE...|
|     15253|   578309|[CHRISTMAS GINGHA...|                  []|
|     16934|   571277|[NOEL WOODEN BLOC...|[HOME BUILDING BL...|
|     17247|   578620|[HOME SWEET HOME ...|[PLEASE ONE PERSO...|
|     17932|   575729|[SET OF 6 T-LIGHT...|[WHITE HANGING HE...|
|     18149|   572036|[JUMBO BAG STRAWB...|[JUMBO  BAG BAROQ...|
|     12951|   553909|[JUMBO BAG DOILEY...|                  []|
|     13481|   555125|[JU