forked from olivere/elastic
/
scroll.go
124 lines (109 loc) · 2.78 KB
/
scroll.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
// Copyright 2012-present Oliver Eilhard. All rights reserved.
// Use of this source code is governed by a MIT-license.
// See http://olivere.mit-license.org/license.txt for details.
// Scroll illustrates scrolling through a set of documents.
//
// Example
//
// Scroll through an index called "products",
// passing a size of 100 via the -size flag:
//
// scroll -index=products -size=100
//
package main
import (
"context"
"flag"
"fmt"
"io"
"log"
"sync/atomic"
"time"
"golang.org/x/sync/errgroup"
"github.com/olivere/elastic"
)
// main scrolls through all documents of an Elasticsearch index (optionally
// restricted to one type) and prints a running hit count together with the
// elapsed time, followed by a final summary line.
//
// Two goroutines run inside an errgroup: a producer that pages through the
// scroll and sends every hit on a channel, and a consumer that counts the
// hits. The first error returned by either goroutine cancels the shared
// context and is reported via log.Fatal.
func main() {
	var (
		url   = flag.String("url", "http://localhost:9200", "Elasticsearch URL")
		index = flag.String("index", "", "Elasticsearch index name")
		typ   = flag.String("type", "", "Elasticsearch type name")
		size  = flag.Int("size", 100, "Slice of documents to get per scroll")
		sniff = flag.Bool("sniff", true, "Enable or disable sniffing")
	)
	flag.Parse()
	log.SetFlags(0)

	if *url == "" {
		log.Fatal("missing url parameter")
	}
	if *index == "" {
		log.Fatal("missing index parameter")
	}
	if *size <= 0 {
		log.Fatal("size must be greater than zero")
	}

	// Create an Elasticsearch client
	client, err := elastic.NewClient(elastic.SetURL(*url), elastic.SetSniff(*sniff))
	if err != nil {
		log.Fatal(err)
	}

	// Setup a group of goroutines from the excellent errgroup package.
	// main owns the root context, so Background (not TODO) is appropriate.
	g, ctx := errgroup.WithContext(context.Background())

	// Hits are sent on this channel by the scrolling goroutine and consumed
	// by the counting goroutine.
	type hit struct {
		Hit elastic.SearchHit
	}
	hitsc := make(chan hit)

	begin := time.Now()

	// Producer: scroll through the index and forward every hit.
	g.Go(func() error {
		// Closing the channel tells the consumer that no more hits follow.
		defer close(hitsc)

		// Prepare the query
		var query elastic.Query
		if *typ == "" {
			query = elastic.NewMatchAllQuery()
		} else {
			query = elastic.NewTypeQuery(*typ)
		}

		// Apply the -size flag; it was previously validated but never used.
		svc := client.Scroll(*index).Query(query).Size(*size)

		// Release the server-side scroll context once we are done, whether
		// the scroll was exhausted or aborted. ctx may already be canceled at
		// this point, so a fresh, time-limited context is used for cleanup.
		defer func() {
			clearCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
			defer cancel()
			if err := svc.Clear(clearCtx); err != nil {
				log.Printf("clearing scroll: %v", err)
			}
		}()

		for {
			res, err := svc.Do(ctx)
			if err == io.EOF {
				// io.EOF signals that the scroll has no more results.
				break
			}
			if err != nil {
				return err
			}

			for _, searchHit := range res.Hits.Hits {
				// Pass the hit to the hits channel, which will be consumed below.
				// Selecting on ctx.Done avoids blocking forever if the consumer
				// has already stopped.
				select {
				case hitsc <- hit{Hit: *searchHit}:
				case <-ctx.Done():
					return ctx.Err()
				}
			}
		}
		return nil
	})

	// Consumer: count the hits sent by the producer and print progress.
	var total uint64
	g.Go(func() error {
		for range hitsc {
			// We simply count the hits here.
			current := atomic.AddUint64(&total, 1)
			sec := int(time.Since(begin).Seconds())
			// "\r" rewrites the same terminal line on every update.
			fmt.Printf("%8d | %02d:%02d\r", current, sec/60, sec%60)

			// Non-blocking check for cancellation between hits.
			select {
			default:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		return nil
	})

	// Wait until all goroutines are finished
	if err := g.Wait(); err != nil {
		log.Fatal(err)
	}

	fmt.Printf("Scrolled through a total of %d documents in %v\n", total, time.Since(begin))
}