In [None]:
import torch
import torch.nn as nn
import math


class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.encoding = torch.zeros(max_len, d_model)
        positions = torch.arange(0, max_len).unsqueeze(1).float()
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        self.encoding[:, 0::2] = torch.sin(positions * div_term)
        self.encoding[:, 1::2] = torch.cos(positions * div_term)
        self.encoding = self.encoding.unsqueeze(0)

    def forward(self, x):
        seq_len = x.size(1)
        return x + self.encoding[:, :seq_len, :].to(x.device)


class TransformerStockPredictor(nn.Module):
    def __init__(self, input_dim, seq_len, d_model, n_heads, n_encoders, ff_dim, dropout=0.1):
        super(TransformerStockPredictor, self).__init__()
        self.input_proj = nn.Linear(input_dim, d_model)
        self.pos_encoder = PositionalEncoding(d_model, max_len=seq_len)

        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=n_heads,
            dim_feedforward=ff_dim,
            dropout=dropout,
            batch_first=True
        )
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=n_encoders)


        self.fc_out = nn.Sequential(
            nn.Linear(d_model * seq_len, d_model),
            nn.ReLU(),
            nn.Linear(d_model, 1)
        )

    def forward(self, x):
        x = self.input_proj(x)
        x = self.pos_encoder(x)

        x = self.transformer_encoder(x)

        x = x.view(x.size(0), -1)
        return self.fc_out(x)

# Instantiate the model
model_params = {
    "input_dim": 4,
    "seq_len": 20,
    "d_model": 96,
    "n_heads": 3,
    "n_encoders": 2,
    "ff_dim": 128,
    "dropout": 0.2
}
model = TransformerStockPredictor(input_dim=4,seq_len=20,d_model=96,n_heads=3,n_encoders=2,ff_dim=128,dropout=0.2)

# Load the model weights
model.load_state_dict(torch.load('./Downloads/Apple_model.pth'))

# Set the model to evaluation mode (if you're doing inference)
model.eval()

  model.load_state_dict(torch.load('./Downloads/Apple_model.pth'))


TransformerStockPredictor(
  (input_proj): Linear(in_features=4, out_features=96, bias=True)
  (pos_encoder): PositionalEncoding()
  (transformer_encoder): TransformerEncoder(
    (layers): ModuleList(
      (0-1): 2 x TransformerEncoderLayer(
        (self_attn): MultiheadAttention(
          (out_proj): NonDynamicallyQuantizableLinear(in_features=96, out_features=96, bias=True)
        )
        (linear1): Linear(in_features=96, out_features=128, bias=True)
        (dropout): Dropout(p=0.2, inplace=False)
        (linear2): Linear(in_features=128, out_features=96, bias=True)
        (norm1): LayerNorm((96,), eps=1e-05, elementwise_affine=True)
        (norm2): LayerNorm((96,), eps=1e-05, elementwise_affine=True)
        (dropout1): Dropout(p=0.2, inplace=False)
        (dropout2): Dropout(p=0.2, inplace=False)
      )
    )
  )
  (fc_out): Sequential(
    (0): Linear(in_features=1920, out_features=96, bias=True)
    (1): ReLU()
    (2): Linear(in_features=96, out_features=1, bias=Tru

In [45]:
from torch.utils.data import DataLoader
import pandas as pd
class StockDataset(torch.utils.data.Dataset):
    def __init__(self, X, y, seq_len, batch_size, pad_value=0.0):

        if isinstance(X, pd.DataFrame):
            X = X.values
        if isinstance(y, pd.Series):
            y = y.values

        self.X = torch.tensor(X, dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.float32)
        self.seq_len = seq_len
        self.batch_size = batch_size
        self.pad_value = pad_value

        # Ensure X and y are 2D
        if self.X.ndimension() == 1:  # For single feature case
            self.X = self.X.unsqueeze(1)  # Add second dimension for features
        if self.y.ndimension() == 1:  # For single feature case
            self.y = self.y.unsqueeze(1)  # Ensure y has shape (num_samples, 1)

        self.X_padded = self._pad_sequences(self.X)
        self.y_padded = self._pad_sequences(self.y)

    def _pad_sequences(self, data):
        total_len = len(data)
        remainder = (total_len % self.seq_len)

        if remainder != 0:
            pad_len = self.seq_len - remainder
            data = torch.cat((data, torch.full((pad_len, data.shape[1]), self.pad_value)), dim=0)

        total_len = len(data)
        if total_len % self.batch_size != 0:
            batch_pad_len = (self.batch_size - (total_len % self.batch_size)) % self.batch_size
            data = torch.cat((data, torch.full((batch_pad_len, data.shape[1]), self.pad_value)), dim=0)

        return data

    def __len__(self):
        return len(self.X_padded) // self.seq_len

    def __getitem__(self, idx):
        start = idx * self.seq_len
        end = start + self.seq_len
        x_seq = self.X_padded[start:end]
        y_seq = self.y_padded[start:end]
        return x_seq, y_seq



In [113]:
import yfinance as yf

st1=yf.Ticker('WFC')
dt=st1.history(start='2024-12-25',end='2025-01-3',interval='1d')
dt

Unnamed: 0_level_0,Open,High,Low,Close,Volume,Dividends,Stock Splits
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
2024-12-26 00:00:00-05:00,71.43,71.839996,71.110001,71.760002,6964300,0.0,0.0
2024-12-27 00:00:00-05:00,71.18,71.739998,70.629997,71.110001,7219500,0.0,0.0
2024-12-30 00:00:00-05:00,70.410004,70.800003,69.769997,70.410004,8443800,0.0,0.0
2024-12-31 00:00:00-05:00,70.529999,70.919998,70.059998,70.239998,7031500,0.0,0.0
2025-01-02 00:00:00-05:00,70.480003,71.209999,69.760002,70.190002,8303300,0.0,0.0


In [104]:
def calculate_indicators(df, rsi_period=14, macd_fast=12, macd_slow=26, macd_signal=9):
    if 'Close' not in df.columns:
        raise ValueError("DataFrame must contain a 'Close' column")

    df['Price Change'] = df['Close'].diff()
    df['Gain'] = df['Price Change'].where(df['Price Change'] > 0, 0)
    df['Loss'] = -df['Price Change'].where(df['Price Change'] < 0, 0)

    df['Avg Gain'] = df['Gain'].rolling(window=rsi_period).mean()
    df['Avg Loss'] = df['Loss'].rolling(window=rsi_period).mean()

    df['RS'] = df['Avg Gain'] / df['Avg Loss']
    df['RSI'] = 100 - (100 / (1 + df['RS']))

    df['EMA12'] = df['Close'].ewm(span=macd_fast, adjust=False).mean()
    df['EMA26'] = df['Close'].ewm(span=macd_slow, adjust=False).mean()

    df['MACD'] = df['EMA12'] - df['EMA26']
    df['Signal Line'] = df['MACD'].ewm(span=macd_signal, adjust=False).mean()

    return df[['Close', 'RSI', 'MACD', 'Signal Line']]

In [105]:
dt=calculate_indicators(dt)

In [106]:
dt['RSI'] = dt['RSI'].fillna(50)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dt['RSI'] = dt['RSI'].fillna(50)


In [107]:
from sklearn.preprocessing import MinMaxScaler

ftrs=['Close','RSI','MACD','Signal Line']
scaler=MinMaxScaler()
dt[ftrs] = scaler.fit_transform(dt[ftrs])

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  dt[ftrs] = scaler.fit_transform(dt[ftrs])


In [108]:
dc=dt['Close']
train_dt= StockDataset(dt,dc,seq_len=20,batch_size=16)
train_ld=DataLoader(train_dt,batch_size=16,shuffle=False)


In [109]:
model.eval()
with torch.no_grad():
    for x_test, y_test in train_ld:
        
        predictions=model(x_test).squeeze(-1)

In [110]:
predictions

tensor([0.2087])

In [111]:
import numpy as np
from sklearn.preprocessing import MinMaxScaler

# scaler2=MinMaxScaler()

# scaler2.fit_transform()

prediction_np = predictions.detach().cpu().numpy().reshape(-1, 1)  
placeholder = np.zeros((1, 4))  # Shape: (1, 4)

# Step 3: Place the prediction value in the appropriate column (e.g., first feature)
placeholder[:, 0] = prediction_np[:, 0]  # Replace 0 with the appropriate index if needed

stock = scaler.inverse_transform(placeholder)


In [112]:
stock[0][0]

70.55728662983347