/
main.go
208 lines (177 loc) · 6.11 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"sync"
"time"
"github.com/olivere/elastic/v7"
"github.com/shopspring/decimal"
)
// Elasticsearch indices and the source-document filter used by this sync.
const (
	// adCostIndex is the index pattern the ad-cost source data is read from.
	adCostIndex = "flower-adcost-*"
	// calcmetaIndex is the index whose documents receive the updates.
	calcmetaIndex = "flower-calcmeta"
	// adCostQueryString is a query-string filter: only GDT-sourced documents
	// that have an ad_id, excluding those where active and register are both 0.
	adCostQueryString = "_exists_:ad_id AND report_source:gdt AND -(active:0 AND register:0)"
)
// Run configuration.
//
// NOTE(review): the cluster address and the credentials are hard-coded in
// source; consider supplying them from the environment or flags instead.
var (
	// host lists the Elasticsearch URLs the client is given.
	host = []string{"http://192.168.16.30:9200"}
	// user and password are the basic-auth credentials.
	user, password = "admin", "admin"

	// s and e are the first day and the day after the last day (YYYY-MM-DD)
	// of the processed range; init stores their Unix seconds in start and end.
	s, e       = "2023-01-01", "2023-02-01"
	start, end int64

	// concurrency is the number of update workers started by main.
	concurrency = 20
)
// init converts the configured date range into Unix-second bounds.
// start becomes midnight of s and end becomes midnight of e, both in the
// host's local time zone. The program exits if either date is not in
// YYYY-MM-DD form; silently falling back to the zero time would turn the
// bound into a huge negative number.
//
// NOTE(review): the aggregation buckets days in Asia/Shanghai; if this host
// runs in a different zone, start and end will not line up with bucket edges.
func init() {
	startTime, err := time.ParseInLocation(time.DateOnly, s, time.Local)
	if err != nil {
		log.Fatalf("parsing start date %q: %v", s, err)
	}
	endTime, err := time.ParseInLocation(time.DateOnly, e, time.Local)
	if err != nil {
		log.Fatalf("parsing end date %q: %v", e, err)
	}
	start, end = startTime.Unix(), endTime.Unix()
}
// main re-synchronises calcmeta ad metrics for the configured date range.
// It aggregates the ad-cost source data into update-by-query items, then
// executes them with a pool of worker goroutines fed through an unbuffered
// channel. Any error (here or inside a worker) aborts the run with a panic.
func main() {
	ctx := context.Background()

	cli, err := NewClient(host, user, password)
	if err != nil {
		panic(fmt.Errorf("creating elasticsearch client: %w", err))
	}
	// Release the client's resources once every update has been executed.
	defer cli.Stop()

	updates, err := getLatestDataFromAdCost(ctx, cli, start, end)
	if err != nil {
		panic(fmt.Errorf("collecting updates: %w", err))
	}

	workers := concurrency
	if workers < 1 {
		// The channel below is unbuffered: with no receiver the first send
		// would never complete.
		workers = 1
	}
	log.Printf("total updates: %d, worker num: %d", len(updates), workers)

	updatesChan := make(chan *UpdateByQueryItem)
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go processMessages(ctx, i, cli, updatesChan, &wg)
	}
	for i := range updates {
		updatesChan <- &updates[i]
	}
	// Closing the channel ends each worker's receive loop.
	close(updatesChan)
	wg.Wait()
	log.Println("finished")
}
// getLatestDataFromAdCost reads ad-cost documents with @timestamp in
// [start, end) (Unix seconds) from adCostIndex. It turns every distinct
// (day, ad_id, active, register, cost, show) combination into an
// update-by-query item against calcmetaIndex. Days are bucketed in
// Asia/Shanghai.
//
// Each item targets calcmeta documents with LogType:advertData, the same
// ad_id and register_time equal to the bucket day in Unix seconds. It
// overwrites active, register and show, plus the derived ratios
// register_active, cost_active and cost_register.
//
// NOTE(review): cost and show are part of the composite key. If the source
// documents for one (day, ad_id) disagree on them, several items with the
// same query are produced, and the concurrent workers apply them in no fixed
// order. Confirm the source holds one row per ad and day.
func getLatestDataFromAdCost(ctx context.Context, cli *elastic.Client, start, end int64) ([]UpdateByQueryItem, error) {
	aggs := elastic.NewCompositeAggregation().Sources(
		elastic.NewCompositeAggregationDateHistogramValuesSource("regDay").Field("@timestamp").TimeZone("Asia/Shanghai").CalendarInterval("1d"),
		elastic.NewCompositeAggregationTermsValuesSource("ad_id").Field("ad_id"),
		elastic.NewCompositeAggregationTermsValuesSource("active").Field("active"),
		elastic.NewCompositeAggregationTermsValuesSource("register").Field("register"),
		elastic.NewCompositeAggregationTermsValuesSource("cost").Field("cost"),
		elastic.NewCompositeAggregationTermsValuesSource("show").Field("show"),
	).Size(10000)
	query := elastic.NewBoolQuery().Filter(
		elastic.NewQueryStringQuery(adCostQueryString),
		elastic.NewRangeQuery("@timestamp").Format("epoch_second").Gte(start).Lt(end),
	)
	buckets, err := CompositeAggregation(ctx, cli, []string{adCostIndex}, aggs, query)
	if err != nil {
		return nil, fmt.Errorf("aggregating %s: %w", adCostIndex, err)
	}

	// Build the script once, in a fixed field order. Ranging over the params
	// map (as before) gave each update a randomly ordered, textually different
	// script source. That defeats Elasticsearch's script cache and can trip its
	// script compilation rate limit.
	script := adCostAssignScript(adCostUpdateFields)

	updates := make([]UpdateByQueryItem, len(buckets))
	for i, bucket := range buckets {
		updates[i] = adCostBucketToUpdate(bucket.Key, script)
	}
	return updates, nil
}

// adCostUpdateFields lists the calcmeta fields written by every update, in the
// order used for the shared update script. Keep it in sync with the params
// built by adCostBucketToUpdate.
var adCostUpdateFields = []string{"active", "register", "show", "register_active", "cost_active", "cost_register"}

// adCostAssignScript returns a painless script that assigns
// ctx._source[f] = params[f] for each field f, in the given order.
func adCostAssignScript(fields []string) string {
	var script string
	for _, field := range fields {
		script += fmt.Sprintf("ctx._source[\"%s\"]=params[\"%s\"];", field, field)
	}
	return script
}

// adCostBucketToUpdate converts one composite-aggregation bucket key into an
// update-by-query item that runs script with the bucket's values as params.
// It panics if a key is missing or has an unexpected type.
func adCostBucketToUpdate(key map[string]interface{}, script string) UpdateByQueryItem {
	// Date-histogram keys are epoch milliseconds; the calcmeta query compares
	// register_time against whole seconds.
	regDay := int64(key["regDay"].(float64) / 1000)
	adID := key["ad_id"].(string)
	active := key["active"].(float64)
	register := key["register"].(float64)
	show := key["show"].(float64)

	activeDec := decimal.NewFromFloat(active)
	registerDec := decimal.NewFromFloat(register)
	costDec := decimal.NewFromFloat(key["cost"].(float64)).Round(2)

	params := map[string]interface{}{
		"active":          int64(active),
		"register":        int64(register),
		"show":            int64(show),
		"register_active": adCostRatio(registerDec, activeDec),
		"cost_active":     adCostRatio(costDec, activeDec),
		"cost_register":   adCostRatio(costDec, registerDec),
	}
	return UpdateByQueryItem{
		Index: calcmetaIndex,
		// NOTE(review): adID is inserted without query-string escaping; an
		// ad_id containing Lucene special characters would break this query.
		Query:  fmt.Sprintf("LogType:advertData AND ad_id:%s AND register_time:%d", adID, regDay),
		Script: elastic.NewScript(script).Params(params).Lang("painless"),
	}
}

// adCostRatio returns numerator/denominator rounded to 4 decimal places as a
// float64, or 0 when denominator is zero.
func adCostRatio(numerator, denominator decimal.Decimal) float64 {
	if denominator.IsZero() {
		return 0
	}
	// The second result only reports whether the conversion was exact.
	ratio, _ := numerator.DivRound(denominator, 4).Float64()
	return ratio
}
// processMessages is one worker of the update pool. It runs UpdateByQuery for
// every item received on updatesChan until the channel is closed. It then
// logs how many items it handled, and its deferred wg.Done marks it finished.
// An update error panics, which terminates the whole program.
func processMessages(ctx context.Context, workerID int, cli *elastic.Client, updatesChan <-chan *UpdateByQueryItem, wg *sync.WaitGroup) {
	defer wg.Done()

	count := 0
	for {
		item, open := <-updatesChan
		if !open {
			break
		}
		if err := UpdateByQuery(ctx, cli, item); err != nil {
			panic(err)
		}
		count++
	}
	log.Printf("worker %d have processed: %d", workerID, count)
}
// NewClient builds an Elasticsearch client for the given node URLs, using
// HTTP basic authentication with user and password.
//
// Sniffing is switched off explicitly. When the nodes publish internal
// addresses (publish_address) that differ from the addresses this program
// reaches them on, sniffing would make the client talk to unreachable
// internal IPs. Instead, list every node the client should use in host.
func NewClient(host []string, user, password string) (*elastic.Client, error) {
	options := []elastic.ClientOptionFunc{
		elastic.SetSniff(false),
		elastic.SetURL(host...),
		elastic.SetBasicAuth(user, password),
	}
	return elastic.NewClient(options...)
}
// CompositeAggregation runs the composite aggregation aggs (registered under
// the name "aggs") over index, restricted by query. It pages through the
// results via after_key and returns every bucket in response order.
//
// aggs is modified in place: AggregateAfter is set to each page's after_key
// while paging, so the value should not be reused for a fresh scan.
//
// Paging stops when the response carries no after_key, when a page has no
// buckets, or when the response lacks the aggregation entirely. The last two
// conditions guard against re-sending an identical request forever.
func CompositeAggregation(ctx context.Context, client *elastic.Client, index []string, aggs *elastic.CompositeAggregation, query *elastic.BoolQuery) ([]*elastic.AggregationBucketCompositeItem, error) {
	const aggName = "aggs"
	var buckets []*elastic.AggregationBucketCompositeItem
	for {
		resp, err := client.Search().Index(index...).Query(query).Aggregation(aggName, aggs).Size(0).Do(ctx)
		if err != nil {
			return nil, err
		}
		page, found := resp.Aggregations.Composite(aggName)
		if !found {
			// Nothing to page through; asking again would return the same.
			return buckets, nil
		}
		buckets = append(buckets, page.Buckets...)
		if len(page.AfterKey) == 0 || len(page.Buckets) == 0 {
			return buckets, nil
		}
		aggs.AggregateAfter(page.AfterKey)
	}
}
// UpdateByQueryItem describes one update-by-query request.
type UpdateByQueryItem struct {
	// Index is the index the request targets; Query is the query-string
	// that selects the documents to update.
	Index, Query string
	// Script is run against every matching document.
	Script *elastic.Script
}
// UpdateByQuery executes one update-by-query request. Documents in
// update.Index that match the query-string update.Query are modified by
// update.Script; unavailable indices are ignored.
//
// A request/transport error is returned with the index and query attached.
// Per-document failures reported in the response are deliberately only
// logged, together with the index and query that produced them. They do not
// make the call fail.
//
// NOTE(review): update-by-query aborts on version conflicts by default; if
// several workers can hit the same documents, consider Conflicts("proceed").
func UpdateByQuery(ctx context.Context, client *elastic.Client, update *UpdateByQueryItem) error {
	result, err := client.UpdateByQuery().
		Index(update.Index).
		Q(update.Query).
		Script(update.Script).
		IgnoreUnavailable(true).
		Do(ctx)
	if err != nil {
		return fmt.Errorf("update by query on %s (%q): %w", update.Index, update.Query, err)
	}
	if len(result.Failures) > 0 {
		encoded, encErr := json.Marshal(result.Failures)
		if encErr != nil {
			log.Printf("failures on %s for query %q: %d (encoding them failed: %v)", update.Index, update.Query, len(result.Failures), encErr)
			return nil
		}
		log.Printf("failures on %s for query %q: %s", update.Index, update.Query, encoded)
	}
	return nil
}