Skip to content

Commit

Permalink
Optimize localcontroller
Browse files Browse the repository at this point in the history
1. reuse CR definition of pkg/apis in all managers.
2. cleanup server.
3. fix db getClient bug, and optimize it.
4. rename wsclient to gmclient.
5. fix GetUniqueIdentifier bug.

Signed-off-by: JimmyYang <yangjin39@huawei.com>
  • Loading branch information
JimmyYang authored and llhuii committed Jan 28, 2021
1 parent 1540f7e commit c4cfc1b
Show file tree
Hide file tree
Showing 11 changed files with 360 additions and 466 deletions.
33 changes: 21 additions & 12 deletions cmd/neptune-lc/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import (

"github.com/edgeai-neptune/neptune/cmd/neptune-lc/app/options"
"github.com/edgeai-neptune/neptune/pkg/localcontroller/common/constants"
"github.com/edgeai-neptune/neptune/pkg/localcontroller/gmclient"
"github.com/edgeai-neptune/neptune/pkg/localcontroller/manager"
"github.com/edgeai-neptune/neptune/pkg/localcontroller/server"
"github.com/edgeai-neptune/neptune/pkg/localcontroller/wsclient"
"github.com/edgeai-neptune/neptune/pkg/version/verflag"
)

Expand Down Expand Up @@ -64,25 +64,34 @@ It manages dataset and models, and controls ai features in local nodes.`, cmdNam

// runServer runs server
func runServer() {
c := wsclient.NewClient(Options)
c := gmclient.NewWebSocketClient(Options)
if err := c.Start(); err != nil {
return
}

_, err := manager.NewDatasetManager(c, Options)
if err != nil {
klog.Errorf("create dataset manager failed, error: %v", err)
}
dm := manager.NewDatasetManager(c, Options)

_, err = manager.NewModelManager(c)
if err != nil {
klog.Errorf("create model manager failed, error: %v", err)
}
mm := manager.NewModelManager(c)

jm := manager.NewJointInferenceManager(c)

fm := manager.NewFederatedLearningManager(c)

s := server.NewServer(Options, jm, fm)
s := server.New(Options)

for _, m := range []manager.FeatureManager{
dm, mm, jm, fm,
} {
s.AddFeatureManager(m)
c.Subscribe(m)
err := m.Start()
if err != nil {
klog.Errorf("failed to start manager %s: %v",
m.GetName(), err)
return
}
klog.Infof("manager %s is started", m.GetName())
}

s.Start()
s.ListenAndServe()
}
48 changes: 32 additions & 16 deletions pkg/localcontroller/db/db.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package db

import (
"encoding/json"
"os"
"path/filepath"

Expand All @@ -15,35 +16,46 @@ import (
type Resource struct {
gorm.Model
Name string `gorm:"unique"`
APIVersion string `json:"apiVersion"`
Kind string `json:"kind"`
MetaData string `json:"metadata"`
Spec string `json:"spec"`
TypeMeta string
ObjectMeta string
Spec string
}

var dbClient *gorm.DB

// SaveResource saves resource info in db
func SaveResource(resource *Resource) error {
func SaveResource(name string, typeMeta, objectMeta, spec interface{}) error {
var err error
dbClient := getClient()

r := Resource{}

queryResult := dbClient.Where("name = ?", resource.Name).First(&r)
typeMetaData, _ := json.Marshal(typeMeta)
objectMetaData, _ := json.Marshal(objectMeta)
specData, _ := json.Marshal(spec)

queryResult := dbClient.Where("name = ?", name).First(&r)

if queryResult.RowsAffected == 0 {
if err = dbClient.Create(resource).Error; err != nil {
klog.Errorf("saved resource(name=%s) failed, error: %v", resource.Name, err)
newR := &Resource{
Name: name,
TypeMeta: string(typeMetaData),
ObjectMeta: string(objectMetaData),
Spec: string(specData),
}
if err = dbClient.Create(newR).Error; err != nil {
klog.Errorf("failed to save resource(name=%s): %v", name, err)
return err
}
klog.Infof("saved resource(name=%s)", name)
} else {
r.APIVersion = resource.APIVersion
r.Kind = resource.Kind
r.MetaData = resource.MetaData
r.Spec = resource.Spec
r.TypeMeta = string(typeMetaData)
r.ObjectMeta = string(objectMetaData)
r.Spec = string(specData)
if err := dbClient.Save(&r).Error; err != nil {
klog.Errorf("Update resource(name=%s) failed, error: %v", resource.Name, err)
klog.Errorf("failed to update resource(name=%s): %v", name, err)
return err
}
klog.V(2).Infof("updated resource(name=%s)", name)
}

return nil
Expand All @@ -52,7 +64,6 @@ func SaveResource(resource *Resource) error {
// DeleteResource deletes resource info in db
func DeleteResource(name string) error {
var err error
dbClient := getClient()

r := Resource{}

Expand All @@ -63,13 +74,18 @@ func DeleteResource(name string) error {
}

if err = dbClient.Unscoped().Delete(&r).Error; err != nil {
klog.Errorf("delete resource(name=%s) to db failed, error: %v", name, err)
klog.Errorf("failed to delete resource(name=%s): %v", name, err)
return err
}
klog.Infof("deleted resource(name=%s)", name)

return nil
}

func init() {
dbClient = getClient()
}

// getClient gets db client
func getClient() *gorm.DB {
dbURL := constants.DataBaseURL
Expand Down
36 changes: 36 additions & 0 deletions pkg/localcontroller/gmclient/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package gmclient

const (
// InsertOperation is the insert value
InsertOperation = "insert"
// DeleteOperation is the delete value
DeleteOperation = "delete"
// StatusOperation is the status value
StatusOperation = "status"
)

// Message defines message
type Message struct {
Header MessageHeader `json:"header"`
Content []byte `json:"content"`
}

// MessageHeader define header of message
type MessageHeader struct {
Namespace string `json:"namespace"`
ResourceKind string `json:"resourceKind"`
ResourceName string `json:"resourceName"`
Operation string `json:"operation"`
}

type MessageResourceHandler interface {
GetName() string
Insert(*Message) error
Delete(*Message) error
}

type ClientI interface {
Start() error
WriteMessage(messageBody interface{}, messageHeader MessageHeader) error
Subscribe(m MessageResourceHandler) error
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package wsclient
package gmclient

import (
"encoding/json"
Expand All @@ -14,32 +14,15 @@ import (
"github.com/edgeai-neptune/neptune/pkg/localcontroller/common/constants"
)

// MessageHandler defines message handler function
type MessageHandler func(*Message)

// Client defines a client
type Client struct {
// wsClient defines a websocket client
type wsClient struct {
Options *options.LocalControllerOptions
WSConnection *WSConnection
SubscribeMessageMap map[string]MessageHandler
SubscribeMessageMap map[string]MessageResourceHandler
SendMessageChannel chan Message
ReconnectChannel chan struct{}
}

// Message defines message
type Message struct {
Header MessageHeader `json:"header"`
Content []byte `json:"content"`
}

// MessageHeader define header of message
type MessageHeader struct {
Namespace string `json:"namespace"`
ResourceKind string `json:"resourceKind"`
ResourceName string `json:"resourceName"`
Operation string `json:"operation"`
}

// WSConnection defines conn
type WSConnection struct {
WSConn *websocket.Conn
Expand All @@ -54,30 +37,31 @@ const (
MessageChannelCacheSize = 100
)

// NewClient creates client
func NewClient(options *options.LocalControllerOptions) *Client {
c := Client{
// NewWebSocketClient creates client
func NewWebSocketClient(options *options.LocalControllerOptions) ClientI {
c := wsClient{
Options: options,
SubscribeMessageMap: make(map[string]MessageHandler),
SubscribeMessageMap: make(map[string]MessageResourceHandler),
SendMessageChannel: make(chan Message, MessageChannelCacheSize),
}

return &c
}

// Subscribe registers in client
func (c *Client) Subscribe(resource string, handler MessageHandler) error {
if c.SubscribeMessageMap[resource] == nil {
c.SubscribeMessageMap[resource] = handler
func (c *wsClient) Subscribe(m MessageResourceHandler) error {
name := m.GetName()
if c.SubscribeMessageMap[name] == nil {
c.SubscribeMessageMap[name] = m
} else {
klog.Warningf("%s had been registered in websocket client", resource)
klog.Warningf("%s had been registered in websocket client", name)
}

return nil
}

// handleReceivedMessage handles received message
func (c *Client) handleReceivedMessage(stop chan struct{}) {
func (c *wsClient) handleReceivedMessage(stop chan struct{}) {
defer func() {
stop <- struct{}{}
}()
Expand All @@ -97,17 +81,31 @@ func (c *Client) handleReceivedMessage(stop chan struct{}) {
klog.V(4).Infof("client received message content: %s from global manager(address: %s)",
message.Content, c.Options.GMAddr)

handler := c.SubscribeMessageMap[message.Header.ResourceKind]
if handler != nil {
go handler(&message)
m := c.SubscribeMessageMap[message.Header.ResourceKind]
if m != nil {
go func() {
var err error
switch message.Header.Operation {
case InsertOperation:
err = m.Insert(&message)

case DeleteOperation:
err = m.Delete(&message)
default:
err = fmt.Errorf("unknown operation: %s", message.Header.Operation)
}
if err != nil {
klog.Errorf("failed to handle message(%+v): %v", message.Header, err)
}
}()
} else {
klog.Errorf("%s hadn't registered in websocket client", message.Header.ResourceKind)
}
}
}

// WriteMessage saves message in a queue
func (c *Client) WriteMessage(messageBody interface{}, messageHeader MessageHeader) error {
func (c *wsClient) WriteMessage(messageBody interface{}, messageHeader MessageHeader) error {
content, err := json.Marshal(&messageBody)
if err != nil {
return err
Expand All @@ -124,7 +122,7 @@ func (c *Client) WriteMessage(messageBody interface{}, messageHeader MessageHead
}

// sendMessage sends the message through the connection
func (c *Client) sendMessage(stop chan struct{}) {
func (c *wsClient) sendMessage(stop chan struct{}) {
defer func() {
stop <- struct{}{}
}()
Expand Down Expand Up @@ -155,7 +153,7 @@ func (c *Client) sendMessage(stop chan struct{}) {
}

// connect tries to connect remote server
func (c *Client) connect() error {
func (c *wsClient) connect() error {
header := http.Header{}
header.Add(constants.WSHeaderNodeName, c.Options.NodeName)
u := url.URL{Scheme: constants.WSScheme, Host: c.Options.GMAddr, Path: "/"}
Expand Down Expand Up @@ -190,14 +188,14 @@ func (c *Client) connect() error {
}

// Start starts websocket client
func (c *Client) Start() error {
func (c *wsClient) Start() error {
go c.reconnect()

return nil
}

// reconnect reconnects global manager
func (c *Client) reconnect() {
func (c *wsClient) reconnect() {
for {
if err := c.connect(); err != nil {
continue
Expand Down
Loading

0 comments on commit c4cfc1b

Please sign in to comment.