Skip to content

Commit

Permalink
Merge 1a43a08 into 4ce2d00
Browse files Browse the repository at this point in the history
  • Loading branch information
gfr10598 committed Jun 19, 2020
2 parents 4ce2d00 + 1a43a08 commit ab89084
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 10 deletions.
23 changes: 14 additions & 9 deletions job-service/job-service.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,11 @@ func (y *YesterdaySource) nextJob(ctx context.Context) *tracker.JobWithTarget {

ctx, cf := context.WithTimeout(ctx, 5*time.Second)
defer cf()
log.Println("Saving", y)
y.saver.Save(ctx, y)
log.Println("Saving", y.GetName(), y.GetKind(), y.Date.Format("2006-01-02"))
err := y.saver.Save(ctx, y)
if err != nil {
log.Println(err)
}
}

return &job
Expand Down Expand Up @@ -95,13 +98,13 @@ func initYesterday(ctx context.Context, saver persistence.Saver, delay time.Dura
}

// GetName implements StateObject.GetName
func (y *YesterdaySource) GetName() string {
func (y YesterdaySource) GetName() string {
return "singleton" // There is only one job service.
}

// GetKind implements StateObject.GetKind
func (y *YesterdaySource) GetKind() string {
return reflect.TypeOf(y).Name()
func (y YesterdaySource) GetKind() string {
return reflect.TypeOf(y).String()
}

// Service contains all information needed to provide a job service.
Expand All @@ -119,7 +122,7 @@ type Service struct {

// All fields above are const after initialization.
// All fields below are protected by *lock*
lock sync.Mutex
lock *sync.Mutex

// The Date is exported for persistence. It is the only field
// that is recovered after restart. All others are injected
Expand Down Expand Up @@ -160,6 +163,7 @@ func (svc *Service) NextJob(ctx context.Context) tracker.JobWithTarget {
// Note that this will block other calls to NextJob
ctx, cf := context.WithTimeout(ctx, 5*time.Second)
defer cf()
log.Println("Saving", svc.GetName(), svc.GetKind(), svc.Date.Format("2006-01-02"))
err := svc.saver.Save(ctx, svc)
if err != nil {
log.Println(err)
Expand Down Expand Up @@ -268,6 +272,7 @@ func NewJobService(ctx context.Context, tk jobAdder, startDate time.Time,
saver: saver,
jobSpecs: specs,
startDate: startDate,
lock: &sync.Mutex{},
nextIndex: 0,
yesterday: yesterday,
}
Expand All @@ -280,11 +285,11 @@ func NewJobService(ctx context.Context, tk jobAdder, startDate time.Time,
// ---------- Implement persistence.StateObject -------------

// GetName implements StateObject.GetName
func (svc *Service) GetName() string {
func (svc Service) GetName() string {
return "singleton" // There is only one job service.
}

// GetKind implements StateObject.GetKind
func (svc *Service) GetKind() string {
return reflect.TypeOf(svc).Name()
func (svc Service) GetKind() string {
return reflect.TypeOf(svc).String()
}
36 changes: 36 additions & 0 deletions job-service/job-service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@ import (
"time"

"bou.ke/monkey"
"cloud.google.com/go/datastore"
"github.com/go-test/deep"

"github.com/m-lab/go/rtx"

"github.com/m-lab/etl-gardener/config"
"github.com/m-lab/etl-gardener/job-service"
"github.com/m-lab/etl-gardener/persistence"
Expand Down Expand Up @@ -352,3 +355,36 @@ func TestEarlyWrapping(t *testing.T) {
}
}
}

func assertYesterdayStateObject(so persistence.StateObject) {
assertYesterdayStateObject(&job.YesterdaySource{})
}

func TestPersistence(t *testing.T) {
ctx := context.Background()
ds, err := persistence.NewDatastoreSaver(ctx, "mlab-testing")
if err != nil {
t.Fatal(err)
}

now := time.Now()
svc := job.Service{Date: now}
err = ds.Save(ctx, &svc)
rtx.Must(err, "Save error")
t.Log(&svc)

svc.Date = time.Time{}
err = ds.Fetch(ctx, &svc)
rtx.Must(err, "Fetch error")
t.Log(&svc)
if svc.Date.Unix() != now.Unix() {
t.Error("Date should be now", &svc, now.Unix(), svc.Date.Unix())
}

err = ds.Delete(ctx, &svc)
rtx.Must(err, "Delete error")
err = ds.Fetch(ctx, &svc)
if err != datastore.ErrNoSuchEntity {
t.Fatal("Should have errored", err)
}
}
11 changes: 10 additions & 1 deletion persistence/persistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package persistence

import (
"context"
"log"
"reflect"
"time"

Expand All @@ -11,7 +12,11 @@ import (
// StateObject defines the interface for objects to be saved/retrieved from datastore.
type StateObject interface {
GetName() string
GetKind() string // Should be implemented in the actual type, as return reflect.TypeOf(o).Name()
// Should be implemented in the actual type, as
// func (o ConcreteType) GetKind() string {
// return reflect.TypeOf(o).String()
// }
GetKind() string
}

// Base is the base for persistent objects. All StateObjects should embed
Expand Down Expand Up @@ -66,6 +71,7 @@ func (ds *DatastoreSaver) Save(ctx context.Context, o StateObject) error {
defer cancel()
_, err := ds.Client.Put(ctx, ds.key(o), o)
if err != nil {
log.Println("Save error", err, ds.key(o))
return err
}
return nil
Expand All @@ -77,6 +83,7 @@ func (ds *DatastoreSaver) Delete(ctx context.Context, o StateObject) error {
defer cancel()
err := ds.Client.Delete(ctx, ds.key(o))
if err != nil {
log.Println("Delete error", err, ds.key(o))
return err
}
return nil
Expand All @@ -95,6 +102,7 @@ func (ds *DatastoreSaver) FetchAll(ctx context.Context, o StateObject) ([]*datas
q := datastore.NewQuery(o.GetKind()).Namespace(ds.Namespace)
keys, err := ds.Client.GetAll(ctx, q.KeysOnly(), nil)
if err != nil {
log.Println("FetchAll error", err)
return nil, nil, err
}
// Passing .Interface() to GetAll doesn't work, whether the slice is empty
Expand All @@ -108,6 +116,7 @@ func (ds *DatastoreSaver) FetchAll(ctx context.Context, o StateObject) ([]*datas
objs := reflect.MakeSlice(reflect.SliceOf(reflect.TypeOf(o)), len(keys), len(keys)).Interface()
err = ds.Client.GetMulti(ctx, keys, objs)
if err != nil {
log.Println(err)
return nil, nil, err
}
return keys, objs, err
Expand Down

0 comments on commit ab89084

Please sign in to comment.