-
Notifications
You must be signed in to change notification settings - Fork 1
/
writer.go
85 lines (74 loc) · 2.47 KB
/
writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package mapping
import (
"context"
"encoding/json"
"io"
"time"
std_errors "errors"
"cloud.google.com/go/storage"
"github.com/davecgh/go-spew/spew"
"github.com/pierrre/compare"
"github.com/pkg/errors"
)
type Writer struct {
writerBucket func(ctx context.Context, fileName string) io.WriteCloser
readerLoad func(ctx context.Context, eventFamily string, version string, environment string) (*Mapping, error)
}
func NewWriter(gcreds *GcloudCreds, bucketName string) (*Writer, *storage.Client, error) {
gb, gbClient, err := NewGCSBucketGetter(gcreds, bucketName)
if err != nil {
return nil, nil, errors.Wrap(err, "new gcs bucket")
}
return &Writer{
writerBucket: gb.GetStorageWriter,
readerLoad: newReaderFromGCSClient(gb.GetStorageReader).Load,
}, gbClient, nil
}
func NewWriterFromGCSClient(gbSW func(ctx context.Context, fileName string) io.WriteCloser, gbSL func(ctx context.Context, fileName string) (io.ReadCloser, error)) (*Writer, error) {
return &Writer{
writerBucket: gbSW,
readerLoad: newReaderFromGCSClient(gbSL).Load,
}, nil
}
func (w *Writer) Upload(ctx context.Context, eventFamily, version string, environment string, writeMapping *Mapping, forceUpload bool) error {
ctx, cancel := context.WithTimeout(ctx, time.Second*50)
defer cancel()
filename := getMappingFilename(eventFamily, version, environment)
//if force upload is false, we check for already existing mapping and return without overwriting
if !forceUpload {
readMapping, err := w.readerLoad(ctx, eventFamily, version, environment)
if err != nil && UnwrapAll(err) != storage.ErrObjectNotExist {
return errors.Wrap(err, "get storage reader")
}
diff := compare.Compare(readMapping, writeMapping)
if readMapping != nil {
return errors.Errorf("mapping already exists:\nread:\n%s\nwrite:\n%s\ndiff:\n%+v", spew.Sdump(readMapping), spew.Sdump(writeMapping), diff)
}
}
// only if force upload is true or object does not exists
writer := w.writerBucket(ctx, filename)
encoder := json.NewEncoder(writer)
err := encoder.Encode(writeMapping)
if err != nil {
return errors.Wrap(err, "encode mapping")
}
err = writer.Close()
if err != nil {
return errors.Wrap(err, "close uploaded gcp file")
}
return nil
}
// Unwrap calls std_errors.Unwrap.
func Unwrap(err error) error {
return std_errors.Unwrap(err)
}
// UnwrapAll unwraps all nested errors, and returns the last one.
func UnwrapAll(err error) error {
for {
werr := Unwrap(err)
if werr == nil {
return err
}
err = werr
}
}