In [1]:
import pandas as pd
from collections import defaultdict
from typing import List, Dict, Optional, Tuple

class HCFPGrowthModel:
    def __init__(self, min_item_frequency=5, min_support=5):
        self.min_item_frequency = min_item_frequency
        self.min_support = min_support
        self.frequent_patterns = {}
        self.recommendations = []
        self.item_frequency = {}
        self.active_day = None
        self.peak_hours = None
        self.basket_size = None

    class TreeNode:
        def __init__(self, item: Optional[str], parent: Optional['HCFPGrowthModel.TreeNode']):
            self.item = item
            self.count = 1
            self.parent = parent
            self.children: Dict[str, 'HCFPGrowthModel.TreeNode'] = {}
            self.link = None

        def increment(self, count=1):
            self.count += count

    def _compress_transactions(self, transactions: List[List[str]]) -> List[List[str]]:
        frequency = defaultdict(int)
        for transaction in transactions:
            for item in transaction:
                frequency[item] += 1
        self.item_frequency = dict(frequency)

        def sort_items(t):
            return sorted(
                [item for item in t if frequency[item] >= self.min_item_frequency],
                key=lambda x: (-frequency[x], x)
            )

        return [sort_items(t) for t in transactions if sort_items(t)]

    def _build_fp_tree(self, transactions: List[List[str]]) -> Tuple['TreeNode', Dict[str, List['TreeNode']]]:
        header_table: Dict[str, List[HCFPGrowthModel.TreeNode]] = defaultdict(list)
        root = self.TreeNode(None, None)

        for transaction in transactions:
            current_node = root
            for item in transaction:
                if item in current_node.children:
                    current_node.children[item].increment()
                else:
                    new_node = self.TreeNode(item, current_node)
                    current_node.children[item] = new_node
                    header_table[item].append(new_node)
                current_node = current_node.children[item]

        header_table = {
            item: nodes for item, nodes in header_table.items()
            if sum(n.count for n in nodes) >= self.min_support
        }
        return root, header_table

    def _ascend_fp_tree(self, node: 'TreeNode') -> List[str]:
        path = []
        while node.parent and node.parent.item is not None:
            node = node.parent
            path.append(node.item)
        return path[::-1]

    def _mine_patterns(self, header_table: Dict[str, List['TreeNode']]) -> Dict[Tuple[str, ...], int]:
        patterns = {}
        for item, nodes in header_table.items():
            for node in nodes:
                path = self._ascend_fp_tree(node)
                if path:
                    pattern = tuple(sorted(path + [item]))
                    patterns[pattern] = patterns.get(pattern, 0) + node.count
        return {p: c for p, c in patterns.items() if c >= self.min_support}

    def fit(self, df: pd.DataFrame):
        transactions = df.groupby("Member_number")["itemDescription"].apply(list).tolist()
        compressed = self._compress_transactions(transactions)
        _, header_table = self._build_fp_tree(compressed)
        self.frequent_patterns = self._mine_patterns(header_table)
        self.recommendations = sorted(self.frequent_patterns.items(), key=lambda x: -x[1])
        self._analyze_trends(df)

    def recommend(self, top_n=10) -> List[Tuple[Tuple[str, ...], int]]:
        return self.recommendations[:top_n]

    def _analyze_trends(self, df: pd.DataFrame):
        if 'Date' in df.columns:
            df['Date'] = pd.to_datetime(df['Date'], errors='coerce')
            df['DayOfWeek'] = df['Date'].dt.day_name()
            df['Hour'] = df['Date'].dt.hour

            self.active_day = df['DayOfWeek'].mode()[0]
            self.peak_hours = df.groupby('Hour').size().idxmax()
            self.basket_size = df.groupby('Member_number')['itemDescription'].apply(len).mean()

    def get_trends(self) -> Tuple[Optional[str], Optional[int], Optional[float]]:
        return self.active_day, self.peak_hours, self.basket_size


In [2]:
import joblib

# Load data
df = pd.read_csv("C:/Users/tejav/OneDrive/Documents/GitHub/market-analysis/BACKEND/pandas/DATASET/Groceries_dataset.csv")

# Train model
model = HCFPGrowthModel(min_item_frequency=5, min_support=5)
model.fit(df)

# Save model
joblib.dump(model, "hc_fp_growth_model.joblib")


  df['Date'] = pd.to_datetime(df['Date'], errors='coerce')


['hc_fp_growth_model.joblib']

In [3]:
# Load saved model
model = joblib.load("hc_fp_growth_model.joblib")

# Get recommendations
top_recs = model.recommend()
for items, count in top_recs:
    print(" + ".join(items), "->", count)

# Get purchase trends
active_day, peak_hour, avg_basket = model.get_trends()
print(f"Most active day: {active_day}")
print(f"Peak hour: {peak_hour}")
print(f"Avg basket size: {avg_basket:.2f}")


whole milk + whole milk -> 534
other vegetables + whole milk -> 498
rolls/buns + whole milk -> 264
other vegetables + rolls/buns -> 179
other vegetables + whole milk + whole milk -> 179
other vegetables + other vegetables -> 166
other vegetables + rolls/buns + whole milk -> 151
soda + whole milk -> 140
whole milk + whole milk + whole milk -> 137
other vegetables + other vegetables + whole milk -> 116
Most active day: Thursday
Peak hour: 0
Avg basket size: 9.94


In [8]:
from __future__ import annotations  # This enables postponed annotations
from typing import Optional, Dict, List, Tuple, DefaultDict, Any, Set
from collections import defaultdict, OrderedDict
import pandas as pd
import joblib
import math
from pathlib import Path
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class HCFPGrowthModel:
    """Hierarchical Compressed FP-Growth algorithm implementation"""
    
    class TreeNode:
        """Node for the FP-Tree structure"""
        def __init__(self, item: Optional[str], parent: Optional['TreeNode']):
            self.item = item
            self.count = 1
            self.parent = parent
            self.children: Dict[str, 'HCFPGrowthModel.TreeNode'] = OrderedDict()
            self.link: Optional['HCFPGrowthModel.TreeNode'] = None

        def increment(self, count: int = 1) -> None:
            """Increase the node's count"""
            self.count += count

    def __init__(self, min_support: int = 5, min_item_frequency: int = 5, compression_ratio: float = 0.5):
        """
        Initialize the HCFPGrowth model
        
        Args:
            min_support: Minimum support count for frequent patterns
            min_item_frequency: Minimum frequency for items to be considered
            compression_ratio: Ratio for hierarchical compression
        """
        self.min_support = min_support
        self.min_item_frequency = min_item_frequency
        self.compression_ratio = compression_ratio
        self.hierarchy: Dict[int, Dict[str, str]] = {}
        self.frequent_patterns: Dict[Tuple[str, ...], int] = {}
        self.header_table: DefaultDict[str, List[TreeNode]] = defaultdict(list)
        self.item_frequency: Dict[str, int] = {}
        self.trends: Dict[str, Any] = {}
        self.root: Optional[HCFPGrowthModel.TreeNode] = None

    def fit(self, df: pd.DataFrame) -> None:
        """Train the HCFP-Growth model"""
        logger.info("Starting model training...")
        transactions = self._prepare_transactions(df)
        self._build_hierarchy(transactions)
        compressed_transactions = self._compress_transactions(transactions)
        self._build_hcfp_tree(compressed_transactions)
        self._mine_compressed_patterns()
        self._expand_hierarchical_patterns()
        self.trends = self._analyze_trends(df)
        logger.info("Model training completed")

    # [Keep all other methods exactly the same as in your original implementation]
    # _prepare_transactions, _build_hierarchy, _compress_transactions, etc.

    def recommend(self, top_n: int = 10) -> List[Tuple[Tuple[str, ...], int, float]]:
        """
        Get top recommendations with support percentage
        
        Args:
            top_n: Number of recommendations to return
            
        Returns:
            List of tuples containing (itemset, count, support_percentage)
        """
        total_transactions = sum(self.item_frequency.values()) / len(self.item_frequency)
        return sorted(
            [(pattern, count, (count/total_transactions)*100) 
             for pattern, count in self.frequent_patterns.items()],
            key=lambda x: (-x[1], -x[2], len(x[0]))
        )[:top_n]

    def save(self, path: str) -> None:
        """Save model to file"""
        joblib.dump(self, path, compress=3)
        logger.info(f"Model saved to {path}")

    @classmethod
    def load(cls, path: str) -> 'HCFPGrowthModel':
        """Load model from file"""
        model = joblib.load(path)
        if not isinstance(model, HCFPGrowthModel):
            raise ValueError("Loaded file is not an HCFPGrowthModel")
        return model

In [14]:
from typing import Optional, Dict, List, Tuple, DefaultDict, Any
from collections import defaultdict, OrderedDict
import pandas as pd
import joblib
import math
from pathlib import Path
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class HCFPGrowthModel:
    class TreeNode:
        def __init__(self, item: Optional[str], parent: Optional['HCFPGrowthModel.TreeNode']):
            self.item = item
            self.count = 1
            self.parent = parent
            self.children: Dict[str, 'HCFPGrowthModel.TreeNode'] = OrderedDict()
            self.link = None

        def increment(self, count: int = 1):
            self.count += count

    def __init__(self, min_support: int = 5, min_item_frequency: int = 5, compression_ratio: float = 0.5):
        self.min_support = min_support
        self.min_item_frequency = min_item_frequency
        self.compression_ratio = compression_ratio
        self.hierarchy: Dict[int, Dict[str, str]] = {}
        self.frequent_patterns: Dict[Tuple[str, ...], int] = {}
        self.header_table: DefaultDict[str, List[TreeNode]] = defaultdict(list)
        self.item_frequency: Dict[str, int] = {}
        self.trends: Dict[str, Any] = {}
        self.root: Optional[HCFPGrowthModel.TreeNode] = None

    def fit(self, df: pd.DataFrame) -> None:
        """Train the HCFP-Growth model with comprehensive logging"""
        logger.info("Starting model training...")
        try:
            transactions = self._prepare_transactions(df)
            logger.info(f"Processed {len(transactions)} transactions")
            
            self._build_hierarchy(transactions)
            logger.info(f"Built hierarchy with {len(self.hierarchy)} levels")
            
            compressed_transactions = self._compress_transactions(transactions)
            logger.info(f"Compressed transactions to {len(compressed_transactions)} patterns")
            
            self._build_hcfp_tree(compressed_transactions)
            logger.info("FP-tree construction completed")
            
            self._mine_compressed_patterns()
            logger.info(f"Mined {len(self.compressed_patterns)} compressed patterns")
            
            self._expand_hierarchical_patterns()
            logger.info(f"Expanded to {len(self.frequent_patterns)} frequent patterns")
            
            self.trends = self._analyze_trends(df)
            logger.info("Trend analysis completed")
            
        except Exception as e:
            logger.error(f"Error during model training: {str(e)}")
            raise

    def _prepare_transactions(self, df: pd.DataFrame) -> List[List[str]]:
        """Convert DataFrame to transactions with validation"""
        if 'Member_number' in df.columns and 'itemDescription' in df.columns:
            return df.groupby('Member_number')['itemDescription'].apply(list).tolist()
        elif 'transaction_id' in df.columns and 'item' in df.columns:
            return df.groupby('transaction_id')['item'].apply(list).tolist()
        raise ValueError("DataFrame must contain either ('Member_number', 'itemDescription') or ('transaction_id', 'item')")

    def _build_hierarchy(self, transactions: List[List[str]]) -> None:
        """Build hierarchical structure with multiple compression levels"""
        freq = defaultdict(int)
        for t in transactions:
            for item in t:
                freq[item] += 1
        self.item_frequency = dict(freq)
        
        sorted_items = sorted(freq.items(), key=lambda x: -x[1])
        num_levels = max(2, math.ceil(math.log(len(sorted_items), 1/self.compression_ratio)))
        
        for level in range(num_levels):
            self.hierarchy[level] = {}
            threshold = sorted_items[int(len(sorted_items) * (self.compression_ratio ** level))][1]
            for item, count in sorted_items:
                if count >= threshold:
                    self.hierarchy[level][item] = f"L{level}_{item[:3]}"
                else:
                    break

    def _compress_transactions(self, transactions: List[List[str]]) -> List[List[str]]:
        """Apply hierarchical compression with validation"""
        compressed = []
        for t in transactions:
            compressed_t = []
            for item in t:
                for level in sorted(self.hierarchy.keys(), reverse=True):
                    if item in self.hierarchy[level]:
                        compressed_t.append(self.hierarchy[level][item])
                        break
            if compressed_t and len(compressed_t) >= 2:  # Minimum 2 items for patterns
                compressed.append(compressed_t)
        return compressed

    def _build_hcfp_tree(self, transactions: List[List[str]]) -> None:
        """Build FP-Tree with header table"""
        self.root = self.TreeNode(None, None)
        for t in transactions:
            current = self.root
            for item in t:
                if item in current.children:
                    current.children[item].increment()
                else:
                    new_node = self.TreeNode(item, current)
                    current.children[item] = new_node
                    self.header_table[item].append(new_node)
                current = current.children[item]

    def _mine_compressed_patterns(self) -> None:
        """Mine patterns with conditional pattern bases"""
        self.compressed_patterns = {}
        sorted_items = sorted(self.header_table.keys(), 
                            key=lambda x: -len(self.header_table[x]))
        
        for item in sorted_items:
            conditional_base = []
            for node in self.header_table[item]:
                path = []
                parent = node.parent
                while parent and parent.item:
                    path.append(parent.item)
                    parent = parent.parent
                if path:
                    conditional_base.append((path, node.count))
            
            if conditional_base:
                conditional_tree = self._build_conditional_tree(conditional_base)
                conditional_patterns = self._mine_conditional_patterns(item, conditional_tree)
                for pattern, count in conditional_patterns.items():
                    full_pattern = tuple(sorted(pattern + (item,)))
                    self.compressed_patterns[full_pattern] = self.compressed_patterns.get(full_pattern, 0) + count

    def _expand_hierarchical_patterns(self) -> None:
        """Expand compressed patterns to original items"""
        self.frequent_patterns = {}
        code_to_items = defaultdict(list)
        for level in self.hierarchy:
            for item, code in self.hierarchy[level].items():
                code_to_items[code].append(item)
        
        for pattern, count in self.compressed_patterns.items():
            expanded_sets = [set(code_to_items.get(item, [item])) for item in pattern]
            from itertools import product
            for combination in product(*expanded_sets):
                if len(set(combination)) == len(combination):
                    sorted_combo = tuple(sorted(combination))
                    self.frequent_patterns[sorted_combo] = self.frequent_patterns.get(sorted_combo, 0) + count
        
        self.frequent_patterns = {k: v for k, v in self.frequent_patterns.items() 
                                if v >= self.min_support}

    def _analyze_trends(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Analyze purchase trends with date information"""
        trends = {}
        if 'Date' in df.columns:
            try:
                df['Date'] = pd.to_datetime(df['Date'])
                df['DayOfWeek'] = df['Date'].dt.day_name()
                df['Hour'] = df['Date'].dt.hour
                
                trends['active_day'] = df['DayOfWeek'].mode()[0]
                trends['peak_hour'] = df['Hour'].mode()[0]
                trends['avg_basket_size'] = df.groupby('Member_number').size().mean()
                
            except Exception as e:
                logger.warning(f"Trend analysis limited: {str(e)}")
        return trends

    def recommend(self, top_n: int = 10) -> List[Tuple[Tuple[str, ...], int]]:
        """Get top recommendations with support percentage"""
        total_transactions = sum(self.item_frequency.values()) / len(self.item_frequency)  # Approximate
        return sorted(
            [(pattern, count, (count/total_transactions)*100) 
             for pattern, count in self.frequent_patterns.items()],
            key=lambda x: (-x[1], -x[2], len(x[0]))
        )[:top_n]

    def save(self, path: str) -> None:
        """Save model with compression"""
        joblib.dump(self, path, compress=3)
        logger.info(f"Model saved to {path}")

    @classmethod
    def load(cls, path: str) -> 'HCFPGrowthModel':
        """Load model with validation"""
        model = joblib.load(path)
        if not isinstance(model, HCFPGrowthModel):
            raise ValueError("Loaded object is not an HCFPGrowthModel")
        return model


def train_and_save_model(data_path: str, model_path: str = "hc_fp_growth_model.joblib") -> bool:
    """Complete training pipeline with error handling"""
    try:
        data_path = Path(data_path)
        if not data_path.exists():
            raise FileNotFoundError(f"Data file not found at {data_path}")

        logger.info(f"Loading data from {data_path}...")
        df = pd.read_csv(data_path)

        logger.info("Initializing and training model...")
        model = HCFPGrowthModel(min_support=5, min_item_frequency=5)
        model.fit(df)

        logger.info(f"Saving model to {model_path}...")
        model.save(model_path)
        return True

    except Exception as e:
        logger.error(f"Training failed: {str(e)}")
        return False


def analyze_model(model_path: str = "hc_fp_growth_model.joblib") -> Tuple[Optional[List[Tuple[str, int, float]]], Optional[Dict[str, Any]]]:
    """Complete analysis pipeline with error handling"""
    try:
        model = HCFPGrowthModel.load(model_path)
        
        logger.info("Generating recommendations...")
        recommendations = model.recommend()
        
        logger.info("Analyzing trends...")
        trends = model.trends
        
        return recommendations, trends
        
    except Exception as e:
        logger.error(f"Analysis failed: {str(e)}")
        return None, None


def main():
    """End-to-end execution of training and analysis"""
    DATA_PATH = "Groceries_dataset.csv"
    MODEL_PATH = "hc_fp_growth_model.joblib"

    # Train or load model
    if not Path(MODEL_PATH).exists():
        logger.info("Model not found, training new model...")
        if not train_and_save_model(DATA_PATH, MODEL_PATH):
            return

    # Analyze and display results
    recommendations, trends = analyze_model(MODEL_PATH)
    
    if recommendations:
        print("\nTop Recommendations:")
        for i, (items, count, support) in enumerate(recommendations, 1):
            print(f"{i}. {' + '.join(items)} - {count} occurrences ({support:.1f}% support)")
    
    if trends:
        print("\nPurchase Trends:")
        print(f"Most active day: {trends.get('active_day', 'N/A')}")
        print(f"Peak shopping hour: {trends.get('peak_hour', 'N/A')}:00")
        print(f"Average basket size: {trends.get('avg_basket_size', 'N/A'):.2f} items")

train_and_save_model("Groceries_dataset.csv", "model.joblib")
recommendations, trends = analyze_model("model.joblib")

if __name__ == "__main__":
    main()

ERROR:__main__:Training failed: Data file not found at Groceries_dataset.csv
ERROR:__main__:Analysis failed: [Errno 2] No such file or directory: 'model.joblib'
INFO:__main__:Generating recommendations...
INFO:__main__:Analyzing trends...
ERROR:__main__:Analysis failed: 'HCFPGrowthModel' object has no attribute 'trends'


In [11]:
train_and_save_model("Groceries_dataset.csv", "model.joblib")

ERROR:__main__:Training failed: Data file not found at Groceries_dataset.csv


False