Skip to content

Commit

Permalink
Merge pull request #1 from sendinblue/issue/merge-upstream
Browse files Browse the repository at this point in the history
ReadLast & Read events from one family
  • Loading branch information
alexdebril authored May 18, 2022
2 parents 864a024 + dfb3187 commit 6311b61
Show file tree
Hide file tree
Showing 3 changed files with 282 additions and 12 deletions.
21 changes: 10 additions & 11 deletions mapping/mapper_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package mapping

import (
"fmt"
"log"
"testing"
"time"
Expand Down Expand Up @@ -32,10 +31,10 @@ func TestSeekRaw(t *testing.T) {
t.Fatal("should NOT have found the column")
}
if col != "" {
t.Fatal(fmt.Sprintf("column must be empty, got %s \n", col))
t.Fatalf("column must be empty, got %s \n", col)
}
if val != "" {
t.Fatal(fmt.Sprintf("value must be empty, got %s \n", val))
t.Fatalf("value must be empty, got %s \n", val)
}
}

Expand Down Expand Up @@ -69,10 +68,10 @@ func TestSeekMapped(t *testing.T) {
t.Fatal("should NOT have found the column")
}
if col != "" {
t.Fatal(fmt.Sprintf("column must be empty, got %s \n", col))
t.Fatalf("column must be empty, got %s \n", col)
}
if val != "" {
t.Fatal(fmt.Sprintf("value must be empty, got %s \n", val))
t.Fatalf("value must be empty, got %s \n", val)
}
}

Expand Down Expand Up @@ -128,10 +127,10 @@ func TestMapper(t *testing.T) {
func compare(t *testing.T, m *Mapper, col string, val string, wantedCol string, wantedVal string) {
fCol, fVal := getMappedData(m.Mapping, m.rules.toEvent, col, val)
if fCol != wantedCol {
t.Fatal(fmt.Sprintf("wrong column: wanted %s, got %s", wantedCol, fCol))
t.Fatalf("wrong column: wanted %s, got %s", wantedCol, fCol)
}
if fVal != wantedVal {
t.Fatal(fmt.Sprintf("wrong value: wanted %s, got %s", wantedVal, fVal))
t.Fatalf("wrong value: wanted %s, got %s", wantedVal, fVal)
}
}

Expand Down Expand Up @@ -228,10 +227,10 @@ func TestTurnToShortColumn(t *testing.T) {
t.Fatal("should NOT have found the column")
}
if col != "" {
t.Fatal(fmt.Sprintf("column must be empty, got %s \n", col))
t.Fatalf("column must be empty, got %s \n", col)
}
if val != "" {
t.Fatal(fmt.Sprintf("value must be empty, got %s \n", val))
t.Fatalf("value must be empty, got %s \n", val)
}
}

Expand Down Expand Up @@ -265,10 +264,10 @@ func TestTurnToMappedColumnValue(t *testing.T) {
t.Fatal("should NOT have found the column")
}
if col != "" {
t.Fatal(fmt.Sprintf("column must be empty, got %s \n", col))
t.Fatalf("column must be empty, got %s \n", col)
}
if val != "" {
t.Fatal(fmt.Sprintf("value must be empty, got %s \n", val))
t.Fatalf("value must be empty, got %s \n", val)
}
}

Expand Down
27 changes: 26 additions & 1 deletion repository/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,32 @@ This method takes a row key as an argument, uses its internal adapter to read th
parses all cells contained in the row to turn it into a map of data.Event and finally returns the data.Set that contains all the events.
*/
func (r *Repository) Read(ctx context.Context, key string) (*data.Set, error) {
row, err := r.adapter.ReadRow(ctx, key)
return r.read(ctx, key)
}

/*
ReadFamily reads a row from the repository keeping only the desired column family and map it to a data.Set
This method takes a row key and the column family as an argument, uses its internal adapter to read the row from Big Table,
parses all cells contained in the row to turn it into a map of data.Event and finally returns the data.Set that contains all the events.
Be careful, this method will perform an exact match on the column family name.
*/
func (r *Repository) ReadFamily(ctx context.Context, key string, family string) (*data.Set, error) {
familyFilter := bigtable.RowFilter(bigtable.FamilyFilter(family))
return r.read(ctx, key, familyFilter)
}

// ReadLast reads a row from the repository while returning only the latest cell values after
// mapping it to a data.Set. This method takes a row key as an argument, uses its internal adapter
// to read the row from Big Table, parses only the latest cells contained in the row to turn it into
// a map of data.Event and finally returns the data.Set that contains all the events.
func (r *Repository) ReadLast(ctx context.Context, key string) (*data.Set, error) {
return r.read(ctx, key, bigtable.RowFilter(bigtable.LatestNFilter(1)))
}

func (r *Repository) read(ctx context.Context, key string, opts ...bigtable.ReadOption) (*data.Set, error) {
row, err := r.adapter.ReadRow(ctx, key, opts...)
if err != nil {
return nil, err
}
Expand Down
246 changes: 246 additions & 0 deletions repository/repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,143 @@ func ExampleRepository_Search() {
// Computer
}

func ExampleRepository_ReadLast() {
ctx := context.Background()
client := getBigTableClient(ctx)
c, err := fs.ReadFile("testdata/mapping.json")
if err != nil {
log.Fatalln(err)
}
jsonMapping, err := mapping.LoadMapping(c)
if err != nil {
log.Fatalln(err)
}
mapper := mapping.NewMapper(jsonMapping)
tbl := client.Open(table)

repo := NewRepository(tbl, mapper)
eventSet := &data.Set{Events: map[string][]*data.Event{
"front": {
{
RowKey: "contactz-102",
Date: time.Date(2018, time.January, 1, 0, 2, 0, 0, time.UTC),
Cells: map[string]string{
"event_type": "add_to_cart",
"device_type": "Computer",
"url": "https://example.org/some/product",
},
},
},
}}

// insert
errs, err := repo.Write(ctx, eventSet)
if err != nil {
log.Fatalln(err)
}
if len(errs) > 0 {
log.Fatalln(errs)
}

// update
eventSet = &data.Set{Events: map[string][]*data.Event{
"front": {
{
RowKey: "contactz-102",
Date: time.Now(),
Cells: map[string]string{
"event_type": "purchase",
"device_type": "Smartphone",
"url": "https://example.org/some/cart",
},
},
},
}}
errs, err = repo.Write(ctx, eventSet)
if err != nil {
log.Fatalln(err)
}
if len(errs) > 0 {
log.Fatalln(errs)
}

readSet, err := repo.ReadLast(ctx, "contactz-102")
if err != nil {
log.Fatalln(err)
}
for _, event := range readSet.Events["front"] {
fmt.Println(event.Cells["event_type"])
fmt.Println(event.Cells["device_type"])
}
// Output:
// add_to_cart
//
// purchase
// Smartphone
}

func ExampleRepository_ReadFamily() {
ctx := context.Background()
client := getBigTableClient(ctx)
c, err := fs.ReadFile("testdata/mapping.json")
if err != nil {
log.Fatalln(err)
}
jsonMapping, err := mapping.LoadMapping(c)
if err != nil {
log.Fatalln(err)
}
mapper := mapping.NewMapper(jsonMapping)
tbl := client.Open(table)

repo := NewRepository(tbl, mapper)
eventSet := &data.Set{Events: map[string][]*data.Event{
"front": {
{
RowKey: "contactz-102",
Date: time.Date(2018, time.January, 1, 0, 2, 0, 0, time.UTC),
Cells: map[string]string{
"event_type": "add_to_cart",
"device_type": "Computer",
"url": "https://example.org/some/product",
},
},
},
"blog": {
{
RowKey: "contactz-102",
Date: time.Date(2018, time.January, 1, 0, 2, 0, 0, time.UTC),
Cells: map[string]string{
"event_type": "page_view",
"device_type": "Computer",
"url": "https://example.org/blog/article/1",
},
},
},
}}

// insert
errs, err := repo.Write(ctx, eventSet)
if err != nil {
log.Fatalln(err)
}
if len(errs) > 0 {
log.Fatalln(errs)
}

readSet, err := repo.ReadFamily(ctx, "contactz-102", "blog")
if err != nil {
log.Fatalln(err)
}
for _, event := range readSet.Events["blog"] {
fmt.Println(event.Cells["event_type"])
fmt.Println(event.Cells["device_type"])
}
// Output:
// page_view
// Computer
}

var t1 = bigtable.Time(time.Date(2020, time.January, 1, 0, 1, 0, 0, time.UTC))
var t2 = bigtable.Time(time.Date(2020, time.January, 1, 0, 2, 0, 0, time.UTC))
var t3 = bigtable.Time(time.Date(2020, time.January, 1, 0, 3, 0, 0, time.UTC))
Expand Down Expand Up @@ -360,6 +497,111 @@ func TestRepository_Search(t *testing.T) {

}

func TestRepository_ReadLast(t *testing.T) {
ctx := context.Background()
repository := &Repository{
adapter: mockAdapter{},
mapper: getMockMapper(t),
}
eventSet, err := repository.ReadLast(ctx, "contact-3")
if err != nil {
t.Fatal(err)
}
if err != nil {
t.Fatalf("failed to read: %v", err)
}
if len(eventSet.Events) != 1 {
t.Fatalf("expected 1 event family, got %d", len(eventSet.Events))
}
if v, ok := eventSet.Events["front"]; !ok {
t.Fatalf("expected front family, got %v", v)
} else {
if len(v) != 3 {
t.Fatalf("expected 3 events, got %d", len(v))
}

if v[0].RowKey != "contact-3" {
t.Fatalf("expected contact-3, got %s", v[0].RowKey)
}
if v[0].Cells["url"] != "http://someexample.url/query/string/1" {
t.Fatalf("expected http://someexample.url/query/string/1, got %s", v[0].Cells["url"])
}
if v[0].Cells["device_type"] != "Smartphone" {
t.Fatalf("expected Smartphone, got %s", v[0].Cells["device_type"])
}
// here we're testing each event_type depending on the timestamp.
// It's because Go doesn't guarantee the order of the map iteration
for _, event := range v {
if event.Date.Unix() == t1.Time().Unix() {
if event.Cells["event_type"] != "page_view" {
t.Fatalf("expected page_view, got %s", event.Cells["event_type"])
}
}
if event.Date.Unix() == t2.Time().Unix() {
if event.Cells["event_type"] != "add_to_cart" {
t.Fatalf("expected add_to_cart, got %s", event.Cells["event_type"])
}
}
if event.Date.Unix() == t3.Time().Unix() {
if event.Cells["event_type"] != "purchase" {
t.Fatalf("expected purchase, got %s", event.Cells["event_type"])
}
}
}
}
}

func TestRepository_ReadFamily(t *testing.T) {
ctx := context.Background()
repository := &Repository{
adapter: mockAdapter{},
mapper: getMockMapper(t),
}
eventSet, err := repository.ReadFamily(ctx, "contact-3", "front")
if err != nil {
t.Fatalf("failed to read: %v", err)
}
if len(eventSet.Events) != 1 {
t.Fatalf("expected 1 event family, got %d", len(eventSet.Events))
}
if v, ok := eventSet.Events["front"]; !ok {
t.Fatalf("expected front family, got %v", v)
} else {
if len(v) != 3 {
t.Fatalf("expected 3 events, got %d", len(v))
}

if v[0].RowKey != "contact-3" {
t.Fatalf("expected contact-3, got %s", v[0].RowKey)
}
if v[0].Cells["url"] != "http://someexample.url/query/string/1" {
t.Fatalf("expected http://someexample.url/query/string/1, got %s", v[0].Cells["url"])
}
if v[0].Cells["device_type"] != "Smartphone" {
t.Fatalf("expected Smartphone, got %s", v[0].Cells["device_type"])
}
// here we're testing each event_type depending on the timestamp.
// It's because Go doesn't guarantee the order of the map iteration
for _, event := range v {
if event.Date.Unix() == t1.Time().Unix() {
if event.Cells["event_type"] != "page_view" {
t.Fatalf("expected page_view, got %s", event.Cells["event_type"])
}
}
if event.Date.Unix() == t2.Time().Unix() {
if event.Cells["event_type"] != "add_to_cart" {
t.Fatalf("expected add_to_cart, got %s", event.Cells["event_type"])
}
}
if event.Date.Unix() == t3.Time().Unix() {
if event.Cells["event_type"] != "purchase" {
t.Fatalf("expected purchase, got %s", event.Cells["event_type"])
}
}
}
}
}

//go:embed testdata/mapping.json
var fs embed.FS

Expand Down Expand Up @@ -497,6 +739,10 @@ func getBigTableClient(ctx context.Context) *bigtable.Client {
log.Fatalln(err)
}

if err = adminClient.CreateColumnFamily(ctx, table, "blog"); err != nil {
log.Fatalln(err)
}

client, err := bigtable.NewClient(ctx, projectID, instance, option.WithGRPCConn(conn))
if err != nil {
log.Fatalln(err)
Expand Down

0 comments on commit 6311b61

Please sign in to comment.