diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3da623a..78dd943 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -30,4 +30,6 @@ jobs: go get github.com/securego/gosec/cmd/gosec gosec ./... - name: Run Golang Linter - uses: golangci/golangci-lint-action@v2 + uses: golangci/golangci-lint-action@v3 + with: + version: v1.45.2 diff --git a/go.mod b/go.mod index 29bc2e0..25946aa 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,10 @@ go 1.17 require ( cloud.google.com/go/bigtable v1.13.0 + cloud.google.com/go/storage v1.10.0 + github.com/davecgh/go-spew v1.1.0 + github.com/pierrre/compare v1.0.2 + github.com/pkg/errors v0.9.1 google.golang.org/api v0.70.0 google.golang.org/grpc v1.44.0 ) diff --git a/go.sum b/go.sum index 96600a8..562f9db 100644 --- a/go.sum +++ b/go.sum @@ -52,6 +52,7 @@ cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiy cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0ZeosJ0Rtdos= cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk= cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs= +cloud.google.com/go/storage v1.10.0 h1:STgFzyU5/8miMl0//zKh2aQeTyeaUH3WN9bSUiJ09bA= cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= @@ -78,6 +79,7 @@ github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWH github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1 h1:zH8ljVhhq7yC0MIeUL/IviMtY8hx2mK8cN9wEYb8ggw= github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod 
h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= @@ -145,9 +147,11 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o= github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= +github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= github.com/google/martian/v3 v3.1.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= +github.com/google/martian/v3 v3.2.1 h1:d8MncMlErDFTwQGBK1xhv026j9kqhvw1Qv9IbWT1VLQ= github.com/google/martian/v3 v3.2.1/go.mod h1:oBOf6HBosgwRXnUGWUB05QECsc6uvmMiJ3+6W4l/CUk= github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= @@ -182,6 +186,10 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/pierrre/compare v1.0.2 
h1:k4IUsHgh+dbcAOIWCfxVa/7G6STjADH2qmhomv+1quc= +github.com/pierrre/compare v1.0.2/go.mod h1:8UvyRHH+9HS8Pczdd2z5x/wvv67krDwVxoOndaIIDVU= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= diff --git a/mapping/gcs_bucket.go b/mapping/gcs_bucket.go new file mode 100644 index 0000000..e3651aa --- /dev/null +++ b/mapping/gcs_bucket.go @@ -0,0 +1,71 @@ +package mapping + +import ( + "context" + "encoding/json" + "fmt" + "io" + + "cloud.google.com/go/storage" + "github.com/pkg/errors" + "google.golang.org/api/option" +) + +type GcloudCreds struct { + Type string `json:"type"` + ProjectID string `json:"project_id"` + PrivateKeyID string `json:"private_key_id"` + PrivateKey string `json:"private_key"` + ClientEmail string `json:"client_email"` + ClientID string `json:"client_id"` + AuthURI string `json:"auth_uri"` + TokenURI string `json:"token_uri"` + AuthProvider string `json:"auth_provider_x509_cert_url"` + CertURL string `json:"client_x509_cert_url"` +} + +type gcsBucketGetter struct { + objectGetter interface { + Object(name string) *storage.ObjectHandle + } +} + +func NewGCSBucketGetter(gcreds *GcloudCreds, bucketName string) (*gcsBucketGetter, *storage.Client, error) { + credsB, err := json.Marshal(gcreds) + if err != nil { + return nil, nil, errors.Wrap(err, "json marshal") + } + + client, err := storage.NewClient(context.Background(), option.WithCredentialsJSON(credsB)) + if err != nil { + return nil, nil, errors.Wrap(err, "gcs storage client") + } + return &gcsBucketGetter{objectGetter: client.Bucket(bucketName)}, client, nil +} + +func 
NewGCSBucketGetterFromEnvironment(bucketName string) (*gcsBucketGetter, *storage.Client, error) { + client, err := storage.NewClient(context.Background()) + if err != nil { + return nil, nil, errors.Wrap(err, "gcs storage client") + } + return &gcsBucketGetter{objectGetter: client.Bucket(bucketName)}, client, nil +} + +func NewGCSBucketGetterWithClient(client *storage.Client, bucketName string) (*gcsBucketGetter, error) { + return &gcsBucketGetter{objectGetter: client.Bucket(bucketName)}, nil +} + +// GetStorageWriter returns the storage writer for google cloud storage. +func (r *gcsBucketGetter) GetStorageWriter(ctx context.Context, fileName string) io.WriteCloser { + return r.objectGetter.Object(fileName).NewWriter(ctx) +} + +// GetStorageReader returns the storage reader for google cloud storage. +func (r *gcsBucketGetter) GetStorageReader(ctx context.Context, fileName string) (io.ReadCloser, error) { + return r.objectGetter.Object(fileName).NewReader(ctx) +} + +func getMappingFilename(eventFamily string, version string, environment string) string { + // event_family/v1.0.0.json + return fmt.Sprintf("%s/%s/%s.json", eventFamily, environment, version) +} diff --git a/mapping/mapper_test.go b/mapping/mapper_test.go index af1bee8..29a9010 100644 --- a/mapping/mapper_test.go +++ b/mapping/mapper_test.go @@ -119,12 +119,12 @@ func TestMapper(t *testing.T) { t.Fatal("should not raise an error") } mapper := NewMapper(mapping) - compare(t, mapper, "ui", "1233", "user_id", "1233") - compare(t, mapper, "oi", "1", "is_opted_in", "true") - compare(t, mapper, "3", "1", "order_status", "processing") + compareMappedData(t, mapper, "ui", "1233", "user_id", "1233") + compareMappedData(t, mapper, "oi", "1", "is_opted_in", "true") + compareMappedData(t, mapper, "3", "1", "order_status", "processing") } -func compare(t *testing.T, m *Mapper, col string, val string, wantedCol string, wantedVal string) { +func compareMappedData(t *testing.T, m *Mapper, col string, val string, 
wantedCol string, wantedVal string) { fCol, fVal := getMappedData(m.Mapping, m.rules.toEvent, col, val) if fCol != wantedCol { t.Fatalf("wrong column: wanted %s, got %s", wantedCol, fCol) diff --git a/mapping/mapping.go b/mapping/mapping.go index c5692d3..3838f88 100644 --- a/mapping/mapping.go +++ b/mapping/mapping.go @@ -1,6 +1,6 @@ /* Package mapping provides the API to convert data coming from Big Table into a data.Set. - */ +*/ /* The mapping system is based on a set of rules described inside a JSON mapping file, here's an example: { @@ -48,13 +48,17 @@ The mapping system will map it to a new data.Event containing the following data "is_opted_in": "true", "order_status": "processing" } - */ +*/ package mapping import ( "encoding/json" + "fmt" + "io" "os" "path/filepath" + + "github.com/pkg/errors" ) // Mapping describes the mapping between data stored in Big Table and its human-readable representation. @@ -84,6 +88,32 @@ func LoadMapping(c []byte) (*Mapping, error) { return m, nil } +// LoadMappingVersion loads a mapping from a slice of bytes and its version. +// You can use this function if you prefer to open the mapping file yourself. +func LoadMappingVersion(c []byte, version string) (*Mapping, error) { + mv := map[string]*Mapping{} + err := json.Unmarshal(c, &mv) + if err != nil { + return nil, err + } + m, ok := mv[version] + if !ok { + return nil, errors.New(fmt.Sprintf("no mapping found for version %s", version)) + } + return m, nil +} + +// LoadMappingIO loads a mapping from a IO reader. +func LoadMappingIO(reader io.ReadCloser) (*Mapping, error) { + m := &Mapping{} + decoder := json.NewDecoder(reader) + err := decoder.Decode(&m) + if err != nil { + return nil, errors.Wrap(err, "decode mapping") + } + return m, nil +} + // LoadMappingFromFile loads a mapping from a file. 
func LoadMappingFromFile(path string) (*Mapping, error) { c, err := os.ReadFile(filepath.Clean(path)) diff --git a/mapping/reader.go b/mapping/reader.go new file mode 100644 index 0000000..b0d0bbb --- /dev/null +++ b/mapping/reader.go @@ -0,0 +1,48 @@ +package mapping + +import ( + "context" + "io" + "time" + + "cloud.google.com/go/storage" +) + +type Reader struct { + readerBucket func(ctx context.Context, fileName string) (io.ReadCloser, error) +} + +func NewReader(gcreds *GcloudCreds, bucketName string) (*Reader, *storage.Client, error) { + gb, gbClient, err := NewGCSBucketGetter(gcreds, bucketName) + if err != nil { + return nil, nil, err + } + return &Reader{ + readerBucket: gb.GetStorageReader, + }, gbClient, nil +} + +func newReaderFromGCSClient(gbSL func(ctx context.Context, fileName string) (io.ReadCloser, error)) *Reader { + return &Reader{ + readerBucket: gbSL, + } +} + +func (r *Reader) Load(ctx context.Context, eventFamily string, version string, environment string) (*Mapping, error) { + ctx, cancel := context.WithTimeout(ctx, time.Second*50) + defer cancel() + filename := getMappingFilename(eventFamily, version, environment) + reader, err := r.readerBucket(ctx, filename) + if err != nil { + return nil, err + } + m, err := LoadMappingIO(reader) + if err != nil { + return nil, err + } + err = reader.Close() + if err != nil { + return nil, err + } + return m, nil +} diff --git a/mapping/writer.go b/mapping/writer.go new file mode 100644 index 0000000..94d4479 --- /dev/null +++ b/mapping/writer.go @@ -0,0 +1,85 @@ +package mapping + +import ( + "context" + "encoding/json" + "io" + "time" + + std_errors "errors" + + "cloud.google.com/go/storage" + "github.com/davecgh/go-spew/spew" + "github.com/pierrre/compare" + "github.com/pkg/errors" +) + +type Writer struct { + writerBucket func(ctx context.Context, fileName string) io.WriteCloser + readerLoad func(ctx context.Context, eventFamily string, version string, environment string) (*Mapping, error) +} + +func 
NewWriter(gcreds *GcloudCreds, bucketName string) (*Writer, *storage.Client, error) { + gb, gbClient, err := NewGCSBucketGetter(gcreds, bucketName) + if err != nil { + return nil, nil, errors.Wrap(err, "new gcs bucket") + } + + return &Writer{ + writerBucket: gb.GetStorageWriter, + readerLoad: newReaderFromGCSClient(gb.GetStorageReader).Load, + }, gbClient, nil +} + +func NewWriterFromGCSClient(gbSW func(ctx context.Context, fileName string) io.WriteCloser, gbSL func(ctx context.Context, fileName string) (io.ReadCloser, error)) (*Writer, error) { + return &Writer{ + writerBucket: gbSW, + readerLoad: newReaderFromGCSClient(gbSL).Load, + }, nil +} + +func (w *Writer) Upload(ctx context.Context, eventFamily, version string, environment string, writeMapping *Mapping, forceUpload bool) error { + ctx, cancel := context.WithTimeout(ctx, time.Second*50) + defer cancel() + filename := getMappingFilename(eventFamily, version, environment) + //if force upload is false, we check for already existing mapping and return without overwriting + if !forceUpload { + readMapping, err := w.readerLoad(ctx, eventFamily, version, environment) + if err != nil && UnwrapAll(err) != storage.ErrObjectNotExist { + return errors.Wrap(err, "get storage reader") + } + diff := compare.Compare(readMapping, writeMapping) + if readMapping != nil { + return errors.Errorf("mapping already exists:\nread:\n%s\nwrite:\n%s\ndiff:\n%+v", spew.Sdump(readMapping), spew.Sdump(writeMapping), diff) + } + } + + // only if force upload is true or object does not exists + writer := w.writerBucket(ctx, filename) + encoder := json.NewEncoder(writer) + err := encoder.Encode(writeMapping) + if err != nil { + return errors.Wrap(err, "encode mapping") + } + err = writer.Close() + if err != nil { + return errors.Wrap(err, "close uploaded gcp file") + } + return nil +} + +// Unwrap calls std_errors.Unwrap. 
// Unwrap returns the result of std_errors.Unwrap for err: the error it wraps,
// or nil when err wraps nothing.
func Unwrap(err error) error {
	inner := std_errors.Unwrap(err)
	return inner
}

// UnwrapAll follows the chain of wrapped errors starting at err and returns
// the innermost one. An error that wraps nothing (including nil) is returned
// as is.
func UnwrapAll(err error) error {
	for next := Unwrap(err); next != nil; next = Unwrap(err) {
		err = next
	}
	return err
}