Skip to content

Commit

Permalink
[receiver/awss3receiver] Add support for encoding extensions (#33221)
Browse files Browse the repository at this point in the history
**Description:** Add the ability to configure what encoding extensions
to use based on key name suffix.

**Link to tracking Issue:** #30750

**Testing:** Unit tests

**Documentation:** Documented new "encodings" configuration option.
  • Loading branch information
adcharre committed Jun 11, 2024
1 parent fe11cc0 commit 5a503c0
Show file tree
Hide file tree
Showing 7 changed files with 358 additions and 20 deletions.
27 changes: 27 additions & 0 deletions .chloggen/awss3receiver_encodings.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: awss3receiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support for encoding extensions to be used in the AWS S3 Receiver.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [30750]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
21 changes: 21 additions & 0 deletions receiver/awss3receiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,32 @@ The following exporter configuration parameters are supported.
| `endpoint` | overrides the endpoint used by the exporter instead of constructing it from `region` and `s3_bucket` | | Optional |
| `endpoint_partition_id` | partition id to use if `endpoint` is specified. | "aws" | Optional |
| `s3_force_path_style` | [set this to `true` to force the request to use path-style addressing](http://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html) | false | Optional |
| `encodings:` | An array of entries with the following properties: | | Optional |
| `extension` | Extension to use for decoding a key with a matching suffix. | | Required |
| `suffix` | Key suffix to match against. | | Required |

### Time format for `starttime` and `endtime`
The `starttime` and `endtime` fields are used to specify the time range for which to retrieve data.
The time format is either `YYYY-MM-DD HH:MM` or simply `YYYY-MM-DD`, in which case the time is assumed to be `00:00`.

### Encodings
By default, the receiver understands the following encodings:
- otlp_json (OpenTelemetry Protocol format represented as json) with a suffix of `.json`
- otlp_proto (OpenTelemetry Protocol format represented as Protocol Buffers) with a suffix of `.binpb`

The `encodings` options allows you to specify Encoding Extensions to use to decode keys with matching suffixes.


### Example Configuration

```yaml
extension:
# example of text encoding extension
text_encoding:
encoding: utf8
marshaling_separator: "\n"
unmarshaling_separator: "\r?\n"

receivers:
awss3:
starttime: "2024-01-01 01:00"
Expand All @@ -46,4 +64,7 @@ receivers:
s3_bucket: "mybucket"
s3_prefix: "trace"
s3_partition: "minute"
encodings:
- extension: text_encoding
suffix: ".txt"
```
6 changes: 6 additions & 0 deletions receiver/awss3receiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,17 @@ type S3DownloaderConfig struct {
S3ForcePathStyle bool `mapstructure:"s3_force_path_style"`
}

type Encoding struct {
Extension component.ID `mapstructure:"extension"`
Suffix string `mapstructure:"suffix"`
}

// Config defines the configuration for the file receiver.
type Config struct {
S3Downloader S3DownloaderConfig `mapstructure:"s3downloader"`
StartTime string `mapstructure:"starttime"`
EndTime string `mapstructure:"endtime"`
Encodings []Encoding `mapstructure:"encodings"`
}

const (
Expand Down
23 changes: 23 additions & 0 deletions receiver/awss3receiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,29 @@ func TestLoadConfig(t *testing.T) {
EndTime: "2024-02-03",
},
},
{
id: component.NewIDWithName(metadata.Type, "3"),
expected: &Config{
S3Downloader: S3DownloaderConfig{
Region: "us-east-1",
S3Bucket: "abucket",
S3Partition: "minute",
EndpointPartitionID: "aws",
},
StartTime: "2024-01-31 15:00",
EndTime: "2024-02-03",
Encodings: []Encoding{
{
Extension: component.NewIDWithName(component.MustNewType("foo"), "bar"),
Suffix: "baz",
},
{
Extension: component.NewIDWithName(component.MustNewType("nop"), "nop"),
Suffix: "nop",
},
},
},
},
}

for _, tt := range tests {
Expand Down
83 changes: 63 additions & 20 deletions receiver/awss3receiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"bytes"
"compress/gzip"
"context"
"fmt"
"io"
"strings"

Expand All @@ -18,12 +19,21 @@ import (
"go.uber.org/zap"
)

type encodingExtension struct {
extension ptrace.Unmarshaler
suffix string
}

type encodingExtensions []encodingExtension

type awss3TraceReceiver struct {
s3Reader *s3Reader
consumer consumer.Traces
logger *zap.Logger
cancel context.CancelFunc
obsrecv *receiverhelper.ObsReport
s3Reader *s3Reader
consumer consumer.Traces
logger *zap.Logger
cancel context.CancelFunc
obsrecv *receiverhelper.ObsReport
encodingsConfig []Encoding
extensions encodingExtensions
}

func newAWSS3TraceReceiver(ctx context.Context, cfg *Config, traces consumer.Traces, settings receiver.Settings) (*awss3TraceReceiver, error) {
Expand All @@ -39,16 +49,24 @@ func newAWSS3TraceReceiver(ctx context.Context, cfg *Config, traces consumer.Tra
if err != nil {
return nil, err
}

return &awss3TraceReceiver{
s3Reader: reader,
consumer: traces,
logger: settings.Logger,
cancel: nil,
obsrecv: obsrecv,
s3Reader: reader,
consumer: traces,
logger: settings.Logger,
cancel: nil,
obsrecv: obsrecv,
encodingsConfig: cfg.Encodings,
}, nil
}

func (r *awss3TraceReceiver) Start(_ context.Context, _ component.Host) error {
func (r *awss3TraceReceiver) Start(_ context.Context, host component.Host) error {
var err error
r.extensions, err = newEncodingExtensions(r.encodingsConfig, host)
if err != nil {
return err
}

var ctx context.Context
ctx, r.cancel = context.WithCancel(context.Background())
go func() {
Expand Down Expand Up @@ -81,15 +99,16 @@ func (r *awss3TraceReceiver) receiveBytes(ctx context.Context, key string, data
}
}

var unmarshaler ptrace.Unmarshaler
var format string
if strings.HasSuffix(key, ".json") {
unmarshaler = &ptrace.JSONUnmarshaler{}
format = "otlp_json"
}
if strings.HasSuffix(key, ".binpb") {
unmarshaler = &ptrace.ProtoUnmarshaler{}
format = "otlp_proto"
unmarshaler, format := r.extensions.findExtension(key)
if unmarshaler == nil {
if strings.HasSuffix(key, ".json") {
unmarshaler = &ptrace.JSONUnmarshaler{}
format = "otlp_json"
}
if strings.HasSuffix(key, ".binpb") {
unmarshaler = &ptrace.ProtoUnmarshaler{}
format = "otlp_proto"
}
}
if unmarshaler == nil {
r.logger.Warn("Unsupported file format", zap.String("key", key))
Expand All @@ -104,3 +123,27 @@ func (r *awss3TraceReceiver) receiveBytes(ctx context.Context, key string, data
r.obsrecv.EndTracesOp(obsCtx, format, traces.SpanCount(), err)
return err
}

func newEncodingExtensions(encodingsConfig []Encoding, host component.Host) (encodingExtensions, error) {
encodings := make(encodingExtensions, 0)
extensions := host.GetExtensions()
for _, encoding := range encodingsConfig {
if e, ok := extensions[encoding.Extension]; ok {
if u, ok := e.(ptrace.Unmarshaler); ok {
encodings = append(encodings, encodingExtension{extension: u, suffix: encoding.Suffix})
}
} else {
return nil, fmt.Errorf("extension %q not found", encoding.Extension)
}
}
return encodings, nil
}

func (encodings encodingExtensions) findExtension(key string) (ptrace.Unmarshaler, string) {
for _, encoding := range encodings {
if strings.HasSuffix(key, encoding.suffix) {
return encoding.extension, encoding.suffix
}
}
return nil, ""
}
Loading

0 comments on commit 5a503c0

Please sign in to comment.