Skip to content

Commit

Permalink
Merge 66a0e95 into 37c2884
Browse files Browse the repository at this point in the history
  • Loading branch information
gfr10598 committed Feb 22, 2018
2 parents 37c2884 + 66a0e95 commit f8260fc
Show file tree
Hide file tree
Showing 5 changed files with 316 additions and 46 deletions.
11 changes: 3 additions & 8 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,12 @@ before_install:
- source "${HOME}/google-cloud-sdk/path.bash.inc"

# Install test credentials.
# These are uploaded to travis by doing this from within the repository
# $ gcloud --project mlab-testing iam service-accounts keys create --iam-account go-travis-testing@mlab-testing.iam.gserviceaccount.com go_travis_testing.json
# $ KEY="$(base64 go_travis_testing.json)"
# $ travis env set TEST_SERVICE_ACCOUNT_mlab_testing "${KEY}"
# These are uploaded to travis by invoking setup_service_accounts_for_travis.sh (from
# the m-lab/travis repo), from a directory within *this* repo.
#
# New mechanism for setting up testing service account.
# Note that anyone with github ACLs to push to a branch can hack .travis.yml
# and discover these credentials in the travis logs.
- if [[ -n "$TEST_SERVICE_ACCOUNT_mlab_testing" ]] ; then
echo "$TEST_SERVICE_ACCOUNT_mlab_testing" | base64 -d > travis-testing.key ;
fi
# New mechanism for setting up testing service account.
# TODO use version in travis directory.
- if [[ -n "$SERVICE_ACCOUNT_mlab_testing" ]] ; then
$TRAVIS_BUILD_DIR/travis/activate_service_account.sh SERVICE_ACCOUNT_mlab_testing ;
Expand Down
46 changes: 12 additions & 34 deletions cloud/ds/ds.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"errors"
"log"
"os"
"time"

"cloud.google.com/go/datastore"
"google.golang.org/api/option"
Expand All @@ -21,7 +20,7 @@ import (
type Saver struct {
namespace string
kind string
client *datastore.Client
Client *datastore.Client
}

// NewSaver creates a new saver using the provided namespace and kind.
Expand All @@ -40,47 +39,26 @@ func NewSaver(namespace, kind string, opts ...option.ClientOption) (Saver, error
return Saver{namespace, kind, client}, nil
}

// Load retrieves an arbitrary record from datastore.
func (s Saver) Load(name string, obj interface{}) error {
// NameKey creates a full key using the Saver settings.
func (s Saver) NameKey(name string) *datastore.Key {
k := datastore.NameKey(s.kind, name, nil)
k.Namespace = s.namespace
return k
}

// Load retrieves an arbitrary record from datastore.
func (s Saver) Load(name string, obj interface{}) error {
k := s.NameKey(name)
log.Printf("%+v\n", k)
return s.client.Get(context.Background(), k, obj)
return s.Client.Get(context.Background(), k, obj)
}

// Save stores an arbitrary object to kind/key in the default namespace.
// If a record already exists, then it is overwritten.
// TODO(gfr) Make an upsert version of this:
// https://cloud.google.com/datastore/docs/concepts/entities
func (s Saver) Save(key string, obj interface{}) error {
k := datastore.NameKey(s.kind, key, nil)
k.Namespace = s.namespace
kk, err := s.client.Put(context.Background(), k, obj)
log.Printf("%+v\n", kk)
k := s.NameKey(key)
_, err := s.Client.Put(context.Background(), k, obj)
return err
}

// OwnerLease is a DataStore record that controls ownership of the reprocessing task.
// An instance must own this before doing any reprocessing / dedupping operations.
// It should be periodically renewed to avoid another instance taking ownership.
// TODO - should this have a mutex or other synchronization support?
type OwnerLease struct {
Hostname string // Hostname of the owner.
InstanceID string // instance ID of the owner.
LeaseExpiration time.Time // Time that the lease will expire.
}

// Renew renews the ownership lease for X minutes.
// TODO - should this run on a timer, or in a go routine?
func (ol *OwnerLease) Renew() {

// TODO -
}

// TakeOwnership attempts to take ownership of the lease.
// Expected to be attempted at startup, and process should fail health check
// if this fails.
func TakeOwnership() (OwnerLease, error) {

return OwnerLease{}, nil
}
7 changes: 3 additions & 4 deletions cloud/ds/ds_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,12 @@ import (
"google.golang.org/api/option"
)

func Options() []option.ClientOption {
func MLabTestAuth() []option.ClientOption {
opts := []option.ClientOption{}
if os.Getenv("TRAVIS") != "" {
authOpt := option.WithCredentialsFile("../../travis-testing.key")
authOpt := option.WithAPIKey(os.Getenv("SERVICE_ACCOUNT_mlab_testing"))
opts = append(opts, authOpt)
}

return opts
}

Expand All @@ -29,7 +28,7 @@ type TestObj struct {
// TODO use DATASTORE_EMULATOR_HOST
func TestSaveLoad(t *testing.T) {
os.Setenv("PROJECT", "mlab-testing")
saver, err := ds.NewSaver("", "test", Options()...)
saver, err := ds.NewSaver("", "test", MLabTestAuth()...)
if err != nil {
t.Fatal(err)
}
Expand Down
217 changes: 217 additions & 0 deletions dispatch/dispatch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
// Package dispatch identifies dates to reprocess, and feeds them into
// the reprocessing network.
package dispatch

import (
"context"
"errors"
"log"
"math/rand"
"os"
"time"

"cloud.google.com/go/datastore"

"github.com/m-lab/etl-gardener/cloud/ds"
)

// ###############################################################################
// Ownership related code
// ###############################################################################

const (
// DSNamespace is the namespace for all gardener related DataStore entities.
DSNamespace = "Gardener"

// DSOwnerLeaseName is the name of the single OwnerLease object.
// Every lease transaction in this file builds its datastore key from this
// name, so all instances contend for the same entity.
DSOwnerLeaseName = "OwnerLease"
)

var (
// TestMode controls datastore retry delays to allow faster testing.
// When true, waitForOwnership sleeps 1 second between attempts instead of
// a random 5 to 14 seconds.
TestMode = false
)

// Errors returned by the OwnerLease operations.
var (
// ErrLostLease is returned by Renew when the stored lease names a
// different InstanceID than the caller.
ErrLostLease = errors.New("lost ownership lease")
// ErrNotOwner is returned by Delete when the stored lease is held by a
// different InstanceID than the caller.
ErrNotOwner = errors.New("owner does not match instance")
// ErrOwnershipRequested is returned when NewInstanceID is set, i.e. some
// instance has asked the current owner to hand over the lease.
ErrOwnershipRequested = errors.New("another instance has requested ownership")
// ErrInvalidState is returned by validate when InstanceID is empty.
ErrInvalidState = errors.New("invalid owner lease state")
// ErrNoSuchLease indicates that the lease entity does not exist.
// NOTE(review): not referenced by the lease code in this file - confirm usage.
ErrNoSuchLease = errors.New("lease does not exist")
// ErrNotAvailable is returned when the lease exists and has not yet expired,
// and by waitForOwnership when it gives up waiting.
ErrNotAvailable = errors.New("lease not available")
)

// OwnerLease is a DataStore record that controls ownership of the reprocessing task.
// An instance must own this before doing any reprocessing / deduping operations.
// It should be periodically renewed to avoid another instance taking ownership
// without a handshake.
//
// The handshake works through NewInstanceID: a would-be owner writes its ID
// there (requestLease), and the current owner notices it on its next Renew.
//
// NOTE: NewOwnerLease initializes this struct with a positional literal, so the
// field order below must not change without updating it.
// TODO - should this have a mutex?
type OwnerLease struct {
InstanceID string // instance ID of the owner.
LeaseExpiration time.Time // Time that the lease will expire.
NewInstanceID string // ID of instance trying to assume ownership.
}

// NewOwnerLease returns a properly initialized OwnerLease object.
// The owner is identified by the GAE_INSTANCE environment variable, the
// expiration is set five minutes from now, and no handover is pending.
func NewOwnerLease() OwnerLease {
	return OwnerLease{
		InstanceID:      os.Getenv("GAE_INSTANCE"),
		LeaseExpiration: time.Now().Add(5 * time.Minute),
		NewInstanceID:   "",
	}
}

// validate checks that the lease is in a usable state for its owner.
// It returns ErrOwnershipRequested when a handover is pending
// (NewInstanceID is set), ErrInvalidState (after logging the lease) when
// InstanceID is empty, and nil otherwise.
func (ol *OwnerLease) validate() error {
	switch {
	case ol.NewInstanceID != "":
		return ErrOwnershipRequested
	case ol.InstanceID == "":
		log.Printf("%+v\n", ol)
		return ErrInvalidState
	default:
		return nil
	}
}

// Renew renews the ownership lease for interval.
// The receiver must have InstanceID already set.
//
// Within a single datastore transaction (10 second timeout) it reads the
// stored lease and then:
//   - returns ErrLostLease if the stored lease belongs to another instance;
//   - if another instance has requested ownership, expires the stored lease
//     immediately (when it has not already expired) and returns
//     ErrOwnershipRequested once that write has been committed;
//   - otherwise extends the stored expiration to now + interval.
//
// Any datastore error (including a failed commit) is returned as is.
// TODO - should this run on a timer, or in a go routine?
func (ol *OwnerLease) Renew(saver *ds.Saver, interval time.Duration) error {
	if err := ol.validate(); err != nil {
		log.Println(err)
		return err
	}
	ctx, cf := context.WithTimeout(context.Background(), 10*time.Second)
	defer cf()

	// relinquished records that a handover was requested. It lives outside the
	// transaction function because returning a non-nil error from that function
	// rolls the transaction back, which would discard the early-expiration
	// write. The function therefore returns nil so the write commits, and
	// ErrOwnershipRequested is reported afterwards.
	relinquished := false
	_, err := saver.Client.RunInTransaction(ctx, func(tx *datastore.Transaction) error {
		// The function may be retried on contention; start from a clean state.
		relinquished = false

		k := saver.NameKey(DSOwnerLeaseName)
		var lease OwnerLease
		if err := tx.Get(k, &lease); err != nil {
			return err
		}
		if lease.InstanceID != ol.InstanceID {
			log.Println(ol.InstanceID, "lost lease to", lease.InstanceID)
			return ErrLostLease
		}

		if lease.NewInstanceID != "" {
			log.Println(lease.InstanceID, "relinquishing ownership to", lease.NewInstanceID)
			relinquished = true
			if !lease.LeaseExpiration.After(time.Now()) {
				// Already expired; nothing to write.
				return nil
			}
			lease.LeaseExpiration = time.Now()
			_, err := tx.Put(k, &lease)
			return err
		}

		lease.LeaseExpiration = time.Now().Add(interval)
		_, err := tx.Put(k, &lease)
		return err
	})
	if err != nil {
		return err
	}
	if relinquished {
		return ErrOwnershipRequested
	}
	return nil
}

// takeOwnershipIfAvailable assumes ownership if no-one else owns it.
//
// In a single datastore transaction (10 second timeout) it reads the stored
// lease. If the lease does not exist or has expired, it writes the receiver as
// the new lease with an expiration of now + interval and a cleared
// NewInstanceID. It returns ErrNotAvailable if an unexpired lease exists, and
// any other datastore read, write or commit error as is.
func (ol *OwnerLease) takeOwnershipIfAvailable(saver *ds.Saver, interval time.Duration) error {
	ctx, cf := context.WithTimeout(context.Background(), 10*time.Second)
	defer cf()
	_, err := saver.Client.RunInTransaction(ctx, func(tx *datastore.Transaction) error {
		k := saver.NameKey(DSOwnerLeaseName)
		var lease OwnerLease
		err := tx.Get(k, &lease)
		// A failed read must not be mistaken for an expired lease: on error the
		// lease is zero-valued and its zero expiration is always in the past.
		if err != nil && err != datastore.ErrNoSuchEntity {
			return err
		}
		if err == nil && !lease.LeaseExpiration.Before(time.Now()) {
			return ErrNotAvailable
		}
		// Lease is expired, or doesn't exist, so go ahead and try to take it.
		ol.LeaseExpiration = time.Now().Add(interval)
		ol.NewInstanceID = ""
		_, err = tx.Put(k, ol)
		return err
	})
	return err
}

// requestLease sets the NewInstanceID field of the stored lease to indicate
// that we want ownership.
//
// The update happens in one datastore transaction (10 second timeout). If a
// request is already recorded (NewInstanceID is non-empty) the stored lease is
// left untouched and nil is returned. Errors from reading or writing the
// lease are returned as is.
func (ol *OwnerLease) requestLease(saver *ds.Saver) error {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	request := func(tx *datastore.Transaction) error {
		key := saver.NameKey(DSOwnerLeaseName)
		var current OwnerLease
		if getErr := tx.Get(key, &current); getErr != nil {
			return getErr
		}
		if current.NewInstanceID != "" {
			// A request is already pending; do not overwrite it.
			return nil
		}
		current.NewInstanceID = ol.InstanceID
		log.Println(ol.InstanceID, "requesting lease from", current.InstanceID)
		_, putErr := tx.Put(key, &current)
		return putErr
	}

	_, err := saver.Client.RunInTransaction(ctx, request)
	return err
}

// waitForOwnership retries takeOwnershipIfAvailable until timeout or success.
//
// It keeps trying for up to two minutes. Any result other than
// ErrNotAvailable (including success) ends the loop and is returned. Between
// attempts it sleeps 1 second in TestMode, otherwise a random 5 to 14 seconds.
// ErrNotAvailable is returned once the deadline has passed.
func (ol *OwnerLease) waitForOwnership(saver *ds.Saver, interval time.Duration) error {
	deadline := time.Now().Add(2 * time.Minute)
	for time.Now().Before(deadline) {
		log.Println("Trying again to get ownership", ol.InstanceID)
		if err := ol.takeOwnershipIfAvailable(saver, interval); err != ErrNotAvailable {
			return err
		}

		pause := time.Duration(1) * time.Second
		if !TestMode {
			// Randomize the delay outside of tests.
			pause = time.Duration(5+rand.Intn(10)) * time.Second
		}
		time.Sleep(pause)
	}

	return ErrNotAvailable
}

// Lease attempts to take ownership of the lease.
// Expected to be attempted at startup, and process should fail health check
// if this fails repeatedly.
//
// The steps are:
//  1. validate the receiver (returning its error if invalid);
//  2. take the lease immediately if it is missing or expired;
//  3. otherwise record a handover request in the stored lease;
//  4. wait for the lease to become available.
//
// It returns nil once ownership has been taken, or the error from the first
// of steps 1, 3 or 4 to fail.
func (ol *OwnerLease) Lease(saver *ds.Saver, interval time.Duration) error {
	// Validate once: validate logs on an invalid lease, so calling it twice
	// would duplicate that output.
	if err := ol.validate(); err != nil {
		return err
	}
	if err := ol.takeOwnershipIfAvailable(saver, interval); err == nil {
		return nil
	}
	// Any failure to take the lease directly falls through to the handshake.
	if err := ol.requestLease(saver); err != nil {
		return err
	}
	return ol.waitForOwnership(saver, interval)
}

// Delete deletes the lease iff held by ol.
//
// The check and the delete run in one datastore transaction (10 second
// timeout). It returns ErrNotOwner when the stored lease names a different
// InstanceID, and any error from reading or deleting the lease as is.
func (ol *OwnerLease) Delete(saver *ds.Saver) error {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	remove := func(tx *datastore.Transaction) error {
		key := saver.NameKey(DSOwnerLeaseName)
		var stored OwnerLease
		if getErr := tx.Get(key, &stored); getErr != nil {
			return getErr
		}
		if stored.InstanceID != ol.InstanceID {
			return ErrNotOwner
		}
		return tx.Delete(key)
	}

	_, err := saver.Client.RunInTransaction(ctx, remove)
	return err
}

// ###############################################################################
// Dispatch interface and related code
// ###############################################################################

0 comments on commit f8260fc

Please sign in to comment.