# Exercise — FMCG Distribution Assignment using Pyomo
Domain: FMCG (Fast-Moving Consumer Goods)

Theme: Assign warehouses to stores optimally (assignment problem).

## Scenario



A consumer goods company has 3 warehouses and 3 major retail stores. There is a pre-unit transportation cost from each warehouse to each store.

To design a reusable "distribution assignment" block, you must compute the minimum-cost assignment of warehouse to stores, assuming:
- Each warehouse served exactly one store.
- Each store is served by exactly one warehouse.

### Input
A cost matrix:
```python
import numpy as np

cost_matrix = np.array([
    [8, 6, 7],  # Warehouse 0 to Stores 0,1,2
    [5, 9, 3],  # Warehouse 1
    [6, 4, 5],  # Warehouse 2
])

### Task
Implement:
```python
from typing import Any, Dict, List, Tuple
import numpy as np

def assign_warehouses_to_stores(cost_matrix: np.ndarray) -> Dict[str, Any]:
    """
    Solve the warehouse-store assignment problem using Pyomo.
    """
    ...
```

### Requirements
- Use Pyomo to solve the assignment problem.
- Return
```python
{
    "assignments": List[Tuple[int, int]], # (warehouse_index, store_index)
    "total_cost": float, # total cost of the assignment
}
```
- Make sure the function works for any square cost matrix (n x n), not just 3x3.


## Code implementation

In [None]:
import numpy as np
from typing import Any, Dict, List, Tuple
from pyomo.environ import ConcreteModel, Var, Objective, Constraint, Binary, minimize, SolverFactory, value

def assign_warehouses_to_stores(cost_matrix: np.ndarray) -> Dict[str, Any]:
    """
    Solve the warehouse-store assignment problem using Pyomo.
    
    This function demonstrates the assignment problem: given N warehouses and N stores,
    find the one-to-one assignment that minimizes total transportation cost.
    
    The assignment problem is a classic combinatorial optimization problem where we need
    to assign each warehouse to exactly one store (and vice versa) such that the total
    transportation cost is minimized. This is also known as the linear assignment problem
    or bipartite matching problem.
    
    Pyomo is a Python-based open-source optimization modeling language that provides a
    high-level interface for defining optimization problems. It can interface with various
    solvers (CBC, IPOPT, etc.) to solve the problem. Pyomo is particularly
    useful for modeling complex optimization problems with readable, maintainable code.

    Args:
        cost_matrix: A square numpy array of shape (n, n) where
                     cost_matrix[i, j] is the cost of assigning
                     warehouse i to store j.
                     Lower values are better (minimization problem).

    Returns:
        {
            "assignments": [(warehouse_index, store_index), ...],
            "total_cost": float
        }
        The assignments list contains tuples where each tuple (i, j) means
        warehouse i is assigned to store j.

    Raises:
        ValueError: If the matrix is not 2D square, or if the solver cannot find an optimal solution.
    """
    # STEP 1: Validate input shape
    # The assignment problem requires a square matrix (N warehouses, N stores)
    # This ensures we can create a one-to-one assignment (each warehouse to one store)
    if cost_matrix.ndim != 2 or cost_matrix.shape[0] != cost_matrix.shape[1]:
        raise ValueError("cost_matrix must be a square 2D array.")
    
    n_warehouses, n_stores = cost_matrix.shape
    
    # STEP 2: Create the Pyomo model
    # Pyomo uses ConcreteModel for models where all data is known at model creation time.
    # ConcreteModel allows us to define the model structure and data together.
    # 
    # Why Pyomo for assignment problems?
    # - Clean, readable model definition (similar to mathematical notation)
    # - Flexible solver interface (can use CBC, IPOPT, etc.)
    # - Supports both integer and continuous variables
    # - Good for educational purposes and production systems
    model = ConcreteModel()
    
    # STEP 3: Define index sets
    # Index sets help organize the model and make it more readable.
    # We'll use these sets to define variables and constraints.
    model.warehouses = range(n_warehouses)
    model.stores = range(n_stores)
    
    # STEP 4: Create decision variables
    # We create a binary variable x[i, j] for each warehouse-store pair.
    # x[i, j] = 1 means "warehouse i is assigned to store j"
    # x[i, j] = 0 means "warehouse i is NOT assigned to store j"
    #
    # Example: If we have 3 warehouses and 3 stores, we create 9 binary variables:
    # x[0, 0], x[0, 1], x[0, 2]  (warehouse 0 can be assigned to store 0, 1, or 2)
    # x[1, 0], x[1, 1], x[1, 2]  (warehouse 1 can be assigned to store 0, 1, or 2)
    # x[2, 0], x[2, 1], x[2, 2]  (warehouse 2 can be assigned to store 0, 1, or 2)
    #
    # Var(within=Binary) creates a binary decision variable (0 or 1) that the solver will determine.
    # The within parameter specifies the domain (Binary means values must be 0 or 1).
    model.x = Var(model.warehouses, model.stores, within=Binary)
    
    # STEP 5: Add constraints
    # Constraints ensure that our solution satisfies the problem requirements.
    # Without constraints, the solver might assign multiple stores to one warehouse,
    # or leave some warehouses unassigned.
    
    # Constraint 1: Each warehouse must be assigned to exactly one store
    # For each warehouse i, the sum of x[i, j] over all stores j must equal 1.
    # This means exactly one of x[i, 0], x[i, 1], ..., x[i, n_stores-1] must be 1.
    #
    # Example: For warehouse 0 with 3 stores: x[0, 0] + x[0, 1] + x[0, 2] == 1
    # This ensures warehouse 0 serves exactly one store (not zero, not two or more)
    def warehouse_assignment_rule(model, i):
        return sum(model.x[i, j] for j in model.stores) == 1
    
    model.warehouse_assignment = Constraint(model.warehouses, rule=warehouse_assignment_rule)
    
    # Constraint 2: Each store must be assigned to exactly one warehouse
    # For each store j, the sum of x[i, j] over all warehouses i must equal 1.
    # This means exactly one of x[0, j], x[1, j], ..., x[n_warehouses-1, j] must be 1.
    #
    # Example: For store 1 with 3 warehouses: x[0, 1] + x[1, 1] + x[2, 1] == 1
    # This ensures store 1 is served by exactly one warehouse (not zero, not two or more)
    def store_assignment_rule(model, j):
        return sum(model.x[i, j] for i in model.warehouses) == 1
    
    model.store_assignment = Constraint(model.stores, rule=store_assignment_rule)
    
    # STEP 6: Define the objective function
    # The objective is to minimize the total cost of all assignments.
    # Total cost = sum over all (i,j) of: cost_matrix[i, j] * x[i, j]
    #
    # Why multiply by x[i, j]? 
    # - If x[i, j] = 1 (warehouse i assigned to store j), we include cost_matrix[i, j]
    # - If x[i, j] = 0 (not assigned), we don't include it (multiply by 0)
    #
    # Example: If warehouse 0 is assigned to store 1, then:
    #   cost_matrix[0, 1] * x[0, 1] = cost_matrix[0, 1] * 1 = cost_matrix[0, 1]
    # If warehouse 0 is NOT assigned to store 2, then:
    #   cost_matrix[0, 2] * x[0, 2] = cost_matrix[0, 2] * 0 = 0 (doesn't contribute)
    def objective_rule(model):
        return sum(cost_matrix[i, j] * model.x[i, j] for i in model.warehouses for j in model.stores)
    
    model.objective = Objective(rule=objective_rule, sense=minimize)
    
    # STEP 7: Solve the model
    # Pyomo can interface with various solvers. We'll use:
    # - 'cbc': COIN-OR Branch and Cut (excellent for integer programming)
    # - 'ipopt': Interior Point Optimizer (good for linear and nonlinear problems)
    #
    # The solver uses advanced algorithms (branch-and-bound, cutting planes) to find
    # the optimal assignment that satisfies all constraints and minimizes the objective.
    solver_name = None
    for candidate in ['cbc', 'ipopt']:
        if SolverFactory(candidate).available():
            solver_name = candidate
            break
    
    if solver_name is None:
        raise RuntimeError(
            "No suitable solver found. Please install cbc or ipopt. "
            "For example: conda install -c conda-forge coincbc ipopt"
        )
    
    solver = SolverFactory(solver_name)
    # Set solver options for better performance and to ensure optimality
    if solver_name == 'cbc':
        solver.options['seconds'] = 60  # Time limit
    elif solver_name == 'ipopt':
        solver.options['max_iter'] = 1000  # Maximum iterations
    
    results = solver.solve(model, tee=False)  # tee=False suppresses solver output
    
    # STEP 8: Check if a solution was found
    # The solver returns a results object with status information.
    # We check if the solution is optimal. Other possible termination conditions include:
    # - 'optimal': Found the best possible solution ✓
    # - 'feasible': Found a valid solution, but might not be optimal
    # - 'infeasible': No solution exists (shouldn't happen for assignment problems with square matrices)
    # - 'unbounded': Problem is unbounded (shouldn't happen for assignment problems)
    # - 'error': Solver encountered an error
    if results.solver.termination_condition != 'optimal':
        raise ValueError(
            f"Assignment problem could not be solved optimally. "
            f"Termination condition: {results.solver.termination_condition}"
        )
    
    # STEP 9: Extract the solution
    # Now that the solver has found the optimal assignment, we need to:
    # 1. Find which x[i, j] variables are set to 1 (the actual assignments)
    # 2. Calculate the total cost using the original cost values
    assignments: List[Tuple[int, int]] = []
    total_cost = 0.0
    
    # Iterate through all warehouse-store pairs
    for i in model.warehouses:
        for j in model.stores:
            # value(model.x[i, j]) returns the value of the variable in the solution
            # It will be either 0 or 1 (since x[i, j] is a binary variable)
            # We check if the value is greater than 0.5 to account for potential floating-point precision
            # In practice, binary variables should be exactly 0 or 1, but this check is defensive
            if value(model.x[i, j]) > 0.5:
                # This assignment is part of the optimal solution
                assignments.append((i, j))
                # Add the cost to the total
                total_cost += float(cost_matrix[i, j])
    
    # STEP 10: Return the results
    # We return a dictionary with:
    # - assignments: A list of (warehouse_idx, store_idx) tuples showing which warehouse serves which store
    # - total_cost: The sum of costs for this optimal assignment
    #
    # This format is easy to:
    # - Display to users
    # - Serialize to JSON for APIs
    # - Use in downstream processing
    return {
        "assignments": assignments,  # List of (warehouse_idx, store_idx) pairs
        "total_cost": total_cost,  # Sum of costs for all assignments
    }

# ============================================================
# Test algorithm
# ============================================================
# if __name__ == "__main__":
cost_matrix_demo = np.array([
    [8, 6, 7],
    [5, 9, 3],
    [6, 4, 5],
])
assignment_result = assign_warehouses_to_stores(cost_matrix_demo)
print("Exercise 3 result:", assignment_result)

Exercise 3 result: {'assignments': [(0, 0), (1, 2), (2, 1)], 'total_cost': 15.0}
