Skip to content

Commit

Permalink
Merge pull request #168 from m-lab/extend-dir
Browse files Browse the repository at this point in the history
add AnnWrapper
  • Loading branch information
gfr10598 committed Jan 4, 2019
2 parents 63fd528 + c83f64d commit a07f27f
Show file tree
Hide file tree
Showing 4 changed files with 302 additions and 7 deletions.
32 changes: 32 additions & 0 deletions geoloader/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,35 @@ package geoloader

// These are exported for testing.
var NewDirectory = newDirectory

/*
// Make a public version of AnnWrapper for testing.
type AnnWrapper struct {
annWrapper
}
func (ae *AnnWrapper) UpdateLastUsed() {
ae.updateLastUsed()
}
func (ae *AnnWrapper) GetLastUsed() time.Time {
return ae.getLastUsed()
}
func (ae *AnnWrapper) Status() error {
return ae.status()
}
func (ae *AnnWrapper) ReserveForLoad() bool {
return ae.reserveForLoad()
}
func (ae *AnnWrapper) SetAnnotator(ann api.Annotator, err error) error {
return ae.setAnnotator(ann, err)
}
func (ae *AnnWrapper) GetAnnotator() (ann api.Annotator, err error) {
return ae.getAnnotator()
}
func (ae *AnnWrapper) Unload() {
ae.unload()
}
func NewAnnWrapper() AnnWrapper {
return AnnWrapper{newAnnWrapper()}
}
*/
42 changes: 35 additions & 7 deletions geoloader/filename.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package geoloader

import (
"context"
"errors"
"log"
"regexp"
"sort"
Expand Down Expand Up @@ -42,19 +43,44 @@ func setDirectory(dir *directory) {
datasetDir = dir
}

type dateEntry struct {
date time.Time
var (
// ErrAnnotatorLoading is returned (externally) when an annotator is being loaded.
ErrAnnotatorLoading = errors.New("annotator is being loaded")

// These are UNEXPECTED errors!!
// ErrGoroutineNotOwner is returned when goroutine attempts to set annotator entry, but is not the owner.
ErrGoroutineNotOwner = errors.New("goroutine does not own annotator slot")
// ErrMapEntryAlreadySet is returned when goroutine attempts to set annotator, but entry is non-null.
ErrMapEntryAlreadySet = errors.New("annotator already set")
// ErrNilEntry is returned when map has a nil entry, which should never happen.
ErrNilEntry = errors.New("map entry is nil")

// errAlreadyLoaded = errors.New("annotator is already loaded")
// errAlreadyLoading = errors.New("another goroutine is already loading annotator")
)

type directoryEntry struct {
// date and filenames are immutable.
date time.Time // The date associated with this annotator.
// All filenames associated with this date/annotator.
// Only the first filename is currently required or used.
filenames []string

annotator AnnWrapper
}

func newEntry(date time.Time) directoryEntry {
return directoryEntry{date: date, filenames: make([]string, 0, 2), annotator: NewAnnWrapper()}
}

// directory maintains a list of datasets.
type directory struct {
entries map[string]*dateEntry // Map to filenames associated with date.
dates []string // Date strings associated with files.
entries map[string]*directoryEntry // Map to filenames associated with date.
dates []string // Date strings associated with files.
}

func newDirectory(size int) directory {
return directory{entries: make(map[string]*dateEntry, size), dates: make([]string, 0, size)}
return directory{entries: make(map[string]*directoryEntry, size), dates: make([]string, 0, size)}
}

// Insert inserts a new filename into the directory at the given date.
Expand All @@ -71,7 +97,9 @@ func (dir *directory) Insert(date time.Time, fn string) {
dir.dates[index] = dateString

// Create new entry for the date.
entry = &dateEntry{filenames: make([]string, 0, 2), date: date}
// TODO make this NOT a pointer?
e := newEntry(date)
entry = &e
dir.entries[dateString] = entry
}

Expand Down Expand Up @@ -116,7 +144,7 @@ var GeoLegacyv6Regex = regexp.MustCompile(`.*-GeoLiteCityv6.dat.*`)
func UpdateArchivedFilenames() error {
old := getDirectory()
size := len(old.dates) + 2
dir := directory{entries: make(map[string]*dateEntry, size), dates: make([]string, 0, size)}
dir := directory{entries: make(map[string]*directoryEntry, size), dates: make([]string, 0, size)}

ctx := context.Background()
client, err := storage.NewClient(ctx)
Expand Down
122 changes: 122 additions & 0 deletions geoloader/wrapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package geoloader

// The AnnWrapper struct controls concurrent operations on Annotator objects.
// It is designed for minimal contention on GetAnnotator(), and safe loading and unloading.
// TODO - pull this out to an internal package, since it is only used by the directory.

import (
"log"
"sync"
"sync/atomic"
"time"
"unsafe"

"github.com/m-lab/annotation-service/api"
"github.com/m-lab/annotation-service/metrics"
)

type AnnWrapper struct {
// The lock must be held when accessing ann or err fields.
lock sync.RWMutex

// Updated only while holding write lock.
ann api.Annotator

// err field is used to indicate the wrapper status.
// nil error means that the annotator is populated and ready for use.
// An empty wrapper will have a non-nil error, indicating whether a previous load failed,
// or annotator is currently loading, or annotator is nil and wrapper is idle.
err error

// This field is accessed using atomics.
// In an empty wrapper, this should point to time.Time{} zero value.
lastUsed unsafe.Pointer // pointer to the lastUsed time.Time.
}

// UpdateLastUsed updates the last used time to the current time.
func (ae *AnnWrapper) UpdateLastUsed() {
newTime := time.Now()
atomic.StorePointer(&ae.lastUsed, unsafe.Pointer(&newTime))
}

// GetLastUsed returns the time that the annotator was last successfully fetched with GetAnnotator.
func (ae *AnnWrapper) GetLastUsed() time.Time {
ptr := atomic.LoadPointer(&ae.lastUsed)
if ptr == nil {
log.Println("Error in getLastUsed for", ae)
return time.Time{}
}
return *(*time.Time)(ptr)
}

// ReserveForLoad attempts to set the state to loading, indicated by the `err` field
// containing ErrAnnotatorLoading.
// Returns true IFF the reservation was obtained.
func (ae *AnnWrapper) ReserveForLoad() bool {
ae.lock.Lock()
defer ae.lock.Unlock()
if ae.err == nil {
return false
}
if ae.err == ErrAnnotatorLoading { // This is the public error
return false
}
// This takes ownership of the slot
ae.err = ErrAnnotatorLoading
return true
}

// SetAnnotator attempts to store `ann`, and update the error state.
// It may fail if the state has changed, e.g. because of an unload.
func (ae *AnnWrapper) SetAnnotator(ann api.Annotator, err error) error {
ae.lock.Lock()
defer ae.lock.Unlock()

if ae.err != ErrAnnotatorLoading {
log.Println("This should never happen", ErrGoroutineNotOwner)
return ErrGoroutineNotOwner
}
if ae.ann != nil {
log.Println("This should never happen", ErrMapEntryAlreadySet)
return ErrMapEntryAlreadySet
}
ae.ann = ann
ae.err = err
ae.UpdateLastUsed()

metrics.LoadCount.Inc()
metrics.PendingLoads.Dec()
metrics.DatasetCount.Inc()

return nil
}

// GetAnnotator gets the current annotator, if there is one, and the error state.
func (ae *AnnWrapper) GetAnnotator() (ann api.Annotator, err error) {
ae.UpdateLastUsed()

ae.lock.RLock()
defer ae.lock.RUnlock()

return ae.ann, ae.err
}

// Unload unloads the annotator and resets the state to the empty state.
// If another goroutine is concurrently trying to load this, we don't
// really care. The other goroutine will fail when it attempts to SetAnnotator()
func (ae *AnnWrapper) Unload() {
ae.lock.Lock()
defer ae.lock.Unlock()

if ae.ann != nil {
ae.ann.Unload()
}

ae.ann = nil
ae.err = ErrNilEntry
atomic.StorePointer(&ae.lastUsed, unsafe.Pointer(&time.Time{}))
}

func NewAnnWrapper() AnnWrapper {
return AnnWrapper{err: ErrNilEntry, lastUsed: unsafe.Pointer(&time.Time{})}
}
113 changes: 113 additions & 0 deletions geoloader/wrapper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package geoloader_test

import (
"errors"
"testing"
"time"

"github.com/m-lab/annotation-service/api"
"github.com/m-lab/annotation-service/geoloader"
)

type fakeAnnotator struct {
api.Annotator
unloadCount int
}

func (f *fakeAnnotator) Unload() {
f.unloadCount++
}

// This exercises all the basic functions of the wrapper.TestAnnWrapper
// TODO - the next PR should add a test that checks for concurrency correctness and races.
func TestAnnWrapper(t *testing.T) {
aw := geoloader.NewAnnWrapper()

if !aw.GetLastUsed().Equal(time.Time{}) {
t.Error("incorrect last used")
}

// Check status.
_, err := aw.GetAnnotator()
if err != geoloader.ErrNilEntry {
t.Error(err)
}

// Should do nothing.
aw.Unload()

fakeErr := errors.New("FakeError")
if aw.SetAnnotator(nil, fakeErr) != geoloader.ErrGoroutineNotOwner {
t.Error("Should have failed to set annotator")
}
if !aw.ReserveForLoad() {
t.Fatal("didn't get reservation")
}
if aw.ReserveForLoad() {
t.Fatal("shouldn't have gotten reservation")
}

err = aw.SetAnnotator(nil, fakeErr)
if err != nil {
t.Error("Should have succeeded:", err)
}

// Just check the status to see that fakeErr is returned.
_, err = aw.GetAnnotator()
if err != fakeErr {
t.Error(err)
}

// Should be able to get the reservation for loading.
if !aw.ReserveForLoad() {
t.Fatal("didn't get reservation")
}
// Attempt to get the annotator should give loading error status.
_, err = aw.GetAnnotator()
if err != geoloader.ErrAnnotatorLoading {
t.Error(err)
}

fakeAnn := fakeAnnotator{}

err = aw.SetAnnotator(&fakeAnn, nil)
if err != nil {
t.Error(err)
}

// There should now be a valid annotator, and this should update the lastUsed time.
updateTime := time.Now()
ann, err := aw.GetAnnotator()
if err != nil {
t.Error(err)
}
if ann != &fakeAnn {
t.Error("Annotator not as expected")
}

// The GetAnnotator call should have updated the lastUsed time.
if aw.GetLastUsed().Before(updateTime) {
t.Error("last used should be close to now", aw.GetLastUsed(), updateTime)
}

// Since annotator is valid, we shouldn't be able to reserve for load.
if aw.ReserveForLoad() {
t.Fatal("shouldn't have gotten reservation")
}

// Now unload the annotator.
aw.Unload()
if !aw.GetLastUsed().Equal(time.Time{}) {
t.Error("incorrect last used")
}

// Now we should be able to get a reservation again.
if !aw.ReserveForLoad() {
t.Fatal("didn't get reservation")
}

// Check that Unload was actually called.
if fakeAnn.unloadCount != 1 {
t.Error("Should have called Unload once", fakeAnn.unloadCount)
}
}

0 comments on commit a07f27f

Please sign in to comment.