Extend the mega-watcher to report opened port changes. #1588

Merged
merged 4 commits into from Feb 12, 2015
@@ -78,19 +78,23 @@ var marshalTestCases = []struct {
Service: "Shazam",
Series: "precise",
CharmURL: "cs:~user/precise/wordpress-42",
- Ports: []network.Port{
- {
- Protocol: "http",
- Number: 80},
- },
+ Ports: []network.Port{{
+ Protocol: "http",
+ Number: 80,
+ }},
+ PortRanges: []network.PortRange{{
+ FromPort: 80,
+ ToPort: 80,
+ Protocol: "http",
+ }},
PublicAddress: "testing.invalid",
PrivateAddress: "10.0.0.1",
MachineId: "1",
Status: "error",
StatusInfo: "foo",
},
},
- json: `["unit", "change", {"CharmURL": "cs:~user/precise/wordpress-42", "MachineId": "1", "Series": "precise", "Name": "Benji", "PublicAddress": "testing.invalid", "Service": "Shazam", "PrivateAddress": "10.0.0.1", "Ports": [{"Protocol": "http", "Number": 80}], "Status": "error", "StatusInfo": "foo", "StatusData": null, "Subordinate": false}]`,
+ json: `["unit", "change", {"CharmURL": "cs:~user/precise/wordpress-42", "MachineId": "1", "Series": "precise", "Name": "Benji", "PublicAddress": "testing.invalid", "Service": "Shazam", "PrivateAddress": "10.0.0.1", "Ports": [{"Protocol": "http", "Number": 80}], "PortRanges": [{"FromPort": 80, "ToPort": 80, "Protocol": "http"}], "Status": "error", "StatusInfo": "foo", "StatusData": null, "Subordinate": false}]`,
}, {
about: "RelationInfo Delta",
value: multiwatcher.Delta{
View
@@ -1036,7 +1036,7 @@ func (s *MachineSuite) TestRefreshWhenNotAlive(c *gc.C) {
}
func (s *MachineSuite) TestMachinePrincipalUnits(c *gc.C) {
- // Check that Machine.Units works correctly.
+ // Check that Machine.Units and st.UnitsFor work correctly.
// Make three machines, three services and three units for each service;
// variously assign units to machines and check that Machine.Units
@@ -1076,6 +1076,7 @@ func (s *MachineSuite) TestMachinePrincipalUnits(c *gc.C) {
}
units[3], err = s3.AllUnits()
c.Assert(err, jc.ErrorIsNil)
+ c.Assert(sortedUnitNames(units[3]), jc.DeepEquals, []string{"s3/0", "s3/1", "s3/2"})
assignments := []struct {
machine *state.Machine
@@ -1096,10 +1097,17 @@ func (s *MachineSuite) TestMachinePrincipalUnits(c *gc.C) {
for i, a := range assignments {
c.Logf("test %d", i)
+ expect := sortedUnitNames(append(a.units, a.subordinates...))
+
+ // The units can be retrieved from the machine model.
got, err := a.machine.Units()
c.Assert(err, jc.ErrorIsNil)
- expect := sortedUnitNames(append(a.units, a.subordinates...))
- c.Assert(sortedUnitNames(got), gc.DeepEquals, expect)
+ c.Assert(sortedUnitNames(got), jc.DeepEquals, expect)
+
+ // The units can be retrieved from the machine id.
+ got, err = s.State.UnitsFor(a.machine.Id())
+ c.Assert(err, jc.ErrorIsNil)
+ c.Assert(sortedUnitNames(got), jc.DeepEquals, expect)
}
}
View
@@ -11,6 +11,7 @@ import (
"github.com/juju/errors"
"gopkg.in/mgo.v2"
+ "github.com/juju/juju/network"
"github.com/juju/juju/state/multiwatcher"
"github.com/juju/juju/state/watcher"
)
@@ -99,13 +100,45 @@ func translateLegacyUnitAgentStatus(in multiwatcher.Status) multiwatcher.Status
return in
}
+func getUnitPortRangesAndPorts(st *State, unitName string) ([]network.PortRange, []network.Port, error) {
+ // Get opened port ranges for the unit and convert them to ports,
+ // as older clients/servers do not know about ranges). See bug
+ // http://pad.lv/1418344 for more info.
+ unit, err := st.Unit(unitName)
+ if err != nil {
+ return nil, nil, errors.Annotatef(err, "failed to get unit %q", unitName)
+ }
+ portRanges, err := unit.OpenedPorts()
+ // Since the port ranges are associated with the unit's machine,
+ // we need to check for NotAssignedError.
+ if errors.IsNotAssigned(errors.Cause(err)) {
+ // Not assigned, so there won't be any ports opened.
+ return nil, nil, nil
+ } else if err != nil {
+ return nil, nil, errors.Annotate(err, "failed to get unit port ranges")
+ }
+ // For backward compatibility, if there are no ports opened, return an
+ // empty slice rather than a nil slice. Use a len(portRanges) capacity to
+ // avoid unnecessary allocations, since most of the times only specific
+ // ports are opened by charms.
+ compatiblePorts := make([]network.Port, 0, len(portRanges))
+ for _, portRange := range portRanges {
+ for j := portRange.FromPort; j <= portRange.ToPort; j++ {
+ compatiblePorts = append(compatiblePorts, network.Port{
+ Number: j,
+ Protocol: portRange.Protocol,
+ })
+ }
+ }
+ return portRanges, compatiblePorts, nil
+}
+
func (u *backingUnit) updated(st *State, store *multiwatcherStore, id interface{}) error {
info := &multiwatcher.UnitInfo{
Name: u.Name,
Service: u.Service,
Series: u.Series,
MachineId: u.MachineId,
- Ports: u.Ports,
Subordinate: u.Principal != "",
}
if u.CharmURL != nil {
@@ -114,19 +147,28 @@ func (u *backingUnit) updated(st *State, store *multiwatcherStore, id interface{
oldInfo := store.Get(info.EntityId())
if oldInfo == nil {
// We're adding the entry for the first time,
- // so fetch the associated unit status.
+ // so fetch the associated unit status and opened ports.
sdoc, err := getStatus(st, unitGlobalKey(u.Name))
if err != nil {
return err
}
info.Status = multiwatcher.Status(sdoc.Status)
info.Status = translateLegacyUnitAgentStatus(info.Status)
info.StatusInfo = sdoc.StatusInfo
+ portRanges, compatiblePorts, err := getUnitPortRangesAndPorts(st, u.Name)
+ if err != nil {
+ return errors.Trace(err)
+ }
+ info.PortRanges = portRanges
+ info.Ports = compatiblePorts
+
} else {
- // The entry already exists, so preserve the current status.
+ // The entry already exists, so preserve the current status and ports.
oldInfo := oldInfo.(*multiwatcher.UnitInfo)
info.Status = oldInfo.Status
info.StatusInfo = oldInfo.StatusInfo
+ info.Ports = oldInfo.Ports
+ info.PortRanges = oldInfo.PortRanges
}
publicAddress, privateAddress, err := getUnitAddresses(st, u.Name)
if err != nil {
@@ -480,6 +522,79 @@ func backingEntityIdForSettingsKey(key string) (eid multiwatcher.EntityId, extra
return
}
+type backingOpenedPorts map[string]interface{}
+
+func (p *backingOpenedPorts) updated(st *State, store *multiwatcherStore, id interface{}) error {
+ localID := st.localID(id.(string))
+ parentId, ok := backingEntityIdForOpenedPortsKey(localID)
+ if !ok {
+ return nil
+ }
+ switch info := store.Get(parentId).(type) {
+ case nil:
+ // The parent info doesn't exist. This is unexpected because the port
+ // always refers to a machine. Anyway, ignore the ports for now.
+ return nil
+ case *multiwatcher.MachineInfo:
+ // Retrieve the units placed in the machine.
+ units, err := st.UnitsFor(info.Id)
+ if err != nil {
+ return errors.Trace(err)
+ }
+ // Update the ports on all units assigned to the machine.
+ for _, u := range units {
+ if err := updateUnitPorts(st, store, u); err != nil {
+ return errors.Trace(err)
+ }
+ }
+ }
+ return nil
+}
+
+func (p *backingOpenedPorts) removed(st *State, store *multiwatcherStore, id interface{}) {}
+
+func (p *backingOpenedPorts) mongoId() interface{} {
+ panic("cannot find mongo id from openedPorts document")
+}
+
+// updateUnitPorts updates the Ports and PortRanges info of the given unit.
+func updateUnitPorts(st *State, store *multiwatcherStore, u *Unit) error {
+ eid, ok := backingEntityIdForGlobalKey(u.globalKey())
+ if !ok {
+ // This should never happen.
+ return errors.New("cannot retrieve entity id for unit")
+ }
+ switch oldInfo := store.Get(eid).(type) {
+ case nil:
+ // The unit info doesn't exist. This is unlikely to happen, but ignore
+ // the status until a unitInfo is included in the store.
+ return nil
+ case *multiwatcher.UnitInfo:
+ portRanges, compatiblePorts, err := getUnitPortRangesAndPorts(st, oldInfo.Name)
+ if err != nil {
+ return errors.Trace(err)
+ }
+ unitInfo := *oldInfo
+ unitInfo.PortRanges = portRanges
+ unitInfo.Ports = compatiblePorts
+ store.Update(&unitInfo)
+ default:
+ return nil
+ }
+ return nil
+}
+
+// backingEntityIdForOpenedPortsKey returns the entity id for the given
+// openedPorts key. Any extra information in the key is discarded.
+func backingEntityIdForOpenedPortsKey(key string) (multiwatcher.EntityId, bool) {
+ parts, err := extractPortsIdParts(key)
+ if err != nil {
+ logger.Debugf("cannot parse ports key %q: %v", key, err)
+ return multiwatcher.EntityId{}, false
+ }
+ return backingEntityIdForGlobalKey(machineGlobalKey(parts[1]))
+}
+
// backingEntityIdForGlobalKey returns the entity id for the given global key.
// It returns false if the key is not recognized.
func backingEntityIdForGlobalKey(key string) (multiwatcher.EntityId, bool) {
@@ -571,6 +686,10 @@ func newAllWatcherStateBacking(st *State) Backing {
Collection: st.db.C(settingsC),
infoType: reflect.TypeOf(backingSettings{}),
subsidiary: true,
+ }, {
+ Collection: st.db.C(openedPortsC),
+ infoType: reflect.TypeOf(backingOpenedPorts{}),
+ subsidiary: true,
}}
// Populate the collection maps from the above set of collections.
for _, c := range collections {
Oops, something went wrong.