# Connect to Elasticsearch

In [1]:
import os
from getpass import getpass
from pprint import pprint

from elasticsearch import Elasticsearch

# Connection settings are read from the environment so that no secret is
# stored in the notebook. If ES_PASSWORD is not set, the password is prompted
# for interactively instead.
ES_URL = os.environ.get("ES_URL", "https://localhost:9200")
ES_USERNAME = os.environ.get("ES_USERNAME", "elastic")
ES_PASSWORD = os.environ.get("ES_PASSWORD") or getpass("Elasticsearch password: ")

es = Elasticsearch(
    ES_URL,
    basic_auth=(ES_USERNAME, ES_PASSWORD),
    # NOTE(review): certificate verification is disabled here, presumably
    # because the local cluster uses a self-signed certificate. Do not reuse
    # this setting against a non-local cluster.
    verify_certs=False,
)

# Fetch the cluster info as a connectivity check and show it.
client_info = es.info()
print("Connected to Elasticsearch!")
pprint(client_info.body)

Connected to Elasticsearch!
{'cluster_name': 'docker-cluster',
 'cluster_uuid': 'mIJwhjTmStW54eKFEwQnMA',
 'name': 'f12c85f397e4',
 'tagline': 'You Know, for Search',
 'version': {'build_date': '2026-01-29T10:05:46.708397977Z',
             'build_flavor': 'default',
             'build_hash': '17b451d8979a29e31935fe1eb901310350b30e62',
             'build_snapshot': False,
             'build_type': 'docker',
             'lucene_version': '10.3.2',
             'minimum_index_compatibility_version': '8.0.0',
             'minimum_wire_compatibility_version': '8.19.0',
             'number': '9.3.0'}}


  _transport = transport_class(


# Create the pipeline

In [2]:
from pprint import pprint 

# Single processor that lowercases the "text" field of each incoming document.
lowercase_processor = {"lowercase": {"field": "text"}}

# Register the pipeline under the id "lowercase_pipeline" and show the
# acknowledgement returned by the cluster.
response = es.ingest.put_pipeline(
    id="lowercase_pipeline",
    description='This pipeline transforms the text to lowercase',
    processors=[lowercase_processor],
)
print(response.body)



{'acknowledged': True}


# Get the pipeline

In [3]:
# Read the stored pipeline definition back and pretty-print it.
response = es.ingest.get_pipeline(
    id="lowercase_pipeline",
)
pipeline_definitions = response.body
pprint(pipeline_definitions)

{'lowercase_pipeline': {'created_date_millis': 1771399062786,
                        'description': 'This pipeline transforms the text to '
                                       'lowercase',
                        'modified_date_millis': 1771399062786,
                        'processors': [{'lowercase': {'field': 'text'}}]}}




# Delete a pipeline

In [4]:
# Delete the pipeline by id and pretty-print the response body.
response = es.ingest.delete_pipeline(
    id='lowercase_pipeline',
)
delete_result = response.body
pprint(delete_result)

{'acknowledged': True}




# Simulate a pipeline

In [5]:
# Re-create the pipeline (it was deleted in the previous section) so that it
# can be simulated and used below.
lowercase_step = {"lowercase": {"field": "text"}}
pipeline_settings = {
    "id": "lowercase_pipeline",
    "description": "This pipeline transforms the text to lowercase",
    "processors": [lowercase_step],
}
response = es.ingest.put_pipeline(**pipeline_settings)

pprint(response.body)



{'acknowledged': True}


In [6]:
# Sample document with upper-case text, used as input for the simulation.
sample_doc = {
    "_index": "my_index",
    "_id": "1",
    "_source": {"text": "HELLO WORLD"},
}

# Run the sample document through the stored pipeline via the simulate API
# and pretty-print the transformed result.
response = es.ingest.simulate(id="lowercase_pipeline", docs=[sample_doc])

pprint(response.body)



{'docs': [{'doc': {'_id': '1',
                   '_index': 'my_index',
                   '_ingest': {'timestamp': '2026-02-18T07:33:55.472818892Z'},
                   '_source': {'text': 'hello world'},
                   '_version': '-3'}}]}


# Use the pipeline 
Let's read the data and convert the text to uppercase, so that we can check whether the lowercase_pipeline is executed before the documents are indexed.

In [7]:
import json 

# Load the sample documents; the context manager closes the file handle
# deterministically instead of leaving it to the garbage collector.
with open("./data/dummy_data2.json", encoding="utf-8") as data_file:
    dummy_data = json.load(data_file)

# Upper-case every document's text in place, so that a later lowercase result
# can only have come from the ingest pipeline.
for document in dummy_data:
    document['text'] = document['text'].upper()

dummy_data

[{'title': 'Sample Title 1',
  'text': 'THIS IS THE FIRST SAMPLE DOCUMENT TEXT.',
  'created_on': '2024-09-22'},
 {'title': 'Sample Title 2',
  'text': 'HERE IS ANOTHER EXAMPLE OF A DOCUMENT.',
  'created_on': '2024-09-24'},
 {'title': 'Sample Title 3',
  'text': 'THE CONTENT OF THE THIRD DOCUMENT GOES HERE.',
  'created_on': '2024-09-24'}]

In [8]:
# Drop the index if it is already there (ignore_unavailable avoids an error
# when it is not), then create it again so the cell can be re-run.
target_index = 'my_index'
es.indices.delete(index=target_index, ignore_unavailable=True)
es.indices.create(index=target_index)



ObjectApiResponse({'acknowledged': True, 'shards_acknowledged': True, 'index': 'my_index'})

Now, we pass the lowercase_pipeline to the bulk method. It will perform the transformations before indexing the documents.

In [9]:
# The bulk payload alternates an action line with the document it applies to.
operations = []
for document in dummy_data:
    operations.extend(({'index': {'_index': 'my_index'}}, document))

# Route every document through the ingest pipeline before it is indexed.
# refresh='wait_for' makes the call return only once the documents are
# visible to search; without it, a search executed immediately afterwards
# (e.g. under "Run All") can come back empty.
response = es.bulk(
    operations=operations,
    pipeline='lowercase_pipeline',
    refresh='wait_for',
)
pprint(response.body)



{'errors': False,
 'ingest_took': 0,
 'items': [{'index': {'_id': 'xEPUb5wBtWFRoivzek9a',
                      '_index': 'my_index',
                      '_primary_term': 1,
                      '_seq_no': 0,
                      '_shards': {'failed': 0, 'successful': 1, 'total': 2},
                      '_version': 1,
                      'result': 'created',
                      'status': 201}},
           {'index': {'_id': 'xUPUb5wBtWFRoivzek9a',
                      '_index': 'my_index',
                      '_primary_term': 1,
                      '_seq_no': 1,
                      '_shards': {'failed': 0, 'successful': 1, 'total': 2},
                      '_version': 1,
                      'result': 'created',
                      'status': 201}},
           {'index': {'_id': 'xkPUb5wBtWFRoivzek9a',
                      '_index': 'my_index',
                      '_primary_term': 1,
                      '_seq_no': 2,
                      '_shards': {'failed': 0,

In [10]:
# Search the index without a query and print the stored source of each hit,
# to check what the pipeline actually wrote.
response = es.search(index='my_index')
hits = response.body['hits']['hits']

for hit in hits:
    source = hit['_source']
    print(source)

{'created_on': '2024-09-22', 'text': 'this is the first sample document text.', 'title': 'Sample Title 1'}
{'created_on': '2024-09-24', 'text': 'here is another example of a document.', 'title': 'Sample Title 2'}
{'created_on': '2024-09-24', 'text': 'the content of the third document goes here.', 'title': 'Sample Title 3'}




# Pipeline failure

# Handling failures
To handle failures, we either set ignore_failure or define an on_failure block. With ignore_failure, the pipeline skips over the failed step and continues executing the subsequent processors.

In [11]:
# Fallback for the lowercase step: when lowercasing fails, overwrite "text"
# with a marker value instead.
lowercase_fallback = {
    "set": {
        "field": "text",
        "value": "FAILED TO LOWERCASE",
        "ignore_failure": True,
    }
}

# Step 1: lowercase "text", delegating to the fallback on failure.
lowercase_step = {
    "lowercase": {
        "field": "text",
        "on_failure": [lowercase_fallback],
    }
}

# Step 2: add "new_field"; ignore_failure is set on this step.
add_field_step = {
    "set": {
        "field": "new_field",
        "value": "ADDED BY PIPELINE",
        "ignore_failure": True,
    }
}

response = es.ingest.put_pipeline(
    id='pipeline_2',
    description='Pipeline with multiple transformations, handling and ignoring failures',
    processors=[lowercase_step, add_field_step],
)

pprint(response.body)

{'acknowledged': True}




In [13]:
# This document has no "text" field, so it exercises the failure handling of
# pipeline_2.
document = {
    'title': 'Sample Title 4',
    'created_on': '2024-09-25',
}

# Pass the payload through the `document` parameter rather than the legacy
# `body` keyword. refresh='wait_for' makes the call return only once the
# document is visible to search, so the search in the next cell reliably
# includes it.
response = es.index(
    index='my_index',
    pipeline='pipeline_2',
    document=document,
    refresh='wait_for',
)

pprint(response.body)

{'_id': 'yEPab5wBtWFRoivz5k8s',
 '_index': 'my_index',
 '_primary_term': 1,
 '_seq_no': 4,
 '_shards': {'failed': 0, 'successful': 1, 'total': 2},
 '_version': 1,
 'result': 'created'}




In [14]:
# Search the index again and print each hit's source, to see what pipeline_2
# stored for the document without a "text" field.
response = es.search(index="my_index")
hits = response.body['hits']['hits']

for hit in hits:
    source = hit['_source']
    print(source)

{'created_on': '2024-09-22', 'text': 'this is the first sample document text.', 'title': 'Sample Title 1'}
{'created_on': '2024-09-24', 'text': 'here is another example of a document.', 'title': 'Sample Title 2'}
{'created_on': '2024-09-24', 'text': 'the content of the third document goes here.', 'title': 'Sample Title 3'}
{'created_on': '2024-09-25', 'new_field': 'ADDED BY PIPELINE', 'text': 'FAILED TO LOWERCASE', 'title': 'Sample Title 4'}
{'created_on': '2024-09-25', 'new_field': 'ADDED BY PIPELINE', 'text': 'FAILED TO LOWERCASE', 'title': 'Sample Title 4'}


