Properly handle overflowing memcache RPC payloads.
xStrom committed May 20, 2019
1 parent 9ba5925 commit 5321736
Showing 3 changed files with 209 additions and 65 deletions.
4 changes: 2 additions & 2 deletions cache.go
@@ -94,15 +94,15 @@ func (c *cache) setUnderLock(item *cacheItem) {
}
}

// Set takes ownership of item and treats it as immutable
func (c *cache) Set(item *cacheItem) {
c.lock.Lock()
c.setUnderLock(item)
c.meetLimitUnderLock()
c.lock.Unlock()
}

// SetMulti takes ownership of the individual items and treats them as immutable
func (c *cache) SetMulti(items []*cacheItem) {
c.lock.Lock()
for _, item := range items {
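
The updated comments make the ownership contract explicit: once an item is handed to Set or SetMulti, the caller must not mutate or reuse it, because the cache stores it without a defensive copy and may serve the same backing data to concurrent readers. A toy illustration of that contract follows; the types and names here are invented for the sketch and are not goon's actual cache.

```go
package main

import (
	"fmt"
	"sync"
)

// toyCache has the same ownership rule as goon's internal cache:
// Set takes ownership of the value and treats it as immutable, so it
// can hand the same backing slice to any number of concurrent readers.
type toyCache struct {
	mu sync.RWMutex
	m  map[string][]byte
}

func (c *toyCache) Set(key string, value []byte) {
	c.mu.Lock()
	c.m[key] = value // stored as-is, no defensive copy
	c.mu.Unlock()
}

func (c *toyCache) Get(key string) []byte {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.m[key] // shared slice: the caller must not mutate it either
}

func main() {
	c := &toyCache{m: make(map[string][]byte)}
	v := []byte("payload")
	c.Set("k", v)
	// WRONG from here on: v[0] = 'P' would be visible to every reader.
	fmt.Printf("%s\n", c.Get("k"))
}
```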
177 changes: 123 additions & 54 deletions goon.go
@@ -58,6 +58,12 @@ var (
IgnoreFieldMismatch = true
)

var (
// Determines if memcache.SetMulti errors are returned by goon.
// Currently only meant for use during goon development testing.
propagateMemcachePutError = false
)

// Goon holds the app engine context and the request memory cache.
type Goon struct {
Context context.Context
@@ -127,6 +133,16 @@ const (
memcacheMaxRPCSize = 32 << 20 // 32 MiB
)

// Datastore limits
const (
datastoreGetMultiMaxItems = 1000
datastorePutMultiMaxItems = 500
datastoreDeleteMultiMaxItems = 500

// The maximum GetMulti result RPC size was determined experimentally on 2019-05-20
datastoreGetMultiMaxRPCSize = 50 << 20 // 50 MiB
)

// NewGoon creates a new Goon object from the given request.
func NewGoon(r *http.Request) *Goon {
return FromContext(appengine.NewContext(r))
@@ -264,8 +280,6 @@ func (g *Goon) Put(src interface{}) (*datastore.Key, error) {
return ks[0], nil
}

// PutMulti is a batch version of Put.
//
// src must be a *[]S, *[]*S, *[]I, []S, []*S, or []I, for some struct type S,
@@ -279,14 +293,14 @@ func (g *Goon) PutMulti(src interface{}) ([]*datastore.Key, error) {
v := reflect.Indirect(reflect.ValueOf(src))
mu := new(sync.Mutex)
multiErr, any := make(appengine.MultiError, len(keys)), false
goroutines := (len(keys)-1)/datastorePutMultiMaxItems + 1
var wg sync.WaitGroup
wg.Add(goroutines)
for i := 0; i < goroutines; i++ {
go func(i int) {
defer wg.Done()
lo := i * datastorePutMultiMaxItems
hi := (i + 1) * datastorePutMultiMaxItems
if hi > len(keys) {
hi = len(keys)
}
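
The `(len(keys)-1)/datastorePutMultiMaxItems + 1` expression is ceiling division: one goroutine per batch of at most datastorePutMultiMaxItems keys, with each goroutine deriving its half-open window [lo, hi) from its index. Here is a standalone sketch of that arithmetic; the helper name is invented, and only the constant value is taken from the diff.

```go
package main

import "fmt"

// batchLimit mirrors datastorePutMultiMaxItems; the same arithmetic backs
// GetMulti and DeleteMulti with their own limits.
const batchLimit = 500

// batchBounds is an invented helper that reproduces PutMulti's slicing:
// ceil(n/batchLimit) batches, each owning the half-open range [lo, hi).
func batchBounds(n int) [][2]int {
	goroutines := (n-1)/batchLimit + 1 // ceiling division for n >= 1
	bounds := make([][2]int, 0, goroutines)
	for i := 0; i < goroutines; i++ {
		lo := i * batchLimit
		hi := (i + 1) * batchLimit
		if hi > n {
			hi = n
		}
		bounds = append(bounds, [2]int{lo, hi})
	}
	return bounds
}

func main() {
	// 1234 keys -> 3 batches: [0 500] [500 1000] [1000 1234]
	fmt.Println(batchBounds(1234))
}
```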
@@ -352,29 +366,62 @@ func (g *Goon) FlushLocalCache() {
g.cache.Flush()
}

type memcacheTask struct {
items []*memcache.Item
size int
}

// NB! putMemcache is expected to treat cacheItem as immutable!
func (g *Goon) putMemcache(citems []*cacheItem) error {
// Go over all the cache items and generate memcache tasks from them,
// by splitting them up based on payload size
items := make([]*memcache.Item, len(citems))
tasks := make([]memcacheTask, 0, 1) // In most cases there's just a single task
lastSplit := 0
payloadSize := 0
for i, citem := range citems {
items[i] = &memcache.Item{
Key: citem.key,
Value: citem.value,
}
itemSize := memcacheOverhead + len(citem.key) + len(citem.value)
if payloadSize+itemSize > memcacheMaxRPCSize {
tasks = append(tasks, memcacheTask{items: items[lastSplit:i], size: payloadSize})
lastSplit = i
payloadSize = 0
}
payloadSize += itemSize
}
tasks = append(tasks, memcacheTask{items: items[lastSplit:len(citems)], size: payloadSize})
// Execute all the tasks with goroutines
count := len(tasks)
errc := make(chan error, count)
for i := 0; i < count; i++ {
go func(idx int) {
memcacheTimeout := MemcachePutTimeoutSmall
if tasks[idx].size >= MemcachePutTimeoutThreshold {
memcacheTimeout = MemcachePutTimeoutLarge
}
tc, cf := context.WithTimeout(g.Context, memcacheTimeout)
errc <- memcache.SetMulti(tc, tasks[idx].items)
cf()
}(i)
}
// Wait for all goroutines to finish and log any errors.
// If there were any errors, also return one of them (chosen non-deterministically).
var rerr error
for i := 0; i < count; i++ {
err := <-errc
if err != nil {
if appengine.IsTimeoutError(err) {
g.timeoutError(err)
} else {
g.error(err)
}
rerr = err
}
}
return rerr
}
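
This splitting loop is the heart of the fix: instead of issuing a single memcache.SetMulti for the whole batch and risking an RPC over the 32 MiB cap, putMemcache closes off a task whenever adding the next item would push the running payload past memcacheMaxRPCSize, then runs one SetMulti per task in parallel. Below is the chunking step distilled into a runnable sketch; the types, constants, and function name are stand-ins, and the overhead value is an assumption, not goon's actual memcacheOverhead.

```go
package main

import "fmt"

// Stand-in constants; memcacheMaxRPCSize is taken from the diff, while the
// per-item overhead value is an assumption for the sketch.
const (
	maxRPCSize = 32 << 20 // memcacheMaxRPCSize
	overhead   = 80       // assumed stand-in for memcacheOverhead
)

type item struct{ key, value string }

type task struct {
	items []item
	size  int
}

// splitBySize reproduces the chunking logic of putMemcache: walk the items
// and, whenever adding the next one would push the running payload past the
// RPC cap, close off the current task and start a new one.
func splitBySize(items []item) []task {
	tasks := make([]task, 0, 1) // usually a single task
	lastSplit := 0
	payloadSize := 0
	for i, it := range items {
		itemSize := overhead + len(it.key) + len(it.value)
		if payloadSize+itemSize > maxRPCSize {
			tasks = append(tasks, task{items: items[lastSplit:i], size: payloadSize})
			lastSplit = i
			payloadSize = 0
		}
		payloadSize += itemSize
	}
	return append(tasks, task{items: items[lastSplit:], size: payloadSize})
}

func main() {
	big := string(make([]byte, 20<<20)) // a 20 MiB value
	for _, t := range splitBySize([]item{{"a", big}, {"b", big}, {"c", "tiny"}}) {
		fmt.Println(len(t.items), t.size) // two tasks: {a} and {b, c}
	}
}
```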

// Get loads the entity based on dst's key into dst
@@ -401,8 +448,6 @@ func (g *Goon) Get(dst interface{}) error {
return nil
}

// GetMulti is a batch version of Get.
//
// dst must be a *[]S, *[]*S, *[]I, []S, []*S, or []I, for some struct type S,
@@ -416,6 +461,7 @@ func (g *Goon) GetMulti(dst interface{}) error {
v := reflect.Indirect(reflect.ValueOf(dst))

multiErr, anyErr := make(appengine.MultiError, len(keys)), false
var extraErr error

if g.inTransaction {
// todo: support getMultiLimit in transactions
@@ -492,23 +538,42 @@ func (g *Goon) GetMulti(dst interface{}) error {
return nil
}

// memcache.GetMulti is limited to memcacheMaxRPCSize for the data returned.
// Thus if the returned data is bigger than memcacheMaxRPCSize - memcacheMaxItemSize
// then we do another memcache.GetMulti on the missing keys.
memvalues := make(map[string]*memcache.Item, len(mckeys))
tc, cf := context.WithTimeout(g.Context, MemcacheGetTimeout)
mcKeysSet := make(map[string]struct{}, len(mckeys))
for _, mk := range mckeys {
mcKeysSet[mk] = struct{}{}
}
for {
nextmckeys := make([]string, 0, len(mcKeysSet))
for mk := range mcKeysSet {
nextmckeys = append(nextmckeys, mk)
}
mvs, err := memcache.GetMulti(tc, nextmckeys)
// timing out or another error from memcache isn't something to fail over, but do log it
if appengine.IsTimeoutError(err) {
g.timeoutError(err)
break
} else if err != nil {
g.error(err)
break
}
payloadSize := 0
for k, v := range mvs {
memvalues[k] = v
payloadSize += memcacheOverhead + len(v.Key) + len(v.Value)
delete(mcKeysSet, k)
}
if len(mcKeysSet) == 0 || payloadSize < memcacheMaxRPCSize-memcacheMaxItemSize {
break
}
}
cf()

if len(memvalues) > 0 {
// since memcache fetch was successful, reset the datastore fetch list and repopulate it
dskeys = dskeys[:0]
dsdst = dsdst[:0]
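
The loop above fixes the failure mode the old code could not detect: memcache.GetMulti truncates its response near the 32 MiB RPC cap, and truncated keys look exactly like cache misses. The new code therefore re-requests whatever is still missing until the response comes back small enough, under memcacheMaxRPCSize - memcacheMaxItemSize, that truncation can be ruled out and the remaining keys are genuine misses. A distilled sketch of that stopping condition follows, with a fake fetcher standing in for memcache and the timeout/error handling omitted; all names and constant values here are stand-ins.

```go
package main

import "fmt"

// Stand-in constants; the real ones live in goon.go.
const (
	maxRPCSize  = 32 << 20 // memcacheMaxRPCSize
	maxItemSize = 1 << 20  // assumed stand-in for memcacheMaxItemSize
	overhead    = 80       // assumed stand-in for memcacheOverhead
)

type value struct{ key, val string }

// fetchAll mimics the loop above: keep re-requesting the keys that are
// still missing until everything arrived, or until the response was small
// enough that the missing keys must be genuine misses rather than victims
// of RPC-size truncation.
func fetchAll(keys []string, getMulti func([]string) map[string]value) map[string]value {
	out := make(map[string]value, len(keys))
	missing := make(map[string]struct{}, len(keys))
	for _, k := range keys {
		missing[k] = struct{}{}
	}
	for {
		next := make([]string, 0, len(missing))
		for k := range missing {
			next = append(next, k)
		}
		payloadSize := 0
		for k, v := range getMulti(next) {
			out[k] = v
			payloadSize += overhead + len(v.key) + len(v.val)
			delete(missing, k)
		}
		// A response with room to spare for one more max-size item was not
		// truncated, so anything still missing is a real cache miss.
		if len(missing) == 0 || payloadSize < maxRPCSize-maxItemSize {
			return out
		}
	}
}

func main() {
	fake := func(keys []string) map[string]value {
		m := make(map[string]value, len(keys))
		for _, k := range keys {
			m[k] = value{k, "v"}
		}
		return m
	}
	fmt.Println(len(fetchAll([]string{"a", "b", "c"}, fake))) // 3
}
```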
@@ -550,14 +615,14 @@ func (g *Goon) GetMulti(dst interface{}) error {
}

mu := new(sync.Mutex)
goroutines := (len(dskeys)-1)/datastoreGetMultiMaxItems + 1
var wg sync.WaitGroup
wg.Add(goroutines)
for i := 0; i < goroutines; i++ {
go func(i int) {
defer wg.Done()
lo := i * datastoreGetMultiMaxItems
hi := (i + 1) * datastoreGetMultiMaxItems
if hi > len(dskeys) {
hi = len(dskeys)
}
@@ -610,24 +675,30 @@ func (g *Goon) GetMulti(dst interface{}) error {
}
}
if len(toCache) > 0 {
// Populate memcache in a goroutine because there's network involved
// and we can be doing useful work while waiting for I/O
errc := make(chan error)
go func() {
errc <- g.putMemcache(toCache)
}()
// Populate local cache
g.cache.SetMulti(toCache)
// Wait for memcache population to finish
err := <-errc
// .. but only propagate the memcache error if configured to do so
if propagateMemcachePutError && err != nil {
mu.Lock()
extraErr = err
mu.Unlock()
}
}
}(i)
}
wg.Wait()
if anyErr {
return realError(multiErr)
} else if extraErr != nil {
return extraErr
}
return nil
}
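
One more detail of the toCache branch deserves a note: the memcache write happens on a goroutine while the local cache, which needs no I/O, is populated on the calling goroutine, and the two join before the function returns. A minimal generic sketch of that shape; the function names are invented.

```go
package main

import (
	"fmt"
	"time"
)

// overlap mirrors the shape of the toCache branch: start the slow network
// write (memcache population) on a goroutine, do the cheap local work
// (local cache population) meanwhile, then join on the network result.
func overlap(networkWrite func() error, localWrite func()) error {
	errc := make(chan error, 1)
	go func() { errc <- networkWrite() }()
	localWrite()  // useful work while the RPC is in flight
	return <-errc // join before returning so the goroutine can't leak
}

func main() {
	err := overlap(
		func() error { time.Sleep(50 * time.Millisecond); return nil }, // fake RPC
		func() { fmt.Println("local cache populated") },
	)
	fmt.Println("network err:", err)
}
```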
@@ -642,8 +713,6 @@ func (g *Goon) Delete(key *datastore.Key) error {
return err
}

// DeleteMulti is a batch version of Delete.
func (g *Goon) DeleteMulti(keys []*datastore.Key) error {
if len(keys) == 0 {
@@ -653,14 +722,14 @@ func (g *Goon) DeleteMulti(keys []*datastore.Key) error {

mu := new(sync.Mutex)
multiErr, any := make(appengine.MultiError, len(keys)), false
goroutines := (len(keys)-1)/datastoreDeleteMultiMaxItems + 1
var wg sync.WaitGroup
wg.Add(goroutines)
for i := 0; i < goroutines; i++ {
go func(i int) {
defer wg.Done()
lo := i * datastoreDeleteMultiMaxItems
hi := (i + 1) * datastoreDeleteMultiMaxItems
if hi > len(keys) {
hi = len(keys)
}
