Skip to content

Commit

Permalink
Merge Kinesis iterator fix (#1415)
Browse files Browse the repository at this point in the history
  • Loading branch information
pavius committed Nov 30, 2019
1 parent 819aa51 commit 40e1a16
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 27 deletions.
2 changes: 1 addition & 1 deletion pkg/processor/runtime/golang/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func builtInHandler(context *nuclio.Context, event nuclio.Event) (interface{}, e
"ContentType", event.GetContentType(),
"Body", string(event.GetBody()))

time.Sleep(10 * time.Second)
time.Sleep(65 * time.Second)

return "Built in handler called", nil
}
108 changes: 88 additions & 20 deletions pkg/processor/trigger/kinesis/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package kinesis

import (
"fmt"
"strings"
"time"

"github.com/nuclio/nuclio/pkg/errors"
Expand All @@ -27,6 +28,8 @@ import (
kinesisclient "github.com/sendgridlabs/go-kinesis"
)

var errIteratorExpired = errors.New("IteratorExpired")

type shard struct {
logger logger.Logger
kinesisTrigger *kinesis
Expand All @@ -52,50 +55,115 @@ func newShard(parentLogger logger.Logger, kinesisTrigger *kinesis, shardID strin
}

func (s *shard) readFromShard() error {
s.logger.DebugWith("Starting to read from shard")
var err error

getShardIteratorArgs := kinesisclient.NewArgs()
getShardIteratorArgs.Add("StreamName", s.kinesisTrigger.configuration.StreamName)
getShardIteratorArgs.Add("ShardId", s.shardID)
getShardIteratorArgs.Add("ShardIteratorType", "LATEST")

getShardIteratorResponse, err := s.kinesisTrigger.kinesisClient.GetShardIterator(getShardIteratorArgs)
if err != nil {
return errors.Wrap(err, "Failed to get shard iterator")
}
s.logger.DebugWith("Starting to read from shard",
"pollingPeriod", s.kinesisTrigger.configuration.PollingPeriod,
"iteratorType", s.kinesisTrigger.configuration.IteratorType)

// prepare args for get records
getRecordArgs := kinesisclient.NewArgs()
getRecordArgs.Add("ShardIterator", getShardIteratorResponse.ShardIterator)

var getRecordsResponse *kinesisclient.GetRecordsResp
lastRecordSequenceNumber := ""

for {

// wait a bit
time.Sleep(500 * time.Millisecond)
// get next records
getRecordsResponse, err = s.getNextRecords(getRecordArgs, getRecordsResponse, lastRecordSequenceNumber)

// try to get records
getRecordsResponse, err := s.kinesisTrigger.kinesisClient.GetRecords(getRecordArgs)
if err != nil {
s.logger.ErrorWith("Failed to get records", "err", err)

// if there was an error other than iterator expired, wait a bit
if err != errIteratorExpired {
s.logger.WarnWith("Failed to get next records", "err", errors.GetErrorStackString(err, 5))
time.Sleep(s.kinesisTrigger.configuration.pollingPeriodDuration)
}

continue
}

// if we got records, handle them
if len(getRecordsResponse.Records) > 0 {
for _, record := range getRecordsResponse.Records {

// TODO: event pool
event := Event{
body: record.Data,
}

// process the event, don't really do anything with response
s.kinesisTrigger.SubmitEventToWorker(nil, s.worker, &event) // nolint: errcheck
}

// save last sequence number in the batch. we might need to create a shard iterator at this
// sequence number
lastRecordSequenceNumber = getRecordsResponse.Records[len(getRecordsResponse.Records)-1].SequenceNumber

} else {
time.Sleep(s.kinesisTrigger.configuration.pollingPeriodDuration)
}
}
}

// set iterator to next
getRecordArgs.Add("ShardIterator", getRecordsResponse.NextShardIterator)
func (s *shard) getNextRecords(getRecordArgs *kinesisclient.RequestArgs,
getRecordsResponse *kinesisclient.GetRecordsResp,
lastRecordSequenceNumber string) (*kinesisclient.GetRecordsResp, error) {

// get the shard iterator
shardIterator, err := s.getShardIterator(getRecordsResponse, lastRecordSequenceNumber)
if err != nil {
return nil, errors.Wrap(err, "Failed to get shard iterator")
}

// set shard iterator
getRecordArgs.Add("ShardIterator", shardIterator)

// try to get records
getRecordsResponse, err = s.kinesisTrigger.kinesisClient.GetRecords(getRecordArgs)
if err != nil {

// if the error denotes an expired iterator, force recreation of iterator by nullifying the
// records response. getShardIterator() will produce an iterator based off of the last successful
// read sequence number
if strings.Contains(err.Error(), "Iterator expired") {
return nil, errIteratorExpired
}

return nil, errors.Wrap(err, "Failed to get records")
}

return getRecordsResponse, nil
}

func (s *shard) getShardIterator(lastRecordsResponse *kinesisclient.GetRecordsResp,
lastRecordSequenceNumber string) (string, error) {

// if there's a response we need to create the iterator from, use that first
if lastRecordsResponse != nil {
return lastRecordsResponse.NextShardIterator, nil
}

getShardIteratorArgs := kinesisclient.NewArgs()
getShardIteratorArgs.Add("StreamName", s.kinesisTrigger.configuration.StreamName)
getShardIteratorArgs.Add("ShardId", s.shardID)

if lastRecordSequenceNumber == "" {

// if there's no records response and no record sequence number, this must be the first time. use
// the iterator type specified in the configuration
getShardIteratorArgs.Add("ShardIteratorType", s.kinesisTrigger.configuration.IteratorType)
s.logger.DebugWith("Creating initial iterator", "type", s.kinesisTrigger.configuration.IteratorType)
} else {

// if a sequence number was passed, get a shard iterator at that point
getShardIteratorArgs.Add("ShardIteratorType", "AFTER_SEQUENCE_NUMBER")
getShardIteratorArgs.Add("StartingSequenceNumber", lastRecordSequenceNumber)
s.logger.DebugWith("Creating iterator at sequence", "seq", lastRecordSequenceNumber)
}

getShardIteratorResponse, err := s.kinesisTrigger.kinesisClient.GetShardIterator(getShardIteratorArgs)
if err != nil {
return "", errors.Wrap(err, "Failed to get shard iterator")
}

return getShardIteratorResponse.ShardIterator, nil
}
42 changes: 36 additions & 6 deletions pkg/processor/trigger/kinesis/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package kinesis

import (
"time"

"github.com/nuclio/nuclio/pkg/errors"
"github.com/nuclio/nuclio/pkg/functionconfig"
"github.com/nuclio/nuclio/pkg/processor/runtime"
Expand All @@ -27,16 +29,20 @@ import (

type Configuration struct {
trigger.Configuration
AccessKeyID string
SecretAccessKey string
RegionName string
StreamName string
Shards []string
AccessKeyID string
SecretAccessKey string
RegionName string
StreamName string
Shards []string
IteratorType string
PollingPeriod string
pollingPeriodDuration time.Duration
}

func NewConfiguration(ID string,
triggerConfiguration *functionconfig.Trigger,
runtimeConfiguration *runtime.Configuration) (*Configuration, error) {
var err error
newConfiguration := Configuration{}

// create base
Expand All @@ -47,7 +53,31 @@ func NewConfiguration(ID string,
return nil, errors.Wrap(err, "Failed to decode attributes")
}

// TODO: validate
if newConfiguration.IteratorType == "" {
newConfiguration.IteratorType = "LATEST"
}

if err := newConfiguration.validateIteratorType(newConfiguration.IteratorType); err != nil {
return nil, errors.Wrapf(err, "Invalid iterator type %s", newConfiguration.IteratorType)
}

if newConfiguration.PollingPeriod == "" {
newConfiguration.PollingPeriod = "500ms"
}

newConfiguration.pollingPeriodDuration, err = time.ParseDuration(newConfiguration.PollingPeriod)
if err != nil {
return nil, errors.Wrap(err, "Failed to parse polling period duration")
}

return &newConfiguration, nil
}

func (c *Configuration) validateIteratorType(iteratorType string) error {
switch iteratorType {
case "TRIM_HORIZON", "LATEST":
return nil
default:
return errors.New("Unsupported iterator type")
}
}

0 comments on commit 40e1a16

Please sign in to comment.