Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion reader/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -900,7 +900,7 @@ var ModeKeyOptions = map[string][]Option{
KeyName: KeyESVersion,
Default: ElasticVersion5,
ChooseOnly: true,
ChooseOptions: []interface{}{ElasticVersion3, ElasticVersion5, ElasticVersion6},
ChooseOptions: []interface{}{ElasticVersion3, ElasticVersion5, ElasticVersion6, ElasticVersion7},
Description: "版本(es_version)",
ToolTip: "版本,3.x包含了2.x",
},
Expand Down
1 change: 1 addition & 0 deletions reader/config/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ const (
ElasticVersion3 = "3.x"
ElasticVersion5 = "5.x"
ElasticVersion6 = "6.x"
ElasticVersion7 = "7.x"
)

// Constants for HTTP
Expand Down
90 changes: 90 additions & 0 deletions reader/elastic/elastic.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/robfig/cron"
elasticV3 "gopkg.in/olivere/elastic.v3"
elasticV5 "gopkg.in/olivere/elastic.v5"
elasticV7 "gopkg.in/olivere/elastic.v7"

"github.com/qiniu/log"

Expand Down Expand Up @@ -89,6 +90,7 @@ type Reader struct {
elasticV3Client *elasticV3.Client
elasticV5Client *elasticV5.Client
elasticV6Client *elasticV6.Client
elasticV7Client *elasticV7.Client
}

func init() {
Expand Down Expand Up @@ -132,7 +134,22 @@ func NewReader(meta *reader.Meta, conf conf.MapConf) (reader.Reader, error) {
var elasticV3Client *elasticV3.Client
var elasticV5Client *elasticV5.Client
var elasticV6Client *elasticV6.Client
var elasticV7Client *elasticV7.Client
switch esVersion {
case ElasticVersion7:
optFns := []elasticV7.ClientOptionFunc{
elasticV7.SetHealthcheck(false),
elasticV7.SetURL(eshost),
}

if len(authUsername) > 0 && len(authPassword) > 0 {
optFns = append(optFns, elasticV7.SetBasicAuth(authUsername, authPassword))
}

elasticV7Client, err = elasticV7.NewClient(optFns...)
if err != nil {
return nil, err
}
case ElasticVersion6:
optFns := []elasticV6.ClientOptionFunc{
elasticV6.SetHealthcheck(false),
Expand Down Expand Up @@ -205,6 +222,7 @@ func NewReader(meta *reader.Meta, conf conf.MapConf) (reader.Reader, error) {
elasticV3Client: elasticV3Client,
elasticV5Client: elasticV5Client,
elasticV6Client: elasticV6Client,
elasticV7Client: elasticV7Client,
}
if len(cronSched) > 0 {
cronSched = strings.ToLower(cronSched)
Expand Down Expand Up @@ -325,12 +343,42 @@ func (r *Reader) getIndexShift() string {

// 循环读取默认间隔时间3s,只支持全量读取,不支持offset字段
func (r *Reader) execWithLoop() error {
defer func() {
if rec := recover(); rec != nil {
log.Errorf("Runner[%v] recover when exec with loop\npanic: %v\nstack: %s", r.meta.RunnerName, rec, debug.Stack())
}
}()
var index = r.esindex
if r.dateShift {
index = r.getIndexShift()
}
// Create a client
switch r.esVersion {
case ElasticVersion7:
scroll := r.elasticV7Client.Scroll(index).Size(r.readBatch).KeepAlive(r.keepAlive)
if r.estype != "" {
scroll = scroll.Type(r.estype)
}
for {
results, err := scroll.ScrollId(r.offset).Do(context.Background())
if err == io.EOF {
return nil // all results retrieved
}
if err != nil {
return err // something went wrong
}

// Send the hits to the hits channel
for _, hit := range results.Hits.Hits {
r.readChan <- Record{
data: hit.Source,
}
}
r.offset = results.ScrollId
if r.isStopping() || r.hasStopped() {
return nil
}
}
case ElasticVersion6:
scroll := r.elasticV6Client.Scroll(index).Size(r.readBatch).KeepAlive(r.keepAlive)
if r.estype != "" {
Expand Down Expand Up @@ -414,12 +462,51 @@ func (r *Reader) execWithLoop() error {

// 定时读取,支持增量读取,需要指定具有自增属性的offset字段
func (r *Reader) execWithCron() error {
defer func() {
if rec := recover(); rec != nil {
log.Errorf("Runner[%v] recover when exec with cron\npanic: %v\nstack: %s", r.meta.RunnerName, rec, debug.Stack())
}
}()
var index = r.esindex
if r.dateShift {
index = r.getIndexShift()
}
// Create a client
switch r.esVersion {
case ElasticVersion7:
var rangeQuery *elasticV7.RangeQuery
if r.cronOffsetValueIsValid {
rangeQuery = elasticV7.NewRangeQuery(r.cronOffsetKey).Gte(r.cronOffsetValue)
} else {
rangeQuery = elasticV7.NewRangeQuery(r.cronOffsetKey)
}
scroll := r.elasticV7Client.Scroll(index).Query(rangeQuery).Size(r.readBatch).KeepAlive(r.keepAlive)
if r.estype != "" {
scroll = scroll.Type(r.estype)
}
for {
results, err := scroll.ScrollId(r.offset).Do(context.Background())
if err == io.EOF {
return nil // all results retrieved
}
if err != nil {
return err // something went wrong
}

// Send the hits to the hits channel
for _, hit := range results.Hits.Hits {
m := make(map[string]interface{})
jsoniter.Unmarshal(hit.Source, &m)
r.readChan <- Record{
data: hit.Source,
cronOffset: m[r.cronOffsetKey],
}
}
r.offset = results.ScrollId
if r.isStopping() || r.hasStopped() {
return nil
}
}
case ElasticVersion6:
var rangeQuery *elasticV6.RangeQuery
if r.cronOffsetValueIsValid {
Expand Down Expand Up @@ -637,6 +724,9 @@ func (r *Reader) Close() error {
if r.elasticV6Client != nil {
r.elasticV6Client.Stop()
}
if r.elasticV7Client != nil {
r.elasticV7Client.Stop()
}

// 如果此时没有 routine 正在运行,则在此处关闭数据管道,否则由 routine 在退出时负责关闭
if atomic.CompareAndSwapInt32(&r.routineStatus, StatusInit, StatusStopping) {
Expand Down
Loading