In [None]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader
from sklearn.model_selection import train_test_split

# --- 0. Configuration ---
# Run on CUDA when PyTorch reports it as available; otherwise use the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

# --- 1. Load and Inspect Data ---
# The `try` body is limited to the single call that can raise
# FileNotFoundError; the inspection prints run in `else` so an unrelated
# failure there is never mistaken for a missing file.
try:
    # Load the dataset from the provided CSV file
    df = pd.read_csv('stock_data large.csv')
except FileNotFoundError:
    print("Error: 'stock_data large.csv' not found. Please ensure the file is in the correct directory.")
    # `exit()` is an interactive helper injected by the `site` module and
    # terminates with status 0. Raise SystemExit with a non-zero status so
    # the failure is visible to the calling shell or process.
    raise SystemExit(1) from None
else:
    print("\nSuccessfully loaded dataset.")
    print("Dataset shape:", df.shape)
    print("First 5 rows:")
    print(df.head())

# --- 2. Data Preprocessing ---
# The 'Target' column is the label (y); every other column is a feature (X).
y = df['Target'].values
X = df.drop(columns='Target').values

# NOTE: No scaling is applied because the data is assumed to be pre-scaled.
# If it is not, a scaler such as sklearn's MinMaxScaler should be applied
# here instead of aliasing X.
X_scaled = X

# --- 3. Create Time-Series Sequences ---
def create_sequences(data, target, time_steps=60):
    """Build sliding windows over ``data`` paired with the next target value.

    Window ``i`` is ``data[i:i + time_steps]`` and its label is
    ``target[i + time_steps]``, i.e. the target of the row immediately
    after the window.

    Args:
        data: Array-like indexed along its first axis (rows).
        target: Array-like with at least as many rows as ``data``.
        time_steps: Number of consecutive rows per window. Must be >= 0.

    Returns:
        A tuple ``(Xs, ys)`` of NumPy arrays. ``Xs`` has shape
        ``(n, time_steps, *data.shape[1:])`` and ``ys`` has shape
        ``(n, *target.shape[1:])`` where ``n = max(len(data) - time_steps, 0)``.
        When there are too few rows for a single window, both arrays are
        empty but still carry the trailing dimensions, so downstream shape
        lookups keep working.

    Raises:
        ValueError: If ``time_steps`` is negative.
        IndexError: If ``target`` has fewer rows than ``data``.
    """
    if time_steps < 0:
        raise ValueError(f"time_steps must be non-negative, got {time_steps}")

    data = np.asarray(data)
    target = np.asarray(target)

    n_windows = len(data) - time_steps
    if n_windows <= 0:
        # Not enough rows for a single window: return correctly shaped empty
        # arrays rather than a shapeless (0,) array.
        empty_windows = np.empty((0, time_steps) + data.shape[1:], dtype=data.dtype)
        empty_labels = np.empty((0,) + target.shape[1:], dtype=target.dtype)
        return empty_windows, empty_labels

    windows = np.stack([data[start:start + time_steps] for start in range(n_windows)])
    # Index element by element so that a too-short target still raises IndexError.
    labels = np.array([target[start + time_steps] for start in range(n_windows)])
    return windows, labels

TIME_STEPS = 60  # number of consecutive rows in each input window
X_seq, y_seq = create_sequences(data=X_scaled, target=y, time_steps=TIME_STEPS)

# Report the shapes of the windowed arrays.
print(f"\nData reshaped into sequences with {TIME_STEPS} time steps.")
print("X_seq shape:", X_seq.shape)
print("y_seq shape:", y_seq.shape)

# --- 4. Split Data and Create DataLoaders ---
# 80/20 train/test split. shuffle=False keeps the rows in their original
# order, so the split is a contiguous cut rather than a random sample.
X_train, X_test, y_train, y_test = train_test_split(
    X_seq,
    y_seq,
    test_size=0.2,
    random_state=42,
    shuffle=False,
)

# Convert the NumPy arrays to float32 tensors. Targets are reshaped to a
# single column so they line up with the model output in the loss function.
X_train_tensor = torch.as_tensor(X_train, dtype=torch.float32)
X_test_tensor = torch.as_tensor(X_test, dtype=torch.float32)
y_train_tensor = torch.as_tensor(y_train, dtype=torch.float32).reshape(-1, 1)
y_test_tensor = torch.as_tensor(y_test, dtype=torch.float32).reshape(-1, 1)

# Wrap the tensors in datasets and batch them; neither loader shuffles.
BATCH_SIZE = 32
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

print("\nData split and converted to PyTorch DataLoaders.")
print("Train loader size:", len(train_loader.dataset))
print("Test loader size:", len(test_loader.dataset))


# --- 5. Build the LSTM Model in PyTorch ---
class LSTMModel(nn.Module):
    """Stacked LSTM followed by a linear layer and a sigmoid.

    The forward pass feeds a batch of sequences through the LSTM, takes the
    output at the last time step, projects it with a fully connected layer
    and squashes the result with a sigmoid, yielding values in [0, 1].
    """

    def __init__(self, input_dim, hidden_dim, num_layers, output_dim, dropout_prob=0.2):
        """
        Initializes the LSTM model layers.
        - input_dim: Number of input features.
        - hidden_dim: Number of features in the hidden state.
        - num_layers: Number of stacked LSTM layers.
        - output_dim: Number of output features.
        - dropout_prob: Dropout probability applied between stacked LSTM
          layers; ignored when num_layers is 1.
        """
        super().__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers

        # batch_first=True makes the input/output tensors have shape
        # (batch_size, seq_len, features).
        # nn.LSTM only applies dropout between stacked layers and emits a
        # warning when a non-zero dropout is combined with a single layer,
        # so dropout is disabled in that case.
        self.lstm = nn.LSTM(
            input_dim,
            hidden_dim,
            num_layers,
            batch_first=True,
            dropout=dropout_prob if num_layers > 1 else 0.0,
        )

        # Fully connected output layer
        self.fc = nn.Linear(hidden_dim, output_dim)

        # Activation for binary classification
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Return sigmoid outputs of shape (batch_size, output_dim) for x.

        x is indexed as (batch_size, seq_len, features).
        """
        # No explicit (h0, c0): nn.LSTM defaults both to zeros created on the
        # input's device with the input's dtype. Building them by hand with
        # torch.zeros uses the default dtype and fails for e.g. float64 input.
        out, _ = self.lstm(x)

        # Use only the output from the last time step
        out = self.fc(out[:, -1, :])

        return self.sigmoid(out)

# --- 6. Instantiate the Model, Loss, and Optimizer ---
INPUT_DIM = X_train.shape[2]  # Size of the feature axis of the sequences
HIDDEN_DIM = 50
NUM_LAYERS = 3  # Corresponds to the 3 stacked LSTM layers in the TF model
OUTPUT_DIM = 1

model = LSTMModel(
    input_dim=INPUT_DIM,
    hidden_dim=HIDDEN_DIM,
    num_layers=NUM_LAYERS,
    output_dim=OUTPUT_DIM,
)
model = model.to(device)

# Binary cross-entropy on the model's sigmoid outputs, optimized with Adam.
criterion = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

print("\nPyTorch Model Architecture:")
print(model)

# --- 7. Train the Model ---
print("\n--- Starting Model Training ---")
EPOCHS = 25
for epoch in range(EPOCHS):
    model.train()  # Set the model to training mode

    # Accumulate a sample-weighted loss so the value reported for the epoch
    # reflects every batch, not just whichever batch happened to come last.
    running_loss = 0.0
    samples_seen = 0

    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)

        # Zero the gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

        # NOTE(review): weighting by batch size assumes `criterion` returns
        # the mean loss over the batch (the default reduction) - confirm if
        # the loss function is changed.
        n_in_batch = inputs.size(0)
        running_loss += loss.item() * n_in_batch
        samples_seen += n_in_batch

    # An empty loader yields NaN instead of a NameError on `loss`.
    epoch_loss = running_loss / samples_seen if samples_seen else float("nan")
    print(f'Epoch [{epoch+1}/{EPOCHS}], Loss: {epoch_loss:.4f}')
print("--- Model Training Complete ---")


# --- 8. Evaluate the Model ---
print("\n--- Evaluating Model Performance ---")
model.eval()  # Set the model to evaluation mode
all_preds = []
all_labels = []
correct_predictions = 0
total_predictions = 0

with torch.no_grad():  # Disable gradient calculation for evaluation
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)

        outputs = model(inputs)
        # Threshold the model outputs at 0.5 to obtain hard 0/1 predictions.
        predicted = (outputs > 0.5).float()

        # Count matches with tensor ops and keep the running totals as plain
        # ints, so `accuracy` ends up a float rather than a 1-element array.
        correct_predictions += int((predicted == labels).sum().item())
        total_predictions += labels.numel()

        # Keep the per-sample predictions and labels for later inspection.
        all_preds.extend(predicted.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

# Calculate accuracy as a percentage; NaN when the test set is empty instead
# of raising ZeroDivisionError.
if total_predictions:
    accuracy = (correct_predictions / total_predictions) * 100
else:
    accuracy = float("nan")

print(f"\nFinal Model Accuracy on Test Data: {accuracy:.2f}%")


# --- 9. Make Predictions (Example) ---
print("\n--- Example Prediction ---")
model.eval()
with torch.no_grad():
    # Slicing with [:1] (rather than indexing [0]) keeps the batch dimension
    # on the first test sample.
    first_sample = X_test_tensor[:1].to(device)
    prediction_tensor = model(first_sample)

    # Pull the single output out as a Python float and threshold it at 0.5.
    probability = prediction_tensor.item()
    predicted_class = int(probability > 0.5)
    actual_class = int(y_test_tensor[0].item())

    # Class 1 is shown as "Up"; any other value is shown as "Down".
    class_names = {1: '1 (Up)'}
    print(f"Prediction for the first test sample: {probability:.4f}")
    print(f"Predicted Class: {class_names.get(predicted_class, '0 (Down)')}")
    print(f"Actual Class:    {class_names.get(actual_class, '0 (Down)')}")
