In [2]:
import sys
import time
import numpy as np
import pandas as pd

import matplotlib.pyplot as plt

import logging
import multiprocessing
import unittest

import mlxtend
from mlxtend.preprocessing import TransactionEncoder
from mlxtend.frequent_patterns import apriori

In [3]:
class Node():
    def __init__(self, item, count=0):
        self.item = item
        self.count = count
        self.parent = None
        self.children = {}

    def __str__(self):
        if self.item is not None:
            s = 'item: {} count: {}  '.format(self.item, self.count)
        else:
            s = 'root \n'
        s += 'children: '
        for child in self.children:
            s += str(child) + ' '

        return s

In [4]:
class FPTree():
    def __init__(self):
        # dict[item] = [Node1, Node2...]
        self.header_table = {}
        self.item_counter = {}
        self.root = Node(None)

    def add_tran(self, tran, weight=1):
        ptr = self.root
        for item in tran:
            if item in ptr.children:
                ptr.children[item].count += weight
                self.item_counter[item] += weight
                ptr = ptr.children[item]
            else:
                new_node = Node(item, weight)
                new_node.parent = ptr
                ptr.children[item] = new_node
                if item in self.header_table:
                    self.header_table[item].append(new_node)
                    self.item_counter[item] += weight
                else:
                    self.header_table[item] = [new_node]
                    self.item_counter[item] = weight
                ptr = new_node

        return

    def mine(self, min_cnt=1):
        """
        return:
            [list of frequent patterns, list of fp count]
        """
        fp, fp_count = [], []
        for item in self.header_table:
            if self.item_counter[item] >= min_cnt:
                fp.append([item])
                fp_count.append(self.item_counter[item])

                cond_trans, weights = self.get_conditional_tran(item, min_cnt)
                cond_tree = FPTree()
                for tran, weight in zip(cond_trans, weights):
                    assert item not in tran, (item, tran, weight)
                    cond_tree.add_tran(tran, weight)
                cond_fp, cond_fp_count = cond_tree.mine(min_cnt)
                if cond_fp:
                    cond_fp = [i + [item] for i in cond_fp]
                    fp += cond_fp
                    fp_count += cond_fp_count

        assert len(fp) == len(fp_count)
        if fp:
            fp = [sorted(i) for i in fp]
            tmp = list(zip(fp, fp_count))
            tmp = sorted(tmp, key=lambda x: (len(x[0]), x[0]))
            fp, fp_count = list(zip(*tmp))

        return fp, fp_count

    def get_conditional_tran(self, item, min_cnt=1):
        """
        excluding item
        return:
        [list of items, list of weight]
        """
        trans, weights = [], []
        for node in self.header_table[item]:
            # if node.count >= min_cnt:
            tmp_tran = []
            ptr = node.parent
            # while ptr.item: // wrong when item == 0
            while ptr.item != None:
                tmp_tran.append(ptr.item)
                ptr = ptr.parent
            if tmp_tran:
                trans.append(tmp_tran)
                weights.append(node.count)

        return trans, weights

    def print_tree(self):
        l = [self.root]
        while l:
            next_l = []
            for node in l:
                print(node)
                next_l += node.children.values()
            l = next_l
            print('----------------------------------')

In [5]:
class MineByItem(multiprocessing.Process):
    def __init__(self, tree, min_cnt, fp_list, queue):
        multiprocessing.Process.__init__(self)
        self.tree = tree
        self.min_cnt = min_cnt
        self.queue = queue
        self.fp_list = fp_list

    def run(self):
        while True:
            if self.queue.empty():
                break
            else:
                item = self.queue.get()
                self.fp_list.append(([item], self.tree.item_counter[item]))
                cond_trans, weights = self.tree.get_conditional_tran(item, self.min_cnt)
                cond_tree = FPTree()
                for tran, weight in zip(cond_trans, weights):
                    assert item not in tran, (item, tran, weight)
                    cond_tree.add_tran(tran, weight)
                cond_fp, cond_fp_count = cond_tree.mine(self.min_cnt)
                if cond_fp:
                    cond_fp = [i + [item] for i in cond_fp]
                    cond_fp = [sorted(fp) for fp in cond_fp]
                    self.fp_list += list(zip(cond_fp, cond_fp_count))

def parallel_mine(tree, min_cnt=1, n_jobs=4):
    mgr = multiprocessing.Manager()
    fp_list = mgr.list()
    queue = multiprocessing.Queue()

    for item in tree.header_table:
        if tree.item_counter[item] >= min_cnt:
            queue.put(item)

    process_list = [MineByItem(tree, min_cnt, fp_list, queue) for _ in range(n_jobs)]
    for process in process_list: process.start()
    for process in process_list: process.join()
    fp_list = list(fp_list)
    if fp_list:
        fp_list = sorted(fp_list, key=lambda x: (len(x[0]), x[0]))

    return fp_list


class CountWorker(multiprocessing.Process):
    def __init__(self, count_list, queue):
        multiprocessing.Process.__init__(self)
        self.count_list = count_list
        self.queue = queue

    def run(self):
        counter = {}
        while True:
            if self.queue.empty():
                break
            else:
                sub_trans = self.queue.get()
                for tran in sub_trans:
                    for item in tran:
                        counter[item] = counter.get(item, 0) + 1
        self.count_list.append(counter)


def parallel_count(trans, n_jobs=1):
    mgr = multiprocessing.Manager()
    count_list = mgr.list()
    queue = multiprocessing.Queue()

    batch_len = 10000
    for idx in range(len(trans) // batch_len + 1):
        queue.put(trans[idx * batch_len: (idx + 1) * batch_len])

    process_list = [CountWorker(count_list, queue) for _ in range(n_jobs)]
    for process in process_list: process.start()
    for process in process_list: process.join()

    counter = {}
    for sub_counter in count_list:
        for item in sub_counter:
            counter[item] = counter.get(item, 0) + sub_counter[item]

    return counter


def fp_growth(trans, min_support=0.1, use_log=False, n_jobs=1):
    """FP growth algorithm for frequent patterns mining
    Arguments:
        trans: a list of transactions, each transaction is a list of items
        min_support (float): minimum support, default: 0.1
        use_log: logging, default: False
        n_jobs (int): when n_jobs > 1, mining the frequent patterns in paralle. default: 1

    Return:
        list of [pattern, frequency] tuples, pattern is an items list
    """
    if use_log:
        logging.basicConfig(filename='fp_tree.log', format='%(asctime)s %(message)s',
            level=logging.DEBUG, datefmt='%Y/%m/%d %I:%M:%S %p')

    if use_log:
        logging.info('Begin to count items')
    # count and sort
    counter = {}
    min_cnt = int(min_support * len(trans))
    if min_cnt < 1: return []

#     if n_jobs == 1:
    if True:
        for tran in trans:
            for item in tran:
                counter[item] = counter.get(item, 0) + 1
#     else:
#         parallel_count(trans, n_jobs)

    if use_log:
        logging.info('Counting finished')

    frequent_item = [item for item in counter if counter[item] >= min_cnt]

    # build FP-tree
    fp_tree = FPTree()
    if use_log:
        logging.info('Begin to add transactions')

    # add trans
    for tran in trans:
        tran = [item for item in tran if item in frequent_item]
        tran = list(set(tran))
        tran = sorted(tran, key=lambda x: (counter[x], x), reverse=True) # the order is very important
        if tran: fp_tree.add_tran(tran)

    if use_log:
        logging.info('Adding transactions finished')
        logging.info('Begin to mine fp')

    # mine pattern
    if n_jobs == 1:
        res = fp_tree.mine(min_cnt)
        res = list(zip(*res))
    elif n_jobs > 1:
        res = parallel_mine(fp_tree, min_cnt, n_jobs)

    if use_log:
        logging.info('Mining fp finished')

    return res

In [6]:
class TestFPTree(unittest.TestCase):
    def setUp(self):
        pass

    def test_add_tran(self):
        tree = FPTree()
        tree.add_tran([1, 2, 3, 4])
        tree.add_tran([1, 2, 3])
        tree.add_tran([1, 2])
        tree.add_tran([1])

        self.assertEqual(len(tree.header_table), 4)
        self.assertEqual(len(tree.item_counter), 4)

        for i in range(1, 5):
            self.assertEqual(tree.item_counter[i] + i, 5)
        for i in range(1, 5):
            self.assertEqual(len(tree.header_table[i]), 1)

        tree.add_tran([1, 3, 4], 2)
        self.assertEqual(len(tree.header_table[3]), 2)
        self.assertEqual(len(tree.header_table[4]), 2)
        self.assertEqual(tree.item_counter[1], 6)
        self.assertEqual(tree.item_counter[2], 3)

    def test_get_conditional_tran(self):# test get_conditional_tran
        tree = FPTree()
        tree.add_tran([1, 2, 3, 4])
        tree.add_tran([1, 2, 3])
        tree.add_tran([1, 2])
        tree.add_tran([1])
        tree.add_tran([1, 3, 4], 2)

        trans, weights = tree.get_conditional_tran(4, 1)
        self.assertEqual(len(trans), 2)
        self.assertEqual(weights[0], 1)
        self.assertEqual(weights[1], 2)

    def compare_with_apriori(self, dataset, test_parallel=False):
        support = 0.4
        # apriori
        te = TransactionEncoder()
        te_ary = te.fit(dataset).transform(dataset)
        df = pd.DataFrame(te_ary, columns=te.columns_)
        df = apriori(df, min_support=support, use_colnames=True)
        frequency = df.iloc[:, 0].to_numpy().tolist()
        pattern = df.iloc[:, 1].to_numpy().tolist()
        pattern = [sorted(list(i)) for i in pattern]
        apriori_result = sorted(zip(pattern, frequency), key=lambda x: (len(x[0]), x[0]))

        # fp-growth
        if test_parallel:
            fp_result = fp_growth(dataset, support, n_jobs=4)
        else:
            fp_result = fp_growth(dataset, support)

        self.assertEqual(len(apriori_result), len(fp_result))
        for idx in range(len(apriori_result)):
            self.assertEqual(apriori_result[idx][0], fp_result[idx][0])
            self.assertTrue(round(apriori_result[idx][1] * len(dataset)) == fp_result[idx][1]) # float point calculation error

    def test_testify_apriori_small_dataset(self):
        dataset = [['1', '2', '3', '4', '5', '6'],
                   ['7', '2', '3', '4', '5', '6'],
                   ['1', '11', '4', '5'],
                   ['1', '10', '8', '4', '6'],
                   ['8', '2', '2', '4', '9', '5']]

        self.compare_with_apriori(dataset)
        self.compare_with_apriori(dataset, True)


    def test_testify_apriori_fake_dataset(self):
        # make fake dataset
        total_trans = 10000
        max_trans_len = 20
        dataset = []
        for _ in range(total_trans):
            tmp = [np.random.randint(max_trans_len) for __ in range(max_trans_len)]
            tmp = list(set(tmp))
            dataset.append(tmp)

#         output_lines = []
#         for tran in dataset:
#             tran = [str(i) for i in tran]
#             tran = '[' + ', '.join(tran) + ']'
#             output_lines.append(tran)

#         with open('dataset.dat', 'w') as file:
#             output_lines = '[' + ',\n'.join(output_lines) + ']'
#             file.write(output_lines)

        self.compare_with_apriori(dataset)
        self.compare_with_apriori(dataset, True)

In [9]:
if __name__ == '__main__':
    unittest.main()

E
ERROR: C:\Users\PR269KB\AppData\Roaming\jupyter\runtime\kernel-28a11521-9ff6-4aee-b23e-8525d8a070dd (unittest.loader._FailedTest)
----------------------------------------------------------------------
AttributeError: module '__main__' has no attribute 'C:\Users\PR269KB\AppData\Roaming\jupyter\runtime\kernel-28a11521-9ff6-4aee-b23e-8525d8a070dd'

----------------------------------------------------------------------
Ran 1 test in 0.002s

FAILED (errors=1)


SystemExit: True

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


In [16]:
running_time = []
for total_trans in [i * 5000 for i in range(1, 14)]:
    print(total_trans)
    min_support = 0.01
#     if min_support * len(total_trans) < 100: min_support = 100 / len(total_trans)
        
    one_pass_time = []
    
    with open("test.txt", 'r') as file: lines = file.readlines()
    
    # prepare dataset
    sents = []
    lines = [line.strip('\n').strip('0\t').strip('1\t') for line in lines]
    for line in lines: sents += line.split('\t')
    sents = [sent.split(' ') for sent in sents]   
    print(len(lines), len(sents))
    
#     total_trans= int(total_trans)
#     max_trans_len = 50
#     items_num = 1000000
#     dataset = []
#     for _ in range(total_trans):
#         tmp = [np.random.randint(items_num) for __ in range(np.random.randint(max_trans_len))]
#         tmp = list(set(tmp))
#         dataset.append(tmp)
        

    dataset = sents[:total_trans]

    # fp-tree sequential
    start = time.time()
    pattern = fp_growth(dataset, min_support, True, n_jobs=1)
    elapsed = time.time() - start
    one_pass_time.append(elapsed)


    #fp-tree parallel
    start = time.time()
    pattern = fp_growth(dataset, min_support, True, n_jobs=4)
    elapsed = time.time() - start
    one_pass_time.append(elapsed)
    
    
    # apriori 
    te = TransactionEncoder()
    te_ary = te.fit(dataset).transform(dataset)
    df = pd.DataFrame(te_ary, columns=te.columns_)
    start = time.time()
    fp = apriori(df, min_support=min_support)
    elapsed = time.time() - start
    one_pass_time.append(elapsed)
    
    running_time.append(one_pass_time)

5000


UnicodeDecodeError: 'charmap' codec can't decode byte 0x8f in position 14: character maps to <undefined>

In [None]:
names = ['fp-growth', 'fp-growth-parallel', 'apriori']
dataset_size = [i * 5000 for i in range(1, 14)]
running_time = np.asarray(running_time)
for idx in range(len(names)): plt.plot(dataset_size, running_time[:, idx], label=names[idx])
plt.legend()
plt.xlabel('dataset size')
plt.ylabel('time (s)')
plt.savefig('run_time.png', dpi=300)
# plt.show()