Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix/PA-23889 give buffer size to the channels #15

Merged
merged 3 commits into from
Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions inskinesis/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ Here's a quick guide on how to get started with the `inskinesis` package:
| MaxGroup | 1 | The maximum number of concurrent groups for sending records. If you want to send records concurrently, set this value to a number greater than 1. |
| RetryCount | 3 | The number of times to retry sending a batch of records to the stream. |
| RetryInterval | 100 ms | The interval between retries. |
| Verbose | false | Whether to enable verbose logging. |

Please note that `N/A` in the Default Value column indicates that these fields are required and do not have default
values.
Expand Down
48 changes: 33 additions & 15 deletions inskinesis/inskinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
)

const outputSeparator = byte('\n')
const errorChannelSize = 100

type KinesisInterface interface {
PutRecords(input *kinesis.PutRecordsInput) (*kinesis.PutRecordsOutput, error)
Expand Down Expand Up @@ -61,6 +62,8 @@ type stream struct {

failedCount int // Counter for the number of failed record submissions.
totalCount int // Counter for the total number of records sent to the stream.

verbose bool // Verbose mode
}

type Config struct {
Expand All @@ -73,6 +76,7 @@ type Config struct {
MaxGroup int
RetryCount int
RetryWaitTime time.Duration
Verbose bool
}

// NewKinesis creates a new Kinesis stream.
Expand Down Expand Up @@ -105,14 +109,16 @@ func NewKinesis(config Config) (StreamInterface, error) {

wgLogChan: &sync.WaitGroup{},
wgBatchChan: &sync.WaitGroup{},
logChannel: make(chan interface{}, 1000),
logChannel: make(chan interface{}, 2000),
batchChannel: make(chan []interface{}, 100),
errChannel: make(chan error),
stopChannel: make(chan bool),
stopBatchChannel: make(chan bool),
errChannel: make(chan error, errorChannelSize),
stopChannel: make(chan bool, 10),
stopBatchChannel: make(chan bool, 10),

retryCount: config.RetryCount,
retryWaitTime: 100 * time.Millisecond,

verbose: config.Verbose,
}

if s.logBufferSize == 0 {
Expand Down Expand Up @@ -161,7 +167,9 @@ func (s *stream) startStreaming() {

batches, err := createBatches(batch, s.maxStreamBatchSize, s.maxStreamBatchByteSize)
if err != nil {
s.errChannel <- err
if len(s.errChannel) < errorChannelSize {
s.errChannel <- err
}
s.wgLogChan.Done()
continue
}
Expand Down Expand Up @@ -237,13 +245,15 @@ func (s *stream) sendSingleBatch(batch []interface{}, concurrentLimiter chan str
failedCount, err := s.PutRecords(batch)
s.failedCount += failedCount
if err != nil {
fmt.Printf("Error sending records to Kinesis stream %s: %v\n", s.name, err)
s.errChannel <- err
s.printf("Error sending records to Kinesis stream %s: %v\n", s.name, err)
if len(s.errChannel) < errorChannelSize {
s.errChannel <- err
}

return
}

fmt.Printf("Sent %d records to Kinesis stream %s\n", len(batch), s.name)
s.printf("Sent %d records to Kinesis stream %s\n", len(batch), s.name)
}()
}

Expand All @@ -255,7 +265,7 @@ func (s *stream) start() {
func (s *stream) FlushAndStopStreaming() {
s.stopAndWaitLogStreaming()

fmt.Printf("%d/%d records sent to Kinesis stream %s\n", s.totalCount-s.failedCount, s.totalCount, s.name)
s.printf("%d/%d records sent to Kinesis stream %s\n", s.totalCount-s.failedCount, s.totalCount, s.name)
}

// PutRecords sends records to the Kinesis stream.
Expand All @@ -277,9 +287,9 @@ func (s *stream) Put(record interface{}) {
}

func (s *stream) putRecords(batch []*kinesis.PutRecordsRequestEntry, retryCount int) (int, error) {
fmt.Printf("Sending %d records to Kinesis stream %s\n", len(batch), s.name)
s.printf("Sending %d records to Kinesis stream %s\n", len(batch), s.name)
if retryCount < 0 {
fmt.Printf("Retry count exceeded for Kinesis stream %s\n", s.name)
s.printf("Retry count exceeded for Kinesis stream %s\n", s.name)
return len(batch), errors.New("retry count exceeded")
}

Expand All @@ -289,17 +299,17 @@ func (s *stream) putRecords(batch []*kinesis.PutRecordsRequestEntry, retryCount
})

if err != nil {
fmt.Printf("Error sending records to Kinesis stream %s: %v\n", s.name, err)
s.printf("Error sending records to Kinesis stream %s: %v\n", s.name, err)
return len(batch), err
}

if res != nil && res.FailedRecordCount != nil && *res.FailedRecordCount > 0 {
fmt.Printf("Failed to send %d records to Kinesis stream %s\n", *res.FailedRecordCount, s.name)
s.printf("Failed to send %d records to Kinesis stream %s\n", *res.FailedRecordCount, s.name)
failedRecords := s.wrapWithPutRecordsRequestEntry(getFailedRecords(res, batch))
batch = failedRecords
retryCount--

fmt.Printf("Retrying %d records to Kinesis stream %s\n", len(batch), s.name)
s.printf("Retrying %d records to Kinesis stream %s\n", len(batch), s.name)
time.Sleep(s.retryWaitTime)
failed, err := s.putRecords(batch, retryCount)
if err != nil {
Expand Down Expand Up @@ -328,7 +338,7 @@ func (s *stream) transformRecords(records []interface{}) ([]*kinesis.PutRecordsR
}

if failedRecords > 0 {
fmt.Printf("Failed to transform %d records to Kinesis stream %s\n", failedRecords, s.name)
s.printf("Failed to transform %d records to Kinesis stream %s\n", failedRecords, s.name)
}

return transformedRecords, err
Expand Down Expand Up @@ -366,5 +376,13 @@ func addOutputSeparatorIfNeeded(record []byte) []byte {
if record[len(record)-1] != outputSeparator {
return append(record, outputSeparator)
}

return record
}

// custom printf if verbose mode is enabled
func (s *stream) printf(format string, a ...interface{}) {
if s.verbose {
fmt.Printf(format, a...)
}
}