Skip to content

Commit

Permalink
refactor: move local action out from pkg/ssh to pkg/exec (#3929) (#3937)
Browse files Browse the repository at this point in the history
* refactor: move local action out from pkg/ssh to pkg/exec

* fix: using exec.Interface to support both local and remote commands

* move cobra command implementation to cmd directory

Co-authored-by: fengxsong <fengxsong@outlook.com>
  • Loading branch information
sealos-ci-robot and fengxsong committed Sep 15, 2023
1 parent 814e17f commit 7f2af2f
Show file tree
Hide file tree
Showing 34 changed files with 432 additions and 290 deletions.
73 changes: 47 additions & 26 deletions cmd/sealos/cmd/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,18 @@ limitations under the License.
package cmd

import (
"context"

"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"

"github.com/labring/sealos/pkg/clusterfile"
"github.com/labring/sealos/pkg/exec"
"github.com/labring/sealos/pkg/ssh"
"github.com/labring/sealos/pkg/types/v1beta1"
v2 "github.com/labring/sealos/pkg/types/v1beta1"
)

var roles string
var clusterName string
var ips []string

var exampleExec = `
exec to default cluster: default
Expand All @@ -40,37 +42,56 @@ set ips to exec cmd:
`

func newExecCmd() *cobra.Command {
var cluster *v1beta1.Cluster
var (
roles []string
ips []string
cluster *v2.Cluster
)
var execCmd = &cobra.Command{
Use: "exec",
Short: "Execute shell command or script on specified nodes",
Example: exampleExec,
Args: cobra.ExactArgs(1),
Args: cobra.MinimumNArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
if len(ips) > 0 {
execIPCmd, err := ssh.NewExecCmdFromIPs(cluster, ips)
if err != nil {
return err
}
return execIPCmd.RunCmd(args[0])
}
execRoleCmd, err := ssh.NewExecCmdFromRoles(cluster, roles)
if err != nil {
return err
}
return execRoleCmd.RunCmd(args[0])
targets := getTargets(cluster, ips, roles)
return runCommand(cluster, targets, args)
},
PreRunE: func(cmd *cobra.Command, args []string) error {
cls, err := clusterfile.GetClusterFromName(clusterName)
if err != nil {
return err
}
cluster = cls
return nil
PreRunE: func(cmd *cobra.Command, args []string) (err error) {
cluster, err = clusterfile.GetClusterFromName(clusterName)
return
},
}
execCmd.Flags().StringVarP(&clusterName, "cluster", "c", "default", "name of cluster to applied exec action")
execCmd.Flags().StringVarP(&roles, "roles", "r", "", "run command on nodes with role")
execCmd.Flags().StringVarP(&clusterName, "cluster", "c", "default", "name of cluster to run commands")
execCmd.Flags().StringSliceVarP(&roles, "roles", "r", []string{}, "run command on nodes with role")
execCmd.Flags().StringSliceVar(&ips, "ips", []string{}, "run command on nodes with ip address")
return execCmd
}

func getTargets(cluster *v2.Cluster, ips []string, roles []string) []string {
if len(ips) > 0 {
return ips
}
if len(roles) == 0 {
return cluster.GetAllIPS()
}
var targets []string
for i := range roles {
targets = append(targets, cluster.GetIPSByRole(roles[i])...)
}
return targets
}

func runCommand(cluster *v2.Cluster, targets []string, args []string) error {
execer, err := exec.New(ssh.NewCacheClientFromCluster(cluster, true))
if err != nil {
return err
}
eg, _ := errgroup.WithContext(context.Background())
for _, ipAddr := range targets {
ip := ipAddr
eg.Go(func() error {
return execer.CmdAsync(ip, args...)
})
}
return eg.Wait()
}
63 changes: 36 additions & 27 deletions cmd/sealos/cmd/scp.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,18 @@ limitations under the License.
package cmd

import (
"context"

"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"

"github.com/labring/sealos/pkg/clusterfile"
"github.com/labring/sealos/pkg/exec"
"github.com/labring/sealos/pkg/ssh"
"github.com/labring/sealos/pkg/types/v1beta1"
"github.com/labring/sealos/pkg/utils/logger"
)

// Shared with exec.go
// var roles string
// var clusterName string
// var ips []string

const exampleScp = `
copy file to default cluster: default
sealos scp "/root/aa.txt" "/root/dd.txt"
Expand All @@ -41,37 +41,46 @@ set ips to copy file:
`

func newScpCmd() *cobra.Command {
var cluster *v1beta1.Cluster
var (
roles []string
ips []string
cluster *v1beta1.Cluster
)
var scpCmd = &cobra.Command{
Use: "scp",
Short: "Copy file to remote on specified nodes",
Example: exampleScp,
Args: cobra.ExactArgs(2),
RunE: func(cmd *cobra.Command, args []string) error {
if len(ips) > 0 {
sshCmd, err := ssh.NewExecCmdFromIPs(cluster, ips)
if err != nil {
return err
}
return sshCmd.RunCopy(args[0], args[1])
}
sshCmd, err := ssh.NewExecCmdFromRoles(cluster, roles)
if err != nil {
return err
}
return sshCmd.RunCopy(args[0], args[1])
targets := getTargets(cluster, ips, roles)
return runCopy(cluster, targets, args)
},
PreRunE: func(cmd *cobra.Command, args []string) error {
cls, err := clusterfile.GetClusterFromName(clusterName)
if err != nil {
return err
}
cluster = cls
return nil
PreRunE: func(cmd *cobra.Command, args []string) (err error) {
cluster, err = clusterfile.GetClusterFromName(clusterName)
return
},
}
scpCmd.Flags().StringVarP(&clusterName, "cluster", "c", "default", "name of cluster to applied scp action")
scpCmd.Flags().StringVarP(&roles, "roles", "r", "", "copy file to nodes with role")
scpCmd.Flags().StringVarP(&clusterName, "cluster", "c", "default", "name of cluster to run scp action")
scpCmd.Flags().StringSliceVarP(&roles, "roles", "r", []string{}, "copy file to nodes with role")
scpCmd.Flags().StringSliceVar(&ips, "ips", []string{}, "copy file to nodes with ip address")
return scpCmd
}

func runCopy(cluster *v1beta1.Cluster, targets []string, args []string) error {
execer, err := exec.New(ssh.NewCacheClientFromCluster(cluster, true))
if err != nil {
return err
}
eg, _ := errgroup.WithContext(context.Background())
for _, ipAddr := range targets {
ip := ipAddr
eg.Go(func() error {
return execer.Copy(ip, args[0], args[1])
})
}
if err = eg.Wait(); err != nil {
return err
}
logger.Info("transfers files success")
return nil
}
4 changes: 3 additions & 1 deletion pkg/apply/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ func NewApplierFromFile(cmd *cobra.Command, path string, args *Args) (applydrive
return nil, fmt.Errorf("cluster name cannot be empty, make sure %s file is correct", path)
}

CheckAndInitialize(cluster)
if err := CheckAndInitialize(cluster); err != nil {
return nil, err
}

localpath := constants.Clusterfile(cluster.Name)
cf := clusterfile.NewClusterFile(localpath)
Expand Down
7 changes: 6 additions & 1 deletion pkg/apply/processor/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/labring/sealos/pkg/buildah"
"github.com/labring/sealos/pkg/constants"
"github.com/labring/sealos/pkg/exec"
"github.com/labring/sealos/pkg/filesystem/registry"
"github.com/labring/sealos/pkg/ssh"
v2 "github.com/labring/sealos/pkg/types/v1beta1"
Expand Down Expand Up @@ -155,7 +156,11 @@ func MirrorRegistry(cluster *v2.Cluster, mounts []v2.MountImage) error {
registries := cluster.GetRegistryIPAndPortList()
logger.Debug("registry nodes is: %+v", registries)
sshClient := ssh.NewCacheClientFromCluster(cluster, true)
syncer := registry.New(constants.NewPathResolver(cluster.GetName()), sshClient, mounts)
execer, err := exec.New(sshClient)
if err != nil {
return err
}
syncer := registry.New(constants.NewPathResolver(cluster.GetName()), execer, mounts)
return syncer.Sync(context.Background(), registries...)
}

Expand Down
9 changes: 7 additions & 2 deletions pkg/apply/reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/labring/sealos/pkg/apply/applydrivers"
"github.com/labring/sealos/pkg/clusterfile"
"github.com/labring/sealos/pkg/constants"
"github.com/labring/sealos/pkg/exec"
"github.com/labring/sealos/pkg/ssh"
v2 "github.com/labring/sealos/pkg/types/v1beta1"
"github.com/labring/sealos/pkg/utils/logger"
Expand Down Expand Up @@ -69,9 +70,13 @@ func (r *ClusterArgs) resetArgs(cmd *cobra.Command, args *ResetArgs) error {
r.hosts = []v2.Host{}

sshClient := ssh.NewCacheClientFromCluster(r.cluster, true)
r.setHostWithIpsPort(masters, []string{v2.MASTER, GetHostArch(sshClient, masters[0])})
execer, err := exec.New(sshClient)
if err != nil {
return err
}
r.setHostWithIpsPort(masters, []string{v2.MASTER, GetHostArch(execer, masters[0])})
if len(nodes) > 0 {
r.setHostWithIpsPort(nodes, []string{v2.NODE, GetHostArch(sshClient, nodes[0])})
r.setHostWithIpsPort(nodes, []string{v2.NODE, GetHostArch(execer, nodes[0])})
}
r.cluster.Spec.Hosts = r.hosts
}
Expand Down
9 changes: 7 additions & 2 deletions pkg/apply/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/labring/sealos/pkg/apply/processor"
"github.com/labring/sealos/pkg/clusterfile"
"github.com/labring/sealos/pkg/constants"
"github.com/labring/sealos/pkg/exec"
"github.com/labring/sealos/pkg/ssh"
v2 "github.com/labring/sealos/pkg/types/v1beta1"
"github.com/labring/sealos/pkg/utils/iputils"
Expand Down Expand Up @@ -124,15 +125,19 @@ func (r *ClusterArgs) runArgs(cmd *cobra.Command, args *RunArgs, imageList []str
r.hosts = []v2.Host{}

sshClient := ssh.NewCacheClientFromCluster(r.cluster, true)
execer, err := exec.New(sshClient)
if err != nil {
return err
}
if len(masters) > 0 {
host, port := iputils.GetHostIPAndPortOrDefault(masters[0], defaultPort)
master0addr := net.JoinHostPort(host, port)
r.setHostWithIpsPort(masters, []string{v2.MASTER, GetHostArch(sshClient, master0addr)})
r.setHostWithIpsPort(masters, []string{v2.MASTER, GetHostArch(execer, master0addr)})
}
if len(nodes) > 0 {
host, port := iputils.GetHostIPAndPortOrDefault(nodes[0], defaultPort)
node0addr := net.JoinHostPort(host, port)
r.setHostWithIpsPort(nodes, []string{v2.NODE, GetHostArch(sshClient, node0addr)})
r.setHostWithIpsPort(nodes, []string{v2.NODE, GetHostArch(execer, node0addr)})
}
r.cluster.Spec.Hosts = append(r.cluster.Spec.Hosts, r.hosts...)

Expand Down
8 changes: 6 additions & 2 deletions pkg/apply/scale.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/labring/sealos/pkg/apply/applydrivers"
"github.com/labring/sealos/pkg/clusterfile"
"github.com/labring/sealos/pkg/constants"
"github.com/labring/sealos/pkg/exec"
"github.com/labring/sealos/pkg/ssh"
v2 "github.com/labring/sealos/pkg/types/v1beta1"
fileutil "github.com/labring/sealos/pkg/utils/file"
Expand Down Expand Up @@ -172,10 +173,13 @@ func verifyAndSetNodes(cmd *cobra.Command, cluster *v2.Cluster, scaleArgs *Scale
ssh.OverSSHConfig(global, override)

sshClient := ssh.MustNewClient(global, true)

execer, err := exec.New(sshClient)
if err != nil {
return nil, err
}
host := &v2.Host{
IPS: addrs,
Roles: []string{role, GetHostArch(sshClient, addrs[0])},
Roles: []string{role, GetHostArch(execer, addrs[0])},
}
if override != nil {
host.SSH = override
Expand Down
30 changes: 16 additions & 14 deletions pkg/apply/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,16 @@ import (
"path/filepath"
"strings"

"github.com/labring/sealos/pkg/constants"

"github.com/labring/sealos/pkg/utils/hash"
"k8s.io/apimachinery/pkg/util/sets"

"github.com/labring/sealos/pkg/constants"
"github.com/labring/sealos/pkg/exec"
"github.com/labring/sealos/pkg/ssh"
v2 "github.com/labring/sealos/pkg/types/v1beta1"
"github.com/labring/sealos/pkg/utils/hash"
"github.com/labring/sealos/pkg/utils/iputils"
"github.com/labring/sealos/pkg/utils/logger"
stringsutil "github.com/labring/sealos/pkg/utils/strings"

"k8s.io/apimachinery/pkg/util/sets"

v2 "github.com/labring/sealos/pkg/types/v1beta1"
)

func initCluster(clusterName string) *v2.Cluster {
Expand Down Expand Up @@ -89,9 +87,9 @@ func validateIPList(s string) error {
return nil
}

func getHostArch(sshClient ssh.Interface) func(string) v2.Arch {
func getHostArch(execer exec.Interface) func(string) v2.Arch {
return func(ip string) v2.Arch {
out, err := sshClient.Cmd(ip, "arch")
out, err := execer.Cmd(ip, "arch")
if err != nil {
logger.Warn("failed to get host arch: %v, defaults to amd64", err)
return v2.AMD64
Expand All @@ -111,8 +109,8 @@ func getHostArch(sshClient ssh.Interface) func(string) v2.Arch {
// GetHostArch returns the host architecture of the given ip using SSH.
// Note that hosts of the same type(master/node) must have the same architecture,
// so we only need to check the first host of the given type.
func GetHostArch(sshClient ssh.Interface, ip string) string {
return string(getHostArch(sshClient)(ip))
func GetHostArch(execer exec.Interface, ip string) string {
return string(getHostArch(execer)(ip))
}

func GetImagesDiff(current, desired []string) []string {
Expand All @@ -139,7 +137,7 @@ func GetNewImages(currentCluster, desiredCluster *v2.Cluster) []string {
return nil
}

func CheckAndInitialize(cluster *v2.Cluster) {
func CheckAndInitialize(cluster *v2.Cluster) error {
cluster.Spec.SSH.Port = cluster.Spec.SSH.DefaultPort()

if cluster.Spec.SSH.Pk == "" {
Expand All @@ -148,14 +146,18 @@ func CheckAndInitialize(cluster *v2.Cluster) {

if len(cluster.Spec.Hosts) == 0 {
sshClient := ssh.MustNewClient(cluster.Spec.SSH.DeepCopy(), true)

execer, err := exec.New(sshClient)
if err != nil {
return err
}
localIpv4 := iputils.GetLocalIpv4()
defaultPort := defaultSSHPort(cluster.Spec.SSH.Port)
addr := net.JoinHostPort(localIpv4, defaultPort)

cluster.Spec.Hosts = append(cluster.Spec.Hosts, v2.Host{
IPS: []string{addr},
Roles: []string{v2.MASTER, GetHostArch(sshClient, addr)},
Roles: []string{v2.MASTER, GetHostArch(execer, addr)},
})
}
return nil
}

0 comments on commit 7f2af2f

Please sign in to comment.