<a href="https://colab.research.google.com/github/johnymeng/pytorch/blob/main/rnn-colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# This mounts your Google Drive to the Colab VM.
from google.colab import drive
drive.mount('/content/drive')

# TODO: Enter the foldername in your Drive where you have saved the unzipped
# assignment folder, e.g. 'cs231n/assignments/assignment1/'
FOLDERNAME = 'PyTorch Test Project/N_TXT'
assert FOLDERNAME is not None, "[!] Enter the foldername."

# Now that we've mounted your Drive, this ensures that
# the Python interpreter of the Colab VM can load
# python files from within it.
import sys
sys.path.append('/content/drive/My Drive/{}'.format(FOLDERNAME))

# This downloads the CIFAR-10 dataset to your Drive
# if it doesn't already exist.
%cd /content/drive/My\ Drive/$FOLDERNAME/cs231n/datasets/
!bash get_datasets.sh
%cd /content/drive/My\ Drive/$FOLDERNAME

In [None]:
import torch
import pandas as pd
import numpy as np
from torch import nn
import os
import sklearn
import matplotlib.pyplot as plt
import sys

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
path = "/content/drive/MyDrive/PyTorch Test Project/N_TXT"

os.chdir(path)

fx_data = np.array([], dtype=np.float32)

#gets data from file specified by file_path
def get_contents(file_path):
    global fx_data
    skip_lines = 7

    with open(file_path,'r') as file:

        for skips in range(skip_lines):
            next(file)

        lines = file.readlines()[:-1]

        for line in lines:
            data = line.split()[-1]
            fx_data = np.append(fx_data, np.float32(data))
            #print(data)

In [None]:
#go through all files in directory and extract data from txt files
for file in os.listdir():
    if file.endswith(".txt"):
        file_path = f"{path}/{file}"
        print(f"Currently working on: {file_path}")
        get_contents(file_path)

fx_data.dtype

In [None]:
#create time "sample" for dataframe
time_data = np.array([], dtype=np.float32)

for x in range(len(fx_data)):
    time_data = np.append(time_data, np.float32(x))
    type(x)

time_data, time_data[0].dtype

In [None]:
#create time "sample" for dataframe
dataset = pd.DataFrame({"Time": time_data,
                        "FX Channel": fx_data})
dataset

In [None]:
plt.plot(dataset["FX Channel"])

In [None]:
dataset.iloc[:, 1]

In [None]:
dataset["FX Channel"].shape, type(dataset["FX Channel"])

In [None]:
from sklearn.model_selection import train_test_split

RANDOM_SEED = 42

#split into train, test and validation, convert PD series to numpy first before train_test_split
X_train, X_test, y_train, y_test = train_test_split(dataset["Time"].to_numpy(), dataset["FX Channel"].to_numpy(), test_size = 0.2, random_state=RANDOM_SEED)

X_train.size, X_test.size, X_train.shape, X_test.shape

In [None]:
#reshape X_train and y_train so that we can use scaler from sklearn
X_train = X_train.reshape(-1, 1)
X_test = X_test.reshape(-1, 1)
X_test.shape, X_train.shape, y_train.shape

In [None]:
from sklearn import preprocessing

#scale and normalize data
scaler = preprocessing.StandardScaler()

scaler.fit_transform(X_train)
scaler.fit_transform(X_test)

In [None]:
#set device
device = "cuda" if torch.cuda.is_available() else "cpu"

In [None]:
#convert data from numpy to tensors
X_train = torch.from_numpy(X_train).type(torch.float32)
X_test = torch.from_numpy(X_test).type(torch.float32)
y_train = torch.from_numpy(y_train).type(torch.float32)
y_test = torch.from_numpy(y_test).type(torch.float32)

In [None]:
type(X_train), X_train.dtype

In [None]:
dataset

In [None]:
time_data.shape, fx_data.shape, fx_data.shape[0]

In [None]:
#send data to device
X_train = X_train.to(device).type(torch.float32)
X_test = X_test.to(device).type(torch.float32)
y_test = y_test.to(device).type(torch.float32)
y_train = y_train.to(device).type(torch.float32)

In [None]:
#trying to split data loader now? not too sure if this is the best way
split_idx = int(0.8 * len(dataset))

train_data = dataset[:split_idx]
test_data = dataset[split_idx:]

train_data.size, test_data.size, train_data["FX Channel"], train_data["Time"]

In [None]:
# for x in range(10):
#     print(f"{train_data[x]}\n")

In [None]:
time_data.shape, time_data[7].size, fx_data[0].size

In [None]:
import os
import pandas as pd
from torchvision.io import read_image
from torch.utils.data import Dataset
from pathlib import Path

split_idx = int(0.8 * len(fx_data))
print(f"{split_idx}")

#convert data to use for dataloader
class FX_train_dataset(Dataset):
    def __init__(self):
        self.x = torch.from_numpy(time_data[:split_idx])
        self.y = torch.from_numpy(fx_data[:split_idx])
        self.n_samples = split_idx

    def __len__(self):
        return self.n_samples

    def __getitem__(self, idx):
        return self.x[idx], self.y[idx]

#convert data to use for dataloader
class FX_test_dataset(Dataset):
    def __init__(self):
        self.x = torch.from_numpy(time_data[split_idx:])
        self.y = torch.from_numpy(fx_data[split_idx:])
        self.n_samples = fx_data.shape[0] - split_idx

    def __len__(self):
        return self.n_samples

    def __getitem__(self, idx):
        return self.x[idx], self.y[idx]

In [None]:
train_data = FX_train_dataset()
test_data = FX_test_dataset()

len(train_data), train_data[0]

In [None]:
from torch.utils.data import DataLoader

#setup batch size hyperparameter
BATCH_SIZE = 16

#turn datasets into iterables (batches)
train_dataloader = DataLoader(dataset=train_data,
                              batch_size=BATCH_SIZE,
                              shuffle=True)

test_dataloader = DataLoader(dataset=test_data,
                              batch_size=BATCH_SIZE,
                              shuffle=False)

#check out what we've created
print(f"Dataloaders: {train_dataloader, test_dataloader}")
print(f"Length of train_dataloader: {len(train_dataloader)} batches of {BATCH_SIZE}")
print(f"Length of train_dataloader: {len(test_dataloader)} batches of {BATCH_SIZE}")

In [None]:
type(train_dataloader)

In [None]:
# #checkout whats inside the training dataloader
train_features_batch, train_labels_batch = next(iter(train_dataloader))
train_features_batch.shape, train_labels_batch.shape


In [None]:
#set manual seed
torch.manual_seed(42)
torch.cuda.manual_seed(42)

In [None]:
X_train.dtype

In [None]:
# class RNN(nn.Module):
#     def __init__(self, input_size, output_size, hidden_units):
#         """
#             input_size = number of features of input data
#             output_size = number of features for output data
#             hidden_units = number of hidden neurons per layer
#         """
#         super().__init__()

#         #init layer sizes
#         self.input_size = input_size
#         self.output_size = output_size
#         self.hidden_units = hidden_units

#         self.i2h = nn.Linear(input_size, hidden_units, bias=False)
#         self.h2h = nn.Linear(hidden_units, hidden_units)
#         self.h2o = nn.Linear(hidden_units, output_size)

#         def forward(self, x, hidden_state):
#             x = self.i2h(x)
#             hidden_state = torch.tanh(x + hidden_state)
#             out = self.h2o(hidden_state)
#             return out, hidden_state

#         def init_zero_hidden(self, batch_size=1) -> torch.Tensor:
#             return torch.zeroes(batch_size, self.hidden_units, requires_grad = False)


In [None]:
"""
Model definition
"""
class RNN(nn.Module):
    """
    Basic RNN block. This represents a single layer of RNN
    """
    def __init__(self, input_size: int, hidden_size: int, output_size: int) -> None:
        """
        input_size: Number of features of your input vector
        hidden_size: Number of hidden neurons
        output_size: Number of features of your output vector
        """
        super().__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.batch_size = 16

        self.i2h = nn.Linear(input_size, hidden_size, bias=False)
        self.h2h = nn.Linear(hidden_size, hidden_size)
        self.h2o = nn.Linear(hidden_size, output_size)


    def forward(self, x, hidden_state) -> tuple[torch.Tensor, torch.Tensor]:
        """
        Returns softmax(linear_out) and tanh(i2h + i2o)
        Inputs
        ------
        x: Input vector x  with shape (vocab_size, )
        hidden_state: Hidden state matrix
        Outputs
        -------
        out: Prediction vector
        hidden_state: New hidden state matrix
        """
        x = self.i2h(x)
       # print(f"X rnn shape: {x.shape}")
        hidden_state = self.h2h(hidden_state)
        hidden_state = torch.tanh(x + hidden_state)
       # print(f"hidden_Staete shape: {hidden_state.shape}")
        return self.h2o(hidden_state), hidden_state

    def init_zero_hidden(self, batch_size=1) -> torch.Tensor:
        """
        Returns a hidden state with specified batch size. Defaults to 1
        """
        return torch.zeros(batch_size, self.hidden_size, requires_grad=False)

In [None]:
#THIS MODEL WORKS BUT NOT VERY WELL

# class RNN(nn.Module):
#     def __init__(self):
#         super().__init__()
#         self.lstm = nn.LSTM(input_size=1, hidden_size=32, num_layers=1, batch_first=True)
#         self.linear = nn.Linear(32, 1)
#     def forward(self, x):
#         x, _ = self.lstm(x)
#         x = self.linear(x)
#         return x

In [None]:
rnn = RNN(input_size = 1,
          hidden_size = 16,
          output_size = 1,
         ).to(device)

loss_fn = nn.MSELoss()

optimizer = torch.optim.Adam(params=rnn.parameters(),
                             lr = 0.25)

In [None]:
def training_loop(model: torch.nn.Module,
                dataloader: torch.utils.data.DataLoader,
                epochs: int,
                optimizer: torch.optim,
                loss_fn: torch.optim.Optimizer):
    train_loss = {}
    model.to(device)

    #set to train
    model.train()

    for epoch in range(epochs):
        epoch_loss = list()

        loss = 0

        for batch, (X, y) in enumerate(train_dataloader):

            hidden = model.init_zero_hidden(batch_size=model.batch_size)

            #print(f"X shape: {X.shape}")

            X, y, hidden = X.to(device).reshape(1, 16,1), y.to(device), hidden.to(device)
            #make pred
            y_pred, hidden = model(X, hidden)

            #calc loss
            loss += loss_fn(y_pred, y)

            #optim zero grad
            model.zero_grad()

        #back prop
        loss.backward()

        #optim step
        nn.utils.clip_grad_norm_(model.parameters(), 2)
        optimizer.step()

        epoch_loss.append(loss.detach().item() / len(train_dataloader))

        train_loss[epoch] = torch.tensor(epoch_loss).mean()

        print(f"epoch: {epoch} | loss: {train_loss[epoch]}")

In [None]:
X_train.type, X_train.dtype

In [None]:
EPOCHS = 1000

training_loop(model=rnn,
              dataloader=train_dataloader,
              epochs = EPOCHS,
              optimizer=optimizer,
              loss_fn=loss_fn)

In [None]:
  # Calculate accuracy (a classification metric)
def accuracy_fn(y_true, y_pred):
    """Calculates accuracy between truth labels and predictions.

    Args:
        y_true (torch.Tensor): Truth labels for predictions.
        y_pred (torch.Tensor): Predictions to be compared to predictions.

    Returns:
        [torch.float]: Accuracy value between y_true and y_pred, e.g. 78.45
    """
    correct = torch.eq(y_true, y_pred).sum().item()
    acc = (correct / len(y_pred)) * 100
    return acc

In [None]:
predictions = np.array([])

# Move values to device
torch.manual_seed(42)
def eval_model(model: torch.nn.Module,
               data_loader: torch.utils.data.DataLoader,
               loss_fn: torch.nn.Module,
               accuracy_fn,
               device: torch.device = device):
    """Evaluates a given model on a given dataset.

    Args:
        model (torch.nn.Module): A PyTorch model capable of making predictions on data_loader.
        data_loader (torch.utils.data.DataLoader): The target dataset to predict on.
        loss_fn (torch.nn.Module): The loss function of model.
        accuracy_fn: An accuracy function to compare the models predictions to the truth labels.
        device (str, optional): Target device to compute on. Defaults to device.

    Returns:
        (dict): Results of model making predictions on data_loader.
    """
    loss, acc = 0, 0
    model.eval()
    with torch.inference_mode():
        for X, y in data_loader:
            global predictions
            # Send data to the target device
            X, y = X.reshape(1, len(X), 1).to(device), y.reshape(1, len(X), 1).to(device)
            y_pred = model(X)
            predictions = np.append(predictions, y_pred.cpu())
            loss += loss_fn(y_pred, y)
            acc += accuracy_fn(y_true=y, y_pred=y_pred.argmax(dim=1))

        # Scale loss and acc
        loss /= len(data_loader)
        acc /= len(data_loader)
    return {"model_name": model.__class__.__name__, # only works when model was created with a class
            "model_loss": loss.item(),
            "model_acc": acc}

In [None]:
eval_model(model=rnn,
           data_loader= test_dataloader,
           loss_fn = loss_fn,
           accuracy_fn=accuracy_fn,
           device=device)

In [None]:
# Plot linear data or training and test and predictions (optional)
def plot_predictions(
    train_data, train_labels, test_data, test_labels, predictions=None
):
    """
  Plots linear training data and test data and compares predictions.
  """
    plt.figure(figsize=(20, 15))

    # Plot training data in blue
    plt.scatter(train_data, train_labels, c="b", s=4, label="Training data")

    # Plot test data in green
    plt.scatter(test_data, test_labels, c="g", s=4, label="Testing data")

    if predictions is not None:
        # Plot the predictions in red (predictions were made on the test data)
        plt.scatter(test_data, predictions, c="r", s=4, label="Predictions")

    # Show the legend
    plt.legend(prop={"size": 14})

In [None]:
plot_predictions(X_train.cpu().numpy(), y_train.cpu().numpy(), X_test.cpu().numpy(), y_test.cpu().numpy(), predictions=predictions)