-
Notifications
You must be signed in to change notification settings - Fork 307
/
mapper.go
105 lines (93 loc) · 3.38 KB
/
mapper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package rsources
import "sort"
// statusFromQueryResult assembles the nested JobStatus for jobRunId out of a
// flat map of per-target stats.
//
// Keys are visited in ascending order of their String() form, so the order of
// tasks, sources and destinations in the result is driven by that sort rather
// than by map iteration order. Every distinct TaskRunID yields one TaskStatus
// and every distinct SourceID within a task yields one SourceStatus, both in
// first-seen order. A key with an empty DestinationID supplies the stats of
// the source itself; any other key appends a DestinationStatus to that source.
// Once all entries are placed, calculateCompleted is invoked on every source.
func statusFromQueryResult(jobRunId string, statMap map[JobTargetKey]Stats) JobStatus {
	keys := make([]JobTargetKey, 0, len(statMap))
	for k := range statMap {
		keys = append(keys, k)
	}
	sort.Slice(keys, func(a, b int) bool {
		return keys[a].String() < keys[b].String()
	})

	result := JobStatus{ID: jobRunId}
	taskPos := map[string]int{}              // task run id -> position in result.TasksStatus
	sourcePos := map[string]map[string]int{} // task run id -> source id -> position in SourcesStatus

	for _, k := range keys {
		stat := statMap[k]

		ti, taskKnown := taskPos[k.TaskRunID]
		if !taskKnown {
			ti = len(result.TasksStatus)
			taskPos[k.TaskRunID] = ti
			sourcePos[k.TaskRunID] = map[string]int{}
			result.TasksStatus = append(result.TasksStatus, TaskStatus{ID: k.TaskRunID})
		}
		// Taken after the append above so the pointer refers to the current backing array.
		task := &result.TasksStatus[ti]

		si, sourceKnown := sourcePos[k.TaskRunID][k.SourceID]
		if !sourceKnown {
			si = len(task.SourcesStatus)
			sourcePos[k.TaskRunID][k.SourceID] = si
			task.SourcesStatus = append(task.SourcesStatus, SourceStatus{ID: k.SourceID})
		}
		source := &task.SourcesStatus[si]

		if k.DestinationID != "" {
			source.DestinationsStatus = append(source.DestinationsStatus, DestinationStatus{
				ID:        k.DestinationID,
				Stats:     stat,
				Completed: stat.completed(),
			})
			continue
		}
		// No destination id: these are the stats of the source itself.
		source.Stats = stat
	}

	// Run only after every stat has been placed, so each source sees all of its entries.
	for ti := range result.TasksStatus {
		sources := result.TasksStatus[ti].SourcesStatus
		for si := range sources {
			sources[si].calculateCompleted()
		}
	}
	return result
}
// failedRecordsFromQueryResult assembles the nested JobFailedRecords for
// jobRunId out of a flat map of per-target failed records.
//
// Keys are visited in ascending order of their String() form, so the order of
// tasks, sources and destinations in the result is driven by that sort rather
// than by map iteration order. Every distinct TaskRunID yields one
// TaskFailedRecords and every distinct SourceID within a task yields one
// SourceFailedRecords, both in first-seen order. Records stored under a key
// with an empty DestinationID are appended to the source's own Records; any
// other key appends a DestinationFailedRecords entry to that source.
func failedRecordsFromQueryResult(jobRunId string, recordsMap map[JobTargetKey]FailedRecords) JobFailedRecords {
	keys := make([]JobTargetKey, 0, len(recordsMap))
	for k := range recordsMap {
		keys = append(keys, k)
	}
	sort.Slice(keys, func(a, b int) bool {
		return keys[a].String() < keys[b].String()
	})

	out := JobFailedRecords{ID: jobRunId}
	taskPos := map[string]int{}              // task run id -> position in out.Tasks
	sourcePos := map[string]map[string]int{} // task run id -> source id -> position in Sources

	for _, k := range keys {
		ti, taskKnown := taskPos[k.TaskRunID]
		if !taskKnown {
			ti = len(out.Tasks)
			taskPos[k.TaskRunID] = ti
			sourcePos[k.TaskRunID] = map[string]int{}
			out.Tasks = append(out.Tasks, TaskFailedRecords{ID: k.TaskRunID})
		}
		// Taken after the append above so the pointer refers to the current backing array.
		task := &out.Tasks[ti]

		si, sourceKnown := sourcePos[k.TaskRunID][k.SourceID]
		if !sourceKnown {
			si = len(task.Sources)
			sourcePos[k.TaskRunID][k.SourceID] = si
			task.Sources = append(task.Sources, SourceFailedRecords{ID: k.SourceID})
		}
		src := &task.Sources[si]

		recs := recordsMap[k]
		if k.DestinationID == "" {
			// No destination id: the records belong to the source itself.
			src.Records = append(src.Records, recs...)
			continue
		}
		src.Destinations = append(src.Destinations, DestinationFailedRecords{
			ID:      k.DestinationID,
			Records: recs,
		})
	}
	return out
}