In [4]:
import os
from pprint import pprint

from airflow_client.config import ApiClient, Configuration
from airflow_client.client.api import dag_api, auth_api
from airflow_client.client.model.login_form import LoginForm

ModuleNotFoundError: No module named 'airflow_client'

In [5]:
# --- Configuration ---
AIRFLOW_API_BASE_URL = os.getenv("AIRFLOW_API_BASE_URL", "http://localhost:8080/api/v2")
AIRFLOW_USERNAME = os.getenv("AIRFLOW_USERNAME", "admin")
AIRFLOW_PASSWORD = os.getenv("AIRFLOW_PASSWORD", "gWD2q2pFt9k7fZUF")

In [4]:
# --- Authentication (JWT Token) ---
def get_airflow_jwt_token(host, username, password):
    # This assumes the /auth/token endpoint is at the base URL, not /api/v2
    # Adjust if your setup places it differently.
    config = Configuration(host=host)
    with ApiClient(config) as api_client:
        auth_instance = auth_api.AuthApi(api_client)
        login_form = LoginForm(username=username, password=password)
        try:
            auth_response = auth_instance.login(login_form=login_form)
            return auth_response.access_token
        except Exception as e:
            print(f"Error obtaining JWT token: {e}")
            raise

In [5]:
try:
    # Get the token from the base URL (without /api/v2)
    jwt_token = get_airflow_jwt_token(AIRFLOW_API_BASE_URL.replace("/api/v2", ""), AIRFLOW_USERNAME, AIRFLOW_PASSWORD)
    configuration = Configuration(
        host=AIRFLOW_API_BASE_URL,
        access_token=jwt_token
    )
except Exception as e:
    print(f"Could not get JWT token. Ensure Airflow is running and credentials are correct. Error: {e}")


Could not get JWT token. Ensure Airflow is running and credentials are correct. Error: name 'Configuration' is not defined


In [None]:

# --- Airflow API Interactions ---
with ApiClient(configuration) as api_client:
    # Create an instance of the DAGApi
    dags_api_instance = dag_api.DAGApi(api_client)

    # --- 1. Get all DAGs ---
    print("\n--- Getting all active DAGs ---")
    try:
        # You can use parameters like 'only_active=True' to filter results
        # 'limit' and 'offset' for pagination if you have many DAGs
        list_dags_response = dags_api_instance.get_dags(only_active=True)
        
        if list_dags_response and list_dags_response.dags:
            print(f"Found {len(list_dags_response.dags)} active DAGs:")
            for dag in list_dags_response.dags:
                print(f"  - DAG ID: {dag.dag_id}, Is Paused: {dag.is_paused}, File Location: {dag.fileloc}")
                # For a more complete view of what's available:
                # pprint(dag.to_dict()) # Uncomment to see full dictionary for each DAG
        else:
            print("No active DAGs found.")

    except Exception as e:
        print(f"Error getting all DAGs: {e}")

    # --- 2. Get details for a given DAG ---
    # Replace with a DAG_ID that exists in your Airflow instance
    target_dag_id = "example_bash_operator" # Or one of the IDs from the list above

    print(f"\n--- Getting details for DAG: '{target_dag_id}' ---")
    try:
        dag_details = dags_api_instance.get_dag(dag_id=target_dag_id)
        
        print(f"Details for DAG ID: {dag_details.dag_id}")
        print(f"  Description: {dag_details.description}")
        print(f"  File Location: {dag_details.fileloc}")
        print(f"  Is Paused: {dag_details.is_paused}")
        print(f"  Schedule Interval: {dag_details.schedule_interval.to_dict() if dag_details.schedule_interval else 'None'}")
        
        # Accessing parameters (new in Airflow 3.x - if defined in the DAG)
        if dag_details.params:
            print("  Defined Parameters (params):")
            for param_key, param_value in dag_details.params.items():
                print(f"    - {param_key}:")
                # 'param_value' will be an instance of 'Param' model
                print(f"      Type: {param_value.type}, Default: {param_value.default}, Description: {param_value.description}")
        else:
            print("  No parameters (params) defined for this DAG.")

        print("\nFull DAG details (as dictionary):")
        pprint(dag_details.to_dict())

    except Exception as e:
        print(f"Error getting details for DAG '{target_dag_id}': {e}")
        print("Please ensure the DAG ID is correct and it exists in your Airflow instance.")