-
Notifications
You must be signed in to change notification settings - Fork 311
/
jsonloader.go
77 lines (64 loc) · 2.16 KB
/
jsonloader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package encoding
import (
"encoding/json"
"fmt"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)
// JsonLoader collects column values in memory and serializes them as a
// single JSON object followed by a newline (see WriteToString).
type JsonLoader struct {
	// destType is handed to warehouseutils.ToProviderCase whenever a column
	// name is normalized.
	destType string
	// columnData maps provider-cased column names to the values that will be
	// JSON-encoded.
	columnData map[string]interface{}
	// fileWriter receives the encoded line when Write is called.
	fileWriter LoadFileWriter
}
// NewJSONLoader builds a JsonLoader for destType that writes through writer.
// The returned loader starts with an empty, non-nil column map.
//
// JsonLoader is only used for BQ right now; treat it as a custom BQ loader.
// If more warehouses are added in the future, change this accordingly.
func NewJSONLoader(destType string, writer LoadFileWriter) *JsonLoader {
	return &JsonLoader{
		destType:   destType,
		fileWriter: writer,
		columnData: map[string]interface{}{},
	}
}
// IsLoadTimeColumn reports whether columnName equals the provider-cased name
// of UUIDTsColumn or LoadedAtColumn for the loader's destination type.
func (loader *JsonLoader) IsLoadTimeColumn(columnName string) bool {
	switch columnName {
	case warehouseutils.ToProviderCase(loader.destType, UUIDTsColumn),
		warehouseutils.ToProviderCase(loader.destType, LoadedAtColumn):
		return true
	default:
		return false
	}
}
// GetLoadTimeFormat returns the format constant associated with a load-time
// column: BQUuidTSFormat when columnName is the provider-cased UUIDTsColumn,
// BQLoadedAtFormat when it is the provider-cased LoadedAtColumn, and the
// empty string for any other column.
func (loader *JsonLoader) GetLoadTimeFormat(columnName string) string {
	if columnName == warehouseutils.ToProviderCase(loader.destType, UUIDTsColumn) {
		return BQUuidTSFormat
	}
	if columnName == warehouseutils.ToProviderCase(loader.destType, LoadedAtColumn) {
		return BQLoadedAtFormat
	}
	return ""
}
// AddColumn stores val under the provider-cased form of columnName,
// overwriting any value already recorded for that column. The second
// (unnamed) string parameter is ignored.
func (loader *JsonLoader) AddColumn(columnName, _ string, val interface{}) {
	loader.columnData[warehouseutils.ToProviderCase(loader.destType, columnName)] = val
}
// AddRow records one value per column: row[i] is stored under the
// provider-cased form of columnNames[i]. Values are stored as strings.
//
// row is indexed by position in columnNames, so it must be at least as long
// as columnNames; a shorter row panics with an index-out-of-range error.
func (loader *JsonLoader) AddRow(columnNames, row []string) {
	for idx := range columnNames {
		key := warehouseutils.ToProviderCase(loader.destType, columnNames[idx])
		loader.columnData[key] = row[idx]
	}
}
// AddEmptyColumn records columnName with a nil value by delegating to
// AddColumn with an empty second argument.
func (loader *JsonLoader) AddEmptyColumn(columnName string) {
	var noValue interface{} // nil interface: the column is present but empty
	loader.AddColumn(columnName, "", noValue)
}
// WriteToString JSON-encodes the accumulated column data and returns it as a
// single line terminated by "\n". If encoding fails it returns an empty
// string and the json.Marshal error wrapped with a "json.Marshal: " prefix.
func (loader *JsonLoader) WriteToString() (string, error) {
	encoded, err := json.Marshal(loader.columnData)
	if err != nil {
		return "", fmt.Errorf("json.Marshal: %w", err)
	}
	return string(encoded) + "\n", nil
}
// Write serializes the accumulated column data with WriteToString and hands
// the resulting line to the file writer's WriteGZ, returning WriteGZ's
// result unchanged. A serialization failure is returned wrapped with a
// "writeToString: " prefix and nothing is passed to the writer.
func (loader *JsonLoader) Write() error {
	line, err := loader.WriteToString()
	if err != nil {
		return fmt.Errorf("writeToString: %w", err)
	}
	return loader.fileWriter.WriteGZ(line)
}