In [6]:
import yfinance as yf
import pandas as pd
import numpy as np
import tensorflow as tf
import ta
from sklearn.preprocessing import MinMaxScaler

# Define stock and time range
ticker = "AAPL"
start_date = "2015-01-01"
end_date = "2023-12-31"
df = yf.download(ticker, start=start_date, end=end_date, auto_adjust=False)
    
    # Reset index for proper datetime handling
df.reset_index(inplace=True)
    
    # Ensure 'Adj Close' is a 1D Series
adj_close = df['Adj Close'].squeeze()
    
    # Compute percentage return
df['Return'] = adj_close.pct_change()

    # Compute Technical Indicators
df['RSI'] = ta.momentum.RSIIndicator(adj_close).rsi()
df['EMA'] = ta.trend.EMAIndicator(adj_close).ema_indicator()
df['ATR'] = ta.volatility.AverageTrueRange(
        high=df['High'].squeeze(), 
        low=df['Low'].squeeze(), 
        close=adj_close
    ).average_true_range()
    
df['VWAP'] = ta.volume.VolumeWeightedAveragePrice(
        high=df['High'].squeeze(), 
        low=df['Low'].squeeze(), 
        close=adj_close, 
        volume=df['Volume'].squeeze()
    ).volume_weighted_average_price()

    # Drop NaN values that appear due to indicator calculations
df.dropna(inplace=True)


# Normalize data
# scaler = MinMaxScaler()
# df_scaled = pd.DataFrame(scaler.fit_transform(df), columns=df.columns, index=df.index)
features = ['Adj Close', 'Volume', 'RSI', 'EMA']
# lookback =14  # Total number of days window choosen

# Scale data
scaler = MinMaxScaler()
df_scaled = scaler.fit_transform(df[features])

# Define lookback period (past days for input)
lookback = 60  # Example: use past 60 days to predict next 5 days
n_steps_out = 5

# Convert data into sequences for model training
def create_sequences(data, lookback, n_steps_out):
    X, y = [], []
    for i in range(len(data) - lookback - n_steps_out):
        X.append(data[i : i + lookback])
        y.append(data[i + lookback : i + lookback + n_steps_out,3])
    return np.array(X), np.array(y)

# Create training sequences
X, y = create_sequences(df_scaled, lookback, n_steps_out)
feature_size = X.shape[2]

print(f"Dataset Shape - X: {X.shape}, y: {y.shape}")  # Should be (samples, lookback, features), (samples, 5)

[*********************100%***********************]  1 of 1 completed

Dataset Shape - X: (2186, 60, 4), y: (2186, 5)





In [7]:
print(df.head())

Price        Date  Adj Close      Close       High        Low       Open  \
Ticker                  AAPL       AAPL       AAPL       AAPL       AAPL   
13     2015-01-22  25.003347  28.100000  28.117500  27.430000  27.565001   
14     2015-01-23  25.132374  28.245001  28.437500  27.882500  28.075001   
15     2015-01-26  25.159063  28.275000  28.590000  28.200001  28.434999   
16     2015-01-27  24.278170  27.285000  28.120001  27.257500  28.105000   
17     2015-01-28  25.650681  28.827499  29.530001  28.827499  29.407499   

Price      Volume    Return        RSI        EMA       ATR       VWAP  
Ticker       AAPL                                                       
13      215185600  0.026016  60.986494  24.313966  3.246368  26.227638  
14      185859200  0.005160  62.490274  24.423087  3.259781  26.274639  
15      222460000  0.001062  62.809609  24.521218  3.273913  26.395273  
16      382274800 -0.035013  48.218610  24.488811  3.251557  26.482635  
17      585908400  0.056533  

In [9]:
import tensorflow as tf
from tensorflow.keras.layers import Input, LSTM, Dense, LeakyReLU, Dropout, Conv1D, Flatten, Bidirectional, Reshape
from tensorflow.keras.models import Model, Sequential

def Generator(n_steps_in, n_steps_out, feature_size, weight_initializer) -> tf.keras.models.Model:
    model = Sequential()
    model.add(Conv1D(32, kernel_size=2, strides=1, 
                     padding='same', kernel_initializer=weight_initializer, 
                     input_shape=(n_steps_in, feature_size)))
    model.add(LeakyReLU(alpha=0.1))
    model.add(Bidirectional(LSTM(64, activation='relu', kernel_initializer=weight_initializer, 
                                 return_sequences=False, dropout=0.3)))
    
    model.add(Dense(64, activation='linear'))
    model.add(LeakyReLU(alpha=0.1))
    model.add(Dropout(0.2))
    model.add(Dense(32, activation='linear'))
    model.add(LeakyReLU(alpha=0.1))
    model.add(Dropout(0.2))

    model.add(Dense(n_steps_out, activation='linear'))  # Predicting 5 days ahead
    model.add(Reshape((n_steps_out, 1)))  # Ensure output shape is (5, 1)
    
    return model

def Discriminator(weight_initializer, n_steps_in, n_steps_out, feature_size) -> tf.keras.models.Model:
    model = Sequential()
    model.add(Conv1D(32, kernel_size=2, strides=1, padding='same', 
                     kernel_initializer=weight_initializer, 
                     input_shape=(n_steps_in + n_steps_out, feature_size)))
    model.add(LeakyReLU(alpha=0.1))
    model.add(Conv1D(64, kernel_size=2, strides=1, kernel_initializer=weight_initializer, padding='same'))
    model.add(LeakyReLU(alpha=0.1))
    model.add(Flatten())

    model.add(Dense(64, activation='linear'))
    model.add(LeakyReLU(alpha=0.1))
    model.add(Dropout(0.2))
    model.add(Dense(32, activation='linear'))
    model.add(LeakyReLU(alpha=0.1))
    model.add(Dropout(0.2))

    model.add(Dense(1, activation='sigmoid'))  # Sigmoid for binary classification
    return model


# Create Generator & Discriminator
weight_initializer = tf.keras.initializers.RandomNormal(mean=0.0, stddev=0.02)

generator = Generator(lookback, n_steps_out, feature_size, weight_initializer)
discriminator = Discriminator(weight_initializer, lookback, n_steps_out, feature_size)

# WGAN Model Class
class WGAN(tf.keras.Model):
    def __init__(self, generator, discriminator, n_steps_in, n_steps_out):
        super(WGAN, self).__init__()
        self.d_optimizer = tf.keras.optimizers.Adam(0.0004, beta_1=0.5, beta_2=0.9)
        self.g_optimizer = tf.keras.optimizers.Adam(0.0001, beta_1=0.5, beta_2=0.9)
        self.generator = generator
        self.discriminator = discriminator
        self.n_steps_in = n_steps_in
        self.n_steps_out = n_steps_out
        self.batch_size = 32

    def gradient_penalty(self, batch_size, real_output, generated_output):
        """ Calculates the gradient penalty."""
        alpha = tf.random.normal([batch_size, self.n_steps_in + self.n_steps_out, feature_size], 0.0, 1.0)
        diff = generated_output - tf.cast(real_output, tf.float32)
        interpolated = tf.cast(real_output, tf.float32) + alpha * diff

        with tf.GradientTape() as gp_tape:
            gp_tape.watch(interpolated)
            pred = self.discriminator(interpolated, training=True)

        grads = gp_tape.gradient(pred, [interpolated])[0]
        norm = tf.sqrt(tf.reduce_sum(tf.square(grads), axis=[1, 2]))
        gp = tf.reduce_mean((norm - 1.0) ** 2)
        return gp


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


In [15]:
for epoch in range(epochs):
    for real_x, real_y in dataset:
        # Convert to float32
        real_x = tf.cast(real_x, tf.float32)  # Shape: (batch, 60, 4)
        real_y = tf.cast(real_y, tf.float32)  # Shape: (batch, 5)

        with tf.GradientTape() as disc_tape:
            fake_y = generator(real_x, training=True)
            fake_y = tf.cast(fake_y, tf.float32)  # Shape: (batch, 5)

            # **Fix the shape issue**
            # Expand real_y and fake_y to match time steps (repeat across 60 time steps)
            real_y_expanded = tf.tile(real_y[:, tf.newaxis, :], [1, real_x.shape[1], 1])  # (batch, 60, 5)
            fake_y_expanded = tf.tile(fake_y[:, tf.newaxis, :], [1, real_x.shape[1], 1])  # (batch, 60, 5)

            # Concatenate across the feature axis (last axis)
            real_input = tf.concat([real_x, real_y_expanded], axis=-1)  # Shape: (batch, 60, 9)
            fake_input = tf.concat([real_x, fake_y_expanded], axis=-1)  # Shape: (batch, 60, 9)

            real_output = discriminator(real_input, training=True)
            fake_output = discriminator(fake_input, training=True)

            d_loss = tf.reduce_mean(fake_output) - tf.reduce_mean(real_output)
            gp = wgan.gradient_penalty(batch_size, real_output, fake_output)
            d_loss += 10 * gp  # Apply gradient penalty

        grads = disc_tape.gradient(d_loss, discriminator.trainable_variables)
        wgan.d_optimizer.apply_gradients(zip(grads, discriminator.trainable_variables))

        # Train generator
        with tf.GradientTape() as gen_tape:
            fake_y = generator(real_x, training=True)
            fake_y = tf.cast(fake_y, tf.float32)  # Shape: (batch, 5)

            # Expand fake_y to match time steps
            fake_y_expanded = tf.tile(fake_y[:, tf.newaxis, :], [1, real_x.shape[1], 1])  # (batch, 60, 5)

            fake_input = tf.concat([real_x, fake_y_expanded], axis=-1)  # Shape: (batch, 60, 9)
            fake_output = discriminator(fake_input, training=True)
            g_loss = -tf.reduce_mean(fake_output)

        grads = gen_tape.gradient(g_loss, generator.trainable_variables)
        wgan.g_optimizer.apply_gradients(zip(grads, generator.trainable_variables))

    print(f"Epoch {epoch} - D Loss: {d_loss.numpy()}, G Loss: {g_loss.numpy()}")


InvalidArgumentError: {{function_node __wrapped__Tile_device_/job:localhost/replica:0/task:0/device:CPU:0}} Expected multiples argument to be a vector of length 4 but got length 3 [Op:Tile]

In [None]:
# Select the last known 60 days of data as input
latest_data = df_scaled.iloc[-lookback:].values.reshape(1, lookback, feature_size)

# Predict next 5 days
predicted_future = generator.predict(latest_data)

# Convert predictions back to actual values
predicted_future_prices = scaler.inverse_transform(
    np.hstack((np.zeros((5, feature_size - 1)), predicted_future.reshape(-1, 1)))
)[:, -1]

print("Predicted Prices for Next 5 Days:", predicted_future_prices)
