In [2]:
import os
import gc
import pickle
import statsmodels.api as sm
import pandas as pd
import numpy as np
import math
import seaborn as sns
from seaborn_analyzer import regplot
%matplotlib inline
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import japanize_matplotlib
import random
import logging
import warnings
warnings.filterwarnings('ignore', category=UserWarning, module='matplotlib')
from openpyxl import load_workbook
from datetime import datetime, timedelta
from xgboost import XGBRegressor
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
warnings.filterwarnings('ignore', category=UserWarning, module='sklearn')
from platypus import NSGAIII, Problem, Real, Constraint

### 作成した予測モデルを目的関数にもつNSGA-IIIアルゴリズムによる非線形最適化問題で計画自動生成プログラム

#### 便利関数群

In [3]:
def pickle_dump(obj, path):
    with open(path, mode='wb') as f:
        pickle.dump(obj,f)

def pickle_load(path):
    with open(path, mode='rb') as f:
        data = pickle.load(f)
        return data

def validate_input(target_to_features, data):
    """
    入力変数とデータ構造を検証します。
    Args:
        target_to_features (dict): 目的変数と対応する説明変数のディクショナリ。
        data (pd.DataFrame): 入力データ。
    Raises:
        ValueError: 入力が無効な場合。
    """
    if not isinstance(target_to_features, dict):
        raise ValueError("target_to_features must be a dictionary")
    for target, features in target_to_features.items():
        if not isinstance(target, str) or not isinstance(features, list):
            raise ValueError("target_to_features keys must be strings and values must be lists")
        if target not in data.columns:
            raise ValueError(f"{target} is not a column in the data")
        for feature in features:
            if feature not in data.columns:
                raise ValueError(f"{feature} is not a column in the data")

def create_directory_if_not_exists(directory_path):
    """
    指定されたパスにフォルダが存在しない場合、フォルダを作成します。
    Args:
        directory_path (str): フォルダを作成するパス。
    """
    if not os.path.exists(directory_path):
        os.makedirs(directory_path)

def check_file_path(file_path):
    """
    指定されたパスにフォルダが存在しない場合、エラーを返します。
    Args:
        directory_path (str): フォルダを作成するパス。
    """
    if not os.path.isfile(file_path):
        raise FileNotFoundError(f"File not found: {file_path}")

def add_hour_column(df):
    """
    'time'列から時刻を抽出し、新しい列としてDataFrameに追加します。
    Args:
        df (pd.DataFrame): 入力データ。
    Returns:
        pd.DataFrame: 新しい 'hour' 列が追加されたDataFrame。
    """
    df['hour'] = pd.to_datetime(df['time'], format='%H:%M:%S').dt.hour
    return df

def one_hot_encode_hours(df):
    """
    'hour' 列をワンホットエンコードし、変更されたDataFrameを返します。
    Args:
        df (pd.DataFrame): 入力データ。
    Returns:
        pd.DataFrame: ワンホットエンコードされた 'hour' 列を持つDataFrame。
    """
    all_hours = range(1, 24)
    hour_columns = [f'hour_{h}' for h in all_hours]
    df = pd.get_dummies(df, columns=['hour'], prefix='hour', drop_first=True)
    # 全てのhour列が含まれるように、存在しない列を追加する
    for col in hour_columns:
        if col not in df.columns:
            df[col] = 0

    return df

def plot_actual_vs_predicted_and_residuals(train_df, test_df, target, features, train_y_pred, test_y_pred, residuals, save_directory, eval_metric):
    """
    実際の値と予測値のグラフおよび残差グラフをプロットし、保存する関数。
    Args:
        train_df (pd.DataFrame): 学習用データセット。
        test_df (pd.DataFrame): 検証用データセット。
        target (str): 目的変数。
        features (list): 説明変数のリスト。
        train_y_pred (np.array): 学習用データの予測値。
        test_y_pred (np.array): 検証用データの予測値。
        residuals (np.array): 学習用データの残差。
        save_directory (str): 結果を保存するディレクトリへのパス。
        eval_metric (str): 評価指標。
    """
    # 観測値、予測値 vs. time
    fig, axs = plt.subplots(2, 1, figsize=(50, 16))
    axs[0].plot(train_df['time'], train_df[target], label='Actual')
    axs[0].plot(train_df['time'], train_y_pred, label='Predicted')
    axs[0].set_title('Train Data')
    axs[0].set_xlabel('Time')
    axs[0].set_ylabel(target)
    axs[0].legend()
    
    axs[1].plot(test_df['time'], test_df[target], label='Actual')
    axs[1].plot(test_df['time'], test_y_pred, label='Predicted')
    axs[1].set_title('Test Data')
    axs[1].set_xlabel('Time')
    axs[1].set_ylabel(target)
    axs[1].legend()
    
    fig.suptitle(f'{target} - Actual vs Predicted')
    plt.savefig(os.path.join(save_directory, f'{target}_yy_vs_time_{eval_metric}.png'))
    plt.close(fig)

    # 残差 vs. 説明変数
    fig, ax = plt.subplots(1, len(features), figsize=(10*len(features), 20))
    for i in range(len(features)):
        if len(features) > 1:
            ax[i].scatter(train_df[features[i]], residuals)
            ax[i].set_xlabel(features[i])
            ax[i].set_ylabel('Residuals')
        else:
            ax.scatter(train_df[features[i]], residuals)
            ax.set_xlabel(features[i])
            ax.set_ylabel('Residuals')
    plt.savefig(os.path.join(save_directory,f'{target}_residuals_vs_xvars_{eval_metric}.png'))
    plt.close(fig)

def predict_and_evaluate(models:dict, df:pd.DataFrame, target_to_features:dict, save_path:str):
    """
    与えられたモデルを使って、指定されたDataFrameに対して予測を行い、各種評価指標とグラフを表示します。
    Args:
        models (dict): ターゲット変数ごとに訓練されたモデルの辞書
        df (pd.DataFrame): 予測を行う対象のデータフレーム
        target_to_features (dict): ターゲット変数ごとに使用する説明変数のリストを格納した辞書, 説明変数にはOne Hot Encodingされて生成された時間列含む。
        save_path (str): グラフを保存するディレクトリのパス
    """
    # 時間（Hour）列を作成
    df = add_hour_column(df)
    
    # 時間列に対してOne Hot Encodingを実行
    df = one_hot_encode_hours(df)

    # 結果を格納するDataFrameを作成
    result_df = pd.DataFrame(columns=['y_var', 'mse', 'mae', 'r2'])

    for target, features in target_to_features.items():
        # 予測値を計算
        y_pred = models[target].predict(df[features])

        # 評価指標を計算
        mse = mean_squared_error(df[target], y_pred)
        mae = mean_absolute_error(df[target], y_pred)
        r2 = r2_score(df[target], y_pred)

        # 結果をDataFrameに追加
        result_df.loc[len(result_df)] = [target, mse, mae, r2]

        # グラフの作成
        fig, ax = plt.subplots(figsize=(15, 6))
        ax.plot(df['time'], df[target], label='Actual')
        ax.plot(df['time'], y_pred, label='Predicted')
        ax.set_title(f'{target} - Actual vs Predicted')
        ax.set_xlabel('Time')
        ax.set_ylabel(target)
        ax.legend()

        # グラフの保存
        plt.savefig(os.path.join(save_path, f'{target}_prediction.png'))
        plt.close(fig)

    # 結果の表示
    print(result_df)

    # 結果の保存
    result_file_path = os.path.join(save_path, 'prediction_result.csv')
    result_df.to_csv(result_file_path, index=False)

def output(df:pd.DataFrame, models:dict):
    """
    df, modelsを受け取り、予測モデルに基づいて目的変数の列を入力dfに新たに作成し、予測値を格納する。
    Args:
        df (pd.DataFrame): 入力データ。時間列と各列の計画値が入ったDataFrame
        models (dict): 予測モデル、各列の値から各出力を予測するモデルが格納されたdict
    Returns:
        pd.DataFrame: 入力データに対する予測データを格納したDataFrame
    """
    var_dict = pickle_load('/workspaces/data/control/optimization_mapping/var_dict.pkl')
    df = one_hot_encode_hours(df)
    result = {}

    for target, model in models.items():
        features = var_dict[target]
        valid_columns = [col for col in features if col in df.columns]
        result[target] = model.predict(df[valid_columns])
    return pd.DataFrame(result)

def check_file_path(file_path: str):
    import os
    if not os.path.isfile(file_path):
        raise FileNotFoundError(f"No file found at {file_path}")


#### 必要ファイルのロード

In [4]:
var_dict = pickle_load('/workspaces/data/control/optimization_mapping/var_dict.pkl')
df = pd.read_pickle('/workspaces/data/flow/preprocessed_dataset.pkl')
models = pickle_load('/workspaces/data/control/optimization_models/best_models_mae.pkl')
columns = pickle_load('/workspaces/data/control/optimization_mapping/columns.pkl')
columns_mapping = pickle_load('/workspaces/data/control/optimization_mapping/columns_mapping.pkl')

### テスト用サンプルデータ作成用関数　基本的にはテストのみに使用

In [None]:
# テスト用のサンプルデータを生成する関数
def create_sample_data(total_hours, total_areas, num_constants):
    # 変数のリストを生成
    variables = np.random.randint(1, 10, total_hours * total_areas - num_constants)
    
    # 定数スケジュールを生成
    constant_schedule = np.zeros((total_hours, total_areas))
    constant_indices = np.random.choice(total_hours * total_areas, num_constants, replace=False)
    constant_schedule.flat[constant_indices] = np.random.randint(1, 10, num_constants)
    constant_schedule[0, 0] = np.nan  # テスト用に1つのNaNを含むように変更
    constant_schedule[1, 1] = np.nan  # テスト用に1つのNaNを含むように変更
    
    return variables, constant_schedule

vars, constant_schedule = create_sample_data(23, 6, 92)

In [7]:
# Global constants
total_hours = 23
total_areas = 6
total_products = 3
total_vars = total_hours * total_areas + 4
total_objects = 3
total_contraints = 34
worker_min = 0
worker_max = 48
csv_path = '/workspaces/data/control/optimization_mapping/constant_schedule.csv'

def read_constant_schedule(csv_filepath: str) -> np.ndarray:
    """
    CSVファイルから定数スケジュールを読み込み、NumPy配列として返します。
    Args:
        csv_filepath (str): CSVファイルへのパス。
    Returns:
        np.array: 定数スケジュールのNumPy配列。
    """
    check_file_path(csv_filepath)
    df = pd.read_csv(csv_path, encoding='shift-jis')
    df = df.rename(columns=columns_mapping)
    df = df.filter(columns, axis=1)
    constant_schedule = df.values
    return constant_schedule

def apply_constant_schedule(variables: list, constant_schedule: np.ndarray) -> np.ndarray:
    """
    変数と定数スケジュールを組み合わせて、作業者のスケジュールを作成します。
    Args:
        variables (list): 最適化問題の変数のリスト。
        constant_schedule (np.array): 定数スケジュールのNumPy配列。
    Returns:
        np.array: 作業者スケジュールのNumPy配列。
    """
    constant_mask = np.logical_not(np.isnan(constant_schedule))
    worker_schedule = np.zeros(constant_schedule.shape)
    index = 0
    for i in range(constant_schedule.shape[0]):
        for j in range(constant_schedule.shape[1]):
            if constant_mask[i, j]:
                worker_schedule[i, j] = constant_schedule[i, j]
            else:
                worker_schedule[i, j] = variables[index]
                index += 1
    return worker_schedule

def belegundu(vars: list) -> tuple:
    """
    最適化問題の目的関数と制約条件を設定します。
    Args:
        vars (list): 最適化された変数のリスト。
    Returns:
        tuple: (目的関数のリスト, 制約条件のリスト)
    """
    w1, w2, w3, w4 = np.array(vars[:4])
    constant_schedule = read_constant_schedule('/workspaces/data/control/optimization_mapping/constant_schedule.csv')
    worker_schedule = apply_constant_schedule(variables=vars, constant_schedule=constant_schedule)
    df = pd.DataFrame(worker_schedule, columns=columns)
    df['hour'] = [hour % 24 for hour in range(8, 31)]
    
    # Objective function
    result = output(df, models)
    production_a = result['1F']
    production_c = result['3F']
    total_worker = np.array(vars[:4]).sum()

    objective_function = [-production_a.sum(), -production_c.sum(), total_worker]

    # Constraint function
    constraint1 = -production_a.sum() + 30000
    constraints3to9 = [worker_schedule[i].sum() - get_schedule_limit(w1, w2, w3, w4, i) for i in range(total_hours)]
    constraint10 = - 48 + (w1 + w2)
    constraint11 = - 48 + (w2 + w3)
    constraint12_1 = [-wi for wi in [w1, w2, w3, w4]]
    constraint12_2 = [-48 + wi for wi in [w1, w2, w3, w4]]

    constraint_function = [constraint1] + constraints3to9 + [constraint10, constraint11] + constraint12_1 + constraint12_2

    return objective_function, constraint_function

def get_schedule_limit(w1, w2, w3, w4, hour):
    """
    指定された時間帯に対するスケジュール制限を計算します。
    Args:
        hour (int): 時間帯 (0-22)。
        w1 (float): : 制約条件用の変数。
    Returns:
        float: 指定された時間帯に対するスケジュール制限。
    """
    if hour == 0:
        return 0.8 * w1
    elif hour == 1:
        return 0.8 * w2 + w1
    elif 2 <= hour <= 8:
        return w1 + w2
    elif hour == 9:
        return w2 + 0.5 * 0.8 * w3
    elif 10 <= hour <= 13:
        return w3
    elif hour == 14:
        return 0.8 * w4
    else:
        return w4

constant_schedule = read_constant_schedule(csv_path)
problem = Problem(4 + (total_hours * total_areas - np.count_nonzero(~np.isnan(constant_schedule))), total_objects, total_contraints)  # 変数の数, 目的関数の数, 制約条件の数
problem.types[:] = [Real(worker_min, worker_max) for _ in range(4 + (total_hours * total_areas - np.count_nonzero(~np.isnan(constant_schedule))))]
problem.constraints[:] = "<=0"
problem.function = belegundu

algorithm = NSGAIII(problem, divisions_outer=5)
algorithm.run(30000)  # 試行

# 解集合を取得
solutions = algorithm.result

In [None]:
def constraint_check(vars):
    """
    与えられた変数に対して制約条件をチェックし、違反している制約の数を返す関数。
    Args:
        vars (list): 制約条件をチェックする変数のリスト。
    Returns:
        int: 違反している制約の数。
    この関数は、次の手順で制約条件をチェックします。
    1. constraint_function によって制約条件を計算する。
    2. 計算された制約条件が正の場合、違反していると判断し、フラグを立てる。
    3. 違反している制約の数をカウントし、結果を返す。
    """
    constraint_function = constraint_function(vars)
    constraint_num = [f'constraint_{i}' for i in range(34)]
    constrain_flag = [1 if constraint > 0 else 0 for constraint in constraint_function]

    result = pd.DataFrame({
        'number': constraint_num,
        'flag':constrain_flag
    })

    return sum([1 if i > 0 else 0 for i in constraint_function])


In [None]:
def multiply_solution(vars):
    """
    与えられた変数に対して制約条件をチェックし、違反している制約の数を返す関数。
    Args:
        vars (list): 制約条件をチェックする変数のリスト。
    Returns:
        int: 違反している制約の数。
    この関数は、次の手順で制約条件をチェックします。
    1. constraint_function によって制約条件を計算する。
    2. 計算された制約条件が正の場合、違反していると判断し、フラグを立てる。
    3. 違反している制約の数をカウントし、結果を返す。
    """
    constraint_function = constraint_function(vars)
    constraint_num = [f'constraint_{i}' for i in range(34)]
    constrain_flag = [1 if constraint > 0 else 0 for constraint in constraint_function]

    result = pd.DataFrame({
        'number': constraint_num,
        'flag':constrain_flag
    })

    return sum([1 if i > 0 else 0 for i in constraint_function])

In [None]:
w1, w2, w3, w4 = np.array(solutions[10].variables[:4])
worker_schedule = np.array(solutions[10].variables[4:]).reshape(23, 6)

In [None]:
solutions_value = []
for i, solution in enumerate(solutions):
    objective_values = solution.objectives
    for j, obj_value in enumerate(objective_values):
        if obj_value < 0:
            obj_value = -1 * obj_value
        objective_values[j] = obj_value
    solutions_value.append(objective_values)
solutions_df = pd.DataFrame(np.array(solutions_value), columns=['1F', '3F', 'eficiency', 'total_mh'])

In [None]:
solutions_df[solutions_df['eficiency'] == solutions_df['eficiency'].max()]