Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: improving e2e testing latency #1760

Merged
merged 12 commits into from
Jun 24, 2024
Merged

Conversation

samhith-kakarla
Copy link
Contributor

creating the pr for testing of ci do not merge right now

Copy link

codecov bot commented Jun 12, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 57.23%. Comparing base (c70540b) to head (d60988c).

Current head d60988c differs from pull request most recent head a8588a8

Please upload reports for the commit a8588a8 to get more accurate results.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1760      +/-   ##
==========================================
- Coverage   60.94%   57.23%   -3.71%     
==========================================
  Files         232      218      -14     
  Lines       20014    17581    -2433     
==========================================
- Hits        12197    10063    -2134     
+ Misses       6959     6664     -295     
+ Partials      858      854       -4     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@whynowy
Copy link
Member

whynowy commented Jun 12, 2024

@samhith-kakarla - you might need to look into the original PR, we want to revert some changes over there:

  1. We do not want to start kafka, nats every time when we run an E2E test, we only want to do that when a Kafka or nats e2e test is running. This means, we still want to use go:generate to do the installation.
  2. With this, it comes a problem for the test api, that it initializes the connection (e.g. Kafka client) when the test api service starts, which requires the kafka/nats services running. To solve the problem, move the client initialization to lazy load mode with a lock.

Signed-off-by: Samhith Kakarla <samhith_kakarla@intuit.com>
Signed-off-by: Samhith Kakarla <samhith_kakarla@intuit.com>
Signed-off-by: Samhith Kakarla <samhith_kakarla@intuit.com>
Signed-off-by: Samhith Kakarla <samhith_kakarla@intuit.com>
Signed-off-by: Samhith Kakarla <samhith_kakarla@intuit.com>
@@ -171,9 +149,6 @@ cleanup-e2e:
# To run just one of the e2e tests by name (i.e. 'make TestCreateSimplePipeline'):
Test%:
$(MAKE) cleanup-e2e
$(MAKE) deploy-nats
$(MAKE) deploy-kafka
$(MAKE) deploy-redis
$(MAKE) image e2eapi-image
kubectl -n numaflow-system delete po -lapp.kubernetes.io/component=controller-manager,app.kubernetes.io/part-of=numaflow
kubectl -n numaflow-system delete po e2e-api-pod --ignore-not-found=true
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing go generate?

@@ -25,6 +25,10 @@ import (
"github.com/stretchr/testify/suite"
)

//go:generate kubectl -n numaflow-system delete statefulset nats --ignore-not-found=true
//go:generate kubectl apply -k ../../config/apps/nats -n numaflow-system
//go:generate kubectl apply -f testdata/nats-auth-fake-token.yaml -n numaflow-system
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this line?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, this is how the resources are getting deployed when go generate is called on the file

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why wasn't this needed before?

Copy link
Contributor Author

@samhith-kakarla samhith-kakarla Jun 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because i think the commit where this jetstream test was added(#1723) came after the e2e re-factor pr (#1725) so at this point all the resources were getting deployed in the make file whenever any test was run. But now since we are no longer deploying all the resources in the make file, need this line here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't get it, shouldn't this line be covered in kubectl apply -k ../../config/apps/nats -n numaflow-system?

Copy link
Contributor Author

@samhith-kakarla samhith-kakarla Jun 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are you talking about this line //go:generate kubectl apply -f testdata/nats-auth-fake-token.yaml -n numaflow-system if so i have removed it: i have only kept : //go:generatekubectl -n numaflow-system delete statefulset nats --ignore-not-found=true //go:generate kubectl apply -k ../../config/apps/nats -n numaflow-system

@@ -0,0 +1,9 @@

apiVersion: v1
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is this file used?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is getting used in the following line: /go:generate kubectl apply -f testdata/nats-auth-fake-token.yaml -n numaflow-system

@@ -27,6 +27,10 @@ import (
. "github.com/numaproj/numaflow/test/fixtures"
)

//go:generate kubectl -n numaflow-system delete statefulset nats --ignore-not-found=true
//go:generate kubectl apply -k ../../config/apps/nats -n numaflow-system
//go:generate kubectl apply -f testdata/nats-auth-fake-token.yaml -n numaflow-system
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this line?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, this is how the resources are getting deployed when go generate is called on the file

@@ -0,0 +1,9 @@

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this file in-use?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes it is used here : //go:generate kubectl apply -f testdata/nats-auth-fake-token.yaml -n numaflow-system

func (kh *KafkaController) CreateTopicHandler(w http.ResponseWriter, r *http.Request) {

m.Lock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Abstract this as a function with lock, returning a client.

@@ -42,7 +42,20 @@ func NewHttpController() *HttpController {
}
}

func NewHttpController() *HttpController {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to make change to this file? Initialization with an HTTP client should be okay, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you are right, we do not need to change this file. i will reverse the changes

}
}

func (n *NatsController) PumpSubject(w http.ResponseWriter, r *http.Request) {
m.Lock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, abstract this as a function with lock, returning a client.

func (h *RedisController) GetMsgCountContains(w http.ResponseWriter, r *http.Request) {
m.Lock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here.

Signed-off-by: Samhith Kakarla <samhith_kakarla@intuit.com>
Makefile Outdated
$(MAKE) image e2eapi-image
kubectl -n numaflow-system delete po -lapp.kubernetes.io/component=controller-manager,app.kubernetes.io/part-of=numaflow
kubectl -n numaflow-system delete po e2e-api-pod --ignore-not-found=true
go generate $(shell find ./test/kafka-e2e -name '*.go')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't use go generate $(shell find ./test/$* -name '*.go') as above?

Copy link
Contributor Author

@samhith-kakarla samhith-kakarla Jun 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think because that portion of the makefile is To run just one of the e2e tests by name (i.e. 'make TestCreateSimplePipeline'): so i dont think go generate $(shell find ./test/$* -name '*.go') works as this requires test file name as input not a test name.

Copy link
Contributor

@kohlisid kohlisid Jun 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can try to do a selective go generate here as well by trying to extract the file name, maybe this can help
shell grep $(*) -R ./test | head -1 | awk -F\/ '{print $$3}'

func NewNatsController(url string, token string) *NatsController {
opts := []natslib.Option{natslib.Token(token)}
nc, err := natslib.Connect(url, opts...)
func initNewNats(n *NatsController) *NatsController {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does it returns a * NatsController?

func NewKafkaController() *KafkaController {
// initialize Kafka handlers
var brokers = []string{bootstrapServers}
func initNewKafka(n *KafkaController) *KafkaController {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why it returns *KafkaController?

Copy link
Contributor Author

@samhith-kakarla samhith-kakarla Jun 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i felt it was just cleaner to do that. that way when i call the function i can just re-assign the controller like this kh = initNewKafka(kh). otherwise i would need to return producer, consumer and adminClient individually and re-assign those fields when the init function is called right ?

func NewRedisController() *RedisController {
// When we use this API to validate e2e test result, we always assume a redis UDSink is used
// to persist data to a redis instance listening on port 6379.
func initNewReddis(n *RedisController) *RedisController {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question as above.

Signed-off-by: Samhith Kakarla <samhith_kakarla@intuit.com>
config.Producer.Partitioner = sarama.NewManualPartitioner

producer, err := sarama.NewSyncProducer(brokers, config)
adminClient, err := sarama.NewClusterAdmin(n.brokers, config)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can directly assign to n.adminClient and return the same

config.Producer.Return.Successes = true
config.Producer.Partitioner = sarama.NewManualPartitioner

consumer, err := sarama.NewConsumer(n.brokers, config)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can directly assign, same as above.

config.Producer.Return.Successes = true
config.Producer.Partitioner = sarama.NewManualPartitioner

producer, err := sarama.NewSyncProducer(n.brokers, config)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can directly assign, same as above.


}

// func initNewKafka(n *KafkaController) *KafkaController {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove this

// }

func NewKafkaController() *KafkaController {
// initialize Kafka handlers
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please clean up any extra commented lines

Signed-off-by: Samhith Kakarla <samhith_kakarla@intuit.com>
adminClient: adminClient,
producer: producer,
consumer: consumer,
adminClient: nil,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a comment here indicating why these are set to nil initially.

func NewKafkaController() *KafkaController {
// initialize Kafka handlers
var brokers = []string{bootstrapServers}
// getter method for lazy loading. create and return the admin client only when required
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getKafkaClient is used for getting a Kafka admin client for the given 
controller config. It is implemented with a lazy loading mechanism 
1) A new client created only for the first request
2) Returning the current client if it exists

@kohlisid
Copy link
Contributor

@samhith-kakarla Please add the latency improvement stats that you collected here

n.mLock.Lock()
defer n.mLock.Unlock()
if n.consumer != nil {
log.Println("consumer already existed")
Copy link
Contributor

@kohlisid kohlisid Jun 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can remove this log

n.mLock.Lock()
defer n.mLock.Unlock()
if n.producer != nil {
log.Println("producer already existed")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can remove this log

@@ -149,5 +172,6 @@ func (n *NatsController) PumpJetstream(w http.ResponseWriter, r *http.Request) {
}

func (n *NatsController) Close() {
// no lazy clozing required becuase they check if nc is not null before they close
Copy link
Contributor

@kohlisid kohlisid Jun 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typos
// Nil check is not required here because the client.Close() function does a nil check internally.

Signed-off-by: Samhith Kakarla <samhith_kakarla@intuit.com>
@kohlisid
Copy link
Contributor

@samhith-kakarla There are permission issues with the doc you added, can you directly add the raw data here. Might be easier

@samhith-kakarla
Copy link
Contributor Author

ci runtimes before and after improvemnts:

Screenshot 2024-06-21 at 11 54 44 AM

@kohlisid
Copy link
Contributor

@samhith-kakarla As discussed, Could you add the verification result for the local testing for single test function with the updated go generate to block extra resources for those cases as well.

@samhith-kakarla
Copy link
Contributor Author

Local Testing results for single test function.

here is the result of the grep command run on the test name:

Screenshot 2024-06-21 at 2 09 57 PM

Here is the results of running make TestKafkaSourceSink:

Screenshot 2024-06-21 at 2 11 44 PM Screenshot 2024-06-21 at 2 11 54 PM

it finds the right test file, deploys the correct resources and passes.

@samhith-kakarla samhith-kakarla marked this pull request as ready for review June 21, 2024 22:18
*/

func (n *KafkaController) getKafkaClient() sarama.ClusterAdmin {
n.mLock.Lock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These 2 lines should be moved to line 54.

2) Returning the current client if it exists
*/

func (n *KafkaController) getKafkaClient() sarama.ClusterAdmin {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is just for testing, we don't need to separate them into multiple different get functions, instead, do something like func (n *KafkaController) getProducerAndConsumer() (sarama.ClusterAdmin, sarama.SyncProducer, sarama.Consumer).

This is to avoid too much redundant code.

func (n *KafkaController) getKafkaClient() sarama.ClusterAdmin {
n.mLock.Lock()
defer n.mLock.Unlock()
if n.adminClient != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check needs to happen again after the lock is acquired.

nc, err := natslib.Connect(url, opts...)
// getter method for lazy loading. creates and returns nats client only when required
func (n *NatsController) getNatsClient() *natslib.Conn {
n.mLock.Lock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lock should happen after client nil check.

func (n *NatsController) getNatsClient() *natslib.Conn {
n.mLock.Lock()
defer n.mLock.Unlock()
if n.client != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly, nil check should happen twice, before and after acquiring the lock.

Signed-off-by: Samhith Kakarla <samhith_kakarla@intuit.com>
Signed-off-by: Samhith Kakarla <samhith_kakarla@intuit.com>
@whynowy whynowy merged commit efcd722 into numaproj:main Jun 24, 2024
25 checks passed
samhith-kakarla added a commit to samhith-kakarla/numaflow that referenced this pull request Jul 5, 2024
Signed-off-by: Samhith Kakarla <samhith_kakarla@intuit.com>
Signed-off-by: Samhith Kakarla <samhith_kakarla@intuit.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants