-
Notifications
You must be signed in to change notification settings - Fork 111
/
ingest.go
145 lines (121 loc) · 3.16 KB
/
ingest.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package druid
import (
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"
)
// taskResult is the decoded response of the task submission request that
// Ingest sends to /druid/indexer/v1/task.
type taskResult struct {
	// Task is the identifier of the submitted task; Ingest passes it to
	// getTaskReport to poll the task's report.
	Task string
}
// taskReport is the subset of the task report (fetched by getTaskReport from
// /druid/indexer/v1/task/{id}/reports) that Ingest inspects. The fields carry
// no json tags, so decoding relies on encoding/json's case-insensitive
// matching of object keys to field names.
type taskReport struct {
	IngestionStatsAndErrors struct {
		Payload struct {
			RowStats struct {
				BuildSegments struct {
					// Processed is the count Ingest compares against the
					// datasource's segment count to decide completion.
					Processed float64
				}
			}
			// IngestionState is left empty when the report carries no state;
			// Ingest keeps polling until it is non-empty and treats
			// "COMPLETED" as the finished state.
			IngestionState string
		}
	}
}
// datasourceDetails is the subset of the datasource description (fetched by
// getDatasourceDetails from /druid/coordinator/v1/datasources/{name}) that
// Ingest inspects.
type datasourceDetails struct {
	Segments struct {
		// Count is the segment count reported for the datasource; Ingest
		// compares it with the task report's processed count.
		Count float64
	}
}
// Ingest submits the ingestion spec specJSON to the Druid REST API at
// coordinatorURL and blocks until the resulting task has finished ingesting
// into datasourceName.
//
// After submission the task report is polled every two seconds. Ingest returns
// nil once the report's ingestion state is "COMPLETED" and its processed count
// equals the segment count reported for the datasource. It returns an
// "ingestion timeout" error when a poll would start after timeout has elapsed
// (measured from the moment the task was submitted), and any other error from
// the underlying requests is returned as-is.
//
// This function is intended for test and development usage and has not been
// tested for production use.
func Ingest(coordinatorURL, specJSON, datasourceName string, timeout time.Duration) error {
	const pollInterval = 2 * time.Second

	var submitted taskResult
	if err := sendRequest(coordinatorURL, http.MethodPost, "/druid/indexer/v1/task", specJSON, &submitted); err != nil {
		return err
	}

	deadline := time.Now().Add(timeout)
	for {
		// Always wait first: the report is not expected to be available
		// immediately after submission.
		time.Sleep(pollInterval)
		if time.Now().After(deadline) {
			return fmt.Errorf("ingestion timeout")
		}

		report, err := getTaskReport(coordinatorURL, submitted.Task)
		switch {
		case err == nil:
			// Report fetched; evaluate it below.
		case strings.Contains(err.Error(), "failed with status:"):
			// The coordinator may answer 404 or 500 on the first few polls,
			// so HTTP status failures are retried until the deadline.
			continue
		default:
			return err
		}

		payload := report.IngestionStatsAndErrors.Payload
		if payload.IngestionState == "" {
			// No ingestion state in the report yet.
			continue
		}

		details, err := getDatasourceDetails(coordinatorURL, datasourceName)
		if err != nil {
			return err
		}

		processed := payload.RowStats.BuildSegments.Processed
		if payload.IngestionState == "COMPLETED" && processed == details.Segments.Count {
			return nil
		}
	}
}
// getTaskReport fetches the report of the task identified by taskID from
// /druid/indexer/v1/task/{taskID}/reports on the service at coordinatorURL.
//
// taskID is percent-escaped as a single path segment before it is placed in
// the URL, so an identifier containing characters such as '/', '%' or ".."
// cannot alter the structure of the request path.
//
// It returns the decoded report, or a nil report and the error from the
// underlying request.
func getTaskReport(coordinatorURL, taskID string) (*taskReport, error) {
	var report taskReport
	path := fmt.Sprintf("/druid/indexer/v1/task/%s/reports", url.PathEscape(taskID))
	if err := sendRequest(coordinatorURL, http.MethodGet, path, "", &report); err != nil {
		return nil, err
	}
	return &report, nil
}
// getDatasourceDetails fetches the description of datasourceName from
// /druid/coordinator/v1/datasources/{datasourceName} on the service at
// coordinatorURL.
//
// datasourceName is expected to be the plain (unescaped) name; it is
// percent-escaped as a single path segment before it is placed in the URL, so
// a name containing characters such as '/', '%' or ".." cannot alter the
// structure of the request path.
//
// It returns the decoded details, or nil details and the error from the
// underlying request.
func getDatasourceDetails(coordinatorURL, datasourceName string) (*datasourceDetails, error) {
	var details datasourceDetails
	path := fmt.Sprintf("/druid/coordinator/v1/datasources/%s", url.PathEscape(datasourceName))
	if err := sendRequest(coordinatorURL, http.MethodGet, path, "", &details); err != nil {
		return nil, err
	}
	return &details, nil
}
// coordinatorRequestTimeout bounds one complete HTTP exchange (connect, send,
// and reading the response body) so that an unresponsive server cannot block a
// caller indefinitely.
const coordinatorRequestTimeout = 30 * time.Second

// coordinatorClient is the HTTP client shared by every sendRequest call. It is
// used instead of http.DefaultClient because the latter has no timeout. Its
// Transport is left nil, so it uses http.DefaultTransport.
var coordinatorClient = &http.Client{Timeout: coordinatorRequestTimeout}

// sendRequest performs one JSON request against the service at coordinatorURL
// and decodes the JSON response into out.
//
// Parameters:
//   - coordinatorURL: base URL of the service; path is joined onto it.
//   - method: HTTP method, e.g. http.MethodGet.
//   - path: request path relative to coordinatorURL.
//   - jsonBody: request body; an empty string sends no body.
//   - out: destination passed to json.Unmarshal; it is left untouched when the
//     response body is empty.
//
// A response with status 400 or above yields an error whose text starts with
// "coordinator request failed with status: <code>", followed by a bounded
// excerpt of the response body when one is present. Callers in this file
// match on the "failed with status:" text, so it must stay stable.
func sendRequest(coordinatorURL, method, path, jsonBody string, out any) error {
	// Upper bound on how much of an error response is copied into the error.
	const maxErrorBodyBytes = 512

	reqURL, err := url.JoinPath(coordinatorURL, path)
	if err != nil {
		return fmt.Errorf("building coordinator URL: %w", err)
	}

	var reqBody io.Reader
	if jsonBody != "" {
		reqBody = strings.NewReader(jsonBody)
	}
	req, err := http.NewRequest(method, reqURL, reqBody)
	if err != nil {
		return fmt.Errorf("creating coordinator request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")

	// The client's error already names the method and URL, so it is returned
	// without extra wrapping.
	res, err := coordinatorClient.Do(req)
	if err != nil {
		return err
	}
	defer res.Body.Close()

	if res.StatusCode >= http.StatusBadRequest {
		// Best effort: a failure to read the excerpt must not hide the status.
		excerpt, _ := io.ReadAll(io.LimitReader(res.Body, maxErrorBodyBytes))
		if detail := strings.TrimSpace(string(excerpt)); detail != "" {
			return fmt.Errorf("coordinator request failed with status: %d: %s", res.StatusCode, detail)
		}
		return fmt.Errorf("coordinator request failed with status: %d", res.StatusCode)
	}

	body, err := io.ReadAll(res.Body)
	if err != nil {
		return fmt.Errorf("reading coordinator response: %w", err)
	}
	if len(body) == 0 {
		return nil
	}
	if err := json.Unmarshal(body, out); err != nil {
		return fmt.Errorf("decoding coordinator response: %w", err)
	}
	return nil
}