Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Engine: Dispatch service #346

Merged
merged 39 commits into from Oct 2, 2019

Conversation

@shazbert
Copy link
Collaborator

commented Aug 30, 2019

Description

POC - DISPATCHER

Dispatch service allows for push updating via communication channels.

image

Fixes # (issue)

Type of change

Please delete options that are not relevant and add an x in [] as item is complete.

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • This change requires a documentation update

How Has This Been Tested?

Good question...

Checklist:

  • My code follows the style guidelines of this project
  • I have performed a self-review of my own code
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation and regenerated documentation via the documentation tool
  • My changes generate no new warnings
  • I have added tests that prove my fix is effective or that my feature works
  • New and existing unit tests pass locally and on Travis with my changes
  • Any dependent changes have been merged and published in downstream modules
engine/rpcserver.go Outdated Show resolved Hide resolved
Copy link
Collaborator

left a comment

Seeing this in action is awesome. I know that this is a WIP and have a few comments but am looking forward to seeing it more fleshed out (plus used for tickers too). Great stuff 👍

cmd/gctcli/commands.go Outdated Show resolved Hide resolved
engine/rpcserver.go Outdated Show resolved Hide resolved
exchanges/dispatch/dispatch.go Outdated Show resolved Hide resolved
exchanges/dispatch/dispatch_test.go Outdated Show resolved Hide resolved
exchanges/dispatch/dispatch_test.go Outdated Show resolved Hide resolved
exchanges/dispatch/dispatch.go Outdated Show resolved Hide resolved
@thrasher- thrasher- changed the title Dispatch service Engine: Dispatch service Aug 30, 2019
Copy link
Collaborator

left a comment

This is super cool! Since this is a proof of concept I'll hold off from doing all the niterinos, but I like its setup and you've made sure there aren't any big failure points 🦀

exchanges/dispatch/dispatch.go Outdated Show resolved Hide resolved
exchanges/orderbook/orderbook.go Outdated Show resolved Hide resolved
exchanges/orderbook/orderbook.go Outdated Show resolved Hide resolved
@shazbert shazbert force-pushed the shazbert:dispatch_service branch 2 times, most recently from 80d31e5 to b0412d1 Sep 6, 2019
Copy link
Member

left a comment

No major holds ups from me

Just a question is it worth moving the dispatch package out of exchange as it is not really coupled to the exchange package at all and could cause confusion at a late stage?

var getOrderbookStreamCommand = cli.Command{
Name: "getorderbookstream",
Usage: "gets the orderbook stream for a specific currency pair and exchange",
ArgsUsage: "<exchange> <base> <quote> <asset>",

This comment has been minimized.

Copy link
@xtda

xtda Sep 12, 2019

Member

Minor issue but the arguments for this command are

<exchange> <currencyPair> <asset>

But help displays

<exchange> <base> <quote> <asset>
var getTickerStreamCommand = cli.Command{
Name: "gettickerstream",
Usage: "gets the ticker stream for a specific currency pair and exchange",
ArgsUsage: "<exchange> <base> <quote> <asset>",

This comment has been minimized.

Copy link
@xtda

xtda Sep 12, 2019

Member

same as above

arguments for this command are

<exchange> <currencyPair> <asset>

But help displays

<exchange> <base> <quote> <asset>
return err
}

clear := exec.Command("clear")

This comment has been minimized.

Copy link
@xtda

xtda Sep 12, 2019

Member

This command wont execute on windows (at least in cmd)/some non-linux based operating systems

I do like the screen clean before the output it helps keep it read able might be worth adding support for other operating systems instead of dropping completely

return err
}

clear := exec.Command("clear")

This comment has been minimized.

Copy link
@xtda

xtda Sep 12, 2019

Member

same as above

@shazbert

This comment has been minimized.

Copy link
Collaborator Author

commented Sep 12, 2019

@xtda Yeah I will pull it out.

@shazbert shazbert force-pushed the shazbert:dispatch_service branch from 2636425 to 08188df Sep 12, 2019
@codecov

This comment has been minimized.

Copy link

commented Sep 12, 2019

Codecov Report

Merging #346 into engine will decrease coverage by 0.56%.
The diff coverage is 41.08%.

Impacted file tree graph

@@            Coverage Diff             @@
##           engine     #346      +/-   ##
==========================================
- Coverage   40.25%   39.69%   -0.57%     
==========================================
  Files         151      152       +1     
  Lines       34638    34918     +280     
==========================================
- Hits        13944    13859      -85     
- Misses      19706    20005     +299     
- Partials      988     1054      +66
Impacted Files Coverage Δ
engine/routines.go 0% <ø> (ø) ⬆️
exchanges/poloniex/poloniex_websocket.go 15.14% <0%> (-0.06%) ⬇️
exchanges/gateio/gateio_websocket.go 11.44% <0%> (-5.79%) ⬇️
exchanges/bitstamp/bitstamp_websocket.go 0% <0%> (ø) ⬆️
engine/rpcserver.go 0% <0%> (ø) ⬆️
exchanges/kraken/kraken_websocket.go 0% <0%> (ø) ⬆️
exchanges/huobihadax/huobihadax_websocket.go 0% <0%> (ø) ⬆️
exchanges/binance/binance_websocket.go 0% <0%> (ø) ⬆️
exchanges/bitmex/bitmex_websocket.go 0% <0%> (ø) ⬆️
exchanges/coinbasepro/coinbasepro_websocket.go 0% <0%> (ø) ⬆️
... and 22 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update ba7e387...08188df. Read the comment docs.

@codecov

This comment has been minimized.

Copy link

commented Sep 12, 2019

Codecov Report

Merging #346 into engine will increase coverage by 0.31%.
The diff coverage is 70.84%.

Impacted file tree graph

@@            Coverage Diff             @@
##           engine     #346      +/-   ##
==========================================
+ Coverage   42.24%   42.55%   +0.31%     
==========================================
  Files         149      152       +3     
  Lines       33596    34011     +415     
==========================================
+ Hits        14193    14475     +282     
- Misses      18452    18575     +123     
- Partials      951      961      +10
Impacted Files Coverage Δ
engine/routines.go 0% <ø> (ø) ⬆️
exchanges/poloniex/poloniex_websocket.go 15.14% <0%> (-0.06%) ⬇️
exchanges/gateio/gateio_websocket.go 17.22% <0%> (-0.06%) ⬇️
exchanges/bitstamp/bitstamp_websocket.go 0% <0%> (ø) ⬆️
engine/rpcserver.go 0% <0%> (ø) ⬆️
exchanges/binance/binance_websocket.go 0% <0%> (ø) ⬆️
exchanges/bitmex/bitmex_websocket.go 0% <0%> (ø) ⬆️
exchanges/coinbasepro/coinbasepro_websocket.go 0% <0%> (ø) ⬆️
exchanges/zb/zb_websocket.go 1.86% <0%> (-0.01%) ⬇️
engine/engine.go 1.07% <0%> (-0.04%) ⬇️
... and 16 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 4a0fcc7...e84736e. Read the comment docs.

Copy link
Collaborator

left a comment

Really epic stuff and love seeing it in action! Some basic nits:

return nil
}

conn, err := setupClient()

This comment has been minimized.

Copy link
@thrasher-

thrasher- Sep 12, 2019

Collaborator

Please move this to after the pair validation block, so the client doesn't connect until it ensure basic validation of user supplied args

cmd/gctcli/commands.go Show resolved Hide resolved
cmd/gctcli/commands.go Show resolved Hide resolved
cmd/gctcli/commands.go Show resolved Hide resolved
defer conn.Close()

var exchangeName string

This comment has been minimized.

Copy link
@thrasher-

thrasher- Sep 12, 2019

Collaborator

Please adjust as per the above comments

defer service.RUnlock()
id, ok := service.Exchange[exchange]
if !ok {
return dispatch.Pipe{}, errors.New("exchange orderbooks not found")

This comment has been minimized.

Copy link
@thrasher-

thrasher- Sep 12, 2019

Collaborator

exchange tickers not found

}

// ProcessTicker processes incoming tickers, creating or updating the Tickers
// list
func ProcessTicker(exchangeName string, tickerNew *Price, assetType asset.Item) error {
if exchangeName == "" {
return fmt.Errorf("%s %s", exchangeName, "name not set")

This comment has been minimized.

Copy link
@thrasher-

thrasher- Sep 12, 2019

Collaborator

exchangeName would be empty, so you'd get " name not set"

@@ -239,7 +233,7 @@ func TestDeleteWithIDs(t *testing.T) {
Action: "delete",
})
if err != nil {
t.Fatal(err)
fmt.Println(err)

This comment has been minimized.

Copy link
@thrasher-

thrasher- Sep 12, 2019

Collaborator

t.Error(err)

main.go Outdated
@@ -81,6 +82,8 @@ func main() {
flag.StringVar(&settings.GlobalHTTPUserAgent, "globalhttpuseragent", "", "sets the common HTTP client's user agent")
flag.StringVar(&settings.GlobalHTTPProxy, "globalhttpproxy", "", "sets the common HTTP client's proxy server")

flag.Int64Var(&settings.DispatchMaxWorkerAmount, "dispatchworkers", dispatch.DefaultMaxWorkers, "sets the dispatch package max worker generation limit")

This comment has been minimized.

Copy link
@thrasher-

thrasher- Sep 12, 2019

Collaborator

Can move this to core settings as this be used a lot in the future

// required pipes
func (m *Mux) Publish(ids []uuid.UUID, data interface{}) error {
if data == nil {
return errors.New("data payload is nil, that's a little bit of an issue")

This comment has been minimized.

Copy link
@thrasher-

thrasher- Sep 12, 2019

Collaborator

"data payload is nil"

// GetTickerStream streams the requested updated ticker
func (s *RPCServer) GetTickerStream(r *gctrpc.GetTickerStreamRequest, stream gctrpc.GoCryptoTrader_GetTickerStreamServer) error {
if r.Exchange == "" {
return errors.New("exchange name unset")

This comment has been minimized.

Copy link
@thrasher-

thrasher- Sep 12, 2019

Collaborator

It would be nice to have these values as a const as I see them used quite a bit

@shazbert shazbert self-assigned this Sep 16, 2019
}

if service.Tickers[exchange][p.Base.Item][p.Quote.Item][tickerType] == nil {
return Price{}, fmt.Errorf("no orderbooks associated with asset type %s",

This comment has been minimized.

Copy link
@thrasher-

thrasher- Sep 16, 2019

Collaborator

orderbooks -> tickers

Copy link
Collaborator

left a comment

All the things are working! Very nice. I do have some nits though

cmd/gctcli/commands.go Show resolved Hide resolved
dispatch/dispatch.go Outdated Show resolved Hide resolved
dispatch/dispatch.go Outdated Show resolved Hide resolved
dispatch/dispatch.go Outdated Show resolved Hide resolved
dispatch/dispatch.go Outdated Show resolved Hide resolved
exchanges/orderbook/orderbook.go Outdated Show resolved Hide resolved
exchanges/orderbook/orderbook.go Outdated Show resolved Hide resolved
exchanges/websocket/wsorderbook/wsorderbook_test.go Outdated Show resolved Hide resolved
@shazbert shazbert requested review from gloriousCode, xtda and thrasher- Sep 17, 2019
Copy link
Collaborator

left a comment

Only a few small nits this time around. Getting closer 🦀

dispatch/dispatch.go Outdated Show resolved Hide resolved
// GetTicker checks and returns a requested ticker if it exists
func GetTicker(exchange string, p currency.Pair, tickerType asset.Item) (Price, error) {
ticker, err := GetTickerByExchange(exchange)
// SetNewData sets new data

This comment has been minimized.

Copy link
@gloriousCode

gloriousCode Sep 17, 2019

Collaborator

Could this be changed to something like SetNewData updates the ticker data for the associated exchange, currency, asset combination?

exchanges/websocket/wsorderbook/wsorderbook.go Outdated Show resolved Hide resolved
cmd/gctcli/commands.go Outdated Show resolved Hide resolved
@shazbert shazbert added the Review Me label Sep 19, 2019
@shazbert shazbert requested a review from gloriousCode Sep 19, 2019
Copy link
Collaborator

left a comment

I'm running out of nits!... But here are some more small nits 😄 Thanks for making all the other changes 👍

dispatch/dispatch.go Outdated Show resolved Hide resolved
dispatch/mux.go Outdated Show resolved Hide resolved
exchanges/ticker/ticker.go Outdated Show resolved Hide resolved
exchanges/ticker/ticker.go Show resolved Hide resolved
exchanges/websocket/wsorderbook/wsorderbook_test.go Outdated Show resolved Hide resolved
dispatch/dispatch.go Show resolved Hide resolved
@shazbert shazbert force-pushed the shazbert:dispatch_service branch 3 times, most recently from 9cd26ca to 3265877 Sep 27, 2019
Copy link
Collaborator

left a comment

Nits addressed; last things I've noticed is that the ticker and orderbook package test coverage have both decreased (both are 100% on latest commit in engine)

Copy link
Member

left a comment

Everything else has pretty much been addressed only one minor change from me

Although there is some feedback regarding the dispatchworkers command line flag

Is it worth adding a runtime check to make sure you can't do something like -dispatchworkers 6000000000 and have a happy little accident on their machine ?

newC := atomic.LoadInt64(&d.count)

if oldC == newC {
return errors.New("dispatcher worker counts are off")

This comment has been minimized.

Copy link
@xtda

xtda Oct 1, 2019

Member

Can this error message be extended?

What are the issues with worker counts being off after a dropWorker() is called should the user be concerned at all if a worker has not been deallocated/shutdown

This comment has been minimized.

Copy link
@shazbert

shazbert Oct 1, 2019

Author Collaborator

Not really it was more for me when I was writing it. I can actually remove that.

shazbert added 22 commits Sep 16, 2019
…reset per instance and removed returning unused channel on error
Reset timer and bleed buffered timer chan if needed in dispatch.go
Added in ticker.Stop() and timer.Stop() functions for worker routine return in dispatch.go
Index aggregated bid and ask functions for orderbook.go
Added in dummy slice for wsorderbook_test.go
…tch.go

Fix various linter issues dispatch.go
@shazbert shazbert force-pushed the shazbert:dispatch_service branch from 369207b to 1d2e40c Oct 1, 2019
Copy link
Collaborator

left a comment

Copy link
Collaborator

left a comment

tACK and thanks for fixing the nitterinos!

Copy link
Collaborator

left a comment

tACK! Not a single recieveieive typo! 🎉

t.Error("error cannot be nil")
}

id, errrrrrrrr := dispatcher.getNewID()

This comment has been minimized.

Copy link
@gloriousCode

gloriousCode Oct 2, 2019

Collaborator

The extra r's are for rad 🤙 🤙 🤙 🤙 🤙 🤙 🤙 🤙 🤙 🤙 🤙 🤙 🤙 🤙

@xtda
xtda approved these changes Oct 2, 2019
Copy link
Member

left a comment

Tested on Linux works great
Approved

@thrasher- thrasher- merged commit db317a2 into thrasher-corp:engine Oct 2, 2019
4 of 5 checks passed
4 of 5 checks passed
Review Action 0 issues found before hitting an error.
Details
Travis CI - Pull Request Build Passed
Details
codecov/patch 70.84% of diff hit (target 42.24%)
Details
codecov/project 42.55% (+0.31%) compared to 4a0fcc7
Details
continuous-integration/appveyor/pr AppVeyor build succeeded
Details
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
4 participants
You can’t perform that action at this time.