In [2]:
import matplotlib.pyplot as plt
import biosteam as bst
import numpy as np
from scipy.optimize import differential_evolution # population based optimization code
from scipy.stats import qmc # latin hypercube sampling 
from skopt import gp_minimize
from skopt.space import Integer, Real
from skopt.utils import use_named_args
import warnings

# PyTorch 관련 모듈 (BNN에 필요)
import torch
import torch.nn as nn
import torch.optim as optim

In [3]:


# 아래 import는 사용자 환경에 맞추어 조정합니다.
# from BayesianNN import BayesianNN
# 여기서는 예시로 내부에 직접 구현한 간단한 BNN 클래스를 정의한다고 가정하겠습니다.
class BayesianNN(nn.Module):
    def __init__(self, input_dim, hidden_dim=32):
        super(BayesianNN, self).__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, 1)
        # Dropout (MC Dropout)
        self.dropout = nn.Dropout(p=0.2)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = self.dropout(x)
        x = torch.relu(self.fc2(x))
        x = self.dropout(x)
        x = self.fc3(x)
        return x




In [4]:
###############################################################################
# 1. AceticAcidProcess: 공정(프로세스) 시뮬레이션 및 재무 지표 계산을 담당하는 클래스
###############################################################################
class AceticAcidProcess:
    def __init__(self, verbose=False):
        """
        BioSTEAM을 이용한 아세트산 분리 공정 모델을 설정하고,
        CAPEX, OPEX, MSP 등의 재무 지표 계산 로직을 포함.
        """
        bst.nbtutorial()  # (그래프 라이트 모드용)
        self.verbose = verbose
        self.nEval = 0  # 시뮬레이션 실행 횟수를 추적
        self.sys = None
        
        # 기본 모델 셋업
        self.set_base_model()
        
        if self.verbose:
            print(" ##### 'AceticAcidProcess' 인스턴스가 초기화되었습니다!")

    def set_base_model(self):
        """
        최초 기본 모델 세팅을 위한 함수 (원하는 기본값으로 설정 가능)
        """
        # 예시로 간단히 변수 세팅
        X_base = [12,   # 추출기 단계 수
                  0.95, 0.95,  # Lr1, Hr1
                  0.999, 0.999,  # Lr2, Hr2
                  310,          # T_hex
                  0.99, 0.99,   # Lr3, Hr3
                 ]
        self._set(X_base)

    def _bounds(self):
        """
        설계변수의 상하한 범위
        """
        bounds = [
            (0, 50),      # n_stages
            (0, 0.9999), (0, 0.9999),  # Lr1, Hr1
            (0, 0.9999), (0, 0.9999),  # Lr2, Hr2
            (273, 350),   # T_hex
            (0, 0.9999), (0, 0.9999),  # Lr3, Hr3
        ]
        return bounds

    def _integrality(self):
        """
        어떤 변수가 정수인지(True) 여부를 반환
        """
        ints = [
            True,     # n_stages (정수)
            False, False,
            False, False,
            False,
            False, False
        ]
        return ints

    def _set(self, X):
        """
        실제 BioSTEAM 프로세스에 변수를 반영하는 메서드
        """
        # Thermo 설정
        bst.settings.set_thermo(['Water', 'AceticAcid', 'EthylAcetate'])

        # unpack
        _n1 = int(X[0])  # 추출기 단계 수 (정수화)
        _Lr1, _Hr1 = X[1], X[2]
        _Lr2, _Hr2 = X[3], X[4]
        _T_hex = X[5]
        _Lr3, _Hr3 = X[6], X[7]

        # 공정 각종 파라미터
        _k1, _k2, _k3 = (1.5, 1.5, 1.5)
        solvent_feed_ratio = 1.5

        # Stream 정의
        acetic_acid_broth = bst.Stream('acetic_acid_broth', AceticAcid=1000, Water=9000, units='kg/hr')
        ethyl_acetate = bst.Stream('ethyl_acetate', EthylAcetate=1)
        glacial_acetic_acid = bst.Stream('glacial_acetic_acid')
        wastewater = bst.Stream('wastewater')

        solvent_recycle = bst.Stream('solvent_rich')
        water_rich = bst.Stream('water_rich')
        distillate = bst.Stream('raffinate_distillate')

        # System 및 단위조작
        with bst.System('AAsep') as sys:
            extractor = bst.MultiStageMixerSettlers(
                'extractor',
                ins=(acetic_acid_broth, ethyl_acetate, solvent_recycle),
                outs=('extract', 'raffinate'),
                top_chemical='EthylAcetate',
                feed_stages=(0, -1, -1),
                N_stages=_n1,
                use_cache=True,
            )

            @extractor.add_specification(run=True)
            def adjust_fresh_solvent_flow_rate():
                broth = acetic_acid_broth.F_mass
                EtAc_recycle = solvent_recycle.imass['EthylAcetate']
                EtAc_required = broth * solvent_feed_ratio
                if EtAc_required < EtAc_recycle:
                    solvent_recycle.F_mass *= EtAc_required / EtAc_recycle
                    EtAc_recycle = solvent_recycle.imass['EthylAcetate']
                EtAc_fresh = EtAc_required - EtAc_recycle
                ethyl_acetate.imass['EthylAcetate'] = max(0, EtAc_fresh)

            HX = bst.HXutility(
                'extract_heater',
                ins=extractor.extract,
                outs=('hot_extract'),
                rigorous=True,
                V=0,
            )

            ED = bst.ShortcutColumn(
                'extract_distiller',
                ins=HX-0,
                outs=['', 'acetic_acid'],
                LHK=('Water', 'AceticAcid'),
                Lr=_Lr1,
                Hr=_Hr1,
                k=_k1,
                partial_condenser=False,
            )

            ED2 = bst.ShortcutColumn(
                'acetic_acid_purification',
                ins=ED-1,
                outs=('', glacial_acetic_acid),
                LHK=('EthylAcetate', 'AceticAcid'),
                Lr=_Lr2,
                Hr=_Hr2,
                k=_k2,
                partial_condenser=False
            )
            ED.check_LHK = ED2.check_LHK = False

            mixer = bst.Mixer(ins=(ED-0, ED2-0, distillate))
            HX2 = bst.HXutility(ins=mixer-0, T=_T_hex)

            settler = bst.MixerSettler(
                'settler',
                ins=HX2-0,
                outs=(solvent_recycle, water_rich),
                top_chemical='EthylAcetate',
            )

            mixer2 = bst.Mixer(ins=[extractor.raffinate, water_rich])
            RD = bst.ShortcutColumn(
                'raffinate_distiller',
                LHK=('EthylAcetate', 'Water'),
                ins=mixer2-0,
                outs=[distillate, wastewater],
                partial_condenser=False,
                Lr=_Lr3,
                Hr=_Hr3,
                k=_k3,
            )
        sys.operating_hours = 330 * 24  # 연간 운전 시간 (hr/yr)

        self.sys = sys  # 생성한 공정을 클래스 변수로 저장

    def simulate(self):
        """
        공정 시뮬레이션 실행 (nEval 카운팅 포함)
        """
        self.nEval += 1
        self.sys.simulate()

    def capex(self):
        """
        장치 CAPEX (연간화) [MMUSD/yr]
        """
        capex = round(self.sys.installed_equipment_cost / 1e6, 4)
        try:
            _ = int(capex)
            return capex
        except:
            print(capex)
            print("capex error")
            return 10  # 오류 시 기본값 반환

    def opex(self):
        """
        재료비 + 유틸리티비 (연간화) [MMUSD/yr]
        """
        # material_cost와 utility_cost는 기본적으로 [$/hr] (BioSTEAM)
        opex = round((self.sys.material_cost + self.sys.utility_cost) / 1e6, 4)
        try:
            _ = int(opex)
            return opex
        except:
            print("opex error")
            return 10

    def cost(self):
        """
        총비용 (CAPEX + OPEX) [MMUSD/yr]
        """
        return self.capex() + self.opex()

    def revenue(self):
        """
        생산된 아세트산을 판매했을 때의 연간 매출 [MMUSD/yr]
        (순도 충족하지 못하면 0)
        """
        if self.acetic_acid_constraint() == 0:
            # 순도 충족 시에만 매출 발생
            stream = [s for s in self.sys.streams if s.ID == 'glacial_acetic_acid'][0]
            P_AceticAcid = 0.4  # $/kg
            F_AceticAcid = stream.F_mass  # kg/hr
            annual_mass = F_AceticAcid * self.sys.operating_hours
            return round(P_AceticAcid * annual_mass / 1e6, 4)
        else:
            print("순도 미충족으로 인한 매출 0")
            return 0

    def profit(self):
        """
        연간 수익 [MMUSD/yr]
        """
        return self.revenue() - self.cost()

    def MSP(self):
        """
        Minimum Selling Price: 총비용을 제품량(kg/yr)으로 나눈 $/kg
        (순도 미충족 시에는 매우 큰 값(np.inf)을 반환하여 페널티)
        """
        stream = [s for s in self.sys.streams if s.ID == 'glacial_acetic_acid'][0]
        # total_mass_annual = stream.F_mass * self.sys.operating_hours
        F_AceticAcid = stream.F_mass * self.sys.operating_hours / 1e6 # kg/yr
        if self.acetic_acid_constraint() != 0:
            # 순도 충족 실패 -> 무한대로 처리
            return np.inf
        else:
            msp = self.cost() / F_AceticAcid # UNits: $/kg
            return round(msp, 4)
        
            # # 순도 만족 -> 정상적인 MSP 계산
            # # cost()가 이미 [MMUSD/yr] (백만 달러/년)이라면,
            # # F_AceticAcid가 [kg/yr]이므로 MSP 단위는 [$/kg]가 되려면...
            # # cost가 백만 달러 -> 1e6 달러
            # # mass가 kg
            # # => MSP = ( cost * 1e6 ) / F_AceticAcid 
            # # 이 형태여야 합니다. (또는 cost()를 달러로 환산하지 않고, F_AceticAcid를 1e6으로 나눠 MSP를 계산해도 됨)
            
            # # 만약 cost()가 "이미 million USD"라면:
            # cost_musd = self.cost()  # [MMUSD/yr]
            # cost_usd = cost_musd * 1e6  # [$/yr]
            # msp = cost_usd / F_AceticAcid  # [$/kg]

            # return round(msp, 4)

    def wt_acetic_acid(self):
        """
        최종 아세트산 스트림의 (AceticAcid) 질량분율
        """
        stream = [s for s in self.sys.streams if s.ID == 'glacial_acetic_acid'][0]
        return stream.get_mass_fraction(IDs='AceticAcid')

    def acetic_acid_constraint(self):
        """
        최종 아세트산 순도(0.98 wt%) 미달 시 제약 위반을 반환 (0보다 큰 값)
        """
        x_desired = 0.98
        x_achieved = self.wt_acetic_acid()
        diff = x_desired - x_achieved
        return max(0, diff)

    def func(self, X=None):
        """
        최적화 알고리즘에서 호출되는 대표 목적함수.
        X가 주어지면 공정 파라미터를 설정하고 시뮬레이션을 돌린 후 MSP를 계산해 반환.
        """
        try:
            if X is None:
                # 값이 없으면 기본 모델 사용
                self.set_base_model()
            else:
                # 설정값 업데이트
                self._set(X)
            # 시뮬레이션
            self.simulate()
            # 최적화의 목표: MSP 최소화
            objective_function = self.MSP()

        except:
            # 시뮬레이션 실패 등 오류 시 페널티
            return np.inf

        if self.verbose:
            print(f"[eval {self.nEval}] X={X}, MSP={objective_function}")

        return objective_function

    def natural_units(self, X_scaled):
        """
        0~1 스케일로 들어온 X_scaled를 실제 물리범위로 변환
        """
        bounds = self._bounds()
        b = np.array(bounds)
        d_b = b[:, 1] - b[:, 0]
        X_natural = b[:, 0] + (X_scaled * d_b)
        return X_natural


In [5]:

###############################################################################
# 2. OptimizationManager: 다양한 최적화 기법(Differential Evolution, Bayesian Opt, BNN 등)을 관리
###############################################################################
class OptimizationManager:
    def __init__(self, process: AceticAcidProcess, verbose=False):
        """
        :param process: AceticAcidProcess 인스턴스
        """
        self.process = process  # 실제 시뮬레이션이 담긴 프로세스
        self.verbose = verbose
        self.history = []  # (iteration, MSP) 저장

    def optimize(self, method='DE', maxiter=50):
        """
        주어진 method에 따라 최적화를 수행.
        """
        if method == 'DE':
            return self._optimize_DE(maxiter=maxiter)
        elif method == 'BO':
            return self._optimize_BO(maxiter=maxiter)
        elif method == 'BNN':
            return self._optimize_BNN(maxiter=maxiter)
        else:
            raise ValueError(f"Unsupported optimization method: {method}")

    def _optimize_DE(self, maxiter=50):
        """
        Scipy Differential Evolution을 사용하여 최소화(DE)
        """
        bounds = self.process._bounds()
        # 라틴 하이퍼큐브 샘플링 초기 population
        sampler = qmc.LatinHypercube(d=len(bounds))
        sample = sampler.random(n=50)  # population size
        population = self.process.natural_units(sample)

        # DE용 콜백
        def callback(xk, convergence):
            # xk: 현재 best x
            # MSP 계산
            msp_value = self.process.func(xk)
            iteration = len(self.history) + 1
            self.history.append((iteration, msp_value))

            if self.verbose:
                print(f"[DE] Iteration {iteration}: x={xk}, MSP={msp_value}, convergence={convergence}")

        result = differential_evolution(
            func=self.process.func,
            bounds=bounds,
            integrality=self.process._integrality(),
            init=sample,
            maxiter=maxiter,
            callback=callback
        )
        return result

    def _optimize_BO(self, maxiter=50):
        """
        Bayesian Optimization (Gaussian Process) by scikit-opt
        """
        print("Running Bayesian Optimization (BO)")

        space = [
            Integer(0, 50, name="n_stages"),
            Real(0, 0.9999, name="Lr1"),
            Real(0, 0.9999, name="Hr1"),
            Real(0, 0.9999, name="Lr2"),
            Real(0, 0.9999, name="Hr2"),
            Real(273, 350, name="T_hex"),
            Real(0, 0.9999, name="Lr3"),
            Real(0, 0.9999, name="Hr3"),
        ]

        @use_named_args(space)
        def objective(**params):
            """
            skopt 방식의 objective 함수
            """
            x = [
                params["n_stages"],
                params["Lr1"], params["Hr1"],
                params["Lr2"], params["Hr2"],
                params["T_hex"],
                params["Lr3"], params["Hr3"],
            ]
            msp_value = self.process.func(x)
            iteration = len(self.history) + 1
            self.history.append((iteration, msp_value))
            if self.verbose:
                print(f"[BO] Iteration {iteration}: x={x}, MSP={msp_value}")
            return msp_value

        result = gp_minimize(
            func=objective,
            dimensions=space,
            n_calls=maxiter,
            random_state=42,
            n_initial_points=10,
            acq_func="EI",
        )
        return result

    def _optimize_BNN(self, maxiter=20):
        """
        Bayesian Neural Network 기반 Active Learning 최적화
        """
        print("Running Bayesian Neural Network Active Learning")

        bounds = np.array(self.process._bounds())
        input_dim = len(bounds)

        # 초기 샘플 (Latin Hypercube)
        initial_samples = 10
        sampler = qmc.LatinHypercube(d=input_dim)
        sample = sampler.random(n=initial_samples)

        X_train = self.process.natural_units(sample)
        y_train = np.array([self.process.func(x) for x in X_train]).reshape(-1, 1)

        # PyTorch Tensor
        X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
        y_train_tensor = torch.tensor(y_train, dtype=torch.float32)

        # BNN 모델 생성
        model = BayesianNN(input_dim)
        optimizer = optim.Adam(model.parameters(), lr=0.01)
        criterion = nn.MSELoss()

        # 초기 학습
        for epoch in range(100):
            optimizer.zero_grad()
            outputs = model(X_train_tensor)
            loss = criterion(outputs, y_train_tensor)
            loss.backward()
            optimizer.step()

        # Active Learning 루프
        n_new_samples_per_iter = 5
        for i in range(maxiter):
            # 4.1 새로운 후보 샘플 생성
            new_samples = sampler.random(n=n_new_samples_per_iter)
            X_candidates = self.process.natural_units(new_samples)

            # 4.2 MC Dropout로 불확실성 추정
            model.train()  # Dropout 활성화
            X_candidates_tensor = torch.tensor(X_candidates, dtype=torch.float32)
            predictions = torch.stack([model(X_candidates_tensor) for _ in range(50)])  # 50번 샘플링
            uncertainties = predictions.std(dim=0).detach().numpy().flatten()

            # NaN 체크
            if np.isnan(uncertainties).any():
                print("Uncertainty contains NaN values. Skipping iteration.")
                continue

            # 4.3 최대 불확실성 점 하나를 선택
            idx_max_uncertainty = np.argmax(uncertainties)
            X_new = X_candidates[idx_max_uncertainty]

            # 4.4 새 샘플 함수값 평가
            y_new = self.process.func(X_new)

            # 학습데이터 추가
            X_train = np.vstack([X_train, X_new])
            y_train = np.vstack([y_train, [y_new]])

            # PyTorch Tensor 갱신
            X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
            y_train_tensor = torch.tensor(y_train, dtype=torch.float32)

            # 4.5 모델 재학습
            for epoch in range(50):
                optimizer.zero_grad()
                outputs = model(X_train_tensor)
                loss = criterion(outputs, y_train_tensor)
                loss.backward()
                optimizer.step()

            # 이력 기록
            iteration = len(self.history) + 1
            self.history.append((iteration, y_new))

            if self.verbose:
                print(f"[BNN] Iteration {i+1}: X={X_new}, MSP={y_new:.4f}, Unc={uncertainties[idx_max_uncertainty]:.4f}")

        return X_train, y_train


In [6]:
###############################################################################
# 3. ResultsPlotter: 최적화 과정의 결과(history)를 시각화
###############################################################################
class ResultsPlotter:
    def __init__(self):
        pass

    def plot_results(self, history):
        """
        최적화 실행 횟수별 MSP 값 그래프 출력
        :param history: [(iteration, msp), ...] 형태의 리스트
        """
        if not history:
            print("No optimization history found. Run optimize() first.")
            return

        history_array = np.array(history)
        iterations = history_array[:, 0]
        msp_values = history_array[:, 1]

        plt.figure(figsize=(8, 5))
        plt.plot(iterations, msp_values, marker='o', linestyle='-', color='b', label="MSP ($/kg)")
        plt.xlabel("Iteration (nEval)")
        plt.ylabel("MSP ($/kg)")
        plt.title("Optimization Progress (MSP over Iterations)")
        plt.grid(True)
        plt.legend()
        plt.show()

In [None]:

if __name__ == "__main__":
    # 1) 공정 모델 생성
    process = AceticAcidProcess(verbose=False)

    # 2) 최적화 매니저 생성
    manager = OptimizationManager(process=process, verbose=True)

    # 3) 최적화 수행 (예: Differential Evolution)
    result_de = manager.optimize(method='DE', maxiter=10)
    print("DE optimization result:", result_de)

    # 4) 결과 시각화
    plotter = ResultsPlotter()
    plotter.plot_results(manager.history)
