In [None]:
import pulp
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from typing import Dict, List, Any

In [14]:

class AdvancedSupplyPlanningModel:
    def __init__(self):
        """Initialize the advanced supply chain optimization model."""
        self.model = pulp.LpProblem("Advanced_Supply_Chain_Optimization", pulp.LpMinimize)
        self.periods = []
        self.products = []
        self.facilities = []
        self.variables = {}
        self.scenario_results = {}

    # [Previous methods remain the same]
    def add_facilities(self, facilities: List[str]):
        """Add production and distribution facilities."""
        self.facilities = facilities
        
    def add_products(self, products: List[str]):
        """Add products to the model."""
        self.products = products
        
    def add_periods(self, periods: List[int]):
        """Add planning periods to the model."""
        self.periods = periods
        
    def setup_variables(self, initial_inventory: Dict[str, Dict[str, float]] = None):
        """
        Set up decision variables for a comprehensive supply chain model.
        Includes production, transportation, inventory, backlog, and workforce variables.
        """
        if initial_inventory is None:
            initial_inventory = {f: {p: 0 for p in self.products} for f in self.facilities}
        
        # Production variables per facility and product
        self.variables['production'] = pulp.LpVariable.dicts(
            "production", 
            ((f, p, t) for f in self.facilities for p in self.products for t in self.periods), 
            lowBound=0
        )
        
        # Setup variables
        self.variables['setup'] = pulp.LpVariable.dicts(
            "setup", 
            ((f, p, t) for f in self.facilities for p in self.products for t in self.periods), 
            cat='Binary'
        )
        
        # Inter-facility transportation variables
        self.variables['transport'] = pulp.LpVariable.dicts(
            "transport", 
            ((f1, f2, p, t) for f1 in self.facilities for f2 in self.facilities 
             for p in self.products for t in self.periods if f1 != f2), 
            lowBound=0
        )
        
        # Workforce assignment variables
        self.variables['workforce'] = pulp.LpVariable.dicts(
            "workforce", 
            ((f, s, t) for f in self.facilities for s in ['skilled', 'unskilled'] for t in self.periods), 
            lowBound=0, 
            cat='Integer'
        )
        
        # Hiring and firing variables
        self.variables['hire'] = pulp.LpVariable.dicts(
            "hire", 
            ((f, s, t) for f in self.facilities for s in ['skilled', 'unskilled'] for t in self.periods), 
            lowBound=0, 
            cat='Integer'
        )
        
        self.variables['fire'] = pulp.LpVariable.dicts(
            "fire", 
            ((f, s, t) for f in self.facilities for s in ['skilled', 'unskilled'] for t in self.periods), 
            lowBound=0, 
            cat='Integer'
        )
        
        # Inventory and backlog variables
        self.variables['inventory'] = pulp.LpVariable.dicts(
            "inventory", 
            ((f, p, t) for f in self.facilities for p in self.products for t in self.periods), 
            lowBound=0
        )
        
        self.variables['backlog'] = pulp.LpVariable.dicts(
            "backlog", 
            ((f, p, t) for f in self.facilities for p in self.products for t in self.periods), 
            lowBound=0
        )
        
        # Overtime variables
        self.variables['overtime'] = pulp.LpVariable.dicts(
            "overtime", 
            ((f, t) for f in self.facilities for t in self.periods), 
            lowBound=0
        )
        
        self.initial_inventory = initial_inventory
        
    def add_demand_satisfaction_constraints(self, demand: Dict[str, Dict[int, float]]):
        """
        Add demand satisfaction constraints across facilities.
        Allows inventory transfers and backlogging to meet demand.
        """
        for t in self.periods:
            for p in self.products:
                # Total supply must meet or exceed demand
                self.model += (
                    pulp.lpSum(
                        self.variables['inventory'][f, p, t] - 
                        self.variables['backlog'][f, p, t] 
                        for f in self.facilities
                    ) >= demand[p][t]
                )
        
    def add_material_requirements_constraints(self, bom: Dict[str, Dict[str, float]]):
        """
        Add bill of materials (BOM) constraints.
        Tracks material consumption for each product's production.
        """
        for f in self.facilities:
            for t in self.periods:
                for material, requirement in bom.items():
                    # Constraint ensuring material availability
                    self.model += (
                        pulp.lpSum(
                            requirement[p] * self.variables['production'][f, p, t] 
                            for p in self.products
                        ) <= requirement['capacity']
                    )
        
    def add_workforce_constraints(self, workforce_params: Dict):
        """
        Add workforce management constraints.
        Includes hiring, firing, skill mix, and training requirements.
        """
        for f in self.facilities:
            for t in self.periods:
                # Workforce size constraints
                self.model += (
                    self.variables['workforce'][f, 'skilled', t] >= 
                    workforce_params['min_skilled']
                )
                
                # Hiring and firing limits
                self.model += (
                    self.variables['hire'][f, 'skilled', t] <= 
                    workforce_params['max_hire']
                )
                
                # Skill mix requirement
                self.model += (
                    self.variables['workforce'][f, 'skilled', t] >= 
                    workforce_params['skill_mix_ratio'] * 
                    pulp.lpSum(
                        self.variables['workforce'][f, s, t] 
                        for s in ['skilled', 'unskilled']
                    )
                )
        
    def set_objective_function(self, cost_parameters: Dict):
        """
        Set a comprehensive objective function minimizing total supply chain costs.
        Includes production, inventory, transportation, workforce, and backlog costs.
        """
        self.model += (
            # Production costs
            pulp.lpSum(
                cost_parameters['production_cost'][f][p] * 
                self.variables['production'][f, p, t]
                for f in self.facilities for p in self.products for t in self.periods
            ) +
            # Setup costs
            pulp.lpSum(
                cost_parameters['setup_cost'][f][p] * 
                self.variables['setup'][f, p, t]
                for f in self.facilities for p in self.products for t in self.periods
            ) +
            # Transportation costs
            pulp.lpSum(
                cost_parameters['transport_cost'][f1][f2] * 
                self.variables['transport'][f1, f2, p, t]
                for f1 in self.facilities for f2 in self.facilities 
                for p in self.products for t in self.periods if f1 != f2
            ) +
            # Inventory holding costs
            pulp.lpSum(
                cost_parameters['inventory_cost'][f][p] * 
                self.variables['inventory'][f, p, t]
                for f in self.facilities for p in self.products for t in self.periods
            ) +
            # Backlog costs
            pulp.lpSum(
                cost_parameters['backlog_cost'][f][p] * 
                self.variables['backlog'][f, p, t]
                for f in self.facilities for p in self.products for t in self.periods
            ) +
            # Workforce costs
            pulp.lpSum(
                cost_parameters['workforce_cost'][f][s] * 
                self.variables['workforce'][f, s, t]
                for f in self.facilities for s in ['skilled', 'unskilled'] for t in self.periods
            ) +
            # Hiring and firing costs
            pulp.lpSum(
                (cost_parameters['hire_cost'] * self.variables['hire'][f, s, t] +
                 cost_parameters['fire_cost'] * self.variables['fire'][f, s, t])
                for f in self.facilities for s in ['skilled', 'unskilled'] for t in self.periods
            )
        )



    ####################

    def get_results(self):
        """
        Extract detailed optimization results with comprehensive metrics.
        Returns a dictionary of DataFrames for different optimization aspects.
        """
        results = {
            'production': [],
            'inventory': [],
            'transportation': [],
            'workforce': [],
            'financial_summary': {}
        }

        # Production Results
        for f in self.facilities:
            for p in self.products:
                for t in self.periods:
                    results['production'].append({
                        'facility': f,
                        'product': p,
                        'period': t,
                        'quantity': self.variables['production'][f, p, t].value(),
                        'setup': self.variables['setup'][f, p, t].value()
                    })

        # Inventory Results
        for f in self.facilities:
            for p in self.products:
                for t in self.periods:
                    results['inventory'].append({
                        'facility': f,
                        'product': p,
                        'period': t,
                        'inventory': self.variables['inventory'][f, p, t].value(),
                        'backlog': self.variables['backlog'][f, p, t].value()
                    })

        # Transportation Results
        for f1 in self.facilities:
            for f2 in self.facilities:
                for p in self.products:
                    for t in self.periods:
                        if f1 != f2:
                            transport_qty = self.variables['transport'][f1, f2, p, t].value()
                            if transport_qty > 0:
                                results['transportation'].append({
                                    'from_facility': f1,
                                    'to_facility': f2,
                                    'product': p,
                                    'period': t,
                                    'quantity': transport_qty
                                })

        # Workforce Results
        for f in self.facilities:
            for t in self.periods:
                results['workforce'].append({
                    'facility': f,
                    'period': t,
                    'skilled_workforce': self.variables['workforce'][f, 'skilled', t].value(),
                    'unskilled_workforce': self.variables['workforce'][f, 'unskilled', t].value(),
                    'overtime': self.variables['overtime'][f, t].value()
                })

        # Convert to DataFrames
        results['production_df'] = pd.DataFrame(results['production'])
        results['inventory_df'] = pd.DataFrame(results['inventory'])
        results['transportation_df'] = pd.DataFrame(results['transportation'])
        results['workforce_df'] = pd.DataFrame(results['workforce'])

        return results

    def generate_scenario_comparison(self, scenarios):
        """
        Run multiple scenarios and provide comprehensive comparison.
        
        Args:
            scenarios (dict): Dictionary of scenario configurations
        
        Returns:
            dict: Comparison results for each scenario
        """
        comparison_results = {}

        for scenario_name, scenario_config in scenarios.items():
            # Reset the model
            self.__init__()

            # Configure scenario
            self.add_facilities(scenario_config.get('facilities', ['Factory1']))
            self.add_products(scenario_config.get('products', ['ProductA']))
            self.add_periods(scenario_config.get('periods', list(range(4))))

            # Setup variables
            self.setup_variables(scenario_config.get('initial_inventory'))

            # Add constraints and objective function based on scenario
            self.add_demand_satisfaction_constraints(scenario_config['demand'])
            self.add_workforce_constraints(scenario_config['workforce_params'])
            
            if 'material_requirements' in scenario_config:
                self.add_material_requirements_constraints(scenario_config['material_requirements'])

            self.set_objective_function(scenario_config['cost_parameters'])

            # Solve and store results
            solve_status = self.solve()
            results = self.get_results()
            
            comparison_results[scenario_name] = {
                'status': solve_status,
                'results': results
            }

        return comparison_results

    def visualization_suite(self, results):
        """
        Create comprehensive visualizations for supply chain analysis.
        
        Args:
            results (dict): Results from get_results() method
        
        Returns:
            dict: Matplotlib figure objects
        """
        figures = {}

        # Production Overview
        plt.figure(figsize=(15, 10))
        plt.subplot(2, 2, 1)
        prod_df = results['production_df']
        prod_pivot = prod_df.pivot_table(
            index='period', 
            columns=['facility', 'product'], 
            values='quantity', 
            aggfunc='sum'
        )
        prod_pivot.plot(kind='bar', ax=plt.gca())
        plt.title('Production Quantities by Facility and Product')
        plt.xlabel('Period')
        plt.ylabel('Quantity')
        plt.tight_layout()
        figures['production_overview'] = plt.gcf()

        # Inventory Analysis
        plt.figure(figsize=(15, 10))
        inv_df = results['inventory_df']
        inv_pivot = inv_df.pivot_table(
            index='period', 
            columns=['facility', 'product'], 
            values='inventory', 
            aggfunc='sum'
        )
        inv_pivot.plot(kind='line', marker='o')
        plt.title('Inventory Levels by Facility and Product')
        plt.xlabel('Period')
        plt.ylabel('Inventory')
        plt.tight_layout()
        figures['inventory_analysis'] = plt.gcf()

        # Transportation Heatmap
        plt.figure(figsize=(12, 8))
        trans_df = results['transportation_df']
        trans_pivot = trans_df.pivot_table(
            index=['from_facility', 'to_facility'], 
            columns='product', 
            values='quantity', 
            aggfunc='sum'
        )
        sns.heatmap(trans_pivot, annot=True, cmap='YlGnBu')
        plt.title('Inter-Facility Transportation')
        plt.tight_layout()
        figures['transportation_heatmap'] = plt.gcf()

        # Workforce Composition
        plt.figure(figsize=(15, 10))
        workforce_df = results['workforce_df']
        workforce_pivot = workforce_df.pivot_table(
            index='facility', 
            columns='period', 
            values=['skilled_workforce', 'unskilled_workforce'], 
            aggfunc='sum'
        )
        workforce_pivot.plot(kind='bar', stacked=True)
        plt.title('Workforce Composition by Facility')
        plt.xlabel('Facility')
        plt.ylabel('Number of Workers')
        plt.tight_layout()
        figures['workforce_composition'] = plt.gcf()

        return figures

def create_comprehensive_scenario():
    """
    Generate a comprehensive sample scenario for demonstration.
    """
    scenarios = {
        'Base_Scenario': {
            'facilities': ['Factory1', 'Factory2', 'Warehouse1'],
            'products': ['ProductA', 'ProductB'],
            'periods': list(range(6)),
            'initial_inventory': {
                'Factory1': {'ProductA': 100, 'ProductB': 50},
                'Factory2': {'ProductA': 75, 'ProductB': 25},
                'Warehouse1': {'ProductA': 50, 'ProductB': 30}
            },
            'demand': {
                'ProductA': {0: 120, 1: 140, 2: 160, 3: 130, 4: 110, 5: 150},
                'ProductB': {0: 80, 1: 90, 2: 110, 3: 100, 4: 85, 5: 95}
            },
            'workforce_params': {
                'min_skilled': 10,
                'max_hire': 5,
                'skill_mix_ratio': 0.4
            },
            'material_requirements': {
                'Raw_Material_A': {
                    'ProductA': 2,
                    'ProductB': 1,
                    'capacity': 500
                }
            },
            'cost_parameters': {
                'production_cost': {
                    'Factory1': {'ProductA': 10, 'ProductB': 12},
                    'Factory2': {'ProductA': 11, 'ProductB': 13}
                },
                'setup_cost': {
                    'Factory1': {'ProductA': 500, 'ProductB': 600},
                    'Factory2': {'ProductA': 550, 'ProductB': 650}
                },
                'transport_cost': {
                    'Factory1': {'Factory2': 5, 'Warehouse1': 3},
                    'Factory2': {'Factory1': 5, 'Warehouse1': 4}
                },
                'inventory_cost': {
                    'Factory1': {'ProductA': 2, 'ProductB': 2},
                    'Factory2': {'ProductA': 3, 'ProductB': 3}
                },
                'backlog_cost': {
                    'Factory1': {'ProductA': 20, 'ProductB': 20},
                    'Factory2': {'ProductA': 22, 'ProductB': 22}
                },
                'workforce_cost': {
                    'Factory1': {'skilled': 50, 'unskilled': 30},
                    'Factory2': {'skilled': 55, 'unskilled': 35}
                },
                'hire_cost': 1000,
                'fire_cost': 1500
            }
        },
        # Additional scenarios can be added here for comparison
    }

    model = AdvancedSupplyPlanningModel()
    return model.generate_scenario_comparison(scenarios)

# Demonstration of usage
def main():
    # Run comprehensive scenario
    results = create_comprehensive_scenario()

    # Visualize results
    for scenario_name, scenario_data in results.items():
        print(f"Scenario: {scenario_name}")
        print(f"Optimization Status: {scenario_data['status']}")
        
        # Create visualizations
        visualizations = model.visualization_suite(scenario_data['results'])
        
        # You could save these figures or display them
        # for viz_name, fig in visualizations.items():
        #     fig.savefig(f'{scenario_name}_{viz_name}.png')

if __name__ == "__main__":
    main()

KeyError: 'Warehouse1'