In [1]:
import os
import time
import datetime

def get_latest_downloaded_files(directory, num_files=10):
    """
    List the N most recently modified regular files directly inside a directory.

    Sub-directories are ignored and the search is not recursive. Entries that
    disappear or cannot be stat'ed while the directory is being scanned are
    skipped instead of aborting the whole listing.

    Args:
        directory (str): The path to the directory to search.
        num_files (int | None): Maximum number of files to return (default: 10).
            ``None`` returns every file; zero or a negative value returns an
            empty list.

    Returns:
        list: A list of tuples ordered newest first, where each tuple contains:
              (filename, file_size_bytes, last_modified_time)
              with ``last_modified_time`` as a POSIX timestamp (float).
              Returns an empty list if the path is not an existing directory,
              the directory is empty, or it cannot be read.
    """
    # isdir (rather than exists) also rejects a path that points at a regular file.
    if not os.path.isdir(directory):
        print(f"Error: Directory '{directory}' not found.")
        return []

    try:
        files = []
        # scandir yields entries whose type/stat information is cached, so each
        # file is inspected once instead of joining and stat'ing its path three times.
        with os.scandir(directory) as entries:
            for entry in entries:
                try:
                    if not entry.is_file():
                        continue
                    info = entry.stat()
                except OSError:
                    # The file vanished or is unreadable: skip just this entry.
                    continue
                files.append((entry.name, info.st_size, info.st_mtime))

        # Sort files by modification time (most recent first)
        files.sort(key=lambda item: item[2], reverse=True)

        if num_files is None:
            return files
        # Clamp at zero: a negative slice bound would drop files from the
        # *end* of the list rather than limit the count.
        return files[:max(num_files, 0)]
    except OSError as e:
        print(f"Error accessing directory: {e}")
        return []
    except Exception as e:
        # Best-effort helper: report anything unexpected and return no results.
        print(f"An unexpected error occurred: {e}")
        return []


def main():
    """
    Print the name, size and last-modified time of the most recently modified
    files in the current user's Downloads folder.

    The folder is resolved as ``<home>/Downloads`` through
    ``os.path.expanduser("~")``, so the path separator is chosen by the
    platform instead of being hard-coded as a backslash.
    """
    # Number of files to retrieve
    num_files = 5

    # Get the user's Downloads directory (portable: no hard-coded separator)
    downloads_dir = os.path.join(os.path.expanduser("~"), "Downloads")

    recent_files = get_latest_downloaded_files(downloads_dir, num_files=num_files)

    if not recent_files:
        print("No files found in the Downloads directory.")
        return

    # Report how many files were actually found; the folder may hold fewer
    # than the number requested.
    print(f"{len(recent_files)} Most Recent Files in Downloads:")
    for filename, size, last_modified_time in recent_files:
        # Format file size for readability. ">=" so that exactly 1 MiB is shown
        # as "1.00 MB" (not "1024.00 KB") and exactly 1 KiB as "1.00 KB".
        size_kb = size / 1024
        size_mb = size_kb / 1024
        if size_mb >= 1:
            file_size = f"{size_mb:.2f} MB"
        elif size_kb >= 1:
            file_size = f"{size_kb:.2f} KB"
        else:
            file_size = f"{size} bytes"

        # Format last modified time (local time zone)
        formatted_time = datetime.datetime.fromtimestamp(last_modified_time).strftime('%Y-%m-%d %H:%M:%S')

        print(f"  - Name: {filename}")
        print(f"    Size: {file_size}")
        print(f"    Last Modified: {formatted_time}")

# Run the Downloads listing when this cell (or the file as a script) is executed directly.
if __name__ == "__main__":
    main()

5 Most Recent Files in Downloads:
  - Name: CursorUserSetup-x64-0.46.8.exe
    Size: 106.36 MB
    Last Modified: 2025-03-02 08:23:57
  - Name: _df.pkl
    Size: 34.09 MB
    Last Modified: 2025-02-28 16:58:12
  - Name: OHLCV.pkl
    Size: 46.35 MB
    Last Modified: 2025-02-28 16:48:26
  - Name: download_stocks_ETFs_OHLCV_v3.ipynb
    Size: 361.86 KB
    Last Modified: 2025-02-28 16:45:16
  - Name: adj_close_prices_data.csv
    Size: 4.59 KB
    Last Modified: 2025-02-28 11:54:22


In [2]:
# Name of the pickle file holding the symbols' OHLCV DataFrame.
# This cell only sets the name; the file itself is loaded in a later cell.
filename = "OHLCV.pkl"

In [3]:
# Annualized risk-free rate passed to the ratio calculations below (0.04 = 4% per year).
risk_free_rate = 0.04

In [4]:
import os

def get_download_path(filename):
  """
  Build the path of a file inside the current user's Windows Downloads folder.

  The user's home directory is taken from the environment: 'USERPROFILE' is
  preferred, and 'HOMEDRIVE' + 'HOMEPATH' are used as a fallback. The path is
  only assembled; the file is not checked for existence.

  Args:
    filename: The name of the file.

  Returns:
    The path string "<home>/Downloads/<filename>", or None (after printing a
    message) when not running on Windows or when the required environment
    variables are not set.
  """
  # Guard clause: anything other than Windows is rejected up front.
  if os.name != 'nt':
    print("This function is designed for Windows systems.")
    return None

  env = os.environ
  if 'USERPROFILE' in env:
    # Method 1: the 'USERPROFILE' environment variable
    home_parts = (env['USERPROFILE'],)
  elif 'HOMEDRIVE' in env and 'HOMEPATH' in env:
    # Method 2: drive + home path when 'USERPROFILE' isn't set
    home_parts = (env['HOMEDRIVE'], env['HOMEPATH'])
  else:
    print("Error: Unable to find the Downloads directory using environment variables.")
    return None

  downloads_dir = os.path.join(*home_parts, 'Downloads')
  return os.path.join(downloads_dir, filename)



In [5]:
# Resolve where the pickle file lives inside the Downloads folder.
full_path = get_download_path(filename)

# A falsy result (None) means the Downloads location could not be determined.
if not full_path:
  print(f"Could not determine the full path to '{filename}'.")
else:
  print(f"The full path to '{filename}' is: {full_path}")

The full path to 'OHLCV.pkl' is: C:\Users\ping\Downloads\OHLCV.pkl


In [6]:
import pandas as pd

# Load the DataFrame from the pickle file.
# NOTE: unpickling can execute arbitrary code - only load pickle files that
# come from a trusted source.
df = pd.read_pickle(full_path)

# Print a summary of the DataFrame (index range, columns, non-null counts,
# dtypes, memory usage) to verify the load
df.info()

<class 'pandas.core.frame.DataFrame'>
MultiIndex: 630966 entries, ('AAPL', Timestamp('2025-02-28 00:00:00')) to ('IBTE', Timestamp('2024-03-01 00:00:00'))
Data columns (total 9 columns):
 #   Column     Non-Null Count   Dtype  
---  ------     --------------   -----  
 0   Open       630966 non-null  float64
 1   High       630966 non-null  float64
 2   Low        630966 non-null  float64
 3   Close      630966 non-null  float64
 4   Adj Close  630966 non-null  float64
 5   Volume     630764 non-null  Int64  
 6   Adj Open   630966 non-null  float64
 7   Adj High   630966 non-null  float64
 8   Adj Low    630966 non-null  float64
dtypes: Int64(1), float64(8)
memory usage: 46.4+ MB


In [7]:
import numpy as np
import pandas as pd
# import pyfolio as pf
import empyrical  # Import the empyrical package
import warnings

# Ignore warnings whose message starts with 'Module "zipline.assets" not found'
# (the message argument is a regular expression matched against the start of the text).
# NOTE(review): this filter is registered after `import empyrical`, so it cannot
# silence a warning raised while that import runs - confirm where the warning originates.
warnings.filterwarnings("ignore", message="Module \"zipline.assets\" not found.*")

def calculate_performance_metrics(returns, risk_free_rate=0.0):
    """
    Compute the Sharpe, Sortino and Omega ratios of a daily return series with empyrical.

    Args:
        returns (pd.Series or array-like): Daily returns of the investment.
            Anything that is not a Series is wrapped in one, but the result must
            still carry a DatetimeIndex, otherwise the call is rejected.
        risk_free_rate (float): Annualized risk-free rate (default 0.0). It is
            divided by 252 to obtain the daily rate handed to empyrical.

    Returns:
        dict: {"Sharpe Ratio": ..., "Sortino Ratio": ..., "Omega Ratio": ...},
              or None when the input is invalid or any calculation raises
              (the error message is printed).
    """
    trading_days = 252  # Standard trading-day count used for annualization

    try:
        # Wrap non-Series input, then insist on a DatetimeIndex.
        series = returns if isinstance(returns, pd.Series) else pd.Series(returns)
        if not isinstance(series.index, pd.DatetimeIndex):
            raise ValueError("Returns must be a Pandas Series with a DatetimeIndex.")

        # Annualized risk-free rate -> per-day rate
        rf_daily = risk_free_rate / trading_days

        sharpe = empyrical.sharpe_ratio(series, risk_free=rf_daily, annualization=trading_days)
        sortino = empyrical.sortino_ratio(series, required_return=rf_daily, annualization=trading_days)
        omega = empyrical.omega_ratio(series, risk_free=rf_daily, annualization=trading_days)

        return {
            "Sharpe Ratio": sharpe,
            "Sortino Ratio": sortino,
            "Omega Ratio": omega,
        }
    except Exception as exc:
        print(f"An error occurred: {exc}")
        return None


def calculate_returns(adj_close_prices):
    """
    Turn a series of adjusted close prices into period-over-period returns.

    Args:
        adj_close_prices (pd.Series): Adjusted close prices indexed by a DatetimeIndex
            (any order; the series is sorted by date before differencing).

    Returns:
        pd.Series: Percentage changes between consecutive dates, oldest to newest,
                   with NaN entries (such as the first row) dropped.
                   None if the input is not a Series with a DatetimeIndex or the
                   computation raises (the error message is printed).
    """
    try:
        is_series = isinstance(adj_close_prices, pd.Series)
        if not is_series:
            raise TypeError("Input must be a Pandas Series.")

        has_date_index = isinstance(adj_close_prices.index, pd.DatetimeIndex)
        if not has_date_index:
            raise ValueError("Input Series must have a DatetimeIndex.")

        # Oldest first, so every change is measured against the previous date.
        daily_changes = adj_close_prices.sort_index().pct_change()
        result = daily_changes.dropna()
    except Exception as exc:
        print(f"Error calculating returns: {exc}")
        result = None

    return result


In [8]:
def analyze_stock(df, ticker, risk_free_rate=0.0, output_debug_data=False):
    """
    Analyzes a single stock's performance based on its adjusted close prices.

    Args:
        df (pd.DataFrame): Stock data whose row index can be looked up by ticker
            (``df.loc[ticker]``) and which contains an 'Adj Close' column.
        ticker (str): The stock ticker symbol (e.g., 'NVDA').
        risk_free_rate (float): The annualized risk-free rate. Default is 0.0.
        output_debug_data (bool): If True, print Adj Close prices and returns (default: False).

    Returns:
        pd.DataFrame: A one-row DataFrame with the ticker as index and
                      'Sharpe Ratio', 'Sortino Ratio', and 'Omega Ratio' as columns.
                      Returns None (never an empty DataFrame) when the column or
                      ticker is missing or any step fails; the reason is printed.
    """
    price_column = 'Adj Close'
    try:
        # Check the column first so that a missing column is not reported as a
        # missing ticker by the KeyError handling below.
        if price_column not in df.columns:
            print(f"Column '{price_column}' not found in DataFrame.")
            return None

        # Select the rows of the requested ticker
        try:
            ticker_rows = df.loc[ticker]
        except KeyError:
            print(f"Ticker '{ticker}' not found in DataFrame.")
            return None

        adj_close_prices = ticker_rows[price_column]

        # Check if adj_close_prices is a Series
        if not isinstance(adj_close_prices, pd.Series):
            raise TypeError(f"Expected a Pandas Series for Adj Close prices of {ticker}. Check that {ticker} exists in the DataFrame, and that 'Adj Close' is a valid column")

        # Calculate returns
        returns_series = calculate_returns(adj_close_prices)
        if returns_series is None:
            print(f"Failed to calculate returns for {ticker}.")
            return None

        # Output debug data if requested
        if output_debug_data:
            print(f"--- Debug Data for {ticker} ---")
            print("\nAdj Close Prices (Dates and Values):")
            print(adj_close_prices)  # A Series: prints the index (dates) and the values.
            print("\nReturns:")
            print(returns_series)

        # Calculate performance metrics
        performance_metrics = calculate_performance_metrics(returns_series, risk_free_rate=risk_free_rate)
        if not performance_metrics:
            print(f"Could not calculate performance metrics for {ticker}.")
            return None

        # One row, labelled with the ticker
        return pd.DataFrame(performance_metrics, index=[ticker])

    except Exception as e:
        print(f"An error occurred during analysis: {e}")
        return None


In [9]:
# Quick check on a single ticker: the returned one-row DataFrame is the cell's output.
analyze_stock(df, 'NVDA', risk_free_rate, output_debug_data=False)

Unnamed: 0,Sharpe Ratio,Sortino Ratio,Omega Ratio
NVDA,0.969964,1.38213,1.175341


In [10]:
# Run the analysis for one ticker and report the outcome
ticker = 'NVDA'

result_df = analyze_stock(df, ticker, risk_free_rate=risk_free_rate)

# analyze_stock signals failure with None (never an empty DataFrame)
if result_df is None:
    print(f"\nCould not calculate performance metrics for {ticker}.")
else:
    print(f"\nPerformance metrics DataFrame for {ticker}:")
    print(result_df)





Performance metrics DataFrame for NVDA:
      Sharpe Ratio  Sortino Ratio  Omega Ratio
NVDA      0.969964        1.38213     1.175341


In [11]:
# Read the ticker symbols from the file into a list (one symbol per line).
# Surrounding whitespace is stripped and blank lines are skipped, so no empty
# '' ticker is passed on to the analysis.
with open('tickers.txt', 'r') as f:
    tickers = [symbol for symbol in (line.strip() for line in f) if symbol]

print(tickers)

['AAPL', 'ABBV', 'ABNB', 'ABT', 'ACN', 'ADBE', 'ADI', 'ADP', 'ADSK', 'AJG', 'AMAT', 'AMD', 'AMGN', 'AMT', 'AMZN', 'ANET', 'AON', 'APD', 'APH', 'APO', 'APP', 'ARM', 'ASML', 'AVGO', 'AXP', 'AZN', 'AZO', 'BABA', 'BAC', 'BAM', 'BCS', 'BDX', 'BK', 'BKNG', 'BLK', 'BMO', 'BNS', 'BP', 'BSX', 'BTI', 'BUD', 'BX', 'C', 'CDNS', 'CEG', 'CHTR', 'CI', 'CL', 'CM', 'CME', 'CMG', 'CNI', 'COF', 'COP', 'COST', 'CP', 'CRH', 'CRM', 'CRWD', 'CSX', 'CTAS', 'CVS', 'CVX', 'DASH', 'DELL', 'DHR', 'DIS', 'DUK', 'ECL', 'ELV', 'EMR', 'ENB', 'EPD', 'EQIX', 'EQNR', 'ET', 'ETN', 'FDX', 'FI', 'FTNT', 'GD', 'GE', 'GEV', 'GILD', 'GOOG', 'GOOGL', 'GS', 'GSK', 'HCA', 'HDB', 'HLT', 'HON', 'IBKR', 'IBM', 'IBN', 'ICE', 'INTU', 'ISRG', 'JD', 'JNJ', 'JPM', 'KKR', 'KLAC', 'KMI', 'KO', 'LIN', 'LLY', 'LMT', 'LOW', 'LRCX', 'MA', 'MAR', 'MCD', 'MCK', 'MCO', 'MDT', 'MELI', 'MET', 'META', 'MFG', 'MMC', 'MMM', 'MO', 'MRK', 'MS', 'MSFT', 'MSI', 'MU', 'MUFG', 'NEE', 'NFLX', 'NOC', 'NOW', 'NTES', 'NVDA', 'NVO', 'NVS', 'OKE', 'ORCL', 'ORLY'

In [12]:
# Run the analysis for every ticker in the list and stack the one-row results
all_results = []
for ticker in tickers:
    result_df = analyze_stock(df, ticker, risk_free_rate=risk_free_rate)
    if result_df is None:
        # Failed tickers were already reported by analyze_stock; leave them out
        continue
    all_results.append(result_df)

if not all_results:
    print("No performance metrics were calculated.")
else:
    combined_df = pd.concat(all_results)
    print("\nCombined performance metrics DataFrame:")
    print(combined_df)


Combined performance metrics DataFrame:
      Sharpe Ratio  Sortino Ratio  Omega Ratio
AAPL      1.220949       1.826844     1.236133
ABBV      0.744732       0.995488     1.153213
ABNB     -0.328764      -0.448952     0.939889
ABT       0.791461       1.222935     1.145934
ACN      -0.337535      -0.460173     0.938945
...            ...            ...          ...
WFC       1.341553       2.204764     1.287815
WM        0.630657       0.866401     1.132698
WMB       2.221235       3.259412     1.466137
WMT       2.604496       4.336011     1.606503
ZTS      -0.537786      -0.722450     0.910585

[200 rows x 3 columns]


In [13]:
# Persist the combined ratios; the relative file name resolves against the
# current working directory.
fn_performance_ratios = 'performance_ratios.pkl'
combined_df.to_pickle(fn_performance_ratios)
print(f"DataFrame pickled successfully to {fn_performance_ratios}")

DataFrame pickled successfully to performance_ratios.pkl


In [14]:
# Read the pickled performance ratios back into a DataFrame
_df = pd.read_pickle(fn_performance_ratios)

In [15]:
# Show the reloaded ratios for NVDA
_df.loc['NVDA']

Sharpe Ratio     0.969964
Sortino Ratio    1.382130
Omega Ratio      1.175341
Name: NVDA, dtype: float64

Sanity Check on calculation for: Sharpe Ratio, Sortino Ratio, Omega Ratio

In [16]:
import numpy as np
import pandas as pd

def _print_ratio_debug(info):
    """Print the intermediate values collected by calculate_ratios (debug mode only)."""
    sorted_dates = info['sorted_dates']
    print("\n=== Debug Information ===")
    print(f"First 5 sorted dates: {sorted_dates[:5].strftime('%Y-%m-%d').tolist()}")
    print(f"Last 5 sorted dates: {sorted_dates[-5:].strftime('%Y-%m-%d').tolist()}")
    print(f"Number of trading days: {info['num_returns']}")

    print("\nSharpe Ratio Components:")
    print(f"Average Daily Return: {info['avg_daily_return']:.6f}")
    print(f"Daily Std Dev: {info['daily_std']:.6f}")
    print(f"Daily Risk-Free Rate: {info['daily_risk_free']:.6f}")
    print(f"Sharpe Excess Return: {info['sharpe_excess']:.6f}")

    print("\nSortino Ratio Components:")
    print(f"Excess Returns Mean: {info['excess_mean']:.6f}")
    print(f"Downside Returns Count: {info['downside_count']}")
    print(f"Downside Std Dev: {info['downside_std']:.6f}")

    print("\nOmega Ratio Components:")
    print(f"Threshold: {info['threshold']:.6f}")
    print(f"Returns Above Threshold: {info['count_above']}")
    print(f"Returns Below Threshold: {info['count_below']}")
    print(f"Sum Above Threshold: {info['sum_above']:.6f}")
    print(f"Sum Below Threshold: {info['sum_below']:.6f}")
    print("\n" + "="*40 + "\n")


def calculate_ratios(daily_close, risk_free_rate=0.02, debug=False):
    """
    Calculate risk-adjusted performance metrics from daily price data.

    The ratios are computed from percentage changes of the date-sorted prices,
    using 252 periods per year for annualization:

    * Sharpe:  (mean return - daily risk-free) / std of returns * sqrt(252),
      where the std is pandas' default (sample) standard deviation.
    * Sortino: mean excess return / sqrt(sum(negative excess^2) / N) * sqrt(252),
      with N the total number of returns.
    * Omega:   sum of returns above the daily risk-free threshold divided by
      the sum of shortfalls at or below it.

    Parameters:
        daily_close (pd.Series): Series of daily closing prices. With debug=True
            the index is formatted via ``strftime``, so it must be datetime-like.
        risk_free_rate (float): Annualized risk-free rate (default: 0.02)
        debug (bool): Whether to print intermediate calculations (default: False)

    Returns:
        dict: Dictionary with keys 'Sharpe Ratio', 'Sortino Ratio', 'Omega Ratio'.
              The Omega Ratio is NaN when no return falls below the threshold.
              A zero standard deviation or zero downside deviation yields
              inf/NaN for the corresponding ratio instead of raising.

    Raises:
        TypeError: If daily_close is not a pandas Series.
        ValueError: If no return can be computed from the prices.
    """
    # Validate input
    if not isinstance(daily_close, pd.Series):
        raise TypeError("Input must be a pandas Series")

    # Sort by date (oldest first) to ensure proper return calculations
    sorted_series = daily_close.sort_index(ascending=True)

    # Calculate daily returns from closing prices
    returns = sorted_series.pct_change().dropna()
    if returns.empty:
        raise ValueError("Insufficient data to calculate returns")

    # Common parameters
    days_per_year = 252
    daily_risk_free = risk_free_rate / days_per_year
    annualizer = np.sqrt(days_per_year)

    # Sharpe Ratio components
    avg_daily_return = returns.mean()
    daily_std = returns.std()
    sharpe_excess = avg_daily_return - daily_risk_free

    # Sortino Ratio components: downside deviation is taken over ALL
    # observations, counting only the negative excess returns.
    excess_returns = returns - daily_risk_free
    excess_mean = excess_returns.mean()
    downside_returns = excess_returns[excess_returns < 0]
    downside_std = np.sqrt(np.sum(downside_returns**2) / len(excess_returns))

    # Omega Ratio components
    threshold = daily_risk_free
    returns_above = returns[returns > threshold]
    returns_below = returns[returns <= threshold]
    sum_above = (returns_above - threshold).sum()
    sum_below = (threshold - returns_below).sum()

    # np.divide gives IEEE results (inf/NaN) for a zero denominator; errstate
    # keeps those expected cases from emitting RuntimeWarnings.
    with np.errstate(divide='ignore', invalid='ignore'):
        sharpe_ratio = np.divide(sharpe_excess, daily_std) * annualizer
        sortino_ratio = np.divide(excess_mean, downside_std) * annualizer

    # Dividing numpy scalars by zero does not raise ZeroDivisionError, so the
    # "no shortfall" case has to be tested explicitly to report NaN.
    omega_ratio = sum_above / sum_below if sum_below > 0 else np.nan

    results = {
        'Sharpe Ratio': sharpe_ratio,
        'Sortino Ratio': sortino_ratio,
        'Omega Ratio': omega_ratio,
    }

    if debug:
        _print_ratio_debug({
            'sorted_dates': sorted_series.index,
            'num_returns': len(returns),
            'avg_daily_return': avg_daily_return,
            'daily_std': daily_std,
            'daily_risk_free': daily_risk_free,
            'sharpe_excess': sharpe_excess,
            'excess_mean': excess_mean,
            'downside_count': len(downside_returns),
            'downside_std': downside_std,
            'threshold': threshold,
            'count_above': len(returns_above),
            'count_below': len(returns_below),
            'sum_above': sum_above,
            'sum_below': sum_below,
        })

    return results

# Sanity check: recompute NVDA's ratios with the hand-written formulas so they
# can be compared with the values calculated earlier in the notebook.
adj_close_series = df.loc['NVDA']['Adj Close']

ratios = calculate_ratios(adj_close_series, risk_free_rate, debug=False)
print(f'Sanity check for NVDA ratios:\n{ratios}\n')


Santity check for NVDA ratios:
{'Sharpe Ratio': 0.9699636443948492, 'Sortino Ratio': 1.3821304890461559, 'Omega Ratio': 1.175340985431688}



In [17]:
# import pandas as pd
# import os

# def save_returns_to_csv(returns, filename="returns.csv"):
#     """
#     Saves a Pandas Series of returns to a CSV file in the user's Downloads directory.

#     Args:
#         returns (pd.Series): The Pandas Series containing the returns data.  The index is assumed to be dates.
#         filename (str, optional): The name of the CSV file to create. Defaults to "returns.csv".
#     """

#     # Get the user's Downloads directory
#     downloads_path = os.path.join(os.path.expanduser("~"), "Downloads")

#     # Construct the full file path
#     file_path = os.path.join(downloads_path, filename)

#     try:
#         # Write the Series to a CSV file
#         returns.to_csv(file_path, header=True)  # Include header for column name

#         print(f"Returns data saved to: {file_path}")

#     except Exception as e:
#         print(f"Error saving returns to CSV: {e}")


# # Example Usage (replace with your actual returns Series)
# # Create sample data
# import numpy as np
# dates = pd.to_datetime(pd.date_range('2023-01-01', periods=252))
# returns = pd.Series(np.random.normal(0.0005, 0.01, 252), index=dates, name="Daily Return") # important to give series a name!

# # Save the data
# save_returns_to_csv(returns, filename="my_returns_data.csv")