In [43]:
import pandas as pd
import numpy as np
from typing import Dict, List, Optional, Union, Tuple, Any
from dataclasses import dataclass
from Functions.WaterVapourPressure import WaterVapourPressure


In [49]:
class CombiTimeTable:
    """
    Modelica.Blocks.Sources.CombiTimeTable의 Python 구현
    시간에 따른 데이터 테이블을 관리하고 보간을 수행하는 클래스
    """
    def __init__(self, tableOnFile: bool = True, tableName: str = "tab", 
                 columns: Optional[List[int]] = None, fileName: Optional[str] = None):
        """
        Args:
            tableOnFile (bool): 파일에서 데이터를 로드할지 여부
            tableName (str): 테이블 이름
            columns (List[int], optional): 사용할 열 인덱스 목록
            fileName (str, optional): 데이터 파일 경로
        """
        self.tableOnFile = tableOnFile
        self.tableName = tableName
        self.columns = columns or list(range(1, 10))  # 기본값: 1-10 열
        self.fileName = fileName
        self.data = None
        self.load_data()
        
    def load_data(self) -> None:
        """데이터 파일에서 테이블 데이터를 로드"""
        if self.tableOnFile and self.fileName:
            try:
                # 데이터 파일 로드 (탭으로 구분된 텍스트 파일)
                self.data = pd.read_csv(self.fileName, 
                                      delimiter="\t", 
                                      skiprows=2,  # 헤더 2줄 건너뛰기
                                      names=['time'] + [f'col_{i}' for i in self.columns])
            except Exception as e:
                raise RuntimeError(f"데이터 파일 로드 실패: {self.fileName}") from e
        else:
            self.data = pd.DataFrame(columns=['time'] + [f'col_{i}' for i in self.columns])
            
    def get_value(self, time: float, interpolate: bool = False) -> Union[float, List[float]]:
        """
        주어진 시간에 대한 데이터 값을 반환
        
        Args:
            time (float): 조회할 시간
            interpolate (bool): 선형 보간 사용 여부
            
        Returns:
            Union[float, List[float]]: 단일 열인 경우 float, 여러 열인 경우 List[float]
        """
        if self.data is None or len(self.data) == 0:
            raise RuntimeError("데이터가 로드되지 않았습니다")
            
        if time < self.data['time'].min() or time > self.data['time'].max():
            raise ValueError(f"시간 {time}이(가) 데이터 범위를 벗어났습니다")
            
        if interpolate:
            # 선형 보간 수행
            result = []
            for col in [f'col_{i}' for i in self.columns]:
                value = np.interp(time, self.data['time'], self.data[col])
                # nan 값 처리
                if np.isnan(value):
                    value = 0.0
                result.append(value)
        else:
            # 가장 가까운 시간의 값을 반환
            idx = (self.data['time'] - time).abs().idxmin()
            result = []
            for i in self.columns:
                value = self.data.loc[idx, f'col_{i}']
                # nan 값 처리
                if np.isnan(value):
                    value = 0.0
                result.append(value)
            
        return result[0] if len(result) == 1 else result
    
    def _set_environmental_conditions(self, weather: List[float]) -> None:
        """
        외부 환경 조건을 설정합니다.
        
        Args:
            weather (List[float]): TMY_and_control 데이터 [0온도, 1습도, 2압력, 3일사량, 4풍속, 5하늘온도, 6온도설정값, 7CO2설정값, 8조명, 9추가값]
        """
        
        # 외부 온도 (Modelica: TMY_and_control.y[2])
        self.Tout = weather[0]  # Celsius (col_1이 온도)
             
        # 하늘 온도 (Modelica: TMY_and_control.y[7])
        self.Tsky = weather[5]  # Celsius (col_6이 하늘온도)
        
        # 풍속 [m/s] (Modelica: TMY_and_control.y[6])
        self.u_wind = weather[4]  # col_5가 풍속
        
        # 일사량 [W/m²] (Modelica: TMY_and_control.y[5])
        self.I_glob = weather[3]  # col_4가 일사량
        
        # 외부 수증기압 [Pa] (Modelica: TMY_and_control.y[2], TMY_and_control.y[3])
        self.VPout = WaterVapourPressure().calculate(weather[0], weather[1])  # 온도, 습도
        
        # 조명 ON/OFF 신호 (Modelica: TMY_and_control.y[10])
        self.OnOff = weather[8]  # col_9가 조명
        
        # 태양광 모델 업데이트
        self.solar_model.I_glob = self.I_glob
        
        # 외부 CO2 농도 [mg/m³]
        self.CO2out_ppm_to_mgm3 = 430 * 1.94  # 430 ppm을 mg/m³로 변환


In [50]:
WEATHER_DATA_PATH = "./10Dec-22Nov.txt"
SETPOINT_DATA_PATH = "./SP_10Dec-22Nov.txt"
SCREEN_USABLE_PATH = "./SC_usable_10Dec-22Nov.txt"
TMY_and_control = CombiTimeTable(
            fileName=WEATHER_DATA_PATH,
            columns=list(range(1, 10))  # 1번째부터 10번째 열까지
        )
SC_usable = CombiTimeTable(
            fileName=SCREEN_USABLE_PATH,
            columns=[1]  # 1번째, 2번째 열
        )
SP_new = CombiTimeTable(
            fileName=SETPOINT_DATA_PATH,
            columns=[1, 2]  # 1번째, 2번째, 3번째 열
        )

In [51]:
print(TMY_and_control.data)

          time  col_1  col_2   col_3  col_4  col_5     col_6  col_7  col_8  \
0            0    4.9     95  100700      0    3.0 -5.031436     16    800   
1         3600    5.0     96  100700      0    3.5 -5.031436     16    800   
2         7200    5.1     97  100600      0    4.0 -5.490152     16    800   
3        10800    5.0     97  100700      0    2.0 -5.031436     16    800   
4        14400    5.0     99  100700      0    1.0 -2.327377     16    800   
...        ...    ...    ...     ...    ...    ...       ...    ...    ...   
8275  29790000    4.5     90  100800      0    4.6 -3.219721     16    800   
8276  29793600    4.5     90  100900      0    3.1 -3.219721     16    800   
8277  29797200    4.4     90  100900      0    2.6 -3.444195     16    800   
8278  29800800    4.3     92  100900      0    2.6 -3.444195     16    800   
8279  29804400    4.3     93  100900      0    2.6 -4.802958     16    800   

      col_9  
0         0  
1         0  
2         0  
3      

In [56]:
time_idx = 1
dt = 3600
weather_data = TMY_and_control.get_value(time_idx * dt, interpolate=True)

In [57]:
weather_data

[np.float64(5.0),
 np.float64(96.0),
 np.float64(100700.0),
 np.float64(0.0),
 np.float64(3.5),
 np.float64(-5.03143573),
 np.float64(16.0),
 np.float64(800.0),
 np.float64(0.0)]