In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os 
import tensorflow as tf
from sklearn.preprocessing import MinMaxScaler
from keras.models import load_model, Model
from keras.layers import Dense, LSTM, Conv1D, Input, Attention
from keras.callbacks import EarlyStopping

In [None]:
tf.__version__

In [None]:
# dirs
DATA_DIR = "./load.csv"
TEST_PLOT_DIR = "./test_plots/CNN_LSTM/"
MODEL_FILE_DIR = "./model/CNN_LSTM.keras"
TRAINING_HISTORY_DIR = "./training_history/CNN_LSTM.png"

In [None]:
# constants
PREDICT_STEP = 16
INPUT_STEP = 192

N_FEATURE = 1
N_OUPUT = 1

In [None]:
if not os.path.exists(TEST_PLOT_DIR):
    os.makedirs(TEST_PLOT_DIR)
if not os.path.exists("./model"):
    os.makedirs("./model")
if not os.path.exists("./training_history"):
    os.makedirs("./training_history")


In [None]:
# Function to split data into train, validation, and test sets
def split_data(df, train_frac=0.75, test_frac=0.15):
    # Sort data by year_month
    grouped = df.groupby('year_month')

    train_data = pd.DataFrame()
    test_data = pd.DataFrame()
    val_data = pd.DataFrame()

    for name, group in grouped:
        n = len(group)
        train_end = int(train_frac * n)
        test_end = train_end + int(test_frac * n)   
        train_data = pd.concat([train_data, group.iloc[:train_end]], ignore_index=True)
        val_data = pd.concat([val_data, group.iloc[train_end:test_end]], ignore_index=True)
        test_data = pd.concat([test_data, group.iloc[test_end:]], ignore_index=True)
    # adding time_idx
    train_data['time_idx'] = np.arange(len(train_data))
    test_data['time_idx'] = np.arange(len(test_data))
    val_data['time_idx'] = np.arange(len(val_data))
    return train_data, test_data, val_data

In [None]:
def create_dataset(dataset, look_back, look_forward):
    dataX, dataY = [], []
    for i in range(len(dataset)-look_back-look_forward+1):
        X = dataset[i:(i+look_back), :]
        y = dataset[(i+look_back):(i+look_back+look_forward), 0]
        dataX.append(X)
        dataY.append(y)
    return np.array(dataX), np.array(dataY).reshape(np.array(dataY).shape[0], np.array(dataY).shape[1], 1)

In [None]:
data = pd.read_csv(DATA_DIR)
# Convert the 'date' column to datetime format
data['Timestamp'] = pd.to_datetime(data['Timestamp'], format='%Y/%m/%d %H:%M')

# Sort the data by date
data.sort_values('Timestamp', inplace=True)

# Extract month and year from the date for splitting
data['year_month'] = data['Timestamp'].dt.to_period('M')

# Splitting the data
train_df, test_df, val_df = split_data(data)

# create scaler
scaler = MinMaxScaler()
scaler.fit(np.array(data["Load"]).reshape(-1, 1))

In [None]:
display(train_df)

In [None]:
X_train, y_train = create_dataset(scaler.transform(np.array(train_df['Load']).reshape(-1, 1)), INPUT_STEP, PREDICT_STEP)
X_val, y_val = create_dataset(scaler.transform(np.array(val_df['Load']).reshape(-1, 1)), INPUT_STEP, PREDICT_STEP)
X_test, y_test = create_dataset(scaler.transform(np.array(test_df['Load']).reshape(-1, 1)), INPUT_STEP, PREDICT_STEP)

In [None]:
print("-" * 86)
print(f"X_train: {X_train.shape}")
print(f"y_train: {y_train.shape}")
print(f"X_val: {X_val.shape}")
print(f"y_val: {y_val.shape}")
print(f"X_test: {X_test.shape}")
print(f"y_test: {y_test.shape}")
print("-" * 86)

In [None]:
tf.config.list_physical_devices()

In [None]:
with tf.device('/CPU:0'):
    inputs = Input(shape=(INPUT_STEP, N_FEATURE))
    conv = Conv1D(filters=64, kernel_size=3, activation='relu')(inputs)
    encoder = LSTM(64, return_sequences=True)(conv)
    # decoder_inputs = Attention()([encoder, encoder])
    decoder = LSTM(64, return_sequences=False)(encoder)
    dense1 = Dense(50, activation='relu')(decoder)
    dense2 = Dense(PREDICT_STEP)(dense1)

    model = Model(inputs=inputs, outputs=dense2)
    model.summary()
    model.compile(optimizer='adam', loss='mse')
    
    early_stopping = EarlyStopping(
        monitor='val_loss',
        patience=10,
        min_delta=0.0001,
        restore_best_weights=True
    )

    history = model.fit(
        X_train,
        y_train,
        verbose=1,
        epochs=100,
        batch_size=250,
        validation_data=(X_val, y_val),
        callbacks=[early_stopping]
    )

# plot loss & validation loss
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()
plt.savefig(TRAINING_HISTORY_DIR)
plt.close()

model.save(MODEL_FILE_DIR)


In [None]:
# testing
model = load_model(MODEL_FILE_DIR)
loss = model.evaluate(X_test, y_test)

print("-" * 86)
print(f'Test Loss: {loss:.4f}')
print("-" * 86)

pred = model.predict(X_test)
X_test_reshaped = X_test[:, :, 0]
y_test_reshaped = np.reshape(y_test, (y_test.shape[0], y_test.shape[1]))

pred_data = np.concatenate(
    [scaler.inverse_transform(X_test_reshaped),
        scaler.inverse_transform(pred)],
    axis=-1
)
actual_data = np.concatenate(
    [scaler.inverse_transform(X_test_reshaped),
        scaler.inverse_transform(y_test_reshaped)],
    axis=-1
)

for i in range(actual_data.shape[0]):
    plt.figure(figsize=(16, 6))
    X = np.arange(1, actual_data.shape[1]+1, 1)
    y_pred = pred_data[i]
    y_actual = actual_data[i]
    plt.title(f"Time Series {i+1} prediction result")
    plt.plot(X, y_pred, label='Predict')
    plt.plot(X, y_actual, label='Actual')
    plt.ylim(0, 30)
    plt.xlabel('Time step')
    plt.ylabel('Usage (kWh)')
    plt.legend()
    plt.savefig(TEST_PLOT_DIR+f"Time_Series_{i+1}.png")
    plt.close()