# Data Reading and Preprocessing


In [92]:
import pandas as pd
import numpy as np
from itertools import combinations

In [93]:
df = pd.read_csv("transactions_binarized.csv")
print(df.info())
print(df.shape)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 7501 entries, 0 to 7500
Columns: 120 entries,  asparagus to zucchini
dtypes: int64(120)
memory usage: 6.9 MB
None
(7501, 120)


Indexing each item from the header of the data file.


In [94]:
item_list = df.columns.tolist()
items_dict = dict()

for i, item in enumerate(item_list):
    items_dict[item] = i + 1

# display the most frequent items
item_counts = df.sum(axis=0)
item_counts = item_counts.sort_values(ascending=False)

Extracting the transactions from the data.


In [100]:
transactions = list()

for i, row in df.iterrows():
    transaction = set()

    for item in items_dict:
        if row[item] == 1:
            transaction.add(items_dict[item])
    transactions.append(transaction)

# print the first 5 transactions
for i in range(5):
    print(transactions[i])

{2, 3, 5, 34, 40, 49, 54, 55, 61, 66, 73, 82, 92, 93, 98, 103, 107, 112, 115, 118}
{16, 70, 38}
{28}
{5, 111}
{39, 72, 73, 117, 55}


**get_support** function evaluates the support value for a set given all the transactions.


In [96]:
def get_frequency(transactions, item_set):
    match_count = float(0)
    if len(transactions) == 0:
        return match_count
    for transaction in transactions:
        if item_set.issubset(transaction):
            match_count += 1

    return match_count / len(transactions)


def test_get_frequency():
    # Test case 1: Empty transactions
    transactions_empty = []
    item_set_1 = {1, 2, 3}
    expected_output_1 = 0.0
    assert get_frequency(transactions_empty, item_set_1) == expected_output_1

    # Test case 2: No matching transactions
    transactions_no_match = [{4, 5}, {6, 7}, {8, 9}]
    item_set_2 = {1, 2, 3}
    expected_output_2 = 0.0
    assert get_frequency(transactions_no_match,
                         item_set_2) == expected_output_2

    # Test case 3: Some matching transactions
    transactions_some_match = [{1, 2, 3}, {2, 3, 4}, {3, 4, 5}, {1, 3, 5}]
    item_set_3 = {1, 2, 3}
    expected_output_3 = 0.25  # 1 out of 4 transactions
    assert get_frequency(transactions_some_match,
                         item_set_3) == expected_output_3

    # Test case 4: All transactions matching
    transactions_all_match = [{1, 2, 3}, {1, 2, 3}, {1, 2, 3}]
    item_set_4 = {1, 2, 3}
    expected_output_4 = 1.0  # All transactions match
    assert get_frequency(transactions_all_match,
                         item_set_4) == expected_output_4

    # Test case 4: Subset of transactions matching
    transactions_all_match = [{1, 2, 3, 4, 5}, {1, 2, 3}, {7, 8, 9}]
    item_set_5 = {1, 2, 3}
    expected_output_5 = float(2/3)
    assert get_frequency(transactions_all_match,
                         item_set_5) == expected_output_5

    print("All tests passed!")


# Run the test cases
test_get_frequency()

All tests passed!


**generate_candidate_item_sets** function generates the candidate item sets of size k from the frequent item sets of size k-1.


In [97]:
def generate_candidate_item_sets(frequent_item_sets, level):
    current_level_candidates = list()

    if len(frequent_item_sets[level - 1]) == 0:
        return current_level_candidates

    # Extract unique items from the frequent item sets of the previous level
    unique_items = set()
    for item_set, _ in frequent_item_sets[level - 1]:
        unique_items.update(item_set)

    # Generate candidates by combining unique items
    for candidate_set in combinations(unique_items, level):
        candidate_set = set(candidate_set)
        current_level_candidates.append(candidate_set)

    return current_level_candidates


def test_generate_candidate_item_sets():
    # Sample input data
    frequent_item_sets = {
        1: [({1}, 0.2), ({2}, 0.3), ({3}, 0.4)],
        2: [({1, 2}, 0.1), ({1, 3}, 0.2), ({2, 3}, 0.3)],
    }
    level = 3

    # Expected output
    expected_output = [{1, 2, 3}]

    # Call the function
    result = generate_candidate_item_sets(frequent_item_sets, level)

    # Assertion
    assert (
        result == expected_output
    ), f"Test failed: Expected {expected_output}, but got {result}"

    print("Test passed!")


# Run the test
test_generate_candidate_item_sets()

Test passed!


---

**pruning** function prunes the candidate sets evaluated after completing the self-join part. For each itemset, it finds all its subsets by dropping a single elements from it and checks if that subset was present in the previous level or not. If that subset was not present in the previous level, then the current set is not valid and must not be used, and is thus pruned.


In [98]:
def get_single_drop_subsets(item_set):
    single_drop_subsets = list()
    for item in item_set:
        temp = item_set.copy()
        temp.remove(item)
        single_drop_subsets.append(temp)

    return single_drop_subsets


def test_get_single_drop_subsets():
    # Test case 1: Empty item set
    item_set_empty = set()
    expected_output_empty = []
    assert get_single_drop_subsets(item_set_empty) == expected_output_empty

    # Test case 2: Non-empty item set
    item_set_non_empty = {1, 2, 3}
    expected_output_non_empty = [{2, 3}, {1, 3}, {1, 2}]
    assert get_single_drop_subsets(
        item_set_non_empty) == expected_output_non_empty

    print("Test cases for get_single_drop_subsets passed!")


def is_valid_set(item_set, prev_level_sets):
    single_drop_subsets = get_single_drop_subsets(item_set)

    for single_drop_set in single_drop_subsets:
        if single_drop_set not in prev_level_sets:
            return False
    return True


def test_is_valid_set():
    # # Test case 1: Empty previous level sets
    # item_set = {1, 2}
    # prev_level_sets_empty = []
    # assert is_valid_set(item_set, prev_level_sets_empty) is True

    # Test case 2: Item set not present in previous level sets
    item_set_not_present = {1, 2}
    prev_level_sets = [{3, 4}, {5, 6}]
    assert is_valid_set(item_set_not_present, prev_level_sets) is False

    # Test case 3: Item set present in previous level sets
    item_set_present = {1, 2}
    prev_level_sets_present = [{1, 2}, {3, 4}]
    assert is_valid_set(item_set_present, prev_level_sets_present) is True

    print("Test cases for is_valid_set passed!")


def prune_candidates(frequent_item_sets, level, candidate_set):
    post_prune_candidates_set = list()
    if len(candidate_set) == 0:
        return post_prune_candidates_set

    prev_level_sets = list()
    for item_set, _ in frequent_item_sets[level - 1]:
        prev_level_sets.append(item_set)

    for item_set in candidate_set:
        if is_valid_set(item_set, prev_level_sets):
            post_prune_candidates_set.append(item_set)

    return post_prune_candidates_set


def test_prune_candidates():
    # Test case 1: Empty candidate set
    frequent_item_sets = {
        1: [({1}, 0.2), ({2}, 0.3)],
        2: [({1, 2}, 0.1), ({1, 3}, 0.2)],
    }
    level = 3
    candidate_set_empty = []
    expected_output_empty = []
    assert (
        prune_candidates(frequent_item_sets, level, candidate_set_empty)
        == expected_output_empty
    )

    # Test case 2: Candidate set with valid item sets
    candidate_set_valid = [{1, 2, 3}, {2, 3, 4}, {3, 4, 5}]
    expected_output_valid = [{1, 2, 3}, {2, 3, 4}, {3, 4, 5}]
    assert (
        prune_candidates(frequent_item_sets, level, candidate_set_valid)
        == expected_output_valid
    )

    # Test case 3: Candidate set with invalid item sets
    candidate_set_invalid = [{1, 2}, {2, 3}, {4, 5}]
    expected_output_invalid = []
    assert (
        prune_candidates(frequent_item_sets, level, candidate_set_invalid)
        == expected_output_invalid
    )

    print("Test cases for pruning passed!")


# Run the test cases
test_get_single_drop_subsets()
# test_is_valid_set()
test_prune_candidates()

Test cases for get_single_drop_subsets passed!


AssertionError: 

## Main apriori algorithm


In [None]:
from collections import defaultdict


def apriori(min_support):
    frequent_item_sets = defaultdict(list)

    print("level : 1", end=" ")

    for item in range(1, len(item_list) + 1):
        support = get_frequency(transactions, {item})
        if support >= min_support:
            frequent_item_sets[1].append(({item}, support))

    for level in range(2, len(item_list) + 1):
        print(level, end=" ")
        current_level_candidates = generate_candidate_item_sets(
            frequent_item_sets, level
        )
        post_pruning_candidates = prune_candidates(
            frequent_item_sets, level, current_level_candidates
        )

        if len(post_pruning_candidates) == 0:
            print(f"{level} reached, no more frequent item sets.")
            break

        for item_set in post_pruning_candidates:
            support = get_frequency(transactions, item_set)
            if support >= min_support:
                frequent_item_sets[level].append((item_set, support))

    return frequent_item_sets

### Specify the **minimum support** value here


In [None]:
min_support = 0.005
frequent_item_sets = apriori(transactions)

Debug print statements to check the number of frequent sets calculated for each level.


In [None]:
for level in frequent_item_sets:
    print(len(frequent_item_sets[level]))

Debug statement to check the frequent sets calculated.


In [None]:
for level in frequent_item_sets:
    print(frequent_item_sets[level])

---

## Generating Association Rules

Prepare input for calculating association rules: Create a dictionary of each frequent itemset against its support value.


In [None]:
item_support_dict = dict()
item_list = list()

key_list = list(items_dict.keys())
val_list = list(items_dict.values())

for level in frequent_item_sets:
    for set_support_pair in frequent_item_sets[level]:
        for i in set_support_pair[0]:
            item_list.append(key_list[val_list.index(i)])
        item_support_dict[frozenset(item_list)] = set_support_pair[1]
        item_list = list()

Debug statement to check the values in the dictionary created.


In [None]:
item_support_dict

### Utility Function

**find_subset** finds all the subsets of the given itemset.


In [None]:
def find_subset(item, item_length):
    combs = []
    for i in range(1, item_length + 1):
        combs.append(list(combinations(item, i)))

    subsets = []
    for comb in combs:
        for elt in comb:
            subsets.append(elt)

    return subsets

**association_rules** generates the association rules in accordance with the given _minimum confidence_ value and the provided dictionary of itemsets against their support values. For itemsets of more than one element, it first finds all their subsets. For every subset A, it calculates the set B = itemset-A. If B is not empty, the confidence of B is calculated. If this value is more than _minimum confidence_ value, the rule _A->B_ is added to the list.


In [None]:
def association_rules(min_confidence, support_dict):
    rules = list()
    for item, support in support_dict.items():
        item_length = len(item)

        if item_length > 1:
            subsets = find_subset(item, item_length)

            for A in subsets:
                B = item.difference(A)

                if B:
                    A = frozenset(A)

                    AB = A | B

                    confidence = support_dict[AB] / support_dict[A]
                    if confidence >= min_confidence:
                        rules.append((A, B, confidence))

    return rules

### Specify Minimum confidence value here


In [None]:
association_rules = association_rules(
    min_confidence=0.5, support_dict=item_support_dict
)

---

### Printing the output in the required format


In [None]:
print("Number of rules: ", len(association_rules), "\n")

for rule in association_rules:
    print(
        '{0} -> {1} <confidence: {2}>'.format(set(rule[0]), set(rule[1]), rule[2]))