Skip to content

Commit

Permalink
It should work now
Browse files Browse the repository at this point in the history
  • Loading branch information
kngu9 committed Jul 17, 2018
1 parent d58df2f commit 790aeae
Showing 1 changed file with 34 additions and 2 deletions.
36 changes: 34 additions & 2 deletions kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,41 @@ type kinesis struct {
}

func (k *kinesis) initWithStartTime(stream, shard, shardIteratorType, accessKey, secretKey, region string, endpoint string, startTime *time.Time) (*kinesis, error) {
k.startTime = startTime
httpClient := &http.Client{
Transport: &http.Transport{
Dial: (&net.Dialer{
Timeout: DialTimeout,
}).Dial,
TLSHandshakeTimeout: HandShakeTimeout},
Timeout: HTTPTimeout,
}
sess, err := authenticate(accessKey, secretKey)
awsConf := aws.NewConfig().WithRegion(region).WithHTTPClient(httpClient)
if conf.Debug.Verbose {
awsConf = awsConf.WithLogLevel(aws.LogDebugWithRequestRetries | aws.LogDebugWithRequestErrors)
}

if k.endPoint != "" {
awsConf = awsConf.WithEndpoint(k.endPoint)
} else if endpoint != "" {
awsConf = awsConf.WithEndpoint(endpoint)
}

client := awsKinesis.New(sess, awsConf)

k = &kinesis{
stream: stream,
shard: shard,
shardIteratorType: "AT_TIMESTAMP",
client: client,
startTime: startTime,
}
if err != nil {
return k, err
}

return k.init(stream, shard, "AT_TIMESTAMP", accessKey, secretKey, region, endpoint)
err = k.initShardIterator()
return k, err
}

func (k *kinesis) init(stream, shard, shardIteratorType, accessKey, secretKey, region string, endpoint string) (*kinesis, error) {
Expand Down

0 comments on commit 790aeae

Please sign in to comment.