# Opti-Trade Algorithm

In [8]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from collections import deque

In [9]:
def load_prices(fn):
    """Read a whitespace-delimited, header-less table and return it transposed.

    Args:
        fn: Path or file-like object accepted by ``pandas.read_csv``.

    Returns:
        A NumPy array holding the file's contents with rows and columns
        swapped, i.e. each column of the file becomes one row of the result.
    """
    table = pd.read_csv(
        fn,
        header=None,
        sep=r'\s+',
    )
    by_column = table.values.T
    return by_column

# Load the price table from disk; load_prices returns it transposed, so the
# first axis is treated as instruments and the second as days below.
prices = load_prices("prices.txt")
# Unpack the two dimensions of the loaded array for the summary message.
n_inst, nt = prices.shape
print(f"Loaded {n_inst} instruments in {nt} days")

Loaded 50 instruments in 1000 days


In [10]:
def calc_pl(P, strat, test_days=250, comm_rate=0.0005, pos_limit=10000):
    """Back-test ``strat`` over the trailing ``test_days`` columns of ``P``.

    At every step ``t`` the strategy is shown ``P[:, :t]`` and asked for a
    target position per instrument.  The target is clipped so that the
    value of each position at the current price stays within ``pos_limit``,
    the trade is executed at the current price and commission is charged on
    the traded value.  On the final step no trade is made; the book is only
    marked to the last price.  The change in portfolio value is recorded
    for every step except the first.

    Args:
        P: 2-D array of prices, one row per instrument, one column per day.
        strat: Object exposing ``get_position(hist)`` that returns one
            target position per instrument.
        test_days: Number of trailing steps to simulate.  Values larger
            than the number of available columns are clamped to it.
        comm_rate: Commission charged as a fraction of traded value.
        pos_limit: Maximum absolute value of a single position, in
            currency units at the current price.

    Returns:
        dict with keys ``mean_pl``, ``std_pl``, ``sharpe``, ``volume`` and
        ``score`` (``mean_pl - 0.1 * std_pl``).  When no daily P&L could be
        recorded (fewer than two simulated steps) the statistics are zero
        rather than NaN.
    """
    n_inst, nt = P.shape

    # A window longer than the history would push ``start`` to zero or
    # below, making ``P[:, :t]`` empty or slicing from the wrong end.
    test_days = min(test_days, nt)

    cash, tot_vol, value = 0.0, 0.0, 0.0
    pos = np.zeros(n_inst)
    daily_pl = []
    start = nt + 1 - test_days

    for t in range(start, nt + 1):
        hist = P[:, :t]
        price = hist[:, -1]

        if t < nt:
            targ = strat.get_position(hist)
            # Largest whole number of units whose value fits in pos_limit.
            lims = (pos_limit / price).astype(int)
            newp = np.clip(targ, -lims, lims)
            delta = newp - pos
            dv = (price * np.abs(delta)).sum()
            tot_vol += dv
            cash -= price.dot(delta) + dv * comm_rate
            pos = newp

        val = cash + pos.dot(price)
        # The first step only establishes the baseline portfolio value.
        if t > start:
            daily_pl.append(val - value)
        value = val

    if not daily_pl:
        # Nothing to average: avoid NaN / RuntimeWarning from empty arrays.
        return {"mean_pl": 0.0, "std_pl": 0.0, "sharpe": 0,
                "volume": tot_vol, "score": 0.0}

    arr = np.array(daily_pl)
    mu, sd = arr.mean(), arr.std()
    # NOTE(review): 249 looks like an annualisation factor (it also equals
    # the number of P&L samples for the default 250-step window) - confirm.
    sharpe = np.sqrt(249) * mu / sd if sd > 0 else 0
    return {"mean_pl": mu, "std_pl": sd, "sharpe": sharpe, "volume": tot_vol,
            "score": mu - 0.1 * sd}


## Dual Moving-Average with Volatility Control

In [None]:
class TrendStrategy:
    """Dual moving-average trend follower with a volatility filter.

    For every instrument the strategy compares a short and a long simple
    moving average, keeps only signals on instruments whose recent price
    range is above the cross-sectional median, and requires the same raw
    signal on ``confirm_d`` consecutive calls before acting on it.

    Args:
        n_inst: Number of instruments (rows of the price matrix).
        short_w: Window length of the short moving average.
        long_w: Window length of the long moving average.
        confirm_d: Number of consecutive identical raw signals required
            before a signal replaces the previously confirmed one.
        cap_per_tick: Currency amount allotted to each instrument when
            converting a signal into a number of units.
        thresh_diff: Minimum relative gap between the two averages for a
            non-zero raw signal.
        atr_window: Window length used by the volatility measure.
    """

    def __init__(self,
                 n_inst,
                 short_w=7, long_w=31,
                 confirm_d=3,
                 cap_per_tick=10000,
                 thresh_diff=0.003,
                 atr_window=20):
        self.n_inst = n_inst
        self.short_w, self.long_w = short_w, long_w
        self.confirm_d = confirm_d
        self.cap_per_tick = cap_per_tick
        self.thresh_diff = thresh_diff
        self.atr_window = atr_window

        # Last position returned by get_position (units per instrument).
        self.current_pos = np.zeros(n_inst, int)
        # Rolling buffer holding the most recent ``confirm_d`` raw signals.
        self.signals_hist = deque(maxlen=confirm_d)
        # Most recently confirmed signal per instrument (-1, 0 or +1).
        self.prev_conf = np.zeros(n_inst, int)

    def _compute_atr(self, P):
        """Return a per-instrument range measure over the last ``atr_window`` columns.

        Returns zeros when fewer than ``atr_window + 1`` columns are available.
        """
        w = self.atr_window
        if P.shape[1] < w + 1:
            return np.zeros(self.n_inst)
        window = P[:, -w:]
        H, L = window.max(1), window.min(1)
        # Last column of the shifted window, i.e. the second most recent price.
        # NOTE(review): for w >= 2 this column lies inside ``window``, so the
        # two ``prev`` terms below can never exceed H - L; if the price just
        # before the window was intended, this should be P[:, -w-1] - confirm.
        prev = P[:, -w - 1:-1][:, -1]
        tr = np.maximum.reduce([H - L, np.abs(H - prev), np.abs(L - prev)])
        return tr

    def _compute_ma_diff(self, P):
        """Return (short MA - long MA) / long MA for every instrument."""
        sma = P[:, -self.short_w:].mean(1)
        lma = P[:, -self.long_w:].mean(1)
        return (sma - lma) / lma

    def _raw_signal(self, ma_diff, atr):
        """Turn the MA gap into -1/0/+1 and zero out low-volatility instruments."""
        sig = np.zeros(self.n_inst, int)
        sig[ma_diff > self.thresh_diff] = 1
        sig[ma_diff < -self.thresh_diff] = -1
        # Only instruments strictly above the median range keep their signal.
        med = np.median(atr)
        return sig * (atr > med).astype(int)

    def _confirm(self, raw):
        """Return the confirmed signal after recording ``raw``.

        A raw signal replaces the previously confirmed one only when every
        entry of the rolling history (including ``raw`` itself) equals it.
        Until the history holds ``confirm_d`` entries the previously
        confirmed signal is returned unchanged.
        """
        self.signals_hist.append(raw)
        if len(self.signals_hist) < self.confirm_d:
            return self.prev_conf.copy()
        history = np.stack(self.signals_hist)
        # Compare values element-wise.  The previous ``history.all(0) == raw``
        # compared a boolean "all non-zero" mask with the signal, so -1 could
        # never be confirmed and mixed -1/+1 histories confirmed +1.
        keep = (history == raw).all(axis=0)
        confirmed = np.where(keep, raw, self.prev_conf)
        self.prev_conf = confirmed.copy()
        return confirmed

    def get_position(self, P):
        """Return target units per instrument for price history ``P``.

        Args:
            P: 2-D price array, one row per instrument, one column per day.

        Returns:
            Integer array of signed unit counts.  While fewer than
            ``long_w + 1`` columns are available the last returned position
            is repeated.
        """
        _, t = P.shape
        if t < self.long_w + 1:
            return self.current_pos.copy()
        md = self._compute_ma_diff(P)
        atr = self._compute_atr(P)
        raw = self._raw_signal(md, atr)
        sig = self._confirm(raw)
        latest = P[:, -1]
        # Whole units affordable with cap_per_tick at the latest price.
        max_sh = np.floor(self.cap_per_tick / latest).astype(int)
        self.current_pos = sig * max_sh
        return self.current_pos.copy()