In [1]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.utils import Sequence

In [6]:
# -------------------------
# 1. Load and Preprocess Data
# -------------------------
# Read the CSV file (adjust the file path as needed)
# Expected columns: Date, Open, High, Low, Close, Adj Close, Volume
df = pd.read_csv('/content/train.csv', parse_dates=['Timestamp'])

# Ensure the data is sorted by time and set the Date as index
df.sort_values('Timestamp', inplace=True)
df.set_index('Timestamp', inplace=True)


In [7]:
print(df.columns)

Index(['Open', 'High', 'Low', 'Close', 'Volume', 'Hour', 'Minute',
       'DayOfWeek'],
      dtype='object')


In [8]:
# -------------------------
# 2. Extract Time Features for One-Minute Data
# -------------------------
# Extract time components
df['Hour'] = df.index.hour         # Hour (0 to 23)
df['Minute'] = df.index.minute     # Minute (0 to 59)
df['DayOfWeek'] = df.index.dayofweek  # Day of week (0=Monday, …, 6=Sunday)

# Cyclical encoding for hour
df['sin_hour'] = np.sin(2 * np.pi * df['Hour'] / 24)
df['cos_hour'] = np.cos(2 * np.pi * df['Hour'] / 24)

# Cyclical encoding for minute
df['sin_minute'] = np.sin(2 * np.pi * df['Minute'] / 60)
df['cos_minute'] = np.cos(2 * np.pi * df['Minute'] / 60)

# Cyclical encoding for day of week
df['sin_dow'] = np.sin(2 * np.pi * df['DayOfWeek'] / 7)
df['cos_dow'] = np.cos(2 * np.pi * df['DayOfWeek'] / 7)

In [10]:
print(df.columns)


Index(['Open', 'High', 'Low', 'Close', 'Volume', 'Hour', 'Minute', 'DayOfWeek',
       'sin_hour', 'cos_hour', 'sin_minute', 'cos_minute', 'sin_dow',
       'cos_dow'],
      dtype='object')


In [11]:
# -------------------------
# 3. Define Features and Target
# -------------------------
# Select features: price/volume info plus encoded time features.
# You can adjust the list of features as needed.
features = ['Open', 'High', 'Low', 'Close', 'Volume',
            'sin_hour', 'cos_hour', 'sin_minute', 'cos_minute',
            'sin_dow', 'cos_dow']

# Create a binary target. For example:
# 1 if next minute's Close is higher than current minute's Close, else 0.
df['target'] = (df['Close'].shift(-1) > df['Close']).astype(int)

# Remove the last row (which has no next-minute target)
df.dropna(inplace=True)

In [12]:
# -------------------------
# 4. Scale the Features
# -------------------------
scaler = MinMaxScaler()
scaled_features = scaler.fit_transform(df[features])
# scaled_features: shape (n_samples, n_features)


In [13]:
# -------------------------
# 5. Create a Data Generator to Produce Sequences On-The-Fly
# -------------------------
class DataGenerator(Sequence):
    def __init__(self, features_array, target_array, seq_length, batch_size=32, shuffle=True):
        """
        Initializes the data generator.
        :param features_array: numpy array with shape (n_samples, n_features)
        :param target_array: numpy array with shape (n_samples,)
        :param seq_length: Number of time steps per sequence.
        :param batch_size: Batch size.
        :param shuffle: Whether to shuffle indices after each epoch.
        """
        self.features_array = features_array
        self.target_array = target_array
        self.seq_length = seq_length
        self.batch_size = batch_size
        self.shuffle = shuffle
        # The maximum start index to generate a full sequence:
        self.indices = np.arange(len(features_array) - seq_length)
        self.on_epoch_end()

    def __len__(self):
        # Number of batches per epoch
        return int(np.ceil(len(self.indices) / self.batch_size))

    def __getitem__(self, index):
        # Generate one batch of data
        batch_indices = self.indices[index * self.batch_size : (index + 1) * self.batch_size]
        X_batch = np.array([self.features_array[i:i+self.seq_length] for i in batch_indices])
        y_batch = np.array([self.target_array[i+self.seq_length] for i in batch_indices])
        return X_batch, y_batch

    def on_epoch_end(self):
        # Shuffle indices after each epoch if required
        if self.shuffle:
            np.random.shuffle(self.indices)

# Define sequence length (e.g., use the past 60 minutes to predict the next minute)
seq_length = 60
batch_size = 32
target_values = df['target'].values

# Create an instance of the DataGenerator
data_gen = DataGenerator(scaled_features, target_values, seq_length, batch_size=batch_size)

# Optionally, check one batch's shapes:
X_batch, y_batch = data_gen[0]
print(f"Batch X shape: {X_batch.shape}")  # Expected: (batch_size, seq_length, n_features)
print(f"Batch y shape: {y_batch.shape}")    # Expected: (batch_size,)

Batch X shape: (32, 60, 11)
Batch y shape: (32,)


In [14]:
# -------------------------
# 6. Build the LSTM Model
# -------------------------
n_features = len(features)
model = Sequential()
model.add(LSTM(50, return_sequences=True, input_shape=(seq_length, n_features)))
model.add(Dropout(0.2))
model.add(LSTM(50))
model.add(Dropout(0.2))
model.add(Dense(1, activation='sigmoid'))

model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
model.summary()


  super().__init__(**kwargs)


In [None]:
# -------------------------
# 7. Train the Model Using the Generator
# -------------------------
#early_stop = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)

# Here, for simplicity, we use the same generator for training and validation.
# In practice, create a separate validation generator.
history = model.fit(
    data_gen,
    epochs=50,
    validation_data=data_gen,
    verbose=1
)


Epoch 1/50


  self._warn_if_super_not_called()


[1m11941/11941[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m178s[0m 14ms/step - accuracy: 0.7441 - loss: 0.5675 - val_accuracy: 0.7449 - val_loss: 0.5644
Epoch 2/50
[1m11941/11941[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m227s[0m 17ms/step - accuracy: 0.7440 - loss: 0.5655 - val_accuracy: 0.7449 - val_loss: 0.5635
Epoch 3/50
[1m11941/11941[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m202s[0m 17ms/step - accuracy: 0.7444 - loss: 0.5647 - val_accuracy: 0.7449 - val_loss: 0.5634
Epoch 4/50
[1m11941/11941[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m174s[0m 15ms/step - accuracy: 0.7442 - loss: 0.5647 - val_accuracy: 0.7449 - val_loss: 0.5638
Epoch 5/50
[1m11941/11941[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m174s[0m 15ms/step - accuracy: 0.7450 - loss: 0.5637 - val_accuracy: 0.7449 - val_loss: 0.5632
Epoch 6/50
[1m11941/11941[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m202s[0m 15ms/step - accuracy: 0.7443 - loss: 0.5645 - val_accuracy: 0.7449 - val_loss: 0.56

In [None]:
# -------------------------
# 8. Evaluate the Model
# -------------------------
# Evaluate on the generator (or on a separate test generator if available)
loss, accuracy = model.evaluate(data_gen)
print(f"Test Loss: {loss:.4f}")
print(f"Test Accuracy: {accuracy:.4f}")

In [None]:
import os

# Define the directory where you want to save the model
save_dir = 'saved_models'

# Create the directory if it doesn't exist
if not os.path.exists(save_dir):
    os.makedirs(save_dir)

# Define the full path for the model file
save_path = os.path.join(save_dir, 'my_model.keras')

# Save the model in the native Keras format
model.save(save_path)
print(f"Model saved to '{save_path}'.")


In [None]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import load_model
from sklearn.metrics import classification_report, confusion_matrix

In [None]:
# -------------------------
# 1. Load the Trained Model and Scaler
# -------------------------
# Load your trained model (update the path if needed)
model = load_model('/content/saved_models/my_model.keras')

# Load your scaler (if you saved it to disk, for example using pickle)
# For demonstration, we assume the scaler is already in memory as 'scaler'
# If you saved it, you might do something like:
# import pickle
# with open('scaler.pkl', 'rb') as f:
#     scaler = pickle.load(f)


In [None]:
# -------------------------
# 2. Load and Preprocess Testing Data
# -------------------------
# Read the test CSV file (adjust the file name/path as needed)
df_test = pd.read_csv('/content/test.csv', parse_dates=['Timestamp'])

# Sort by date and set Date as the index
df_test.sort_values('Timestamp', inplace=True)
df_test.set_index('Timestamp', inplace=True)

# Extract time features from the Date index
df_test['Hour'] = df_test.index.hour         # Hour (0-23)
df_test['Minute'] = df_test.index.minute     # Minute (0-59)
df_test['DayOfWeek'] = df_test.index.dayofweek  # Day of week (0=Monday, ... 6=Sunday)

# Cyclical encoding for hour
df_test['sin_hour'] = np.sin(2 * np.pi * df_test['Hour'] / 24)
df_test['cos_hour'] = np.cos(2 * np.pi * df_test['Hour'] / 24)

# Cyclical encoding for minute
df_test['sin_minute'] = np.sin(2 * np.pi * df_test['Minute'] / 60)
df_test['cos_minute'] = np.cos(2 * np.pi * df_test['Minute'] / 60)

# Cyclical encoding for day of week
df_test['sin_dow'] = np.sin(2 * np.pi * df_test['DayOfWeek'] / 7)
df_test['cos_dow'] = np.cos(2 * np.pi * df_test['DayOfWeek'] / 7)

# Define the feature columns (should match the ones used during training)
features = ['Open', 'High', 'Low', 'Close', 'Volume',
            'sin_hour', 'cos_hour', 'sin_minute', 'cos_minute',
            'sin_dow', 'cos_dow']

# Create the binary target
df_test['target'] = (df_test['Close'].shift(-1) > df_test['Close']).astype(int)
df_test.dropna(inplace=True)  # Remove the last row that doesn't have a target


In [None]:
# -------------------------
# 3. Scale the Test Features
# -------------------------
# Note: Use the same scaler that was fit on the training data
scaled_test = scaler.transform(df_test[features].values)


In [None]:
# -------------------------
# 4. Create Sequences for the LSTM
# -------------------------
def create_sequences(features_array, target_array, seq_length):
    X, y = [], []
    for i in range(len(features_array) - seq_length):
        X.append(features_array[i:i+seq_length])
        y.append(target_array[i+seq_length])
    return np.array(X), np.array(y)

# Set the sequence length (must match what you used during training)
seq_length = 60
target_test = df_test['target'].values

# Create sequences from the test data
X_test, y_test = create_sequences(scaled_test, target_test, seq_length)


In [None]:
# -------------------------
# 5. Evaluate the Model on the Test Data
# -------------------------
loss, accuracy = model.evaluate(X_test, y_test)
print(f"Test Loss: {loss:.4f}")
print(f"Test Accuracy: {accuracy:.4f}")


In [None]:
# -------------------------
# 6. Generate Predictions and Print Detailed Metrics
# -------------------------
# Generate predicted probabilities
y_pred_probs = model.predict(X_test)

# Convert probabilities to binary predictions (using a threshold of 0.5)
y_pred = (y_pred_probs >= 0.4).astype(int).flatten()

# Print classification report and confusion matrix
print("Classification Report:")
print(classification_report(y_test, y_pred))

print("Confusion Matrix:")
print(confusion_matrix(y_test, y_pred))