# Apache Airflow Tutorial for Google Colab

This notebook will help you learn Apache Airflow concepts and create your first DAG (Directed Acyclic Graph).

**Note**: The Airflow web UI won't work in Colab, but you can still learn DAG structure and test task logic.

## Step 1: Install Apache Airflow

First, let's install Airflow in this Colab environment:

In [None]:
# Install Apache Airflow
!pip install 'apache-airflow==3.0.3' \
    --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-3.0.3/constraints-3.12.txt"

# Install additional packages we'll need
!pip install pandas matplotlib

## Step 2: Set Up Environment

In [None]:
import os
from datetime import datetime, timedelta
import pandas as pd

# Set Airflow home directory
os.environ['AIRFLOW_HOME'] = '/content/airflow'

# Create directory structure
!mkdir -p /content/airflow/dags
!mkdir -p /content/airflow/logs
!mkdir -p /content/airflow/plugins

print("Airflow environment set up successfully!")

## Step 3: Initialize Airflow Database

In [None]:
# Initialize the Airflow metadata database (creates and migrates the schema)
!airflow db migrate

print("Airflow database initialized!")

## Step 4: Your First DAG

Let's create a simple DAG that demonstrates common Airflow concepts:

In [None]:
# Create a simple DAG file
dag_code = '''
from datetime import datetime, timedelta
from airflow.sdk import DAG
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.operators.python import PythonOperator
from airflow.providers.standard.operators.bash import BashOperator

# Default arguments applied to every task in the DAG
default_args = {
    'owner': 'student',
    'depends_on_past': False,
    'start_date': datetime(2025, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Define the DAG
dag = DAG(
    'tutorial_dag',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule=timedelta(days=1),
    catchup=False,
    tags=['tutorial'],
)

# Task 1: Start task (a no-op marker)
start_task = EmptyOperator(
    task_id='start',
    dag=dag,
)

# Task 2: Python function
def hello_world():
    print("Hello from Airflow!")
    return "Task completed successfully"

python_task = PythonOperator(
    task_id='hello_world_python',
    python_callable=hello_world,
    dag=dag,
)

# Task 3: Bash command
bash_task = BashOperator(
    task_id='hello_world_bash',
    bash_command='echo "Hello from Bash!"',
    dag=dag,
)

# Task 4: Data processing example
def process_data():
    # Import inside the callable so pandas is loaded only when the task runs
    import pandas as pd

    # Create sample data
    data = {
        'name': ['Alice', 'Bob', 'Charlie'],
        'score': [95, 87, 92]
    }
    df = pd.DataFrame(data)

    # Process data
    avg_score = df['score'].mean()
    print(f"Average score: {avg_score}")

    # Return a plain float so the result can be stored as an XCom
    return float(avg_score)

data_task = PythonOperator(
    task_id='process_data',
    python_callable=process_data,
    dag=dag,
)

# Task 5: End task (a no-op marker)
end_task = EmptyOperator(
    task_id='end',
    dag=dag,
)

# Define task dependencies
start_task >> [python_task, bash_task] >> data_task >> end_task
'''

# Write the DAG to a file
with open('/content/airflow/dags/tutorial_dag.py', 'w') as f:
    f.write(dag_code)

print("DAG file created successfully!")
print("File location: /content/airflow/dags/tutorial_dag.py")
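
Because a task callable is an ordinary Python function, its logic can also be checked directly in a notebook cell before Airflow is involved. The sketch below is a standard-library version of the `process_data` calculation. The helper name `average_score` and the list-of-dicts input shape are assumptions made for illustration; they are not part of the DAG file above.

```python
from statistics import mean


def average_score(records):
    """Return the mean of the 'score' field across a list of dicts."""
    if not records:
        raise ValueError("records must not be empty")
    return mean(r["score"] for r in records)


# Same sample values as the process_data task in the DAG
sample = [
    {"name": "Alice", "score": 95},
    {"name": "Bob", "score": 87},
    {"name": "Charlie", "score": 92},
]

avg = average_score(sample)
print(f"Average score: {avg:.2f}")
```

Keeping the calculation in a small function like this makes it easy to assert on the result in a cell, and the same function could then be passed to a `PythonOperator` via `python_callable`.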

## Step 5: Test Your DAG

Let's check that our DAG parses correctly and appears in the DAG list:

In [None]:
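# Parse the DAG files and store them in the metadata database,
# which is where the Airflow 3 CLI looks up DAGs
!airflow dags reserialize

# List all DAGs
!airflow dags list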

print("\n" + "="*50)
print("DAG Details:")
print("="*50)

# Show DAG details
!airflow dags show tutorial_dag

## Step 6: Test Individual Tasks

You can test individual tasks without running the full DAG:

In [None]:
# Test the Python task
!airflow tasks test tutorial_dag hello_world_python 2025-01-01

print("\n" + "="*50)

# Test the data processing task
!airflow tasks test tutorial_dag process_data 2025-01-01

## Step 7: Understanding DAG Structure

Let's visualize and understand the DAG structure:

In [None]:
# List all tasks in the DAG
!airflow tasks list tutorial_dag

print("\n" + "="*50)
print("Task Dependencies:")
print("="*50)

# Show task dependencies as a Graphviz DOT graph
!airflow dags show tutorial_dag

print("\nDAG Flow:")
print("start → [hello_world_python, hello_world_bash] → process_data → end")
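
The flow printed above is a dependency graph, and "acyclic" means there is always at least one valid order in which to run the tasks. As a rough illustration (this uses Python's standard `graphlib` module, not Airflow's scheduler), the same graph can be sorted topologically, and adding a back edge shows why cycles are rejected. The `upstream` dictionary and the `execution_order` helper are names invented for this sketch.

```python
from graphlib import CycleError, TopologicalSorter

# Map each task to the set of tasks it depends on (its upstream tasks)
upstream = {
    "start": set(),
    "hello_world_python": {"start"},
    "hello_world_bash": {"start"},
    "process_data": {"hello_world_python", "hello_world_bash"},
    "end": {"process_data"},
}


def execution_order(deps):
    """Return one valid run order, or raise CycleError if deps contain a cycle."""
    return list(TopologicalSorter(deps).static_order())


order = execution_order(upstream)
print(order)

# Making 'start' depend on 'end' creates a cycle, which a DAG forbids
cyclic = dict(upstream, start={"end"})
try:
    execution_order(cyclic)
    has_cycle = False
except CycleError:
    has_cycle = True
print("cycle detected:", has_cycle)
```

The two `hello_world_*` tasks have no dependency on each other, so either may come first in the printed order; only their position between `start` and `process_data` is fixed.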

## Step 8: Advanced DAG Example

Let's create a more advanced DAG with conditional logic:

In [None]:
advanced_dag_code = '''
from datetime import datetime, timedelta
from airflow.sdk import DAG
from airflow.providers.standard.operators.python import PythonOperator, BranchPythonOperator
from airflow.providers.standard.operators.empty import EmptyOperator

default_args = {
    'owner': 'student',
    'start_date': datetime(2025, 1, 1),
    'retries': 1,
}

dag = DAG(
    'advanced_tutorial_dag',
    default_args=default_args,
    description='Advanced DAG with branching',
    schedule='@daily',
    catchup=False,
)

def check_data_quality():
    # Simulate a quality score; a real pipeline would compute this from data
    import random
    quality_score = random.randint(1, 100)
    print(f"Data quality score: {quality_score}")

    # Return the task_id of the branch to follow; the other branch is skipped
    if quality_score > 80:
        return 'process_high_quality_data'
    else:
        return 'clean_data'

def process_high_quality_data():
    print("Processing high quality data...")
    return "High quality processing completed"

def clean_data():
    print("Cleaning data before processing...")
    return "Data cleaned successfully"

def final_processing():
    print("Final data processing...")
    return "All processing completed"

# Tasks
start = EmptyOperator(task_id='start', dag=dag)

quality_check = BranchPythonOperator(
    task_id='check_data_quality',
    python_callable=check_data_quality,
    dag=dag,
)

process_high_quality = PythonOperator(
    task_id='process_high_quality_data',
    python_callable=process_high_quality_data,
    dag=dag,
)

clean_data_task = PythonOperator(
    task_id='clean_data',
    python_callable=clean_data,
    dag=dag,
)

# Run once no upstream task has failed and at least one branch has succeeded
final_task = PythonOperator(
    task_id='final_processing',
    python_callable=final_processing,
    dag=dag,
    trigger_rule='none_failed_min_one_success',
)

# Dependencies
start >> quality_check >> [process_high_quality, clean_data_task] >> final_task
'''

# Write the advanced DAG
with open('/content/airflow/dags/advanced_tutorial_dag.py', 'w') as f:
    f.write(advanced_dag_code)

print("Advanced DAG created successfully!")
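
The branching callable returns the `task_id` of the path to follow. Because `check_data_quality` draws a random score, its result differs on every run, which makes it awkward to test. One possible approach, sketched here under the assumption that the decision can be separated from the random draw, is to pass the score in as an argument. The function name `choose_branch` and the `threshold` parameter are hypothetical and do not appear in the DAG file.

```python
def choose_branch(quality_score, threshold=80):
    """Return the task_id to follow for a given quality score."""
    if quality_score > threshold:
        return "process_high_quality_data"
    return "clean_data"


# Exercise the decision on both sides of the threshold
for score in (95, 80, 42):
    print(score, "->", choose_branch(score))
```

A score exactly equal to the threshold takes the `clean_data` path, matching the strict `>` comparison in the DAG.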

## Step 9: Key Airflow Concepts Summary

**DAG (Directed Acyclic Graph)**: A collection of tasks with dependencies

**Operators**: Define what actually gets done
- `PythonOperator`: Runs Python functions
- `BashOperator`: Runs bash commands
- `EmptyOperator`: Does nothing (useful for structure)
- `BranchPythonOperator`: Conditional task execution

**Task Dependencies**: Define execution order using `>>` or `<<`
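
To make the `>>` syntax less mysterious, here is a toy sketch of how Python operator overloading can record dependencies, including the list form used in `start >> [a, b] >> end`. `ToyTask` is a hypothetical class written for this notebook; it is not Airflow's implementation and only illustrates the idea.

```python
class ToyTask:
    """Hypothetical stand-in for an operator; records downstream task ids."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # Handles `self >> other` and `self >> [a, b]`
        targets = other if isinstance(other, list) else [other]
        for target in targets:
            self.downstream.append(target.task_id)
        return other  # returning the right-hand side allows chaining

    def __rrshift__(self, others):
        # Handles `[a, b] >> self`: lists define no `>>`, so Python falls back here
        for task in others:
            task.downstream.append(self.task_id)
        return self

    def __lshift__(self, other):
        # `self << other` means `other` runs first
        other >> self
        return other


start, a, b, end = (ToyTask(name) for name in ("start", "a", "b", "end"))
start >> [a, b] >> end

for task in (start, a, b, end):
    print(task.task_id, "->", task.downstream)
```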

**Schedule**: When the DAG runs, set with the DAG's `schedule` argument
- `@daily`, `@hourly`, `@weekly`
- Cron expressions: `'0 2 * * *'` (daily at 2 AM)
- Timedelta: `timedelta(hours=1)`
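
For the timedelta form, the underlying arithmetic is simply repeated addition from a start time. The sketch below shows only that arithmetic; it does not model Airflow's scheduler, `catchup`, or data intervals, and the helper name `fixed_interval_times` is an assumption made for illustration.

```python
from datetime import datetime, timedelta


def fixed_interval_times(start, interval, count):
    """Return `count` timestamps spaced `interval` apart, beginning at `start`."""
    return [start + i * interval for i in range(count)]


times = fixed_interval_times(datetime(2025, 1, 1), timedelta(hours=1), 3)
for t in times:
    print(t.isoformat())
```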

## Step 10: Next Steps

To continue learning Airflow:

1. **Try GitHub Codespaces** for a full Airflow experience with the web UI
2. **Explore more operators**: `SQLExecuteQueryOperator`, `EmailOperator`, etc.
3. **Learn about XComs** for task communication
4. **Study Airflow Variables and Connections**
5. **Practice with real data workflows**

**For a complete Airflow environment, use GitHub Codespaces or a Linux environment!**