Skip to content

Commit

Permalink
improved s3 functionalities
Browse files Browse the repository at this point in the history
- added copy to s3
- added callback to API on s3 upload
  • Loading branch information
rem7 committed Feb 7, 2018
1 parent bae7eca commit e025ee4
Show file tree
Hide file tree
Showing 4 changed files with 230 additions and 57 deletions.
13 changes: 9 additions & 4 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type StreamConfig struct {
StreamApiKey string `yaml:"stream_api_key" ini:"stream_api_key"`
RecordFormatString string `ini:"record_format"`
RecordFormat []Attribute `yaml:"record_format"`
Options []string `yaml:"options"`
}

type LiveServerConfig struct {
Expand Down Expand Up @@ -158,17 +159,21 @@ func configureStreams(ctx context.Context, config ConfigFile) map[string]Streame
streamName := conf.StreamName

var stream Streamer
switch {
case conf.Type == "firehose":
switch conf.Type {
case "firehose":
log.WithField("stream", streamName).Infof("streaming to firehose: %s", conf.Name)
stream = NewFirehoseStream(ctx, conf.RecordFormat, config.AwsAccessKey,
config.AwsSecretAccessKey, config.AwsRegion, config.AwsSTSRole, conf.Name)
case conf.Type == "csv":
case "s3":
log.WithField("stream", streamName).Info("streaming to s3")
stream = NewS3Stream(ctx, conf.RecordFormat, config.AwsAccessKey,
config.AwsSecretAccessKey, config.AwsRegion, config.AwsSTSRole, conf.Name, conf.Options)
case "csv":
filename := conf.Name + ".csv"
log.WithField("stream", streamName).Infof("streaming to csv %s", filename)
stream = NewCSVStream(conf.RecordFormat, filename)
break
case conf.Type == "http":
case "http":
log.WithField("stream", streamName).Info("streaming to http")
stream = NewDCHTTPStream(conf.RecordFormat, conf.Url, conf.StreamApiKey, 125000)
break
Expand Down
36 changes: 22 additions & 14 deletions helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package main
import (
"bufio"
"context"
"crypto/md5"
"crypto/rand"
"fmt"
log "github.com/Sirupsen/logrus"
Expand All @@ -23,10 +24,11 @@ import (
"strconv"
"strings"
"time"

//"math/big"
)

var splitOptionsRegex = regexp.MustCompile(`^([^\:]*)\:\s?(.*)`)

func GenerateUUID() (string, error) {

uuid := make([]byte, 16)
Expand All @@ -45,10 +47,10 @@ func GenerateUUID() (string, error) {
func ParseOptions(options []string) map[string]string {
optionsProps := make(map[string]string)
for i := 0; i < len(options); i++ {
optionsSplit := strings.Split(options[i], ":")
if len(optionsSplit) > 1 {
key, value := optionsSplit[0], optionsSplit[1]
optionsProps[key] = strings.TrimSpace(value)
optionsSplit := splitOptionsRegex.FindStringSubmatch(options[i])
if len(optionsSplit) > 2 {
key, value := optionsSplit[1], optionsSplit[2]
optionsProps[strings.TrimSpace(key)] = strings.TrimSpace(value)
}
}
return optionsProps
Expand Down Expand Up @@ -316,27 +318,27 @@ func getIPs() []string {
func interfaceToString(inf interface{}) (string, error) {

switch t := inf.(type) {
case nil :
case nil:
return "\\N", nil
case string :
case string:
if isNull(inf.(string)) {
return "\\N", nil
} else {
return inf.(string), nil
}
case int :
case int:
return strconv.Itoa(inf.(int)), nil
case bool :
case bool:
return strconv.FormatBool(inf.(bool)), nil
case float32 :
case float32:
return strconv.FormatFloat(inf.(float64), 'f', -1, 32), nil
case float64 :
case float64:
return strconv.FormatFloat(inf.(float64), 'f', -1, 64), nil
case uint64 :
case uint64:
return strconv.FormatUint(inf.(uint64), 10), nil
case error :
case error:
return inf.(error).Error(), nil
default :
default:
return fmt.Sprintf("%v", t), fmt.Errorf("Coercing string conversion on: '%v' from unknown type: %T", t, t)
}
}
Expand All @@ -349,3 +351,9 @@ func stringInSlice(a string, slice []string) bool {
}
return false
}

// getHash returns the MD5 digest of data as a lower-case hexadecimal string.
func getHash(data []byte) string {
	digest := md5.Sum(data)
	return fmt.Sprintf("%x", digest[:])
}
20 changes: 14 additions & 6 deletions streamer_firehose.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,22 +127,28 @@ LOOP:
dataCopy := make([]*firehose.Record, len(accum))
copy(dataCopy, accum)

s.wg.Add(1)
go s.uploadRecords(dataCopy, 0)
s.uploadRecords(dataCopy, 0)

accum = []*firehose.Record{}
sizeAccumulator = 0

}

if exit {
s.wg.Done()
s.wg.Wait()
break LOOP
}

}
}

// uploadRecords starts an asynchronous upload of data.
//
// It adds one to the stream's WaitGroup before launching the goroutine, so the
// counter is already raised by the time the caller continues; _uploadRecords
// releases it via its deferred s.wg.Done(). failCount is passed through
// unchanged.
func (s *FirehoseStream) uploadRecords(data []*firehose.Record, failCount int) {
	s.wg.Add(1)
	go func() {
		s._uploadRecords(data, failCount)
	}()
}

func (s *FirehoseStream) _uploadRecords(data []*firehose.Record, failCount int) {

defer s.wg.Done()

Expand All @@ -160,8 +166,7 @@ func (s *FirehoseStream) uploadRecords(data []*firehose.Record, failCount int) {
r, err := s.svc.PutRecordBatch(params)
if err != nil {
log.Error(err.Error())
s.wg.Add(1)
go s.uploadRecords(data, failCount+1)
s.uploadRecords(data, failCount+1)
return
}

Expand All @@ -175,8 +180,11 @@ func (s *FirehoseStream) uploadRecords(data []*firehose.Record, failCount int) {
}
}

s.wg.Add(1)
go s.uploadRecords(newData, failCount+1)
s.uploadRecords(data, failCount+1)
}

if failCount > 0 {
log.Warnf("%v records succeeded after %v retries", len(data), failCount)
}

}
Expand Down
Loading

0 comments on commit e025ee4

Please sign in to comment.