forked from apache/incubator-devlake
/
api_extractor.go
121 lines (108 loc) · 3.15 KB
/
api_extractor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package helper
import (
"fmt"
"github.com/merico-dev/lake/models/common"
"reflect"
"github.com/merico-dev/lake/plugins/core"
)
// Accept raw json body and params, return list of entities that need to be stored
//
// RawDataExtractor is the user-supplied handler invoked once per raw row.
// The returned values are expected to be pointers to tool-layer structs that
// embed common.RawDataOrigin (enforced at batch-save time in Execute).
type RawDataExtractor func(row *RawData) ([]interface{}, error)
// ApiExtractorArgs bundles everything NewApiExtractor needs to build an
// ApiExtractor.
type ApiExtractorArgs struct {
	RawDataSubTaskArgs
	// Params identifies which raw rows belong to this extraction run
	Params interface{}
	// Extract converts one raw row into zero or more tool-layer entities
	Extract RawDataExtractor
	// BatchSize controls how many records are buffered before a bulk insert;
	// zero falls back to a default (500) in NewApiExtractor
	BatchSize int
}
// ApiExtractor helps you extract Raw Data from api responses to Tool Layer Data
// It reads rows from specified raw data table, and feed it into `Extract` handler
// you can return arbitrary tool layer entities in this handler, ApiExtractor would
// first delete old data by their RawDataOrigin information, and then perform a
// batch insertion for you.
type ApiExtractor struct {
	// embedded sub-task provides the raw table name and params string
	*RawDataSubTask
	// args holds the caller-provided configuration (see ApiExtractorArgs)
	args *ApiExtractorArgs
}
// NewApiExtractor constructs an ApiExtractor from the given arguments.
// It builds the underlying RawDataSubTask and normalizes BatchSize,
// returning an error if the sub-task arguments are invalid.
func NewApiExtractor(args ApiExtractorArgs) (*ApiExtractor, error) {
	// process args
	rawDataSubTask, err := newRawDataSubTask(args.RawDataSubTaskArgs)
	if err != nil {
		return nil, err
	}
	// guard against zero AND negative values; default to 500 records per batch
	if args.BatchSize <= 0 {
		args.BatchSize = 500
	}
	return &ApiExtractor{
		RawDataSubTask: rawDataSubTask,
		args:           &args,
	}, nil
}
// Execute reads all raw rows matching this sub-task's params from the raw
// data table, feeds each row to the user-supplied Extract handler, and
// batch-saves the returned tool-layer entities. Before the first insert of
// each result type, previously extracted records sharing the same
// RawDataOrigin (table + params) are deleted, making re-runs idempotent.
func (extractor *ApiExtractor) Execute() error {
	// load data from database
	db := extractor.args.Ctx.GetDb()
	cursor, err := db.Table(extractor.table).Order("id ASC").Where("params = ?", extractor.params).Rows()
	if err != nil {
		return err
	}
	defer cursor.Close()
	row := &RawData{}
	// batch insertion divider
	RAW_DATA_ORIGIN := "RawDataOrigin"
	divider := NewBatchSaveDivider(db, extractor.args.BatchSize)
	divider.OnNewBatchSave(func(rowType reflect.Type) error {
		// every result type must embed common.RawDataOrigin so records can be
		// traced back to (and replaced alongside) the raw rows they came from.
		// NOTE: rowType is assumed to be a pointer to a struct (rowType.Elem()
		// would panic otherwise) — results are always pointers in practice.
		rawDataOrigin, ok := rowType.Elem().FieldByName(RAW_DATA_ORIGIN)
		if !ok || rawDataOrigin.Type != reflect.TypeOf(common.RawDataOrigin{}) {
			return fmt.Errorf("type %s must embed a RawDataOrigin struct", rowType.Name())
		}
		// delete old data for this table/params combination before inserting
		// the freshly extracted records
		return db.Delete(
			reflect.New(rowType).Interface(),
			"_raw_data_table = ? AND _raw_data_params = ?",
			extractor.table, extractor.params,
		).Error
	})
	// progress
	extractor.args.Ctx.SetProgress(0, -1)
	ctx := extractor.args.Ctx.GetContext()
	// iterate all rows
	for cursor.Next() {
		// honor cancellation between rows
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
		err = db.ScanRows(cursor, row)
		if err != nil {
			return err
		}
		results, err := extractor.args.Extract(row)
		if err != nil {
			return err
		}
		for _, result := range results {
			// get the batch operator for the specific type
			batch, err := divider.ForType(reflect.TypeOf(result))
			if err != nil {
				return err
			}
			// set raw data origin field so the record's provenance is recorded
			reflect.ValueOf(result).Elem().FieldByName(RAW_DATA_ORIGIN).Set(reflect.ValueOf(common.RawDataOrigin{
				RawDataTable:  extractor.table,
				RawDataId:     row.ID,
				RawDataParams: row.Params,
			}))
			// records get saved into db when the batch's slots are maxed out
			err = batch.Add(result)
			if err != nil {
				return err
			}
		}
		extractor.args.Ctx.IncProgress(1)
	}
	// save the last batches
	return divider.Close()
}
var _ core.SubTask = (*ApiExtractor)(nil)