Imports

In [22]:
from datetime import datetime
import numpy as np
from ortools.sat.python import cp_model

Adding the finance_utils function

In [23]:
import yfinance as yf
import pandas as pd

def get_adj_close_from_stocks(stocks, start_date, end_date):
    """
    Collect the 'Adj Close' price series of several tickers in one DataFrame.

    Each ticker is downloaded on its own through ``yf.download`` with
    ``auto_adjust=False`` and its 'Adj Close' data is stored under a
    column named after the ticker.

    Parameters:
    - stocks: iterable of ticker symbols, one download per symbol
    - start_date: forwarded to ``yf.download`` as ``start``
    - end_date: forwarded to ``yf.download`` as ``end``

    Returns:
    - DataFrame with one column per ticker, in the order given
    """
    prices = pd.DataFrame()

    for ticker in stocks:
        history = yf.download(ticker, start=start_date, end=end_date, auto_adjust=False)
        prices[ticker] = history['Adj Close']

    return prices

Adding the Markowitz utility functions

In [24]:
def standard_deviation(weights, cov_matrix):
    """
    Portfolio standard deviation: the square root of the quadratic form
    ``weights.T @ cov_matrix @ weights``.
    """
    return np.sqrt(weights.T @ cov_matrix @ weights)

def expected_return(weights, log_returns, periods_per_year=252):
    """
    Weighted mean return of a portfolio, scaled to a yearly figure.

    Parameters:
    - weights: per-asset weights, multiplied element-wise with the
      per-column means of ``log_returns``
    - log_returns: object whose ``.mean()`` yields one mean per asset
      (e.g. a DataFrame with one column per asset)
    - periods_per_year: scaling factor applied to the per-period mean.
      Defaults to 252, the value previously hard-coded here; pass e.g. 12
      for monthly data.

    Returns:
    - the sum of ``mean * weight`` over all assets, times ``periods_per_year``
    """
    return np.sum(log_returns.mean() * weights) * periods_per_year

def sharpe_ratio(weights, log_returns, cov_matrix, risk_free_rate):
    """
    Excess return per unit of risk.

    Computes ``expected_return(weights, log_returns) - risk_free_rate`` and
    divides it by ``standard_deviation(weights, cov_matrix)``.

    NOTE(review): expected_return scales the mean by a yearly factor while
    standard_deviation applies no scaling of its own - presumably
    ``cov_matrix`` is expected to be annualised by the caller; confirm.
    """
    excess_return = expected_return(weights, log_returns) - risk_free_rate
    volatility = standard_deviation(weights, cov_matrix)
    return excess_return / volatility

def negative_sharpe_ratio(weights, log_returns, cov_matrix, risk_free_rate):
    """
    Sharpe ratio with its sign flipped.

    Presumably intended as an objective for a minimiser (minimising the
    negative maximises the ratio) - no caller is visible in this notebook.
    """
    ratio = sharpe_ratio(weights, log_returns, cov_matrix, risk_free_rate)
    return -ratio

Defining all test inputs

In [None]:
# Tickers to optimise over and the historical window to sample
stocks = ['AAPL', 'DSY.PA']
start_date, end_date = datetime(2023, 1, 1), datetime(2024, 1, 1)

# Same (min, max) weight bounds for every ticker
bounds = [(0.25, 0.75) for _ in stocks]

# Trade-off parameter passed to the optimiser
risk_rate = 0.15

# NOTE(review): SCALE is not referenced by any cell visible here - confirm it is still needed
SCALE = 100

In [28]:
def _enumerate_feasible_weights(bounds, precision=1):
    """Use CP-SAT to list every weight vector (as fractions) that respects the bounds, lies on the precision grid and sums to 100%."""
    model = cp_model.CpModel()

    # One integer variable per asset, expressed in percentage points
    weight_vars = []
    for i, (lower_bound, upper_bound) in enumerate(bounds):
        # Round inward so the integer domain never leaves the requested
        # interval. The small epsilon absorbs float noise such as
        # 0.29 * 100 == 28.999999999999996, which plain int() would
        # truncate to 28 (below the requested lower bound).
        lower = int(np.ceil(lower_bound * 100 - 1e-9))
        upper = int(np.floor(upper_bound * 100 + 1e-9))
        if lower > upper:
            # No grid point fits inside this asset's interval
            return []
        weight_vars.append(model.new_int_var(lower, upper, f"weight_{i}"))

    # Weights must add up to exactly 100%
    model.add(sum(weight_vars) == 100)

    # Each weight must be a multiple of `precision` percentage points
    for w in weight_vars:
        model.add_modulo_equality(0, w, precision)

    class SolutionCollector(cp_model.CpSolverSolutionCallback):
        """Records every solution found, converted from percentage points to fractions."""

        def __init__(self, weight_vars):
            cp_model.CpSolverSolutionCallback.__init__(self)
            self.weight_vars = weight_vars
            self.solutions = []

        def on_solution_callback(self):
            self.solutions.append([self.Value(v) / 100.0 for v in self.weight_vars])

    solver = cp_model.CpSolver()
    solver.parameters.enumerate_all_solutions = True

    collector = SolutionCollector(weight_vars)
    solver.solve(model, collector)
    return collector.solutions


def optimize_portfolio_with_cpsat(stocks, start_date, end_date, bounds, risk_rate, precision=1):
    """
    Optimize a portfolio using CP-SAT to find the optimal weights

    CP-SAT enumerates every weight combination on a percentage grid that
    satisfies the bounds and sums to 100%; each candidate is then scored as
    ``risk_rate * expected_return - (1 - risk_rate) * risk`` using the mean
    and covariance of the per-period simple returns, and the best-scoring
    candidate is returned.

    Parameters:
    - stocks: List of stock symbols
    - start_date: Start date for historical data
    - end_date: End date for historical data
    - bounds: List of (min, max) weight bounds for each stock (fractions)
    - risk_rate: Risk tolerance parameter (0-1, higher means more aggressive)
    - precision: Grid step in whole percentage points (default 1, i.e. 1%
      increments; use 5 for 5% increments)

    Returns:
    - numpy array of weights in the same order as ``stocks``, e.g. [0.5, 0.5],
      or the string "No feasible portfolios found" when no weight
      combination satisfies the constraints

    Raises:
    - ValueError: if ``bounds`` and ``stocks`` differ in length, or if
      ``precision`` is not a positive whole number
    """
    if len(bounds) != len(stocks):
        raise ValueError(
            f"Expected one (min, max) bound per stock: got {len(bounds)} bounds "
            f"for {len(stocks)} stocks"
        )
    if precision < 1 or int(precision) != precision:
        raise ValueError("precision must be a positive whole number of percentage points")
    precision = int(precision)

    # Step 1: Get historical data
    adj_close_df = get_adj_close_from_stocks(stocks, start_date, end_date)

    # Step 2: Calculate returns and covariance.
    # Forward-fill explicitly: this is what pct_change() used to do implicitly
    # (and now warns about), so the numbers stay the same without relying on
    # the deprecated default.
    returns = adj_close_df.ffill().pct_change(fill_method=None).dropna()
    # Work on plain arrays so assets are addressed by position, not by label
    mean_returns = returns.mean().to_numpy()
    cov_matrix = returns.cov().to_numpy()

    # Step 3: Enumerate all valid weight combinations with CP-SAT
    candidate_weights = _enumerate_feasible_weights(bounds, precision)
    if not candidate_weights:
        return "No feasible portfolios found"

    # Step 4: Evaluate every feasible portfolio at once.
    # Rows are candidate portfolios, columns are assets.
    weight_matrix = np.asarray(candidate_weights, dtype=float)
    portfolio_returns = weight_matrix @ mean_returns
    variances = np.sum((weight_matrix @ cov_matrix) * weight_matrix, axis=1)
    # Guard against tiny negative variances caused by float round-off
    portfolio_risks = np.sqrt(np.maximum(variances, 0.0))

    # Higher risk_rate favours return (more aggressive),
    # lower risk_rate favours low risk (more conservative)
    scores = risk_rate * portfolio_returns - (1 - risk_rate) * portfolio_risks

    # Return just the final weights as an array like [0.5, 0.5]
    best_index = int(np.argmax(scores))
    return weight_matrix[best_index].copy()


In [29]:


# Optimise the sample portfolio and show the selected weights
result = optimize_portfolio_with_cpsat(
    stocks=stocks,
    start_date=start_date,
    end_date=end_date,
    bounds=bounds,
    risk_rate=risk_rate,
)
print("Hello here are the results", result)

[*********************100%***********************]  1 of 1 completed


[*********************100%***********************]  1 of 1 completed

Hello here are the results [0.5 0.5]



  returns = adj_close_df.pct_change().dropna()
  expected_return = sum(weights[i] * expected_returns[i] for i in range(len(stocks)))
