In [286]:
import matplotlib.pyplot as plt
import pandas as pd
import torch

In [287]:
# Load the price history CSV; the "date" column is parsed into datetimes.
# NOTE(review): the file name suggests hourly Gemini BTC/USD candles — confirm the source.
df = pd.read_csv("Gemini_BTCUSD_1h.csv", parse_dates=["date"])

In [288]:
def generate_label_window(closings, dim: int, lookahead: int):
    """Label the window made of the first ``dim`` prices as buy, sell or neutral.

    The last price of the window is compared against the minimum and maximum
    of the first ``dim + lookahead`` prices. A price counts as "near" an
    extreme when it lies strictly within half a standard deviation (of those
    same ``dim + lookahead`` prices) of it.

    Args:
        closings: 1-D tensor of prices; only the first ``dim + lookahead``
            entries are read.
        dim: number of leading prices that form the window.
        lookahead: number of prices after the window included in the span.

    Returns:
        1 when the last window price is near the span minimum only (buy),
        -1 when it is near the span maximum only (sell), and 0 when it is
        near both or near neither.
    """
    span = closings[:dim + lookahead]
    anchor_price = closings[:dim][-1]

    tolerance = span.std() * 0.5

    near_low = (span.min() - anchor_price).abs() < tolerance
    near_high = (span.max() - anchor_price).abs() < tolerance

    # Near both extremes or near neither: no usable signal.
    if near_low == near_high:
        return 0

    if near_high:
        return -1
    return 1

In [289]:
def _balanced_dataset(positives, negatives):
    """Stack positive windows with an equal-sized sample of neutral windows.

    Args:
        positives: list of 1-D window tensors labelled 1.
        negatives: list of 1-D window tensors labelled 0; at most
            ``len(positives)`` of them (taken from the front) are used.

    Returns:
        ``(x, y)`` where ``x`` has one row per window and ``y`` is a float
        column vector with exactly one label per row of ``x``.

    Raises:
        ValueError: if ``positives`` is empty.
    """
    if not positives:
        raise ValueError("no positive windows found; cannot build a training set")
    sampled = negatives[:len(positives)]
    x = torch.stack(positives + sampled)
    # Size the zero labels from the negatives actually used, so x and y keep
    # the same row count even when fewer neutral windows exist than positives.
    y = torch.tensor(
        [1] * len(positives) + [0] * len(sampled), dtype=torch.float
    ).unsqueeze(1)
    return x, y


# First 4000 closing prices, row order reversed.
# NOTE(review): the flip presumably turns a newest-first file into
# chronological order — confirm against the CSV.
closings = torch.flip(
    torch.tensor(df["close"].iloc[:4000].to_numpy(), dtype=torch.float),
    dims=(0,),
)
# Standardise to zero mean and unit standard deviation over the whole series.
closings = closings - torch.mean(closings)
closings = closings / torch.std(closings)

dim = 48        # prices per input window
lookahead = 24  # extra prices after the window used only for labelling
win_size = dim + lookahead

n = len(closings) - win_size

sell_windows = []
buy_windows = []
xTr_none = []

for win_start in range(n):
    window = closings[win_start:win_start + dim]
    label = generate_label_window(closings[win_start:], dim, lookahead)

    if label == 1:
        buy_windows.append(window)
    elif label == -1:
        sell_windows.append(window)
    else:
        xTr_none.append(window)

# One binary dataset per signal: that signal's windows (label 1) against
# neutral windows (label 0).
xTr_sell, yTr_sell = _balanced_dataset(sell_windows, xTr_none)
xTr_buy, yTr_buy = _balanced_dataset(buy_windows, xTr_none)

In [None]:
import torch


def train_model(xTr, yTr, num_epochs: int = 10000, lr: float = 0.1, log_every: int = 100):
    """Fit a small fully connected binary classifier with full-batch SGD.

    The network is Linear -> ReLU -> Linear -> ReLU -> Linear -> Sigmoid with
    two hidden layers of 10 units. Its input width is taken from ``xTr``
    rather than from a notebook global, so the function works for any window
    size.

    Args:
        xTr: float tensor of training inputs, one sample per row.
        yTr: float tensor of 0/1 targets with the same shape as the model
            output, i.e. one column.
        num_epochs: number of full-batch gradient steps.
        lr: SGD learning rate.
        log_every: print the loss every this many epochs; 0 disables logging.

    Returns:
        ``(model, err)`` where ``err`` is a 0-d tensor holding the mean
        absolute difference between the trained model's outputs and ``yTr``
        on the training data. It is detached from the autograd graph.
    """
    model = torch.nn.Sequential(
        torch.nn.Linear(in_features=xTr.shape[-1], out_features=10),
        torch.nn.ReLU(),
        torch.nn.Linear(in_features=10, out_features=10),
        torch.nn.ReLU(),
        torch.nn.Linear(in_features=10, out_features=1),
        torch.nn.Sigmoid(),
    )

    criterion = torch.nn.BCELoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=lr)

    for epoch in range(num_epochs):
        # Forward pass
        y_pred = model(xTr)
        loss = criterion(y_pred, yTr)

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if log_every and (epoch + 1) % log_every == 0:
            print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')

    # Measure the error once, on the final weights and without tracking
    # gradients, so the value describes the returned model and does not keep
    # the autograd graph alive.
    with torch.no_grad():
        err = torch.abs(model(xTr) - yTr).sum() / yTr.shape[0]

    return model, err


# Train one binary classifier per signal; the sell model is fitted first.
sell_model, sell_error = train_model(xTr_sell, yTr_sell)
buy_model, buy_error = train_model(xTr_buy, yTr_buy)

# The error returned by train_model is measured on the training data itself.
# float() unwraps the returned value so the report shows a plain percentage
# instead of a tensor repr.
print(f"Buy Error: {float(buy_error) * 100:.2f}%, Sell Error: {float(sell_error) * 100:.2f}%")