
# Introduction

This notebook demonstrates how to use a 1-dimensional convolutional neural network to detect technical trading patterns. In particular, this example trains the model to detect the head and shoulders pattern.

# Generate Chart Patterns

The following code block generates 100,000 samples of the head and shoulders pattern. Each sample consists of 25 data points, with random noise added so that every sample is unique.

<img src="https://cdn.quantconnect.com/i/tu/head-and-shoulders-pattern-1.png" alt="Head and shoulders pattern" style="width: 800px; max-width: 100%;">


In [1]:
import numpy as np
import pandas as pd
from scipy.stats import norm, uniform

np.random.seed(1)
ref_count = 100_000
v1 = np.array([0] * ref_count) + 0.02 * norm.rvs(size=(ref_count,))
p1 = np.array([1] * ref_count) + 0.2 * norm.rvs(size=(ref_count,))
v2 = v1 + 0.2 * norm.rvs(size=(ref_count,))
v3 = v1 + 0.2 * norm.rvs(size=(ref_count,))
p3 = p1 + 0.02 * norm.rvs(size=(ref_count,))
p2 = 1.5 * np.maximum(p1, p3) + abs(uniform.rvs(size=(ref_count,)))
v4 = v1 + 0.02 * norm.rvs(size=(ref_count,))
ref = pd.DataFrame([
    v1, 
    (v1*.75 + p1*.25) + 0.2 * norm.rvs(size=(ref_count,)), 
    (v1+p1)/2 + 0.2 * norm.rvs(size=(ref_count,)), 
    (v1*.25 + p1*.75) + 0.2 * norm.rvs(size=(ref_count,)), 
    p1, 
    (v2*.25 + p1*.75) + 0.2 * norm.rvs(size=(ref_count,)), 
    (v2+p1)/2 + 0.2 * norm.rvs(size=(ref_count,)), 
    (v2*.75 + p1*.25) + 0.2 * norm.rvs(size=(ref_count,)), 
    v2, 
    (v2*.75 + p2*.25) + 0.2 * norm.rvs(size=(ref_count,)), 
    (v2+p2)/2 + 0.2 * norm.rvs(size=(ref_count,)), 
    (v2*.25 + p2*.75) + 0.2 * norm.rvs(size=(ref_count,)), 
    p2, 
    (v3*.25 + p2*.75) + 0.2 * norm.rvs(size=(ref_count,)), 
    (v3+p2)/2 + 0.2 * norm.rvs(size=(ref_count,)), 
    (v3*.75 + p2*.25) + 0.2 * norm.rvs(size=(ref_count,)), 
    v3, 
    (v3*.75 + p3*.25) + 0.2 * norm.rvs(size=(ref_count,)), 
    (v3+p3)/2 + 0.2 * norm.rvs(size=(ref_count,)), 
    (v3*.25 + p3*.75) + 0.2 * norm.rvs(size=(ref_count,)), 
    p3, 
    (v4*.25 + p3*.75) + 0.2 * norm.rvs(size=(ref_count,)), 
    (v4+p3)/2 + 0.2 * norm.rvs(size=(ref_count,)), 
    (v4*.75 + p3*.25) + 0.2 * norm.rvs(size=(ref_count,)), 
    v4
])

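# Standardize each sample (column) to zero mean and unit standard 
# deviation, then transpose so that each row is one sample.
ref = ((ref - ref.mean()) / ref.std()).T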
display(ref)
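
The standardization step at the end of the previous cell can be checked on a small example. The following is a minimal sketch, assuming a toy DataFrame (`toy`, a hypothetical name) laid out like `ref` before the transpose, with one column per sample. It illustrates that `mean()` and `std()` reduce over rows by default, so each sample is standardized on its own.

```python
import numpy as np
import pandas as pd

# Toy stand-in for `ref` before standardization: rows are the points of
# the pattern and columns are samples (here 5 points and 3 samples).
rng = np.random.default_rng(0)
toy = pd.DataFrame(rng.normal(loc=10, scale=2, size=(5, 3)))

# DataFrame.mean() and DataFrame.std() reduce over rows by default, so
# each column (one sample) is standardized independently.
toy_standardized = ((toy - toy.mean()) / toy.std()).T

# After the transpose, each row is one sample.
print(toy_standardized.shape)  # (3, 5)
print(np.allclose(toy_standardized.mean(axis=1), 0))  # True
print(np.allclose(toy_standardized.std(axis=1), 1))   # True
```

Note that pandas uses the sample standard deviation (`ddof=1`) by default, while `np.std` uses the population standard deviation (`ddof=0`), so the two are not interchangeable when exact unit variance matters.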

# Create Training Data
The positive training samples are the samples we created above.

In [2]:
import plotly.graph_objects as go

# Generate positive samples (one NumPy array per row of `ref`).
positive_samples = list(ref.to_numpy())
# Show one of the positive samples.
fig = go.Figure(go.Scatter(x=ref.columns, y=positive_samples[0], mode='lines'))
fig.update_layout(
    title='Positive Sample<br><sup>Example head and shoulders pattern'
        + ' to train the model</sup>', 
    xaxis_title='Data Point', yaxis_title='Value', width=500, height=500
)
fig.show()

The negative training samples are random walks.

In [3]:
# Generate negative samples.
equity_curves = (
    np.random.randn(ref.shape[0], ref.shape[1]) / 1000 + 1
).cumprod(axis=1)
negative_samples = (
    (equity_curves - np.mean(equity_curves, axis=1, keepdims=True))
    / np.std(equity_curves, axis=1, keepdims=True)
)
# Show one of the negative samples.
fig = go.Figure(go.Scatter(x=ref.columns, y=negative_samples[0], mode='lines'))
fig.update_layout(
    title='Negative Sample<br><sup>Example of the absence of the pattern'
        + ' to train the model</sup>', 
    xaxis_title='Data Point', yaxis_title='Value', width=500, height=500
)
fig.show()

To ensure our model has a balanced training dataset, let's alternate between positive and negative samples.

In [4]:
# Create samples of features and labels.
X = np.array(
    [
        value 
        for pair in zip(positive_samples, negative_samples) 
        for value in pair
    ]
)
y = np.array([1 if i % 2 == 0 else 0 for i in range(len(X))])
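
To see what the interleaving produces, here is a minimal sketch on toy data. The names `toy_positive`, `toy_negative`, `toy_X`, and `toy_y` are hypothetical and exist only for this illustration.

```python
import numpy as np

# Hypothetical toy samples: 3 positive and 3 negative rows of 4 points each.
toy_positive = [np.full(4, 1.0) for _ in range(3)]
toy_negative = [np.full(4, -1.0) for _ in range(3)]

# zip pairs the i-th positive sample with the i-th negative sample.
# Flattening the pairs gives the order positive, negative, positive, ...
toy_X = np.array(
    [value for pair in zip(toy_positive, toy_negative) for value in pair]
)
# Even positions hold positive samples (label 1), odd positions hold
# negative samples (label 0).
toy_y = np.array([1 if i % 2 == 0 else 0 for i in range(len(toy_X))])

print(toy_X[:, 0].tolist())  # [1.0, -1.0, 1.0, -1.0, 1.0, -1.0]
print(toy_y.tolist())        # [1, 0, 1, 0, 1, 0]
```

Because the labels alternate, any contiguous slice with an even number of samples contains both classes in equal proportion, so an unshuffled split of such data stays balanced.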

# Build & Train the CNN Model

With the data samples created, you can now split them into training and testing datasets, train the neural network, and then run an out-of-sample test.

In [9]:
from sklearn.model_selection import train_test_split
from keras.models import Sequential
from keras.layers import Conv1D, MaxPooling1D, Flatten, Dense, Input
from keras.utils import set_random_seed

set_random_seed(0)

# Split the data into training and testing sets.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, shuffle=False
)

X_train = np.reshape(X_train, (X_train.shape[0], X_train.shape[1], 1))
X_test = np.reshape(X_test, (X_test.shape[0], X_test.shape[1], 1))

# Build the CNN model.
model = Sequential()
model.add(Input(shape=(X_train.shape[1], 1)))
model.add(Conv1D(filters=32, kernel_size=5, activation='relu'))
model.add(MaxPooling1D(pool_size=2))
model.add(Flatten())
model.add(Dense(1, activation='sigmoid'))

# Compile the model.
model.compile(
    optimizer='adam', loss='binary_crossentropy', metrics=['accuracy']
)

# Train the model.
model.fit(
    X_train, y_train, epochs=10, batch_size=16, 
    validation_data=(X_test, y_test)
)

# Generate and plot the OOS predictions.
predictions = model.predict(X_test)
go.Figure(
    go.Scatter(
        x=list(range(100)), y=predictions[:100, :].flatten(), name='Prediction'
    ),
    dict(
        title='Predictions<br><sup>Alternating between positive and negative '
            + 'samples makes the plot jump between 0 and 1</sup>', 
        xaxis_title='Sample Number', 
        yaxis_title='Probability of Pattern', showlegend=True
    )
).show()

# Evaluate the model.
test_loss, test_acc = model.evaluate(X_test, y_test)
print(f'Test Accuracy: {test_acc}')
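
The layer sizes of the network above can be worked out by hand. The following sketch assumes the Keras defaults for the layers as they are used here (`strides=1` and `padding='valid'` for `Conv1D`, and a pooling stride equal to `pool_size` for `MaxPooling1D`). The variable names are illustrative only.

```python
# Assumed from the model definition: 25 input steps with 1 channel,
# Conv1D(filters=32, kernel_size=5), and MaxPooling1D(pool_size=2).
input_steps, channels = 25, 1
filters, kernel_size, pool_size = 32, 5, 2

# A 'valid' convolution with stride 1 shortens the sequence by
# kernel_size - 1 steps.
conv_steps = input_steps - kernel_size + 1   # 21
# Non-overlapping pooling keeps one value per full window of pool_size.
pool_steps = conv_steps // pool_size         # 10
# Flatten concatenates the 32 filter outputs of every remaining step.
flat_units = pool_steps * filters            # 320

# Trainable parameters: weights plus one bias per filter or output unit.
conv_params = (kernel_size * channels + 1) * filters  # 192
dense_params = flat_units + 1                         # 321
total_params = conv_params + dense_params             # 513

print(conv_steps, pool_steps, flat_units, total_params)  # 21 10 320 513
```

If the assumptions hold, these numbers should agree with the output of `model.summary()`.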

# Use the CNN to Identify H&S Patterns in Real Forex Prices

At this point, you've trained the model using only synthetic data. Let's see how well the model can detect the head and shoulders pattern in real Forex data. This example uses the daily USDCAD exchange rate.

In [7]:
qb = QuantBook()
symbol = qb.add_forex("USDCAD").symbol
prices = qb.history(
    symbol, datetime(2012, 1, 1), datetime(2024, 1, 1), Resolution.DAILY
).loc[symbol]['close']

Every head and shoulders pattern created in the first cell of this notebook consists of 25 data points, but the pattern can also form over a longer period of time. To detect the pattern over longer periods, let's define a method that reduces a window of more than 25 data points to exactly 25 data points while retaining the general shape of the original data.

In [8]:
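def downsample(values, num_points=25):
    # Reduce `values` to `num_points` points. The result starts with the 
    # first value, ends with the last value, and holds the averages of 
    # `num_points - 2` consecutive segments in between. Each value is 
    # repeated `duplicates` times first so that every segment has data.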
    if num_points == len(values):
        return values

    adj_values = []
    duplicates = int(2 * len(values) / num_points)
    if duplicates > 0:
        for x in values:
            for i in range(duplicates):
                adj_values.append(x)
    else:
        adj_values = values
    
    num_steps = num_points - 2
    step_size = int(len(adj_values) / num_steps)

    smoothed_data = [adj_values[0]]
    for i in range(num_steps):
        start_idx = i * step_size
        if i == num_steps - 1:
            end_idx = len(adj_values) - 1
        else:
            end_idx = (i + 1) * step_size - 1
        segment = np.array(adj_values[start_idx : end_idx+1])

        avg = sum(segment) / len(segment)
        smoothed_data.append(avg)
        
    smoothed_data.append(adj_values[-1])

    return np.array(smoothed_data)
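
To make the segment arithmetic in `downsample` concrete, the following sketch reproduces only its index calculations for a window of 35 values. `segment_bounds` is a hypothetical helper written for this illustration; it is not part of the notebook.

```python
def segment_bounds(num_values, num_points=25):
    """Return the repeat count, the segment width, and the inclusive
    (start, end) index pairs of the segments that get averaged."""
    # Each value is repeated `duplicates` times before averaging.
    duplicates = int(2 * num_values / num_points)
    stretched_len = num_values * duplicates if duplicates > 0 else num_values

    # The first and last output points are copied, not averaged.
    num_steps = num_points - 2
    step_size = int(stretched_len / num_steps)

    bounds = []
    for i in range(num_steps):
        start = i * step_size
        if i == num_steps - 1:
            # The last segment absorbs any leftover values.
            end = stretched_len - 1
        else:
            end = (i + 1) * step_size - 1
        bounds.append((start, end))
    return duplicates, step_size, bounds


duplicates, step_size, bounds = segment_bounds(35)
print(duplicates, step_size)  # 2 3
print(len(bounds))            # 23
print(bounds[0], bounds[-1])  # (0, 2) (66, 69)
```

For 35 input values, each value is repeated twice to give 70 values, which are averaged in 23 segments of 3 values, except for the last segment, which covers 4. Adding the first and last values gives the 25 output points.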

Now let's scan through the USDCAD closing prices and search for the head and shoulders pattern. When the model is more than 80% confident it found the pattern, the following code block prints the date range and plots the price path next to its downsampled version.

In [9]:
from plotly.subplots import make_subplots

min_size = ref.shape[1]
max_size = 100
step_size = 10
confidence_threshold = 0.8  # 0.8 => 80%

for i in prices.index:
    # Get the trailing prices for this timestep.
    trailing_prices = prices.loc[:i]

    for size in range(min_size, max_size+1, step_size):
        # Ensure there are enough trailing data points to fill this 
        # window size.
        if len(trailing_prices) < size:
            continue

        window_trailing_prices = trailing_prices.iloc[-size:]
        # Downsample the trailing prices in this window to be 25 data 
        # points.
        low_res_window = downsample(window_trailing_prices.values)
        # Standardize the downsampled trailing prices.
        factors = np.array(
            (
                (low_res_window - low_res_window.mean()) 
                / low_res_window.std()
            ).reshape(1, min_size, 1)
        )
        # Get the probability of the technical trading pattern in the 
        # downsampled and standardized window.
        prediction = model.predict(factors, verbose=0)[0][0]
        if prediction > confidence_threshold:
            print(
                f"Pattern detected between "
                + f"{window_trailing_prices.index[0]} and "
                + f"{window_trailing_prices.index[-1]} with "
                + f"{round(prediction * 100, 1)}% confidence:"
            )

            fig = make_subplots(
                rows=1, cols=2, subplot_titles=[
                    "Price Over Time", "Downsampled Price Over Time"
                ]
            )
            fig.add_trace(
                go.Scatter(
                    x=window_trailing_prices.index, y=window_trailing_prices, 
                    mode='lines', name='Price'
                ), 
                row=1, col=1
            )
            fig.add_trace(
                go.Scatter(
                    x=list(range(len(low_res_window))), y=low_res_window,
                    mode='lines', name='Downsampled Price'
                ), 
                row=1, col=2
            )

            fig.update_layout(
                title='Detected Pattern<br><sup>The raw data sample and the '
                    + 'downsampled version of it</sup>',
                showlegend=False
            )
            fig.update_xaxes(title_text="Date", row=1, col=1)
            fig.update_xaxes(title_text="Data Point", row=1, col=2)
            fig.update_yaxes(title_text="Price", row=1, col=1)
            fig.update_yaxes(title_text="Price", row=1, col=2)
            fig.show()
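
The scan above evaluates several window sizes at every timestep. As a minimal sketch, the following lists the window sizes that the `range` call produces and checks the shape of one model input, using a hypothetical 25-point window (`low_res_window` is filled with made-up prices here).

```python
import numpy as np

min_size, max_size, step_size = 25, 100, 10

# Window sizes evaluated at each timestep.
window_sizes = list(range(min_size, max_size + 1, step_size))
print(window_sizes)  # [25, 35, 45, 55, 65, 75, 85, 95]

# Hypothetical downsampled window of 25 prices.
low_res_window = np.linspace(1.30, 1.35, min_size)

# Standardize the window and add the batch and channel dimensions that
# the model input expects: (batch, steps, channels).
factors = (
    (low_res_window - low_res_window.mean()) / low_res_window.std()
).reshape(1, min_size, 1)
print(factors.shape)  # (1, 25, 1)
```

With these settings, the largest window is 95 data points, because the next step (105) exceeds `max_size`. The model is also called once per window size per timestep, so the scan can take a while on long price histories.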

# Save the Model

Save the model to the Object Store so that you can load it in the `main.py` file and run a backtest with it.

In [None]:
model.save(qb.object_store.get_file_path("head-and-shoulders-model.keras"))