Skip to content

Commit

Permalink
Merge pull request #2007 from shijilin0116/console
Browse files Browse the repository at this point in the history
feat: Kubekey Web Console
  • Loading branch information
liangzai006 committed Oct 13, 2023
2 parents ebfde7c + a1da3e2 commit 4e4e00b
Show file tree
Hide file tree
Showing 117 changed files with 5,957 additions and 86 deletions.
6 changes: 5 additions & 1 deletion .gitignore
Expand Up @@ -35,4 +35,8 @@ _artifacts

# Used during parts of the build process. Files _should_ get cleaned up automatically.
# This is also a good location for any temporary manfiests used during development
tmp
tmp

# kubekey-console node_modules and package-lock.json files
console/node_modules
console/package-lock.json
23 changes: 17 additions & 6 deletions cmd/kk/apis/kubekey/v1alpha2/cluster_types.go
Expand Up @@ -112,7 +112,7 @@ type RegistryConfig struct {
DataRoot string `yaml:"dataRoot" json:"dataRoot,omitempty"`
NamespaceOverride string `yaml:"namespaceOverride" json:"namespaceOverride,omitempty"`
BridgeIP string `yaml:"bridgeIP" json:"bridgeIP,omitempty"`
Auths runtime.RawExtension `yaml:"auths" json:"auths,omitempty"`
Auths runtime.RawExtension `yaml:"auths,omitempty" json:"auths,omitempty"`
}

// KubeSphere defines the configuration information of the KubeSphere.
Expand Down Expand Up @@ -157,7 +157,7 @@ func (cfg *ClusterSpec) GenerateCertSANs() []string {
}

// GroupHosts is used to group hosts according to the configuration file.s
func (cfg *ClusterSpec) GroupHosts() map[string][]*KubeHost {
func (cfg *ClusterSpec) GroupHosts(isBackend bool) (map[string][]*KubeHost, error) {
hostMap := make(map[string]*KubeHost)
for _, hostCfg := range cfg.Hosts {
host := toHosts(hostCfg)
Expand All @@ -168,10 +168,22 @@ func (cfg *ClusterSpec) GroupHosts() map[string][]*KubeHost {

//Check that the parameters under roleGroups are incorrect
if len(roleGroups[Master]) == 0 && len(roleGroups[ControlPlane]) == 0 {
logger.Log.Fatal(errors.New("The number of master/control-plane cannot be 0"))
err := errors.New("The number of master/control-plane cannot be 0")
if isBackend {
logger.Log.Errorln(err)
return nil, err
} else {
logger.Log.Fatal(err)
}
}
if len(roleGroups[Etcd]) == 0 && cfg.Etcd.Type == KubeKey {
logger.Log.Fatal(errors.New("The number of etcd cannot be 0"))
err := errors.New("The number of etcd cannot be 0")
if isBackend {
logger.Log.Errorln(err)
return nil, err
} else {
logger.Log.Fatal(err)
}
}
if len(roleGroups[Registry]) > 1 {
logger.Log.Fatal(errors.New("The number of registry node cannot be greater than 1."))
Expand All @@ -181,8 +193,7 @@ func (cfg *ClusterSpec) GroupHosts() map[string][]*KubeHost {
host.SetRole(Master)
roleGroups[Master] = append(roleGroups[Master], host)
}

return roleGroups
return roleGroups, nil
}

// +kubebuilder:object:generate=false
Expand Down
9 changes: 6 additions & 3 deletions cmd/kk/apis/kubekey/v1alpha2/default.go
Expand Up @@ -104,13 +104,16 @@ const (
DefaultKubeVipMode = "ARP"
)

func (cfg *ClusterSpec) SetDefaultClusterSpec() (*ClusterSpec, map[string][]*KubeHost) {
func (cfg *ClusterSpec) SetDefaultClusterSpec(isBackend bool) (*ClusterSpec, map[string][]*KubeHost, error) {
clusterCfg := ClusterSpec{}

clusterCfg.Hosts = SetDefaultHostsCfg(cfg)
clusterCfg.RoleGroups = cfg.RoleGroups
clusterCfg.Etcd = SetDefaultEtcdCfg(cfg)
roleGroups := clusterCfg.GroupHosts()
roleGroups, err := clusterCfg.GroupHosts(isBackend)
if err != nil {
return nil, nil, err
}
clusterCfg.ControlPlaneEndpoint = SetDefaultLBCfg(cfg, roleGroups[Master])
clusterCfg.Network = SetDefaultNetworkCfg(cfg)
clusterCfg.Storage = SetDefaultStorageCfg(cfg)
Expand Down Expand Up @@ -139,7 +142,7 @@ func (cfg *ClusterSpec) SetDefaultClusterSpec() (*ClusterSpec, map[string][]*Kub
if cfg.Kubernetes.ProxyMode == "" {
clusterCfg.Kubernetes.ProxyMode = DefaultProxyMode
}
return &clusterCfg, roleGroups
return &clusterCfg, roleGroups, nil
}

func SetDefaultHostsCfg(cfg *ClusterSpec) []HostCfg {
Expand Down
4 changes: 2 additions & 2 deletions cmd/kk/apis/kubekey/v1alpha2/kubernetes_types.go
Expand Up @@ -44,8 +44,8 @@ type Kubernetes struct {
KubeletArgs []string `yaml:"kubeletArgs" json:"kubeletArgs,omitempty"`
KubeProxyArgs []string `yaml:"kubeProxyArgs" json:"kubeProxyArgs,omitempty"`
FeatureGates map[string]bool `yaml:"featureGates" json:"featureGates,omitempty"`
KubeletConfiguration runtime.RawExtension `yaml:"kubeletConfiguration" json:"kubeletConfiguration,omitempty"`
KubeProxyConfiguration runtime.RawExtension `yaml:"kubeProxyConfiguration" json:"kubeProxyConfiguration,omitempty"`
KubeletConfiguration runtime.RawExtension `yaml:"kubeletConfiguration,omitempty" json:"kubeletConfiguration,omitempty"`
KubeProxyConfiguration runtime.RawExtension `yaml:"kubeProxyConfiguration,omitempty" json:"kubeProxyConfiguration,omitempty"`
Audit Audit `yaml:"audit" json:"audit,omitempty"`
}

Expand Down
65 changes: 65 additions & 0 deletions cmd/kk/cmd/console/common/console_common.go
@@ -0,0 +1,65 @@
package controllers

import (
"bufio"
"fmt"
"github.com/gorilla/websocket"
kubekeyapiv1alpha2 "github.com/kubesphere/kubekey/v3/cmd/kk/apis/kubekey/v1alpha2"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"net/http"
"os"
"strings"
)

type Cluster struct {
Kind string `yaml:"kind,omitempty" json:"kind,omitempty"`
ApiVersion string `yaml:"apiVersion,omitempty" json:"apiVersion,omitempty"`
metav1.ObjectMeta `yaml:"metadata,omitempty" json:"metadata,omitempty"`

Spec kubekeyapiv1alpha2.ClusterSpec `yaml:"spec,omitempty" json:"spec,omitempty"`
}

var Upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
return true
},
}

type WebSocketWriter struct {
WsConn *websocket.Conn
}

//var clientConn *websocket.Conn

func SetupCaptureBuffer() *WebSocketWriter {
captureBuffer := &WebSocketWriter{}
readerOut, writerOut, _ := os.Pipe()
os.Stdout = writerOut
outReader := bufio.NewReader(readerOut)
// 新开线程,监听管道信息,并输出到websocket
go func() {
for {
line, _, err := outReader.ReadLine()
if err != nil {
break
}
if captureBuffer.WsConn != nil {
captureBuffer.WsConn.WriteMessage(websocket.TextMessage, line)
}
}
}()
return captureBuffer
}

func FormatErrorMessage(err error) string {
msg := err.Error()
if !strings.HasPrefix(msg, "error: ") {
msg = fmt.Sprintf("error: %s", msg)
}
if !strings.HasSuffix(msg, "\n") {
msg += "\n"
}
return msg
}
28 changes: 28 additions & 0 deletions cmd/kk/cmd/console/console.go
@@ -0,0 +1,28 @@
package console

import (
"github.com/spf13/cobra"

"github.com/kubesphere/kubekey/v3/cmd/kk/cmd/options"
)

type ConsoleOptions struct {
CommonOptions *options.CommonOptions
}

func NewConsoleOptions() *ConsoleOptions {
return &ConsoleOptions{
CommonOptions: options.NewCommonOptions(),
}
}
func NewCmdConsole() *cobra.Command {
o := NewConsoleOptions()
cmd := &cobra.Command{
Use: "console",
Short: "Start a web console of kubekey",
}

o.CommonOptions.AddCommonFlag(cmd)
cmd.AddCommand(NewCmdConsoleStart())
return cmd
}
65 changes: 65 additions & 0 deletions cmd/kk/cmd/console/console_common/console_common.go
@@ -0,0 +1,65 @@
package console_common

import (
"bufio"
"fmt"
"github.com/gorilla/websocket"
kubekeyapiv1alpha2 "github.com/kubesphere/kubekey/v3/cmd/kk/apis/kubekey/v1alpha2"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"net/http"
"os"
"strings"
)

type Cluster struct {
Kind string `yaml:"kind,omitempty" json:"kind,omitempty"`
ApiVersion string `yaml:"apiVersion,omitempty" json:"apiVersion,omitempty"`
metav1.ObjectMeta `yaml:"metadata,omitempty" json:"metadata,omitempty"`

Spec kubekeyapiv1alpha2.ClusterSpec `yaml:"spec,omitempty" json:"spec,omitempty"`
}

var Upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
return true
},
}

type WebSocketWriter struct {
WsConn *websocket.Conn
}

//var clientConn *websocket.Conn

func SetupCaptureBuffer() *WebSocketWriter {
captureBuffer := &WebSocketWriter{}
readerOut, writerOut, _ := os.Pipe()
os.Stdout = writerOut
outReader := bufio.NewReader(readerOut)
// 新开线程,监听管道信息,并输出到websocket
go func() {
for {
line, _, err := outReader.ReadLine()
if err != nil {
break
}
if captureBuffer.WsConn != nil {
captureBuffer.WsConn.WriteMessage(websocket.TextMessage, line)
}
}
}()
return captureBuffer
}

func FormatErrorMessage(err error) string {
msg := err.Error()
if !strings.HasPrefix(msg, "error: ") {
msg = fmt.Sprintf("error: %s", msg)
}
if !strings.HasSuffix(msg, "\n") {
msg += "\n"
}
return msg
}
89 changes: 89 additions & 0 deletions cmd/kk/cmd/console/controllers/operators/add_node.go
@@ -0,0 +1,89 @@
package operators

import (
"fmt"
"github.com/gin-gonic/gin"
kubekeyapiv1alpha2 "github.com/kubesphere/kubekey/v3/cmd/kk/apis/kubekey/v1alpha2"
"github.com/kubesphere/kubekey/v3/cmd/kk/cmd/console/console_common"
"github.com/kubesphere/kubekey/v3/cmd/kk/pkg/common"
"github.com/kubesphere/kubekey/v3/cmd/kk/pkg/pipelines"
"gopkg.in/yaml.v3"
"log"
"os"
)

func AddNode(c *gin.Context, targetDir string, tmpDir string) {
// 升级连接
clusterName := c.DefaultQuery("clusterName", "")
clientConn, err := console_common.Upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
log.Fatalf("Failed to set websocket upgrade: %v", err)
return
}
// 建立管道stdout->websocket,监听信息
captureBuffer := console_common.SetupCaptureBuffer()
captureBuffer.WsConn = clientConn
for {
_, readMsg, readErr := clientConn.ReadMessage()

if readErr != nil {
fmt.Println("websocket后台读取消息出错:", err)
fmt.Println("添加节点失败")
break
}
go func(message []byte) {
// 写入文件
mkdirErr := os.MkdirAll(fmt.Sprintf("./%s/%s", tmpDir, clusterName), 0755)
if mkdirErr != nil {
fmt.Println("创建目录时出错:", err)
fmt.Println("添加节点失败")
return
}
filePath := fmt.Sprintf("./%s/%s/Cluster-addNode.yaml", tmpDir, clusterName)
err := os.WriteFile(filePath, readMsg, 0644)
if err != nil {
fmt.Println("写入文件时出错:", err)
fmt.Println("添加节点失败")
return
}
// 解析yaml数据到data
var data kubekeyapiv1alpha2.Cluster
unmarshalErr := yaml.Unmarshal(readMsg, &data)
if unmarshalErr != nil {
fmt.Println("websocket解析yaml出错:", unmarshalErr)
fmt.Println("添加节点失败")
return
}
arg := common.Argument{
FilePath: filePath,
Debug: false,
IgnoreErr: false,
SkipConfirmCheck: true,
SkipPullImages: false,
ContainerManager: data.Spec.Kubernetes.ContainerManager,
Artifact: "",
InstallPackages: false,
}
actionErr := pipelines.AddNodes(arg, "")
if actionErr != nil {
msg := console_common.FormatErrorMessage(actionErr)
fmt.Println(msg)
fmt.Println("添加节点失败")
} else {
fmt.Println("添加节点成功")
mkdirErr := os.MkdirAll(fmt.Sprintf("./%s/%s", targetDir, clusterName), 0755)
if mkdirErr != nil {
fmt.Println("创建目录时出错:", err)
return
}
filePath := fmt.Sprintf("./%s/%s/Cluster.yaml", targetDir, clusterName)
err := os.WriteFile(filePath, readMsg, 0644)
if err != nil {
fmt.Println("写入文件时出错:", err)
return
}
}
}(readMsg)
}

}

0 comments on commit 4e4e00b

Please sign in to comment.