In [35]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import yfinance as yf
from scipy.linalg import svd
from scipy.optimize import minimize
import datetime as dt
import time

In this project done for an undergraduate DRP (Directed Reading Program), we analyze the trading strategy described by the paper "Dynamic Mode Decomposition for Financial Trading Strategies" by Jordan Mann and J. Nathan Kutz (2015). The strategy within the paper is based mathematically in the theory of Koopman operators from nonlinear dynamics: the core intuitive idea is to linearly approximate the next state of a given nonlinear dynamical system at a given point in time. In theory, this is accomplished by considering the composition (or Koopman) operators $K_t(g) = g\circ F_t$, where $F_t$ is our nonlinear dynamical system, and $g$ is a real-valued function. The functions $g$ model our measurements in practice: if we take snapshots of our data at certain points in time with a discrete time-step $\Delta t$ and $x_k$ are our real sample points, we have $K_{\Delta t}(g(x_k)) = g(F_{\Delta t}(x_k)) = g(x_{k+1})$. In other words, this gives us a linear approximation of the next state in our dynamical system; in practice, we take $g$ to be the Identity, and simply try to approximate the next sample points from given data in a linear fashion.

The mathematical theory is applied in practice as the following problem: given a set of time series data, how can we predict future values? The idea of the paper by Mann and Kutz is to use the Koopman operator formalism and apply it to time series. That is, we now sample pairs of past data ${(x(t_k), x(t_k'))}_{k=1}^m$ where $t_k' = t_k + \Delta t$ for some sufficiently small timestep $\Delta t$. For example, we could consider the stock prices of home construction companies in the past 100 days, and consider data from times -80 to -20, and the incremented data from times -79 to -19. The pairs are arranged into two matrices, $X$ (past) and $X'$ (future), and one then uses best-fit linear regression to find a matrix $A$ that most closely approximates $X' \approx AX$. The predictive value for the next time-step is then given by $Ax_{k} \approx x_{k+1}$. In code, this is implemented by repeatedly applying linear regression on a sliding window of past data to predict future values, and then updating the matrix $A$ with each new set of future training data. Mathematically, the matrix $A$ is meant to model our Koopman operator discussed previously, and $A$ is precisely defined as $A = \text{argmin}_A||X'-AX||_F = X'X^\dagger$, where $||\cdot||_F$ denotes the Frobenius norm, and $X^\dagger$ represents the pseudo-inverse of the data matrix $X$.

This usage of the Koopman operator theory to approximate the next time series value using past data is called the dynamic mode decomposition (DMD). To make the algorithm more efficient in computing the matrix $A$ and finding future values, one uses dimensionality reduction (e.g., PCA) to compute the dominant eigenvalues and eigenvectors of $A$ without computing it explicitly. In particular, the pseudo-inverse $X^\dagger$ is computed via the singular value decomposition of $X$. Since $X^\dagger$ typically has far fewer columns than rows, i.e. $m << n$, there are at most $m$ non-zero singular values and corresponding singular vectors, so the matrix $A$ will have at most rank m.
Instead of computing $A$ directly, one then computes the projection of $A$ onto these leading singular vectors, resulting in a small matrix $\tilde{A}$ of size at most $m \times m$.

Thus the full DMD algorithm is as follows: 

Step 1: compute the singular value decomposition of the data matrix $X = U\Sigma V^*$
\
Step 2: compute the pseudo-inverse $X^\dagger = V\Sigma^{-1}U^*$ to compute $A = X'X^\dagger$. Project $A$ to the $r$ leading eigenspaces, where $r$ is a hyperparameter
\
Step 3: apply $A$ to the $k$-th sample $x_k$ to obtain a prediction $Ax_k = \hat{x}_{k+1}$ for $x_{k+1}$. Repeat with the predictive value used as a feature in a new data matrix $X$

In [37]:
home_construction_portfolio = ['DHI', 'LEN', 'PHM', 'TOL', 'NVR', 'HD', 'LOW', 'SHW', 'SPY']
start_date = dt.datetime.today()-dt.timedelta(days = 100)
end_date = dt.datetime.today()

home_construction_data = yf.download(home_construction_portfolio, start = start_date, end = end_date)

daily_returns = np.log(home_construction_data['Close']/home_construction_data['Close'].shift(1))
daily_returns = daily_returns.dropna()

YF.download() has changed argument auto_adjust default to True


[*********************100%***********************]  9 of 9 completed


In [39]:
print(daily_returns)

Ticker           DHI        HD       LEN       LOW       NVR       PHM  \
Date                                                                     
2025-05-28 -0.034248 -0.006337 -0.026626 -0.005729 -0.029632 -0.028850   
2025-05-29  0.007468  0.000625  0.007761  0.001824  0.013835  0.008459   
2025-05-30 -0.001777  0.000000  0.000094  0.003550  0.002574 -0.005189   
2025-06-02 -0.009960 -0.000896 -0.010043 -0.000931 -0.010244 -0.004908   
2025-06-03  0.012413  0.013819  0.015402  0.014439  0.003618  0.011416   
...              ...       ...       ...       ...       ...       ...   
2025-08-25 -0.010128 -0.009468 -0.007023 -0.018600 -0.010539 -0.000378   
2025-08-26 -0.003498 -0.003332 -0.011415 -0.002088 -0.007220 -0.008732   
2025-08-27 -0.005181  0.001716  0.000450  0.001315 -0.004276 -0.000763   
2025-08-28  0.008383 -0.001937 -0.001501 -0.004378  0.005088  0.003352   
2025-08-29  0.003369 -0.001670  0.000075  0.002017  0.005293  0.004175   

Ticker           SHW       SPY       

In [41]:
def predict_price(M1, M2, l):
    ''' 
    Uses Koopman related DMD algorithm to predict future prices.
    l is the number of days in the future to predict
    This uses the pseudo-inverse to compute A
    ''' 
    U, s, Vt = np.linalg.svd(M1, full_matrices=False)
    Sigma_inv = np.diag(1.0/s)
    A_tilde = U.T @ M2 @ Vt.T @ Sigma_inv
    evals, evecs = np.linalg.eig(A_tilde)
    Phi = U @ evecs
    x0 = M1[:, 0].reshape((-1,1))
    b = np.linalg.pinv(Phi) @ x0

    n, r = Phi.shape
    preds = np.zeros((n, l))
    for t in range(1, l+1):
        D = np.diag(np.exp(np.log(evals) * t))
        x_t = Phi @ D @ b
        preds[:, t-1] = x_t.ravel().real
    
    return preds

In [43]:
def directional_accuracy(prediction, actual):
    '''
    Returns the accuracy of the predictions. The model is accurate if
    the predicted direction of the price change matches the actual direction.
    '''
    if prediction.shape != actual.shape:
        raise ValueError("Prediction and actual arrays must have the same shape.")

    diff_pred = prediction[:, 1:] - prediction[:, :-1]  
    diff_act = actual[:, 1:] - actual[:, :-1]     
    
    correct = np.sum(diff_pred * diff_act > 0)
    return correct/diff_pred.size
    

In [45]:
def find_hotspot(data_arr):
    ''' 
    Finds potential hotspots based on the directional accuracy of price predictions. 
    Uses past 100 days of data to run through a range of parameters (m, l) to find
    combinations that yield an accuracy greater than 0.53.

    Returns a list of tuples (m, l, accuracy) for all combinations tested.
    '''
    accuracy_scores = []
    potential_hotspots = []

    for m in range(4, 15):
        for l in range(2, 10):
            scores = []
            for t in range(m, len(data_arr)-l): 
                M = data_arr[:, t-m:t]
                M1 = M[:, :-1]
                M2 = M[:, 1:]
                preds = predict_price(M1, M2, l)[: -1, :] #drop the mean column
                actual = data_arr[:, t:t+l][: -1, :]
                scores.append(directional_accuracy(preds, actual))
            accuracy = np.sum(scores) / len(scores)
            accuracy_scores.append((m, l, accuracy))
            if accuracy > 0.53: #arbitrary threshold
                potential_hotspots.append((m, l, accuracy))
    return accuracy_scores, potential_hotspots

We compare 3 strategies based on their P&L:

Strategy 1: split up 10 years of data into 100 day increments. Then find a hotspot using the first 100 days, invest l future days; recalibrate the hotspot after l days, and repeat on 10 years of data.
\
Strategy 2: Find a hotspot over 2 years of data, then use that hotspot for the next 6 months or something. Then recalibrate.
\
Strategy 3: Find a hotspot over 3 years, and use the same hotspot for the next 7 years.
Compare all these to the benchmark S&P 500.

In [48]:
def strategy_1(portfolio):
    '''
    Strategy 1: split up 10 years of data into 100 day increments.
    Then find a hotspot using the first 100 days, invest l future days; 
    recalibrate the hotspot after l days, and repeat on 10 years of data.
    '''
    hist_data_10_yrs = get_close_prices(portfolio)
    n = hist_data_10_yrs.shape[1]
    hist_data_10_yrs = hist_data_10_yrs[:, n//2:] # Use only the most recent 10 years of data

    delta = 0
    while delta < hist_data_10_yrs.shape[1]-100: #can adjust this 100 to change the number of days used to find hotspots
        data_arr = hist_data_10_yrs[:, delta:delta+100]
        accuracy_scores, potential_hotspots = find_hotspot(data_arr)
        if potential_hotspots:
            sorted_by_score = sorted(potential_hotspots, key=lambda tpl: tpl[2], reverse=True)
            #here we can build and call a function that checks if the average of its neighbors are also > 51% 
            #for now will just take the highest accuracy score and use the corresponding (m,l) as the hotspot
            m, l, accuracy = sorted_by_score[0]
            #build and call a function that invests and tracks P&L using (m,l) hotspot
            delta += l

In [None]:
ms = sorted({m for m, l, score in accuracy_scores})
ls = sorted({l for m, l, score in accuracy_scores})

heatmap = np.zeros((len(ls), len(ms)))

for (m_val, l_val, score) in accuracy_scores:
    i = ls.index(l_val)   
    j = ms.index(m_val) 
    heatmap[i, j] = score

fig, ax = plt.subplots(figsize=(8, 6))
im = ax.imshow(heatmap, origin='lower', aspect='auto')

ax.set_xticks(np.arange(len(ms)))
ax.set_xticklabels(ms)
ax.set_yticks(np.arange(len(ls)))
ax.set_yticklabels(ls)

ax.set_xticks(np.arange(len(ms) + 1) - 0.5, minor=True)
ax.set_yticks(np.arange(len(ls) + 1) - 0.5, minor=True)

ax.grid(which='minor', color='white', linestyle='-', linewidth=1)

ax.tick_params(which='minor', bottom=False, left=False)

ax.set_xlabel('m (look-back window)')
ax.set_ylabel('l (prediction horizon)')
ax.set_title('Accuracy Heatmap')

cbar = fig.colorbar(im, ax=ax)
cbar.set_label('Directional Accuracy')

plt.tight_layout()
plt.show()