Skip to content

Commit

Permalink
Send as a separate request if the request body exceeds 19MB
Browse files Browse the repository at this point in the history
  • Loading branch information
akhilamohanan committed May 10, 2022
1 parent 8fccd21 commit 96352a1
Showing 1 changed file with 57 additions and 59 deletions.
116 changes: 57 additions & 59 deletions pkg/apicapi/apic_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,18 @@ package apicapi
import (
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"sort"
"strings"

"github.com/sirupsen/logrus"
)

const (
REQUEST_BODY_LIMIT = 19 * 1024 * 1024 // 19 MB
)

func (conn *ApicConnection) apicBodyAttrCmp(class string,
bodyc *ApicObjectBody, bodyd *ApicObjectBody) bool {
meta, ok := metadata[class]
Expand Down Expand Up @@ -306,17 +311,17 @@ func (conn *ApicConnection) diffApicState(currentState ApicSlice,
return
}

func (conn *ApicConnection) applyMulDiff(updates string, deletes string,
func (conn *ApicConnection) applyMulDiff(updates []string, deletes []string,
context string) {
if deletes != "" {
conn.log.WithFields(logrus.Fields{"mod": "APICAPI", "DN": deletes, "context": context}).
for _, del := range deletes {
conn.log.WithFields(logrus.Fields{"mod": "APICAPI", "DN": del, "context": context}).
Debug("Applying APIC multiple object delete")
conn.queueDn(deletes)
conn.queueDn(del)
}
if updates != "" {
conn.log.WithFields(logrus.Fields{"mod": "APICAPI", "DN": updates, "context": context}).
for _, up := range updates {
conn.log.WithFields(logrus.Fields{"mod": "APICAPI", "DN": up, "context": context}).
Debug("Applying APIC multiple object update")
conn.queueDn(updates)
conn.queueDn(up)
}
}

Expand Down Expand Up @@ -477,10 +482,14 @@ func (conn *ApicConnection) removeFromDnIndex(dn string) {
}

func (conn *ApicConnection) doWriteMulApicObjects(keyObjects map[string]ApicSlice, fvTenant string, container bool) {
var multiDeletes ApicSlice
var multiUpdates ApicSlice
multiDeletes := make(map[int]ApicSlice)
multiUpdates := make(map[int]ApicSlice)
var muldel []byte
var mulup []byte
var muldeldn string
var mulupdn string
muldelIndex := 1
mulupIndex := 1
for key, objects := range keyObjects {
tag := getTagFromKey(conn.prefix, key)
prepareApicSliceTag(objects, tag)
Expand All @@ -489,44 +498,34 @@ func (conn *ApicConnection) doWriteMulApicObjects(keyObjects map[string]ApicSlic
updates, deletes, deldns := conn.diffMulApicState(conn.desiredState[key], objects)

for _, deleteobj := range deletes {
multiDeletes = append(multiDeletes, deleteobj)
raw, err := json.Marshal(deleteobj)
if err != nil {
conn.log.Error("Failed to Unmarshal deleteObject ", ": ", err)
} else {
muldel = append(muldel, raw...)
if len(muldel) > REQUEST_BODY_LIMIT {
muldel = []byte{}
muldel = append(muldel, raw...)
muldelIndex++
}
multiDeletes[muldelIndex] = append(multiDeletes[muldelIndex], deleteobj)
}
}

for _, updateobj := range updates {
multiUpdates = append(multiUpdates, updateobj)
}
// temp cache to store all the "uni/tn-common/svcCont/svcRedirectPol-kube_svc_default_test-master"
// found in deletes
/* var temp_deletes []string
for _, delete := range deletes {
if strings.Contains(delete, "svcRedirectPol") {
temp_deletes = append(temp_deletes, delete)
}
}
newDelete := false
for _, temp_del := range temp_deletes {
vns_svc_redirect_pol_obj, ok := conn.desiredStateDn[temp_del]
if !ok {
conn.log.Error("no svc_obj found in desiredStateDn cache")
return
}
// Explicitly remove vnsRedirectDest from svcRedirectPol's list of children
for _, body := range vns_svc_redirect_pol_obj {
for _, child := range body.Children {
for class := range child {
if class == "vnsRedirectDest" {
deletes = append(deletes, child.GetDn())
newDelete = true
}
}
}
}
}
if newDelete && len(deletes) != 0 {
conn.log.Debug("Updated apic object deletes list is :", deletes)
raw, err := json.Marshal(updateobj)
if err != nil {
conn.log.Error("Failed to Unmarshal updateObject ", ": ", err)
} else {
mulup = append(mulup, raw...)
if len(mulup) > REQUEST_BODY_LIMIT {
mulup = []byte{}
mulup = append(mulup, raw...)
mulupIndex++
}
*/
multiUpdates[mulupIndex] = append(multiUpdates[mulupIndex], updateobj)
}
}
conn.updateDnIndex(objects)
for _, deldn := range deldns {
muldeldn = muldeldn + deldn
Expand All @@ -551,38 +550,37 @@ func (conn *ApicConnection) doWriteMulApicObjects(keyObjects map[string]ApicSlic
}
conn.indexMutex.Unlock()

/* if conn.syncEnabled {
conn.indexMutex.Unlock()
conn.applyDiff(updates, deletes, "write")
} else {
conn.indexMutex.Unlock()
}
*/
}
if conn.syncEnabled {
if len(multiDeletes) > 0 {
var mulupdns []string
var muldeldns []string
for i, del := range multiDeletes {
mulobj := make(map[string]ApicObject)
tenantObj := NewFvTenant(fvTenant)
conn.indexMutex.Lock()
tenantObj["fvTenant"].Children = multiDeletes
index := muldeldn + string(i)
muldeldns = append(muldeldns, index)
tenantObj["fvTenant"].Children = del
tenantObj["fvTenant"].Attributes["status"] = "modified"
mulobj[tenantObj.GetDn()] = tenantObj
conn.multipleDn[muldeldn] = mulobj
conn.log.Debug("testttt mulllll 1111111 deletes: ", tenantObj, tenantObj.GetDn())
conn.multipleDn[index] = mulobj
conn.log.Debug("testttt mulllll 1111111 deletes: ", tenantObj, " ", index)
conn.indexMutex.Unlock()
}
if len(multiUpdates) > 0 {
for i, up := range multiUpdates {
mulobj := make(map[string]ApicObject)
tenantObj := NewFvTenant(fvTenant)
conn.indexMutex.Lock()
tenantObj["fvTenant"].Children = multiUpdates
index := mulupdn + string(i)
mulupdns = append(mulupdns, index)
tenantObj["fvTenant"].Children = up
tenantObj["fvTenant"].Attributes["status"] = "modified"
mulobj[tenantObj.GetDn()] = tenantObj
conn.multipleDn[mulupdn] = mulobj
conn.log.Debug("testttt mulllll 1111111 updates: ", tenantObj, tenantObj.GetDn())
conn.multipleDn[index] = mulobj
conn.log.Debug("testttt mulllll 1111111 updates: ", tenantObj, " ", index)
conn.indexMutex.Unlock()
}
conn.applyMulDiff(mulupdn, muldeldn, "write")
conn.applyMulDiff(mulupdns, muldeldns, "write")
}
}

Expand Down

0 comments on commit 96352a1

Please sign in to comment.