Skip to content

Commit

Permalink
EdgeSite working with standard K8s master (kubeedge#400)
Browse files Browse the repository at this point in the history
EdgeSite working with standard K8s master
  • Loading branch information
Cindy authored and lidiyag committed Jun 19, 2019
1 parent 9375a6f commit 6a68439
Show file tree
Hide file tree
Showing 28 changed files with 1,830 additions and 2 deletions.
30 changes: 28 additions & 2 deletions edge/pkg/metamanager/msg_processor.go
Expand Up @@ -6,6 +6,7 @@ import (
"strings"
"time"

"github.com/kubeedge/beehive/pkg/common/config"
"github.com/kubeedge/beehive/pkg/common/log"
"github.com/kubeedge/beehive/pkg/common/util"
"github.com/kubeedge/beehive/pkg/core/context"
Expand Down Expand Up @@ -36,6 +37,30 @@ const (

var connected = false

// sendModuleGroupName is the name of the group to which we send the message
var sendModuleGroupName = modules.HubGroup

// sendModuleName is the name of send module for remote query
var sendModuleName = "websocket"

func init() {
var err error
groupName, err := config.CONFIG.GetValue("metamanager.context-send-group").ToString()
if err == nil && groupName != "" {
sendModuleGroupName = groupName
}

edgeSite, err := config.CONFIG.GetValue("metamanager.edgesite").ToBool()
if err == nil && edgeSite == true {
connected = true
}

moduleName, err := config.CONFIG.GetValue("metamanager.context-send-module").ToString()
if err == nil && moduleName != "" {
sendModuleName = moduleName
}
}

func feedbackError(err error, info string, request model.Message, c *context.Context) {
errInfo := "Something wrong"
if err != nil {
Expand All @@ -58,7 +83,7 @@ func send2Edged(message *model.Message, sync bool, c *context.Context) {
}

func send2Cloud(message *model.Message, c *context.Context) {
c.Send2Group(modules.HubGroup, *message)
c.Send2Group(sendModuleGroupName, *message)
}

// Resource format: <namespace>/<restype>[/resid]
Expand Down Expand Up @@ -118,6 +143,7 @@ func (m *metaManager) processInsert(message model.Message) {
}
}
resKey, resType, _ := parseResource(message.GetResource())

meta := &dao.Meta{
Key: resKey,
Type: resType,
Expand Down Expand Up @@ -274,7 +300,7 @@ func (m *metaManager) processRemoteQuery(message model.Message) {
// TODO: retry
originalID := message.GetID()
message.UpdateID()
resp, err := m.context.SendSync("websocket", message, 60*time.Second) // TODO: configurable
resp, err := m.context.SendSync(sendModuleName, message, 60*time.Second) // TODO: configurable
log.LOGGER.Infof("########## process get: req[%+v], resp[%+v], err[%+v]", message, resp, err)
if err != nil {
log.LOGGER.Errorf("remote query failed: %v", err)
Expand Down
19 changes: 19 additions & 0 deletions edgesite/Makefile
@@ -0,0 +1,19 @@

# make edge_site
.PHONY: default edge_site
edge_site:
go build cmd/edgesite.go

.PHONY: cross_build
cross_build:
export GOARCH=arm \
export GOOS="linux" \
export GOARM=6 \
export CGO_ENABLED=1 \
export CC=arm-linux-gnueabi-gcc; \
go build cmd/edgesite.go

.PHONY: small_build
small_build:
go build -ldflags="-w -s -extldflags -static" cmd/edgesite.go
upx-ucl --best edge_site
Empty file added edgesite/cmd/.gitkeep
Empty file.
14 changes: 14 additions & 0 deletions edgesite/cmd/edgesite.go
@@ -0,0 +1,14 @@
package main

import (
"github.com/kubeedge/beehive/pkg/core"
"github.com/kubeedge/kubeedge/edge/pkg/common/dbm"
_ "github.com/kubeedge/kubeedge/edge/pkg/edged"
_ "github.com/kubeedge/kubeedge/edge/pkg/metamanager"
_ "github.com/kubeedge/kubeedge/edgesite/pkg/controller"
)

func main() {
dbm.InitDBManager()
core.Run()
}
39 changes: 39 additions & 0 deletions edgesite/conf/edgeSite.yaml
@@ -0,0 +1,39 @@
mqtt:
server: tcp://127.0.0.1:1883 # external mqtt broker url.
internal-server: tcp://127.0.0.1:1884 # internal mqtt broker url.
mode: 0 # 0: internal mqtt broker enable only. 1: internal and external mqtt broker enable. 2: external mqtt broker enable only.
qos: 0 # 0: QOSAtMostOnce, 1: QOSAtLeastOnce, 2: QOSExactlyOnce.
retain: false # if the flag set true, server will store the message and can be delivered to future subscribers.
session-queue-size: 100 # A size of how many sessions will be handled. default to 100.
context-send-module: metaManager
context-receive-module: controller
context-response-module: metaManager

controller:
kube:
master: http://localhost:8080
namespace: "default"
content_type: "application/vnd.kubernetes.protobuf"
qps: 5
burst: 10
node_update_frequency: 10
node-id: fb4ebb70-2783-42b8-b3ef-63e2fd6d242e
node-name: fb4ebb70-2783-42b8-b3ef-63e2fd6d242e

edged:
register-node-namespace: default
hostname-override: fb4ebb70-2783-42b8-b3ef-63e2fd6d242e
interface-name: eth0
node-status-update-frequency: 10 # second
device-plugin-enabled: false
gpu-plugin-enabled: false
image-gc-high-threshold: 80 # percent
image-gc-low-threshold: 40 # percent
maximum-dead-containers-per-container: 1
docker-address: unix:///var/run/docker.sock
version: 2.0.0

metamanager:
context-send-group: controller
context-send-module: controller
edgesite: true
5 changes: 5 additions & 0 deletions edgesite/conf/logging.yaml
@@ -0,0 +1,5 @@
loggerLevel: "DEBUG"
#loggingLevel: "INFO"
enableRsyslog: false
logFormatText: true
writers: [stdout]
2 changes: 2 additions & 0 deletions edgesite/conf/modules.yaml
@@ -0,0 +1,2 @@
modules:
enabled: [controller, metaManager, edged, dbTest]
49 changes: 49 additions & 0 deletions edgesite/pkg/controller/config/buffer.go
@@ -0,0 +1,49 @@
package config

import (
"github.com/kubeedge/beehive/pkg/common/config"
"github.com/kubeedge/beehive/pkg/common/log"
"github.com/kubeedge/kubeedge/edgesite/pkg/controller/constants"
)

// UpdatePodStatusBuffer is the size of channel which save update pod status message from edge
var UpdatePodStatusBuffer int

// UpdateNodeStatusBuffer is the size of channel which save update node status message from edge
var UpdateNodeStatusBuffer int

// QueryConfigMapBuffer is the size of channel which save query configmap message from edge
var QueryConfigMapBuffer int

// QuerySecretBuffer is the size of channel which save query secret message from edge
var QuerySecretBuffer int

func init() {
if psb, err := config.CONFIG.GetValue("update-pod-status-buffer").ToInt(); err != nil {
UpdatePodStatusBuffer = constants.DefaultUpdatePodStatusBuffer
} else {
UpdatePodStatusBuffer = psb
}
log.LOGGER.Infof("update pod status buffer: %d", UpdatePodStatusBuffer)

if nsb, err := config.CONFIG.GetValue("update-node-status-buffer").ToInt(); err != nil {
UpdateNodeStatusBuffer = constants.DefaultUpdateNodeStatusBuffer
} else {
UpdateNodeStatusBuffer = nsb
}
log.LOGGER.Infof("Update node status buffer: %d", UpdateNodeStatusBuffer)

if qcb, err := config.CONFIG.GetValue("query-configmap-buffer").ToInt(); err != nil {
QueryConfigMapBuffer = constants.DefaultQueryConfigMapBuffer
} else {
QueryConfigMapBuffer = qcb
}
log.LOGGER.Infof("query config map buffer: %d", QueryConfigMapBuffer)

if qsb, err := config.CONFIG.GetValue("query-secret-buffer").ToInt(); err != nil {
QuerySecretBuffer = constants.DefaultQuerySecretBuffer
} else {
QuerySecretBuffer = qsb
}
log.LOGGER.Infof("query secret buffer: %d", QuerySecretBuffer)
}
43 changes: 43 additions & 0 deletions edgesite/pkg/controller/config/context.go
@@ -0,0 +1,43 @@
package config

import (
"github.com/kubeedge/beehive/pkg/common/config"
"github.com/kubeedge/beehive/pkg/common/log"
"github.com/kubeedge/beehive/pkg/core/context"
"github.com/kubeedge/kubeedge/edgesite/pkg/controller/constants"
)

// ContextSendModule is the name send message to
var ContextSendModule string

// ContextReceiveModule is the name receive message from
var ContextReceiveModule string

// ContextResponseModule is the name response message from
var ContextResponseModule string

// Context ...
var Context *context.Context

func init() {
if smn, err := config.CONFIG.GetValue("context-send-module").ToString(); err != nil {
ContextSendModule = constants.DefaultContextSendModuleName
} else {
ContextSendModule = smn
}
log.LOGGER.Infof(" send module name: %s", ContextSendModule)

if rmn, err := config.CONFIG.GetValue("context-receive-module").ToString(); err != nil {
ContextReceiveModule = constants.DefaultContextReceiveModuleName
} else {
ContextReceiveModule = rmn
}
log.LOGGER.Infof("receive module name: %s", ContextReceiveModule)

if rmn, err := config.CONFIG.GetValue("context-response-module").ToString(); err != nil {
ContextResponseModule = constants.DefaultContextResponseModuleName
} else {
ContextResponseModule = rmn
}
log.LOGGER.Infof("response module name: %s", ContextResponseModule)
}
101 changes: 101 additions & 0 deletions edgesite/pkg/controller/config/kube.go
@@ -0,0 +1,101 @@
package config

import (
"time"

"github.com/kubeedge/beehive/pkg/common/config"
"github.com/kubeedge/beehive/pkg/common/log"
"github.com/kubeedge/kubeedge/edgesite/pkg/controller/constants"
)

// KubeMaster is the url of edge master(kube api server)
var KubeMaster string

// KubeConfig is the config used connect to edge master
var KubeConfig string

// KubeNamespace is the namespace to watch(default is NamespaceAll)
var KubeNamespace string

// KubeContentType is the content type communicate with edge master(default is "application/vnd.kubernetes.protobuf")
var KubeContentType string

// KubeQPS is the QPS communicate with edge master(default is 1024)
var KubeQPS float32

// KubeBurst default is 10
var KubeBurst int

// NodeID for the current node
var KubeNodeID string

// NodeName for the current node
var KubeNodeName string

// KubeUpdateNodeFrequency is the time duration for update node status(default is 20s)
var KubeUpdateNodeFrequency time.Duration

func init() {
if km, err := config.CONFIG.GetValue("controller.kube.master").ToString(); err != nil {
log.LOGGER.Errorf("kube master not set")
} else {
KubeMaster = km
}
log.LOGGER.Infof("kube master: %s", KubeMaster)

if kc, err := config.CONFIG.GetValue("controller.kube.kubeconfig").ToString(); err != nil {
log.LOGGER.Errorf("kube config not set")
} else {
KubeConfig = kc
}
log.LOGGER.Infof("kube config: %s", KubeConfig)

if kn, err := config.CONFIG.GetValue("controller.kube.namespace").ToString(); err != nil {
KubeNamespace = constants.DefaultKubeNamespace
} else {
KubeNamespace = kn
}
log.LOGGER.Infof("kube namespace: %s", KubeNamespace)

if kct, err := config.CONFIG.GetValue("controller.kube.content_type").ToString(); err != nil {
KubeContentType = constants.DefaultKubeContentType
} else {
KubeContentType = kct
}
log.LOGGER.Infof("kube content type: %s", KubeContentType)

if kqps, err := config.CONFIG.GetValue("controller.kube.qps").ToFloat64(); err != nil {
KubeQPS = constants.DefaultKubeQPS
} else {
KubeQPS = float32(kqps)
}
log.LOGGER.Infof("kube QPS: %f", KubeQPS)

if kb, err := config.CONFIG.GetValue("controller.kube.burst").ToInt(); err != nil {
KubeBurst = constants.DefaultKubeBurst
} else {
KubeBurst = kb
}
log.LOGGER.Infof("kube burst: %d", KubeBurst)

if kuf, err := config.CONFIG.GetValue("controller.kube.node_update_frequency").ToInt64(); err != nil {
KubeUpdateNodeFrequency = constants.DefaultKubeUpdateNodeFrequency * time.Second
} else {
KubeUpdateNodeFrequency = time.Duration(kuf) * time.Second
}
log.LOGGER.Infof("kube update frequency: %v", KubeUpdateNodeFrequency)

if id, err := config.CONFIG.GetValue("controller.kube.node-id").ToString(); err != nil {
KubeNodeID = ""
} else {
KubeNodeID = id
}
log.LOGGER.Infof("kube Node ID: %s", KubeNodeID)

if name, err := config.CONFIG.GetValue("controller.kube.node-name").ToString(); err !=nil {
KubeNodeName = ""
} else {
KubeNodeName = name
}
log.LOGGER.Infof("kube Node Name: %s", KubeNodeName)
}
49 changes: 49 additions & 0 deletions edgesite/pkg/controller/config/load.go
@@ -0,0 +1,49 @@
package config

import (
"github.com/kubeedge/beehive/pkg/common/config"
"github.com/kubeedge/beehive/pkg/common/log"
"github.com/kubeedge/kubeedge/edgesite/pkg/controller/constants"
)

// UpdatePodStatusWorkers is the count of goroutines of update pod status
var UpdatePodStatusWorkers int

// UpdateNodeStatusWorkers is the count of goroutines of update node status
var UpdateNodeStatusWorkers int

// QueryConfigMapWorkers is the count of goroutines of query configmap
var QueryConfigMapWorkers int

// QuerySecretWorkers is the count of goroutines of query secret
var QuerySecretWorkers int

func init() {
if psw, err := config.CONFIG.GetValue("update-pod-status-workers").ToInt(); err != nil {
UpdatePodStatusWorkers = constants.DefaultUpdatePodStatusWorkers
} else {
UpdatePodStatusWorkers = psw
}
log.LOGGER.Infof("update pod status workers: %d", UpdatePodStatusWorkers)

if nsw, err := config.CONFIG.GetValue("update-node-status-workers").ToInt(); err != nil {
UpdateNodeStatusWorkers = constants.DefaultUpdateNodeStatusWorkers
} else {
UpdateNodeStatusWorkers = nsw
}
log.LOGGER.Infof("update node status workers: %d", UpdateNodeStatusWorkers)

if qcw, err := config.CONFIG.GetValue("query-configmap-workers").ToInt(); err != nil {
QueryConfigMapWorkers = constants.DefaultQueryConfigMapWorkers
} else {
QueryConfigMapWorkers = qcw
}
log.LOGGER.Infof("query config map workers: %d", QueryConfigMapWorkers)

if qsw, err := config.CONFIG.GetValue("query-secret-workers").ToInt(); err != nil {
QuerySecretWorkers = constants.DefaultQuerySecretWorkers
} else {
QuerySecretWorkers = qsw
}
log.LOGGER.Infof("query secret workers: %d", QuerySecretWorkers)
}

0 comments on commit 6a68439

Please sign in to comment.