# Đồ án giữa kỳ
# Môn: Xử lý dữ liệu lớn
# Học kỳ 1 - Năm học 2022-2023
# Giảng viên: Th.S Nguyễn Thành An

# Cài đặt PySpark

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

import findspark
findspark.init()

# Yêu cầu

## Spark Context

In [3]:
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext("local")
sqlc = SQLContext(sc)

## Đọc dữ liệu data.csv

In [4]:
data = sc.textFile('/content/data.csv')

## Câu 1: Đếm món hàng

In [5]:
'''
Câu 1
'''
import numpy
import pandas as pd
import pyspark.sql.functions as pf
from pyspark.sql.functions import col
from operator import add

rdd = data.map(lambda line: line.split(",")) \
          .map(lambda line: ((line[0], line[1]), 1)) \
          .reduceByKey(add).map(lambda line: str(line[0][0]) +','+ str(line[0][1]) + ','+ str(line[1]))

rdd.coalesce(1, shuffle=False).saveAsTextFile('counters')

In [6]:
import os

if os.path.exists('/content/counters/part-00000'):
  sqlc.read.csv('/content/counters/part-00000', header=True).show()

+-------------+----------+---+
|Member_number|      Date|  1|
+-------------+----------+---+
|         1249|01/01/2014|  2|
|         1381|01/01/2014|  2|
|         1440|01/01/2014|  2|
|         1659|01/01/2014|  2|
|         1789|01/01/2014|  2|
|         1922|01/01/2014|  2|
|         2226|01/01/2014|  2|
|         2237|01/01/2014|  2|
|         2351|01/01/2014|  2|
|         2542|01/01/2014|  2|
|         2610|01/01/2014|  3|
|         2709|01/01/2014|  2|
|         2727|01/01/2014|  2|
|         2943|01/01/2014|  2|
|         2974|01/01/2014|  3|
|         3681|01/01/2014|  3|
|         3797|01/01/2014|  2|
|         3942|01/01/2014|  3|
|         3956|01/01/2014|  4|
|         4260|01/01/2014|  2|
+-------------+----------+---+
only showing top 20 rows



## Câu 2: Giỏ hàng

In [7]:
from os.path import join
'''
Câu 2
'''

rdd_cau2 = data.map(lambda x: x.split(',')) \
                .map(lambda x: ((x[0], x[1]), [x[2]])) \
                .reduceByKey(add).map((lambda line: str(line[0][0]) +';'+ str(line[0][1]) + ';'+ str(','.join(line[1]))))

rdd_cau2.coalesce(1, shuffle=False).saveAsTextFile('baskets')

In [8]:
import os

if os.path.exists('/content/baskets/part-00000'):
  sqlc.read.csv('/content/baskets/part-00000', header=True, sep=';').show()

+-------------+----------+--------------------+
|Member_number|      Date|     itemDescription|
+-------------+----------+--------------------+
|         1249|01/01/2014| citrus fruit,coffee|
|         1381|01/01/2014|           curd,soda|
|         1440|01/01/2014|other vegetables,...|
|         1659|01/01/2014|specialty chocola...|
|         1789|01/01/2014|hamburger meat,ca...|
|         1922|01/01/2014|tropical fruit,ot...|
|         2226|01/01/2014|sausage,bottled w...|
|         2237|01/01/2014|bottled water,Ins...|
|         2351|01/01/2014|cleaner,shopping ...|
|         2542|01/01/2014|sliced cheese,bot...|
|         2610|01/01/2014|hamburger meat,bo...|
|         2709|01/01/2014|yogurt,frozen veg...|
|         2727|01/01/2014|hamburger meat,fr...|
|         2943|01/01/2014|whole milk,flower...|
|         2974|01/01/2014|berries,whipped/s...|
|         3681|01/01/2014|onions,whipped/so...|
|         3797|01/01/2014|  waffles,whole milk|
|         3942|01/01/2014|other vegetabl

## Câu 3: Tập phổ biến

In [9]:
from pyspark.sql.functions import split

tmp = sqlc.read.csv('/content/baskets/part-00000', header=True, sep=';').rdd
temp = tmp.map(lambda x: (x[0], x[1], list(set(x[2].split(',')))))
dfBaskets = sqlc.createDataFrame(temp,["Member_number","Date","Items"])
dfBaskets.show()

from pyspark.ml.fpm import FPGrowth
fpGrowth = FPGrowth(itemsCol="Items", 
                    minSupport=0.01, 
                    minConfidence=0.1)
model = fpGrowth.fit(dfBaskets)

model.freqItemsets.show()
model.associationRules.show()

+-------------+----------+--------------------+
|Member_number|      Date|               Items|
+-------------+----------+--------------------+
|         1249|01/01/2014|[coffee, citrus f...|
|         1381|01/01/2014|        [soda, curd]|
|         1440|01/01/2014|[yogurt, other ve...|
|         1659|01/01/2014|[specialty chocol...|
|         1789|01/01/2014|[hamburger meat, ...|
|         1922|01/01/2014|[other vegetables...|
|         2226|01/01/2014|[sausage, bottled...|
|         2237|01/01/2014|[bottled water, I...|
|         2351|01/01/2014|[shopping bags, c...|
|         2542|01/01/2014|[bottled water, s...|
|         2610|01/01/2014|[hamburger meat, ...|
|         2709|01/01/2014|[yogurt, frozen v...|
|         2727|01/01/2014|[hamburger meat, ...|
|         2943|01/01/2014|[whole milk, flow...|
|         2974|01/01/2014|[bottled water, w...|
|         3681|01/01/2014|[onions, whipped/...|
|         3797|01/01/2014|[whole milk, waff...|
|         3942|01/01/2014|[yogurt, other

## Câu 4: Giỏ hàng thành vectors 

In [10]:
from IPython.utils.py3compat import iteritems
'''
Câu 4
'''
from pyspark.sql.functions import array_distinct
tmp = sqlc.read.csv('/content/baskets/part-00000', header=True, sep=';').rdd
temp = tmp.map(lambda x: (x[0], x[2].split(','))).reduceByKey(lambda x,y: x+y).map(lambda x: (x[0], ','.join(x[1])))
dfMembers = sqlc.createDataFrame(temp,["Member_number","Items"])
dfMembers.show()

items = dfMembers.select("Items").rdd.flatMap(lambda x: (it.split(',') for it in x))

listItems = []
for i in items.collect():
  listItems += i

listItems = list(set(listItems))
listItems.sort()
print(listItems)

def dictItems(listItems):
  return {it:i for i, it in enumerate(listItems)}

dictItems = dictItems(listItems)
print(dictItems)

from pyspark.ml.linalg import Vectors
  
def basket2vector(member, basket, dictItems):
  basket = basket.split(',')
  basket.sort()
  items = []
  exist = []
  for i in basket:
    items.append(dictItems[i])
    exist.append(1.0)
  return Vectors.sparse(len(dictItems), items, exist)

print(basket2vector(dfMembers.first()["Member_number"], dfMembers.first()["Items"], dictItems))

+-------------+--------------------+
|Member_number|               Items|
+-------------+--------------------+
|         1249|citrus fruit,coff...|
|         1381|curd,soda,coffee,...|
|         1440|other vegetables,...|
|         1659|specialty chocola...|
|         1789|hamburger meat,ca...|
|         1922|tropical fruit,ot...|
|         2226|sausage,bottled w...|
|         2237|bottled water,Ins...|
|         2351|cleaner,shopping ...|
|         2542|sliced cheese,bot...|
|         2610|hamburger meat,bo...|
|         2709|yogurt,frozen veg...|
|         2727|hamburger meat,fr...|
|         2943|whole milk,flower...|
|         2974|berries,whipped/s...|
|         3681|onions,whipped/so...|
|         3797|waffles,whole mil...|
|         3942|other vegetables,...|
|         3956|yogurt,shopping b...|
|         4260|soda,brown bread,...|
+-------------+--------------------+
only showing top 20 rows

['Instant food products', 'UHT-milk', 'abrasive cleaner', 'artif. sweetener', 'baby co

## Câu 5: Giỏ hàng tương tự

In [11]:
from pyspark.ml.feature import MinHashLSH

dfTemp = dfMembers.rdd.map(lambda x: (x[0], x[1]))

vectorItems = []
for i in dfTemp.collect():
 vectorItems.append((i[0], basket2vector(i[0],','.join(list(set(i[1].split(',')))), dictItems)))

dfA = sqlc.createDataFrame(vectorItems, ["Member_number", "Items"])

mh = MinHashLSH(inputCol="Items", outputCol="Hashes", numHashTables=5)
model = mh.fit(dfA)

model.transform(dfA).show()

tranform_df = model.transform(dfA)
model.approxSimilarityJoin(tranform_df, tranform_df, 0.3, "JaccardDistance") \
  .select(col("datasetA.Member_number").alias("idA"),
    col("datasetB.Member_number").alias("idB"),
    col("JaccardDistance")).filter('JaccardDistance>0').show()

key = dfA.collect()[0]['Items']

model.approxNearestNeighbors(dfA, key, 5).show()

+-------------+--------------------+--------------------+
|Member_number|               Items|              Hashes|
+-------------+--------------------+--------------------+
|         1249|(167,[11,30,34,61...|[[2.2371912E7], [...|
|         1381|(167,[1,10,11,28,...|[[2.2371912E7], [...|
|         1440|(167,[28,64,102,1...|[[1.21253679E8], ...|
|         1659|(167,[12,14,26,34...|[[1.98803892E8], ...|
|         1789|(167,[8,18,30,58,...|[[1.14143161E8], ...|
|         1922|(167,[10,12,15,16...|[[1.21253679E8], ...|
|         2226|(167,[9,12,23,40,...|[[4595617.0], [4....|
|         2237|(167,[0,12,27,34,...|[[1.10587902E8], ...|
|         2351|(167,[31,49,75,10...|[[1.8816653E7], [...|
|         2542|(167,[12,82,88,94...|[[1.10587902E8], ...|
|         2610|(167,[11,49,68,90...|[[2.2371912E7], [...|
|         2709|(167,[12,30,40,44...|[[1.98803892E8], ...|
|         2727|(167,[11,52,56,61...|[[2.2371912E7], [...|
|         2943|(167,[20,29,33,54...|[[1.14143161E8], ...|
|         2974

## Câu 6: Phân cụm người dùng theo giỏ hàng

In [12]:
'''
Câu 6
'''
from pyspark.ml.clustering import KMeans, KMeansModel

kmeans = KMeans(k=5)

dfTemp = dfMembers.rdd.map(lambda x: (x[0], x[1]))

vectorItems = []
for i in dfTemp.collect():
 vectorItems.append((i[0], basket2vector(i[0],','.join(list(set(i[1].split(',')))), dictItems)))

dfB = sqlc.createDataFrame(vectorItems, ["label", "features"])

model = kmeans.fit(dfB)
model.setPredictionCol('prediction')
transformed = model.transform(dfB)
transformed = transformed.withColumnRenamed("label","Member_number") \
                          .withColumnRenamed("features","Items")
transformed.show()

+-------------+--------------------+----------+
|Member_number|               Items|prediction|
+-------------+--------------------+----------+
|         1249|(167,[11,30,34,61...|         1|
|         1381|(167,[1,10,11,28,...|         0|
|         1440|(167,[28,64,102,1...|         4|
|         1659|(167,[12,14,26,34...|         4|
|         1789|(167,[8,18,30,58,...|         4|
|         1922|(167,[10,12,15,16...|         4|
|         2226|(167,[9,12,23,40,...|         4|
|         2237|(167,[0,12,27,34,...|         1|
|         2351|(167,[31,49,75,10...|         2|
|         2542|(167,[12,82,88,94...|         3|
|         2610|(167,[11,49,68,90...|         2|
|         2709|(167,[12,30,40,44...|         4|
|         2727|(167,[11,52,56,61...|         1|
|         2943|(167,[20,29,33,54...|         1|
|         2974|(167,[9,12,63,70,...|         1|
|         3681|(167,[47,63,88,99...|         1|
|         3797|(167,[15,38,64,10...|         3|
|         3942|(167,[0,21,32,35,...|    