## Geometric Brownian Motion

The following is the equation for geometric brownian motion:

$$
\frac{\Delta S}{S} = \mu \Delta t + \sigma \epsilon \sqrt{\Delta t}
$$

Where:
$$
S = \text{the stock price}
$$
$$
\Delta S = \text{the change in stock price}
$$
$$
\mu = \text{the expected return} 
$$
$$
\sigma = \text{the standard deviation of returns}
$$
$$
\epsilon = \text{the random variable} 
$$
$$
\Delta t = \text{the elapsed time period}
$$
Re-aranging the equation we get:
$$
\Delta S = S \times (\mu \Delta t + \sigma \epsilon \sqrt{\Delta t})
$$
So we can model different trials by calculating $\Delta S$ at each time step, calculating the next price like the following:
$$
S_{t} = S_{t-1} + S_{t-1} \times (\mu \Delta t + \sigma \epsilon \sqrt{\Delta t})
$$
Simplifying:
$$
S_{t} = S_{t-1} (1 + (\mu \Delta t + \sigma \epsilon \sqrt{\Delta t}))
$$

## Mean-reverting jump-diffusion model

An extension to the OU process is a mean-reverting jump-diffusion model that incorporates sudden price jumps in addition to continuous fluctuations. A simplified representation of such a model is as follows:

$$
\delta S_{t}=\alpha (S^{*}-\ln S_{t})S_{t}dt+S_{t}\sigma dZ_{t}+S_{t}Kdq_{t}
$$

Where: 

$\(S_{t}\)$: The spot price of the asset at time \(t\).\(\alpha \): The mean-reversion rate.\(S^{*}\): The mean-reversion level, or the long-term equilibrium price to which the spot price tends to revert.\(\sigma \): The volatility of the spot price's continuous component.\(dZ_{t}\): The increment of a standard Brownian motion (Wiener process), representing continuous random fluctuations.\(K\): The jump size, which can be modeled using a probability distribution (e.g., normal distribution).\(dq_{t}\): A Poisson process that determines the occurrence of jumps.\(dq_{t}=1\) with probability \(\lambda dt\)\(dq_{t}=0\) with probability \(1-\lambda dt\)\(\lambda \): The average number of jumps per year (jump intensity).

In [None]:
from tqdm.notebook import tqdm
import datetime as dt
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

mu = np.linspace(-0.5, 0.5, 2)
N_mu = len(mu)
N_sims = 1000
N_days = 120

start = dt.datetime.now()
end = start + dt.timedelta(days=N_days)
T = np.arange(start, end, np.timedelta64(1, '1m'))

S_0 = 100
epsilon = np.random.normal(0, 1, (N_sims, len(T)))
sigma = np.ones((N_sims, len(T))) * 0.075
delta_t = np.ones((N_sims, len(T))) * (1 / len(T))
mu = -1 * np.ones((N_sims, len(T))) * 0.10
S = S_0 * np.cumprod(1 + mu * delta_t + sigma * epsilon * np.sqrt(delta_t), axis=1)

# for i in range(N_sims):
#     time_stride = 100
#     T_subset = T[::time_stride]
#     S_subset = S[i, ::time_stride]
#     plt.plot(T_subset, S_subset)
# plt.show()

In [None]:
# Create kernel for convolution
window = 1000
kernel = np.ones(window) / window

# Apply convolution to each row
result = np.apply_along_axis(
    lambda x: np.convolve(x, kernel, mode='valid'), 
    axis=1, 
    arr=S
)
S_means = np.full_like(S, np.nan)
S_means[:, window-1:] = result

# Fill the beginning with expanding means
for i in range(S.shape[0]):  # For each row
    for j in range(window-1):  # For each position before full window
        if j == 0:
            # First position: just the first value
            S_means[i, j] = S[i, j]
        else:
            # Other positions: mean of all available data points
            S_means[i, j] = np.mean(S[i, :j+1])
        
# Plot
# for i in range(N_sims):
#     time_stride = 100
#     T_subset = T[::time_stride]
#     S_means_subset = S_means[i, ::time_stride]
#     plt.plot(T_subset, S_means_subset)
# plt.show()

In [None]:
# Compute the buy and sell signals.
threshold = 0.001
S_signals = (np.abs(S - S_means) / S_means) > threshold
S_buys = S_signals & (S_means > S)
S_sells = S_signals & (S_means < S)

# Compute the cumulative actions.
S_actions = np.zeros_like(S)
S_actions[S_buys] = 1
S_actions[S_sells] = -1
S_actions = np.where(S_buys & S_sells, 0, S_actions)

# Only keep actions that do not repeat, excluding zeros.
def keep_non_repeating(x):
    non_zero_mask = np.where(x != 0)
    keep = np.take(non_zero_mask, np.where(np.diff(x[non_zero_mask], prepend=0) != 0)[0])
    keep_mask = np.zeros_like(x, dtype=bool)
    keep_mask[keep] = True
    return np.where(keep_mask, x, 0)
S_actions = np.apply_along_axis(keep_non_repeating, axis=1, arr=S_actions)

S_actions

In [None]:
# def keep_non_repeating(x):
def final_return(x):
    sells = np.abs(x[np.where(x < 0)[0]])
    buys = np.abs(x[np.where(x > 0)[0]])

    non_zero_mask = x != 0
    if np.any(non_zero_mask):
        first_action_idx = np.argmax(non_zero_mask)
        if x[first_action_idx] < 0:
            # First action is a sell, remove it
            sells = sells[1:] if len(sells) > 1 else np.array([])

    max_len = max(len(sells), len(buys))
    sells_padded = np.concatenate([sells, np.ones(max_len - len(sells))])
    buys_padded = np.concatenate([buys, np.ones(max_len - len(buys))])

    return np.prod(sells_padded / buys_padded)
S_returns = np.apply_along_axis(final_return, axis=1, arr=S_actions * S)

S_returns

In [None]:
pd.Series(S_returns).describe()

In [None]:
pd.Series(S[:, -1] / S[:, 0]).describe()

In [None]:
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
import matplotlib.pyplot as plt

def create_ml_features(S, window_sizes=[50, 100, 200, 500]):
    """
    Create machine learning features for mean reversion strategy.
    """
    N, T = S.shape
    features = []
    
    for i in range(N):
        price_series = S[i, :]
        row_features = []
        
        for window in window_sizes:
            if window < T:
                # Rolling mean and std
                rolling_mean = pd.Series(price_series).rolling(window=window).mean().fillna(method='bfill')
                rolling_std = pd.Series(price_series).rolling(window=window).std().fillna(method='bfill')
                
                # Z-score (distance from mean)
                z_score = (price_series - rolling_mean) / rolling_std
                
                # Price momentum
                momentum = price_series / rolling_mean - 1
                
                # Volatility ratio
                vol_ratio = rolling_std / rolling_mean
                
                # RSI-like indicator
                price_diff = np.diff(price_series, prepend=price_series[0])
                gains = np.where(price_diff > 0, price_diff, 0)
                losses = np.where(price_diff < 0, -price_diff, 0)
                avg_gain = pd.Series(gains).rolling(window=window).mean().fillna(0)
                avg_loss = pd.Series(losses).rolling(window=window).mean().fillna(0)
                rs = avg_gain / (avg_loss + 1e-8)
                rsi = 100 - (100 / (1 + rs))
                
                # Bollinger Bands
                bb_upper = rolling_mean + 2 * rolling_std
                bb_lower = rolling_mean - 2 * rolling_std
                bb_position = (price_series - bb_lower) / (bb_upper - bb_lower)
                
                # Combine features
                window_features = np.column_stack([
                    z_score, momentum, vol_ratio, rsi, bb_position
                ])
                row_features.append(window_features)
        
        # Concatenate all window features
        if row_features:
            all_features = np.concatenate(row_features, axis=1)
            features.append(all_features)
    
    return np.array(features)

def create_ml_labels(S_actions, lookahead=10):
    """
    Create labels for machine learning based on future returns.
    """
    N, T = S_actions.shape
    labels = np.zeros((N, T))
    
    for i in range(N):
        actions = S_actions[i, :]
        prices = S[i, :]
        
        for t in range(T - lookahead):
            if actions[t] != 0:  # If there's an action
                # Calculate future return
                future_return = (prices[t + lookahead] - prices[t]) / prices[t]
                
                # Label based on action and future return
                if actions[t] == 1:  # Buy
                    labels[i, t] = 1 if future_return > 0.01 else 0  # 1% threshold
                elif actions[t] == -1:  # Sell
                    labels[i, t] = 1 if future_return < -0.01 else 0
    
    return labels

# Create features and labels
print("Creating ML features...")
ml_features = create_ml_features(S)
ml_labels = create_ml_labels(S_actions, lookahead=20)

print(f"Features shape: {ml_features.shape}")
print(f"Labels shape: {ml_labels.shape}")

In [None]:
def compare_strategies(S, original_signals, ml_signals, ensemble_signals):
    """
    Compare performance of different strategies.
    """
    strategies = {
        'Original': original_signals,
        'ML-Enhanced': ml_signals,
        'Ensemble': ensemble_signals
    }
    
    results = {}
    
    for name, signals in strategies.items():
        print(f"\nAnalyzing {name} strategy...")
        
        # Calculate returns using the same method as original
        def calculate_strategy_return(x):
            sells = np.abs(x[np.where(x < 0)[0]])
            buys = np.abs(x[np.where(x > 0)[0]])
            
            non_zero_mask = x != 0
            if np.any(non_zero_mask):
                first_action_idx = np.argmax(non_zero_mask)
                if x[first_action_idx] < 0:
                    sells = sells[1:] if len(sells) > 1 else np.array([])
            
            max_len = max(len(sells), len(buys))
            sells_padded = np.concatenate([sells, np.ones(max_len - len(sells))])
            buys_padded = np.concatenate([buys, np.ones(max_len - len(buys))])
            
            return np.prod(sells_padded / buys_padded)
        
        strategy_returns = np.apply_along_axis(calculate_strategy_return, axis=1, arr=signals * S)
        
        # Calculate statistics
        results[name] = {
            'mean_return': np.mean(strategy_returns),
            'std_return': np.std(strategy_returns),
            'sharpe_ratio': np.mean(strategy_returns) / np.std(strategy_returns),
            'win_rate': np.mean(strategy_returns > 1),
            'max_return': np.max(strategy_returns),
            'min_return': np.min(strategy_returns),
            'returns': strategy_returns
        }
        
        print(f"Mean Return: {results[name]['mean_return']:.4f}")
        print(f"Sharpe Ratio: {results[name]['sharpe_ratio']:.4f}")
        print(f"Win Rate: {results[name]['win_rate']:.3f}")
    
    return results

# Compare strategies
strategy_results = compare_strategies(S, S_actions, ml_signals, ensemble_signals)

In [None]:
np.where(S_returns == np.max(S_returns))

In [None]:
# Initialize the cash and positions.
initial_cash = 10000
S_positions = np.zeros(S.shape)
S_cash = np.full(S.shape, initial_cash)

In [None]:
S_cumulative_assets = np.cumsum(S_actions, axis=1)
S_cumulative_assets

In [None]:
np.where(S_actions == 1, S, 0)

In [None]:
S_buys = np.where(S_actions == 1, 1, 0)
S_sells = np.where(S_actions == -1, 1, 0)


In [None]:
actions = np.array([0, 0, 1, 0, 0, -1, 0, 0, 0, 0, 1, 0, 0, 0])
prices = np.array([100, 101, 102, 103, 104, 103, 102, 101, 100, 99, 98, 97, 96, 95])

In [None]:
actions = S_actions[0, :]
prices = S[0, :]

In [None]:
positions = np.zeros_like(actions)
cash = np.zeros_like(actions)
initial_cash = 10000
cash[0] = initial_cash

for i in range(1, len(actions)):
    if actions[i] == 1:
        positions[i] = cash[i-1] / prices[i]
        cash[i] = 0
    elif actions[i] == -1:
        positions[i] = 0
        cash[i] = positions[i-1] * prices[i]
    else:
        positions[i] = positions[i-1]
        cash[i] = cash[i-1]

cumulative_assets = np.cumsum(actions)

initial_cash = 10000
cash[0] = initial_cash

display(pd.DataFrame({
    'actions': actions,
    'prices': prices,
    'positions': positions,
    'cumulative_assets': cumulative_assets,
    'cash': cash
}))


In [None]:
np.where(S_actions[428, :] == -1)[0]

In [None]:
np.where(S_actions[428, :] == 1)[0]

In [None]:
plt.plot(S[428, :])

In [None]:
positions = np.zeros_like(actions)
cash = np.zeros_like(actions)
initial_cash = 10000
cash[0] = initial_cash

cumulative_assets = np.cumsum(actions)

display(pd.DataFrame({
    'actions': actions,
    'prices': prices,
    'positions': positions,
    'cumulative_assets': cumulative_assets,
    'cash': cash
}))

In [None]:
103 * 95

In [None]:


for i in range(1, len(S)):
    if actions[i] == 1:
        positions[i] = cash[i-1] / prices[i]
        cash[i] = 0
    elif actions[i] == -1:

In [None]:
total_positions[-1]

In [None]:
positions[-1] * prices[-1]

In [None]:
# Solve using least squares
X = np.linspace(0, 1, len(T))
X_with_intercept = np.column_stack([np.ones(len(T)), X])
coefficients, residuals, rank, s = np.linalg.lstsq(X_with_intercept, S[0, :], rcond=None)

# Plot the data and the fitted line
plt.plot(T, S[0, :], label='Data')
plt.plot(T, coefficients[0] + coefficients[1] * X, label='Fitted line')
plt.legend()
plt.show()

In [None]:
window_size = 1000
step_size = 1

S_means = np.zeros((len(T) - window_size + 1))

for i in range(0, len(T) - window_size + 1, step_size):
    X = np.linspace(0, 1, window_size)  
    X_with_intercept = np.column_stack([np.ones(window_size), X])
    coefficients_i, _, _, _ = np.linalg.lstsq(X_with_intercept, S[0, i:i+window_size], rcond=None)
    S_means[i] = coefficients_i[0] + coefficients_i[1]

plt.plot(T[window_size-1:], S[0, window_size-1:], label='Data')
plt.plot(T[window_size-1:], S_means, label='Predicted')
plt.legend()
plt.show()

In [None]:
len(T) - window_size

In [None]:
len(S[0, :len(T) - window_size + 1])

In [None]:
len(S_means)

In [None]:
((S - S_means) / S_means) > 0.001