In [1]:
!pip install --quiet pyspark

In [2]:
from google.colab import drive
drive.mount("/content/gdrive")

Mounted at /content/gdrive


In [3]:
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("YAFIM").setMaster("local")
sc = SparkContext(conf=conf)

In [4]:
def printList(list_a):
  for e in list_a:
    print (e)

In [5]:
DEBUG = 1

def Dprint(info):
    if DEBUG:
        print(info)

In [6]:
def generate_next_c(f_k, k):
    next_c = [var1 | var2 for index, var1 in enumerate(f_k) for var2 in f_k[index + 1:] if
              list(var1)[:k - 2] == list(var2)[:k - 2]]
    return next_c

In [7]:
def generate_f_k(sc, c_k, shared_itemset, sup):
    def get_sup(x):
        x_sup = len([1 for t in shared_itemset.value if x.issubset(t)])
        if x_sup >= sup:
            return x, x_sup
        else:
            return ()

    f_k = sc.parallelize(c_k).map(get_sup).filter(lambda x: x).collect()
    return f_k

In [8]:
import itertools
import time

def ParallelAprioriRunner(sc, data, min_sup):
    start = time.time()
    input = sc.textFile(data)
    TransectionRDD = input.map(lambda line: line.strip().split(" "))
    n_samples = input.count()
    # min_sup to frequency
    sup = n_samples * min_sup
    print(sup)
    # split sort
    itemset = input.map(lambda line: sorted([int(item) for item in line.strip().split(' ')]))
    # share the whole itemset with all workers
    shared_itemset = sc.broadcast(itemset.map(lambda x: set(x)).collect())
    # store for all freq_k
    frequent_itemset = []

    # prepare candidate_1
    k = 1
    c_k = itemset.flatMap(lambda x: set(x)).distinct().collect()
    c_k = [{x} for x in c_k]

    # when candidate_k is not empty
    while len(c_k) > 0:
        # generate freq_k
      Dprint("C{}: {}".format(k, c_k))
      f_k = generate_f_k(sc, c_k, shared_itemset, sup)
      Dprint("F{}: {}".format(k, f_k))

      frequent_itemset.append(f_k)
      k += 1
      # generate candidate_k+1
      c_k = generate_next_c([set(item) for item in map(lambda x: x[0], f_k)], k)

    end = time.time()
    time_taken = end - start
    print("Time Taken is:")
    print(time_taken)

In [9]:
chess = "/content/gdrive/MyDrive/Datasets/chess.dat"
mushroom = "/content/gdrive/MyDrive/Datasets/mushroom.dat"

In [10]:
ParallelAprioriRunner(sc,chess,.9)

2876.4
C1: [{1}, {3}, {5}, {7}, {9}, {11}, {13}, {15}, {17}, {19}, {21}, {23}, {25}, {27}, {29}, {31}, {34}, {36}, {38}, {40}, {42}, {44}, {46}, {48}, {50}, {52}, {54}, {56}, {58}, {60}, {62}, {64}, {66}, {68}, {70}, {72}, {74}, {12}, {16}, {20}, {47}, {51}, {63}, {24}, {65}, {43}, {32}, {73}, {4}, {33}, {39}, {71}, {69}, {10}, {18}, {14}, {8}, {49}, {55}, {6}, {37}, {28}, {26}, {75}, {57}, {45}, {22}, {2}, {67}, {35}, {53}, {41}, {61}, {30}, {59}]
F1: [({5}, 2971), ({7}, 3076), ({29}, 3181), ({34}, 3040), ({36}, 3099), ({40}, 3170), ({48}, 3013), ({52}, 3185), ({56}, 3021), ({58}, 3195), ({60}, 3149), ({62}, 3060), ({66}, 3021)]
C2: [{5, 7}, {29, 5}, {34, 5}, {36, 5}, {40, 5}, {48, 5}, {52, 5}, {56, 5}, {58, 5}, {60, 5}, {5, 62}, {66, 5}, {29, 7}, {34, 7}, {36, 7}, {40, 7}, {48, 7}, {52, 7}, {56, 7}, {58, 7}, {60, 7}, {62, 7}, {66, 7}, {34, 29}, {36, 29}, {40, 29}, {48, 29}, {52, 29}, {56, 29}, {58, 29}, {60, 29}, {29, 62}, {66, 29}, {34, 36}, {40, 34}, {48, 34}, {34, 52}, {56, 34}, {

In [11]:
ParallelAprioriRunner(sc,mushroom,.9)

7311.6
C1: [{1}, {3}, {9}, {13}, {23}, {25}, {34}, {36}, {38}, {40}, {52}, {54}, {59}, {63}, {67}, {76}, {85}, {86}, {90}, {93}, {98}, {107}, {113}, {2}, {14}, {26}, {39}, {55}, {99}, {108}, {114}, {4}, {15}, {27}, {41}, {115}, {10}, {16}, {24}, {28}, {37}, {53}, {94}, {109}, {42}, {43}, {110}, {44}, {11}, {64}, {5}, {111}, {6}, {56}, {116}, {57}, {65}, {117}, {100}, {60}, {45}, {68}, {77}, {69}, {78}, {46}, {17}, {29}, {61}, {66}, {70}, {79}, {95}, {101}, {71}, {18}, {30}, {80}, {19}, {47}, {58}, {72}, {91}, {102}, {112}, {118}, {31}, {48}, {20}, {96}, {119}, {103}, {21}, {7}, {81}, {22}, {32}, {82}, {12}, {8}, {49}, {35}, {50}, {73}, {83}, {87}, {51}, {88}, {104}, {33}, {74}, {84}, {92}, {97}, {105}, {106}, {62}, {75}, {89}]
F1: [({34}, 7914), ({85}, 8124), ({86}, 7924), ({90}, 7488)]
C2: [{34, 85}, {34, 86}, {34, 90}, {85, 86}, {90, 85}, {90, 86}]
F2: [({34, 85}, 7914), ({34, 86}, 7906), ({85, 86}, 7924), ({90, 85}, 7488)]
C3: [{34, 85, 86}]
F3: [({34, 85, 86}, 7906)]
Time Taken is: