In [1]:
import pandas as pd
from itertools import combinations
from typing import List, Dict, Tuple, Any

In [2]:
def calculate_combined_score(df: pd.DataFrame) -> pd.DataFrame:
    """
    Attach a per-product ``combined_score`` column to ``df``.

    The score is the element-wise sum of the ``purchase_frequency`` and
    ``recency`` columns. The column is written onto the DataFrame that was
    passed in (no copy is made) and that same object is returned.

    Parameters
    ----------
    df : pd.DataFrame
        Product rows; must contain ``purchase_frequency`` and ``recency``.

    Returns
    -------
    pd.DataFrame
        The input DataFrame, now carrying ``combined_score``.
    """
    frequency = df['purchase_frequency']
    recency = df['recency']
    df['combined_score'] = frequency + recency
    return df

In [3]:
def calculate_categorical_score(subset: Tuple[Tuple[Any, ...], ...]) -> float:
    """
    Return the categorical component of a bundle's score.

    This is a placeholder hook: ``subset`` is not inspected and every bundle
    receives the same value. The original intent was "1 when all categories
    are unique, 0 otherwise"; since category uniqueness is meant to be
    enforced before scoring, the answer is a constant.

    Parameters
    ----------
    subset : Tuple[Tuple[Any, ...], ...]
        The products forming the candidate bundle (currently unused).

    Returns
    -------
    float
        Always 1.0.
    """
    # Replace this constant with real logic if a more nuanced score is needed.
    uniform_score = 1.0
    return uniform_score

def calculate_business_score(subset: Tuple[Tuple[Any, ...], ...]) -> float:
    """
    Return the business-specific component of a bundle's score.

    Placeholder hook meant to be customized: ``subset`` is not inspected and
    every bundle receives the same fixed value.

    Parameters
    ----------
    subset : Tuple[Tuple[Any, ...], ...]
        The products forming the candidate bundle (currently unused).

    Returns
    -------
    float
        Always 2.0.
    """
    # Example placeholder; swap in real business logic here.
    placeholder_score = 2.0
    return placeholder_score

def calculate_final_score(subset: Tuple[Tuple[Any, ...], ...]) -> float:
    """
    Compute the overall score of a bundle.

    The score is built up in three steps: the sum of each product's
    combined_score (index 3 of the product tuple), plus the categorical
    score, plus the business score.

    Parameters
    ----------
    subset : Tuple[Tuple[Any, ...], ...]
        The products forming the bundle.

    Returns
    -------
    float
        base score + categorical score + business score.
    """
    score = sum(product[3] for product in subset)
    score += calculate_categorical_score(subset)
    score += calculate_business_score(subset)
    return score

In [4]:
def is_category_unique(subset: Tuple[Tuple[Any, ...], ...]) -> bool:
    """Return True when no two products in the subset share a category (index 1)."""
    distinct_categories = set()
    product_count = 0
    for product in subset:
        distinct_categories.add(product[1])
        product_count += 1
    # Fewer distinct categories than products means at least one repeat.
    return product_count == len(distinct_categories)

def is_within_budget(subset: Tuple[Tuple[Any, ...], ...], budget: float) -> bool:
    """Return True when the summed prices (index 2) of the subset are at most ``budget``."""
    spent = 0
    for product in subset:
        spent = spent + product[2]
    return spent <= budget

def calculate_final_score(subset: Tuple[Tuple[Any, ...], ...]) -> float:
    """
    Compute the overall score of a bundle.

    The score is the sum of each product's combined_score (index 3 of the
    product tuple) plus the business score. Unlike the earlier definition of
    the same name, no categorical term is added.

    Parameters
    ----------
    subset : Tuple[Tuple[Any, ...], ...]
        The products forming the bundle.

    Returns
    -------
    float
        base score + business score.
    """
    score = sum(product[3] for product in subset)
    score += calculate_business_score(subset)
    return score

In [5]:
def generate_all_valid_bundles(
    products: List[Tuple[int, str, float, float]],
    budget: float,
    min_bundle_size: int = 2
) -> List[Dict]:
    """
    Generate all valid bundles of products that meet category uniqueness, budget constraints,
    and contain at least min_bundle_size products.

    Bundles are produced in order of increasing size and, within a size, in the
    order ``itertools.combinations`` yields them.

    Parameters
    ----------
    products : List[Tuple[int, str, float, float]]
        A list of tuples representing products in the form (product_id, category, price, combined_score).
    budget : float
        The maximum total price allowed for a bundle.
    min_bundle_size : int
        The minimum number of products in a bundle.

    Returns
    -------
    List[Dict]
        A list of dictionaries, each representing a valid bundle:
        {
            'bundle': tuple_of_products,
            'price': float,
            'score': float
        }
    """
    # A bundle holds at most one product per category, so no valid bundle can
    # contain more products than there are distinct categories. Stopping at
    # that size skips whole rounds of combinations that the uniqueness check
    # would reject anyway; the returned list is unchanged.
    max_bundle_size = len({p[1] for p in products})

    valid_bundles = []
    for r in range(min_bundle_size, max_bundle_size + 1):
        for subset in combinations(products, r):
            if not is_category_unique(subset):
                continue
            # Sum the price once and reuse it for both the budget test and
            # the stored 'price' value.
            bundle_price = sum(p[2] for p in subset)
            if bundle_price <= budget:
                valid_bundles.append({
                    'bundle': subset,
                    'price': bundle_price,
                    'score': calculate_final_score(subset)
                })
    return valid_bundles

In [6]:
def is_extendable(
    bundle: Dict,
    products: List[Tuple[int, str, float, float]],
    budget: float,
    min_bundle_size: int = 2
) -> bool:
    """
    Tell whether at least one more product could be added to a bundle.

    A product qualifies when it is not already in the bundle (by product id),
    its category is not yet represented, and adding its price keeps the total
    at or below the budget.

    Parameters
    ----------
    bundle : Dict
        The current bundle; the keys 'bundle' (tuple of products) and 'price' are read.
    products : List[Tuple[int, str, float, float]]
        Every product that may be considered as an addition.
    budget : float
        The maximum allowed total price.
    min_bundle_size : int
        Accepted for signature compatibility; not used by the check.

    Returns
    -------
    bool
        True if some product can still be added, False otherwise.
    """
    members = bundle['bundle']
    taken_ids = {member[0] for member in members}
    taken_categories = {member[1] for member in members}
    spent = bundle['price']

    # Stops at the first product that fits.
    return any(
        candidate[0] not in taken_ids
        and candidate[1] not in taken_categories
        and spent + candidate[2] <= budget
        for candidate in products
    )

In [7]:
def generate_maximal_bundles(
    products: List[Tuple[int, str, float, float]],
    budget: float,
    min_bundle_size: int = 2
) -> List[Dict]:
    """
    Return the valid bundles that cannot be grown any further.

    Every valid bundle is generated first; those for which ``is_extendable``
    reports that another product still fits are then discarded.

    Parameters
    ----------
    products : List[Tuple[int, str, float, float]]
        A list of tuples representing products.
    budget : float
        The maximum total price allowed.
    min_bundle_size : int
        The minimum number of products in a bundle.

    Returns
    -------
    List[Dict]
        The maximal bundles, in the order they were generated.
    """
    candidates = generate_all_valid_bundles(products, budget, min_bundle_size)
    return [
        candidate
        for candidate in candidates
        if not is_extendable(candidate, products, budget, min_bundle_size)
    ]

In [8]:
def build_result_rows(
    group: pd.DataFrame,
    consultant_id: Any,
    maximal_bundles: List[Dict]
) -> List[Dict]:
    """
    Convert maximal bundles into a list of rows (dictionaries) for the output.

    One row is emitted per product per bundle. Bundles are numbered from 1 in
    the order given, and the bundle id is "<consultant_id>_Bundle_<n>".

    Parameters
    ----------
    group : pd.DataFrame
        DataFrame filtered for a single consultant, containing all product info.
        When a product_id occurs on several rows, the first one is used.
    consultant_id : Any
        The ID of the consultant being processed.
    maximal_bundles : List[Dict]
        The maximal bundles for this consultant; the keys 'bundle' and 'score' are read.

    Returns
    -------
    List[Dict]
        A list of dictionaries, each representing a product in a maximal bundle.
    """
    copied_columns = (
        'product_id',
        'category',
        'brand',
        'price',
        'purchase_frequency',
        'recency',
        'combined_score',
    )

    # The same product usually appears in many bundles. Remember each
    # product's row after the first lookup so the DataFrame is scanned once
    # per distinct product rather than once per product per bundle.
    row_cache = {}

    results = []
    for i, mb in enumerate(maximal_bundles, start=1):
        bundle_id = f"{consultant_id}_Bundle_{i}"
        for product in mb['bundle']:
            pid = product[0]
            if pid not in row_cache:
                row_cache[pid] = group[group['product_id'] == pid].iloc[0]
            product_row = row_cache[pid]

            result_row = {
                'consultant_id': consultant_id,
                'bundle_id': bundle_id,
            }
            for column in copied_columns:
                result_row[column] = product_row[column]
            result_row['bundle_score'] = mb['score']
            results.append(result_row)
    return results

In [9]:
def process_csv(
    input_csv: str,
    output_csv: str,
    budget: float,
    min_bundle_size: int = 2,
    max_rows: int = None
) -> None:
    """
    Main function to:
    - Load data from input CSV
    - Compute combined_score
    - Generate maximal bundles per consultant
    - Output results to a CSV (one row per product per bundle)

    When no consultant has a valid bundle, the output file is still written
    and contains only the header row.

    Parameters
    ----------
    input_csv : str
        Path to the input CSV file.
    output_csv : str
        Path where the output CSV file should be written.
    budget : float
        The purchasing power or maximum allowed total price for a bundle.
    min_bundle_size : int
        The minimum number of products required in a bundle.
    max_rows : int, optional
        If given, read only the specified number of rows from the input file.
    """
    # Fixed output schema. Passing it to the DataFrame constructor guarantees
    # the sort keys exist even when there are no result rows.
    output_columns = [
        'consultant_id',
        'bundle_id',
        'product_id',
        'category',
        'brand',
        'price',
        'purchase_frequency',
        'recency',
        'combined_score',
        'bundle_score',
    ]

    # Load input CSV; nrows=None reads the whole file, otherwise parsing stops
    # after max_rows data rows instead of loading everything and truncating.
    df = pd.read_csv(input_csv, nrows=max_rows)

    # Compute combined_score
    df = calculate_combined_score(df)

    # Process each consultant separately
    results = []
    for consultant_id, group in df.groupby('consultant_id'):
        # Convert the consultant's products into tuples: (product_id, category, price, combined_score)
        products = list(
            group[['product_id', 'category', 'price', 'combined_score']]
            .itertuples(index=False, name=None)
        )

        # Generate maximal bundles for this consultant
        maximal = generate_maximal_bundles(products, budget, min_bundle_size)

        # Build result rows from maximal bundles
        results.extend(build_result_rows(group, consultant_id, maximal))

    # Convert results to DataFrame
    results_df = pd.DataFrame(results, columns=output_columns)

    # Within each bundle, list the highest combined_score first
    results_df = results_df.sort_values(
        by=['consultant_id', 'bundle_id', 'combined_score'],
        ascending=[True, True, False],
    )

    # Write to CSV
    results_df.to_csv(output_csv, index=False)

In [10]:
# Run the pipeline on 'data/F1_test.csv', limited to the first 50 rows, with a
# bundle budget of 200 and at least two products per bundle. The result is
# written to 'data/F1_test_res.csv'.
process_csv('data/F1_test.csv', 'data/F1_test_res.csv', budget=200, min_bundle_size=2, max_rows=50)

In [13]:
import pandas as pd
from itertools import combinations
from typing import List, Dict, Tuple, Any


def normalize_columns(df: pd.DataFrame, freq_col: str = 'purchase_frequency', rec_col: str = 'recency') -> pd.DataFrame:
    """
    Min-max scale the frequency and recency columns.

    For each source column the scaled value is (value - min) / (max - min).
    The results are stored in two new columns, 'normalized_f' (from
    ``freq_col``) and 'normalized_r' (from ``rec_col``), written onto the
    DataFrame that was passed in; that same object is returned.

    When a column's maximum is not greater than its minimum (for example all
    values are equal), the scaled column is filled with 1.0 to avoid dividing
    by zero.
    """
    targets = ((freq_col, 'normalized_f'), (rec_col, 'normalized_r'))
    for source_name, target_name in targets:
        column = df[source_name]
        lowest = column.min()
        highest = column.max()
        if highest > lowest:
            spread = highest - lowest
            df[target_name] = (column - lowest) / spread
        else:
            # Degenerate range: fall back to a constant instead of dividing by zero.
            df[target_name] = 1.0
    return df


def calculate_normalized_score(df: pd.DataFrame) -> pd.DataFrame:
    """
    Attach a per-product ``normalized_score`` column to ``df``.

    The score is the element-wise sum of the ``normalized_f`` and
    ``normalized_r`` columns. The column is written onto the DataFrame that
    was passed in, and that same object is returned.
    """
    scaled_frequency = df['normalized_f']
    scaled_recency = df['normalized_r']
    df['normalized_score'] = scaled_frequency + scaled_recency
    return df


def calculate_business_score(subset: Tuple[Tuple[Any, ...], ...]) -> float:
    """
    Return the business-specific component of a bundle's score.

    Placeholder hook to be customized per business requirements: ``subset``
    is not inspected and every bundle receives the same fixed value of 2.0.
    """
    # Example placeholder; swap in real business logic here.
    placeholder_score = 2.0
    return placeholder_score


def is_category_unique(subset: Tuple[Tuple[Any, ...], ...]) -> bool:
    """Return True when no two products in the subset share a category (index 1)."""
    distinct_categories = set()
    product_count = 0
    for product in subset:
        distinct_categories.add(product[1])
        product_count += 1
    # Fewer distinct categories than products means at least one repeat.
    return product_count == len(distinct_categories)


def is_within_budget(subset: Tuple[Tuple[Any, ...], ...], budget: float) -> bool:
    """Return True when the summed prices (index 2) of the subset are at most ``budget``."""
    spent = 0
    for product in subset:
        spent = spent + product[2]
    return spent <= budget


def calculate_final_score(subset: Tuple[Tuple[Any, ...], ...]) -> float:
    """
    Compute the overall score of a bundle.

    The score is the sum of each product's normalized_score (index 3 of the
    product tuple) plus the business score for the bundle.
    """
    score = sum(product[3] for product in subset)
    score += calculate_business_score(subset)
    return score


def generate_all_valid_bundles(
    products: List[Tuple[int, str, float, float]],
    budget: float,
    min_bundle_size: int = 2
) -> List[Dict]:
    """
    Generate all valid bundles of products that:
    - Have at least min_bundle_size products
    - Have unique categories
    - Are within the budget

    Each product is a tuple (product_id, category, price, normalized_score).
    Each returned dictionary has the keys 'bundle' (the tuple of products),
    'price' (total price) and 'score' (final score). Bundles are produced in
    order of increasing size and, within a size, in the order
    ``itertools.combinations`` yields them.
    """
    # A bundle holds at most one product per category, so no valid bundle can
    # contain more products than there are distinct categories. Stopping at
    # that size skips whole rounds of combinations that the uniqueness check
    # would reject anyway; the returned list is unchanged.
    max_bundle_size = len({p[1] for p in products})

    valid_bundles = []
    for r in range(min_bundle_size, max_bundle_size + 1):
        for subset in combinations(products, r):
            if not is_category_unique(subset):
                continue
            # Sum the price once and reuse it for both the budget test and
            # the stored 'price' value.
            bundle_price = sum(p[2] for p in subset)
            if bundle_price <= budget:
                valid_bundles.append({
                    'bundle': subset,  # subset of products
                    'price': bundle_price,
                    'score': calculate_final_score(subset)
                })
    return valid_bundles


def is_extendable(
    bundle: Dict,
    products: List[Tuple[int, str, float, float]],
    budget: float
) -> bool:
    """
    Tell whether at least one more product could be added to a bundle.

    A product qualifies when it is not already in the bundle (by product id),
    its category is not yet represented, and adding its price keeps the total
    at or below the budget. The bundle's 'bundle' and 'price' keys are read.
    """
    members = bundle['bundle']
    taken_ids = {member[0] for member in members}
    taken_categories = {member[1] for member in members}
    spent = bundle['price']

    for candidate in products:
        if candidate[0] in taken_ids:
            continue
        if candidate[1] in taken_categories:
            continue
        if spent + candidate[2] <= budget:
            return True
    return False


def generate_maximal_bundles(
    products: List[Tuple[int, str, float, float]],
    budget: float,
    min_bundle_size: int = 2
) -> List[Dict]:
    """
    Return the valid bundles that cannot be grown any further.

    Every valid bundle is generated first; those for which ``is_extendable``
    reports that another product still fits are then discarded. The surviving
    bundles keep their generation order.
    """
    candidates = generate_all_valid_bundles(products, budget, min_bundle_size)
    return [
        candidate
        for candidate in candidates
        if not is_extendable(candidate, products, budget)
    ]


def build_bundle_rows_for_consultant(
    group: pd.DataFrame,
    consultant_id: Any,
    maximal_bundles: List[Dict]
) -> List[Dict]:
    """
    Convert maximal bundles into rows. Each row represents one bundle:
    - consultant_id
    - products: list of product_ids in the bundle
    - price: total price of the bundle
    - total_normalized_score: sum of normalized_score of products in the bundle
    - bundle_score: final score of the bundle

    ``total_normalized_score`` is summed from the product tuples stored in the
    bundle (index 3), i.e. the same values the bundle score was computed from.
    Looking the products up in ``group`` by id instead would count a product
    once per matching row, inflating the total whenever a consultant has
    several rows for the same product_id.

    ``group`` is kept in the signature for compatibility but is not read.
    """
    results = []
    for mb in maximal_bundles:
        members = mb['bundle']
        results.append({
            'consultant_id': consultant_id,
            'products': [p[0] for p in members],
            'price': mb['price'],
            'total_normalized_score': sum(p[3] for p in members),
            'bundle_score': mb['score']
        })
    return results


def process_csv(
    input_csv: str,
    output_csv: str,
    budget: float,
    min_bundle_size: int = 2,
    max_rows: int = None
) -> None:
    """
    Main function to:
    - Load data from input CSV
    - Normalize frequency and recency
    - Compute normalized_score
    - Generate maximal bundles per consultant
    - Output results to a CSV (one row per bundle)

    Normalization is applied across all loaded rows before they are grouped
    by consultant. When no consultant has a valid bundle, the output file is
    still written and contains only the header row.

    Parameters
    ----------
    input_csv : str
        Path to the input CSV file.
    output_csv : str
        Path where the output CSV file should be written.
    budget : float
        The purchasing power or maximum allowed total price for a bundle.
    min_bundle_size : int
        The minimum number of products required in a bundle.
    max_rows : int, optional
        If given, read only the specified number of rows from the input file.
    """
    # Fixed output schema. Passing it to the DataFrame constructor guarantees
    # the sort keys exist even when there are no result rows.
    output_columns = [
        'consultant_id',
        'products',
        'price',
        'total_normalized_score',
        'bundle_score',
    ]

    # Load input CSV; nrows=None reads the whole file, otherwise parsing stops
    # after max_rows data rows instead of loading everything and truncating.
    df = pd.read_csv(input_csv, nrows=max_rows)

    # Normalize frequency and recency
    df = normalize_columns(df, freq_col='purchase_frequency', rec_col='recency')

    # Compute normalized_score
    df = calculate_normalized_score(df)

    # Process each consultant separately
    results = []
    for consultant_id, group in df.groupby('consultant_id'):
        # Convert to tuples: (product_id, category, price, normalized_score)
        products = list(
            group[['product_id', 'category', 'price', 'normalized_score']]
            .itertuples(index=False, name=None)
        )

        # Generate maximal bundles for this consultant
        maximal = generate_maximal_bundles(products, budget, min_bundle_size)

        # Build result rows for each maximal bundle
        results.extend(build_bundle_rows_for_consultant(group, consultant_id, maximal))

    # Convert results to DataFrame
    results_df = pd.DataFrame(results, columns=output_columns)

    # Per consultant, list the highest-scoring bundles first
    results_df = results_df.sort_values(
        by=['consultant_id', 'bundle_score'],
        ascending=[True, False],
    )

    # Write to CSV
    results_df.to_csv(output_csv, index=False)


if __name__ == "__main__":
    # Example usage; this branch runs only when the file is executed as a script.
    # In practice, replace 'data/F1_test.csv' and 'data/F1_test_res.csv' with actual file paths.
    # Here each bundle's total price is capped at 200, bundles need at least
    # two products, and only the first 500 input rows are processed.
    process_csv('data/F1_test.csv', 'data/F1_test_res.csv', budget=200, min_bundle_size=2, max_rows=500)