Releases: richm/richm.github.io

How to include vars and tasks in Ansible

15 Jun 23:02

how-to-include-vars-and-tasks-in-ansible.md

How to find dynamic field definitions in Elasticsearch

21 Mar 20:53

When writing data to Elasticsearch, the first time it sees a field that isn't already in the field mapping, it will try to detect the data type of the field based on the JSON type and the dynamic field mapping rules, and it will add that field to the field mapping for the index. For example, if Elasticsearch gets "field2":9999, it will create a field mapping which maps "field2" to a long type:

...
  "field2" : {
    "type" : "long"
  },
...

Also, in Elasticsearch, a field's mapping is immutable in the index - it can be created but not changed without reindexing. When using origin-aggregated-logging with MERGE_JSON_LOG=true, this can cause problems when different applications create fields with the same name but with incompatible data types. For example, suppose another application writes a log record where "field2" has a different data type: "field2":{"field21":"string","field22":1000} ("field2" is now a JSON hash). You will get an error like this from Elasticsearch:

{
  "error" : {
    "root_cause" : [
      {
        "type" : "mapper_parsing_exception",
        "reason" : "failed to parse [field2]"
      }
    ],
    "type" : "mapper_parsing_exception",
    "reason" : "failed to parse [field2]",
    "caused_by" : {
      "type" : "json_parse_exception",
      "reason" : "Current token (START_OBJECT) not numeric, can not use numeric value accessors\n at [Source: org.elasticsearch.common.bytes.BytesReference$MarkSupportingStreamInputWrapper@6340a27e; line: 1, column: 12]"
    }
  },
  "status" : 400
}

The error message is a bit verbose, but it means the field value {"field21":"string","field22":1000} is not a numeric value (it is a hash), so it cannot be stored in a field that is already mapped as long.

How to view the dynamic mappings

If you want to see which mappings have been added dynamically, use the field mapping API. For example, with origin-aggregated-logging:

oc exec -c elasticsearch $espod -- es_util --query=_all/_mapping/*/field/field2?pretty
{
  ".kibana" : {
    "mappings" : { }
  },
  ".operations.2019.03.21" : {
    "mappings" : {
      "com.redhat.viaq.common" : {
        "field2" : {
          "full_name" : "field2",
          "mapping" : {
            "field2" : {
              "type" : "long"
            }
          }
        }
      }
    }
  },
  ".searchguard" : {
    "mappings" : { }
  }
}

Where $espod is the name of one of your Elasticsearch pods. You may have multiple definitions for a field, one for each index in which the field is defined. In origin-aggregated-logging, we create a new index for each day and for each namespace, so you may have many such definitions, and the definitions may differ if some other index has "field2" with a different type. With the _mapping command above, the fields are listed by index. You can use a tool like jq to parse apart the JSON returned.
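
If you have jq installed on the machine where you run oc, a sketch like this will list, per index, the type that was assigned to field2 (the jq filter assumes the output format shown above):

oc exec -c elasticsearch $espod -- es_util --query=_all/_mapping/*/field/field2 | \
  jq 'to_entries[]
      | select(.value.mappings != {})
      | {index: .key, type: .value.mappings[].field2.mapping.field2.type}'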

Can I force Elasticsearch to store everything as a string?

Not exactly. Using an index template with a dynamic_templates section, you might consider adding a dynamic mapping that forces every value to be a string:

{
    "order": 20,
    "mappings": {
      "_default_": {
        "dynamic_templates": [
        {
          "force_all_to_string": {
            "match_mapping_type": "*",
            "mapping": {
              "type": "text",
              "fields": {
                "raw": {
                  "type":  "keyword",
                  "ignore_above": 256
                }
              }
            }
          }
        }
        ]
      }
    },
    "template": ".operations.*"
}

Save this as force_string_template.json, then load it into Elasticsearch:

cat force_string_template.json | oc exec -i -c elasticsearch $espod -- es_util --query=_template/force_all_to_string -X PUT -d@-

And that seems to work for some types:

oc exec -c elasticsearch $espod -- es_util --query=.operations.2019.03.23/com.redhat.viaq.common?pretty -XPOST -d '{"field1":"stringval"}'
{
  "_index" : ".operations.2019.03.23",
  "_type" : "com.redhat.viaq.common",
  "_id" : "AWmiLVg1uPjkpmElglqW",
  "_version" : 1,
  "result" : "created",
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "failed" : 0
  },
  "created" : true
}

Now I can add a numeric field1 value which will be converted to a string:

oc exec -c elasticsearch $espod -- es_util --query=.operations.2019.03.23/com.redhat.viaq.common?pretty -XPOST -d '{"field1":1000}'
{
  "_index" : ".operations.2019.03.23",
  "_type" : "com.redhat.viaq.common",
  "_id" : "AWmiLdOnuPjkpmElglqY",
  "_version" : 1,
  "result" : "created",
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "failed" : 0
  },
  "created" : true
}

However, if field1 is a Hash or Array:

oc exec -c elasticsearch $espod -- es_util --query=.operations.2019.03.23/com.redhat.viaq.common?pretty -XPOST -d '{"field1":{"field11":"value"}}'
{
  "error" : {
    "root_cause" : [
      {
        "type" : "mapper_parsing_exception",
        "reason" : "failed to parse [field1]"
      }
    ],
    "type" : "mapper_parsing_exception",
    "reason" : "failed to parse [field1]",
    "caused_by" : {
      "type" : "illegal_state_exception",
      "reason" : "Can't get text on a START_OBJECT at 1:11"
    }
  },
  "status" : 400
}

Elasticsearch cannot convert the value to a string.

How to parse JSON strings with embedded JSON strings

26 Feb 00:29


Many times I have to deal with JSON documents in which the values are themselves
embedded JSON documents. For example, from the Kubernetes operator framework
registry:

{
  "csvName": "etcdoperator.v0.9.2",
  "csvJson": "{\"apiVersion\":\"operators.coreos.com/v1alpha1\",...}"
  "object": [
    "{\"apiVersion\":\"apiextensions.k8s.io/v1beta1\",...}",
    "{\"apiVersion\":\"apiextensions.k8s.io/v1beta1\",...}",
    "{\"apiVersion\":\"operators.coreos.com/v1alpha1\",...}",
    "{\"apiVersion\":\"apiextensions.k8s.io/v1beta1\",...}"
  ]
}

Where the values for "csvJson" and the elements of the "object" array are large
JSON-encoded blobs. I would rather view this as "exploded" JSON:

{
  "csvJson": {
    "kind": "ClusterServiceVersion",
    "spec": {
      "replaces": "etcdoperator.v0.9.0",
      "displayName": "etcd",
      ...
    },
  },
  "object": [
    {
      "kind": "CustomResourceDefinition",
      "spec": {
        "scope": "Namespaced",
        ...
      },
    },
    {
      "kind": "CustomResourceDefinition",
      "spec": {
        "scope": "Namespaced",
        "version": "v1beta2",
        ...
      }
    },
    ...
  ]
}

Here is a short python program to print this format:

import sys, json

def obj_hook(dd):
  # For every value that looks like an embedded JSON document (a string
  # starting with "{" or "["), parse it and replace the string with the
  # parsed object.  Lists are checked element by element.
  for k, v in dd.items():
    if isinstance(v, str) and (v.startswith("{") or v.startswith("[")):
      dd[k] = json.loads(v, object_hook=obj_hook)
    elif isinstance(v, list):
      for ii, v1 in enumerate(v):
        if isinstance(v1, str) and (v1.startswith("{") or v1.startswith("[")):
          v[ii] = json.loads(v1, object_hook=obj_hook)
  return dd

print(json.dumps(json.load(sys.stdin, object_hook=obj_hook), indent=2))

This assumes you are feeding the JSON document from stdin.
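
For example, assuming you save the program above as explode_json.py (the file
name is just for illustration), you can feed it any such JSON document on stdin:

python3 explode_json.py < manifest.json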

How to send Application logs to external destination and Operations to internal

05 Feb 21:24


I want to configure my OpenShift EFK stack Fluentd to send all Application logs
to an external forward, but keep all Operations logs in the internal
Elasticsearch/Kibana.

The product provides a way to send all logs off the cluster using
Fluentd secure_forward (see "Configuring Fluentd to Send Logs to an External
Log Aggregator"), but this method is not very flexible in terms of selecting
different output destinations. It sends all logs, both Operations logs and logs
from all Applications, to the secure_forward destination. This document
describes a way to make those log destinations more flexible.

How it Works

The ES_HOST and ES_PORT options date from when we supported the ES COPY
feature, now unsupported, which allowed a different Elasticsearch for
Application logs and Operations logs. However, you can use these options to
make the code think there are different output destinations for Application
logs and Operations logs. Setting ES_HOST != OPS_HOST will "trick" Fluentd into
setting up two different output streams for Application logs and Operations
logs, but ES_HOST will not actually be used, because we will set up the
Application logs destination to not use the Elasticsearch plugin.

First Steps

These are the steps you will need to do first, before getting into the actual
detailed routing steps.

  • Make a working directory for holding your copies of various configmaps used
    by Fluentd. You will use this directory for subsequent steps below. The
    steps below assume the current directory is this directory:

      mkdir working_dir
      cd working_dir
    
  • Make sure you are in the logging project. This will be openshift-logging
    for later versions (3.10 or later) or logging for earlier versions:

      oc project openshift-logging
      OR
      oc project logging
    
  • Make your own copy of output-applications.conf and output-operations.conf
    from a running Fluentd pod:

Assuming $fluentd_pod is the name of a running Fluentd pod:

oc exec $fluentd_pod -- \
  cat /etc/fluent/configs.d/openshift/output-applications.conf > \
  output-applications.conf
oc exec $fluentd_pod -- \
  cat /etc/fluent/configs.d/openshift/output-operations.conf > \
  output-operations.conf
  • Make a copy of the current logging-fluentd configmap:

      oc extract configmap/logging-fluentd --to=.
    
  • Make a copy of the original fluent.conf:

      if [ ! -f fluent.conf.orig ] ; then
        cp fluent.conf fluent.conf.orig
      fi
    
  • Edit fluent.conf to use your customized output-applications.conf and
    output-operations.conf

Where you see the lines for

@include configs.d/openshift/output-operations.conf
@include configs.d/openshift/output-applications.conf

Change these to

@include configs.d/user/output-operations.conf
@include configs.d/user/output-applications.conf
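
If you prefer to script this edit, something like the following sed invocation
(a sketch, run in your working directory) rewrites the two @include lines in
place:

sed -i \
    -e 's|configs.d/openshift/output-operations.conf|configs.d/user/output-operations.conf|' \
    -e 's|configs.d/openshift/output-applications.conf|configs.d/user/output-applications.conf|' \
    fluent.conf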

Application logs to external and Operations logs to internal EFK

This assumes you want to forward all Application logs to your external Fluentd
secure_forward listener, and keep all Operations logs in the internal
Elasticsearch.

  • Edit output-applications.conf

Remove the following lines from the file:

<match retry_es>
   @type copy
   @include output-es-retry.conf
</match>

   @include output-es-config.conf
   @include ../dynamic/output-remote-syslog.conf
   @include ../user/output-extra-*.conf

That is, the file should look like this:

<match **>
   @type copy
   @include ../user/secure-forward.conf
</match>

That is, there is only the output for your secure-forward config.

  • Edit output-operations.conf

Remove the following lines from the file:

   @include ../user/secure-forward.conf

That is, the file should look like this:

<match retry_es>
   @type copy
   @include output-es-retry.conf
</match>
<match **>
   @type copy
   @include output-es-config.conf
   @include ../dynamic/output-remote-syslog.conf
   @include ../user/output-extra-*.conf
</match>

That is, it should skip your secure-forward config.

Application logs to external and discard/delete Operations logs

This assumes you want to forward all Application logs to your external Fluentd
secure_forward listener, and discard and delete Operations logs (i.e. you are
not going to use Elasticsearch/Kibana at all).

  • Edit output-applications.conf

Remove the following lines from the file:

<match retry_es>
   @type copy
   @include output-es-retry.conf
</match>

   @include output-es-config.conf
   @include ../dynamic/output-remote-syslog.conf
   @include ../user/output-extra-*.conf

That is, the file should look like this:

<match **>
   @type copy
   @include ../user/secure-forward.conf
</match>

That is, there is only the output for your secure-forward config.

  • Edit output-operations.conf

It should contain only the following:

<match output_ops_tag journal.** system.var.log** mux.ops audit.log** %OCP_FLUENTD_TAGS%>
  @type null
</match>

The null plugin will throw away all of these logs.

Application logs from specific namespaces/pods/containers

NOTE: This only works when using the docker --log-driver=json-file or
when using CRI-O as the log driver. Unfortunately, the --log-driver=journald
handling code in Fluentd strips all of the useful tagging information.

This assumes you want to forward all Application logs to your external Fluentd
secure_forward listener, and keep all Operations logs in the internal
Elasticsearch. In addition, you want to forward only logs from specific
projects and/or pods and/or containers. First, perform the above steps for
Application logs to external and Operations logs to internal EFK. Then
perform these additional steps.

  • Create a file called filter-post-retag-apps.conf

This file will contain the filters and matches, based on the criteria you want
to use for routing. Both the <filter> and <match> sections shown below go into
this one file.

  • Create filters/retaggers

The tag routing is based on field names, and those field names must be top
level fields, not nested fields, so we'll have to create some top level fields
to use.

<filter kubernetes.**>
  @type record_transformer
  enable_ruby
  <record>
    kubernetes_namespace_name ${record["kubernetes"] && record["kubernetes"]["namespace_name"] ? record["kubernetes"]["namespace_name"] : "undefined_namespace_name"}
    kubernetes_pod_name ${record["kubernetes"] && record["kubernetes"]["pod_name"] ? record["kubernetes"]["pod_name"] : "undefined_pod_name"}
    kubernetes_container_name ${record["kubernetes"] && record["kubernetes"]["container_name"] ? record["kubernetes"]["container_name"] : "undefined_container_name"}
  </record>
</filter>

This will add 3 new top level fields to the record for the namespace, pod, and
container names, which will be used to retag the records for routing. Then use
a custom rewrite_tag_filter.

<match kubernetes.**> # match all kubernetes container logs
  @type rewrite_tag_filter
  @label @OUTPUT
  rewriterule1 kubernetes_namespace_name ^this-is-my-project$ output_tag
  rewriterule2 kubernetes_pod_name ^my-app output_tag
  rewriterule3 kubernetes_container_name mongodb$ output_tag
  rewriterule4 message .+ output_ops_tag
</match>

The rules are applied in order. Once a rule is matched, no further processing
is done. The first rule says "If the namespace name exactly matches the string
'this-is-my-project', route to the output destination for Application logs".
The second rule says "If the pod name begins with the string 'my-app', route to
the output destination for Application logs". The third rule says "If the
container name ends with 'mongodb', route to the output destination for
Application logs". The fourth rule is the "default" rule which describes what
to do with all logs that didn't match any of the other rules. It says "If the
record has a 'message' field (all records should have a 'message' field), route
to the destination for Operations logs".

  • Edit fluent.conf to include your filter-post-retag-apps.conf

Find these two lines in fluent.conf

  @include configs.d/openshift/filter-viaq-data-model.conf
  @include configs.d/openshift/filter-post-*.conf

Add the following between these two lines:

  @include configs.d/user/filter-post-retag-apps.conf

so that the result looks like this:

  @include configs.d/openshift/filter-viaq-data-model.conf
  @include configs.d/user/filter-post-retag-apps.conf
  @include configs.d/openshift/filter-post-*.conf

Last Steps

These are the last steps, to be done after completing all of the First Steps
and the routing-specific steps above.

  • Recreate the logging-fluentd configmap with your new files:

      oc delete configmap logging-fluentd
      oc create configmap logging-fluentd --from-file=.
    
  • Set ES_HOST and restart all Fluentd pods:

      oc set env daemonset/logging-fluentd ES_HOST=value-is-not-used
    

Setting ES_HOST to a value other than its current value will also trigger all
Fluentd pods to be restarted. If you have already changed the...


How to compare semantic version strings in bash using python

10 Oct 20:29

This isn't pure bash, but the pure bash implementations I've found are pretty
long and ugly. This solution gives me the balance I'm looking for between pure
bash and readability.

I use the following shell function. My version strings are of the form
release-X.Y or master. master should always be the latest version.

compare_versions() {
    # Usage: compare_versions VER1 OP VER2, e.g. compare_versions release-3.10 '>=' release-3.9
    local aver="$1"
    local op="$2"
    local bver="$3"
    # treat "master" as newer than any release-X.Y
    if [ "$aver" = master ] ; then aver=release-9999 ; fi
    if [ "$bver" = master ] ; then bver=release-9999 ; fi
    python -c 'import sys
from pkg_resources import parse_version
sys.exit(not parse_version(sys.argv[1])'"${op}"'parse_version(sys.argv[2]))' "$aver" "$bver"
}

For example, if the version is release-3.11 and I want to take some action if
the release is release-3.10 or later:

if compare_versions $version '>=' release-3.10 ; then
    echo do something for newer versions
else
    echo do something else for older versions
fi

If $version is release-3.10, release-3.11, or master it will hit the
echo do something for newer versions branch, otherwise it will hit the other
branch.
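
A quick sanity check (a sketch, assuming the compare_versions function above is
defined in your current shell):

compare_versions release-3.11 '>=' release-3.10 && echo newer   # prints newer
compare_versions release-3.9 '>=' release-3.10 || echo older    # prints older
compare_versions master '>=' release-3.10 && echo newer         # master is treated as the newest release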

Elasticsearch - add fields to index template

04 Sep 17:51


Intro

Problem: How to add fields to the default index template so that they are
parsed as a data type other than string?

These instructions are primarily for OpenShift logging but should apply to any
Elasticsearch installation by removing the OpenShift specific bits. They apply
to Elasticsearch 2.x as shipped with OpenShift 3.4 through 3.10, and may
require some tweaking to work with ES 5.x. The instructions assume your logging
namespace is logging - use openshift-logging with OpenShift 3.10 and later.

OpenShift logging Elasticsearch 2.x uses the following default index templates for
.operations.* and project.* indices:
https://github.com/ViaQ/elasticsearch-templates/releases/download/0.0.16/com.redhat.viaq-openshift-operations.2.4.4.template.json
and
https://github.com/ViaQ/elasticsearch-templates/releases/download/0.0.16/com.redhat.viaq-openshift-project.2.4.4.template.json
Any field not specified here will be treated as a string:

"mappings": {
    "_default_": {
      "dynamic_templates": [
        {
          "string_fields": {
            "mapping": {
              "index": "analyzed",
              "norms": {
                "enabled": true
              },
              "type": "string"
            },
            "match": "*",
            "match_mapping_type": "string"

If you have time-valued fields that you want to use for sorting and collation, you
will need to define them separately in their own index templates.

Steps

Identify the index pattern for the indices you want to add the fields to. For OpenShift
logging this will be .operations.* or project.*. If only a specific project will have
these fields, you can use an index pattern specific to that project,
e.g. project.this-project-has-time-fields.*.

Create a JSON file for each index pattern, like this:

{
    "order": 20,
    "mappings": {
      "_default_": {
        "properties": {
          "mytimefield1": {
            "doc_values": true,
            "format": "yyyy-MM-dd HH:mm:ss,SSSZ||yyyy-MM-dd'T'HH:mm:ss.SSSSSSZ||yyyy-MM-dd'T'HH:mm:ssZ||dateOptionalTime",
            "index": "not_analyzed",
            "type": "date"
          },
          "mytimefield2": {
            "doc_values": true,
            "format": "yyyy-MM-dd HH:mm:ss,SSSZ||yyyy-MM-dd'T'HH:mm:ss.SSSSSSZ||yyyy-MM-dd'T'HH:mm:ssZ||dateOptionalTime",
            "index": "not_analyzed",
            "type": "date"
          }
        }
      }
    },
    "template": ".operations.*"
}

Call this one add-time-fields-to-operations-indices.json.

{
    "order": 20,
    "mappings": {
      "_default_": {
        "properties": {
          "mytimefield1": {
            "doc_values": true,
            "format": "yyyy-MM-dd HH:mm:ss,SSSZ||yyyy-MM-dd'T'HH:mm:ss.SSSSSSZ||yyyy-MM-dd'T'HH:mm:ssZ||dateOptionalTime",
            "index": "not_analyzed",
            "type": "date"
          },
          "mytimefield2": {
            "doc_values": true,
            "format": "yyyy-MM-dd HH:mm:ss,SSSZ||yyyy-MM-dd'T'HH:mm:ss.SSSSSSZ||yyyy-MM-dd'T'HH:mm:ssZ||dateOptionalTime",
            "index": "not_analyzed",
            "type": "date"
          }
        }
      }
    },
    "template": "project.*"
}

Call this one add-time-fields-to-project-indices.json.

Load these into Elasticsearch. You'll need the name of one of the Elasticsearch
pods:

oc get -n logging pods -l component=es

Pick one and call it $espod. If you have a separate OPS cluster, you'll need
to identify one of the es-ops Elasticsearch pods too, for the .operations.*
indices:

oc get -n logging pods -l component=es-ops

Pick one and call it $esopspod.

Load the file add-time-fields-to-project-indices.json into $espod:

file=add-time-fields-to-project-indices.json
cat $file | \
oc exec -n logging -i -c elasticsearch $espod -- \
    curl -s -k --cert /etc/elasticsearch/secret/admin-cert \
    --key /etc/elasticsearch/secret/admin-key \
    https://localhost:9200/_template/$file -XPUT -d@- | \
python -mjson.tool

Load the file add-time-fields-to-operations-indices.json into $esopspod, or
$espod if you do not have a separate OPS cluster:

file=add-time-fields-to-operations-indices.json
cat $file | \
oc exec -n logging -i -c elasticsearch $esopspod -- \
    curl -s -k --cert /etc/elasticsearch/secret/admin-cert \
    --key /etc/elasticsearch/secret/admin-key \
    https://localhost:9200/_template/$file -XPUT -d@- | \
python -mjson.tool

NOTE The settings will not apply to existing indices. You will need to
perform a reindexing for that to work. However, it is usually not a problem,
as the settings will apply to new indices, and curator will eventually delete
the old ones.

Results

To see if this is working, wait until new indices are created, and use the
oc exec command as above to query:

oc exec -n logging -i -c elasticsearch $espod -- \
    curl -s -k --cert /etc/elasticsearch/secret/admin-cert \
    --key /etc/elasticsearch/secret/admin-key \
    https://localhost:9200/project.*/_search?sort=mytimefield2:desc | \
python -mjson.tool

You should see your records sorted in descending order by the mytimefield2 field
with the latest time listed first.
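
You can also confirm that the field was mapped as a date in a newly created
index, using the field mapping API (a sketch, using the same $espod and
credentials as above):

oc exec -n logging -i -c elasticsearch $espod -- \
    curl -s -k --cert /etc/elasticsearch/secret/admin-cert \
    --key /etc/elasticsearch/secret/admin-key \
    'https://localhost:9200/project.*/_mapping/*/field/mytimefield2?pretty'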

Elasticsearch - change number of shards for index template

26 Jul 23:21


Intro

These instructions are primarily for OpenShift logging but should apply to any
Elasticsearch installation by removing the OpenShift specific bits. They apply
to Elasticsearch 2.x as shipped with OpenShift 3.4 through 3.10, and may
require some tweaking to work with ES 5.x. The instructions assume your logging
namespace is logging - use openshift-logging with OpenShift 3.10 and later.

The default number of shards per index for OpenShift logging is 1. This is by
design, so as not to break very large deployments with a large number of
indices, where the problem is having too many shards. However, for deployments
with a small number of very large indices, this can be problematic.
Elasticsearch recommends keeping shard size under 50GB, so increasing the
number of shards per index can help with that.

Steps

Identify the index pattern you want to increase sharding for. For
OpenShift logging this will be .operations.* or project.*. If there are
specific projects that typically generate much more data than others, and you
need to keep the number of shards down, you can shard very specific patterns
e.g. project.this-project-generates-too-many-logs.*. If you don't anticipate
having many namespaces/projects/indices, you can just use project.*.

Create a JSON file for each index pattern, like this:

{
    "order": 20,
    "settings": {
        "index": {
            "number_of_shards": 3
        }
    },
    "template": ".operations.*"
}

Call this one more-shards-for-operations-indices.json.

{
    "order": 20,
    "settings": {
        "index": {
            "number_of_shards": 3
        }
    },
    "template": "project.*"
}

Call this one more-shards-for-project-indices.json.

Load these into Elasticsearch. You'll need the name of one of the Elasticsearch
pods:

oc get -n logging pods -l component=es

Pick one and call it $espod. If you have a separate OPS cluster, you'll need
to identify one of the es-ops Elasticsearch pods too, for the .operations.*
indices:

oc get -n logging pods -l component=es-ops

Pick one and call it $esopspod.

Load the file more-shards-for-project-indices.json into $espod:

file=more-shards-for-project-indices.json
cat $file | \
oc exec -n logging -i -c elasticsearch $espod -- \
    curl -s -k --cert /etc/elasticsearch/secret/admin-cert \
    --key /etc/elasticsearch/secret/admin-key \
    https://localhost:9200/_template/$file -XPUT -d@- | \
python -mjson.tool

Load the file more-shards-for-operations-indices.json into $esopspod, or
$espod if you do not have a separate OPS cluster:

file=more-shards-for-operations-indices.json
cat $file | \
oc exec -n logging -i -c elasticsearch $esopspod -- \
    curl -s -k --cert /etc/elasticsearch/secret/admin-cert \
    --key /etc/elasticsearch/secret/admin-key \
    https://localhost:9200/_template/$file -XPUT -d@- | \
python -mjson.tool
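
To confirm the templates were stored, you can query the _template endpoint (a
sketch; note that the template name is the file name used in the PUT above,
including the .json suffix):

oc exec -n logging -i -c elasticsearch $espod -- \
    curl -s -k --cert /etc/elasticsearch/secret/admin-cert \
    --key /etc/elasticsearch/secret/admin-key \
    'https://localhost:9200/_template/more-shards-for-project-indices.json?pretty'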

NOTE The settings will not apply to existing indices. You will need to
perform a reindexing for that to work. However, it is usually not a problem,
as the settings will apply to new indices, and curator will eventually delete
the old ones.

Results

To see if this is working, wait until new indices are created, and use the
_cat endpoints to view the new indices/shards:

oc exec -c elasticsearch $espod -- \
curl -s -k --cert /etc/elasticsearch/secret/admin-cert \
    --key /etc/elasticsearch/secret/admin-key \
    https://localhost:9200/_cat/indices?v
health status index                                                                        pri rep docs.count docs.deleted store.size pri.store.size 
green  open   project.kube-service-catalog.d5dbe052-903c-11e8-8c22-fa163e6e12b8.2018.07.26   3   0       1395            0      2.2mb          2.2mb

The pri value is now 3 instead of the default 1. This means there are 3
shards for this index. You can also check the shards endpoint:

oc exec -c elasticsearch $espod -- \
curl -s -k --cert /etc/elasticsearch/secret/admin-cert \
    --key /etc/elasticsearch/secret/admin-key \
    https://localhost:9200/_cat/shards?v
index                                                                        shard prirep state     docs   store ip         node                            

project.kube-service-catalog.d5dbe052-903c-11e8-8c22-fa163e6e12b8.2018.07.26 1     p      STARTED    596 683.3kb 10.131.0.8 logging-es-data-master-vksc2fwe 
project.kube-service-catalog.d5dbe052-903c-11e8-8c22-fa163e6e12b8.2018.07.26 2     p      STARTED    590 652.6kb 10.131.0.8 logging-es-data-master-vksc2fwe 
project.kube-service-catalog.d5dbe052-903c-11e8-8c22-fa163e6e12b8.2018.07.26 0     p      STARTED    602 628.1kb 10.131.0.8 logging-es-data-master-vksc2fwe

This lists the 3 shards for the index. If you have multiple Elasticsearch
nodes, you should see more than one node listed in the node column of the
_cat/shards output.

Retry Handling with fluent-plugin-elasticsearch

11 May 16:13


Thanks to @jcantrill et al., the retry logic in fluent-plugin-elasticsearch has
been greatly improved. These changes are in version 1.16.1 and later:
https://github.com/uken/fluent-plugin-elasticsearch/releases/tag/v1.16.1

  • When a bulk index request is rejected, records that can't be processed at all
    (i.e. would clog the pipe and prevent other records from being processed) are
    redirected to @error label processing:
    https://docs.fluentd.org/v0.12/articles/config-file#@error-label

  • When a record can be retried, it is re-submitted back to fluentd for
    processing. The primary use case is when a record is rejected due to a
    429 - bulk index queue is full, or other resource problem in Elasticsearch.
    In this case, clients are supposed to back-off and resubmit the operation at
    a later time, as described here
    https://www.elastic.co/guide/en/elasticsearch/guide/2.x/_monitoring_individual_nodes.html#_threadpool_section
    in the Bulk Rejections box.

  • When a record is rejected with a 409 - Conflict - the duplicate record
    is not resubmitted. This relies on using a unique ID per record, and setting
    write_operation create in the fluent-plugin-elasticsearch configuration
    (see below).

Unique ID per record

All records must be assigned a unique ID. OpenShift logging does it with the
new elasticsearch_genid plugin type provided by fluent-plugin-elasticsearch.
https://github.com/openshift/origin-aggregated-logging/blob/master/fluentd/configs.d/openshift/filter-post-genid.conf#L3

<filter **>
  @type elasticsearch_genid
  hash_id_key viaq_msg_id
</filter>

You will have to add a section like this to your fluentd config in order to
assign a unique ID to each record. The @type elasticsearch plugin config
should be changed like this:

@type elasticsearch
...
id_key viaq_msg_id
write_operation 'create'

This tells the plugin to use the generated ID for the _id
https://www.elastic.co/guide/en/elasticsearch/guide/2.x/_document_metadata.html#_id
metadata field associated with the record. This also tells the plugin to use the
create
https://www.elastic.co/guide/en/elasticsearch/guide/2.x/create-doc.html
operation to add the new record instead of the default index operation, which
would update the record if it already exists. In this case, we want
Elasticsearch to return 409 so that we will not attempt to resubmit the
duplicate record.
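
Putting the two pieces together, a minimal sketch looks like this (the match
pattern, host, and port are illustrative placeholders, not the OpenShift
logging values):

<filter **>
  @type elasticsearch_genid
  hash_id_key viaq_msg_id
</filter>
<match **>
  @type elasticsearch
  # illustrative connection settings
  host es.example.com
  port 9200
  id_key viaq_msg_id
  write_operation create
</match>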

Retry handling

When fluent-plugin-elasticsearch resubmits a failed record that is a candidate
for a retry (e.g. the request returned a 429 for the record), the record is
resubmitted back into the fluentd record queue for processing. By default, it
is submitted back to the very beginning of processing, and will go back through
all of your processing pipeline again. You will usually want to avoid this.
There are two ways to route records so that you can control the reprocessing -
tagging and/or labeling. You can use either separately or both in conjunction.

Retry tagging

There is a new parameter in fluent-plugin-elasticsearch - retry_tag -
https://github.com/uken/fluent-plugin-elasticsearch#retry_tag

@type elasticsearch
...
retry_tag retry_es

It is best to use a tag that isn't used by anything else and will not match
other filters, to avoid having records reprocessed. When records are
resubmitted, they will use this tag instead of whatever tag they were using
before. Then you can have something like this in your fluentd config:

# handler for initial attempt and retries
<match retry_es **my_initial_tag**>
  @type elasticsearch
  ...
  retry_tag retry_es
  ... other config ...
</match>

This is good for a simple case where you don't have much processing, and you
only have one or two matches that don't use @type copy. You can't do this
for @type copy because your retried records would be sent again to all of
your copy destinations, causing duplicate records at those destinations. In
that case, you will need to define a separate @type elasticsearch config to
handle only the Elasticsearch retries:

# handler for retries
<match retry_es>
  @type elasticsearch
  ...
  retry_tag retry_es
  ... other config ...
</match>
# handler for initial attempt and copy destinations
<match **my_initial_tag**>
  @type copy
  <store>
    @type elasticsearch
    ...
    retry_tag retry_es
    ... other config ...
  </store>
  <store>
    @type secure_forward
    ...
  </store>
</match>

This is the method used by OpenShift logging as there may be multiple copy
destinations specified by the user. For example:
https://github.com/openshift/origin-aggregated-logging/blob/master/fluentd/configs.d/openshift/output-operations.conf
The configuration of the retry fluent-plugin-elasticsearch should be identical
to the initial fluent-plugin-elasticsearch. This is config duplication, so
you'll need to use some other method to keep it DRY e.g. move all of the common
config to a separate file and @include it.
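
For example, a sketch of the DRY approach (es-common.conf is a hypothetical
file holding all of the shared fluent-plugin-elasticsearch settings - hosts,
TLS options, id_key, write_operation, retry_tag, buffer settings, and so on):

# handler for retries
<match retry_es>
  @type elasticsearch
  @include es-common.conf
</match>
# handler for initial attempt and copy destinations
<match **my_initial_tag**>
  @type copy
  <store>
    @type elasticsearch
    @include es-common.conf
  </store>
  <store>
    @type secure_forward
    # ... secure_forward settings ...
  </store>
</match>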

Retry labeling

You can also specify a label
https://docs.fluentd.org/v0.12/articles/life-of-a-fluentd-event#labels and
https://docs.fluentd.org/v0.12/articles/routing-examples#input--%3E-filter--%3E-output-with-label
to route records to. To do this, add a @label @LABEL_NAME to your
fluent-plugin-elasticsearch config:

@type elasticsearch
...
@label @RETRY_ES

Then add a <label @RETRY_ES> section to your config:

<label @RETRY_ES>
  <match **>
    @type elasticsearch
    ...
    @label @RETRY_ES
  </match>
</label>

With labeling, you can ensure that the retried record is sent directly to the
label section, avoiding the rest of your processing pipeline.

You can use labeling and tagging both:

@type elasticsearch
...
@label @RETRY_ES
retry_tag retry_es

Then add a <label @RETRY_ES> section to your config:

<label @RETRY_ES>
  <match retry_es>
    @type elasticsearch
    ...
    @label @RETRY_ES
    retry_tag retry_es
  </match>
</label>

The configuration of the retry fluent-plugin-elasticsearch should be identical
to the initial fluent-plugin-elasticsearch. This is config duplication, so
you'll need to use some other method to keep it DRY e.g. move all of the common
config to a separate file and @include it.

Other parameters related to retries

The parameters max_retry_wait, disable_retry_limit, retry_limit, and
retry_wait won't be as important when using the configuration specified
above, because the retries will happen internally to the
fluent-plugin-elasticsearch. In a sense, that code path will be avoided. They
will still be used in the case where the error is due to connection or network
issues e.g. cannot connect to Elasticsearch.
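
For reference, in Fluentd 0.12 these are ordinary buffered-output parameters
and sit directly in the Elasticsearch match block (a sketch; the values shown
are illustrative, not recommendations):

<match retry_es>
  @type elasticsearch
  # ... other elasticsearch settings ...
  # initial wait between retries; backoff is capped by max_retry_wait
  retry_wait 1s
  max_retry_wait 300s
  # give up after this many attempts, or set disable_retry_limit true to retry forever
  retry_limit 17
</match>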

Error record handling

Records that have "hard" errors (schema violations, corrupted data, etc.) that
cannot be retried will be sent to the @ERROR handler. Think of this as a
"dead letter queue", for messages that cannot be delivered. If you add a
<label @ERROR> section to your fluentd config, you can handle these records
how you want. For example:

<label @ERROR>
  <match **>
    @type file
    path /var/log/fluent/dlq
    time_slice_format %Y%m%d
    time_slice_wait 10m
    time_format %Y%m%dT%H%M%S%z
    compress gzip
  </match>
</label>

This will write error records to the dlq file. See
https://docs.fluentd.org/v0.12/articles/out_file for more information about the
file output.
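
To see whether anything has landed in the dead letter queue, you could look
inside a running Fluentd pod (a sketch; $fluentd_pod is the name of a running
Fluentd pod, and the exact file names depend on the out_file time slicing):

oc exec $fluentd_pod -- sh -c 'ls -l /var/log/fluent/dlq* && zcat /var/log/fluent/dlq*.gz | head -5'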