Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,6 @@ jobs:
go get github.com/securego/gosec/cmd/gosec
gosec ./...
- name: Run Golang Linter
uses: golangci/golangci-lint-action@v2
uses: golangci/golangci-lint-action@v3
with:
version: v1.45.2
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ go 1.17

require (
cloud.google.com/go/bigtable v1.13.0
cloud.google.com/go/storage v1.10.0
github.com/davecgh/go-spew v1.1.0
github.com/pierrre/compare v1.0.2
github.com/pkg/errors v0.9.1
google.golang.org/api v0.70.0
google.golang.org/grpc v1.44.0
)
Expand Down
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiy
cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0ZeosJ0Rtdos=
cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk=
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0 h1:STgFzyU5/8miMl0//zKh2aQeTyeaUH3WN9bSUiJ09bA=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
Expand All @@ -78,6 +79,7 @@ github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWH
github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1 h1:zH8ljVhhq7yC0MIeUL/IviMtY8hx2mK8cN9wEYb8ggw=
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
Expand Down Expand Up @@ -145,9 +147,11 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o=
github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE=
github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
github.com/google/martian/v3 v3.1.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
github.com/google/martian/v3 v3.2.1 h1:d8MncMlErDFTwQGBK1xhv026j9kqhvw1Qv9IbWT1VLQ=
github.com/google/martian/v3 v3.2.1/go.mod h1:oBOf6HBosgwRXnUGWUB05QECsc6uvmMiJ3+6W4l/CUk=
github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
Expand Down Expand Up @@ -182,6 +186,10 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/pierrre/compare v1.0.2 h1:k4IUsHgh+dbcAOIWCfxVa/7G6STjADH2qmhomv+1quc=
github.com/pierrre/compare v1.0.2/go.mod h1:8UvyRHH+9HS8Pczdd2z5x/wvv67krDwVxoOndaIIDVU=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
Expand Down
71 changes: 71 additions & 0 deletions mapping/gcs_bucket.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package mapping

import (
"context"
"encoding/json"
"fmt"
"io"

"cloud.google.com/go/storage"
"github.com/pkg/errors"
"google.golang.org/api/option"
)

type GcloudCreds struct {
Type string `json:"type"`
ProjectID string `json:"project_id"`
PrivateKeyID string `json:"private_key_id"`
PrivateKey string `json:"private_key"`
ClientEmail string `json:"client_email"`
ClientID string `json:"client_id"`
AuthURI string `json:"auth_uri"`
TokenURI string `json:"token_uri"`
AuthProvider string `json:"auth_provider_x509_cert_url"`
CertURL string `json:"client_x509_cert_url"`
}

type gcsBucketGetter struct {
objectGetter interface {
Object(name string) *storage.ObjectHandle
}
}

func NewGCSBucketGetter(gcreds *GcloudCreds, bucketName string) (*gcsBucketGetter, *storage.Client, error) {
credsB, err := json.Marshal(gcreds)
if err != nil {
return nil, nil, errors.Wrap(err, "json marshal")
}

client, err := storage.NewClient(context.Background(), option.WithCredentialsJSON(credsB))
if err != nil {
return nil, nil, errors.Wrap(err, "gcs storage client")
}
return &gcsBucketGetter{objectGetter: client.Bucket(bucketName)}, client, nil
}

func NewGCSBucketGetterFromEnvironment(bucketName string) (*gcsBucketGetter, *storage.Client, error) {
client, err := storage.NewClient(context.Background())
if err != nil {
return nil, nil, errors.Wrap(err, "gcs storage client")
}
return &gcsBucketGetter{objectGetter: client.Bucket(bucketName)}, client, nil
}

func NewGCSBucketGetterWithClient(client *storage.Client, bucketName string) (*gcsBucketGetter, error) {
return &gcsBucketGetter{objectGetter: client.Bucket(bucketName)}, nil
}

// GetStorageWriter returns the storage writer for google cloud storage.
func (r *gcsBucketGetter) GetStorageWriter(ctx context.Context, fileName string) io.WriteCloser {
return r.objectGetter.Object(fileName).NewWriter(ctx)
}

// GetStorageReader returns the storage reader for google cloud storage.
func (r *gcsBucketGetter) GetStorageReader(ctx context.Context, fileName string) (io.ReadCloser, error) {
return r.objectGetter.Object(fileName).NewReader(ctx)
}

func getMappingFilename(eventFamily string, version string, environment string) string {
// event_family/v1.0.0.json
return fmt.Sprintf("%s/%s/%s.json", eventFamily, environment, version)
}
8 changes: 4 additions & 4 deletions mapping/mapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,12 +119,12 @@ func TestMapper(t *testing.T) {
t.Fatal("should not raise an error")
}
mapper := NewMapper(mapping)
compare(t, mapper, "ui", "1233", "user_id", "1233")
compare(t, mapper, "oi", "1", "is_opted_in", "true")
compare(t, mapper, "3", "1", "order_status", "processing")
compareMappedData(t, mapper, "ui", "1233", "user_id", "1233")
compareMappedData(t, mapper, "oi", "1", "is_opted_in", "true")
compareMappedData(t, mapper, "3", "1", "order_status", "processing")
}

func compare(t *testing.T, m *Mapper, col string, val string, wantedCol string, wantedVal string) {
func compareMappedData(t *testing.T, m *Mapper, col string, val string, wantedCol string, wantedVal string) {
fCol, fVal := getMappedData(m.Mapping, m.rules.toEvent, col, val)
if fCol != wantedCol {
t.Fatalf("wrong column: wanted %s, got %s", wantedCol, fCol)
Expand Down
34 changes: 32 additions & 2 deletions mapping/mapping.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
Package mapping provides the API to convert data coming from Big Table into a data.Set.
*/
*/
/*
The mapping system is based on a set of rules described inside a JSON mapping file, here's an example:
{
Expand Down Expand Up @@ -48,13 +48,17 @@ The mapping system will map it to a new data.Event containing the following data
"is_opted_in": "true",
"order_status": "processing"
}
*/
*/
package mapping

import (
"encoding/json"
"fmt"
"io"
"os"
"path/filepath"

"github.com/pkg/errors"
)

// Mapping describes the mapping between data stored in Big Table and its human-readable representation.
Expand Down Expand Up @@ -84,6 +88,32 @@ func LoadMapping(c []byte) (*Mapping, error) {
return m, nil
}

// LoadMappingVersion loads a mapping from a slice of bytes and its version.
// You can use this function if you prefer to open the mapping file yourself.
func LoadMappingVersion(c []byte, version string) (*Mapping, error) {
mv := map[string]*Mapping{}
err := json.Unmarshal(c, &mv)
if err != nil {
return nil, err
}
m, ok := mv[version]
if !ok {
return nil, errors.New(fmt.Sprintf("no mapping found for version %s", version))
}
return m, nil
}

// LoadMappingIO loads a mapping from a IO reader.
func LoadMappingIO(reader io.ReadCloser) (*Mapping, error) {
m := &Mapping{}
decoder := json.NewDecoder(reader)
err := decoder.Decode(&m)
if err != nil {
return nil, errors.Wrap(err, "decode mapping")
}
return m, nil
}

// LoadMappingFromFile loads a mapping from a file.
func LoadMappingFromFile(path string) (*Mapping, error) {
c, err := os.ReadFile(filepath.Clean(path))
Expand Down
48 changes: 48 additions & 0 deletions mapping/reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package mapping

import (
"context"
"io"
"time"

"cloud.google.com/go/storage"
)

type Reader struct {
readerBucket func(ctx context.Context, fileName string) (io.ReadCloser, error)
}

func NewReader(gcreds *GcloudCreds, bucketName string) (*Reader, *storage.Client, error) {
gb, gbClient, err := NewGCSBucketGetter(gcreds, bucketName)
if err != nil {
return nil, nil, err
}
return &Reader{
readerBucket: gb.GetStorageReader,
}, gbClient, nil
}

func newReaderFromGCSClient(gbSL func(ctx context.Context, fileName string) (io.ReadCloser, error)) *Reader {
return &Reader{
readerBucket: gbSL,
}
}

func (r *Reader) Load(ctx context.Context, eventFamily string, version string, environment string) (*Mapping, error) {
ctx, cancel := context.WithTimeout(ctx, time.Second*50)
defer cancel()
filename := getMappingFilename(eventFamily, version, environment)
reader, err := r.readerBucket(ctx, filename)
if err != nil {
return nil, err
}
m, err := LoadMappingIO(reader)
if err != nil {
return nil, err
}
err = reader.Close()
if err != nil {
return nil, err
}
return m, nil
}
85 changes: 85 additions & 0 deletions mapping/writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package mapping

import (
"context"
"encoding/json"
"io"
"time"

std_errors "errors"

"cloud.google.com/go/storage"
"github.com/davecgh/go-spew/spew"
"github.com/pierrre/compare"
"github.com/pkg/errors"
)

type Writer struct {
writerBucket func(ctx context.Context, fileName string) io.WriteCloser
readerLoad func(ctx context.Context, eventFamily string, version string, environment string) (*Mapping, error)
}

func NewWriter(gcreds *GcloudCreds, bucketName string) (*Writer, *storage.Client, error) {
gb, gbClient, err := NewGCSBucketGetter(gcreds, bucketName)
if err != nil {
return nil, nil, errors.Wrap(err, "new gcs bucket")
}

return &Writer{
writerBucket: gb.GetStorageWriter,
readerLoad: newReaderFromGCSClient(gb.GetStorageReader).Load,
}, gbClient, nil
}

func NewWriterFromGCSClient(gbSW func(ctx context.Context, fileName string) io.WriteCloser, gbSL func(ctx context.Context, fileName string) (io.ReadCloser, error)) (*Writer, error) {
return &Writer{
writerBucket: gbSW,
readerLoad: newReaderFromGCSClient(gbSL).Load,
}, nil
}

func (w *Writer) Upload(ctx context.Context, eventFamily, version string, environment string, writeMapping *Mapping, forceUpload bool) error {
ctx, cancel := context.WithTimeout(ctx, time.Second*50)
defer cancel()
filename := getMappingFilename(eventFamily, version, environment)
//if force upload is false, we check for already existing mapping and return without overwriting
if !forceUpload {
readMapping, err := w.readerLoad(ctx, eventFamily, version, environment)
if err != nil && UnwrapAll(err) != storage.ErrObjectNotExist {
return errors.Wrap(err, "get storage reader")
}
diff := compare.Compare(readMapping, writeMapping)
if readMapping != nil {
return errors.Errorf("mapping already exists:\nread:\n%s\nwrite:\n%s\ndiff:\n%+v", spew.Sdump(readMapping), spew.Sdump(writeMapping), diff)
}
}

// only if force upload is true or object does not exists
writer := w.writerBucket(ctx, filename)
encoder := json.NewEncoder(writer)
err := encoder.Encode(writeMapping)
if err != nil {
return errors.Wrap(err, "encode mapping")
}
err = writer.Close()
if err != nil {
return errors.Wrap(err, "close uploaded gcp file")
}
return nil
}

// Unwrap calls std_errors.Unwrap.
func Unwrap(err error) error {
return std_errors.Unwrap(err)
}

// UnwrapAll unwraps all nested errors, and returns the last one.
func UnwrapAll(err error) error {
for {
werr := Unwrap(err)
if werr == nil {
return err
}
err = werr
}
}
3 changes: 2 additions & 1 deletion repository/repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import (
"context"
"embed"
"fmt"
"google.golang.org/grpc/credentials/insecure"
"log"
"regexp"
"strconv"
"testing"
"time"

"google.golang.org/grpc/credentials/insecure"

"cloud.google.com/go/bigtable"
"cloud.google.com/go/bigtable/bttest"
"github.com/sendinblue/bigtable-access-layer/data"
Expand Down