# Investigating conversion of Elastic ingest pipelines into Cribl worker mappers

## Install prerequisite packages

In [65]:
!pip install elasticsearch google-generativeai ipywidgets

[0m

## Create a connection to Elasticsearch

In [66]:
from elasticsearch import Elasticsearch
import os

# Password for the 'elastic' user generated by Elasticsearch.
# Indexing os.environ (instead of os.getenv) raises KeyError right here when the
# variable is unset, rather than passing None into basic_auth below.
ELASTIC_PASSWORD = os.environ['ELASTIC_PASSWORD']

# Create the client instance; ca_certs points at the CA certificate used for
# the HTTPS connection to the cluster.
client = Elasticsearch(
    "https://es01:9200",
    basic_auth=("elastic", ELASTIC_PASSWORD),
    ca_certs="/certs/ca/ca.crt"
)

# Fetch basic cluster information; a successful call confirms the connection works.
body = client.info().body

# Last expression of the cell, so the cluster info is shown as the cell output.
body

{'name': 'es01',
 'cluster_name': 'docker-cluster',
 'cluster_uuid': 'ngA1MWFSRQWti_GPCHBW3g',
 'version': {'number': '8.15.0',
  'build_flavor': 'default',
  'build_type': 'docker',
  'build_hash': '1a77947f34deddb41af25e6f0ddb8e830159c179',
  'build_date': '2024-08-05T10:05:34.233336849Z',
  'build_snapshot': False,
  'lucene_version': '9.11.1',
  'minimum_wire_compatibility_version': '7.17.0',
  'minimum_index_compatibility_version': '7.0.0'},
 'tagline': 'You Know, for Search'}

## Retrieve and manipulate the ingest pipeline

In [67]:
# ID of the ingest pipeline to fetch from the cluster.
PIPELINE_NAME = 'logs-crowdstrike.fdr-1.39.2'

# Request the pipeline by ID and keep only the decoded response body.
pipeline_response = client.ingest.get_pipeline(id=PIPELINE_NAME)
pipeline = pipeline_response.body

In [68]:
import json

# The response body is indexed by pipeline ID; take that entry's processor list.
pipeline_definition = pipeline[PIPELINE_NAME]
processors = pipeline_definition['processors']

# JSON text of the complete processor list.
processors_json = json.dumps(processors)

## Connect to Gemini API

In [69]:
# System instruction for the model: it sets the role (ingest pipeline expert) and
# lists what to report for each processor -- type, purpose, documentation link,
# conditions and tags -- in the order the processors appear.
# NOTE(review): the text asks for one paragraph per processor, while the
# generation config defined further down requests JSON output -- confirm both
# instructions are intended to coexist.
system_instruction="""
You are an expert in Elasticsearch ingest pipelines. 

You will receive a JSON representation of an ingest pipeline fragment containing processors. 
Your task is to explain this pipeline step-by-step, describing each processor in the order they appear. 

For each processor:
- Identify the processor type.
- Explain its purpose. Describe how it modifies or interacts with the document being processed. If it is a script processor, explain what the code block is doing in details.
- Specify the link to it's documentation page on elastic.co documentation site. Provide link as text only.
- Mention any conditions applied to the processor.
- Specify the event tags, if the event has tags.

Provide a new paragraph when describing each processor without numbering the paragraph. 
Preserve the order of processor appearance in the pipeline fragment. 
Identify processors one at a time and don't group processors.

If you encounter any unfamiliar processors or need additional information, consult the official Elasticsearch documentation to ensure accuracy. 
Provide a clear, concise explanation that a developer familiar with Elasticsearch could understand.
"""

# Per-request prefix; the JSON of a processor chunk is appended directly after it
# when the request is built.
prompt = """
 Describe the processors in the following ingest pipeline fragment:
"""

In [70]:
import google.generativeai as genai
import os

# Configure the Gemini client with the key from the GEMINI_API_KEY environment
# variable; indexing os.environ raises KeyError if the variable is unset.
genai.configure(api_key=os.environ["GEMINI_API_KEY"])

In [71]:
# Response JSON schema
import typing_extensions as typing


# Shape of one element of the JSON array requested from the model; it is used
# as response_schema=list[Processor] in the generation config.
# The fields mirror the per-processor bullets in system_instruction.
class Processor(typing.TypedDict):
    # Processor type identified by the model.
    processor_type: str
    # Explanation of what the processor does to the document.
    purpose: str
    # Link to the processor's documentation page, as plain text.
    documentation_link: str
    # Conditions applied to the processor.
    conditions: list[str]
    # Tags reported for the processor (a single string, not a list).
    tags: str
    

In [72]:
import json

# Maximum number of processors sent to the model in a single request.
ARRAY_LENGTH = 40

def split_array(arr, X: int):
    """Split ``arr`` into consecutive chunks of at most ``X`` items.

    Args:
        arr: A sliceable sequence (for example a list) to partition.
        X: Maximum number of items per chunk; must be a positive integer.

    Returns:
        A list of slices of ``arr`` in their original order. Every chunk holds
        ``X`` items except possibly the last one. An empty ``arr`` yields ``[]``.

    Raises:
        ValueError: If ``X`` is zero or negative. Without this check a negative
            ``X`` would silently produce an empty result and drop every item.
    """
    if X <= 0:
        raise ValueError(f"chunk size X must be a positive integer, got {X!r}")
    return [arr[start:start + X] for start in range(0, len(arr), X)]

# Model-level options, kept in a dict so they can be unpacked into the constructor.
model_parameters = dict(system_instruction=system_instruction)

model = genai.GenerativeModel('gemini-1.5-flash', **model_parameters)

# Request a JSON array of Processor objects as the response format.
# temperature=0 is presumably chosen to keep the output as repeatable as possible.
generation_config = genai.types.GenerationConfig(
    temperature=0,
    response_mime_type="application/json",
    response_schema=list[Processor],
)

# Query the model one chunk of processors at a time and gather each chunk's
# parsed JSON array into a single flat list, in chunk order.
combined_response = []

for processor_chunk in split_array(processors, ARRAY_LENGTH):
    chunk_prompt = prompt + json.dumps(processor_chunk)
    response = model.generate_content(chunk_prompt, generation_config=generation_config)
    chunk_descriptions = json.loads(response.text)
    combined_response += chunk_descriptions


In [73]:
from IPython.display import display, JSON

# Wrap the collected descriptions in an IPython JSON object and show it as rich output.
combined_response_view = JSON(combined_response)
display(combined_response_view)

<IPython.core.display.JSON object>

In [74]:
# Plain-text dump of the same data, indented by four spaces.
combined_response_text = json.dumps(combined_response, indent=4)
print(combined_response_text)

[
    {
        "conditions": "ctx.event?.original == null",
        "documentation_link": "https://www.elastic.co/guide/en/elasticsearch/ingest/current/rename-processor.html",
        "processor_type": "rename",
        "purpose": "Renames the \"message\" field to \"event.original\" if the \"event.original\" field is null. This ensures that the original message is preserved for later processing.",
        "tags": "message-to-original"
    },
    {
        "conditions": null,
        "documentation_link": "https://www.elastic.co/guide/en/elasticsearch/ingest/current/json-processor.html",
        "processor_type": "json",
        "purpose": "Decodes the JSON content in the \"event.original\" field and stores the resulting object in the \"crowdstrike\" field.",
        "tags": "json-decoding"
    },
    {
        "conditions": null,
        "documentation_link": "https://www.elastic.co/guide/en/elasticsearch/ingest/current/remove-processor.html",
        "processor_type": "remove",
     