Skip to content
This repository has been archived by the owner on Dec 1, 2018. It is now read-only.

Improve ES Sink: #1260

Merged
merged 5 commits into from
Sep 2, 2016
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
26 changes: 20 additions & 6 deletions Godeps/Godeps.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ test-integration: clean deps build
container: build
cp heapster deploy/docker/heapster
cp eventer deploy/docker/eventer
docker build -t $(PREFIX)/heapster:$(TAG) deploy/docker/
docker build -t heapster deploy/docker/
docker tag heapster:latest 032833530291.dkr.ecr.eu-west-1.amazonaws.com/heapster:latest

grafana:
docker build -t $(PREFIX)/heapster_grafana:$(TAG) grafana/
Expand Down
51 changes: 35 additions & 16 deletions common/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (
"time"

"github.com/golang/glog"
"github.com/olivere/elastic"
"github.com/pborman/uuid"

"gopkg.in/olivere/elastic.v3"
"os"
)

const (
Expand Down Expand Up @@ -60,7 +62,7 @@ func SaveDataIntoES(esClient *elastic.Client, indexName string, typeName string,
_, err = esClient.Index().
Index(indexName).
Type(typeName).
Id(string(indexID)).
Id(indexID.String()).
BodyJson(sinkData).
Do()
if err != nil {
Expand All @@ -76,7 +78,7 @@ func CreateElasticSearchConfig(uri *url.URL) (*ElasticSearchConfig, error) {
var esConfig ElasticSearchConfig
opts, err := url.ParseQuery(uri.RawQuery)
if err != nil {
return nil, fmt.Errorf("failed to parser url's query string: %s", err)
return nil, fmt.Errorf("fFailed to parser url's query string: %s", err)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo error
fFailed ==> Failed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}

// set the index for es,the default value is "heapster"
Expand All @@ -87,12 +89,15 @@ func CreateElasticSearchConfig(uri *url.URL) (*ElasticSearchConfig, error) {

// Set the URL endpoints of the ES's nodes. Notice that when sniffing is
// enabled, these URLs are used to initially sniff the cluster on startup.
if len(opts["nodes"]) < 1 {
var startupFns []elastic.ClientOptionFunc
if len(opts["nodes"]) > 0 {
startupFns = append(startupFns, elastic.SetURL(opts["nodes"]...))
} else if uri.Opaque != "" {
startupFns = append(startupFns, elastic.SetURL(uri.Opaque))
} else {
return nil, fmt.Errorf("There is no node assigned for connecting ES cluster")
}

startupFns := []elastic.ClientOptionFunc{elastic.SetURL(opts["nodes"]...)}

// If the ES cluster needs authentication, the username and secret
// should be set in sink config.Else, set the Authenticate flag to false
if len(opts["esUserName"]) > 0 && len(opts["esUserSecret"]) > 0 {
Expand All @@ -115,14 +120,6 @@ func CreateElasticSearchConfig(uri *url.URL) (*ElasticSearchConfig, error) {
startupFns = append(startupFns, elastic.SetHealthcheck(healthCheck))
}

if len(opts["sniff"]) > 0 {
sniff, err := strconv.ParseBool(opts["sniff"][0])
if err != nil {
return nil, fmt.Errorf("Failed to parse URL's sniff value into a bool")
}
startupFns = append(startupFns, elastic.SetSniff(sniff))
}

if len(opts["startupHealthcheckTimeout"]) > 0 {
timeout, err := time.ParseDuration(opts["startupHealthcheckTimeout"][0] + "s")
if err != nil {
Expand All @@ -131,12 +128,34 @@ func CreateElasticSearchConfig(uri *url.URL) (*ElasticSearchConfig, error) {
startupFns = append(startupFns, elastic.SetHealthcheckTimeoutStartup(timeout))
}

if os.Getenv("AWS_ACCESS_KEY_ID") != "" || os.Getenv("AWS_ACCESS_KEY") != "" ||
os.Getenv("AWS_SECRET_ACCESS_KEY") != "" || os.Getenv("AWS_SECRET_KEY") != "" {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please remove the blank line here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

glog.Info("Configuring with AWS credentials..")

awsClient, err := createAWSClient()
if err != nil {
return nil, err
}

startupFns = append(startupFns, elastic.SetHttpClient(awsClient))
startupFns = append(startupFns, elastic.SetSniff(false))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change

startupFns = append(startupFns, elastic.SetHttpClient(awsClient))
startupFns = append(startupFns, elastic.SetSniff(false))

to

startupFns = append(startupFns, elastic.SetHttpClient(awsClient),elastic.SetSniff(false))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

} else {
if len(opts["sniff"]) > 0 {
sniff, err := strconv.ParseBool(opts["sniff"][0])
if err != nil {
return nil, fmt.Errorf("Failed to parse URL's sniff value into a bool")
}
startupFns = append(startupFns, elastic.SetSniff(sniff))
}
}

esConfig.EsClient, err = elastic.NewClient(startupFns...)
if err != nil {
return nil, fmt.Errorf("failed to create ElasticSearch client: %v", err)
return nil, fmt.Errorf("Failed to create ElasticSearch client: %v", err)
}

glog.V(2).Infof("elasticsearch sink configure successfully")
glog.V(2).Infof("ElasticSearch sink configure successfully")

return &esConfig, nil
}
2 changes: 1 addition & 1 deletion common/elasticsearch/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"testing"
"time"

"github.com/olivere/elastic"
"gopkg.in/olivere/elastic.v3"
)

func TestCreateElasticSearchConfig(t *testing.T) {
Expand Down
35 changes: 32 additions & 3 deletions docs/sink-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -155,15 +155,18 @@ The following options are available:
### Elasticsearch
This sink supports monitoring metrics and events. To use the ElasticSearch
sink add the following flag:

```
--sink=elasticsearch:<ES_SERVER_URL>[?<OPTIONS>]

```
Normally an ElasticSearch cluster has multiple nodes or a proxy, so these need
to be configured for the ElasticSearch sink. To do this, you can set
`ES_SERVER_URL` to a dummy value, and use the `?nodes=` query value for each
additional node in the cluster. For example:

```
--sink=elasticsearch:?nodes=foo.com:9200&nodes=bar.com:9200
```
(*) Notice that using the `?nodes` notation will override the `ES_SERVER_URL`


Besides this, the following options can be set in query string:

Expand All @@ -189,6 +192,32 @@ Like this:

--sink="elasticsearch:?nodes=0.0.0.0:9200&Index=testEvent"

#### AWS Integration
In order to use AWS Managed Elastic we need to use one of the following methods:

1. Making sure the public IPs of the Heapster are allowed on the ElasticSearch's Access Policy

-OR-

2. Configuring an Access Policy with IAM
1. Configure the ElasticSearch cluster policy with IAM User
2. Create a secret that stores the IAM credentials
3. Expose the credentials to the environment variables: `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`

```
env:
- name: AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: aws-heapster
key: aws.id
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: aws-heapster
key: aws.secret
```

## Using multiple sinks

Heapster can be configured to send k8s metrics and events to multiple sinks by specifying the`--sink=...` flag multiple times.
Expand Down
13 changes: 8 additions & 5 deletions events/sinks/elasticsearch/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"encoding/json"

"github.com/golang/glog"
"github.com/olivere/elastic"
"gopkg.in/olivere/elastic.v3"
esCommon "k8s.io/heapster/common/elasticsearch"
event_core "k8s.io/heapster/events/core"
"k8s.io/heapster/metrics/core"
Expand Down Expand Up @@ -84,9 +84,12 @@ func (sink *elasticSearchSink) ExportEvents(eventBatch *event_core.EventBatch) {
for _, event := range eventBatch.Events {
point, err := eventToPoint(event)
if err != nil {
glog.Warningf("Failed to convert event to point: %v", err)
glog.Info("Failed to convert event to point: %v", err)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change glog.Info to glog.V(3).Info

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed it to warning.. It make more sense to me. What do you think?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, i agree, warning is more precise. thanks

}
err = sink.saveDataFunc(sink.esConfig.EsClient, sink.esConfig.Index, typeName, point)
if err != nil {
glog.V(3).Info("Failed to export data to ElasticSearch sink: ", err)
}
sink.saveDataFunc(sink.esConfig.EsClient, sink.esConfig.Index, typeName, point)
}
}

Expand All @@ -102,12 +105,12 @@ func NewElasticSearchSink(uri *url.URL) (event_core.EventSink, error) {
var esSink elasticSearchSink
elasticsearchConfig, err := esCommon.CreateElasticSearchConfig(uri)
if err != nil {
glog.V(2).Infof("failed to config elasticsearch")
glog.V(2).Infof("Failed to config ElasticSearch")
return nil, err
}

esSink.esConfig = *elasticsearchConfig
esSink.saveDataFunc = esCommon.SaveDataIntoES
glog.V(2).Infof("elasticsearch sink setup successfully")
glog.V(2).Infof("ElasticSearch sink setup successfully")
return &esSink, nil
}
2 changes: 1 addition & 1 deletion events/sinks/elasticsearch/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (
"testing"
"time"

"github.com/olivere/elastic"
"github.com/stretchr/testify/assert"
"gopkg.in/olivere/elastic.v3"
esCommon "k8s.io/heapster/common/elasticsearch"
"k8s.io/heapster/events/core"
kube_api "k8s.io/kubernetes/pkg/api"
Expand Down
16 changes: 11 additions & 5 deletions metrics/sinks/elasticsearch/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"time"

"github.com/golang/glog"
"github.com/olivere/elastic"
"gopkg.in/olivere/elastic.v3"
esCommon "k8s.io/heapster/common/elasticsearch"
"k8s.io/heapster/metrics/core"
)
Expand Down Expand Up @@ -58,7 +58,10 @@ func (sink *elasticSearchSink) ExportData(dataBatch *core.DataBatch) {
},
MetricsTimestamp: dataBatch.Timestamp.UTC(),
}
sink.saveDataFunc(sink.esConfig.EsClient, sink.esConfig.Index, typeName, point)
err := sink.saveDataFunc(sink.esConfig.EsClient, sink.esConfig.Index, typeName, point)
if err != nil {
glog.V(3).Info("Failed to export data to ElasticSearch sink: ", err)
}
}
for _, metric := range metricSet.LabeledMetrics {
labels := make(map[string]string)
Expand All @@ -76,7 +79,10 @@ func (sink *elasticSearchSink) ExportData(dataBatch *core.DataBatch) {
},
MetricsTimestamp: dataBatch.Timestamp.UTC(),
}
sink.saveDataFunc(sink.esConfig.EsClient, sink.esConfig.Index, typeName, point)
err := sink.saveDataFunc(sink.esConfig.EsClient, sink.esConfig.Index, typeName, point)
if err != nil {
glog.V(3).Info("Failed to export data to ElasticSearch sink: ", err)
}
}
}
}
Expand All @@ -93,12 +99,12 @@ func NewElasticSearchSink(uri *url.URL) (core.DataSink, error) {
var esSink elasticSearchSink
elasticsearchConfig, err := esCommon.CreateElasticSearchConfig(uri)
if err != nil {
glog.V(2).Infof("failed to config elasticsearch")
glog.V(2).Infof("Failed to config ElasticSearch")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unify the level of glog.Info to V2

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed it to warning.. It make more sense to me. What do you think?

return nil, err
}

esSink.esConfig = *elasticsearchConfig
esSink.saveDataFunc = esCommon.SaveDataIntoES
glog.V(2).Infof("elasticsearch sink setup successfully")
glog.V(2).Infof("ElasticSearch sink setup successfully")
return &esSink, nil
}
2 changes: 1 addition & 1 deletion metrics/sinks/elasticsearch/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (
"testing"
"time"

"github.com/olivere/elastic"
"github.com/stretchr/testify/assert"
"gopkg.in/olivere/elastic.v3"
esCommon "k8s.io/heapster/common/elasticsearch"
"k8s.io/heapster/metrics/core"
)
Expand Down
3 changes: 3 additions & 0 deletions vendor/github.com/gedex/inflector/.travis.yml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 28 additions & 0 deletions vendor/github.com/gedex/inflector/CakePHP_LICENSE.txt

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.