Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,10 @@ var errNoAnnotator = errors.New("no Annotator found")
func AnnotateLegacy(date time.Time, ips []api.RequestData) (map[string]*api.GeoData, time.Time, error) {
responseMap := make(map[string]*api.GeoData)

ann := manager.GetAnnotator(date)
ann, err := manager.GetAnnotator(date)
if err != nil {
return nil, time.Time{}, err
}
if ann == nil {
// Stop sending more requests in the same batch, because there is a high chance the dataset is not ready.
return nil, time.Time{}, errNoAnnotator
Expand Down Expand Up @@ -160,7 +163,10 @@ func AnnotateLegacy(date time.Time, ips []api.RequestData) (map[string]*api.GeoD
func AnnotateV2(date time.Time, ips []string) (v2.Response, error) {
responseMap := make(map[string]*api.GeoData, len(ips))

ann := manager.GetAnnotator(date)
ann, err := manager.GetAnnotator(date)
if err != nil {
return v2.Response{}, err
}
if ann == nil {
// Just reject the request. Caller should try again until successful, or different error.
return v2.Response{}, errNoAnnotator
Expand All @@ -182,7 +188,7 @@ func AnnotateV2(date time.Time, ips []string) (v2.Response, error) {

annotation, err := ann.GetAnnotation(&request)
if err != nil {
metrics.ErrorTotal.Inc()
metrics.ErrorTotal.WithLabelValues("GetAnnotation Error").Inc()
continue
}
responseMap[request.IP] = &annotation
Expand Down Expand Up @@ -334,8 +340,12 @@ func BatchValidateAndParse(jsonBuffer []byte) ([]api.RequestData, error) {
func GetMetadataForSingleIP(request *api.RequestData) (api.GeoData, error) {
metrics.TotalLookups.Inc()
// TODO replace with generic GetAnnotator, that respects time.
ann := manager.GetAnnotator(request.Timestamp)
ann, err := manager.GetAnnotator(request.Timestamp)
if err != nil {
return api.GeoData{}, err
}
if ann == nil {
log.Println("This shouldn't happen")
return api.GeoData{}, manager.ErrNilDataset
}

Expand Down
115 changes: 90 additions & 25 deletions manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,30 @@
// such as geoloader.
package manager

// The implementation is currently rather naive. Eviction is done based only on whether
// there is a pending request, and there are already the max number of datasets loaded.
// A later implementation will use LRU and dead time to make this determination.
//
// Behavior:
// If a legacy dataset is requested, return the CurrentAnnotator instead.
// If the requested dataset is loaded, return it.
// If the requested dataset is loading, return ErrPendingAnnotatorLoad
// If the dataset is not loaded or pending, check:
// A: If there are already MaxPending loads in process:
// Do nothing and reply with ErrPendingAnnotatorLoad (even though this isn't true)
// B: If there is room to load it?
// YES: start loading it, and return ErrPendingAnnotatorLoad
// NO: kick out an existing dataset and return ErrPendingAnnotatorLoad.
//
// Please modify with extreme caution. The lock MUST be held when ACCESSING any field
// of AnnotatorMap.

// Note that the system may evict up to the number of pending loads, so at any given time,
// there may only be MaxDatasetInMemory - MaxPending actually loaded.

// Also note that anyone holding an annotator will prevent it from being collected by the
// GC, so simply evicting it is not a guarantee that the memory will be reclaimed.

import (
"errors"
"log"
Expand All @@ -10,13 +34,14 @@ import (

"github.com/m-lab/annotation-service/api"
"github.com/m-lab/annotation-service/geoloader"
)

const (
MaxDatasetInMemory = 5
"github.com/m-lab/annotation-service/metrics"
)

var (
// These are vars instead of consts to facilitate testing.
MaxDatasetInMemory = 5 // Limit on number of loaded datasets
MaxPending = 2 // Limit on number of concurrently loading datasets.

// ErrNilDataset is returned when CurrentAnnotator is nil.
ErrNilDataset = errors.New("CurrentAnnotator is nil")

Expand Down Expand Up @@ -56,8 +81,9 @@ type AnnotatorMap struct {
// Keys are filename of the datasets.
annotators map[string]api.Annotator
// Lock to be held when reading or writing the map.
mutex sync.RWMutex
loader func(string) (api.Annotator, error)
mutex sync.RWMutex
numPending int
loader func(string) (api.Annotator, error)
}

// NewAnnotatorMap creates a new map that will use the provided loader for loading new Annotators.
Expand All @@ -74,16 +100,31 @@ func (am *AnnotatorMap) setAnnotatorIfNil(key string, ann api.Annotator) error {

old, ok := am.annotators[key]
if !ok {
log.Println("This should never happen", ErrGoroutineNotOwner)
metrics.ErrorTotal.WithLabelValues("WrongOwner").Inc()
return ErrGoroutineNotOwner
}
if old != nil {
log.Println("This should never happen", ErrMapEntryAlreadySet)
metrics.ErrorTotal.WithLabelValues("MapEntryAlreadySet").Inc()
return ErrMapEntryAlreadySet
}

am.annotators[key] = ann
metrics.PendingLoads.Dec()
metrics.DatasetCount.Inc()
am.numPending--
log.Println("Loaded", key)
return nil

}

// This creates a reservation for loading a dataset, IFF map entry is empty (not nil or populated)
// If the dataset is not loaded or pending, check:
// A: If there are already MaxPending loads in process:
// Do nothing and reply false
// B: If there is room to load it?
// YES: make the reservation (by setting entry to nil) and return true.
// NO: kick out an existing dataset and return false.
func (am *AnnotatorMap) maybeSetNil(key string) bool {
am.mutex.Lock()
defer am.mutex.Unlock()
Expand All @@ -93,29 +134,46 @@ func (am *AnnotatorMap) maybeSetNil(key string) bool {
return false
}

if am.numPending >= MaxPending {
log.Println("Too many pending", key)
return false
}
// Check the number of datasets in memory. Given the memory
// limit, some dataset may be removed from memory if needed.
if len(am.annotators) >= MaxDatasetInMemory {
for fileKey := range am.annotators {
if am.annotators[fileKey] != nil {
log.Println("removing Geolite2 dataset " + fileKey)
delete(am.annotators, fileKey)
metrics.EvictionCount.Inc()
metrics.DatasetCount.Dec()
break
}
}
return false
}

// Place marker so that other requesters know it is loading.
am.annotators[key] = nil
metrics.PendingLoads.Inc()
am.numPending++
return true
}

// This synchronously attempts to set map entry to nil, and
// if successful, proceeds to asynchronously load the new dataset.
func (am *AnnotatorMap) checkAndLoadAnnotator(key string) {
if !geoloader.GeoLite2Regex.MatchString(key) {
return
}
reserved := am.maybeSetNil(key)
if reserved {
// This goroutine now has exclusive ownership of the
// map entry, and the responsibility for loading the annotator.
go func(key string) {
// Check the number of datasets in memory. Given the memory
// limit, some dataset may be removed from memory if needed.
if len(am.annotators) >= MaxDatasetInMemory {
for fileKey, _ := range am.annotators {
log.Println("remove Geolite2 dataset " + fileKey)
delete(am.annotators, fileKey)
break
}
}
// TODO - this is currently redundant, as we already checked this.
if geoloader.GeoLite2Regex.MatchString(key) {
log.Println("plan to load " + key)
newAnn, err := am.loader(key)
if err != nil {
// TODO add a metric
Expand Down Expand Up @@ -146,36 +204,43 @@ func (am *AnnotatorMap) GetAnnotator(key string) (api.Annotator, error) {
if !ok {
// There is not yet any entry for this date. Try to load it.
am.checkAndLoadAnnotator(key)
metrics.RejectionCount.WithLabelValues("New Dataset")
return nil, ErrPendingAnnotatorLoad
}

if ann == nil {
// Another goroutine is already loading this entry. Return error.
metrics.RejectionCount.WithLabelValues("Dataset Pending")
return nil, ErrPendingAnnotatorLoad
}
return ann, nil
}

// GetAnnotator returns the correct annotator to use for a given timestamp.
// Dates after geoloader.LatestDatasetDate, and (as a temporary measure) dates
// before geoloader.GeoLite2StartDate, are served by CurrentAnnotator, which is
// read while holding currentDataMutex for reading. Any other date is mapped to
// an archived dataset filename by geoloader.SelectArchivedDataset and looked up
// in archivedAnnotator.
func GetAnnotator(date time.Time) api.Annotator {
// TODO: Update to properly handle legacy datasets.
func GetAnnotator(date time.Time) (api.Annotator, error) {
// key := strconv.FormatInt(date.Unix(), encodingBase)
if date.After(geoloader.LatestDatasetDate) {
// Copy the shared annotator while holding the read lock; release it before returning.
currentDataMutex.RLock()
ann := CurrentAnnotator
currentDataMutex.RUnlock()
return ann
return ann, nil
}
// TODO HACK: This is a temporary measure until we have support for the legacy datasets.
if date.Before(geoloader.GeoLite2StartDate) {
currentDataMutex.RLock()
ann := CurrentAnnotator
currentDataMutex.RUnlock()
return ann, nil
}
// Map the requested date to the filename of an archived dataset.
filename, err := geoloader.SelectArchivedDataset(date)

if err != nil {
return nil
// NOTE(review): the value returned by WithLabelValues is discarded here, while
// the metrics.ErrorTotal call sites chain .Inc() — confirm whether .Inc() was
// intended so that this rejection is actually counted.
metrics.RejectionCount.WithLabelValues("Selection Error")
return nil, err
}

ann, err := archivedAnnotator.GetAnnotator(filename)
if err == nil {
return ann
}
return nil

// Delegate to the archived-annotator map and pass its annotator and error through unchanged.
return archivedAnnotator.GetAnnotator(filename)
}

// InitDataset will update the filename list of archived dataset in memory
Expand Down
68 changes: 63 additions & 5 deletions manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,49 @@ package manager_test

import (
"errors"
"log"
"sync"
"testing"
"time"

"github.com/m-lab/annotation-service/api"
"github.com/m-lab/annotation-service/geolite2"
"github.com/m-lab/annotation-service/manager"
)

// init configures the standard logger for this test package so that every
// log line carries the date and time (with microseconds) plus the short
// source file name and line number of the call site.
func init() {
	const flags = log.LstdFlags | log.Lmicroseconds | log.Lshortfile
	log.SetFlags(flags)
}

// fakeLoader is a stand-in dataset loader for tests. It ignores the requested
// name, pauses briefly so that a load stays pending long enough for callers to
// observe it, and then hands back an empty GeoDataset with no error.
func fakeLoader(date string) (api.Annotator, error) {
	const loadDelay = 10 * time.Millisecond
	time.Sleep(loadDelay)

	dataset := new(geolite2.GeoDataset)
	return dataset, nil
}

func TestAnnotatorMap(t *testing.T) {
manager.MaxPending = 2
manager.MaxDatasetInMemory = 3

am := manager.NewAnnotatorMap(fakeLoader)
names := []string{"Maxmind/2018/01/01/20180101T054119Z-GeoLite2-City-CSV.zip",
"Maxmind/2018/01/02/20180201T054119Z-GeoLite2-City-CSV.zip",
"Maxmind/2018/01/03/20180301T054119Z-GeoLite2-City-CSV.zip",
"Maxmind/2018/01/04/20180401T054119Z-GeoLite2-City-CSV.zip",
"Maxmind/2018/01/05/20180501T054119Z-GeoLite2-City-CSV.zip"}

ann, err := am.GetAnnotator("Maxmind/2018/09/12/20180912T054119Z-GeoLite2-City-CSV.zip")
// These are all fake names.
_, err := am.GetAnnotator(names[0])
if err != manager.ErrPendingAnnotatorLoad {
t.Error("Should be", manager.ErrPendingAnnotatorLoad)
}

ann, err = am.GetAnnotator("Maxmind/2017/08/15/20170815T200946Z-GeoLite2-City-CSV.zip")
_, err = am.GetAnnotator(names[1])
if err != manager.ErrPendingAnnotatorLoad {
t.Error("Should be", manager.ErrPendingAnnotatorLoad)
}

_, err = am.GetAnnotator(names[2])
if err != manager.ErrPendingAnnotatorLoad {
t.Error("Should be", manager.ErrPendingAnnotatorLoad)
}
Expand All @@ -33,22 +55,58 @@ func TestAnnotatorMap(t *testing.T) {
go func(date string) {
err := errors.New("start")
for ; err != nil; _, err = am.GetAnnotator(date) {
time.Sleep(3 * time.Millisecond)
}
wg.Done()
}("Maxmind/2018/09/12/20180912T054119Z-GeoLite2-City-CSV.zip")
}(names[0])
go func(date string) {
err := errors.New("start")
for ; err != nil; _, err = am.GetAnnotator(date) {
time.Sleep(3 * time.Millisecond)
}
wg.Done()
}("Maxmind/2017/08/15/20170815T200946Z-GeoLite2-City-CSV.zip")
}(names[1])
wg.Wait()

ann, err = am.GetAnnotator("Maxmind/2017/08/15/20170815T200946Z-GeoLite2-City-CSV.zip")
ann, err := am.GetAnnotator(names[0])
if err != nil {
t.Error("Not expecting:", err)
}
if ann == nil {
t.Error("Expecting non-nil annotator")
}

// Now try to load 2 more. The second one should cause an eviction.
wg = &sync.WaitGroup{}
wg.Add(2)
go func(date string) {
err := errors.New("start")
for ; err != nil; _, err = am.GetAnnotator(date) {
time.Sleep(3 * time.Millisecond)
}
wg.Done()
}(names[2])
go func(date string) {
err := errors.New("start")
for ; err != nil; _, err = am.GetAnnotator(date) {
time.Sleep(3 * time.Millisecond)
}
wg.Done()
}(names[3])
wg.Wait()

// Loading two more will have caused one to be evicted, so exactly one of these
// should no longer be loaded, and return an ErrPendingAnnotatorLoad.
_, err0 := am.GetAnnotator(names[0])
_, err1 := am.GetAnnotator(names[1])
switch {
case err0 == nil && err1 == nil:
t.Error("One of the items should have been evicted")
case err0 == nil && err1 == manager.ErrPendingAnnotatorLoad:
// Good
case err0 == manager.ErrPendingAnnotatorLoad && err1 == nil:
// Good
default:
t.Error("Should have had exactly one ErrPending...", err0, err1)
}
}
Loading