In [None]:
# See https://pypi.org/project/pyportfolioopt/#a-quick-example
import pandas as pd
from pypfopt import EfficientFrontier
from pypfopt import risk_models
from pypfopt import expected_returns
import yfinance as yf
import numpy as np
# Load price data: download adjusted close prices from Yahoo Finance.
# The earlier pd.read_csv("../data/pyportfolioopt/stock_prices.csv", ...) call was
# removed: its result was overwritten by the download below before ever being used,
# so it only added a way for the cell to fail when the CSV is absent.
tickers = ["MSFT", "AMZN", "KO", "MA", "COST",
           "LUV", "XOM", "PFE", "JPM", "UNH",
           "ACN", "DIS", "GILD", "F", "TSLA"]
df = yf.download(tickers, period="max", auto_adjust=False)['Adj Close'].loc["1990":]

# Keep only the dates on which every ticker has a price
# (how='any' drops a row as soon as one column is NaN).
df = df.dropna(axis=0, how='any')

df

In [None]:
# Calculate expected returns and sample covariance
mu = expected_returns.mean_historical_return(df)
S = risk_models.sample_cov(df)

# Remove any assets with infinite or NaN returns
valid_assets = ~(np.isinf(mu) | np.isnan(mu))
mu = mu[valid_assets]
S = S.loc[valid_assets, valid_assets]

# Start from an empty result so that a skipped optimization cannot leave
# `cleaned_weights` undefined (NameError below) or stale from a previous run.
cleaned_weights = {}

# Check if we have valid data
if len(mu) > 0 and not np.any(np.isnan(S)):
    # Optimize for maximal Sharpe ratio
    ef = EfficientFrontier(mu, S)
    raw_weights = ef.max_sharpe()
    cleaned_weights = ef.clean_weights()
    # ef.save_weights_to_file("weights.csv")  # saves to file
    print(cleaned_weights)
    ef.portfolio_performance(verbose=True)
else:
    print("Not enough valid data for optimization")

# Convert the weights mapping to a DataFrame (empty when optimization was skipped)
weights_df = pd.DataFrame.from_dict(cleaned_weights, orient='index', columns=['Weight'])
weights_df.index.name = 'Ticker'
weights_df

In [None]:
from pypfopt.discrete_allocation import DiscreteAllocation, get_latest_prices

# Prices used to convert the weights into share counts
latest_prices = get_latest_prices(df)

# Turn the continuous weights into whole-share holdings for a fixed budget
da = DiscreteAllocation(cleaned_weights, latest_prices, total_portfolio_value=100000)
allocation, leftover = da.greedy_portfolio()

# One row per ticker, indexed by ticker symbol
allocation_df = pd.DataFrame.from_dict(
    allocation, orient='index', columns=['Shares']
).rename_axis('Ticker')

print("Discrete allocation:\n", allocation_df)
print("Funds remaining: ${:.2f}".format(leftover))

In [None]:
# https://github.com/robertmartin8/PyPortfolioOpt/blob/master/cookbook/1-RiskReturnModels.ipynb
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import pypfopt
from pypfopt import risk_models, expected_returns, plotting
# Bare expression: as the last line of the cell, the notebook displays the value of
# pypfopt.__version__, recording which PyPortfolioOpt release produced the results.
pypfopt.__version__

The cells below preprocess the data to prepare it for risk model evaluation. The following
steps are taken:
1. Check for stationarity using ADF and KPSS tests
2. If the data is non-stationary, plot the data to see if it has a trend, seasonal component, or cyclic component
3. If the data has a trend, remove the trend
4. If the data has a seasonal component, remove the seasonal component
5. If the data has a cyclic component, remove the cyclic component
6. After applying transformations, check to see if the data is stationary
7. If the transformations were successful, standardize the data
8. Evaluate the risk models

In [None]:
# if the data is non-stationary, plot the data to see
# if any of the following conditions exist:
# 1. the data has a trend
# 2. the data has a seasonal component
# 3. the data has a cyclic component
import matplotlib.pyplot as plt
import seaborn as sns
from statsmodels.tsa.seasonal import seasonal_decompose

# Set up the plot style: make seaborn's "husl" palette the default color cycle.
# NOTE(review): the matplotlib 'seaborn' style sheet below is disabled — presumably
# because the style name is not available in the installed matplotlib; confirm
# before re-enabling.
#plt.style.use('seaborn')
sns.set_palette("husl")

def analyze_time_series(df, ticker, period=252, vol_window=30):
    """
    Plot a 2x2 diagnostic figure for one ticker's price series.

    Panels: raw prices, the trend component of a seasonal decomposition,
    simple returns over time, and the rolling standard deviation of returns.

    Args:
        df: pandas DataFrame of prices with one column per ticker.
        ticker: column label in ``df`` to analyze.
        period: number of observations per seasonal cycle, passed to
            ``seasonal_decompose``. Defaults to 252 (trading days in a year).
        vol_window: number of rows in the rolling-volatility window.
            Defaults to 30.

    Returns:
        None. The figure is rendered with ``plt.show()``.
    """
    prices = df[ticker]

    # Create figure with subplots
    fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 10))

    # 1. Original time series plot (shows trend and cycles)
    prices.plot(ax=ax1, title=f'{ticker} Price Over Time')
    ax1.set_xlabel('Date')
    ax1.set_ylabel('Price')

    # 2. Trend component of the seasonal decomposition
    decomposition = seasonal_decompose(prices, period=period)
    decomposition.trend.plot(ax=ax2, title=f'{ticker} Trend')
    ax2.set_xlabel('Date')
    ax2.set_ylabel('Trend')

    # 3. Returns over time (helps identify heteroscedasticity)
    returns = prices.pct_change().dropna()
    returns.plot(ax=ax3, title=f'{ticker} Returns Over Time')
    ax3.set_xlabel('Date')
    ax3.set_ylabel('Returns')

    # 4. Rolling volatility (another view of heteroscedasticity)
    rolling_std = returns.rolling(window=vol_window).std()
    rolling_std.plot(ax=ax4, title=f'{ticker} {vol_window}-Day Rolling Volatility')
    ax4.set_xlabel('Date')
    ax4.set_ylabel('Volatility')

    plt.tight_layout()
    plt.show()

# Draw the diagnostic figure for a hand-picked set of tickers.
# (Alternative: iterate over df.columns[:3] to use the first three columns instead.)
for ticker in ('COST', 'JPM', 'MSFT'):
    analyze_time_series(df, ticker)

In [None]:
# Start with first-differencing the data and see if that makes the data stationary.
# dropna() then removes every row that contains a NaN.
df_diff = df.diff().dropna()

# NOTE(review): test_stationarity is not defined in any cell visible here — presumably
# it runs the ADF/KPSS tests per column and returns a DataFrame (the result is used
# via .to_string below). Confirm it is defined earlier, otherwise this cell fails on
# Restart & Run All.
results = test_stationarity(df_diff)
# Print the whole table; float values are rendered with 6 decimal places.
print(results.to_string(float_format=lambda x: '%.6f' % x if isinstance(x, float) else x))

In [None]:
# investigate the differenced data for each ticker
import matplotlib.pyplot as plt
import seaborn as sns
from statsmodels.tsa.seasonal import seasonal_decompose

# statsmodels helpers used in this cell
from statsmodels.graphics.tsaplots import plot_acf
from statsmodels.tsa.stattools import adfuller
from statsmodels.tsa.stattools import kpss

ticker = 'AMZN'

# 2x2 grid: price level, differenced series, rolling volatility, autocorrelation
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
(ax1, ax2), (ax3, ax4) = axes

# Top-left: original price series
df[ticker].plot(ax=ax1, title=f'{ticker} Original Price Series')
ax1.set(xlabel='Date', ylabel='Price')

# Top-right: differenced series
df_diff[ticker].plot(ax=ax2, title=f'{ticker} Differenced Series')
ax2.set(xlabel='Date', ylabel='Price Change')

# Bottom-left: 30-row rolling standard deviation of the differenced series
returns = df_diff[ticker]
rolling_std = returns.rolling(window=30).std()
rolling_std.plot(ax=ax3, title='30-Day Rolling Volatility')
ax3.set(xlabel='Date', ylabel='Volatility')

# Bottom-right: autocorrelation of the differenced series
plot_acf(
    df_diff[ticker].dropna(),
    ax=ax4,
    title=f'Autocorrelation of {ticker} Differenced Series',
)

plt.tight_layout()
plt.show()

# ADF test: statistic, p-value and the critical values
print(f"\nADF Test Results for {ticker}:")
adf_result = adfuller(df_diff[ticker].dropna())
print(f'ADF Statistic: {adf_result[0]}')
print(f'p-value: {adf_result[1]}')
print('Critical values:')
for key, value in adf_result[4].items():
    print(f'\t{key}: {value}')

# KPSS test with a constant-only regression: statistic and p-value
print(f"\nKPSS Test Results for {ticker}:")
kpss_result = kpss(df_diff[ticker].dropna(), regression='c')
print(f'KPSS Statistic: {kpss_result[0]}')
print(f'p-value: {kpss_result[1]}')

In [None]:
# Compare relative changes: std divided by mean of each differenced series
amzn_rel_vol, cost_rel_vol = [
    df_diff[symbol].std() / df_diff[symbol].mean()
    for symbol in ('AMZN', 'COST')
]
print(f"AMZN relative volatility: {amzn_rel_vol}")
print(f"COST relative volatility: {cost_rel_vol}")

# Report the actual p-values of both tests rather than only a pass/fail flag
for ticker in ('AMZN', 'COST'):
    series = df_diff[ticker].dropna()

    adf_stat, adf_pval, _, _, _, _ = adfuller(series)
    kpss_stat, kpss_pval, _, _ = kpss(series, regression='c')

    print(f"\n{ticker}:")
    print(f"ADF p-value: {adf_pval:.6f}")
    print(f"KPSS p-value: {kpss_pval:.6f}")

In [None]:
# try some different transformations of the data and
# see if any of them makes the data stationary
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
from statsmodels.tsa.stattools import adfuller, kpss

def test_and_plot_transformation(data, title, ax):
    """
    Run the ADF and KPSS tests on a transformed series and draw it on ``ax``.

    Args:
        data: pandas Series with the transformed values; NaNs are dropped
            before testing and plotting.
        title: text used as the first line of the subplot title.
        ax: matplotlib Axes the series is drawn on.

    Returns:
        Tuple ``(is_stationary, adf_pval, kpss_pval)``. ``is_stationary`` is
        truthy only when the ADF p-value is below 0.05 and the KPSS p-value
        is above 0.05.
    """
    series = data.dropna()

    # Only the p-value (second element) of each test result is needed
    adf_pval = adfuller(series)[1]
    kpss_pval = kpss(series, regression='c')[1]
    is_stationary = (adf_pval < 0.05) and (kpss_pval > 0.05)

    # Draw the cleaned series and put the test outcome in the title
    series.plot(ax=ax)
    ax.set_title(f"{title}\nADF p-value: {adf_pval:.6f}\nKPSS p-value: {kpss_pval:.6f}\nStationary: {is_stationary}")
    ax.set_xlabel('Date')

    return is_stationary, adf_pval, kpss_pval

# Set ticker to analyze
ticker = 'COST'

# Create figure
fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 10))

# 1. Simple difference (baseline)
diff = df[ticker].diff()

# 2. Log returns
log_returns = np.log(df[ticker]).diff()

# 3. Percentage change
pct_change = df[ticker].pct_change()

# 4. Standardized difference (difference divided by the 30-row rolling std of the price)
roll_std = df[ticker].rolling(window=30).std()
std_diff = df[ticker].diff() / roll_std

transformations = {
    'Simple Difference': diff,
    'Log Returns': log_returns,
    'Percentage Change': pct_change,
    'Standardized Difference': std_diff
}

# Test and plot each transformation exactly once, keeping the returned
# (is_stationary, adf_pval, kpss_pval) tuple so the report below does not
# have to re-run the ADF/KPSS tests.
test_results = {}
for (name, data), ax in zip(transformations.items(), (ax1, ax2, ax3, ax4)):
    test_results[name] = test_and_plot_transformation(data, f'{ticker} {name}', ax)

plt.tight_layout()
plt.show()

# Print detailed results
print(f"\nDetailed Results for {ticker}:")
print("-" * 50)
for name, (is_stationary, adf_pval, kpss_pval) in test_results.items():
    print(f"\n{name}:")
    print(f"ADF p-value:  {adf_pval:.6f}")
    print(f"KPSS p-value: {kpss_pval:.6f}")
    print(f"Stationary:   {is_stationary}")

In [None]:
# Next, try taking the natural log of the data and see if that makes the data stationary.
df_log = np.log(df)

# NOTE(review): test_stationarity is not defined in any cell visible here — presumably
# it returns a DataFrame of per-column test results; confirm it is defined earlier.
results = test_stationarity(df_log)
# Print the whole table; float values are rendered with 6 decimal places.
print(results.to_string(float_format=lambda x: '%.6f' % x if isinstance(x, float) else x))

In [None]:
# next try detrending the data and see if that makes the data stationary    
from scipy import stats

def detrend_data(df):
    """
    Detrend each column in the dataframe using linear regression.

    A straight line is fitted to each column against the row position
    (0 to n-1) and subtracted from it. Only finite observations are used for
    the fit, so a missing value no longer turns the whole column into NaN;
    missing values stay NaN at their original positions in the output.

    Args:
        df: pandas DataFrame with numeric time series data, one series per column

    Returns:
        pandas DataFrame of floats with the same index and columns as ``df``
        holding the detrended data. A column with fewer than two finite
        observations cannot be fitted and is returned as all-NaN.
    """
    # Row positions used as the regressor (0 to n-1)
    time = np.arange(len(df), dtype=float)

    detrended = {}
    for column in df.columns:
        values = df[column].to_numpy(dtype=float)

        # Fit only on finite points; NaN/inf would make the whole fit NaN
        usable = np.isfinite(values)
        if usable.sum() < 2:
            # A line needs at least two points
            detrended[column] = np.full(len(df), np.nan)
            continue

        fit = stats.linregress(time[usable], values[usable])

        # Subtract the fitted line at every row position
        detrended[column] = values - (fit.slope * time + fit.intercept)

    # Build the frame once instead of filling an object-dtype frame column by column
    return pd.DataFrame(detrended, index=df.index, columns=df.columns)

# Apply detrending to every column of the price frame
df_detrend = detrend_data(df)

# Test for stationarity
# NOTE(review): test_stationarity is not defined in any cell visible here — presumably
# it returns a DataFrame of per-column test results; confirm it is defined earlier.
results = test_stationarity(df_detrend)
# Print the whole table; float values are rendered with 6 decimal places.
print(results.to_string(float_format=lambda x: '%.6f' % x if isinstance(x, float) else x))

In [16]:
# Split the price history: the last 250 rows form the "future" window,
# everything before them is the "past" window.
future_df = df.iloc[-250:]
past_df = df.iloc[:-250]

# Sample covariance of each window
future_cov = risk_models.sample_cov(future_df)
sample_cov = risk_models.sample_cov(past_df)

#plotting.plot_covariance(sample_cov, plot_correlation=True)
#plotting.plot_covariance(future_cov, plot_correlation=True)
#plt.show()

In [None]:
risk_methods = [
    "sample_cov",
    "semicovariance",
    "exp_cov",
    "ledoit_wolf",
    "ledoit_wolf_constant_variance",
    "ledoit_wolf_single_factor",
    "ledoit_wolf_constant_correlation",
    "oracle_approximating",
]

# Variances of the future window: the diagonal of its covariance matrix
future_variance = np.diag(future_cov)

# Mean absolute error between the variances each method estimates from the past
# window and the future-window variances. Computed inside a comprehension so the
# loop does not overwrite the notebook-level `S` from the optimization cell.
mean_abs_errors = {
    method: float(np.mean(np.abs(
        np.diag(risk_models.risk_matrix(past_df, method=method)) - future_variance
    )))
    for method in risk_methods
}

# Build the results table once, with a float column (the previous empty frame
# filled through .loc left the column as object dtype).
results_df = pd.DataFrame(
    {'Mean Abs Error': pd.Series(mean_abs_errors, dtype=float)}
)

# Sort by error, smallest first
results_df = results_df.sort_values('Mean Abs Error', ascending=True)

# Display results - with 6 decimal places
print(results_df.to_string(float_format=lambda x: '%.6f' % x))

# Plot results
ax = results_df.plot(kind='barh')
ax.invert_yaxis()  # Invert the y-axis to match DataFrame order
ax.set_title('Mean Absolute Error in Predicting Future Variance')
plt.show()