Merge 2d8e927 into 532a0e5
yachang committed Dec 14, 2018
2 parents 532a0e5 + 2d8e927 commit 8aea6bd
Showing 5 changed files with 91 additions and 14 deletions.
8 changes: 4 additions & 4 deletions annotator.yaml
@@ -9,18 +9,18 @@ service: annotator

# TODO(dev): adjust CPU and memory based on actual requirements.
resources:
cpu: 2
cpu: 10
# Instances support between [(cpu * 0.9) - 0.4, (cpu * 6.5) - 0.4]
# Actual memory available is exposed via GAE_MEMORY_MB environment variable.
memory_gb: 6
memory_gb: 60

# TODO - Do we need any disk? Adjust once we understand requirements.
disk_size_gb: 10

automatic_scaling:
# We expect negligible load, so this is unlikely to trigger.
min_num_instances: 2
max_num_instances: 20
min_num_instances: 20
max_num_instances: 40
cool_down_period_sec: 1800
cpu_utilization:
target_utilization: 0.60
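
For reference (not part of this commit): with the new cpu: 10, the memory comment above gives a supported range of (10 * 0.9) - 0.4 = 8.6 GB up to (10 * 6.5) - 0.4 = 64.6 GB, so memory_gb: 60 stays within bounds.
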
3 changes: 3 additions & 0 deletions geolite2/geo-g2.go
@@ -302,12 +302,15 @@ func determineFilenameOfLatestGeolite2File() (string, error) {
func LoadGeoLite2Dataset(filename string, bucketname string) (*GeoDataset, error) {
zip, err := loader.CreateZipReader(context.Background(), bucketname, filename)
if err != nil {
log.Println("cannot create zip reader")
return nil, err
}
log.Println("begin to load " + filename)
dataset, err := loadGeoLite2(zip)
if err != nil {
return nil, err
}
log.Println("loaded " + filename)
date, err := api.ExtractDateFromFilename(filename)
if err != nil {
log.Println("Error extracting date:", filename)
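
As an aside (not part of the diff), a minimal sketch of how LoadGeoLite2Dataset might be called, which would emit the "begin to load" and "loaded" log lines added above. The import path, package name, bucket, and object name are all assumptions for illustration:

package main

import (
	"log"

	// Import path and package name are assumed from the repository layout shown in this diff.
	"github.com/m-lab/annotation-service/geolite2"
)

func main() {
	// Bucket and object names are illustrative placeholders, not real resources.
	dataset, err := geolite2.LoadGeoLite2Dataset("GeoLite2-City-CSV.zip", "example-maxmind-bucket")
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("dataset loaded: %v", dataset != nil)
}
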
48 changes: 48 additions & 0 deletions handler/handler_test.go
@@ -285,3 +285,51 @@ func TestGetMetadataForSingleIP(t *testing.T) {
}
}
}

func TestE2ELoadMultipleDataset(t *testing.T) {
manager.InitDataset()

tests := []struct {
ip string
time string
res string
}{
//{"1.4.128.0", "1199145600", `{"Geo":{"continent_code":"AS","country_code":"TH","country_code3":"THA","country_name":"Thailand","region":"40","city":"Bangkok","latitude":13.754,"longitude":100.501},"ASN":{}}`},
//{"1.5.190.1", "1420070400", `{"Geo":{"continent_code":"AS","country_code":"JP","country_code3":"JPN","country_name":"Japan","region":"40","city":"Tokyo","latitude":35.685,"longitude":139.751},"ASN":{}}`},
{"1.9.128.0", "1512086400", `Cannot get meta data`},
{"1.22.128.0", "1512086400", `Cannot get meta data`},
}
for _, test := range tests {
w := httptest.NewRecorder()
r := &http.Request{}
r.URL, _ = url.Parse("/annotate?ip_addr=" + url.QueryEscape(test.ip) + "&since_epoch=" + url.QueryEscape(test.time))
handler.Annotate(w, r)
body := w.Body.String()
if string(body) != test.res {
t.Errorf("\nGot\n__%s__\nexpected\n__%s__\n", body, test.res)
}
}

time.Sleep(20 * time.Second)

tests2 := []struct {
ip string
time string
res string
}{
//{"1.4.128.0", "1199145600", `{"Geo":{"continent_code":"AS","country_code":"TH","country_code3":"THA","country_name":"Thailand","region":"40","city":"Bangkok","latitude":13.754,"longitude":100.501},"ASN":{}}`},
//{"1.5.190.1", "1420070400", `{"Geo":{"continent_code":"AS","country_code":"JP","country_code3":"JPN","country_name":"Japan","region":"40","city":"Tokyo","latitude":35.685,"longitude":139.751},"ASN":{}}`},
{"1.9.128.0", "1512086400", `{"Geo":{"continent_code":"AS","country_code":"MY","country_name":"Malaysia","region":"14","city":"Kuala Lumpur","postal_code":"50400","latitude":3.149,"longitude":101.697},"ASN":{}}`},
{"1.22.128.0", "1512086400", `{"Geo":{"continent_code":"AS","country_code":"IN","country_name":"India","region":"DL","city":"Delhi","postal_code":"110062","latitude":28.6667,"longitude":77.2167},"ASN":{}}`},
}
for _, test := range tests2 {
w := httptest.NewRecorder()
r := &http.Request{}
r.URL, _ = url.Parse("/annotate?ip_addr=" + url.QueryEscape(test.ip) + "&since_epoch=" + url.QueryEscape(test.time))
handler.Annotate(w, r)
body := w.Body.String()
if string(body) != test.res {
t.Errorf("\nGot\n__%s__\nexpected\n__%s__\n", body, test.res)
}
}
}
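
The test above exercises the asynchronous loading path: the first round of requests is expected to return "Cannot get meta data" while the GeoLite2 datasets load in the background, and after the 20-second sleep the same queries return full annotations. A sketch of an equivalent query against a running instance (host and port are assumptions, not taken from this commit):

package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
)

func main() {
	// The host and port here are assumed; point this at a running annotator instance.
	resp, err := http.Get("http://localhost:8080/annotate?ip_addr=1.9.128.0&since_epoch=1512086400")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := ioutil.ReadAll(resp.Body)
	// Prints "Cannot get meta data" until the dataset for that date has finished loading.
	fmt.Println(string(body))
}
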
1 change: 1 addition & 0 deletions loader/loader.go
@@ -29,6 +29,7 @@ func CreateZipReader(ctx context.Context, bucket string, bucketObj string) (*zip.Reader, error) {
// Takes context returns *Reader
reader, err := obj.NewReader(ctx)
defer reader.Close()

if err != nil {
log.Println(err)
return nil, errors.New("Failed creating new reader")
45 changes: 35 additions & 10 deletions manager/manager.go
@@ -71,7 +71,6 @@ func NewAnnotatorMap(loader func(string) (api.Annotator, error)) *AnnotatorMap {
func (am *AnnotatorMap) setAnnotatorIfNil(key string, ann api.Annotator) error {
am.mutex.Lock()
defer am.mutex.Unlock()

old, ok := am.annotators[key]
if !ok {
return ErrGoroutineNotOwner
@@ -101,21 +100,20 @@ func (am *AnnotatorMap) maybeSetNil(key string) bool {
// This synchronously attempts to set map entry to nil, and
// if successful, proceeds to asynchronously load the new dataset.
func (am *AnnotatorMap) checkAndLoadAnnotator(key string) {
// hacking code here before we implement the legacy dataset loading.
if !geoloader.GeoLite2Regex.MatchString(key) {
log.Println("cannot load legacy " + key)
return
}

reserved := am.maybeSetNil(key)
if reserved {
// This goroutine now has exclusive ownership of the
// map entry, and the responsibility for loading the annotator.
go func(key string) {
// Check the number of datasets in memory. Given the memory
// limit, some dataset may be removed from memory if needed.
if len(am.annotators) >= MaxDatasetInMemory {
for fileKey, _ := range am.annotators {
log.Println("remove Geolite2 dataset " + fileKey)
delete(am.annotators, fileKey)
break
}
}
log.Println(key)
if geoloader.GeoLite2Regex.MatchString(key) {
log.Println("plan to load " + key)
newAnn, err := am.loader(key)
if err != nil {
// TODO add a metric
@@ -144,10 +142,37 @@ func (am *AnnotatorMap) GetAnnotator(key string) (api.Annotator, error) {
am.mutex.RUnlock()

if !ok {
// Check the number of datasets in memory. Given the memory
// limit, some dataset may be removed from memory if needed.
MaxPendingDataset := 2
numInMemory := 0
numPending := 0
for fileKey, _ := range am.annotators {
if am.annotators[fileKey] == nil {
numPending++
} else {
numInMemory++
}
}
if numPending >= MaxPendingDataset {
return nil, errors.New("already too many dataset pending")
}
if numInMemory >= MaxDatasetInMemory {
for fileKey, _ := range am.annotators {
if am.annotators[fileKey] != nil {
log.Println("remove Geolite2 dataset " + fileKey)
am.mutex.Lock()
delete(am.annotators, fileKey)
am.mutex.Unlock()
break
}
}
}
// There is not yet any entry for this date. Try to load it.
am.checkAndLoadAnnotator(key)
return nil, ErrPendingAnnotatorLoad
}

if ann == nil {
// Another goroutine is already loading this entry. Return error.
return nil, ErrPendingAnnotatorLoad
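
With this change, GetAnnotator returns ErrPendingAnnotatorLoad both when the call itself kicks off a background load and when another goroutine is already loading the same dataset, so callers are expected to retry. A minimal retry sketch, assuming the import paths below match this repository's layout (the key value and retry policy are illustrative only):

package example

import (
	"errors"
	"time"

	// Import paths are assumed from the repository layout shown in this diff.
	"github.com/m-lab/annotation-service/api"
	"github.com/m-lab/annotation-service/manager"
)

// getAnnotatorWithRetry is an illustrative helper, not part of this commit:
// it polls GetAnnotator until the dataset identified by key has loaded.
func getAnnotatorWithRetry(am *manager.AnnotatorMap, key string) (api.Annotator, error) {
	for i := 0; i < 30; i++ {
		ann, err := am.GetAnnotator(key)
		if err == nil {
			return ann, nil
		}
		if err != manager.ErrPendingAnnotatorLoad {
			return nil, err // a different error, e.g. the "too many dataset pending" case above
		}
		time.Sleep(2 * time.Second) // the dataset is still loading in another goroutine
	}
	return nil, errors.New("timed out waiting for dataset to load")
}
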
