New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Memphis nats integration tcp #253
Memphis nats integration tcp #253
Conversation
…dev/memphis-broker into memphis-nats-integration-tcp
…emphis-nats-integration-tcp
9435e6d
to
b3f148e
Compare
…//github.com/memphisdev/memphis-broker into memphis-nats-integration-tcp-no_access_token
…access_token' into memphis-nats-integration-tcp
db/db.go
Outdated
@@ -25,13 +24,24 @@ import ( | |||
) | |||
|
|||
var configuration = conf.GetConfig() | |||
var serv *server.Server | |||
// var serv *server.Server |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need to leave these comments ?
main.go
Outdated
background_tasks.InitializeZombieResources(s) | ||
|
||
defer db.Close() | ||
defer db.Close(dbInstance, s) | ||
|
||
// defer broker.Close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can remove this one
main.go
Outdated
background_tasks.InitializeZombieResources(s) | ||
|
||
defer db.Close() | ||
defer db.Close(dbInstance, s) | ||
|
||
// defer broker.Close() | ||
defer analytics.Close() | ||
|
||
wg := new(sync.WaitGroup) | ||
wg.Add(4) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should be 3
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indeed
server/memphis_handlers_consumers.go
Outdated
if consumerGroup != "" { | ||
err = validateName(consumerGroup) | ||
if err != nil { | ||
serv.Warnf(err.Error()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is it warning?
@@ -164,6 +163,46 @@ func (fh FactoriesHandler) CreateFactory(c *gin.Context) { | |||
}) | |||
} | |||
|
|||
var ErrFactoryAlreadyExists = errors.New("memphis: factory already exists") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what soed it means direct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unlike HTTP requests, that go to the HTTP handler function and have the middleware's context (getUserDetailsFromMiddleware), SDK requests are processed directly. we had to differentiate between them somehow and this naming was chosen... we're open for other suggestions :)
server/memphis_handlers_producers.go
Outdated
producerType := strings.ToLower(cpr.ProducerType) | ||
err = validateProducerType(producerType) | ||
if err != nil { | ||
serv.Warnf(err.Error()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
any reason we log error as warning in some places in your code
server/memphis_helper.go
Outdated
@@ -382,6 +377,23 @@ func (s *Server) QueueSubscribe(subj, queueGroupName string, cb func(string, []b | |||
return err | |||
} | |||
|
|||
func (s *Server) subscribeOnGlobalAcc(subj, sid string, cb func(string, string, []byte)) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sid == queue group ? if it does let's change it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
subscription id, following NATS' naming
func (s *Server) initialiseSDKHandlers() { | ||
// factories | ||
s.subscribeOnGlobalAcc("$memphis_factory_creations", | ||
"memphis_factory_creations_subscription", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in case the second parameter is queue group all fine, otherwise we need to ensure this subscription is working as a group
jetstreamAPI (WIP) and move background tasks into the server package
No description provided.