<a href="https://www.kaggle.com/code/tanish34/stock-market-prediction?scriptVersionId=227511061" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import tensorflow as tf
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Bidirectional, Dropout, Dense, Input
from tensorflow.keras.layers import MultiHeadAttention, LayerNormalization, LSTM, GRU
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.regularizers import l1_l2

In [None]:
# Load the Asian Paints history, parse the day-month-year 'Date' strings into
# datetimes, then order rows oldest-first with a fresh 0..n-1 index.
df = (
    pd.read_csv('/kaggle/input/asian-paints1/Asian_Paints.csv')
    .assign(Date=lambda frame: pd.to_datetime(frame['Date'], format='%d-%m-%Y'))
    .sort_values('Date')
    .reset_index(drop=True)
)

In [None]:
# Clean numeric columns - strip thousands separators and convert to float
for col in ['Price', 'Open', 'High', 'Low']:
    df[col] = df[col].astype(str).str.replace(',', '', regex=False).astype(float)


def _parse_volume(raw):
    """Convert volume strings such as '830K' or '1.25M' into floats.

    A trailing K / M / B (any case) scales the number by 1e3 / 1e6 / 1e9;
    a value without a suffix is taken as-is. Anything that cannot be parsed
    (missing values, placeholders like '-') becomes NaN instead of raising.

    This replaces evaluating each cell with ``pd.eval``, which executed data
    as an expression and failed on the first unparsable cell.

    Args:
        raw: pandas Series of volume strings.

    Returns:
        pandas Series of floats with the same index as ``raw``.
    """
    text = raw.astype(str).str.strip().str.replace(',', '', regex=False)
    # NaN wherever the last character is not a recognised suffix.
    multiplier = text.str[-1].str.upper().map({'K': 1e3, 'M': 1e6, 'B': 1e9})
    # Drop the suffix character only on rows that actually have one.
    digits = text.where(multiplier.isna(), text.str[:-1])
    return pd.to_numeric(digits, errors='coerce') * multiplier.fillna(1.0)


# Convert Volume to numeric
df['Volume'] = _parse_volume(df['Vol.'])

# Create technical indicators
def add_technical_indicators(df):
    """Add trend, momentum and volatility indicator columns to ``df``.

    Reads the 'Price', 'High' and 'Low' columns. The frame is modified in
    place and the same object is returned. Rolling-window columns are NaN
    until their window is full.

    Args:
        df: DataFrame with numeric 'Price', 'High' and 'Low' columns.

    Returns:
        The same DataFrame with the indicator columns appended.
    """
    close = df['Price']
    previous_close = close.shift(1)

    # Simple moving averages
    for column, window in {'MA5': 5, 'MA20': 20, 'MA50': 50}.items():
        df[column] = close.rolling(window=window).mean()

    # Exponential moving averages
    for column, span in {'EMA12': 12, 'EMA26': 26}.items():
        df[column] = close.ewm(span=span, adjust=False).mean()

    # MACD line and its 9-span signal line
    macd_line = df['EMA12'] - df['EMA26']
    df['MACD'] = macd_line
    df['MACD_signal'] = macd_line.ewm(span=9, adjust=False).mean()

    # Bollinger Bands: 20-period mean +/- two standard deviations
    twenty_day = close.rolling(window=20)
    middle_band = twenty_day.mean()
    deviation = twenty_day.std()
    df['20MA'] = middle_band
    df['20SD'] = deviation
    df['upper_band'] = middle_band + 2 * deviation
    df['lower_band'] = middle_band - 2 * deviation

    # RSI from 14-period simple averages of gains and losses
    change = close.diff()
    average_gain = change.clip(lower=0).rolling(window=14).mean()
    average_loss = -change.clip(upper=0).rolling(window=14).mean()
    relative_strength = average_gain / average_loss
    df['RSI'] = 100 - (100 / (1 + relative_strength))

    # 5-period rate of change, in percent
    df['ROC'] = close.pct_change(periods=5) * 100

    # True range: largest of the three candidate ranges. np.maximum keeps
    # NaN, so the first row (no previous close) stays NaN.
    high_low = df['High'] - df['Low']
    high_gap = (df['High'] - previous_close).abs()
    low_gap = (df['Low'] - previous_close).abs()
    df['TR'] = np.maximum(np.maximum(high_low, high_gap), low_gap)
    df['ATR'] = df['TR'].rolling(window=14).mean()

    # Volatility: 20-period rolling standard deviation of price
    df['volatility'] = deviation

    # Daily returns, in percent
    df['daily_return'] = close.pct_change() * 100

    return df

# Add technical indicators, then drop every row that still contains a NaN
# (the rolling/shifted indicators leave NaNs in their first rows).
df = add_technical_indicators(df).dropna()

In [None]:
# Notebook preview: show the final rows of the cleaned, indicator-enriched frame.
df.tail()

In [None]:
# Columns fed to the model. 'Price' must stay first: create_dataset reads
# column 0 as the prediction target, and the inverse-scaling code writes
# predictions back into column 0.
features = ['Price', 'Open', 'High', 'Low', 'Volume', 
            'MA5', 'MA20', 'EMA12', 'MACD', 'RSI', 'ROC', 
            'upper_band', 'lower_band', 'ATR', 'volatility']

# Prepare feature set (column order follows `features`)
feature_df = df[features]

In [None]:
# Use StandardScaler for financial time series.
# The mean/std are fitted on the training rows only, so statistics from the
# held-out period do not leak into training. The 0.7 fraction must match the
# chronological split applied to `scaled_data` afterwards.
n_fit_rows = int(len(feature_df) * 0.7)
scaler = StandardScaler()
scaler.fit(feature_df.iloc[:n_fit_rows])
scaled_data = scaler.transform(feature_df)

# Separate price-only scaler, fitted on the same training rows
# (fit only: the transformed values were never used).
price_scaler = StandardScaler()
price_scaler.fit(df[['Price']].iloc[:n_fit_rows])

In [None]:
# Chronological 70/30 split: the first 70% of rows train the model and the
# remaining rows are held out for testing (no shuffling).
training_size = int(0.7 * len(scaled_data))
train_data, test_data = scaled_data[:training_size], scaled_data[training_size:]

In [None]:
# Function to create sequences
def create_dataset(dataset, time_step=1):
    """Slice a 2-D series into overlapping windows and next-step targets.

    Sample ``i`` is ``dataset[i:i + time_step]`` (all columns); its target is
    column 0 of the row right after that window, ``dataset[i + time_step, 0]``.
    Every window that has a following row is used, including the one whose
    target is the final row.

    Args:
        dataset: 2-D array-like of shape (n_rows, n_features); column 0 is
            the value to predict.
        time_step: number of consecutive rows per input window.

    Returns:
        Tuple ``(X, y)`` with ``X`` of shape
        (n_rows - time_step, time_step, n_features) and ``y`` of shape
        (n_rows - time_step,). When there are too few rows for a single
        window, both are empty but ``X`` keeps its 3-D shape.
    """
    data = np.asarray(dataset)
    n_samples = len(data) - time_step
    if n_samples <= 0:
        # Keep the (samples, time_step, features) layout so callers can
        # still read X.shape[1] and X.shape[2].
        empty_X = np.empty((0, time_step) + data.shape[1:], dtype=data.dtype)
        return empty_X, np.empty((0,), dtype=data.dtype)

    X = np.stack([data[start:start + time_step] for start in range(n_samples)])
    # Targets are the Price column of each row that follows a window.
    y = data[time_step:, 0].copy()
    return X, y

# Create sequences with 60 days time step
# Each sample is a window of `time_step` consecutive scaled rows; its target
# is the scaled Price of the row after the window. Train and test windows are
# built separately, so no window spans the split boundary.
time_step = 60
X_train, y_train = create_dataset(train_data, time_step)
X_test, y_test = create_dataset(test_data, time_step)

In [None]:
# Sanity check: both arrays should be (samples, time_step, n_features).
print(f"Training data shape: {X_train.shape}")
print(f"Testing data shape: {X_test.shape}")

In [None]:
def build_model(input_shape):
    """Assemble the CNN -> BiGRU -> BiLSTM -> attention -> LSTM regressor.

    The returned model is not compiled.

    Args:
        input_shape: shape passed to ``Input``, i.e. one sample without the
            batch axis.

    Returns:
        A ``tf.keras.Model`` mapping the input to a single Dense(1) output.
    """
    def new_regularizer():
        # A fresh regularizer object per layer, as in a hand-written stack.
        return l1_l2(l1=1e-5, l2=1e-4)

    inputs = Input(shape=input_shape)
    hidden = inputs

    # Two convolutional blocks: Conv1D -> LayerNormalization -> MaxPooling1D
    for n_filters in (64, 128):
        hidden = Conv1D(
            filters=n_filters,
            kernel_size=3,
            padding='same',
            activation='relu',
            kernel_regularizer=new_regularizer(),
        )(hidden)
        hidden = LayerNormalization(epsilon=1e-6)(hidden)
        hidden = MaxPooling1D(pool_size=2)(hidden)

    # Bidirectional GRU over the pooled sequence
    gru_cell = GRU(
        64,
        return_sequences=True,
        recurrent_dropout=0.1,
        kernel_regularizer=new_regularizer(),
    )
    hidden = Bidirectional(gru_cell)(hidden)
    hidden = Dropout(0.3)(hidden)

    # Bidirectional LSTM
    lstm_cell = LSTM(
        32,
        return_sequences=True,
        recurrent_dropout=0.1,
        kernel_regularizer=new_regularizer(),
    )
    hidden = Bidirectional(lstm_cell)(hidden)
    hidden = Dropout(0.3)(hidden)

    # Self-attention (query, value and key are the same tensor), followed by
    # a residual connection and layer normalisation
    self_attention = MultiHeadAttention(num_heads=4, key_dim=16)
    attention_output = self_attention(hidden, hidden, hidden)
    hidden = LayerNormalization(epsilon=1e-6)(attention_output + hidden)

    # Reduce the sequence to one vector
    hidden = LSTM(32, return_sequences=False)(hidden)
    hidden = Dropout(0.2)(hidden)

    # Regression head
    hidden = Dense(
        16, activation='relu', kernel_regularizer=new_regularizer()
    )(hidden)
    outputs = Dense(1)(hidden)

    return tf.keras.Model(inputs=inputs, outputs=outputs)


In [None]:
# Build the network for one sample's shape: every axis of X_train after the
# leading sample axis, i.e. (time_step, n_features).
model = build_model(X_train.shape[1:])

In [None]:
# Print the layer-by-layer summary of the freshly built model.
model.summary()

In [None]:
# Compile with the Adam optimizer and Huber loss; MAE and MSE are tracked as
# additional metrics.
optimizer = Adam(learning_rate=1e-3)
model.compile(optimizer=optimizer, loss='huber', metrics=['mae', 'mse'])

# Callbacks - all three watch the validation loss.

# Stop after 30 epochs without a 1e-4 improvement and restore the best weights.
early_stopping = EarlyStopping(
    monitor='val_loss',
    mode='min',
    min_delta=1e-4,
    patience=30,
    restore_best_weights=True,
    verbose=1,
)

# Halve the learning rate after 10 stagnant epochs, never going below 1e-6.
reduce_lr = ReduceLROnPlateau(
    monitor='val_loss',
    patience=10,
    factor=0.5,
    min_lr=1e-6,
    verbose=1,
)

# Save the model to disk only when the validation loss improves.
checkpoint_path = 'best_asian_paints_model.keras'
model_checkpoint = ModelCheckpoint(
    checkpoint_path,
    monitor='val_loss',
    mode='min',
    save_best_only=True,
    verbose=1,
)

In [None]:
# Train the model.
# Early stopping, learning-rate reduction and checkpointing all select on
# val_loss. Validating on (X_test, y_test) would therefore tune the model on
# the same data that is later reported as test performance. Instead, hold out
# part of the training windows: Keras takes `validation_split` from the LAST
# samples before shuffling, i.e. the most recent 15% of the training period.
history = model.fit(
    X_train,
    y_train,
    validation_split=0.15,
    epochs=100,
    batch_size=32,
    callbacks=[early_stopping, reduce_lr, model_checkpoint],
    verbose=1
)


In [None]:
# Run the trained model over the training windows, then the test windows.
# Outputs are still in scaled units.
train_predict, test_predict = (
    model.predict(windows) for windows in (X_train, X_test)
)

In [None]:
def _to_price_units(scaled_prices):
    """Map scaled Price values back to actual prices via the feature scaler.

    `scaler` was fitted on every feature column, so the values are placed in
    column 0 (Price) of a zero-filled matrix of that width, inverse
    transformed, and column 0 is read back.
    """
    padded = np.zeros((len(scaled_prices), len(features)))
    padded[:, 0] = np.ravel(scaled_prices)
    return scaler.inverse_transform(padded)[:, 0]


# Inverse transform predictions to get actual stock prices
train_predictions_actual = _to_price_units(train_predict)
test_predictions_actual = _to_price_units(test_predict)

# Get actual values
y_train_actual = _to_price_units(y_train)
y_test_actual = _to_price_units(y_test)

# Calculate performance metrics; each pair is (actual, predicted)
_splits = (
    (y_train_actual, train_predictions_actual),
    (y_test_actual, test_predictions_actual),
)
train_mae, test_mae = (
    mean_absolute_error(actual, predicted) for actual, predicted in _splits
)
train_rmse, test_rmse = (
    np.sqrt(mean_squared_error(actual, predicted)) for actual, predicted in _splits
)
train_r2, test_r2 = (
    r2_score(actual, predicted) for actual, predicted in _splits
)

print(f"Training MAE: {train_mae:.2f}")
print(f"Testing MAE: {test_mae:.2f}")
print(f"Training RMSE: {train_rmse:.2f}")
print(f"Testing RMSE: {test_rmse:.2f}")
print(f"Training R²: {train_r2:.4f}")
print(f"Testing R²: {test_r2:.4f}")

In [None]:
# Plot predictions vs actual: one stacked panel per split.
plt.figure(figsize=(15, 8))

_panels = [
    ('Training Data: Actual vs Predicted Stock Prices',
     y_train_actual, train_predictions_actual),
    ('Testing Data: Actual vs Predicted Stock Prices',
     y_test_actual, test_predictions_actual),
]

for _row, (_title, _actual, _predicted) in enumerate(_panels, start=1):
    plt.subplot(2, 1, _row)
    plt.plot(_actual, label='Actual Prices')
    plt.plot(_predicted, label='Predicted Prices')
    plt.title(_title)
    plt.legend()
    plt.grid(True)

plt.tight_layout()
plt.show()

In [None]:
# Future predictions
def predict_future(model, last_sequence, n_steps=30):
    """Forecast ``n_steps`` values by feeding each prediction back as input.

    At every step the model sees the current window with a batch axis added.
    The window then slides forward one row: the oldest row is dropped and a
    new row is appended whose column 0 is the prediction and whose other
    columns repeat the previous last row (simple approach - the remaining
    features are held constant).

    Args:
        model: object whose ``predict(batch)`` takes an array of shape
            (1, time_steps, n_features) and returns one prediction per
            batch row.
        last_sequence: 2-D array of shape (time_steps, n_features) used as
            the starting window; it is not modified.
        n_steps: number of future steps to generate.

    Returns:
        ``np.ndarray`` of the model outputs stacked along axis 0
        (shape (n_steps, 1) for a single-output model).
    """
    window = np.asarray(last_sequence)
    future_predictions = []

    for _ in range(n_steps):
        # First (and only) batch row of the model output.
        raw_pred = np.asarray(model.predict(window[np.newaxis, ...]))[0]
        future_predictions.append(raw_pred)

        next_features = np.zeros(window.shape[1])
        # Pull out a real scalar: storing a shape-(1,) array in a single
        # element is deprecated array-to-scalar conversion in NumPy >= 1.25.
        next_features[0] = raw_pred.reshape(-1)[0]
        # Use last values for other features (simple approach)
        next_features[1:] = window[-1, 1:]

        # Slide the window: drop the oldest row, append the synthetic one.
        window = np.vstack([window[1:], next_features])

    return np.array(future_predictions)

# Seed the forecast with the most recent `time_step` scaled rows.
# X_test[-1] is not used: every window in X_test is paired with a later,
# already-observed target row, so it stops short of the end of the data and
# the "future" forecast would start in the past.
last_sequence = scaled_data[-time_step:]

# Forecast horizon, in trading days
n_future_days = 30
future_preds = predict_future(model, last_sequence, n_future_days)

# Prepare for inverse scaling: the scaler expects all feature columns, so the
# predictions go in column 0 (Price) of a zero-filled matrix.
future_pred_full = np.zeros((len(future_preds), len(features)))
future_pred_full[:, 0] = future_preds.flatten()

# Inverse transform
future_prices = scaler.inverse_transform(future_pred_full)[:, 0]

# Get dates for future predictions. Each step is the next trading day, so
# use business days (Mon-Fri) rather than calendar days.
# NOTE: exchange holidays are not excluded.
last_date = df['Date'].iloc[-1]
future_dates = pd.bdate_range(
    start=last_date + pd.Timedelta(days=1), periods=n_future_days
)

# Display future predictions
print("\nFuture Price Predictions for Asian Paints:")
for date, price in zip(future_dates, future_prices):
    print(f"Date: {date.strftime('%d-%m-%Y')}, Predicted Price: {price:.2f}")

# Plot future predictions
plt.figure(figsize=(15, 6))
# Last 100 observed rows give context for the forecast
historical_prices = df['Price'].iloc[-100:].to_numpy()
historical_dates = df['Date'].iloc[-100:].to_numpy()

# Plot
plt.plot(historical_dates, historical_prices, label='Historical Prices')
plt.plot(future_dates, future_prices, 'r--', label='Future Predictions')
plt.title('Asian Paints Stock Price Forecast')
plt.xlabel('Date')
plt.ylabel('Stock Price')
plt.xticks(rotation=45)
plt.legend()
plt.tight_layout()
plt.show()