In [1]:
import findspark
findspark.init()

from pyspark import *
from pyspark.sql.functions import desc, col, rand
from pyspark.sql import *
from graphframes import *
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

import os
from IPython.display import display, HTML
import pandas as pd
import numpy as np
import sys
from sympy.ntheory.generate import nextprime
import time
from tqdm import tqdm
import copy
from math import comb
import multiprocessing as mp

In [2]:
%load_ext autoreload
%autoreload 2

In [3]:
# Configure the Spark launch environment before the SparkSession below is created
# (presumably so the JVM launch picks these up):
#   - PYSPARK_SUBMIT_ARGS pulls in the GraphFrames package,
#   - PYSPARK_PYTHON pins Python workers to this kernel's interpreter.
# https://graphframes.github.io/graphframes/docs/_site/quick-start.html
# https://stackoverflow.com/questions/65011599/how-to-start-graphframes-on-spark-on-pyspark-on-juypter-on-docker
os.environ.update(
    PYSPARK_SUBMIT_ARGS='--packages graphframes:graphframes:0.8.1-spark3.0-s_2.12 pyspark-shell',
    PYSPARK_PYTHON=sys.executable,
)

In [4]:
# Local Spark session; "local[2]" runs Spark with 2 worker threads on this machine.
spark = (
    SparkSession.builder
    .appName('hw2')
    .master("local[2]")
    .getOrCreate()
)

21/11/18 19:04:50 WARN Utils: Your hostname, mark-machine resolves to a loopback address: 127.0.1.1; using 192.168.0.102 instead (on interface wlp8s0)
21/11/18 19:04:50 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/mark/.ivy2/cache
The jars for the packages stored in: /home/mark/.ivy2/jars
graphframes#graphframes added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-74be1d16-eec7-4fc0-a83a-297022d361ed;1.0
	confs: [default]
	found graphframes#graphframes;0.8.1-spark3.0-s_2.12 in spark-packages
	found org.slf4j#slf4j-api;1.7.16 in central
:: resolution report :: resolve 105ms :: artifacts dl 4ms
	:: modules in use:
	graphframes#graphframes;0.8.1-spark3.0-s_2.12 from spark-packages in [default]
	org.slf4j#slf4j-api;1.7.16 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
	-------------------------------------

In [5]:
# Transaction file: one basket per line, item ids separated by spaces (parsed by read_data).
data_path = "data/T10I4D100K.dat"

In [6]:
# Number of local worker processes; filter_itemsets sizes its multiprocessing pool with it.
nprocs = mp.cpu_count()
print(f"Number of CPU cores: {nprocs}")

Number of CPU cores: 8


In [7]:
def _parse_basket(line):
    """Parse one line of whitespace-separated integer item ids into a set of ints.

    ``str.split()`` with no argument tolerates repeated spaces, tabs and
    leading/trailing whitespace; a blank line yields an empty basket.
    """
    return {int(token) for token in line.split()}


def read_data(data_path):
    """Load a transaction file into Spark RDDs.

    Each line of ``data_path`` is one basket of item ids separated by whitespace.

    Args:
        data_path: path readable by ``SparkContext.textFile``.

    Returns:
        A tuple ``(data_rdd, items_rdd, items_counts_rdd)``:
          - data_rdd: RDD of baskets, each an unordered ``set`` of int item ids
            (repeated ids within a line collapse to one).
          - items_rdd: RDD of the distinct item ids.
          - items_counts_rdd: RDD of ``(item_id, count)`` pairs, where ``count``
            is the number of baskets containing the item (baskets are sets).
    """
    data_rdd = spark.sparkContext.textFile(data_path).map(_parse_basket)
    items_counts_rdd = (
        data_rdd.flatMap(lambda basket: basket)
        .map(lambda item: (item, 1))
        .reduceByKey(lambda a, b: a + b)
    )
    items_rdd = items_counts_rdd.keys()

    return data_rdd, items_rdd, items_counts_rdd

In [8]:
data_rdd, items_rdd, items_counts_rdd = read_data(data_path=data_path)
# Materialize every basket on the driver: filter_itemsets reads this global and
# passes it to its multiprocessing workers for support counting.
data_rdd_c = data_rdd.collect()

                                                                                

In [9]:
# Peek at two parsed baskets, then count all baskets.
print(data_rdd.take(2))
n_baskets = data_rdd.count()
print(f"n_baskets = {n_baskets}")

[{448, 834, 164, 775, 328, 687, 240, 368, 274, 561, 52, 630, 730, 825, 538, 25}, {704, 834, 581, 39, 205, 814, 401, 120, 825, 124}]
n_baskets = 100000


In [10]:
# Preview some distinct item ids, then count them. The preview is printed
# explicitly: a bare take() that is not the cell's last expression is discarded.
print(items_rdd.take(20))
n_items = items_rdd.count()
print(f"n_items = {n_items}")

[Stage 3:>                                                          (0 + 2) / 2]

n_items = 870


                                                                                

In [11]:
# Number of distinct items, then a sample of (item, number of baskets containing it) pairs.
print(items_counts_rdd.count())
items_counts_rdd.take(10)

870


[(448, 1370),
 (834, 1373),
 (164, 744),
 (328, 663),
 (240, 1399),
 (368, 7828),
 (274, 2628),
 (52, 1983),
 (630, 1523),
 (538, 3982)]

In [12]:
def get_singletons(items_counts_rdd, s):
    """Select the frequent 1-itemsets.

    Args:
        items_counts_rdd: RDD of ``(item, count)`` pairs.
        s: support threshold; an item is kept when its count is at least ``s``.

    Returns:
        RDD of ``({item}, count)`` pairs, one single-element set per frequent item.
    """
    def _is_frequent(item_count):
        return item_count[1] >= s

    def _to_singleton(item_count):
        return {item_count[0]}, item_count[1]

    return items_counts_rdd.filter(_is_frequent).map(_to_singleton)

In [13]:
def construct_itemsets(k, itemsets_frequent_rdd, from_ckpt=False, ckpt_dir="ckpt"):
    """Propose Apriori candidate itemsets of size ``k``.

    Candidates are built as follows:
      1. Join (cartesian product) every frequent singleton with every frequent
         (k-1)-itemset.
      2. Keep the unions with exactly ``k`` items and drop duplicates.
      3. Prune every candidate that has an infrequent subset of any size
         1..k-1 (Apriori downward closure).

    Args:
        k: target itemset size; must be greater than 1.
        itemsets_frequent_rdd: RDD of ``(itemset, support)`` pairs holding the
            frequent itemsets found so far; itemsets are Python sets.
        from_ckpt: if True, load the candidates from
            ``<ckpt_dir>/candidates_k_<k>.npy`` instead of computing them
            (``itemsets_frequent_rdd`` is then not used).
        ckpt_dir: directory holding the ``.npy`` checkpoints; created if missing.

    Returns:
        RDD of ``(idx, itemset)`` pairs, where ``idx`` is the candidate's
        position 0..n-1. Checkpoint-loaded elements are 2-element lists.
    """
    import os
    from itertools import combinations

    assert 1 < k

    ckpt_path = os.path.join(ckpt_dir, f"candidates_k_{k}.npy")

    if from_ckpt:
        loaded = np.load(ckpt_path, allow_pickle=True)
        candidates = spark.sparkContext.parallelize(loaded.tolist())
        print(f"loaded proposed n={candidates.count()} candidates")
        return candidates

    # Join frequent singletons with frequent (k-1)-itemsets on Spark; only
    # unions that add a genuinely new item reach exactly k elements.
    singletons = itemsets_frequent_rdd.filter(lambda x: len(x[0]) == 1).map(lambda x: x[0])
    k_minus_1_tons = itemsets_frequent_rdd.filter(lambda x: len(x[0]) == k - 1).map(lambda x: x[0])
    unions = (
        singletons.cartesian(k_minus_1_tons)
        .map(lambda pair: pair[0].union(pair[1]))
        .filter(lambda itemset: len(itemset) == k)
        .collect()
    )

    # Sets are unhashable, so deduplicate through frozensets.
    unique = [set(itemset) for itemset in set(frozenset(itemset) for itemset in unions)]
    n_before_prune = len(unique)

    os.makedirs(ckpt_dir, exist_ok=True)
    np.save(os.path.join(ckpt_dir, f"candidates_notpruned_k_{k}.npy"),
            np.array([(itemset, 0) for itemset in unique]))

    # Index the frequent itemsets by size as frozensets, so each subset test is
    # a hash lookup. This is done on the driver with plain Python rather than
    # with lazy Spark lambdas built in a loop: those would all capture the
    # final loop values when the job eventually runs.
    frequent_by_size = {}
    for itemset, _support in itemsets_frequent_rdd.collect():
        frequent_by_size.setdefault(len(itemset), set()).add(frozenset(itemset))

    def _has_only_frequent_subsets(candidate):
        # Every i-subset (1 <= i < k) must itself be a known frequent itemset.
        return all(
            frozenset(subset) in frequent_by_size.get(size, ())
            for size in range(1, k)
            for subset in combinations(candidate, size)
        )

    pruned = [itemset for itemset in unique if _has_only_frequent_subsets(itemset)]
    indexed = list(enumerate(pruned))
    candidates = spark.sparkContext.parallelize(indexed)

    np.save(ckpt_path, np.array(indexed))

    print(f"proposing n={len(pruned)} candidates (n_pruned={n_before_prune - len(pruned)})")

    return candidates

def f(candidate, data_rdd_c, k):
    """Count the baskets in ``data_rdd_c`` that share exactly ``k`` items with ``candidate``.

    When ``k == len(candidate)`` this is the candidate's support, i.e. the number
    of baskets containing it. Kept at module level because it is used as a
    ``multiprocessing.Pool`` worker function.
    """
    return sum(1 for basket in data_rdd_c if len(basket & candidate) == k)

def filter_itemsets(candidates_rdd, k, s, itemsets_frequent_rdd, from_ckpt=False, ckpt_dir="ckpt"):
    """Count each size-``k`` candidate's support and keep those with support >= ``s``.

    Support counting runs in a local ``multiprocessing`` pool of ``nprocs``
    workers. Each worker calls ``f`` against the driver-side basket list
    ``data_rdd_c``; both ``nprocs`` and ``data_rdd_c`` are notebook globals.

    NOTE(review): ``f`` lives in the notebook's ``__main__``; this presumably
    relies on the fork start method (Linux default) — confirm on macOS/Windows.

    Args:
        candidates_rdd: RDD of ``(idx, itemset)`` pairs.
        k: candidate itemset size.
        s: support threshold.
        itemsets_frequent_rdd: accepted for signature symmetry; not used here.
        from_ckpt: if True, load the result from
            ``<ckpt_dir>/filtered_candidates_k_<k>_s_<s>.npy`` instead of counting.
        ckpt_dir: directory holding the ``.npy`` checkpoints; created if missing.

    Returns:
        RDD of ``(itemset, support)`` pairs for the frequent candidates
        (2-element lists when loaded from a checkpoint).
    """
    import os

    ckpt_path = os.path.join(ckpt_dir, f"filtered_candidates_k_{k}_s_{s}.npy")
    start_time = time.time()

    if from_ckpt:
        print("Filtering loading from file...")
        loaded = np.load(ckpt_path, allow_pickle=True)
        res = spark.sparkContext.parallelize(loaded.tolist())
        end_time = time.time()
        print(f"k={k}, t={end_time - start_time}, n={res.count()}")
        return res

    print("Starting filtering...")

    itemsets = [itemset for _idx, itemset in candidates_rdd.collect()]

    # The context manager shuts the worker processes down once all supports are in.
    with mp.Pool(processes=nprocs) as pool:
        supports = pool.starmap(f, [(itemset, data_rdd_c, k) for itemset in itemsets])

    # Pair each itemset with its support positionally, so the result does not
    # depend on candidate indices being exactly 0..n-1.
    frequent = [(itemset, support) for itemset, support in zip(itemsets, supports) if s <= support]
    res = spark.sparkContext.parallelize(frequent)

    os.makedirs(ckpt_dir, exist_ok=True)
    np.save(ckpt_path, np.array(frequent))

    end_time = time.time()
    print(f"k={k}, t={end_time - start_time}, n={len(frequent)}")

    return res

In [14]:
# Absolute support threshold: an itemset is frequent if it occurs in at least `s`
# baskets (1000 of the 100,000 baskets counted above, i.e. 1%).
s = 1000
# When True, construct_itemsets / filter_itemsets load their results from ckpt/
# files instead of recomputing them.
from_ckpt = True

itemsets_frequent_rdd_1 = get_singletons(items_counts_rdd=items_counts_rdd, s=s)
itemsets_frequent_rdd_1.count()

375

In [15]:
# Sample of frequent 1-itemsets as ({item}, support) pairs.
itemsets_frequent_rdd_1.take(10)

[({448}, 1370),
 ({834}, 1373),
 ({240}, 1399),
 ({368}, 7828),
 ({274}, 2628),
 ({52}, 1983),
 ({630}, 1523),
 ({538}, 3982),
 ({704}, 1794),
 ({814}, 1672)]

In [16]:
# Size-2 candidates proposed from the frequent singletons, as (idx, itemset) pairs.
candidates_2 = construct_itemsets(k=2, itemsets_frequent_rdd=itemsets_frequent_rdd_1, from_ckpt=from_ckpt)
candidates_2.take(20)

loaded proposed n=70125 candidates


[[0, {413, 494}],
 [1, {874, 978}],
 [2, {701, 946}],
 [3, {335, 804}],
 [4, {576, 583}],
 [5, {242, 684}],
 [6, {597, 641}],
 [7, {581, 766}],
 [8, {335, 538}],
 [9, {39, 884}],
 [10, {516, 854}],
 [11, {115, 735}],
 [12, {126, 952}],
 [13, {854, 895}],
 [14, {682, 740}],
 [15, {774, 984}],
 [16, {468, 984}],
 [17, {738, 749}],
 [18, {675, 790}],
 [19, {529, 600}]]

In [17]:
# Keep the size-2 candidates whose support reaches `s`.
new_itemsets_frequent_rdd_2 = \
    filter_itemsets(candidates_rdd=candidates_2, k=2, s=s, itemsets_frequent_rdd=itemsets_frequent_rdd_1, from_ckpt=from_ckpt)

# Accumulate all frequent itemsets found so far (sizes 1 and 2).
itemsets_frequent_rdd_2 = itemsets_frequent_rdd_1.union(new_itemsets_frequent_rdd_2)
print(itemsets_frequent_rdd_2.count())

Filtering loading from file...
k=2, t=0.005522727966308594, n=9
384


In [18]:
# Size-3 candidates proposed from all frequent itemsets of sizes 1 and 2.
# NOTE(review): this repeats the k=2 cells; a loop over k would avoid the copy-paste.
candidates_3 = construct_itemsets(k=3, itemsets_frequent_rdd=itemsets_frequent_rdd_2, from_ckpt=from_ckpt)
candidates_3.take(20)

loaded proposed n=1 candidates


[[0, {39, 704, 825}]]

In [19]:
# Keep the size-3 candidates whose support reaches `s`.
new_itemsets_frequent_rdd_3 = \
    filter_itemsets(candidates_rdd=candidates_3, k=3, s=s, itemsets_frequent_rdd=itemsets_frequent_rdd_2, from_ckpt=from_ckpt)

# Accumulate all frequent itemsets found so far (sizes 1 to 3).
itemsets_frequent_rdd_3 = itemsets_frequent_rdd_2.union(new_itemsets_frequent_rdd_3)
print(itemsets_frequent_rdd_3.count())

Filtering loading from file...
k=3, t=0.0051195621490478516, n=1
385
