Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make user-supplied sinks operate on URIs #606

Merged
merged 6 commits into from
Jul 19, 2018
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,10 @@ type Config struct {
// EncoderConfig sets options for the chosen encoder. See
// zapcore.EncoderConfig for details.
EncoderConfig zapcore.EncoderConfig `json:"encoderConfig" yaml:"encoderConfig"`
// OutputPaths is a list of paths to write logging output to. See Open for
// details.
// OutputPaths is a list of URLs or file paths to write logging output to.
// See Open for details.
OutputPaths []string `json:"outputPaths" yaml:"outputPaths"`
// ErrorOutputPaths is a list of paths to write internal logger errors to.
// ErrorOutputPaths is a list of URLs to write internal logger errors to.
// The default is standard error.
//
// Note that this setting only affects internal errors; for sample code that
Expand Down
125 changes: 96 additions & 29 deletions sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,19 @@ import (
"errors"
"fmt"
"io"
"net/url"
"os"
"strings"
"sync"

"go.uber.org/zap/zapcore"
)

const schemeFile = "file"

var (
_sinkMutex sync.RWMutex
_sinkFactories map[string]func() (Sink, error)
_sinkFactories map[string]func(*url.URL) (Sink, error) // keyed by scheme
)

func init() {
Expand All @@ -42,18 +46,10 @@ func init() {
func resetSinkRegistry() {
_sinkMutex.Lock()
defer _sinkMutex.Unlock()
_sinkFactories = map[string]func() (Sink, error){
"stdout": func() (Sink, error) { return nopCloserSink{os.Stdout}, nil },
"stderr": func() (Sink, error) { return nopCloserSink{os.Stderr}, nil },
}
}

type errSinkNotFound struct {
key string
}

func (e *errSinkNotFound) Error() string {
return fmt.Sprintf("no sink found for %q", e.key)
_sinkFactories = map[string]func(*url.URL) (Sink, error){
schemeFile: newFileSink,
}
}

// Sink defines the interface to write to and close logger destinations.
Expand All @@ -62,33 +58,104 @@ type Sink interface {
io.Closer
}

// RegisterSink adds a Sink at the given key so it can be referenced
// in config OutputPaths.
func RegisterSink(key string, sinkFactory func() (Sink, error)) error {
type nopCloserSink struct{ zapcore.WriteSyncer }

func (nopCloserSink) Close() error { return nil }

type errSinkNotFound struct {
scheme string
}

func (e *errSinkNotFound) Error() string {
return fmt.Sprintf("no sink found for scheme %q", e.scheme)
}

// RegisterSink registers a user-supplied factory for all sinks with a
// particular scheme.
//
// All schemes must be ASCII, valid under section 3.1 of RFC 3986
// (https://tools.ietf.org/html/rfc3986#section-3.1), and may not already have
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: "must not"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, fixed.

// a factory registered. Zap automatically registers a factory for the "file"
// scheme.
func RegisterSink(scheme string, factory func(*url.URL) (Sink, error)) error {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this a breaking change? The factory function didn't accept a callback before.

_sinkMutex.Lock()
defer _sinkMutex.Unlock()
if key == "" {
return errors.New("sink key cannot be blank")

if scheme == "" {
return errors.New("can't register a sink factory for empty string")
}
normalized, err := normalizeScheme(scheme)
if err != nil {
return fmt.Errorf("%q is not a valid scheme: %v", scheme, err)
}
if _, ok := _sinkFactories[key]; ok {
return fmt.Errorf("sink already registered for key %q", key)
if _, ok := _sinkFactories[normalized]; ok {
return fmt.Errorf("sink factory already registered for scheme %q", normalized)
}
_sinkFactories[key] = sinkFactory
_sinkFactories[normalized] = factory
return nil
}

// newSink invokes the registered sink factory to create and return the
// sink for the given key. Returns errSinkNotFound if the key cannot be found.
func newSink(key string) (Sink, error) {
func newSink(rawURL string) (Sink, error) {
u, err := url.Parse(rawURL)
if err != nil {
return nil, fmt.Errorf("can't parse %q as a URL: %v", rawURL, err)
}
if u.Scheme == "" {
u.Scheme = schemeFile
}

_sinkMutex.RLock()
defer _sinkMutex.RUnlock()
sinkFactory, ok := _sinkFactories[key]
factory, ok := _sinkFactories[u.Scheme]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

optionally: can we hold the RLock just while we're looking up the factory?

_sinkMutex.RLock()
factory, ok := _sinkFactories[u.Scheme]
_sinkMutex.RUnlock()

Shouldn't be an issue since it's just an RLock, but want to avoid holding on to locks while triggering the factory which is user-supplied.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, can do.

_sinkMutex.RUnlock()
if !ok {
return nil, &errSinkNotFound{key}
return nil, &errSinkNotFound{u.Scheme}
}
return sinkFactory()
return factory(u)
}

type nopCloserSink struct{ zapcore.WriteSyncer }
func newFileSink(u *url.URL) (Sink, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

optional: should we ensure there's no fragments etc? ignoring seems OK too, but it might be a little surprising

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point.

if u.User != nil {
return nil, fmt.Errorf("user and password not allowed with file URLs: got %v", u)
}
if u.Fragment != "" {
return nil, fmt.Errorf("fragments not allowed with file URLs: got %v", u)
}
if u.RawQuery != "" {
return nil, fmt.Errorf("query parameters not allowed with file URLs: got %v", u)
}
// Error messages are better if we check hostname and port separately.
if u.Port() != "" {
return nil, fmt.Errorf("ports not allowed with file URLs: got %v", u)
}
if hn := u.Hostname(); hn != "" && hn != "localhost" {
return nil, fmt.Errorf("file URLs must leave host empty or use localhost: got %v", u)
}
switch u.Path {
case "stdout":
return nopCloserSink{os.Stdout}, nil
case "stderr":
return nopCloserSink{os.Stderr}, nil
}
return os.OpenFile(u.Path, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0644)
}

func (nopCloserSink) Close() error { return nil }
func normalizeScheme(s string) (string, error) {
// https://tools.ietf.org/html/rfc3986#section-3.1
s = strings.ToLower(s)
if first := s[0]; 'a' > first || 'z' < first {
return "", errors.New("must start with a letter")
}
for i := 1; i < len(s); i++ { // iterate over bytes, not runes
c := s[i]
switch {
case 'a' <= c && c <= 'z':
continue
case '0' <= c && c <= '9':
continue
case c == '.' || c == '+' || c == '-':
continue
}
return "", fmt.Errorf("may not contain %q", c)
}
return s, nil
}
94 changes: 59 additions & 35 deletions sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,56 +21,80 @@
package zap

import (
"errors"
"os"
"bytes"
"io/ioutil"
"net/url"
"strings"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.uber.org/zap/zapcore"
)

func TestRegisterSink(t *testing.T) {
tests := []struct {
name string
key string
factory func() (Sink, error)
wantError bool
}{
{"valid", "valid", func() (Sink, error) { return nopCloserSink{os.Stdout}, nil }, false},
{"empty", "", func() (Sink, error) { return nopCloserSink{os.Stdout}, nil }, true},
{"stdout", "stdout", func() (Sink, error) { return nopCloserSink{os.Stdout}, nil }, true},
}
const (
memScheme = "m"
nopScheme = "no-op.1234"
)
var memCalls, nopCalls int

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := RegisterSink(tt.key, tt.factory)
if tt.wantError {
assert.NotNil(t, err)
} else {
assert.Nil(t, err)
assert.NotNil(t, _sinkFactories[tt.key], "expected the factory to be present")
}
})
buf := bytes.NewBuffer(nil)
memFactory := func(u *url.URL) (Sink, error) {
assert.Equal(t, u.Scheme, memScheme, "Scheme didn't match registration.")
memCalls++
return nopCloserSink{zapcore.AddSync(buf)}, nil
}
nopFactory := func(u *url.URL) (Sink, error) {
assert.Equal(t, u.Scheme, nopScheme, "Scheme didn't match registration.")
nopCalls++
return nopCloserSink{zapcore.AddSync(ioutil.Discard)}, nil
}
}

func TestNewSink(t *testing.T) {
defer resetSinkRegistry()
errTestSink := errors.New("test erroring")
err := RegisterSink("errors", func() (Sink, error) { return nil, errTestSink })
assert.Nil(t, err)

require.NoError(t, RegisterSink(strings.ToUpper(memScheme), memFactory), "Failed to register scheme %q.", memScheme)
require.NoError(t, RegisterSink(nopScheme, nopFactory), "Failed to register scheme %q.", memScheme)

sink, close, err := Open(
memScheme+"://somewhere",
nopScheme+"://somewhere-else",
)
assert.NoError(t, err, "Unexpected error opening URLs with registered schemes.")

defer close()

assert.Equal(t, 1, memCalls, "Unexpected number of calls to memory factory.")
assert.Equal(t, 1, nopCalls, "Unexpected number of calls to no-op factory.")

_, err = sink.Write([]byte("foo"))
assert.NoError(t, err, "Failed to write to combined WriteSyncer.")
assert.Equal(t, "foo", buf.String(), "Unexpected buffer contents.")
}

func TestRegisterSinkErrors(t *testing.T) {
nopFactory := func(_ *url.URL) (Sink, error) {
return nopCloserSink{zapcore.AddSync(ioutil.Discard)}, nil
}
tests := []struct {
key string
err error
scheme string
err string
}{
{"stdout", nil},
{"errors", errTestSink},
{"nonexistent", &errSinkNotFound{"nonexistent"}},
{"", "empty string"},
{"FILE", "already registered"},
{"42", "not a valid scheme"},
{"http*", "not a valid scheme"},
}

for _, tt := range tests {
t.Run(tt.key, func(t *testing.T) {
_, err := newSink(tt.key)
assert.Equal(t, tt.err, err)
t.Run("scheme-"+tt.scheme, func(t *testing.T) {
defer resetSinkRegistry()

err := RegisterSink(tt.scheme, nopFactory)
if assert.Error(t, err, "expected error") {
assert.Contains(t, err.Error(), tt.err, "unexpected error")
}
})
}
}
45 changes: 22 additions & 23 deletions writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,33 @@
package zap

import (
"fmt"
"io"
"io/ioutil"
"os"

"go.uber.org/zap/zapcore"

"go.uber.org/multierr"
)

// Open is a high-level wrapper that takes a variadic number of paths, opens or
// creates each of the specified files, and combines them into a locked
// Open is a high-level wrapper that takes a variadic number of URLs, opens or
// creates each of the specified resources, and combines them into a locked
// WriteSyncer. It also returns any error encountered and a function to close
// any opened files.
//
// Passing no paths returns a no-op WriteSyncer. The special paths "stdout" and
// "stderr" are interpreted as os.Stdout and os.Stderr, respectively.
// Passing no URLs returns a no-op WriteSyncer. Zap handles URLs without a
// scheme and URLs with the "file" scheme. Third-party code may register
// factories for other schemes using RegisterSink.
//
// URLs with the "file" scheme must use absolute paths on the local
// filesystem. No user, password, port, fragments, or query parameters are
// allowed, and the hostname must be empty or "localhost".
//
// Since it's common to write logs to the local filesystem, URLs without a
// scheme (e.g., "/var/log/foo.log") are treated as local file paths. Without
// a scheme, the special paths "stdout" and "stderr" are interpreted as
// os.Stdout and os.Stderr. When specified without a scheme, relative file
// paths also work.
func Open(paths ...string) (zapcore.WriteSyncer, func(), error) {
writers, close, err := open(paths)
if err != nil {
Expand All @@ -48,36 +59,24 @@ func Open(paths ...string) (zapcore.WriteSyncer, func(), error) {
}

func open(paths []string) ([]zapcore.WriteSyncer, func(), error) {
var openErr error
writers := make([]zapcore.WriteSyncer, 0, len(paths))
closers := make([]io.Closer, 0, len(paths))
close := func() {
for _, c := range closers {
c.Close()
}
}

var openErr error
for _, path := range paths {
sink, err := newSink(path)
if err == nil {
// Using a registered sink constructor.
writers = append(writers, sink)
closers = append(closers, sink)
continue
}
if _, ok := err.(*errSinkNotFound); ok {
// No named sink constructor, use key as path to log file.
f, e := os.OpenFile(path, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0644)
openErr = multierr.Append(openErr, e)
if e == nil {
writers = append(writers, f)
closers = append(closers, f)
}
if err != nil {
openErr = multierr.Append(openErr, fmt.Errorf("couldn't open sink %q: %v", path, err))
continue
}
// Sink constructor failed.
openErr = multierr.Append(openErr, err)
writers = append(writers, sink)
closers = append(closers, sink)
}

if openErr != nil {
close()
return writers, nil, openErr
Expand Down
Loading