Skip to content

Commit

Permalink
feat: add support for AWS S3 connector
Browse files Browse the repository at this point in the history
  • Loading branch information
reugn committed Sep 6, 2024
1 parent 2acf4ae commit 98521e2
Show file tree
Hide file tree
Showing 10 changed files with 496 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ Supported Connectors:
* [Aerospike](https://www.aerospike.com/)
* [Apache Kafka](https://kafka.apache.org/)
* [Apache Pulsar](https://pulsar.apache.org/)
* AWS ([S3](https://aws.amazon.com/s3/))
* [NATS](https://nats.io/)
* [Redis](https://redis.io/)

Expand Down
2 changes: 2 additions & 0 deletions aws/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// Package aws implements streaming connectors for Amazon Web Services.
package aws
21 changes: 21 additions & 0 deletions aws/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
module github.com/reugn/go-streams/aws

go 1.21.0

require (
github.com/aws/aws-sdk-go-v2/service/s3 v1.61.2
github.com/reugn/go-streams v0.10.0
)

require (
github.com/aws/aws-sdk-go-v2 v1.30.5 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.4 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.17 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.17 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.17 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.4 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.19 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.19 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.17 // indirect
github.com/aws/smithy-go v1.20.4 // indirect
)
24 changes: 24 additions & 0 deletions aws/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
github.com/aws/aws-sdk-go-v2 v1.30.5 h1:mWSRTwQAb0aLE17dSzztCVJWI9+cRMgqebndjwDyK0g=
github.com/aws/aws-sdk-go-v2 v1.30.5/go.mod h1:CT+ZPWXbYrci8chcARI3OmI/qgd+f6WtuLOoaIA8PR0=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.4 h1:70PVAiL15/aBMh5LThwgXdSQorVr91L127ttckI9QQU=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.4/go.mod h1:/MQxMqci8tlqDH+pjmoLu1i0tbWCUP1hhyMRuFxpQCw=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.17 h1:pI7Bzt0BJtYA0N/JEC6B8fJ4RBrEMi1LBrkMdFYNSnQ=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.17/go.mod h1:Dh5zzJYMtxfIjYW+/evjQ8uj2OyR/ve2KROHGHlSFqE=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.17 h1:Mqr/V5gvrhA2gvgnF42Zh5iMiQNcOYthFYwCyrnuWlc=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.17/go.mod h1:aLJpZlCmjE+V+KtN1q1uyZkfnUWpQGpbsn89XPKyzfU=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.17 h1:Roo69qTpfu8OlJ2Tb7pAYVuF0CpuUMB0IYWwYP/4DZM=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.17/go.mod h1:NcWPxQzGM1USQggaTVwz6VpqMZPX1CvDJLDh6jnOCa4=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.4 h1:KypMCbLPPHEmf9DgMGw51jMj77VfGPAN2Kv4cfhlfgI=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.4/go.mod h1:Vz1JQXliGcQktFTN/LN6uGppAIRoLBR2bMvIMP0gOjc=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.19 h1:FLMkfEiRjhgeDTCjjLoc3URo/TBkgeQbocA78lfkzSI=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.19/go.mod h1:Vx+GucNSsdhaxs3aZIKfSUjKVGsxN25nX2SRcdhuw08=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.19 h1:rfprUlsdzgl7ZL2KlXiUAoJnI/VxfHCvDFr2QDFj6u4=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.19/go.mod h1:SCWkEdRq8/7EK60NcvvQ6NXKuTcchAD4ROAsC37VEZE=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.17 h1:u+EfGmksnJc/x5tq3A+OD7LrMbSSR/5TrKLvkdy/fhY=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.17/go.mod h1:VaMx6302JHax2vHJWgRo+5n9zvbacs3bLU/23DNQrTY=
github.com/aws/aws-sdk-go-v2/service/s3 v1.61.2 h1:Kp6PWAlXwP1UvIflkIP6MFZYBNDCa4mFCGtxrpICVOg=
github.com/aws/aws-sdk-go-v2/service/s3 v1.61.2/go.mod h1:5FmD/Dqq57gP+XwaUnd5WFPipAuzrf0HmupX27Gvjvc=
github.com/aws/smithy-go v1.20.4 h1:2HK1zBdPgRbjFOHlfeQZfpC4r72MOb9bZkiFwggKO+4=
github.com/aws/smithy-go v1.20.4/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg=
github.com/reugn/go-streams v0.10.0 h1:Y0wHNihEbHsFOFV2/xTOKvud4ZpJPaRTET01fwx2/rQ=
github.com/reugn/go-streams v0.10.0/go.mod h1:QI5XXifJkVJl2jQ6Cra8I9DvWdJTgqcFYR7amvXZ9Lg=
293 changes: 293 additions & 0 deletions aws/s3.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,293 @@
package aws

import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"log/slog"
"sync"

"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/reugn/go-streams"
"github.com/reugn/go-streams/flow"
)

const s3DefaultChunkSize = 5 * 1024 * 1024 // 5 MB

// S3SourceConfig represents the configuration options for the S3 source
// connector.
type S3SourceConfig struct {
// The name of the S3 bucket to read from.
Bucket string
// The path within the bucket to use. If empty, the root of the
// bucket will be used.
Path string
// The number of concurrent workers to use when reading data from S3.
// The default is 1.
Parallelism int
// The size of chunks in bytes to use when reading data from S3.
// The default is 5 MB.
ChunkSize int
}

// S3Source represents the AWS S3 source connector.
type S3Source struct {
client *s3.Client
config *S3SourceConfig
objectCh chan string
out chan any
logger *slog.Logger
}

var _ streams.Source = (*S3Source)(nil)

// NewS3Source returns a new [S3Source].
// The connector reads all objects within the configured path and transmits
// them as an [S3Object] through the output channel.
func NewS3Source(ctx context.Context, client *s3.Client,
config *S3SourceConfig, logger *slog.Logger) *S3Source {

if logger == nil {
logger = slog.Default()
}
logger = logger.With(slog.Group("connector",
slog.String("name", "aws.s3"),
slog.String("type", "source")))

if config.Parallelism < 1 {
config.Parallelism = 1
}
if config.ChunkSize < 1 {
config.ChunkSize = s3DefaultChunkSize
}

s3Source := &S3Source{
client: client,
config: config,
objectCh: make(chan string, config.Parallelism),
out: make(chan any),
logger: logger,
}

// list objects in the configured path
go s3Source.listObjects(ctx)

// read the objects and send data downstream
go s3Source.getObjects(ctx)

return s3Source
}

// listObjects reads the list of objects in the configured path and streams
// the keys to the objectCh channel.
func (s *S3Source) listObjects(ctx context.Context) {
var continuationToken *string
for {
listResponse, err := s.client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
Bucket: &s.config.Bucket,
Prefix: &s.config.Path,
ContinuationToken: continuationToken,
})

if err != nil {
s.logger.Error("Failed to list objects", slog.Any("error", err),
slog.Any("continuationToken", continuationToken))
break
}

for _, object := range listResponse.Contents {
s.objectCh <- *object.Key
}

continuationToken = listResponse.NextContinuationToken
if continuationToken == nil {
break
}
}
// close the objects channel
close(s.objectCh)
}

// getObjects reads the objects data and sends it downstream.
func (s *S3Source) getObjects(ctx context.Context) {
var wg sync.WaitGroup
for i := 0; i < s.config.Parallelism; i++ {
wg.Add(1)
go func() {
defer wg.Done()
loop:
for {
select {
case key, ok := <-s.objectCh:
if !ok {
break loop
}
objectOutput, err := s.client.GetObject(ctx, &s3.GetObjectInput{
Bucket: &s.config.Bucket,
Key: &key,
})
if err != nil {
s.logger.Error("Failed to get object", slog.Any("error", err),
slog.String("key", key))
}

var data []byte
n, err := bufio.NewReaderSize(objectOutput.Body, s.config.ChunkSize).
Read(data)
if err != nil {
s.logger.Error("Failed to read object", slog.Any("error", err),
slog.String("key", key))
continue
}

s.logger.Debug("Successfully read object", slog.String("key", key),
slog.Int("size", n))

// send the read data downstream as an S3Object
s.out <- &S3Object{
Key: key,
Data: bytes.NewReader(data),
}
case <-ctx.Done():
s.logger.Debug("Object reading finished", slog.Any("error", ctx.Err()))
break loop
}
}
}()
}

// wait for all object readers to exit
wg.Wait()
s.logger.Info("Closing connector")
// close the output channel
close(s.out)
}

// Via streams data to a specified operator and returns it.
func (s *S3Source) Via(operator streams.Flow) streams.Flow {
flow.DoStream(s, operator)
return operator
}

// Out returns the output channel of the S3Source connector.
func (s *S3Source) Out() <-chan any {
return s.out
}

// S3Object contains details of the S3 object.
type S3Object struct {
// Key is the object name including any subdirectories.
// For example, "directory/file.json".
Key string
// Data is an [io.Reader] representing the binary content of the object.
// This can be a file, a buffer, or any other type that implements the
// io.Reader interface.
Data io.Reader
}

// S3SinkConfig represents the configuration options for the S3 sink
// connector.
type S3SinkConfig struct {
// The name of the S3 bucket to write to.
Bucket string
// The number of concurrent workers to use when writing data to S3.
// The default is 1.
Parallelism int
}

// S3Sink represents the AWS S3 sink connector.
type S3Sink struct {
client *s3.Client
config *S3SinkConfig
in chan any
logger *slog.Logger
}

var _ streams.Sink = (*S3Sink)(nil)

// NewS3Sink returns a new [S3Sink].
// Incoming elements are expected to be of the [S3PutObject] type. These will
// be uploaded to the configured bucket using their key field as the path.
func NewS3Sink(ctx context.Context, client *s3.Client,
config *S3SinkConfig, logger *slog.Logger) *S3Sink {

if logger == nil {
logger = slog.Default()
}
logger = logger.With(slog.Group("connector",
slog.String("name", "aws.s3"),
slog.String("type", "sink")))

if config.Parallelism < 1 {
config.Parallelism = 1
}

s3Sink := &S3Sink{
client: client,
config: config,
in: make(chan any, config.Parallelism),
logger: logger,
}

// start writing incoming data
go s3Sink.writeObjects(ctx)

return s3Sink
}

// writeObjects writes incoming stream data elements to S3 using the
// configured parallelism.
func (s *S3Sink) writeObjects(ctx context.Context) {
var wg sync.WaitGroup
for i := 0; i < s.config.Parallelism; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for data := range s.in {
var err error
switch object := data.(type) {
case S3Object:
err = s.writeObject(ctx, &object)
case *S3Object:
err = s.writeObject(ctx, object)
default:
s.logger.Error("Unsupported data type",
slog.String("type", fmt.Sprintf("%T", object)))
}

if err != nil {
s.logger.Error("Error writing object",
slog.Any("error", err))
}
}
}()
}

// wait for all writers to exit
wg.Wait()
s.logger.Info("All object writers exited")
}

// writeObject writes a single object to S3.
func (s *S3Sink) writeObject(ctx context.Context, putObject *S3Object) error {
putObjectOutput, err := s.client.PutObject(ctx, &s3.PutObjectInput{
Bucket: &s.config.Bucket,
Key: &putObject.Key,
Body: putObject.Data,
})
if err != nil {
return fmt.Errorf("failed to put object: %w", err)
}

s.logger.Debug("Successfully put object", slog.String("key", putObject.Key),
slog.Any("etag", putObjectOutput.ETag))

return nil
}

// In returns the input channel of the S3Sink connector.
func (s *S3Sink) In() chan<- any {
return s.in
}
1 change: 1 addition & 0 deletions examples/aws/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/s3/s3
33 changes: 33 additions & 0 deletions examples/aws/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
services:
minio:
image: minio/minio:latest
container_name: minio-server
ports:
- "9000:9000"
- "9001:9001"
expose:
- "9000"
- "9001"
healthcheck:
test: [ "CMD", "mc", "ready", "local" ]
interval: 10s
timeout: 10s
retries: 3
environment:
MINIO_ROOT_USER: minioadmin
MINIO_ROOT_PASSWORD: minioadmin
command: server /data/ --console-address :9001

minio-client:
image: minio/mc
container_name: minio-client
depends_on:
minio:
condition: service_healthy
entrypoint: >
/bin/sh -c "
/usr/bin/mc alias set myminio http://minio:9000 minioadmin minioadmin;
/usr/bin/mc mb myminio/stream-test;
/usr/bin/mc anonymous set public myminio/stream-test;
exit 0;
"
Loading

0 comments on commit 98521e2

Please sign in to comment.