In [None]:
import math

import numpy as np
import pandas as pd

import torch
import torch.nn as nn
from torch.nn.utils import parameters_to_vector, vector_to_parameters

from pymoo.core.problem import ElementwiseProblem

import column_names
from engine import Signal, Spot, Trader
import usdt

In [None]:
class NNTrader(Trader):
	"""Strategy that maps a model's scalar output to a BUY / SELL / HOLD signal.

	On each bar the model receives the log of the ratio between the two most
	recent ``Open`` prices. An output below -0.5 means SELL, an output above
	+0.5 means BUY (unless the open position is already large enough), and
	anything else means HOLD.
	"""

	def __init__(self, model: nn.Module, max_pos_value: usdt = 100, warmup: int = 600):
		"""
		Args:
			model: network called with a ``(1, 1)`` float64 tensor; it must
				return a single value.
			max_pos_value: BUY is suppressed once ``engine.position.tot_value``
				reaches this amount.
			warmup: while fewer than this many bars are available the model is
				still fed, but HOLD is always returned.
		"""
		super().__init__()

		self.model = model
		self.max_pos_value = max_pos_value
		self.warmup = warmup

	@torch.inference_mode()
	def __call__(self, *, data_so_far: pd.DataFrame, engine: Spot) -> Signal:
		"""Decide what to do on the latest bar.

		Args:
			data_so_far: bars seen so far; must contain an ``Open`` column.
			engine: trading engine, inspected for the currently open position.

		Returns:
			``Signal.SELL``, ``Signal.BUY`` or ``Signal.HOLD``.
		"""
		# A ratio needs two bars; with fewer, iloc[-2] would raise IndexError.
		if len(data_so_far) < 2:
			return Signal.HOLD

		# Feed the model the log-ratio of the last two bars' Open.
		# Selecting the column first avoids building a full row Series per bar.
		opens = data_so_far['Open']
		log_ratio = math.log(opens.iloc[-1] / opens.iloc[-2])
		features = torch.tensor([[log_ratio]], dtype=torch.float64)

		# The model is called on every bar, including warmup ones, so that a
		# model which keeps state between calls has seen the whole history.
		signal = float(self.model(features))

		# ignore the first warmup bars
		if len(data_so_far) < self.warmup:
			return Signal.HOLD

		if signal < -0.5:
			return Signal.SELL

		# do not buy if the position is too big
		if engine.position and engine.position.tot_value >= self.max_pos_value:
			return Signal.HOLD

		if signal > +0.5:
			return Signal.BUY

		return Signal.HOLD


class Problem(ElementwiseProblem):
	"""pymoo problem whose decision vector is the flattened weights of ``model``.

	Each candidate vector is loaded into the model, the model trades over
	``data`` through a fresh ``Spot`` engine, and four objectives (all
	minimised) are reported:

	1. negative final balance,
	2. ``engine.mdd`` (presumably maximum drawdown -- confirm against the engine),
	3. negative ``log(number of positions + 1)``,
	4. ``-1`` if the final balance exceeds the initial balance, else ``0``.
	"""

	def __init__(
		self,
		model: nn.Module,
		data: pd.DataFrame,
		*args,
		initial_balance: float = 200,
		max_pos_value: float = 100,
		min_order_value: float = 5,
		commission: float = 0.001,
		**kwargs,
	):
		"""
		Args:
			model: network whose parameters are optimised; every parameter is
				bounded to ``[-3, 3]``.
			data: bars handed to ``engine.trade``.
			initial_balance: starting balance; also the profitability threshold
				of the fourth objective.
			max_pos_value: forwarded to ``NNTrader``.
			min_order_value: forwarded to ``engine.trade``.
			commission: forwarded to ``engine.trade``.
			*args, **kwargs: forwarded to ``ElementwiseProblem``.
		"""
		super().__init__(
			*args,
			# One decision variable per scalar model parameter.
			n_var = sum(p.numel() for p in model.parameters()),
			n_obj = 4,
			n_constr = 0,
			xl = -3,
			xu = +3,
			**kwargs,
		)

		self.model = model
		self.data = data
		self.initial_balance = initial_balance
		self.max_pos_value = max_pos_value
		self.min_order_value = min_order_value
		self.commission = commission

	@torch.inference_mode()
	def _evaluate(self, x: np.ndarray, out, *args, **kwargs):
		"""Evaluate one individual: load ``x`` into the model, trade, write ``out['F']``."""
		self.model.h = None # reset hidden state
		vector_to_parameters(torch.from_numpy(x), self.model.parameters())

		engine = Spot()
		trader = NNTrader(self.model, max_pos_value=self.max_pos_value)
		engine.trade(
			trader=trader,
			data=self.data,
			min_order_value=self.min_order_value,
			initial_balance=self.initial_balance,
			commission=self.commission,
		)

		# Since we are doing minimization, multiply balance and the number of positions by -1.
		# Mark every profitable individual with -1 in the last objective, so it is never
		# dominated by an unprofitable one, even when its MDD and number of positions are worse.
		# The threshold is the initial balance itself, so the two cannot drift apart.
		out['F'] = [
			-engine.balance,
			engine.mdd,
			-math.log(len(engine.positions) + 1),
			-int(engine.balance > self.initial_balance),
		]


class Model(nn.Module):
	"""Stacked LSTM with a one-unit linear read-out squashed by tanh.

	The recurrent state returned by the LSTM is stored on the instance in
	``self.h`` and passed back in on the next call; set ``self.h = None`` to
	start from a fresh state.
	"""

	def __init__(self, input_size=1, hidden_size=10, num_layers=5, bias=True):
		super().__init__()
		# No state yet: the first forward() lets the LSTM use its default initial state.
		self.h = None
		self.lstm = nn.LSTM(
			input_size=input_size,
			hidden_size=hidden_size,
			num_layers=num_layers,
			bias=bias,
		)
		self.linear = nn.Linear(hidden_size, 1, bias=bias)

	def forward(self, x):
		"""Run ``x`` through the LSTM (continuing from ``self.h``) and return tanh of the read-out."""
		recurrent_out, self.h = self.lstm(x, self.h)
		return torch.tanh(self.linear(recurrent_out))

# Build the network that will be optimised and compile it in place.
with torch.inference_mode():
	model = Model(
		input_size=1,
		hidden_size=12,
		num_layers=2,
		bias=False,
	)
	model.compile()

# Cell output: total number of scalar parameters in the model.
sum(p.numel() for p in model.parameters())

In [None]:
!mkdir -p data
!wget -nc https://data.binance.vision/data/spot/monthly/klines/XRPUSDT/1s/XRPUSDT-1s-2025-02.zip -O data/XRPUSDT-1s-2025-02.zip

In [None]:
# Load a 30-minute window of 1-second bars, starting after the first
# 24 * 60 * 60 rows of the file; only the OHLCV columns are kept.
df = pd.read_csv(
	'data/XRPUSDT-1s-2025-02.zip',
	names=column_names.names,
	usecols=['Open time', 'Open', 'High', 'Low', 'Close', 'Volume'],
	index_col='Open time',
	skiprows=24 * 60 * 60,
	nrows=30 * 60,
)
# 'Open time' values are interpreted as microseconds since the epoch.
df = df.set_axis(pd.to_datetime(df.index, unit='us'), axis='index')

In [None]:
import multiprocessing

from pymoo.algorithms.moo.nsga2 import NSGA2
from pymoo.core.problem import StarmapParallelization
from pymoo.core.termination import NoTermination
from pymoo.operators.crossover.ux import UniformCrossover

# Worker pool with the default number of processes; pymoo is given its
# starmap so that individuals can be evaluated through the pool.
# NOTE(review): the pool is never closed; re-running this cell creates another one.
pool = multiprocessing.Pool()
runner = StarmapParallelization(pool.starmap)

# The optimisation problem: model weights as decision variables, `df` as trading data.
problem = Problem(model=model, data=df, elementwise_runner=runner)

# NSGA2 with a population of 100 and uniform crossover.
algorithm = NSGA2(
	pop_size=100,
	crossover=UniformCrossover(prob=1.0),
)
# NoTermination: generations are driven manually from a later cell through
# ask()/tell(). The trailing semicolon suppresses the cell output.
algorithm.setup(problem, termination=NoTermination());

In [None]:
# Run one generation; rerun this cell for as many generations as you want.
pop = algorithm.ask()

# On the first generation, restore profitable individuals saved in results.txt
# (one parameter vector per row) by overwriting the first sampled individuals.
if algorithm.n_gen == 1:
	try:
		# ndmin=2 keeps a file with a single row as shape (1, n_var); without it
		# the row is returned 1-D and zip() would assign scalars to p.X.
		saved = np.loadtxt('results.txt', delimiter=',', ndmin=2)
	except FileNotFoundError:
		# Fresh start: nothing has been saved yet, keep the sampled population.
		saved = np.empty((0, problem.n_var))

	# Fail early if the file was produced for a model with a different size.
	if saved.size and saved.shape[1] != problem.n_var:
		raise ValueError(
			f'results.txt holds vectors of length {saved.shape[1]}, '
			f'but the problem has {problem.n_var} variables'
		)

	for p, r in zip(pop, saved):
		p.X = r

algorithm.evaluator.eval(problem, pop)
algorithm.tell(infills=pop)
res = algorithm.result()

In [None]:
# Summary line: generations completed so far and how many individuals in the
# result ended above a balance of 200.
n_profitable = (res.F[:, 0] < -200).sum()
print(f'gen: {algorithm.n_gen - 1}  profitable: {n_profitable}')

# One line per individual that traded at least once and did not lose money.
for neg_balance, mdd_fraction, neg_log_positions, _ in res.F:
	balance = -neg_balance
	mdd_percent = mdd_fraction * 100
	# Undo the -log(n + 1) transform applied when the objective was computed.
	n_pos = round(math.exp(-neg_log_positions) - 1)

	skip = balance < 199.99 or n_pos == 0
	if not skip:
		print(f'{balance:.4f}, {mdd_percent:.4f}%, {n_pos}')

In [None]:
# Save the parameter vectors of individuals whose final balance is above 200,
# one vector per row, so that a later session can restore them.
is_profitable = res.F[:, 0] < -200
np.savetxt('results.txt', res.X[is_profitable], delimiter=', ')