Skip to content

Commit

Permalink
Added option to disable decoding json logs (#55) (#54)
Browse files Browse the repository at this point in the history
  • Loading branch information
masterada authored and maxekman committed Nov 30, 2017
1 parent e00429e commit 68a4e47
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 5 deletions.
1 change: 1 addition & 0 deletions README.md
Expand Up @@ -122,3 +122,4 @@ This table shows all available configurations:
| DOCKER_LABELS | any | "" |
| RETRY_STARTUP | any | "" |
| RETRY_SEND | any | "" |
| DECODE_JSON_LOGS | bool | true |
37 changes: 32 additions & 5 deletions logstash.go
Expand Up @@ -19,10 +19,11 @@ func init() {

// LogstashAdapter is an adapter that streams UDP JSON to Logstash.
type LogstashAdapter struct {
conn net.Conn
route *router.Route
containerTags map[string][]string
logstashFields map[string]map[string]string
conn net.Conn
route *router.Route
containerTags map[string][]string
logstashFields map[string]map[string]string
decodeJsonLogs map[string]bool
}

// NewLogstashAdapter creates a LogstashAdapter with UDP as the default transport.
Expand All @@ -41,6 +42,7 @@ func NewLogstashAdapter(route *router.Route) (router.LogAdapter, error) {
conn: conn,
containerTags: make(map[string][]string),
logstashFields: make(map[string]map[string]string),
decodeJsonLogs: make(map[string]bool),
}, nil
}
if os.Getenv("RETRY_STARTUP") == "" {
Expand Down Expand Up @@ -103,6 +105,28 @@ func GetLogstashFields(c *docker.Container, a *LogstashAdapter) map[string]strin
return fields
}

// Get boolean indicating whether json logs should be decoded (or added as message),
// configured with the environment variable DECODE_JSON_LOGS
func IsDecodeJsonLogs(c *docker.Container, a *LogstashAdapter) bool {
if decodeJsonLogs, ok := a.decodeJsonLogs[c.ID]; ok {
return decodeJsonLogs
}

decodeJsonLogsStr := os.Getenv("DECODE_JSON_LOGS")

for _, e := range c.Config.Env {
if strings.HasPrefix(e, "DECODE_JSON_LOGS=") {
decodeJsonLogsStr = strings.TrimPrefix(e, "DECODE_JSON_LOGS=")
}
}

decodeJsonLogs := decodeJsonLogsStr != "false"

a.decodeJsonLogs[c.ID] = decodeJsonLogs

return decodeJsonLogs
}

// Stream implements the router.LogAdapter interface.
func (a *LogstashAdapter) Stream(logstream chan *router.Message) {

Expand Down Expand Up @@ -131,7 +155,10 @@ func (a *LogstashAdapter) Stream(logstream chan *router.Message) {

// Try to parse JSON-encoded m.Data. If it wasn't JSON, create an empty object
// and use the original data as the message.
if err = json.Unmarshal([]byte(m.Data), &data); err != nil || data == nil {
if IsDecodeJsonLogs(m.Container, a) {
err = json.Unmarshal([]byte(m.Data), &data)
}
if err != nil || data == nil {
data = make(map[string]interface{})
data["message"] = m.Data
}
Expand Down
74 changes: 74 additions & 0 deletions logstash_test.go
Expand Up @@ -60,6 +60,7 @@ func TestStreamNullData(t *testing.T) {
conn: conn,
containerTags: make(map[string][]string),
logstashFields: make(map[string]map[string]string),
decodeJsonLogs: make(map[string]bool),
}

assert.NotNil(adapter)
Expand Down Expand Up @@ -117,6 +118,7 @@ func TestStreamNotJsonWithoutLogstashTags(t *testing.T) {
conn: conn,
containerTags: make(map[string][]string),
logstashFields: make(map[string]map[string]string),
decodeJsonLogs: make(map[string]bool),
}

assert.NotNil(adapter)
Expand Down Expand Up @@ -174,6 +176,7 @@ func TestStreamNotJsonWithLogstashTags(t *testing.T) {
conn: conn,
containerTags: make(map[string][]string),
logstashFields: make(map[string]map[string]string),
decodeJsonLogs: make(map[string]bool),
}

assert.NotNil(adapter)
Expand Down Expand Up @@ -231,6 +234,7 @@ func TestStreamJsonWithoutLogstashTags(t *testing.T) {
conn: conn,
containerTags: make(map[string][]string),
logstashFields: make(map[string]map[string]string),
decodeJsonLogs: make(map[string]bool),
}

assert.NotNil(adapter)
Expand Down Expand Up @@ -294,6 +298,7 @@ func TestStreamJsonWithLogstashTags(t *testing.T) {
conn: conn,
containerTags: make(map[string][]string),
logstashFields: make(map[string]map[string]string),
decodeJsonLogs: make(map[string]bool),
}

assert.NotNil(adapter)
Expand Down Expand Up @@ -357,6 +362,7 @@ func TestStreamNotJsonWithLogstashFields(t *testing.T) {
conn: conn,
containerTags: make(map[string][]string),
logstashFields: make(map[string]map[string]string),
decodeJsonLogs: make(map[string]bool),
}

assert.NotNil(adapter)
Expand Down Expand Up @@ -416,6 +422,7 @@ func TestStreamJsonWithLogstashFields(t *testing.T) {
conn: conn,
containerTags: make(map[string][]string),
logstashFields: make(map[string]map[string]string),
decodeJsonLogs: make(map[string]bool),
}

assert.NotNil(adapter)
Expand Down Expand Up @@ -483,6 +490,7 @@ func TestStreamNotJsonWithLogstashFieldsWithDefault(t *testing.T) {
conn: conn,
containerTags: make(map[string][]string),
logstashFields: make(map[string]map[string]string),
decodeJsonLogs: make(map[string]bool),
}

assert.NotNil(adapter)
Expand Down Expand Up @@ -544,6 +552,7 @@ func TestStreamJsonWithLogstashFieldsWithDefault(t *testing.T) {
conn: conn,
containerTags: make(map[string][]string),
logstashFields: make(map[string]map[string]string),
decodeJsonLogs: make(map[string]bool),
}

assert.NotNil(adapter)
Expand Down Expand Up @@ -611,6 +620,7 @@ func TestStreamNotJsonWithLogstashTagsWithDefault(t *testing.T) {
conn: conn,
containerTags: make(map[string][]string),
logstashFields: make(map[string]map[string]string),
decodeJsonLogs: make(map[string]bool),
}

assert.NotNil(adapter)
Expand Down Expand Up @@ -670,6 +680,7 @@ func TestStreamJsonWithLogstashTagsWithDefault(t *testing.T) {
conn: conn,
containerTags: make(map[string][]string),
logstashFields: make(map[string]map[string]string),
decodeJsonLogs: make(map[string]bool),
}

assert.NotNil(adapter)
Expand Down Expand Up @@ -733,6 +744,7 @@ func TestStreamJsonWithLogstashFieldsAndBlacklist(t *testing.T) {
conn: conn,
containerTags: make(map[string][]string),
logstashFields: make(map[string]map[string]string),
decodeJsonLogs: make(map[string]bool),
}

assert.NotNil(adapter)
Expand Down Expand Up @@ -801,6 +813,7 @@ func TestStreamJsonWithLogstashFieldsWithDefaultAndBlacklist(t *testing.T) {
conn: conn,
containerTags: make(map[string][]string),
logstashFields: make(map[string]map[string]string),
decodeJsonLogs: make(map[string]bool),
}

assert.NotNil(adapter)
Expand Down Expand Up @@ -870,6 +883,7 @@ func TestStreamJsonLabelsDisabled(t *testing.T) {
conn: conn,
containerTags: make(map[string][]string),
logstashFields: make(map[string]map[string]string),
decodeJsonLogs: make(map[string]bool),
}

assert.NotNil(adapter)
Expand Down Expand Up @@ -939,6 +953,7 @@ func TestStreamJsonLabelsEnabled(t *testing.T) {
conn: conn,
containerTags: make(map[string][]string),
logstashFields: make(map[string]map[string]string),
decodeJsonLogs: make(map[string]bool),
}

assert.NotNil(adapter)
Expand Down Expand Up @@ -1015,6 +1030,7 @@ func TestStreamJsonLabelsEnabledButEmpty(t *testing.T) {
conn: conn,
containerTags: make(map[string][]string),
logstashFields: make(map[string]map[string]string),
decodeJsonLogs: make(map[string]bool),
}

assert.NotNil(adapter)
Expand Down Expand Up @@ -1074,3 +1090,61 @@ func TestStreamJsonLabelsEnabledButEmpty(t *testing.T) {
assert.Equal(true, ok)
assert.Nil(dockerLabels["log"])
}

func TestStreamJsonWithDecodeJsonLogsFalse(t *testing.T) {
assert := assert.New(t)

conn := MockConn{}

adapter := LogstashAdapter{
route: new(router.Route),
conn: conn,
containerTags: make(map[string][]string),
logstashFields: make(map[string]map[string]string),
decodeJsonLogs: make(map[string]bool),
}

assert.NotNil(adapter)

logstream := make(chan *router.Message)

containerConfig := docker.Config{}
containerConfig.Image = "image"
containerConfig.Hostname = "hostname"
containerConfig.Env = []string{"NON_LOGSTASH_TAGS=not,logstash", "DECODE_JSON_LOGS=false"}

container := docker.Container{}
container.Name = "name"
container.ID = "ID"
container.Config = &containerConfig

str := `{ "remote_user": "-", "body_bytes_sent": "25", "request_time": "0.821", "status": "200", "request_method": "POST", "http_referrer": "-", "http_user_agent": "-" }`

message := router.Message{
Container: &container,
Source: "FOOOOO",
Data: str,
Time: time.Now(),
}

go func() {
logstream <- &message
close(logstream)
}()

adapter.Stream(logstream)

var data map[string]interface{}
err := json.Unmarshal([]byte(res), &data)
assert.Nil(err)

assert.Equal(str, data["message"])
assert.Equal([]interface{}{}, data["tags"])

var dockerInfo map[string]interface{}
dockerInfo = data["docker"].(map[string]interface{})
assert.Equal("name", dockerInfo["name"])
assert.Equal("ID", dockerInfo["id"])
assert.Equal("image", dockerInfo["image"])
assert.Equal("hostname", dockerInfo["hostname"])
}

0 comments on commit 68a4e47

Please sign in to comment.