Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement bulk updates to IP sets #1357

Merged
merged 5 commits into from
Mar 1, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
47 changes: 23 additions & 24 deletions intdataplane/int_dataplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ type InternalDataplane struct {
iptablesNATTables []*iptables.Table
iptablesRawTables []*iptables.Table
iptablesFilterTables []*iptables.Table
ipSetRegistries []*ipsets.Registry
ipSets []*ipsets.IPSets

ipipManager *ipipManager

Expand Down Expand Up @@ -190,18 +190,18 @@ func NewIntDataplaneDriver(config Config) *InternalDataplane {
RefreshInterval: config.IptablesRefreshInterval,
})
ipSetsConfigV4 := config.RulesConfig.IPSetConfigV4
ipSetRegV4 := ipsets.NewRegistry(ipSetsConfigV4)
ipSetsV4 := ipsets.NewIPSets(ipSetsConfigV4)
dp.iptablesNATTables = append(dp.iptablesNATTables, natTableV4)
dp.iptablesRawTables = append(dp.iptablesRawTables, rawTableV4)
dp.iptablesFilterTables = append(dp.iptablesFilterTables, filterTableV4)
dp.ipSetRegistries = append(dp.ipSetRegistries, ipSetRegV4)
dp.ipSets = append(dp.ipSets, ipSetsV4)

routeTableV4 := routetable.New(config.RulesConfig.WorkloadIfacePrefixes, 4)
dp.routeTables = append(dp.routeTables, routeTableV4)

dp.endpointStatusCombiner = newEndpointStatusCombiner(dp.fromDataplane, config.IPv6Enabled)

dp.RegisterManager(newIPSetsManager(ipSetRegV4, config.MaxIPSetSize))
dp.RegisterManager(newIPSetsManager(ipSetsV4, config.MaxIPSetSize))
dp.RegisterManager(newPolicyManager(rawTableV4, filterTableV4, ruleRenderer, 4))
dp.RegisterManager(newEndpointManager(
rawTableV4,
Expand All @@ -212,10 +212,10 @@ func NewIntDataplaneDriver(config Config) *InternalDataplane {
config.RulesConfig.WorkloadIfacePrefixes,
dp.endpointStatusCombiner.OnEndpointStatusUpdate))
dp.RegisterManager(newFloatingIPManager(natTableV4, ruleRenderer, 4))
dp.RegisterManager(newMasqManager(ipSetRegV4, natTableV4, ruleRenderer, config.MaxIPSetSize, 4))
dp.RegisterManager(newMasqManager(ipSetsV4, natTableV4, ruleRenderer, config.MaxIPSetSize, 4))
if config.RulesConfig.IPIPEnabled {
// Add a manger to keep the all-hosts IP set up to date.
dp.ipipManager = newIPIPManager(ipSetRegV4, config.MaxIPSetSize)
dp.ipipManager = newIPIPManager(ipSetsV4, config.MaxIPSetSize)
dp.RegisterManager(dp.ipipManager) // IPv4-only
}
if config.IPv6Enabled {
Expand Down Expand Up @@ -252,16 +252,16 @@ func NewIntDataplaneDriver(config Config) *InternalDataplane {
)

ipSetsConfigV6 := config.RulesConfig.IPSetConfigV6
ipSetRegV6 := ipsets.NewRegistry(ipSetsConfigV6)
dp.ipSetRegistries = append(dp.ipSetRegistries, ipSetRegV6)
ipSetsV6 := ipsets.NewIPSets(ipSetsConfigV6)
dp.ipSets = append(dp.ipSets, ipSetsV6)
dp.iptablesNATTables = append(dp.iptablesNATTables, natTableV6)
dp.iptablesRawTables = append(dp.iptablesRawTables, rawTableV6)
dp.iptablesFilterTables = append(dp.iptablesFilterTables, filterTableV6)

routeTableV6 := routetable.New(config.RulesConfig.WorkloadIfacePrefixes, 6)
dp.routeTables = append(dp.routeTables, routeTableV6)

dp.RegisterManager(newIPSetsManager(ipSetRegV6, config.MaxIPSetSize))
dp.RegisterManager(newIPSetsManager(ipSetsV6, config.MaxIPSetSize))
dp.RegisterManager(newPolicyManager(rawTableV6, filterTableV6, ruleRenderer, 6))
dp.RegisterManager(newEndpointManager(
rawTableV6,
Expand All @@ -272,7 +272,7 @@ func NewIntDataplaneDriver(config Config) *InternalDataplane {
config.RulesConfig.WorkloadIfacePrefixes,
dp.endpointStatusCombiner.OnEndpointStatusUpdate))
dp.RegisterManager(newFloatingIPManager(natTableV6, ruleRenderer, 6))
dp.RegisterManager(newMasqManager(ipSetRegV6, natTableV6, ruleRenderer, config.MaxIPSetSize, 6))
dp.RegisterManager(newMasqManager(ipSetsV6, natTableV6, ruleRenderer, config.MaxIPSetSize, 6))
}

for _, t := range dp.iptablesNATTables {
Expand Down Expand Up @@ -574,19 +574,25 @@ func (d *InternalDataplane) apply() {
}
}

// Next, create/update IP sets. We defer deletions of IP sets until after we update
// iptables.
for _, w := range d.ipSetRegistries {
w.ApplyUpdates()
}

if d.forceDataplaneRefresh {
// Refresh timer popped, ask the dataplane to resync as part of its update.
for _, r := range d.routeTables {
// Queue a resync on the next Apply().
r.QueueResync()
}
for _, r := range d.ipSets {
// Queue a resync on the next Apply().
r.QueueResync()
}
d.forceDataplaneRefresh = false
}

// Next, create/update IP sets. We defer deletions of IP sets until after we update
// iptables.
for _, w := range d.ipSets {
w.ApplyUpdates()
}

// Update iptables, this should sever any references to now-unused IP sets.
var reschedDelay time.Duration
for _, t := range d.allIptablesTables {
Expand All @@ -606,20 +612,13 @@ func (d *InternalDataplane) apply() {
}

// Now clean up any left-over IP sets.
for _, w := range d.ipSetRegistries {
for _, w := range d.ipSets {
w.ApplyDeletions()
}

// And publish and status updates.
d.endpointStatusCombiner.Apply()

if d.cleanupPending {
for _, w := range d.ipSetRegistries {
w.AttemptCleanup()
}
d.cleanupPending = false
}

// Set up any needed rescheduling kick.
if d.reschedC != nil {
// We have an active rescheduling timer, stop it so we can restart it with a
Expand Down
16 changes: 8 additions & 8 deletions intdataplane/ipip_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
//
// ipipManager also takes care of the configuration of the IPIP tunnel device.
type ipipManager struct {
ipsetReg ipsetsRegistry
ipsetsDataplane ipsetsDataplane

// activeHostnameToIP maps hostname to string IP address. We don't bother to parse into
// net.IPs because we're going to pass them directly to the IPSet API.
Expand All @@ -47,19 +47,19 @@ type ipipManager struct {
}

func newIPIPManager(
ipSetReg *ipsets.Registry,
ipsetsDataplane ipsetsDataplane,
maxIPSetSize int,
) *ipipManager {
return newIPIPManagerWithShim(ipSetReg, maxIPSetSize, realIPIPNetlink{})
return newIPIPManagerWithShim(ipsetsDataplane, maxIPSetSize, realIPIPNetlink{})
}

func newIPIPManagerWithShim(
ipSetReg ipsetsRegistry,
ipsetsDataplane ipsetsDataplane,
maxIPSetSize int,
dataplane ipipDataplane,
) *ipipManager {
ipipMgr := &ipipManager{
ipsetReg: ipSetReg,
ipsetsDataplane: ipsetsDataplane,
activeHostnameToIP: map[string]string{},
dataplane: dataplane,
ipSetMetadata: ipsets.IPSetMetadata{
Expand Down Expand Up @@ -216,14 +216,14 @@ func (m *ipipManager) CompleteDeferredWork() error {
for _, ip := range m.activeHostnameToIP {
members = append(members, ip)
}
m.ipsetReg.AddOrReplaceIPSet(m.ipSetMetadata, members)
m.ipsetsDataplane.AddOrReplaceIPSet(m.ipSetMetadata, members)
m.ipSetInSync = true
}
return nil
}

// ipsetsRegistry is a shim interface for mocking the IPSet Registry.
type ipsetsRegistry interface {
// ipsetsDataplane is a shim interface for mocking the IPSets object.
type ipsetsDataplane interface {
AddOrReplaceIPSet(setMetadata ipsets.IPSetMetadata, members []string)
AddMembers(setID string, newMembers []string)
RemoveMembers(setID string, removedMembers []string)
Expand Down
23 changes: 14 additions & 9 deletions intdataplane/ipsets_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,39 +15,44 @@
package intdataplane

import (
log "github.com/Sirupsen/logrus"

"github.com/projectcalico/felix/ipsets"
"github.com/projectcalico/felix/proto"
)

// ipSetsManager simply passes through IP set updates from the datastore to the ipsets.IPSets
// dataplane layer.
type ipSetsManager struct {
ipsetReg ipsetsRegistry
maxSize int
ipsetsDataplane ipsetsDataplane
maxSize int
}

func newIPSetsManager(ipsets ipsetsRegistry, maxIPSetSize int) *ipSetsManager {
func newIPSetsManager(ipsets ipsetsDataplane, maxIPSetSize int) *ipSetsManager {
return &ipSetsManager{
ipsetReg: ipsets,
maxSize: maxIPSetSize,
ipsetsDataplane: ipsets,
maxSize: maxIPSetSize,
}
}

func (m *ipSetsManager) OnUpdate(msg interface{}) {
switch msg := msg.(type) {
// IP set-related messages, these are extremely common.
case *proto.IPSetDeltaUpdate:
m.ipsetReg.AddMembers(msg.Id, msg.AddedMembers)
m.ipsetReg.RemoveMembers(msg.Id, msg.RemovedMembers)
log.WithField("ipSetId", msg.Id).Debug("IP set delta update")
m.ipsetsDataplane.AddMembers(msg.Id, msg.AddedMembers)
m.ipsetsDataplane.RemoveMembers(msg.Id, msg.RemovedMembers)
case *proto.IPSetUpdate:
log.WithField("ipSetId", msg.Id).Debug("IP set update")
metadata := ipsets.IPSetMetadata{
Type: ipsets.IPSetTypeHashIP,
SetID: msg.Id,
MaxSize: m.maxSize,
}
m.ipsetReg.AddOrReplaceIPSet(metadata, msg.Members)
m.ipsetsDataplane.AddOrReplaceIPSet(metadata, msg.Members)
case *proto.IPSetRemove:
m.ipsetReg.RemoveIPSet(msg.Id)
log.WithField("ipSetId", msg.Id).Debug("IP set remove")
m.ipsetsDataplane.RemoveIPSet(msg.Id)
}
}

Expand Down
44 changes: 22 additions & 22 deletions intdataplane/masq_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,19 @@ import (
// to trigger NAT of outgoing packets from NAT-enabled pools. Traffic to any Calico-owned
// pool is excluded.
type masqManager struct {
ipVersion uint8
ipsetReg ipsetsRegistry
natTable iptablesTable
activePools map[string]*proto.IPAMPool
masqPools set.Set
dirty bool
ruleRenderer rules.RuleRenderer
ipVersion uint8
ipsetsDataplane ipsetsDataplane
natTable iptablesTable
activePools map[string]*proto.IPAMPool
masqPools set.Set
dirty bool
ruleRenderer rules.RuleRenderer

logCxt *log.Entry
}

func newMasqManager(
ipSetReg ipsetsRegistry,
ipsetsDataplane ipsetsDataplane,
natTable iptablesTable,
ruleRenderer rules.RuleRenderer,
maxIPSetSize int,
Expand All @@ -58,26 +58,26 @@ func newMasqManager(
// Make sure our IP sets exist. We set the contents to empty here
// but the IPSets object will defer writing the IP sets until we're
// in sync, by which point we'll have added all our CIDRs into the sets.
ipSetReg.AddOrReplaceIPSet(ipsets.IPSetMetadata{
ipsetsDataplane.AddOrReplaceIPSet(ipsets.IPSetMetadata{
MaxSize: maxIPSetSize,
SetID: rules.IPSetIDNATOutgoingAllPools,
Type: ipsets.IPSetTypeHashNet,
}, []string{})
ipSetReg.AddOrReplaceIPSet(ipsets.IPSetMetadata{
ipsetsDataplane.AddOrReplaceIPSet(ipsets.IPSetMetadata{
MaxSize: maxIPSetSize,
SetID: rules.IPSetIDNATOutgoingMasqPools,
Type: ipsets.IPSetTypeHashNet,
}, []string{})

return &masqManager{
ipVersion: ipVersion,
ipsetReg: ipSetReg,
natTable: natTable,
activePools: map[string]*proto.IPAMPool{},
masqPools: set.New(),
dirty: true,
ruleRenderer: ruleRenderer,
logCxt: log.WithField("ipVersion", ipVersion),
ipVersion: ipVersion,
ipsetsDataplane: ipsetsDataplane,
natTable: natTable,
activePools: map[string]*proto.IPAMPool{},
masqPools: set.New(),
dirty: true,
ruleRenderer: ruleRenderer,
logCxt: log.WithField("ipVersion", ipVersion),
}
}

Expand All @@ -104,10 +104,10 @@ func (d *masqManager) OnUpdate(msg interface{}) {
// defers and coalesces the update so removing then adding the
// same IP is a no-op anyway.
logCxt.Debug("Removing old pool.")
d.ipsetReg.RemoveMembers(rules.IPSetIDNATOutgoingAllPools, []string{oldPool.Cidr})
d.ipsetsDataplane.RemoveMembers(rules.IPSetIDNATOutgoingAllPools, []string{oldPool.Cidr})
if oldPool.Masquerade {
logCxt.Debug("Masquerade was enabled on pool.")
d.ipsetReg.RemoveMembers(rules.IPSetIDNATOutgoingMasqPools, []string{oldPool.Cidr})
d.ipsetsDataplane.RemoveMembers(rules.IPSetIDNATOutgoingMasqPools, []string{oldPool.Cidr})
}
delete(d.activePools, poolID)
d.masqPools.Discard(poolID)
Expand All @@ -123,10 +123,10 @@ func (d *masqManager) OnUpdate(msg interface{}) {

// Update the IP sets.
logCxt.Debug("Adding IPAM pool to IP sets.")
d.ipsetReg.AddMembers(rules.IPSetIDNATOutgoingAllPools, []string{newPool.Cidr})
d.ipsetsDataplane.AddMembers(rules.IPSetIDNATOutgoingAllPools, []string{newPool.Cidr})
if newPool.Masquerade {
logCxt.Debug("IPAM has masquerade enabled.")
d.ipsetReg.AddMembers(rules.IPSetIDNATOutgoingMasqPools, []string{newPool.Cidr})
d.ipsetsDataplane.AddMembers(rules.IPSetIDNATOutgoingMasqPools, []string{newPool.Cidr})
d.masqPools.Add(poolID)
}
d.activePools[poolID] = newPool
Expand Down
4 changes: 2 additions & 2 deletions ip/ip_addr.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (a V4Addr) AsNetIP() net.IP {
}

func (a V4Addr) AsCalicoNetIP() calinet.IP {
return calinet.IP{a.AsNetIP()}
return calinet.IP{IP: a.AsNetIP()}
}

func (a V4Addr) String() string {
Expand All @@ -69,7 +69,7 @@ func (a V6Addr) AsNetIP() net.IP {
}

func (a V6Addr) AsCalicoNetIP() calinet.IP {
return calinet.IP{a.AsNetIP()}
return calinet.IP{IP: a.AsNetIP()}
}

func (a V6Addr) String() string {
Expand Down