Skip to content

Commit

Permalink
fix/PA-23889 give buffer size to the channels (#15)
Browse files Browse the repository at this point in the history
  • Loading branch information
rafet authored Nov 3, 2023
1 parent 54535d5 commit b6eac8c
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 15 deletions.
1 change: 1 addition & 0 deletions inskinesis/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ Here's a quick guide on how to get started with the `inskinesis` package:
| MaxGroup | 1 | The maximum number of concurrent groups for sending records. If you want to send records concurrently, set this value to a number greater than 1. |
| RetryCount | 3 | The number of times to retry sending a batch of records to the stream. |
| RetryInterval | 100 ms | The interval between retries. |
| Verbose | false | Whether to enable verbose logging. |

Please note that `N/A` in the Default Value column indicates that these fields are required and do not have default
values.
Expand Down
48 changes: 33 additions & 15 deletions inskinesis/inskinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
)

const outputSeparator = byte('\n')
const errorChannelSize = 100

type KinesisInterface interface {
PutRecords(input *kinesis.PutRecordsInput) (*kinesis.PutRecordsOutput, error)
Expand Down Expand Up @@ -61,6 +62,8 @@ type stream struct {

failedCount int // Counter for the number of failed record submissions.
totalCount int // Counter for the total number of records sent to the stream.

verbose bool // Verbose mode
}

type Config struct {
Expand All @@ -73,6 +76,7 @@ type Config struct {
MaxGroup int
RetryCount int
RetryWaitTime time.Duration
Verbose bool
}

// NewKinesis creates a new Kinesis stream.
Expand Down Expand Up @@ -105,14 +109,16 @@ func NewKinesis(config Config) (StreamInterface, error) {

wgLogChan: &sync.WaitGroup{},
wgBatchChan: &sync.WaitGroup{},
logChannel: make(chan interface{}, 1000),
logChannel: make(chan interface{}, 2000),
batchChannel: make(chan []interface{}, 100),
errChannel: make(chan error),
stopChannel: make(chan bool),
stopBatchChannel: make(chan bool),
errChannel: make(chan error, errorChannelSize),
stopChannel: make(chan bool, 10),
stopBatchChannel: make(chan bool, 10),

retryCount: config.RetryCount,
retryWaitTime: 100 * time.Millisecond,

verbose: config.Verbose,
}

if s.logBufferSize == 0 {
Expand Down Expand Up @@ -161,7 +167,9 @@ func (s *stream) startStreaming() {

batches, err := createBatches(batch, s.maxStreamBatchSize, s.maxStreamBatchByteSize)
if err != nil {
s.errChannel <- err
if len(s.errChannel) < errorChannelSize {
s.errChannel <- err
}
s.wgLogChan.Done()
continue
}
Expand Down Expand Up @@ -237,13 +245,15 @@ func (s *stream) sendSingleBatch(batch []interface{}, concurrentLimiter chan str
failedCount, err := s.PutRecords(batch)
s.failedCount += failedCount
if err != nil {
fmt.Printf("Error sending records to Kinesis stream %s: %v\n", s.name, err)
s.errChannel <- err
s.printf("Error sending records to Kinesis stream %s: %v\n", s.name, err)
if len(s.errChannel) < errorChannelSize {
s.errChannel <- err
}

return
}

fmt.Printf("Sent %d records to Kinesis stream %s\n", len(batch), s.name)
s.printf("Sent %d records to Kinesis stream %s\n", len(batch), s.name)
}()
}

Expand All @@ -255,7 +265,7 @@ func (s *stream) start() {
func (s *stream) FlushAndStopStreaming() {
s.stopAndWaitLogStreaming()

fmt.Printf("%d/%d records sent to Kinesis stream %s\n", s.totalCount-s.failedCount, s.totalCount, s.name)
s.printf("%d/%d records sent to Kinesis stream %s\n", s.totalCount-s.failedCount, s.totalCount, s.name)
}

// PutRecords sends records to the Kinesis stream.
Expand All @@ -277,9 +287,9 @@ func (s *stream) Put(record interface{}) {
}

func (s *stream) putRecords(batch []*kinesis.PutRecordsRequestEntry, retryCount int) (int, error) {
fmt.Printf("Sending %d records to Kinesis stream %s\n", len(batch), s.name)
s.printf("Sending %d records to Kinesis stream %s\n", len(batch), s.name)
if retryCount < 0 {
fmt.Printf("Retry count exceeded for Kinesis stream %s\n", s.name)
s.printf("Retry count exceeded for Kinesis stream %s\n", s.name)
return len(batch), errors.New("retry count exceeded")
}

Expand All @@ -289,17 +299,17 @@ func (s *stream) putRecords(batch []*kinesis.PutRecordsRequestEntry, retryCount
})

if err != nil {
fmt.Printf("Error sending records to Kinesis stream %s: %v\n", s.name, err)
s.printf("Error sending records to Kinesis stream %s: %v\n", s.name, err)
return len(batch), err
}

if res != nil && res.FailedRecordCount != nil && *res.FailedRecordCount > 0 {
fmt.Printf("Failed to send %d records to Kinesis stream %s\n", *res.FailedRecordCount, s.name)
s.printf("Failed to send %d records to Kinesis stream %s\n", *res.FailedRecordCount, s.name)
failedRecords := s.wrapWithPutRecordsRequestEntry(getFailedRecords(res, batch))
batch = failedRecords
retryCount--

fmt.Printf("Retrying %d records to Kinesis stream %s\n", len(batch), s.name)
s.printf("Retrying %d records to Kinesis stream %s\n", len(batch), s.name)
time.Sleep(s.retryWaitTime)
failed, err := s.putRecords(batch, retryCount)
if err != nil {
Expand Down Expand Up @@ -328,7 +338,7 @@ func (s *stream) transformRecords(records []interface{}) ([]*kinesis.PutRecordsR
}

if failedRecords > 0 {
fmt.Printf("Failed to transform %d records to Kinesis stream %s\n", failedRecords, s.name)
s.printf("Failed to transform %d records to Kinesis stream %s\n", failedRecords, s.name)
}

return transformedRecords, err
Expand Down Expand Up @@ -366,5 +376,13 @@ func addOutputSeparatorIfNeeded(record []byte) []byte {
if record[len(record)-1] != outputSeparator {
return append(record, outputSeparator)
}

return record
}

// custom printf if verbose mode is enabled
func (s *stream) printf(format string, a ...interface{}) {
if s.verbose {
fmt.Printf(format, a...)
}
}

0 comments on commit b6eac8c

Please sign in to comment.