-
Notifications
You must be signed in to change notification settings - Fork 85
/
search_async.go
130 lines (114 loc) · 4.56 KB
/
search_async.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
// Copyright 2019 The Moov Authors
// Use of this source code is governed by an Apache License
// license that can be found in the LICENSE file.
package main
import (
"bytes"
"encoding/json"
"fmt"
"os"
"strconv"
"strings"
"time"
"github.com/go-kit/kit/log"
)
var (
	// watchResearchBatchSize is the batch size handed to the watch repository
	// cursor when re-searching watches. It starts at 100 and may be replaced
	// in init by the WEBHOOK_BATCH_SIZE environment variable.
	watchResearchBatchSize = 100
)

// init overrides watchResearchBatchSize with the value of the
// WEBHOOK_BATCH_SIZE environment variable when that value is usable.
func init() {
	watchResearchBatchSize = readWebhookBatchSize(os.Getenv("WEBHOOK_BATCH_SIZE"))
}

// readWebhookBatchSize parses str as a base-10 batch size.
//
// It returns the parsed number when str is a positive integer that fits in an
// int. For any other input (empty, non-numeric, zero, negative or out of
// range) it returns the current value of watchResearchBatchSize.
func readWebhookBatchSize(str string) int {
	// The error has to be checked: on overflow strconv.Atoi returns the
	// largest int together with a range error, and ignoring the error would
	// accept that as an enormous batch size.
	n, err := strconv.Atoi(str)
	if err != nil || n <= 0 {
		return watchResearchBatchSize
	}
	return n
}
// spawnResearching will block and select on updates for when to re-inspect all watches setup.
// Since watches are used to post list data via webhooks they are used as catalysts in other systems.
//
// For every value received on updates it pages through the watches returned by
// watchRepo in batches of watchResearchBatchSize. For each watch it builds a
// JSON body (an ID lookup for ID watches, or the first suitable entry of the
// top five name matches for name watches), posts it with callWebhook and
// records the resulting status in webhookRepo. Failures are logged and the
// loop moves on. The method returns once updates is closed.
func (s *searcher) spawnResearching(logger log.Logger, companyRepo companyRepository, custRepo customerRepository, watchRepo watchRepository, webhookRepo webhookRepository, updates chan *downloadStats) {
	for range updates {
		s.logger.Log("search", "async: starting re-search of watches")

		cursor := watchRepo.getWatchesCursor(logger, watchResearchBatchSize)
		for {
			watches, cursorErr := cursor.Next()
			if cursorErr != nil {
				// Surface the failure instead of silently ending the pass.
				s.logger.Log("search", fmt.Sprintf("async: problem reading watches: %v", cursorErr))
			}
			if len(watches) == 0 {
				break
			}

			for i := range watches {
				w := watches[i]

				var body *bytes.Buffer
				var err error

				// Perform a query (ID watches) or search (name watches) and encode the model in JSON for calling the webhook.
				switch {
				case w.customerID != "":
					s.logger.Log("search", fmt.Sprintf("async: watch %s for customer %s found", w.id, w.customerID))
					body, err = getCustomerBody(s, w.id, w.customerID, 1.0, custRepo)

				case w.customerName != "":
					s.logger.Log("search", fmt.Sprintf("async: name watch '%s' for customer %s found", w.customerName, w.id))
					sdns := s.TopSDNs(5, w.customerName)
					// Use a distinct index (j): reusing i here would shadow the
					// watch index and look up the wrong watch (or panic).
					for j := range sdns {
						if strings.EqualFold(sdns[j].SDNType, "individual") {
							body, err = getCustomerBody(s, w.id, sdns[j].EntityID, sdns[j].match, custRepo)
							break
						}
					}

				case w.companyID != "":
					s.logger.Log("search", fmt.Sprintf("async: watch %s for company %s found", w.id, w.companyID))
					body, err = getCompanyBody(s, w.id, w.companyID, 1.0, companyRepo)

				case w.companyName != "":
					s.logger.Log("search", fmt.Sprintf("async: name watch '%s' for company %s found", w.companyName, w.id))
					sdns := s.TopSDNs(5, w.companyName)
					for j := range sdns {
						if !strings.EqualFold(sdns[j].SDNType, "individual") {
							body, err = getCompanyBody(s, w.id, sdns[j].EntityID, sdns[j].match, companyRepo)
							break
						}
					}
				}
				if err != nil {
					s.logger.Log("search", fmt.Sprintf("async: watch %s: %v", w.id, err))
					continue // skip to next watch since we failed somewhere
				}
				if body == nil {
					// No case above produced a payload (no suitable search
					// result, or the watch has no ID/name set), so there is
					// nothing to post. Passing a nil buffer on would risk a
					// nil dereference when the request is built.
					s.logger.Log("search", fmt.Sprintf("async: watch %s: no matching record found, skipping webhook", w.id))
					continue
				}

				// Send HTTP webhook
				now := time.Now()
				status, err := callWebhook(w.id, body, w.webhook, w.authToken)
				if err != nil {
					s.logger.Log("search", fmt.Sprintf("async: problem calling watch (%s) webhook: %v", w.id, err))
				}
				// The status is recorded even when the call failed.
				if err := webhookRepo.recordWebhook(w.id, now, status); err != nil {
					s.logger.Log("search", fmt.Sprintf("async: problem writing watch (%s) webhook status: %v", w.id, err))
				}
			}
		}
	}
}
// getCustomerBody returns the JSON encoded form of a given customer by their EntityID.
//
// The customer is looked up with getCustomerByID, its Match field is set to
// match, and the result is JSON encoded into a new buffer. An error is
// returned when the lookup fails, when no customer is found, or when encoding
// fails; watchID is only used to label those errors.
func getCustomerBody(s *searcher, watchID string, customerID string, match float64, repo customerRepository) (*bytes.Buffer, error) {
	customer, err := getCustomerByID(customerID, s, repo)
	if err != nil {
		// Report the real cause rather than disguising it as "not found".
		return nil, fmt.Errorf("async: watch %s customer %v lookup failed: %v", watchID, customerID, err)
	}
	if customer == nil {
		return nil, fmt.Errorf("async: watch %s customer %v not found", watchID, customerID)
	}
	customer.Match = match

	var buf bytes.Buffer
	if err := json.NewEncoder(&buf).Encode(customer); err != nil {
		return nil, fmt.Errorf("problem creating JSON for customer watch %s: %v", watchID, err)
	}
	return &buf, nil
}
// getCompanyBody returns the JSON encoded form of a given company by their EntityID.
//
// The company is looked up with getCompanyByID, its Match field is set to
// match, and the result is JSON encoded into a new buffer. An error is
// returned when the lookup fails, when no company is found, or when encoding
// fails; watchID is only used to label those errors.
func getCompanyBody(s *searcher, watchID string, companyID string, match float64, repo companyRepository) (*bytes.Buffer, error) {
	company, err := getCompanyByID(companyID, s, repo)
	if err != nil {
		// Report the real cause rather than disguising it as "not found".
		return nil, fmt.Errorf("async: watch %s company %v lookup failed: %v", watchID, companyID, err)
	}
	if company == nil {
		return nil, fmt.Errorf("async: watch %s company %v not found", watchID, companyID)
	}
	company.Match = match

	var buf bytes.Buffer
	if err := json.NewEncoder(&buf).Encode(company); err != nil {
		return nil, fmt.Errorf("problem creating JSON for company watch %s: %v", watchID, err)
	}
	return &buf, nil
}