From ca70cfacc455e9cb3a86d2332e70645a26ee6b21 Mon Sep 17 00:00:00 2001 From: Steffen Siering Date: Thu, 29 Nov 2018 21:27:10 +0100 Subject: [PATCH] Use _doc if ES major version is 7 (#9056) Update the Elasticsearch output and template generator to set the type to _doc if Elasticsearch major version is 7. Use common.Version throughout - Use common version instead of strings + parsing over and over again. - Having the parsed version available earlier, we can now configure the default document type based on version ranges. - Use `_doc` if version is >= 7.0. Introduce unit tests for version aware bulk encoding. (cherry picked from commit 41f87a4681d3dec1654728372955447a0757ddff) --- CHANGELOG.asciidoc | 1 + filebeat/beater/filebeat.go | 21 ++-- filebeat/fileset/fileset.go | 16 +-- filebeat/fileset/fileset_test.go | 53 +++++----- filebeat/fileset/modules_integration_test.go | 9 +- filebeat/fileset/pipelines.go | 3 +- filebeat/scripts/tester/main.go | 2 +- libbeat/cmd/export/dashboard.go | 1 + libbeat/cmd/export/template.go | 11 ++- libbeat/common/version.go | 15 +++ libbeat/dashboards/dashboards.go | 60 ++---------- libbeat/dashboards/es_loader.go | 8 +- libbeat/dashboards/es_loader_test.go | 10 +- libbeat/dashboards/export.go | 9 +- libbeat/dashboards/kibana_loader.go | 5 +- libbeat/kibana/client.go | 30 +++--- libbeat/ml-importer/importer.go | 13 +-- libbeat/outputs/codec/json/event.go | 2 +- libbeat/outputs/codec/json/json_test.go | 8 +- libbeat/outputs/console/console_test.go | 4 +- .../elasticsearch/api_integration_test.go | 5 +- libbeat/outputs/elasticsearch/client.go | 65 +++++++++--- .../elasticsearch/client_integration_test.go | 9 +- libbeat/outputs/elasticsearch/client_test.go | 88 +++++++++++++++++ .../outputs/elasticsearch/elasticsearch.go | 67 +++++++------ libbeat/template/load.go | 5 +- libbeat/template/load_integration_test.go | 98 +++++++++++++------ libbeat/template/template.go | 30 +++--- libbeat/template/template_test.go | 9 +- metricbeat/mb/module/example_test.go | 2 +- metricbeat/tests/system/test_template.py | 2 +- 31 files changed, 409 insertions(+), 252 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 2531ae59ac2..0d4a3daf391 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -79,6 +79,7 @@ https://github.com/elastic/beats/compare/v6.5.0...6.x[Check the HEAD diff] *Affecting all Beats* - Unify dashboard exporter tools. {pull}9097[9097] +- Use _doc as document type of the Elasticsearch major version is 7. {pull}9056[9056] - Dissect will now flag event on parsing error. {pull}8751[8751] - Added the `redirect_stderr` option that allows panics to be logged to log files. {pull}8430[8430] diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index f057cf5c1ca..edda8eb647e 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -234,12 +234,7 @@ func (fb *Filebeat) loadModulesML(b *beat.Beat, kibanaConfig *common.Config) err return errors.Errorf("Error creating Kibana client: %v", err) } - kibanaVersion, err := common.NewVersion(kibanaClient.GetVersion()) - if err != nil { - return errors.Errorf("Error checking Kibana version: %v", err) - } - - if err := setupMLBasedOnVersion(fb.moduleRegistry, esClient, kibanaClient, kibanaVersion); err != nil { + if err := setupMLBasedOnVersion(fb.moduleRegistry, esClient, kibanaClient); err != nil { errs = append(errs, err) } @@ -265,7 +260,7 @@ func (fb *Filebeat) loadModulesML(b *beat.Beat, kibanaConfig *common.Config) err continue } - if err := setupMLBasedOnVersion(set, esClient, kibanaClient, kibanaVersion); err != nil { + if err := setupMLBasedOnVersion(set, esClient, kibanaClient); err != nil { errs = append(errs, err) } @@ -275,18 +270,16 @@ func (fb *Filebeat) loadModulesML(b *beat.Beat, kibanaConfig *common.Config) err return errs.Err() } -func setupMLBasedOnVersion(reg *fileset.ModuleRegistry, esClient *elasticsearch.Client, kibanaClient *kibana.Client, kibanaVersion *common.Version) error { - if isElasticsearchLoads(kibanaVersion) { +func setupMLBasedOnVersion(reg *fileset.ModuleRegistry, esClient *elasticsearch.Client, kibanaClient *kibana.Client) error { + if isElasticsearchLoads(kibanaClient.GetVersion()) { return reg.LoadML(esClient) } return reg.SetupML(esClient, kibanaClient) } -func isElasticsearchLoads(kibanaVersion *common.Version) bool { - if kibanaVersion.Major < 6 || kibanaVersion.Major == 6 && kibanaVersion.Minor < 1 { - return true - } - return false +func isElasticsearchLoads(kibanaVersion common.Version) bool { + return kibanaVersion.Major < 6 || + (kibanaVersion.Major == 6 && kibanaVersion.Minor < 1) } // Run allows the beater to be run as a beat. diff --git a/filebeat/fileset/fileset.go b/filebeat/fileset/fileset.go index 49c2b2a542e..a57a40f6752 100644 --- a/filebeat/fileset/fileset.go +++ b/filebeat/fileset/fileset.go @@ -25,6 +25,7 @@ package fileset import ( "bytes" "encoding/json" + "errors" "fmt" "io/ioutil" "os" @@ -192,15 +193,14 @@ func (fs *Fileset) evaluateVars() (map[string]interface{}, error) { // turnOffElasticsearchVars re-evaluates the variables that have `min_elasticsearch_version` // set. -func (fs *Fileset) turnOffElasticsearchVars(vars map[string]interface{}, esVersion string) (map[string]interface{}, error) { +func (fs *Fileset) turnOffElasticsearchVars(vars map[string]interface{}, esVersion common.Version) (map[string]interface{}, error) { retVars := map[string]interface{}{} for key, val := range vars { retVars[key] = val } - haveVersion, err := common.NewVersion(esVersion) - if err != nil { - return vars, fmt.Errorf("Error parsing version %s: %v", esVersion, err) + if !esVersion.IsValid() { + return vars, errors.New("Unknown Elasticsearch version") } for _, vals := range fs.manifest.Vars { @@ -217,11 +217,11 @@ func (fs *Fileset) turnOffElasticsearchVars(vars map[string]interface{}, esVersi return vars, fmt.Errorf("Error parsing version %s: %v", minESVersion["version"].(string), err) } - logp.Debug("fileset", "Comparing ES version %s with requirement of %s", haveVersion, minVersion) + logp.Debug("fileset", "Comparing ES version %s with requirement of %s", esVersion.String(), minVersion) - if haveVersion.LessThan(minVersion) { + if esVersion.LessThan(minVersion) { retVars[name] = minESVersion["value"] - logp.Info("Setting var %s (%s) to %v because Elasticsearch version is %s", name, fs, minESVersion["value"], haveVersion) + logp.Info("Setting var %s (%s) to %v because Elasticsearch version is %s", name, fs, minESVersion["value"], esVersion.String()) } } } @@ -358,7 +358,7 @@ func (fs *Fileset) getPipelineID(beatVersion string) (string, error) { } // GetPipeline returns the JSON content of the Ingest Node pipeline that parses the logs. -func (fs *Fileset) GetPipeline(esVersion string) (pipelineID string, content map[string]interface{}, err error) { +func (fs *Fileset) GetPipeline(esVersion common.Version) (pipelineID string, content map[string]interface{}, err error) { path, err := applyTemplate(fs.vars, fs.manifest.IngestPipeline, false) if err != nil { return "", nil, fmt.Errorf("Error expanding vars on the ingest pipeline path: %v", err) diff --git a/filebeat/fileset/fileset_test.go b/filebeat/fileset/fileset_test.go index 93456b2ee3b..8435532efa7 100644 --- a/filebeat/fileset/fileset_test.go +++ b/filebeat/fileset/fileset_test.go @@ -27,7 +27,9 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" ) @@ -213,7 +215,8 @@ func TestGetPipelineNginx(t *testing.T) { fs := getModuleForTesting(t, "nginx", "access") assert.NoError(t, fs.Read("5.2.0")) - pipelineID, content, err := fs.GetPipeline("5.2.0") + version := common.MustNewVersion("5.2.0") + pipelineID, content, err := fs.GetPipeline(*version) assert.NoError(t, err) assert.Equal(t, "filebeat-5.2.0-nginx-access-default", pipelineID) assert.Contains(t, content, "description") @@ -234,27 +237,31 @@ func TestGetPipelineConvertTS(t *testing.T) { assert.NoError(t, err) assert.NoError(t, fs.Read("6.1.0")) - // ES 6.0.0 should not have beat.timezone referenced - pipelineID, content, err := fs.GetPipeline("6.0.0") - assert.NoError(t, err) - assert.Equal(t, "filebeat-6.1.0-system-syslog-pipeline", pipelineID) - marshaled, err := json.Marshal(content) - assert.NoError(t, err) - assert.NotContains(t, string(marshaled), "beat.timezone") - - // ES 6.1.0 should have beat.timezone referenced - pipelineID, content, err = fs.GetPipeline("6.1.0") - assert.NoError(t, err) - assert.Equal(t, "filebeat-6.1.0-system-syslog-pipeline", pipelineID) - marshaled, err = json.Marshal(content) - assert.NoError(t, err) - assert.Contains(t, string(marshaled), "beat.timezone") + cases := map[string]struct { + Beat string + Timezone bool + }{ + "6.0.0": {Timezone: false}, + "6.1.0": {Timezone: true}, + "6.2.0": {Timezone: true}, + } - // ES 6.2.0 should have beat.timezone referenced - pipelineID, content, err = fs.GetPipeline("6.2.0") - assert.NoError(t, err) - assert.Equal(t, "filebeat-6.1.0-system-syslog-pipeline", pipelineID) - marshaled, err = json.Marshal(content) - assert.NoError(t, err) - assert.Contains(t, string(marshaled), "beat.timezone") + for esVersion, cfg := range cases { + pipelineName := "filebeat-6.1.0-system-syslog-pipeline" + + t.Run(fmt.Sprintf("es=%v", esVersion), func(t *testing.T) { + ver := common.MustNewVersion(esVersion) + pipelineID, content, err := fs.GetPipeline(*ver) + require.NoError(t, err) + assert.Equal(t, pipelineName, pipelineID) + + marshaled, err := json.Marshal(content) + require.NoError(t, err) + if cfg.Timezone { + assert.Contains(t, string(marshaled), "beat.timezone") + } else { + assert.NotContains(t, string(marshaled), "beat.timezone") + } + }) + } } diff --git a/filebeat/fileset/modules_integration_test.go b/filebeat/fileset/modules_integration_test.go index dbf786053a8..45baabdc354 100644 --- a/filebeat/fileset/modules_integration_test.go +++ b/filebeat/fileset/modules_integration_test.go @@ -22,7 +22,6 @@ package fileset import ( "encoding/json" "path/filepath" - "strconv" "testing" "github.com/stretchr/testify/assert" @@ -142,11 +141,5 @@ func TestAvailableProcessors(t *testing.T) { func hasIngest(client *elasticsearch.Client) bool { v := client.GetVersion() - majorVersion := string(v[0]) - version, err := strconv.Atoi(majorVersion) - if err != nil { - return true - } - - return version >= 5 + return v.Major >= 5 } diff --git a/filebeat/fileset/pipelines.go b/filebeat/fileset/pipelines.go index b67e9fce487..d7f63bdabad 100644 --- a/filebeat/fileset/pipelines.go +++ b/filebeat/fileset/pipelines.go @@ -22,6 +22,7 @@ import ( "fmt" "strings" + "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" ) @@ -33,7 +34,7 @@ type PipelineLoaderFactory func() (PipelineLoader, error) type PipelineLoader interface { LoadJSON(path string, json map[string]interface{}) ([]byte, error) Request(method, path string, pipeline string, params map[string]string, body interface{}) (int, []byte, error) - GetVersion() string + GetVersion() common.Version } // LoadPipelines loads the pipelines for each configured fileset. diff --git a/filebeat/scripts/tester/main.go b/filebeat/scripts/tester/main.go index 3300b8ef623..d1fe2b9a184 100644 --- a/filebeat/scripts/tester/main.go +++ b/filebeat/scripts/tester/main.go @@ -264,7 +264,7 @@ func runSimulate(url string, pipeline map[string]interface{}, logs []string, ver for _, s := range sources { d := common.MapStr{ "_index": "index", - "_type": "doc", + "_type": "_doc", "_id": "id", "_source": s, } diff --git a/libbeat/cmd/export/dashboard.go b/libbeat/cmd/export/dashboard.go index 697da10061e..4cf0c7db30e 100644 --- a/libbeat/cmd/export/dashboard.go +++ b/libbeat/cmd/export/dashboard.go @@ -73,6 +73,7 @@ func GenDashboardCmd(name, idxPrefix, beatVersion string) *cobra.Command { if decode { r = dashboards.DecodeExported(r) } + err = dashboards.SaveToFile(r, info.Dashboards[i].File, filepath.Dir(yml), client.GetVersion()) if err != nil { fmt.Fprintf(os.Stderr, "Error saving dashboard '%s' to file '%s' : %+v\n", diff --git a/libbeat/cmd/export/template.go b/libbeat/cmd/export/template.go index bf3cbbb9b27..d56330a62ff 100644 --- a/libbeat/cmd/export/template.go +++ b/libbeat/cmd/export/template.go @@ -57,7 +57,16 @@ func GenTemplateConfigCmd(settings instance.Settings, name, idxPrefix, beatVersi } } - tmpl, err := template.New(b.Info.Version, index, version, cfg) + if version == "" { + version = b.Info.Version + } + + esVersion, err := common.NewVersion(version) + if err != nil { + fmt.Fprintf(os.Stderr, "Invalid Elasticsearch version: %s\n", err) + } + + tmpl, err := template.New(b.Info.Version, index, *esVersion, cfg) if err != nil { fmt.Fprintf(os.Stderr, "Error generating template: %+v", err) os.Exit(1) diff --git a/libbeat/common/version.go b/libbeat/common/version.go index 795eb2b8ddb..54293e9ff79 100644 --- a/libbeat/common/version.go +++ b/libbeat/common/version.go @@ -31,6 +31,16 @@ type Version struct { Meta string } +// MustNewVersion creates a version from the given version string. +// If the version string is invalid, MustNewVersion panics. +func MustNewVersion(version string) *Version { + v, err := NewVersion(version) + if err != nil { + panic(err) + } + return v +} + // NewVersion expects a string in the format: // major.minor.bugfix(-meta) func NewVersion(version string) (*Version, error) { @@ -69,6 +79,11 @@ func NewVersion(version string) (*Version, error) { return &v, nil } +// IsValid returns true if the version object stores a successfully parsed version number. +func (v *Version) IsValid() bool { + return v.version != "" +} + func (v *Version) IsMajor(major int) bool { return major == v.Major } diff --git a/libbeat/dashboards/dashboards.go b/libbeat/dashboards/dashboards.go index 81106f6064f..833c7a1ebe8 100644 --- a/libbeat/dashboards/dashboards.go +++ b/libbeat/dashboards/dashboards.go @@ -22,8 +22,6 @@ import ( "errors" "fmt" "path/filepath" - "strconv" - "strings" errw "github.com/pkg/errors" @@ -106,12 +104,7 @@ func ImportDashboards( esLoader.statusMsg("Elasticsearch URL %v", esLoader.client.Connection.URL) - majorVersion, _, err := getMajorAndMinorVersion(esLoader.version) - if err != nil { - return fmt.Errorf("wrong Elasticsearch version: %v", err) - } - - if majorVersion < 6 { + if esLoader.version.Major < 6 { importVia = importViaES } else { importVia = useKibana @@ -145,17 +138,16 @@ func setupAndImportDashboardsViaKibana(ctx context.Context, hostname string, kib } func ImportDashboardsViaKibana(kibanaLoader *KibanaLoader) error { - - if !isKibanaAPIavailable(kibanaLoader.version) { - return fmt.Errorf("Kibana API is not available in Kibana version %s", kibanaLoader.version) + version := kibanaLoader.version + if !version.IsValid() { + return errors.New("No valid kibana version available") } - version, err := common.NewVersion(kibanaLoader.version) - if err != nil { - return fmt.Errorf("Invalid Kibana version: %s", kibanaLoader.version) + if !isKibanaAPIavailable(kibanaLoader.version) { + return fmt.Errorf("Kibana API is not available in Kibana version %s", kibanaLoader.version.String()) } - importer, err := NewImporter(*version, kibanaLoader.config, kibanaLoader) + importer, err := NewImporter(version, kibanaLoader.config, kibanaLoader) if err != nil { return fmt.Errorf("fail to create a Kibana importer for loading the dashboards: %v", err) } @@ -187,40 +179,6 @@ func ImportDashboardsViaElasticsearch(esLoader *ElasticsearchLoader) error { return nil } -func getMajorAndMinorVersion(version string) (int, int, error) { - fields := strings.Split(version, ".") - if len(fields) != 3 { - return 0, 0, fmt.Errorf("wrong version %s", version) - } - majorVersion := fields[0] - minorVersion := fields[1] - - majorVersionInt, err := strconv.Atoi(majorVersion) - if err != nil { - return 0, 0, err - } - - minorVersionInt, err := strconv.Atoi(minorVersion) - if err != nil { - return 0, 0, err - } - - return majorVersionInt, minorVersionInt, nil -} - -func isKibanaAPIavailable(version string) bool { - majorVersion, minorVersion, err := getMajorAndMinorVersion(version) - if err != nil { - return false - } - - if majorVersion == 5 && minorVersion >= 6 { - return true - } - - if majorVersion >= 6 { - return true - } - - return false +func isKibanaAPIavailable(version common.Version) bool { + return (version.Major == 5 && version.Minor >= 6) || version.Major >= 6 } diff --git a/libbeat/dashboards/es_loader.go b/libbeat/dashboards/es_loader.go index 270e2f26dd0..4787bb3faf5 100644 --- a/libbeat/dashboards/es_loader.go +++ b/libbeat/dashboards/es_loader.go @@ -19,6 +19,7 @@ package dashboards import ( "encoding/json" + "errors" "fmt" "io/ioutil" "path" @@ -33,7 +34,7 @@ import ( type ElasticsearchLoader struct { client *elasticsearch.Client config *Config - version string + version common.Version msgOutputter MessageOutputter } @@ -48,6 +49,9 @@ func NewElasticsearchLoader(cfg *common.Config, dashboardsConfig *Config, msgOut } version := esClient.GetVersion() + if !version.IsValid() { + return nil, errors.New("No valid Elasticsearch version available") + } loader := ElasticsearchLoader{ client: esClient, @@ -56,7 +60,7 @@ func NewElasticsearchLoader(cfg *common.Config, dashboardsConfig *Config, msgOut msgOutputter: msgOutputter, } - loader.statusMsg("Initialize the Elasticsearch %s loader", version) + loader.statusMsg("Initialize the Elasticsearch %s loader", version.String()) return &loader, nil } diff --git a/libbeat/dashboards/es_loader_test.go b/libbeat/dashboards/es_loader_test.go index 4b9c468a335..29f5684da90 100644 --- a/libbeat/dashboards/es_loader_test.go +++ b/libbeat/dashboards/es_loader_test.go @@ -20,7 +20,6 @@ package dashboards import ( - "strings" "testing" "github.com/stretchr/testify/assert" @@ -40,8 +39,9 @@ func TestImporter(t *testing.T) { } client := estest.GetTestingElasticsearch(t) - if strings.HasPrefix(client.Connection.GetVersion(), "6.") || - strings.HasPrefix(client.Connection.GetVersion(), "7.") { + major := client.GetVersion().Major + + if major == 6 || major == 7 { t.Skip("Skipping tests for Elasticsearch 6.x releases") } @@ -76,8 +76,8 @@ func TestImporterEmptyBeat(t *testing.T) { } client := estest.GetTestingElasticsearch(t) - if strings.HasPrefix(client.Connection.GetVersion(), "6.") || - strings.HasPrefix(client.Connection.GetVersion(), "7.") { + major := client.GetVersion().Major + if major == 6 || major == 7 { t.Skip("Skipping tests for Elasticsearch 6.x releases") } diff --git a/libbeat/dashboards/export.go b/libbeat/dashboards/export.go index d74a1d84b7b..cfb3074ae05 100644 --- a/libbeat/dashboards/export.go +++ b/libbeat/dashboards/export.go @@ -84,14 +84,9 @@ func ExportAll(client *kibana.Client, list ListYML) ([]common.MapStr, error) { } // SaveToFile creates the required directories if needed and saves dashboard. -func SaveToFile(dashboard common.MapStr, filename, root, versionStr string) error { - version, err := common.NewVersion(versionStr) - if err != nil { - return err - } - +func SaveToFile(dashboard common.MapStr, filename, root string, version common.Version) error { dashboardsPath := "_meta/kibana/" + strconv.Itoa(version.Major) + "/dashboard" - err = generator.CreateDirectories(root, dashboardsPath) + err := generator.CreateDirectories(root, dashboardsPath) if err != nil { return err } diff --git a/libbeat/dashboards/kibana_loader.go b/libbeat/dashboards/kibana_loader.go index 8a09e72bfae..6c6e291e4ae 100644 --- a/libbeat/dashboards/kibana_loader.go +++ b/libbeat/dashboards/kibana_loader.go @@ -35,7 +35,7 @@ var importAPI = "/api/kibana/dashboards/import" type KibanaLoader struct { client *kibana.Client config *Config - version string + version common.Version hostname string msgOutputter MessageOutputter } @@ -59,7 +59,8 @@ func NewKibanaLoader(ctx context.Context, cfg *common.Config, dashboardsConfig * msgOutputter: msgOutputter, } - loader.statusMsg("Initialize the Kibana %s loader", client.GetVersion()) + version := client.GetVersion() + loader.statusMsg("Initialize the Kibana %s loader", version.String()) return &loader, nil } diff --git a/libbeat/kibana/client.go b/libbeat/kibana/client.go index fe21af40dcb..115a1bffc29 100644 --- a/libbeat/kibana/client.go +++ b/libbeat/kibana/client.go @@ -43,7 +43,7 @@ type Connection struct { Headers map[string]string http *http.Client - version string + version common.Version } type Client struct { @@ -146,7 +146,7 @@ func NewClientWithConfig(config *ClientConfig) (*Client, error) { } if !config.IgnoreVersion { - if err = client.SetVersion(); err != nil { + if err = client.readVersion(); err != nil { return nil, fmt.Errorf("fail to get the Kibana version: %v", err) } } @@ -178,7 +178,7 @@ func (conn *Connection) Request(method, extraPath string, } if method != "GET" { - req.Header.Set("kbn-version", conn.version) + req.Header.Set("kbn-version", conn.version.String()) } resp, err := conn.http.Do(req) @@ -201,7 +201,7 @@ func (conn *Connection) Request(method, extraPath string, return resp.StatusCode, result, retError } -func (client *Client) SetVersion() error { +func (client *Client) readVersion() error { type kibanaVersionResponse struct { Name string `json:"name"` Version struct { @@ -221,11 +221,12 @@ func (client *Client) SetVersion() error { err, truncateString(result)) } - var kibanaVersion kibanaVersionResponse - var kibanaVersion5x kibanaVersionResponse5x + var versionString string + var kibanaVersion kibanaVersionResponse err = json.Unmarshal(result, &kibanaVersion) if err != nil { + var kibanaVersion5x kibanaVersionResponse5x // The response returned by /api/status is different in Kibana 5.x than in Kibana 6.x err5x := json.Unmarshal(result, &kibanaVersion5x) @@ -234,21 +235,28 @@ func (client *Client) SetVersion() error { return fmt.Errorf("fail to unmarshal the response from GET %s/api/status. Response: %s. Kibana 5.x status api returns: %v. Kibana 6.x status api returns: %v", client.Connection.URL, truncateString(result), err5x, err) } - client.version = kibanaVersion5x.Version + versionString = kibanaVersion5x.Version } else { - - client.version = kibanaVersion.Version.Number + versionString = kibanaVersion.Version.Number if kibanaVersion.Version.Snapshot { // needed for the tests - client.version = client.version + "-SNAPSHOT" + versionString += "-SNAPSHOT" } } + version, err := common.NewVersion(versionString) + if err != nil { + return fmt.Errorf("fail to parse kibana version (%v): %+v", versionString, err) + } + + client.version = *version return nil } -func (client *Client) GetVersion() string { return client.version } +// GetVersion returns the version read from kibana. The version is not set if +// IgnoreVersion was set when creating the client. +func (client *Client) GetVersion() common.Version { return client.version } func (client *Client) ImportJSON(url string, params url.Values, jsonBody map[string]interface{}) error { diff --git a/libbeat/ml-importer/importer.go b/libbeat/ml-importer/importer.go index 47b8b93b3ae..892c8dae47a 100644 --- a/libbeat/ml-importer/importer.go +++ b/libbeat/ml-importer/importer.go @@ -56,13 +56,13 @@ type MLConfig struct { type MLLoader interface { Request(method, path string, pipeline string, params map[string]string, body interface{}) (int, []byte, error) LoadJSON(path string, json map[string]interface{}) ([]byte, error) - GetVersion() string + GetVersion() common.Version } // MLSetupper is a subset of the Kibana client API capable of setting up ML objects. type MLSetupper interface { Request(method, path string, params url.Values, headers http.Header, body io.Reader) (int, []byte, error) - GetVersion() string + GetVersion() common.Version } // MLResponse stores the relevant parts of the response from Kibana to check for errors. @@ -125,10 +125,11 @@ func ImportMachineLearningJob(esClient MLLoader, cfg *MLConfig) error { datafeedURL := fmt.Sprintf(esDataFeedURL, cfg.ID) if len(cfg.MinVersion) > 0 { - esVersion, err := common.NewVersion(esClient.GetVersion()) - if err != nil { - return errors.Errorf("Error parsing ES version: %s: %v", esClient.GetVersion(), err) + esVersion := esClient.GetVersion() + if !esVersion.IsValid() { + return errors.New("Invalid Elasticsearch version") } + minVersion, err := common.NewVersion(cfg.MinVersion) if err != nil { return errors.Errorf("Error parsing min_version: %s: %v", minVersion, err) @@ -136,7 +137,7 @@ func ImportMachineLearningJob(esClient MLLoader, cfg *MLConfig) error { if esVersion.LessThan(minVersion) { logp.Debug("machine-learning", "Skipping job %s, because ES version (%s) is smaller than min version (%s)", - cfg.ID, esVersion, minVersion) + cfg.ID, esVersion.String(), minVersion) return nil } } diff --git a/libbeat/outputs/codec/json/event.go b/libbeat/outputs/codec/json/event.go index 5ea4b420924..2f0d2c1217b 100644 --- a/libbeat/outputs/codec/json/event.go +++ b/libbeat/outputs/codec/json/event.go @@ -46,7 +46,7 @@ func makeEvent(index, version string, in *beat.Event) event { Meta: meta{ Beat: index, Version: version, - Type: "doc", + Type: "_doc", Fields: in.Meta, }, Fields: in.Fields, diff --git a/libbeat/outputs/codec/json/json_test.go b/libbeat/outputs/codec/json/json_test.go index 3e2abcfd184..4788bf4be10 100644 --- a/libbeat/outputs/codec/json/json_test.go +++ b/libbeat/outputs/codec/json/json_test.go @@ -35,7 +35,7 @@ func TestJsonCodec(t *testing.T) { "default json": testCase{ config: defaultConfig, in: common.MapStr{"msg": "message"}, - expected: `{"@timestamp":"0001-01-01T00:00:00.000Z","@metadata":{"beat":"test","type":"doc","version":"1.2.3"},"msg":"message"}`, + expected: `{"@timestamp":"0001-01-01T00:00:00.000Z","@metadata":{"beat":"test","type":"_doc","version":"1.2.3"},"msg":"message"}`, }, "pretty enabled": testCase{ config: config{Pretty: true}, @@ -44,7 +44,7 @@ func TestJsonCodec(t *testing.T) { "@timestamp": "0001-01-01T00:00:00.000Z", "@metadata": { "beat": "test", - "type": "doc", + "type": "_doc", "version": "1.2.3" }, "msg": "message" @@ -53,12 +53,12 @@ func TestJsonCodec(t *testing.T) { "html escaping enabled": testCase{ config: config{EscapeHTML: true}, in: common.MapStr{"msg": "world"}, - expected: `{"@timestamp":"0001-01-01T00:00:00.000Z","@metadata":{"beat":"test","type":"doc","version":"1.2.3"},"msg":"\u003chello\u003eworld\u003c/hello\u003e"}`, + expected: `{"@timestamp":"0001-01-01T00:00:00.000Z","@metadata":{"beat":"test","type":"_doc","version":"1.2.3"},"msg":"\u003chello\u003eworld\u003c/hello\u003e"}`, }, "html escaping disabled": testCase{ config: config{EscapeHTML: false}, in: common.MapStr{"msg": "world"}, - expected: `{"@timestamp":"0001-01-01T00:00:00.000Z","@metadata":{"beat":"test","type":"doc","version":"1.2.3"},"msg":"world"}`, + expected: `{"@timestamp":"0001-01-01T00:00:00.000Z","@metadata":{"beat":"test","type":"_doc","version":"1.2.3"},"msg":"world"}`, }, } diff --git a/libbeat/outputs/console/console_test.go b/libbeat/outputs/console/console_test.go index 3c165f950e7..28f6933757e 100644 --- a/libbeat/outputs/console/console_test.go +++ b/libbeat/outputs/console/console_test.go @@ -82,7 +82,7 @@ func TestConsoleOutput(t *testing.T) { []beat.Event{ {Fields: event("field", "value")}, }, - "{\"@timestamp\":\"0001-01-01T00:00:00.000Z\",\"@metadata\":{\"beat\":\"test\",\"type\":\"doc\",\"version\":\"1.2.3\"},\"field\":\"value\"}\n", + "{\"@timestamp\":\"0001-01-01T00:00:00.000Z\",\"@metadata\":{\"beat\":\"test\",\"type\":\"_doc\",\"version\":\"1.2.3\"},\"field\":\"value\"}\n", }, { "single json event (pretty=true)", @@ -90,7 +90,7 @@ func TestConsoleOutput(t *testing.T) { []beat.Event{ {Fields: event("field", "value")}, }, - "{\n \"@timestamp\": \"0001-01-01T00:00:00.000Z\",\n \"@metadata\": {\n \"beat\": \"test\",\n \"type\": \"doc\",\n \"version\": \"1.2.3\"\n },\n \"field\": \"value\"\n}\n", + "{\n \"@timestamp\": \"0001-01-01T00:00:00.000Z\",\n \"@metadata\": {\n \"beat\": \"test\",\n \"type\": \"_doc\",\n \"version\": \"1.2.3\"\n },\n \"field\": \"value\"\n}\n", }, // TODO: enable test after update fmtstr support to beat.Event { diff --git a/libbeat/outputs/elasticsearch/api_integration_test.go b/libbeat/outputs/elasticsearch/api_integration_test.go index 33294c1012c..8ac9a0ac233 100644 --- a/libbeat/outputs/elasticsearch/api_integration_test.go +++ b/libbeat/outputs/elasticsearch/api_integration_test.go @@ -24,7 +24,6 @@ import ( "fmt" "net/http" "os" - "strings" "testing" "github.com/stretchr/testify/assert" @@ -105,8 +104,8 @@ func TestIngest(t *testing.T) { } client := getTestingElasticsearch(t) - if strings.HasPrefix(client.Connection.version, "2.") { - t.Skip("Skipping tests as pipeline not available in 2.x releases") + if client.Connection.version.Major < 5 { + t.Skip("Skipping tests as pipeline not available in <5.x releases") } status, _, err := client.DeletePipeline(pipeline, nil) diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 2bd1c06ab37..896a8f2b99f 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -29,6 +29,7 @@ import ( "time" "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/outputs" "github.com/elastic/beats/libbeat/outputs/outil" @@ -42,10 +43,11 @@ type Client struct { Connection tlsConfig *transport.TLSConfig - index outil.Selector - pipeline *outil.Selector - params map[string]string - timeout time.Duration + index outil.Selector + pipeline *outil.Selector + params map[string]string + timeout time.Duration + eventType string // buffered bulk requests bulkRequ *bulkRequest @@ -89,7 +91,7 @@ type Connection struct { onConnectCallback func() error encoder bodyEncoder - version string + version common.Version } type bulkIndexAction struct { @@ -129,7 +131,9 @@ var ( ) const ( - eventType = "doc" + defaultEventTypeES6 = "doc" + defaultEventTypeES7 = "_doc" + defaultEventType = defaultEventTypeES7 ) // NewClient instantiates a new client. @@ -214,6 +218,7 @@ func NewClient( pipeline: pipeline, params: params, timeout: s.Timeout, + eventType: defaultEventType, bulkRequ: bulkRequ, @@ -301,7 +306,7 @@ func (client *Client) publishEvents( // events slice origCount := len(data) - data = bulkEncodePublishRequest(body, client.index, client.pipeline, data) + data = bulkEncodePublishRequest(body, client.index, client.pipeline, client.eventType, data) newCount := len(data) if st != nil && origCount > newCount { st.Dropped(origCount - newCount) @@ -360,12 +365,13 @@ func bulkEncodePublishRequest( body bulkWriter, index outil.Selector, pipeline *outil.Selector, + eventType string, data []publisher.Event, ) []publisher.Event { okEvents := data[:0] for i := range data { event := &data[i].Content - meta, err := createEventBulkMeta(index, pipeline, event) + meta, err := createEventBulkMeta(index, pipeline, eventType, event) if err != nil { logp.Err("Failed to encode event meta data: %s", err) continue @@ -382,6 +388,7 @@ func bulkEncodePublishRequest( func createEventBulkMeta( indexSel outil.Selector, pipelineSel *outil.Selector, + eventType string, event *beat.Event, ) (interface{}, error) { pipeline, err := getPipeline(event, pipelineSel) @@ -627,8 +634,8 @@ func (client *Client) LoadJSON(path string, json map[string]interface{}) ([]byte return body, nil } -// GetVersion returns the elasticsearch version the client is connected to -func (client *Client) GetVersion() string { +// GetVersion returns the elasticsearch version the client is connected to. +func (client *Client) GetVersion() common.Version { return client.Connection.version } @@ -660,7 +667,7 @@ func (client *Client) Test(d testing.Driver) { err = client.Connect() d.Fatal("talk to server", err) - d.Info("version", client.version) + d.Info("version", client.version.String()) }) } @@ -668,14 +675,40 @@ func (client *Client) String() string { return "elasticsearch(" + client.Connection.URL + ")" } -// Connect connects the client. +// Connect connects the client. It runs a GET request against the root URL of +// the configured host, updates the known Elasticsearch version and calls +// globally configured handlers. +func (client *Client) Connect() error { + err := client.Connection.Connect() + if err != nil { + return err + } + + if client.GetVersion().Major < 7 { + client.eventType = defaultEventTypeES6 + } else { + client.eventType = defaultEventType + } + + return nil +} + +// Connect connects the client. It runs a GET request against the root URL of +// the configured host, updates the known Elasticsearch version and calls +// globally configured handlers. func (conn *Connection) Connect() error { - var err error - conn.version, err = conn.Ping() + versionString, err := conn.Ping() if err != nil { return err } + if version, err := common.NewVersion(versionString); err != nil { + logp.Err("Invalid version from Elasticsearch: %v", versionString) + conn.version = common.Version{} + } else { + conn.version = *version + } + err = conn.onConnectCallback() if err != nil { return fmt.Errorf("Connection marked as failed because the onConnect callback failed: %v", err) @@ -802,7 +835,9 @@ func (conn *Connection) execHTTPRequest(req *http.Request) (int, []byte, error) return status, obj, err } -func (conn *Connection) GetVersion() string { +// GetVersion returns the elasticsearch version the client is connected to. +// The version is read and updated on 'Connect'. +func (conn *Connection) GetVersion() common.Version { return conn.version } diff --git a/libbeat/outputs/elasticsearch/client_integration_test.go b/libbeat/outputs/elasticsearch/client_integration_test.go index 9588e5ee162..8a3a7d5f5b9 100644 --- a/libbeat/outputs/elasticsearch/client_integration_test.go +++ b/libbeat/outputs/elasticsearch/client_integration_test.go @@ -21,7 +21,6 @@ package elasticsearch import ( "math/rand" - "strings" "testing" "time" @@ -92,8 +91,8 @@ func TestClientPublishEventWithPipeline(t *testing.T) { client.Delete(index, "", "", nil) // Check version - if strings.HasPrefix(client.Connection.version, "2.") { - t.Skip("Skipping tests as pipeline not available in 2.x releases") + if client.Connection.version.Major < 5 { + t.Skip("Skipping tests as pipeline not available in <5.x releases") } publish := func(event beat.Event) { @@ -173,8 +172,8 @@ func TestClientBulkPublishEventsWithPipeline(t *testing.T) { }) client.Delete(index, "", "", nil) - if strings.HasPrefix(client.Connection.version, "2.") { - t.Skip("Skipping tests as pipeline not available in 2.x releases") + if client.Connection.version.Major < 5 { + t.Skip("Skipping tests as pipeline not available in <5.x releases") } publish := func(events ...beat.Event) { diff --git a/libbeat/outputs/elasticsearch/client_test.go b/libbeat/outputs/elasticsearch/client_test.go index 0c92d17b100..87f824f89b2 100644 --- a/libbeat/outputs/elasticsearch/client_test.go +++ b/libbeat/outputs/elasticsearch/client_test.go @@ -28,6 +28,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" @@ -36,6 +37,7 @@ import ( "github.com/elastic/beats/libbeat/outputs/outest" "github.com/elastic/beats/libbeat/outputs/outil" "github.com/elastic/beats/libbeat/publisher" + "github.com/elastic/beats/libbeat/version" ) func readStatusItem(in []byte) (int, string, error) { @@ -379,3 +381,89 @@ func TestAddToURL(t *testing.T) { assert.Equal(t, url, test.expected) } } + +type testBulkRecorder struct { + data []interface{} + inAction bool +} + +func TestBulkEncodeEvents(t *testing.T) { + cases := map[string]struct { + docType string + config common.MapStr + events []common.MapStr + }{ + "ES 6.x event": { + docType: "doc", + config: common.MapStr{}, + events: []common.MapStr{{"message": "test"}}, + }, + "ES 7.x event": { + docType: "_doc", + config: common.MapStr{}, + events: []common.MapStr{{"message": "test"}}, + }, + } + + for name, test := range cases { + test := test + t.Run(name, func(t *testing.T) { + cfg := common.MustNewConfigFrom(test.config) + + index, pipeline, err := buildSelectors(beat.Info{ + IndexPrefix: "test", + Version: version.GetDefaultVersion(), + }, cfg) + require.NoError(t, err) + + events := make([]publisher.Event, len(test.events)) + for i, fields := range test.events { + events[i] = publisher.Event{ + Content: beat.Event{ + Timestamp: time.Now(), + Fields: fields, + }, + } + } + + recorder := &testBulkRecorder{} + + encoded := bulkEncodePublishRequest(recorder, index, pipeline, test.docType, events) + assert.Equal(t, len(events), len(encoded), "all events should have been encoded") + assert.False(t, recorder.inAction, "incomplete bulk") + + // check meta-data for each event + for i := 0; i < len(recorder.data); i += 2 { + var meta bulkEventMeta + switch v := recorder.data[i].(type) { + case bulkCreateAction: + meta = v.Create + case bulkIndexAction: + meta = v.Index + default: + panic("unknown type") + } + + assert.NotEqual(t, "", meta.Index) + assert.Equal(t, test.docType, meta.DocType) + } + + // TODO: customer per test case validation + }) + } +} + +func (r *testBulkRecorder) Add(meta, obj interface{}) error { + if r.inAction { + panic("can not add a new action if other action is active") + } + + r.data = append(r.data, meta, obj) + return nil +} + +func (r *testBulkRecorder) AddRaw(raw interface{}) error { + r.data = append(r.data) + r.inAction = !r.inAction + return nil +} diff --git a/libbeat/outputs/elasticsearch/elasticsearch.go b/libbeat/outputs/elasticsearch/elasticsearch.go index ddc4d925f79..0c9dd08537c 100644 --- a/libbeat/outputs/elasticsearch/elasticsearch.go +++ b/libbeat/outputs/elasticsearch/elasticsearch.go @@ -108,9 +108,9 @@ func makeES( cfg.SetInt("bulk_max_size", -1, defaultBulkSize) } - if !cfg.HasField("index") { - pattern := fmt.Sprintf("%v-%v-%%{+yyyy.MM.dd}", beat.IndexPrefix, beat.Version) - cfg.SetString("index", -1, pattern) + index, pipeline, err := buildSelectors(beat, cfg) + if err != nil { + return outputs.Fail(err) } config := defaultConfig @@ -123,36 +123,11 @@ func makeES( return outputs.Fail(err) } - index, err := outil.BuildSelectorFromConfig(cfg, outil.Settings{ - Key: "index", - MultiKey: "indices", - EnableSingleOnly: true, - FailEmpty: true, - }) - if err != nil { - return outputs.Fail(err) - } - tlsConfig, err := tlscommon.LoadTLSConfig(config.TLS) if err != nil { return outputs.Fail(err) } - pipelineSel, err := outil.BuildSelectorFromConfig(cfg, outil.Settings{ - Key: "pipeline", - MultiKey: "pipelines", - EnableSingleOnly: true, - FailEmpty: false, - }) - if err != nil { - return outputs.Fail(err) - } - - var pipeline *outil.Selector - if !pipelineSel.IsEmpty() { - pipeline = &pipelineSel - } - proxyURL, err := parseProxyURL(config.ProxyURL) if err != nil { return outputs.Fail(err) @@ -201,6 +176,42 @@ func makeES( return outputs.SuccessNet(config.LoadBalance, config.BulkMaxSize, config.MaxRetries, clients) } +func buildSelectors( + beat beat.Info, + cfg *common.Config, +) (index outil.Selector, pipeline *outil.Selector, err error) { + if !cfg.HasField("index") { + pattern := fmt.Sprintf("%v-%v-%%{+yyyy.MM.dd}", beat.IndexPrefix, beat.Version) + cfg.SetString("index", -1, pattern) + } + + index, err = outil.BuildSelectorFromConfig(cfg, outil.Settings{ + Key: "index", + MultiKey: "indices", + EnableSingleOnly: true, + FailEmpty: true, + }) + if err != nil { + return index, pipeline, err + } + + pipelineSel, err := outil.BuildSelectorFromConfig(cfg, outil.Settings{ + Key: "pipeline", + MultiKey: "pipelines", + EnableSingleOnly: true, + FailEmpty: false, + }) + if err != nil { + return index, pipeline, err + } + + if !pipelineSel.IsEmpty() { + pipeline = &pipelineSel + } + + return index, pipeline, err +} + // NewConnectedClient creates a new Elasticsearch client based on the given config. // It uses the NewElasticsearchClients to create a list of clients then returns // the first from the list that successfully connects. diff --git a/libbeat/template/load.go b/libbeat/template/load.go index 157e2c7e915..34fb5638194 100644 --- a/libbeat/template/load.go +++ b/libbeat/template/load.go @@ -34,7 +34,7 @@ import ( type ESClient interface { LoadJSON(path string, json map[string]interface{}) ([]byte, error) Request(method, path string, pipeline string, params map[string]string, body interface{}) (int, []byte, error) - GetVersion() string + GetVersion() common.Version } type Loader struct { @@ -79,7 +79,8 @@ func (l *Loader) Load() error { exists := l.CheckTemplate(templateName) if !exists || l.config.Overwrite { - logp.Info("Loading template for Elasticsearch version: %s", l.client.GetVersion()) + version := l.client.GetVersion() + logp.Info("Loading template for Elasticsearch version: %s", version.String()) if l.config.Overwrite { logp.Info("Existing template will be overwritten, as overwrite is enabled.") } diff --git a/libbeat/template/load_integration_test.go b/libbeat/template/load_integration_test.go index 12d8568c534..ac4910362d5 100644 --- a/libbeat/template/load_integration_test.go +++ b/libbeat/template/load_integration_test.go @@ -21,10 +21,13 @@ package template import ( "encoding/json" + "fmt" "path/filepath" + "strconv" "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" @@ -32,6 +35,12 @@ import ( "github.com/elastic/beats/libbeat/version" ) +type testTemplate struct { + t *testing.T + client ESClient + common.MapStr +} + func TestCheckTemplate(t *testing.T) { client := estest.GetTestingElasticsearch(t) if err := client.Connect(); err != nil { @@ -110,24 +119,6 @@ func TestLoadInvalidTemplate(t *testing.T) { assert.False(t, loader.CheckTemplate(templateName)) } -func getTemplate(t *testing.T, client ESClient, templateName string) common.MapStr { - status, body, err := client.Request("GET", "/_template/"+templateName, "", nil, nil) - assert.NoError(t, err) - assert.Equal(t, status, 200) - - var response common.MapStr - err = json.Unmarshal(body, &response) - assert.NoError(t, err) - - return common.MapStr(response[templateName].(map[string]interface{})) -} - -func newConfigFrom(t *testing.T, from interface{}) *common.Config { - cfg, err := common.NewConfigFrom(from) - assert.NoError(t, err) - return cfg -} - // Tests loading the templates for each beat func TestLoadBeatsTemplate(t *testing.T) { beats := []string{ @@ -213,13 +204,8 @@ func TestTemplateSettings(t *testing.T) { // Check that it contains the mapping templateJSON := getTemplate(t, client, tmpl.GetName()) - val, err := templateJSON.GetValue("settings.index.number_of_shards") - assert.NoError(t, err) - assert.Equal(t, val.(string), "1") - - val, err = templateJSON.GetValue("mappings.doc._source.enabled") - assert.NoError(t, err) - assert.Equal(t, val.(bool), false) + assert.Equal(t, 1, templateJSON.NumberOfShards()) + assert.Equal(t, false, templateJSON.SourceEnabled()) // Delete template again to clean up client.Request("DELETE", "/_template/"+tmpl.GetName(), "", nil, nil) @@ -276,8 +262,7 @@ func TestOverwrite(t *testing.T) { // Overwrite was not enabled, so the first version should still be there templateJSON := getTemplate(t, client, templateName) - _, err = templateJSON.GetValue("mappings.doc._source.enabled") - assert.Error(t, err) + assert.Equal(t, true, templateJSON.SourceEnabled()) // Load template again, this time with custom settings AND overwrite: true config = newConfigFrom(t, TemplateConfig{ @@ -297,9 +282,7 @@ func TestOverwrite(t *testing.T) { // Overwrite was enabled, so the custom setting should be there templateJSON = getTemplate(t, client, templateName) - val, err := templateJSON.GetValue("mappings.doc._source.enabled") - assert.NoError(t, err) - assert.Equal(t, val.(bool), false) + assert.Equal(t, false, templateJSON.SourceEnabled()) // Delete template again to clean up client.Request("DELETE", "/_template/"+templateName, "", nil, nil) @@ -388,3 +371,58 @@ func TestTemplateWithData(t *testing.T) { // Make sure it was removed assert.False(t, loader.CheckTemplate(tmpl.GetName())) } + +func newConfigFrom(t *testing.T, from interface{}) *common.Config { + cfg, err := common.NewConfigFrom(from) + assert.NoError(t, err) + return cfg +} + +func getTemplate(t *testing.T, client ESClient, templateName string) testTemplate { + status, body, err := client.Request("GET", "/_template/"+templateName, "", nil, nil) + assert.NoError(t, err) + assert.Equal(t, status, 200) + + var response common.MapStr + err = json.Unmarshal(body, &response) + assert.NoError(t, err) + + return testTemplate{ + t: t, + client: client, + MapStr: common.MapStr(response[templateName].(map[string]interface{})), + } +} + +func (tt *testTemplate) SourceEnabled() bool { + docType := "_doc" + major := tt.client.GetVersion().Major + if major < 7 { + docType = "doc" + } + + key := fmt.Sprintf("mappings.%v._source.enabled", docType) + + // _source.enabled is true if it's missing (default) + b, _ := tt.HasKey(key) + if !b { + return true + } + + val, err := tt.GetValue(key) + if !assert.NoError(tt.t, err) { + doc, _ := json.MarshalIndent(tt.MapStr, "", " ") + tt.t.Fatal(fmt.Sprintf("failed to read '%v' in %s", key, doc)) + } + + return val.(bool) +} + +func (tt *testTemplate) NumberOfShards() int { + val, err := tt.GetValue("settings.index.number_of_shards") + require.NoError(tt.t, err) + + i, err := strconv.Atoi(val.(string)) + require.NoError(tt.t, err) + return i +} diff --git a/libbeat/template/template.go b/libbeat/template/template.go index 49e274f3727..540077b29d6 100644 --- a/libbeat/template/template.go +++ b/libbeat/template/template.go @@ -51,7 +51,7 @@ type Template struct { } // New creates a new template instance -func New(beatVersion string, beatName string, esVersion string, config TemplateConfig) (*Template, error) { +func New(beatVersion string, beatName string, esVersion common.Version, config TemplateConfig) (*Template, error) { bV, err := common.NewVersion(beatVersion) if err != nil { return nil, err @@ -96,20 +96,15 @@ func New(beatVersion string, beatName string, esVersion string, config TemplateC } // In case no esVersion is set, it is assumed the same as beat version - if esVersion == "" { - esVersion = beatVersion - } - - esV, err := common.NewVersion(esVersion) - if err != nil { - return nil, err + if !esVersion.IsValid() { + esVersion = *bV } return &Template{ pattern: pattern, name: name, beatVersion: *bV, - esVersion: *esV, + esVersion: esVersion, config: config, }, nil } @@ -210,20 +205,21 @@ func (t *Template) Generate(properties common.MapStr, dynamicTemplates []common. indexSettings.Put("number_of_routing_shards", defaultNumberOfRoutingShards) } - if t.esVersion.IsMajor(7) { + var mappingName string + major := t.esVersion.Major + switch { + case major < 6: + mappingName = "_default_" + case major == 6: + mappingName = "doc" + case major >= 7: + mappingName = "_doc" defaultFields = append(defaultFields, "fields.*") indexSettings.Put("query.default_field", defaultFields) } indexSettings.DeepUpdate(t.config.Settings.Index) - var mappingName string - if t.esVersion.Major >= 6 { - mappingName = "doc" - } else { - mappingName = "_default_" - } - // Load basic structure basicStructure := common.MapStr{ "mappings": common.MapStr{ diff --git a/libbeat/template/template_test.go b/libbeat/template/template_test.go index 09eb50b93a1..701f692737f 100644 --- a/libbeat/template/template_test.go +++ b/libbeat/template/template_test.go @@ -34,7 +34,8 @@ func TestNumberOfRoutingShards(t *testing.T) { config := TemplateConfig{} // Test it exists in 6.1 - template, err := New(beatVersion, beatName, "6.1.0", config) + ver := common.MustNewVersion("6.1.0") + template, err := New(beatVersion, beatName, *ver, config) assert.NoError(t, err) data := template.Generate(nil, nil) @@ -44,7 +45,8 @@ func TestNumberOfRoutingShards(t *testing.T) { assert.Equal(t, 30, shards.(int)) // Test it does not exist in 6.0 - template, err = New(beatVersion, beatName, "6.0.0", config) + ver = common.MustNewVersion("6.0.0") + template, err = New(beatVersion, beatName, *ver, config) assert.NoError(t, err) data = template.Generate(nil, nil) @@ -64,7 +66,8 @@ func TestNumberOfRoutingShardsOverwrite(t *testing.T) { } // Test it exists in 6.1 - template, err := New(beatVersion, beatName, "6.1.0", config) + ver := common.MustNewVersion("6.1.0") + template, err := New(beatVersion, beatName, *ver, config) assert.NoError(t, err) data := template.Generate(nil, nil) diff --git a/metricbeat/mb/module/example_test.go b/metricbeat/mb/module/example_test.go index 0f84ba8bdc8..be0e068b656 100644 --- a/metricbeat/mb/module/example_test.go +++ b/metricbeat/mb/module/example_test.go @@ -86,7 +86,7 @@ func ExampleWrapper() { // { // "@metadata": { // "beat": "noindex", - // "type": "doc", + // "type": "_doc", // "version": "1.2.3" // }, // "@timestamp": "2016-05-10T23:27:58.485Z", diff --git a/metricbeat/tests/system/test_template.py b/metricbeat/tests/system/test_template.py index 8766ba54ddf..0f6e09b8dcb 100644 --- a/metricbeat/tests/system/test_template.py +++ b/metricbeat/tests/system/test_template.py @@ -42,7 +42,7 @@ def test_export_template(self): t = json.loads(template_content) - properties = t["mappings"]["doc"]["properties"] + properties = t["mappings"]["_doc"]["properties"] # Check libbeat fields assert properties["@timestamp"] == {"type": "date"}