<a href="https://colab.research.google.com/github/parimal1998/Predictive-Maintenance-of-Renewable-Plant-Using-Digital-Twins/blob/main/RNN_Model1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:


import os, math, numpy as np, pandas as pd, matplotlib.pyplot as plt
import tensorflow as tf
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Input, Conv1D, Bidirectional, LSTM, Dense, Dropout, LayerNormalization
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint
from tensorflow.keras.optimizers import Adam

In [None]:
np.random.seed(42)
tf.random.set_seed(42)

CANDIDATE_PATHS = [
    '/content/Final Dataset.csv',
]
csv_path = None
for p in CANDIDATE_PATHS:
    if os.path.exists(p):
        csv_path = p
        break
assert csv_path is not None, "Could not find 'Final Dataset.csv' in known locations."

df = pd.read_csv(csv_path)
df.columns = [c.strip() for c in df.columns]
assert 'time' in df.columns and 'PV_Output' in df.columns, f"Columns: {df.columns.tolist()}"

df['time'] = pd.to_datetime(df['time'], dayfirst=True, errors='coerce')
df = df.dropna(subset=['time']).set_index('time').sort_index()

df = df.resample('h').mean()

In [None]:
df['hour'] = df.index.hour
df['dayofyear'] = df.index.dayofyear


df['hour_sin'] = np.sin(2*np.pi*df['hour']/24)
df['hour_cos'] = np.cos(2*np.pi*df['hour']/24)
df['doy_sin']  = np.sin(2*np.pi*df['dayofyear']/365.25)
df['doy_cos']  = np.cos(2*np.pi*df['dayofyear']/365.25)

base_feature_cols = [
    'Irradiance','Temperature_C','Humidity','Cloud_Cover','Snowfall',
    'hour_sin','hour_cos','doy_sin','doy_cos'
]
target_col = 'PV_Output'


df[target_col] = df[target_col].interpolate(method='time')
df[target_col] = df[target_col].clip(lower=0)

for c in base_feature_cols:
    if c in df.columns:
        df[c] = df[c].interpolate(method='time')
        df[c] = df[c].ffill().bfill()


df['PV_lag1'] = df[target_col].shift(1)
df = df.dropna(subset=['PV_lag1'])

feature_cols = base_feature_cols + ['PV_lag1']

assert np.isfinite(df[feature_cols + [target_col]].values).all(), "Non-finite values remain after cleaning."

print("Data range:", df.index.min(), "→", df.index.max())
print("Rows after cleaning:", len(df))


In [None]:
SEQ_LEN = 24

timestamps = df.index
mask_train_all = timestamps.year <= 2023
mask_test      = timestamps.year == 2024

df_train_all = df.loc[mask_train_all].copy()
df_test      = df.loc[mask_test].copy()

n_train = len(df_train_all)
val_tail = int(math.ceil(0.10 * n_train))
df_train = df_train_all.iloc[:-val_tail].copy()
df_val   = df_train_all.iloc[-val_tail:].copy()

print(f"Train hours: {len(df_train)}, Val hours: {len(df_val)}, Test hours: {len(df_test)}")

In [None]:
zero_var_cols = df_train[feature_cols].std().replace({0.0: np.nan}).dropna().index
zero_var_cols = [c for c in feature_cols if df_train[c].std() == 0.0]
if zero_var_cols:
    print("Dropping zero-variance cols:", zero_var_cols)
    feature_cols = [c for c in feature_cols if c not in zero_var_cols]

X_scaler = StandardScaler()
y_scaler = MinMaxScaler()

X_train_f = X_scaler.fit_transform(df_train[feature_cols])
X_val_f   = X_scaler.transform(df_val[feature_cols])
X_test_f  = X_scaler.transform(df_test[feature_cols])

y_train_f = y_scaler.fit_transform(df_train[[target_col]])
y_val_f   = y_scaler.transform(df_val[[target_col]])
y_test_f  = y_scaler.transform(df_test[[target_col]])

for name, arr in [('X_train_f',X_train_f),('X_val_f',X_val_f),('X_test_f',X_test_f),
                  ('y_train_f',y_train_f),('y_val_f',y_val_f),('y_test_f',y_test_f)]:
    assert np.isfinite(arr).all(), f"Non-finite values in {name}"


In [None]:
def make_sequences(X, y, seq_len):
    Xs, ys = [], []
    for i in range(len(X) - seq_len):
        Xs.append(X[i:i+seq_len, :])
        ys.append(y[i+seq_len, 0])
    return np.array(Xs, dtype=np.float32), np.array(ys, dtype=np.float32)

X_train, y_train = make_sequences(X_train_f, y_train_f, SEQ_LEN)
X_val,   y_val   = make_sequences(X_val_f,   y_val_f,   SEQ_LEN)
X_test,  y_test  = make_sequences(X_test_f,  y_test_f,  SEQ_LEN)

print("Shapes | X_train:", X_train.shape, "X_val:", X_val.shape, "X_test:", X_test.shape)

for name, arr in [('X_train',X_train),('X_val',X_val),('X_test',X_test),
                  ('y_train',y_train),('y_val',y_val),('y_test',y_test)]:
    assert np.isfinite(arr).all(), f"Non-finite values in {name}"

In [None]:
baseline_shifted = df_test[target_col].shift(1).values
test_index = df_test.index[SEQ_LEN:]
baseline_aligned = baseline_shifted[SEQ_LEN:]
baseline_aligned = baseline_aligned[:len(y_test)]
baseline_actual  = df_test[target_col].values[SEQ_LEN:SEQ_LEN+len(y_test)]

In [None]:
model = Sequential([
    Input(shape=(SEQ_LEN, X_train.shape[-1])),
    Conv1D(filters=64, kernel_size=3, padding='causal', activation='relu'),
    LayerNormalization(),
    Bidirectional(LSTM(64, return_sequences=True)),
    Dropout(0.2),
    Bidirectional(LSTM(32)),
    Dense(32, activation='relu'),
    Dropout(0.1),
    Dense(1, activation='linear')
])

optimizer = Adam(learning_rate=0.01, clipnorm=1.0)  # (or clipvalue=0.5)
model.compile(optimizer=optimizer, loss='mse', metrics=['mae'])
model.summary()

es  = EarlyStopping(monitor='val_loss', patience=12, restore_best_weights=True)
rlr = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=6, min_lr=1e-5, verbose=1)

history = model.fit(
    X_train, y_train,
    validation_data=(X_val, y_val),
    epochs=200,
    batch_size=128,
    callbacks=[es, rlr,],
    verbose=1
)

In [None]:
y_pred_scaled = model.predict(X_test, verbose=0).reshape(-1, 1)
y_pred = y_scaler.inverse_transform(y_pred_scaled).flatten()
y_true = y_scaler.inverse_transform(y_test.reshape(-1, 1)).flatten()

def rmse(a, b):
    a = np.asarray(a, dtype=float)
    b = np.asarray(b, dtype=float)
    return np.sqrt(mean_squared_error(a, b))

def mape(a, b, eps=1e-3):
    a = np.asarray(a, dtype=float)
    b = np.asarray(b, dtype=float)
    mask = np.abs(a) > eps
    if not np.any(mask):
        return np.nan
    return np.mean(np.abs((a[mask] - b[mask]) / a[mask])) * 100

print("\n=== Model vs Baseline (2024) ===")
print(f"Model  RMSE: {rmse(y_true, y_pred):.4f} | "
      f"MAE: {mean_absolute_error(y_true, y_pred):.4f} | "
      f"MAPE: {mape(y_true, y_pred):.2f}%")

m_len = min(len(y_true), len(baseline_aligned))
y_true_b = y_true[:m_len]
base_b = baseline_aligned[:m_len]
mask = np.isfinite(y_true_b) & np.isfinite(base_b)



In [None]:
results_idx = test_index[:len(y_pred)]
df_results = pd.DataFrame({
    'datetime': results_idx,
    'actual': y_true[:len(results_idx)],
    'predicted': y_pred[:len(results_idx)]
}).set_index('datetime')

monthly = df_results.groupby([df_results.index.year, df_results.index.month]).apply(
    lambda g: pd.Series({
        'RMSE': rmse(g['actual'], g['predicted']),
        'MAE': mean_absolute_error(g['actual'], g['predicted']),
        'MAPE': mape(g['actual'], g['predicted'])
    })
)
print("\nMonthly metrics (year, month):\n", monthly)

# January plot
jan = df_results[df_results.index.month == 1]
if not jan.empty:
    plt.figure(figsize=(14,5))
    plt.plot(jan.index, jan['actual'], label='Actual', linewidth=1)
    plt.plot(jan.index, jan['predicted'], label='Predicted', alpha=0.9, linewidth=1)
    plt.title('Forecast vs Actual — January 2024')
    plt.xlabel('Date'); plt.ylabel('PV Output')
    plt.legend(); plt.grid(True); plt.tight_layout(); plt.show()

In [None]:
df_daily = df_results.resample('D').sum(numeric_only=True)

In [None]:
df_daily[['actual','predicted']].plot(figsize=(12,8))
df_daily

In [None]:
import numpy as np
from sklearn.metrics import mean_squared_error, mean_absolute_error
def calculate_error_metrics(y_true, y_pred):

    y_true = np.array(y_true)
    y_pred = np.array(y_pred)

    mse = mean_squared_error(y_true, y_pred)

    mae = mean_absolute_error(y_true, y_pred)


    naive_forecast = np.roll(y_true, 1)[1:]
    naive_error = np.mean(np.abs(y_true[1:] - naive_forecast))
    mase = mae / naive_error if naive_error != 0 else np.inf

    mask = y_true != 0
    mape = np.mean(np.abs((y_true[mask] - y_pred[mask]) / y_true[mask])) * 100

    return {
        'MSE': mse,
        'MAE': mae,
        'MAPE': mape,
        'MASE': mase
    }

In [None]:
metrics = calculate_error_metrics(df_results['actual'], df_results['predicted'])

print("Error Metrics for model RNN:")
for metric, value in metrics.items():
    print(f"{metric}: {value:.4f}")