<a href="https://colab.research.google.com/github/zolfaShefreie/Improved_Apriori-Algorithm/blob/main/improvedAprioriAlgorithm.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Download datasets

In [1]:
import requests

In [2]:
# Source URL of the groceries transaction dataset and the local path it is saved to.
groceries_url = 'https://raw.githubusercontent.com/zolfaShefreie/Improved_Apriori-Algorithm/main/groceries.csv'
groceries_file_path = "groceries.csv"

In [3]:
def download_file(url: str, file_path):
    """
    download the text content at ``url`` and save it on ``file_path``
    :param url: address of the file to download
    :param file_path: local path the content is written to (overwritten if it exists)
    :raises requests.HTTPError: if the server answers with an error status, so an error page
        is never silently saved in place of the data
    """
    # a timeout keeps a stalled connection from hanging the notebook forever
    response = requests.get(url, timeout=60)
    response.raise_for_status()
    # context manager closes the file even if write fails; explicit utf-8 avoids locale-dependent encoding
    with open(file_path, 'w', encoding='utf-8') as file:
        file.write(response.text)

In [4]:
# Fetch the dataset into the working directory so the CSV readers below can load it.
download_file(groceries_url, groceries_file_path)

# Imports

In [5]:
import pandas as pd
import numpy as np
import itertools
from multiprocessing import Pool
import time

# Useful methods

In [6]:
def convert_csv_to_set(path: str) -> set:
    """
    collect every distinct value of a headerless csv table into a set
    :param path: the path of a file with .csv format
    :return: a set of all cell values, with missing (NaN) cells left out
    """
    df = pd.read_csv(path, header=None)
    # pd.notna instead of set.discard(np.nan): NaN != NaN, so discard only removes the exact
    # np.nan object, while the empty cells of an all-missing (float64) column become distinct NaN objects
    return {value for value in df.to_numpy().ravel().tolist() if pd.notna(value)}

In [7]:
def convert_csv_to_dict_data(path: str) -> dict:
    """
    convert a headerless csv file (one transaction per row) to a dict
    :param path: the path of a file with .csv format
    :return: a dictionary with key: row index and value: the set of items in that row, without NaN
    """
    df = pd.read_csv(path, header=None)
    # filter with pd.notna: set.discard(np.nan) misses NaN objects that are not the np.nan singleton
    # (e.g. cells of an all-empty float64 column), because NaN != NaN
    return {index: {value for value in row if pd.notna(value)}
            for index, row in zip(df.index, df.to_numpy().tolist())}

In [8]:
def convert_csv_to_list_tuple_data(path: str, element_1_type=list, elemetn_2_type=list) -> list:
    """
    convert a headerless csv file (one transaction per row) to a list of tuples
    :param path: the path of a file with .csv format
    :param element_1_type: type of the first tuple element; called with a one-element list [row_index],
        so it must accept an iterable (e.g. list, frozenset)
    :param elemetn_2_type: type of the second tuple element; called with the set of the row's items,
        so it must accept an iterable
    :return: a list with one tuple (element_1_type([row_index]), elemetn_2_type(items)) per row,
        where items excludes missing (NaN) cells
    """
    df = pd.read_csv(path, header=None)
    tuple_list = list()
    for key, row in zip(df.index, df.to_numpy().tolist()):
        # pd.notna rather than set.discard(np.nan): NaN != NaN, so discard misses non-singleton NaNs
        values = {value for value in row if pd.notna(value)}
        tuple_list.append((element_1_type([key]), elemetn_2_type(values)))

    return tuple_list

# Basic Apriori

In [None]:
class Rule:
    """
    An association rule ``A -> B`` together with its support, confidence and lift.

    :param rule_part_a: single-entry dict {frozenset(antecedent items): antecedent support count}
    :param rule_part_b: single-entry dict {frozenset(consequent items): consequent support count}
    :param a_plus_b: support count of the union of antecedent and consequent
    :param max_trans: total number of transactions
    """

    def __init__(self, rule_part_a: dict, rule_part_b: dict, a_plus_b: int, max_trans: int):
        self.rule_part_a = rule_part_a
        self.rule_part_b = rule_part_b
        self.max_transactions = max_trans
        self.sup_a_plus_b = a_plus_b
        sup_count_a = next(iter(rule_part_a.values()))
        sup_count_b = next(iter(rule_part_b.values()))
        self.sup = self.calculate_sup(a_plus_b, max_trans)
        self.conf = self.calculate_conf(sup_count_a, a_plus_b)
        # lift = confidence / relative support of the consequent
        self.lift = self.calculate_lift(self.conf, sup_count_b / max_trans)

    @staticmethod
    def calculate_sup(sup_count_a_plus_b: int, max_trans: int):
        return sup_count_a_plus_b / max_trans

    @staticmethod
    def calculate_conf(sup_count_a: int, sup_count_a_plus_b: int):
        return sup_count_a_plus_b / sup_count_a

    @staticmethod
    def calculate_lift(conf: float, sup_b: float):
        return conf / sup_b

    def _order_key(self) -> tuple:
        """Antecedent items then consequent items, each sorted by their text; gives rules a total order."""
        items_a = next(iter(self.rule_part_a))
        items_b = next(iter(self.rule_part_b))
        return tuple(sorted(map(str, items_a))), tuple(sorted(map(str, items_b)))

    # dicts do not support `<`, so ordering is defined on the sorted item tuples instead
    def __lt__(self, other):
        if not isinstance(other, Rule):
            return NotImplemented
        return self._order_key() < other._order_key()

    def __le__(self, other):
        if not isinstance(other, Rule):
            return NotImplemented
        return self._order_key() <= other._order_key()

    def __eq__(self, other):
        if not isinstance(other, Rule):
            return NotImplemented
        return self.rule_part_a == other.rule_part_a and self.rule_part_b == other.rule_part_b

    def __hash__(self):
        # consistent with __eq__: equal part dicts have equal item sets
        return hash((frozenset(self.rule_part_a.items()), frozenset(self.rule_part_b.items())))

    def __str__(self):
        items_a = list(next(iter(self.rule_part_a)))
        items_b = list(next(iter(self.rule_part_b)))
        return "{}->{} support: {}, lift: {}, confidence: {}".format(items_a, items_b,
                                                                     self.sup, self.lift, self.conf)

    @staticmethod
    def sort_by(elem, sort_by):
        """Return the metric of ``elem`` named by ``sort_by``: 'support', 'confidence', otherwise lift."""
        if sort_by == "support":
            return elem.sup
        if sort_by == "confidence":
            return elem.conf
        return elem.lift

In [None]:
class Arules:
    """
    Level-wise Apriori miner whose support counting is split across worker processes.

    ``self.frequents[k]`` holds the frequent (k+1)-itemsets as {frozenset(items): support_count};
    after mining, its last entry is always the first empty level.
    """
    MAX_LENGTH = 1000  # transactions per counting chunk
    MAX_ITEM_SET_RESULT = 10  # how many itemsets get_frequent_item_sets returns

    def __init__(self):
        self.max_transactions = 0
        self.frequents = []

    @staticmethod
    def get_items(transactions: dict) -> set:
        """
        get the items of transactions
        :param transactions: a dict {transaction_id: iterable of items}
        :return: set of items
        """
        items = set()
        for each in transactions.values():
            items.update(each)
        return items

    @staticmethod
    def _run_parallel(func, args: list) -> list:
        """
        call func(*arg) for every tuple in args and return the results in order
        a process pool (at most one worker per CPU) is used only when there is more than one task,
        so small inputs do not pay for spawning processes
        """
        import os

        processes = min(len(args), os.cpu_count() or 1)
        if processes <= 1:
            return [func(*each) for each in args]
        with Pool(processes) as pool:
            return pool.starmap(func, args)

    def level_process(self, transactions: dict, level: int, min_sup: float):
        """
        count the candidates of one level and append its frequent itemsets to self.frequents
        :param transactions: a dict {transaction_id: items}
        :param level: level or depth of process (itemset size)
        :param min_sup: minimum support
        """
        item_keys = self.get_level_item_keys(level)
        transaction_items = list(transactions.items())  # materialize once instead of once per chunk
        chunk_count = max(1, -(-len(transaction_items) // self.MAX_LENGTH))  # ceil division
        args = [(dict(transaction_items[i * self.MAX_LENGTH: (i + 1) * self.MAX_LENGTH]), level, item_keys)
                for i in range(chunk_count)]
        c_results = self.merge_dicts(self._run_parallel(self.get_c_dict, args))
        pre_l = self.frequents[level - 2] if level > 1 else None
        self.frequents.append(self.get_l_dict(len(transactions), c_results, min_sup, level, pre_l))

    def get_level_item_keys(self, level: int):
        """
        generate the candidate itemsets of a level by joining the previous level's frequent itemsets
        :param level: level or depth of process
        :return: a set of frozenset candidates, or None for level 1 (candidates come from the data)
        """
        if level <= 1 or not self.frequents:
            return None
        previous = [sorted(each) for each in self.frequents[level - 2]]
        key_list = set()
        for i, item in enumerate(previous):
            for other in previous[i + 1:]:
                # Apriori join: two (k-1)-itemsets sharing their first k-2 items form a k-candidate
                if item[:-1] == other[:-1] and item[-1] != other[-1]:
                    key_list.add(frozenset(item + [other[-1]]))
        return key_list

    def get_c_dict(self, transactions: dict, level: int, item_keys=None) -> dict:
        """
        make the table c (support counts of candidates within these transactions)
        :param transactions: the dict of transactions
        :param level: level or depth of process
        :param item_keys: iterable of frozenset candidates; ignored (derived from the data) at level 1
        :return: a dict {itemset: support count}, containing only itemsets seen at least once
        """
        if level == 1:
            item_keys = [frozenset([item]) for item in self.get_items(transactions)]

        result_dict = dict()
        for each in item_keys:
            for transaction in transactions.values():
                if each.issubset(transaction):
                    result_dict[each] = result_dict.get(each, 0) + 1
        return result_dict

    @staticmethod
    def get_l_dict(max_length: int, c: dict, min_sup: float, level: int, pre_l=None) -> dict:
        """
        make the table l (candidates that pass the Apriori prune and the support threshold)
        :param max_length: number of transactions
        :param c: the return of get_c_dict
        :param min_sup: a float number between 0 and 1
        :param level: level or depth of process
        :param pre_l: the previous level's frequent itemsets (required when level > 1)
        :return: a dict with valid sup
        :raises ValueError: if a multi-item candidate is given without pre_l
        """
        pre_keys = None if pre_l is None else set(pre_l)
        result = dict()
        for itemset, count in c.items():
            if len(itemset) > 1:
                if pre_keys is None:
                    raise ValueError("pre_l is required to prune candidates with more than one item")
                # Apriori property: every (level-1)-subset of a frequent itemset must be frequent
                sub_keys = map(frozenset, itertools.combinations(itemset, level - 1))
                if not all(sub in pre_keys for sub in sub_keys):
                    continue
            if count / max_length < min_sup:
                continue
            result[itemset] = count
        return result

    @staticmethod
    def merge_dicts(list_dict: list) -> dict:
        """
        if a key in dict_a and dict_b => value of key = dict_b[key] + dict_a[key]
        :param list_dict: list of {key: value} that value must be int
        :return: merge dict (keys converted to frozenset)
        """
        merged = dict()
        for partial in list_dict:
            for key, value in partial.items():
                key = frozenset(key)
                merged[key] = merged.get(key, 0) + value
        return merged

    def get_frequent_item_sets(self, transactions: dict, min_sup=float('-inf')) -> list:
        """
        mine all frequent itemsets level by level
        :param transactions: a dict of transaction ids and the items of each transaction
        :param min_sup: minimum support
        :return: up to MAX_ITEM_SET_RESULT itemsets of the last non-empty level
        """
        self.max_transactions = len(transactions)
        self.frequents = []  # reset: a previous run's levels must not leak into this one
        level = 1
        while True:
            self.level_process(transactions, level, min_sup)
            if not self.frequents[-1]:
                break
            level += 1
        # the last level is empty; the one before it holds the largest frequent itemsets
        last_frequent = self.frequents[-2] if len(self.frequents) > 1 else {}
        return list(last_frequent)[:self.MAX_ITEM_SET_RESULT]

    def get_arules(self, min_sup=float('-inf'), min_conf=float('-inf'), min_lift=float('-inf'), sort_by='lift'):
        """
        get all rules of the largest frequent itemsets and sort them (highest first)
        :param min_sup: a float between 0 and 1
        :param min_conf: a float between 0 and 1
        :param min_lift: a non-negative float (lift is not bounded by 1)
        :param sort_by: choices = (lift, support, confidence)
        :return: sorted rules; an empty list if nothing frequent has been mined
        """
        if len(self.frequents) < 2:
            return []
        top_level = self.frequents[-2]
        args = [(set(each), count, min_sup, min_conf, min_lift) for each, count in top_level.items()]
        rule_groups = self._run_parallel(self.get_item_set_rule, args)
        rules = [rule for group in rule_groups for rule in group]
        return sorted(rules, key=lambda x: Rule.sort_by(x, sort_by), reverse=True)

    def get_item_set_rule(self, item_set: set, sup_count: int, min_sup=float('-inf'), min_conf=float('-inf'),
                          min_lift=float('-inf')) -> list:
        """
        get unsorted rules of one item set
        :param item_set: one set of frequent items
        :param sup_count: the sup count of item_set in transactions
        :param min_sup: a float between 0 and 1
        :param min_conf: a float between 0 and 1
        :param min_lift: a non-negative float
        :return: a list of rule
        """
        full_set = frozenset(item_set)
        rule_obj_list = list()
        # every non-empty proper subset is an antecedent; each appears once, so no dedup is needed
        for size in range(1, len(full_set)):
            for antecedent in map(frozenset, itertools.combinations(full_set, size)):
                consequent = full_set - antecedent
                rule = Rule(rule_part_a={antecedent: self.frequents[len(antecedent) - 1][antecedent]},
                            rule_part_b={consequent: self.frequents[len(consequent) - 1][consequent]},
                            a_plus_b=sup_count,
                            max_trans=self.max_transactions)
                if rule.sup >= min_sup and rule.conf >= min_conf and rule.lift >= min_lift:
                    rule_obj_list.append(rule)
        return rule_obj_list

## Analyse results

In [None]:
# Baseline run: mine frequent itemsets with the multiprocessing Apriori (min support 0.5%),
# then derive rules from the last non-empty level of frequent itemsets and print them
# (get_arules sorts by lift, highest first, by default).
transactionss = convert_csv_to_dict_data(groceries_file_path)
algo = Arules()
frequents = algo.get_frequent_item_sets(transactionss, 0.005)

l = algo.get_arules(min_sup=0.005, min_conf=0.2)
for each in l:
  print(str(each))

['root vegetables', 'yogurt']->['tropical fruit', 'whole milk'] support: 0.0056939501779359435, lift: 5.212371290127195, confidence: 0.2204724409448819
['root vegetables', 'tropical fruit']->['yogurt', 'whole milk'] support: 0.0056939501779359435, lift: 4.828813663343766, confidence: 0.27053140096618356
['root vegetables', 'pip fruit']->['other vegetables', 'whole milk'] support: 0.005490594814438231, lift: 4.716272378516624, confidence: 0.35294117647058826
['root vegetables', 'tropical fruit']->['other vegetables', 'whole milk'] support: 0.007015760040671073, lift: 4.4542572463768115, confidence: 0.3333333333333333
['root vegetables', 'citrus fruit']->['other vegetables', 'whole milk'] support: 0.005795627859684799, lift: 4.377459707646177, confidence: 0.3275862068965517
['other vegetables', 'fruit/vegetable juice']->['yogurt', 'whole milk'] support: 0.005083884087442806, lift: 4.3114407708426485, confidence: 0.24154589371980675
['other vegetables', 'pip fruit']->['root vegetables', '

# Improved algorithm

## Install PySpark and create a Spark session

In [9]:
# IPython shell escape (Colab): installs PySpark into the notebook runtime.
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 35 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 59.9 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=f2684d4c37fc3fa4a016aaf2c2490dc4b51b2d68d74c447ec44bee3ad1c18786
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [10]:
import pyspark
from pyspark.sql import SparkSession

In [18]:
# Create (or reuse, via getOrCreate) a Spark session for the notebook.
spark = SparkSession.builder.appName('apriori').getOrCreate()

Py4JJavaError: ignored

In [33]:
# Handle to the session's SparkContext (the name says CONTENT, but this is the SparkContext).
SPARK_CONTENT = spark.sparkContext

## Improved (Spark-based) class

In [13]:
from itertools import chain, combinations


class SoarkArules:
    """
    Apriori on Spark using vertical (transaction-id set) counting.

    Every itemset is carried through the RDDs as (frozenset(items), frozenset(transaction_ids)):
    its support count is the size of the id set, and the id set of a union of itemsets is the
    intersection of their id sets, so the raw transactions are only scanned at level 1.
    """
    PARTITION_NUM = 5
    SPARK_CONTENT = SparkSession.builder.config("spark.driver.memory", "10g").appName('spark_apriori').getOrCreate().sparkContext

    def __init__(self):
        self.frequent_items_rdd = None  # rdd of (frozenset(items), support_count), set by get_frequent_item_sets
        self.transaction_num = 0

    @staticmethod
    def _min_sup_to_count(transaction_num, min_sup):
        """
        convert a relative minimum support into the smallest support count that satisfies it
        :param transaction_num: total number of transactions
        :param min_sup: minimum support as a fraction; float('-inf') or <= 0 means no threshold
        :return: an int >= 1 (an itemset contained in no transaction is never frequent)
        """
        import math

        if transaction_num <= 0 or not min_sup > 0:
            return 1
        count = math.ceil(transaction_num * min_sup)
        # float rounding can push the product past an integer (100 * 0.07 == 7.000000000000001),
        # so adjust until count is exactly the smallest c with c / transaction_num >= min_sup
        while count > 1 and (count - 1) / transaction_num >= min_sup:
            count -= 1
        while count / transaction_num < min_sup:
            count += 1
        return max(1, count)

    @classmethod
    def _first_level_process(cls, transactions_rdd, min_sup_count):
        """
        get the frequent 1-itemsets with their transaction-id sets
        :param transactions_rdd: rdd of (frozenset([transaction_id]), items)
        :param min_sup_count: minimum number of transactions an itemset must appear in
        :return: rdd of (frozenset([item]), frozenset(transaction_ids))
        """
        return transactions_rdd.flatMap(lambda x: [(frozenset([item]), frozenset(x[0])) for item in x[1]]).\
            reduceByKey(lambda x, y: x.union(y)).\
            filter(lambda x: len(x[1]) >= min_sup_count)

    @classmethod
    def _upper_level_process(cls, first_level_rdd, previous_level_rdd, level_num, min_sup_count):
        """
        calculate current level candidates by extending each previous-level itemset with one frequent item
        :param first_level_rdd: frequent 1-itemsets with their transaction-id sets
        :param previous_level_rdd: frequent (level_num - 1)-itemsets with their transaction-id sets
        :param level_num: number (itemset size) of the current level
        :param min_sup_count: minimum support count
        :return: current level rdd (may contain duplicates; callers apply distinct)
        """
        current_level_rdd = previous_level_rdd.cartesian(first_level_rdd)
        # union of items <-> intersection of the transactions that contain them
        return current_level_rdd.map(lambda x: (x[0][0].union(x[1][0]), x[0][1].intersection(x[1][1]))).\
            filter(lambda x: len(x[0]) == level_num).\
            filter(lambda x: len(x[1]) >= min_sup_count)

    @classmethod
    def _get_partision_frequent_item_sets(cls, transactions_rdd, min_sup_count):
        """
        get all frequent itemsets (every level)
        :param transactions_rdd: rdd of (frozenset([transaction_id]), items)
        :param min_sup_count: minimum support count
        :return: rdd of (frozenset(items), frozenset(transaction_ids)) for all frequent itemsets
        """
        # cache: every level re-reads the first level and the previous level
        first_level_rdd = cls._first_level_process(transactions_rdd, min_sup_count).cache()
        current_level_rdd = first_level_rdd
        previous_level_rdd = first_level_rdd
        level_num = 1
        while True:
            level_num += 1
            print("loop level", level_num)
            result = cls._upper_level_process(first_level_rdd=first_level_rdd,
                                              previous_level_rdd=previous_level_rdd,
                                              level_num=level_num,
                                              min_sup_count=min_sup_count).distinct().cache()
            if result.isEmpty():
                break
            # levels hold itemsets of different sizes, so the union needs no further distinct
            current_level_rdd = cls.SPARK_CONTENT.union([current_level_rdd, result])
            previous_level_rdd = result
        return current_level_rdd

    @classmethod
    def _get_sup_count(cls, rdd):
        """
        calculate support_count for each data
        :param rdd: rdd of (itemset, transaction_ids)
        :return: rdd with (itemset, support_count) element
        """
        return rdd.map(lambda x: (x[0], len(x[1])))

    @classmethod
    def _combine_frequent_items_sup(cls, rdd, min_sup_count):
        """
        combine support_count of same keys and filter based on min_sup_count
        :param rdd: rdd of (itemset, support_count)
        :param min_sup_count: minimum support count
        :return: filtered rdd with (itemset, support_count) element
        """
        return rdd.reduceByKey(lambda x, y: x + y).filter(lambda x: x[1] >= min_sup_count)

    def get_frequent_item_sets(self, transactions: list, min_sup=float('-inf')) -> list:
        """
        get frequent itemsets
        :param transactions: a list of tuple(frozenset([transaction_id]), items)
        :param min_sup: minimum support as a fraction of all transactions
        :return: frequent itemset rdd of (frozenset(items), support_count)
        """
        transactions_rdd = self.SPARK_CONTENT.parallelize(transactions, self.PARTITION_NUM)
        self.transaction_num = len(transactions)
        min_sup_count = self._min_sup_to_count(self.transaction_num, min_sup)
        frequent_itemsets_rdd = self._get_partision_frequent_item_sets(transactions_rdd, min_sup_count)
        # cached because get_arules reads it several times
        self.frequent_items_rdd = self._get_sup_count(frequent_itemsets_rdd).cache()
        return self.frequent_items_rdd

    def get_arules(self, min_sup=float('-inf'), min_conf=float('-inf'), min_lift=float('-inf'), sort_by='lift'):
        """
        get all rules A -> B (A, B non-empty, disjoint, A ∪ B frequent) and sort them, highest first
        :param min_sup: a float between 0 and 1
        :param min_conf: a float between 0 and 1
        :param min_lift: a non-negative float (lift is not bounded by 1)
        :param sort_by: choices = (lift, support, confidence); any other value leaves the rdd unsorted
        :return: rdd of ((antecedent, consequent), support, confidence, lift)
        :raises ValueError: if get_frequent_item_sets has not been called
        """
        if self.frequent_items_rdd is None:
            raise ValueError("call get_frequent_item_sets before get_arules")
        trans_num = self.transaction_num
        counts_rdd = self.frequent_items_rdd

        # (itemset, count) paired with each of its non-empty proper subsets as antecedent
        candidates = counts_rdd.cartesian(counts_rdd).filter(lambda pair: pair[1][0] < pair[0][0])
        # key by consequent = itemset - antecedent so the consequent's count can be joined in
        by_consequent = candidates.map(
            lambda pair: (pair[0][0] - pair[1][0], (pair[1][0], pair[1][1], pair[0][1])))

        def to_rule(joined):
            consequent, ((antecedent, count_a, count_ab), count_b) = joined
            return ((antecedent, consequent),
                    count_ab / trans_num,                        # support
                    count_ab / count_a,                          # confidence
                    count_ab * trans_num / (count_a * count_b))  # lift = conf / (count_b / N)

        rules = by_consequent.join(counts_rdd).map(to_rule)
        rules = rules.filter(lambda x: x[1] >= min_sup and x[2] >= min_conf and x[3] >= min_lift)
        sort_index = {'support': 1, 'confidence': 2, 'lift': 3}.get(sort_by)
        if sort_index is None:
            return rules
        # descending, consistent with Arules.get_arules, so take(n) yields the strongest rules
        return rules.sortBy(lambda x: x[sort_index], ascending=False)

In [14]:
# Load transactions as (frozenset([row_index]), frozenset(items)) tuples -- the shape SoarkArules
# parallelizes -- and mine frequent itemsets on Spark with a 1% minimum support.
transactions = convert_csv_to_list_tuple_data(groceries_file_path, element_1_type=frozenset, elemetn_2_type=frozenset)
spark_arules=SoarkArules()
frequents = spark_arules.get_frequent_item_sets(transactions, 0.01)

loop level 2
loop level 3
loop level 4


In [None]:
# Build association rules (min support 1%, min confidence 40%); the result is a Spark RDD.
rule = spark_arules.get_arules(min_sup=0.01, min_conf=0.4)

In [None]:
# Collect the first 20 rules of the RDD to the driver.
rule.take(20)