In [2]:
%pip install PyPortfolioOpt


Note: you may need to restart the kernel to use updated packages.


In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import json
from typing import Dict, List, Tuple, Optional

# Import PyPortfolioOpt components
from pypfopt import EfficientFrontier
from pypfopt import risk_models
from pypfopt import expected_returns
from pypfopt import objective_functions
from pypfopt import black_litterman
from pypfopt.discrete_allocation import DiscreteAllocation

In [2]:


# Define asset classes and their characteristics
ASSETS = {
    "us_large_cap": {"expected_return": 0.08, "volatility": 0.16, "label": "US Large Cap Equities"},
    "us_small_cap": {"expected_return": 0.10, "volatility": 0.20, "label": "US Small Cap Equities"},
    "intl_developed": {"expected_return": 0.07, "volatility": 0.18, "label": "International Developed Equities"},
    "emerging_markets": {"expected_return": 0.11, "volatility": 0.24, "label": "Emerging Markets Equities"},
    "us_bonds": {"expected_return": 0.03, "volatility": 0.05, "label": "US Bonds"},
    "high_yield_bonds": {"expected_return": 0.05, "volatility": 0.10, "label": "High Yield Bonds"},
    "treasury_inflation": {"expected_return": 0.025, "volatility": 0.04, "label": "Treasury Inflation-Protected Securities"},
    "real_estate": {"expected_return": 0.07, "volatility": 0.15, "label": "Real Estate (REITs)"},
    "commodities": {"expected_return": 0.05, "volatility": 0.18, "label": "Commodities"},
    "bitcoin": {"expected_return": 0.20, "volatility": 0.65, "label": "Bitcoin"}
}

# Define correlation matrix (simplified)
def get_correlation_matrix():
    # This is a simplified correlation matrix (not real market data)
    corr = np.array([
        [1.00, 0.85, 0.70, 0.60, 0.10, 0.30, 0.20, 0.50, 0.20, 0.15],  # us_large_cap
        [0.85, 1.00, 0.65, 0.60, 0.05, 0.30, 0.15, 0.45, 0.20, 0.15],  # us_small_cap
        [0.70, 0.65, 1.00, 0.75, 0.10, 0.30, 0.20, 0.40, 0.25, 0.10],  # intl_developed
        [0.60, 0.60, 0.75, 1.00, 0.00, 0.25, 0.10, 0.35, 0.30, 0.15],  # emerging_markets
        [0.10, 0.05, 0.10, 0.00, 1.00, 0.60, 0.80, 0.10, 0.00, -0.05], # us_bonds
        [0.30, 0.30, 0.30, 0.25, 0.60, 1.00, 0.55, 0.30, 0.15, 0.00],  # high_yield_bonds
        [0.20, 0.15, 0.20, 0.10, 0.80, 0.55, 1.00, 0.15, 0.20, -0.05], # treasury_inflation
        [0.50, 0.45, 0.40, 0.35, 0.10, 0.30, 0.15, 1.00, 0.35, 0.10],  # real_estate
        [0.20, 0.20, 0.25, 0.30, 0.00, 0.15, 0.20, 0.35, 1.00, 0.25],  # commodities
        [0.15, 0.15, 0.10, 0.15, -0.05, 0.00, -0.05, 0.10, 0.25, 1.00]  # bitcoin
    ])
    return corr

# Create expected returns and covariance matrix in format for PyPortfolioOpt
def prepare_optimization_inputs(assets=ASSETS, correlation_matrix=None):
    """Convert our asset data to format expected by PyPortfolioOpt"""
    if correlation_matrix is None:
        correlation_matrix = get_correlation_matrix()
    
    asset_names = list(assets.keys())
    expected_returns_dict = {asset: assets[asset]["expected_return"] for asset in asset_names}
    volatilities = np.array([assets[asset]["volatility"] for asset in asset_names])
    
    # Create a covariance matrix
    cov_matrix = np.outer(volatilities, volatilities) * correlation_matrix
    
    # Convert to pandas Series and DataFrame for PyPortfolioOpt
    mu = pd.Series(expected_returns_dict)
    S = pd.DataFrame(cov_matrix, index=asset_names, columns=asset_names)
    
    return mu, S, asset_names

# Define the user interface
class PortfolioUI:
    def __init__(self):
        self.risk_tolerances = ["Conservative", "Moderate", "Aggressive"]
        self.time_horizons = ["Short-term (1-3 years)", "Medium-term (3-7 years)", "Long-term (7+ years)"]
        self.investing_factors = ["Value", "Growth", "Small Cap", "Dividend", "ESG", "Momentum", "Bitcoin"]
        
    def collect_user_preferences(self) -> Dict:
        """Simulate collecting user preferences from a UI"""
        # For this example, I'll return some preset preferences
        # In a real application, this would come from user inputs
        return {
            "risk_tolerance": "Moderate",
            "time_horizon": "Medium-term (3-7 years)",
            "investing_factors": ["Value", "Small Cap", "Momentum"],
            "initial_investment": 10000,
            "monthly_contribution": 500
        }

# Expert Model Base Class
class ExpertModel:
    """Base class for portfolio optimization expert models"""
    
    def __init__(self, assets=ASSETS, correlation_matrix=None):
        self.assets = assets
        if correlation_matrix is None:
            correlation_matrix = get_correlation_matrix()
        self.correlation_matrix = correlation_matrix
        
        # Prepare inputs for PyPortfolioOpt
        self.mu, self.S, self.asset_names = prepare_optimization_inputs(assets, correlation_matrix)
        
    def optimize_portfolio(self, risk_tolerance="Moderate", factor_preferences=None):
        """
        Optimize portfolio based on risk tolerance and factors
        To be implemented by subclasses for each specific investment horizon
        """
        raise NotImplementedError("Subclasses must implement optimize_portfolio")
    
    def _apply_factor_constraints(self, ef, factors):
        """Apply factor-based constraints to the efficient frontier"""
        if factors is None:
            factors = []
            
        if "Value" in factors:
            # Enforce minimum allocation to value-oriented assets
            value_assets = ["us_large_cap", "us_bonds"]
            valid_assets = [a for a in value_assets if a in self.asset_names]
            if valid_assets:
                ef.add_constraint(lambda w: sum(w[a] for a in valid_assets) >= 0.2)
        
        if "Small Cap" in factors:
            # Include small cap exposure if requested
            if "us_small_cap" in self.asset_names:
                ef.add_constraint(lambda w: w["us_small_cap"] >= 0.1)
        
        if "Momentum" in factors:
            # Momentum strategy favors assets with strong recent performance
            momentum_assets = ["us_small_cap", "emerging_markets"]
            valid_assets = [a for a in momentum_assets if a in self.asset_names]
            if valid_assets:
                ef.add_constraint(lambda w: sum(w[a] for a in valid_assets) >= 0.15)
        
        if "Growth" in factors:
            # Growth strategy favors higher-growth assets
            growth_assets = ["us_small_cap", "emerging_markets", "intl_developed"]
            valid_assets = [a for a in growth_assets if a in self.asset_names]
            if valid_assets:
                ef.add_constraint(lambda w: sum(w[a] for a in valid_assets) >= 0.25)
                
        if "Dividend" in factors:
            # Dividend strategy favors income-generating assets
            dividend_assets = ["us_large_cap", "real_estate"]
            valid_assets = [a for a in dividend_assets if a in self.asset_names]
            if valid_assets:
                ef.add_constraint(lambda w: sum(w[a] for a in valid_assets) >= 0.3)
                
        if "ESG" in factors:
            # ESG strategy might avoid certain assets
            if "commodities" in self.asset_names:
                ef.add_constraint(lambda w: w["commodities"] <= 0.05)
        
        if "Bitcoin" in factors:
            # If bitcoin is requested, include it (with limits)
            if "bitcoin" in self.asset_names:
                # Allow bitcoin but keep it reasonable - subclasses will override
                ef.add_constraint(lambda w: w["bitcoin"] <= 0.1)
        elif "bitcoin" in self.asset_names:
            # If bitcoin not requested, exclude it
            ef.add_constraint(lambda w: w["bitcoin"] == 0)
            
        return ef

# Specific expert models
class ShortTermModel(ExpertModel):
    """Expert model for short-term goals (1-3 years)"""
    
    def __init__(self, assets=ASSETS, correlation_matrix=None):
        # For short-term, adjust expected returns and risk
        short_term_assets = assets.copy()
        for asset_name, asset_data in short_term_assets.items():
            # Short-term investments are more sensitive to immediate market conditions
            short_term_assets[asset_name] = asset_data.copy()
            # More conservative return estimates for short-term
            short_term_assets[asset_name]["expected_return"] *= 0.8
            
        super().__init__(short_term_assets, correlation_matrix)
    
    def optimize_portfolio(self, risk_tolerance="Moderate", factor_preferences=None):
        """Generate portfolio optimized for short-term horizon"""
        # Get target risk level based on risk tolerance
        risk_targets = {
            "Conservative": 0.05,
            "Moderate": 0.08,
            "Aggressive": 0.12
        }
        target_risk = risk_targets.get(risk_tolerance, 0.08)
        
        # Create efficient frontier object
        ef = EfficientFrontier(self.mu, self.S)
        
        # Add short-term specific constraints
        # Minimum bond allocation for short-term goals
        if "us_bonds" in self.asset_names:
            ef.add_constraint(lambda w: w["us_bonds"] >= 0.2)
        
        # Add factor-based constraints
        ef = self._apply_factor_constraints(ef, factor_preferences)
        
        # For short-term, bitcoin constraint should be more conservative
        if "Bitcoin" in (factor_preferences or []) and "bitcoin" in self.asset_names:
            # Override with more conservative bitcoin allocation for short-term
            ef.add_constraint(lambda w: w["bitcoin"] <= 0.05)
        
        # Set L2 regularization parameter (to reduce extreme allocations)
        ef.add_objective(objective_functions.L2_reg, gamma=0.5)
        
        # Optimize for maximum Sharpe ratio
        ef.max_sharpe()
        
        # Clean weights (remove very small allocations)
        weights = ef.clean_weights()
        
        # Calculate expected return and volatility
        expected_return = ef.portfolio_performance()[0]
        expected_volatility = ef.portfolio_performance()[1]
        
        return {
            "weights": weights,
            "expected_return": expected_return,
            "expected_volatility": expected_volatility
        }

class MediumTermModel(ExpertModel):
    """Expert model for medium-term goals (3-7 years)"""
    
    def optimize_portfolio(self, risk_tolerance="Moderate", factor_preferences=None):
        """Generate portfolio optimized for medium-term horizon"""
        # Get target risk level based on risk tolerance
        risk_targets = {
            "Conservative": 0.08,
            "Moderate": 0.12,
            "Aggressive": 0.18
        }
        target_risk = risk_targets.get(risk_tolerance, 0.12)
        
        # Create efficient frontier object
        ef = EfficientFrontier(self.mu, self.S)
        
        # Add medium-term specific constraints
        # More balanced allocation for medium-term
        if "us_bonds" in self.asset_names:
            # Less bonds required than short-term
            ef.add_constraint(lambda w: w["us_bonds"] >= 0.1)
        
        # Add factor-based constraints
        ef = self._apply_factor_constraints(ef, factor_preferences)
        
        # For medium-term, bitcoin constraint is standard
        # The base class already sets this to 10% max
        
        # Set L2 regularization parameter
        ef.add_objective(objective_functions.L2_reg, gamma=0.3)
        
        # For medium-term, optimize for efficiency at target volatility
        ef.efficient_risk(target_risk)
        
        # Clean weights
        weights = ef.clean_weights()
        
        # Calculate expected return and volatility
        expected_return = ef.portfolio_performance()[0]
        expected_volatility = ef.portfolio_performance()[1]
        
        return {
            "weights": weights,
            "expected_return": expected_return,
            "expected_volatility": expected_volatility
        }

class LongTermModel(ExpertModel):
    """Expert model for long-term goals (7+ years)"""
    
    def __init__(self, assets=ASSETS, correlation_matrix=None):
        # For long-term, adjust expected returns to reflect long-term growth
        long_term_assets = assets.copy()
        for asset_name, asset_data in long_term_assets.items():
            # Long-term investments benefit from compounding and mean reversion
            long_term_assets[asset_name] = asset_data.copy()
            # Slightly higher return expectations for long horizon
            long_term_assets[asset_name]["expected_return"] *= 1.1
            
        super().__init__(long_term_assets, correlation_matrix)
    
    def optimize_portfolio(self, risk_tolerance="Moderate", factor_preferences=None):
        """Generate portfolio optimized for long-term horizon"""
        # Get target risk level based on risk tolerance
        risk_targets = {
            "Conservative": 0.1,
            "Moderate": 0.15,
            "Aggressive": 0.22
        }
        target_risk = risk_targets.get(risk_tolerance, 0.15)
        
        # Create efficient frontier object
        ef = EfficientFrontier(self.mu, self.S)
        
        # Add long-term specific constraints
        # More aggressive allocation for long-term
        if "us_bonds" in self.asset_names and "Conservative" not in risk_tolerance:
            # Less bonds required than medium-term for moderate/aggressive
            ef.add_constraint(lambda w: w["us_bonds"] <= 0.3)
        
        # Add factor-based constraints
        ef = self._apply_factor_constraints(ef, factor_preferences)
        
        # For long-term, bitcoin constraint can be more aggressive
        if "Bitcoin" in (factor_preferences or []) and "bitcoin" in self.asset_names:
            # Override with more liberal bitcoin allocation for long-term
            ef.add_constraint(lambda w: w["bitcoin"] <= 0.15)
        
        # Set L2 regularization parameter
        ef.add_objective(objective_functions.L2_reg, gamma=0.2)
        
        # For long-term, optimize for efficiency at target volatility
        ef.efficient_risk(target_risk)
        
        # Clean weights
        weights = ef.clean_weights()
        
        # Calculate expected return and volatility
        expected_return = ef.portfolio_performance()[0]
        expected_volatility = ef.portfolio_performance()[1]
        
        return {
            "weights": weights,
            "expected_return": expected_return,
            "expected_volatility": expected_volatility
        }

# Router/Mixture model
class PortfolioMixtureModel:
    """Routes or blends expert models based on user preferences"""
    
    def __init__(self, assets=ASSETS, correlation_matrix=None):
        if correlation_matrix is None:
            correlation_matrix = get_correlation_matrix()
            
        self.assets = assets
        self.correlation_matrix = correlation_matrix
        self.short_term_model = ShortTermModel(assets, correlation_matrix)
        self.medium_term_model = MediumTermModel(assets, correlation_matrix)
        self.long_term_model = LongTermModel(assets, correlation_matrix)
    
    def _get_time_horizon_weights(self, time_horizon):
        """Determine weights for each expert model based on time horizon"""
        weights = {
            "Short-term (1-3 years)": {"short": 0.8, "medium": 0.2, "long": 0.0},
            "Medium-term (3-7 years)": {"short": 0.2, "medium": 0.6, "long": 0.2},
            "Long-term (7+ years)": {"short": 0.0, "medium": 0.3, "long": 0.7}
        }
        return weights.get(time_horizon, {"short": 0.33, "medium": 0.34, "long": 0.33})
    
    def generate_portfolio(self, preferences):
        """Generate portfolio based on user preferences"""
        risk_tolerance = preferences.get("risk_tolerance", "Moderate")
        time_horizon = preferences.get("time_horizon", "Medium-term (3-7 years)")
        factors = preferences.get("investing_factors", [])
        
        # Get model weights based on time horizon
        model_weights = self._get_time_horizon_weights(time_horizon)
        
        # Generate portfolios from each expert model
        short_term_portfolio = self.short_term_model.optimize_portfolio(risk_tolerance, factors)
        medium_term_portfolio = self.medium_term_model.optimize_portfolio(risk_tolerance, factors)
        long_term_portfolio = self.long_term_model.optimize_portfolio(risk_tolerance, factors)
        
        # Blend portfolios based on weights
        blended_portfolio = {}
        for asset in self.assets:
            short_weight = short_term_portfolio["weights"].get(asset, 0) * model_weights["short"]
            medium_weight = medium_term_portfolio["weights"].get(asset, 0) * model_weights["medium"]
            long_weight = long_term_portfolio["weights"].get(asset, 0) * model_weights["long"]
            
            blended_portfolio[asset] = short_weight + medium_weight + long_weight
        
        # Normalize weights to ensure they sum to 1
        total_weight = sum(blended_portfolio.values())
        if total_weight > 0:
            for asset in blended_portfolio:
                blended_portfolio[asset] /= total_weight
        
        # Calculate expected return and volatility for the blended portfolio
        expected_return = sum(blended_portfolio[asset] * self.assets[asset]["expected_return"] 
                             for asset in blended_portfolio)
        
        # Calculate volatility
        asset_names = list(self.assets.keys())
        weights_array = np.array([blended_portfolio.get(asset, 0) for asset in asset_names])
        volatilities = np.array([self.assets[asset]["volatility"] for asset in asset_names])
        cov_matrix = np.outer(volatilities, volatilities) * self.correlation_matrix
        expected_volatility = np.sqrt(weights_array.T @ cov_matrix @ weights_array)
        
        return {
            "weights": blended_portfolio,
            "expected_return": expected_return,
            "expected_volatility": expected_volatility,
            "model_weights": model_weights,
            "component_portfolios": {
                "short_term": short_term_portfolio,
                "medium_term": medium_term_portfolio,
                "long_term": long_term_portfolio
            }
        }
    
    def explain_portfolio(self, portfolio):
        """Generate explanation of portfolio allocation"""
        weights = portfolio["weights"]
        model_weights = portfolio["model_weights"]
        
        # Sort assets by allocation
        sorted_assets = sorted(weights.items(), key=lambda x: x[1], reverse=True)
        
        # Calculate return target for 5 years
        expected_annual_return = portfolio['expected_return']
        expected_5y_growth = (1 + expected_annual_return) ** 5 - 1
        
        explanation = {
            "summary": {
                "expected_annual_return": f"{portfolio['expected_return']*100:.2f}%",
                "expected_volatility": f"{portfolio['expected_volatility']*100:.2f}%",
                "sharpe_ratio": f"{(portfolio['expected_return']-0.02)/portfolio['expected_volatility']:.2f}",
                "expected_5y_growth": f"{expected_5y_growth*100:.2f}%"
            },
            "allocation": {self.assets[asset]["label"]: f"{weight*100:.1f}%" 
                          for asset, weight in sorted_assets if weight > 0.01},
            "model_contribution": {
                "Short-term model": f"{model_weights['short']*100:.0f}%",
                "Medium-term model": f"{model_weights['medium']*100:.0f}%",
                "Long-term model": f"{model_weights['long']*100:.0f}%"
            }
        }
        
        return explanation
    
    def generate_discrete_allocation(self, portfolio, total_value):
        """Convert percentage weights to actual number of shares"""
        weights = portfolio["weights"]
        
        # Set up fake prices for this example
        prices = {asset: 100 for asset in weights.keys()}  # Assume $100 per share for simplicity
        
        # Create discrete allocation
        da = DiscreteAllocation(weights, prices, total_value=total_value)
        allocation, leftover = da.greedy_portfolio()
        
        return {
            "allocation": {self.assets[ticker]["label"]: shares for ticker, shares in allocation.items()},
            "leftover_cash": leftover
        }

# Database simulation for storing generated portfolios
class PortfolioDB:
    """Simulates a database for storing generated portfolios"""
    
    def __init__(self):
        self.portfolios = []
    
    def store_portfolio(self, user_id, preferences, portfolio_data):
        """Store a generated portfolio"""
        portfolio_record = {
            "user_id": user_id,
            "timestamp": pd.Timestamp.now().isoformat(),
            "preferences": preferences,
            "portfolio": portfolio_data
        }
        self.portfolios.append(portfolio_record)
        return len(self.portfolios) - 1  # Return the index as a portfolio ID
    
    def get_portfolio(self, portfolio_id):
        """Retrieve a stored portfolio"""
        if 0 <= portfolio_id < len(self.portfolios):
            return self.portfolios[portfolio_id]
        return None
    
    def find_portfolios_by_criteria(self, criteria):
        """Find portfolios matching specified criteria"""
        results = []
        for idx, record in enumerate(self.portfolios):
            match = True
            for key, value in criteria.items():
                # Simple matching for demo purposes
                if key not in record["preferences"] or record["preferences"][key] != value:
                    match = False
                    break
            if match:
                results.append({"portfolio_id": idx, "record": record})
        return results

# Visualization functions
def visualize_portfolio(weights, assets):
    """Visualize portfolio allocation as a pie chart"""
    # Filter out tiny allocations
    filtered_weights = {assets[asset]["label"]: weight 
                       for asset, weight in weights.items() 
                       if weight > 0.01}
    
    plt.figure(figsize=(10, 6))
    plt.pie(filtered_weights.values(), labels=filtered_weights.keys(), autopct='%1.1f%%')
    plt.title('Portfolio Allocation')
    plt.axis('equal')
    print("Portfolio visualization would be displayed here")

def visualize_efficient_frontier(portfolio_data):
    """Create a efficient frontier visualization with the portfolio plotted"""
    plt.figure(figsize=(10, 6))
    
    # Extract data from component portfolios
    portfolios = portfolio_data["component_portfolios"]
    
    # Collect returns and volatilities
    returns = [p["expected_return"] for p in portfolios.values()]
    volatilities = [p["expected_volatility"] for p in portfolios.values()]
    
    # Add the blended portfolio
    returns.append(portfolio_data["expected_return"])
    volatilities.append(portfolio_data["expected_volatility"])
    
    # Plot the portfolios
    plt.scatter(volatilities[:-1], returns[:-1], c='blue', marker='o', s=50, label='Expert Models')
    plt.scatter(volatilities[-1], returns[-1], c='red', marker='*', s=200, label='Blended Portfolio')
    
    # Add labels
    plt.xlabel('Expected Volatility')
    plt.ylabel('Expected Return')
    plt.title('Portfolio Efficiency')
    plt.legend()
    plt.grid(True)
    
    print("Efficient frontier visualization would be displayed here")

In [3]:
def main():
    # Initialize components
    ui = PortfolioUI()
    db = PortfolioDB()
    
    # Get user preferences
    user_preferences = ui.collect_user_preferences()
    print("User preferences:")
    print(json.dumps(user_preferences, indent=2))
    
    # Initialize mixture model
    mixture_model = PortfolioMixtureModel(ASSETS, get_correlation_matrix())
    
    # Generate portfolio
    portfolio = mixture_model.generate_portfolio(user_preferences)
    
    # Store in database
    user_id = "user123"  # In a real app, this would be a real user ID
    portfolio_id = db.store_portfolio(user_id, user_preferences, portfolio)
    
    # Generate explanation
    explanation = mixture_model.explain_portfolio(portfolio)
    
    # Generate discrete allocation (for a $10,000 portfolio)
    discrete_allocation = mixture_model.generate_discrete_allocation(portfolio, user_preferences["initial_investment"])
    
    # Print results
    print("\nGenerated Portfolio:")
    print(json.dumps(explanation, indent=2))
    
    print("\nDiscrete Allocation (number of shares to buy):")
    print(json.dumps(discrete_allocation, indent=2))
    
    # Visualize the portfolio allocation
    visualize_portfolio(portfolio["weights"], ASSETS)
    
    # Visualize efficient frontier
    visualize_efficient_frontier(portfolio)
    
    print(f"\nPortfolio stored in database with ID: {portfolio_id}")

if __name__ == "__main__":
    main()

User preferences:
{
  "risk_tolerance": "Moderate",
  "time_horizon": "Medium-term (3-7 years)",
  "investing_factors": [
    "Value",
    "Small Cap",
    "Momentum"
  ],
  "initial_investment": 10000,
  "monthly_contribution": 500
}


ValueError: invalid literal for int() with base 10: 'us_bonds'