Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Single request for hpp #1007

Closed
wants to merge 11 commits into from
3 changes: 1 addition & 2 deletions pkg/apicapi/apic_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,7 @@ func injectedSvcPortNormalizer(b *ApicObjectBody) {
var metadata = map[string]*apicMeta{
"fvTenant": {
attributes: map[string]interface{}{
"name": "",
"nameAlias": "",
"name": "",
shastrinator marked this conversation as resolved.
Show resolved Hide resolved
},
children: []string{
"fvBD",
Expand Down
245 changes: 245 additions & 0 deletions pkg/apicapi/apic_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,18 @@ package apicapi
import (
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"sort"
"strings"

"github.com/sirupsen/logrus"
)

const (
REQUEST_BODY_LIMIT = 19 * 1024 * 1024 // 19 MB
)

func (conn *ApicConnection) apicBodyAttrCmp(class string,
bodyc *ApicObjectBody, bodyd *ApicObjectBody) bool {
meta, ok := metadata[class]
Expand Down Expand Up @@ -137,6 +142,113 @@ func (conn *ApicConnection) apicObjCmp(current ApicObject,
return
}

func (conn *ApicConnection) setDeleteStatus(deleteBody *ApicObject, deldn string) {
if deleteBody == nil {
return
}
for _, val := range *deleteBody {
dn, okdn := val.Attributes["dn"]
if okdn && dn == deldn {
attrs := make(map[string]interface{})
attrs["dn"] = val.Attributes["dn"]
attrs["status"] = "deleted"
val.Attributes = attrs
conn.log.Debug("deleteBody: ", *deleteBody)
shastrinator marked this conversation as resolved.
Show resolved Hide resolved
return
} else {
status, okstatus := val.Attributes["status"]
if okstatus && status != "deleted" || !okstatus {
attrs := make(map[string]interface{})
attrs["dn"] = val.Attributes["dn"]
attrs["status"] = ""
val.Attributes = attrs
}
}
if len(val.Children) > 0 {
for _, child := range val.Children {
conn.setDeleteStatus(&child, deldn)
}
}
}
return

}

func (conn *ApicConnection) diffMulApicState(currentState ApicSlice,
desiredState ApicSlice) (updates ApicSlice, deletes ApicSlice, deletedns []string) {
i := 0
j := 0

update := false
delete := false

for i < len(currentState) && j < len(desiredState) {
deleteBody := currentState[i]
cmp := cmpApicObject(currentState[i], desiredState[j])
if cmp < 0 {
deldn := deleteBody.GetDn()
conn.setDeleteStatus(&deleteBody, deldn)
deletes = append(deletes, deleteBody)
deletedns = append(deletedns, deldn)
i++
delete = true
} else if cmp > 0 {
updates = append(updates, desiredState[j])
j++
update = true
} else {
if conn.containerDns[currentState[i].GetDn()] {
if !conn.apicCntCmp(currentState[i], desiredState[j]) {
updates = append(updates, desiredState[j])
update = true
}
} else {
cu, cd := conn.apicObjCmp(currentState[i], desiredState[j])
if cu {
updates = append(updates, desiredState[j])
update = true
}
if len(cd) > 0 {
deletedns = append(deletedns, cd...)
conn.log.Debug("dns to delete: ", cd)
for _, deldn := range cd {
conn.setDeleteStatus(&deleteBody, deldn)
}
deletes = append(deletes, deleteBody)
delete = true
}
}

i++
j++
}
}
// extra old objects
for i < len(currentState) {
deleteBody := currentState[i]
deldn := deleteBody.GetDn()
deletes = append(deletes, deleteBody)
deletedns = append(deletedns, deldn)
i++
delete = true
}
// extra new objects
for j < len(desiredState) {
updates = append(updates, desiredState[j])
j++
update = true
}

if update && len(updates) != 0 {
conn.log.Debug("Mul Apic object updates are :", updates)
}
if delete && len(deletes) != 0 {
conn.log.Debug("Mul Apic object deletes are :", deletes)
}

return
}

func (conn *ApicConnection) diffApicState(currentState ApicSlice,
desiredState ApicSlice) (updates ApicSlice, deletes []string) {

Expand Down Expand Up @@ -199,6 +311,20 @@ func (conn *ApicConnection) diffApicState(currentState ApicSlice,
return
}

func (conn *ApicConnection) applyMulDiff(updates []string, deletes []string,
context string) {
for _, del := range deletes {
conn.log.WithFields(logrus.Fields{"mod": "APICAPI", "DN": del, "context": context}).
Debug("Applying APIC multiple object delete")
conn.queueDn(del)
}
for _, up := range updates {
conn.log.WithFields(logrus.Fields{"mod": "APICAPI", "DN": up, "context": context}).
Debug("Applying APIC multiple object update")
conn.queueDn(up)
}
}

func (conn *ApicConnection) applyDiff(updates ApicSlice, deletes []string,
context string) {
sort.Sort(updates)
Expand Down Expand Up @@ -333,6 +459,15 @@ func (conn *ApicConnection) updateDnIndex(objects ApicSlice) {
}
}

func (conn *ApicConnection) removeFromMultipleDn(dn string) {
conn.indexMutex.Lock()
if _, ok := conn.multipleDn[dn]; ok {
delete(conn.multipleDn, dn)

}
conn.indexMutex.Unlock()
}

func (conn *ApicConnection) removeFromDnIndex(dn string) {
if obj, ok := conn.desiredStateDn[dn]; ok {
delete(conn.desiredStateDn, dn)
Expand All @@ -346,6 +481,109 @@ func (conn *ApicConnection) removeFromDnIndex(dn string) {
}
}

func (conn *ApicConnection) doWriteMulApicObjects(keyObjects map[string]ApicSlice, fvTenant string, container bool) {
multiDeletes := make(map[int]ApicSlice)
multiUpdates := make(map[int]ApicSlice)
var muldel []byte
var mulup []byte
var muldeldn string
var mulupdn string
muldelIndex := 1
mulupIndex := 1
for key, objects := range keyObjects {
tag := getTagFromKey(conn.prefix, key)
prepareApicSliceTag(objects, tag)

conn.indexMutex.Lock()
updates, deletes, deldns := conn.diffMulApicState(conn.desiredState[key], objects)

for _, deleteobj := range deletes {
raw, err := json.Marshal(deleteobj)
if err != nil {
conn.log.Error("Failed to Unmarshal deleteObject ", ": ", err)
} else {
muldel = append(muldel, raw...)
if len(muldel) > REQUEST_BODY_LIMIT {
muldel = []byte{}
muldel = append(muldel, raw...)
muldelIndex++
}
multiDeletes[muldelIndex] = append(multiDeletes[muldelIndex], deleteobj)
}
}

for _, updateobj := range updates {
raw, err := json.Marshal(updateobj)
if err != nil {
conn.log.Error("Failed to Unmarshal updateObject ", ": ", err)
} else {
mulup = append(mulup, raw...)
if len(mulup) > REQUEST_BODY_LIMIT {
mulup = []byte{}
mulup = append(mulup, raw...)
mulupIndex++
}
multiUpdates[mulupIndex] = append(multiUpdates[mulupIndex], updateobj)
}
}
conn.updateDnIndex(objects)
for _, deldn := range deldns {
muldeldn = muldeldn + deldn
conn.removeFromDnIndex(deldn)
if container {
delete(conn.containerDns, deldn)
}
}
for _, update := range updates {
dn := update.GetDn()
mulupdn = mulupdn + dn
if container {
conn.containerDns[dn] = true
}
}
if objects == nil {
delete(conn.desiredState, key)
delete(conn.keyHashes, tag)
} else {
conn.desiredState[key] = objects
conn.keyHashes[tag] = key
}
conn.indexMutex.Unlock()

}
if conn.syncEnabled {
var mulupdns []string
var muldeldns []string
for i, del := range multiDeletes {
mulobj := make(map[string]ApicObject)
tenantObj := NewFvTenant(fvTenant)
conn.indexMutex.Lock()
index := muldeldn + fmt.Sprint(i)
muldeldns = append(muldeldns, index)
tenantObj["fvTenant"].Children = del
tenantObj["fvTenant"].Attributes["status"] = "modified"
mulobj[tenantObj.GetDn()] = tenantObj
conn.multipleDn[index] = mulobj
conn.log.Debug("testttt mulllll 1111111 deletes: ", tenantObj, " ", index)
conn.indexMutex.Unlock()
}
for i, up := range multiUpdates {
mulobj := make(map[string]ApicObject)
tenantObj := NewFvTenant(fvTenant)
conn.indexMutex.Lock()
index := mulupdn + fmt.Sprint(i)
mulupdns = append(mulupdns, index)
tenantObj["fvTenant"].Children = up
tenantObj["fvTenant"].Attributes["status"] = "modified"
mulobj[tenantObj.GetDn()] = tenantObj
conn.multipleDn[index] = mulobj
conn.log.Debug("testttt mulllll 1111111 updates: ", tenantObj, " ", index)
conn.indexMutex.Unlock()
}
conn.applyMulDiff(mulupdns, muldeldns, "write")
}
}

func (conn *ApicConnection) doWriteApicObjects(key string, objects ApicSlice,
container bool) {

Expand All @@ -355,6 +593,9 @@ func (conn *ApicConnection) doWriteApicObjects(key string, objects ApicSlice,
conn.indexMutex.Lock()
updates, deletes := conn.diffApicState(conn.desiredState[key], objects)

conn.log.Debug("testttt 1111111 deletes: ", deletes)
conn.log.Debug("testttt 1111111 updates: ", updates)

// temp cache to store all the "uni/tn-common/svcCont/svcRedirectPol-kube_svc_default_test-master"
// found in deletes
var temp_deletes []string
Expand Down Expand Up @@ -433,6 +674,10 @@ func (conn *ApicConnection) WriteApicObjects(key string, objects ApicSlice) {
conn.doWriteApicObjects(key, objects, false)
}

func (conn *ApicConnection) WriteMulApicObjects(keyObjects map[string]ApicSlice, fvTenant string) {
conn.doWriteMulApicObjects(keyObjects, fvTenant, false)
}

func (conn *ApicConnection) reconcileApicObject(aci ApicObject) {
conn.indexMutex.Lock()
if !conn.syncEnabled {
Expand Down
13 changes: 13 additions & 0 deletions pkg/apicapi/apic_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ type ApicConnection struct {

desiredState map[string]ApicSlice
desiredStateDn map[string]ApicObject
multipleDn map[string]map[string]ApicObject
keyHashes map[string]string
containerDns map[string]bool
cachedState map[string]ApicSlice
Expand All @@ -139,12 +140,24 @@ func (s ApicSlice) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}

func (conn *ApicConnection) GetMultipleDn() map[string]map[string]ApicObject {
conn.indexMutex.Lock()
defer conn.indexMutex.Unlock()
return conn.multipleDn
}

func (conn *ApicConnection) GetDesiredState(key string) ApicSlice {
conn.indexMutex.Lock()
defer conn.indexMutex.Unlock()
return conn.desiredState[key]
}

func (conn *ApicConnection) SetSyncEnabled() {
conn.indexMutex.Lock()
defer conn.indexMutex.Unlock()
conn.syncEnabled = true
}

func truncatedName(name string) string {
nameAlias := name
if len(name) > (ApicNameAliasLength - 1) {
Expand Down
15 changes: 13 additions & 2 deletions pkg/apicapi/apicapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ func New(log *logrus.Logger, apic []string, user string,
},
desiredState: make(map[string]ApicSlice),
desiredStateDn: make(map[string]ApicObject),
multipleDn: make(map[string]map[string]ApicObject),
keyHashes: make(map[string]string),
containerDns: make(map[string]bool),
cachedState: make(map[string]ApicSlice),
Expand Down Expand Up @@ -334,12 +335,12 @@ func (conn *ApicConnection) handleQueuedDn(dn string) bool {
}
conn.indexMutex.Unlock()
}

var requeue bool
conn.indexMutex.Lock()
pending, hasPendingChange := conn.pendingSubDnUpdate[dn]
conn.pendingSubDnUpdate[dn] = pendingChange{isDirty: true}
obj, hasDesiredState := conn.desiredStateDn[dn]
mulobj, hasMulDn := conn.multipleDn[dn]
conn.indexMutex.Unlock()

if hasPendingChange {
Expand All @@ -366,6 +367,14 @@ func (conn *ApicConnection) handleQueuedDn(dn string) bool {
} else {
requeue = conn.postDn(dn, obj)
}
} else if hasMulDn {
for key, val := range mulobj {
requeue = conn.postDn(key, val)
if !requeue {
conn.removeFromMultipleDn(key)
}
break
}
} else {
if hasPendingChange {
if pending.kind == pendingChangeDelete {
Expand Down Expand Up @@ -1028,7 +1037,8 @@ func (conn *ApicConnection) postDn(dn string, obj ApicObject) bool {
if err != nil {
conn.log.Error("Could not serialize object for dn ", dn, ": ", err)
}
//conn.log.Debug(string(raw))
conn.log.Debug("testtttt : ", url)
conn.log.Debug(string(raw))
req, err := http.NewRequest("POST", url, bytes.NewBuffer(raw))
if err != nil {
conn.log.Error("Could not create request: ", err)
Expand Down Expand Up @@ -1068,6 +1078,7 @@ func (conn *ApicConnection) DeleteDn(dn string) bool {
conn.restart()
return false
}
conn.log.Debug("Delete testtttt ", url)
conn.sign(req, uri, nil)
resp, err := conn.client.Do(req)
if err != nil {
Expand Down