## Documentation

To read more about ingest pipelines, check out the docs [here](https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest.html).

![ingest_pipeline_docs](../images/ingest_pipeline_docs.png)

## Connect to Elasticsearch

In [1]:
from pprint import pprint
from elasticsearch import Elasticsearch

es = Elasticsearch('http://localhost:9200')
client_info = es.info()
print('Connected to Elasticsearch!')
pprint(client_info.body)

Connected to Elasticsearch!
{'cluster_name': 'docker-cluster',
 'cluster_uuid': 'DlYG5m9gR3upn7qgaYyAJA',
 'name': '3d37442d2591',
 'tagline': 'You Know, for Search',
 'version': {'build_date': '2024-08-05T10:05:34.233336849Z',
             'build_flavor': 'default',
             'build_hash': '1a77947f34deddb41af25e6f0ddb8e830159c179',
             'build_snapshot': False,
             'build_type': 'docker',
             'lucene_version': '9.11.1',
             'minimum_index_compatibility_version': '7.0.0',
             'minimum_wire_compatibility_version': '7.17.0',
             'number': '8.15.0'}}


## Create the pipeline

In [2]:
from pprint import pprint

response = es.ingest.put_pipeline(
    id='lowercase_pipeline',
    description='This pipeline transforms the text to lowercase',
    processors=[
        {
            "lowercase": {
                "field": "text"
            }
        }
    ]
)
pprint(response.body)

{'acknowledged': True}


## Get the pipeline

In [3]:
response = es.ingest.get_pipeline(id='lowercase_pipeline')
pprint(response.body)

{'lowercase_pipeline': {'description': 'This pipeline transforms the text to '
                                       'lowercase',
                        'processors': [{'lowercase': {'field': 'text'}}]}}


## Delete a pipeline

In [4]:
response = es.ingest.delete_pipeline(id='lowercase_pipeline')
pprint(response.body)

{'acknowledged': True}


## Simulate a pipeline

The simulate method lets you run a pipeline against sample documents to check that it behaves as expected. This is usually done before applying the pipeline to a real index and real data.

In [5]:
response = es.ingest.put_pipeline(
    id='lowercase_pipeline',
    description='This pipeline transforms the text to lowercase',
    processors=[
        {
            "lowercase": {
                "field": "text"
            }
        }
    ]
)
pprint(response.body)

{'acknowledged': True}


The `docs` list holds the test data. Executing the cell indexes nothing; the response only shows what the documents would look like after the transformation.

In [None]:
response = es.ingest.simulate(
    id='lowercase_pipeline',
    docs=[
        {
            "_index": "my_index",
            "_id": "1",
            "_source": {
                "text": "HELLO WORLD"
            }
        }
    ]
)
pprint(response.body)

{'docs': [{'doc': {'_id': '1',
                   '_index': 'my_index',
                   '_ingest': {'timestamp': '2024-10-30T07:20:35.164171477Z'},
                   '_source': {'text': 'hello world'},
                   '_version': '-3'}}]}
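
When simulating many documents, it can help to pull out only the transformed `_source` of each entry. The sketch below is a hypothetical helper (`simulated_sources` is not part of the Elasticsearch client) that works on a plain dictionary shaped like the response printed above.

```python
def simulated_sources(response_body):
    """Return the transformed _source of every successfully simulated document."""
    sources = []
    for entry in response_body["docs"]:
        # Entries that were processed carry their result under the "doc" key.
        if "doc" in entry:
            sources.append(entry["doc"]["_source"])
    return sources


# Body copied from the simulate output shown above.
sample_body = {
    "docs": [
        {
            "doc": {
                "_id": "1",
                "_index": "my_index",
                "_ingest": {"timestamp": "2024-10-30T07:20:35.164171477Z"},
                "_source": {"text": "hello world"},
                "_version": "-3",
            }
        }
    ]
}

sources = simulated_sources(sample_body)
print(sources)
```

In the notebook, the same helper could be called as `simulated_sources(response.body)`.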


## Use the pipeline

Let's load the data and uppercase the `text` field, so we can check that `lowercase_pipeline` runs before the documents are indexed.

In [9]:
import json

# Load the sample documents; the context manager closes the file afterwards.
with open("../data/dummy_data.json") as f:
    dummy_data = json.load(f)

# Uppercase the text in place so the pipeline's effect is visible later.
for document in dummy_data:
    document['text'] = document['text'].upper()

dummy_data

[{'title': 'Sample Title 1',
  'text': 'THIS IS THE FIRST SAMPLE DOCUMENT TEXT.',
  'created_on': '2024-09-22'},
 {'title': 'Sample Title 2',
  'text': 'HERE IS ANOTHER EXAMPLE OF A DOCUMENT.',
  'created_on': '2024-09-24'},
 {'title': 'Sample Title 3',
  'text': 'THE CONTENT OF THE THIRD DOCUMENT GOES HERE.',
  'created_on': '2024-09-24'}]

In [10]:
es.indices.delete(index='my_index', ignore_unavailable=True)
es.indices.create(index='my_index')

ObjectApiResponse({'acknowledged': True, 'shards_acknowledged': True, 'index': 'my_index'})

Now, we pass `lowercase_pipeline` to the bulk method through the `pipeline` argument. The pipeline transforms each document before it is indexed.

In [11]:
operations = []
for document in dummy_data:
    operations.append({'index': {'_index': 'my_index'}})
    operations.append(document)

response = es.bulk(operations=operations, pipeline='lowercase_pipeline')
pprint(response.body)

{'errors': False,
 'ingest_took': 13,
 'items': [{'index': {'_id': 's0tN3JIBEY4rCUA-oBxn',
                      '_index': 'my_index',
                      '_primary_term': 1,
                      '_seq_no': 0,
                      '_shards': {'failed': 0, 'successful': 1, 'total': 2},
                      '_version': 1,
                      'result': 'created',
                      'status': 201}},
           {'index': {'_id': 'tEtN3JIBEY4rCUA-oBxq',
                      '_index': 'my_index',
                      '_primary_term': 1,
                      '_seq_no': 1,
                      '_shards': {'failed': 0, 'successful': 1, 'total': 2},
                      '_version': 1,
                      'result': 'created',
                      'status': 201}},
           {'index': {'_id': 'tUtN3JIBEY4rCUA-oBxq',
                      '_index': 'my_index',
                      '_primary_term': 1,
                      '_seq_no': 2,
                      '_shards': {'failed': 0

After indexing, the `text` field of every document is lowercase, which shows that the pipeline ran on each document.

In [12]:
response = es.search(index='my_index')
hits = response.body['hits']['hits']

for hit in hits:
    print(hit['_source'])

{'title': 'Sample Title 1', 'created_on': '2024-09-22', 'text': 'this is the first sample document text.'}
{'title': 'Sample Title 2', 'created_on': '2024-09-24', 'text': 'here is another example of a document.'}
{'title': 'Sample Title 3', 'created_on': '2024-09-24', 'text': 'the content of the third document goes here.'}
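
The bulk response reports failures per item, so the top-level `errors` flag alone does not say which documents were rejected. The sketch below uses two hypothetical helpers, `build_bulk_operations` and `bulk_failures`, neither of which is part of the Elasticsearch client. The sample response body is hand-written for illustration and is not real cluster output.

```python
def build_bulk_operations(documents, index_name):
    """Pair each document with an index action, the layout used in the cell above."""
    operations = []
    for document in documents:
        operations.append({"index": {"_index": index_name}})
        operations.append(document)
    return operations


def bulk_failures(response_body):
    """Return (position, error) pairs for bulk items that report an error."""
    failures = []
    for position, item in enumerate(response_body.get("items", [])):
        # Each item has a single key naming the action, for example "index".
        for result in item.values():
            if "error" in result:
                failures.append((position, result["error"]))
    return failures


# Hand-written response body for illustration only.
sample_body = {
    "errors": True,
    "items": [
        {"index": {"_index": "my_index", "status": 201, "result": "created"}},
        {"index": {"_index": "my_index", "status": 400,
                   "error": {"type": "illegal_argument_exception",
                             "reason": "field [text] not present as part of path [text]"}}},
    ],
}

operations = build_bulk_operations([{"text": "HELLO"}], "my_index")
failures = bulk_failures(sample_body)
print(failures)
```

In the notebook, `bulk_failures(response.body)` would return an empty list for the run shown above, since that response has `'errors': False`.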


## Pipeline failure

### 1. Not handling the failure

In this scenario, we don't handle failures with `ignore_failure` or `on_failure`. When a processor fails, the pipeline raises an exception, the remaining processors do not run, and the document is not indexed.

In [13]:
response = es.ingest.put_pipeline(
    id='pipeline_1',
    description='Pipeline with multiple transformations',
    processors=[
        {
            "lowercase": {
                "field": "text",
            }
        },
        {
            "set": {
                "field": "text",
                "value": "CHANGED BY PIPELINE",
            }
        },
    ]
)
pprint(response.body)

{'acknowledged': True}


In [14]:
document = {
    'title': 'Sample Title 4',
    'created_on': '2024-09-25',
}

response = es.index(
    index='my_index',
    pipeline='pipeline_1',
    document=document
)
pprint(response.body)

BadRequestError: BadRequestError(400, 'illegal_argument_exception', 'field [text] not present as part of path [text]')
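
One way to catch this kind of failure before indexing is to simulate the pipeline first, for example with `es.ingest.simulate(id='pipeline_1', docs=[{'_source': document}])`, and inspect the result. The sketch below is a hypothetical helper, `simulation_errors`, that scans a simulate response body. It assumes that a failed document is reported under an `error` key instead of a `doc` key; the sample body is hand-written under that assumption and should be checked against your cluster's actual output.

```python
def simulation_errors(response_body):
    """Collect (position, reason) for documents the simulation could not process."""
    errors = []
    for position, entry in enumerate(response_body.get("docs", [])):
        # Assumption: failed documents carry an "error" object instead of "doc".
        error = entry.get("error")
        if error is not None:
            errors.append((position, error.get("reason", "unknown reason")))
    return errors


# Hand-written body for illustration; the "error" layout is an assumption.
sample_body = {
    "docs": [
        {"error": {"type": "illegal_argument_exception",
                   "reason": "field [text] not present as part of path [text]"}}
    ]
}

errors = simulation_errors(sample_body)
print(errors)
```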

### 2. Handling the failure

To handle failures, we set `ignore_failure` or define an `on_failure` block. With `ignore_failure`, the pipeline skips the failed processor and continues with the remaining processors, so the document is still indexed.

Alternatively, `on_failure` specifies a list of processors to run when a processor fails, for example to set a fallback value or to record the error message in a field. The pipeline then continues with the remaining processors.

In [15]:
response = es.ingest.put_pipeline(
    id='pipeline_2',
    description='Pipeline with multiple transformations, handling and ignoring failures',
    processors=[
        {
            "lowercase": {
                "field": "text",
                "on_failure": [
                    {
                        "set": {
                            "field": "text",
                            "value": "FAILED TO LOWERCASE",
                            "ignore_failure": True,
                        }
                    }
                ]
            }
        },
        {
            "set": {
                "field": "new_field",
                "value": "ADDED BY PIPELINE",
                "ignore_failure": True,
            }
        },
    ]
)
pprint(response.body)

{'acknowledged': True}


In [16]:
document = {
    'title': 'Sample Title 4',
    'created_on': '2024-09-25',
}

response = es.index(
    index='my_index',
    pipeline='pipeline_2',
    document=document
)
pprint(response.body)

{'_id': 'tktR3JIBEY4rCUA-DRw2',
 '_index': 'my_index',
 '_primary_term': 1,
 '_seq_no': 3,
 '_shards': {'failed': 0, 'successful': 1, 'total': 2},
 '_version': 1,
 'result': 'created'}


In [17]:
response = es.search(index='my_index')
hits = response.body['hits']['hits']
for hit in hits:
    print(hit['_source'])

{'title': 'Sample Title 1', 'created_on': '2024-09-22', 'text': 'this is the first sample document text.'}
{'title': 'Sample Title 2', 'created_on': '2024-09-24', 'text': 'here is another example of a document.'}
{'title': 'Sample Title 3', 'created_on': '2024-09-24', 'text': 'the content of the third document goes here.'}
{'text': 'FAILED TO LOWERCASE', 'title': 'Sample Title 4', 'created_on': '2024-09-25', 'new_field': 'ADDED BY PIPELINE'}
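
To make the difference between the two scenarios concrete, here is a toy pure-Python model of the failure handling. It is a sketch for illustration, not how Elasticsearch implements pipelines: `run_pipeline`, `lowercase`, and `set_value` are hypothetical names, failures are modelled as `KeyError`, and checking `ignore_failure` before `on_failure` is a simplifying assumption.

```python
def lowercase(field):
    """Build a step that lowercases a field and fails if the field is missing."""
    def apply(doc):
        if field not in doc:
            raise KeyError(f"field [{field}] not present")
        doc[field] = doc[field].lower()
    return apply


def set_value(field, value):
    """Build a step that sets a field to a fixed value."""
    def apply(doc):
        doc[field] = value
    return apply


def run_pipeline(document, steps):
    """Run steps in order, applying ignore_failure / on_failure when a step fails."""
    doc = dict(document)
    for step in steps:
        try:
            step["run"](doc)
        except KeyError:
            if step.get("ignore_failure"):
                continue  # skip the failed step and move on
            handlers = step.get("on_failure")
            if handlers is None:
                raise  # unhandled: the whole pipeline fails
            for handler in handlers:
                handler(doc)  # run the fallback steps, then continue
    return doc


# Toy counterparts of pipeline_1 and pipeline_2 from the cells above.
pipeline_1 = [
    {"run": lowercase("text")},
    {"run": set_value("text", "CHANGED BY PIPELINE")},
]
pipeline_2 = [
    {"run": lowercase("text"),
     "on_failure": [set_value("text", "FAILED TO LOWERCASE")]},
    {"run": set_value("new_field", "ADDED BY PIPELINE"), "ignore_failure": True},
]

document = {"title": "Sample Title 4", "created_on": "2024-09-25"}

handled = run_pipeline(document, pipeline_2)

try:
    run_pipeline(document, pipeline_1)
    unhandled_error = None
except KeyError as exc:
    unhandled_error = exc

print(handled)
print(repr(unhandled_error))
```

With the document that lacks `text`, the first toy pipeline raises and produces nothing, while the second one returns a document carrying the fallback `text` value and `new_field`, mirroring the search results above.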
