In [None]:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
# from script import DomesticIndexFetcher, DomesticIndexPreprocessor, FLAMLPredictor, StockTrader

# 파일 및 설정
model_file = "domesticIndex"
data_file = "ds_domesticIndex_flaml.pkl"
file_path = 'domesticIndex.xlsx'
target_idx_code = '0001'

# DAG 기본 설정
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'domestic_index_trading',
    default_args=default_args,
    description='DAG for fetching, preprocessing, training, and trading domestic index data',
    #schedule_interval='@daily',  # Adjust as per requirement
    schedule_interval='0 8 * * *',
    start_date=datetime(2024, 12, 1),
    catchup=False
)

# Python 작업 함수 정의
def fetch_data():
    fetcher = DomesticIndexFetcher(file_path)
    index_codes = ["0001", "0002", "0003", "0004", "0005"]
    start_date = datetime.today().strftime("%Y%m%d")
    num_calls = 10
    fetcher.update_data(index_codes, start_date, num_calls)
    fetcher.save_data()

def preprocess_data():
    preprocessor = DomesticIndexPreprocessor(file_path, target_idx_code)
    preprocessor.process_all_data()
    preprocessor.save_results()

def train_and_predict():
    predictor = FLAMLPredictor(model_file, data_file, target_idx_code='0001')
    predictor.train_model()
    predictor.predict_last()
    predictor.save_model_and_results()

def execute_trade():
    trader = StockTrader()
    trader.execute_trade_based_on_last_value()

# 각 태스크 정의
fetch_data_task = PythonOperator(
    task_id='fetch_data',
    python_callable=fetch_data,
    dag=dag,
)

preprocess_data_task = PythonOperator(
    task_id='preprocess_data',
    python_callable=preprocess_data,
    dag=dag,
)

train_and_predict_task = PythonOperator(
    task_id='train_and_predict',
    python_callable=train_and_predict,
    dag=dag,
)

execute_trade_task = PythonOperator(
    task_id='execute_trade',
    python_callable=execute_trade,
    dag=dag,
)

# 태스크 순서 정의
fetch_data_task >> preprocess_data_task >> train_and_predict_task >> execute_trade_task