forked from Azure/azure-kusto-go
/
status.go
303 lines (246 loc) · 9.49 KB
/
status.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
package ingest
import (
"fmt"
"time"
storageuid "github.com/gofrs/uuid"
"github.com/google/uuid"
"github.com/kylelemons/godebug/pretty"
"github.com/yangzuo0621/azure-kusto-go/kusto/ingest/internal/properties"
)
// StatusCode is the ingestion status
type StatusCode string
//goland:noinspection GoUnusedConst - Part of the API
const (
// Pending status represents a temporary status.
// Might change during the course of ingestion based on the
// outcome of the data ingestion operation into Kusto.
Pending StatusCode = "Pending"
// Succeeded status represents a permanent status.
// The data has been successfully ingested to Kusto.
Succeeded StatusCode = "Succeeded"
// Failed Status represents a permanent status.
// The data has not been ingested to Kusto.
Failed StatusCode = "Failed"
// Queued status represents a permanent status.
// The data has been queued for ingestion & status tracking was not requested.
// (This does not indicate that the ingestion was successful.)
Queued StatusCode = "Queued"
// Skipped status represents a permanent status.
// No data was supplied for ingestion. The ingest operation was skipped.
Skipped StatusCode = "Skipped"
// PartiallySucceeded status represents a permanent status.
// Part of the data was successfully ingested to Kusto, while other parts failed.
PartiallySucceeded StatusCode = "PartiallySucceeded"
// StatusRetrievalFailed means the client ran into truble reading the status from the service
StatusRetrievalFailed StatusCode = "StatusRetrievalFailed"
// StatusRetrievalCanceled means the user canceld the status check
StatusRetrievalCanceled StatusCode = "StatusRetrievalCanceled"
)
// IsFinal returns true if the ingestion status is a final status, or false if the status is temporary
func (i StatusCode) IsFinal() bool {
return i != Pending
}
// IsSuccess returns true if the status code is a final successfull status code
func (i StatusCode) IsSuccess() bool {
switch i {
case Succeeded, Queued:
return true
default:
return false
}
}
// FailureStatusCode indicates the status of failuted ingestion attempts
type FailureStatusCode string
const (
// Unknown represents an undefined or unset failure state
Unknown FailureStatusCode = "Unknown"
// Permanent represnets failure state that will benefit from a retry attempt
Permanent FailureStatusCode = "Permanent"
// Transient represnet a retryable failure state
Transient FailureStatusCode = "Transient"
// Exhausted represents a retryable failure that has exhusted all retry attempts
Exhausted FailureStatusCode = "Exhausted"
)
// IsRetryable indicates whether there's any merit in retying ingestion
func (i FailureStatusCode) IsRetryable() bool {
switch i {
case Transient, Exhausted:
return true
default:
return false
}
}
// statusRecord is a record containing information regarding the status of an ingation command
type statusRecord struct {
// Status is The ingestion status returned from the service. Status remains 'Pending' during the ingestion process and
// is updated by the service once the ingestion completes. When <see cref="IngestionReportMethod"/> is set to 'Queue', the ingestion status
// will always be 'Queued' and the caller needs to query the reports queues for ingestion status, as configured. To query statuses that were
// reported to queue, see: <see href="https://docs.microsoft.com/en-us/azure/kusto/api/netfx/kusto-ingest-client-status#ingestion-status-in-azure-queue"/>.
// When <see cref="IngestionReportMethod"/> is set to 'Table', call <see cref="IKustoIngestionResult.GetIngestionStatusBySourceId"/> or
// <see cref="IKustoIngestionResult.GetIngestionStatusCollection"/> to retrieve the most recent ingestion status.
Status StatusCode
// IngestionSourceID is a unique identifier representing the ingested source. It can be supplied during the ingestion execution.
IngestionSourceID uuid.UUID
// IngestionSourcePath is the URI of the blob, potentially including the secret needed to access
// the blob. This can be a filesystem URI (on-premises deployments only),
// or an Azure Blob Storage URI (including a SAS key or a semicolon followed by the account key).
IngestionSourcePath string
// Database is the name of the database holding the target table.
Database string
// Table is the name of the target table into which the data will be ingested.
Table string
// UpdatedOn is the last updated time of the ingestion status.
UpdatedOn time.Time
// OperationID is the ingestion's operation ID.
OperationID uuid.UUID
// ActivityID is the ingestion's activity ID.
ActivityID uuid.UUID
// ErrorCode In case of a failure, indicates the failure's error code.
ErrorCode string
// FailureStatus - In case of a failure, indicates the failure's status.
FailureStatus FailureStatusCode
// Details is a human readable description of the error added in case of a failure.
Details string
// OriginatesFromUpdatePolicy indicates whether or not the failure originated from an Update Policy, in case of a failure.
OriginatesFromUpdatePolicy bool
}
const (
undefinedString = "Undefined"
unknownString = "Unknown"
)
// newStatusRecord creates a new record initialized with defaults.
func newStatusRecord() statusRecord {
rec := statusRecord{
Status: Failed,
IngestionSourceID: uuid.Nil,
IngestionSourcePath: undefinedString,
Database: undefinedString,
Table: undefinedString,
UpdatedOn: time.Now(),
OperationID: uuid.Nil,
ActivityID: uuid.Nil,
ErrorCode: unknownString,
FailureStatus: Unknown,
Details: "",
OriginatesFromUpdatePolicy: false,
}
return rec
}
// FromProps takes in data from ingestion options.
func (r *statusRecord) FromProps(props properties.All) {
r.IngestionSourceID = props.Source.ID
r.Database = props.Ingestion.DatabaseName
r.Table = props.Ingestion.TableName
r.UpdatedOn = time.Now()
if props.Ingestion.BlobPath != "" && r.IngestionSourcePath == undefinedString {
r.IngestionSourcePath = props.Ingestion.BlobPath
}
}
// FromMap converts an ingestion status record to a key value map.
func (r *statusRecord) FromMap(data map[string]interface{}) {
strStatus := safeGetString(data, "Status")
if len(strStatus) > 0 {
r.Status = StatusCode(strStatus)
}
strStatus = safeGetString(data, "FailureStatus")
if len(strStatus) > 0 {
r.FailureStatus = FailureStatusCode(strStatus)
}
r.IngestionSourcePath = safeGetString(data, "IngestionSourcePath")
r.Database = safeGetString(data, "Database")
r.Table = safeGetString(data, "Table")
r.ErrorCode = safeGetString(data, "ErrorCode")
r.Details = safeGetString(data, "Details")
r.IngestionSourceID = getGoogleUUIDFromInterface(data, "IngestionSourceId")
r.OperationID = getGoogleUUIDFromInterface(data, "OperationId")
r.ActivityID = getGoogleUUIDFromInterface(data, "ActivityId")
if data["UpdatedOn"] != nil {
if t, err := getTimeFromInterface(data["UpdatedOn"]); err == nil {
r.UpdatedOn = t
}
}
if data["OriginatesFromUpdatePolicy"] != nil {
if b, ok := data["OriginatesFromUpdatePolicy"].(bool); ok {
r.OriginatesFromUpdatePolicy = b
}
}
}
// StatusFromMapForTests converts an ingestion status record to a key value map. This is useful for comparison in tests.
func StatusFromMapForTests(data map[string]interface{}) error {
r := newStatusRecord()
r.FromMap(data)
return r
}
// ToMap converts an ingestion status record to a key value map.
func (r *statusRecord) ToMap() map[string]interface{} {
data := make(map[string]interface{})
// Since we only create the initial record, It's not our responsibility to write the following fields:
// OperationID, AcitivityID, ErrorCode, FailureStatus, Details, OriginatesFromUpdatePolicy
// Those will be read from the server if they have data in them
data["Status"] = r.Status
data["IngestionSourceId"] = r.IngestionSourceID
data["IngestionSourcePath"] = r.IngestionSourcePath
data["Database"] = r.Database
data["Table"] = r.Table
data["UpdatedOn"] = r.UpdatedOn.Format(time.RFC3339Nano)
return data
}
// String implements fmt.Stringer.
func (r *statusRecord) String() string {
return pretty.Sprint(r)
}
// Error converts an ingestion status to a string. Since we only provide the record in case of an error, the success branches will never be called.
func (r statusRecord) Error() string {
switch r.Status {
case Succeeded:
return fmt.Sprintf("Ingestion succeeded\n" + r.String())
case Queued:
return fmt.Sprintf("Ingestion Queued\n" + r.String())
case PartiallySucceeded:
return fmt.Sprintf("Ingestion succeeded partially\n" + r.String())
default:
return fmt.Sprintf("Ingestion Failed\n" + r.String())
}
}
func getTimeFromInterface(x interface{}) (time.Time, error) {
switch x := x.(type) {
case string:
return time.Parse(time.RFC3339Nano, x)
case time.Time:
return x, nil
default:
return time.Now(), fmt.Errorf("getTimeFromInterface: Unexpected format %T", x)
}
}
func getGoogleUUIDFromInterface(data map[string]interface{}, key string) uuid.UUID {
x := data[key]
if x == nil {
return uuid.Nil
}
switch x := x.(type) {
case uuid.UUID:
return x
case string:
uid, err := uuid.Parse(x)
if err != nil {
return uuid.Nil
}
return uid
case storageuid.UUID:
uid, err := uuid.ParseBytes(x.Bytes())
if err != nil {
return uuid.Nil
}
return uid
default:
return uuid.Nil
}
}
func safeGetString(data map[string]interface{}, key string) string {
if v := data[key]; v != nil {
if s, ok := v.(string); ok {
return s
}
}
return ""
}