In [31]:
pip install numpy pandas scikit-learn torch matplotlib

Note: you may need to restart the kernel to use updated packages.




In [48]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt
import os
import io

In [49]:
# Load and Prepare Data
csv_data = """Date,Close Price,High Price,Low Price,Open Price,Trading Volume
2022-01-03,178.64564514160156,179.49957387951153,174.4251401333062,174.5429172102293,104487900.0
2022-01-04,176.37831115722656,179.55842638666093,175.80903043843506,179.25415901450634,99310400.0
2022-01-05,174.5481185913086,176.65353393554688,173.6623077392578,176.31241607666016,76642400.0
2022-01-06,169.93736267089844,174.1745147705078,169.5852508544922,173.90731811523438,86559800.0
2022-01-07,167.7411651611328,170.1127471923828,166.3394317626953,169.78976440429688,71751200.0
# ... (continuing with all 251 rows up to 2022-12-30)
2022-12-30,137.2652174678619,137.73391723632812,136.149781614536,136.59742424225888,34018300.0"""

df = pd.read_csv(io.StringIO(csv_data), comment='#')
df['Date'] = pd.to_datetime(df['Date'])
df = df.sort_values('Date')
print(f"Data loaded successfully. Number of rows: {len(df)}")
print("First few rows:")
print(df.head())
print("\nData info:")
print(df.info())

Data loaded successfully. Number of rows: 6
First few rows:
        Date  Close Price  High Price   Low Price  Open Price  Trading Volume
0 2022-01-03   178.645645  179.499574  174.425140  174.542917     104487900.0
1 2022-01-04   176.378311  179.558426  175.809030  179.254159      99310400.0
2 2022-01-05   174.548119  176.653534  173.662308  176.312416      76642400.0
3 2022-01-06   169.937363  174.174515  169.585251  173.907318      86559800.0
4 2022-01-07   167.741165  170.112747  166.339432  169.789764      71751200.0

Data info:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6 entries, 0 to 5
Data columns (total 6 columns):
 #   Column          Non-Null Count  Dtype         
---  ------          --------------  -----         
 0   Date            6 non-null      datetime64[ns]
 1   Close Price     6 non-null      float64       
 2   High Price      6 non-null      float64       
 3   Low Price       6 non-null      float64       
 4   Open Price      6 non-null      float64    

In [50]:
# Preprocessing
numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
target = 'Close Price'
features = [col for col in numeric_cols if col != target]

feature_scaler = MinMaxScaler()
target_scaler = MinMaxScaler()
X_scaled = feature_scaler.fit_transform(df[features].values)
y_scaled = target_scaler.fit_transform(df[[target]].values).flatten()
print(f"X_scaled shape: {X_scaled.shape}, y_scaled shape: {y_scaled.shape}")

X_scaled shape: (6, 4), y_scaled shape: (6,)


In [51]:
# Generate sequences
seq_length = 5
def create_sequences(X, y, seq_length=5):
    Xs, ys = [], []
    for i in range(len(X) - seq_length):
        Xs.append(X[i:i + seq_length])
        ys.append(y[i + seq_length])
    return np.array(Xs), np.array(ys)
X_seq, y_seq = create_sequences(X_scaled, y_scaled, seq_length)
print(f"X_seq shape: {X_seq.shape}, y_seq shape: {y_seq.shape}")

X_seq shape: (1, 5, 4), y_seq shape: (1,)


In [52]:
# Split into train (80%) and test (20%)
train_size = int(len(X_seq) * 0.8)
X_train, X_test = X_seq[:train_size], X_seq[train_size:]
y_train, y_test = y_seq[:train_size], y_seq[train_size:]
print(f"X_train shape: {X_train.shape}, y_train shape: {y_train.shape}")
print(f"X_test shape: {X_test.shape}, y_test shape: {y_test.shape}")

X_train shape: (0, 5, 4), y_train shape: (0,)
X_test shape: (1, 5, 4), y_test shape: (1,)


In [53]:
# Dataset and DataLoader
class StockDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.tensor(X, dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.float32)
        if self.X.dim() != 3 or self.y.dim() != 1:
            raise ValueError(f"Invalid tensor shapes: X {self.X.shape}, y {self.y.shape}")

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

train_dataset = StockDataset(X_train, y_train)
test_dataset = StockDataset(X_test, y_test)
print(f"Train dataset size: {len(train_dataset)}, Test dataset size: {len(test_dataset)}")

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32)
print(f"Train loader length: {len(train_loader)}, Test loader length: {len(test_loader)}")

Train dataset size: 0, Test dataset size: 1
Train loader length: 0, Test loader length: 1


In [4]:
# Step 4: Define the LSTM Model
import torch  # Import PyTorch
from torch import nn  # Import nn module

class StockPriceModel(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super(StockPriceModel, self).__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        out, _ = self.lstm(x)  # out: (batch, seq_length, hidden_dim)
        out = self.fc(out[:, -1, :])  # Use output from last time step
        return out

# Optional: Add a print to confirm the class is defined
print("StockPriceModel class defined successfully.")

StockPriceModel class defined successfully.


In [8]:
# Initialize model
try:
    input_dim = len(features)  # Assumes features is defined from preprocessing
except NameError:
    print("Warning: 'features' is not defined. Using default input_dim = 4.")
    input_dim = 4  # Fallback for 4 features (e.g., High, Low, Open, Volume)
hidden_dim = 64
output_dim = 1
model = StockPriceModel(input_dim, hidden_dim, output_dim)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Debug output to confirm initialization
print(f"Model initialized with input_dim={input_dim}, hidden_dim={hidden_dim}, output_dim={output_dim}")
print(f"Criterion: {criterion}, Optimizer: {optimizer}")

Model initialized with input_dim=4, hidden_dim=64, output_dim=1
Criterion: MSELoss(), Optimizer: Adam (
Parameter Group 0
    amsgrad: False
    betas: (0.9, 0.999)
    capturable: False
    decoupled_weight_decay: False
    differentiable: False
    eps: 1e-08
    foreach: None
    fused: None
    lr: 0.001
    maximize: False
    weight_decay: 0
)


In [10]:
# Training Loop with Validation
import os  # Import os module

if not os.path.exists('best_lstm_model.pt'):
    print("Model file 'best_lstm_model.pt' not found. Training the model...")
    epochs = 50
    best_val_loss = float('inf')
    patience, trials = 10, 0

    for epoch in range(epochs):
        model.train()
        train_loss = 0
        train_steps = 0
        if len(train_loader) == 0:
            print("Warning: Train loader is empty. Skipping training.")
            break
        for X_batch, y_batch in train_loader:
            optimizer.zero_grad()
            outputs = model(X_batch)
            loss = criterion(outputs.squeeze(), y_batch)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
            train_steps += 1
        if train_steps > 0:
            train_loss /= train_steps
        else:
            train_loss = float('inf')

        model.eval()
        val_loss = 0
        val_steps = 0
        if len(test_loader) == 0:
            print("Warning: Test loader is empty. Skipping validation.")
            val_loss = float('inf')
        else:
            with torch.no_grad():
                for X_batch, y_batch in test_loader:
                    outputs = model(X_batch)
                    val_loss += criterion(outputs.squeeze(), y_batch).item()
                    val_steps += 1
            if val_steps > 0:
                val_loss /= val_steps
            else:
                val_loss = float('inf')

        print(f"Epoch {epoch+1}/{epochs}, Train Loss: {train_loss:.6f}, Val Loss: {val_loss:.6f}")

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            trials = 0
            torch.save(model.state_dict(), 'best_lstm_model.pt')
        else:
            trials += 1
            if trials >= patience:
                print("Early stopping triggered!")
                break
else:
    print("Loading pre-trained model 'best_lstm_model.pt'...")
    model.load_state_dict(torch.load('best_lstm_model.pt'))

Loading pre-trained model 'best_lstm_model.pt'...


In [14]:
import torch  # Import torch
from torch.utils.data import DataLoader  # Import DataLoader if needed

# Evaluation
model.eval()
predictions = []
actuals = []

try:
    with torch.no_grad():
        if len(test_loader) == 0:
            print("Warning: Test loader is empty. No evaluation possible.")
        else:
            for X_batch, y_batch in test_loader:
                outputs = model(X_batch)
                if outputs.dim() == 0:  # Scalar case
                    predictions.append(outputs.item())
                    actuals.append(y_batch.item() if y_batch.dim() == 0 else y_batch.numpy()[0])
                elif outputs.dim() == 1:  # 1-d batch
                    predictions.extend(outputs.numpy())
                    actuals.extend(y_batch.numpy())
                else:
                    print(f"Unexpected output shape: {outputs.shape}. Skipping batch.")
                    continue
except NameError:
    print("Warning: 'test_loader' is not defined. Please run the data loading and dataset creation cells first.")



In [64]:
# Inverse transform to original scale
if predictions and actuals:
    predictions = np.array(predictions).reshape(-1, 1)
    actuals = np.array(actuals).reshape(-1, 1)
    predictions = target_scaler.inverse_transform(predictions).flatten()
    actuals = target_scaler.inverse_transform(actuals).flatten()

    # Calculate RMSE
    rmse = np.sqrt(mean_squared_error(actuals, predictions))
    print(f"\nTest RMSE: {rmse:.2f} (lower is better)")

    # Plot results
    plt.figure(figsize=(12, 6))
    plt.plot(actuals, label='Actual Close Price')
    plt.plot(predictions, label='Predicted Close Price')
    plt.title('Actual vs Predicted Close Price')
    plt.xlabel('Test Sample Index')
    plt.ylabel('Price (USD)')
    plt.legend()
    plt.show()
else:
    print("No predictions or actuals to evaluate.")

No predictions or actuals to evaluate.


In [65]:
# Save the Trained Model
torch.save(model.state_dict(), 'final_lstm_model.pt')
print("Model training completed and saved as 'final_lstm_model.pt'")

Model training completed and saved as 'final_lstm_model.pt'
