# OpenSearch Ingest Processors Comprehensive Guide
![course](../../static_images/ai_ml_search_opensearch_intermediate.jpeg)

## 📚 Complete Tutorial on All Ingest Processors

This notebook provides hands-on demonstrations of the 40 OpenSearch ingest processors cataloged below. Each processor is explained with real-world examples, sample data, and executable code.

**Target Audience:** Students learning ingest pipeline concepts and processor functionality

**What You'll Learn:**
- How ingest pipelines process documents sequentially
- Each processor's purpose and use cases
- Practical examples with sample data
- Error handling and failure scenarios
- Real-world pipeline compositions

## 📋 Table of Contents - 40 Processors

### Data Manipulation & Transformation (8 processors)
1. **append** - Add values to fields
2. **copy** - Duplicate objects between fields
3. **remove** - Remove unwanted fields
4. **remove_by_pattern** - Remove fields by regex pattern
5. **rename** - Rename fields
6. **set** - Set field values (constants or templates)
7. **join** - Join array elements into strings
8. **split** - Split strings into arrays

### Data Type Conversion (5 processors)
9. **bytes** - Convert human-readable sizes (e.g., "2MB") to a byte count
10. **convert** - Change field data types
11. **lowercase** - Convert to lowercase
12. **uppercase** - Convert to uppercase
13. **trim** - Trim whitespace

### Text & String Processing (7 processors)
14. **csv** - Parse CSV data
15. **dissect** - Extract fields via text patterns
16. **gsub** - Substitute/delete substrings
17. **grok** - Extract fields via regex patterns
18. **html_strip** - Remove HTML tags
19. **json** - Parse JSON strings
20. **kv** - Parse key-value pairs

### Date & Time Processing (2 processors)
21. **date** - Parse and normalize dates
22. **date_index_name** - Index into time-based indices

### Specialized Enrichment (9 processors)
23. **geoip** - Add geolocation from IP
24. **ip2geo** - Add geolocation from IP (alternative)
25. **geojson-feature** - Index GeoJSON data
26. **user_agent** - Extract browser/device info
27. **community_id** - Generate network flow hashes
28. **fingerprint** - Generate deduplication hashes
29. **text_embedding** - Generate text vectors
30. **text_image_embedding** - Generate multimodal vectors
31. **sparse_encoding** - Generate sparse vectors

### Text Processing (2 processors)
32. **text_chunking** - Split text into chunks
33. **sort** - Sort array elements

### URL Processing (1 processor)
34. **urldecode** - Decode URL-encoded strings

### Control Flow (2 processors)
35. **drop** - Filter out documents
36. **fail** - Stop pipeline with error

### Advanced/Scripting (3 processors)
37. **script** - Run custom scripts
38. **foreach** - Apply processor to array elements
39. **pipeline** - Compose nested pipelines

### Metadata/Special (1 processor)
40. **dot_expander** - Convert dot notation to objects

## 🔄 Ingest Pipeline Architecture

```mermaid
graph LR
    A["📄 Document"] --> B["Processor 1"]
    B --> C["Processor 2"]
    C --> D["Processor 3"]
    D --> E["..."]
    E --> F["Processor N"]
    F --> G["✅ Indexed Document"]
    
    style A fill:#e1f5ff
    style B fill:#fff3e0
    style C fill:#fff3e0
    style D fill:#fff3e0
    style F fill:#fff3e0
    style G fill:#c8e6c9
```

## Error Handling in Pipelines

```mermaid
graph TD
    A["Processor Executes"] --> B{Success?}
    B -->|Yes| C["Continue to Next"]
    B -->|No| D{ignore_failure?}
    D -->|True| C
    D -->|False| E{on_failure?}
    E -->|Defined| F["Run Failure Handler"]
    E -->|None| G["❌ Stop Pipeline"]
    F --> H["Continue or Stop"]
    
    style A fill:#fff3e0
    style B fill:#ffe0b2
    style C fill:#c8e6c9
    style G fill:#ffcdd2
    style F fill:#f0f4c3
```
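
The flow above maps onto two settings that any processor accepts: `ignore_failure` and `on_failure`. The sketch below builds a pipeline body as plain Python dicts (no cluster needed) in which a `convert` step records its error instead of rejecting the document. The field names `tags`, `price`, and `error.message` are illustrative assumptions; `_ingest.on_failure_message` is the ingest metadata field available inside failure handlers.

```python
import json

# Pipeline body: the first processor swallows failures silently,
# the second routes failures to its own handler.
pipeline_body = {
    "description": "Error-handling sketch (illustrative field names)",
    "processors": [
        {
            # If 'tags' is missing or not a string, skip this step and move on.
            "lowercase": {"field": "tags", "ignore_failure": True}
        },
        {
            "convert": {
                "field": "price",
                "type": "float",
                # Runs only if this convert fails; the pipeline then continues.
                "on_failure": [
                    {
                        "set": {
                            "field": "error.message",
                            "value": "{{ _ingest.on_failure_message }}",
                        }
                    }
                ],
            }
        },
    ],
}

print(json.dumps(pipeline_body, indent=2))
# To register it with the client created in Section 1:
# client.ingest.put_pipeline(id="demo_error_handling", body=pipeline_body)
```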

In [1]:

# ============================================================================
# SECTION 1: Setup and Client Connection
# ============================================================================

from opensearchpy import OpenSearch
import json
import sys
from datetime import datetime, timedelta
import time

sys.path.append('../../')

IS_AUTH = True  # Set to False if security is disabled
HOST = 'localhost'  # Replace with your OpenSearch host

# Initialize the OpenSearch client
if IS_AUTH:
    client = OpenSearch(
        hosts=[{'host': HOST, 'port': 9200}],
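        http_auth=('admin', 'Developer@123'),  # demo credentials; use your cluster's admin password
        use_ssl=True,
        verify_certs=False,  # skip certificate checks (local dev with self-signed certs only)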
        ssl_show_warn=False
    )
else:
    client = OpenSearch(
        hosts=[{'host': HOST, 'port': 9200}],
        use_ssl=False,
        verify_certs=False,
        ssl_assert_hostname=False,
        ssl_show_warn=False
    )

# Test connection
try:
    info = client.info()
    print("✅ Connected to OpenSearch!")
    print(f"Cluster: {info['cluster_name']}")
    print(f"Version: {info['version']['number']}")
except Exception as e:
    print(f"❌ Connection failed: {e}")


✅ Connected to OpenSearch!
Cluster: docker-cluster
Version: 3.3.0


In [2]:

# ============================================================================
# HELPER FUNCTIONS FOR DEMONSTRATION
# ============================================================================

# Local cache of each pipeline's processors, so test_pipeline() can simulate them inline
_test_pipelines = {}

def create_pipeline(pipeline_name: str, processors: list, description: str = ""):
    """Create an ingest pipeline"""
    pipeline_body = {
        "description": description,
        "processors": processors
    }
    try:
        response = client.ingest.put_pipeline(id=pipeline_name, body=pipeline_body)
        # Also store locally for testing
        _test_pipelines[pipeline_name] = processors
        print(f"✅ Pipeline '{pipeline_name}' created successfully")
        return response
    except Exception as e:
        print(f"❌ Error creating pipeline: {e}")
        return None

def test_pipeline(pipeline_name: str, document: dict, index_name: str = "test_index"):
    """Simulate a pipeline against a sample document and return the _simulate response"""
    try:
        # Prefer the locally cached processors; otherwise fetch them from the cluster
        if pipeline_name in _test_pipelines:
            processors = _test_pipelines[pipeline_name]
        else:
            try:
                pipeline_info = client.ingest.get_pipeline(id=pipeline_name)
                processors = pipeline_info[pipeline_name]['processors']
            except Exception:
                print(f"❌ Error testing pipeline: Pipeline '{pipeline_name}' not found")
                return None

        # Run _simulate with an inline pipeline definition; index_name is passed as the doc's _index
        response = client.ingest.simulate(body={
            "pipeline": {
                "processors": processors
            },
            "docs": [{"_index": index_name, "_source": document}]
        })
        return response
    except Exception as e:
        print(f"❌ Error testing pipeline: {e}")
        return None

def display_result(label: str, before: dict, after: dict):
    """Display before and after pipeline results"""
    print(f"\n{'='*60}")
    print(f"📊 {label}")
    print(f"{'='*60}")
    print("📥 BEFORE:")
    print(json.dumps(before, indent=2))
    print("\n📤 AFTER:")
    print(json.dumps(after, indent=2))
    print(f"{'='*60}\n")

def delete_pipeline(pipeline_name: str):
    """Delete a pipeline"""
    try:
        client.ingest.delete_pipeline(id=pipeline_name)
        # Also remove from local storage
        if pipeline_name in _test_pipelines:
            del _test_pipelines[pipeline_name]
        print(f"🗑️  Pipeline '{pipeline_name}' deleted")
    except Exception as e:
        print(f"⚠️  Could not delete pipeline: {e}")

print("✅ Helper functions loaded!")


✅ Helper functions loaded!


# 1️⃣ APPEND Processor

## Purpose
Adds one or more values to an array field. If the field doesn't exist, it is created as an array; if it holds a single (scalar) value, that value is first converted to an array.

## Use Cases
- Adding tags or metadata to documents
- Building classification arrays
- Maintaining activity logs

## How It Works
```
Input Field: ["tag1", "tag2"]
Append Values: ["tag3", "tag4"]
Output: ["tag1", "tag2", "tag3", "tag4"]
```

## Configuration
- **field**: Name of the array field to append to
- **value**: A single value or an array of values to append (can use templates)
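
To make these semantics concrete without a cluster, here is a small pure-Python model of what `append` does to a document. It is a teaching approximation, not the processor itself: the function name `append_locally` is hypothetical, and it ignores templates and dotted field paths.

```python
from typing import Any


def append_locally(doc: dict, field: str, value: Any) -> dict:
    """Approximate the append processor on a top-level field (no templates)."""
    values = value if isinstance(value, list) else [value]
    if field not in doc:
        doc[field] = list(values)              # field missing: create the array
    elif isinstance(doc[field], list):
        doc[field].extend(values)              # existing array: extend it
    else:
        doc[field] = [doc[field], *values]     # scalar: wrap it, then append
    return doc


print(append_locally({"tags": ["portable"]}, "tags", ["top_seller"]))
# {'tags': ['portable', 'top_seller']}
print(append_locally({"tags": "portable"}, "tags", "top_seller"))
# {'tags': ['portable', 'top_seller']}
print(append_locally({}, "tags", ["new"]))
# {'tags': ['new']}
```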


In [3]:

# Example 1: Basic append
sample_doc_append = {
    "product_name": "Laptop",
    "category": "Electronics",
    "tags": ["portable", "powerful"]
}

pipeline_append = [
    {
        "append": {
            "field": "tags",
            "value": ["marked_for_sale", "top_seller"]
        }
    }
]

create_pipeline("demo_append", pipeline_append, "Append tags to products")
result_append = test_pipeline("demo_append", sample_doc_append)

if result_append:
    original = sample_doc_append
    processed = result_append['docs'][0]['doc']['_source']
    display_result("Append Processor", original, processed)

delete_pipeline("demo_append")


✅ Pipeline 'demo_append' created successfully

📊 Append Processor
📥 BEFORE:
{
  "product_name": "Laptop",
  "category": "Electronics",
  "tags": [
    "portable",
    "powerful"
  ]
}

📤 AFTER:
{
  "category": "Electronics",
  "product_name": "Laptop",
  "tags": [
    "portable",
    "powerful",
    "marked_for_sale",
    "top_seller"
  ]
}

🗑️  Pipeline 'demo_append' deleted


# 2️⃣ BYTES Processor

## Purpose
Converts human-readable byte sizes (like "1KB", "1MB") into a numeric byte count, normalizing size fields for storage and aggregation.

## Use Cases
- Normalizing storage size fields
- Converting user-friendly storage units to consistent bytes
- Data validation and standardization

## Configuration
- **field**: Field containing the byte value
- **target_field**: Field to store result (optional, updates original if not specified)
- **ignore_missing**: Don't fail if field is missing (default: false)

## Supported Units
- b (bytes), kb (kilobytes), mb (megabytes), gb (gigabytes), tb (terabytes), pb (petabytes)
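
The conversion is plain arithmetic with binary (1024-based) multipliers, which is why "2MB" becomes 2097152 in the output below. This offline sketch reproduces that arithmetic; `to_bytes` is a hypothetical helper, and accepting only whole numbers is a simplification of this sketch, not a statement about the processor's exact parsing rules.

```python
import re

# Binary multipliers: each unit is 1024 times the previous one.
UNITS = {"b": 1, "kb": 1024, "mb": 1024**2, "gb": 1024**3,
         "tb": 1024**4, "pb": 1024**5}


def to_bytes(text: str) -> int:
    """Convert a size string such as '2MB' to a byte count (case-insensitive unit)."""
    match = re.fullmatch(r"\s*(\d+)\s*([a-zA-Z]+)\s*", text)
    if match is None:
        raise ValueError(f"unparseable size: {text!r}")
    number, unit = match.groups()
    multiplier = UNITS.get(unit.lower())
    if multiplier is None:
        raise ValueError(f"unknown unit: {unit!r}")
    return int(number) * multiplier


print(to_bytes("2MB"))   # 2097152
print(to_bytes("1kb"))   # 1024
```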

In [4]:

# Example 2: Bytes processor
sample_doc_bytes = {
    "file_name": "document.pdf",
    "file_size": "2MB"
}

pipeline_bytes = [
    {
        "bytes": {
            "field": "file_size",
            "target_field": "file_size_bytes"
        }
    }
]

create_pipeline("demo_bytes", pipeline_bytes, "Convert human-readable bytes to bytes")
result_bytes = test_pipeline("demo_bytes", sample_doc_bytes)

if result_bytes:
    original = sample_doc_bytes
    processed = result_bytes['docs'][0]['doc']['_source']
    display_result("Bytes Processor", original, processed)

delete_pipeline("demo_bytes")


✅ Pipeline 'demo_bytes' created successfully

📊 Bytes Processor
📥 BEFORE:
{
  "file_name": "document.pdf",
  "file_size": "2MB"
}

📤 AFTER:
{
  "file_size_bytes": 2097152,
  "file_name": "document.pdf",
  "file_size": "2MB"
}

🗑️  Pipeline 'demo_bytes' deleted


# 3️⃣ CONVERT Processor

## Purpose
Changes the data type of a field (string → integer, string → float, string → boolean).

## Use Cases
- Converting string numbers to numeric types
- Converting string booleans to boolean types
- Data normalization

## Configuration
- **field**: Field to convert
- **type**: Target type (integer, float, double, long, string, boolean, ip)
- **target_field**: Field to store the converted value (default: converts in place)
- **ignore_missing**: Don't fail if field is missing (default: false)
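
A common refinement is to keep the original string and write the converted value elsewhere, while tolerating documents that lack the field. The sketch below builds a `_simulate` request body with two documents, one of which has no `quantity` field; the `quantity_int` and `sku` names are illustrative assumptions. Once the client from Section 1 is connected, it can be sent with `client.ingest.simulate(body=simulate_body)`.

```python
import json

processors = [
    {
        "convert": {
            "field": "quantity",
            "target_field": "quantity_int",  # keep the original string intact
            "type": "integer",
            "ignore_missing": True,          # the second doc has no 'quantity'
        }
    }
]

simulate_body = {
    "pipeline": {"processors": processors},
    "docs": [
        {"_source": {"quantity": "50"}},
        {"_source": {"sku": "A-1"}},  # convert is skipped instead of failing
    ],
}

print(json.dumps(simulate_body, indent=2))
```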

In [5]:

# Example 3: Convert processor - multiple conversions
sample_doc_convert = {
    "price": "99.99",
    "quantity": "50",
    "is_active": "true"
}

pipeline_convert = [
    {
        "convert": {
            "field": "price",
            "type": "float"
        }
    },
    {
        "convert": {
            "field": "quantity",
            "type": "integer"
        }
    },
    {
        "convert": {
            "field": "is_active",
            "type": "boolean"
        }
    }
]

create_pipeline("demo_convert", pipeline_convert, "Convert data types")
result_convert = test_pipeline("demo_convert", sample_doc_convert)

if result_convert:
    original = sample_doc_convert
    processed = result_convert['docs'][0]['doc']['_source']
    display_result("Convert Processor", original, processed)

delete_pipeline("demo_convert")


✅ Pipeline 'demo_convert' created successfully

📊 Convert Processor
📥 BEFORE:
{
  "price": "99.99",
  "quantity": "50",
  "is_active": "true"
}

📤 AFTER:
{
  "quantity": 50,
  "is_active": true,
  "price": 99.99
}

🗑️  Pipeline 'demo_convert' deleted


# 4️⃣ COPY Processor

## Purpose
Copies an entire object or field value from one field to another

## Use Cases
- Duplicating fields for different indexing strategies
- Creating backup copies of important data
- Field redundancy

## Configuration
- **source_field**: Source field to copy from
- **target_field**: Destination field to copy to
- **ignore_missing**: Don't fail if source is missing (default: false)


In [6]:

# Example 4: Copy processor
sample_doc_copy = {
    "source_field": {
        "nested": {
            "value": "important_data"
        }
    }
}

pipeline_copy = [
    {
        "copy": {
            "source_field": "source_field",
            "target_field": "destination_field"
        }
    }
]

create_pipeline("demo_copy", pipeline_copy, "Copy fields")
result_copy = test_pipeline("demo_copy", sample_doc_copy)

if result_copy:
    original = sample_doc_copy
    processed = result_copy['docs'][0]['doc']['_source']
    display_result("Copy Processor", original, processed)

delete_pipeline("demo_copy")


✅ Pipeline 'demo_copy' created successfully

📊 Copy Processor
📥 BEFORE:
{
  "source_field": {
    "nested": {
      "value": "important_data"
    }
  }
}

📤 AFTER:
{
  "source_field": {
    "nested": {
      "value": "important_data"
    }
  },
  "destination_field": {
    "nested": {
      "value": "important_data"
    }
  }
}

🗑️  Pipeline 'demo_copy' deleted


# 5️⃣ CSV Processor

## Purpose
Extracts CSV-formatted data into individual fields

## Use Cases
- Parsing CSV data embedded in fields
- Splitting delimited strings into fields
- Data extraction from structured text

## Configuration
- **field**: Field containing CSV data
- **target_fields**: Array of field names for extracted values
- **separator**: CSV separator character (default: ,)
- **trim**: Trim whitespace in unquoted values (default: false)
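
The CSV processor extracts every column as a string (note `"age": "35"` in the output below), so numeric columns usually need a follow-up `convert`. A minimal sketch of that chain as a processor list; the pipeline name `demo_csv_typed` in the comments is an assumption, and running it requires the helpers and client defined earlier.

```python
import json

# Step 1 parses the columns (all strings); step 2 turns 'age' into an integer.
pipeline_csv_typed = [
    {
        "csv": {
            "field": "csv_data",
            "target_fields": ["first_name", "last_name", "email", "age"],
        }
    },
    {"convert": {"field": "age", "type": "integer"}},
]

print(json.dumps(pipeline_csv_typed, indent=2))
# With a live cluster and the helper functions above:
# create_pipeline("demo_csv_typed", pipeline_csv_typed, "CSV with typed age")
# test_pipeline("demo_csv_typed", {"csv_data": "John,Doe,john@example.com,35"})
```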

In [7]:

# Example 5: CSV processor
sample_doc_csv = {
    "csv_data": "John,Doe,john@example.com,35"
}

pipeline_csv = [
    {
        "csv": {
            "field": "csv_data",
            "target_fields": ["first_name", "last_name", "email", "age"]
        }
    }
]

create_pipeline("demo_csv", pipeline_csv, "Parse CSV data")
result_csv = test_pipeline("demo_csv", sample_doc_csv)

if result_csv:
    original = sample_doc_csv
    processed = result_csv['docs'][0]['doc']['_source']
    display_result("CSV Processor", original, processed)

delete_pipeline("demo_csv")


✅ Pipeline 'demo_csv' created successfully

📊 CSV Processor
📥 BEFORE:
{
  "csv_data": "John,Doe,john@example.com,35"
}

📤 AFTER:
{
  "csv_data": "John,Doe,john@example.com,35",
  "last_name": "Doe",
  "first_name": "John",
  "email": "john@example.com",
  "age": "35"
}

🗑️  Pipeline 'demo_csv' deleted


# 6️⃣ DATE Processor

## Purpose
Parses and normalizes date fields into a standardized format

## Use Cases
- Normalizing timestamp formats from various sources
- Converting date strings to ISO 8601 timestamps
- Time-based indexing and analysis

## Configuration
- **field**: Field containing the date value
- **target_field**: Field to store the parsed date (default: `@timestamp`)
- **formats**: Array of possible input date formats (required); they are tried in order until one parses
- **timezone**: Timezone for parsing (default: UTC)
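
When one field arrives in several layouts, list all of them in `formats`. The sketch below builds such a processor and adds an offline preview that mimics the try-in-order behavior with `datetime.strptime`. The mapping from the Java-style patterns to Python `strptime` codes is hand-written for these two formats only; it is an assumption of this sketch, not something the processor does.

```python
from datetime import datetime, timezone

date_processor = {
    "date": {
        "field": "log_timestamp",
        "target_field": "@timestamp",
        "formats": ["yyyy-MM-dd HH:mm:ss", "dd/MM/yyyy"],  # tried in order
        "timezone": "UTC",
    }
}

# Hand-written Python equivalents of the Java patterns (preview only).
PY_EQUIVALENTS = {
    "yyyy-MM-dd HH:mm:ss": "%Y-%m-%d %H:%M:%S",
    "dd/MM/yyyy": "%d/%m/%Y",
}


def preview_parse(value: str, formats: list) -> str:
    """Try each format in order and return an ISO 8601 UTC string."""
    for java_fmt in formats:
        try:
            parsed = datetime.strptime(value, PY_EQUIVALENTS[java_fmt])
        except ValueError:
            continue  # this layout did not match; try the next one
        return parsed.replace(tzinfo=timezone.utc).isoformat()
    raise ValueError(f"no format matched {value!r}")


fmts = date_processor["date"]["formats"]
print(preview_parse("2024-11-02 14:30:45", fmts))  # 2024-11-02T14:30:45+00:00
print(preview_parse("02/11/2024", fmts))           # 2024-11-02T00:00:00+00:00
```

The preview prints `+00:00` where OpenSearch prints `.000Z`; both denote the same UTC instant.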

In [8]:

# Example 6: Date processor
sample_doc_date = {
    "log_timestamp": "2024-11-02 14:30:45",
    "event_date": "02/11/2024"
}

pipeline_date = [
    {
        "date": {
            "field": "log_timestamp",
            "target_field": "@timestamp",
            "formats": ["yyyy-MM-dd HH:mm:ss"]
        }
    },
    {
        "date": {
            "field": "event_date",
            "formats": ["dd/MM/yyyy"],
            "target_field": "normalized_event_date"
        }
    }
]

create_pipeline("demo_date", pipeline_date, "Parse and normalize dates")
result_date = test_pipeline("demo_date", sample_doc_date)

if result_date:
    original = sample_doc_date
    processed = result_date['docs'][0]['doc']['_source']
    display_result("Date Processor", original, processed)

delete_pipeline("demo_date")


✅ Pipeline 'demo_date' created successfully

📊 Date Processor
📥 BEFORE:
{
  "log_timestamp": "2024-11-02 14:30:45",
  "event_date": "02/11/2024"
}

📤 AFTER:
{
  "log_timestamp": "2024-11-02 14:30:45",
  "normalized_event_date": "2024-11-02T00:00:00.000Z",
  "@timestamp": "2024-11-02T14:30:45.000Z",
  "event_date": "02/11/2024"
}

🗑️  Pipeline 'demo_date' deleted


# 7️⃣ GROK Processor

## Purpose
Extracts structured fields from unstructured text using grok patterns, which are named, reusable regular expressions

## Use Cases
- Parsing system logs and application logs
- Extracting fields from Apache/Nginx logs
- Text pattern matching and field extraction

## Configuration
- **field**: Field containing unstructured text
- **patterns**: Array of grok patterns to match
- **pattern_definitions**: Custom pattern definitions
- **ignore_missing**: Don't fail if field is missing (default: false)

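## Common Grok Patterns
Each pattern has the form `%{SYNTAX:SEMANTIC}`: SYNTAX names a built-in pattern and SEMANTIC names the output field. An optional type suffix converts the capture; the example below uses `%{NUMBER:status_code:int}` to store an integer.
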
- %{IP:ip_address}
- %{WORD:word}
- %{NUMBER:number}
- %{HTTPDATE:timestamp}
- %{DATA:data}
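
To see roughly what the grok example below matches, here is the same log line parsed with Python's `re` module. The sub-expressions are simplified stand-ins for grok's built-in `IP`, `DATA`, `HTTPDATE`, `WORD`, and `NUMBER` definitions (IPv4 only, for instance), so treat this as an illustration rather than an exact translation.

```python
import re

# Simplified stand-ins for the grok building blocks used in the pipeline.
IP = r"\d{1,3}(?:\.\d{1,3}){3}"  # IPv4 only; grok's IP also covers IPv6
DATA = r".*?"                    # lazy "anything", like grok's DATA
HTTPDATE = r"[^\]]+"             # everything up to the closing bracket
WORD = r"\w+"
NUMBER = r"\d+(?:\.\d+)?"

APACHE_LINE = re.compile(
    rf'(?P<client_ip>{IP}) - (?P<username>{DATA}) \[(?P<timestamp>{HTTPDATE})\] '
    rf'"(?P<method>{WORD}) (?P<path>{DATA}) HTTP/(?P<http_version>{NUMBER})" '
    rf'(?P<status_code>{NUMBER}) (?P<bytes>{NUMBER})'
)


def parse_apache(line: str) -> dict:
    match = APACHE_LINE.search(line)
    if match is None:
        raise ValueError("line does not match the pattern")
    fields = match.groupdict()
    # Mirror the ':int' suffixes in the grok pattern.
    fields["status_code"] = int(fields["status_code"])
    fields["bytes"] = int(fields["bytes"])
    return fields


log = '192.168.1.1 - user [02/Nov/2024:14:30:45 +0000] "GET /api/users HTTP/1.1" 200 1234'
print(parse_apache(log))
```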

In [9]:

# Example 7: Grok processor
sample_doc_grok = {
    "apache_log": "192.168.1.1 - user [02/Nov/2024:14:30:45 +0000] \"GET /api/users HTTP/1.1\" 200 1234"
}

pipeline_grok = [
    {
        "grok": {
            "field": "apache_log",
            "patterns": ["%{IP:client_ip} - %{DATA:username} \\[%{HTTPDATE:timestamp}\\] \"%{WORD:method} %{DATA:path} HTTP/%{NUMBER:http_version}\" %{NUMBER:status_code:int} %{NUMBER:bytes:int}"]
        }
    }
]

create_pipeline("demo_grok", pipeline_grok, "Parse Apache logs with grok")
result_grok = test_pipeline("demo_grok", sample_doc_grok)

if result_grok:
    original = sample_doc_grok
    processed = result_grok['docs'][0]['doc']['_source']
    display_result("Grok Processor", original, processed)

delete_pipeline("demo_grok")


✅ Pipeline 'demo_grok' created successfully

📊 Grok Processor
📥 BEFORE:
{
  "apache_log": "192.168.1.1 - user [02/Nov/2024:14:30:45 +0000] \"GET /api/users HTTP/1.1\" 200 1234"
}

📤 AFTER:
{
  "apache_log": "192.168.1.1 - user [02/Nov/2024:14:30:45 +0000] \"GET /api/users HTTP/1.1\" 200 1234",
  "path": "/api/users",
  "status_code": 200,
  "method": "GET",
  "bytes": 1234,
  "http_version": "1.1",
  "client_ip": "192.168.1.1",
  "username": "user",
  "timestamp": "02/Nov/2024:14:30:45 +0000"
}

🗑️  Pipeline 'demo_grok' deleted


# 8️⃣ JSON Processor

## Purpose
Parses JSON strings into structured objects

## Use Cases
- Converting JSON strings into objects
- Extracting nested data from JSON strings
- Data enrichment from JSON payloads

## Configuration
- **field**: Field containing JSON string
- **target_field**: Field to store parsed JSON (default: overwrites source)
- **add_to_root**: Add parsed fields to document root (default: false)
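
With `add_to_root`, the parsed keys land at the top level of the document instead of under a single field. The first part of this sketch is the processor configuration; the second is a pure-Python preview that approximates the merge with `json.loads` and dictionary unpacking. The `service` field is an illustrative assumption, and the preview assumes no key collisions between the payload and the existing document (conflict handling is not modeled here).

```python
import json

json_to_root = {"json": {"field": "config", "add_to_root": True}}

# Offline preview: parse the string and merge its keys into the document.
doc = {"service": "checkout", "config": '{"timeout": 30, "retry": true}'}
preview = {**doc, **json.loads(doc["config"])}
print(preview)
```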

In [10]:

# Example 8: JSON processor
sample_doc_json = {
    "metadata_json": '{"author": "John", "version": "1.0", "tags": ["important", "reviewed"]}',
    "config": '{"timeout": 30, "retry": true}'
}

pipeline_json = [
    {
        "json": {
            "field": "metadata_json",
            "target_field": "metadata"
        }
    },
    {
        "json": {
            "field": "config"
        }
    }
]

create_pipeline("demo_json", pipeline_json, "Parse JSON strings")
result_json = test_pipeline("demo_json", sample_doc_json)

if result_json:
    original = sample_doc_json
    processed = result_json['docs'][0]['doc']['_source']
    display_result("JSON Processor", original, processed)

delete_pipeline("demo_json")


✅ Pipeline 'demo_json' created successfully

📊 JSON Processor
📥 BEFORE:
{
  "metadata_json": "{\"author\": \"John\", \"version\": \"1.0\", \"tags\": [\"important\", \"reviewed\"]}",
  "config": "{\"timeout\": 30, \"retry\": true}"
}

📤 AFTER:
{
  "metadata": {
    "version": "1.0",
    "author": "John",
    "tags": [
      "important",
      "reviewed"
    ]
  },
  "config": {
    "timeout": 30,
    "retry": true
  },
  "metadata_json": "{\"author\": \"John\", \"version\": \"1.0\", \"tags\": [\"important\", \"reviewed\"]}"
}

🗑️  Pipeline 'demo_json' deleted


# 9️⃣ SPLIT & JOIN Processors

## SPLIT Processor

### Purpose
Splits a string into an array using a separator

### Configuration
- **field**: Field containing string to split
- **separator**: Separator, interpreted as a regular expression
- **target_field**: Field to store result (default: overwrites source)

## JOIN Processor

### Purpose  
Joins array elements into a single string

### Configuration
- **field**: Array field to join
- **separator**: Literal string inserted between elements
- **target_field**: Field to store result (default: overwrites source)
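
Because `split` treats its separator as a regular expression while `join` inserts its separator literally, characters such as `|` or `.` must be escaped when splitting. The sketch builds such a processor and previews the result with Python's `re.split`; Python's regex dialect is not identical to the Java one OpenSearch uses, but the two agree for simple patterns like this one.

```python
import re

raw = "books|fiction|bestsellers"

# The regex \| matches a literal pipe; in Python (and JSON) source the backslash is doubled.
split_processor = {"split": {"field": "categories", "separator": "\\|"}}

# Offline preview of the regex split, then a literal join back.
parts = re.split(split_processor["split"]["separator"], raw)
rejoined = " | ".join(parts)
print(parts)     # ['books', 'fiction', 'bestsellers']
print(rejoined)  # books | fiction | bestsellers
```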

In [11]:

# Example 9a: Split processor
sample_doc_split = {
    "tags_string": "python,machine-learning,data-science",
    "categories": "books;fiction;bestsellers"
}

pipeline_split = [
    {
        "split": {
            "field": "tags_string",
            "separator": ","
        }
    },
    {
        "split": {
            "field": "categories",
            "separator": ";"
        }
    }
]

create_pipeline("demo_split", pipeline_split, "Split strings into arrays")
result_split = test_pipeline("demo_split", sample_doc_split)

if result_split:
    original = sample_doc_split
    processed = result_split['docs'][0]['doc']['_source']
    display_result("Split Processor", original, processed)

delete_pipeline("demo_split")

# Example 9b: Join processor
sample_doc_join = {
    "tags": ["machine-learning", "ai", "data-science"],
    "categories": ["tech", "education"]
}

pipeline_join = [
    {
        "join": {
            "field": "tags",
            "separator": " | "
        }
    }
]

create_pipeline("demo_join", pipeline_join, "Join arrays into strings")
result_join = test_pipeline("demo_join", sample_doc_join)

if result_join:
    original = sample_doc_join
    processed = result_join['docs'][0]['doc']['_source']
    display_result("Join Processor", original, processed)

delete_pipeline("demo_join")


✅ Pipeline 'demo_split' created successfully

📊 Split Processor
📥 BEFORE:
{
  "tags_string": "python,machine-learning,data-science",
  "categories": "books;fiction;bestsellers"
}

📤 AFTER:
{
  "categories": [
    "books",
    "fiction",
    "bestsellers"
  ],
  "tags_string": [
    "python",
    "machine-learning",
    "data-science"
  ]
}

🗑️  Pipeline 'demo_split' deleted
✅ Pipeline 'demo_join' created successfully

📊 Join Processor
📥 BEFORE:
{
  "tags": [
    "machine-learning",
    "ai",
    "data-science"
  ],
  "categories": [
    "tech",
    "education"
  ]
}

📤 AFTER:
{
  "categories": [
    "tech",
    "education"
  ],
  "tags": "machine-learning | ai | data-science"
}

🗑️  Pipeline 'demo_join' deleted


# 🔟 STRING MANIPULATION Processors

## LOWERCASE, UPPERCASE & TRIM

### LOWERCASE Processor
- **Purpose**: Convert text to lowercase
- **Use Case**: Normalize keywords and tags for case-insensitive matching
- **Configuration**: `field` (required); optional `target_field` and `ignore_missing`

### UPPERCASE Processor
- **Purpose**: Convert text to uppercase
- **Use Case**: Standardize codes such as country or status values
- **Configuration**: `field` (required); optional `target_field` and `ignore_missing`

### TRIM Processor
- **Purpose**: Remove leading and trailing whitespace
- **Use Case**: Clean string fields
- **Configuration**: `field` (required); optional `target_field` and `ignore_missing`

In [12]:

# Example 10: String manipulation processors
sample_doc_string = {
    "category": "ELECTRONICS",
    "brand": "APPLE",
    "country": "united states",
    "status": "active",
    "name": "  John Doe  ",
    "email": "   user@example.com   "
}

pipeline_string = [
    {
        "lowercase": {
            "field": "category"
        }
    },
    {
        "lowercase": {
            "field": "brand"
        }
    },
    {
        "uppercase": {
            "field": "country"
        }
    },
    {
        "uppercase": {
            "field": "status"
        }
    },
    {
        "trim": {
            "field": "name"
        }
    },
    {
        "trim": {
            "field": "email"
        }
    }
]

create_pipeline("demo_string_ops", pipeline_string, "String manipulation")
result_string = test_pipeline("demo_string_ops", sample_doc_string)

if result_string:
    original = sample_doc_string
    processed = result_string['docs'][0]['doc']['_source']
    display_result("String Manipulation Processors", original, processed)

delete_pipeline("demo_string_ops")


✅ Pipeline 'demo_string_ops' created successfully

📊 String Manipulation Processors
📥 BEFORE:
{
  "category": "ELECTRONICS",
  "brand": "APPLE",
  "country": "united states",
  "status": "active",
  "name": "  John Doe  ",
  "email": "   user@example.com   "
}

📤 AFTER:
{
  "country": "UNITED STATES",
  "name": "John Doe",
  "category": "electronics",
  "brand": "apple",
  "email": "user@example.com",
  "status": "ACTIVE"
}

🗑️  Pipeline 'demo_string_ops' deleted


# 1️⃣1️⃣ REMOVE & RENAME Processors

## REMOVE Processor

### Purpose
Removes one or more fields from documents

### Use Cases
- Removing sensitive data before indexing
- Filtering out unwanted fields
- Privacy and security compliance

### Configuration
- **field**: Field(s) to remove (string or array)
- **ignore_missing**: Don't fail if field is missing (default: false)

## RENAME Processor

### Purpose
Renames one or more fields

### Use Cases
- Renaming fields for consistency
- Field name transformations
- Schema normalization

### Configuration
- **field**: Original field name
- **target_field**: New field name (dot notation such as `cloud.provider` creates nested objects)
- **ignore_missing**: Don't fail if field is missing (default: false)
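
The nesting visible in the rename output below can be modeled offline. The hypothetical helper `rename_locally` sketches it for teaching purposes only; it does not attempt the processor's error handling (for example, what happens when the target field already exists).

```python
def rename_locally(doc: dict, field: str, target_field: str) -> dict:
    """Move a top-level field to a dotted path, creating parent objects."""
    value = doc.pop(field)
    *parents, leaf = target_field.split(".")
    node = doc
    for key in parents:
        node = node.setdefault(key, {})  # create intermediate objects as needed
    node[leaf] = value
    return doc


doc = {"provider": "AWS", "instance_type": "t2.micro", "region_name": "us-east-1"}
rename_locally(doc, "provider", "cloud.provider")
rename_locally(doc, "instance_type", "cloud.instance.type")
print(doc)
# {'region_name': 'us-east-1', 'cloud': {'provider': 'AWS', 'instance': {'type': 't2.micro'}}}
```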

In [13]:

# Example 11a: Remove processor
sample_doc_remove = {
    "public_data": "visible",
    "secret_token": "xxxxx",
    "internal_id": "12345",
    "useful_data": "keep this"
}

pipeline_remove = [
    {
        "remove": {
            "field": ["secret_token", "internal_id"]
        }
    }
]

create_pipeline("demo_remove", pipeline_remove, "Remove sensitive fields")
result_remove = test_pipeline("demo_remove", sample_doc_remove)

if result_remove:
    original = sample_doc_remove
    processed = result_remove['docs'][0]['doc']['_source']
    display_result("Remove Processor", original, processed)

delete_pipeline("demo_remove")

# Example 11b: Rename processor
sample_doc_rename = {
    "provider": "AWS",
    "instance_type": "t2.micro",
    "region_name": "us-east-1"
}

pipeline_rename = [
    {
        "rename": {
            "field": "provider",
            "target_field": "cloud.provider"
        }
    },
    {
        "rename": {
            "field": "instance_type",
            "target_field": "cloud.instance.type"
        }
    }
]

create_pipeline("demo_rename", pipeline_rename, "Rename fields")
result_rename = test_pipeline("demo_rename", sample_doc_rename)

if result_rename:
    original = sample_doc_rename
    processed = result_rename['docs'][0]['doc']['_source']
    display_result("Rename Processor", original, processed)

delete_pipeline("demo_rename")


✅ Pipeline 'demo_remove' created successfully

📊 Remove Processor
📥 BEFORE:
{
  "public_data": "visible",
  "secret_token": "xxxxx",
  "internal_id": "12345",
  "useful_data": "keep this"
}

📤 AFTER:
{
  "public_data": "visible",
  "useful_data": "keep this"
}

🗑️  Pipeline 'demo_remove' deleted
✅ Pipeline 'demo_rename' created successfully

📊 Rename Processor
📥 BEFORE:
{
  "provider": "AWS",
  "instance_type": "t2.micro",
  "region_name": "us-east-1"
}

📤 AFTER:
{
  "cloud": {
    "provider": "AWS",
    "instance": {
      "type": "t2.micro"
    }
  },
  "region_name": "us-east-1"
}

🗑️  Pipeline 'demo_rename' deleted


# 1️⃣2️⃣ SET & GSUB Processors

## SET Processor

### Purpose
Sets a field to a constant or template value

### Use Cases
- Adding default values to documents
- Adding metadata fields
- Timestamp enrichment

### Configuration
- **field**: Field to set
- **value**: Value to set (supports Mustache templates such as `{{_ingest.timestamp}}`)

## GSUB Processor

### Purpose
Substitutes or deletes substrings in a field

### Use Cases
- Text normalization and cleaning
- Replacing patterns in strings
- Data standardization

### Configuration
- **field**: Field to process
- **pattern**: Regex pattern to match
- **replacement**: Replacement string (empty = delete)
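
Because `pattern` is a regular expression, gsub handles more than single characters. The sketch below masks every digit in a card number and previews the effect with Python's `re.sub`, which agrees with OpenSearch's Java regex engine for a simple pattern like `\d` on ASCII input. The `card_number` field name is an assumption for illustration.

```python
import re

gsub_processor = {
    "gsub": {
        "field": "card_number",
        "pattern": "\\d",      # regex: any digit (backslash doubled in source)
        "replacement": "*",
    }
}

# Offline preview of the substitution.
cfg = gsub_processor["gsub"]
masked = re.sub(cfg["pattern"], cfg["replacement"], "4111-1111-1111-1111")
print(masked)  # ****-****-****-****
```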

In [14]:

# Example 12a: Set processor
sample_doc_set = {
    "product": "Laptop",
    "price": 999
}

pipeline_set = [
    {
        "set": {
            "field": "data_source",
            "value": "web_api"
        }
    },
    {
        "set": {
            "field": "ingestion_timestamp",
            "value": "{{_ingest.timestamp}}"
        }
    }
]

create_pipeline("demo_set", pipeline_set, "Set constant values")
result_set = test_pipeline("demo_set", sample_doc_set)

if result_set:
    original = sample_doc_set
    processed = result_set['docs'][0]['doc']['_source']
    display_result("Set Processor", original, processed)

delete_pipeline("demo_set")

# Example 12b: Gsub processor
sample_doc_gsub = {
    "text": "Hello-World-Test-String"
}

pipeline_gsub = [
    {
        "gsub": {
            "field": "text",
            "pattern": "-",
            "replacement": " "
        }
    }
]

create_pipeline("demo_gsub", pipeline_gsub, "Substitute substrings")
result_gsub = test_pipeline("demo_gsub", sample_doc_gsub)

if result_gsub:
    original = sample_doc_gsub
    processed = result_gsub['docs'][0]['doc']['_source']
    display_result("Gsub Processor", original, processed)

delete_pipeline("demo_gsub")


✅ Pipeline 'demo_set' created successfully

📊 Set Processor
📥 BEFORE:
{
  "product": "Laptop",
  "price": 999
}

📤 AFTER:
{
  "product": "Laptop",
  "ingestion_timestamp": "2025-12-27T20:13:44.226390104Z",
  "price": 999,
  "data_source": "web_api"
}

🗑️  Pipeline 'demo_set' deleted
✅ Pipeline 'demo_gsub' created successfully

📊 Gsub Processor
📥 BEFORE:
{
  "text": "Hello-World-Test-String"
}

📤 AFTER:
{
  "text": "Hello World Test String"
}

🗑️  Pipeline 'demo_gsub' deleted


# 1️⃣3️⃣ HTML_STRIP & URLDECODE Processors

## HTML_STRIP Processor

### Purpose
Removes HTML tags from text fields

### Use Cases
- Cleaning HTML content before indexing
- Removing formatting tags
- Text extraction from HTML

### Configuration
- **field**: Field containing HTML
- **ignore_missing**: Don't fail if field is missing (default: false)

## URLDECODE Processor

### Purpose
Decodes URL-encoded strings

### Use Cases
- Cleaning URL parameters
- Decoding search queries
- URL field normalization

### Configuration
- **field**: Field containing URL-encoded string
- **ignore_missing**: Don't fail if field is missing (default: false)
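
Two practical follow-ups. First, the HTML Strip output below shows newlines left where the `<p>` and `<div>` tags were, so pairing `html_strip` with `trim` yields cleaner text. Second, the decoding done by `urldecode` can be previewed offline with Python's `urllib.parse.unquote_plus`; whether the processor also turns `+` into a space is an assumption of this sketch worth confirming with `_simulate` before relying on it.

```python
from urllib.parse import unquote_plus

# html_strip leaves newlines where block tags were; trim removes them.
clean_html_pipeline = [
    {"html_strip": {"field": "content"}},
    {"trim": {"field": "content"}},
]

# Offline preview of URL decoding (percent-escapes and '+' become characters).
for encoded in ["hello%20world%21", "machine+learning%20algorithms"]:
    print(unquote_plus(encoded))
```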

In [15]:

# Example 13a: HTML Strip processor
sample_doc_html = {
    "content": "<p>This is <b>bold</b> and <i>italic</i> text</p>",
    "description": "<div class=\"desc\">Product <span>description</span></div>"
}

pipeline_html = [
    {
        "html_strip": {
            "field": "content"
        }
    },
    {
        "html_strip": {
            "field": "description"
        }
    }
]

create_pipeline("demo_html_strip", pipeline_html, "Strip HTML tags")
result_html = test_pipeline("demo_html_strip", sample_doc_html)

if result_html:
    original = sample_doc_html
    processed = result_html['docs'][0]['doc']['_source']
    display_result("HTML Strip Processor", original, processed)

delete_pipeline("demo_html_strip")

# Example 13b: URL Decode processor
sample_doc_urldecode = {
    "encoded_url": "hello%20world%21",
    "search_query": "machine%20learning%20algorithms"
}

pipeline_urldecode = [
    {
        "urldecode": {
            "field": "encoded_url"
        }
    },
    {
        "urldecode": {
            "field": "search_query"
        }
    }
]

create_pipeline("demo_urldecode", pipeline_urldecode, "Decode URL-encoded strings")
result_urldecode = test_pipeline("demo_urldecode", sample_doc_urldecode)

if result_urldecode:
    original = sample_doc_urldecode
    processed = result_urldecode['docs'][0]['doc']['_source']
    display_result("URL Decode Processor", original, processed)

delete_pipeline("demo_urldecode")


✅ Pipeline 'demo_html_strip' created successfully

📊 HTML Strip Processor
📥 BEFORE:
{
  "content": "<p>This is <b>bold</b> and <i>italic</i> text</p>",
  "description": "<div class=\"desc\">Product <span>description</span></div>"
}

📤 AFTER:
{
  "description": "\nProduct description\n",
  "content": "\nThis is bold and italic text\n"
}

🗑️  Pipeline 'demo_html_strip' deleted
✅ Pipeline 'demo_urldecode' created successfully

📊 URL Decode Processor
📥 BEFORE:
{
  "encoded_url": "hello%20world%21",
  "search_query": "machine%20learning%20algorithms"
}

📤 AFTER:
{
  "search_query": "machine learning algorithms",
  "encoded_url": "hello world!"
}

🗑️  Pipeline 'demo_urldecode' deleted


# 1️⃣4️⃣ SORT & FINGERPRINT Processors

## SORT Processor

### Purpose
Sorts array elements in ascending or descending order

### Use Cases
- Ordering array elements in documents
- Standardizing array order
- Data preparation

### Configuration
- **field**: Array field to sort
- **order**: Sort order, `"asc"` or `"desc"` (default: `"asc"`)
- **target_field**: Field to store the sorted array (default: the sorted array overwrites `field`)
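To predict what `sort` will produce, a local Python sketch of the same options can help. The helper `sort_field` is hypothetical and mirrors only the `field`, `order` (`"asc"`/`"desc"`), and `target_field` settings; it does not reproduce the processor's handling of mixed-type arrays or other error cases.

```python
from typing import Optional


def sort_field(doc: dict, field: str, order: str = "asc",
               target_field: Optional[str] = None) -> dict:
    """Return a copy of doc with doc[field] sorted, like the sort processor."""
    if order not in ("asc", "desc"):
        raise ValueError(f"order must be 'asc' or 'desc', got {order!r}")
    result = dict(doc)
    # Without target_field the sorted array overwrites the source field
    result[target_field or field] = sorted(doc[field], reverse=(order == "desc"))
    return result


doc = {"scores": [95, 42, 87, 23, 100], "names": ["Zoe", "Alice", "Bob"]}
print(sort_field(doc, "scores")["scores"])              # [23, 42, 87, 95, 100]
print(sort_field(doc, "names", order="desc")["names"])  # ['Zoe', 'Bob', 'Alice']
```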

## FINGERPRINT Processor

### Purpose
Generates hash fingerprints of specified fields for deduplication

### Use Cases
- Deduplicating documents
- Generating unique identifiers
- Document matching

### Configuration
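- **fields**: Array of fields to include in the fingerprint
- **target_field**: Field to store fingerprint (default: `fingerprint`)
- **hash_method**: Hash algorithm to use; the `SHA-1@2.16.0:` prefix in the output below is the default method and its version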
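Fingerprinting supports deduplication because documents that agree on the chosen fields hash to the same value, whatever their other fields contain. The hypothetical helper below illustrates the principle with SHA-1 and Base64 from Python's standard library. Its serialization of field names and values is our own, so it will not reproduce the `document_fingerprint` value OpenSearch returns in this demo; only the idea carries over.

```python
import base64
import hashlib
import json


def fingerprint(doc: dict, fields: list) -> str:
    """Hypothetical helper: SHA-1 over the selected fields, Base64-encoded."""
    # Sort field names so the hash does not depend on the order of `fields`
    selected = {name: doc.get(name) for name in sorted(fields)}
    payload = json.dumps(selected, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha1(payload.encode("utf-8")).digest()
    return base64.b64encode(digest).decode("ascii")


a = {"email": "user@example.com", "name": "John Doe", "phone": "555-1234"}
b = {"email": "user@example.com", "name": "John Doe", "phone": "555-9999"}

# Same email and name, different phone: identical fingerprints, so b duplicates a
print(fingerprint(a, ["email", "name"]) == fingerprint(b, ["email", "name"]))  # True
```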


In [16]:

# Example 14a: Sort processor
sample_doc_sort = {
    "scores": [95, 42, 87, 23, 100],
    "names": ["Zoe", "Alice", "Bob"]
}

pipeline_sort = [
    {
        "sort": {
            "field": "scores"
        }
    },
    {
        "sort": {
            "field": "names"
        }
    }
]

create_pipeline("demo_sort", pipeline_sort, "Sort arrays")
result_sort = test_pipeline("demo_sort", sample_doc_sort)

if result_sort:
    original = sample_doc_sort
    processed = result_sort['docs'][0]['doc']['_source']
    display_result("Sort Processor", original, processed)

delete_pipeline("demo_sort")

# Example 14b: Fingerprint processor
sample_doc_fingerprint = {
    "email": "user@example.com",
    "name": "John Doe",
    "phone": "555-1234"
}

pipeline_fingerprint = [
    {
        "fingerprint": {
            "fields": ["email", "name"],
            "target_field": "document_fingerprint"
        }
    }
]

create_pipeline("demo_fingerprint", pipeline_fingerprint, "Generate fingerprints")
result_fingerprint = test_pipeline("demo_fingerprint", sample_doc_fingerprint)

if result_fingerprint:
    original = sample_doc_fingerprint
    processed = result_fingerprint['docs'][0]['doc']['_source']
    display_result("Fingerprint Processor", original, processed)

delete_pipeline("demo_fingerprint")


✅ Pipeline 'demo_sort' created successfully

📊 Sort Processor
📥 BEFORE:
{
  "scores": [
    95,
    42,
    87,
    23,
    100
  ],
  "names": [
    "Zoe",
    "Alice",
    "Bob"
  ]
}

📤 AFTER:
{
  "names": [
    "Alice",
    "Bob",
    "Zoe"
  ],
  "scores": [
    23,
    42,
    87,
    95,
    100
  ]
}

🗑️  Pipeline 'demo_sort' deleted
✅ Pipeline 'demo_fingerprint' created successfully

📊 Fingerprint Processor
📥 BEFORE:
{
  "email": "user@example.com",
  "name": "John Doe",
  "phone": "555-1234"
}

📤 AFTER:
{
  "name": "John Doe",
  "phone": "555-1234",
  "email": "user@example.com",
  "document_fingerprint": "SHA-1@2.16.0:BJMPv+aO2tEAI4aGLbXfmfvsluM="
}

🗑️  Pipeline 'demo_fingerprint' deleted


# 1️⃣5️⃣ KV & DISSECT Processors

## KV (Key-Value) Processor

### Purpose
Parses key-value pairs into separate fields

### Use Cases
- Parsing query strings
- Extracting log parameters
- Structured data extraction from strings

### Configuration
- **field**: Field containing key-value data
- **field_split**: Regex pattern that separates key-value pairs (e.g., `"&"`)
- **value_split**: Regex pattern that separates each key from its value (e.g., `"="`)
- **target_field**: Field to store the parsed object (default: keys are added at the document root)
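The KV demo's result can be reproduced locally. In the sketch below both separators are applied as regular expressions with Python's `re.split`, which is how we understand `field_split` and `value_split` to behave; values are kept as strings, matching the `"123"` in the demo output. The helper `parse_kv` is hypothetical, and unlike the real processor (which may reject malformed input) it silently skips fragments that contain no value separator.

```python
import re


def parse_kv(text: str, field_split: str, value_split: str) -> dict:
    """Split text into pairs, then each pair into key and value (regex separators)."""
    result = {}
    for pair in re.split(field_split, text):
        parts = re.split(value_split, pair, maxsplit=1)
        if len(parts) == 2:  # skip fragments with no value separator
            key, value = parts
            result[key] = value  # values stay strings
    return result


print(parse_kv("user_id=123&action=login&timestamp=1234567890", "&", "="))
# {'user_id': '123', 'action': 'login', 'timestamp': '1234567890'}

# Because the separators are regexes, one pattern can absorb optional whitespace
print(parse_kv("level: info;  host: web-1", r";\s*", r":\s*"))
# {'level': 'info', 'host': 'web-1'}
```

Since the separators are regular expressions, characters such as `|` or `.` need escaping (`"\\|"` in a JSON pipeline definition) to be matched literally.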

## DISSECT Processor

### Purpose
Extracts fields from structured text using text patterns

### Use Cases
- Parsing system logs with known patterns
- Structured log extraction
- Pattern-based text parsing

### Configuration
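- **field**: Field containing the text to parse
- **pattern**: Dissect pattern (e.g., `"%{FIELD1} %{FIELD2}"`). Each key captures text up to the next literal delimiter, so a value that itself contains the delimiter, such as a timestamp with a space between date and time, needs the `+` append modifier
- **append_separator**: Separator placed between values joined with the `+` append modifier (default: empty string)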

In [17]:

# Example 15a: KV processor
sample_doc_kv = {
    "query_string": "user_id=123&action=login&timestamp=1234567890"
}

pipeline_kv = [
    {
        "kv": {
            "field": "query_string",
            "field_split": "&",
            "value_split": "=",
            "target_field": "parsed_params"
        }
    }
]

create_pipeline("demo_kv", pipeline_kv, "Parse key-value pairs")
result_kv = test_pipeline("demo_kv", sample_doc_kv)

if result_kv:
    original = sample_doc_kv
    processed = result_kv['docs'][0]['doc']['_source']
    display_result("KV Processor", original, processed)

delete_pipeline("demo_kv")

# Example 15b: Dissect processor
sample_doc_dissect = {
    "log_line": "2024-11-02 14:30:45 ERROR app.py:42 Database connection failed"
}

pipeline_dissect = [
    {
        "dissect": {
            "field": "log_line",
            "pattern": "%{TIMESTAMP} %{LEVEL} %{SOURCE} %{MESSAGE}"
        }
    }
]

create_pipeline("demo_dissect", pipeline_dissect, "Parse structured logs")
result_dissect = test_pipeline("demo_dissect", sample_doc_dissect)

if result_dissect:
    original = sample_doc_dissect
    processed = result_dissect['docs'][0]['doc']['_source']
    display_result("Dissect Processor", original, processed)

delete_pipeline("demo_dissect")


✅ Pipeline 'demo_kv' created successfully

📊 KV Processor
📥 BEFORE:
{
  "query_string": "user_id=123&action=login&timestamp=1234567890"
}

📤 AFTER:
{
  "parsed_params": {
    "user_id": "123",
    "action": "login",
    "timestamp": "1234567890"
  },
  "query_string": "user_id=123&action=login&timestamp=1234567890"
}

🗑️  Pipeline 'demo_kv' deleted
✅ Pipeline 'demo_dissect' created successfully

📊 Dissect Processor
📥 BEFORE:
{
  "log_line": "2024-11-02 14:30:45 ERROR app.py:42 Database connection failed"
}

📤 AFTER:
{
  "MESSAGE": "app.py:42 Database connection failed",
  "TIMESTAMP": "2024-11-02",
  "SOURCE": "ERROR",
  "LEVEL": "14:30:45",
  "log_line": "2024-11-02 14:30:45 ERROR app.py:42 Database connection failed"
}

🗑️  Pipeline 'demo_dissect' deleted
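The dissect output above is misaligned: the timestamp `2024-11-02 14:30:45` contains a space, so with four space-separated keys `TIMESTAMP` captures only the date, `LEVEL` the time, `SOURCE` the level, and `MESSAGE` everything after it. One fix is dissect's `+` append modifier, which joins a second captured piece onto the same key: `%{TIMESTAMP} %{+TIMESTAMP} %{LEVEL} %{SOURCE} %{MESSAGE}`, with `"append_separator": " "` in the processor config so the date and time stay separated by a space. The Python below is a simplified local emulation (our own code, not OpenSearch's parser) for checking both patterns without a cluster.

```python
import re


def dissect(text: str, pattern: str, append_separator: str = "") -> dict:
    """Simplified dissect: plain %{key} and the %{+key} append modifier only.

    Each key captures text up to the next literal delimiter in the pattern;
    the last key captures the rest. Adjacent keys with no delimiter and the
    other dissect modifiers are not supported in this sketch.
    """
    tokens = re.split(r"(%\{[^}]*\})", pattern)
    literals, keys = tokens[0::2], tokens[1::2]  # literals surround the keys
    if not text.startswith(literals[0]):
        raise ValueError("text does not start with the pattern's leading literal")
    pos, result = len(literals[0]), {}
    for raw_key, delimiter in zip(keys, literals[1:]):
        end = text.find(delimiter, pos) if delimiter else len(text)
        if end == -1:
            raise ValueError(f"delimiter {delimiter!r} not found")
        value, name = text[pos:end], raw_key[2:-1]
        if name.startswith("+"):  # append modifier: join onto an earlier key
            name = name[1:]
            value = result[name] + append_separator + value if name in result else value
        result[name] = value
        pos = end + len(delimiter)
    return result


line = "2024-11-02 14:30:45 ERROR app.py:42 Database connection failed"

# Original pattern: four keys, but the timestamp itself contains a space
print(dissect(line, "%{TIMESTAMP} %{LEVEL} %{SOURCE} %{MESSAGE}"))

# Corrected pattern: the time is appended to TIMESTAMP
print(dissect(line, "%{TIMESTAMP} %{+TIMESTAMP} %{LEVEL} %{SOURCE} %{MESSAGE}",
              append_separator=" "))
```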


# 1️⃣6️⃣ CONTROL FLOW: DROP & FAIL

## DROP Processor

### Purpose
Drops/filters documents from indexing based on conditions

### Use Cases
- Filtering unwanted documents
- Conditional document exclusion
- Data validation

### Configuration
- **if**: Condition to check (Painless script); a `drop` processor without `if` drops every document

## FAIL Processor

### Purpose
Raises an error and stops pipeline execution

### Use Cases
- Validation failure scenarios
- Data quality checks
- Pipeline error handling

### Configuration
- **if**: Condition that triggers the failure (Painless script)
- **message**: Error message included in the failure response (required)

## Example Flow

```mermaid
graph TD
    A["Document Arrives"] --> B{DROP Condition?}
    B -->|True| C["üóëÔ∏è Document Dropped"]
    B -->|False| D{FAIL Condition?}
    D -->|True| E["‚ùå Pipeline Failed"]
    D -->|False| F["‚úÖ Continue Processing"]
    
    style A fill:#e1f5ff
    style C fill:#ffcdd2
    style E fill:#ffcdd2
    style F fill:#c8e6c9
```
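The flow chart's two outcomes differ in kind: a dropped document silently disappears, while a failed one is rejected with an error the caller sees. The local sketch below models that difference in plain Python, with ordinary predicates standing in for the Painless `if` conditions; `run_checks` and `FailProcessorError` are hypothetical names, not part of any OpenSearch client.

```python
from typing import Callable, Optional


class FailProcessorError(Exception):
    """Stand-in for the fail_processor_exception returned by OpenSearch."""


def run_checks(doc: dict,
               drop_if: Callable[[dict], bool],
               fail_if: Callable[[dict], bool],
               fail_message: str) -> Optional[dict]:
    """Mirror the flow chart: DROP check first, then FAIL check."""
    if drop_if(doc):
        return None  # dropped: not indexed, and no error is raised
    if fail_if(doc):
        raise FailProcessorError(fail_message)  # rejected with an error
    return doc  # continue processing


def is_draft(doc: dict) -> bool:
    return doc.get("status") == "draft"


def negative_price(doc: dict) -> bool:
    return doc.get("price", 0) < 0


print(run_checks({"status": "draft"}, is_draft, negative_price, "Price cannot be negative"))
print(run_checks({"status": "published", "price": 10}, is_draft, negative_price,
                 "Price cannot be negative"))
try:
    run_checks({"status": "published", "price": -100}, is_draft, negative_price,
               "Price cannot be negative")
except FailProcessorError as err:
    print(f"Rejected: {err}")
```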

In [18]:
# Example 16a: Drop processor
print("\n" + "="*60)
print("üìä DROP Processor Example")
print("="*60)

sample_doc_drop_1 = {"status": "draft", "content": "This is a draft article"}
sample_doc_drop_2 = {"status": "published", "content": "This is published"}

pipeline_drop = [
    {
        "drop": {
            "if": "ctx.status == 'draft'"
        }
    }
]

create_pipeline("demo_drop", pipeline_drop, "Drop draft documents")

# Test both documents
for i, doc in enumerate([sample_doc_drop_1, sample_doc_drop_2], 1):
    print(f"\nüìÑ Document {i}: {doc['status']}")
    result_drop = test_pipeline("demo_drop", doc)
    if result_drop:
        doc_entry = result_drop['docs'][0]
        if doc_entry is None or 'skip_action' in doc_entry:
            print(f"   ✅ Document was DROPPED")
        else:
            print(f"   ✅ Document passed through")
            print(f"   Result: {doc_entry['doc']['_source']}")

delete_pipeline("demo_drop")

# Example 16b: Fail processor
print("\n" + "="*60)
print("üìä FAIL Processor Example")
print("="*60)

sample_doc_fail = {"price": -100, "product": "Invalid Product"}

pipeline_fail = [
    {
        "fail": {
            "if": "ctx.price < 0",
            "message": "Price cannot be negative"
        }
    }
]

create_pipeline("demo_fail", pipeline_fail, "Validate prices")
print("\nTesting document with negative price...")
result_fail = test_pipeline("demo_fail", sample_doc_fail)
if result_fail:
    if result_fail['docs'][0] and 'error' in result_fail['docs'][0]:
        print(f"❌ Pipeline error (expected): {result_fail['docs'][0]['error']}")
    else:
        print("Result:", json.dumps(result_fail, indent=2))

delete_pipeline("demo_fail")


📊 DROP Processor Example
✅ Pipeline 'demo_drop' created successfully

📄 Document 1: draft
   ✅ Document was DROPPED

📄 Document 2: published
   ✅ Document passed through
   Result: {'content': 'This is published', 'status': 'published'}
🗑️  Pipeline 'demo_drop' deleted

📊 FAIL Processor Example
✅ Pipeline 'demo_fail' created successfully

Testing document with negative price...
❌ Pipeline error (expected): {'root_cause': [{'type': 'fail_processor_exception', 'reason': 'Price cannot be negative'}], 'type': 'fail_processor_exception', 'reason': 'Price cannot be negative'}
🗑️  Pipeline 'demo_fail' deleted


# 1️⃣7️⃣ DOT_EXPANDER Processor

## Purpose
Converts dotted field names into nested objects

## Use Cases
- Converting flat keys to nested structures
- Data structure normalization
- Field hierarchy creation

## Configuration
- **field**: Dotted field name to expand
- **path**: Parent object field that contains the dotted field; needed only when the dotted field is nested inside another object

## Example Flow
```
Before:
{
  "user.name": "John",
  "user.email": "john@example.com",
  "user.age": 30
}

After:
{
  "user": {
    "name": "John",
    "email": "john@example.com",
    "age": 30
  }
}
```
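Each `dot_expander` processor in this section's demo names a single field, and the demo expands only `user.name` and `user.email`; that is why `user.age` remains a flat dotted key in its output, unlike the Example Flow above, which shows all three expanded. The hypothetical helper below sketches the expansion in Python so you can check the resulting shape locally; raising an error on a non-object conflict is our own simplification.

```python
def expand_dots(doc: dict, field: str) -> dict:
    """Move one dotted top-level key into nested objects (sketch of dot_expander)."""
    result = dict(doc)
    value = result.pop(field)
    *parents, leaf = field.split(".")
    node = result
    for part in parents:
        child = node.get(part, {})
        if not isinstance(child, dict):
            raise ValueError(f"cannot expand {field!r}: {part!r} is not an object")
        child = dict(child)  # copy so the input document is not mutated
        node[part] = child
        node = child
    node[leaf] = value
    return result


doc = {"user.name": "John Doe", "user.email": "john@example.com", "user.age": 30}

partial = expand_dots(expand_dots(doc, "user.name"), "user.email")
print(partial)  # user.age is still a flat key, as in the demo output

full = doc
for key in ["user.name", "user.email", "user.age"]:
    full = expand_dots(full, key)
print(full)     # all three fields nested under "user"
```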

In [19]:

# Example 17: Dot expander processor
sample_doc_dot = {
    "user.name": "John Doe",
    "user.email": "john@example.com",
    "user.age": 30
}

pipeline_dot = [
    {
        "dot_expander": {
            "field": "user.name"
        }
    },
    {
        "dot_expander": {
            "field": "user.email"
        }
    }
]

create_pipeline("demo_dot_expander", pipeline_dot, "Expand dotted fields")
result_dot = test_pipeline("demo_dot_expander", sample_doc_dot)

if result_dot:
    original = sample_doc_dot
    processed = result_dot['docs'][0]['doc']['_source']
    display_result("Dot Expander Processor", original, processed)

delete_pipeline("demo_dot_expander")


✅ Pipeline 'demo_dot_expander' created successfully

📊 Dot Expander Processor
📥 BEFORE:
{
  "user.name": "John Doe",
  "user.email": "john@example.com",
  "user.age": 30
}

📤 AFTER:
{
  "user.age": 30,
  "user": {
    "name": "John Doe",
    "email": "john@example.com"
  }
}

🗑️  Pipeline 'demo_dot_expander' deleted


# 1️⃣8️⃣ SCRIPT Processor

## Purpose
Runs custom Painless scripts for transformations and enrichments

## Use Cases
- Complex custom transformations
- Computed fields
- Advanced data enrichment
- Conditional logic

## Configuration
- **source**: Painless script code
- **lang**: Language (default: painless)
- **params**: Script parameters
- **id**: Named script reference

## Common Script Patterns
```
// Calculate total
ctx.total = ctx.quantity * ctx.unit_price

// Add timestamp
ctx.ingestion_time = ctx._ingest.timestamp

// Conditional logic
if (ctx.status == 'active') {
  ctx.priority = 'high';
}
```
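The configuration list mentions `params`, which the demo does not use. Passing values through `params` keeps the script source fixed, so a value such as a discount can change without editing the script; in Painless they are read as `params.<name>`. The snippet below builds such a pipeline body as a Python dict with a hypothetical `discount` parameter, plus a small Python mirror of the arithmetic for predicting the result. It makes no cluster call.

```python
import json

# Hypothetical parameter name and value, read in Painless as params.discount
script_processor = {
    "script": {
        "lang": "painless",
        "source": "ctx.total_price = ctx.quantity * ctx.unit_price * (1 - params.discount)",
        "params": {"discount": 0.1},
    }
}

pipeline_body = {
    "description": "Compute a discounted total with script params",
    "processors": [script_processor],
}


def predict_total(doc: dict, params: dict) -> float:
    """Python mirror of the Painless expression above, for a quick local check."""
    return doc["quantity"] * doc["unit_price"] * (1 - params["discount"])


print(json.dumps(pipeline_body, indent=2))
print(predict_total({"quantity": 5, "unit_price": 20}, script_processor["script"]["params"]))
```

To try it against a cluster, the pattern of the earlier cells should work, e.g. `create_pipeline("demo_script_params", pipeline_body["processors"], pipeline_body["description"])`, assuming that helper takes the processor list and description as it does above.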

In [20]:

# Example 18: Script processor
sample_doc_script = {
    "quantity": 5,
    "unit_price": 20
}

pipeline_script = [
    {
        "script": {
            "source": "ctx.total_price = ctx.quantity * ctx.unit_price"
        }
    }
]

create_pipeline("demo_script", pipeline_script, "Run custom script")
result_script = test_pipeline("demo_script", sample_doc_script)

if result_script:
    original = sample_doc_script
    processed = result_script['docs'][0]['doc']['_source']
    display_result("Script Processor", original, processed)

delete_pipeline("demo_script")


✅ Pipeline 'demo_script' created successfully

📊 Script Processor
📥 BEFORE:
{
  "quantity": 5,
  "unit_price": 20
}

📤 AFTER:
{
  "quantity": 5,
  "unit_price": 20,
  "total_price": 100
}

🗑️  Pipeline 'demo_script' deleted


# 1️⃣9️⃣ FOREACH & PIPELINE Processors

## FOREACH Processor

### Purpose
Applies another processor to each element of an array

### Use Cases
- Processing arrays of nested objects
- Applying transformations to array elements
- Complex nested data handling

### Configuration
- **field**: Array field to process
- **processor**: Processor to apply to each element
- **ignore_missing**: Don't fail if field is missing
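Conceptually, `foreach` runs its inner processor once per array element and exposes the current element as `_ingest._value`, which is why the demo's `convert` targets `_ingest._value.price`. A plain-Python sketch of the same loop, with a hypothetical `apply_foreach` helper standing in for the processor:

```python
from typing import Callable


def apply_foreach(doc: dict, field: str, processor: Callable[[dict], dict]) -> dict:
    """Apply `processor` to each element of doc[field]; the element plays the role of _ingest._value."""
    result = dict(doc)
    result[field] = [processor(element) for element in doc[field]]
    return result


def convert_price_to_float(element: dict) -> dict:
    """Local equivalent of {"convert": {"field": "_ingest._value.price", "type": "float"}}."""
    updated = dict(element)
    updated["price"] = float(updated["price"])
    return updated


doc = {"items": [{"name": "item1", "price": "100"}, {"name": "item2", "price": "200"}]}
print(apply_foreach(doc, "items", convert_price_to_float))
# {'items': [{'name': 'item1', 'price': 100.0}, {'name': 'item2', 'price': 200.0}]}
```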

## PIPELINE Processor

### Purpose
Runs an inner pipeline (composition/nesting)

### Use Cases
- Creating reusable modular pipelines
- Pipeline composition
- Complex pipeline management

### Configuration
- **name**: Name of pipeline to execute
- **ignore_missing_pipeline**: Don't fail if pipeline doesn't exist

In [21]:

# Example 19a: Foreach processor
sample_doc_foreach = {
    "items": [
        {"name": "item1", "price": "100"},
        {"name": "item2", "price": "200"}
    ]
}

pipeline_foreach = [
    {
        "foreach": {
            "field": "items",
            "processor": {
                "convert": {
                    "field": "_ingest._value.price",
                    "type": "float"
                }
            }
        }
    }
]

create_pipeline("demo_foreach", pipeline_foreach, "Process array elements")
result_foreach = test_pipeline("demo_foreach", sample_doc_foreach)

if result_foreach:
    original = sample_doc_foreach
    processed = result_foreach['docs'][0]['doc']['_source']
    display_result("Foreach Processor", original, processed)

delete_pipeline("demo_foreach")

# Example 19b: Pipeline processor (nested pipelines)
print("\n" + "="*60)
print("üìä PIPELINE Processor Example")
print("="*60)

# Create a simple inner pipeline
inner_pipeline = [
    {
        "lowercase": {
            "field": "status"
        }
    }
]

create_pipeline("inner_pipeline", inner_pipeline, "Inner pipeline")

# Create outer pipeline that calls inner
outer_pipeline = [
    {
        "pipeline": {
            "name": "inner_pipeline"
        }
    }
]

create_pipeline("outer_pipeline", outer_pipeline, "Outer pipeline")

sample_doc_pipeline = {"status": "ACTIVE"}
result_pipeline = test_pipeline("outer_pipeline", sample_doc_pipeline)

if result_pipeline:
    print("\nüì• BEFORE:")
    print(json.dumps(sample_doc_pipeline, indent=2))
    print("\nüì§ AFTER:")
    print(json.dumps(result_pipeline['docs'][0]['doc']['_source'], indent=2))

delete_pipeline("outer_pipeline")
delete_pipeline("inner_pipeline")


✅ Pipeline 'demo_foreach' created successfully

📊 Foreach Processor
📥 BEFORE:
{
  "items": [
    {
      "name": "item1",
      "price": "100"
    },
    {
      "name": "item2",
      "price": "200"
    }
  ]
}

📤 AFTER:
{
  "items": [
    {
      "name": "item1",
      "price": 100.0
    },
    {
      "name": "item2",
      "price": 200.0
    }
  ]
}

🗑️  Pipeline 'demo_foreach' deleted

📊 PIPELINE Processor Example
✅ Pipeline 'inner_pipeline' created successfully
✅ Pipeline 'outer_pipeline' created successfully

📥 BEFORE:
{
  "status": "ACTIVE"
}

📤 AFTER:
{
  "status": "active"
}
🗑️  Pipeline 'outer_pipeline' deleted
🗑️  Pipeline 'inner_pipeline' deleted


# 2️⃣0️⃣ GEOLOCATION Processors: GeoIP & IP2Geo

## GeoIP Processor

### Purpose
Adds geolocation information based on IP address

### Use Cases
- Enriching logs with location data
- Tracking user geographic location
- Network traffic analysis

### Configuration
- **field**: IP address field
- **target_field**: Field to store geolocation (default: geoip)
- **database_file**: GeoIP database file to use (default: the bundled GeoLite2 City database)

## IP2Geo Processor

### Purpose
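Like GeoIP, adds geolocation information for IP addresses, but reads it from an IP2Geo data source that you create through the Geospatial plugin and that can be refreshed on a schedule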

### Use Cases
- Alternative to GeoIP processor
- Geographic enrichment

## Geolocation Output Format
```json
{
  "geoip": {
    "country_iso_code": "US",
    "country_name": "United States",
    "location": {
      "lat": 37.386,
      "lon": -122.084
    },
    "city_name": "Mountain View"
  }
}
```

In [22]:

# Example 20: GeoIP processor
print("\n" + "="*60)
print("üìä GEOIP Processor Example")
print("="*60)

sample_doc_geoip = {
    "client_ip": "8.8.8.8"
}

pipeline_geoip = [
    {
        "geoip": {
            "field": "client_ip",
            "target_field": "geoip"
        }
    }
]

create_pipeline("demo_geoip", pipeline_geoip, "Enrich with geolocation")

try:
    result_geoip = test_pipeline("demo_geoip", sample_doc_geoip)
    if result_geoip and result_geoip['docs']:
        if 'error' in result_geoip['docs'][0]:
            print(f"Note: GeoIP database may not be available")
            print(f"Error: {result_geoip['docs'][0]['error']}")
        else:
            original = sample_doc_geoip
            processed = result_geoip['docs'][0]['doc']['_source']
            display_result("GeoIP Processor", original, processed)
except Exception as e:
    print(f"Note: GeoIP processor requires MaxMind database: {e}")

delete_pipeline("demo_geoip")

print("\nüí° Note: GeoIP databases require setup. The processor works with configured MaxMind databases.")



📊 GEOIP Processor Example
✅ Pipeline 'demo_geoip' created successfully

📊 GeoIP Processor
📥 BEFORE:
{
  "client_ip": "8.8.8.8"
}

📤 AFTER:
{
  "client_ip": "8.8.8.8",
  "geoip": {
    "continent_name": "North America",
    "country_name": "United States",
    "location": {
      "lon": -97.822,
      "lat": 37.751
    },
    "country_iso_code": "US"
  }
}

🗑️  Pipeline 'demo_geoip' deleted

💡 Note: GeoIP databases require setup. The processor works with configured MaxMind databases.


# 2️⃣1️⃣ SPECIALIZED PROCESSORS Summary

## Advanced/Specialized Processors Overview

The following processors handle specialized use cases not yet covered:

### Text & Vector Processing
- **text_embedding**: Generates vector embeddings from text (ML models)
- **text_image_embedding**: Multimodal embeddings (text + images)
- **sparse_encoding**: Sparse vectors for neural search
- **text_chunking**: Splits text into chunks for processing

### Network & Metadata
- **community_id**: Generates network flow hashes
- **user_agent**: Extracts browser/device info
- **geojson-feature**: Indexes GeoJSON spatial data
- **date_index_name**: Auto-indexes to time-based indices
- **remove_by_pattern**: Removes fields matching patterns

## Quick Reference Matrix

```mermaid
graph TB
    subgraph Transformation["üìù Transformation"]
        T1["Append, Set, Join, Split"]
        T2["Copy, Rename, Remove"]
    end
    
    subgraph TextProcessing["üìÑ Text Processing"]
        TP1["CSV, JSON, Grok, KV"]
        TP2["HTML_Strip, Trim, Gsub"]
        TP3["Lowercase, Uppercase"]
    end
    
    subgraph DateGeo["üåç Date & Geo"]
        DG1["Date, Date_Index_Name"]
        DG2["GeoIP, IP2Geo, GeoJSON"]
    end
    
    subgraph DataTypes["üî¢ Data Types"]
        DT1["Convert, Bytes, Sort"]
    end
    
    subgraph Advanced["⚙️ Advanced"]
        ADV1["Script, Foreach, Pipeline"]
        ADV2["Drop, Fail, Fingerprint"]
    end
    
    style Transformation fill:#e3f2fd
    style TextProcessing fill:#f3e5f5
    style DateGeo fill:#e8f5e9
    style DataTypes fill:#fff3e0
    style Advanced fill:#fce4ec
```

In [23]:

# Example additional processors - User Agent
print("\n" + "="*60)
print("üìä USER_AGENT Processor Example")
print("="*60)

sample_doc_ua = {
    "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
}

pipeline_ua = [
    {
        "user_agent": {
            "field": "user_agent"
        }
    }
]

create_pipeline("demo_ua", pipeline_ua, "Extract user agent info")
result_ua = test_pipeline("demo_ua", sample_doc_ua)

if result_ua:
    if 'error' in result_ua['docs'][0]:
        print(f"Note: User agent processor details:")
        print(f"  - Extracts: OS name, OS version, browser name, browser version")
        print(f"  - Sample output includes device type classification")
    else:
        original = sample_doc_ua
        processed = result_ua['docs'][0]['doc']['_source']
        display_result("User Agent Processor", original, processed)

delete_pipeline("demo_ua")



📊 USER_AGENT Processor Example
✅ Pipeline 'demo_ua' created successfully

📊 User Agent Processor
📥 BEFORE:
{
  "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
}

📤 AFTER:
{
  "user_agent": {
    "name": "Chrome",
    "original": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    "os": {
      "name": "Windows",
      "version": "10",
      "full": "Windows 10"
    },
    "device": {
      "name": "Other"
    },
    "version": "91.0.4472.124"
  }
}

🗑️  Pipeline 'demo_ua' deleted


# 📚 COMPREHENSIVE PIPELINE EXAMPLE

## Real-World E-Commerce Log Pipeline

This example demonstrates a complete pipeline combining multiple processors to handle e-commerce server logs.

```mermaid
graph LR
    A["📥 Raw Log"] --> B["Grok"]
    B --> C["Date"]
    C --> D["Convert"]
    D --> E["Script"]
    E --> F["Remove"]
    F --> G["Set"]
    G --> H["✅ Processed"]
    
    style A fill:#e1f5ff
    style B fill:#fff3e0
    style C fill:#fff3e0
    style D fill:#fff3e0
    style E fill:#fff3e0
    style F fill:#fff3e0
    style G fill:#fff3e0
    style H fill:#c8e6c9
```

### Pipeline Stages:
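1. **Grok**: Parse the log line into structured fields
2. **Date**: Parse the Apache-style timestamp into an ISO 8601 `@timestamp`
3. **Convert**: Turn the `response_time_ms` string into an integer
4. **Script**: Derive `response_time_sec` from the millisecond value
5. **Remove**: Drop the raw log line and the now-redundant `response_time_ms` field
6. **Set**: Tag the document with a constant `environment` value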
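The sample log's client address, `192.168.1.100`, lies in a private range, and public GeoIP databases have no location for such addresses, so a GeoIP lookup on it would add nothing useful. A quick local check with Python's standard `ipaddress` module compares it with the public address used in the GeoIP demo. This is a sketch for deciding whether GeoIP enrichment makes sense for your data, not part of the pipeline; the helper name is hypothetical.

```python
import ipaddress


def is_geo_lookup_candidate(ip: str) -> bool:
    """True for globally routable addresses, the only kind a GeoIP database can be expected to locate."""
    return ipaddress.ip_address(ip).is_global


for ip in ["192.168.1.100", "8.8.8.8"]:
    print(ip, is_geo_lookup_candidate(ip))
# 192.168.1.100 False
# 8.8.8.8 True
```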


In [24]:

# Example: Comprehensive E-Commerce Pipeline
print("\n" + "="*60)
print("üìä COMPREHENSIVE ECOMMERCE PIPELINE")
print("="*60)

ecommerce_log = {
    "raw_log": '192.168.1.100 - customer [02/Nov/2024:14:30:45 +0000] "GET /api/products?sort=price HTTP/1.1" 200 5234',
    "response_time_ms": "250"
}

comprehensive_pipeline = [
    {
        "grok": {
            "field": "raw_log",
            "patterns": ["%{IP:client_ip} - %{DATA:username} \\[%{HTTPDATE:timestamp}\\] \"%{WORD:method} %{DATA:path} HTTP/%{NUMBER:http_version}\" %{NUMBER:status_code:int} %{NUMBER:bytes:int}"]
        }
    },
    {
        "date": {
            "field": "timestamp",
            "formats": ["dd/MMM/yyyy:HH:mm:ss Z"],
            "target_field": "@timestamp"
        }
    },
    {
        "convert": {
            "field": "response_time_ms",
            "type": "integer"
        }
    },
    {
        "script": {
            "source": "ctx.response_time_sec = ctx.response_time_ms / 1000.0"
        }
    },
    {
        "remove": {
            "field": ["raw_log", "response_time_ms"]
        }
    },
    {
        "set": {
            "field": "environment",
            "value": "production"
        }
    }
]

create_pipeline("ecommerce_pipeline", comprehensive_pipeline, "E-commerce log processing")
result_comprehensive = test_pipeline("ecommerce_pipeline", ecommerce_log)

if result_comprehensive:
    print("\nüì• INPUT:")
    print(json.dumps(ecommerce_log, indent=2))
    print("\nüì§ OUTPUT:")
    print(json.dumps(result_comprehensive['docs'][0]['doc']['_source'], indent=2))

delete_pipeline("ecommerce_pipeline")



📊 COMPREHENSIVE ECOMMERCE PIPELINE
✅ Pipeline 'ecommerce_pipeline' created successfully

📥 INPUT:
{
  "raw_log": "192.168.1.100 - customer [02/Nov/2024:14:30:45 +0000] \"GET /api/products?sort=price HTTP/1.1\" 200 5234",
  "response_time_ms": "250"
}

📤 OUTPUT:
{
  "response_time_sec": 0.25,
  "status_code": 200,
  "method": "GET",
  "http_version": "1.1",
  "path": "/api/products?sort=price",
  "environment": "production",
  "@timestamp": "2024-11-02T14:30:45.000Z",
  "bytes": 5234,
  "client_ip": "192.168.1.100",
  "username": "customer",
  "timestamp": "02/Nov/2024:14:30:45 +0000"
}
🗑️  Pipeline 'ecommerce_pipeline' deleted
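The `date` processor's format string `dd/MMM/yyyy:HH:mm:ss Z` uses Java-style date patterns. Its closest Python `strptime` equivalent (our own translation, assuming English month abbreviations in the default C locale) reproduces the `@timestamp` value in the output above, which is a handy way to test a format before putting it in a pipeline. The last line checks the script step's millisecond-to-second arithmetic.

```python
from datetime import datetime, timezone

raw = "02/Nov/2024:14:30:45 +0000"
# dd -> %d, MMM -> %b, yyyy -> %Y, HH:mm:ss -> %H:%M:%S, Z -> %z
parsed = datetime.strptime(raw, "%d/%b/%Y:%H:%M:%S %z")
iso = parsed.astimezone(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z")
print(iso)  # 2024-11-02T14:30:45.000Z

# The script step's arithmetic: 250 ms -> seconds
print(250 / 1000.0)  # 0.25
```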


# 🎓 SUMMARY & KEY TAKEAWAYS

## Processor Categories at a Glance

### 1. **Data Transformation** (8 processors)
Append, Copy, Remove, Remove_by_pattern, Rename, Set, Join, Split

### 2. **Type Conversion** (5 processors)
Bytes, Convert, Lowercase, Uppercase, Trim

### 3. **Text & String Parsing** (7 processors)
CSV, Dissect, Gsub, Grok, HTML_strip, JSON, KV

### 4. **Date & Time** (2 processors)
Date, Date_index_name

### 5. **Enrichment** (9 processors)
GeoIP, IP2Geo, Geojson-feature, User_agent, Community_id, Fingerprint, Text_embedding, Text_image_embedding, Sparse_encoding

### 6. **Text Processing** (2 processors)
Text_chunking, Sort

### 7. **URL Processing** (1 processor)
Urldecode

### 8. **Control Flow** (2 processors)
Drop, Fail

### 9. **Advanced/Scripting** (3 processors)
Script, Foreach, Pipeline

### 10. **Metadata/Utility** (1 processor)
Dot_expander

## Best Practices

✅ **DO:**
- Use pipelines for consistent data pre-processing
- Test pipelines with sample data first (for example, with the simulate API)
- Use named pipelines for reusability
- Implement error handling with `on_failure`
- Document your pipeline logic
- Set `"ignore_missing": true` on processors that read optional fields

❌ **DON'T:**
- Put all processing into a single processor
- Forget to handle edge cases
- Use DROP without conditions
- Ignore the performance cost of very long pipelines
- Skip validation and testing

## Learning Path

1. **Basic**: String manipulation, type conversion, field operations
2. **Intermediate**: Text parsing (CSV, JSON, Grok), date handling
3. **Advanced**: Scripts, control flow, nested pipelines, enrichment
4. **Expert**: Custom templates, performance optimization, error handling

# 📖 ADDITIONAL RESOURCES

## Processor Coverage

This notebook covers **40+ ingest processors**:

**Demonstrated in this notebook:**
- ✅ Append, Bytes, Convert, Copy, CSV, Date, Dissect, Dot_expander, Drop, Fail
- ✅ Fingerprint, Foreach, Geoip, Grok, Gsub, HTML_strip, Join, JSON, KV
- ✅ Lowercase, Remove, Rename, Set, Sort, Split, Trim, Uppercase, Urldecode
- ✅ User_agent, Script, Pipeline

**Specialized (not demonstrated here; several require extra setup):**
- 🔧 GeoJSON-feature (requires GeoJSON data)
- 🔧 Text_embedding, Text_image_embedding, Sparse_encoding (require ML models)
- 🔧 Text_chunking (fixed-token-length or delimiter-based chunking)
- 🔧 Date_index_name (time-based index naming)
- 🔧 Community_id (network flow analysis)
- 🔧 IP2Geo (alternative to GeoIP)
- 🔧 Remove_by_pattern (wildcard-pattern field removal)

## Common Patterns

### Error Handling Pattern
```python
pipeline = [
    {
        "convert": {
            "field": "price",
            "type": "float",
            # on_failure goes inside the processor's own config object
            "on_failure": [
                {
                    "set": {
                        "field": "error",
                        "value": "{{_ingest.on_failure_message}}"
                    }
                }
            ]
        }
    }
]
```

### Conditional Processing Pattern
```python
{
    "set": {
        "field": "priority",
        "value": "high",
        # "if" also belongs inside the processor's config object
        "if": "ctx.field_name == 'value'"
    }
}
```

### Multi-Step Transformation Pattern
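```python
pipeline = [
    # Process strings first
    {"lowercase": {"field": "text"}},
    # Then parse
    {"grok": {"field": "text", "patterns": ["%{WORD:status} %{GREEDYDATA:detail}"]}},
    # Then validate (fail requires a message)
    {"fail": {"if": "ctx.status == 'error'", "message": "Rejected document with status 'error'"}},
]
```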

## Next Steps

1. **Explore your data**: What format is your data in?
2. **Choose processors**: Match data format to appropriate processors
3. **Build incrementally**: Test each processor
4. **Combine wisely**: Create efficient pipelines
5. **Monitor performance**: Check per-pipeline and per-processor counts and timings with `GET _nodes/stats/ingest`
6. **Iterate and improve**: Refine based on results

# 🔍 QUICK REFERENCE TABLE

## All 40+ Processors at a Glance

| # | Processor | Category | Purpose | Input | Output |
|---|-----------|----------|---------|-------|--------|
| 1 | **Append** | Transformation | Add values to array | Array field | Extended array |
| 2 | **Bytes** | Conversion | Convert to bytes | "2MB" | 2097152 |
| 3 | **Community_ID** | Enrichment | Network flow hash | IP, ports | Hash string |
| 4 | **Convert** | Conversion | Change data type | "123" → int | 123 |
| 5 | **Copy** | Transformation | Duplicate field | field_a | field_b = field_a |
| 6 | **CSV** | Text Parsing | Parse CSV data | "a,b,c" | {col1: a, col2: b, col3: c} |
| 7 | **Date** | Date/Time | Normalize dates | "01/02/2024" | ISO format |
| 8 | **Date_Index_Name** | Date/Time | Route to time-based index | Date field | Date-based `_index` name |
| 9 | **Dissect** | Text Parsing | Pattern extraction | Log line | Parsed fields |
| 10 | **Dot_Expander** | Metadata | Nested objects | user.name | {user: {name: ...}} |
| 11 | **Drop** | Control Flow | Filter documents | Condition | Dropped/kept |
| 12 | **Fail** | Control Flow | Stop on error | Condition | Error raised |
| 13 | **Fingerprint** | Enrichment | Generate hash | Fields | Hash fingerprint |
| 14 | **Foreach** | Advanced | Array processor | Array | Processed array |
| 15 | **GeoIP** | Enrichment | Location from IP | IP address | Geolocation data |
| 16 | **GeoJSON-Feature** | Enrichment | Spatial indexing | GeoJSON | Indexed geo data |
| 17 | **Grok** | Text Parsing | Regex extraction | Text | Parsed fields |
| 18 | **Gsub** | String Manip | Substring replace | Text | Modified text |
| 19 | **HTML_Strip** | String Manip | Remove HTML tags | HTML text | Plain text |
| 20 | **IP2Geo** | Enrichment | Location from IP | IP address | Geolocation data |
| 21 | **Join** | Transformation | Array to string | Array | String |
| 22 | **JSON** | Text Parsing | Parse JSON string | JSON string | Object |
| 23 | **KV** | Text Parsing | Key-value pairs | "a=1&b=2" | {a: "1", b: "2"} |
| 24 | **Lowercase** | String Manip | Convert to lower | "HELLO" | "hello" |
| 25 | **Pipeline** | Advanced | Nested pipeline | Any | Processed |
| 26 | **Remove** | Transformation | Delete fields | Multiple fields | Removed |
| 27 | **Remove_by_Pattern** | Transformation | Pattern delete | Wildcard pattern | Matching fields removed |
| 28 | **Rename** | Transformation | Rename field | field_a | field_b |
| 29 | **Script** | Advanced | Custom Painless | Context | Transformed |
| 30 | **Set** | Transformation | Set constant | Field | Value set |
| 31 | **Sort** | Transformation | Sort array | Array | Sorted array |
| 32 | **Sparse_Encoding** | Enrichment | Sparse vectors | Text | Sparse vector |
| 33 | **Split** | Transformation | String to array | "a,b,c" | [a, b, c] |
| 34 | **Text_Chunking** | Text Processing | Split text | Long text | Text chunks |
| 35 | **Text_Embedding** | Enrichment | Text vectors | Text | Vector array |
| 36 | **Text_Image_Embedding** | Enrichment | Multimodal vectors | Text/Image | Combined vector |
| 37 | **Trim** | String Manip | Remove whitespace | "  hello  " | "hello" |
| 38 | **Uppercase** | String Manip | Convert to upper | "hello" | "HELLO" |
| 39 | **URLDecode** | String Manip | Decode URL | "hello%20world" | "hello world" |
| 40 | **User_Agent** | Enrichment | Parse user agent | Browser string | Device info |
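The **Bytes** row maps `"2MB"` to `2097152`, i.e. 2 × 1024 × 1024, so the units are binary multiples. A minimal Python sketch of that conversion, assuming the unit suffixes `b`, `kb`, `mb`, `gb`, `tb`, and `pb` (case-insensitive) and powers of 1024; the helper `to_bytes` is hypothetical, and the real processor's rules for unusual input, such as fractional values, may differ.

```python
import re

# Assumed binary multipliers: 1kb = 1024 bytes, consistent with "2MB" -> 2097152
_UNITS = {"b": 1, "kb": 1024, "mb": 1024**2, "gb": 1024**3, "tb": 1024**4, "pb": 1024**5}


def to_bytes(value: str) -> int:
    """Convert a human-readable size such as '2MB' into a number of bytes."""
    match = re.fullmatch(r"\s*(\d+(?:\.\d+)?)\s*([a-zA-Z]+)\s*", value)
    if not match:
        raise ValueError(f"unparseable byte size: {value!r}")
    number, unit = match.groups()
    unit = unit.lower()
    if unit not in _UNITS:
        raise ValueError(f"unknown unit: {unit!r}")
    return int(float(number) * _UNITS[unit])


print(to_bytes("2MB"))  # 2097152
```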

---

## Processor Selection Guide

### For CSV/Structured Data Processing
Use: **CSV**, **KV**, **Dissect**, **Grok**

### For Text Cleaning
Use: **Trim**, **Lowercase**/**Uppercase**, **HTML_Strip**, **Gsub**

### For Data Enrichment
Use: **GeoIP**, **User_Agent**, **Fingerprint**, **Text_Embedding**

### For Data Transformation
Use: **Convert**, **Split**/**Join**, **Rename**, **Copy**, **Set**

### For Validation & Error Handling
Use: **Drop**, **Fail**, **Script** (with conditions), plus `on_failure` handlers

### For Complex Logic
Use: **Script**, **Pipeline**, **Foreach**

---

## Congratulations! üéâ

You've now explored 40+ OpenSearch ingest processors with real examples and use cases. 
You're ready to build production pipelines!