## Documentation

To read more about ingest pipelines, check out the [Elasticsearch documentation](https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest.html).

![ingest_pipelines_docs](../images/ingest_pipelines_docs.png)

## Connect to Elasticsearch


In [1]:
import { Client } from "npm:@elastic/elasticsearch";
import { load } from "https://deno.land/std/dotenv/mod.ts";

const env = await load({ envPath: "../.env" });

const client = new Client({
  node: env.ELASTICSEARCH_NODE,
  auth: {
    apiKey: env.ELASTICSEARCH_API_KEY,
  },
});

const info = await client.info();
console.log(info);


{
  name: "fead23d3120d",
  cluster_name: "docker-cluster",
  cluster_uuid: "v3fUyW9OReext6IjPiOCqg",
  version: {
    number: "8.17.4",
    build_flavor: "default",
    build_type: "docker",
    build_hash: "c63c7f5f8ce7d2e4805b7b3d842e7e792d84dda1",
    build_date: "2025-03-20T15:39:59.811110136Z",
    build_snapshot: false,
    lucene_version: "9.12.0",
    minimum_wire_compatibility_version: "7.17.0",
    minimum_index_compatibility_version: "7.0.0"
  },
  tagline: "You Know, for Search"
}
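
If `ELASTICSEARCH_NODE` or `ELASTICSEARCH_API_KEY` is missing from `../.env`, the client receives `undefined` values, and the resulting error may not point clearly at the configuration file. One way to catch this early is sketched below. `requireEnv` is a hypothetical helper, not part of the Elasticsearch client or the dotenv module, and the example values are placeholders.

```typescript
// Hypothetical helper: not part of @elastic/elasticsearch or std/dotenv.
// Returns the requested settings, or throws an error naming every missing key.
function requireEnv<K extends string>(
  env: Record<string, string | undefined>,
  keys: readonly K[],
): Record<K, string> {
  const missing = keys.filter((key) => !env[key]);
  if (missing.length > 0) {
    throw new Error(`Missing settings in .env: ${missing.join(", ")}`);
  }
  const result = {} as Record<K, string>;
  for (const key of keys) {
    result[key] = env[key] as string;
  }
  return result;
}

// Stand-in for the object returned by `load()`; the values are placeholders.
const exampleEnv = {
  ELASTICSEARCH_NODE: "http://localhost:9200",
  ELASTICSEARCH_API_KEY: "placeholder-key",
};

const settings = requireEnv(exampleEnv, [
  "ELASTICSEARCH_NODE",
  "ELASTICSEARCH_API_KEY",
] as const);
console.log(settings.ELASTICSEARCH_NODE);
```

In the notebook, the object returned by `load({ envPath: "../.env" })` would take the place of `exampleEnv`.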


## Create pipeline


In [7]:
await client.ingest.putPipeline({
  id: "lowercase_pipeline",
  description: "Transform text to lowercase",
  processors: [
    {
      lowercase: {
        field: "text",
      },
    },
  ],
});


{ acknowledged: true }

## Get the pipeline

In [8]:
await client.ingest.getPipeline({
  id: "lowercase_pipeline",
});


{
  lowercase_pipeline: {
    description: "Transform text to lowercase",
    processors: [ { lowercase: { field: "text" } } ]
  }
}

## Delete a pipeline

In [4]:
await client.ingest.deletePipeline({
  id: "lowercase_pipeline",
});


{ acknowledged: true }

The sections below use `lowercase_pipeline` again, so re-run the create cell above if you executed this deletion.

## Simulate a pipeline

The simulate method lets you run a pipeline against sample documents to check that it behaves as expected. This is usually done before applying the pipeline to a real index and its data.

The `docs` list holds the test documents. Executing the cell indexes nothing; the response only shows what each document would look like after the transformation.

In [9]:
await client.ingest.simulate({
  id: "lowercase_pipeline",
  docs: [
    {
      _index: "my_index",
      _id: "1",
      _source: {
        text: "Hello World",
      },
    },
  ],
});


{
  docs: [
    {
      doc: {
        _index: "my_index",
        _version: "-3",
        _id: "1",
        _source: { text: "hello world" },
        _ingest: { timestamp: "2025-04-09T11:12:43.632259486Z" }
      }
    }
  ]
}
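
The simulate API also accepts a pipeline definition inline in the request, so a pipeline can be tried out before it is stored. The sketch below builds such a request and reads the transformed sources back from a response. `buildSimulateRequest` and `simulatedSources` are hypothetical helpers, and the interfaces are simplified assumptions rather than the client's own types.

```typescript
// Shapes below are simplified assumptions, not the client's own types.
type Source = Record<string, unknown>;
type ProcessorConfig = Record<string, Record<string, unknown>>;

interface SimulateRequest {
  pipeline: { processors: ProcessorConfig[] };
  docs: { _source: Source }[];
}

interface SimulateResponse {
  docs: { doc?: { _source: Source }; error?: unknown }[];
}

// Hypothetical helper: wraps plain objects as simulate input documents
// and attaches the pipeline definition inline instead of referencing an id.
function buildSimulateRequest(
  processors: ProcessorConfig[],
  sources: Source[],
): SimulateRequest {
  return {
    pipeline: { processors },
    docs: sources.map((source) => ({ _source: source })),
  };
}

// Hypothetical helper: returns the transformed `_source` of each document,
// or null for an entry that carries no `doc` (for example, a reported error).
function simulatedSources(response: SimulateResponse): (Source | null)[] {
  return response.docs.map((entry) => entry.doc?._source ?? null);
}

const request = buildSimulateRequest(
  [{ lowercase: { field: "text" } }],
  [{ text: "Hello World" }],
);
console.log(JSON.stringify(request, null, 2));

// With a live cluster (not executed here):
//   const response = await client.ingest.simulate(request);
//   console.log(simulatedSources(response));
```

Keeping the request builder separate from the call makes it easy to reuse the same sample documents against several candidate pipelines.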

## Use the pipeline

Let's load the data and convert each `text` value to uppercase, so we can confirm that `lowercase_pipeline` runs before the documents are indexed.

In [None]:
import data from "../data/dummy_data.json" with { type: "json" };

for (const document of data) {
  document.text = document.text.toUpperCase();
}

console.log(data);


[
  {
    title: "Title 1",
    text: "THIS IS THE FIRST SAMPLE DOCUMENT TEXT.",
    createdAt: "2025-03-01"
  },
  {
    title: "Title 2",
    text: "HERE IS ANOTHER EXAMPLE OF A DOCUMENT.",
    createdAt: "2025-03-02"
  },
  {
    title: "Title 3",
    text: "THE CONTENT OF THE THIRD DOCUMENT GOES HERE.",
    createdAt: "2025-03-03"
  }
]


Create the index, deleting any existing `my_index` first:

In [47]:
await client.indices.delete({ index: "my_index", ignore_unavailable: true });
await client.indices.create({ index: "my_index" });


{ acknowledged: true, shards_acknowledged: true, index: "my_index" }

Now we pass `lowercase_pipeline` to the bulk method through the `pipeline` parameter, so the transformation runs on each document before it is indexed.

In [48]:
const operations = [];
for (const document of data) {
  operations.push({
    index: {
      _index: "my_index",
    },
  });
  operations.push(document);
}

await client.bulk({
  operations,
  pipeline: "lowercase_pipeline",
});


{
  errors: false,
  took: 400,
  ingest_took: 0,
  items: [
    {
      index: {
        _index: "my_index",
        _id: "bIlTGpYBkrY7cs0FuRKv",
        _version: 1,
        result: "created",
        _shards: { total: 2, successful: 1, failed: 0 },
        _seq_no: 0,
        _primary_term: 1,
        status: 201
      }
    },
    {
      index: {
        _index: "my_index",
        _id: "bYlTGpYBkrY7cs0FuRKv",
        _version: 1,
        result: "created",
        _shards: { total: 2, successful: 1, failed: 0 },
        _seq_no: 1,
        _primary_term: 1,
        status: 201
      }
    },
    {
      index: {
        _index: "my_index",
        _id: "bolTGpYBkrY7cs0FuRKv",
        _version: 1,
        result: "create
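
The top-level `errors` flag in a bulk response only says whether any item failed. When a pipeline rejects some documents, the details sit in the individual `items` entries, in the same order as the submitted operations. The sketch below collects them. `collectBulkFailures` is a hypothetical helper, the interfaces are simplified assumptions about the response shape, and the sample response is hand-written for illustration rather than taken from a cluster.

```typescript
// Simplified assumption of the bulk response shape, covering only the
// fields used here; the client's own types are richer.
interface BulkItemResult {
  _id?: string | null;
  status: number;
  error?: { type: string; reason?: string | null };
}

interface BulkResponseLike {
  errors: boolean;
  items: Record<string, BulkItemResult>[];
}

interface BulkFailure {
  position: number; // position of the item in the response `items` array
  action: string; // the bulk action key, e.g. "index"
  status: number;
  reason: string;
}

// Hypothetical helper: lists every item in a bulk response that failed.
function collectBulkFailures(response: BulkResponseLike): BulkFailure[] {
  if (!response.errors) return [];
  const failures: BulkFailure[] = [];
  response.items.forEach((item, position) => {
    for (const [action, result] of Object.entries(item)) {
      if (result.error) {
        failures.push({
          position,
          action,
          status: result.status,
          reason: result.error.reason ?? result.error.type,
        });
      }
    }
  });
  return failures;
}

// Hand-written response for illustration; not output from a real cluster.
const sampleResponse: BulkResponseLike = {
  errors: true,
  items: [
    { index: { _id: "a", status: 201 } },
    {
      index: {
        _id: null,
        status: 400,
        error: {
          type: "illegal_argument_exception",
          reason: "field [text] not present as part of path [text]",
        },
      },
    },
  ],
};

console.log(collectBulkFailures(sampleResponse));
```

In the notebook, the value returned by `await client.bulk(...)` could be passed to the helper in place of `sampleResponse`.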

After indexing, a search shows that the `text` field of every document has been lowercased, which confirms that the pipeline ran without issues.

In [49]:
const response = await client.search({
  index: "my_index",
});

const hits = response.hits.hits;
for (const hit of hits) {
  console.log(hit._source);
}


{
  title: "Title 1",
  createdAt: "2025-03-01",
  text: "this is the first sample document text."
}
{
  title: "Title 2",
  createdAt: "2025-03-02",
  text: "here is another example of a document."
}
{
  title: "Title 3",
  createdAt: "2025-03-03",
  text: "the content of the third document goes here."
}


## Pipeline failure

### 1. Not handling the failure


In this scenario, we don't handle failures with `ignore_failure` or `on_failure`. When a processor fails, the pipeline raises an exception, the remaining processors are skipped, and the document is not indexed. In the example below, the document has no `text` field, so the `lowercase` processor fails.

In [50]:
await client.ingest.putPipeline({
  id: "my_pipeline",
  description: "Pipeline with multiple transformations",
  processors: [
    {
      lowercase: {
        field: "text",
      },
    },
    {
      set: {
        field: "text",
        value: "CHANGED BY PIPELINE",
      },
    },
  ],
});


{ acknowledged: true }

In [51]:
const document = {
  title: "Hello World",
  createdAt: "2025-04-01",
};

await client.index({
  index: "my_index",
  pipeline: "my_pipeline",
  document,
});


ResponseError: illegal_argument_exception
	Root causes:
		illegal_argument_exception: field [text] not present as part of path [text]

### 2. Handling the failure

To handle failures, we set `ignore_failure` or define an `on_failure` block on a processor. With `ignore_failure: true`, the pipeline skips the failed processor and continues with the remaining ones, so the document is still indexed.

Alternatively, `on_failure` lists processors that run only when the parent processor fails, for example to set a fallback value or record the error in a field. After they finish, the pipeline continues with the next processor.
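
As a sketch of an `on_failure` handler that records what went wrong, the processor list below stores the failure message in a separate field through the `_ingest.on_failure_message` metadata. `lowercaseWithErrorCapture`, the `error_message` field, and the pipeline id in the closing comment are hypothetical names chosen for illustration.

```typescript
// Processor definitions as plain objects; the type is a simplified assumption.
type ProcessorConfig = Record<string, Record<string, unknown>>;

// Hypothetical helper: lowercases `field` and, if that fails, stores the
// failure message in `errorField` instead of rejecting the document.
function lowercaseWithErrorCapture(
  field: string,
  errorField: string,
): ProcessorConfig[] {
  return [
    {
      lowercase: {
        field,
        on_failure: [
          {
            set: {
              field: errorField,
              // Mustache template resolved by Elasticsearch at ingest time.
              value: "{{ _ingest.on_failure_message }}",
            },
          },
        ],
      },
    },
  ];
}

const processors = lowercaseWithErrorCapture("text", "error_message");
console.log(JSON.stringify(processors, null, 2));

// With a live cluster (not executed here), assuming a hypothetical pipeline id:
//   await client.ingest.putPipeline({
//     id: "lowercase_with_error_capture",
//     processors,
//   });
```

Writing the message to its own field keeps the document indexable while leaving a trace that can be searched for later.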

In [53]:
await client.ingest.putPipeline({
  id: "my_pipeline_2",
  description: "Pipeline with multiple transformations",
  processors: [
    {
      lowercase: {
        field: "text",
        on_failure: [
          {
            set: {
              field: "text",
              value: "FAILED TO LOWERCASE",
              ignore_failure: true,
            },
          },
        ],
      },
    },
    {
      set: {
        field: "text",
        value: "ADDED BY PIPELINE",
        ignore_failure: true,
      },
    },
  ],
});


{ acknowledged: true }

In [54]:
const document = {
  title: "Hello World",
  createdAt: "2025-04-01",
};

await client.index({
  index: "my_index",
  pipeline: "my_pipeline_2",
  document,
});


{
  _index: "my_index",
  _id: "b4lUGpYBkrY7cs0FGxLI",
  _version: 1,
  result: "created",
  _shards: { total: 2, successful: 1, failed: 0 },
  _seq_no: 3,
  _primary_term: 1
}

In [55]:
const response = await client.search({
  index: "my_index",
});

const hits = response.hits.hits;
for (const hit of hits) {
  console.log(hit._source);
}


{
  title: "Title 1",
  createdAt: "2025-03-01",
  text: "this is the first sample document text."
}
{
  title: "Title 2",
  createdAt: "2025-03-02",
  text: "here is another example of a document."
}
{
  title: "Title 3",
  createdAt: "2025-03-03",
  text: "the content of the third document goes here."
}
{
  title: "Hello World",
  createdAt: "2025-04-01",
  text: "ADDED BY PIPELINE"
}
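
The control flow of both scenarios can be summarised with a toy in-memory model. This is only an illustration of the behaviour observed above, not how Elasticsearch is implemented. `ToyProcessor`, `runPipeline`, and the processor factories are hypothetical, and letting `ignoreFailure` take precedence over `onFailure` when both are set is an assumption of this sketch.

```typescript
// Toy in-memory model of the control flow; NOT Elasticsearch's implementation.
type Doc = Record<string, unknown>;

interface ToyProcessor {
  run(doc: Doc): void; // mutates the document, throws on failure
  ignoreFailure?: boolean; // stands in for `ignore_failure`
  onFailure?: ToyProcessor[]; // stands in for `on_failure`
}

type ToyOptions = Omit<ToyProcessor, "run">;

// Fails when the field is missing, like the error seen in scenario 1.
function lowercase(field: string, options: ToyOptions = {}): ToyProcessor {
  return {
    ...options,
    run(doc: Doc): void {
      const value = doc[field];
      if (typeof value !== "string") {
        throw new Error(`field [${field}] not present`);
      }
      doc[field] = value.toLowerCase();
    },
  };
}

function set(field: string, value: unknown, options: ToyOptions = {}): ToyProcessor {
  return {
    ...options,
    run(doc: Doc): void {
      doc[field] = value;
    },
  };
}

function runPipeline(processors: ToyProcessor[], doc: Doc): Doc {
  for (const processor of processors) {
    try {
      processor.run(doc);
    } catch (error) {
      if (processor.ignoreFailure) continue; // skip the failed step
      if (!processor.onFailure) throw error; // unhandled: document rejected
      runPipeline(processor.onFailure, doc); // run handlers, then carry on
    }
  }
  return doc;
}

const newDoc = (): Doc => ({ title: "Hello World", createdAt: "2025-04-01" });

// Scenario 1: no handling, so the pipeline throws and nothing is returned.
try {
  runPipeline([lowercase("text"), set("text", "CHANGED BY PIPELINE")], newDoc());
} catch (error) {
  console.log("rejected:", (error as Error).message);
}

// Scenario 2: on_failure handles the error and the next processor still runs.
const result = runPipeline(
  [
    lowercase("text", {
      onFailure: [set("text", "FAILED TO LOWERCASE", { ignoreFailure: true })],
    }),
    set("text", "ADDED BY PIPELINE", { ignoreFailure: true }),
  ],
  newDoc(),
);
console.log(result["text"]);
```

In the second run the handler first writes `FAILED TO LOWERCASE`, and the following `set` step then overwrites it, which is why the indexed document above ends up with `ADDED BY PIPELINE`.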
