In [8]:
import numpy as np
from itertools import combinations

class Apriori:
    def __init__(self, min_support=0.5, min_confidence=0.7):
        """
        Initialize the Apriori algorithm for association rule mining.

        Parameters:
        - min_support: Minimum support threshold (0 to 1) for itemset frequency
        - min_confidence: Minimum confidence threshold (0 to 1) for rule strength
        """
        self.min_support = min_support
        self.min_confidence = min_confidence
        self.frequent_itemsets = None  # Dictionary storing frequent itemsets by size
        self.association_rules = None  # List of generated association rules

    def _calculate_support(self, itemset, transaction_records):
        """
        Calculate the support value for an itemset.

        Parameters:
        - itemset: The itemset to evaluate
        - transaction_records: List of all transactions

        Returns:
        - support: Fraction of transactions containing the itemset
        """
        matching_transactions = sum(1 for transaction in transaction_records
                                 if itemset.issubset(transaction))
        return matching_transactions / len(transaction_records)

    def _generate_candidate_itemsets(self, frequent_itemsets, itemset_size):
        """
        Generate candidate itemsets of specified size using frequent itemsets.

        Parameters:
        - frequent_itemsets: Previously found frequent itemsets
        - itemset_size: Desired size of new candidate itemsets

        Returns:
        - candidates: List of candidate itemsets
        """
        unique_items = sorted({item for itemset in frequent_itemsets for item in itemset})
        return [frozenset(combination) for combination in combinations(unique_items, itemset_size)]

    def _prune_candidates(self, candidate_itemsets, previous_frequent_itemsets, itemset_size):
        """
        Prune candidate itemsets that don't meet the Apriori property.

        Parameters:
        - candidate_itemsets: Generated candidate itemsets
        - previous_frequent_itemsets: Frequent itemsets from previous iteration
        - itemset_size: Size of current candidate itemsets

        Returns:
        - valid_candidates: Candidates that satisfy the Apriori property
        """
        valid_candidates = []
        previous_itemsets = set(previous_frequent_itemsets)

        for candidate in candidate_itemsets:
            # Check if all subsets of size k-1 are frequent
            if all(frozenset(subset) in previous_itemsets
                   for subset in combinations(candidate, itemset_size-1)):
                valid_candidates.append(candidate)

        return valid_candidates

    def fit(self, transaction_records):
        """
        Execute the Apriori algorithm to find frequent itemsets and association rules.

        Parameters:
        - transaction_records: List of transactions (each transaction is a list of items)
        """
        # Convert transactions to frozensets for efficient subset operations
        transaction_records = [frozenset(transaction) for transaction in transaction_records]

        # Initialize storage for frequent itemsets
        self.frequent_itemsets = {}
        current_itemset_size = 1

        # Start with single items
        unique_items = {item for transaction in transaction_records for item in transaction}
        candidate_itemsets = [frozenset([item]) for item in unique_items]

        while candidate_itemsets:
            # Evaluate support for current candidates
            frequent_items = []
            for itemset in candidate_itemsets:
                support = self._calculate_support(itemset, transaction_records)
                if support >= self.min_support:
                    frequent_items.append((itemset, support))

            # Store frequent itemsets of current size if any found
            if frequent_items:
                self.frequent_itemsets[current_itemset_size] = frequent_items
                current_itemset_size += 1

                # Generate next level candidates
                candidate_itemsets = self._generate_candidate_itemsets(
                    [itemset for itemset, _ in frequent_items],
                    current_itemset_size
                )

                # Prune invalid candidates
                frequent_itemsets_only = [itemset for itemset, _ in frequent_items]
                candidate_itemsets = self._prune_candidates(
                    candidate_itemsets,
                    frequent_itemsets_only,
                    current_itemset_size-1
                )
            else:
                candidate_itemsets = None

        # Generate association rules from frequent itemsets
        self.association_rules = []
        for itemset_size, itemsets in self.frequent_itemsets.items():
            if itemset_size == 1:  # Skip single items
                continue

            for itemset, itemset_support in itemsets:
                # Generate all possible non-empty subsets as antecedents
                for antecedent_size in range(1, itemset_size):
                    for antecedent in combinations(itemset, antecedent_size):
                        antecedent = frozenset(antecedent)
                        consequent = itemset - antecedent

                        # Calculate rule confidence
                        antecedent_support = self._calculate_support(antecedent, transaction_records)
                        confidence = itemset_support / antecedent_support

                        if confidence >= self.min_confidence:
                            self.association_rules.append({
                                'antecedent': antecedent,
                                'consequent': consequent,
                                'support': itemset_support,
                                'confidence': confidence
                            })

    def get_frequent_itemsets(self, itemset_size=None):
        """
        Retrieve frequent itemsets found by the algorithm.

        Parameters:
        - itemset_size: Specific size of itemsets to return (None returns all)

        Returns:
        - List of tuples containing (itemset, support) pairs
        """
        if itemset_size is None:
            return [itemset for size_group in self.frequent_itemsets.values()
                   for itemset in size_group]
        return self.frequent_itemsets.get(itemset_size, [])

    def get_association_rules(self):
        """
        Retrieve association rules meeting minimum confidence threshold.

        Returns:
        - List of dictionaries containing rule components and metrics
        """
        return self.association_rules

# Demonstration of Apriori algorithm
if __name__ == "__main__":
    # Sample market basket transactions
    market_baskets = [
        ['bread', 'milk'],
        ['bread', 'diapers', 'beer', 'eggs'],
        ['milk', 'diapers', 'beer', 'cola'],
        ['bread', 'milk', 'diapers', 'beer'],
        ['bread', 'milk', 'diapers', 'cola']
    ]

    # Initialize and execute Apriori algorithm
    apriori_analyzer = Apriori(min_support=0.4, min_confidence=0.6)
    apriori_analyzer.fit(market_baskets)

    # Display discovered frequent itemsets
    print("Frequent Itemsets:")
    for size, itemsets in apriori_analyzer.frequent_itemsets.items():
        print(f"\nItemset Size {size}:")
        for itemset, support in itemsets:
            print(f"{set(itemset)}: Support = {support:.2f}")

    # Display generated association rules
    print("\nAssociation Rules:")
    for rule in apriori_analyzer.get_association_rules():
        print(f"{set(rule['antecedent'])} => {set(rule['consequent'])} "
              f"(Support={rule['support']:.2f}, Confidence={rule['confidence']:.2f})")

Frequent Itemsets:

Itemset Size 1:
{'diapers'}: Support = 0.80
{'bread'}: Support = 0.80
{'beer'}: Support = 0.60
{'cola'}: Support = 0.40
{'milk'}: Support = 0.80

Association Rules:
