Skip to content

Commit

Permalink
Use _doc if ES major version is 7 (elastic#9056)
Browse files Browse the repository at this point in the history
Update the Elasticsearch output and template generator to set the type
to _doc if Elasticsearch major version is 7.

Use common.Version throughout
- Use common version instead of strings + parsing over and over again.
- Having the parsed version available earlier, we can now configure the
  default document type based on version ranges.
- Use `_doc` if version is >= 7.0.

Introduce unit tests for version aware bulk encoding.

(cherry picked from commit 41f87a4)
  • Loading branch information
Steffen Siering committed Dec 5, 2018
1 parent afaeb92 commit ca70cfa
Show file tree
Hide file tree
Showing 31 changed files with 409 additions and 252 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Expand Up @@ -79,6 +79,7 @@ https://github.com/elastic/beats/compare/v6.5.0...6.x[Check the HEAD diff]

*Affecting all Beats*
- Unify dashboard exporter tools. {pull}9097[9097]
- Use _doc as document type of the Elasticsearch major version is 7. {pull}9056[9056]

- Dissect will now flag event on parsing error. {pull}8751[8751]
- Added the `redirect_stderr` option that allows panics to be logged to log files. {pull}8430[8430]
Expand Down
21 changes: 7 additions & 14 deletions filebeat/beater/filebeat.go
Expand Up @@ -234,12 +234,7 @@ func (fb *Filebeat) loadModulesML(b *beat.Beat, kibanaConfig *common.Config) err
return errors.Errorf("Error creating Kibana client: %v", err)
}

kibanaVersion, err := common.NewVersion(kibanaClient.GetVersion())
if err != nil {
return errors.Errorf("Error checking Kibana version: %v", err)
}

if err := setupMLBasedOnVersion(fb.moduleRegistry, esClient, kibanaClient, kibanaVersion); err != nil {
if err := setupMLBasedOnVersion(fb.moduleRegistry, esClient, kibanaClient); err != nil {
errs = append(errs, err)
}

Expand All @@ -265,7 +260,7 @@ func (fb *Filebeat) loadModulesML(b *beat.Beat, kibanaConfig *common.Config) err
continue
}

if err := setupMLBasedOnVersion(set, esClient, kibanaClient, kibanaVersion); err != nil {
if err := setupMLBasedOnVersion(set, esClient, kibanaClient); err != nil {
errs = append(errs, err)
}

Expand All @@ -275,18 +270,16 @@ func (fb *Filebeat) loadModulesML(b *beat.Beat, kibanaConfig *common.Config) err
return errs.Err()
}

func setupMLBasedOnVersion(reg *fileset.ModuleRegistry, esClient *elasticsearch.Client, kibanaClient *kibana.Client, kibanaVersion *common.Version) error {
if isElasticsearchLoads(kibanaVersion) {
func setupMLBasedOnVersion(reg *fileset.ModuleRegistry, esClient *elasticsearch.Client, kibanaClient *kibana.Client) error {
if isElasticsearchLoads(kibanaClient.GetVersion()) {
return reg.LoadML(esClient)
}
return reg.SetupML(esClient, kibanaClient)
}

func isElasticsearchLoads(kibanaVersion *common.Version) bool {
if kibanaVersion.Major < 6 || kibanaVersion.Major == 6 && kibanaVersion.Minor < 1 {
return true
}
return false
func isElasticsearchLoads(kibanaVersion common.Version) bool {
return kibanaVersion.Major < 6 ||
(kibanaVersion.Major == 6 && kibanaVersion.Minor < 1)
}

// Run allows the beater to be run as a beat.
Expand Down
16 changes: 8 additions & 8 deletions filebeat/fileset/fileset.go
Expand Up @@ -25,6 +25,7 @@ package fileset
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"os"
Expand Down Expand Up @@ -192,15 +193,14 @@ func (fs *Fileset) evaluateVars() (map[string]interface{}, error) {

// turnOffElasticsearchVars re-evaluates the variables that have `min_elasticsearch_version`
// set.
func (fs *Fileset) turnOffElasticsearchVars(vars map[string]interface{}, esVersion string) (map[string]interface{}, error) {
func (fs *Fileset) turnOffElasticsearchVars(vars map[string]interface{}, esVersion common.Version) (map[string]interface{}, error) {
retVars := map[string]interface{}{}
for key, val := range vars {
retVars[key] = val
}

haveVersion, err := common.NewVersion(esVersion)
if err != nil {
return vars, fmt.Errorf("Error parsing version %s: %v", esVersion, err)
if !esVersion.IsValid() {
return vars, errors.New("Unknown Elasticsearch version")
}

for _, vals := range fs.manifest.Vars {
Expand All @@ -217,11 +217,11 @@ func (fs *Fileset) turnOffElasticsearchVars(vars map[string]interface{}, esVersi
return vars, fmt.Errorf("Error parsing version %s: %v", minESVersion["version"].(string), err)
}

logp.Debug("fileset", "Comparing ES version %s with requirement of %s", haveVersion, minVersion)
logp.Debug("fileset", "Comparing ES version %s with requirement of %s", esVersion.String(), minVersion)

if haveVersion.LessThan(minVersion) {
if esVersion.LessThan(minVersion) {
retVars[name] = minESVersion["value"]
logp.Info("Setting var %s (%s) to %v because Elasticsearch version is %s", name, fs, minESVersion["value"], haveVersion)
logp.Info("Setting var %s (%s) to %v because Elasticsearch version is %s", name, fs, minESVersion["value"], esVersion.String())
}
}
}
Expand Down Expand Up @@ -358,7 +358,7 @@ func (fs *Fileset) getPipelineID(beatVersion string) (string, error) {
}

// GetPipeline returns the JSON content of the Ingest Node pipeline that parses the logs.
func (fs *Fileset) GetPipeline(esVersion string) (pipelineID string, content map[string]interface{}, err error) {
func (fs *Fileset) GetPipeline(esVersion common.Version) (pipelineID string, content map[string]interface{}, err error) {
path, err := applyTemplate(fs.vars, fs.manifest.IngestPipeline, false)
if err != nil {
return "", nil, fmt.Errorf("Error expanding vars on the ingest pipeline path: %v", err)
Expand Down
53 changes: 30 additions & 23 deletions filebeat/fileset/fileset_test.go
Expand Up @@ -27,7 +27,9 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
)

Expand Down Expand Up @@ -213,7 +215,8 @@ func TestGetPipelineNginx(t *testing.T) {
fs := getModuleForTesting(t, "nginx", "access")
assert.NoError(t, fs.Read("5.2.0"))

pipelineID, content, err := fs.GetPipeline("5.2.0")
version := common.MustNewVersion("5.2.0")
pipelineID, content, err := fs.GetPipeline(*version)
assert.NoError(t, err)
assert.Equal(t, "filebeat-5.2.0-nginx-access-default", pipelineID)
assert.Contains(t, content, "description")
Expand All @@ -234,27 +237,31 @@ func TestGetPipelineConvertTS(t *testing.T) {
assert.NoError(t, err)
assert.NoError(t, fs.Read("6.1.0"))

// ES 6.0.0 should not have beat.timezone referenced
pipelineID, content, err := fs.GetPipeline("6.0.0")
assert.NoError(t, err)
assert.Equal(t, "filebeat-6.1.0-system-syslog-pipeline", pipelineID)
marshaled, err := json.Marshal(content)
assert.NoError(t, err)
assert.NotContains(t, string(marshaled), "beat.timezone")

// ES 6.1.0 should have beat.timezone referenced
pipelineID, content, err = fs.GetPipeline("6.1.0")
assert.NoError(t, err)
assert.Equal(t, "filebeat-6.1.0-system-syslog-pipeline", pipelineID)
marshaled, err = json.Marshal(content)
assert.NoError(t, err)
assert.Contains(t, string(marshaled), "beat.timezone")
cases := map[string]struct {
Beat string
Timezone bool
}{
"6.0.0": {Timezone: false},
"6.1.0": {Timezone: true},
"6.2.0": {Timezone: true},
}

// ES 6.2.0 should have beat.timezone referenced
pipelineID, content, err = fs.GetPipeline("6.2.0")
assert.NoError(t, err)
assert.Equal(t, "filebeat-6.1.0-system-syslog-pipeline", pipelineID)
marshaled, err = json.Marshal(content)
assert.NoError(t, err)
assert.Contains(t, string(marshaled), "beat.timezone")
for esVersion, cfg := range cases {
pipelineName := "filebeat-6.1.0-system-syslog-pipeline"

t.Run(fmt.Sprintf("es=%v", esVersion), func(t *testing.T) {
ver := common.MustNewVersion(esVersion)
pipelineID, content, err := fs.GetPipeline(*ver)
require.NoError(t, err)
assert.Equal(t, pipelineName, pipelineID)

marshaled, err := json.Marshal(content)
require.NoError(t, err)
if cfg.Timezone {
assert.Contains(t, string(marshaled), "beat.timezone")
} else {
assert.NotContains(t, string(marshaled), "beat.timezone")
}
})
}
}
9 changes: 1 addition & 8 deletions filebeat/fileset/modules_integration_test.go
Expand Up @@ -22,7 +22,6 @@ package fileset
import (
"encoding/json"
"path/filepath"
"strconv"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -142,11 +141,5 @@ func TestAvailableProcessors(t *testing.T) {

func hasIngest(client *elasticsearch.Client) bool {
v := client.GetVersion()
majorVersion := string(v[0])
version, err := strconv.Atoi(majorVersion)
if err != nil {
return true
}

return version >= 5
return v.Major >= 5
}
3 changes: 2 additions & 1 deletion filebeat/fileset/pipelines.go
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"strings"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
)

Expand All @@ -33,7 +34,7 @@ type PipelineLoaderFactory func() (PipelineLoader, error)
type PipelineLoader interface {
LoadJSON(path string, json map[string]interface{}) ([]byte, error)
Request(method, path string, pipeline string, params map[string]string, body interface{}) (int, []byte, error)
GetVersion() string
GetVersion() common.Version
}

// LoadPipelines loads the pipelines for each configured fileset.
Expand Down
2 changes: 1 addition & 1 deletion filebeat/scripts/tester/main.go
Expand Up @@ -264,7 +264,7 @@ func runSimulate(url string, pipeline map[string]interface{}, logs []string, ver
for _, s := range sources {
d := common.MapStr{
"_index": "index",
"_type": "doc",
"_type": "_doc",
"_id": "id",
"_source": s,
}
Expand Down
1 change: 1 addition & 0 deletions libbeat/cmd/export/dashboard.go
Expand Up @@ -73,6 +73,7 @@ func GenDashboardCmd(name, idxPrefix, beatVersion string) *cobra.Command {
if decode {
r = dashboards.DecodeExported(r)
}

err = dashboards.SaveToFile(r, info.Dashboards[i].File, filepath.Dir(yml), client.GetVersion())
if err != nil {
fmt.Fprintf(os.Stderr, "Error saving dashboard '%s' to file '%s' : %+v\n",
Expand Down
11 changes: 10 additions & 1 deletion libbeat/cmd/export/template.go
Expand Up @@ -57,7 +57,16 @@ func GenTemplateConfigCmd(settings instance.Settings, name, idxPrefix, beatVersi
}
}

tmpl, err := template.New(b.Info.Version, index, version, cfg)
if version == "" {
version = b.Info.Version
}

esVersion, err := common.NewVersion(version)
if err != nil {
fmt.Fprintf(os.Stderr, "Invalid Elasticsearch version: %s\n", err)
}

tmpl, err := template.New(b.Info.Version, index, *esVersion, cfg)
if err != nil {
fmt.Fprintf(os.Stderr, "Error generating template: %+v", err)
os.Exit(1)
Expand Down
15 changes: 15 additions & 0 deletions libbeat/common/version.go
Expand Up @@ -31,6 +31,16 @@ type Version struct {
Meta string
}

// MustNewVersion creates a version from the given version string.
// If the version string is invalid, MustNewVersion panics.
func MustNewVersion(version string) *Version {
v, err := NewVersion(version)
if err != nil {
panic(err)
}
return v
}

// NewVersion expects a string in the format:
// major.minor.bugfix(-meta)
func NewVersion(version string) (*Version, error) {
Expand Down Expand Up @@ -69,6 +79,11 @@ func NewVersion(version string) (*Version, error) {
return &v, nil
}

// IsValid returns true if the version object stores a successfully parsed version number.
func (v *Version) IsValid() bool {
return v.version != ""
}

func (v *Version) IsMajor(major int) bool {
return major == v.Major
}
Expand Down
60 changes: 9 additions & 51 deletions libbeat/dashboards/dashboards.go
Expand Up @@ -22,8 +22,6 @@ import (
"errors"
"fmt"
"path/filepath"
"strconv"
"strings"

errw "github.com/pkg/errors"

Expand Down Expand Up @@ -106,12 +104,7 @@ func ImportDashboards(

esLoader.statusMsg("Elasticsearch URL %v", esLoader.client.Connection.URL)

majorVersion, _, err := getMajorAndMinorVersion(esLoader.version)
if err != nil {
return fmt.Errorf("wrong Elasticsearch version: %v", err)
}

if majorVersion < 6 {
if esLoader.version.Major < 6 {
importVia = importViaES
} else {
importVia = useKibana
Expand Down Expand Up @@ -145,17 +138,16 @@ func setupAndImportDashboardsViaKibana(ctx context.Context, hostname string, kib
}

func ImportDashboardsViaKibana(kibanaLoader *KibanaLoader) error {

if !isKibanaAPIavailable(kibanaLoader.version) {
return fmt.Errorf("Kibana API is not available in Kibana version %s", kibanaLoader.version)
version := kibanaLoader.version
if !version.IsValid() {
return errors.New("No valid kibana version available")
}

version, err := common.NewVersion(kibanaLoader.version)
if err != nil {
return fmt.Errorf("Invalid Kibana version: %s", kibanaLoader.version)
if !isKibanaAPIavailable(kibanaLoader.version) {
return fmt.Errorf("Kibana API is not available in Kibana version %s", kibanaLoader.version.String())
}

importer, err := NewImporter(*version, kibanaLoader.config, kibanaLoader)
importer, err := NewImporter(version, kibanaLoader.config, kibanaLoader)
if err != nil {
return fmt.Errorf("fail to create a Kibana importer for loading the dashboards: %v", err)
}
Expand Down Expand Up @@ -187,40 +179,6 @@ func ImportDashboardsViaElasticsearch(esLoader *ElasticsearchLoader) error {
return nil
}

func getMajorAndMinorVersion(version string) (int, int, error) {
fields := strings.Split(version, ".")
if len(fields) != 3 {
return 0, 0, fmt.Errorf("wrong version %s", version)
}
majorVersion := fields[0]
minorVersion := fields[1]

majorVersionInt, err := strconv.Atoi(majorVersion)
if err != nil {
return 0, 0, err
}

minorVersionInt, err := strconv.Atoi(minorVersion)
if err != nil {
return 0, 0, err
}

return majorVersionInt, minorVersionInt, nil
}

func isKibanaAPIavailable(version string) bool {
majorVersion, minorVersion, err := getMajorAndMinorVersion(version)
if err != nil {
return false
}

if majorVersion == 5 && minorVersion >= 6 {
return true
}

if majorVersion >= 6 {
return true
}

return false
func isKibanaAPIavailable(version common.Version) bool {
return (version.Major == 5 && version.Minor >= 6) || version.Major >= 6
}

0 comments on commit ca70cfa

Please sign in to comment.