# Analyze ingest data flow within Elasticsearch

This notebook provides insight into the ingest pipeline flow of an Elasticsearch cluster.

It can help untangle the data flow before refactoring ingest pipelines.

## Prepare environment

### Install required Python packages

In [None]:
%pip install pandas~=2.2 elasticsearch~=8.15

### Restart Jupyter kernel

In [None]:
get_ipython().kernel.do_shutdown(True)

### Import packages

In [None]:
import getpass
import json
import logging
from pathlib import Path
from elasticsearch import Elasticsearch, NotFoundError
from IPython.display import display, FileLink
import pandas as pd

### Input Elasticsearch connection settings

To connect to an Elasticsearch instance, its URL (including the scheme, for example `https://localhost:9200`) and a valid API key are required.

An API key can be created in Kibana: [https://www.elastic.co/guide/en/kibana/current/api-keys.html](https://www.elastic.co/guide/en/kibana/current/api-keys.html)
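Kibana displays the key in an encoded form, which is what the API key prompt below expects. If only the key ID and the secret are at hand, the encoded value is the Base64 encoding of `id:api_key`. The helper below is a hypothetical convenience for that case, not part of the `elasticsearch` package; the Python client also accepts the two parts directly as `api_key=(id, api_key)`.

```python
import base64

def encode_api_key(key_id: str, api_key: str) -> str:
    """Build the encoded API key (Base64 of "id:api_key") from its two parts."""
    # Hypothetical helper for illustration; not part of the elasticsearch package
    return base64.b64encode(f"{key_id}:{api_key}".encode("utf-8")).decode("ascii")

print(encode_api_key("abc", "xyz"))  # prints YWJjOnh5eg==
```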

In [None]:
elasticsearch_host = input("Enter Elasticsearch URL (e.g. https://localhost:9200): ").strip()

In [None]:
elasticsearch_api_key = getpass.getpass("Enter Elasticsearch API key:").strip()

### Create Elasticsearch client and connect to the cluster

In [None]:
client = Elasticsearch(
    hosts=elasticsearch_host,
    api_key=elasticsearch_api_key,
    verify_certs=False,             # The cluster certificate is signed by a non-public CA, so certificate verification is disabled
    ssl_show_warn=False             # Unverified TLS connections produce many warnings, so they are suppressed
)

In [None]:
print(client.cat.health())

## Analyze incoming data flow

When data arrives at Elasticsearch, its processing flow is governed by the following steps:

1. **Index Template Matching**
   - If the target index does not exist, Elasticsearch matches the target index name against the index patterns defined in the index templates.
   - The matching index template with the highest priority is used to create the new index into which the data will be written.

2. **Ingest Pipeline Reference**
   - Incoming data can be sent through an ingest pipeline specified either in the write request (for example, the `pipeline` parameter of a `_bulk` request) or in the index settings (`index.default_pipeline`).

3. **Initial Data Processing**
   - The data is first processed by the processors of that pipeline. A pipeline specified in the request takes precedence over the index's default pipeline.

4. **Pipeline Chaining and Rerouting**
   - An ingest pipeline can call another ingest pipeline through a `pipeline` processor, or contain a `reroute` processor that redirects the data to a different index than the one initially targeted.
   - After rerouting, data that was already processed by the initial pipeline can be processed again by the default pipeline of the destination index.
   - Such chains make the data flow harder to follow and make it difficult to trace where incoming data ends up.

5. **Final Ingest Pipeline Processing**
   - If the index defines a final pipeline (`index.final_pipeline`), the data passes through its processors after the request or default pipeline, right before it is indexed.
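The ordering rules from steps 2, 3 and 5 can be sketched as a small function. This is a simplified model, assuming index settings in the nested form returned by `client.indices.get_settings()` and ignoring `pipeline` and `reroute` processors; `resolve_pipeline_order` is a hypothetical helper, not part of the Elasticsearch client. It relies on `_none`, the documented value that disables a default or final pipeline.

```python
def resolve_pipeline_order(index_settings, request_pipeline=None):
    """Return the pipeline IDs run for a document, in order.

    Simplified model: ignores `pipeline` and `reroute` processors.
    `index_settings` is the nested form {"index": {...}} of one index.
    """
    index = index_settings.get("index", {})
    # A `pipeline` parameter on the write request takes precedence over index.default_pipeline
    first = request_pipeline if request_pipeline is not None else index.get("default_pipeline")
    order = []
    if first and first != "_none":
        order.append(first)
    # index.final_pipeline runs last, whatever the request specifies
    final = index.get("final_pipeline")
    if final and final != "_none":
        order.append(final)
    return order

example_settings = {"index": {"default_pipeline": "logs-default", "final_pipeline": "logs-final"}}
print(resolve_pipeline_order(example_settings))            # prints ['logs-default', 'logs-final']
print(resolve_pipeline_order(example_settings, "custom"))  # prints ['custom', 'logs-final']
print(resolve_pipeline_order(example_settings, "_none"))   # prints ['logs-final']
```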

### Analyze flow for indices

#### Process description
1. **Retrieve Index Settings**:
   - Obtain the index settings from Elasticsearch to identify the `default_pipeline` and `final_pipeline` settings. These settings determine the pipelines that will process the data during indexing.

2. **Simulate Index API Usage**:
   - If the destination index does not exist, use the Simulate Index API to resolve the settings, including the pipelines, that the index would receive from matching index templates. More details can be found in the Elasticsearch documentation: [Simulate Index API](https://www.elastic.co/guide/en/elasticsearch/reference/current/simulate-index-api.html).

3. **Concatenate Pipeline Steps**:
   - The processors of all pipelines are concatenated into a single list, which is exported as a CSV file.
   - If a step is a **pipeline** processor, the referenced pipeline is traversed as well.
   - **WARNING:** the **reroute** processor is not supported yet; its destination is not traversed.
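The traversal can also be sketched offline, which makes it easy to test and to flag `reroute` processors instead of passing over them silently. This is a minimal illustration, not the notebook's implementation: it assumes all pipeline definitions are already loaded into a plain dict keyed by pipeline ID (for example the `.body` of `client.ingest.get_pipeline()` called without an `id`), and `flatten_pipelines` and `sample_pipelines` are hypothetical names.

```python
def flatten_pipelines(pipelines, pipeline_id, path=()):
    """Flatten one pipeline into rows, following nested `pipeline` processors.

    `pipelines` maps pipeline IDs to definitions ({"processors": [...]}).
    """
    # "_none" is the documented value meaning "no pipeline"
    if not pipeline_id or pipeline_id == "_none":
        return []
    # Stop if this pipeline is already on the current call chain
    if pipeline_id in path:
        return [{"pipeline_id": pipeline_id, "processor_type": None, "note": "cycle"}]
    if pipeline_id not in pipelines:
        return [{"pipeline_id": pipeline_id, "processor_type": None, "note": "missing"}]

    rows = []
    for proc in pipelines[pipeline_id].get("processors", []):
        # Each processor is a single-key dict: {processor_type: config}
        proc_type, config = next(iter(proc.items()))
        note = ""
        if proc_type == "reroute":
            # The destination may depend on document values, so it is only flagged here
            note = "reroute: destination not traversed"
        rows.append({"pipeline_id": pipeline_id, "processor_type": proc_type, "note": note})
        if proc_type == "pipeline":
            rows.extend(flatten_pipelines(pipelines, config.get("name"), (*path, pipeline_id)))
    return rows


# Hypothetical sample definitions standing in for real cluster data
sample_pipelines = {
    "logs-default": {"processors": [
        {"set": {"field": "event.kind", "value": "event"}},
        {"pipeline": {"name": "logs-common"}},
        {"reroute": {"dataset": "nginx"}},
    ]},
    "logs-common": {"processors": [{"lowercase": {"field": "host.name"}}]},
}

for row in flatten_pipelines(sample_pipelines, "logs-default"):
    print(row)
```

Tracking the current call chain in `path`, rather than a global "visited" set, still lets a shared pipeline appear in several branches while stopping on genuine reference cycles.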

In [None]:
# Configure logging
logging.basicConfig(level=logging.WARNING)

def get_index_ingest_pipelines(client, index_pattern):
    """
    Retrieve ingest pipelines associated with an index pattern.
    """
    try:
        response = client.indices.get_settings(index=index_pattern)
    except NotFoundError:
        logging.warning("Index does not exist. Using the Simulate Index API to resolve the settings it would get from index templates.")
        simulate_response = client.indices.simulate_index_template(name=index_pattern)
        response = {index_pattern: simulate_response.get("template", {})}

    if len(response) > 1:
        logging.warning(f"Response contains {len(response)} indices; the input might be a data stream or an index pattern.")

    pipelines = []
    for index, content in response.items():
        settings = content.get("settings", {}).get("index", {})
        pipelines.append({
            "index": index,
            "default_pipeline": settings.get("default_pipeline"),
            "final_pipeline": settings.get("final_pipeline"),
        })

    return pipelines

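def traverse_pipeline(client, pipeline_id, _path=()):
    """
    Recursively traverse and collect details of an ingest pipeline.
    `_path` holds the pipelines on the current call chain and stops reference cycles.
    """
    # "_none" means that no pipeline is configured
    if not pipeline_id or pipeline_id == "_none":
        return []
    if pipeline_id in _path:
        return [{"pipeline_id": pipeline_id, "error": "Cycle detected: " + " -> ".join((*_path, pipeline_id))}]

    try:
        pipeline = client.ingest.get_pipeline(id=pipeline_id).get(pipeline_id, {})
    except NotFoundError:
        return [{"pipeline_id": pipeline_id, "error": "Pipeline does not exist"}]

    journey = []
    for proc in pipeline.get("processors", []):
        proc_type = next(iter(proc), "undefined")
        journey.append({"pipeline_id": pipeline_id, "processor_type": proc_type, "processor_config": proc.get(proc_type)})

        # Follow a nested `pipeline` processor into the referenced pipeline
        if proc_type == "pipeline":
            journey.extend(traverse_pipeline(client, proc["pipeline"].get("name"), (*_path, pipeline_id)))

    return journey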

# Read index pattern from input
index_pattern = input("Input index name to analyze the data flow: ").strip()

pipelines = get_index_ingest_pipelines(client, index_pattern)

# Traverse and display
for pipeline_info in pipelines:
    index = pipeline_info["index"]
    flow = traverse_pipeline(client, pipeline_info["default_pipeline"])
    flow.extend(traverse_pipeline(client, pipeline_info["final_pipeline"]))

    df = pd.DataFrame(flow)
    
    # Save DataFrame to CSV
    output_dir = Path('temp')
    output_dir.mkdir(parents=True, exist_ok=True)
    csv_path = output_dir / f"data_flow_{index}.csv"
    df.to_csv(csv_path, index=False)
    
    # Display a link to download the CSV
    display(FileLink(csv_path, result_html_prefix="Open CSV file: "))
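Once a CSV file is written, a quick per-pipeline processor count can help spot the pipelines that do the most work and are candidates for refactoring. This is a minimal sketch using only the standard library; `summarize_flow` is a hypothetical helper that assumes the `pipeline_id` and `processor_type` columns written by the cell above.

```python
import csv
import io
from collections import Counter

def summarize_flow(csv_file):
    """Count processors per pipeline in a data-flow CSV file object."""
    counts = Counter()
    for row in csv.DictReader(csv_file):
        # Rows without a processor type (e.g. "Pipeline does not exist" errors) are skipped
        if row.get("processor_type"):
            counts[row["pipeline_id"]] += 1
    return counts

# Small in-memory sample in the same column layout as the exported CSV
sample_csv = io.StringIO(
    "pipeline_id,processor_type,processor_config\n"
    "logs-default,set,{}\n"
    "logs-default,pipeline,{}\n"
    "logs-common,lowercase,{}\n"
)
print(summarize_flow(sample_csv).most_common())  # prints [('logs-default', 2), ('logs-common', 1)]
```

With a file produced above, it can be called as `with open(csv_path, newline="") as f: print(summarize_flow(f).most_common())`.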