# Let's Build a Quant Trading Strategy - Part 3

In [154]:
# y_hat = model(x)
# orders = strategy(y_hat)
# execute(orders)

In [155]:
# 1st video = Research
# 2nd video = Strategy
# 3rd video = Implementation

In [156]:
# By the end of this video, we will have wrote the code for our strategy and model that we developed in part 2 and 1 respectively - put it live with real money and see what happens.
# I will do a follow up video to see how it performs. 
# Even if it doesn't perform as expected, we can learn from it. 
# We might experience model drift - where where the pattern it learned starts to no longer hold true. You might hear this be called alpha decay - which is your profits decay because of model drift. 




In [157]:
# This will video will be more software engineering now as we have to code it all together.

# 1. Create the strategy using the decisions we made in part 2
# 2. Create Strategy API that allows to create new strategies easily. Avoid copy and pasting code between strategies.
# 3. Create execution API to connect to an exchange - which I won't make public because I don't want people to tweak parameters willy nilly and run it then lose everything.


## The fundamental building block: Tick

In [158]:
from abc import ABC, abstractmethod
from typing import Generic, Optional, TypeVar

T = TypeVar('T')  # input type
R = TypeVar('R')  # output type

class Tick(ABC, Generic[T, R]):
    @abstractmethod
    def on_tick(self, val: T) -> Optional[R]:
        """Handle a new tick and optionally return a result."""
        pass

## The fundamental data structure: Window

In [159]:
from collections import deque
from typing import Deque, Optional

class DequeWindow(Tick[T, T], Generic[T]):
    def __init__(self, n: int):
        self._data: Deque[T] = deque(maxlen=n)

    def on_tick(self, val: T) -> Optional[T]:
        """Append a value and return the oldest value dropped (if any)."""
        dropped = None
        if len(self._data) == self.capacity():
            dropped = self._data[0]
        self._data.append(val)
        return dropped

    def capacity(self) -> int:
        n = self._data.maxlen
        return 0 if n is None else n
    
    def is_full(self) -> bool:
        return self._data.maxlen == len(self._data)
    
    def __repr__(self) -> str:
        cls_name = self.__class__.__name__
        return f"{cls_name}(capacity={self._data.maxlen}, values={list(self._data)})"

In [160]:
w = DequeWindow(3)
w.on_tick(1)
w

DequeWindow(capacity=3, values=[1])

In [161]:
w = DequeWindow(3)
w.on_tick(1)
w.on_tick(2)
w

DequeWindow(capacity=3, values=[1, 2])

In [162]:
w = DequeWindow(3)
w.on_tick(1)
w.on_tick(2)
w.on_tick(3)
w


DequeWindow(capacity=3, values=[1, 2, 3])

In [163]:
w = DequeWindow(3)
w.on_tick(1)
w.on_tick(2)
w.on_tick(3)
old_v = w.on_tick(4)

(old_v, w)

(1, DequeWindow(capacity=3, values=[2, 3, 4]))

In [164]:
w = DequeWindow(3)
w.on_tick(1)
w.on_tick(2)
w.on_tick(3)
w.on_tick(4)
old_v = w.on_tick(5)

(old_v, w)

(2, DequeWindow(capacity=3, values=[3, 4, 5]))

In [234]:
import numpy as np
from typing import Optional

class NumpyWindow:
    """Fixed-capacity sliding window for scalar values using a NumPy array (shift on append)."""

    def __init__(self, n: int, dtype=np.float64):
        if n <= 0:
            raise ValueError("Capacity must be positive.")
        self._capacity = n
        self._data = np.zeros(n, dtype=dtype)
        self._size = 0

    def on_tick(self, val: float) -> Optional[float]:
        """
        Append a scalar value and return the oldest value dropped (if any).
        """
        val = float(val)
        dropped = None

        if self._size < self._capacity:
            self._data[self._size] = val
            self._size += 1
        else:
            # buffer full: drop oldest and shift left
            dropped = self._data[0]
            self._data[:-1] = self._data[1:]  # shift left
            self._data[-1] = val

        return dropped

    def append_left(self, val: float) -> Optional[float]:
        """
        Append a scalar value to the **front** (left) of the buffer.
        Shifts all elements right by one and drops the newest (rightmost) if full.
        """
        val = float(val)
        dropped = None

        if self._size < self._capacity:
            # Shift right only the valid region
            if self._size > 0:
                self._data[1:self._size + 1] = self._data[:self._size]
            self._data[0] = val
            self._size += 1
        else:
            # Drop newest (rightmost) value
            dropped = self._data[self._size - 1]
            self._data[1:] = self._data[:-1]  # shift right
            self._data[0] = val

        return dropped

    def __getitem__(self, idx: int) -> float:
        """Index access (0 = oldest)."""
        if not 0 <= idx < self._size:
            raise IndexError("Index out of range.")
        return self._data[idx]

    def __len__(self) -> int:
        return self._size

    def capacity(self) -> int:
        return self._capacity

    def is_full(self) -> bool:
        return self._size == self._capacity

    def values(self) -> np.ndarray:
        """Return current valid values as a contiguous 1D array (no copy needed)."""
        return self._data[:self._size]

    def __repr__(self) -> str:
        vals = self.values().tolist()
        return f"{self.__class__.__name__}(capacity={self._capacity}, size={self._size}, values={vals})"


In [166]:
w = NumpyWindow(3)
w.on_tick(1)
w

NumpyWindow(capacity=3, size=1, values=[1.0])

In [167]:
w = NumpyWindow(3)
w.on_tick(1)
w.on_tick(2)
w

NumpyWindow(capacity=3, size=2, values=[1.0, 2.0])

In [168]:
w = NumpyWindow(3)
w.on_tick(1)
w.on_tick(2)
w.on_tick(3)
w

NumpyWindow(capacity=3, size=3, values=[1.0, 2.0, 3.0])

In [169]:
w = NumpyWindow(3)
w.on_tick(1)
w.on_tick(2)
w.on_tick(3)
v = w.on_tick(4)
(v, w)

(np.float64(1.0), NumpyWindow(capacity=3, size=3, values=[2.0, 3.0, 4.0]))

In [170]:
import torch
from typing import Optional
  

class TensorWindow:
    """Efficient fixed-capacity sliding window for scalar values using a circular buffer."""

    def __init__(self, n: int, dtype: torch.dtype = torch.float32, device: Optional[torch.device] = None):
        if n <= 0:
            raise ValueError("Capacity must be positive.")
        self._capacity = n
        self._data = torch.zeros(n, dtype=dtype, device=device)
        self._start = 0     # index of oldest element
        self._size = 0

    def on_tick(self, val: float) -> Optional[float]:
        """
        Append a scalar value and return the dropped oldest value (if any).
        O(1) operation using circular buffer indexing.
        """
        val = float(val)
        dropped = None

        if self._size < self._capacity:
            # still filling
            insert_idx = (self._start + self._size) % self._capacity
            self._data[insert_idx] = val
            self._size += 1
        else:
            # overwrite oldest
            dropped = self._data[self._start].item()
            self._data[self._start] = val
            # move start forward (wrap if needed)
            self._start = (self._start + 1) % self._capacity

        return dropped

    def values(self) -> torch.Tensor:
        """Return the current valid values in order as a tensor view (no copy)."""
        if self._size == 0:
            return torch.empty(0, dtype=self._data.dtype, device=self._data.device)
        if self._size < self._capacity or self._start == 0:
            return self._data[:self._size]
        # Concatenate wrapped part
        return torch.cat((self._data[self._start:], self._data[:self._start]))

    def capacity(self) -> int:
        return self._capacity

    def is_full(self) -> bool:
        return self._size == self._capacity

    def __len__(self) -> int:
        return self._size

    def __repr__(self) -> str:
        cls = self.__class__.__name__
        vals = self.values().tolist()
        return f"{cls}(capacity={self._capacity}, size={self._size}, values={vals})"
    
    def __getitem__(self, idx: int) -> float:
        """Allow index access like a list (0 = oldest value)."""
        if not 0 <= idx < self._size:
            raise IndexError("Index out of range.")
        actual_idx = (self._start + idx) % self._capacity
        return self._data[actual_idx].item()    


In [171]:
w = TensorWindow(3)
w

TensorWindow(capacity=3, size=0, values=[])

In [172]:
w = TensorWindow(3)
w.on_tick(1)
w

TensorWindow(capacity=3, size=1, values=[1.0])

In [173]:
w = TensorWindow(3)
w.on_tick(1)
w.on_tick(2)
w

TensorWindow(capacity=3, size=2, values=[1.0, 2.0])

In [174]:
w = TensorWindow(3)
w.on_tick(1)
w.on_tick(2)
w.on_tick(3)
w

TensorWindow(capacity=3, size=3, values=[1.0, 2.0, 3.0])

In [175]:
w = TensorWindow(3)
w.on_tick(1)
w.on_tick(2)
w.on_tick(3)
v = w.on_tick(4)
(v, w)

(1.0, TensorWindow(capacity=3, size=3, values=[2.0, 3.0, 4.0]))

In [176]:
class Last(Tick[T, T], Generic[T]):
    def __init__(self):
        self._value: Optional[T] = None

    def on_tick(self, val: T) -> Optional[T]:
        """Append a value and return the oldest value dropped (if any)."""
        self._value = val
        return val

    def __repr__(self) -> str:
        cls_name = self.__class__.__name__
        return f"{cls_name}(value={self._value})"

In [177]:
last_val = Last()
last_val

Last(value=None)

In [178]:
last_val = Last()
last_val.on_tick(1)
last_val.on_tick(2)
last_val.on_tick(3)
last_val

Last(value=3)

In [179]:
# class First(Tick[T, T], Generic[T]):
#     def __init__(self):
#         self._value: Optional[T] = None

#     def on_tick(self, val: T) -> Optional[T]:
#         """Append a value and return the oldest value dropped (if any)."""
#         if self._value is None:
#             self._value = val
#         return self._value

#     def __repr__(self) -> str:
#         cls_name = self.__class__.__name__
#         return f"{cls_name}(value={self._value})"

import numpy as np

class LogReturn(Tick[float, float]):
    def __init__(self):
        self._window = NumpyWindow(2)

    def on_tick(self, val: float) -> Optional[float]:
        self._window.on_tick(val)
        if self._window.is_full():
            return np.log(self._window[1] / self._window[0])
        else:
            return None
        
    def __repr__(self) -> str:
        cls_name = self.__class__.__name__
        return f"{cls_name}(window={self._window})"        


    

In [180]:
f = LogReturn()
f.on_tick(90.0)
f

LogReturn(window=NumpyWindow(capacity=2, size=1, values=[90.0]))

In [181]:
f = LogReturn()
f.on_tick(90.0)
v = f.on_tick(100.0)
(v, f)

(np.float64(0.10536051565782635),
 LogReturn(window=NumpyWindow(capacity=2, size=2, values=[90.0, 100.0])))

In [182]:
np.log(100/90)

np.float64(0.10536051565782635)

In [183]:
f = LogReturn()
f.on_tick(90.0)
f.on_tick(100.0)
v = f.on_tick(150.0)
(v, f)

(np.float64(0.4054651081081644),
 LogReturn(window=NumpyWindow(capacity=2, size=2, values=[100.0, 150.0])))

In [184]:
np.log(150/100)

np.float64(0.4054651081081644)

In [235]:
class LogReturnLags(Tick[float, torch.Tensor]):
    def __init__(self, no_lags: int):
        self._lags = NumpyWindow(no_lags)
        self._log_return = LogReturn()
    
    def on_tick(self, val: float) -> torch.Tensor | None:
        log_ret = self._log_return.on_tick(val)
        if log_ret is not None:
            self._lags.append_left(log_ret)
            return torch.tensor(self._lags.values(), dtype=torch.float32) if self._lags.is_full() else None
        else:
            return None
        
    def __repr__(self) -> str:
        cls_name = self.__class__.__name__
        return f"{cls_name}(lags={self._lags}, log_return={self._log_return})" 

In [236]:
lags = LogReturnLags(3)
v = lags.on_tick(90)
v

In [237]:
lags = LogReturnLags(3)
lags.on_tick(90)
lags.on_tick(100)


In [238]:
lags = LogReturnLags(3)
lags.on_tick(90)
lags.on_tick(100)
lags.on_tick(150)

In [239]:
lags = LogReturnLags(3)
lags.on_tick(90)
lags.on_tick(100)
lags.on_tick(150)
lags.on_tick(110)

tensor([-0.3102,  0.4055,  0.1054])

In [240]:
lags = LogReturnLags(3)
lags.on_tick(90)
lags.on_tick(100)
lags.on_tick(150)
lags.on_tick(110)
features = lags.on_tick(160)
features

tensor([ 0.3747, -0.3102,  0.4055])

In [191]:
import models
model = models.LinearModel(3)
# security alert 
model.load_state_dict(torch.load('model_weights.pth', weights_only=True))
model.eval()

LinearModel(
  (linear): Linear(in_features=3, out_features=1, bias=True)
)

In [192]:
X = torch.tensor(features, dtype=torch.float32)
X

  X = torch.tensor(features, dtype=torch.float32)


tensor([ 0.4055, -0.3102,  0.3747])

In [193]:
y_hat = model(X)
y_hat

tensor([-0.0100], grad_fn=<ViewBackward0>)

In [194]:
y_hat[0]

tensor(-0.0100, grad_fn=<SelectBackward0>)

In [None]:
from decimal import Decimal
from typing import Dict, List
from dataclasses import dataclass
import torch.nn as nn
import polars as pl

@dataclass(frozen=True)
class Order:
    sym: str
    signed_qty: Decimal

    def __str__(self) -> str:
        sign = "BUY" if self.signed_qty > 0 else "SELL"
        return f"Order({sign} {abs(self.signed_qty)} {self.sym})"

class Position:
    def __init__(self, sym: str, qty: Decimal, price: Decimal):
        self.sym = sym
        self.signed_qty = qty
        self.price = price

    def close(self) -> Order:
        return Order(self.sym, -1 * self.signed_qty)

class Account:
    def __init__(self, balance: Decimal) -> None:
        self._balance = balance
        self._positions: Dict[str, Position] = {}

    def balance(self) -> Decimal:
        return self._balance
    
    def update(self, trade_pnl: Decimal):
        self._balance += trade_pnl

    def open_position(self, sym, qty, price):
        self._positions[sym] = Position(sym, qty, price) 

    def close_position(self, sym: str) -> Optional[Order]:
        position = self._positions.pop(sym, None)
        return position.close() if position else None



In [241]:
class Strategy(Tick[float, Order]):
    def backtest(self, path: str) -> pl.DataFrame:
        return pl.DataFrame()    

class BasicTakerStrat(Tick[float, List[Order]]):
    def __init__(self, sym: str,model: nn.Module, log_return_lags: LogReturnLags, account: Account) -> None:
        self.sym = sym
        self.model = model
        self.log_return_lags = log_return_lags
        self.account = account

    def _signed_trade_size(self, y_hat: float) -> Decimal:
        dir_signal = np.sign(y_hat)
        return Decimal(dir_signal) * self.account.balance()

    def _create_orders(self, y_hat: torch.Tensor) -> List[Order]:
        print(y_hat)
        qty = self._signed_trade_size(y_hat.item())
        open_order = Order(self.sym, qty)
        close_order = self.account.close_position(self.sym)
        orders = [close_order, open_order] if close_order else [open_order]
        return orders      

    def on_tick(self, val: float) -> List[Order] | None:
        X = self.log_return_lags.on_tick(val)
        if X is not None:
            with torch.no_grad():
                print(X)
                y_hat = self.model(X)
                orders = self._create_orders(y_hat)
                return orders
        return None

In [256]:
sym = 'BTCUSDT'
lags = LogReturnLags(3)
acc = Account(Decimal(100.0))

strat = BasicTakerStrat(sym, model, lags, acc)
strat.on_tick(100.0)

In [257]:
strat.on_tick(110.0)

In [258]:
strat.on_tick(90.0)

In [259]:
strat.on_tick(105)

tensor([ 0.1542, -0.2007,  0.0953])
tensor([0.0008])


[Order(sym='BTCUSDT', signed_qty=Decimal('100'))]

In [260]:
x = [np.log(105/90), np.log(90/110.0), np.log(110/100)]
x

[np.float64(0.15415067982725836),
 np.float64(-0.2006706954621511),
 np.float64(0.09531017980432493)]

In [261]:
import research
research.print_model_params(model)

linear.weight:
[[-0.10395038 -0.06726477  0.02827305]]
linear.bias:
[0.00067121]


In [262]:
w = np.array([-0.10395038, -0.06726477,  0.02827305]) 
w

array([-0.10395038, -0.06726477,  0.02827305])

In [263]:
b = 0.00067121
np.sum(w * x + b)

np.float64(0.002182385909816468)