revert: ssh and apply functions
Signed-off-by: fengxsong <fengxsong@outlook.com>
fengxsong committed Jul 21, 2023
1 parent 3d87088 commit bb803fb
Showing 15 changed files with 106 additions and 161 deletions.
2 changes: 1 addition & 1 deletion cmd/sealos/cmd/add.go
@@ -48,7 +48,7 @@ func newAddCmd() *cobra.Command {
Args: cobra.NoArgs,
Example: exampleAdd,
RunE: func(cmd *cobra.Command, args []string) error {
applier, err := apply.NewScaleApplierFromArgs(addArgs, "add")
applier, err := apply.NewScaleApplierFromArgs(cmd, addArgs)
if err != nil {
return err
}
2 changes: 1 addition & 1 deletion cmd/sealos/cmd/delete.go
@@ -55,7 +55,7 @@ func newDeleteCmd() *cobra.Command {
if err := processor.ConfirmDeleteNodes(); err != nil {
return err
}
applier, err := apply.NewScaleApplierFromArgs(deleteArgs, "delete")
applier, err := apply.NewScaleApplierFromArgs(cmd, deleteArgs)
if err != nil {
return err
}
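
Both subcommands now hand their *cobra.Command to the applier instead of a verb string, so NewScaleApplierFromArgs can dispatch on cmd.Name() and read which SSH flags were explicitly set (see getSSHFromCommand in pkg/apply/scale.go below). A minimal sketch of the new call shape; Apply() on the returned applydrivers.Interface is an assumption here:

    RunE: func(cmd *cobra.Command, args []string) error {
        // forward the command itself so the applier can inspect changed flags
        applier, err := apply.NewScaleApplierFromArgs(cmd, deleteArgs)
        if err != nil {
            return err
        }
        return applier.Apply() // assumed to drive the actual scale operation
    },
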
2 changes: 0 additions & 2 deletions pkg/apply/args.go
@@ -101,7 +101,6 @@ func (arg *ResetArgs) RegisterFlags(fs *pflag.FlagSet) {
type ScaleArgs struct {
*Cluster
*SSH
fs *pflag.FlagSet
}

func (arg *ScaleArgs) RegisterFlags(fs *pflag.FlagSet, verb, action string) {
@@ -110,5 +109,4 @@ func (arg *ScaleArgs) RegisterFlags(fs *pflag.FlagSet, verb, action string) {
if arg.SSH != nil {
arg.SSH.RegisterFlags(fs)
}
arg.fs = fs
}
4 changes: 2 additions & 2 deletions pkg/apply/processor/create.go
@@ -114,7 +114,7 @@ func (c *CreateProcessor) RunConfig(cluster *v2.Cluster) error {
func (c *CreateProcessor) MountRootfs(cluster *v2.Cluster) error {
logger.Info("Executing pipeline MountRootfs in CreateProcessor.")
hosts := append(cluster.GetMasterIPAndPortList(), cluster.GetNodeIPAndPortList()...)
fs, err := rootfs.NewRootfsMounter(cluster.Status.Mounts, c.ClusterFile.GetCluster())
fs, err := rootfs.NewRootfsMounter(cluster.Status.Mounts)
if err != nil {
return err
}
@@ -129,7 +129,7 @@ func (c *CreateProcessor) MirrorRegistry(cluster *v2.Cluster) error {
func (c *CreateProcessor) Bootstrap(cluster *v2.Cluster) error {
logger.Info("Executing pipeline Bootstrap in CreateProcessor")
hosts := append(cluster.GetMasterIPAndPortList(), cluster.GetNodeIPAndPortList()...)
bs := bootstrap.New(cluster, c.ClusterFile.GetCluster())
bs := bootstrap.New(cluster)
return bs.Apply(hosts...)
}

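
After this revert both helpers take a single cluster: rootfs.NewRootfsMounter is built from the mount list alone, and bootstrap.New operates on the cluster being created. A condensed sketch of the create-side flow under those signatures (the MountRootfs method and its arguments are an assumption; error handling trimmed):

    // sketch: mount the rootfs on every node, then bootstrap them
    hosts := append(cluster.GetMasterIPAndPortList(), cluster.GetNodeIPAndPortList()...)
    fs, err := rootfs.NewRootfsMounter(cluster.Status.Mounts)
    if err != nil {
        return err
    }
    if err := fs.MountRootfs(cluster, hosts); err != nil { // assumed mounter method
        return err
    }
    return bootstrap.New(cluster).Apply(hosts...)
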
6 changes: 3 additions & 3 deletions pkg/apply/processor/delete.go
@@ -74,7 +74,7 @@ func (d *DeleteProcessor) PreProcess(cluster *v2.Cluster) error {
func (d *DeleteProcessor) UndoBootstrap(cluster *v2.Cluster) error {
logger.Info("Executing pipeline Bootstrap in DeleteProcessor")
hosts := append(cluster.GetMasterIPAndPortList(), cluster.GetNodeIPAndPortList()...)
bs := bootstrap.New(cluster, d.ClusterFile.GetCluster())
bs := bootstrap.New(d.ClusterFile.GetCluster())
return bs.Delete(hosts...)
}

@@ -92,11 +92,11 @@ func (d *DeleteProcessor) UnMountRootfs(cluster *v2.Cluster) error {
hosts = append(hosts, cluster.GetRegistryIPAndPort())
}
// umount doesn't care about imageMounts
fs, err := rootfs.NewRootfsMounter(nil, d.ClusterFile.GetCluster())
fs, err := rootfs.NewRootfsMounter(nil)
if err != nil {
return err
}
return fs.UnMountRootfs(cluster, hosts)
return fs.UnMountRootfs(d.ClusterFile.GetCluster(), hosts)
}

func (d *DeleteProcessor) UnMountImage(cluster *v2.Cluster) error {
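
On teardown the processor still collects hosts from the in-memory cluster, but unmounting and bootstrap deletion now run against the desired cluster recorded in the Clusterfile (d.ClusterFile.GetCluster()). A condensed sketch of that direction, using only the calls shown in this diff:

    // sketch: undo bootstrap and unmount using the Clusterfile's cluster
    hosts := append(cluster.GetMasterIPAndPortList(), cluster.GetNodeIPAndPortList()...)
    if err := bootstrap.New(d.ClusterFile.GetCluster()).Delete(hosts...); err != nil {
        return err
    }
    fs, err := rootfs.NewRootfsMounter(nil) // umount doesn't need the image mounts
    if err != nil {
        return err
    }
    return fs.UnMountRootfs(d.ClusterFile.GetCluster(), hosts)
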
2 changes: 1 addition & 1 deletion pkg/apply/processor/install.go
@@ -233,7 +233,7 @@ func (c *InstallProcessor) MountRootfs(cluster *v2.Cluster) error {
return nil
}
hosts := append(cluster.GetMasterIPAndPortList(), cluster.GetNodeIPAndPortList()...)
fs, err := rootfs.NewRootfsMounter(c.NewMounts, c.ClusterFile.GetCluster())
fs, err := rootfs.NewRootfsMounter(c.NewMounts)
if err != nil {
return err
}
19 changes: 12 additions & 7 deletions pkg/apply/processor/scale.go
@@ -127,11 +127,11 @@ func (c ScaleProcessor) UnMountRootfs(cluster *v2.Cluster) error {
logger.Warn("delete process unmount rootfs skip is cluster not mount rootfs")
return nil
}
fs, err := rootfs.NewRootfsMounter(cluster.Status.Mounts, c.ClusterFile.GetCluster())
fs, err := rootfs.NewRootfsMounter(cluster.Status.Mounts)
if err != nil {
return err
}
return fs.UnMountRootfs(cluster, hosts)
return fs.UnMountRootfs(c.ClusterFile.GetCluster(), hosts)
}

func (c *ScaleProcessor) JoinCheck(cluster *v2.Cluster) error {
@@ -184,11 +184,16 @@ func (c *ScaleProcessor) preProcess(cluster *v2.Cluster) error {
if err = SyncClusterStatus(cluster, c.Buildah, false); err != nil {
return err
}
runTime, err := runtime.NewDefaultRuntimeWithCurrentCluster(cluster, c.ClusterFile.GetCluster(), c.ClusterFile.GetKubeadmConfig())
var rt runtime.Interface
if c.IsScaleUp {
rt, err = runtime.NewDefaultRuntimeWithCurrentCluster(cluster, c.ClusterFile.GetKubeadmConfig())
} else {
rt, err = runtime.NewDefaultRuntimeWithCurrentCluster(c.ClusterFile.GetCluster(), c.ClusterFile.GetKubeadmConfig())
}
if err != nil {
return fmt.Errorf("failed to init runtime: %v", err)
}
c.Runtime = runTime
c.Runtime = rt

return err
}
@@ -234,7 +239,7 @@ func (c *ScaleProcessor) MountRootfs(cluster *v2.Cluster) error {
// since app type images are only sent to the first master, in
// cluster scaling scenario we don't need to send app images repeatedly.
// so filter out rootfs/patch type
fs, err := rootfs.NewRootfsMounter(filterNoneApplicationMounts(cluster.Status.Mounts), c.ClusterFile.GetCluster())
fs, err := rootfs.NewRootfsMounter(filterNoneApplicationMounts(cluster.Status.Mounts))
if err != nil {
return err
}
@@ -254,14 +259,14 @@ func filterNoneApplicationMounts(images []v2.MountImage) []v2.MountImage {
func (c *ScaleProcessor) Bootstrap(cluster *v2.Cluster) error {
logger.Info("Executing pipeline Bootstrap in ScaleProcessor")
hosts := append(c.MastersToJoin, c.NodesToJoin...)
bs := bootstrap.New(cluster, c.ClusterFile.GetCluster())
bs := bootstrap.New(cluster)
return bs.Apply(hosts...)
}

func (c *ScaleProcessor) UndoBootstrap(cluster *v2.Cluster) error {
logger.Info("Executing pipeline UndoBootstrap in ScaleProcessor")
hosts := append(c.MastersToDelete, c.NodesToDelete...)
bs := bootstrap.New(cluster, c.ClusterFile.GetCluster())
bs := bootstrap.New(c.ClusterFile.GetCluster())
return bs.Delete(hosts...)
}

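
Because the preProcess hunk above interleaves the removed single-constructor call with the new branch, here is the added logic restated on its own: the runtime is built from the in-memory cluster (which already contains the hosts to join) when scaling up, and from the Clusterfile's recorded cluster when scaling down.

    // restated from the hunk above, for readability
    var rt runtime.Interface
    if c.IsScaleUp {
        // scale up: the joining hosts are already merged into `cluster`
        rt, err = runtime.NewDefaultRuntimeWithCurrentCluster(cluster, c.ClusterFile.GetKubeadmConfig())
    } else {
        // scale down: operate on the cluster recorded in the Clusterfile
        rt, err = runtime.NewDefaultRuntimeWithCurrentCluster(c.ClusterFile.GetCluster(), c.ClusterFile.GetKubeadmConfig())
    }
    if err != nil {
        return fmt.Errorf("failed to init runtime: %v", err)
    }
    c.Runtime = rt
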
96 changes: 59 additions & 37 deletions pkg/apply/scale.go
@@ -20,6 +20,7 @@ import (
"strconv"
"strings"

"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/util/sets"

"github.com/labring/sealos/pkg/apply/applydrivers"
@@ -29,35 +30,34 @@ import (
v2 "github.com/labring/sealos/pkg/types/v1beta1"
fileutil "github.com/labring/sealos/pkg/utils/file"
"github.com/labring/sealos/pkg/utils/iputils"
"github.com/labring/sealos/pkg/utils/logger"
strings2 "github.com/labring/sealos/pkg/utils/strings"
)

// NewScaleApplierFromArgs will filter ip list from command parameters.
func NewScaleApplierFromArgs(scaleArgs *ScaleArgs, flag string) (applydrivers.Interface, error) {
func NewScaleApplierFromArgs(cmd *cobra.Command, scaleArgs *ScaleArgs) (applydrivers.Interface, error) {
var cluster *v2.Cluster
var curr *v2.Cluster
clusterPath := constants.Clusterfile(scaleArgs.Cluster.ClusterName)

if !fileutil.IsExist(clusterPath) {
cluster = initCluster(scaleArgs.Cluster.ClusterName)
curr = cluster
} else {
clusterFile := clusterfile.NewClusterFile(clusterPath)
err := clusterFile.Process()
if err != nil {
return nil, err
}
cluster = clusterFile.GetCluster()
curr = clusterFile.GetCluster().DeepCopy()
}

curr := cluster.DeepCopy()

if scaleArgs.Cluster.Nodes == "" && scaleArgs.Cluster.Masters == "" {
return nil, fmt.Errorf("the node or master parameter was not committed")
}
var err error
switch flag {
switch cmd.Name() {
case "add":
err = Join(cluster, scaleArgs)
err = verifyAndSetNodes(cmd, cluster, scaleArgs)
case "delete":
err = Delete(cluster, scaleArgs)
}
@@ -68,14 +68,52 @@ func NewScaleApplierFromArgs(cmd *cobra.Command, scaleArgs *ScaleArgs) (applydrivers.Interface, error) {
return applydrivers.NewDefaultScaleApplier(curr, cluster)
}

func Join(cluster *v2.Cluster, scalingArgs *ScaleArgs) error {
return joinNodes(cluster, scalingArgs)
func getSSHFromCommand(cmd *cobra.Command) *v2.SSH {
var (
ret = &v2.SSH{}
fs = cmd.Flags()
changed bool
)
if flagChanged(cmd, "user") {
ret.User, _ = fs.GetString("user")
changed = true
}
if flagChanged(cmd, "passwd") {
ret.Passwd, _ = fs.GetString("passwd")
changed = true
}
if flagChanged(cmd, "pk") {
ret.Pk, _ = fs.GetString("pk")
changed = true
}
if flagChanged(cmd, "pk-passwd") {
ret.PkPasswd, _ = fs.GetString("pk-passwd")
changed = true
}
if flagChanged(cmd, "port") {
ret.Port, _ = fs.GetUint16("port")
changed = true
}
if changed {
return ret
}
return nil
}

func joinNodes(cluster *v2.Cluster, scaleArgs *ScaleArgs) error {
func flagChanged(cmd *cobra.Command, name string) bool {
if cmd != nil {
if fs := cmd.Flag(name); fs != nil && fs.Changed {
return true
}
}
return false
}

func verifyAndSetNodes(cmd *cobra.Command, cluster *v2.Cluster, scaleArgs *ScaleArgs) error {
if err := PreProcessIPList(scaleArgs.Cluster); err != nil {
return err
}

masters, nodes := scaleArgs.Cluster.Masters, scaleArgs.Cluster.Nodes
if len(masters) > 0 {
if err := validateIPList(masters); err != nil {
@@ -94,7 +132,7 @@ func joinNodes(cluster *v2.Cluster, scaleArgs *ScaleArgs) error {
var hasMaster bool
// check duplicate
alreadyIn := sets.NewString()
// add already add masters and nodes
// add already joined masters and nodes
for i := range cluster.Spec.Hosts {
h := cluster.Spec.Hosts[i]
if strings2.InList(v2.MASTER, h.Roles) {
@@ -112,7 +150,9 @@ func joinNodes(cluster *v2.Cluster, scaleArgs *ScaleArgs) error {
if !hasMaster {
return fmt.Errorf("`master` role not found, due to Clusterfile may have been corrupted?")
}
getHostFunc := func(sliceStr string, role string, exclude []string, scaleSshConfig *SSH) (*v2.Host, error) {
override := getSSHFromCommand(cmd)

getHostFunc := func(sliceStr string, role string, exclude []string) (*v2.Host, error) {
ss := strings.Split(sliceStr, ",")
addrs := make([]string, 0)
for _, s := range ss {
@@ -129,47 +169,29 @@ func joinNodes(cluster *v2.Cluster, scaleArgs *ScaleArgs) error {
}
}
if len(addrs) > 0 {
clusterSSH := cluster.GetSSH()

var override bool
var overrideSSH v2.SSH

if scaleSshConfig != nil {
overrideSSH.User = scaleSshConfig.User
overrideSSH.Passwd = scaleSshConfig.Password
overrideSSH.PkName = clusterSSH.PkName
overrideSSH.PkData = clusterSSH.PkData
overrideSSH.Pk = scaleSshConfig.Pk
overrideSSH.PkPasswd = scaleSshConfig.PkPassword
overrideSSH.Port = scaleSshConfig.Port

if clusterSSH != overrideSSH {
logger.Info("scale '%s' nodes '%q' with different ssh settings: %+v", role, sliceStr, scaleSshConfig)
clusterSSH = overrideSSH
override = true
}
}
global := cluster.Spec.SSH.DeepCopy()
ssh.OverSSHConfig(global, override)

sshClient := ssh.NewSSHClient(&clusterSSH, true)
sshClient := ssh.NewSSHClient(global, true)

host := &v2.Host{
IPS: addrs,
Roles: []string{role, GetHostArch(sshClient, addrs[0])},
}
if override {
host.SSH = &overrideSSH
if override != nil {
host.SSH = override
}
return host, nil
}
return nil, nil
}

if mastersToAdded, err := getHostFunc(masters, v2.MASTER, cluster.GetMasterIPAndPortList(), scaleArgs.SSH); err != nil {
if mastersToAdded, err := getHostFunc(masters, v2.MASTER, cluster.GetMasterIPAndPortList()); err != nil {
return err
} else if mastersToAdded != nil {
hosts = append(hosts, *mastersToAdded)
}
if nodesToAdded, err := getHostFunc(nodes, v2.NODE, cluster.GetNodeIPAndPortList(), scaleArgs.SSH); err != nil {
if nodesToAdded, err := getHostFunc(nodes, v2.NODE, cluster.GetNodeIPAndPortList()); err != nil {
return err
} else if nodesToAdded != nil {
hosts = append(hosts, *nodesToAdded)
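
The new getSSHFromCommand/flagChanged pair builds a per-host SSH override only from flags the user explicitly changed; everything else keeps the cluster-wide SSH settings. A usage sketch of how that override composes inside getHostFunc (ssh.OverSSHConfig, ssh.NewSSHClient and GetHostArch are used as in the diff; the surrounding wiring is illustrative):

    // sketch: merge an explicit per-flag override into a copy of the global SSH config
    override := getSSHFromCommand(cmd) // nil unless --user/--passwd/--pk/--pk-passwd/--port changed
    global := cluster.Spec.SSH.DeepCopy()
    ssh.OverSSHConfig(global, override)      // overlay the changed fields, if any
    client := ssh.NewSSHClient(global, true) // used to probe the node, e.g. its CPU architecture

    host := &v2.Host{
        IPS:   addrs,
        Roles: []string{v2.NODE, GetHostArch(client, addrs[0])},
    }
    if override != nil {
        host.SSH = override // persist only the explicitly changed fields on the new host
    }
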
45 changes: 1 addition & 44 deletions pkg/apply/scale_test.go
@@ -825,53 +825,10 @@ func TestJoin(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if err := Join(tt.args.cluster, tt.args.scalingArgs); (err != nil) != tt.wantErr {
if err := verifyAndSetNodes(nil, tt.args.cluster, tt.args.scalingArgs); (err != nil) != tt.wantErr {
t.Errorf("Join() error = %v, wantErr %v", err, tt.wantErr)
}
t.Logf("print des cluster hosts: %v", tt.args.cluster.Spec.Hosts)
})
}
}

func TestNewScaleApplierFromArgs(t *testing.T) {
tests := []struct {
name string
op string
args *ScaleArgs
wantErr bool
}{
{
name: "add empty",
op: "add",
args: &ScaleArgs{
Cluster: &Cluster{
Masters: "",
Nodes: "",
ClusterName: "",
},
},
wantErr: true,
},
{
name: "delete master0",
op: "delete",
args: &ScaleArgs{
Cluster: &Cluster{
Masters: "192.168.1.1",
Nodes: "",
ClusterName: "",
},
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, err := NewScaleApplierFromArgs(tt.args, tt.op)
if (err != nil) != tt.wantErr {
t.Errorf("NewScaleApplierFromArgs() error = %v, wantErr %v", err, tt.wantErr)
}
// t.Logf("print des cluster hosts: %v", tt.args.cluster.Spec.Hosts)
})
}
}
4 changes: 2 additions & 2 deletions pkg/bootstrap/bootstrap.go
@@ -45,8 +45,8 @@ type realBootstrap struct {
postflights []Applier
}

func New(cluster, current *v2.Cluster) Interface {
ctx := NewContextFrom(cluster, current)
func New(cluster *v2.Cluster) Interface {
ctx := NewContextFrom(cluster)
bs := &realBootstrap{
ctx: ctx,
preflights: make([]Applier, 0),
4 changes: 2 additions & 2 deletions pkg/bootstrap/context.go
@@ -58,8 +58,8 @@ func (ctx realContext) GetRemoter() remote.Interface {
return ctx.remoter
}

func NewContextFrom(cluster, current *v2.Cluster) Context {
execer := ssh.NewClusterClient(cluster, current, true)
func NewContextFrom(cluster *v2.Cluster) Context {
execer, _ := ssh.NewSSHByCluster(cluster, true)
envProcessor := env.NewEnvProcessor(cluster, cluster.Status.Mounts)
remoter := remote.New(cluster.GetName(), execer)
return &realContext{
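
The bootstrap context is now derived from a single cluster: ssh.NewSSHByCluster replaces the two-cluster ssh.NewClusterClient, and callers simply pass the cluster they are acting on. A minimal usage sketch inside a processor method (the appliers registered by New are omitted):

    // sketch: one-cluster bootstrap entry point
    hosts := append(cluster.GetMasterIPAndPortList(), cluster.GetNodeIPAndPortList()...)
    bs := bootstrap.New(cluster) // builds the ssh execer, env processor and remoter internally
    if err := bs.Apply(hosts...); err != nil {
        return err
    }
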
