-
-
Notifications
You must be signed in to change notification settings - Fork 115
/
exporter.go
148 lines (126 loc) · 4.28 KB
/
exporter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package fileexporter
import (
"context"
"fmt"
"log"
"os"
"runtime"
"strings"
"sync"
"text/template"
"github.com/thomaspoignant/go-feature-flag/exporter"
"github.com/xitongsys/parquet-go-source/local"
"github.com/xitongsys/parquet-go/parquet"
"github.com/xitongsys/parquet-go/writer"
)
type Exporter struct {
// Format is the output format you want in your exported file.
// Available format are JSON, CSV, and Parquet.
// Default: JSON
Format string
// OutputDir is the location of the directory where to store the exported files
// It should finish with a /
// Default: the current directory
OutputDir string
// Filename is the name of your output file
// You can use a templated config to define the name of your export files.
// Available replacement are {{ .Hostname}}, {{ .Timestamp}} and {{ .Format}}
// Default: "flag-variation-{{ .Hostname}}-{{ .Timestamp}}.{{ .Format}}"
Filename string
// CsvTemplate is used if your output format is CSV.
// This field will be ignored if you are using another format than CSV.
// You can decide which fields you want in your CSV line with a go-template syntax,
// please check exporter/feature_event.go to see what are the fields available.
// Default:
// {{ .Kind}};{{ .ContextKind}};{{ .UserKey}};{{ .CreationDate}};{{ .Key}};{{ .Variation}};{{ .Value}};
// {{ .Default}};{{ .Source}}\n
CsvTemplate string
// ParquetCompressionCodec is the parquet compression codec for better space efficiency.
// Available options https://github.com/apache/parquet-format/blob/master/Compression.md
// Default: SNAPPY
ParquetCompressionCodec string
csvTemplate *template.Template
filenameTemplate *template.Template
initTemplates sync.Once
}
// Export is saving a collection of events in a file.
func (f *Exporter) Export(_ context.Context, _ *log.Logger, featureEvents []exporter.FeatureEvent) error {
// Parse the template only once
f.initTemplates.Do(func() {
f.csvTemplate = exporter.ParseTemplate("csvFormat", f.CsvTemplate, exporter.DefaultCsvTemplate)
f.filenameTemplate = exporter.ParseTemplate("filenameFormat", f.Filename, exporter.DefaultFilenameTemplate)
})
// Default format for the output
if f.Format == "" {
f.Format = "json"
}
f.Format = strings.ToLower(f.Format)
// Get the filename
filename, err := exporter.ComputeFilename(f.filenameTemplate, f.Format)
if err != nil {
return err
}
filePath := f.OutputDir + "/" + filename
if f.Format == "parquet" {
return f.writeParquet(filePath, featureEvents)
}
return f.writeFile(filePath, featureEvents)
}
// IsBulk return false if we should directly send the data as soon as it is produce
// and true if we collect the data to send them in bulk.
func (f *Exporter) IsBulk() bool {
return true
}
func (f *Exporter) writeFile(filePath string, featureEvents []exporter.FeatureEvent) error {
file, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
if err != nil {
return err
}
defer file.Close()
for _, event := range featureEvents {
var line []byte
var err error
// Convert the line in the right format
switch f.Format {
case "csv":
line, err = exporter.FormatEventInCSV(f.csvTemplate, event)
case "json":
line, err = exporter.FormatEventInJSON(event)
default:
line, err = exporter.FormatEventInJSON(event)
}
// Handle error and write line into the file
if err != nil {
return fmt.Errorf("impossible to format the event in %s: %v", f.Format, err)
}
_, errWrite := file.Write(line)
if errWrite != nil {
return fmt.Errorf("error while writing the export file: %v", err)
}
}
return nil
}
func (f *Exporter) writeParquet(filePath string, featureEvents []exporter.FeatureEvent) error {
fw, err := local.NewLocalFileWriter(filePath)
if err != nil {
return err
}
defer fw.Close()
pw, err := writer.NewParquetWriter(fw, new(exporter.FeatureEvent), int64(runtime.NumCPU()))
if err != nil {
return err
}
pw.CompressionType = parquet.CompressionCodec_SNAPPY
if ct, err := parquet.CompressionCodecFromString(f.ParquetCompressionCodec); err == nil {
pw.CompressionType = ct
}
for _, event := range featureEvents {
if err := event.MarshalInterface(); err != nil {
return err
}
if err = pw.Write(event); err != nil {
return fmt.Errorf("error while writing the export file: %v", err)
}
}
return pw.WriteStop()
}