Skip to content

Commit

Permalink
Merge 5be16ce into b69dbb5
Browse files Browse the repository at this point in the history
  • Loading branch information
akhilamohanan committed Nov 1, 2023
2 parents b69dbb5 + 5be16ce commit 754fbc5
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 4 deletions.
1 change: 1 addition & 0 deletions pkg/apicapi/apic_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ type ApicConnection struct {
CachedSubnetDns map[string]string

deltaQueue workqueue.RateLimitingInterface
odevQueue workqueue.RateLimitingInterface
}

func (s ApicSlice) Len() int {
Expand Down
36 changes: 33 additions & 3 deletions pkg/apicapi/apicapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func (conn *ApicConnection) handleSocketUpdate(apicresp *ApicResponse) {
}

for _, obj := range apicresp.Imdata {
for _, body := range obj {
for key, body := range obj {
if dn, ok := body.Attributes["dn"].(string); ok {
if status, isStr := body.Attributes["status"].(string); isStr {
var pendingKind int
Expand All @@ -277,8 +277,16 @@ func (conn *ApicConnection) handleSocketUpdate(apicresp *ApicResponse) {
subIds: subIds,
isDirty: false,
}
if conn.deltaQueue != nil {
conn.deltaQueue.Add(dn)
conn.log.Debug("akhilaaa key: ", key)
if key == "opflexODev" {
conn.log.Debug("akhilaaa opflexODev detected")
if conn.odevQueue != nil {
conn.odevQueue.Add(dn)
}
} else {
if conn.deltaQueue != nil {
conn.deltaQueue.Add(dn)
}
}
conn.indexMutex.Unlock()
}
Expand Down Expand Up @@ -385,6 +393,7 @@ func (conn *ApicConnection) handleQueuedDn(dn string) bool {
func (conn *ApicConnection) processQueue(queue workqueue.RateLimitingInterface,
queueStop <-chan struct{}) {
go wait.Until(func() {
conn.log.Debug("akhila qqqqqqq ")
for {
dn, quit := queue.Get()
if quit {
Expand Down Expand Up @@ -418,6 +427,7 @@ func (conn *ApicConnection) runConn(stopCh <-chan struct{}) {
done := make(chan struct{})
restart := make(chan struct{})
queueStop := make(chan struct{})
odevQueueStop := make(chan struct{})
syncHook := make(chan fullSync, 1)
conn.restartCh = restart

Expand Down Expand Up @@ -457,6 +467,16 @@ func (conn *ApicConnection) runConn(stopCh <-chan struct{}) {
),
"delta")
go conn.processQueue(conn.deltaQueue, queueStop)
conn.odevQueue = workqueue.NewNamedRateLimitingQueue(
workqueue.NewMaxOfRateLimiter(
workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond,
10*time.Second),
&workqueue.BucketRateLimiter{
Limiter: rate.NewLimiter(rate.Limit(10), int(100)),
},
),
"odev")
go conn.processQueue(conn.odevQueue, odevQueueStop)
conn.indexMutex.Unlock()

refreshInterval := conn.RefreshInterval
Expand Down Expand Up @@ -502,9 +522,11 @@ func (conn *ApicConnection) runConn(stopCh <-chan struct{}) {

closeConn := func(stop bool) {
close(queueStop)
close(odevQueueStop)

conn.indexMutex.Lock()
conn.deltaQueue = nil
conn.odevQueue = nil
conn.stopped = stop
conn.syncEnabled = false
conn.subscriptions.ids = make(map[string]string)
Expand Down Expand Up @@ -901,6 +923,14 @@ func (conn *ApicConnection) queueDn(dn string) {
conn.indexMutex.Unlock()
}

func (conn *ApicConnection) odevQueueDn(dn string) {
conn.indexMutex.Lock()
if conn.odevQueue != nil {
conn.odevQueue.Add(dn)
}
conn.indexMutex.Unlock()
}

func (conn *ApicConnection) ForceRelogin() {
conn.token = ""
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,7 @@ func (cont *AciController) Run(stopCh <-chan struct{}) {
}
// If not defined, default is 1800
if cont.config.ApicRefreshTimer == "" {
cont.config.ApicRefreshTimer = "1800"
cont.config.ApicRefreshTimer = "900"
}
refreshTimeout, err := strconv.Atoi(cont.config.ApicRefreshTimer)
if err != nil {
Expand Down

0 comments on commit 754fbc5

Please sign in to comment.