Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/jolokia proxy mode #1031

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md#g
- Possible breaking change for the librato and graphite outputs. Telegraf will
no longer insert field names when the field is simply named `value`. This is
because the `value` field is redundant in the graphite/librato context.
- Breaking change in jolokia plugin. See https://github.com/influxdata/telegraf/blob/master/plugins/inputs/jolokia/README.md

### Features
- [#1031](https://github.com/influxdata/telegraf/pull/1031): Jolokia plugin proxy mode. Thanks @saiello!
- [#1009](https://github.com/influxdata/telegraf/pull/1009): Cassandra input plugin. Thanks @subhachandrachandra!
- [#976](https://github.com/influxdata/telegraf/pull/976): Reduce allocations in the UDP and statsd inputs.
- [#979](https://github.com/influxdata/telegraf/pull/979): Reduce allocations in the TCP listener.
Expand All @@ -21,6 +23,7 @@ because the `value` field is redundant in the graphite/librato context.
- [#1008](https://github.com/influxdata/telegraf/pull/1008): Adding memstats metrics to the influxdb plugin.

### Bugfixes
- [#1050](https://github.com/influxdata/telegraf/issues/1050): jolokia plugin - do not overwrite host tag. Thanks @saiello!
- [#968](https://github.com/influxdata/telegraf/issues/968): Processes plugin gets unknown state when spaces are in (command name)
- [#969](https://github.com/influxdata/telegraf/pull/969): ipmi_sensors: allow : in password. Thanks @awaw!
- [#972](https://github.com/influxdata/telegraf/pull/972): dovecot: remove extra newline in dovecot command. Thanks @mrannanj!
Expand Down
36 changes: 24 additions & 12 deletions plugins/inputs/jolokia/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,22 @@
```toml
[[inputs.jolokia]]
## This is the context root used to compose the jolokia url
context = "/jolokia/read"

## List of servers exposing jolokia read service
context = "/jolokia"

# This specifies the mode used
# mode = "proxy"
#
# When in proxy mode this section is used to specify further proxy address configurations.
# Remember to change servers addresses
# [inputs.jolokia.proxy]
# host = "127.0.0.1"
# port = "8080"

# List of servers exposing jolokia read service
[[inputs.jolokia.servers]]
name = "stable"
host = "192.168.103.2"
port = "8180"
name = "as-server-01"
host = "127.0.0.1"
port = "8080"
# username = "myuser"
# password = "mypassword"

Expand All @@ -21,17 +30,20 @@
## This collect all heap memory usage metrics.
[[inputs.jolokia.metrics]]
name = "heap_memory_usage"
jmx = "/java.lang:type=Memory/HeapMemoryUsage"

mbean = "java.lang:type=Memory"
attribute = "HeapMemoryUsage"

## This collect thread counts metrics.
[[inputs.jolokia.metrics]]
name = "thread_count"
jmx = "/java.lang:type=Threading/TotalStartedThreadCount,ThreadCount,DaemonThreadCount,PeakThreadCount"

mbean = "java.lang:type=Threading"
attribute = "TotalStartedThreadCount,ThreadCount,DaemonThreadCount,PeakThreadCount"

## This collect number of class loaded/unloaded counts metrics.
[[inputs.jolokia.metrics]]
name = "class_count"
jmx = "/java.lang:type=ClassLoading/LoadedClassCount,UnloadedClassCount,TotalLoadedClassCount"
mbean = "java.lang:type=ClassLoading"
attribute = "LoadedClassCount,UnloadedClassCount,TotalLoadedClassCount"
```

#### Description
Expand All @@ -42,4 +54,4 @@ are collected for each server configured.
See: https://jolokia.org/

# Measurements:
Jolokia plugin produces one measure for each metric configured, adding Server's `name`, `host` and `port` as tags.
Jolokia plugin produces one measure for each metric configured, adding Server's `server_name`, `server_host` and `server_port` as tags.
180 changes: 137 additions & 43 deletions plugins/inputs/jolokia/jolokia.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package jolokia

import (
"bytes"
"encoding/json"
"errors"
"fmt"
Expand All @@ -22,8 +23,10 @@ type Server struct {
}

type Metric struct {
Name string
Jmx string
Name string
Mbean string
Attribute string
Path string
}

type JolokiaClient interface {
Expand All @@ -41,20 +44,32 @@ func (c JolokiaClientImpl) MakeRequest(req *http.Request) (*http.Response, error
type Jolokia struct {
jClient JolokiaClient
Context string
Mode string
Servers []Server
Metrics []Metric
Proxy Server
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are you changing the default context?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'/jolokia' is actually the context. 'read' is the operation. I started to use POST request to support proxy mode and I moved the operation type into the request body.


func (j *Jolokia) SampleConfig() string {
return `
## This is the context root used to compose the jolokia url
context = "/jolokia/read"
# This is the context root used to compose the jolokia url
context = "/jolokia"

## List of servers exposing jolokia read service
# This specifies the mode used
# mode = "proxy"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are the options besides "proxy"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently any value different to "proxy", make the input plugin executing in agent mode.

#
# When in proxy mode this section is used to specify further proxy address configurations.
# Remember to change servers addresses
# [inputs.jolokia.proxy]
# host = "127.0.0.1"
# port = "8080"


# List of servers exposing jolokia read service
[[inputs.jolokia.servers]]
name = "stable"
host = "192.168.103.2"
port = "8180"
name = "as-server-01"
host = "127.0.0.1"
port = "8080"
# username = "myuser"
# password = "mypassword"

Expand All @@ -64,30 +79,28 @@ func (j *Jolokia) SampleConfig() string {
## This collect all heap memory usage metrics.
[[inputs.jolokia.metrics]]
name = "heap_memory_usage"
jmx = "/java.lang:type=Memory/HeapMemoryUsage"

mbean = "java.lang:type=Memory"
attribute = "HeapMemoryUsage"

## This collect thread counts metrics.
[[inputs.jolokia.metrics]]
name = "thread_count"
jmx = "/java.lang:type=Threading/TotalStartedThreadCount,ThreadCount,DaemonThreadCount,PeakThreadCount"

mbean = "java.lang:type=Threading"
attribute = "TotalStartedThreadCount,ThreadCount,DaemonThreadCount,PeakThreadCount"

## This collect number of class loaded/unloaded counts metrics.
[[inputs.jolokia.metrics]]
name = "class_count"
jmx = "/java.lang:type=ClassLoading/LoadedClassCount,UnloadedClassCount,TotalLoadedClassCount"
mbean = "java.lang:type=ClassLoading"
attribute = "LoadedClassCount,UnloadedClassCount,TotalLoadedClassCount"
`
}

func (j *Jolokia) Description() string {
return "Read JMX metrics through Jolokia"
}

func (j *Jolokia) getAttr(requestUrl *url.URL) (map[string]interface{}, error) {
// Create + send request
req, err := http.NewRequest("GET", requestUrl.String(), nil)
if err != nil {
return nil, err
}
func (j *Jolokia) doRequest(req *http.Request) (map[string]interface{}, error) {

resp, err := j.jClient.MakeRequest(req)
if err != nil {
Expand All @@ -98,7 +111,7 @@ func (j *Jolokia) getAttr(requestUrl *url.URL) (map[string]interface{}, error) {
// Process response
if resp.StatusCode != http.StatusOK {
err = fmt.Errorf("Response from url \"%s\" has status code %d (%s), expected %d (%s)",
requestUrl,
req.RequestURI,
resp.StatusCode,
http.StatusText(resp.StatusCode),
http.StatusOK,
Expand All @@ -118,51 +131,132 @@ func (j *Jolokia) getAttr(requestUrl *url.URL) (map[string]interface{}, error) {
return nil, errors.New("Error decoding JSON response")
}

if status, ok := jsonOut["status"]; ok {
if status != float64(200) {
return nil, fmt.Errorf("Not expected status value in response body: %3.f", status)
}
} else {
return nil, fmt.Errorf("Missing status in response body")
}

return jsonOut, nil
}

func (j *Jolokia) prepareRequest(server Server, metric Metric) (*http.Request, error) {
var jolokiaUrl *url.URL
context := j.Context // Usually "/jolokia"

// Create bodyContent
bodyContent := map[string]interface{}{
"type": "read",
"mbean": metric.Mbean,
}

if metric.Attribute != "" {
bodyContent["attribute"] = metric.Attribute
if metric.Path != "" {
bodyContent["path"] = metric.Path
}
}

// Add target, only in proxy mode
if j.Mode == "proxy" {

serviceUrl := fmt.Sprintf("service:jmx:rmi:///jndi/rmi://%s:%s/jmxrmi", server.Host, server.Port)

target := map[string]string{
"url": serviceUrl,
}

if server.Username != "" {
target["user"] = server.Username
}

if server.Password != "" {
target["password"] = server.Password
}

bodyContent["target"] = target

proxy := j.Proxy

// Prepare ProxyURL
proxyUrl, err := url.Parse("http://" + proxy.Host + ":" + proxy.Port + context)
if err != nil {
return nil, err
}
if proxy.Username != "" || proxy.Password != "" {
proxyUrl.User = url.UserPassword(proxy.Username, proxy.Password)
}

jolokiaUrl = proxyUrl
} else {

serverUrl, err := url.Parse("http://" + server.Host + ":" + server.Port + context)
if err != nil {
return nil, err
}
if server.Username != "" || server.Password != "" {
serverUrl.User = url.UserPassword(server.Username, server.Password)
}

jolokiaUrl = serverUrl
}

requestBody, err := json.Marshal(bodyContent)

req, err := http.NewRequest("POST", jolokiaUrl.String(), bytes.NewBuffer(requestBody))

if err != nil {
return nil, err
}

req.Header.Add("Content-type", "application/json")

return req, nil
}

func (j *Jolokia) Gather(acc telegraf.Accumulator) error {
context := j.Context //"/jolokia/read"
servers := j.Servers
metrics := j.Metrics
tags := make(map[string]string)

for _, server := range servers {
tags["server"] = server.Name
tags["port"] = server.Port
tags["host"] = server.Host
tags["server_name"] = server.Name
tags["server_port"] = server.Port
tags["server_host"] = server.Host
fields := make(map[string]interface{})
for _, metric := range metrics {

for _, metric := range metrics {
measurement := metric.Name
jmxPath := metric.Jmx

// Prepare URL
requestUrl, err := url.Parse("http://" + server.Host + ":" +
server.Port + context + jmxPath)
req, err := j.prepareRequest(server, metric)
if err != nil {
return err
}
if server.Username != "" || server.Password != "" {
requestUrl.User = url.UserPassword(server.Username, server.Password)
}

out, _ := j.getAttr(requestUrl)
out, err := j.doRequest(req)

if values, ok := out["value"]; ok {
switch t := values.(type) {
case map[string]interface{}:
for k, v := range t {
fields[measurement+"_"+k] = v
if err != nil {
fmt.Printf("Error handling response: %s\n", err)
} else {

if values, ok := out["value"]; ok {
switch t := values.(type) {
case map[string]interface{}:
for k, v := range t {
fields[measurement+"_"+k] = v
}
case interface{}:
fields[measurement] = t
}
case interface{}:
fields[measurement] = t
} else {
fmt.Printf("Missing key 'value' in output response\n")
}
} else {
fmt.Printf("Missing key 'value' in '%s' output response\n",
requestUrl.String())

}
}

acc.AddFields("jolokia", fields, tags)
}

Expand Down
26 changes: 21 additions & 5 deletions plugins/inputs/jolokia/jolokia_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@ const invalidJSON = "I don't think this is JSON"
const empty = ""

var Servers = []Server{Server{Name: "as1", Host: "127.0.0.1", Port: "8080"}}
var HeapMetric = Metric{Name: "heap_memory_usage", Jmx: "/java.lang:type=Memory/HeapMemoryUsage"}
var UsedHeapMetric = Metric{Name: "heap_memory_usage", Jmx: "/java.lang:type=Memory/HeapMemoryUsage"}
var HeapMetric = Metric{Name: "heap_memory_usage",
Mbean: "java.lang:type=Memory", Attribute: "HeapMemoryUsage"}
var UsedHeapMetric = Metric{Name: "heap_memory_usage",
Mbean: "java.lang:type=Memory", Attribute: "HeapMemoryUsage"}

type jolokiaClientStub struct {
responseBody string
Expand Down Expand Up @@ -94,9 +96,9 @@ func TestHttpJsonMultiValue(t *testing.T) {
"heap_memory_usage_used": 203288528.0,
}
tags := map[string]string{
"host": "127.0.0.1",
"port": "8080",
"server": "as1",
"server_host": "127.0.0.1",
"server_port": "8080",
"server_name": "as1",
}
acc.AssertContainsTaggedFields(t, "jolokia", fields, tags)
}
Expand All @@ -114,3 +116,17 @@ func TestHttpJsonOn404(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, 0, len(acc.Metrics))
}

// Test that the proper values are ignored or collected
func TestHttpInvalidJson(t *testing.T) {

jolokia := genJolokiaClientStub(invalidJSON, 200, Servers,
[]Metric{UsedHeapMetric})

var acc testutil.Accumulator
acc.SetDebug(true)
err := jolokia.Gather(&acc)

assert.Nil(t, err)
assert.Equal(t, 0, len(acc.Metrics))
}