Download historical daily BTC/USD prices 

In [7]:
import requests
from dotenv import load_dotenv
import os

# Pull variables from a local .env file (if any) into the process environment.
load_dotenv()

# NOTE(review): the variable name is spelled 'ALHPA_AVANTAGE' (sic); it has to
# match whatever the .env file defines, so it is kept exactly as-is here.
api_key = os.environ['ALHPA_AVANTAGE_API_KEY']

# Pass the query as `params` so requests URL-encodes every value, instead of
# formatting the key into the URL string by hand.
url = 'https://www.alphavantage.co/query'
r = requests.get(
    url,
    params={
        'function': 'DIGITAL_CURRENCY_DAILY',
        'symbol': 'BTC',
        'market': 'USD',
        'apikey': api_key,
    },
    timeout=30,  # without a timeout a stalled connection hangs the notebook forever
)
# Fail here on an HTTP error rather than later while parsing an error page.
r.raise_for_status()
data = r.json()

In [None]:
# Extract one closing price per day from the JSON payload held in `data`.
series_key = 'Time Series (Digital Currency Daily)'
if series_key not in data:
    # Same exception type as the plain lookup would raise, but the message
    # shows what the service actually sent back (e.g. an error payload).
    raise KeyError("{!r} missing from response; got keys: {}".format(series_key, list(data.keys())))
daily_data = data[series_key]

# Iterate the date keys in sorted order so the prices run oldest -> newest no
# matter how the response is ordered; the windowing below relies on that.
# NOTE(review): assumes the keys are ISO 'YYYY-MM-DD' strings (so string order
# equals date order) and that the API lists the newest day first - confirm
# against the Alpha Vantage documentation.
closing_prices = [
    round(float(daily_data[day]['4b. close (USD)']), 2)
    for day in sorted(daily_data)
]

"We have {} data points".format(len(closing_prices))

Split dataset into sequences of 5 consecutive price points

In [None]:
# Number of consecutive prices per sequence. The same value is also used as the
# width of the hidden state and in the input size of both linear layers.
time_steps = 5

In [None]:
# Cut the price series into non-overlapping windows of `time_steps` values.
# The range stops early enough that every window is full-length: a shorter
# trailing remainder would make the list ragged and break the later
# torch.Tensor(...) conversion, so it is dropped.
chunks = [
    closing_prices[x:x + time_steps]
    for x in range(0, len(closing_prices) - time_steps + 1, time_steps)
]
# The first 80% of the windows are used for training, the rest for testing.
train_test_split_index = int(0.8 * len(chunks))

Prepare training and test data

In [None]:
# Windows before the split index train the model; the remainder is held out.
training_data, test_data = (
    chunks[:train_test_split_index],
    chunks[train_test_split_index:],
)
# Cell output: size and first window of each split, as a quick sanity check.
(len(training_data), training_data[0], len(test_data), test_data[0])


Turn training and test data into PyTorch tensors

In [None]:
import torch
import torch.nn as nn

In [None]:
# Convert both splits to tensors and divide every price by 10000; the test
# cell multiplies by the same factor again before printing.
train, test = (
    torch.Tensor(split) / 10000
    for split in (training_data, test_data)
)
# Cell output: shapes of the two tensors.
(train.shape, test.shape)

In [None]:
# Both layers take one price concatenated with the hidden state
# (1 + time_steps features). The first maps that to a single output value,
# the second to the next hidden state.
input_to_output = nn.Linear(in_features=time_steps + 1, out_features=1)
input_to_hidden = nn.Linear(in_features=time_steps + 1, out_features=time_steps)

Feed forward operation

In [None]:
def forward(input, hidden):
    """Run one recurrent step through the two module-level linear layers.

    `input` and `hidden` are concatenated along dimension 1 and the result is
    passed to both `input_to_output` and `input_to_hidden`.

    Returns:
        A tuple `(output, new_hidden)` holding the results of
        `input_to_output` and `input_to_hidden` respectively.
    """
    step_features = torch.cat([input, hidden], dim=1)
    return input_to_output(step_features), input_to_hidden(step_features)

In [None]:
# Smoke test: push the first training sequence through the freshly created
# layers one value at a time, starting from an all-zero hidden state, and
# show the output of the last step.
hidden = torch.zeros(1, time_steps)
for i in range(time_steps):
    # Pick the i-th price and shape it as a 1x1 tensor for forward().
    input = train[0, i].reshape(1, 1)
    output, hidden = forward(input, hidden)
print(output)

Define one training epoch over randomized mini-batches

In [None]:
# Hyperparameters read by run_epoch().
batch_size, learning_rate = 10, 0.0005
# Mean squared error between the batch predictions and the batch targets.
mse = nn.MSELoss(reduction='mean')

# Build the two layers again so training starts from new initial weights
# instead of the instances created in the earlier cell.
input_to_output = nn.Linear(in_features=time_steps + 1, out_features=1)
input_to_hidden = nn.Linear(in_features=time_steps + 1, out_features=time_steps)

def run_epoch():
    """Train for one pass over `train`, visiting sequences in shuffled mini-batches.

    For each sequence, every value except the last is fed through `forward`
    one step at a time, starting from an all-zero hidden state. The output of
    the final step is the prediction and the sequence's last value is the
    target. Per batch, the `mse` loss is back-propagated and both linear
    layers take one plain gradient-descent step scaled by `learning_rate`.
    The mean of the batch losses is printed at the end.

    Reads the module-level names `train`, `batch_size`, `time_steps`,
    `learning_rate`, `mse`, `forward`, `input_to_hidden` and
    `input_to_output`. Returns nothing.
    """
    randomized_training_indices = torch.randperm(train.shape[0])
    losses = []
    # for each training batch
    for new_batch_start in range(0, len(randomized_training_indices), batch_size):
        # Slicing clamps at the end of the permutation, so the final batch is
        # simply smaller when the data does not divide evenly by batch_size
        # (indexing position by position ran past the end in that case).
        batch_indices = randomized_training_indices[new_batch_start:new_batch_start + batch_size].tolist()

        predictions = torch.zeros(len(batch_indices))
        outputs = torch.zeros(len(batch_indices))

        # Clear gradients left over from the previous batch.
        input_to_hidden.zero_grad()
        input_to_output.zero_grad()

        # for training data in batch
        for index, batch_index in enumerate(batch_indices):
            hidden = torch.zeros(1, time_steps)
            single_train_data = train[batch_index]
            prediction = 0
            # Feed every value except the last one: the last value is the
            # target and must not be shown to the model as an input. This
            # matches how the test loop walks a sequence.
            for step in range(0, len(single_train_data) - 1):
                input = single_train_data[step:step+1].unsqueeze(1)
                prediction, hidden = forward(input, hidden)

            # forward() yields a 1x1 tensor; squeeze it to a scalar slot.
            predictions[index] = prediction.squeeze()
            outputs[index] = single_train_data[-1]

        # calculate error
        loss = mse(predictions, outputs)
        losses.append(loss.item())

        # back propagation, then one gradient-descent step per parameter
        loss.backward()
        # no_grad keeps the in-place weight updates out of the autograd graph.
        with torch.no_grad():
            for layer in (input_to_hidden, input_to_output):
                for p in layer.parameters():
                    p.add_(p.grad, alpha=-learning_rate)

    if not losses:
        # No batches ran (empty training set); avoid dividing by zero.
        print("avg epoch loss: n/a (no training data)")
        return
    print("avg epoch loss: {}".format(sum(losses)/ len(losses)))

In [None]:
# Train for 30 epochs; each call prints that epoch's average batch loss.
for epoch in range(30):
    run_epoch()

Check prediction on test data

In [None]:
# Evaluate on the held-out sequences. This is inference only, so autograd is
# switched off: no graph is recorded and results carry no grad_fn.
with torch.no_grad():
    for i in range(0, test.shape[0]):
        single_test_data = test[i]
        print(single_test_data)
        hidden = torch.zeros(1, time_steps)
        prediction = 0
        # Feed every value except the last; the last one is what we predict.
        for step in range(0, len(single_test_data) - 1):
            input = single_test_data[step:step+1].unsqueeze(1)
            prediction, hidden = forward(input, hidden)

        # Work with plain Python floats so the report prints numbers rather
        # than tensor reprs.
        predicted_value = prediction.item()
        actual_value = single_test_data[-1].item()
        if actual_value != 0:
            percentage_error = round(((predicted_value - actual_value) * 100) / actual_value, 3)
        else:
            # Relative error is undefined for a zero actual value.
            percentage_error = float('nan')
        # Multiply by 10000 to undo the scaling applied when the tensors were built.
        print("prediction: {}, actual: {}, percentage error={}".format(
            round(predicted_value * 10000, 2), round(actual_value * 10000, 2), percentage_error))