Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Memphis nats integration tcp #253

Merged
merged 27 commits into from
Aug 20, 2022
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
0ab8228
tcp connect nats
shohamroditimemphis Aug 7, 2022
2fff337
Merge branch 'memphis-nats-integration' of https://github.com/memphis…
shohamroditimemphis Aug 7, 2022
f8064f4
Merge remote-tracking branch 'origin/memphis-nats-integration' into m…
shohamroditimemphis Aug 9, 2022
b3f148e
fix import cycle
shohamroditimemphis Aug 9, 2022
1c57caf
add connectionId to pong message
shohamroditimemphis Aug 10, 2022
c85e266
Fix connectionId auth flow
OrMemphis Aug 11, 2022
cac448a
Fix access token issue
OrMemphis Aug 11, 2022
292d200
remove access token, handle factory creation requests via subject
OrMemphis Aug 13, 2022
6be4af9
clean code
shohamroditimemphis Aug 14, 2022
078bc0a
station creation requests via subject
shohamroditimemphis Aug 14, 2022
5e2f59b
fix create station via subject
shohamroditimemphis Aug 15, 2022
eb3eadf
handle producer creation via subject
shohamroditimemphis Aug 15, 2022
c7ecd74
handle consumer via subject
shohamroditimemphis Aug 15, 2022
dca10b4
Respond to memphis SDK requests via reply subject
OrMemphis Aug 15, 2022
bd6e60e
Support factory destruction
OrMemphis Aug 16, 2022
33b6812
changing userType
shohamroditimemphis Aug 16, 2022
80fea08
merge conflicts
shohamroditimemphis Aug 16, 2022
254b709
Merge branch 'memphis-nats-integration-tcp-no_access_token' of https:…
shohamroditimemphis Aug 16, 2022
7bff251
Support new SDK destruction of stations, Producers and Consumers
OrMemphis Aug 16, 2022
6ecfc13
Rename main_listener to sdk_handlers
OrMemphis Aug 16, 2022
6d1d167
refactor sdk handlers as part of the server package
OrMemphis Aug 16, 2022
b0be302
Fix returning an empty error in few places
OrMemphis Aug 16, 2022
c0609c7
Merge remote-tracking branch 'origin/memphis-nats-integration-tcp-no_…
OrMemphis Aug 16, 2022
d514c6a
convert internal requests to be cluster compatible using
OrMemphis Aug 19, 2022
1e32f99
Implement jsApi based requests for most memphis requests
OrMemphis Aug 19, 2022
f0d69e6
Implement GetMessages
OrMemphis Aug 19, 2022
f517cd7
Replace warnings with error messages + cleanup
OrMemphis Aug 19, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 3 additions & 2 deletions analytics/analytics.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ var disableAnalyticsCounter metric.Int64Counter
var deploymentId string
var analyticsFlag string

func InitializeAnalytics() error {
systemKeysCollection = db.GetCollection("system_keys")
func InitializeAnalytics(c *mongo.Client) error {
systemKeysCollection = db.GetCollection("system_keys", c)
deployment, err := getSystemKey("deployment_id")
if err == mongo.ErrNoDocuments {
deploymentId := primitive.NewObjectID().Hex()
Expand Down Expand Up @@ -90,6 +90,7 @@ func InitializeAnalytics() error {
return err
} else {
analyticsFlag = analytics.Value

}

ls = launcher.ConfigureOpentelemetry(
Expand Down
3 changes: 1 addition & 2 deletions background_tasks/poison_messages_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@
package background_tasks

import (
"memphis-broker/handlers"
"memphis-broker/server"
)

var poisonMessagesHandler handlers.PoisonMessagesHandler
var poisonMessagesHandler server.PoisonMessagesHandler

func ListenForPoisonMessages(s *server.Server) {
s.QueueSubscribe("$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.>",
Expand Down
8 changes: 4 additions & 4 deletions background_tasks/zombie_resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ var poisonMessagesCollection *mongo.Collection
var serv *server.Server

func InitializeZombieResources(s *server.Server) {
connectionsCollection = db.GetCollection("connections")
producersCollection = db.GetCollection("producers")
consumersCollection = db.GetCollection("consumers")
poisonMessagesCollection = db.GetCollection("poison_messages")
connectionsCollection = db.GetCollection("connections", s.DbClient)
producersCollection = db.GetCollection("producers", s.DbClient)
consumersCollection = db.GetCollection("consumers", s.DbClient)
poisonMessagesCollection = db.GetCollection("poison_messages", s.DbClient)
serv = s
}

Expand Down
46 changes: 29 additions & 17 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package db

import (
"memphis-broker/conf"
"memphis-broker/server"

"context"
"time"
Expand All @@ -25,13 +24,24 @@ import (
)

var configuration = conf.GetConfig()
var serv *server.Server
// var serv *server.Server
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need to leave these comments ?


const (
dbOperationTimeout = 20
)

func InitializeDbConnection(s *server.Server) error {
type logger interface{
Noticef(string, ...interface{})
Errorf(string, ...interface{})
}

type DbInstance struct {
Client *mongo.Client
Ctx context.Context
Cancel context.CancelFunc
}

func InitializeDbConnection(l logger) (DbInstance, error) {
ctx, cancel := context.WithTimeout(context.TODO(), dbOperationTimeout*time.Second)

var clientOptions *options.ClientOptions
Expand All @@ -48,32 +58,34 @@ func InitializeDbConnection(s *server.Server) error {

client, err := mongo.Connect(ctx, clientOptions)
if err != nil {
return err
cancel()
return DbInstance{}, err
}

err = client.Ping(ctx, nil)
if err != nil {
return err
cancel()
return DbInstance{}, err
}

s.DbClient = client
s.DbCtx = ctx
s.DbCancel = cancel
serv = s
s.Noticef("Established connection with the DB")
return nil
// s.DbClient = client
// s.DbCtx = ctx
// s.DbCancel = cancel
// serv = s
l.Noticef("Established connection with the DB")
return DbInstance{Client:client, Ctx: ctx, Cancel: cancel}, nil
}

func GetCollection(collectionName string) *mongo.Collection {
var collection *mongo.Collection = serv.DbClient.Database(configuration.DB_NAME).Collection(collectionName)
func GetCollection(collectionName string, dbClient *mongo.Client) *mongo.Collection {
var collection *mongo.Collection = dbClient.Database(configuration.DB_NAME).Collection(collectionName)
return collection
}

func Close() {
defer serv.DbCancel()
func Close(dbi DbInstance, l logger) {
defer dbi.Cancel()
defer func() {
if err := serv.DbClient.Disconnect(serv.DbCtx); err != nil {
serv.Errorf("Failed to close Mongodb client: " + err.Error())
if err := dbi.Client.Disconnect(dbi.Ctx); err != nil {
l.Errorf("Failed to close Mongodb client: " + err.Error())
}
}()
}
17 changes: 8 additions & 9 deletions http_server/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package http_server

import (
"memphis-broker/conf"
"memphis-broker/handlers"
"memphis-broker/http_server/routes"
"memphis-broker/server"
"memphis-broker/socketio"
Expand All @@ -25,14 +24,14 @@ import (
func InitializeHttpServer(s *server.Server, wg *sync.WaitGroup) {
configuration := conf.GetConfig()

handlers := handlers.Handlers{
Producers: handlers.ProducersHandler{S: s},
Consumers: handlers.ConsumersHandler{S: s},
AuditLogs: handlers.AuditLogsHandler{},
Stations: handlers.StationsHandler{S: s},
Factories: handlers.FactoriesHandler{S: s},
Monitoring: handlers.MonitoringHandler{S: s},
PoisonMsgs: handlers.PoisonMessagesHandler{S: s},
handlers := server.Handlers{
Producers: server.ProducersHandler{S: s},
Consumers: server.ConsumersHandler{S: s},
AuditLogs: server.AuditLogsHandler{},
Stations: server.StationsHandler{S: s},
Factories: server.FactoriesHandler{S: s},
Monitoring: server.MonitoringHandler{S: s},
PoisonMsgs: server.PoisonMessagesHandler{S: s},
}

httpServer := routes.InitializeHttpRoutes(&handlers)
Expand Down
4 changes: 2 additions & 2 deletions http_server/routes/consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
package routes

import (
"memphis-broker/handlers"
"memphis-broker/server"

"github.com/gin-gonic/gin"
)

func InitializeConsumersRoutes(router *gin.RouterGroup, h *handlers.Handlers) {
func InitializeConsumersRoutes(router *gin.RouterGroup, h *server.Handlers) {
consumersHandler := h.Consumers
consumersRoutes := router.Group("/consumers")
consumersRoutes.GET("/getAllConsumers", consumersHandler.GetAllConsumers)
Expand Down
4 changes: 2 additions & 2 deletions http_server/routes/factories.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
package routes

import (
"memphis-broker/handlers"
"memphis-broker/server"

"github.com/gin-gonic/gin"
)

func InitializeFactoriesRoutes(router *gin.RouterGroup, h *handlers.Handlers) {
func InitializeFactoriesRoutes(router *gin.RouterGroup, h *server.Handlers) {
factoriesHandler := h.Factories
factoriesRoutes := router.Group("/factories")
factoriesRoutes.POST("/createFactory", factoriesHandler.CreateFactory)
Expand Down
4 changes: 2 additions & 2 deletions http_server/routes/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
package routes

import (
"memphis-broker/handlers"
"memphis-broker/server"

"github.com/gin-gonic/gin"
)

func InitializeMonitoringRoutes(router *gin.RouterGroup, h *handlers.Handlers) {
func InitializeMonitoringRoutes(router *gin.RouterGroup, h *server.Handlers) {
monitoringHandler := h.Monitoring
monitoringRoutes := router.Group("/monitoring")
monitoringRoutes.GET("/getClusterInfo", monitoringHandler.GetClusterInfo)
Expand Down
4 changes: 2 additions & 2 deletions http_server/routes/producers.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
package routes

import (
"memphis-broker/handlers"
"memphis-broker/server"

"github.com/gin-gonic/gin"
)

func InitializeProducersRoutes(router *gin.RouterGroup, h *handlers.Handlers) {
func InitializeProducersRoutes(router *gin.RouterGroup, h *server.Handlers) {
producersHandler := h.Producers
producersRoutes := router.Group("/producers")
producersRoutes.GET("/getAllProducers", producersHandler.GetAllProducers)
Expand Down
4 changes: 2 additions & 2 deletions http_server/routes/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@
package routes

import (
"memphis-broker/handlers"
"memphis-broker/middlewares"
"memphis-broker/server"
"memphis-broker/utils"

"github.com/gin-contrib/cors"
"github.com/gin-gonic/gin"
)

func InitializeHttpRoutes(handlers *handlers.Handlers) *gin.Engine {
func InitializeHttpRoutes(handlers *server.Handlers) *gin.Engine {
router := gin.Default()

router.Use(cors.New(cors.Config{
Expand Down
4 changes: 2 additions & 2 deletions http_server/routes/sandbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@
package routes

import (
"memphis-broker/handlers"
"memphis-broker/server"

"github.com/gin-gonic/gin"
)

func InitializeSandboxRoutes(router *gin.RouterGroup) {
sandboxHandler := handlers.SandboxHandler{}
sandboxHandler := server.SandboxHandler{}
sandboxRoutes := router.Group("/sandbox")
sandboxRoutes.POST("/login", sandboxHandler.Login)
}
4 changes: 2 additions & 2 deletions http_server/routes/stations.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
package routes

import (
"memphis-broker/handlers"
"memphis-broker/server"

"github.com/gin-gonic/gin"
)

func InitializeStationsRoutes(router *gin.RouterGroup, h *handlers.Handlers) {
func InitializeStationsRoutes(router *gin.RouterGroup, h *server.Handlers) {
stationsHandler := h.Stations
stationsRoutes := router.Group("/stations")
stationsRoutes.GET("/getStation", stationsHandler.GetStation)
Expand Down
5 changes: 2 additions & 3 deletions http_server/routes/user_mgmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,12 @@
package routes

import (
"memphis-broker/handlers"

"memphis-broker/server"
"github.com/gin-gonic/gin"
)

func InitializeUserMgmtRoutes(router *gin.RouterGroup) {
userMgmtHandler := handlers.UserMgmtHandler{}
userMgmtHandler := server.UserMgmtHandler{}
userMgmtRoutes := router.Group("/usermgmt")
userMgmtRoutes.GET("/nats/authenticate", userMgmtHandler.AuthenticateNatsUser)
userMgmtRoutes.GET("/nats/authenticate/:publicKey", userMgmtHandler.AuthenticateNatsUser)
Expand Down
38 changes: 20 additions & 18 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"memphis-broker/analytics"
"memphis-broker/background_tasks"
"memphis-broker/db"
"memphis-broker/handlers"
"memphis-broker/http_server"
"memphis-broker/server"
"os"
Expand Down Expand Up @@ -93,48 +92,51 @@ func usage() {
os.Exit(0)
}

func handleError(s *server.Server, message string, err error) {
if err != nil {
s.Errorf(message + " " + err.Error())
}
}

func runMemphis(s *server.Server) {

if !s.MemphisInitialized() {
s.Fatalf("Jetstream not enabled on global account")
}

err := db.InitializeDbConnection(s)
handleError(s, "Failed initializing db connection: ", err)

err = analytics.InitializeAnalytics()
handleError(s, "Failed initializing analytics: ", err)
dbInstance, err := db.InitializeDbConnection(s)
if err != nil {
s.Errorf("Failed initializing db connection:" + " " + err.Error())
os.Exit(1)
}

handlers.InitializeHandlers(s)
err = analytics.InitializeAnalytics(dbInstance.Client)
if err != nil {
s.Errorf("Failed initializing analytics: " + " " + err.Error())
db.Close(dbInstance, s)
os.Exit(1)
}

err = handlers.CreateRootUserOnFirstSystemLoad()
handleError(s, "Failed to create root user: ", err)
s.InitializeMemphisHandlers(dbInstance)

err = server.CreateRootUserOnFirstSystemLoad()
if err != nil {
s.Errorf("Failed to create root user: " + " " + err.Error())
db.Close(dbInstance, s)
os.Exit(1)
}
background_tasks.InitializeZombieResources(s)

defer db.Close()
defer db.Close(dbInstance, s)

// defer broker.Close()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can remove this one

defer analytics.Close()

wg := new(sync.WaitGroup)
wg.Add(4)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be 3

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indeed


// go tcp_server.InitializeTcpServer(wg)
go http_server.InitializeHttpServer(s, wg)
go background_tasks.KillZombieResources(wg)
go background_tasks.ListenForPoisonMessages(s)

var env string
if os.Getenv("DOCKER_ENV") != "" {
env = "Docker"
s.Noticef("\n**********\n\nDashboard: http://localhost:9000\nMemphis broker: localhost:5555 (Management Port) / 7766 (Data Port) / 6666 (TCP Port)\nUI/CLI root username - root\nUI/CLI root password - memphis\nSDK root connection token - memphis \n\n**********")
s.Noticef("\n**********\n\nDashboard: http://localhost:9000\nMemphis broker: localhost:5555 (Management Port) / 6666 (TCP Port)\nUI/CLI root username - root\nUI/CLI root password - memphis\nSDK root connection token - memphis \n\n**********")
} else {
env = "K8S"
}
Expand Down
Binary file removed memphis-broker
Binary file not shown.