In [None]:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.external_task import ExternalTaskSensor
from datetime import datetime, timedelta
import sys
import os

# ✅ Airflow에서 preprocess_news.py를 실행할 수 있도록 경로 추가
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from preprocess_news import preprocess_news  # ✅ 수정된 파일명 반영!

# ✅ DAG 기본 설정
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2024, 2, 16),
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

# ✅ DAG 정의
with DAG("preprocess_news", default_args=default_args, schedule_interval="*/5 * * * *", catchup=False) as preprocess_dag:

    # ✅ 크롤링 DAG 완료 후 실행되도록 설정
    wait_for_crawler = ExternalTaskSensor(
        task_id="wait_for_crawler",
        external_dag_id="crypto_news_crawler",
        external_task_id="run_crypto_crawler",
        mode="poke",
        timeout=600,  # 10분간 감지
        poke_interval=15  # 15초마다 감지
    )

    # ✅ 전처리 실행
    preprocess_task = PythonOperator(
        task_id="preprocess_task",
        python_callable=preprocess_news
    )

    # ✅ 실행 순서: 크롤링 완료 대기 → 전처리 실행
    wait_for_crawler >> preprocess_task
