Skip to content

Commit

Permalink
Merge pull request moby#44087 from jchorl/josh/sdkupgrade
Browse files Browse the repository at this point in the history
Upgrade to aws go sdk v2 for cloudwatch logging driver
  • Loading branch information
corhere committed Jan 10, 2023
2 parents cf3ce18 + c12d7b6 commit 62296f9
Show file tree
Hide file tree
Showing 556 changed files with 74,077 additions and 48,349 deletions.
209 changes: 101 additions & 108 deletions daemon/logger/awslogs/cloudwatchlogs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,26 @@
package awslogs // import "github.com/docker/docker/daemon/logger/awslogs"

import (
"context"
"fmt"
"os"
"regexp"
"runtime"
"sort"
"strconv"
"strings"
"sync"
"time"
"unicode/utf8"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/credentials/endpointcreds"
"github.com/aws/aws-sdk-go/aws/ec2metadata"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/aws/middleware"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials/endpointcreds"
"github.com/aws/aws-sdk-go-v2/feature/ec2/imds"
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types"
"github.com/aws/smithy-go"
smithymiddleware "github.com/aws/smithy-go/middleware"
smithyhttp "github.com/aws/smithy-go/transport/http"
"github.com/docker/docker/daemon/logger"
"github.com/docker/docker/daemon/logger/loggerutils"
"github.com/docker/docker/dockerversion"
Expand Down Expand Up @@ -59,15 +61,8 @@ const (
// this replacement happens.
maximumBytesPerEvent = 262144 - perEventBytes

resourceAlreadyExistsCode = "ResourceAlreadyExistsException"
dataAlreadyAcceptedCode = "DataAlreadyAcceptedException"
invalidSequenceTokenCode = "InvalidSequenceTokenException"
resourceNotFoundCode = "ResourceNotFoundException"

credentialsEndpoint = "http://169.254.170.2" //nolint:gosec // G101: Potential hardcoded credentials

userAgentHeader = "User-Agent"

// See: https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Specification.html
logsFormatHeader = "x-amzn-logs-format"
jsonEmfLogFormat = "json/emf"
Expand Down Expand Up @@ -102,17 +97,17 @@ type logStreamConfig struct {
var _ logger.SizedLogger = &logStream{}

type api interface {
CreateLogGroup(*cloudwatchlogs.CreateLogGroupInput) (*cloudwatchlogs.CreateLogGroupOutput, error)
CreateLogStream(*cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error)
PutLogEvents(*cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error)
CreateLogGroup(context.Context, *cloudwatchlogs.CreateLogGroupInput, ...func(*cloudwatchlogs.Options)) (*cloudwatchlogs.CreateLogGroupOutput, error)
CreateLogStream(context.Context, *cloudwatchlogs.CreateLogStreamInput, ...func(*cloudwatchlogs.Options)) (*cloudwatchlogs.CreateLogStreamOutput, error)
PutLogEvents(context.Context, *cloudwatchlogs.PutLogEventsInput, ...func(*cloudwatchlogs.Options)) (*cloudwatchlogs.PutLogEventsOutput, error)
}

type regionFinder interface {
Region() (string, error)
GetRegion(context.Context, *imds.GetRegionInput, ...func(*imds.Options)) (*imds.GetRegionOutput, error)
}

type wrappedEvent struct {
inputLogEvent *cloudwatchlogs.InputLogEvent
inputLogEvent types.InputLogEvent
insertOrder int
}
type byTimestamp []wrappedEvent
Expand Down Expand Up @@ -325,12 +320,14 @@ var strftimeToRegex = map[string]string{

// newRegionFinder is a variable such that the implementation
// can be swapped out for unit tests.
var newRegionFinder = func() (regionFinder, error) {
s, err := session.NewSession()
var newRegionFinder = func(ctx context.Context) (regionFinder, error) {
cfg, err := config.LoadDefaultConfig(ctx) // default config, because we don't yet know the region
if err != nil {
return nil, err
}
return ec2metadata.New(s), nil

client := imds.NewFromConfig(cfg)
return client, nil
}

// newSDKEndpoint is a variable such that the implementation
Expand All @@ -341,7 +338,7 @@ var newSDKEndpoint = credentialsEndpoint
// Customizations to the default client from the SDK include a Docker-specific
// User-Agent string and automatic region detection using the EC2 Instance
// Metadata Service when region is otherwise unspecified.
func newAWSLogsClient(info logger.Info) (api, error) {
func newAWSLogsClient(info logger.Info, configOpts ...func(*config.LoadOptions) error) (*cloudwatchlogs.Client, error) {
var region, endpoint *string
if os.Getenv(regionEnvKey) != "" {
region = aws.String(os.Getenv(regionEnvKey))
Expand All @@ -353,72 +350,73 @@ func newAWSLogsClient(info logger.Info) (api, error) {
endpoint = aws.String(info.Config[endpointKey])
}
if region == nil || *region == "" {
logrus.Info("Trying to get region from EC2 Metadata")
ec2MetadataClient, err := newRegionFinder()
logrus.Info("Trying to get region from IMDS")
regFinder, err := newRegionFinder(context.TODO())
if err != nil {
logrus.WithError(err).Error("could not create EC2 metadata client")
return nil, errors.Wrap(err, "could not create EC2 metadata client")
logrus.WithError(err).Error("could not create regionFinder")
return nil, errors.Wrap(err, "could not create regionFinder")
}

r, err := ec2MetadataClient.Region()
r, err := regFinder.GetRegion(context.TODO(), &imds.GetRegionInput{})
if err != nil {
logrus.WithError(err).Error("Could not get region from EC2 metadata, environment, or log option")
return nil, errors.New("Cannot determine region for awslogs driver")
logrus.WithError(err).Error("Could not get region from IMDS, environment, or log option")
return nil, errors.Wrap(err, "cannot determine region for awslogs driver")
}
region = &r
}

sess, err := session.NewSession()
if err != nil {
return nil, errors.New("Failed to create a service client session for awslogs driver")
region = &r.Region
}

// attach region to cloudwatchlogs config
sess.Config.Region = region

// attach endpoint to cloudwatchlogs config
if endpoint != nil {
sess.Config.Endpoint = endpoint
}
configOpts = append(configOpts, config.WithRegion(*region))

if uri, ok := info.Config[credentialsEndpointKey]; ok {
logrus.Debugf("Trying to get credentials from awslogs-credentials-endpoint")

endpoint := fmt.Sprintf("%s%s", newSDKEndpoint, uri)
creds := endpointcreds.NewCredentialsClient(*sess.Config, sess.Handlers, endpoint,
func(p *endpointcreds.Provider) {
p.ExpiryWindow = 5 * time.Minute
})
configOpts = append(configOpts, config.WithCredentialsProvider(endpointcreds.New(endpoint)))
}

// attach credentials to cloudwatchlogs config
sess.Config.Credentials = creds
cfg, err := config.LoadDefaultConfig(context.TODO(), configOpts...)
if err != nil {
logrus.WithError(err).Error("Could not initialize AWS SDK config")
return nil, errors.Wrap(err, "could not initialize AWS SDK config")
}

logrus.WithFields(logrus.Fields{
"region": *region,
}).Debug("Created awslogs client")

client := cloudwatchlogs.New(sess)

client.Handlers.Build.PushBackNamed(request.NamedHandler{
Name: "DockerUserAgentHandler",
Fn: func(r *request.Request) {
currentAgent := r.HTTPRequest.Header.Get(userAgentHeader)
r.HTTPRequest.Header.Set(userAgentHeader,
fmt.Sprintf("Docker %s (%s) %s",
dockerversion.Version, runtime.GOOS, currentAgent))
},
})
var clientOpts []func(*cloudwatchlogs.Options)

if info.Config[logFormatKey] != "" {
client.Handlers.Build.PushBackNamed(request.NamedHandler{
Name: "LogFormatHeaderHandler",
Fn: func(req *request.Request) {
req.HTTPRequest.Header.Set(logsFormatHeader, info.Config[logFormatKey])
},
logFormatMiddleware := smithymiddleware.BuildMiddlewareFunc("logFormat", func(
ctx context.Context, in smithymiddleware.BuildInput, next smithymiddleware.BuildHandler,
) (
out smithymiddleware.BuildOutput, metadata smithymiddleware.Metadata, err error,
) {
switch v := in.Request.(type) {
case *smithyhttp.Request:
v.Header.Add(logsFormatHeader, jsonEmfLogFormat)
}
return next.HandleBuild(ctx, in)
})
clientOpts = append(
clientOpts,
cloudwatchlogs.WithAPIOptions(func(stack *smithymiddleware.Stack) error {
return stack.Build.Add(logFormatMiddleware, smithymiddleware.Before)
}),
)
}

clientOpts = append(
clientOpts,
cloudwatchlogs.WithAPIOptions(middleware.AddUserAgentKeyValue("Docker", dockerversion.Version)),
)

if endpoint != nil {
clientOpts = append(clientOpts, cloudwatchlogs.WithEndpointResolver(cloudwatchlogs.EndpointResolverFromURL(*endpoint)))
}

client := cloudwatchlogs.NewFromConfig(cfg, clientOpts...)

return client, nil
}

Expand Down Expand Up @@ -468,7 +466,9 @@ func (l *logStream) create() error {
if err == nil {
return nil
}
if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == resourceNotFoundCode && l.logCreateGroup {

var apiErr *types.ResourceNotFoundException
if errors.As(err, &apiErr) && l.logCreateGroup {
if err := l.createLogGroup(); err != nil {
return errors.Wrap(err, "failed to create Cloudwatch log group")
}
Expand All @@ -482,18 +482,18 @@ func (l *logStream) create() error {

// createLogGroup creates a log group for the instance of the awslogs logging driver
func (l *logStream) createLogGroup() error {
if _, err := l.client.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{
if _, err := l.client.CreateLogGroup(context.TODO(), &cloudwatchlogs.CreateLogGroupInput{
LogGroupName: aws.String(l.logGroupName),
}); err != nil {
if awsErr, ok := err.(awserr.Error); ok {
var apiErr smithy.APIError
if errors.As(err, &apiErr) {
fields := logrus.Fields{
"errorCode": awsErr.Code(),
"message": awsErr.Message(),
"origError": awsErr.OrigErr(),
"errorCode": apiErr.ErrorCode(),
"message": apiErr.ErrorMessage(),
"logGroupName": l.logGroupName,
"logCreateGroup": l.logCreateGroup,
}
if awsErr.Code() == resourceAlreadyExistsCode {
if _, ok := apiErr.(*types.ResourceAlreadyExistsException); ok {
// Allow creation to succeed
logrus.WithFields(fields).Info("Log group already exists")
return nil
Expand Down Expand Up @@ -522,18 +522,17 @@ func (l *logStream) createLogStream() error {
LogStreamName: aws.String(l.logStreamName),
}

_, err := l.client.CreateLogStream(input)

_, err := l.client.CreateLogStream(context.TODO(), input)
if err != nil {
if awsErr, ok := err.(awserr.Error); ok {
var apiErr smithy.APIError
if errors.As(err, &apiErr) {
fields := logrus.Fields{
"errorCode": awsErr.Code(),
"message": awsErr.Message(),
"origError": awsErr.OrigErr(),
"errorCode": apiErr.ErrorCode(),
"message": apiErr.ErrorMessage(),
"logGroupName": l.logGroupName,
"logStreamName": l.logStreamName,
}
if awsErr.Code() == resourceAlreadyExistsCode {
if _, ok := apiErr.(*types.ResourceAlreadyExistsException); ok {
// Allow creation to succeed
logrus.WithFields(fields).Info("Log stream already exists")
return nil
Expand Down Expand Up @@ -644,7 +643,7 @@ func (l *logStream) processEvent(batch *eventBatch, bytes []byte, timestamp int6
splitOffset, lineBytes := findValidSplit(string(bytes), maximumBytesPerEvent)
line := bytes[:splitOffset]
event := wrappedEvent{
inputLogEvent: &cloudwatchlogs.InputLogEvent{
inputLogEvent: types.InputLogEvent{
Message: aws.String(string(line)),
Timestamp: aws.Int64(timestamp),
},
Expand Down Expand Up @@ -704,24 +703,18 @@ func (l *logStream) publishBatch(batch *eventBatch) {
nextSequenceToken, err := l.putLogEvents(cwEvents, l.sequenceToken)

if err != nil {
if awsErr, ok := err.(awserr.Error); ok {
if awsErr.Code() == dataAlreadyAcceptedCode {
// already submitted, just grab the correct sequence token
parts := strings.Split(awsErr.Message(), " ")
nextSequenceToken = &parts[len(parts)-1]
logrus.WithFields(logrus.Fields{
"errorCode": awsErr.Code(),
"message": awsErr.Message(),
"logGroupName": l.logGroupName,
"logStreamName": l.logStreamName,
}).Info("Data already accepted, ignoring error")
err = nil
} else if awsErr.Code() == invalidSequenceTokenCode {
// sequence code is bad, grab the correct one and retry
parts := strings.Split(awsErr.Message(), " ")
token := parts[len(parts)-1]
nextSequenceToken, err = l.putLogEvents(cwEvents, &token)
}
if apiErr := (*types.DataAlreadyAcceptedException)(nil); errors.As(err, &apiErr) {
// already submitted, just grab the correct sequence token
nextSequenceToken = apiErr.ExpectedSequenceToken
logrus.WithFields(logrus.Fields{
"errorCode": apiErr.ErrorCode(),
"message": apiErr.ErrorMessage(),
"logGroupName": l.logGroupName,
"logStreamName": l.logStreamName,
}).Info("Data already accepted, ignoring error")
err = nil
} else if apiErr := (*types.InvalidSequenceTokenException)(nil); errors.As(err, &apiErr) {
nextSequenceToken, err = l.putLogEvents(cwEvents, apiErr.ExpectedSequenceToken)
}
}
if err != nil {
Expand All @@ -732,20 +725,20 @@ func (l *logStream) publishBatch(batch *eventBatch) {
}

// putLogEvents wraps the PutLogEvents API
func (l *logStream) putLogEvents(events []*cloudwatchlogs.InputLogEvent, sequenceToken *string) (*string, error) {
func (l *logStream) putLogEvents(events []types.InputLogEvent, sequenceToken *string) (*string, error) {
input := &cloudwatchlogs.PutLogEventsInput{
LogEvents: events,
SequenceToken: sequenceToken,
LogGroupName: aws.String(l.logGroupName),
LogStreamName: aws.String(l.logStreamName),
}
resp, err := l.client.PutLogEvents(input)
resp, err := l.client.PutLogEvents(context.TODO(), input)
if err != nil {
if awsErr, ok := err.(awserr.Error); ok {
var apiErr smithy.APIError
if errors.As(err, &apiErr) {
logrus.WithFields(logrus.Fields{
"errorCode": awsErr.Code(),
"message": awsErr.Message(),
"origError": awsErr.OrigErr(),
"errorCode": apiErr.ErrorCode(),
"message": apiErr.ErrorMessage(),
"logGroupName": l.logGroupName,
"logStreamName": l.logStreamName,
}).Error("Failed to put log events")
Expand Down Expand Up @@ -842,8 +835,8 @@ func (slice byTimestamp) Swap(i, j int) {
slice[i], slice[j] = slice[j], slice[i]
}

func unwrapEvents(events []wrappedEvent) []*cloudwatchlogs.InputLogEvent {
cwEvents := make([]*cloudwatchlogs.InputLogEvent, len(events))
func unwrapEvents(events []wrappedEvent) []types.InputLogEvent {
cwEvents := make([]types.InputLogEvent, len(events))
for i, input := range events {
cwEvents[i] = input.inputLogEvent
}
Expand Down

0 comments on commit 62296f9

Please sign in to comment.