In [2]:
!pip install apache-airflow==3.1.7

Collecting apache-airflow==3.1.7
  Downloading apache_airflow-3.1.7-py3-none-any.whl.metadata (36 kB)
Collecting apache-airflow-core==3.1.7 (from apache-airflow==3.1.7)
  Downloading apache_airflow_core-3.1.7-py3-none-any.whl.metadata (6.4 kB)
Collecting apache-airflow-task-sdk==1.1.7 (from apache-airflow==3.1.7)
  Downloading apache_airflow_task_sdk-1.1.7-py3-none-any.whl.metadata (3.9 kB)
Collecting a2wsgi>=1.10.8 (from apache-airflow-core==3.1.7->apache-airflow==3.1.7)
  Downloading a2wsgi-1.10.10-py3-none-any.whl.metadata (4.0 kB)
Collecting aiosqlite<0.22.0,>=0.20.0 (from apache-airflow-core==3.1.7->apache-airflow==3.1.7)
  Downloading aiosqlite-0.21.0-py3-none-any.whl.metadata (4.3 kB)
Collecting apache-airflow-providers-common-compat>=1.7.4 (from apache-airflow-core==3.1.7->apache-airflow==3.1.7)
  Downloading apache_airflow_providers_common_compat-1.13.1-py3-none-any.whl.metadata (5.6 kB)
Collecting apache-airflow-providers-common-io>=1.6.3 (from apache-airflow-core==3.1.7->apa

In [3]:
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path

# NOTE(review): in Airflow 3 these are deprecated aliases; the preferred paths
# are `airflow.sdk.DAG` and
# `airflow.providers.standard.operators.python.PythonOperator` — confirm.
from airflow import DAG
from airflow.operators.python import PythonOperator

# Task-level defaults passed to the DAG below: owner label and one retry on failure.
default_args = dict(
    owner="airflow",
    retries=1,
)

def preprocess():
    subprocess.run(["python", "scripts/preprocess.py"], check=True)

def train():
    subprocess.run(["python", "scripts/train_model.py"], check=True)

def evaluate():
    subprocess.run(["python", "scripts/evaluate.py"], check=True)

# Daily retraining DAG: preprocess -> train -> evaluate, each step a
# PythonOperator wrapping the matching callable.
with DAG(
    dag_id="job_classification_pipeline",
    # Timezone-aware start date: a naive datetime is interpreted in whatever
    # `core.default_timezone` is configured to (UTC by default), so making it
    # explicit keeps the schedule independent of deployment config.
    start_date=datetime(2024, 1, 1, tzinfo=timezone.utc),
    schedule="@daily",
    # Do not create runs for every past interval since start_date.
    catchup=False,
    default_args=default_args,
    description="Automated ML retraining pipeline for job classification",
    tags=["ml", "airflow", "training"],
) as dag:

    preprocess_task = PythonOperator(
        task_id="preprocess_data",
        python_callable=preprocess,
    )

    train_task = PythonOperator(
        task_id="train_model",
        python_callable=train,
    )

    evaluate_task = PythonOperator(
        task_id="evaluate_model",
        python_callable=evaluate,
    )

    # Strictly sequential: each step runs only after the previous one succeeds.
    preprocess_task >> train_task >> evaluate_task