Skip to content

Commit

Permalink
Merge pull request #1 from yangxggo/refactor_ssh
Browse files Browse the repository at this point in the history
Refactor ssh
  • Loading branch information
yangxggo committed Jul 21, 2023
2 parents 3d87088 + e73897d commit 5cd6757
Show file tree
Hide file tree
Showing 12 changed files with 96 additions and 130 deletions.
2 changes: 1 addition & 1 deletion cmd/sealos/cmd/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func newAddCmd() *cobra.Command {
Args: cobra.NoArgs,
Example: exampleAdd,
RunE: func(cmd *cobra.Command, args []string) error {
applier, err := apply.NewScaleApplierFromArgs(addArgs, "add")
applier, err := apply.NewScaleApplierFromArgs(cmd, addArgs)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/sealos/cmd/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func newDeleteCmd() *cobra.Command {
if err := processor.ConfirmDeleteNodes(); err != nil {
return err
}
applier, err := apply.NewScaleApplierFromArgs(deleteArgs, "delete")
applier, err := apply.NewScaleApplierFromArgs(cmd, deleteArgs)
if err != nil {
return err
}
Expand Down
2 changes: 0 additions & 2 deletions pkg/apply/args.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ func (arg *ResetArgs) RegisterFlags(fs *pflag.FlagSet) {
type ScaleArgs struct {
*Cluster
*SSH
fs *pflag.FlagSet
}

func (arg *ScaleArgs) RegisterFlags(fs *pflag.FlagSet, verb, action string) {
Expand All @@ -110,5 +109,4 @@ func (arg *ScaleArgs) RegisterFlags(fs *pflag.FlagSet, verb, action string) {
if arg.SSH != nil {
arg.SSH.RegisterFlags(fs)
}
arg.fs = fs
}
4 changes: 2 additions & 2 deletions pkg/apply/processor/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (d *DeleteProcessor) PreProcess(cluster *v2.Cluster) error {
func (d *DeleteProcessor) UndoBootstrap(cluster *v2.Cluster) error {
logger.Info("Executing pipeline Bootstrap in DeleteProcessor")
hosts := append(cluster.GetMasterIPAndPortList(), cluster.GetNodeIPAndPortList()...)
bs := bootstrap.New(cluster, d.ClusterFile.GetCluster())
bs := bootstrap.New(d.ClusterFile.GetCluster())
return bs.Delete(hosts...)
}

Expand All @@ -96,7 +96,7 @@ func (d *DeleteProcessor) UnMountRootfs(cluster *v2.Cluster) error {
if err != nil {
return err
}
return fs.UnMountRootfs(cluster, hosts)
return fs.UnMountRootfs(d.ClusterFile.GetCluster(), hosts)
}

func (d *DeleteProcessor) UnMountImage(cluster *v2.Cluster) error {
Expand Down
13 changes: 9 additions & 4 deletions pkg/apply/processor/scale.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (c ScaleProcessor) UnMountRootfs(cluster *v2.Cluster) error {
if err != nil {
return err
}
return fs.UnMountRootfs(cluster, hosts)
return fs.UnMountRootfs(c.ClusterFile.GetCluster(), hosts)
}

func (c *ScaleProcessor) JoinCheck(cluster *v2.Cluster) error {
Expand Down Expand Up @@ -184,11 +184,16 @@ func (c *ScaleProcessor) preProcess(cluster *v2.Cluster) error {
if err = SyncClusterStatus(cluster, c.Buildah, false); err != nil {
return err
}
runTime, err := runtime.NewDefaultRuntimeWithCurrentCluster(cluster, c.ClusterFile.GetCluster(), c.ClusterFile.GetKubeadmConfig())
var rt runtime.Interface
if c.IsScaleUp {
rt, err = runtime.NewDefaultRuntimeWithCurrentCluster(cluster, c.ClusterFile.GetKubeadmConfig())
} else {
rt, err = runtime.NewDefaultRuntimeWithCurrentCluster(c.ClusterFile.GetCluster(), c.ClusterFile.GetKubeadmConfig())
}
if err != nil {
return fmt.Errorf("failed to init runtime: %v", err)
}
c.Runtime = runTime
c.Runtime = rt

return err
}
Expand Down Expand Up @@ -261,7 +266,7 @@ func (c *ScaleProcessor) Bootstrap(cluster *v2.Cluster) error {
func (c *ScaleProcessor) UndoBootstrap(cluster *v2.Cluster) error {
logger.Info("Executing pipeline UndoBootstrap in ScaleProcessor")
hosts := append(c.MastersToDelete, c.NodesToDelete...)
bs := bootstrap.New(cluster, c.ClusterFile.GetCluster())
bs := bootstrap.New(c.ClusterFile.GetCluster())
return bs.Delete(hosts...)
}

Expand Down
93 changes: 58 additions & 35 deletions pkg/apply/scale.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"strconv"
"strings"


"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/util/sets"

"github.com/labring/sealos/pkg/apply/applydrivers"
Expand All @@ -29,35 +31,34 @@ import (
v2 "github.com/labring/sealos/pkg/types/v1beta1"
fileutil "github.com/labring/sealos/pkg/utils/file"
"github.com/labring/sealos/pkg/utils/iputils"
"github.com/labring/sealos/pkg/utils/logger"
strings2 "github.com/labring/sealos/pkg/utils/strings"
)

// NewScaleApplierFromArgs will filter ip list from command parameters.
func NewScaleApplierFromArgs(scaleArgs *ScaleArgs, flag string) (applydrivers.Interface, error) {
func NewScaleApplierFromArgs(cmd *cobra.Command, scaleArgs *ScaleArgs) (applydrivers.Interface, error) {
var cluster *v2.Cluster
var curr *v2.Cluster
clusterPath := constants.Clusterfile(scaleArgs.Cluster.ClusterName)

if !fileutil.IsExist(clusterPath) {
cluster = initCluster(scaleArgs.Cluster.ClusterName)
curr = cluster
} else {
clusterFile := clusterfile.NewClusterFile(clusterPath)
err := clusterFile.Process()
if err != nil {
return nil, err
}
cluster = clusterFile.GetCluster()
curr = clusterFile.GetCluster().DeepCopy()
}

curr := cluster.DeepCopy()

if scaleArgs.Cluster.Nodes == "" && scaleArgs.Cluster.Masters == "" {
return nil, fmt.Errorf("the node or master parameter was not committed")
}
var err error
switch flag {
switch cmd.Name() {
case "add":
err = Join(cluster, scaleArgs)
err = verifyAndSetNodes(cmd, cluster, scaleArgs)
case "delete":
err = Delete(cluster, scaleArgs)
}
Expand All @@ -68,14 +69,52 @@ func NewScaleApplierFromArgs(scaleArgs *ScaleArgs, flag string) (applydrivers.In
return applydrivers.NewDefaultScaleApplier(curr, cluster)
}

func Join(cluster *v2.Cluster, scalingArgs *ScaleArgs) error {
return joinNodes(cluster, scalingArgs)
func getSSHFromCommand(cmd *cobra.Command) *v2.SSH {
var (
ret = &v2.SSH{}
fs = cmd.Flags()
changed bool
)
if flagChanged(cmd, "user") {
ret.User, _ = fs.GetString("user")
changed = true
}
if flagChanged(cmd, "passwd") {
ret.Passwd, _ = fs.GetString("passwd")
changed = true
}
if flagChanged(cmd, "pk") {
ret.Pk, _ = fs.GetString("pk")
changed = true
}
if flagChanged(cmd, "pk-passwd") {
ret.PkPasswd, _ = fs.GetString("pk-passwd")
changed = true
}
if flagChanged(cmd, "port") {
ret.Port, _ = fs.GetUint16("port")
changed = true
}
if changed {
return ret
}
return nil
}

func flagChanged(cmd *cobra.Command, name string) bool {
if cmd != nil {
if fs := cmd.Flag(name); fs != nil && fs.Changed {
return true
}
}
return false
}

func joinNodes(cluster *v2.Cluster, scaleArgs *ScaleArgs) error {
func verifyAndSetNodes(cmd *cobra.Command, cluster *v2.Cluster, scaleArgs *ScaleArgs) error {
if err := PreProcessIPList(scaleArgs.Cluster); err != nil {
return err
}

masters, nodes := scaleArgs.Cluster.Masters, scaleArgs.Cluster.Nodes
if len(masters) > 0 {
if err := validateIPList(masters); err != nil {
Expand All @@ -94,7 +133,7 @@ func joinNodes(cluster *v2.Cluster, scaleArgs *ScaleArgs) error {
var hasMaster bool
// check duplicate
alreadyIn := sets.NewString()
// add already add masters and nodes
// add already joined masters and nodes
for i := range cluster.Spec.Hosts {
h := cluster.Spec.Hosts[i]
if strings2.InList(v2.MASTER, h.Roles) {
Expand All @@ -112,7 +151,9 @@ func joinNodes(cluster *v2.Cluster, scaleArgs *ScaleArgs) error {
if !hasMaster {
return fmt.Errorf("`master` role not found, due to Clusterfile may have been corrupted?")
}
getHostFunc := func(sliceStr string, role string, exclude []string, scaleSshConfig *SSH) (*v2.Host, error) {
override := getSSHFromCommand(cmd)

getHostFunc := func(sliceStr string, role string, exclude []string) (*v2.Host, error) {
ss := strings.Split(sliceStr, ",")
addrs := make([]string, 0)
for _, s := range ss {
Expand All @@ -129,35 +170,17 @@ func joinNodes(cluster *v2.Cluster, scaleArgs *ScaleArgs) error {
}
}
if len(addrs) > 0 {
clusterSSH := cluster.GetSSH()

var override bool
var overrideSSH v2.SSH

if scaleSshConfig != nil {
overrideSSH.User = scaleSshConfig.User
overrideSSH.Passwd = scaleSshConfig.Password
overrideSSH.PkName = clusterSSH.PkName
overrideSSH.PkData = clusterSSH.PkData
overrideSSH.Pk = scaleSshConfig.Pk
overrideSSH.PkPasswd = scaleSshConfig.PkPassword
overrideSSH.Port = scaleSshConfig.Port

if clusterSSH != overrideSSH {
logger.Info("scale '%s' nodes '%q' with different ssh settings: %+v", role, sliceStr, scaleSshConfig)
clusterSSH = overrideSSH
override = true
}
}
global := cluster.Spec.SSH.DeepCopy()
ssh.OverSSHConfig(global, override)

sshClient := ssh.NewSSHClient(&clusterSSH, true)
sshClient := ssh.NewSSHClient(global, true)

host := &v2.Host{
IPS: addrs,
Roles: []string{role, GetHostArch(sshClient, addrs[0])},
}
if override {
host.SSH = &overrideSSH
if override != nil {
host.SSH = override
}
return host, nil
}
Expand Down
45 changes: 1 addition & 44 deletions pkg/apply/scale_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,53 +825,10 @@ func TestJoin(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if err := Join(tt.args.cluster, tt.args.scalingArgs); (err != nil) != tt.wantErr {
if err := verifyAndSetNodes(nil, tt.args.cluster, tt.args.scalingArgs); (err != nil) != tt.wantErr {
t.Errorf("Join() error = %v, wantErr %v", err, tt.wantErr)
}
t.Logf("print des cluster hosts: %v", tt.args.cluster.Spec.Hosts)
})
}
}

func TestNewScaleApplierFromArgs(t *testing.T) {
tests := []struct {
name string
op string
args *ScaleArgs
wantErr bool
}{
{
name: "add empty",
op: "add",
args: &ScaleArgs{
Cluster: &Cluster{
Masters: "",
Nodes: "",
ClusterName: "",
},
},
wantErr: true,
},
{
name: "delete master0",
op: "delete",
args: &ScaleArgs{
Cluster: &Cluster{
Masters: "192.168.1.1",
Nodes: "",
ClusterName: "",
},
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, err := NewScaleApplierFromArgs(tt.args, tt.op)
if (err != nil) != tt.wantErr {
t.Errorf("NewScaleApplierFromArgs() error = %v, wantErr %v", err, tt.wantErr)
}
// t.Logf("print des cluster hosts: %v", tt.args.cluster.Spec.Hosts)
})
}
}
4 changes: 2 additions & 2 deletions pkg/bootstrap/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ func (ctx realContext) GetRemoter() remote.Interface {
return ctx.remoter
}

func NewContextFrom(cluster, current *v2.Cluster) Context {
execer := ssh.NewClusterClient(cluster, current, true)
func NewContextFrom(cluster *v2.Cluster) Context {
execer, _ := ssh.NewSSHByCluster(cluster, true)
envProcessor := env.NewEnvProcessor(cluster, cluster.Status.Mounts)
remoter := remote.New(cluster.GetName(), execer)
return &realContext{
Expand Down
23 changes: 12 additions & 11 deletions pkg/filesystem/rootfs/rootfs_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,6 @@ import (
)

type defaultRootfs struct {
currentCluster *v2.Cluster
// clusterService image.ClusterService
// imgList types.ImageListOCIV1
// cluster types.ClusterManifestList
mounts []v2.MountImage
}

Expand All @@ -54,8 +50,8 @@ func (f *defaultRootfs) getClusterName(cluster *v2.Cluster) string {
return cluster.Name
}

func (f *defaultRootfs) getSSH(cluster, current *v2.Cluster) ssh.Interface {
return ssh.NewClusterClient(cluster, current, true)
func (f *defaultRootfs) getSSH(cluster *v2.Cluster) (ssh.Interface, error) {
return ssh.NewSSHByCluster(cluster, true)
}

func (f *defaultRootfs) mountRootfs(cluster *v2.Cluster, ipList []string) error {
Expand Down Expand Up @@ -93,7 +89,10 @@ func (f *defaultRootfs) mountRootfs(cluster *v2.Cluster, ipList []string) error
return err
}

sshClient := f.getSSH(cluster, f.currentCluster)
sshClient, err := f.getSSH(cluster)
if err != nil {
return err
}

notRegistryDirFilter := func(entry fs.DirEntry) bool { return !constants.IsRegistryDir(entry) }

Expand All @@ -118,8 +117,7 @@ func (f *defaultRootfs) mountRootfs(cluster *v2.Cluster, ipList []string) error
return egg.Wait()
})
}
err := eg.Wait()
if err != nil {
if err = eg.Wait(); err != nil {
return err
}

Expand All @@ -146,11 +144,14 @@ func (f *defaultRootfs) unmountRootfs(cluster *v2.Cluster, ipList []string) erro
rmRootfs := fmt.Sprintf("rm -rf %s", clusterRootfsDir)
deleteHomeDirCmd := fmt.Sprintf("rm -rf %s", constants.ClusterDir(cluster.Name))
eg, _ := errgroup.WithContext(context.Background())
sshClient, err := f.getSSH(cluster)
if err != nil {
return err
}
for _, IP := range ipList {
ip := IP
eg.Go(func() error {
SSH := f.getSSH(cluster, f.currentCluster)
return SSH.CmdAsync(ip, rmRootfs, deleteHomeDirCmd)
return sshClient.CmdAsync(ip, rmRootfs, deleteHomeDirCmd)
})
}
return eg.Wait()
Expand Down

0 comments on commit 5cd6757

Please sign in to comment.