In [1]:
import tkinter as tk
from tkinter import simpledialog, messagebox
import pandas as pd
import numpy as np
import logging
from dataclasses import dataclass
from datetime import datetime, timedelta
from scipy.stats import skew, kurtosis
from scipy.optimize import minimize
from IPython.display import display, Markdown
import yfinance as yf
import matplotlib.pyplot as plt
import pybacktestchain
#import sys
#sys.path.append(r"C:\Users\Ludovic\anaconda3\Lib\site-packages\pybacktestchain")

from pybacktestchain.data_module import get_stocks_data, get_stock_data

In [132]:
### MY NEW MODULE FUNCTIONS MODIFYING EXISTING FUNCTIONS AND ADDING NEW FUNCTIONS


import tkinter as tk
import pandas as pd
import pybacktestchain
import numpy as np
import logging
import yfinance as yf
import matplotlib.pyplot as plt

from dataclasses import dataclass
from datetime import datetime, timedelta
from scipy.stats import skew, kurtosis
from scipy.optimize import minimize
from IPython.display import display, Markdown
from tkinter import simpledialog, messagebox
from pybacktestchain.data_module import get_stocks_data, get_stock_data  # Import the existing functions

class Data_module2:
    @staticmethod
    def get_stock_data(ticker, start_date, end_date):
        """Retrieve historical stock data for a single ticker using yfinance."""
        try:
            stock = yf.Ticker(ticker)
            data = stock.history(start=start_date, end=end_date, auto_adjust=False, actions=False)
            data['ticker'] = ticker
            data.reset_index(inplace=True)
            return data[['Date', 'ticker', 'Adj Close', 'Volume']]
        except Exception as e:
            logging.warning(f"Failed to fetch data for {ticker}: {e}")
            return pd.DataFrame()

    @staticmethod
    def get_stocks_data(tickers, start_date, end_date):
        """Retrieve historical stock data for multiple tickers."""
        dfs = []
        
        for ticker in tickers:
            df = Data_module2.get_stock_data(ticker, start_date, end_date)
            if not df.empty:
                df = df[['Date', 'Adj Close']]  # Keep only 'Date' and 'Adj Close'
                df['Ticker'] = ticker  # Add a column for ticker
                dfs.append(df)
        
        # Combine all dataframes into one
        all_data = pd.concat(dfs)
        
        # Pivot the data so tickers are columns and dates are rows
        all_data = all_data.pivot(index='Date', columns='Ticker', values='Adj Close')
        
        # Drop rows with NaN values
        all_data = all_data.dropna(how='all')
        
        return all_data


# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Function to get stock inputs
def get_stock_inputs():
    root = tk.Tk()
    root.withdraw()  # Hide the main tkinter window
    num_stocks = simpledialog.askinteger("Number of Stocks", "How many stocks do you want to enter?")
    
    if not num_stocks or num_stocks <= 0:
        print("No stocks to enter.")
        return []

    root = tk.Tk()
    root.title("Enter Stock Names")
    entries = []
    stock_names = []

    def on_submit():
        nonlocal stock_names
        stock_names = [entry.get().strip().upper() for entry in entries]
        root.quit()
        root.destroy()

    for i in range(num_stocks):
        tk.Label(root, text=f"Stock {i+1}").pack(padx=10, pady=5)
        entry = tk.Entry(root)
        entry.pack(padx=10, pady=5)
        entries.append(entry)

    tk.Button(root, text="Submit", command=on_submit).pack(pady=10)
    root.mainloop()
    return stock_names

# Function to get date inputs via a userform
def get_date_inputs():
    # Fonction pour demander la saisie des dates via une input box (fenêtre graphique)
    root = tk.Tk()
    root.withdraw()  # Cacher la fenêtre principale de Tkinter

    # Demander la date de début
    start_date_str = simpledialog.askstring("Start Date", "Please enter the start date (YYYY-MM-DD):")
    if not start_date_str:
        return None, None  # Si l'utilisateur annule ou ne rentre rien, on retourne None

    try:
        # Valider et convertir la date de début
        start_date = datetime.strptime(start_date_str, "%Y-%m-%d")
    except ValueError:
        messagebox.showerror("Invalid Date", "Invalid start date format. Please use YYYY-MM-DD.")
        return None, None  # Retourner None en cas d'erreur

    # Demander la date de fin
    end_date_str = simpledialog.askstring("End Date", "Please enter the end date (YYYY-MM-DD):")
    if not end_date_str:
        return None, None  # Si l'utilisateur annule ou ne rentre rien, on retourne None

    try:
        # Valider et convertir la date de fin
        end_date = datetime.strptime(end_date_str, "%Y-%m-%d")
    except ValueError:
        messagebox.showerror("Invalid Date", "Invalid end date format. Please use YYYY-MM-DD.")
        return None, None  # Retourner None en cas d'erreur

    if start_date > end_date:
        messagebox.showerror("Invalid Date Range", "Start date must be before end date!")
        return None, None  # Retourner None en cas d'erreur
    
    return start_date, end_date
    
def get_target_return():
    """
    Prompt the user to input a target return using a simple dialog box.
    Returns the target return as a float.
    """
    root = tk.Tk()
    root.withdraw()  # Hide the main Tkinter window
    target_return = simpledialog.askfloat(
        "Target Return", 
        "Enter the target return (e.g., 0.1 for 10%):",
        minvalue=0.0, maxvalue=1.0  # Optional bounds for input
    )
    root.destroy()  # Close the Tkinter window
    if target_return is None:
        raise ValueError("No target return provided!")
    return target_return

# Function to get the rebalancing strategy input from the user
def get_rebalancing_strategy():
    root = tk.Tk()
    root.withdraw()  # Masquer la fenêtre principale de Tkinter
    
    # Demander la stratégie de rééquilibrage
    choices = ["End of Week", "End of Month", "Every Quarter"]
    
    rebalancing_strategy = simpledialog.askstring("Rebalancing Strategy", 
                                                  "Choose rebalancing strategy: End of Week, End of Month, or Every Quarter")
    
    # Valider le choix
    if rebalancing_strategy not in choices:
        print("Invalid choice. Please choose one of the following strategies: End of Week, End of Month, or Every Quarter.")
        return None
    
    # Retourner la classe appropriée en fonction du choix
    if rebalancing_strategy == "End of Week":
        return EndOfWeek  # Retourne la classe, pas la chaîne de caractères
    elif rebalancing_strategy == "End of Month":
        return EndOfMonth
    elif rebalancing_strategy == "Every Quarter":
        return EveryQuarter
        
def get_stop_loss_threshold():
    def on_submit():
        nonlocal stop_loss_threshold
        try:
            user_input = threshold_entry.get()
            # Vérifier que l'entrée n'est pas vide
            if not user_input:
                raise ValueError("Input cannot be empty.")
            
            # Convertir l'entrée en float
            stop_loss_threshold = float(user_input)
            
            # Vérifier que la valeur est positive
            if stop_loss_threshold <= 0:
                raise ValueError("Threshold must be a positive number.")
            
            root.quit()  # Fermer la fenêtre de l'input
            root.destroy()  # Détruire la fenêtre
        except ValueError as e:
            # Afficher un message d'erreur si l'entrée est invalide
            messagebox.showerror("Invalid Input", f"Invalid input: {e}")
    
    # Initialisation de la variable stop_loss_threshold
    stop_loss_threshold = None
    
    # Créer la fenêtre Tkinter
    root = tk.Tk()
    root.title("Enter Stop-Loss Threshold")
    
    # Ajouter un label et un champ de saisie
    tk.Label(root, text="Enter Stop-Loss Threshold (e.g., 0.1 for 10%):").pack(padx=10, pady=5)
    threshold_entry = tk.Entry(root)
    threshold_entry.pack(padx=10, pady=5)
    
    # Ajouter un bouton de soumission
    tk.Button(root, text="Submit", command=on_submit).pack(pady=10)
    
    # Lancer la boucle principale de Tkinter
    root.mainloop()
    
    return stop_loss_threshold
    
def get_initial_cash_input():
    def on_submit():
        nonlocal initial_cash
        try:
            initial_cash = float(initial_cash_entry.get())
            if initial_cash <= 0:
                raise ValueError("Initial cash must be a positive number.")
            root.quit()
            root.destroy()
        except ValueError as e:
            messagebox.showerror("Invalid Input", f"Invalid input: {e}")
    
    initial_cash = None
    
    root = tk.Tk()
    root.title("Enter Initial Cash")
    
    tk.Label(root, text="Enter Initial Cash Amount:").pack(padx=10, pady=5)
    initial_cash_entry = tk.Entry(root)
    initial_cash_entry.pack(padx=10, pady=5)
    
    tk.Button(root, text="Submit", command=on_submit).pack(pady=10)
    root.mainloop()
    
    return initial_cash


In [168]:
#### MODIFYIN AND ADDING FUNCTION TO DATA_MODULE

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import logging
from dataclasses import dataclass
from datetime import datetime, timedelta

import os 
from pybacktestchain.data_module import get_stocks_data, DataModule, Information
from pybacktestchain.broker import Position, StopLoss, RebalanceFlag, Broker, Backtest
from pybacktestchain.utils import generate_random_name
#from python_pro.new_datamodule import get_stock_inputs
from typing import Dict
from numba import jit 


# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
from datetime import timedelta, datetime

#---------------------------------------------------------
# Modifying Classes from pybacktestchain.broker
#---------------------------------------------------------

class StopLoss_new(StopLoss):
    threshold: float

    def __post_init__(self):
        # Demander à l'utilisateur de définir le seuil si ce n'est pas passé
        if not hasattr(self, 'threshold') or self.threshold is None:
            self.threshold = float(input("Enter the stop-loss threshold (e.g., 0.1 for 10% loss): "))
        
    def trigger_stop_loss(self, t: datetime, portfolio: dict, prices: dict, broker: 'Broker'):
        for ticker, position in list(broker.positions.items()):
            entry_price = broker.entry_prices[ticker]
            current_price = prices.get(ticker)
            
            if current_price is None:
                logging.warning(f"Price for {ticker} not available on {t}")
                continue

            # Calculate the loss percentage
            loss = (current_price - entry_price) / entry_price
            if loss < -self.threshold:
                logging.info(f"Stop loss triggered for {ticker} at {t}. Selling all shares.")
                broker.sell(ticker, position.quantity, current_price, t)

class Broker_new(Broker): 
# Modifying the buy and sell functions from pybacktestchain.broker to add new conditions : max daily trades and max exposure.  
# In addition, we add a function that count the number of daily trades and check if we respect the new condition.

    def get_daily_trade_count(self, date: datetime):
        """Returns the number of trades already executed on the given date."""
        daily_trades = self.transaction_log[self.transaction_log['Date'] == date]
        return len(daily_trades)

    def get_total_portfolio_value(self, market_prices: dict):
        """Calculate the total portfolio value based on the current market prices."""
        total_value = self.cash
        for ticker, position in self.positions.items():
            total_value += position.quantity * market_prices[ticker]
        return total_value

    def buy(self, ticker: str, quantity: int, price: float, date: datetime, market_prices: dict):
        """Executes a buy order for the specified ticker with additional constraints based on quantity."""
        if quantity > self.max_single_stock_trade:
            if self.verbose:
                logging.warning(
                    f"Cannot execute buy for {ticker}. Maximum allowed trade quantity is {self.max_single_stock_trade}."
                )
            return

        total_cost = price * quantity
        total_portfolio_value = self.get_total_portfolio_value(market_prices)

        # Check if daily trades max is respected
        daily_trades_for_ticker = self.transaction_log[(self.transaction_log['Date'] == date) & (self.transaction_log['Ticker'] == ticker)]
        if len(daily_trades_for_ticker) >= 1:  # check for that specific ticker
            if self.verbose:
                logging.warning(
                    f"Cannot execute buy for {ticker} on {date}. Maximum daily trades limit ({self.max_daily_trades}) reached for this ticker."
                )
            return

        # Check if the purchase respects the max exposure constraint
        current_value = self.positions.get(ticker, Position(ticker, 0, 0)).quantity * price
        proposed_value = current_value + total_cost
        max_allowed_value = total_portfolio_value * self.max_exposure

        if proposed_value > max_allowed_value:
            if self.verbose:
                logging.warning(
                    f"Cannot buy {quantity} shares of {ticker} due to max exposure limit. "
                    f"Proposed value: {proposed_value}, Max allowed: {max_allowed_value}"
                )
            return

        # Check if enough cash is available
        if self.cash < total_cost:
            if self.verbose:
                logging.warning(
                    f"Not enough cash to buy {quantity} shares of {ticker} at {price}. Available cash: {self.cash}"
                )
            return

        # Execute the buy order
        self.cash -= total_cost
        if ticker in self.positions:
            position = self.positions[ticker]
            new_quantity = position.quantity + quantity
            new_entry_price = ((position.entry_price * position.quantity) + (price * quantity)) / new_quantity
            position.quantity = new_quantity
            position.entry_price = new_entry_price
        else:
            self.positions[ticker] = Position(ticker, quantity, price)

        self.log_transaction(date, 'BUY', ticker, quantity, price)
        self.entry_prices[ticker] = price

    def sell(self, ticker: str, quantity: int, price: float, date: datetime):
        """Executes a sell order for the specified ticker with additional constraints based on quantity."""
        if quantity > self.max_single_stock_trade:
            if self.verbose:
                logging.warning(
                    f"Cannot execute sell for {ticker}. Maximum allowed trade quantity is {self.max_single_stock_trade}."
                )
            return

        if ticker in self.positions and self.positions[ticker].quantity >= quantity:
            # Check if max daily trades limit is reached
            if self.get_daily_trade_count(date) >= self.max_daily_trades:
                if self.verbose:
                    logging.warning(
                        f"Cannot execute sell for {ticker} on {date}. Maximum daily trades limit ({self.max_daily_trades}) reached."
                    )
                return

            position = self.positions[ticker]
            position.quantity -= quantity
            self.cash += price * quantity

            if position.quantity == 0:
                del self.positions[ticker]
                del self.entry_prices[ticker]
            self.log_transaction(date, 'SELL', ticker, quantity, price)
        else:
            if self.verbose:
                logging.warning(
                    f"Not enough shares to sell {quantity} shares of {ticker}. Position size: {self.positions.get(ticker, 0)}"
                )


#---------------------------------------------------------
# Creating new classes for portfolio analysis
#---------------------------------------------------------


#Creation of a new class that computes different statistics to analyse the portfolio. The class includes the below functions:
#   Computation of the performance of the portfolio
#   Calculation or returns, mean, vol, Sharpe Ratio and VaR


@dataclass
class AnalysisTool:
    def __init__(self, portfolio_values, initial_value, final_value, risk_free_rate=0.03):
        self.portfolio_values = np.array(portfolio_values)
        self.initial_value = initial_value
        self.final_value = final_value
        self.risk_free_rate = risk_free_rate

    def total_performance(self):
        return (self.final_value - self.initial_value) / self.initial_value

    def calculate_returns(self):
        return np.diff(self.portfolio_values) / self.portfolio_values[:-1]
    
    def mean_returns(self):
        returns = self.calculate_returns()
        return np.mean(returns)
    
    def volatility_returns(self):
        returns = self.calculate_returns()
        return np.std(returns)
        
    def sharpe_ratio(self):
        returns = self.calculate_returns()
        excess_returns = returns - self.risk_free_rate
        return np.mean(excess_returns) / np.std(returns) if np.std(returns) > 0 else 0

    def calculate_var(self, confidence_level=0.95):
        returns = self.calculate_returns()
        var = np.percentile(returns, (1 - confidence_level) * 100)
        return var

    def analyze(self):
        return {
            "Portfolio Total Performance": self.total_performance(),
            "Mean of the Returns": self.mean_returns(),
            "Volatility of the Returns": self.volatility_returns(),
            "Sharpe Ratio": self.sharpe_ratio(),
            "VaR (95% Confidence)": self.calculate_var(confidence_level=0.95)  # VaR at 95% confidence level
        }
#---------------------------------------------------------
# Creating new classes allowing for new rebalances
#---------------------------------------------------------

# Creation of new classes to allow for more frequent rebalances : every week/month/quarter

@dataclass
class RebalanceFlag:
    def time_to_rebalance(self, t: datetime):
        pass


class EndOfWeek:
    def time_to_rebalance(self, t):
        """Rebalances at the end of the week (Friday)"""
        pd_date = pd.Timestamp(t)
        return pd_date.weekday() == 4  # 4 is Friday

class EndOfMonth:
    def time_to_rebalance(self, t):
        """Rebalances at the end of the month"""
        pd_date = pd.Timestamp(t)
        last_business_day = pd_date + pd.offsets.BMonthEnd(0)
        return pd_date == last_business_day

class EveryQuarter:
    def time_to_rebalance(self, t):
        """Rebalances at the start of each quarter"""
        pd_date = pd.Timestamp(t)
        return pd_date.month in [1, 4, 7, 10] and pd_date.day == 1


#---------------------------------------------------------
# Made changes to backtest function to account for modifications
#---------------------------------------------------------

# Allow new rebalances
# Allow dynamic threshold
# Allow for dynamic universe
# Plot graphs

@dataclass
class Backtest2:
    initial_date: datetime
    final_date: datetime
    initial_cash: int = 1000000  # Default initial cash
    threshold: float = 0.1  
    universe: list = None  # list of stock tickers
    information_class: type = Information
    s: timedelta = timedelta(days=360)
    time_column: str = 'Date'
    company_column: str = 'ticker'
    adj_close_column: str = 'Adj Close'
    rebalance_flag: type = EndOfMonth # Default rebalancing is monthly
    risk_model: type = StopLoss
    verbose: bool = True
    name_blockchain: str = 'backtest'
    broker = Broker(cash=initial_cash, verbose=verbose)

    def __post_init__(self):
        self.rebalance_flag = get_rebalancing_strategy()  # Cela récupère la classe
        self.rebalance_flag = self.rebalance_flag()  # Instancier la classe

        # Autres initialisations
        self.stop_loss_threshold = get_stop_loss_threshold()
        self.broker.initialize_blockchain(self.name_blockchain)
        self.backtest_name = generate_random_name()

    def run_backtest(self):
        logging.info(f"Running backtest from {self.initial_date} to {self.final_date}.")
        logging.info(f"Retrieving price data for universe: {self.universe}")
        
        # Pass the custom stop-loss threshold to the StopLoss class
        self.risk_model = self.risk_model(self.stop_loss_threshold)
        
        # Convert dates to strings
        init_ = self.initial_date.strftime('%Y-%m-%d')
        final_ = self.final_date.strftime('%Y-%m-%d')
        
        # Retrieve stock data
        df = get_stocks_data(self.universe, init_, final_)

        # Initialize the DataModule
        data_module = DataModule(df)

        # Create the Information object
        info = self.information_class(s=self.s,
                                      data_module=data_module,
                                      time_column=self.time_column,
                                      company_column=self.company_column,
                                      adj_close_column=self.adj_close_column)

        # Initialize a DataFrame to store portfolio values
        portfolio_values = []
        
        # Run the backtest
        for t in pd.date_range(start=self.initial_date, end=self.final_date, freq='D'):
            
            if self.risk_model is not None:
                portfolio = info.compute_portfolio(t, info.compute_information(t))
                prices = info.get_prices(t)
                self.risk_model.trigger_stop_loss(t, portfolio, prices, self.broker)
           
            if self.rebalance_flag.time_to_rebalance(t):
                logging.info("-----------------------------------")
                logging.info(f"Rebalancing portfolio at {t}")
                information_set = info.compute_information(t)
                portfolio = info.compute_portfolio(t, information_set)
                prices = info.get_prices(t)
                self.broker.execute_portfolio(portfolio, prices, t)


        logging.info(f"Backtest completed. Final portfolio value: {self.broker.get_portfolio_value(info.get_prices(self.final_date))}")
        df = self.broker.get_transaction_log()

        # Create the backtests folder if it does not exist
        if not os.path.exists('backtests'):
            os.makedirs('backtests')

        # Save the transaction log to CSV
        df.to_csv(f"backtests/{self.backtest_name}.csv")

        # Store the backtest results in the blockchain
        self.broker.blockchain.add_block(self.backtest_name, df.to_string())


In [124]:
##### ALL MY GRAPHS FUNCTIONS

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import plotly.graph_objects as go

class PortfolioVisualizer:
    def __init__(self, data=None):
        """
        Constructor to initialize the PortfolioVisualizer class.

        Parameters:
        - data: Optional DataFrame containing asset data (prices, weights, etc.).
        """
        self.data = data

    def plot_covariance(self, cov_matrix, plot_correlation=False, show_tickers=True, **kwargs):
        """
        Plot the covariance (or correlation) matrix as a heatmap.
        
        Parameters:
        - cov_matrix: Covariance matrix to be plotted.
        - plot_correlation: Whether to plot correlation instead of covariance (default: False).
        - show_tickers: Whether to display tickers as labels (default: True).
        - **kwargs: Additional arguments passed to imshow and customization.
        """
        # Convert covariance matrix to correlation if required
        matrix = cov_to_corr(cov_matrix) if plot_correlation else cov_matrix

        # Create the heatmap
        fig, ax = plt.subplots()
        cax = ax.imshow(matrix, cmap='viridis')
        fig.colorbar(cax)

        # Configure tick labels
        if show_tickers:
            ax.set_xticks(np.arange(0, matrix.shape[0], 1))
            ax.set_xticklabels(matrix.index, rotation=90)
            ax.set_yticks(np.arange(0, matrix.shape[0], 1))
            ax.set_yticklabels(matrix.index)

        # Show the plot
        plt.show()

        return ax

    def plot_weights(self, weights, tickers, ax=None, title="Portfolio Weights", **kwargs):
        """
        Plot portfolio weights as a horizontal bar chart.

        Parameters:
        - weights: 1D array of portfolio weights (e.g., [0.4, 0.3, 0.2, 0.1]).
        - tickers: List of asset tickers corresponding to the weights (e.g., ['AAPL', 'META']).
        - ax: Optional Matplotlib axis object.
        - title: Title of the plot (default: "Portfolio Weights").
        - **kwargs: Additional keyword arguments for customization.
        
        Returns:
        - ax: Matplotlib axis object.
        """
        ax = ax or plt.gca()  # Use provided axis or get the current one

        # Sort weights and tickers by weight (largest to smallest)
        desc = sorted(zip(tickers, weights), key=lambda x: x[1], reverse=True)
        labels = [i[0] for i in desc]
        vals = [i[1] for i in desc]

        # Positions for the bars
        y_pos = np.arange(len(labels))

        # Create horizontal bar chart
        ax.barh(y_pos, vals, color=kwargs.get('color', 'blue'))
        ax.set_xlabel("Weight", fontsize=12)
        ax.set_yticks(y_pos)
        ax.set_yticklabels(labels, fontsize=10)
        ax.invert_yaxis()  # Invert y-axis for descending order
        ax.set_title(title, fontsize=14)

        # Display the plot
        plt.show()

        return ax

    def plot_historical_prices(self, df):
        """
        Plots historical prices from a DataFrame where tickers are columns,
        and the index represents dates.

        Parameters:
        - df: DataFrame containing historical prices of assets.
        """
        tickers = df.columns 
        plt.figure(figsize=(12, 6))

        for ticker in tickers:
            plt.plot(df.index, df[ticker], label=ticker)

        plt.title("Historical Prices")
        plt.xlabel("Date")
        plt.ylabel("Adjusted Close Price")
        plt.legend()
        plt.show()

    def plot_portfolio_allocation(self, portfolio):
        """
        Plot the portfolio allocation as a pie chart.

        Parameters:
        - portfolio: Dictionary with asset tickers as keys and weights as values.
        """
        labels = list(portfolio.keys())
        sizes = list(portfolio.values())  
        plt.figure(figsize=(8, 8))
        plt.pie(sizes, labels=labels, autopct='%1.1f%%', startangle=140)
        plt.title("Portfolio Allocation")
        plt.axis('equal')  
        plt.show()

    def plot_portfolio_value(self, portfolio_values_df):
        """
        Plot portfolio value over time.

        Parameters:
        - portfolio_values_df: DataFrame containing portfolio values over time.
        """
        fig = go.Figure(data=go.Scatter(
            x=portfolio_values_df.index,
            y=portfolio_values_df['Portfolio Value'],
            mode='lines',
            name='Portfolio Value'
        ))

        fig.update_layout(
            title="Portfolio Value Over Time",
            xaxis_title="Date",
            yaxis_title="Portfolio Value ($)",
            xaxis=dict(tickformat="%Y-%m-%d"),
            yaxis=dict(tickprefix="$"),
            width=900,
            height=500,
        )

        fig.show()

    def plot_cumulative_performance(self, portfolio_returns, benchmark_returns):
        """
        Plot cumulative portfolio performance against a benchmark.

        Parameters:
        - portfolio_returns: DataFrame of portfolio returns over time.
        - benchmark_returns: DataFrame of benchmark returns over time.
        """
        fig = go.Figure()

        fig.add_trace(go.Scatter(
            x=portfolio_returns.index,
            y=(1 + portfolio_returns).cumprod(),
            mode='lines',
            name='Portfolio Cumulative Return'
        ))

        fig.add_trace(go.Scatter(
            x=benchmark_returns.index,
            y=(1 + benchmark_returns).cumprod(),
            mode='lines',
            name='Benchmark Cumulative Return'
        ))

        fig.update_layout(
            title="Cumulative Portfolio Performance vs Benchmark",
            xaxis_title="Date",
            yaxis_title="Cumulative Return",
            xaxis=dict(tickformat="%Y-%m-%d"),
            yaxis=dict(tickformat="%"),
            width=900,
            height=500,
        )

        fig.show()

    def plot_weights_over_time(self, portfolio_history_df):
        """
        Plot the weights of assets in the portfolio over time.

        Parameters:
        - portfolio_history_df: DataFrame containing portfolio weights over time.
        """
        fig = go.Figure()

        for asset in portfolio_history_df.columns:
            fig.add_trace(go.Scatter(
                x=portfolio_history_df.index,
                y=portfolio_history_df[asset],
                mode='lines',
                stackgroup='one',
                name=asset
            ))

        fig.update_layout(
            title="Portfolio Weights Over Time",
            xaxis_title="Date",
            yaxis_title="Weight",
            xaxis=dict(tickformat="%Y-%m-%d"),
            yaxis=dict(tickformat=".0%"),
            width=900,
            height=500,
        )

        fig.show()

    def plot_drawdown(self, portfolio_returns):
        """
        Plot the drawdown of the portfolio.

        Parameters:
        - portfolio_returns: DataFrame containing portfolio returns over time.
        """
        cumulative_returns = (1 + portfolio_returns).cumprod()
        peak = cumulative_returns.cummax()
        drawdown = (cumulative_returns - peak) / peak

        fig = go.Figure(data=[
            go.Scatter(
                x=drawdown.index,
                y=drawdown,
                mode='lines',
                name="Drawdown"
            )
        ])

        fig.update_layout(
            title="Portfolio Drawdown",
            xaxis_title="Date",
            yaxis_title="Drawdown",
            xaxis=dict(tickformat="%Y-%m-%d"),
            yaxis=dict(tickformat=".0%"),
            width=900,
            height=500,
        )

        fig.show()


In [126]:
##### MY RISK MODELS FUNCTIONS


def sample_cov(prices, returns_data=False, frequency=252, log_returns=False, fix_method="spectral"):
    """
    Calculate the sample covariance matrix of returns and fix it if it's not positive semidefinite.
    """
    # Ensure input is a DataFrame
    prices = pd.DataFrame(prices) if not isinstance(prices, pd.DataFrame) else prices
    
    # Calculate returns if returns_data is False
    returns = prices if returns_data else returns_from_prices(prices, log_returns)
    
    # Compute covariance and fix non-positive semidefinite matrices
    cov_matrix = returns.cov() * frequency
    return cov_matrix if is_positive_semidefinite(cov_matrix) else fix_nonpositive_semidefinite(cov_matrix, fix_method)


def is_positive_semidefinite(matrix):
    """
    Check if a matrix is positive semidefinite using Cholesky decomposition.
    """
    try:
        np.linalg.cholesky(matrix + 1e-16 * np.eye(len(matrix)))
        return True
    except np.linalg.LinAlgError:
        return False


def fix_nonpositive_semidefinite(matrix, fix_method="spectral"):
    """
    Fix a non-positive semidefinite matrix (stub for now).
    """
    # You can implement specific fixes based on the method (e.g., spectral, nearest PSD matrix, etc.)
    return matrix

def returns_from_prices(prices, log_returns=False):
    if log_returns:
        returns = np.log(1 + prices.pct_change(fill_method=None)).dropna(how="all")
    else:
        returns = prices.pct_change(fill_method=None).dropna(how="all")
    return returns

def cov_to_corr(cov_matrix):
    if not isinstance(cov_matrix, pd.DataFrame):
        warnings.warn("cov_matrix is not a dataframe", RuntimeWarning)
        cov_matrix = pd.DataFrame(cov_matrix)

    Dinv = np.diag(1 / np.sqrt(np.diag(cov_matrix)))
    corr = np.dot(Dinv, np.dot(cov_matrix, Dinv))
    return pd.DataFrame(corr, index=cov_matrix.index, columns=cov_matrix.index)

In [128]:
from scipy import optimize as opt  

# Volatility minimization with return objective
def port_minvol_ro(expected_return, covariance_matrix, ro):
    """
    Minimize portfolio volatility for a target return.
    """
    def objective(W, R, C, ro):
        # Portfolio variance
        varp = np.dot(np.dot(W.T, C), W)
        return varp**0.5  # Minimize volatility
    
    n = len(covariance_matrix)
    W = np.ones([n]) / n  # Initial weights: equally distributed
    bounds = [(0., 1.) for _ in range(n)]  # No short selling
    constraints = [
        {'type': 'eq', 'fun': lambda W: np.sum(W) - 1.},  # Weights sum to 1
        {'type': 'eq', 'fun': lambda W: np.dot(W.T, expected_return) - ro}  # Target return
    ]
    
    optimized = opt.minimize(
        objective, W, (expected_return, covariance_matrix, ro),
        method='SLSQP', constraints=constraints, bounds=bounds,
        options={'maxiter': 100, 'ftol': 1e-08}
    )
    return optimized.x

# Volatility minimization
def port_minvol(expected_return, covariance_matrix):
    """
    Minimize portfolio volatility.
    """
    def objective(W, R, C):
        # Portfolio variance
        varp = np.dot(np.dot(W.T, C), W)
        return varp**0.5  # Minimize volatility
    
    n = len(covariance_matrix)
    W = np.ones([n]) / n  # Initial weights: equally distributed
    bounds = [(0., 1.) for _ in range(n)]  # No short selling
    constraints = [
        {'type': 'eq', 'fun': lambda W: np.sum(W) - 1.}  # Weights sum to 1
    ]
    
    optimized = opt.minimize(
        objective, W, (expected_return, covariance_matrix),
        method='SLSQP', constraints=constraints, bounds=bounds,
        options={'maxiter': 100, 'ftol': 1e-08}
    )
    return optimized.x

# Return maximization
def port_maxret(expected_return, covariance_matrix):
    """
    Maximize portfolio return.
    """
    def objective(W, R, C):
        # Portfolio return
        meanp = np.dot(W.T, R)
        return -meanp  # Minimize negative return (maximize return)
    
    n = len(covariance_matrix)
    W = np.ones([n]) / n  # Initial weights: equally distributed
    bounds = [(0., 1.) for _ in range(n)]  # No short selling
    constraints = [
        {'type': 'eq', 'fun': lambda W: np.sum(W) - 1.}  # Weights sum to 1
    ]
    
    optimized = opt.minimize(
        objective, W, (expected_return, covariance_matrix),
        method='SLSQP', constraints=constraints, bounds=bounds,
        options={'maxiter': 100, 'ftol': 1e-08}
    )
    return optimized.x


In [170]:
#### MY DEV.PY FILE

from pybacktestchain.data_module import FirstTwoMoments
#from pybacktestchain.broker import StopLoss
from pybacktestchain.blockchain import load_blockchain
from datetime import datetime

# Set verbosity for logging
verbose = False  # Set to True to enable logging, or False to suppress it

# Get initial inputs from the user
start_date, end_date = get_date_inputs()  # Get start and end date from the user input
stop_loss_threshold = get_stop_loss_threshold()  # Get the stop-loss threshold
rebalancing_strategy = get_rebalancing_strategy()  # Get the rebalancing strategy
initial_cash = get_initial_cash_input()  # Get the initial cash amount
tickers = get_stock_inputs()  # Get the list of stock tickers

rebalancing_strategy_instance = rebalancing_strategy()

# Now we need to use the Backtest class, correctly passing the parameters.
backtest = Backtest2(
    initial_date=start_date,               
    final_date=end_date,   
    threshold=stop_loss_threshold,  # Le seuil de stop-loss à partir de l'input utilisateur
    information_class=FirstTwoMoments,         # La classe pour calculer les informations du portefeuille
    risk_model=StopLoss_new,                    # Le modèle de risque, ici StopLoss
    name_blockchain='backtest',             # Le nom de la blockchain pour stocker les résultats
    initial_cash=initial_cash,  # La somme d'argent initiale
    universe=tickers,  # Les tickers fournis par l'utilisateur
    rebalance_flag=rebalancing_strategy_instance,  # La stratégie de rééquilibrage choisie par l'utilisateur
    verbose=verbose                         # Verbosité des logs
)

# Run the backtest
backtest.run_backtest()

# Load the blockchain to check the results
block_chain = load_blockchain('backtest')
print(str(block_chain))
# Check if the blockchain is valid
print(block_chain.is_valid())


INFO:root:Running backtest from 2020-01-01 00:00:00 to 2025-01-01 00:00:00.
INFO:root:Retrieving price data for universe: ['AMZN', 'TSLA', 'NFLX', 'MSFT']
  base_cov = np.cov(mat.T, ddof=ddof)
  c *= np.true_divide(1, fact)
  c *= np.true_divide(1, fact)
INFO:root:-----------------------------------
INFO:root:Rebalancing portfolio at 2020-01-31 00:00:00
  self.transaction_log = pd.concat([self.transaction_log, transaction], ignore_index=True)
INFO:root:-----------------------------------
INFO:root:Rebalancing portfolio at 2020-02-28 00:00:00
INFO:root:Stop loss triggered for AMZN at 2020-03-13 00:00:00. Selling all shares.
INFO:root:Stop loss triggered for TSLA at 2020-03-13 00:00:00. Selling all shares.
INFO:root:-----------------------------------
INFO:root:Rebalancing portfolio at 2020-03-31 00:00:00
INFO:root:-----------------------------------
INFO:root:Rebalancing portfolio at 2020-04-30 00:00:00
INFO:root:Buying as many shares of TSLA as possible with available cash.
INFO:root:S

--------------------------------------------------------------------------------
Block 0
--------------------------------------------------------------------------------
Backtest: Genesis Block
Timestamp: 1736784501.3920927
Hash: 134b0a1a3887cc6663168f9c6d5ca5c1f983c7df8cf6962278689224fc6f3b5a
Previous Hash: 0
--------------------------------------------------------------------------------
--------------------------------------------------------------------------------
Block 1
--------------------------------------------------------------------------------
Backtest: BlackLionBlacksmith
Timestamp: 1736784509.1096544
Hash: 1ea9de68dbd3ad683ccd2c89b95d74598771a6d563d2352cc069c0fd3907bd49
Previous Hash: 134b0a1a3887cc6663168f9c6d5ca5c1f983c7df8cf6962278689224fc6f3b5a
--------------------------------------------------------------------------------
--------------------------------------------------------------------------------
Block 2
--------------------------------------------------------