Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP]feat(notify): websocket for notify #17972

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ require (
github.com/golang/mock v1.4.4
github.com/google/gopacket v1.1.17
github.com/google/uuid v1.3.0
github.com/googollee/go-socket.io v0.0.0-20181214084611-0ad7206c347a
github.com/googollee/go-socket.io v1.7.0
github.com/gorilla/mux v1.7.0
github.com/gorilla/websocket v1.4.1
github.com/gorilla/websocket v1.4.2
github.com/gosuri/uitable v0.0.0-20160404203958-36ee7e946282
github.com/hako/durafmt v0.0.0-20180520121703-7b7ae1e72ead
github.com/hugozhu/godingtalk v1.0.6
Expand Down Expand Up @@ -165,12 +165,12 @@ require (
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect
github.com/gomodule/redigo v1.8.4 // indirect
github.com/google/btree v1.0.1 // indirect
github.com/google/go-querystring v1.0.0 // indirect
github.com/google/gofuzz v1.1.0 // indirect
github.com/googleapis/gax-go/v2 v2.0.5 // indirect
github.com/googleapis/gnostic v0.4.1 // indirect
github.com/googollee/go-engine.io v0.0.0-20180829091931-e2f255711dcb // indirect
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 // indirect
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645 // indirect
github.com/huandu/xstrings v1.2.0 // indirect
Expand Down
13 changes: 7 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/me
github.com/go-yaml/yaml v2.1.0+incompatible h1:RYi2hDdss1u4YE7GwixGzWwVo47T8UQwnTLB6vQiq+o=
github.com/go-yaml/yaml v2.1.0+incompatible/go.mod h1:w2MrLa16VYP0jy6N7M5kHaCkaLENm+P+Tv+MfurjSw0=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gofrs/uuid v4.0.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/gofrs/uuid v4.1.0+incompatible h1:sIa2eCvUTwgjbqXrPLfNwUf9S3i3mpH1O1atV+iL/Wk=
github.com/gofrs/uuid v4.1.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
Expand Down Expand Up @@ -337,6 +338,8 @@ github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/gomodule/redigo v1.8.4 h1:Z5JUg94HMTR1XpwBaSH4vq3+PNSIykBLxMdglbw10gg=
github.com/gomodule/redigo v1.8.4/go.mod h1:P9dn9mFrCBvWhGE1wpxx6fgq7BAeLBk+UUUzlpkBYO0=
github.com/google/btree v0.0.0-20180124185431-e89373fe6b4a/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
Expand Down Expand Up @@ -384,17 +387,15 @@ github.com/googleapis/gax-go/v2 v2.0.5 h1:sjZBwGj9Jlw33ImPtvFviGYvseOtDM7hkSKB7+
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/googleapis/gnostic v0.4.1 h1:DLJCy1n/vrD4HPjOvYcT8aYQXpPIzoRZONaYwyycI+I=
github.com/googleapis/gnostic v0.4.1/go.mod h1:LRhVm6pbyptWbWbuZ38d1eyptfvIytN3ir6b65WBswg=
github.com/googollee/go-engine.io v0.0.0-20180829091931-e2f255711dcb h1:n22Aukg/TjoypWc37dbKIpCsz0VMFPD36HQk1WKvg3A=
github.com/googollee/go-engine.io v0.0.0-20180829091931-e2f255711dcb/go.mod h1:MBpz1MS3P4HtRcBpQU4HcjvWXZ9q+JWacMEh2/BFYbg=
github.com/googollee/go-socket.io v0.0.0-20181214084611-0ad7206c347a h1:NMY2a78Z98wdhMHJTJxTc9BFfnGGc5ZWfgY9a23zeck=
github.com/googollee/go-socket.io v0.0.0-20181214084611-0ad7206c347a/go.mod h1:ftBGBMhSYToR5oV4ImIPKvAIsNaTkLC+tTvoNafqxlQ=
github.com/googollee/go-socket.io v1.7.0 h1:ODcQSAvVIPvKozXtUGuJDV3pLwdpBLDs1Uoq/QHIlY8=
github.com/googollee/go-socket.io v1.7.0/go.mod h1:0vGP8/dXR9SZUMMD4+xxaGo/lohOw3YWMh2WRiWeKxg=
github.com/gopherjs/gopherjs v0.0.0-20180628210949-0892b62f0d9f/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/mux v1.7.0 h1:tOSd0UKHQd6urX6ApfOn4XdBMY6Sh1MfxV3kmaazO+U=
github.com/gorilla/mux v1.7.0/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM=
github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gosuri/uilive v0.0.0-20170323041506-ac356e6e42cd/go.mod h1:qkLSc0A5EXSP6B04TrN4oQoxqFI7A8XvoXSlJi8cwk8=
github.com/gosuri/uiprogress v0.0.0-20170224063937-d0567a9d84a1/go.mod h1:C1RTYn4Sc7iEyf6j8ft5dyoZ4212h8G1ol9QQluh5+0=
github.com/gosuri/uitable v0.0.0-20160404203958-36ee7e946282 h1:KFqmdzEPbU7Uck2tn50t+HQXZNVkxe8M9qRb/ZoSHaE=
Expand Down
2 changes: 2 additions & 0 deletions pkg/notify/service/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"yunion.io/x/onecloud/pkg/cloudcommon/db/taskman"
"yunion.io/x/onecloud/pkg/notify/models"
_ "yunion.io/x/onecloud/pkg/notify/sender"
"yunion.io/x/onecloud/pkg/notify/socket"
)

const (
Expand All @@ -30,6 +31,7 @@ const (
func InitHandlers(app *appsrv.Application) {
db.InitAllManagers()

socket.AddSocketHandlers("", app)
models.InitEventLog()
models.InitEmailQueue()

Expand Down
111 changes: 111 additions & 0 deletions pkg/notify/socket/clientmanager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package socket

import (
"context"
"fmt"

socketio "github.com/googollee/go-socket.io"

"yunion.io/x/log"
"yunion.io/x/pkg/errors"
)

type Client struct {
Session string
Name string
UserID string
Conn socketio.Conn
}

type ClientManager struct {
UserID2ClientMap map[string][]Client
SID2ClientMap map[string]Client
Server *socketio.Server
ForceSingleSessionLogin bool
}

const (
// BroadcastRoom 所有的在线 client 自动加入聊天室
BroadcastRoom = "YunionBcast"
)

// IsEmpty 当前是否无人在线(浏览器)
func (manager *ClientManager) IsEmpty() bool {
log.Debugf("[Clients] there are %d clients belonging to %d tenants.", len(manager.SID2ClientMap), len(manager.UserID2ClientMap))
return len(manager.SID2ClientMap)+len(manager.UserID2ClientMap) == 0
}

// Register 浏览器user 登录时,注册 socketio 链接
func (manager *ClientManager) Register(ctx context.Context, s socketio.Conn) error {
session, cred, err := getCred(ctx, s)
if err != nil {
log.Errorf("login FAILED: with id %s error: %v", s.ID(), err)
return errors.Wrapf(err, "getCred")
}
log.Debugf("Login PASS ! ID: %s , user: %s(%s)", s.ID(), cred.GetUserName(), cred.GetUserId())
UserID := cred.GetUserId()
client := Client{
Session: session,
Name: cred.GetUserName(),
UserID: UserID,
Conn: s,
}

manager.UserID2ClientMap[UserID] = append(manager.UserID2ClientMap[UserID], client)
manager.SID2ClientMap[s.ID()] = client
log.Debugf("registered successful for user %s(%s) with id %s", cred.GetUserName(), cred.GetUserId(), s.ID())
s.Join(BroadcastRoom)
s.SetContext("")
return nil
}

// Gretting 组合一个显示用户 socketio 链接信息的子串
func (manager *ClientManager) Gretting(s socketio.Conn) string {
client := manager.SID2ClientMap[s.ID()]
return fmt.Sprintf("hello %s(%s), your socket io id: %s, your session: %s.", client.Name, client.UserID, s.ID(), client.Session)
}

// Unregister 浏览器 user 刷新或关闭页面,断开链接(自动重连)
func (manager *ClientManager) Unregister(s socketio.Conn, reason string) {
delete(manager.SID2ClientMap, s.ID())
log.Debugf("[ Unregister ] ID: %s; reason: %s", s.ID(), reason)
s.Emit("bye", "")
s.Leave(BroadcastRoom)
s.Close()
}

// NotifyByUserID 按照用户 id,通知到用户的所有在线浏览器页面。支持用户多 session 登录,例如 sysadmin
func (manager *ClientManager) NotifyByUserID(message string, UserID string) error {
count := 0
name := ""
msg := ""

for _, c := range manager.UserID2ClientMap[UserID] {
if c.UserID == UserID {
c.Conn.Emit("message", string(message))
name = manager.SID2ClientMap[c.Conn.ID()].Name
count++
}
}

if count == 0 {
log.Warningf("UserID %s is not online!", UserID)
return errors.Errorf(msg)
}
log.Debugf("[%d clients] NOTIFY OK to %s(@%s) ", count, UserID, name)
return nil
}

//Broadcasts 对所有在线用户发广播
func (manager *ClientManager) Broadcasts(message string) {
// BroadcastToRoom(namespace string, room, event string, args ...interface{}) bool {
if manager.IsEmpty() {
log.Debugf("Ignore Broadcasting for empty room\n")
return
}
if !manager.Server.BroadcastToRoom("", BroadcastRoom, "message", message) {
log.Errorf("broadcasting error with msg %s\n", message)
return
}
log.Infof("broadcasting OK with msg %s\n", message)
}
15 changes: 15 additions & 0 deletions pkg/notify/socket/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright 2019 Yunion
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package socket // import "yunion.io/x/onecloud/pkg/notify/tasks"
39 changes: 39 additions & 0 deletions pkg/notify/socket/login.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package socket

import (
"context"

socketio "github.com/googollee/go-socket.io"

"yunion.io/x/jsonutils"
"yunion.io/x/pkg/errors"

"yunion.io/x/onecloud/pkg/apigateway/clientman"
"yunion.io/x/onecloud/pkg/mcclient"
)

// getCred 与前端约定,在 socketio 里,使用 ?session=XXX 的方式获取用户的 session
func getCred(ctx context.Context, s socketio.Conn) (string, mcclient.TokenCredential, error) {
query, err := jsonutils.ParseQueryString(s.URL().RawQuery)
if err != nil {
return "", nil, errors.Wrapf(err, "ParseQueryString")
}
session, err := query.GetString("session")
if err != nil {
return "", nil, errors.Wrapf(err, "get session")
}
if len(session) == 0 {
return "", nil, errors.Errorf("empty session")
}
// tm := clientman.NewMapTokenManagerV2() 这行注释临时保留 -rex.
// cred := clientman.TokenMan.Get(session)
authToken, err := clientman.Decode(session)
if err != nil {
return "", nil, errors.Wrap(err, "Decode")
}
cred, err := authToken.GetToken(ctx)
if err != nil {
return "", nil, errors.Wrap(err, "authToken.GetToken")
}
return session, cred, nil
}
21 changes: 21 additions & 0 deletions pkg/notify/socket/msg.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package socket

type SMsgEntry struct {
ObjType string `json:"obj_type"`
ObjId string `json:"obj_id"`
ObjName string `json:"obj_name"`
Success bool `json:"success"`
Action string `json:"action"`
Notes string `json:"notes"`
UserId string `json:"user_id"`
User string `json:"user"`
TenantId string `json:"tenant_id"`
Tenant string `json:"tenant"`
Broadcast bool `json:"broadcast"`
//控制前端是否进行弹窗信息提示
IgnoreAlert bool `json:"ignore_alert"`
}

type SMsg struct {
WebSocket SMsgEntry `json:"notification"`
}
Loading