In [1]:
import polars as pl
import math

# Data preprocessing

- Binarize Operation type

In [2]:
market_data: pl.DataFrame = pl.read_csv("data/MOEX_SBER_20241123_20241130.csv").drop("ID")

# Binarize Operation type
market_data = market_data.with_columns(
    SELL=pl.col("OPER") == "S",
    BUY=pl.col("OPER") == "B",
).drop(["OPER"])

market_data.head(3)

DATE,TIME,LAST,VOL,SELL,BUY
i64,str,f64,i64,bool,bool
20241125,"""09:59:43""",236.0,80,False,True
20241125,"""09:59:43""",236.0,120,False,True
20241125,"""09:59:43""",236.0,20,False,True


In [3]:
# Combine DATE and TIME into a DATETIME column
market_data = market_data.with_columns(
    # Convert DATE to a Date type (YYYY-MM-DD)
    DATE=pl.col("DATE").cast(pl.String).str.to_date(format="%Y%m%d").cast(pl.Date),
    # Convert TIME to a Time type (HH:MM:SS)
    TIME=pl.col("TIME").str.strptime(pl.Time, format="%H:%M:%S")
)

market_data = market_data.with_columns(
    # Combine the two into a DATETIME
    DATETIME=pl.col("DATE").dt.combine(pl.col("TIME")).alias("d1"),
).drop(["TIME", "DATE"])

In [4]:
market_data.head(3)

LAST,VOL,SELL,BUY,DATETIME
f64,i64,bool,bool,datetime[μs]
236.0,80,False,True,2024-11-25 09:59:43
236.0,120,False,True,2024-11-25 09:59:43
236.0,20,False,True,2024-11-25 09:59:43


- Compute Buy and Sell volumes
- Compute Mid Price for each data point as the mean between best bid and best ask

In [5]:
market_data = market_data.group_by(pl.col("DATETIME"), maintain_order=True
    ).agg(
    (pl.col("VOL") * pl.col("SELL")).sum().alias("SELL_VOLUME"),
    (pl.col("VOL") * pl.col("BUY")).sum().alias("BUY_VOLUME"),
    (pl.col("SELL") * pl.col("LAST")).filter(pl.col("SELL") == True).max().alias("BEST_BID"),
    (pl.col("BUY") * pl.col("LAST")).filter(pl.col("BUY") == True).min().alias("BEST_ASK")
)

In [6]:
market_data = market_data.with_columns(
    pl.when(( ~pl.col("BEST_ASK").is_nan() ) & (~pl.col("BEST_BID").is_nan()))
      .then
        ((pl.col("BEST_ASK") + pl.col("BEST_BID")) / 2)
      .otherwise
        (pl.coalesce(pl.col("BEST_ASK"), pl.col("BEST_BID"))).alias("MID_PX")
)

In [7]:
market_data

DATETIME,SELL_VOLUME,BUY_VOLUME,BEST_BID,BEST_ASK,MID_PX
datetime[μs],i64,i64,f64,f64,f64
2024-11-25 09:59:43,68290,43250,236.0,236.0,236.0
2024-11-25 10:00:00,4850,2190,235.99,236.0,235.995
2024-11-25 10:00:01,120,2860,235.98,236.0,235.99
2024-11-25 10:00:02,0,470,,236.0,236.0
2024-11-25 10:00:03,1600,110,235.99,236.0,235.995
…,…,…,…,…,…
2024-11-29 23:49:49,500,1400,236.46,236.49,236.475
2024-11-29 23:49:52,10000,0,236.43,,236.43
2024-11-29 23:49:53,10,0,236.44,,236.44
2024-11-29 23:49:56,0,510,,236.49,236.49


In [8]:
## Interpolate missing values
# # Get all timeframes between the min and max time
# time_range = pl.DataFrame({
#     "DATETIME": pl.datetime_range(
#         start=market_data["DATETIME"].min(), end=market_data["DATETIME"].max(), interval="1s", eager=True
#     )
# # })

# market_data = time_range.join(market_data, on="DATETIME", how="left")

# market_data = market_data.with_columns([
#     pl.col("SELL_VOLUME").fill_null(0),
#     pl.col("BUY_VOLUME").fill_null(0),
#     pl.col("MID_PX").forward_fill()
# ])

In [None]:
class MomentumStrategy:

    def __init__(self, K: int = 144, risk_control: float = 10_000,
                 window_size: int = 300, data: pl.DataFrame = None,
                 a: float = None, beta: float = None,
                 alpha1: float = None, alpha2: float = None,
                 alpha3: float = None, alpha4: float = None):

        self.params_: dict = {
            "a": a,
            "beta": beta,
            "alpha1": alpha1,
            "alpha2": alpha2,
            "alpha3": alpha3,
            "alpha4": alpha4
        }

        self.hyperparams_: dict = {
            "K": K,
            "risk_control": risk_control,
            "window_size": window_size,  # Measured in seconds
            "tau": 60  # Timestep to compute position again. Measured in seconds
        }
        self.time_steps_ = data.get_column("DATETIME").to_list()
        self.sell_volume_ = data.get_column("SELL_VOLUME").to_list()
        self.buy_volume_ = data.get_column("BUY_VOLUME").to_list()
        self.volume_ = [s + b for s,
                        b in zip(self.sell_volume_, self.buy_volume_)]
        # The prefix sums below are needed for rapid computation of x3_tilda
        self.volume_prefix_sum = [0]
        for v in self.volume_:
            self.volume_prefix_sum.append(v + self.volume_prefix_sum[-1])
        self.buy_volume_prefix_sum = [0]
        for bv in self.buy_volume_:
            self.buy_volume_prefix_sum.append(
                bv + self.buy_volume_prefix_sum[-1])

        self.mid_px_ = data.get_column("MID_PX").to_list()
        self.n_transactions = 0

    def get_position(self, t: int) -> float:
        if t == 0:
            return 0
        x = self.get_x(t)

        if abs(x) < self.params_["a"]:
            return 0

        if x >= self.params_["a"]:
            return 2 * self.hyperparams_["risk_control"] / math.pi * math.atan(math.pow((x - self.params_["a"]), self.params_["beta"]))

        return -2 * self.hyperparams_["risk_control"] / math.pi * math.atan(math.pow((-x - self.params_["a"]), self.params_["beta"]))

    def get_x(self, t: int):
        x1 = self.get_x1(t)
        x2 = self.get_x2(t)
        x3 = self.get_x3(t)
        x4 = self.get_x4(t)
        return self.params_["alpha1"] * x1 + self.params_["alpha2"] * x2 + self.params_["alpha3"] * x3 + self.params_["alpha4"] * x4

    def get_x1(self, t: int):
        return self.get_mu(t) / self.get_mu_std(t)

    def get_x2(self, t: int):
        # The formula below was derived using Taylor expansion for x1(t-w) and x1(t - 2w), where w is a window_size
        return (3 * self.get_x1(t) - 4 * self.get_x1(t - self.hyperparams_["window_size"]) + self.get_x1(t - 2 * self.hyperparams_["window_size"])) / (2 * self.hyperparams_["window_size"])

    # Market Pressure normalized by its standard deviation.
    def get_x3(self, t: int):
        return self.get_x3_tilda(t) / self.get_x3_tilda_std(t)

    def get_x4(self, t: int):
        # The formula below was derived using Taylor expansion for x1(t-w) and x1(t - 2w), where w is a window_size
        return (3 * self.get_x3(t) - 4 * self.get_x3(t - self.hyperparams_["window_size"]) + self.get_x3(t - 2 * self.hyperparams_["window_size"])) / (2 * self.hyperparams_["window_size"] * self.get_x3_tilda_std(t))

    # sigma = volatility
    def get_sigma_squared(self, t: int) -> None:
        sigma_squared = 0
        for i in range(t, t - self.hyperparams_["window_size"] - 1, -1):
            sigma_squared += math.log(self.mid_px_[i] /
                                      self.mid_px_[i - 1]) ** 2

        return sigma_squared / self.hyperparams_["window_size"]

    # mu = trend strength
    def get_mu(self, t: int):
        sigma_squared = self.get_sigma_squared(t)
        return sigma_squared / 2 + math.log(self.mid_px_[t] / self.mid_px_[t - self.hyperparams_["window_size"]]) / self.hyperparams_["window_size"]

    def get_mu_std(self, t: int):
        return math.sqrt(self.get_sigma_squared(t) / self.hyperparams_["window_size"])

    # x3_tilda = Market Pressure, but is not normalized by its standard deviation (x3 is).
    def get_x3_tilda(self, t: int):
        total_volume = self.volume_prefix_sum[t] - \
            self.volume_prefix_sum[t - self.hyperparams_["window_size"]]
        if total_volume == 0:
            return 0

        buy_volume = self.buy_volume_prefix_sum[t] - \
            self.buy_volume_prefix_sum[t - self.hyperparams_["window_size"]]
        return 2 * buy_volume / total_volume - 1

    def get_x3_tilda_mean(self, t: int):
        x3_tilda_mean = 0
        for k in range(0, self.hyperparams_["K"]):
            x3_tilda_mean += self.get_x3_tilda(t -
                                               k * self.hyperparams_["window_size"])

        return x3_tilda_mean / (self.hyperparams_["K"])  # sample mean

    def get_x3_tilda_std(self, t: int):
        x3_tilda_mean = self.get_x3_tilda_mean(t)
        x3_tilda_std = 0
        for k in range(0, self.hyperparams_["K"]):
            assert t - k * \
                self.hyperparams_["window_size"] >= 0, print(
                    t, k * self.hyperparams_["window_size"])
            x3_tilda_std += (self.get_x3_tilda(t - k *
                             self.hyperparams_["window_size"]) - x3_tilda_mean) ** 2
        return x3_tilda_std / (self.hyperparams_["K"] - 1)  # sample variance

    def get_delta_PnL(self, t1: int, t2: int):
        pos = self.get_position(t1)
        return pos * (self.mid_px_[t2] - self.mid_px_[t1])

    def get_PnL(self, T: int):
        # t0 is assumed to be 0
        total_PnL = 0
        for i in range(self.hyperparams_["window_size"] * (self.hyperparams_["K"] + 1), T - self.hyperparams_["tau"], self.hyperparams_["tau"]):
            total_PnL += self.get_delta_PnL(t1=i,
                                            t2=i + self.hyperparams_["tau"])
        print(f"INFO: Profit: {round(total_PnL, 3):,} RUB")
        return total_PnL

    def get_sortino_ratio(self, T: int):
        max_drawdown = -float("inf")
        for t in range(0, T + 1):
            max_drawdown = max(max_drawdown, self.get_drawdown(t))
        return self.get_PnL(T) / max_drawdown

    def get_drawdown(self, t: int):
        PnL = self.get_PnL(t)
        return max(0, -PnL)


### Test on a fake dataset, where price is first raising up then going down 

In [10]:
fake_data = market_data.__copy__()

fake_prices = []
for i in range(1, fake_data.height // 2):
    fake_prices.append(i)
while len(fake_prices) < fake_data.height:
    fake_prices.append(fake_prices[-1] - 1)

fake_data = fake_data.with_columns(
    MID_PX=pl.Series(fake_prices)
)

test = MomentumStrategy(K=144,
                        risk_control=100,
                        window_size=300,
                        data=fake_data,
                        a=2,
                        beta=10,
                        alpha1=5,
                        alpha2=3,
                        alpha3=2,
                        alpha4=0.4)

test.get_PnL(T=fake_data.height);

INFO: Profit: 10,974,800.0 RUB


# Test on a real world dataset

In [11]:
# Choose relatively random parameters
test = MomentumStrategy(K=144,
                        risk_control=10_000,
                        window_size=60,
                        data=market_data,
                        a=1,
                        beta=10,
                        alpha1=5,
                        alpha2=3,
                        alpha3=2,
                        alpha4=0.4)



test.get_PnL(T=market_data.height);

INFO: Profit: 179,008.141 RUB


# Utilize grid search to find the best combination of params

In [None]:
import itertools

# Define the parameter grids
a_values = [1, 2, 2.5]
alpha1_values = [1, 3, 5, 10]
alpha2_values = [1, 3, 5, 10]
alpha3_values = [1, 3, 5, 10]
alpha4_values = [1, 3, 5, 10]
beta_values = [1, 3, 5]

# Generate all combinations of parameters
param_grid = itertools.product(a_values, alpha1_values, alpha2_values, alpha3_values, alpha4_values, beta_values)

# Iterate over all parameter combinations
results = []
best_pnl = -float("-inf")
for params in param_grid:
    a, alpha1, alpha2, alpha3, alpha4, beta = params
    
    # Instantiate the strategy with the current parameters
    test = MomentumStrategy(K=144,
                            risk_control=10_000,
                            window_size=100,
                            data=market_data,
                            a=a,
                            beta=beta,
                            alpha1=alpha1,
                            alpha2=alpha2,
                            alpha3=alpha3,
                            alpha4=alpha4)
    
    # Get the PnL for the current set of parameters
    pnl = test.get_PnL(T=market_data.height)
    # Store the result with the corresponding parameters
    results.append({
        "a": a,
        "alpha1": alpha1,
        "alpha2": alpha2,
        "alpha3": alpha3,
        "alpha4": alpha4,
        "beta": beta,
        "PnL": pnl
    })

# Optionally, you can sort the results by PnL to find the best performing set of parameters
sorted_results = sorted(results, key=lambda x: x["PnL"], reverse=True)

# Print the best result
best_result = sorted_results[0]
print("Best parameters:", best_result)