Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Single request for hpp #1007

Closed
wants to merge 11 commits into from
3 changes: 1 addition & 2 deletions pkg/apicapi/apic_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,7 @@ func injectedSvcPortNormalizer(b *ApicObjectBody) {
var metadata = map[string]*apicMeta{
"fvTenant": {
attributes: map[string]interface{}{
"name": "",
"nameAlias": "",
"name": "",
shastrinator marked this conversation as resolved.
Show resolved Hide resolved
},
children: []string{
"fvBD",
Expand Down
247 changes: 247 additions & 0 deletions pkg/apicapi/apic_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,113 @@ func (conn *ApicConnection) apicObjCmp(current ApicObject,
return
}

func (conn *ApicConnection) setDeleteStatus(deleteBody *ApicObject, deldn string) {
if deleteBody == nil {
return
}
for _, val := range *deleteBody {
dn, okdn := val.Attributes["dn"]
if okdn && dn == deldn {
attrs := make(map[string]interface{})
attrs["dn"] = val.Attributes["dn"]
attrs["status"] = "deleted"
val.Attributes = attrs
conn.log.Debug("deleteBody: ", *deleteBody)
shastrinator marked this conversation as resolved.
Show resolved Hide resolved
return
} else {
status, okstatus := val.Attributes["status"]
if okstatus && status != "deleted" || !okstatus {
attrs := make(map[string]interface{})
attrs["dn"] = val.Attributes["dn"]
attrs["status"] = ""
val.Attributes = attrs
}
}
if len(val.Children) > 0 {
for _, child := range val.Children {
conn.setDeleteStatus(&child, deldn)
}
}
}
return

}

func (conn *ApicConnection) diffMulApicState(currentState ApicSlice,
desiredState ApicSlice) (updates ApicSlice, deletes ApicSlice, deletedns []string) {
i := 0
j := 0

update := false
delete := false

for i < len(currentState) && j < len(desiredState) {
deleteBody := currentState[i]
cmp := cmpApicObject(currentState[i], desiredState[j])
if cmp < 0 {
deldn := deleteBody.GetDn()
conn.setDeleteStatus(&deleteBody, deldn)
deletes = append(deletes, deleteBody)
deletedns = append(deletedns, deldn)
i++
delete = true
} else if cmp > 0 {
updates = append(updates, desiredState[j])
j++
update = true
} else {
if conn.containerDns[currentState[i].GetDn()] {
if !conn.apicCntCmp(currentState[i], desiredState[j]) {
updates = append(updates, desiredState[j])
update = true
}
} else {
cu, cd := conn.apicObjCmp(currentState[i], desiredState[j])
if cu {
updates = append(updates, desiredState[j])
update = true
}
if len(cd) > 0 {
deletedns = append(deletedns, cd...)
conn.log.Debug("dns to delete: ", cd)
for _, deldn := range cd {
conn.setDeleteStatus(&deleteBody, deldn)
}
deletes = append(deletes, deleteBody)
delete = true
}
}

i++
j++
}
}
// extra old objects
for i < len(currentState) {
deleteBody := currentState[i]
deldn := deleteBody.GetDn()
deletes = append(deletes, deleteBody)
deletedns = append(deletedns, deldn)
i++
delete = true
}
// extra new objects
for j < len(desiredState) {
updates = append(updates, desiredState[j])
j++
update = true
}

if update && len(updates) != 0 {
conn.log.Debug("Mul Apic object updates are :", updates)
}
if delete && len(deletes) != 0 {
conn.log.Debug("Mul Apic object deletes are :", deletes)
}

return
}

func (conn *ApicConnection) diffApicState(currentState ApicSlice,
desiredState ApicSlice) (updates ApicSlice, deletes []string) {

Expand Down Expand Up @@ -199,6 +306,20 @@ func (conn *ApicConnection) diffApicState(currentState ApicSlice,
return
}

func (conn *ApicConnection) applyMulDiff(updates string, deletes string,
context string) {
if deletes != "" {
conn.log.WithFields(logrus.Fields{"mod": "APICAPI", "DN": deletes, "context": context}).
Debug("Applying APIC multiple object delete")
conn.queueDn(deletes)
}
if updates != "" {
conn.log.WithFields(logrus.Fields{"mod": "APICAPI", "DN": updates, "context": context}).
Debug("Applying APIC multiple object update")
conn.queueDn(updates)
}
}

func (conn *ApicConnection) applyDiff(updates ApicSlice, deletes []string,
context string) {
sort.Sort(updates)
Expand Down Expand Up @@ -333,6 +454,15 @@ func (conn *ApicConnection) updateDnIndex(objects ApicSlice) {
}
}

func (conn *ApicConnection) removeFromMultipleDn(dn string) {
conn.indexMutex.Lock()
if _, ok := conn.multipleDn[dn]; ok {
delete(conn.multipleDn, dn)

}
conn.indexMutex.Unlock()
}

func (conn *ApicConnection) removeFromDnIndex(dn string) {
if obj, ok := conn.desiredStateDn[dn]; ok {
delete(conn.desiredStateDn, dn)
Expand All @@ -346,6 +476,116 @@ func (conn *ApicConnection) removeFromDnIndex(dn string) {
}
}

func (conn *ApicConnection) doWriteMulApicObjects(keyObjects map[string]ApicSlice, fvTenant string, container bool) {
var multiDeletes ApicSlice
var multiUpdates ApicSlice
var muldeldn string
var mulupdn string
for key, objects := range keyObjects {
tag := getTagFromKey(conn.prefix, key)
prepareApicSliceTag(objects, tag)

conn.indexMutex.Lock()
updates, deletes, deldns := conn.diffMulApicState(conn.desiredState[key], objects)

for _, deleteobj := range deletes {
multiDeletes = append(multiDeletes, deleteobj)
}

for _, updateobj := range updates {
multiUpdates = append(multiUpdates, updateobj)
}
// temp cache to store all the "uni/tn-common/svcCont/svcRedirectPol-kube_svc_default_test-master"
// found in deletes
/* var temp_deletes []string
for _, delete := range deletes {
if strings.Contains(delete, "svcRedirectPol") {
temp_deletes = append(temp_deletes, delete)
}
}
newDelete := false
for _, temp_del := range temp_deletes {
vns_svc_redirect_pol_obj, ok := conn.desiredStateDn[temp_del]
if !ok {
conn.log.Error("no svc_obj found in desiredStateDn cache")
return
}
// Explicitly remove vnsRedirectDest from svcRedirectPol's list of children
for _, body := range vns_svc_redirect_pol_obj {
for _, child := range body.Children {
for class := range child {
if class == "vnsRedirectDest" {
deletes = append(deletes, child.GetDn())
newDelete = true

}
}
}
}
}
if newDelete && len(deletes) != 0 {
conn.log.Debug("Updated apic object deletes list is :", deletes)
}
*/
conn.updateDnIndex(objects)
for _, deldn := range deldns {
muldeldn = muldeldn + deldn
conn.removeFromDnIndex(deldn)
if container {
delete(conn.containerDns, deldn)
}
}
for _, update := range updates {
dn := update.GetDn()
mulupdn = mulupdn + dn
if container {
conn.containerDns[dn] = true
}
}
if objects == nil {
delete(conn.desiredState, key)
delete(conn.keyHashes, tag)
} else {
conn.desiredState[key] = objects
conn.keyHashes[tag] = key
}
conn.indexMutex.Unlock()

/* if conn.syncEnabled {
conn.indexMutex.Unlock()
conn.applyDiff(updates, deletes, "write")
} else {
conn.indexMutex.Unlock()
}
*/
}
if conn.syncEnabled {
if len(multiDeletes) > 0 {
mulobj := make(map[string]ApicObject)
tenantObj := NewFvTenant(fvTenant)
conn.indexMutex.Lock()
tenantObj["fvTenant"].Children = multiDeletes
tenantObj["fvTenant"].Attributes["status"] = "modified"
mulobj[tenantObj.GetDn()] = tenantObj
conn.multipleDn[muldeldn] = mulobj
conn.log.Debug("testttt mulllll 1111111 deletes: ", tenantObj, tenantObj.GetDn())
conn.indexMutex.Unlock()
}
if len(multiUpdates) > 0 {
mulobj := make(map[string]ApicObject)
tenantObj := NewFvTenant(fvTenant)
conn.indexMutex.Lock()
tenantObj["fvTenant"].Children = multiUpdates
tenantObj["fvTenant"].Attributes["status"] = "modified"
mulobj[tenantObj.GetDn()] = tenantObj
conn.multipleDn[mulupdn] = mulobj
conn.log.Debug("testttt mulllll 1111111 updates: ", tenantObj, tenantObj.GetDn())
conn.indexMutex.Unlock()
}
conn.applyMulDiff(mulupdn, muldeldn, "write")
}
}

func (conn *ApicConnection) doWriteApicObjects(key string, objects ApicSlice,
container bool) {

Expand All @@ -355,6 +595,9 @@ func (conn *ApicConnection) doWriteApicObjects(key string, objects ApicSlice,
conn.indexMutex.Lock()
updates, deletes := conn.diffApicState(conn.desiredState[key], objects)

conn.log.Debug("testttt 1111111 deletes: ", deletes)
conn.log.Debug("testttt 1111111 updates: ", updates)

// temp cache to store all the "uni/tn-common/svcCont/svcRedirectPol-kube_svc_default_test-master"
// found in deletes
var temp_deletes []string
Expand Down Expand Up @@ -433,6 +676,10 @@ func (conn *ApicConnection) WriteApicObjects(key string, objects ApicSlice) {
conn.doWriteApicObjects(key, objects, false)
}

func (conn *ApicConnection) WriteMulApicObjects(keyObjects map[string]ApicSlice, fvTenant string) {
conn.doWriteMulApicObjects(keyObjects, fvTenant, false)
}

func (conn *ApicConnection) reconcileApicObject(aci ApicObject) {
conn.indexMutex.Lock()
if !conn.syncEnabled {
Expand Down
1 change: 1 addition & 0 deletions pkg/apicapi/apic_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ type ApicConnection struct {

desiredState map[string]ApicSlice
desiredStateDn map[string]ApicObject
multipleDn map[string]map[string]ApicObject
keyHashes map[string]string
containerDns map[string]bool
cachedState map[string]ApicSlice
Expand Down
15 changes: 13 additions & 2 deletions pkg/apicapi/apicapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ func New(log *logrus.Logger, apic []string, user string,
},
desiredState: make(map[string]ApicSlice),
desiredStateDn: make(map[string]ApicObject),
multipleDn: make(map[string]map[string]ApicObject),
keyHashes: make(map[string]string),
containerDns: make(map[string]bool),
cachedState: make(map[string]ApicSlice),
Expand Down Expand Up @@ -334,12 +335,12 @@ func (conn *ApicConnection) handleQueuedDn(dn string) bool {
}
conn.indexMutex.Unlock()
}

var requeue bool
conn.indexMutex.Lock()
pending, hasPendingChange := conn.pendingSubDnUpdate[dn]
conn.pendingSubDnUpdate[dn] = pendingChange{isDirty: true}
obj, hasDesiredState := conn.desiredStateDn[dn]
mulobj, hasMulDn := conn.multipleDn[dn]
conn.indexMutex.Unlock()

if hasPendingChange {
Expand All @@ -366,6 +367,14 @@ func (conn *ApicConnection) handleQueuedDn(dn string) bool {
} else {
requeue = conn.postDn(dn, obj)
}
} else if hasMulDn {
for key, val := range mulobj {
requeue = conn.postDn(key, val)
if !requeue {
conn.removeFromMultipleDn(key)
}
break
}
} else {
if hasPendingChange {
if pending.kind == pendingChangeDelete {
Expand Down Expand Up @@ -1028,7 +1037,8 @@ func (conn *ApicConnection) postDn(dn string, obj ApicObject) bool {
if err != nil {
conn.log.Error("Could not serialize object for dn ", dn, ": ", err)
}
//conn.log.Debug(string(raw))
conn.log.Debug("testtttt : ", url)
conn.log.Debug(string(raw))
req, err := http.NewRequest("POST", url, bytes.NewBuffer(raw))
if err != nil {
conn.log.Error("Could not create request: ", err)
Expand Down Expand Up @@ -1068,6 +1078,7 @@ func (conn *ApicConnection) DeleteDn(dn string) bool {
conn.restart()
return false
}
conn.log.Debug("Delete testtttt ", url)
conn.sign(req, uri, nil)
resp, err := conn.client.Do(req)
if err != nil {
Expand Down