Skip to content

Commit

Permalink
refactor(main): ssh interface instance func (#3654)
Browse files Browse the repository at this point in the history
  • Loading branch information
cuisongliu committed Aug 10, 2023
1 parent 7bbf122 commit 432ea4e
Show file tree
Hide file tree
Showing 13 changed files with 16 additions and 33 deletions.
2 changes: 1 addition & 1 deletion pkg/apply/processor/interface.go
Expand Up @@ -152,7 +152,7 @@ func ConfirmDeleteNodes() error {
func MirrorRegistry(cluster *v2.Cluster, mounts []v2.MountImage) error {
registries := cluster.GetRegistryIPAndPortList()
logger.Debug("registry nodes is: %+v", registries)
sshClient := ssh.NewSSHClient(&cluster.Spec.SSH, true)
sshClient := ssh.NewSSHByCluster(cluster, true)
syncer := registry.New(constants.NewData(cluster.GetName()), sshClient, mounts)
return syncer.Sync(context.Background(), registries...)
}
Expand Down
4 changes: 1 addition & 3 deletions pkg/apply/reset.go
Expand Up @@ -77,9 +77,7 @@ func (r *ClusterArgs) resetArgs(args *ResetArgs) error {
nodes := stringsutil.SplitRemoveEmpty(args.Cluster.Nodes, ",")
r.hosts = []v2.Host{}

clusterSSH := r.cluster.GetSSH()
sshClient := ssh.NewSSHClient(&clusterSSH, true)

sshClient := ssh.NewSSHByCluster(r.cluster, true)
r.setHostWithIpsPort(masters, []string{v2.MASTER, GetHostArch(sshClient, masters[0])})
if len(nodes) > 0 {
r.setHostWithIpsPort(nodes, []string{v2.NODE, GetHostArch(sshClient, nodes[0])})
Expand Down
3 changes: 1 addition & 2 deletions pkg/apply/run.go
Expand Up @@ -121,8 +121,7 @@ func (r *ClusterArgs) runArgs(imageList []string, args *RunArgs) error {
nodes := stringsutil.SplitRemoveEmpty(args.Cluster.Nodes, ",")
r.hosts = []v2.Host{}

clusterSSH := r.cluster.GetSSH()
sshClient := ssh.NewSSHClient(&clusterSSH, true)
sshClient := ssh.NewSSHByCluster(r.cluster, true)
if len(masters) > 0 {
host, port := iputils.GetHostIPAndPortOrDefault(masters[0], defaultPort)
master0addr := net.JoinHostPort(host, port)
Expand Down
2 changes: 1 addition & 1 deletion pkg/bootstrap/context.go
Expand Up @@ -59,7 +59,7 @@ func (ctx realContext) GetRemoter() remote.Interface {
}

func NewContextFrom(cluster *v2.Cluster) Context {
execer, _ := ssh.NewSSHByCluster(cluster, true)
execer := ssh.NewSSHByCluster(cluster, true)
envProcessor := env.NewEnvProcessor(cluster, cluster.Status.Mounts)
remoter := remote.New(cluster.GetName(), execer)
return &realContext{
Expand Down
7 changes: 1 addition & 6 deletions pkg/checker/crictl_checker.go
Expand Up @@ -104,12 +104,7 @@ func (n *CRICtlChecker) Check(cluster *v2.Cluster, phase string) error {
pauseImage = mountImg.Env["sandboxImage"]
}
}
sshCtx, err := ssh.NewSSHByCluster(cluster, false)
if err != nil {
status.Error = fmt.Errorf("get ssh interface error: %w", err).Error()
return nil
}

sshCtx := ssh.NewSSHByCluster(cluster, false)
root := constants.NewData(cluster.Name).RootFSPath()
regInfo := helpers.GetRegistryInfo(sshCtx, root, cluster.GetRegistryIPAndPort())

Expand Down
5 changes: 1 addition & 4 deletions pkg/checker/host_checker.go
Expand Up @@ -40,10 +40,7 @@ func (a HostChecker) Check(cluster *v2.Cluster, _ string) error {
if len(a.IPs) != 0 {
ipList = a.IPs
}
sshClient, err := ssh.NewSSHByCluster(cluster, false)
if err != nil {
return fmt.Errorf("checker: failed to create ssh client, %v", err)
}
sshClient := ssh.NewSSHByCluster(cluster, false)
if err := checkHostnameUnique(sshClient, ipList); err != nil {
return err
}
Expand Down
8 changes: 2 additions & 6 deletions pkg/checker/registry_checker.go
Expand Up @@ -87,11 +87,7 @@ func (n *RegistryChecker) Check(cluster *v2.Cluster, phase string) error {
}
}

sshCtx, err := ssh.NewSSHByCluster(cluster, false)
if err != nil {
status.Error = fmt.Errorf("get ssh interface error: %w", err).Error()
return nil
}
sshCtx := ssh.NewSSHByCluster(cluster, false)
root := constants.NewData(cluster.Name).RootFSPath()
regInfo := helpers.GetRegistryInfo(sshCtx, root, cluster.GetRegistryIPAndPort())
status.Auth = fmt.Sprintf("%s:%s", regInfo.Username, regInfo.Password)
Expand All @@ -100,7 +96,7 @@ func (n *RegistryChecker) Check(cluster *v2.Cluster, phase string) error {
Username: regInfo.Username,
Password: regInfo.Password,
}
_, err = crane.NewRegistry(status.RegistryDomain, cfg)
_, err := crane.NewRegistry(status.RegistryDomain, cfg)
if err != nil {
status.Error = fmt.Errorf("get registry interface error: %w", err).Error()
return nil
Expand Down
3 changes: 1 addition & 2 deletions pkg/filesystem/rootfs/rootfs_default.go
Expand Up @@ -51,8 +51,7 @@ func (f *defaultRootfs) getClusterName(cluster *v2.Cluster) string {
}

func (f *defaultRootfs) getSSH(cluster *v2.Cluster) ssh.Interface {
sshClient, _ := ssh.NewSSHByCluster(cluster, true)
return sshClient
return ssh.NewSSHByCluster(cluster, true)
}

func (f *defaultRootfs) mountRootfs(cluster *v2.Cluster, ipList []string) error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/guest/guest.go
Expand Up @@ -63,7 +63,7 @@ func (d *Default) Apply(cluster *v2.Cluster, mounts []v2.MountImage) error {
_ = fileutil.CleanFiles(kubeConfig)
}()
}
sshInterface := ssh.NewSSHClient(&cluster.Spec.SSH, true)
sshInterface := ssh.NewSSHByCluster(cluster, true)
for _, cmd := range guestCMD {
logger.Debug("exec guest command: %s", cmd)
if err := sshInterface.CmdAsync(cluster.GetMaster0IPAndPort(), envInterface.WrapperShell(cluster.GetMaster0IP(), cmd)); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/registry/password/apply.go
Expand Up @@ -132,7 +132,7 @@ func (r *RegistryPasswdResults) Validate() (*v1beta1.Cluster, error) {

func (r *RegistryPasswdResults) Apply(cluster *v1beta1.Cluster) error {
if r.execer == nil {
r.execer = ssh.NewSSHClient(&cluster.Spec.SSH, true)
r.execer = ssh.NewSSHByCluster(cluster, true)
}
if r.upgrade == nil {
r.upgrade = NewUpgrade(cluster.Name, r.execer)
Expand Down
2 changes: 1 addition & 1 deletion pkg/runtime/runtime.go
Expand Up @@ -126,7 +126,7 @@ func (k *KubeadmRuntime) DeleteMasters(mastersIPList []string) error {
}

func newKubeadmRuntime(cluster *v2.Cluster, kubeadm *KubeadmConfig) (Interface, error) {
sshClient, _ := ssh.NewSSHByCluster(cluster, true)
sshClient := ssh.NewSSHByCluster(cluster, true)
k := &KubeadmRuntime{
Mutex: &sync.Mutex{},
Cluster: cluster,
Expand Down
4 changes: 2 additions & 2 deletions pkg/ssh/exec.go
Expand Up @@ -54,7 +54,7 @@ func NewExecCmdFromIPs(cluster *v2.Cluster, ips []string) (Exec, error) {
}

func (e *Exec) RunCmd(cmd string) error {
sshClient := NewSSHClient(&e.cluster.Spec.SSH, true)
sshClient := NewSSHByCluster(e.cluster, true)
eg, _ := errgroup.WithContext(context.Background())
for _, ipAddr := range e.ipList {
ip := ipAddr
Expand All @@ -69,7 +69,7 @@ func (e *Exec) RunCmd(cmd string) error {
}

func (e *Exec) RunCopy(srcFilePath, dstFilePath string) error {
sshClient := NewSSHClient(&e.cluster.Spec.SSH, true)
sshClient := NewSSHByCluster(e.cluster, true)
eg, _ := errgroup.WithContext(context.Background())
for _, ipAddr := range e.ipList {
ip := ipAddr
Expand Down
5 changes: 2 additions & 3 deletions pkg/ssh/ssh.go
Expand Up @@ -168,15 +168,14 @@ func NewSSHClient(ssh *v2.SSH, isStdout bool) Interface {
return client
}

func NewSSHByCluster(cluster *v2.Cluster, isStdout bool) (Interface, error) {
func NewSSHByCluster(cluster *v2.Cluster, isStdout bool) Interface {
cc := &clusterClient{
cluster: cluster,
isStdout: isStdout,
configs: make(map[string]*Option),
cache: make(map[*Option]Interface),
}

return cc, nil
return cc
}

func WaitSSHReady(client Interface, _ int, hosts ...string) error {
Expand Down

0 comments on commit 432ea4e

Please sign in to comment.