-
Notifications
You must be signed in to change notification settings - Fork 307
/
tablearchiver.go
125 lines (101 loc) · 2.95 KB
/
tablearchiver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package tablearchiver
import (
"bytes"
"context"
"database/sql"
"errors"
"os"
"path/filepath"
"text/template"
"text/template/parse"
"github.com/rudderlabs/rudder-server/services/filemanager"
"github.com/rudderlabs/rudder-server/utils/logger"
"github.com/rudderlabs/rudder-server/utils/misc"
)
// PaginationAction is the canonical text of the template action that injects
// the page size. Do compares it against the top-level action nodes of the
// parsed query template to decide whether the query is paginated.
const PaginationAction = "{{.Pagination}}"

// OffsetAction is the canonical text of the template action that injects the
// current row offset into the query template.
const OffsetAction = "{{.Offset}}"
// TableJSONArchiver dumps the result of a templated SQL query into a local
// gzip file and uploads that file through a file manager (see Do).
type TableJSONArchiver struct {
// DbHandle is the database connection the rendered query is run against.
DbHandle *sql.DB
// Pagination is the value substituted for {{.Pagination}} and the amount the
// offset advances after every page. Do requires it to be at least 1 when
// the query template contains PaginationAction.
Pagination int
// Offset is the value substituted for {{.Offset}} on the first query.
Offset int
// QueryTemplate is a text/template source that is rendered with the
// "Pagination" and "Offset" values to produce each SQL query.
QueryTemplate string
// OutputPath is the local path of the gzip file that is written; missing
// parent directories are created by Do.
OutputPath string
// FileManager uploads the finished local file.
FileManager filemanager.FileManager
}
// pkgLogger is the logger shared by this package; it is assigned in init.
var pkgLogger logger.LoggerI

// init sets pkgLogger to a child logger named "tablearchiver".
func init() {
	pkgLogger = logger.NewLogger().Child("tablearchiver")
}
// Do renders QueryTemplate, runs the resulting query against DbHandle one page
// at a time, writes the returned rows as newline-delimited JSON into a gzip
// file at OutputPath, and uploads that file through FileManager.
//
// Every rendered query is scanned as a single row with a single nullable text
// column. A NULL value ends the loop. A non-NULL value is treated as the text
// of a JSON array: the separator "}, \n {" between elements is rewritten to
// "}\n{" and the first and last byte (the enclosing brackets) are dropped.
// NOTE(review): this assumes the query produces the array in exactly that
// textual layout (e.g. PostgreSQL json_agg) — confirm against the callers.
//
// When the template contains PaginationAction as a top-level action, the query
// is re-run with Offset increased by Pagination until a NULL is returned;
// otherwise it is run exactly once.
//
// It returns the Location reported by the upload. On failure the location is
// empty and err is the first error encountered. An invalid QueryTemplate is
// reported as an error rather than a panic.
func (jsonArchiver *TableJSONArchiver) Do() (location string, err error) {
	if err = os.MkdirAll(filepath.Dir(jsonArchiver.OutputPath), os.ModePerm); err != nil {
		pkgLogger.Errorf(`[TableJSONArchiver]: Error in creating local directory: %v`, err)
		return "", err
	}

	gzWriter, err := misc.CreateGZ(jsonArchiver.OutputPath)
	if err != nil {
		pkgLogger.Errorf(`[TableJSONArchiver]: Error in creating gzWriter: %v`, err)
		return "", err
	}
	// Make sure the gzip writer is closed exactly once on every path,
	// including the early error returns below.
	gzClosed := false
	defer func() {
		if !gzClosed {
			gzWriter.CloseGZ()
		}
	}()

	// Parse without template.Must: a malformed template is a caller error
	// that should be returned, not a reason to panic.
	t, err := template.New("").Parse(jsonArchiver.QueryTemplate)
	if err != nil {
		pkgLogger.Errorf(`[TableJSONArchiver]: Error in parsing query template: %v`, err)
		return "", err
	}

	// The query is paginated when the page-size action appears at the top
	// level of the template.
	hasPagination := false
	if t.Tree != nil && t.Root != nil {
		for _, node := range t.Root.Nodes {
			if node.Type() == parse.NodeAction && node.String() == PaginationAction {
				hasPagination = true
				break
			}
		}
	}

	if hasPagination && jsonArchiver.Pagination < 1 {
		return "", errors.New(`[TableJSONArchiver] Pagination limit is mandatory if query template has PaginationAction`)
	}

	var buf bytes.Buffer
	offset := jsonArchiver.Offset
	for {
		data := map[string]int{
			"Pagination": jsonArchiver.Pagination,
			"Offset":     offset,
		}

		buf.Reset()
		if err = t.Execute(&buf, data); err != nil {
			// Never send a partially rendered query to the database.
			pkgLogger.Errorf(`[TableJSONArchiver]: Error in rendering query template: %v`, err)
			return "", err
		}

		var rawJSONRows sql.NullString
		if err = jsonArchiver.DbHandle.QueryRow(buf.String()).Scan(&rawJSONRows); err != nil {
			pkgLogger.Errorf(`[TableJSONArchiver]: Scanning row failed with error : %v`, err)
			return "", err
		}

		// A NULL result means there is nothing (more) to archive.
		if !rawJSONRows.Valid {
			break
		}

		jsonBytes := []byte(rawJSONRows.String)
		// Turn the element separator ", \n " into a plain "\n" so that each
		// element ends up on its own line.
		jsonBytes = bytes.ReplaceAll(jsonBytes, []byte("}, \n {"), []byte("}\n{"))
		if len(jsonBytes) < 2 {
			// Too short to carry the enclosing brackets; slicing below would
			// panic.
			err = errors.New(`[TableJSONArchiver] query result is too short to be a JSON array`)
			pkgLogger.Errorf(`[TableJSONArchiver]: Malformed query result: %v`, err)
			return "", err
		}
		jsonBytes = jsonBytes[1 : len(jsonBytes)-1] // strip the leading '[' and trailing ']'
		jsonBytes = append(jsonBytes, '\n')

		// NOTE(review): the result of Write is not inspected because the
		// helper's signature is not visible here — check it if it returns
		// an error.
		gzWriter.Write(jsonBytes)

		if !hasPagination {
			break
		}
		offset += jsonArchiver.Pagination
	}

	// Flush and close the archive before reopening it for upload.
	gzWriter.CloseGZ()
	gzClosed = true

	file, err := os.Open(jsonArchiver.OutputPath)
	if err != nil {
		pkgLogger.Errorf(`[TableJSONArchiver]: Error opening local file dump: %v`, err)
		return "", err
	}
	defer file.Close()

	output, err := jsonArchiver.FileManager.Upload(context.TODO(), file)
	if err != nil {
		pkgLogger.Errorf(`[TableJSONArchiver]: Error uploading local file dump to object storage: %v`, err)
		return "", err
	}

	return output.Location, nil
}