In [1]:
# Data_Ingestion
import pandas as pd
from sqlalchemy import create_engine
import logging

# Paths for the CSV file and database
FILE_NAME = "data/electricity_prices.csv"
DATABASE_NAME = "data/electricity.db"

logging.basicConfig(level=logging.INFO)

def load_data_to_db(csv_file, db_file):
    df = pd.read_csv(csv_file)
    engine = create_engine(f'sqlite:///{db_file}')
    df.to_sql('electricity', con=engine, if_exists='replace', index=False)
    logging.info(f"Data loaded into database: {db_file}")

if __name__ == "__main__":
    load_data_to_db(FILE_NAME, DATABASE_NAME)

INFO:root:Data loaded into database: data/electricity.db


In [2]:
import sqlite3
import pandas as pd

def load_data_from_db(db_file):
    conn = sqlite3.connect(db_file)
    sql_query = "SELECT * FROM electricity"
    df = pd.read_sql_query(sql_query, conn)
    conn.close()  # Ensure the connection is closed after reading the data
    return df

In [3]:
# Eda
import os
import pandas as pd
import sqlite3
from ydata_profiling import ProfileReport
import logging
DATABASE_NAME = "data/electricity.db"

logging.basicConfig(level=logging.INFO)

def perform_eda(df):
    os.makedirs('reports', exist_ok=True)
    profile = ProfileReport(df, title="Electricity Price Data Profiling Report", correlations={"auto": {"calculate": False}})
    profile.to_file("reports/electricity_price_profile.html")
    logging.info("EDA report generated")

if __name__ == "__main__":
    df = load_data_from_db(DATABASE_NAME)
    perform_eda(df)

INFO:visions.backends:Pandas backend loaded 2.2.2
INFO:visions.backends:Numpy backend loaded 1.26.4
INFO:visions.backends:Pyspark backend NOT loaded
INFO:visions.backends:Python backend loaded


Summarize dataset:   0%|          | 0/5 [00:00<?, ?it/s]

Generate report structure:   0%|          | 0/1 [00:00<?, ?it/s]

Render HTML:   0%|          | 0/1 [00:00<?, ?it/s]

Export report to file:   0%|          | 0/1 [00:00<?, ?it/s]

INFO:root:EDA report generated


In [4]:
# Model_Training
import os
import joblib
import pandas as pd
import sqlite3
import logging
from joblib import Parallel, delayed
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.linear_model import LinearRegression, ElasticNet
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.metrics import mean_absolute_error, mean_squared_error
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
import mlflow
import mlflow.sklearn
import numpy as np
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

DATABASE_NAME = "data/electricity.db"

def create_preprocessor(numerical_features, categorical_features):
    numerical_pipeline = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler())
    ])

    categorical_pipeline = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='most_frequent')),
        ('onehot', OneHotEncoder(handle_unknown='ignore'))
    ])

    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numerical_pipeline, numerical_features),
            ('cat', categorical_pipeline, categorical_features)
        ])
    
    return preprocessor

def evaluate_model(name, model, X_train, y_train, preprocessor):
    pipeline = Pipeline(steps=[
        ('preprocessor', preprocessor),
        ('model', model)
    ])
    scores = cross_val_score(pipeline, X_train, y_train, cv=5, scoring='neg_mean_squared_error', n_jobs=-1)
    rmse_scores = np.sqrt(-scores)
    return name, rmse_scores.mean(), model

def evaluate_models(X_train, y_train, preprocessor):
    models = {
        "Linear Regression": LinearRegression(),
        "Random Forest": RandomForestRegressor(random_state=42),
        "Elastic Net": ElasticNet(random_state=42),
        "Gradient Boosting": GradientBoostingRegressor(random_state=42)
    }
    
    results = Parallel(n_jobs=-1)(
        delayed(evaluate_model)(name, model, X_train, y_train, preprocessor) 
        for name, model in models.items()
    )
    
    return results

def log_model_to_mlflow(model, model_name, X_train, X_test, y_train, y_test, preprocessor):
    mlflow.set_tracking_uri(os.environ['MLFLOW_TRACKING_URI'])
    mlflow.set_experiment("Electricity Price Prediction")

    # Accessing MLflow credentials
    username = os.environ['MLFLOW_TRACKING_USERNAME']
    password = os.environ['MLFLOW_TRACKING_PASSWORD']
    session = requests.Session()
    session.auth = (username, password)

    retry = Retry(
        total=5,
        backoff_factor=0.1,
        status_forcelist=[500, 502, 503, 504],
        allowed_methods=["HEAD", "GET", "OPTIONS", "POST", "PUT"]
    )
    adapter = HTTPAdapter(max_retries=retry)
    session.mount("http://", adapter)
    session.mount("https://", adapter)

    with mlflow.start_run() as run:
        pipeline = Pipeline(steps=[
            ('preprocessor', preprocessor),
            ('model', model)
        ])
        pipeline.fit(X_train, y_train)
        y_pred = pipeline.predict(X_test)
        mae = mean_absolute_error(y_test, y_pred)
        mse = mean_squared_error(y_test, y_pred)
        rmse = np.sqrt(mse)

        mlflow.log_param("model_type", model_name)
        mlflow.log_metric("mae", mae)
        mlflow.log_metric("mse", mse)
        mlflow.log_metric("rmse", rmse)
        mlflow.sklearn.log_model(pipeline, "model")
        logging.info(f"{model_name} model logged to MLflow")

        return run.info.run_id, rmse

if __name__ == "__main__":
    df = load_data_from_db(DATABASE_NAME)
    
    numerical_features = ['DayOfWeek', 'WeekOfYear', 'Day', 'Month', 'Year', 'PeriodOfDay',
                          'ForecastWindProduction', 'SystemLoadEA', 'SMPEA', 'ORKTemperature',
                          'ORKWindspeed', 'CO2Intensity', 'ActualWindProduction', 'SystemLoadEP2']
    categorical_features = ['Holiday']
    
    X = df.drop('SMPEP2', axis=1)
    y = df['SMPEP2']

    # Drop rows where the target variable is NaN
    X = X[~y.isna()]
    y = y.dropna()

    preprocessor = create_preprocessor(numerical_features, categorical_features)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=42)

    models_results = evaluate_models(X_train, y_train, preprocessor)

    best_model_name = None
    best_rmse = float('inf')
    best_run_id = None
    best_model = None

    for model_name, rmse, model in models_results:
        run_id, model_rmse = log_model_to_mlflow(model, model_name, X_train, X_test, y_train, y_test, preprocessor)
        if model_rmse < best_rmse:
            best_rmse = model_rmse
            best_model_name = model_name
            best_run_id = run_id
            best_model = model

    logging.info(f"Best model: {best_model_name} with RMSE: {best_rmse}")

    # Register the best model
    mlflow.register_model(
        model_uri=f"runs:/{best_run_id}/model",
        name="ElectricityPriceModel"
    )
    logging.info(f"{best_model_name} model registered in MLflow Model Registry")

    # Save the best model and preprocessor locally
    pipeline = Pipeline(steps=[
        ('preprocessor', preprocessor),
        ('model', best_model)
    ])
    os.makedirs('model', exist_ok=True)
    joblib.dump(pipeline, 'model/best_model.pkl')
    logging.info(f"{best_model_name} model and preprocessor saved locally")


2024/08/01 11:42:33 INFO mlflow.tracking.fluent: Experiment with name 'Electricity Price Prediction' does not exist. Creating a new experiment.
INFO:root:Linear Regression model logged to MLflow
INFO:root:Random Forest model logged to MLflow
INFO:root:Elastic Net model logged to MLflow
INFO:root:Gradient Boosting model logged to MLflow
INFO:root:Best model: Random Forest with RMSE: 21.655789236118906
Successfully registered model 'ElectricityPriceModel'.
2024/08/01 11:46:46 INFO mlflow.store.model_registry.abstract_store: Waiting up to 300 seconds for model version to finish creation. Model name: ElectricityPriceModel, version 1
Created version '1' of model 'ElectricityPriceModel'.
INFO:root:Random Forest model registered in MLflow Model Registry
INFO:root:Random Forest model and preprocessor saved locally


In [6]:
# Model_Optimization
import os
import joblib
import pandas as pd
import sqlite3
import logging
import time
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.linear_model import ElasticNet
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.metrics import mean_absolute_error, mean_squared_error
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
import mlflow
import mlflow.sklearn
import numpy as np
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

DATABASE_NAME = "data/electricity.db"

def create_preprocessor(numerical_features, categorical_features):
    numerical_pipeline = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler())
    ])

    categorical_pipeline = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='most_frequent')),
        ('onehot', OneHotEncoder(handle_unknown='ignore'))
    ])

    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numerical_pipeline, numerical_features),
            ('cat', categorical_pipeline, categorical_features)
        ])
    
    return preprocessor

def optimize_model(model, param_grid, X_train, y_train, preprocessor):
    pipeline = Pipeline(steps=[
        ('preprocessor', preprocessor),
        ('model', model)
    ])

    grid_search = GridSearchCV(pipeline, param_grid, cv=3, scoring='neg_mean_squared_error', n_jobs=-1)
    grid_search.fit(X_train, y_train)
    
    best_pipeline = grid_search.best_estimator_
    best_params = grid_search.best_params_
    best_score = np.sqrt(-grid_search.best_score_)

    return best_pipeline, best_params, best_score

def log_and_register_model_to_mlflow(pipeline, model_name, X_train, X_test, y_train, y_test):
    mlflow.set_tracking_uri(os.environ['MLFLOW_TRACKING_URI'])
    mlflow.set_experiment("Electricity Price Prediction")

    # Accessing MLflow credentials
    username = os.environ['MLFLOW_TRACKING_USERNAME']
    password = os.environ['MLFLOW_TRACKING_PASSWORD']
    session = requests.Session()
    session.auth = (username, password)

    retry = Retry(
        total=5,
        backoff_factor=0.1,
        status_forcelist=[500, 502, 503, 504],
        allowed_methods=["HEAD", "GET", "OPTIONS", "POST", "PUT"]
    )
    adapter = HTTPAdapter(max_retries=retry)
    session.mount("http://", adapter)
    session.mount("https://", adapter)

    with mlflow.start_run() as run:
        pipeline.fit(X_train, y_train)
        y_pred = pipeline.predict(X_test)
        mae = mean_absolute_error(y_test, y_pred)
        mse = mean_squared_error(y_test, y_pred)
        rmse = np.sqrt(mse)

        mlflow.log_param("model_type", model_name)
        mlflow.log_params(pipeline.named_steps['model'].get_params())
        mlflow.log_metric("mae", mae)
        mlflow.log_metric("mse", mse)
        mlflow.log_metric("rmse", rmse)
        mlflow.sklearn.log_model(pipeline, "model")
        logging.info(f"{model_name} model logged to MLflow")

        return run.info.run_id, rmse

if __name__ == "__main__":
    df = load_data_from_db(DATABASE_NAME)
    
    numerical_features = ['DayOfWeek', 'WeekOfYear', 'Day', 'Month', 'Year', 'PeriodOfDay',
                          'ForecastWindProduction', 'SystemLoadEA', 'SMPEA', 'ORKTemperature',
                          'ORKWindspeed', 'CO2Intensity', 'ActualWindProduction', 'SystemLoadEP2']
    categorical_features = ['Holiday']
    
    X = df.drop('SMPEP2', axis=1)
    y = df['SMPEP2']

    # Drop rows where the target variable is NaN
    X = X[~y.isna()]
    y = y.dropna()

    preprocessor = create_preprocessor(numerical_features, categorical_features)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=42)

    # Load the best model name from the training script
    best_model_name = "Random Forest"  # Example: You should load this dynamically based on your previous script

    if best_model_name == "Linear Regression":
        best_model = LinearRegression()
        param_grid = {
            'model__fit_intercept': [True, False],
            'model__normalize': [True, False]
        }
    elif best_model_name == "Random Forest":
        best_model = RandomForestRegressor(random_state=42)
        param_grid = {
            'model__n_estimators': [100, 200],
            'model__max_depth': [10, 20],
            'model__min_samples_split': [2, 5]
        }
    elif best_model_name == "Elastic Net":
        best_model = ElasticNet(random_state=42)
        param_grid = {
            'model__alpha': [0.1, 1.0],
            'model__l1_ratio': [0.1, 0.5]
        }
    elif best_model_name == "Gradient Boosting":
        best_model = GradientBoostingRegressor(random_state=42)
        param_grid = {
            'model__n_estimators': [100, 200],
            'model__learning_rate': [0.01, 0.1],
            'model__max_depth': [3, 5]
        }

    optimized_pipeline, best_params, best_score = optimize_model(best_model, param_grid, X_train, y_train, preprocessor)
    logging.info(f"Optimized model parameters: {best_params}")
    logging.info(f"Best RMSE score: {best_score}")

    run_id, rmse = log_and_register_model_to_mlflow(optimized_pipeline, best_model_name, X_train, X_test, y_train, y_test)

    logging.info(f"Best optimized model: {best_model_name} with RMSE: {best_score}")

    # Register the best optimized model
    mlflow.register_model(
        model_uri=f"runs:/{run_id}/model",
        name="OptimizedElectricityPriceModel"
    )
    logging.info(f"Optimized {best_model_name} model registered in MLflow Model Registry")

    # Save the optimized model and preprocessor locally
    os.makedirs('model', exist_ok=True)
    joblib.dump(optimized_pipeline, 'model/optimized_model.pkl')
    logging.info(f"Optimized {best_model_name} model and preprocessor saved locally")


INFO:root:Optimized model parameters: {'model__max_depth': 20, 'model__min_samples_split': 2, 'model__n_estimators': 200}
INFO:root:Best RMSE score: 22.770299435556765
INFO:root:Random Forest model logged to MLflow
INFO:root:Best optimized model: Random Forest with RMSE: 22.770299435556765
Successfully registered model 'OptimizedElectricityPriceModel'.
2024/08/01 12:04:14 INFO mlflow.store.model_registry.abstract_store: Waiting up to 300 seconds for model version to finish creation. Model name: OptimizedElectricityPriceModel, version 1
Created version '1' of model 'OptimizedElectricityPriceModel'.
INFO:root:Optimized Random Forest model registered in MLflow Model Registry
INFO:root:Optimized Random Forest model and preprocessor saved locally
