---

## Summary: Kestra Workflow Orchestration

### Quick Reference

| Concept | Purpose | Syntax |
|---------|---------|--------|
| **Flow** | Complete workflow definition | `.yaml` file with `id`, `namespace` |
| **Namespace** | Organization/grouping of flows | `namespace: company.project` |
| **Inputs** | User-provided parameters | `inputs: - id: name, type: STRING` |
| **Variables** | Reusable values in workflow | `{{ vars.variable_name }}` |
| **KV Store (key-value)** | Persistent, namespace-scoped configuration storage | `{{ kv('config_name') }}` |
| **Tasks** | Execution units (ETL jobs) | `tasks: - id: task_id, type: plugin` |
| **Plugins** | Pre-built functionality | `io.kestra.plugin.gcp.bigquery.Query` |
| **Concurrency** | Execution limits | `concurrency: limit: 5, behavior: FAIL` |
| **Triggers** | Schedule automation | `type: io.kestra.plugin.core.trigger.Schedule` |
| **Cron Expression** | Scheduling syntax | `0 0 * * *` (daily at midnight) |

### Workflow Development Process

1. **Design** → Plan your ETL/ELT logic
2. **Create YAML** → Define flow with namespace and ID
3. **Add Inputs** → Accept user parameters if needed
4. **Set Variables** → Define reusable values
5. **Configure Tasks** → Build ETL steps with plugins
6. **Add Triggers** → Schedule execution with cron
7. **Set KV Values** → Store environment-specific config in Kestra's KV Store
8. **Test** → Run manually first, then schedule
9. **Monitor** → Check logs and execution history

### Best Practices

- ✅ Use meaningful IDs and namespaces
- ✅ Store credentials and environment-specific config in Kestra's KV Store, not in YAML
- ✅ Set reasonable concurrency limits
- ✅ Make tasks idempotent (safe to retry)
- ✅ Use variables to reduce duplication
- ✅ Add meaningful descriptions to flows
- ✅ Start simple, then add complexity
- ✅ Monitor first-time scheduled executions
- ✅ Keep namespace hierarchy logical
- ✅ Document custom task configurations

---

**Module 2 Complete!** You now understand how to build, schedule, and orchestrate data pipelines using Kestra with seamless integration to your GCP infrastructure.

---

## 10. Integrating GCP Resources with Kestra

### Architecture Overview

Your Kestra workflows can seamlessly integrate with **Google Cloud Platform (GCP)** infrastructure that you created using Terraform in Module 1. This creates a complete data pipeline:

```
Terraform (Module 1)
        ↓
    Create GCP Resources
    - Compute Engine instances
    - Cloud Storage buckets
    - BigQuery datasets
    - Service accounts & permissions
        ↓
    Kestra (Module 2)
        ↓
    Orchestrate workflows using GCP resources
```

### GCP Resources Created by Terraform

From Module 1, you should have created:

1. **Cloud Storage Bucket**: For storing raw and processed data
2. **BigQuery Dataset**: For data warehousing and analytics
3. **Compute Engine Instance**: For running compute-intensive jobs
4. **Service Account**: For authentication and authorization

### KV Store Entries for GCP Configuration

Store GCP configuration in Kestra's KV Store (a key-value store, referenced as `kv` in expressions) instead of hardcoding it in each flow:

```yaml
# GCP configuration keys to store in the KV Store:
gcp_project_id      # Your GCP project ID
gcp_location        # Region (e.g., "us-central1", "europe-west1")
gcp_bucket_name     # Cloud Storage bucket name
gcp_dataset         # BigQuery dataset name
```

### Step 1: Create KV Entries in the Kestra UI

1. Open **Namespaces**, select the namespace your flows use, and go to its **KV Store** tab (the exact menu path may differ between Kestra versions)
2. Create new values:
   - **Name**: `gcp_project_id` | **Value**: `your-project-id`
   - **Name**: `gcp_location` | **Value**: `us-central1`
   - **Name**: `gcp_bucket_name` | **Value**: `my-data-bucket`
   - **Name**: `gcp_dataset` | **Value**: `analytics_warehouse`

### Step 2: Reference in Workflows

```yaml
id: gcp_integrated_pipeline
namespace: data_engineering.gcp

description: "ETL pipeline using Terraform-created GCP resources"

tasks:
  - id: upload_to_storage
    type: io.kestra.plugin.gcp.gcs.Upload
    projectId: "{{ kv('gcp_project_id') }}"
    # Placeholder path: in practice `from` usually points to a file produced by an earlier task
    from: "/local/data/taxi_trips.csv"
    # The execution start date partitions the raw files by day
    to: "gs://{{ kv('gcp_bucket_name') }}/raw/taxi_trips/{{ execution.startDate | date('yyyy-MM-dd') }}/data.csv"
  
  - id: load_to_bigquery
    type: io.kestra.plugin.gcp.bigquery.LoadFromGcs
    projectId: "{{ kv('gcp_project_id') }}"
    destinationTable: "{{ kv('gcp_project_id') }}.{{ kv('gcp_dataset') }}.taxi_trips_raw"
    from:
      - "gs://{{ kv('gcp_bucket_name') }}/raw/taxi_trips/*/data.csv"
    format: CSV
    csvOptions:
      skipLeadingRows: 1
  
  - id: transform_in_bigquery
    type: io.kestra.plugin.gcp.bigquery.Query
    projectId: "{{ kv('gcp_project_id') }}"
    sql: |
      CREATE OR REPLACE TABLE `{{ kv('gcp_project_id') }}.{{ kv('gcp_dataset') }}.taxi_trips_clean` AS
      SELECT
        CAST(pickup_time AS TIMESTAMP) as pickup_time,
        CAST(fare_amount AS FLOAT64) as fare_amount,
        CAST(distance AS FLOAT64) as distance,
        passenger_count,
        CURRENT_TIMESTAMP() as processed_at
      FROM `{{ kv('gcp_project_id') }}.{{ kv('gcp_dataset') }}.taxi_trips_raw`
      WHERE fare_amount > 0 AND distance > 0
```

### Complete GCP Integration Example

```yaml
id: complete_gcp_etl
namespace: data_engineering.gcp

description: "Complete ETL using Terraform-provisioned GCP resources"

variables:
  # Contains an expression, so reference it with render(vars.execution_date)
  execution_date: "{{ now() | date('yyyy-MM-dd') }}"
  raw_folder: "raw/taxi"
  processed_folder: "processed/taxi"

triggers:
  - id: daily_run
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 3 * * *"  # 3 AM daily

# Tasks run sequentially, in the order listed
tasks:
  # Step 1: Extract from source
  - id: extract_data
    type: io.kestra.plugin.scripts.python.Script
    beforeCommands:
      - pip install requests
    outputFiles:
      - taxi_data.csv  # expose the file to later tasks
    script: |
      import csv
      import requests
      
      # Download data from the API (placeholder URL)
      response = requests.get(
          "https://api.example.com/taxi",
          params={"date": "{{ render(vars.execution_date) }}"},
          timeout=60,
      )
      response.raise_for_status()
      records = response.json()  # assumed: a non-empty list of flat JSON objects
      
      # Write a header row plus one row per record into the working directory
      with open("taxi_data.csv", "w", newline="") as f:
          writer = csv.DictWriter(f, fieldnames=list(records[0].keys()))
          writer.writeheader()
          writer.writerows(records)
      
      print("Extracted data for {{ render(vars.execution_date) }}")
  
  # Step 2: Upload to Cloud Storage
  - id: upload_raw_data
    type: io.kestra.plugin.gcp.gcs.Upload
    projectId: "{{ kv('gcp_project_id') }}"
    from: "{{ outputs.extract_data.outputFiles['taxi_data.csv'] }}"
    to: "gs://{{ kv('gcp_bucket_name') }}/{{ vars.raw_folder }}/{{ render(vars.execution_date) }}/data.csv"
  
  # Step 3: Load into BigQuery
  - id: load_to_warehouse
    type: io.kestra.plugin.gcp.bigquery.LoadFromGcs
    projectId: "{{ kv('gcp_project_id') }}"
    destinationTable: "{{ kv('gcp_project_id') }}.{{ kv('gcp_dataset') }}.taxi_trips_staging"
    from:
      - "gs://{{ kv('gcp_bucket_name') }}/{{ vars.raw_folder }}/{{ render(vars.execution_date) }}/data.csv"
    format: CSV
    csvOptions:
      skipLeadingRows: 1
  
  # Step 4: Transform and clean
  - id: transform_data
    type: io.kestra.plugin.gcp.bigquery.Query
    projectId: "{{ kv('gcp_project_id') }}"
    sql: |
      INSERT INTO `{{ kv('gcp_project_id') }}.{{ kv('gcp_dataset') }}.taxi_trips_clean`
      SELECT
        pickup_time,
        dropoff_time,
        SAFE_CAST(fare_amount AS FLOAT64) as fare_amount,
        SAFE_CAST(total_amount AS FLOAT64) as total_amount,
        passenger_count,
        trip_distance,
        CURRENT_TIMESTAMP() as processed_at,
        '{{ render(vars.execution_date) }}' as batch_date
      FROM `{{ kv('gcp_project_id') }}.{{ kv('gcp_dataset') }}.taxi_trips_staging`
      WHERE fare_amount > 0
        AND trip_distance > 0
        AND passenger_count > 0
  
  # Step 5: Archive processed data
  - id: archive_processed
    type: io.kestra.plugin.gcp.gcs.Copy
    projectId: "{{ kv('gcp_project_id') }}"
    from: "gs://{{ kv('gcp_bucket_name') }}/{{ vars.raw_folder }}/{{ render(vars.execution_date) }}/data.csv"
    to: "gs://{{ kv('gcp_bucket_name') }}/{{ vars.processed_folder }}/{{ render(vars.execution_date) }}/data.csv"
```

### Key Advantages of This Integration

1. **Infrastructure as Code**: Terraform creates repeatable, version-controlled infrastructure
2. **Centralized Configuration**: The KV Store keeps project IDs, bucket names, and credentials out of YAML files
3. **Scalability**: GCP handles storage and compute at scale
4. **Auditability**: Terraform changes are tracked in version control, and Kestra records execution history and logs
5. **Flexibility**: Easy to switch between environments (dev/prod)

### Environment-Specific Workflows

You can manage multiple environments by keeping a separate set of KV entries for each one (for example, one set per namespace or per Kestra instance):

```
Development Environment:
  gcp_project_id = "my-project-dev"
  gcp_bucket_name = "my-data-dev"

Production Environment:
  gcp_project_id = "my-project-prod"
  gcp_bucket_name = "my-data-prod"

# Same workflow YAML, different KV entries = different behavior
```
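
To make the idea concrete, here is a minimal Python sketch (not Kestra code) of how one path template resolves to different resources per environment. `KV_BY_ENV` and `bucket_uri` are hypothetical names introduced for illustration; the values are the sample ones listed above.

```python
# Hypothetical per-environment KV sets, using the sample values from these notes.
KV_BY_ENV = {
    "dev": {"gcp_project_id": "my-project-dev", "gcp_bucket_name": "my-data-dev"},
    "prod": {"gcp_project_id": "my-project-prod", "gcp_bucket_name": "my-data-prod"},
}


def bucket_uri(env: str, object_path: str) -> str:
    """Build a gs:// URI from the bucket configured for the given environment."""
    kv = KV_BY_ENV[env]
    return f"gs://{kv['gcp_bucket_name']}/{object_path}"


# The same object path lands in a different bucket depending on the environment.
dev_uri = bucket_uri("dev", "raw/taxi/2024-02-05/data.csv")
prod_uri = bucket_uri("prod", "raw/taxi/2024-02-05/data.csv")
print(dev_uri)
print(prod_uri)
```

The workflow logic (the object path) stays identical; only the lookup table changes.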

---

## 9. Setting Up Triggers with Cron Expressions

### What are Triggers?

**Triggers** define when a workflow should automatically execute. In Kestra, you can schedule workflows using cron expressions through the **Schedule** plugin.

### The Schedule Plugin

The plugin to use for scheduling is: `io.kestra.plugin.core.trigger.Schedule`

This is Kestra's built-in scheduling mechanism that uses standard cron syntax.

### Cron Expression Syntax

A cron expression has 5 fields. This is the standard Unix cron format; some schedulers (for example Quartz-style ones) add a leading seconds field, which is not used here:

```
* * * * *
│ │ │ │ │
│ │ │ │ └─ Day of Week (0=Sunday, 6=Saturday)
│ │ │ └─── Month (1-12)
│ │ └───── Day of Month (1-31)
│ └─────── Hour (0-23)
└───────── Minute (0-59)
```

### Special Character: Wildcard (*)

The **asterisk (*)** means **"any value"** for that field. If you want a field to match any value, use `*`.

### Cron Expression Examples

| Expression | Description |
|------------|-------------|
| `0 0 * * *` | Daily at midnight (00:00) |
| `0 9 * * 1` | Every Monday at 9:00 AM |
| `*/15 * * * *` | Every 15 minutes |
| `0 2 1 * *` | First day of every month at 2:00 AM |
| `0 12 * * 1-5` | Weekdays at noon |
| `30 * * * *` | Every hour at the 30-minute mark |
| `0 0,12 * * *` | Twice daily at midnight and noon |
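
To check your reading of an expression, the following Python sketch tests whether a given moment matches a 5-field expression. It is an illustration, not Kestra's scheduler: it supports only `*`, single values, lists (`a,b`), ranges (`a-b`), and steps (`*/n`), and it requires all five fields to match (real cron implementations treat a restricted day-of-month together with a restricted day-of-week as an OR). The names `expand_field` and `cron_matches` are hypothetical.

```python
from datetime import datetime

# Allowed (min, max) for minute, hour, day of month, month, day of week.
FIELD_RANGES = [(0, 59), (0, 23), (1, 31), (1, 12), (0, 6)]


def expand_field(field: str, lo: int, hi: int) -> set[int]:
    """Expand one cron field such as '*', '*/15', '1-5', '0,12' or '9'."""
    values: set[int] = set()
    for part in field.split(","):
        step = 1
        if "/" in part:
            part, step_text = part.split("/")
            step = int(step_text)
        if part == "*":
            start, end = lo, hi
        elif "-" in part:
            start_text, end_text = part.split("-")
            start, end = int(start_text), int(end_text)
        else:
            start = end = int(part)
        values.update(range(start, end + 1, step))
    return values


def cron_matches(expr: str, moment: datetime) -> bool:
    """Return True if `moment` satisfies every field of the expression."""
    fields = expr.split()
    if len(fields) != 5:
        raise ValueError("expected exactly 5 fields")
    # Cron counts Sunday as 0; Python's weekday() counts Monday as 0.
    cron_dow = (moment.weekday() + 1) % 7
    actual = [moment.minute, moment.hour, moment.day, moment.month, cron_dow]
    return all(
        value in expand_field(field, lo, hi)
        for field, value, (lo, hi) in zip(fields, actual, FIELD_RANGES)
    )


# 2024-02-05 was a Monday.
print(cron_matches("0 9 * * 1", datetime(2024, 2, 5, 9, 0)))
print(cron_matches("*/15 * * * *", datetime(2024, 2, 5, 10, 50)))
```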

### Trigger Configuration in YAML

```yaml
id: daily_data_refresh
namespace: data_engineering.scheduled

description: "Daily data refresh pipeline"

triggers:
  - id: schedule_daily
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 0 * * *"  # Run at midnight every day
```

### Complete Example: Scheduled Taxi Data Pipeline

```yaml
id: taxi_data_daily_pipeline
namespace: data_engineering.etl.taxi
description: "Extract and transform taxi data daily"

variables:
  table_name: "taxi_trips"
  partition_field: "trip_date"

triggers:
  - id: daily_schedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 2 * * *"  # Run at 2:00 AM every day

# Tasks run sequentially, so transform_data starts after extract_yesterday finishes
tasks:
  - id: extract_yesterday
    type: io.kestra.plugin.gcp.bigquery.Query
    projectId: "{{ kv('gcp_project_id') }}"
    sql: |
      SELECT 
        * 
      FROM `raw.taxi_data`
      WHERE DATE(pickup_time) = CURRENT_DATE() - 1
    destinationTable: "{{ kv('gcp_project_id') }}.{{ kv('gcp_dataset') }}.{{ vars.table_name }}_raw"
    writeDisposition: WRITE_TRUNCATE  # replace the previous day's extract
  
  - id: transform_data
    type: io.kestra.plugin.scripts.python.Script
    beforeCommands:
      - pip install google-cloud-bigquery
    script: |
      from google.cloud import bigquery
      # Assumes GCP credentials are available in the script's environment
      client = bigquery.Client(project="{{ kv('gcp_project_id') }}")
      
      query = """
      INSERT INTO `{{ kv('gcp_dataset') }}.{{ vars.table_name }}_clean`
      SELECT
        *,
        CAST(fare_amount AS FLOAT64) as fare,
        CURRENT_TIMESTAMP() as processed_at
      FROM `{{ kv('gcp_dataset') }}.{{ vars.table_name }}_raw`
      WHERE fare_amount > 0
      """
      
      client.query(query).result()
      print("Data transformation completed")
```

### Cron Expression Tips

1. **Mind the Time Zone**: Cron times are typically evaluated in UTC. Account for the offset from your local time, or set the Schedule trigger's `timezone` property
2. **Test Before Deploying**: Use online cron validators to verify expressions
3. **Start Simple**: Begin with basic schedules before moving to complex ones
4. **Monitor First Runs**: Check logs to ensure the schedule triggers as expected
5. **Multiple Triggers**: You can add multiple triggers to one flow for different schedules
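
As a quick check for the first tip, this Python sketch converts a UTC run time to local time. It assumes a fixed UTC-5 offset (for example, US Eastern standard time) to stay dependency-free; the offset and the function name `utc_run_to_local` are illustrative choices, not part of Kestra.

```python
from datetime import datetime, timedelta, timezone

# Assumption: a fixed UTC-5 offset stands in for the local time zone.
LOCAL_OFFSET = timezone(timedelta(hours=-5))


def utc_run_to_local(run_utc: datetime) -> datetime:
    """Convert a timezone-aware UTC run time to the assumed local offset."""
    return run_utc.astimezone(LOCAL_OFFSET)


# A schedule of "0 2 * * *" evaluated in UTC fires at 02:00 UTC.
run_utc = datetime(2024, 2, 5, 2, 0, tzinfo=timezone.utc)
run_local = utc_run_to_local(run_utc)
print(run_local.isoformat())  # 2024-02-04T21:00:00-05:00
```

Note that the local run falls on the previous calendar day, which matters when a query filters on "yesterday".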

---

## 8. Understanding Task Types and Plugins

### What are Kestra Plugins?

**Plugins** are pre-built modules in Kestra that provide functionality for specific tools, services, or operations. The `type` of a task specifies which plugin to use.

### Plugin Concept

Think of plugins like Python library imports. Just as you'd write:
```python
from pandas import read_csv
from google.cloud.bigquery import Client
```

In Kestra, you specify the plugin in the `type` field:
```yaml
type: io.kestra.plugin.scripts.python.Script
type: io.kestra.plugin.gcp.bigquery.Query
```

### How Task Types Work

The `type` field uses a hierarchical naming convention: `io.kestra.plugin.[category].[service].[operation]`

| Type (after `io.kestra.plugin.`) | Purpose | Typical Use |
|------|---------|---------|
| **scripts.python.Script** | Execute Python code | Data processing, transformations |
| **scripts.shell.Commands** | Execute shell commands | File operations, CLI tools |
| **jdbc.postgresql.Query** | Execute SQL queries over JDBC | PostgreSQL operations (other databases have sibling plugins, e.g. `jdbc.mysql`) |
| **gcp.bigquery.Query** | BigQuery operations | Analytics, data warehousing |
| **gcp.gcs.Upload** | Google Cloud Storage operations | Upload files to Cloud Storage |
| **core.http.Request** | Make HTTP requests | API calls, webhooks |
| **notifications.slack.SlackIncomingWebhook** | Send Slack messages | Notifications |
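
The naming convention can be unpacked mechanically. The Python sketch below splits a fully qualified type into its plugin group and task class; `split_task_type` is a hypothetical helper for illustration, not a Kestra API, and it assumes every type starts with `io.kestra.plugin.`.

```python
PLUGIN_PREFIX = "io.kestra.plugin."


def split_task_type(task_type: str) -> dict[str, str]:
    """Split 'io.kestra.plugin.<group...>.<Task>' into group and task name."""
    if not task_type.startswith(PLUGIN_PREFIX):
        raise ValueError(f"unexpected prefix in {task_type!r}")
    # Everything but the last segment is the plugin group; the last is the task class.
    *group_parts, task_name = task_type[len(PLUGIN_PREFIX):].split(".")
    return {"group": ".".join(group_parts), "task": task_name}


parsed = split_task_type("io.kestra.plugin.gcp.bigquery.Query")
print(parsed)  # {'group': 'gcp.bigquery', 'task': 'Query'}
```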

### Python Script Plugin Example

```yaml
tasks:
  - id: process_with_python
    type: io.kestra.plugin.scripts.python.Script
    beforeCommands:
      - pip install pandas  # install dependencies before the script runs
    script: |
      import pandas as pd
      
      # Your Python code here (assumes data.csv is available in the task's working directory)
      df = pd.read_csv('data.csv')
      result = df[df['value'] > 100]
      print(f"Filtered {len(result)} rows")

### BigQuery Plugin Example

```yaml
tasks:
  - id: query_bigquery
    type: io.kestra.plugin.gcp.bigquery.Query
    projectId: "{{ kv('gcp_project_id') }}"
    sql: |
      SELECT 
        date,
        COUNT(*) as trip_count,
        AVG(fare_amount) as avg_fare
      FROM `{{ kv('gcp_project_id') }}.{{ kv('gcp_dataset') }}.trips`
      WHERE date >= '2024-01-01'
      GROUP BY date
```

### Cloud Storage Plugin Example

```yaml
tasks:
  - id: upload_to_gcp
    type: io.kestra.plugin.gcp.gcs.Upload
    projectId: "{{ kv('gcp_project_id') }}"
    from: "/local/path/file.parquet"
    to: "gs://{{ kv('gcp_bucket_name') }}/processed/file.parquet"
```

### Accessing Plugin Documentation

Each plugin has its own configuration parameters. In Kestra UI, you can:
- Browse available plugins
- View required vs. optional parameters
- See example configurations for each plugin

---

## 7. Creating and Configuring Tasks

### What are Tasks?

**Tasks** are the core execution units in Kestra where the real work happens. This is where you define your **ETL/ELT jobs** (Extract, Transform, Load), database operations, API calls, and any other computational work.

### Task Structure Requirements

Every task must have:
1. **id**: A unique identifier within the flow
2. **type**: The plugin type that defines what the task does

```yaml
tasks:
  - id: unique_task_id
    type: io.kestra.plugin.type.SubType
    # Additional configuration specific to the type
```

### Basic Task Example

```yaml
id: etl_pipeline
namespace: data_engineering.etl

tasks:
  - id: extract_data
    type: io.kestra.plugin.scripts.python.Script
    script: |
      import pandas as pd
      data = pd.read_csv('source.csv')
      print(f"Extracted {len(data)} rows")
  
  - id: transform_data
    type: io.kestra.plugin.scripts.python.Script
    script: |
      import pandas as pd
      data = pd.read_csv('source.csv')
      # Apply transformations
      data['date'] = pd.to_datetime(data['date'])
      data.to_csv('transformed.csv', index=False)
  
  - id: load_data
    type: io.kestra.plugin.jdbc.postgresql.Query
    url: "jdbc:postgresql://localhost:5432/mydb"
    username: "user"
    password: "{{ kv('postgres_password') }}"  # hypothetical KV key; avoid hardcoding passwords
    sql: "COPY my_table FROM 'transformed.csv';"
```

### Task Dependencies

Tasks listed directly under `tasks` run sequentially, in the order they are declared, so each task starts only after the previous one completes. A linear pipeline needs no explicit dependency declaration; an explicit `dependsOn` list belongs inside an `io.kestra.plugin.core.flow.Dag` task, where tasks can form a graph:

```yaml
tasks:
  - id: step1_extract
    type: io.kestra.plugin.scripts.python.Script
    script: |
      print("Extracting data...")
  
  - id: step2_transform  # Starts after step1_extract completes
    type: io.kestra.plugin.scripts.python.Script
    script: |
      print("Transforming data...")
  
  - id: step3_load  # Starts after step2_transform completes
    type: io.kestra.plugin.scripts.python.Script
    script: |
      print("Loading data...")
```
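
When dependencies are declared explicitly, an orchestrator has to derive a run order in which every task comes after the tasks it waits for. The Python sketch below shows that idea with the standard library's `graphlib`; it illustrates the concept only and is not how Kestra is implemented. The `depends_on` mapping reuses the task IDs from the example above.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks that must finish before it starts.
depends_on = {
    "step1_extract": set(),
    "step2_transform": {"step1_extract"},
    "step3_load": {"step2_transform"},
}

# static_order() yields tasks so that prerequisites always come first.
order = list(TopologicalSorter(depends_on).static_order())
print(order)  # ['step1_extract', 'step2_transform', 'step3_load']
```

A cycle in the mapping (for example, two tasks waiting on each other) makes `static_order()` raise `graphlib.CycleError`, which is why dependency graphs must be acyclic.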

### Task Best Practices

- **Meaningful IDs**: Use names that describe what the task does (e.g., `extract_taxi_data`, not `task1`)
- **Single Responsibility**: Each task should do one thing well
- **Error Handling**: Configure retry policies and failure behaviors
- **Idempotency**: Make tasks safe to run multiple times (important if retries occur)
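
One common way to make a load step idempotent is to replace a whole batch inside a single transaction, so a retry overwrites instead of duplicating. The sketch below demonstrates this with an in-memory SQLite database; the table `trips`, the column names, and `load_batch` are hypothetical and chosen only for illustration.

```python
import sqlite3


def load_batch(conn: sqlite3.Connection, batch_date: str, fares: list[float]) -> None:
    """Replace all rows for batch_date so reruns do not create duplicates."""
    with conn:  # delete and insert commit together, or roll back together
        conn.execute("DELETE FROM trips WHERE batch_date = ?", (batch_date,))
        conn.executemany(
            "INSERT INTO trips (batch_date, fare) VALUES (?, ?)",
            [(batch_date, fare) for fare in fares],
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE trips (batch_date TEXT, fare REAL)")

load_batch(conn, "2024-02-05", [12.5, 8.0])
load_batch(conn, "2024-02-05", [12.5, 8.0])  # simulated retry of the same batch

row_count = conn.execute("SELECT COUNT(*) FROM trips").fetchone()[0]
print(row_count)  # 2, not 4
```

A plain `INSERT` without the preceding `DELETE` would leave four rows after the retry.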

---

## 6. Configuring Concurrency and Execution Behavior

### What is Concurrency?

**Concurrency** controls how many executions of a workflow can run simultaneously. The limit applies per flow: it caps the number of executions of that flow that may be running at any one time.

### Concurrency Settings

```yaml
id: concurrent_workflow
namespace: examples

concurrency:
  limit: 5  # Maximum 5 concurrent executions
  behavior: FAIL  # What to do when limit is reached
```

### Behavior Options

When the concurrency limit is reached, Kestra can handle new execution requests in different ways:

| Behavior | Description |
|----------|-------------|
| **FAIL** | New executions are marked as failed immediately |
| **QUEUE** | New executions wait in queue until a slot becomes available |
| **CANCEL** | New executions are cancelled immediately |
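
The decision made for each new execution can be sketched in a few lines of Python. This is a conceptual model, not Kestra's implementation; it assumes the documented behaviors are `QUEUE`, `CANCEL`, and `FAIL`, and the function name `admit` and the returned state labels are illustrative.

```python
# Outcome for a new execution that arrives when the limit is already reached.
OVER_LIMIT_STATE = {"QUEUE": "QUEUED", "CANCEL": "CANCELLED", "FAIL": "FAILED"}


def admit(running: int, limit: int, behavior: str) -> str:
    """Return the state a new execution gets, given how many are running."""
    if running < limit:
        return "RUNNING"  # a slot is free, so the execution starts right away
    return OVER_LIMIT_STATE[behavior]


print(admit(running=2, limit=3, behavior="FAIL"))   # RUNNING
print(admit(running=3, limit=3, behavior="FAIL"))   # FAILED
print(admit(running=3, limit=3, behavior="QUEUE"))  # QUEUED
```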

### Real-World Example

```yaml
id: data_processing_pipeline
namespace: data_engineering.etl

concurrency:
  limit: 3
  behavior: FAIL

description: |
  This workflow processes data files.
  - Maximum 3 files can be processed in parallel
  - If a 4th execution is triggered, it will fail immediately
  - Use `behavior: QUEUE` instead if extra executions should wait for a free slot
```

### Use Cases for Concurrency Limits

- **Resource Management**: Prevent overwhelming your database or storage
- **Rate Limiting**: Respect API rate limits of external services
- **Cost Control**: Limit concurrent cloud resource usage (especially on GCP)
- **Sequential Processing**: Set limit to 1 for workflows that must run one at a time

### Example: Cost-Conscious GCP Workflow

```yaml
id: gcp_bigquery_job
namespace: data_engineering.gcp

concurrency:
  limit: 2  # Only 2 BigQuery jobs at a time to control costs
  behavior: QUEUE

tasks:
  - id: run_bigquery_job
    type: io.kestra.plugin.gcp.bigquery.Query
    projectId: "{{ kv('gcp_project_id') }}"
    sql: "SELECT COUNT(*) FROM `{{ kv('gcp_dataset') }}.large_table`"
```

---

## 5. Working with Variables and Kestra Values

### What are Variables?

**Variables** are dynamic values that you can define and reference throughout your workflow. Once created, they can be accessed anywhere in the flow, making your workflows cleaner and more maintainable.

### Variable Syntax

To reference a variable in your flow, use the following syntax:
```
{{ vars.variable_name }}
```

### Defining and Using Variables

```yaml
id: workflow_with_variables
namespace: examples

variables:
  project_name: "taxi_project"
  batch_size: 1000
  output_format: "parquet"

tasks:
  - id: log_variables
    type: io.kestra.plugin.scripts.python.Script
    script: |
      print("Project: {{ vars.project_name }}")
      print("Batch Size: {{ vars.batch_size }}")
      print("Output: {{ vars.output_format }}")
```
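
Conceptually, Kestra replaces each `{{ vars.name }}` placeholder with the variable's value before the task runs. The Python sketch below imitates that substitution with a regular expression. It is a deliberate simplification: Kestra's real templating engine supports filters, functions, and nested expressions that this sketch does not, and the function name `render_vars` is hypothetical.

```python
import re

# Matches placeholders such as {{ vars.project_name }} (whitespace is optional).
VAR_PATTERN = re.compile(r"\{\{\s*vars\.(\w+)\s*\}\}")


def render_vars(template: str, variables: dict) -> str:
    """Replace every {{ vars.<name> }} placeholder with its value."""
    return VAR_PATTERN.sub(lambda m: str(variables[m.group(1)]), template)


variables = {"project_name": "taxi_project", "batch_size": 1000}
rendered = render_vars(
    "Project: {{ vars.project_name }}, Batch Size: {{ vars.batch_size }}",
    variables,
)
print(rendered)  # Project: taxi_project, Batch Size: 1000
```

Changing a value in `variables` changes every place the placeholder appears, which is the maintainability benefit described in this section.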

### Benefits of Variables

- **Reusability**: Define once, use everywhere
- **Maintainability**: Change in one place affects all references
- **Reduced Duplication**: No need to repeat the same value multiple times
- **Clarity**: Makes flows easier to understand

---

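### What is the KV Store?

Kestra's **KV Store** (key-value store, referenced as `kv` in expressions) holds **configuration** values that need to persist across workflow executions, such as project IDs and bucket names. Entries are scoped to a namespace and stored by Kestra rather than in flow YAML. Some setups, including these notes, also keep service account credentials there.

### Creating KV Entries

KV entries are typically created through the Kestra UI or API, not directly in YAML files. Flows read them with the `kv()` function, for example `{{ kv('gcp_project_id') }}`.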

### Common Use Cases for KV

In the context of GCP integration:
- `gcp_project_id`: Your GCP project ID
- `gcp_location`: GCP compute region (e.g., `us-central1`)
- `gcp_bucket_name`: Cloud Storage bucket name
- `gcp_dataset`: BigQuery dataset name
- `gcp_credentials`: Service account credentials (sensitive!)

### Referencing KV Entries in Flows

```yaml
id: gcp_workflow
namespace: data_engineering.gcp

tasks:
  - id: create_bucket
    type: io.kestra.plugin.gcp.gcs.CreateBucket
    projectId: "{{ kv('gcp_project_id') }}"
    name: "{{ kv('gcp_bucket_name') }}"
    location: "{{ kv('gcp_location') }}"
  
  - id: query_dataset
    type: io.kestra.plugin.gcp.bigquery.Query
    projectId: "{{ kv('gcp_project_id') }}"
    sql: "SELECT * FROM `{{ kv('gcp_dataset') }}.my_table`"
```

### Why Use KV for GCP Configuration?

- **Security**: Credentials and sensitive data are not hardcoded in YAML
- **Centralized Management**: Change GCP project IDs in one place
- **Environment Flexibility**: Use different KV values for dev/prod environments
- **Audit Trail**: Track changes to critical configuration

---

## 4. Understanding Namespaces and Unique Identifiers

### What are Namespaces?

A **namespace** is a logical grouping or labeling system for organizing workflows. It helps distinguish between different workflows and serves as a hierarchical organization mechanism.

### Purpose of Namespaces

- **Organization**: Group related workflows together
- **Distinction**: Prevent naming conflicts across different projects
- **Navigation**: Make it easier to find and manage workflows in the UI
- **Access Control**: Can be used for permission management

### Namespace Convention

Use dot notation to create hierarchies:
```
namespace: company.department.project
namespace: data_engineering.etl.taxi
namespace: analytics.reporting
```

### Unique ID Requirement

Every flow **must** have a unique `id` within its namespace. The combination of `namespace + id` creates a globally unique identifier.

### Example: Namespace and ID Usage

```yaml
# Flow 1: Extract raw data
id: extract_taxi_data
namespace: data_engineering.etl.taxi
description: "Extract raw taxi data from source"

# Flow 2: Transform data
id: transform_taxi_data
namespace: data_engineering.etl.taxi
description: "Transform and clean taxi data"

# Flow 3: Load to warehouse
id: load_taxi_data
namespace: data_engineering.etl.taxi
description: "Load transformed data to warehouse"

# Flow 4: Another project reusing a flow ID (allowed because the namespace differs)
id: extract_taxi_data  # Same ID as Flow 1, but in a different namespace
namespace: analytics.raw_data
description: "Extract raw analytics data"
```

### Key Rules

1. **No Duplicate IDs**: You cannot save two flows with the same ID in the same namespace
2. **Namespace is Required**: Every flow must have a namespace
3. **Hierarchy Pattern**: Use dot notation to create logical groupings
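
The uniqueness rule amounts to using the pair `(namespace, id)` as a key. The Python sketch below models the rule as stated in these notes; `FlowRegistry` is a hypothetical class for illustration and says nothing about how Kestra stores flows internally.

```python
class FlowRegistry:
    """Toy registry that enforces one flow per (namespace, id) pair."""

    def __init__(self) -> None:
        self._flows: dict[tuple[str, str], str] = {}

    def save(self, namespace: str, flow_id: str, description: str = "") -> None:
        key = (namespace, flow_id)
        if key in self._flows:
            raise ValueError(f"flow {flow_id!r} already exists in {namespace!r}")
        self._flows[key] = description

    def __len__(self) -> int:
        return len(self._flows)


registry = FlowRegistry()
registry.save("data_engineering.etl.taxi", "extract_taxi_data")
# Same ID, different namespace: accepted.
registry.save("analytics.raw_data", "extract_taxi_data")

try:
    # Same ID, same namespace: rejected.
    registry.save("data_engineering.etl.taxi", "extract_taxi_data")
    duplicate_rejected = False
except ValueError:
    duplicate_rejected = True

print(len(registry), duplicate_rejected)  # 2 True
```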

---

## 3. Defining Inputs and Managing Data Types

### What are Inputs?

**Inputs** allow users to provide values to your workflow when triggering it manually or programmatically. They act as parameters that customize the workflow execution.

### Supported Data Types

Kestra supports several input types. The `type` field of an input takes the identifier shown in the first column:

| Type | Description | Example |
|------|-------------|---------|
| **STRING** | Text values | `"2024-02-05"`, `"my-bucket"` |
| **INT** | Whole numbers | `100`, `365` |
| **FLOAT** | Decimal numbers | `3.14`, `99.99` |
| **DATE** | Calendar dates | `2024-02-05` |
| **BOOLEAN** | True/False values | `true`, `false` |
| **ARRAY** | List of values (requires an `itemType`) | `[1, 2, 3]` |
| **JSON** | Structured data | `{"key": "value"}` |
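
Typed inputs mean that a raw value submitted at trigger time is validated and converted before tasks see it. The Python sketch below illustrates that conversion for a few types. It is not Kestra's validation logic; the `CONVERTERS` table and `coerce_input` function are hypothetical, and the type names follow the identifiers used in the YAML example in this section.

```python
import json
from datetime import date

# One converter per input type; each turns the submitted text into a typed value.
CONVERTERS = {
    "STRING": str,
    "INT": int,
    "FLOAT": float,
    "BOOLEAN": lambda raw: raw.strip().lower() == "true",
    "DATE": date.fromisoformat,
    "JSON": json.loads,
}


def coerce_input(input_type: str, raw: str):
    """Convert raw text to the declared type, raising ValueError if it does not fit."""
    if input_type not in CONVERTERS:
        raise ValueError(f"unsupported input type: {input_type}")
    return CONVERTERS[input_type](raw)


print(coerce_input("INT", "1000"))         # 1000
print(coerce_input("DATE", "2024-02-05"))  # 2024-02-05
```

Passing `"abc"` for an `INT` input raises `ValueError`, which mirrors the idea that a badly typed input is rejected before any task runs.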

### Example Input Definition

```yaml
id: user_input_flow
namespace: examples

inputs:
  - id: execution_date
    type: DATE
    description: "Date to process data for"
    defaults: "2024-02-05"
  
  - id: dataset_name
    type: STRING
    description: "Name of the dataset"
    required: true
  
  - id: batch_size
    type: INT
    description: "Number of records to process"
    defaults: 1000
```

### Accessing Inputs in Tasks

Inputs are accessed within tasks using the `inputs` property:
```yaml
tasks:
  - id: process_data
    type: io.kestra.plugin.scripts.python.Script
    script: |
      print("Processing date: {{ inputs.execution_date }}")
      print("Dataset: {{ inputs.dataset_name }}")
```

---

## 2. Kestra Flow Structure and YAML Configuration

### Understanding Flows

A **flow** is the core unit in Kestra that defines your entire workflow. Each flow is stored as a **YAML file** with a `.yaml` extension.

### Basic Flow Structure

```yaml
id: unique_flow_identifier
namespace: my.project.namespace
description: "Brief description of what this flow does"

inputs: []
variables: {}
tasks: []
triggers: []
```

### Key Points

- **File Extension**: Each flow must be saved as a `.yaml` file
- **Unique ID**: Every flow must have an `id` that is unique within its namespace. You **cannot** have two flows with the same ID in the same namespace
- **Namespace**: Serves as a logical grouping mechanism (explained in Section 4)
- **Structure**: The flow file contains all configuration in one place

### File Naming Convention

While the `id` property is what matters internally, it's best practice to name your YAML file descriptively:
- `01_Hello_World.yaml` - Simple starter flow
- `04_postgres_taxi.yaml` - PostgreSQL database operations
- `08_gcp_taxi.yaml` - GCP integration

## 1. Overview of Kestra and Workflow Orchestration

### What is Kestra?

**Kestra** is an **open-source workflow orchestration tool** designed to automate, schedule, and manage data pipelines and ETL/ELT jobs. It provides a centralized platform for defining, executing, and monitoring complex workflows.

### Purpose of Workflow Orchestration

Workflow orchestration enables you to:
- **Automate** repetitive data processing tasks
- **Schedule** jobs to run at specific times using cron expressions
- **Manage dependencies** between tasks
- **Monitor** execution history and logs
- **Handle failures** gracefully with retry mechanisms
- **Scale** multiple concurrent executions

### Why Kestra?

- **Open Source**: Free to use and modify
- **Cloud-Native**: Easy integration with cloud services (GCP, AWS, Azure)
- **Simple YAML-Based Configuration**: No complex coding required
- **Rich Plugin Ecosystem**: Support for various tools and databases
- **Flexible Scheduling**: Advanced cron expression support

# Workflow Orchestration with Kestra

## Module 2: Complete Guide

**Overview**: These notes cover Kestra workflow orchestration: flow structure, configuration, task management, and integration with cloud infrastructure.