Skip to content

Commit

Permalink
fix(evpn-bridge): some more fixes
Browse files Browse the repository at this point in the history
Signed-off-by: atulpatel261194 <Atul.Patel@intel.com>
  • Loading branch information
atulpatel261194 committed May 29, 2024
1 parent 1ed4abd commit 85aaecf
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 64 deletions.
3 changes: 2 additions & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,12 @@ func cleanUp() {

switch config.GlobalConfig.Buildenv {
case intelStr:

gen_linux.DeInitialize()
intel_e2000_linux.DeInitialize()
frr.DeInitialize()
ipu_vendor.DeInitialize()
netlink.DeInitialize()
case "ci":
gen_linux.DeInitialize()
ci_linux.DeInitialize()
Expand All @@ -165,7 +167,6 @@ func main() {
// log.Println(err)
log.Panicf("Error in initialize(): %v", err)
}

// start the main cmd
if err := rootCmd.Execute(); err != nil {
log.Panicf("Error in Execute(): %v", err)
Expand Down
20 changes: 14 additions & 6 deletions pkg/netlink/eventbus/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
// EventBus holds the event bus info
type EventBus struct {
subscribers map[string][]*Subscriber
mutex sync.RWMutex
mutex sync.Mutex
}

// Subscriber holds the info for each subscriber
Expand Down Expand Up @@ -44,8 +44,8 @@ func (e *EventBus) Subscribe(eventType string) *Subscriber {

// Publish api notifies the subscribers with certain eventType
func (e *EventBus) Publish(eventType string, data interface{}) {
e.mutex.RLock()
defer e.mutex.RUnlock()
e.mutex.Lock()
defer e.mutex.Unlock()

subscribers, ok := e.subscribers[eventType]
if !ok {
Expand All @@ -57,7 +57,15 @@ func (e *EventBus) Publish(eventType string, data interface{}) {
}
}

// Unsubscribe the subscriber, which delete the subscriber(all resources will be washed out)
func (s *Subscriber) Unsubscribe() {
close(s.Ch)
// Unsubscribe closes all subscriber channels and empties the subscriber map.
func (e *EventBus) Unsubscribe() {
e.mutex.Lock()
defer e.mutex.Unlock()

for eventName, subs := range e.subscribers {
for _, sub := range subs {
close(sub.Ch) // Close each channel
}
delete(e.subscribers, eventName) // Remove the entry from the map
}
}
120 changes: 77 additions & 43 deletions pkg/netlink/netlink_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,26 +301,28 @@ type neighList struct {
NS []neighStruct
}

// netMaskToInt convert network mask to int
func netMaskToInt(mask int) (netmaskint [4]int64) {
// netMaskToInt converts a CIDR network mask (e.g., 24 for a /24 subnet) to a 4-octet netmask.
func netMaskToInt(mask int) (netmaskint [4]uint8) {
if mask < 0 || mask > 32 {
return netmaskint
}

var binarystring string
if mask >= 0 {
for i := 1; i <= mask; i++ {
binarystring += "1"
}
for i := 1; i <= (32 - mask); i++ {
binarystring += "0"
for i := 0; i < mask; i++ {
binarystring += "1"
}
for i := mask; i < 32; i++ {
binarystring += "0"
}

for i := 0; i < 4; i++ {
octet, err := strconv.ParseUint(binarystring[i*8:(i+1)*8], 2, 8)
if err != nil {
return netmaskint
}
netmaskint[i] = uint8(octet)
}

oct1 := binarystring[0:8]
oct2 := binarystring[8:16]
oct3 := binarystring[16:24]
oct4 := binarystring[24:]
netmaskint[0], _ = strconv.ParseInt(oct1, 2, 64)
netmaskint[1], _ = strconv.ParseInt(oct2, 2, 64)
netmaskint[2], _ = strconv.ParseInt(oct3, 2, 64)
netmaskint[3], _ = strconv.ParseInt(oct4, 2, 64)
return netmaskint
}

Expand Down Expand Up @@ -424,6 +426,33 @@ const (
routeTypeNeighbor = "neighbor"
)

const (
// RouteAdded event const
RouteAdded = "route_added"
// RouteUpdated event const
RouteUpdated = "route_updated"
// RouteDeleted event const
RouteDeleted = "route_deleted"
// NexthopAdded event const
NexthopAdded = "nexthop_added"
// NexthopUpdated event const
NexthopUpdated = "nexthop_updated"
// NexthopDeleted event const
NexthopDeleted = "nexthop_deleted"
// FdbEntryAdded event const
FdbEntryAdded = "fdb_entry_added"
// FdbEntryUpdated event const
FdbEntryUpdated = "fdb_entry_updated"
// FdbEntryDeleted event const
FdbEntryDeleted = "fdb_entry_deleted"
// L2NexthopAdded event const
L2NexthopAdded = "l2_nexthop_added"
// L2NexthopUpdated event const
L2NexthopUpdated = "l2_nexthop_updated"
// L2NexthopDeleted event const
L2NexthopDeleted = "l2_nexthop_deleted"
)

// getFlag gets the flag
func getFlag(s string) int {
f := 0
Expand Down Expand Up @@ -603,7 +632,7 @@ func setRouteType(rs RouteStruct, v *infradb.Vrf) string {
func ParseRoute(v *infradb.Vrf, Rm []*routeCmdInfo, t int) routeList {
var route routeList
for _, Ro := range Rm {
if Ro.Type =="" && (Ro.Dev !="" || Ro.Gateway != "") {
if Ro.Type == "" && (Ro.Dev != "" || Ro.Gateway != "") {
Ro.Type = routeTypeLocal
}
var rs RouteStruct
Expand All @@ -618,7 +647,7 @@ func ParseRoute(v *infradb.Vrf, Rm []*routeCmdInfo, t int) routeList {
dev, _ := vn.LinkByName(Ro.Dev)
rs.Route0.LinkIndex = dev.Attrs().Index
}
if Ro.Dst !="" {
if Ro.Dst != "" {
var Mask int
split := Ro.Dst
if strings.Contains(Ro.Dst, "/") {
Expand Down Expand Up @@ -651,7 +680,7 @@ func ParseRoute(v *infradb.Vrf, Rm []*routeCmdInfo, t int) routeList {
}
rs.Route0.Dst = nIP
}
if Ro.Metric != 0 {
if Ro.Metric != 0 {
rs.Route0.Priority = Ro.Metric
}
if Ro.Protocol != "" {
Expand Down Expand Up @@ -682,7 +711,7 @@ func ParseRoute(v *infradb.Vrf, Rm []*routeCmdInfo, t int) routeList {
}
rs.Route0.Gw = nIP.IP
}
if !reflect.ValueOf(Ro.VRF).IsZero() {
if Ro.VRF != nil {
rs.Vrf, _ = infradb.GetVrf(Ro.VRF.Name)
}
if Ro.Table != 0 {
Expand Down Expand Up @@ -711,7 +740,7 @@ func ParseFdb(fdbIP fdbIPStruct) FdbEntryStruct {
break
}
}
if !(reflect.ValueOf(fdbentry.lb).IsZero()) {
if fdbentry.lb != nil {
bp := fdbentry.lb.MacTable[fdbentry.Mac]
if bp != "" {
fdbentry.bp, _ = infradb.GetBP(bp)
Expand All @@ -738,7 +767,7 @@ func (l2n L2NexthopStruct) ParseL2NH(vlanID int, dev string, dst string, LB *inf
l2n.Type = SVI
} else if l2n.Dev == fmt.Sprintf("vxlan-%d", l2n.VlanID) {
l2n.Type = VXLAN
} else if !(reflect.ValueOf(l2n.bp).IsZero()) {
} else if l2n.bp != nil {
l2n.Type = BRIDGEPORT
} else {
l2n.Type = None
Expand Down Expand Up @@ -768,12 +797,12 @@ func addFdbEntry(m FdbEntryStruct) {

// addL2Nexthop add the l2 nexthop
func addL2Nexthop(m FdbEntryStruct) FdbEntryStruct {
if reflect.ValueOf(latestL2Nexthop).IsZero() {
if len(latestL2Nexthop) == 0 {
log.Fatal("netlink: L2Nexthop DB empty\n")
return FdbEntryStruct{}
}
latestNexthops := latestL2Nexthop[m.Nexthop.Key]
if !(reflect.ValueOf(latestNexthops).IsZero()) {
if latestL2Nexthop != nil || len(latestL2Nexthop) != 0 {
latestNexthops.FdbRefs = append(latestNexthops.FdbRefs, m)
m.Nexthop = latestNexthops
} else {
Expand Down Expand Up @@ -1326,7 +1355,7 @@ func readNeighbors(v *infradb.Vrf) {
// preFilterMac filter the mac
func preFilterMac(f FdbEntryStruct) bool {
// TODO m.nexthop.dst
if f.VlanID != 0 || !(reflect.ValueOf(f.Nexthop.Dst).IsZero()) {
if f.VlanID != 0 || (f.Nexthop.Dst != nil && !f.Nexthop.Dst.IsUnspecified()) {
log.Printf("netlink: %d vlan \n", len(f.Nexthop.Dst.String()))
return true
}
Expand Down Expand Up @@ -1520,10 +1549,10 @@ func (nexthop NexthopStruct) annotate() NexthopStruct {
phyFlag = true
}
}
if (!reflect.ValueOf(nexthop.nexthop.Gw).IsZero()) && nexthop.nexthop.LinkIndex != 0 && strings.HasPrefix(NameIndex[nexthop.nexthop.LinkIndex], path.Base(nexthop.Vrf.Name)+"-") && !nexthop.Local {
if (nexthop.nexthop.Gw != nil && !nexthop.nexthop.Gw.IsUnspecified()) && nexthop.nexthop.LinkIndex != 0 && strings.HasPrefix(NameIndex[nexthop.nexthop.LinkIndex], path.Base(nexthop.Vrf.Name)+"-") && !nexthop.Local {
nexthop.NhType = SVI
link, _ := vn.LinkByName(NameIndex[nexthop.nexthop.LinkIndex])
if !reflect.ValueOf(nexthop.Neighbor).IsZero() {
if nexthop.Neighbor != nil {
if nexthop.Neighbor.Type == SVI {
nexthop.NhType = SVI
nexthop.Metadata["direction"] = RX
Expand Down Expand Up @@ -1553,7 +1582,7 @@ func (nexthop NexthopStruct) annotate() NexthopStruct {
log.Printf("netlink: Failed to gather data for nexthop on physical port\n")
}
}
} else if (!reflect.ValueOf(nexthop.nexthop.Gw).IsZero()) && phyFlag && !nexthop.Local {
} else if (nexthop.nexthop.Gw != nil && !nexthop.nexthop.Gw.IsUnspecified()) && phyFlag && !nexthop.Local {
nexthop.NhType = PHY
link1, _ := vn.LinkByName(NameIndex[nexthop.nexthop.LinkIndex])
if link1 == nil {
Expand All @@ -1562,15 +1591,15 @@ func (nexthop NexthopStruct) annotate() NexthopStruct {
nexthop.Metadata["direction"] = TX
nexthop.Metadata["smac"] = link1.Attrs().HardwareAddr.String()
nexthop.Metadata["egress_vport"] = phyPorts[nexthop.nexthop.Gw.String()]
if !reflect.ValueOf(nexthop.Neighbor).IsZero() {
if nexthop.Neighbor != nil {
if nexthop.Neighbor.Type == PHY {
nexthop.Metadata["dmac"] = nexthop.Neighbor.Neigh0.HardwareAddr.String()
}
} else {
nexthop.Resolved = false
log.Printf("netlink: Failed to gather data for nexthop on physical port")
}
} else if (!reflect.ValueOf(nexthop.nexthop.Gw).IsZero()) && NameIndex[nexthop.nexthop.LinkIndex] == fmt.Sprintf("br-%s", path.Base(nexthop.Vrf.Name)) && !nexthop.Local {
} else if (nexthop.nexthop.Gw != nil && !nexthop.nexthop.Gw.IsUnspecified()) && NameIndex[nexthop.nexthop.LinkIndex] == fmt.Sprintf("br-%s", path.Base(nexthop.Vrf.Name)) && !nexthop.Local {
nexthop.NhType = VXLAN
G, _ := infradb.GetVrf(nexthop.Vrf.Name)
var detail map[string]interface{}
Expand All @@ -1594,14 +1623,14 @@ func (nexthop NexthopStruct) annotate() NexthopStruct {
}
nexthop.Metadata["direction"] = TX
nexthop.Metadata["inner_smac"] = Rmac.String()
if reflect.ValueOf(Rmac).IsZero() {
if Rmac == nil || len(Rmac) == 0 {
nexthop.Resolved = false
}
vtepip := G.Spec.VtepIP.IP
nexthop.Metadata["local_vtep_ip"] = vtepip.String()
nexthop.Metadata["remote_vtep_ip"] = nexthop.nexthop.Gw.String()
nexthop.Metadata["vni"] = *nexthop.Vrf.Spec.Vni
if !reflect.ValueOf(nexthop.Neighbor).IsZero() {
if nexthop.Neighbor != nil {
nexthop.Metadata["inner_dmac"] = nexthop.Neighbor.Neigh0.HardwareAddr.String()
G, _ := infradb.GetVrf("//network.opiproject.org/vrfs/GRD")
r := lookupRoute(nexthop.nexthop.Gw, G)
Expand All @@ -1611,7 +1640,7 @@ func (nexthop NexthopStruct) annotate() NexthopStruct {
link, _ := vn.LinkByName(NameIndex[phyNh.nexthop.LinkIndex])
nexthop.Metadata["phy_smac"] = link.Attrs().HardwareAddr.String()
nexthop.Metadata["egress_vport"] = phyPorts[NameIndex[phyNh.nexthop.LinkIndex]]
if !reflect.ValueOf(phyNh.Neighbor).IsZero() {
if phyNh.Neighbor != nil {
nexthop.Metadata["phy_dmac"] = phyNh.Neighbor.Neigh0.HardwareAddr.String()
} else {
// The VXLAN nexthop can only be installed when the phy_nexthops are Resolved.
Expand All @@ -1633,7 +1662,7 @@ func (nexthop NexthopStruct) annotate() NexthopStruct {
nexthop.Metadata["direction"] = RX
nexthop.Metadata["dmac"] = link1.Attrs().HardwareAddr.String()
nexthop.Metadata["egress_vport"] = (int((link1.Attrs().HardwareAddr)[0]) << 8) + int((link1.Attrs().HardwareAddr)[1])
if reflect.ValueOf(nexthop.Vrf.Spec.Vni).IsZero() {
if nexthop.Vrf.Spec.Vni == nil {
nexthop.Metadata["vlanID"] = uint32(4089)
} else {
nexthop.Metadata["vlanID"] = *nexthop.Vrf.Metadata.RoutingTable[0] //*nexthop.Vrf.Spec.Vni
Expand All @@ -1647,7 +1676,7 @@ func (l2n L2NexthopStruct) annotate() L2NexthopStruct {
// Annotate certain L2 Nexthops with additional information from LB and GRD
l2n.Metadata = make(map[interface{}]interface{})
LB := l2n.lb
if !(reflect.ValueOf(LB).IsZero()) {
if LB != nil {
if l2n.Type == SVI {
l2n.Metadata["vrf_id"] = *LB.Spec.Vni
} else if l2n.Type == VXLAN {
Expand All @@ -1668,7 +1697,7 @@ func (l2n L2NexthopStruct) annotate() L2NexthopStruct {
link, _ := vn.LinkByName(NameIndex[phyNh.nexthop.LinkIndex])
l2n.Metadata["phy_smac"] = link.Attrs().HardwareAddr.String()
l2n.Metadata["egress_vport"] = phyPorts[NameIndex[phyNh.nexthop.LinkIndex]]
if !reflect.ValueOf(phyNh.Neighbor).IsZero() {
if phyNh.Neighbor != nil {
if phyNh.Neighbor.Type == PHY {
l2n.Metadata["phy_dmac"] = phyNh.Neighbor.Neigh0.HardwareAddr.String()
l2n.Resolved = true
Expand All @@ -1691,7 +1720,7 @@ func (fdb FdbEntryStruct) annotate() FdbEntryStruct {
if fdb.VlanID == 0 {
return fdb
}
if reflect.ValueOf(fdb.lb).IsZero() {
if fdb.lb != nil {
return fdb
}

Expand Down Expand Up @@ -1746,7 +1775,7 @@ func installFilterRoute(routeSt *RouteStruct) bool {
}
}
routeSt.Nexthops = nh
keep := checkRtype(routeSt.NlType) && len(nh) != 0 && strings.Compare(routeSt.Route0.Dst.IP.String(), "0.0.0.0") != 0
keep := checkRtype(routeSt.NlType) && len(nh) != 0 && routeSt.Route0.Dst.IP.String() != "0.0.0.0"
return keep
}

Expand Down Expand Up @@ -1783,7 +1812,7 @@ func installFilterFDB(fdb FdbEntryStruct) bool {
// Drop entries w/o VLAN ID or associated LogicalBridge ...
// ... other than with L2 nexthops of type VXLAN and BridgePort ...
// ... and VXLAN entries with unresolved underlay nextop.
keep := !reflect.ValueOf(fdb.VlanID).IsZero() && !reflect.ValueOf(fdb.lb).IsZero() && checkFdbType(fdb.Type) && fdb.Nexthop.Resolved
keep := fdb.VlanID != 0 && fdb.lb != nil && checkFdbType(fdb.Type) && fdb.Nexthop.Resolved
if !keep {
log.Printf("netlink: install_filter: dropping {%v}", fdb)
}
Expand All @@ -1792,7 +1821,7 @@ func installFilterFDB(fdb FdbEntryStruct) bool {

// installFilterL2N install the l2 filter
func installFilterL2N(l2n L2NexthopStruct) bool {
keep := !(reflect.ValueOf(l2n.Type).IsZero() && l2n.Resolved && reflect.ValueOf(l2n.FdbRefs).IsZero())
keep := !(l2n.Type == 0 && l2n.Resolved && reflect.ValueOf(l2n.FdbRefs).IsZero())
if !keep {
log.Printf("netlink: install_filter fDB: dropping {%+v}", l2n)
}
Expand Down Expand Up @@ -1936,13 +1965,13 @@ func monitorNetlink() {
// Inform subscribers to delete configuration for any still remaining Netlink DB objects.
log.Printf("netlink: Delete any residual objects in DB")
for _, r := range routes {
notifyAddDel(r, "route_deleted")
notifyAddDel(r, "RouteDeleted")
}
for _, nexthop := range Nexthops {
notifyAddDel(nexthop, "nexthop_deleted")
notifyAddDel(nexthop, "NexthopDeleted")
}
for _, m := range fDB {
notifyAddDel(m, "FDB_entry_deleted")
notifyAddDel(m, "fdb_entry_deleted")
}
log.Printf("netlink: DB cleanup completed.")
}
Expand All @@ -1964,3 +1993,8 @@ func Initialize() {
nlink = utils.NewNetlinkWrapper()
go monitorNetlink() // monitor Thread started
}

// DeInitialize function handles stops functionality
func DeInitialize() {
stopMonitoring = true
}
Loading

0 comments on commit 85aaecf

Please sign in to comment.