In [None]:
import yfinance as yf
from datetime import date, timedelta, datetime 
 
# Retrieval window: a fixed historical start up to the day the cell is run.
start_date = '1990-01-01'
end_date = date.today().strftime("%Y-%m-%d")

# auto_adjust=False is passed explicitly so the frame keeps the raw OHLC
# columns plus a separate "Adj Close" column. Later cells rely on that layout:
# the model is built with input_size = 6 and the plots read val["Adj Close"].
# NOTE(review): the column order (and whether columns are a MultiIndex) can
# differ between yfinance releases — confirm that positional index 4 used
# further down really is "Adj Close" for the installed version.
dataset_df = yf.download('AAPL', start=start_date, end=end_date, auto_adjust=False)
dataset_df

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt
import matplotlib.dates as mdates

def data_plot(df, ncols=2):
    """Draw every column of ``df`` as its own line chart in a grid of panels.

    Args:
        df: DataFrame to plot; each column is handed to ``sns.lineplot`` as a
            Series. The frame is copied, so the caller's object is untouched.
        ncols: Number of panels per row (defaults to 2, the original layout).

    Raises:
        ValueError: If ``df`` has no columns or ``ncols`` is not positive.
    """
    df_plot = df.copy()

    n_series = df_plot.shape[1]
    if n_series == 0:
        raise ValueError("data_plot needs a DataFrame with at least one column")
    if ncols < 1:
        raise ValueError("ncols must be a positive integer")

    # Ceiling division: round() uses banker's rounding, which yields 0 rows for
    # a single column and drops the last series for 5 columns (2.5 -> 2).
    nrows = -(-n_series // ncols)

    fig, _ = plt.subplots(nrows=nrows, ncols=ncols,
                          sharex=True, figsize=(14, 7))
    for i, ax in enumerate(fig.axes):
        if i >= n_series:
            # The grid has more panels than columns; blank out the leftovers.
            ax.set_visible(False)
            continue
        sns.lineplot(data=df_plot.iloc[:, i], ax=ax)
        ax.tick_params(axis="x", rotation=30, labelsize=10, length=0)
        ax.xaxis.set_major_locator(mdates.AutoDateLocator())
    fig.tight_layout()
    plt.show()

data_plot(dataset_df)

In [None]:
from sklearn.model_selection import train_test_split

# Ordered hold-out split: the first 75% of the rows become the training set
# and the remaining rows the validation set. shuffle=False keeps the rows in
# their original order instead of sampling them at random.
ratio = 0.75
train, val = train_test_split(
    dataset_df,
    train_size=ratio,
    shuffle=False,
)

In [None]:
import pandas as pd
# Pairwise correlation between the training-set columns, drawn as a heat map.
feature_corr = train.corr()
sns.heatmap(feature_corr, cmap="crest")

In [None]:
from sklearn.preprocessing import StandardScaler,MinMaxScaler
import numpy as np
import torch
scaler = MinMaxScaler()

# Fit the scaler on the training split ONLY and reuse those parameters for the
# validation split. Refitting on `val` would leak validation statistics and put
# the two splits on different scales, so the validation loss would not measure
# what the model learned. Validation values may therefore fall outside [0, 1].
scaled_train = scaler.fit_transform(train)
scaled_val = scaler.transform(val)

# Create sequences and labels for training data.
# Each input is a window of `sequence_length` consecutive rows; its label is
# the same window shifted one step ahead (next-step prediction at every step).
sequence_length = 50  # Number of time steps to look back
X_train, y_train = [], []
for i in range(len(scaled_train) - sequence_length):
    X_train.append(scaled_train[i:i+sequence_length])
    y_train.append(scaled_train[i+1:i+sequence_length+1])
X_train, y_train = np.array(X_train), np.array(y_train)

# Keep a single target column and restore the trailing feature axis so the
# labels have shape (n_windows, sequence_length, 1).
# NOTE(review): column 4 is assumed to be "Adj Close" (the plotting cell labels
# it that way) — confirm against the column order of the downloaded frame.
y_train = y_train[:,:,4].reshape(-1,sequence_length,1)

# Convert data to PyTorch tensors
X_train = torch.tensor(X_train, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.float32)

X_train.shape,y_train.shape

In [None]:
# Build the validation windows and their one-step-ahead labels.
# Note that this look-back (30 steps) differs from the 50 steps used for the
# training windows; the forecasting and plotting cells work on 30-step windows.
sequence_length = 30  # Number of time steps to look back

window_starts = range(len(scaled_val) - sequence_length)
X_val = np.array([scaled_val[s:s + sequence_length] for s in window_starts])
y_val = np.array([scaled_val[s + 1:s + sequence_length + 1] for s in window_starts])

# Only column 4 is used as the target; the reshape restores a trailing axis of
# size 1 so labels are (n_windows, sequence_length, 1).
target_only = y_val[:, :, 4]
y_val = target_only.reshape(-1, sequence_length, 1)

# Hand both arrays to PyTorch as float32 tensors.
X_val = torch.tensor(X_val, dtype=torch.float32)
y_val = torch.tensor(y_val, dtype=torch.float32)
X_val.shape, y_val.shape

In [None]:
import torch.nn as nn

class LSTMModel(nn.Module):
    """Stacked LSTM followed by a linear head applied at every time step.

    Args:
        input_size: Number of features in the input at each time step.
        hidden_size: Number of LSTM units per layer.
        num_layers: Number of stacked LSTM layers.
        output_size: Number of values predicted per time step. Defaults to 1,
            which reproduces the original single-target head.
    """

    def __init__(self, input_size, hidden_size, num_layers, output_size=1):
        super().__init__()  # initializes the parent class nn.Module
        # batch_first=True: tensors are laid out as (batch, seq_len, features).
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.linear = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Run the forward pass.

        Args:
            x: Tensor of shape (batch, seq_len, input_size).

        Returns:
            Tensor of shape (batch, seq_len, output_size): one prediction for
            every time step of the input sequence.
        """
        out, _ = self.lstm(x)  # final hidden/cell states are not needed
        return self.linear(out)

In [None]:
# Model hyper-parameters
input_size = 6    # features per time step (columns of the scaled frame)
num_layers = 3    # stacked LSTM layers
hidden_size = 64  # LSTM units per layer
output_size = 1   # values predicted per time step

# Use the first GPU when one is available, otherwise fall back to the CPU so
# the notebook still runs on machines without CUDA.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Define the model, loss function, and optimizer
model = LSTMModel(input_size, hidden_size, num_layers).to(device)

loss_fn = torch.nn.MSELoss(reduction='mean')

optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
print(model)


In [None]:
batch_size = 16
data_utils = torch.utils.data

# Training loader: windows are drawn in a new random order every epoch.
train_dataset = data_utils.TensorDataset(X_train, y_train)
train_loader = data_utils.DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=True,
)

# Validation loader: windows are served in their stored order.
val_dataset = data_utils.TensorDataset(X_val, y_val)
val_loader = data_utils.DataLoader(
    val_dataset,
    batch_size=batch_size,
    shuffle=False,
)

In [None]:
num_epochs = 50
train_hist = []  # mean training loss per epoch
val_hist = []    # mean validation loss per epoch

# Training loop: one optimisation pass over train_loader followed by one
# evaluation pass over val_loader per epoch.
for epoch in range(num_epochs):
    total_loss = 0.0

    # Training
    model.train()
    for batch_X, batch_y in train_loader:
        batch_X, batch_y = batch_X.to(device), batch_y.to(device)
        predictions = model(batch_X)
        loss = loss_fn(predictions, batch_y)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    # Average training loss over the batches of this epoch
    average_loss = total_loss / len(train_loader)
    train_hist.append(average_loss)

    # Validation: no gradients are tracked and no weights are updated
    model.eval()
    with torch.no_grad():
        total_val_loss = 0.0

        for batch_X_val, batch_y_val in val_loader:
            batch_X_val, batch_y_val = batch_X_val.to(device), batch_y_val.to(device)
            predictions_val = model(batch_X_val)
            val_loss = loss_fn(predictions_val, batch_y_val)

            total_val_loss += val_loss.item()

        # Average validation loss over the batches of this epoch
        average_val_loss = total_val_loss / len(val_loader)
        val_hist.append(average_val_loss)

    # Report progress every fifth epoch
    if (epoch + 1) % 5 == 0:
        print(f'Epoch [{epoch+1}/{num_epochs}] - Training Loss: {average_loss:.4f}, Val Loss: {average_val_loss:.4f}')


In [None]:
# Per-epoch loss curves for both splits, drawn on one set of axes.
x = np.linspace(1, num_epochs, num_epochs)  # epoch numbers 1..num_epochs

_, loss_ax = plt.subplots()
loss_ax.plot(x, train_hist, scalex=True, label="Training loss")
loss_ax.plot(x, val_hist, label="Val loss")
loss_ax.legend()
plt.show()

In [None]:


# Define the number of future time steps to forecast
num_forecast_steps = 30

# Column of the scaled feature matrix that the model predicts.
# NOTE(review): assumed to be "Adj Close", matching the index used when the
# labels were built — confirm against the frame's column order.
target_col = 4

# Convert to NumPy and remove singleton dimensions (kept for the plotting cell)
sequence_to_plot = X_val.squeeze().cpu().numpy()

# Seed the forecast with the most recent validation rows. X_val[-1] stops one
# row short of the end of the data (the final row only ever appears as a
# label), so the window is taken directly from scaled_val instead.
window_len = X_val.shape[1]
historical_data = np.array(scaled_val[-window_len:], dtype=np.float32)
print(historical_data.shape)

# Initialize a list to store the forecasted values
forecasted_values = []

# Use the trained model to forecast future values recursively: predict one
# step, append it to the window, drop the oldest step, repeat.
# The horizon is twice num_forecast_steps, as in the original cell.
model.eval()
with torch.no_grad():
    for _ in range(num_forecast_steps*2):
        # Prepare the historical_data tensor: (1, window_len, n_features)
        historical_data_tensor = torch.as_tensor(historical_data).unsqueeze(0).float().to(device)
        # The model emits one prediction per time step; the LAST step is the
        # forecast for the value that follows the window.
        predicted_value = model(historical_data_tensor).cpu().numpy()[0, -1]
        # Append the predicted value to the forecasted_values list
        forecasted_values.append(predicted_value[0])

        # The model predicts only the target column. As a naive assumption the
        # other features are carried forward from the latest known row.
        next_row = historical_data[-1].copy()
        next_row[target_col] = predicted_value[0]

        # Shift the window one TIME STEP (axis 0). Without `axis`, np.roll
        # rolls the flattened array and mixes features across columns.
        historical_data = np.roll(historical_data, shift=-1, axis=0)
        historical_data[-1] = next_row


# Generate future dates
last_date = val.index[-1]

# One date per forecasted value, starting the day after the last observation.
# NOTE(review): these are consecutive calendar days (weekends included), not
# trading days — switch to a business-day range if that matters for the plot.
future_dates = pd.date_range(start=last_date + pd.DateOffset(1), periods=len(forecasted_values))

# Concatenate the original index with the future dates
combined_index = val.index.append(future_dates)

In [None]:
#set the size of the plot
plt.rcParams['figure.figsize'] = [14, 4]


#Test data
plt.plot(val.index[-100:-30], val["Adj Close"][-100:-30], label = "val", color = "b")

# Last validation input window (X_val[-1]), mapped back to original units.
seed_window = sequence_to_plot[-1]
print(seed_window.shape)
#reverse the scaling transformation
original_cases = scaler.inverse_transform(seed_window)
original_cases = original_cases[:,4].flatten()

# X_val[-1] covers the rows just BEFORE the final validation row (the final
# row is only ever used as a label), so its dates are index[-n-1:-1] rather
# than index[-n:].
window_len = len(original_cases)
plt.plot(val.index[-window_len - 1:-1], original_cases, label='actual values', color='green')

#Forecasted Values
#reverse the scaling transformation
# MinMaxScaler maps x -> x * scale_ + min_ per column, so the inverse for the
# single target column (index 4) is (x - min_) / scale_. Doing this directly
# avoids assigning fitted attributes onto an unfitted MinMaxScaler.
forecast_scaled = np.asarray(forecasted_values, dtype=float)
forecasted_cases = (forecast_scaled - scaler.min_[4]) / scaler.scale_[4]

# plotting the forecasted values; take as many trailing dates as there are
# forecasts so x and y always have the same length
plt.plot(combined_index[-len(forecasted_cases):], forecasted_cases, label='forecasted values', color='red')

plt.xlabel('Time Step')
plt.ylabel('Value')
plt.legend()
plt.title('Time Series Forecasting')
plt.grid(True)
plt.show()
