Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Messages are received by "batch" in client #70

Open
kabaluyot opened this issue Dec 16, 2019 · 6 comments
Open

Messages are received by "batch" in client #70

kabaluyot opened this issue Dec 16, 2019 · 6 comments

Comments

@kabaluyot
Copy link

I notice that the messages are received by batches from my client (javascript).
Is this message buffersize for?

But I adjusted it and cant get "real-time". Please help :/

@purehyperbole
Copy link
Member

Hi,

Any event should be sent immediately providing there is Data associated with it.

Buffering should only come into play if you are generating events faster than the underlying TCP connection can send them.

It's tricky to say what the issue is without more information.

Could you please provide some more information around what and how you are sending events please?

@kabaluyot
Copy link
Author

Im using this package as a SSE Server with Javascript (NuxtJS) client (consumer).
My initial plan was to subscribe by stream,

Example:
/sse/market-data?stream=PSE:2GO -> market data for PSE:2GO
/sse/market-data?stream=PSE:TEL -> market data ffor PSE:TEL

However I am receiving by "batch" of messages instead of flushing them directly thus making the app not real-time anymore.

Also, another solution I tried was to consume by event, say I have:
/sse?stream=market-data

and during server.Publish, I specify the Event = PSE:2GO

and on othe client side I just,
eventSource.AddEventListener("PSE:2GO", function(event){})

I can get "real-time" flushes with the latter than the former. What method is the best?

@purehyperbole
Copy link
Member

Given what you've described, I've attempted to replicate your issue with the following test code and can't get similar results.

srv := sse.New()

mux := http.NewServeMux()
mux.HandleFunc("/events", srv.HTTPHandler)
server := httptest.NewServer(mux)
url := server.URL + "/events"

srv.CreateStream("PSE:TEL")
srv.CreateStream("PSE:2GO")

go func() {
	c := sse.NewClient(url)

	err := c.Subscribe("PSE:TEL", func(e *Event) {
		fmt.Println("got PSE:TEL event")
	})

	if err != nil {
		panic(err)
	}
}()

go func() {
	c := sse.NewClient(url)

	err := c.Subscribe("PSE:2GO", func(e *Event) {
		fmt.Println("got PSE:2GO event")
	})

	if err != nil {
		panic(err)
	}
}()

go func() {
	for {
		srv.Publish("PSE:TEL", &Event{Data: []byte("tel")})
		time.Sleep(time.Millisecond)
	}
}()

for {
	srv.Publish("PSE:2GO", &Event{Data: []byte("2go")})
	time.Sleep(time.Millisecond)
}

I'm not sure if you're able to test something similar with the javascript client you have?

@kabaluyot
Copy link
Author

kabaluyot commented Dec 16, 2019

Thank you for your response. Yes for the client, I just use the HTML5 EventSource to consume.

In my case, I also need to create stream inside a for loop (its a Kafka listener by the way) if the stream does not exist.

Example:

     func KafkaMarketDataListener() {
log.Println("[SERVER] KAFKA CLIENT0 CONNECTED")

for {
	now := time.Now()

	m, err := kafkaConsumerHandler0.ReadConsumer()
	if err != nil {
		log.Println("[SERVER] ReadConsumer0 Error", err.Error())
		panic(err)
	}

	// region key verificaton
	arr := arrays.Explode(".", string(m.Key))

	if len(arr) > 1 && arr[0] == "M-D" && arrays.IsStringInArray(arr[1], marketDataServerKeyList) {
		// region sse publish
		if arr[1] == "TR" { //allow publish for trades
			sseMarketDataServerHandler.Publish(marketDataAllTrades, "", string(m.Value))
		}

		// check if stream exist. If not, create one
		if !sseMarketDataServerHandler.IsStreamExists(string(m.Key)) {
			sseMarketDataServerHandler.CreateStream(string(m.Key))
		}

		// send it now
		sseMarketDataServerHandler.Publish(string(m.Key), "", string(m.Value))
		// endregion sse publish
	}
	// endregion key verification

	log.Println("[SERVER] KAFKA0 TIME:", string(m.Key), time.Since(now))
}

In my case, the browser client are receivinig by batch messages that's why I am trying to find whether I need to set certain "batch queue length" or whatever.

@kabaluyot
Copy link
Author

And I also observe this sir,

What I did in my javascript client and golang sse client is that, I print the messages with counters.
And for every, say, 150 messages I receive, the client will wait for another 150 (=300) and start printing it (its like in batches). I suspect its the size of the messages. Is there anything I need to adjust?

@purehyperbole
Copy link
Member

@kabaluyot Sorry for the delay in getting back to you. I've been on annual holiday.

You may want to decouple the kafa stream logic from the sse handler to rule that out as being an issue.

Also, you may benefit from setting server.AutoStream = true before starting the server. This will create the stream with you if it doesn't exist already.

This should allow you to remove the block:

if !sseMarketDataServerHandler.IsStreamExists(string(m.Key)) {
	sseMarketDataServerHandler.CreateStream(string(m.Key))
}

Size of the messages as you mention might also cause an issue. Large messages will block other pending messages until they are received. There isn't any buffering mechanism that would create this issue inside of the sse library. There is a buffer queue, however these will be sent as soon as the connection is ready to send.

If you are able to, creating a reproducible test case would really help.

Thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants