In [31]:
import pandas as pd
from indicators import RSI, extract_bb
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import OneHotEncoder
from sklearn.model_selection import cross_validate, GridSearchCV
from sklearn.metrics import accuracy_score
from sklearn.ensemble import GradientBoostingClassifier
import numpy as np
import warnings
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader

warnings.filterwarnings("ignore")

In [32]:
df = pd.read_csv("sber_5min.csv")
df = df.dropna().sample(frac=1).reset_index(drop=True)
df = df.drop(columns=["<TICKER>", "<PER>", "<DATE>", "<TIME>"])
df.columns = ["open", "high", "low", "close", "volume"]
df

Unnamed: 0,open,high,low,close,volume
0,312.92,313.05,312.84,313.04,298720
1,289.10,289.73,289.06,289.44,397350
2,307.80,307.88,307.80,307.80,42360
3,308.20,308.25,308.13,308.14,22400
4,307.98,308.00,307.84,307.99,67190
...,...,...,...,...,...
11699,308.65,308.70,308.58,308.69,92180
11700,294.59,294.60,294.58,294.59,26270
11701,294.52,294.52,294.35,294.47,168720
11702,277.41,277.60,277.40,277.41,236810


In [33]:
n_steps = 11

prices = df["close"]

rsi_values = RSI(prices=prices, n_steps=n_steps)
bb_values = extract_bb(prices=prices, n_steps=n_steps)

assert len(rsi_values) == len(
    bb_values
), f"Indicators length don't coincide: {len(rsi_values)} and {len(bb_values)}"

In [34]:
def prepare_target(df, steps_obs: int = 3):
    targets = []
    for i in range(0, len(df) - steps_obs):
        current_price = df["close"].iloc[i]
        max_price = df["high"].iloc[i + 1 : i + 1 + steps_obs].max()
        targets.append(max_price > current_price)
    targets += [0] * steps_obs
    return np.array(targets, dtype=np.int32)


steps_obs = 3

targets = prepare_target(df=df, steps_obs=steps_obs)

In [35]:
all_data = (
    pd.DataFrame(
        data=np.array([rsi_values, bb_values, targets]).T,
        columns=["rsi", "bb", "target"],
    )
    .dropna()
    .reset_index(drop=True)
    .astype(np.float64)
)
all_data["target"] = all_data["target"].astype(np.int32)
all_data

Unnamed: 0,rsi,bb,target
0,48.422431,-1.539826,1
1,50.304004,0.814829,1
2,52.712586,1.146838,0
3,45.855094,-0.338982,1
4,46.126228,0.546342,1
...,...,...,...
11688,58.121775,1.234486,0
11689,43.935199,-0.497771,1
11690,53.362391,-0.537873,0
11691,37.069961,-2.072990,0


In [36]:
all_data.describe()

Unnamed: 0,rsi,bb,target
count,11693.0,11693.0,11693.0
mean,49.986794,-0.001935,0.754896
std,6.834738,0.958131,0.430167
min,5.513514,-2.93708,0.0
25%,45.455849,-0.715903,1.0
50%,49.981904,0.08564,1.0
75%,54.471545,0.774324,1.0
max,83.465399,2.547976,1.0


# Обработка фичей

In [37]:
scaler = MinMaxScaler()
scaler.fit(all_data.iloc[:, :-1])
all_data.iloc[:, :-1] = scaler.transform(all_data.iloc[:, :-1])
all_data

Unnamed: 0,rsi,bb,target
0,0.550454,0.254738,1
1,0.574591,0.684024,1
2,0.605490,0.744554,0
3,0.517519,0.473668,1
4,0.520997,0.635075,1
...,...,...,...
11688,0.674881,0.760533,0
11689,0.492890,0.444719,1
11690,0.613826,0.437408,0
11691,0.404820,0.157535,0


In [38]:
X = all_data.iloc[:, :-1]
y = all_data.iloc[:, -1]

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=0, shuffle=False
)

# Обучение модели

## Логистическая регрессия

In [39]:
model = LogisticRegression()
results = cross_validate(model, X, y, cv=5, scoring="balanced_accuracy")
results["test_score"].mean()

0.7003641632555592

In [40]:
tree_params = {
    "criterion": ["gini", "log_loss", "entropy"],
    "max_depth": [15, 20, 25],
    "min_samples_split": [2, 3, 4],
    "min_samples_leaf": [2, 3, 4],
}

## Дерево решений (Decision Tree)

In [41]:
grs = GridSearchCV(
    DecisionTreeClassifier(random_state=0),
    cv=6,
    param_grid=tree_params,
    n_jobs=-1,
    scoring="balanced_accuracy",
)
grs.fit(X, y)

In [42]:
grs.best_score_

0.7062308861622433

In [43]:
grs.best_params_

{'criterion': 'gini',
 'max_depth': 15,
 'min_samples_leaf': 4,
 'min_samples_split': 2}

## Градиентный бустинг (XGboost)

In [44]:
boost_params = {
    "loss": ["log_loss", "exponential"],
    "learning_rate": [0.1, 0.2],
    "n_estimators": [70, 80, 90],
    "max_depth": [2, 3],
    "min_samples_leaf": [3, 4, 5],
    "min_samples_split": [2, 3],
}

In [45]:
grs = GridSearchCV(
    GradientBoostingClassifier(random_state=0),
    cv=5,
    param_grid=boost_params,
    n_jobs=-1,
    scoring="balanced_accuracy",
)
grs.fit(X, y)
print(grs.best_score_)
print(grs.best_params_)

0.7090379735644307
{'learning_rate': 0.1, 'loss': 'log_loss', 'max_depth': 3, 'min_samples_leaf': 5, 'min_samples_split': 2, 'n_estimators': 70}


## Базовая нейронная сеть

In [46]:
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

In [47]:
X_train = torch.Tensor(X_train.values)
y_train = torch.Tensor(y_train.values).reshape(-1, 1)
X_test = torch.Tensor(X_test.values)
y_test = torch.Tensor(y_test.values).reshape(-1, 1)

In [57]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader


# Define a simple neural network
class SimpleModel(nn.Module):
    def __init__(self):
        super(SimpleModel, self).__init__()
        self.fc1 = nn.Linear(2, 32)
        self.fc2 = nn.Linear(32, 1)
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return x

batch_size = 256
dataset_train = TensorDataset(X_train, y_train)
dataset_test = TensorDataset(X_test, y_test)
dataloader_train = DataLoader(dataset_train, batch_size=batch_size, shuffle=False)
dataloader_test = DataLoader(dataset_test, batch_size=batch_size, shuffle=False)

model = SimpleModel()
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0001)

print(count_parameters(model))

129


In [58]:
epochs = 1500
for epoch in range(epochs):
    loss_train = []

    for inputs, labels in dataloader_train:
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        loss_train.append(loss.item())
        optimizer.step()

    if epoch % 100 == 0:
        loss_test = []
        for inputs, labels in dataloader_test:
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss_test.append(loss.item())
            optimizer.step()
        print(
            f"Epoch [{epoch+1}/{epochs}], Loss_train: {np.mean(loss_train):.4f}, Loss_test: {np.mean(loss_test):.4f}"
        )


Epoch [1/1500], Loss_train: 0.6943, Loss_test: 0.6908
Epoch [101/1500], Loss_train: 0.5046, Loss_test: 0.5071
Epoch [201/1500], Loss_train: 0.4330, Loss_test: 0.4394
Epoch [301/1500], Loss_train: 0.3991, Loss_test: 0.4215
Epoch [401/1500], Loss_train: 0.3903, Loss_test: 0.4175
Epoch [501/1500], Loss_train: 0.3839, Loss_test: 0.4090
Epoch [601/1500], Loss_train: 0.3834, Loss_test: 0.4139
Epoch [701/1500], Loss_train: 0.3817, Loss_test: 0.4136
Epoch [801/1500], Loss_train: 0.3814, Loss_test: 0.4179
Epoch [901/1500], Loss_train: 0.3806, Loss_test: 0.4122
Epoch [1001/1500], Loss_train: 0.3776, Loss_test: 0.3998
Epoch [1101/1500], Loss_train: 0.3799, Loss_test: 0.4057
Epoch [1201/1500], Loss_train: 0.3796, Loss_test: 0.4117
Epoch [1301/1500], Loss_train: 0.3791, Loss_test: 0.4098
Epoch [1401/1500], Loss_train: 0.3796, Loss_test: 0.4146


## Реккурентная нейронная сеть

In [68]:
def prepare_input(data, sequence_length):

    input_sequences = []
    for i in range(len(data) - sequence_length + 1):
        input_seq = data[i : i + sequence_length]
        input_sequences.append(input_seq)
    return torch.stack(input_sequences)


sequence_length = 3
X_train_rnn = prepare_input(X_train, sequence_length)
y_train_rnn = y_train[sequence_length - 1 :]

X_test_rnn = prepare_input(X_test, sequence_length)
y_test_rnn = y_test[sequence_length - 1 :]

In [69]:
y_train_rnn.shape, X_train_rnn.shape

(torch.Size([9352, 1]), torch.Size([9352, 3, 2]))

In [70]:
class SequentialModel(nn.Module):
    def __init__(
        self,
        input_size: int = 2,
        hidden_size: int = 8,
        output_size: int = 1,
        rnn_layers: int = 2,
        n_layers: int = 2,
        dropout: float = 0.1,
        input_layer_type=nn.RNN,
    ):
        super(SequentialModel, self).__init__()
        self.rnn_layer = input_layer_type(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=rnn_layers,
            batch_first=True,
            dropout=dropout,
        )
        self.fcc_layers = []
        for i in range(n_layers):
            self.fcc_layers.append(nn.Linear(hidden_size, hidden_size))
        self.output_layer = nn.Linear(hidden_size, output_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        rnn_output, h0 = self.rnn_layer(x)
        x = torch.relu(rnn_output[:, -1, :])
        for fcc_layer in self.fcc_layers:
            x = torch.relu(fcc_layer(x))
            x = self.dropout(x)
        x = self.output_layer(x)
        return x


batch_size = 256
dataset_train = TensorDataset(X_train_rnn, y_train_rnn)
dataset_test = TensorDataset(X_test_rnn, y_test_rnn)
dataloader_train = DataLoader(dataset_train, batch_size=batch_size, shuffle=True)
dataloader_test = DataLoader(dataset_test, batch_size=batch_size, shuffle=True)

model = SequentialModel(
    input_size=2,
    hidden_size=4,
    rnn_layers=3,
    n_layers=2,
    output_size=1,
    dropout=0.1,
    input_layer_type=nn.RNN,
)
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0001)

count_parameters(model)

117

In [71]:
epochs = 1600
for epoch in range(epochs):
    loss_train = []

    for inputs, labels in dataloader_train:
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        loss_train.append(loss.item())
        optimizer.step()

    if epoch % 100 == 0:
        loss_test = []
        for inputs, labels in dataloader_test:
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss_test.append(loss.item())
            optimizer.step()
        print(
            f"Epoch [{epoch+1}/{epochs}], Loss_train: {np.mean(loss_train):.4f}, Loss_test: {np.mean(loss_test):.4f}"
        )


Epoch [1/1600], Loss_train: 0.7375, Loss_test: 0.7357
Epoch [101/1600], Loss_train: 0.6155, Loss_test: 0.6151
Epoch [201/1600], Loss_train: 0.5612, Loss_test: 0.5505
Epoch [301/1600], Loss_train: 0.5454, Loss_test: 0.5332
Epoch [401/1600], Loss_train: 0.5218, Loss_test: 0.5291
Epoch [501/1600], Loss_train: 0.5087, Loss_test: 0.5137
Epoch [601/1600], Loss_train: 0.5023, Loss_test: 0.5033
Epoch [701/1600], Loss_train: 0.4930, Loss_test: 0.4964
Epoch [801/1600], Loss_train: 0.4892, Loss_test: 0.4877
Epoch [901/1600], Loss_train: 0.4825, Loss_test: 0.4875
Epoch [1001/1600], Loss_train: 0.4804, Loss_test: 0.4689
Epoch [1101/1600], Loss_train: 0.4790, Loss_test: 0.4776
Epoch [1201/1600], Loss_train: 0.4775, Loss_test: 0.4820
Epoch [1301/1600], Loss_train: 0.4807, Loss_test: 0.4646
Epoch [1401/1600], Loss_train: 0.4817, Loss_test: 0.4786
Epoch [1501/1600], Loss_train: 0.4762, Loss_test: 0.4684


In [72]:
from sklearn.metrics import accuracy_score

y_pred = torch.sigmoid(model(X_test_rnn)).detach().numpy().reshape(-1)
y_pred = np.round(y_pred)
accuracy_score(y_test_rnn, y_pred)

0.7355584082156611