# Lab 1: Creación de un pipeline de Ingesta

A la hora de trabajar en un caso de uso real es muy común que los datos que quermos adquirir e insertar en Elasticsearch no tengan un formato correcto, bien porque sean fuentes de datos semiestructuradas o directamente no estructuradas, o bien porque queramos enriquecer los datos con ciertas operaciones para calcular inforamción extra, como por ejemplo extraer la localización de una petición a un servidor web a partir de la dirección IP de origen.

Para facilitar todas estas tareas Elasticsearch nos ofrece diferentes mecanismos, el que vamos a ver en esta práctica es el uso de pipelines de ingesta. Otra ventaja de utilizar este mecanismo, es que no necesitamos utilizar otras tecnologías diferentes a Elasticsearch y por tanto ayuda a simplificar nuestra arquitectura de la infraestructura de datos.

Los pipelines de ingesta nos van a permitir ejecutar transformaciones comunes sobre los datos antes de indexarlos, como por ejemplo eliminar campos, extraer valores de textos o enriquecer los datos.

Un pipeline consiste en una serie de tareas o tasks configurables denominados processors. Cada processor se ejecuta secuencialmente ejecutando los cambios especificados sobre los documentos de entrada. Los documentos resultantes serán la fuente de entrada del siguiente proccesor configurado. Una vez ejecutados todos los processors, Elasticsearch añadirá los documentos transformados en el índice que indiquemos en el pipeline.

<img src="../../images/els/ingest-process.svg" alt="ingest process"/>

Los pipelines de ingesta ofrecen una solución ligera para realizar tareas de transformación y manipulación de datos cuando no disponemos (o no nos interesa disponer) de una erramienta de ETL. Puesto que los pipelines se ejecutan en los nodos de Elasticsearch, se puede escalar la infraestructura necesara de forma sencilla como parte del cluster.

En el siguiente enlace podrás encontrar un listado con los processors de los que dispone Elasticsearch: https://www.elastic.co/guide/en/elasticsearch/reference/8.0/processors.html.

En esta práctica vamos a ver como crear, configurar y ejecutar un pipeline de ingesta de datos.

## 1. Creamos un pipeline de ingesta indicando los processors y su orden de ejecución

Más adelante veremos los tipos de processors que existen. Por ahora sólo fíjate en la sintaxis.

En este caso creamos un pipeline que sólo tiene un step definido con el processor set que añade un nuevo campo en el documento a indexar, "environment" y le asigna el valor "production" independientemente del contenido del documento.  

`
PUT _ingest/pipeline/logs-add-tag
{
  "description": "Adds a static tag for the environment the log originates from",
  "processors": [
    {
      "set": {
        "field": "environment",
        "value": "production"
      }
    }
  ]
}
`



## 2. Vamos a probar el pipeline de ingesta ejecutando algunos test a través de él

Para ello utilizamos el método _simulate

`
POST _ingest/pipeline/logs-add-tag/_simulate
{
  "docs": [
    {
      "_source": {
        "host.os": "macOS",
        "source.ip": "10.22.11.89"
      }
    }
  ]
}
`

En la salida veremos el nuevo campo añadido a demás de los anteriores:
"environment": "production"

## 3. Procesar los documentos con el pipeline de ingesta

Para ello tenemos tes pociones:

### 3a. Especificando el pipeline de ingest al indexar un documento 

`
POST log-index/_doc?pipeline=logs-add-tag
{
  "host.os": "windows 10",
  "source.ip": "113.121.143.90"
}
`

Vamos a comprobar que el pipeline de ingesta se ha ejecutado al insertar el documento. Para ello buscamos el documento que acabamos de insertar y comprobamos que el documento contiene el campo "environment":

`
POST log-index/_search
{
    "query": {
        "match" : {
            "host.os.keyword" : "windows 10"
        }
    }
}
`

### 3b. Indicando el pipeline a usar en una operación de tipo bulk

`
POST _bulk
{ "index" : { "_index" : "log-index", "_id" : "1","pipeline": "logs-add-tag" } }
{ "host.os" : "windows 7", "source.ip": "10.0.0.1" }
`

Comprobamos igual que antes que se ha añadido el campo "environment" como resultado de la ejecución del pipeline:

`
POST log-index/_search
{
    "query": {
        "match" : {
            "host.os.keyword" : "windows 7"
        }
    }
}
`

### 3c. Especificando el pipeline de ingesta por defecto como un setting de un índice

Set the default pipeline for an index as follows:
 
`
PUT log-index/_settings
{ "index.default_pipeline": "logs-add-tag" }
`

Note that the setting can also be set using an index template, just like any other index setting.
Index a document to the index without specifying any pipeline query parameters, and then search the index to confirm the document was tagged as expected:

`
POST log-index/_doc/
{
  "host.os": "linux",
  "source.ip": "10.10.10.1"
}
`

Volvemos a comprobar que se ha ejecutado de forma correcta el pipeline:

`
POST log-index/_search
{
    "query": {
        "match" : {
            "host.os.keyword" : "linux"
        }
    }
}
`

### 4. Gestionar los errores de ejecución de un pipeline

Los processors de un pipeline de ingesta se ejecutan secuencialmente. Por defecto si un processor falla en su ejecución, automáticamente se para el proceso de ingesta para el documento que se estaba ingestando.

Para gestionar los errores de ejecución, y modificar el comportamiento por defecto, tenemos tres alternativas que se pueden utilizar de forma simultánea:

#### 4a. Ignorar el error
Para ello utilizaremos el campo "ignore_failure" al definir el processor asignándole el valor "true". De esta manera, el porcessor ingorará el fallo y seguirá ejecutando los siguientes processors definidos.


`
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "rename": {
        "description": "Rename 'provider' to 'cloud.provider'",
        "field": "provider",
        "target_field": "cloud.provider",
        "ignore_failure": true
      }
    }
  ]
}
`

#### 4b. Especificar una lista de processors a ejecutar inmediatamene después de que un processor falle
Utilizando el campo "on_failure" de un processor pordemos definir la secuencia de processors a ejecutar cuando este falla. De esta forma podemos tratarán el error y por ejemplo añadir el campo "error.message" con el mensaje que indique el error producido:

`
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "rename": {
        "description": "Rename 'provider' to 'cloud.provider'",
        "field": "provider",
        "target_field": "cloud.provider",
        "on_failure": [
          {
            "set": {
              "description": "Set 'error.message'",
              "field": "error.message",
              "value": "Field 'provider' does not exist. Cannot rename to 'cloud.provider'",
              "override": false
            }
          }
        ]
      }
    }
  ]
}
`

Podemos anidar tantos processors como queramos utilizando el campo "on_failure"

`
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "rename": {
        "description": "Rename 'provider' to 'cloud.provider'",
        "field": "provider",
        "target_field": "cloud.provider",
        "on_failure": [
          {
            "set": {
              "description": "Set 'error.message'",
              "field": "error.message",
              "value": "Field 'provider' does not exist. Cannot rename to 'cloud.provider'",
              "override": false,
              "on_failure": [
                {
                  "set": {
                    "description": "Set 'error.message.multi'",
                    "field": "error.message.multi",
                    "value": "Document encountered multiple ingest errors",
                    "override": true
                  }
                }
              ]
            }
          }
        ]
      }
    }
  ]
}
`

#### 4c. Definir una secuencia de processors general para todo el pipeline

Podemos definir una secuencia de processors general a ejecutar en el caso de que alguno de los processors del pipeline falle.

`
PUT _ingest/pipeline/my-pipeline
{
  "processors": [ ... ],
  "on_failure": [
    {
      "set": {
        "description": "Index document to 'failed-<index>'",
        "field": "_index",
        "value": "failed-{{{ _index }}}"
      }
    }
  ]
}
`

Podemos encontrar información adicional del fallo en los metadatos del documento que son accesibles en el bloque "on_failure": on_failure_message, on_failure_processor_type, on_failure_processor_tag y on_failure_pipeline

`
PUT _ingest/pipeline/my-pipeline
{
  "processors": [ ... ],
  "on_failure": [
    {
      "set": {
        "description": "Record error information",
        "field": "error_information",
        "value": "Processor {{ _ingest.on_failure_processor_type }} with tag {{ _ingest.on_failure_processor_tag }} in pipeline {{ _ingest.on_failure_pipeline }} failed with message {{ _ingest.on_failure_message }}"
      }
    }
  ]
}
`







## Casos de uso más comunes

Vamos a ver algunos ejemplos a través de casos de uso muy comunes que nos podemos encontrar

### 1. Parsear los valores de los campos y extraer valores útiles para nuevos campos.

Processors utilizados:
* dissect: https://www.elastic.co/guide/en/elasticsearch/reference/current/dissect-processor.html#dissect-processor
* lowercase: https://www.elastic.co/guide/en/elasticsearch/reference/current/lowercase-processor.html#lowercase-processor
* remove: https://www.elastic.co/guide/en/elasticsearch/reference/current/remove-processor.html#remove-processor

Vamos a suponer que los documentos a insertar contienen para el campo "message" cadenas de texto con el siguiente formato:

`
"10:12:05 HTTP Monitor production is in GREEN state"
"10:12:05 HTTP Monitor production is in RED state"
`

En este caso hacen referencia a líneas de un fichero de log.

Con el processor dissect podemos parsear estas cadenas y extraer ciertos valores e insertarlos en nuevos campos en el documento de entrada. 

Por ejemplo, vamos a extraer la hora el nombre del monitor y el estado para insertarlo en los respectivos campos:

`
PUT _ingest/pipeline/processors-example-one
{
  "description": "Parse and extract log useful fields",
  "processors": [
    {
      "dissect": {
        "field": "message",
        "pattern": "%{time} HTTP Monitor %{monitor.name} is in %{monitor.state} state"
      }
    }
  ]
}
`

Probemos el pipeline utilizando la función sumulate:

`
POST _ingest/pipeline/processors-example-one/_simulate
{
  "docs": [
    {
      "_source": {
        "message": "10:12:05 HTTP Monitor production is in GREEN state"
      }
    }
  ]
}
`

Para completar el pipeline, vamos a convertir en minúsculas el nuevo campo "monitor.state" utilizando el processor lowercase y a eliminar el campo original "message" con el processor remove. Lo haremos modificando el pipeline anteriror:

`
PUT _ingest/pipeline/processors-example-one
{
  "description": "Parse and extract log useful fields",
  "processors": [
    {
      "dissect": {
        "field": "message",
        "pattern": "%{time} HTTP Monitor %{monitor.name} is in %{monitor.state} state"
      }
    },
    {
      "lowercase": {
        "field": "monitor.state"
      }
    },
    {
      "remove": {
        "field": "message"
      }
    }
  ]
}
`

Comprobamos que el pipeline se ha modificado correctamente:

`
POST _ingest/pipeline/processors-example-one/_simulate
{
  "docs": [
    {
      "_source": {
        "message": "10:12:05 HTTP Monitor production is in GREEN state"
      }
    }
  ]
}
`


### 2. Tagear un documento en función del valor de un campo de documento base

En este caso vamos a utilizar sentencias condicionales para crear un nuevo campo y asignarle un valor en función del contenido de un campo del documento base.

Processors utilizados:
* set: https://www.elastic.co/guide/en/elasticsearch/reference/current/set-processor.html#set-processor


Suponemos que tenemos los siguientes documentos base:

`
{
  "environment": "production",
  "subnet": "CTS-01",
  "classification": "secret"
}
{
  "environment": "production",
  "subnet": "ATT-01",
  "classification": "unclassified"
}
`

El processor set utiliza un script para comprobar los valores de los campos "classification" y "subnet", si los campos cumplen la condición del script entonces se tagean con el vaor "protected" añadiendo dicho valor al campo "tag".

El campo "if" utiliza painless para definir la condición.

Nota: "ctx" hace referencia al documento base.

`
PUT _ingest/pipeline/processors-example-two
{
  "description": "Tag a document based on field value",
  "processors": [
    {
      "set": {
        "if": "ctx.classification=='secret' && ctx.subnet=='CTS-01'",
        "field": "tag",
        "value": "protected"
      }
    }
  ]
}
`

Comprobemos que el pipeline funciona correctamente:

`
POST _ingest/pipeline/processors-example-two/_simulate
{
  "docs": [
    {
      "_source": {
        "environment": "production",
        "subnet": "CTS-01",
        "classification": "secret"
      }
    },
    {
      "_source": {
        "environment": "production",
        "subnet": "ATT-01",
        "classification": "unclassified"
      }
    }
  ]
}
`

### 3. Descartar eventos de log no deseados basándose en los valores de los campos de tal forma que no se ingesten en el índice.

En este caso vamos a descartar documentos en función del contenido de uno o varios campos. Para ello vamos primero a crear un uevo campo "tag" con el valor "drop" si cumple la condición descrita por el processor script y después vamos a descartar los documentos cuyo campo "tag" contengan el valor "drop" utilizando el porcessor drop.

Processors utilizados:
* script: https://www.elastic.co/guide/en/elasticsearch/reference/current/drop-processor.html#drop-processor
* drop: https://www.elastic.co/guide/en/elasticsearch/reference/current/drop-processor.html#drop-processor

El formato de los ducumentos base es el siguiente:

`
{
  "environment": "production",
  "subnet": "CTS-01",
  "event_code": "AS-32"
}
{
  "environment": "production",
  "subnet": "ATT-01",
  "event_code": "AS-29"
}
`

Creamos el pipeline que procese los documentos y descarte aquellos que tienen un event_code no permitido:
El processor script permite utilizar varios lenguajes de scripting:
* painless: Por defecto sino se indica el calpo "lang". Leguaje de scripting propio de Elasticsearch. https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-scripting-painless.html#modules-scripting-painless
* expression: Lenguaje de scripting de Lucene. https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-scripting-expression.html#modules-scripting-expression
* mustache: Pensado para templates. https://mustache.github.io/
* Java: https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-scripting-painless.html#modules-scripting-painless

`
PUT _ingest/pipeline/processors-example-three
{
  "description": "Dop document based on field value",
  "processors": [
    {
      "script": {
        "lang": "painless",
        "source": """
          def disallowedCodes = ["AS-29","BA-23"];
          if (disallowedCodes.contains(ctx.event_code)) {
            ctx.tag = "drop";
          }
          """
        }
    },
    {
      "drop": {
        "if": "ctx.tag == 'drop'"
      }
    }
  ]
}
`

Comprobemos el correcto funcionamiento del pipeline que acabamos de definir:

`
POST _ingest/pipeline/processors-example-three/_simulate
{
  "docs": [
    {
      "_source": {
        "environment": "production",
        "subnet": "CTS-01",
        "event_code": "AS-32"
      }
    },
    {
      "_source": {
        "environment": "production",
        "subnet": "ATT-01",
        "event_code": "AS-29"
      }
    }
  ]
}
`

### 4. Enrutar e indexar documentos en el ídice correcto de Elasticsearch basándose en el valor de los campos del documento.

Para indicar en que índice se insertará un documento, vamos a sobreescribir el campo "_index" que le indica a elasticsearch que índice es el que debe utilizar para indexar un documento.

Processors utilizados:
* set: https://www.elastic.co/guide/en/elasticsearch/reference/current/set-processor.html#set-processor

Para ello utilizaremos el processor set y asignaremos al campo "_index" el resultado de concatenar el valor del campo "application" del documento base con el valor del campo "environment" del documento base. De esta forma conseguiremos tener separados en diferentes índices los documentos de cada aplicación y por entorno.

Los documentos base tienen el siguiente formato:

`
{
  "environment": "production",
  "application": "apache"
}
{
  "environment": "dev",
  "application": "apache"
}
`


Creamos el pipeline:

`
PUT _ingest/pipeline/processors-example-four
{
  "description": "Route document into correct index based on field value",
  "processors": [
    {
      "set": {
        "field": "_index",
        "value": "{{application}}-{{environment}}"
      }
    }
  ]
}
`

Comprobemos el correcto funcionamiento del pipeline:

`
POST _ingest/pipeline/processors-example-four/_simulate
{
  "docs": [
    {
      "_source": {
        "environment": "production",
        "application": "apache"
      }
    },
    {
      "_source": {
        "environment": "dev",
        "application": "apache"
      }
    }
  ]
}
`






### 5. Enmascarar información sensible almacenada en los valores de los campos del documento.

Otro caso de uso muy util es poder enmascarar información sensible como en este caso los números de tarjetas de crédito. 

Processors utilizados:
* gsub: https://www.elastic.co/guide/en/elasticsearch/reference/current/gsub-processor.html#gsub-processor


Vamos a suponer el siguiente formato de documento base:

`
{
  "message": "Customer A1121 paid with 5555555555554444"
}
{
  "message": "Customer A1122 paid with 378282246310005"
}
`

Para ello vamos a definir una expresión regular que sea capaz de extraer el número de una tarjeta de credito y lo utilizaremos con el processor gsub que permite modificar una cadena aplicando expresiones regulares.

`
PUT _ingest/pipeline/processors-example-five
{
  "description": "Strip sensitive information from the fields in documents",
  "processors": [
    {
      "gsub": {
        "field": "message",
        "pattern": "\\b(?:3[47]\\d|(?:4\\d|5[1-5]|65)\\d{2}|6011)\\d{12}\\b",
        "replacement": "xxxx-xxxx-xxxx-xxxx"
      }
    }
  ]
}
`

Probamos el funcionamiento del pipeline:

`
POST _ingest/pipeline/processors-example-five/_simulate
{
  "docs": [
    {
      "_source": {
        "message": "Customer A1121 paid with 5555555555554444"
      }
    },
    {
      "_source": {
        "message": "Customer A1122 paid with 378282246310005"
      }
    }
  ]
}
`




### 6. Enriquecer la el campo que contiene una dirección IP pública con información geográfica.

También podemos utilizar el processor geoip para buscar las coordenadas geográficas de una IP y enriquecer el documento base con esta información.

Processors utilizados:
* geoip: https://www.elastic.co/guide/en/elasticsearch/reference/current/geoip-processor.html#geoip-processor
* rename: https://www.elastic.co/guide/en/elasticsearch/reference/current/rename-processor.html#rename-processor

El formato del documento base que utilizaremos es:
`
{
  "source_ip": "194.121.12.154"
}
`

Definimos el pipeline para ello primero extraemos las coordenadas geográficas a partir del campo "soruce_ip" del documento base y dejamos los valores obtenidos en el campo "source.geo". Después renombramos el campo source_ip por "source.ip" para embeberlo dendro del documento source junto con la información geográfica. 

`
PUT _ingest/pipeline/processors-example-six
{
  "description": "Enrich the public IP address fields with geo-location information",
  "processors": [
    {
      "geoip": {
        "field": "source_ip",
        "target_field": "source.geo"
      }
    },
    {
      "rename": {
        "field": "source_ip",
        "target_field": "source.ip"
      }
    }
  ]
}
`

Comprobamos su funcionamiento. Puedes probar con tu IP publica.

`
POST _ingest/pipeline/processors-example-six/_simulate
{
  "docs": [
    {
      "_source": {
        "source_ip": "194.121.12.154"
      }
    }
  ]
}
`
