In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

from tensorflow.keras.utils import plot_model
from sklearn.model_selection import train_test_split

from tensorflow.keras.optimizers.legacy import Adam

from functools import partial 

from sklearn.ensemble import GradientBoostingRegressor
from tensorflow.keras.layers import Bidirectional, LSTM, Dropout, Dense
from tensorflow.keras.models import Sequential

In [2]:
file_dir = '/Users/yashwanthkaruparthi/Documents/Acads/sem7/design project/execution/data/solar_weather copy 2.csv'

time_step = 24

num_feats = 5

In [3]:


def create_dataset(dataset, time_step):
    print(f'dataset shape {dataset.shape}')
    dataX, dataY = [], []
    for i in range(len(dataset) - time_step):
        a = dataset[i:(i + time_step), :]  # Features: GHI and Energy delta
        dataX.append(a)
        dataY.append(dataset[i + time_step, 0])  # Target: Energy delta
    return np.array(dataX), np.array(dataY)

data = pd.read_csv(file_dir, header=0, infer_datetime_format=True, parse_dates=['Time'], index_col=['Time'])

data = data[(data.index.month.isin([5, 6, 7])) & (data.index.year == 2021)]

dataset = data[['Energy delta[Wh]', 'GHI', 'temp', 'pressure', 'humidity']]

scaler = MinMaxScaler()
scaled_data = scaler.fit_transform(dataset)

X_scaled, y_scaled = create_dataset(scaled_data, time_step)
print(f'X, y shape {X_scaled.shape} {y_scaled.shape}')

  data = pd.read_csv(file_dir, header=0, infer_datetime_format=True, parse_dates=['Time'], index_col=['Time'])


dataset shape (8832, 5)
X, y shape (8808, 24, 5) (8808,)


In [4]:
data = pd.read_csv(file_dir, header=0, infer_datetime_format=True, parse_dates=['Time'], index_col=['Time'])

# data = data[(data.index.month.isin([6, 7, 8])) & (data.index.year == 2021)]

data = data[(data.index.year == 2021)]

dataset = data[['Energy delta[Wh]', 'GHI', 'temp', 'pressure', 'humidity']]
# dataset = data[['Energy delta[Wh]', 'GHI']]

X = dataset.iloc[:, 1:].values  # Features
y = dataset.iloc[:, 0].values   # Target

  data = pd.read_csv(file_dir, header=0, infer_datetime_format=True, parse_dates=['Time'], index_col=['Time'])


In [5]:
import tensorflow.keras.backend as K

def r2_metric(y_true, y_pred):
    """
    R-squared metric for regression tasks.
    """
    ss_res = K.sum(K.square(y_true - y_pred))  # Residual sum of squares
    ss_tot = K.sum(K.square(y_true - K.mean(y_true)))  # Total sum of squares
    return 1 - (ss_res / (ss_tot + K.epsilon()))  # R² formula

In [6]:


# Define the objective function
def evaluate_bilstm(hyperparameters):
    # Unpack hyperparameters
    units, learning_rate, dropout_rate, batch_size = hyperparameters
    print(units, learning_rate, dropout_rate, batch_size)


    # Split dataset into training and validation (dummy example)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    gbdt = GradientBoostingRegressor(n_estimators=100, learning_rate=learning_rate, max_depth=5)

    gbdt.fit(X_train, y_train.ravel())

    # Step 2: Get predictions for all models on both training and test sets
    gbdt_output_train = gbdt.predict(X_train)
    gbdt_output_test = gbdt.predict(X_test)

    scaler = MinMaxScaler()
    gbdt_output_train_scaled = scaler.fit_transform(gbdt_output_train.reshape(-1, 1))
    gbdt_output_test_scaled = scaler.transform(gbdt_output_test.reshape(-1, 1))

    X_train_bilstm_gbdt, y_train_bilstm_gbdt = create_dataset(gbdt_output_train_scaled, time_step)
    X_test_bilstm_gbdt, y_test_bilstm_gbdt = create_dataset(gbdt_output_test_scaled, time_step)

    X_train_bilstm_gbdt = X_train_bilstm_gbdt.reshape(X_train_bilstm_gbdt.shape[0], time_step, 1)
    X_test_bilstm_gbdt = X_test_bilstm_gbdt.reshape(X_test_bilstm_gbdt.shape[0], time_step, 1)

    # Build the BiLSTM model
    # model = Sequential([
    #     Bidirectional(LSTM(int(units), activation='relu',return_sequences=True, input_shape=(X_train.shape[1], X_train.shape[2]))),
    #     Dropout(dropout_rate),
    #     Bidirectional(LSTM(int(units), activation='relu',return_sequences=False)),
    #     Dropout(dropout_rate),
    #     Dense(1)
    # ])

    # Build the BiLSTM model
    model = Sequential([
        Bidirectional(LSTM(int(units), activation='relu',return_sequences=True, input_shape=(time_step, num_feats))),
        Dropout(dropout_rate),
        Bidirectional(LSTM(int(units), activation='relu',return_sequences=False)),
        Dropout(dropout_rate),
        Dense(1)
    ])

    # model = Sequential([
    #     Bidirectional(LSTM(int(units), activation='relu',return_sequences=True, input_shape=(time_step, num_feats))),
    #     Dropout(dropout_rate),
    #     Bidirectional(LSTM(int(units), activation='relu',return_sequences=False)),
    #     Dropout(dropout_rate),
    #     Dense(1)
    # ])
    # optimizer = Adam(learning_rate=learning_rate)
    model.compile(optimizer='adam', loss='mean_squared_error', metrics=[r2_metric])


    # Train the model
    history = model.fit(X_train_bilstm_gbdt, y_train_bilstm_gbdt, validation_data=(X_test_bilstm_gbdt, y_test_bilstm_gbdt), batch_size=int(batch_size), epochs=5, verbose=1)

    # Return validation loss as fitness
    return history.history['val_loss'][-1]


In [7]:
lb = [100, 1e-3, 0.1, 10]  # Lower bounds: [units, learning rate, dropout rate, batch size]
ub = [300, 1e-2, 0.4, 300]  # Upper bounds: [units, learning rate, dropout rate, batch size]


In [8]:
from functools import partial
import numpy as np



class WOA:
    def __init__(self, obj_func, n_whale, spiral_constant, n_iter,
                lb, ub):
        self.obj_func = obj_func
        self.n_whale = n_whale
        self.spiral_constant = spiral_constant
        self.n_iter = n_iter
        self.lb = lb
        self.ub = ub
        self.whale = {}
        self.prey = {}

    def init_whale(self):
        tmp = [np.random.uniform(self.lb, self.ub, size=(len(self.lb),))
               for i in range(self.n_whale)]
        self.whale['position'] = np.array(tmp)
        self.whale['fitness'] = np.array([self.obj_func(pos) for pos in self.whale['position']])

    def init_prey(self):
        tmp = [np.random.uniform(self.lb, self.ub, size=(len(self.lb),))]
        self.prey['position'] = np.array(tmp)
        self.prey['fitness'] = self.obj_func(self.prey['position'])

    def update_prey(self):
        if self.whale['fitness'].min() < self.prey['fitness'][0]:
            self.prey['position'][0] = self.whale['position'][self.whale['fitness'].argmin()]
            self.prey['fitness'][0] = self.whale['fitness'].min()

    def search(self, idx, A, C):
        random_whale = self.whale['position'][np.random.randint(low=0, high=self.n_whale,
                                                                size=len(idx[0]))]
        d = np.abs(C[..., np.newaxis] * random_whale - self.whale['position'][idx])
        self.whale['position'][idx] = np.clip(random_whale - A[..., np.newaxis] * d, self.lb, self.ub)

    def encircle(self, idx, A, C):
        d = np.abs(C[..., np.newaxis] * self.prey['position'] - self.whale['position'][idx])
        self.whale['position'][idx] = np.clip(self.prey['position'][0] - A[..., np.newaxis] * d, self.lb, self.ub)

    def bubble_net(self, idx):
        d_prime = np.abs(self.prey['position'] - self.whale['position'][idx])
        l = np.random.uniform(-1, 1, size=len(idx[0]))
        self.whale["position"][idx] = np.clip(
            d_prime * np.exp(self.spiral_constant * l)[..., np.newaxis] * np.cos(2 * np.pi * l)[..., np.newaxis]
            + self.prey["position"],
            self.lb,
            self.ub,
        )

    def optimize(self, a):

        p = np.random.random(self.n_whale)
        r1 = np.random.random(self.n_whale)
        r2 = np.random.random(self.n_whale)
        A = 2 * a * r1 - a
        C = 2 * r2
        search_idx = np.where((p < 0.5) & (abs(A) > 1))
        encircle_idx = np.where((p < 0.5) & (abs(A) <= 1))
        bubbleNet_idx = np.where(p >= 0.5)
        self.search(search_idx, A[search_idx], C[search_idx])
        self.encircle(encircle_idx, A[encircle_idx], C[encircle_idx])
        self.bubble_net(bubbleNet_idx)
        self.whale['fitness'] = self.obj_func(self.whale['position'])

    def run(self):
        self.init_whale()
        self.init_prey()
        f_values = [self.prey['fitness'][0]]
        for n in range(self.n_iter):
            #print("Iteration = ", n, " f(x) = ", self.prey['fitness'][0])
            a = 2 - n * (2 / self.n_iter)
            self.optimize(a)
            self.update_prey()
            f_values.append(self.prey['fitness'][0])
        optimal_x = self.prey['position'].squeeze()
        return f_values, optimal_x

In [9]:
# Partial function to adapt objective function for WOA
# obj_func = partial(lambda params: [evaluate_bilstm(param) for param in params])

# obj_func = lambda params: evaluate_bilstm(params)

obj_func = partial(lambda params: evaluate_bilstm(params))

# Instantiate WOA
woa = WOA(obj_func=obj_func, n_whale=10, spiral_constant=1, n_iter=20, lb=lb, ub=ub)

# Run WOA optimization
f_values, optimal_hyperparameters = woa.run()

print("Optimal Hyperparameters:", optimal_hyperparameters)


142.13913911899502 0.006943386621196543 0.19647969020565875 251.002587581608
dataset shape (27494, 1)
dataset shape (6874, 1)
Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5
282.2398496732009 0.0016938844097083498 0.17879543798802866 298.6598503750656
dataset shape (27494, 1)
dataset shape (6874, 1)
Epoch 1/5
Epoch 2/5
Epoch 3/5

KeyboardInterrupt: 