Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Increase port range and store used ports #681

Merged
merged 1 commit into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 18 additions & 9 deletions grid-client/deployer/network_deployer.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ func (d *NetworkDeployer) Validate(ctx context.Context, znet *workloads.ZNet) er
return d.InvalidateBrokenAttributes(znet)
}

// GenerateVersionlessDeployments generates deployments for network deployer without versions
func (d *NetworkDeployer) GenerateVersionlessDeployments(ctx context.Context, znet *workloads.ZNet) (map[uint32]gridtypes.Deployment, error) {
// GenerateVersionlessDeployments generates deployments for network deployer without versions.
// usedPorts can be used to exclude some ports from being assigned to networks
func (d *NetworkDeployer) GenerateVersionlessDeployments(ctx context.Context, znet *workloads.ZNet, usedPorts map[uint32][]uint16) (map[uint32]gridtypes.Deployment, error) {
deployments := make(map[uint32]gridtypes.Deployment)

log.Debug().Msgf("nodes: %v", znet.Nodes)
Expand Down Expand Up @@ -121,7 +122,7 @@ func (d *NetworkDeployer) GenerateVersionlessDeployments(ctx context.Context, zn
if err := znet.AssignNodesWGKey(allNodes); err != nil {
return nil, errors.Wrap(err, "could not assign node wg keys")
}
if err := znet.AssignNodesWGPort(ctx, sub, d.tfPluginClient.NcPool, allNodes); err != nil {
if err := znet.AssignNodesWGPort(ctx, sub, d.tfPluginClient.NcPool, allNodes, usedPorts); err != nil {
return nil, errors.Wrap(err, "could not assign node wg ports")
}

Expand Down Expand Up @@ -274,7 +275,7 @@ func (d *NetworkDeployer) Deploy(ctx context.Context, znet *workloads.ZNet) erro
return err
}

newDeployments, err := d.GenerateVersionlessDeployments(ctx, znet)
newDeployments, err := d.GenerateVersionlessDeployments(ctx, znet, nil)
rawdaGastan marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return errors.Wrap(err, "could not generate deployments data")
}
Expand Down Expand Up @@ -310,17 +311,17 @@ func (d *NetworkDeployer) Deploy(ctx context.Context, znet *workloads.ZNet) erro
}

// BatchDeploy deploys multiple network deployments using the deployer
func (d *NetworkDeployer) BatchDeploy(ctx context.Context, znets []*workloads.ZNet) error {
func (d *NetworkDeployer) BatchDeploy(ctx context.Context, znets []*workloads.ZNet, updateMetadata ...bool) error {
newDeployments := make(map[uint32][]gridtypes.Deployment)
newDeploymentsSolutionProvider := make(map[uint32][]*uint64)

nodePorts := make(map[uint32][]uint16)
for _, znet := range znets {
err := d.Validate(ctx, znet)
if err != nil {
return err
}

dls, err := d.GenerateVersionlessDeployments(ctx, znet)
dls, err := d.GenerateVersionlessDeployments(ctx, znet, nodePorts)
if err != nil {
return errors.Wrap(err, "could not generate deployments data")
}
Expand All @@ -339,10 +340,15 @@ func (d *NetworkDeployer) BatchDeploy(ctx context.Context, znets []*workloads.ZN

newDls, err := d.deployer.BatchDeploy(ctx, newDeployments, newDeploymentsSolutionProvider)

update := true
if len(updateMetadata) != 0 {
update = updateMetadata[0]
}

// update deployment and plugin state
// error is not returned immediately before updating state because of untracked failed deployments
for _, znet := range znets {
if err := d.updateStateFromDeployments(ctx, znet, newDls); err != nil {
if err := d.updateStateFromDeployments(ctx, znet, newDls, update); err != nil {
return errors.Wrapf(err, "failed to update network '%s' state", znet.Name)
}
}
Expand Down Expand Up @@ -376,7 +382,7 @@ func (d *NetworkDeployer) Cancel(ctx context.Context, znet *workloads.ZNet) erro
return nil
}

func (d *NetworkDeployer) updateStateFromDeployments(ctx context.Context, znet *workloads.ZNet, dls map[uint32][]gridtypes.Deployment) error {
func (d *NetworkDeployer) updateStateFromDeployments(ctx context.Context, znet *workloads.ZNet, dls map[uint32][]gridtypes.Deployment, updateMetadata bool) error {
znet.NodeDeploymentID = map[uint32]uint64{}

for _, nodeID := range znet.Nodes {
Expand All @@ -400,6 +406,9 @@ func (d *NetworkDeployer) updateStateFromDeployments(ctx context.Context, znet *
}
}

if !updateMetadata {
return nil
}
if err := d.ReadNodesConfig(ctx, znet); err != nil {
return errors.Wrapf(err, "could not read node's data for network %s", znet.Name)
}
Expand Down
2 changes: 1 addition & 1 deletion grid-client/deployer/network_deployer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func TestNetworkDeployer(t *testing.T) {
Return(client.NewNodeClient(twinID, cl, d.tfPluginClient.RMBTimeout), nil).
AnyTimes()

dls, err := d.GenerateVersionlessDeployments(context.Background(), &znet)
dls, err := d.GenerateVersionlessDeployments(context.Background(), &znet, nil)
assert.NoError(t, err)

externalIP := ""
Expand Down
14 changes: 8 additions & 6 deletions grid-client/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ import (
"context"
"math/rand"
"net"
"slices"
"time"

"github.com/pkg/errors"
Expand Down Expand Up @@ -417,16 +418,17 @@ func AreNodesUp(ctx context.Context, sub subi.SubstrateExt, nodes []uint32, nc N
}

// GetNodeFreeWGPort returns node free wireguard port
func (n *NodeClient) GetNodeFreeWGPort(ctx context.Context, nodeID uint32) (int, error) {
freePorts, err := n.NetworkListWGPorts(ctx)
func (n *NodeClient) GetNodeFreeWGPort(ctx context.Context, nodeID uint32, usedPorts []uint16) (int, error) {
nodeUsedPorts, err := n.NetworkListWGPorts(ctx)
if err != nil {
return 0, errors.Wrap(err, "failed to list wg ports")
}
log.Debug().Msgf("reserved ports for node %d: %v", nodeID, freePorts)
p := uint(rand.Intn(6000) + 2000)
log.Debug().Msgf("reserved ports for node %d: %v", nodeID, nodeUsedPorts)
// from 1024 to 32767 (the lower limit for ephemeral ports)
p := uint(rand.Intn(32768-1024) + 1024)

for contains(freePorts, uint16(p)) {
p = uint(rand.Intn(6000) + 2000)
for contains(nodeUsedPorts, uint16(p)) || slices.Contains(usedPorts, uint16(p)) {
p = uint(rand.Intn(32768-1024) + 1024)
}
log.Debug().Msgf("Selected port for node %d is %d", nodeID, p)
return int(p), nil
Expand Down
35 changes: 20 additions & 15 deletions grid-client/workloads/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,24 +214,29 @@ func (znet *ZNet) AssignNodesIPs(nodes []uint32) error {
}

// AssignNodesWGPort assign network nodes wireguard port
func (znet *ZNet) AssignNodesWGPort(ctx context.Context, sub subi.SubstrateExt, ncPool client.NodeClientGetter, nodes []uint32) error {
func (znet *ZNet) AssignNodesWGPort(ctx context.Context, sub subi.SubstrateExt, ncPool client.NodeClientGetter, nodes []uint32, usedPorts map[uint32][]uint16) error {
if usedPorts == nil {
usedPorts = make(map[uint32][]uint16)
}
for _, nodeID := range nodes {
if _, ok := znet.WGPort[nodeID]; !ok {
cl, err := ncPool.GetNodeClient(sub, nodeID)
if err != nil {
return errors.Wrap(err, "could not get node client")
}
port, err := cl.GetNodeFreeWGPort(ctx, nodeID)
if err != nil {
return errors.Wrapf(err, "failed to get node %d free wg ports", nodeID)
}
if _, ok := znet.WGPort[nodeID]; ok {
continue
}
cl, err := ncPool.GetNodeClient(sub, nodeID)
if err != nil {
return errors.Wrap(err, "could not get node client")
}
port, err := cl.GetNodeFreeWGPort(ctx, nodeID, usedPorts[nodeID])
if err != nil {
return errors.Wrapf(err, "failed to get node %d free wg ports", nodeID)
}
usedPorts[nodeID] = append(usedPorts[nodeID], uint16(port))

if len(znet.WGPort) == 0 {
znet.WGPort = map[uint32]int{nodeID: port}
continue
}
znet.WGPort[nodeID] = port
if len(znet.WGPort) == 0 {
znet.WGPort = map[uint32]int{nodeID: port}
continue
}
znet.WGPort[nodeID] = port
}

return nil
Expand Down
Loading