Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Switch to docker's logs API endpoint, from attach, so we can
Browse files Browse the repository at this point in the history
not lose log messages when Heka is down.
  • Loading branch information
rafrombrc committed Mar 24, 2016
1 parent be1420d commit bd5c4e8
Show file tree
Hide file tree
Showing 5 changed files with 211 additions and 47 deletions.
13 changes: 10 additions & 3 deletions CHANGES.txt
Expand Up @@ -4,8 +4,15 @@
Backwards Incompatibilities
---------------------------

* StatAccumInput Input: percent_threshold param type convert to slice
* HttpInput Input: user param changed to username to match other Http plugins
* StatAccumInput `percent_threshold` param type converted to slice.

* HttpInput `user` param changed to `username` to match other HTTP plugins.

* DockerLogInput changed to use `logs` API endpoint instead of `attach`. This
helps prevent data loss by allowing use of the `since` parameter to fetch
records that were generated while Heka was down, but it means this input will
now only work with containers using the `json-file` or `journald` logging
drivers.

Bug Handling
------------
Expand All @@ -16,7 +23,7 @@ Bug Handling
* Fixed ESJsonEncoder generating invalid JSON when `DynamicFields` is first of
multiple specified fields but the message contains no dynamic fields.

* Fixed bug where DockerLogInput would not reconnect when a Docker daemon
* Fixed bug where DockerLogInput would not reconnect when a Docker daemon
was down for some time (#1843).

* More verbose logging from the DockerLogInput plugin (#1843).
Expand Down
18 changes: 18 additions & 0 deletions docs/source/config/inputs/docker_log.rst
Expand Up @@ -52,6 +52,24 @@ Config:
- fields_from_env (array[string], optional):
A list of environment variables to extract from the container and add as fields.

.. versionadded:: 0.11

- since_path (string, optional):
Path to file where input will write a record of the "since" time for each
container, so that log records generated while Heka is down are not missed
(see Docker's `Get container logs
<https://docs.docker.com/engine/reference/api/docker_remote_api_v1.20/#get-container-logs>`_
API). Relative paths will be relative to Heka's configured
``base_dir``. Defaults to ``${BASE_DIR}/docker/logs_since.txt``.
- since_interval (string, optional):
Time interval (as supported by Go's `time.ParseDuration API
<https://golang.org/pkg/time/#ParseDuration>`_) that specifies how often
the DockerLogInput will write out the "since" file containing the most
recently retrieved log times for each container. Defaults to "5s". If set
to zero (e.g. "0s") then the file will only be written out when Heka
cleanly shuts down, meaning that if Heka crashes all container logs written
since Heka has started will be re-fetched.

Example:

.. code-block:: ini
Expand Down
153 changes: 118 additions & 35 deletions plugins/docker/attacher.go
Expand Up @@ -26,11 +26,14 @@ package docker
// SOFTWARE.

import (
"encoding/json"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"sync"
"time"

"github.com/fsouza/go-dockerclient"
Expand All @@ -43,6 +46,11 @@ const (
SLEEP_BETWEEN_RECONNECT = 500 * time.Millisecond
)

// sinces is the "since" bookkeeping that is JSON-encoded to, and decoded
// from, the since file. The explicit tags spell out the key names that the
// exported field names already produce.
type sinces struct {
	// Since is the Unix time (in seconds) recorded when the since file is
	// written. It is the fallback "since" value for a known container whose
	// own entry is zero.
	Since int64 `json:"Since"`

	// Containers maps a container id to a Unix time in seconds. An entry is
	// zero while that container's logs are being streamed and is set to the
	// current time once the stream ends.
	Containers map[string]int64 `json:"Containers"`
}

type AttachManager struct {
hostname string
client DockerClient
Expand All @@ -53,9 +61,13 @@ type AttachManager struct {
nameFromEnv string
fieldsFromEnv []string
fieldsFromLabels []string
sincePath string
sinces *sinces
sinceLock sync.Mutex
sinceInterval time.Duration
}

// Return a properly configured Docker client
// Return a properly configured Docker client.
func newDockerClient(certPath string, endpoint string) (DockerClient, error) {
var client DockerClient
var err error
Expand All @@ -74,7 +86,8 @@ func newDockerClient(certPath string, endpoint string) (DockerClient, error) {

// Construct an AttachManager and set up the Docker Client
func NewAttachManager(endpoint string, certPath string, nameFromEnv string,
fieldsFromEnv []string, fieldsFromLabels []string) (*AttachManager, error) {
fieldsFromEnv []string, fieldsFromLabels []string,
sincePath string, sinceInterval time.Duration) (*AttachManager, error) {

client, err := newDockerClient(certPath, endpoint)
if err != nil {
Expand All @@ -87,12 +100,27 @@ func NewAttachManager(endpoint string, certPath string, nameFromEnv string,
nameFromEnv: nameFromEnv,
fieldsFromEnv: fieldsFromEnv,
fieldsFromLabels: fieldsFromLabels,
sincePath: sincePath,
sinces: &sinces{},
sinceInterval: sinceInterval,
}

// Initialize the sinces from the JSON since file.
sinceFile, err := os.Open(sincePath)
if err != nil {
return nil, fmt.Errorf("Can't open \"since\" file '%s': %s", sincePath, err.Error())
}
jsonDecoder := json.NewDecoder(sinceFile)
m.sinceLock.Lock()
err = jsonDecoder.Decode(m.sinces)
m.sinceLock.Unlock()
if err != nil {
return nil, fmt.Errorf("Can't decode \"since\" file '%s': %s", sincePath, err.Error())
}
return m, nil
}

// Handler to wrap functions with retry logic
// Handler to wrap functions with retry logic.
func withRetries(doWork func() error) error {
var err error

Expand All @@ -111,7 +139,7 @@ func withRetries(doWork func() error) error {
return nil
}

// Sleep between retries, break if we're done
// Sleep between retries, break if we're done.
if e := retrier.Wait(); e != nil {
break
}
Expand Down Expand Up @@ -165,11 +193,56 @@ func (m *AttachManager) Run(ir InputRunner, hostname string, stopChan chan error
"Failed to add Docker event listener after retrying. Plugin giving up.")
}

m.handleDockerEvents(stopChan)
if m.sinceInterval > 0 {
go m.sinceWriteLoop(stopChan)
}
m.handleDockerEvents(stopChan) // Blocks until stopChan is closed.
// Write to since file on the way out.
m.writeSinceFile(time.Now())
return nil
}

// Try to reboot all of our connections
// writeSinceFile records t as the top level "since" time and writes the
// current since data to the since file as JSON, replacing the file's
// previous contents. Failures are reported through the input runner's
// LogError; nothing is returned to the caller.
//
// sinceLock is held for the whole create/encode/close sequence. The lock
// guards the in-memory since data, and holding it across the file operations
// also keeps two concurrent callers (the periodic write loop and the final
// write when Run exits) from truncating and writing the same file at once.
func (m *AttachManager) writeSinceFile(t time.Time) {
	m.sinceLock.Lock()
	defer m.sinceLock.Unlock()

	sinceFile, err := os.Create(m.sincePath)
	if err != nil {
		m.ir.LogError(fmt.Errorf("Can't create \"since\" file '%s': %s", m.sincePath,
			err.Error()))
		return
	}

	m.sinces.Since = t.Unix()
	if err = json.NewEncoder(sinceFile).Encode(m.sinces); err != nil {
		m.ir.LogError(fmt.Errorf("Can't write to \"since\" file '%s': %s", m.sincePath,
			err.Error()))
	}
	// Always close the file, even when encoding failed.
	if err = sinceFile.Close(); err != nil {
		m.ir.LogError(fmt.Errorf("Can't close \"since\" file '%s': %s", m.sincePath,
			err.Error()))
	}
}

// sinceWriteLoop writes out a new since file every sinceInterval until a
// receive on stopChan succeeds (a value is sent or the channel is closed).
// It blocks for that whole time, so callers run it in its own goroutine.
func (m *AttachManager) sinceWriteLoop(stopChan chan error) {
	if m.sinceInterval <= 0 {
		// time.NewTicker panics on a non-positive interval. With no interval
		// there is nothing to write periodically, so just wait for the stop
		// signal.
		<-stopChan
		return
	}

	// Use an explicit Ticker rather than time.Tick so it can be stopped and
	// released when the loop exits.
	ticker := time.NewTicker(m.sinceInterval)
	defer ticker.Stop()

	for {
		select {
		case now := <-ticker.C:
			m.writeSinceFile(now)
		case <-stopChan:
			return
		}
	}
}

// Try to reboot all of our connections.
func (m *AttachManager) restart() error {
var err error

Expand Down Expand Up @@ -250,7 +323,7 @@ func (m *AttachManager) extractFields(id string, client DockerClient) (map[strin
return fields, nil
}

// Attach to the output of a single running container
// Attach to the log output of a single running container.
func (m *AttachManager) attach(id string, client DockerClient) error {
m.ir.LogMessage(fmt.Sprintf("Attaching container: %s", id))

Expand All @@ -259,50 +332,58 @@ func (m *AttachManager) attach(id string, client DockerClient) error {
return err
}

success := make(chan struct{})
failure := make(chan error)
outrd, outwr := io.Pipe()
errrd, errwr := io.Pipe()

// Spin up one of these for each container we're watching
// Spin up one of these for each container we're watching.
go func() {
// This will block until the container exits
err := client.AttachToContainer(docker.AttachToContainerOptions{
m.sinceLock.Lock()
since, ok := m.sinces.Containers[id]
if ok {
// We've seen this container before, need to use a since value.
if since == 0 {
// Fall back to the top level since time.
since = m.sinces.Since
} else {
// Clear out the container specific since time.
m.sinces.Containers[id] = 0
}
} else {
// We haven't seen it, add it to our sinces.
m.sinces.Containers[id] = 0
}
m.sinceLock.Unlock()

// This will block until the container exits.
err := client.Logs(docker.LogsOptions{
Container: id,
OutputStream: outwr,
ErrorStream: errwr,
Stdin: false,
Follow: true,
Stdout: true,
Stderr: true,
Stream: true,
Success: success,
Since: since,
Timestamps: false,
Tail: "all",
RawTerminal: false,
})

// Once it has exited, close our pipes
// Once it has exited, close our pipes, remove from the sinces, and (if
// necessary) log the error.
outwr.Close()
errwr.Close()
m.sinceLock.Lock()
m.sinces.Containers[id] = time.Now().Unix()
m.sinceLock.Unlock()
if err != nil {
close(success)
failure <- err
err = fmt.Errorf("streaming container %s logs: %s", id, err.Error())
m.ir.LogError(err)
}
}()

// Wait for success from the attachment
select {
case _, ok := <-success:
if ok {
go m.handleOneStream("stdout", outrd, fields, id)
go m.handleOneStream("stderr", errrd, fields, id)

// Signal back to the client to continue with attachment
success <- struct{}{}
return nil
}
case err := <-failure:
close(failure)
return err
}

go m.handleOneStream("stdout", outrd, fields, id)
go m.handleOneStream("stderr", errrd, fields, id)
return nil
}

Expand All @@ -312,7 +393,7 @@ func (m *AttachManager) makePackDecorator(logger string, fields map[string]strin
pack.Message.SetType("DockerLog")
pack.Message.SetLogger(logger) // stderr or stdout
pack.Message.SetHostname(m.hostname) // Use the host's hosntame
pack.Message.SetTimestamp(time.Now().UnixNano())
pack.Message.SetTimestamp(time.Now().Unix())
pack.Message.SetUuid(uuid.NewRandom())

for name, value := range fields {
Expand All @@ -330,7 +411,9 @@ func (m *AttachManager) makePackDecorator(logger string, fields map[string]strin
}

// Sets up the Heka pipeline for a single IO stream (either stdout or stderr)
func (m *AttachManager) handleOneStream(name string, in io.Reader, fields map[string]string, containerId string) {
func (m *AttachManager) handleOneStream(name string, in io.Reader, fields map[string]string,
containerId string) {

id := fmt.Sprintf("%s-%s", fields["ContainerName"], name)

sRunner := m.ir.NewSplitterRunner(id)
Expand Down
5 changes: 5 additions & 0 deletions plugins/docker/client.go
Expand Up @@ -27,6 +27,11 @@ type DockerClient interface {
// See http://goo.gl/RRAhws for more details.
AttachToContainer(opts docker.AttachToContainerOptions) error

// Logs gets stdout and stderr log from the specified container.
//
// See http://goo.gl/yl8PGm for more details.
Logs(opts docker.LogsOptions) error

// Ping pings the docker server
//
// See https://goo.gl/kQCfJj for more details.
Expand Down

0 comments on commit bd5c4e8

Please sign in to comment.