In [None]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import Window
from pyspark.sql.types import *
from pyspark.sql import functions as F # same as numpy
from pyspark.ml.fpm import FPGrowth

sc = SparkContext()
sqlContext = SQLContext(sc)
inputfile = '{your_transaction_data_path}'

## Data Import

- item, customer id의 중복을 제거한 transaction 데이터로 정리

In [None]:
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true').load(inputfile)
df_nodup = df.select('item_id', 'customer_id').dropDuplicates()

## Frequent Pattern Mining(Association Rule)을 위한 Data Reshaping

- (customerid : itemids) --> 109931 : [112312, 144231 ...] 의 형태로 Dataframe 정리

In [None]:
df_nodup = df_nodup.groupby("customer_id").agg(F.collect_list("item_id"))
df_nodup = df_nodup.withColumnRenamed("collect_list(item_id)", "items")
df_nodup = df_nodup.where(F.size(F.col("items")) > 1)

## Association Rule의 수치 계산

- 상품 to 상품의 confidence 구하기

In [None]:
# Apriori Rule의 cutoff value를 파라미터로 입력해준다. 
fp = FPGrowth(itemsCol="items", minSupport=0.0001, minConfidence=0.0001)
fpm = fp.fit(df_nodup)

# 선행절과 후행절의 offset을 1로 지정하여, 1개의 상품 to 상품의 연관규칙과 confidence를 찾는 예시
result_confidence = fpm.associationRules
result_confidence = result_confidence.where(F.size(F.col("antecedent")) == 1)
result_confidence = result_confidence.where(F.size(F.col("consequent")) == 1)

join_udf = F.udf(lambda x: ",".join(x))
result_confidence = result_confidence.withColumn("antecedent", join_udf(F.col("antecedent")))
result_confidence = result_confidence.withColumn("consequent", join_udf(F.col("consequent")))
result_confidence = result_confidence.withColumnRenamed("antecedent", "f_item")
result_confidence = result_confidence.withColumnRenamed("consequent", "n_item")

- 상품 to 상품의 support, confience 구하기

In [None]:
# Set의 빈도수 체크
base_count = fpm.freqItemsets.count()

# size가 2인 set의 support를 계산 (상품 to 상품 계산에 활용)
result_support = fpm.freqItemsets
result_support = result_support.where(F.size(F.col("items")) == 2)
result_support = result_support.withColumn('support', result_support['freq'] / base_count)

# 위에서 구한 confidence 정보와 support 계산 결과를 left join하여 하나의 result로
cond = [result_confidence.f_item == result_support.items[0], result_confidence.n_item == result_support.items[1]]
result_confidence.join(result_support, cond, 'inner').select('f_item','n_item','confidence','support').show(5)

In [None]:
# 결과는 아래와 같은 형태로 나옴
+--------+--------+-------------------+--------------------+
| f_item | n_item |         confidence|             support|
+--------+--------+-------------------+--------------------+
|29631069|16563825|0.12121212121212122|4.695386782486207E-4|
|29652938|31077931|                0.4|2.347693391243103...|
|30184064|30020144|0.06060606060606061|2.347693391243103...|
|30513164|29318926|                0.5|2.347693391243103...|
|31082011|31090768|0.08333333333333333|2.347693391243103...|
+--------+--------+-------------------+--------------------+