In [None]:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import os
import pickle
import pandas as pd
import numpy as np
from sklearn.datasets import load_diabetes
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error

# Defaults applied to every task in the DAG: a failed task is retried once,
# one minute after the failure.
default_args = dict(
    owner='airflow',
    start_date=datetime(2024, 1, 1),
    retries=1,
    retry_delay=timedelta(minutes=1),
)

# Daily pipeline over the scikit-learn diabetes dataset. catchup=False is
# passed so that runs between start_date and the deployment date are not
# backfilled.
dag = DAG(
    dag_id='dia',
    schedule='@daily',
    catchup=False,
    default_args=default_args,
)

# Scratch directory through which the tasks hand artifacts (CSV files and
# pickled models) to one another. It is created at import time so that every
# task callable can assume it already exists.
DATA_PATH = '/tmp/airflow_diabetes'
os.makedirs(name=DATA_PATH, exist_ok=True)

def feature_engineering(**kwargs):
    """Keep the four most important diabetes features and save them as CSV.

    A random forest is fitted on the full scikit-learn diabetes dataset and
    its ``feature_importances_`` are used to rank the columns. The four
    highest-ranked columns plus a ``target`` column are written to
    ``selected_features.csv`` inside ``DATA_PATH``.

    Args:
        **kwargs: Keyword arguments supplied by the caller (presumably the
            Airflow task context); they are not used.
    """
    dataset = load_diabetes()
    features = pd.DataFrame(dataset.data, columns=dataset.feature_names)
    target = dataset.target

    forest = RandomForestRegressor()
    forest.fit(features, target)

    # argsort is ascending, so the last four positions index the features
    # with the largest importances.
    ranking = np.argsort(forest.feature_importances_)
    keep = ranking[-4:]

    chosen = features.iloc[:, keep]
    target_column = pd.Series(target, name='target')
    output = pd.concat([chosen, target_column], axis=1)

    destination = os.path.join(DATA_PATH, 'selected_features.csv')
    output.to_csv(destination, index=False)

def train_models(**kwargs):
    """Train a random forest and a gradient-boosting regressor and pickle both.

    Reads ``selected_features.csv`` from ``DATA_PATH``, holds out 20% of the
    rows as a test set, fits both models on the remaining rows, and writes
    ``rf_model.pkl`` and ``gb_model.pkl``. Each pickle contains the tuple
    ``(model, X_test, y_test)`` so the selection step can score both models
    on the same held-out rows.

    No ``random_state`` is passed to the split or to the models, so results
    vary from run to run.

    Args:
        **kwargs: Keyword arguments supplied by the caller (presumably the
            Airflow task context); they are not used.
    """
    df = pd.read_csv(os.path.join(DATA_PATH, 'selected_features.csv'))
    X = df.drop('target', axis=1)
    y = df['target']
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

    rf = RandomForestRegressor()
    gb = GradientBoostingRegressor()

    rf.fit(X_train, y_train)
    gb.fit(X_train, y_train)

    # Use context managers so each file is flushed and closed before the
    # downstream task tries to read it, instead of relying on garbage
    # collection to close the handle.
    with open(os.path.join(DATA_PATH, 'rf_model.pkl'), 'wb') as f:
        pickle.dump((rf, X_test, y_test), f)
    with open(os.path.join(DATA_PATH, 'gb_model.pkl'), 'wb') as f:
        pickle.dump((gb, X_test, y_test), f)

def select_best_model(**kwargs):
    """Pick the model with the lower test RMSE and save it as ``best_model.pkl``.

    Loads ``rf_model.pkl`` and ``gb_model.pkl`` from ``DATA_PATH``. Both
    models are scored on the test set stored in ``rf_model.pkl``; the test
    set stored alongside the gradient-boosting model is ignored. On a tie
    the gradient-boosting model is kept.

    Args:
        **kwargs: Keyword arguments supplied by the caller (presumably the
            Airflow task context); they are not used.
    """
    # NOTE(review): pickle.load can execute arbitrary code from the file.
    # These files live under /tmp, so this is only safe while the directory
    # is not writable by untrusted users.
    with open(os.path.join(DATA_PATH, 'rf_model.pkl'), 'rb') as f:
        rf, X_test, y_test = pickle.load(f)
    with open(os.path.join(DATA_PATH, 'gb_model.pkl'), 'rb') as f:
        gb, _, _ = pickle.load(f)

    # Take the square root explicitly instead of passing squared=False: that
    # keyword was deprecated in scikit-learn 1.4 and removed in 1.6, where it
    # raises TypeError. sqrt(MSE) gives the same RMSE on every version.
    rf_rmse = np.sqrt(mean_squared_error(y_test, rf.predict(X_test)))
    gb_rmse = np.sqrt(mean_squared_error(y_test, gb.predict(X_test)))

    best_model = rf if rf_rmse < gb_rmse else gb
    with open(os.path.join(DATA_PATH, 'best_model.pkl'), 'wb') as f:
        pickle.dump(best_model, f)

def run_inference(**kwargs):
    """Predict with the selected model and write the results to ``inference.csv``.

    Loads ``best_model.pkl`` from ``DATA_PATH`` and predicts on every row of
    ``selected_features.csv`` with the ``target`` column dropped. The
    predictions are written as a single ``prediction`` column.

    Args:
        **kwargs: Keyword arguments supplied by the caller (presumably the
            Airflow task context); they are not used.
    """
    df = pd.read_csv(os.path.join(DATA_PATH, 'selected_features.csv')).drop('target', axis=1)

    # NOTE(review): pickle.load can execute arbitrary code from the file.
    # This file lives under /tmp, so this is only safe while the directory
    # is not writable by untrusted users.
    # The context manager closes the handle deterministically; the original
    # left it open until garbage collection.
    with open(os.path.join(DATA_PATH, 'best_model.pkl'), 'rb') as f:
        model = pickle.load(f)

    preds = model.predict(df)
    pd.DataFrame(preds, columns=['prediction']).to_csv(os.path.join(DATA_PATH, 'inference.csv'), index=False)

# One PythonOperator per pipeline stage; each task_id matches the name of the
# callable it runs.
feature_task = PythonOperator(
    dag=dag,
    task_id='feature_engineering',
    python_callable=feature_engineering,
)

train_task = PythonOperator(
    dag=dag,
    task_id='train_models',
    python_callable=train_models,
)

select_task = PythonOperator(
    dag=dag,
    task_id='select_best_model',
    python_callable=select_best_model,
)

inference_task = PythonOperator(
    dag=dag,
    task_id='run_inference',
    python_callable=run_inference,
)

# Strictly linear pipeline: each stage reads the files written by the stage
# before it.
(
    feature_task
    >> train_task
    >> select_task
    >> inference_task
)
