lambda-promtail: Add support for WAF logs in S3 (grafana#10416)
**What this PR does / why we need it**:
Adds support in lambda-promtail for AWS WAF logs stored in S3.
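
As an illustration (a standalone sketch, not code from this change), the snippet below shows how the object-key pattern added as `wafFilenameRegex` breaks AWS's documented example WAF key into the labels lambda-promtail attaches to the stream; the pattern and example key are copied from the diff, everything else is demo scaffolding.

```go
package main

import (
	"fmt"
	"regexp"
)

// Same pattern as wafFilenameRegex in s3.go, copied here for a standalone demo.
var wafKeyRegex = regexp.MustCompile(`AWSLogs\/(?P<account_id>\d+)\/(?P<type>WAFLogs)\/(?P<region>[\w-]+)\/(?P<src>[\w-]+)\/(?P<year>\d+)\/(?P<month>\d+)\/(?P<day>\d+)\/(?P<hour>\d+)\/(?P<minute>\d+)\/\d+\_waflogs\_[\w-]+_[\w-]+_\d+T\d+Z_\w+`)

func main() {
	// Example key from the AWS WAF S3 logging docs (also used in the new tests).
	key := "aws-waf-logs-test/AWSLogs/11111111111/WAFLogs/us-east-1/TEST-WEBACL/2021/10/28/19/50/11111111111_waflogs_us-east-1_TEST-WEBACL_20211028T1950Z_e0ca43b5.log.gz"

	match := wafKeyRegex.FindStringSubmatch(key)
	if match == nil {
		fmt.Println("no match")
		return
	}
	// Pair each named capture group with its value,
	// e.g. account_id=11111111111, src=TEST-WEBACL, year=2021.
	for i, name := range wafKeyRegex.SubexpNames() {
		if name != "" {
			fmt.Printf("%s=%s\n", name, match[i])
		}
	}
}
```

The `account_id` group is what the new parser config uses as `ownerLabelKey`, which is why the tests expect `__aws_s3_waf_owner="11111111111"`.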

**Which issue(s) this PR fixes**:
Fixes #<issue number>

**Special notes for your reviewer**:

**Checklist**
- [x] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [x] Documentation added
- [x] Tests updated
- [x] `CHANGELOG.md` updated
- [x] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [x] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/setup/upgrade/_index.md`
- [x] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](grafana@d10549e)
lpugoy authored and rhnasc committed Apr 12, 2024
1 parent 9eb2366 commit c01d9ef
Showing 4 changed files with 237 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -9,6 +9,7 @@
#### Loki

##### Enhancements
* [10416](https://github.com/grafana/loki/pull/10416) **lpugoy**: Lambda-Promtail: Add support for WAF logs in S3
* [10324](https://github.com/grafana/loki/pull/10324) **ashwanthgoli**: Deprecate ingester.unordered-writes and a few unused configs (log.use-buffered, log.use-sync, frontend.forward-headers-list)

* [10322](https://github.com/grafana/loki/pull/10322) **chaudum**: Deprecate misleading setting `-ruler.evaluation-delay-duration`.
83 changes: 78 additions & 5 deletions tools/lambda-promtail/lambda-promtail/s3.go
@@ -7,7 +7,9 @@ import (
"encoding/json"
"fmt"
"io"
"math"
"regexp"
"strconv"
"time"

"github.com/aws/aws-lambda-go/events"
@@ -31,6 +33,8 @@ type parserConfig struct {
timestampRegex *regexp.Regexp
// time format to use to convert the timestamp to time.Time
timestampFormat string
// whether the timestamp is a string that can be parsed or a Unix timestamp
timestampType string
// how many lines or jsonToken to skip at the beginning of the file
skipHeaderCount int
// key of the metadata label to use as a value for the __aws_<logType>_owner label
@@ -45,6 +49,7 @@ const (
CLOUDFRONT_LOG_TYPE string = "cloudfront"
LB_NLB_TYPE string = "net"
LB_ALB_TYPE string = "app"
WAF_LOG_TYPE string = "WAFLogs"
)

var (
@@ -66,18 +71,25 @@ var (
// CloudFront
// source https://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/AccessLogs.html#AccessLogsFileNaming
// example: example-prefix/EMLARXS9EXAMPLE.2019-11-14-20.RT4KCN4SGK9.gz
// AWS WAF logs
// source: https://docs.aws.amazon.com/waf/latest/developerguide/logging-s3.html
// format: aws-waf-logs-suffix[/prefix]/AWSLogs/aws-account-id/WAFLogs/region/webacl-name/year/month/day/hour/minute/aws-account-id_waflogs_region_webacl-name_timestamp_hash.log.gz
// example: aws-waf-logs-test/AWSLogs/11111111111/WAFLogs/us-east-1/TEST-WEBACL/2021/10/28/19/50/11111111111_waflogs_us-east-1_TEST-WEBACL_20211028T1950Z_e0ca43b5.log.gz
defaultFilenameRegex = regexp.MustCompile(`AWSLogs\/(?P<account_id>\d+)\/(?P<type>[a-zA-Z0-9_\-]+)\/(?P<region>[\w-]+)\/(?P<year>\d+)\/(?P<month>\d+)\/(?P<day>\d+)\/\d+\_(?:elasticloadbalancing|vpcflowlogs)\_\w+-\w+-\d_(?:(?P<lb_type>app|net)\.*?)?(?P<src>[a-zA-Z0-9\-]+)`)
defaultTimestampRegex = regexp.MustCompile(`(?P<timestamp>\d+-\d+-\d+T\d+:\d+:\d+(?:\.\d+Z)?)`)
cloudtrailFilenameRegex = regexp.MustCompile(`AWSLogs\/(?P<organization_id>o-[a-z0-9]{10,32})?\/?(?P<account_id>\d+)\/(?P<type>[a-zA-Z0-9_\-]+)\/(?P<region>[\w-]+)\/(?P<year>\d+)\/(?P<month>\d+)\/(?P<day>\d+)\/\d+\_(?:CloudTrail|CloudTrail-Digest)\_\w+-\w+-\d_(?:(?:app|nlb|net)\.*?)?.+_(?P<src>[a-zA-Z0-9\-]+)`)
cloudfrontFilenameRegex = regexp.MustCompile(`(?P<prefix>.*)\/(?P<src>[A-Z0-9]+)\.(?P<year>\d+)-(?P<month>\d+)-(?P<day>\d+)-(.+)`)
cloudfrontTimestampRegex = regexp.MustCompile(`(?P<timestamp>\d+-\d+-\d+\s\d+:\d+:\d+)`)
wafFilenameRegex = regexp.MustCompile(`AWSLogs\/(?P<account_id>\d+)\/(?P<type>WAFLogs)\/(?P<region>[\w-]+)\/(?P<src>[\w-]+)\/(?P<year>\d+)\/(?P<month>\d+)\/(?P<day>\d+)\/(?P<hour>\d+)\/(?P<minute>\d+)\/\d+\_waflogs\_[\w-]+_[\w-]+_\d+T\d+Z_\w+`)
wafTimestampRegex = regexp.MustCompile(`"timestamp":\s*(?P<timestamp>\d+),`)
parsers = map[string]parserConfig{
FLOW_LOG_TYPE: {
logTypeLabel: "s3_vpc_flow",
filenameRegex: defaultFilenameRegex,
ownerLabelKey: "account_id",
timestampRegex: defaultTimestampRegex,
timestampFormat: time.RFC3339,
timestampType: "string",
skipHeaderCount: 1,
},
LB_LOG_TYPE: {
@@ -86,6 +98,7 @@
ownerLabelKey: "account_id",
timestampFormat: time.RFC3339,
timestampRegex: defaultTimestampRegex,
timestampType: "string",
},
CLOUDTRAIL_LOG_TYPE: {
logTypeLabel: "s3_cloudtrail",
@@ -99,8 +112,16 @@
ownerLabelKey: "prefix",
timestampRegex: cloudfrontTimestampRegex,
timestampFormat: "2006-01-02\x0915:04:05",
timestampType: "string",
skipHeaderCount: 2,
},
WAF_LOG_TYPE: {
logTypeLabel: "s3_waf",
filenameRegex: wafFilenameRegex,
ownerLabelKey: "account_id",
timestampRegex: wafTimestampRegex,
timestampType: "unix",
},
}
)

@@ -120,7 +141,7 @@ func getS3Client(ctx context.Context, region string) (*s3.Client, error) {
return s3Client, nil
}

func parseS3Log(ctx context.Context, b *batch, labels map[string]string, obj io.ReadCloser) error {
func parseS3Log(ctx context.Context, b *batch, labels map[string]string, obj io.ReadCloser, log *log.Logger) error {
parser, ok := parsers[labels["type"]]
if !ok {
if labels["type"] == CLOUDTRAIL_DIGEST_LOG_TYPE {
@@ -182,9 +203,21 @@ func parseS3Log(ctx context.Context, b *batch, labels map[string]string, obj io.
// NLB logs don't have .SSSSSSZ suffix. RFC3339 requires a TZ specifier, use UTC
match[1] += "Z"
}
timestamp, err = time.Parse(parser.timestampFormat, match[1])
if err != nil {
return err

switch parser.timestampType {
case "string":
timestamp, err = time.Parse(parser.timestampFormat, match[1])
if err != nil {
return err
}
case "unix":
sec, nsec, err := getUnixSecNsec(match[1])
if err != nil {
return err
}
timestamp = time.Unix(sec, nsec).UTC()
default:
level.Warn(*log).Log("msg", fmt.Sprintf("timestamp type of %s parser unknown, using current time", labels["type"]))
}
}

@@ -250,7 +283,7 @@ func processS3Event(ctx context.Context, ev *events.S3Event, pc Client, log *log
if err != nil {
return fmt.Errorf("Failed to get object %s from bucket %s on account %s\n, %s", labels["key"], labels["bucket"], labels["bucketOwner"], err)
}
err = parseS3Log(ctx, batch, labels, obj.Body)
err = parseS3Log(ctx, batch, labels, obj.Body, log)
if err != nil {
return err
}
@@ -301,3 +334,43 @@ func stringToRawEvent(body string) (map[string]interface{}, error) {
}
return result, nil
}

// getUnixSecNsec returns the Unix time seconds and nanoseconds in the string s.
// It assumes that the first 10 digits of the parsed int are the Unix time in seconds and the remaining digits are the fractional-seconds part.
// This assumption holds until 2286-11-20 17:46:40 UTC, so it is safe for the foreseeable future.
// It also makes use of the fact that the integer part of log10 of a base-10 number is its number of digits minus 1.
// It returns early if the fractional-seconds part is 0, because taking the log10 of 0 results in -Inf.
// For example, given a string 1234567890123:
// iLog10 = 12 // the parsed int is 13 digits long
// multiplier = 0.001 // to get the seconds part it must be divided by 1000
// sec = 1234567890123 * 0.001 = 1234567890 // this is the seconds part of the Unix time
// fractionalSec = 123 // the rest of the parsed int
// fractionalSecLog10 = 2 // it is 3 digits long
// multiplier = 1000000 // nano is 10^-9, so the nanoseconds part is 9 digits long
// nsec = 123000000 // this is the nanoseconds part of the Unix time
func getUnixSecNsec(s string) (sec int64, nsec int64, err error) {
const (
UNIX_SEC_LOG10 = 9
UNIX_NANOSEC_LOG10 = 8
)

i, err := strconv.ParseInt(s, 10, 64)
if err != nil {
return sec, nsec, err
}

iLog10 := int(math.Log10(float64(i)))
multiplier := math.Pow10(UNIX_SEC_LOG10 - iLog10)
sec = int64(float64(i) * multiplier)

fractionalSec := float64(i % sec)
if fractionalSec == 0 {
return sec, 0, err
}

fractionalSecLog10 := int(math.Log10(fractionalSec))
multiplier = math.Pow10(UNIX_NANOSEC_LOG10 - fractionalSecLog10)
nsec = int64(fractionalSec * multiplier)

return sec, nsec, err
}
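
A note on the new `timestampType: "unix"` path (an illustrative sketch, not code from this PR): WAF records carry a `"timestamp"` field in epoch milliseconds, which `wafTimestampRegex` extracts and `getUnixSecNsec` splits into seconds and nanoseconds for `time.Unix`. For the 13-digit millisecond case the split reduces to the arithmetic below; the value used here corresponds to the timestamp asserted in the new waflogs test.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Epoch-millisecond timestamp as it appears in a WAF record, e.g. "timestamp":1693457862729.
	ms := int64(1693457862729)

	// For a 13-digit input, getUnixSecNsec effectively performs this split:
	// the first 10 digits are the seconds, the remaining digits are scaled to nanoseconds.
	sec := ms / 1000                // 1693457862
	nsec := (ms % 1000) * 1_000_000 // 729000000

	fmt.Println(time.Unix(sec, nsec).UTC()) // 2023-08-31 04:57:42.729 +0000 UTC

	// Cross-check with the standard library helper for the millisecond case.
	fmt.Println(time.UnixMilli(ms).UTC())
}
```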
161 changes: 158 additions & 3 deletions tools/lambda-promtail/lambda-promtail/s3_test.go
@@ -1,6 +1,7 @@
package main

import (
"bytes"
"context"
"io"
"os"
@@ -9,9 +10,9 @@ import (
"time"

"github.com/aws/aws-lambda-go/events"
"github.com/stretchr/testify/require"

"github.com/go-kit/log"
"github.com/grafana/loki/pkg/logproto"
"github.com/stretchr/testify/require"
)

func Test_getLabels(t *testing.T) {
@@ -257,6 +258,41 @@ func Test_getLabels(t *testing.T) {
},
wantErr: false,
},
{
name: "s3_waf",
args: args{
record: events.S3EventRecord{
AWSRegion: "us-east-1",
S3: events.S3Entity{
Bucket: events.S3Bucket{
Name: "waf_logs_test",
OwnerIdentity: events.S3UserIdentity{
PrincipalID: "test",
},
},
Object: events.S3Object{
Key: "prefix/AWSLogs/11111111111/WAFLogs/us-east-1/TEST-WEBACL/2021/10/28/19/50/11111111111_waflogs_us-east-1_TEST-WEBACL_20211028T1950Z_e0ca43b5.log.gz",
},
},
},
},
want: map[string]string{
"account_id": "11111111111",
"bucket_owner": "test",
"bucket_region": "us-east-1",
"bucket": "waf_logs_test",
"day": "28",
"hour": "19",
"key": "prefix/AWSLogs/11111111111/WAFLogs/us-east-1/TEST-WEBACL/2021/10/28/19/50/11111111111_waflogs_us-east-1_TEST-WEBACL_20211028T1950Z_e0ca43b5.log.gz",
"minute": "50",
"month": "10",
"region": "us-east-1",
"src": "TEST-WEBACL",
"type": WAF_LOG_TYPE,
"year": "2021",
},
wantErr: false,
},
{
name: "missing_type",
args: args{
Expand Down Expand Up @@ -313,6 +349,7 @@ func Test_parseS3Log(t *testing.T) {
expectedLen int
expectedStream string
expectedTimestamps []time.Time
expectedLog string
}{
{
name: "vpcflowlogs",
@@ -443,6 +480,27 @@
},
wantErr: false,
},
{
name: "waflogs",
args: args{
batchSize: 131072, // Set large enough that we don't try to send to promtail
filename: "../testdata/waflog.log.gz",
b: &batch{
streams: map[string]*logproto.Stream{},
},
labels: map[string]string{
"account_id": "11111111111",
"src": "TEST-WEBACL",
"type": WAF_LOG_TYPE,
},
},
expectedLen: 1,
expectedStream: `{__aws_log_type="s3_waf", __aws_s3_waf="TEST-WEBACL", __aws_s3_waf_owner="11111111111"}`,
expectedTimestamps: []time.Time{
time.Date(2023, time.August, 31, 4, 57, 42, 729000000, time.UTC),
},
wantErr: false,
},
{
name: "missing_parser",
args: args{
@@ -463,6 +521,31 @@
expectedStream: "",
wantErr: true,
},
{
name: "no_timestamp_type",
args: args{
batchSize: 131072, // Set large enough that we don't try to send to promtail
filename: "../testdata/waflog.log.gz",
b: &batch{
streams: map[string]*logproto.Stream{},
},
labels: map[string]string{
"account_id": "11111111111",
"src": "TEST-WEBACL",
"type": "no_type",
},
},
expectedLen: 1,
expectedStream: `{__aws_log_type="s3_waf", __aws_s3_waf="TEST-WEBACL", __aws_s3_waf_owner="11111111111"}`,
expectedLog: `level=warn msg="timestamp type of no_type parser unknown, using current time"` + "\n",
wantErr: false,
},
}
parsers["no_type"] = parserConfig{
logTypeLabel: "s3_waf",
filenameRegex: wafFilenameRegex,
ownerLabelKey: "account_id",
timestampRegex: wafTimestampRegex,
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@@ -472,7 +555,9 @@
if err != nil {
t.Errorf("parseS3Log() failed to open test file: %s - %v", tt.args.filename, err)
}
if err := parseS3Log(context.Background(), tt.args.b, tt.args.labels, tt.args.obj); (err != nil) != tt.wantErr {
buf := &bytes.Buffer{}
log := log.NewLogfmtLogger(buf)
if err := parseS3Log(context.Background(), tt.args.b, tt.args.labels, tt.args.obj, &log); (err != nil) != tt.wantErr {
t.Errorf("parseS3Log() error = %v, wantErr %v", err, tt.wantErr)
}
require.Len(t, tt.args.b.streams, tt.expectedLen)
@@ -486,6 +571,7 @@
require.Equal(t, tt.expectedTimestamps[i], entry.Timestamp)
}
}
require.Equal(t, tt.expectedLog, buf.String())
}
})
}
@@ -595,3 +681,72 @@ func TestProcessSQSEvent(t *testing.T) {
require.Nil(t, err)
require.True(t, handlerCalled)
}

func TestGetUnixSecNsec(t *testing.T) {
type args struct {
s string
}
tests := []struct {
name string
args args
wantErr bool
expectedSec int64
expectedNsec int64
}{
{
name: "timestamp_in_seconds",
args: args{
s: "1234567890",
},
expectedSec: 1234567890,
expectedNsec: 0,
wantErr: false,
},
{
name: "timestamp_in_milliseconds",
args: args{
s: "1234567890123",
},
expectedSec: 1234567890,
expectedNsec: 123000000,
wantErr: false,
},
{
name: "timestamp_in_microseconds",
args: args{
s: "1234567890123456",
},
expectedSec: 1234567890,
expectedNsec: 123456000,
wantErr: false,
},
{
name: "timestamp_in_nanoseconds",
args: args{
s: "1234567890123456789",
},
expectedSec: 1234567890,
expectedNsec: 123456789,
wantErr: false,
},
{
name: "strconv_error",
args: args{
s: "string",
},
expectedSec: 0,
expectedNsec: 0,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
sec, nsec, err := getUnixSecNsec(tt.args.s)
if (err != nil) != tt.wantErr {
t.Errorf("getUnixSecNsec() error = %v, wantErr %v", err, tt.wantErr)
}
require.Equal(t, tt.expectedSec, sec)
require.Equal(t, tt.expectedNsec, nsec)
})
}
}
Binary file added tools/lambda-promtail/testdata/waflog.log.gz
Binary file not shown.
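
The new testdata fixture is a gzip-compressed file of newline-delimited JSON WAF records. Since the binary isn't rendered in the diff, below is one hypothetical way to build a minimal fixture of the same shape locally; only the epoch-millisecond `timestamp` field is interpreted by the parser, and the other fields here are placeholders rather than the actual contents of the committed file.

```go
package main

import (
	"compress/gzip"
	"log"
	"os"
)

func main() {
	// One newline-delimited JSON record; the "timestamp" value is what
	// wafTimestampRegex extracts, the rest is placeholder log content.
	record := `{"timestamp":1693457862729,"action":"ALLOW","httpRequest":{"clientIp":"192.0.2.1"}}` + "\n"

	f, err := os.Create("waflog.log.gz")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	zw := gzip.NewWriter(f)
	if _, err := zw.Write([]byte(record)); err != nil {
		log.Fatal(err)
	}
	if err := zw.Close(); err != nil {
		log.Fatal(err)
	}
}
```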
