<a href="https://colab.research.google.com/github/prometheus404/AMD_project/blob/main/main.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
##### DOWNLOAD DATASET #####
# Colab-only setup: upload a Kaggle API token, install the Kaggle CLI, then
# download the gsimonx37/letterboxd dataset and unpack it under /content/letterbox/.
from IPython.display import clear_output
from google.colab import files
# Opens Colab's upload dialog; the shell commands below expect the uploaded
# file to be named kaggle.json and to land in the current working directory.
files.upload()
!ls -lha kaggle.json
!pip install -q kaggle # installing the kaggle package
!mkdir -p ~/.kaggle # creating .kaggle folder where the key should be placed
!cp kaggle.json ~/.kaggle/ # copy the key into the folder
!pwd # checking the present working directory
# Make the key readable/writable by the owner only.
!chmod 600 ~/.kaggle/kaggle.json

!kaggle datasets download -d gsimonx37/letterboxd
# The archive is extracted into "letterbox" (no trailing "d"); the basket
# creation cell reads letterbox/actors.csv relative to /content.
!unzip /content/letterboxd.zip -d /content/letterbox/
# Wipe the verbose pip/unzip output from the cell.
clear_output()
############################

In [1]:
########### SPARK CONTEXT #####################
# Installs Java 8 and Spark 3.1.1 on the Colab VM and starts a local
# SparkSession. Defines `spark` and `sc`, plus the pandas / itertools / tqdm
# imports that the later cells rely on.
import pandas as pd
import itertools
from tqdm import tqdm

# Headless JDK 8, the Spark 3.1.1 (Hadoop 3.2) tarball from the Apache archive,
# and findspark.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

import os
# SPARK_HOME assumes the tarball above was unpacked with /content as the
# working directory — TODO confirm when running outside Colab.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

import findspark
# Must run before importing pyspark: presumably locates it through SPARK_HOME.
findspark.init()
from pyspark.sql import SparkSession
# Local master using every available core ("local[*]").
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better

# Low-level context used to build the RDDs in the following cells.
sc = spark.sparkContext
###############################################

In [4]:
############## BASKET CREATION ################
# One basket per `id` (presumably the film id): the list of actor names that
# share that id in actors.csv.
#TODO do it using SPARK directly -> cars = spark.read.csv('cars.csv', header=True, sep=";")
actors = pd.read_csv("letterbox/actors.csv")
baskets_full = actors.groupby("id")["name"].apply(list)
# Fixed seed: without it every re-run draws a different sample, so none of the
# itemsets reported further down could be reproduced.
baskets = baskets_full.sample(30000, random_state=42)

print("number of baskets: " + str(len(baskets)))
print("biggest basket: " + str(baskets.map(len).max()))
print(baskets)
# Iterating a Series yields its values, so the RDD holds the name lists only
# (the ids are dropped). Cached because SON scans the data twice.
baskets_RDD = sc.parallelize(baskets).cache()
##############################################

number of baskets: 30000
biggest basket: 246
id
1061722    [Victor Andres Trelles Turgeon, Paul-Antoine T...
1780554    [Günter Pfitzmann, Gerd Duwner, Inge Wolffberg...
1049826    [Andressa Koetz, Juninho Bill, Paulo Reis, Zac...
1113595    [Kurt Sinclair, Leslie Taylor, Kathleen Klein,...
1594379    [Çetin Başaran, Naki Yurter, Recep Filiz, Haka...
                                 ...                        
1483472    [Song Yoo-dam, Haechan, Kim Sang-gyun, Kim Si-...
1374678    [Taeko Yoshida, Mitsuru Tamaki, Masao Teruya, ...
1304228    [Megumi Okada, Kikuko Inoue, Aya Endo, Kanehir...
1749248                                      [Chloë Fratani]
1875572                                        [Nick Cheung]
Name: name, Length: 30000, dtype: object


In [3]:
# Small hand-made dataset (11 baskets of single-letter items) used to compare
# plain Apriori against the SON implementation. With the support threshold of 5
# used in the comparison cell, its recorded output lists ('a','b'), ('a','c'),
# ('b','c') and ('a','b','c') as the frequent itemsets.
test_basket = [
    ['a','b','c','d','e','f','g','h','i','l'],
    ['n','o'],
    ['o','n'],
    ['a','b','c','d','e','f',],
    ['a','b','c'],
    ['f','q'],
    ['f','q','z'],
    ['f','u','q'],
    ['n','o','i'],
    ['a','b','c','d','e','f','q','p','o','n'],
    ['a','b','c','d','e','f','r','s','t','u'],
]
# Distributed copy of the same baskets; cached because SON reads it twice.
test_RDD = sc.parallelize(test_basket).cache()

In [5]:
## APRIORI
def apriori(chunk, s, tot_bsk):
  """Yield the itemsets of size >= 2 that are frequent inside one chunk of baskets.

  Generator meant to be used either directly on a full dataset or as the first
  map of SON (through ``mapPartitions``).

  Args:
    chunk: iterable of baskets, each basket an iterable of hashable, mutually
      sortable items. It is materialised into a list because it is scanned
      once per itemset size.
    s: support threshold expressed on the whole dataset.
    tot_bsk: number of baskets in the whole dataset; the threshold applied to
      this chunk is ``s * len(chunk) / tot_bsk``.

  Yields:
    ``(itemset, 1)`` pairs, where ``itemset`` is a sorted tuple of items whose
    count in the chunk reaches the scaled threshold. Pairs come first, then
    triples, and so on. Singletons are never yielded.
  """
  chunk = list(chunk) # to avoid consuming after first pass
  n_bsk = len(chunk) # number of baskets in chunk used to calculate support
  if n_bsk == 0:
    return # empty partition: nothing can be frequent
  threshold = s * n_bsk/tot_bsk

  ### first pass: support of single items
  count = dict()
  for bsk in chunk:
    # set(): an item repeated inside one basket must count once for support
    for b in set(bsk):
      count[b] = count.get(b, 0) + 1

  ### filter
  # Items that may still appear in a frequent k-itemset. Kept as a set so the
  # per-basket filter is an exact, O(1) membership test (the previous
  # `b in f` scan was a substring test when f was an item string).
  frequent_items = {c for c in count if count[c] >= threshold}

  k = 2
  while len(frequent_items) > 0:
    count = dict()

    for bsk in tqdm(chunk):
      # keep only items belonging to some frequent (k-1)-itemset; sorting
      # gives every itemset a single canonical tuple
      filtered = sorted(frequent_items.intersection(bsk))
      for tpl in itertools.combinations(filtered, k):
        count[tpl] = count.get(tpl, 0) + 1

    frequent_items = set()
    for c in count:
      if count[c] >= threshold:
        frequent_items.update(c)
        yield (c, 1)
    k += 1

#SON second map
def second_map(chunk,itemset,s):
  """Count, inside one chunk, how many baskets contain each candidate itemset.

  Second map of SON, meant to be used through ``mapPartitions``.

  Args:
    chunk: iterable of baskets, each basket an iterable of hashable items.
    itemset: iterable of candidate itemsets (tuples of items).
    s: support threshold. Unused here (the filtering happens after the
      reduce step); kept so existing callers keep working.

  Yields:
    ``(candidate, count)`` for every candidate contained in at least one
    basket of the chunk, in order of first occurrence.
  """
  # Built once: also protects against `itemset` being a one-shot iterator,
  # which would otherwise be exhausted after the first basket.
  candidates = [(i, frozenset(i)) for i in itemset]

  count = dict()
  for bsk in tqdm(chunk):
    # One set per basket turns every containment check into hash lookups
    # instead of repeated scans of the basket list.
    members = set(bsk)
    for i, needed in candidates:
      if needed <= members:
        count[i] = count.get(i, 0) + 1
  for c in count:
    yield (c,count[c])

In [6]:
# Comparative test between apriori and SON
# Runs plain Apriori on the in-memory test baskets, then the two SON steps on
# the same data as an RDD, and prints the differences between the results.
S =  5; #support threshold (maybe a fraction is better)
TOT = len(test_basket)#len(baskets)

# Reference result: Apriori over the whole dataset as a single chunk
# (only the itemset of each yielded (itemset, 1) pair is kept).
real = []
for a in apriori(test_basket,S,TOT):
  real.append(a[0])
print("apriori output: "+ str(real))

#SON first step
# Per-partition Apriori; reduceByKey keeps one copy of each candidate.
candidates = test_RDD.mapPartitions(lambda x: apriori(x,S,TOT)).reduceByKey(lambda a,b: a).keys().collect()
print("candidates: " + str(candidates))
#SON second step
# Count every candidate over all partitions and keep those reaching S.
real_from_candidates = test_RDD.mapPartitions(lambda x: second_map(x, candidates,S)).reduceByKey(lambda a,b: a+b).filter(lambda a: a[1] >= S).keys().collect()
print("SON output: "+ str(real_from_candidates))
# Candidates that turned out not to be frequent, and frequent itemsets that
# the first step failed to propose.
not_real = [x for x in candidates if x not in real]
not_present = [x for x in real if x not in candidates]
print("candidates not in real set:" + str(not_real))
print("real itemset not present in candidates:" + str(not_present))
# Symmetric difference between the SON result and the Apriori result;
# both lists are empty when the two outputs agree.
f1 = [x for x in real_from_candidates if x not in real]
f2 = [x for x in real if x not in real_from_candidates]
print("diff between final and real:"+ str(f1) + str(f2))


100%|██████████| 11/11 [00:00<00:00, 13380.90it/s]
100%|██████████| 11/11 [00:00<00:00, 60867.21it/s]
100%|██████████| 11/11 [00:00<00:00, 62771.90it/s]


apriori output: [('a', 'b'), ('a', 'c'), ('b', 'c'), ('a', 'b', 'c')]
candidates: [('b', 'c'), ('a', 'b', 'c'), ('f', 'q'), ('a', 'b'), ('a', 'c')]
SON output: [('b', 'c'), ('a', 'b', 'c'), ('a', 'b'), ('a', 'c')]
candidates not in real set:[('f', 'q')]
real itemset not present in candidates:[]
diff between final and real:[][]


In [7]:
def son(rdd,s,tot):
  """Find the frequent itemsets (size >= 2) of an RDD of baskets with SON.

  Args:
    rdd: RDD whose elements are baskets (iterables of items).
    s: support threshold, i.e. the minimum number of baskets of the whole
      dataset that must contain an itemset.
    tot: total number of baskets in `rdd`, used by `apriori` to scale the
      threshold to each partition.

  Returns:
    List of itemsets (tuples) contained in at least `s` baskets.
  """
  #FIRST STEP
  # Apriori on each partition; reduceByKey only removes duplicate candidates.
  candidates = rdd.mapPartitions(lambda x: apriori(x,s,tot)).reduceByKey(lambda a,b: a).keys().collect()
  #SECOND STEP
  # Exact count of every candidate over the whole dataset. The filter must use
  # the `s` argument: comparing against the notebook-global `S` silently
  # applied a different threshold than the one requested by the caller.
  return (rdd.mapPartitions(lambda x: second_map(x, candidates,s))
             .reduceByKey(lambda a,b: a+b)
             .filter(lambda a: a[1] >= s)
             .keys()
             .collect())

#son(test_RDD,S,TOT)

In [8]:
# SON on the sampled baskets, requesting a support threshold of 6; the returned
# list of itemsets is displayed as the cell output.
son(baskets_RDD, 6, len(baskets))

[('Jeon Jung-kook', 'Jung Ho-seok'),
 ('Jeon Jung-kook', 'Kim Seok-jin'),
 ('Jeon Jung-kook', 'Kim Tae-hyung'),
 ('Jeon Jung-kook', 'Min Yoon-gi'),
 ('Jung Ho-seok', 'Kim Seok-jin'),
 ('Jung Ho-seok', 'Kim Tae-hyung'),
 ('Jung Ho-seok', 'Min Yoon-gi'),
 ('Kim Nam-joon', 'Park Ji-min'),
 ('Kim Seok-jin', 'Kim Tae-hyung'),
 ('Kim Seok-jin', 'Min Yoon-gi'),
 ('Kim Tae-hyung', 'Min Yoon-gi'),
 ('Jeon Jung-kook', 'Jung Ho-seok', 'Kim Seok-jin'),
 ('Jeon Jung-kook', 'Jung Ho-seok', 'Kim Tae-hyung'),
 ('Jeon Jung-kook', 'Jung Ho-seok', 'Min Yoon-gi'),
 ('Jeon Jung-kook', 'Kim Nam-joon', 'Park Ji-min'),
 ('Jeon Jung-kook', 'Kim Seok-jin', 'Kim Tae-hyung'),
 ('Jeon Jung-kook', 'Kim Seok-jin', 'Min Yoon-gi'),
 ('Jeon Jung-kook', 'Kim Tae-hyung', 'Min Yoon-gi'),
 ('Jung Ho-seok', 'Kim Nam-joon', 'Park Ji-min'),
 ('Jung Ho-seok', 'Kim Seok-jin', 'Kim Tae-hyung'),
 ('Jung Ho-seok', 'Kim Seok-jin', 'Min Yoon-gi'),
 ('Jung Ho-seok', 'Kim Tae-hyung', 'Min Yoon-gi'),
 ('Kim Nam-joon', 'Kim Seok-jin', '

In [13]:
#son(full_RDD, 20, len(baskets_full))

In [12]:
# Single-machine Apriori over every basket with support threshold 140.
# Guarded by a flag because the recorded run took about nine minutes for the
# first counting pass alone.
execute_full = True
result = []
if execute_full:
  print("cc")
  # NOTE(review): full_RDD is built and cached here but not used by the rest
  # of this cell, which runs apriori on the pandas Series directly.
  full_RDD = sc.parallelize(baskets_full).cache()
  # apriori yields (itemset, 1) pairs; only the itemset is kept.
  for i in apriori(baskets_full, 140, len(baskets_full)):
    result.append(i[0])
print(result)

cc


100%|██████████| 603163/603163 [09:02<00:00, 1111.29it/s]
100%|██████████| 603163/603163 [00:08<00:00, 72722.17it/s] 
100%|██████████| 603163/603163 [00:02<00:00, 269889.57it/s]

[('Larry Fine', 'Moe Howard'), ('Oliver Hardy', 'Stan Laurel'), ('James Hetfield', 'Kirk Hammett'), ('James Hetfield', 'Lars Ulrich'), ('Kirk Hammett', 'Lars Ulrich'), ('Jack Mercer', 'Mae Questel'), ('Ali Basha', 'Brahmanandam'), ('Bebe Daniels', 'Harold Lloyd'), ('Bebe Daniels', "Harry 'Snub' Pollard"), ('Harold Lloyd', "Harry 'Snub' Pollard"), ('James Hetfield', 'Kirk Hammett', 'Lars Ulrich'), ('Bebe Daniels', 'Harold Lloyd', "Harry 'Snub' Pollard")]





603163