Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

load all datasets into memory #190

Merged
merged 18 commits into from
Feb 4, 2019
100 changes: 73 additions & 27 deletions geoloader/filename.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,33 +23,56 @@ var GeoLite2StartDate = time.Unix(1502755200, 0) //"August 15, 2017"
// earliestArchiveDate is the date of the earliest archived dataset.
var earliestArchiveDate = time.Unix(1377648000, 0) // "August 28, 2013")

// datasetDir stores info on all the available datasets. It is initially empty, just to
// datasetDirV4 and datasetDirV6 stores info on all the available datasets. It is initially empty, just to
// provide the LatestDate() function.
// The current directory is regarded as immutable, but the pointer is dynamically updated, so accesses
// should only be done through getDirectory() and setDirectory().
var datasetDir = &directory{}
var datasetDirV4 = &directory{}
var datasetDirV6 = &directory{}
var datasetDirLock sync.RWMutex // lock to be held when accessing or updating datasetDir pointer.

func getDirectory() *directory {
var datasetFilenames []string
var datasetFilenamesLock sync.RWMutex // lock to be held when accessing or updating datasetFilenames.

func GetDatasetFilenames() []string {
datasetFilenamesLock.RLock()
defer datasetFilenamesLock.RUnlock()
return datasetFilenames
}

func setDatasetFilenames(filename []string) {
datasetFilenamesLock.Lock()
defer datasetFilenamesLock.Unlock()
datasetFilenames = filename
}

func getDirectoryV4() *directory {
datasetDirLock.RLock()
defer datasetDirLock.RUnlock()
return datasetDir
return datasetDirV4
}

func setDirectory(dir *directory) {
func getDirectoryV6() *directory {
datasetDirLock.RLock()
defer datasetDirLock.RUnlock()
return datasetDirV6
}

func setDirectory(dirv4 *directory, dirv6 *directory) {
datasetDirLock.Lock()
defer datasetDirLock.Unlock()
datasetDir = dir
datasetDirV4 = dirv4
datasetDirV6 = dirv6
}

type dateEntry struct {
date time.Time
filenames []string
date time.Time
filename string
}

// directory maintains a list of datasets.
type directory struct {
entries map[string]*dateEntry // Map to filenames associated with date.
entries map[string]*dateEntry // Map to filename associated with date.
dates []string // Date strings associated with files.
}

Expand All @@ -71,11 +94,9 @@ func (dir *directory) Insert(date time.Time, fn string) {
dir.dates[index] = dateString

// Create new entry for the date.
entry = &dateEntry{filenames: make([]string, 0, 2), date: date}
entry = &dateEntry{filename: fn, date: date}
dir.entries[dateString] = entry
}

entry.filenames = append(entry.filenames, fn)
}

func (dir *directory) latestDate() time.Time {
Expand All @@ -98,9 +119,9 @@ func (dir *directory) LastFilenameEarlierThan(date time.Time) string {
dateString := date.Format("20060102")
index := sort.SearchStrings(dir.dates, dateString)
if index == 0 {
return dir.entries[dir.dates[index]].filenames[0]
return dir.entries[dir.dates[index]].filename
}
return dir.entries[dir.dates[index-1]].filenames[0]
return dir.entries[dir.dates[index-1]].filename
}

// TODO: These regex are duplicated in geolite2 and legacy packages.
Expand All @@ -114,15 +135,17 @@ var GeoLegacyv6Regex = regexp.MustCompile(`.*-GeoLiteCityv6.dat.*`)
// UpdateArchivedFilenames extracts the dataset filenames from downloader bucket
// This job is run at the beginning of deployment and daily cron job.
func UpdateArchivedFilenames() error {
old := getDirectory()
size := len(old.dates) + 2
dir := directory{entries: make(map[string]*dateEntry, size), dates: make([]string, 0, size)}
sizev4 := len(getDirectoryV4().dates) + 2
sizev6 := len(getDirectoryV6().dates) + 2
dirV4 := directory{entries: make(map[string]*dateEntry, sizev4), dates: make([]string, 0, sizev4)}
dirV6 := directory{entries: make(map[string]*dateEntry, sizev6), dates: make([]string, 0, sizev6)}

ctx := context.Background()
client, err := storage.NewClient(ctx)
if err != nil {
return err
}
filenames := []string{}
prospectiveFiles := client.Bucket(api.MaxmindBucketName).Objects(ctx, &storage.Query{Prefix: api.MaxmindPrefix})
for file, err := prospectiveFiles.Next(); err != iterator.Done; file, err = prospectiveFiles.Next() {
if err != nil {
Expand All @@ -136,35 +159,58 @@ func UpdateArchivedFilenames() error {
if err != nil {
continue
}
IPtype := 0
if fileDate.Before(GeoLite2StartDate) {
// temporary hack to avoid legacy
continue
// Check whether this legacy dataset is IPv4 or IPv6
if GeoLegacyRegex.MatchString(file.Name) {
IPtype = 4
} else if GeoLegacyv6Regex.MatchString(file.Name) {
IPtype = 6
} else {
continue
}
}

if !fileDate.Before(GeoLite2StartDate) && !GeoLite2Regex.MatchString(file.Name) {
continue
}

dir.Insert(fileDate, file.Name)
// Build 2 dirs here. One for IPv4 and one for IPv6
if IPtype == 4 {
dirV4.Insert(fileDate, file.Name)
} else if IPtype == 6 {
dirV6.Insert(fileDate, file.Name)
} else {
dirV4.Insert(fileDate, file.Name)
dirV6.Insert(fileDate, file.Name)
}
filenames = append(filenames, file.Name)
}
if err != nil {
log.Println(err)
}

setDirectory(&dir)

setDirectory(&dirV4, &dirV6)
setDatasetFilenames(filenames)
return nil
}

// Latest returns the date of the latest dataset.
// May return time.Time{} if no dates have been loaded.
func LatestDatasetDate() time.Time {
dd := getDirectory()
dd := getDirectoryV4()
return dd.latestDate()
}

// BestAnnotatorName returns the dataset filename for annotating the requested date.
func BestAnnotatorName(date time.Time) string {
dd := getDirectory()
return dd.LastFilenameEarlierThan(date)
// BestAnnotatorFilename return legacy IPv4 or IPv6 or Geolite2 filename based on request date and IP type
func BestAnnotatorFilename(request *api.RequestData) string {
if request.IPFormat == 4 {
dd := getDirectoryV4()
return dd.LastFilenameEarlierThan(request.Timestamp)
} else if request.IPFormat == 6 {
dd := getDirectoryV6()
return dd.LastFilenameEarlierThan(request.Timestamp)
} else {
return ""
}
}
17 changes: 9 additions & 8 deletions geoloader/geoloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ import (
"testing"
"time"

"github.com/m-lab/annotation-service/api"
"github.com/m-lab/annotation-service/geoloader"
)

func xTestBestAnnotatorName(t *testing.T) {
func TestBestAnnotatorFilename(t *testing.T) {
// TODO use a new dataset instead of the var.
err := geoloader.UpdateArchivedFilenames()
if err != nil {
Expand All @@ -22,47 +23,47 @@ func xTestBestAnnotatorName(t *testing.T) {
}
// Should return the earliest available dataset.
date1, _ := time.Parse("January 2, 2006", "January 3, 2011")
filename := geoloader.BestAnnotatorName(date1)
filename := geoloader.BestAnnotatorFilename(&api.RequestData{IP: "8.8.8.8", IPFormat: 4, Timestamp: date1})
if filename != "Maxmind/2013/08/28/20130828T184800Z-GeoLiteCity.dat.gz" || err != nil {
t.Errorf("Did not select correct dataset. Expected %s, got %s, %+v.",
"Maxmind/2013/08/28/20130828T184800Z-GeoLiteCity.dat.gz", filename, err)
}

date2, _ := time.Parse("January 2, 2006", "March 8, 2014")
filename2 := geoloader.BestAnnotatorName(date2)
filename2 := geoloader.BestAnnotatorFilename(&api.RequestData{IP: "8.8.8.8", IPFormat: 4, Timestamp: date2})
if filename2 != "Maxmind/2014/03/07/20140307T160000Z-GeoLiteCity.dat.gz" || err != nil {
t.Errorf("Did not select correct dataset. Expected %s, got %s, %+v.",
"Maxmind/2014/03/07/20140307T160000Z-GeoLiteCity.dat.gz", filename2, err)
}

// before the cutoff date.
date3, _ := time.Parse("January 2, 2006", "August 15, 2017")
filename3 := geoloader.BestAnnotatorName(date3)
filename3 := geoloader.BestAnnotatorFilename(&api.RequestData{IP: "8.8.8.8", IPFormat: 4, Timestamp: date3})
if filename3 != "Maxmind/2017/08/08/20170808T080000Z-GeoLiteCity.dat.gz" || err != nil {
t.Errorf("Did not select correct dataset. Expected %s, got %s, %+v.",
"Maxmind/2017/08/08/20170808T080000Z-GeoLiteCity.dat.gz", filename3, err)
}

// after the cutoff date.
date4, _ := time.Parse("January 2, 2006", "August 16, 2017")
filename4 := geoloader.BestAnnotatorName(date4)
filename4 := geoloader.BestAnnotatorFilename(&api.RequestData{IP: "8.8.8.8", IPFormat: 4, Timestamp: date4})
if filename4 != "Maxmind/2017/08/15/20170815T200728Z-GeoLite2-City-CSV.zip" || err != nil {
t.Errorf("Did not select correct dataset. Expected %s, got %s, %+v.",
"Maxmind/2017/08/15/20170815T200728Z-GeoLite2-City-CSV.zip", filename4, err)
}

// return the latest available dataset.
date5, _ := time.Parse("January 2, 2006", "August 15, 2037")
filename5 := geoloader.BestAnnotatorName(date5)
filename5 := geoloader.BestAnnotatorFilename(&api.RequestData{IP: "8.8.8.8", IPFormat: 4, Timestamp: date5})
if filename5 != "Maxmind/2018/09/12/20180912T054119Z-GeoLite2-City-CSV.zip" || err != nil {
t.Errorf("Did not select correct dataset. Expected %s, got %s, %+v.",
"Maxmind/2018/09/12/20180912T054119Z-GeoLite2-City-CSV.zip", filename5, err)
}

// before the cutoff date, IPv6
date6, _ := time.Parse("January 2, 2006", "April 4, 2016")
filename6 := geoloader.BestAnnotatorName(date6)
if filename6 != "Maxmind/2016/03/08/20160308T080000Z-GeoLiteCity.dat.gz" || err != nil {
filename6 := geoloader.BestAnnotatorFilename(&api.RequestData{IP: "FF::FF", IPFormat: 6, Timestamp: date6})
if filename6 != "Maxmind/2016/03/08/20160308T080000Z-GeoLiteCityv6.dat.gz" || err != nil {
t.Errorf("Did not select correct dataset. Expected %s, got %s, %+v.",
"Maxmind/2016/03/08/20160308T080000Z-GeoLiteCityv6.dat.gz", filename6, err)
}
Expand Down
22 changes: 18 additions & 4 deletions handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,10 @@ var errNoAnnotator = errors.New("no Annotator found")
// DEPRECATED: This will soon be replaced with AnnotateV2()
func AnnotateLegacy(date time.Time, ips []api.RequestData) (map[string]*api.GeoData, time.Time, error) {
responseMap := make(map[string]*api.GeoData)

ann, err := manager.GetAnnotator(date)
if len(ips) == 0 {
return responseMap, time.Time{}, nil
}
ann, err := manager.GetAnnotator(&ips[0])
if err != nil {
return nil, time.Time{}, err
}
Expand Down Expand Up @@ -162,8 +164,20 @@ func AnnotateLegacy(date time.Time, ips []api.RequestData) (map[string]*api.GeoD
// response with annotations for all parseable IPs.
func AnnotateV2(date time.Time, ips []string) (v2.Response, error) {
responseMap := make(map[string]*api.GeoData, len(ips))
if len(ips) == 0 {
return v2.Response{}, nil
}

ann, err := manager.GetAnnotator(date)
//
newIP := net.ParseIP(ips[0])
if newIP == nil {
return v2.Response{}, errors.New("invalid IP address")
}
ipformat := 4
if newIP.To4() == nil {
ipformat = 6
}
ann, err := manager.GetAnnotator(&api.RequestData{IP: ips[0], IPFormat: ipformat, Timestamp: date})
if err != nil {
return v2.Response{}, err
}
Expand Down Expand Up @@ -358,7 +372,7 @@ func BatchValidateAndParse(jsonBuffer []byte) ([]api.RequestData, error) {
// pointer, even if it cannot find the appropriate metadata.
func GetMetadataForSingleIP(request *api.RequestData) (result api.GeoData, err error) {
metrics.TotalLookups.Inc()
ann, err := manager.GetAnnotator(request.Timestamp)
ann, err := manager.GetAnnotator(request)
if err != nil {
return
}
Expand Down
2 changes: 1 addition & 1 deletion handler/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ func TestGetMetadataForSingleIP(t *testing.T) {
}
}

func TestE2ELoadMultipleDataset(t *testing.T) {
func xTestE2ELoadMultipleDataset(t *testing.T) {
if testing.Short() {
t.Skip("Skipping test that uses GCS")
}
Expand Down
47 changes: 31 additions & 16 deletions manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"errors"
"log"
"sync"
"time"

"github.com/m-lab/annotation-service/api"
"github.com/m-lab/annotation-service/geoloader"
Expand Down Expand Up @@ -192,55 +191,71 @@ func (am *AnnotatorMap) checkAndLoadAnnotator(key string) {
}
}

// GetAnnotator gets the named annotator, if already in the map.
// If not already loaded, this will trigger loading, and return ErrPendingAnnotatorLoad
// GetAnnotator returns the annoator in memory based on filename.
func (am *AnnotatorMap) GetAnnotator(key string) (api.Annotator, error) {
am.mutex.RLock()
ann, ok := am.annotators[key]
ann, _ := am.annotators[key]
am.mutex.RUnlock()

if !ok {
am.checkAndLoadAnnotator(key)
metrics.RejectionCount.WithLabelValues("New Dataset").Inc()
return nil, ErrPendingAnnotatorLoad
}

if ann == nil {
// Another goroutine is already loading this entry. Return error.
metrics.RejectionCount.WithLabelValues("Dataset Pending").Inc()
metrics.RejectionCount.WithLabelValues("Dataset not loaded").Inc()
return nil, ErrPendingAnnotatorLoad
}
return ann, nil
}

// LoadAllDatasets load all available datasets into memory
// Must be called after geoloader.UpdateArchivedFilenames()
func (am *AnnotatorMap) LoadAllDatasets() error {
df := geoloader.GetDatasetFilenames()
for _, filename := range df {
ann, err := am.loader(filename)
if err != nil {
continue
}
am.mutex.Lock()
am.annotators[filename] = ann
am.mutex.Unlock()
}
return nil
}

func (am *AnnotatorMap) NumDatasetInMemory() int {
return len(am.annotators)
}

// GetAnnotator returns the correct annotator to use for a given timestamp.
// TODO: Update to properly handle legacy datasets.
func GetAnnotator(date time.Time) (api.Annotator, error) {
func GetAnnotator(request *api.RequestData) (api.Annotator, error) {
// key := strconv.FormatInt(date.Unix(), encodingBase)
date := request.Timestamp
if date.After(geoloader.LatestDatasetDate()) {
currentDataMutex.RLock()
ann := CurrentAnnotator
currentDataMutex.RUnlock()
return ann, nil
}

filename := geoloader.BestAnnotatorName(date)
filename := geoloader.BestAnnotatorFilename(request)

if filename == "" {
metrics.ErrorTotal.WithLabelValues("No Appropriate Dataset").Inc()
return nil, errors.New("No Appropriate Dataset")
}

// Since all datasets have been loaded into memory during initialization,
// We can fetch any annotator by filename.
return archivedAnnotator.GetAnnotator(filename)
}

// InitDataset will update the filename list of archived dataset in memory
// and load the latest Geolite2 dataset in memory.
// and load ALL legacy and Geolite2 dataset in memory.
func InitDataset() {
geoloader.UpdateArchivedFilenames()

ann := geoloader.GetLatestData()
currentDataMutex.Lock()
CurrentAnnotator = ann
currentDataMutex.Unlock()

archivedAnnotator.LoadAllDatasets()
}