Skip to content
This repository was archived by the owner on May 14, 2021. It is now read-only.

Support pubsub methodology in SSE requests#5

Closed
gadisr wants to merge 22 commits intokinecosystem/masterfrom
gadi/add_pubsub_to_sse
Closed

Support pubsub methodology in SSE requests#5
gadisr wants to merge 22 commits intokinecosystem/masterfrom
gadi/add_pubsub_to_sse

Conversation

@gadisr
Copy link
Copy Markdown

@gadisr gadisr commented Aug 28, 2018

Every connection subscribes only to related data and get notified on changes accordingly.
This optimizes Horizon<->History DB throughput and make flow more effiecient.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use log fields for the "topic" field like in logrus. check usage throughout the codebase

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

redundant line is redundant. use git add -p

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

channel is a generic name. call this ssePumped or something more verbose

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment is confusing:

  • what do you mean by "get an if from requests"?
  • what is "q topic"?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does it make sense to use a switch {} block instead? or do you need to evaluate all conditions all every on call to this function?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you have to create a new goroutine here? doesn't it just overcomplicate things?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed implementation

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Horizon" database instead of "History"

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will using this function work properly if there are multiple subscribers to the same topic?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed to use different pubsub implementation

Comment thread services/horizon/internal/sse_test.go Outdated
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just make two select blocks instead of an infinitie for loop. the first blook will continue successfully if channel_a returns, and then just run the select again and succeed if 2 seconds pass.

Comment thread services/horizon/internal/sse_test.go Outdated
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are you going to test specific functions along with testing the entire flow?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

@gadisr gadisr force-pushed the gadi/add_pubsub_to_sse branch from 6e9c29b to 613a098 Compare August 30, 2018 13:41
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it ok for the switch to not catch any catch any condition?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

channel name doesn't convey any information beside its type, and makes it hard to follow your flow down the road. better give it a meaningful name like topicMessage or something

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • rephrasing and small typo fixes: putting result before the condition for better readability:
Subscribe this handler to the topic if the SSE request is related to a specific topic (tx_id, account_id, etc.).
This causes the request to only be triggered by this topic. Unsubscribe when done.
  • Also, to which request are you referring on "request will be triggered" - HTTP or database query?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why int32 --> int --> string instead of just straight to string?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

package name is already pubsub, no need to rename to pubsub again

Comment thread services/horizon/internal/sse_test.go Outdated
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where did this magic value come from?

Comment thread services/horizon/internal/sse_test.go Outdated
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comments as previous test

Comment thread services/horizon/internal/sse_test.go Outdated
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i don't understand this func doc. if it's from another file why not just import it?

Comment thread services/horizon/internal/sse_test.go Outdated
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where is the New function coming from? from the . import? if so, can you not rely on the . import? confusing

Comment thread services/horizon/internal/sse_test.go Outdated
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similar to comment in the test above, duplicate this test to another one that publishes more than one value

@gadisr gadisr force-pushed the gadi/add_pubsub_to_sse branch from a4a9da5 to 0b51059 Compare September 9, 2018 07:11
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to check for if !ok first, otherwise this would crash the program. if !ok then action == null. Replace this condition with the one coming just after it.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are you doing this stahp this unnecessary diff noise

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also another one like this in the import section at the top of this file

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not action.GetString(..) like all other implementations? there are other functions like this in this pr

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no value for ledger there, just "ledger" hardcoded itself

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no value in this action for specific ledger_id, so registration topic is a general change in the ledger.

Copy link
Copy Markdown

@oryband oryband Sep 9, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are you sure the arg here should be account_id? it's already used in func (action *DataShowAction) GetTopic()

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

avoid calling GetString() twice:

func (action *EffectIndexAction) GetTopic() (res string) {
    if res := action.GetString(...); res != "" {
        return
    }
    if res := ...; res != "" { return }  // repeat for other fields

    return  // res == ""

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

apply this to other functions in this pr

Comment thread services/horizon/internal/sse_test.go Outdated
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

spaghetti code: instead of doing a select inside a select, just have two select blocks one after the other. i already wrote this comment here somewhere look it up

Comment thread services/horizon/internal/sse_test.go Outdated
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename channels to topics

Comment thread services/horizon/internal/sse_test.go Outdated
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

many subscribers

Comment thread services/horizon/internal/sse_test.go Outdated
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what are these magic address and kahuna?

Comment thread services/horizon/internal/sse_test.go Outdated
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bug: if the time.After returns before subscription the wait at the bottom will hang indefinitely

@gadisr gadisr force-pushed the gadi/add_pubsub_to_sse branch from 968878d to 51dafea Compare September 9, 2018 11:33
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also, magic value "commit"

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

magic number

Gadi Srebnik and others added 9 commits September 17, 2018 17:47
Every connection subscribes only to related data and get notified on changes accordingly.
This optimizes Horizon<->History DB throughput and make flow more effiecient.
… after data is written on DB.

Also changed sse channel from struct{} to interface{} to be easily used with pubsub solution.
Changed tests by pr and added undubscribe when each test is done
@oryband oryband force-pushed the gadi/add_pubsub_to_sse branch from c8c631c to 5280130 Compare September 17, 2018 14:47
NOTE This only happens only healthy HTTP 2xx responses.

This is a hacky solution that tries to solve the case where clients have an unwanted behavior:
They already have an account open and just want to poll their account balance but retry too quickly,
according to default retry value (a few seconds usually). This causes them to spam the server right
after disconnecting the SSE stream.

This change will cause them to receive the preamble over and over again on every SSE tick,
practically turning it into a keepalive message. This would lower the reconnection amount
and in turn (most importantly) the amount of queries sent to the Horizon database.

This has a cost though - it keeps SSE connections alive and also causes "junk" keep alive traffic
to be sent over and over again.
@oryband
Copy link
Copy Markdown

oryband commented Oct 16, 2018

cleaning this up and closing in favor of #14

@oryband oryband closed this Oct 16, 2018
@oryband oryband deleted the gadi/add_pubsub_to_sse branch April 4, 2019 12:44
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants