forked from TencentCloud/tencentcloud-cls-sdk-go
/
producer_batch.go
88 lines (78 loc) · 2.55 KB
/
producer_batch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package tencentcloud_cls_sdk_go
import (
"sync"
"time"
"github.com/golang/protobuf/proto"
)
type ProducerBatch struct {
totalDataSize int64
lock sync.RWMutex
logGroup *LogGroup
logGroupSize int
logGroupCount int
attemptCount int
baseRetryBackoffMs int64
nextRetryMs int64
maxRetryIntervalInMs int64
callBackList []CallBack
createTimeMs int64
maxRetryTimes int
topicID string
result *Result
maxReservedAttempts int
}
// NewProducerBatch 初始化Producer batch
func NewProducerBatch(topicID string, config *AsyncProducerClientConfig, callBackFunc CallBack, logData interface{}) *ProducerBatch {
var logs = make([]*Log, 0)
if log, ok := logData.(*Log); ok {
logs = append(logs, log)
} else if logList, ok := logData.([]*Log); ok {
logs = append(logs, logList...)
}
logGroup := &LogGroup{
Logs: logs,
Source: proto.String(config.Source),
}
currentTimeMs := GetTimeMs(time.Now().UnixNano())
producerBatch := &ProducerBatch{
logGroup: logGroup,
attemptCount: 0,
maxRetryIntervalInMs: config.MaxRetryBackoffMs,
callBackList: []CallBack{},
createTimeMs: currentTimeMs,
maxRetryTimes: config.Retries,
baseRetryBackoffMs: config.BaseRetryBackoffMs,
topicID: topicID,
result: NewResult(),
maxReservedAttempts: config.MaxReservedAttempts,
}
producerBatch.totalDataSize = int64(producerBatch.logGroup.Size())
if callBackFunc != nil {
producerBatch.callBackList = append(producerBatch.callBackList, callBackFunc)
}
return producerBatch
}
func (producerBatch *ProducerBatch) getTopicID() string {
defer producerBatch.lock.RUnlock()
producerBatch.lock.RLock()
return producerBatch.topicID
}
func (producerBatch *ProducerBatch) getLogGroupCount() int {
defer producerBatch.lock.RUnlock()
producerBatch.lock.RLock()
return len(producerBatch.logGroup.GetLogs())
}
func (producerBatch *ProducerBatch) addLogToLogGroup(log interface{}) {
defer producerBatch.lock.Unlock()
producerBatch.lock.Lock()
if item, ok := log.(*Log); ok {
producerBatch.logGroup.Logs = append(producerBatch.logGroup.Logs, item)
} else if logList, ok := log.([]*Log); ok {
producerBatch.logGroup.Logs = append(producerBatch.logGroup.Logs, logList...)
}
}
func (producerBatch *ProducerBatch) addProducerBatchCallBack(callBack CallBack) {
defer producerBatch.lock.Unlock()
producerBatch.lock.Lock()
producerBatch.callBackList = append(producerBatch.callBackList, callBack)
}