Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Lightnodes to use Phi, support caching requests and more #9

Merged
merged 114 commits into from
Oct 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
114 commits
Select commit Hold shift + click to select a range
ab4881b
initial server impl
ross-pure Jul 19, 2019
b4014a6
basic server workflow
ross-pure Jul 19, 2019
78ce65d
server: minimal server impl
ross-pure Jul 22, 2019
b841b03
server: export wrapped request
ross-pure Jul 22, 2019
9b4b6a5
validator: minimal impl
ross-pure Jul 22, 2019
3440bae
validator: fix field name
ross-pure Jul 22, 2019
dd7681b
cacher: minimal impl
ross-pure Jul 22, 2019
6d9363a
dispatcher: minimal impl
ross-pure Jul 23, 2019
ecaf4c2
abstract client out of dispatcher
ross-pure Jul 23, 2019
482a729
updater: minimal impl
ross-pure Jul 23, 2019
7486600
fix deps and dispatcher address construction
ross-pure Jul 23, 2019
98a6639
cacher: template caching logic
ross-pure Jul 23, 2019
b3bd351
initial lightnode impl
ross-pure Jul 23, 2019
969908c
server: remove unused wrapper types
ross-pure Jul 23, 2019
824c08d
server: enforce rate limits per method
ross-pure Jul 23, 2019
9711015
server: start returning proper error responses
ross-pure Jul 23, 2019
6bc6b36
server: add custom error code
ross-pure Jul 23, 2019
013d88e
server: implement all error responses
ross-pure Jul 23, 2019
c492796
cacher: basic cacher in memory implementation
divyakoshy Jul 23, 2019
e13dae0
cacher: fixed request ids
divyakoshy Jul 23, 2019
52c6f36
return meaningful error responses
ross-pure Jul 23, 2019
a014213
Merge branch 'imp/arch' of github.com:renproject/lightnode into imp/arch
ross-pure Jul 23, 2019
a1fb241
separate cache and task capacities
ross-pure Jul 23, 2019
50e6b05
server: remove unused interface
ross-pure Jul 23, 2019
e237ec6
server_test: basic pass-through test
ross-pure Jul 24, 2019
1ff8a12
server: remove debugging message
ross-pure Jul 24, 2019
18c15a2
server_test: helper function to wait for server init
ross-pure Jul 24, 2019
fd79f6d
testutils: valid request constructor
ross-pure Jul 24, 2019
bf28238
validator: mostly complete impl
ross-pure Jul 24, 2019
5cc60b4
dispatcher: add more complete impls
ross-pure Jul 24, 2019
b052415
update cacher to use lru cache
divyakoshy Jul 24, 2019
3eeb9e9
Merge branch 'imp/arch' of github.com:renproject/lightnode into imp/arch
divyakoshy Jul 24, 2019
8e0b5ee
validator_test: basic test implemented and passing
ross-pure Jul 24, 2019
ff869fe
Merge branch 'imp/arch' of github.com:renproject/lightnode into imp/arch
ross-pure Jul 24, 2019
c60d4be
validator_test: add basic negative tests
ross-pure Jul 24, 2019
36eabe6
testutils: fix field type
ross-pure Jul 24, 2019
7da83f4
dispatcher: change node selection policy
ross-pure Jul 24, 2019
ed33a55
cacher_test: basic tests written and passing
ross-pure Jul 25, 2019
e14662b
abstract multistore and document exported items
ross-pure Jul 25, 2019
f3690b3
add linting to ci/cd
ross-pure Jul 25, 2019
d41d5ca
circlci: fix whitespace
ross-pure Jul 25, 2019
78bdfb6
circlci: install golint
ross-pure Jul 25, 2019
4605b54
testutils: initial impl of a mock darknode for testing
ross-pure Jul 26, 2019
e745e09
updater_test: basic test written and passing
ross-pure Jul 26, 2019
6910749
dispatcher_test: basic test written and passing
ross-pure Jul 26, 2019
0fca5e8
ignore coverage files
ross-pure Jul 26, 2019
30e1b39
remove html cover file
ross-pure Jul 26, 2019
6c4bf7a
cmd: add basic cmd file
ross-pure Jul 29, 2019
e855d52
cmd/lightnode: create subdirectory for cmd
jazg Jul 30, 2019
945c093
git: add .env to gitignore
jazg Jul 30, 2019
434ecb9
heroku: add procfile
jazg Jul 30, 2019
c8bc804
validator: remove parameter check for QueryPeers and QueryNumPeers
jazg Jul 31, 2019
40a57f6
server: fix writing duplicate responses
jazg Jul 31, 2019
b11c01b
lightnode: add bootstrap addresses to store upon initialisation
jazg Jul 31, 2019
cd49a63
ignore coverage outputs
ross-pure Jul 31, 2019
7d0fb20
client_test: basic client test
ross-pure Jul 31, 2019
81e7a0a
Merge branch 'imp/arch' of github.com:renproject/lightnode into imp/arch
ross-pure Jul 31, 2019
d83c46f
validator: fix marshalling invalid json
jazg Jul 31, 2019
bc224f1
server: return relevant status codes based on json rpc error code
jazg Jul 31, 2019
8512179
Merge branch 'imp/arch' of https://github.com/renproject/lightnode in…
jazg Jul 31, 2019
15951d1
validator: add check to ensure no params are provided
jazg Jul 31, 2019
c291ba3
validator_test: fix expected json data
jazg Jul 31, 2019
e5b8fff
validator: minor formatting fix
divyakoshy Jul 31, 2019
64e4338
store: add tests, minor fixes
divyakoshy Jul 31, 2019
fd33bb7
server: fix error response json data
jazg Jul 31, 2019
7726134
README: use shield style circleci badge
vinceau Aug 1, 2019
b14c39b
circleci: update outdated orb version
vinceau Aug 1, 2019
7183766
Merge branch 'imp/arch' of https://github.com/renproject/lightnode in…
jazg Aug 1, 2019
73dca7e
store: fix potential panic in test
vinceau Aug 1, 2019
a0b6099
Merge branch 'imp/arch' of ssh://github.com/renproject/lightnode into…
vinceau Aug 1, 2019
96586c6
README: add coveralls badge
vinceau Aug 1, 2019
d3ded08
circleci: add coverage
vinceau Aug 1, 2019
3b6753a
dispatcher: add majority response iterator
jazg Aug 1, 2019
f673356
server: add request id to error response
jazg Aug 1, 2019
3742d83
Merge branch 'imp/arch' of https://github.com/renproject/lightnode in…
jazg Aug 1, 2019
be4aeeb
server: fix writing duplicate responses after exceeding rate limit
jazg Aug 1, 2019
29deec6
server, ratelimiter: check method exists prior to checking rate limit
jazg Aug 1, 2019
d638fce
validator: return errors instead of strings
jazg Aug 1, 2019
27e0b10
validator: remove unnecessary parameter checks
jazg Aug 1, 2019
8ae23be
validator_test: remove unnecessary parameter checks
jazg Aug 1, 2019
287705e
cacher: disable caching for SubmitTx and QueryTx methods
jazg Aug 1, 2019
cd166a6
cmd/lightnode: decrease default ttl duration
jazg Aug 1, 2019
73e58f9
cacher_test: skip over methods that have caching disabled
jazg Aug 1, 2019
fdc3216
server: add recovery handler
jazg Aug 1, 2019
7a6a902
dispatcher, store: send SubmitTx requests to first darknode only
jazg Aug 1, 2019
079689b
dispatcher: add todo
jazg Aug 1, 2019
07f42d8
Merge branch 'imp/arch' of ssh://github.com/renproject/lightnode into…
vinceau Aug 1, 2019
2a6d163
server: fix returning incorrect error codes
jazg Aug 1, 2019
7241236
Merge branch 'imp/arch' of https://github.com/renproject/lightnode in…
jazg Aug 1, 2019
daa8821
client: remove assumption about validity of darknode responses
jazg Aug 2, 2019
4d08132
server: add health check endpoint
vinceau Aug 2, 2019
c6ad536
client: add retry logic
jazg Aug 2, 2019
3cd96b9
dispatcher_test: increase "eventually" timeout
jazg Aug 2, 2019
a55f54f
dispatcher_test: remove unused parameter
jazg Aug 2, 2019
51fc75b
Merge branch 'imp/arch' of https://github.com/renproject/lightnode in…
jazg Aug 2, 2019
fa43e4d
go: update darknode dependency
jazg Aug 12, 2019
b6c9a7a
go: tidy dependencies
jazg Aug 12, 2019
eadd4ea
cacher, dispathcer, server, store: add support for specifying darknod…
jazg Aug 13, 2019
fcbb70a
cacher_test, dispatcher_test, validator_test: fix compilation
jazg Aug 13, 2019
c4a3f01
testutils: randomise ren address
jazg Aug 13, 2019
806ff25
updater, lightnode: add check for bootstrap address prior to removing…
jazg Aug 13, 2019
482bcef
validator: validate darknode id in request
jazg Aug 13, 2019
241ba6a
validator_test, updater_test: fix compilation
jazg Aug 13, 2019
3327585
server, validator: remove unnecessary custom error code
jazg Aug 13, 2019
c1b99cd
go: update darknode dependency
jazg Aug 19, 2019
5e28112
lightnode, dispatcher, store: fix returning leader address
jazg Aug 19, 2019
03dbf6e
cmd/lightnode: reduce default cache ttl
jazg Aug 20, 2019
2e00371
cacher: remove unused lru cache
jazg Aug 20, 2019
74995e3
dispatcher_test, store_test, updater_test, validator_test: fix compil…
jazg Aug 21, 2019
699d9a0
dispatcher: use majority response iterator for block/tx queries
jazg Aug 21, 2019
c32e2cf
updater: fix multi-address comparison
jazg Sep 4, 2019
915e603
dispatcher: fix majority response iterator not returning if there is …
jazg Sep 13, 2019
0c0de9c
cmd/lightnode: reduce default ttl
jazg Sep 24, 2019
2e5c528
Merge branch 'master' into imp/arch
jazg Oct 29, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 30 additions & 3 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
version: 2.1
orbs:
core: ren/circleci-orbs@dev:first
core: ren/core@0.0.1
executors:
go_exec:
docker:
Expand All @@ -12,7 +12,34 @@ jobs:
- checkout
- restore_cache: # Restore saved cache if no changes are detected since last run
key: go-mod-v1-{{ checksum "go.sum" }}
- run: go test -v ./...
- run:
name: Install tools
command: |
go get -u golang.org/x/lint/golint
go get -u github.com/onsi/ginkgo/ginkgo
go get -u github.com/loongy/covermerge
go get -u github.com/mattn/goveralls
- run:
name: Run tests
command: |
# make sure all the tests pass
go test -v ./...
# all the tests passed so update coverage
CI=true /go/bin/ginkgo -v --race --cover --coverprofile coverprofile.out ./...
/go/bin/covermerge \
validator/coverprofile.out \
coverprofile.out \
dispatcher/coverprofile.out \
server/ratelimiter/coverprofile.out \
server/coverprofile.out \
updater/coverprofile.out \
client/coverprofile.out \
cacher/coverprofile.out \
store/coverprofile.out > covermerge.out
goveralls -coverprofile=covermerge.out -service=circleci
- run:
name: Run linter
command: golint ./...
- save_cache:
key: go-mod-v1-{{ checksum "go.sum" }}
paths:
Expand Down Expand Up @@ -42,4 +69,4 @@ workflows:
- master
jobs:
- core/merge_beta:
executor: go_exec
executor: go_exec
7 changes: 7 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Test coverage
*.coverprofile
*.out
*.html

# Environment variables
.env
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# `🚀 lightnode`

[![CircleCI](https://circleci.com/gh/renproject/lightnode/tree/master.svg?style=svg)](https://circleci.com/gh/renproject/lightnode/tree/master)
[![CircleCI](https://circleci.com/gh/renproject/lightnode/tree/master.svg?style=shield)](https://circleci.com/gh/renproject/lightnode/tree/master)
[![Coverage Status](https://coveralls.io/repos/github/renproject/lightnode/badge.svg?branch=master)](https://coveralls.io/github/renproject/lightnode?branch=master)

A node used for querying Darknodes using JSON-RPC 2.0 interfaces. Featuring query caching (for performance) as well as retrying for failed requests (for reliability).

Expand Down
125 changes: 125 additions & 0 deletions cacher/cacher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package cacher

import (
"fmt"
"time"

"github.com/renproject/darknode/jsonrpc"
"github.com/renproject/kv"
"github.com/renproject/lightnode/server"
"github.com/renproject/phi"
"github.com/sirupsen/logrus"
"golang.org/x/crypto/sha3"
)

// ID is a key for a cached response.
type ID [32]byte

func (id ID) String() string {
return string(id[:32])
}

// Cacher is a task responsible for caching responses for corresponding
// requests. Upon receiving a request (in the current architecture this request
// comes from the `Validator`) it will check its cache to see if it has a
// cached response. If it does, it will write this immediately as a repsonse,
// otherwise it will forward the request on to the `Dispatcher`. Once the
// `Dispatcher` has a response ready, the `Cacher` will store this response in
// its cache with a key derived from the request, and then pass the repsonse
// along to be given to the client. Currently, idempotent requests are stored
// in a LRU cache, and non-idempotent requests are stored in a TTL cache.
type Cacher struct {
logger logrus.FieldLogger
dispatcher phi.Sender

ttlCache kv.Iterable
}

// New constructs a new `Cacher` as a `phi.Task` which can be `Run()`.
func New(dispatcher phi.Sender, logger logrus.FieldLogger, cap int, ttl time.Duration, opts phi.Options) phi.Task {
ttlCache, err := kv.NewTTLCache(kv.NewJSON(kv.NewMemDB()), ttl)
if err != nil {
logger.Panicf("[cacher] cannot create TTL cache: %v", err)
}
return phi.New(&Cacher{logger, dispatcher, ttlCache}, opts)
}

// Handle implements the `phi.Handler` interface.
func (cacher *Cacher) Handle(_ phi.Task, message phi.Message) {
msg, ok := message.(server.RequestWithResponder)
if !ok {
cacher.logger.Panicf("[cacher] unexpected message type %T", message)
}

params, err := msg.Request.Params.MarshalJSON()
if err != nil {
cacher.logger.Errorf("[cacher] cannot marshal request to json: %v", err)
}

data := append(params, []byte(msg.Request.Method)...)
reqID := hash(data)

cachable := isCachable(msg.Request.Method)
response, cached := cacher.get(reqID, msg.DarknodeID)
if cachable && cached {
msg.Responder <- response
} else {
responder := make(chan jsonrpc.Response, 1)
cacher.dispatcher.Send(server.RequestWithResponder{
Request: msg.Request,
Responder: responder,
DarknodeID: msg.DarknodeID,
})

// TODO: What do we do when a second request comes in that is already
// being fetched at the moment? Currently it will also send it to the
// dispatcher, which is probably not ideal.
go func() {
response := <-responder
// TODO: Consider thread safety of insertion.
cacher.insert(reqID, msg.DarknodeID, response, msg.Request.Method)
msg.Responder <- response
}()
}
}

func (cacher *Cacher) insert(reqID ID, darknodeID string, response jsonrpc.Response, method string) {
id := reqID.String() + darknodeID
if err := cacher.ttlCache.Insert(id, response); err != nil {
cacher.logger.Panicf("[cacher] could not insert response into TTL cache: %v", err)
}
}

func (cacher *Cacher) get(reqID ID, darknodeID string) (jsonrpc.Response, bool) {
id := reqID.String() + darknodeID

var response jsonrpc.Response
if err := cacher.ttlCache.Get(id, &response); err == nil {
return response, true
}

return jsonrpc.Response{}, false
}

func isCachable(method string) bool {
switch method {
case jsonrpc.MethodQueryBlock,
jsonrpc.MethodQueryBlocks,
jsonrpc.MethodQueryNumPeers,
jsonrpc.MethodQueryPeers,
jsonrpc.MethodQueryEpoch,
jsonrpc.MethodQueryStat:
return true
case jsonrpc.MethodSubmitTx,
jsonrpc.MethodQueryTx:
// TODO: We need to make sure these are the only methods that we want to
// avoid caching.
return false
default:
panic(fmt.Sprintf("[cacher] unsupported method %s encountered which should have been rejected by the previous checks", method))
}
}

func hash(data []byte) ID {
return sha3.Sum256(data)
}
13 changes: 13 additions & 0 deletions cacher/cacher_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package cacher_test

import (
"testing"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)

func TestCacher(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Cacher Suite")
}
108 changes: 108 additions & 0 deletions cacher/cacher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package cacher_test

import (
"context"
"time"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/renproject/darknode/jsonrpc"
"github.com/renproject/lightnode/cacher"
"github.com/renproject/lightnode/server"
"github.com/renproject/lightnode/testutils"
"github.com/renproject/phi"
"github.com/sirupsen/logrus"
)

func initCacher(ctx context.Context, cacheCap int, ttl time.Duration) (phi.Sender, <-chan phi.Message) {
opts := phi.Options{Cap: 10}
logger := logrus.New()
inspector, messages := testutils.NewInspector(10)
cacher := cacher.New(inspector, logger, cacheCap, ttl, opts)

go cacher.Run(ctx)
go inspector.Run(ctx)

return cacher, messages
}

var _ = Describe("Cacher", func() {
Context("When receving a request that does not have a response in the cache", func() {
It("Should pass the request through", func() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cacher, messages := initCacher(ctx, 10, time.Second)

for method, _ := range jsonrpc.RPCs {
// TODO: This method is not supported right now, but when it is
// this case should be tested too.
if method == jsonrpc.MethodQueryEpoch {
continue
}

request := testutils.ValidRequest(method)
cacher.Send(server.NewRequestWithResponder(request, ""))

select {
case <-time.After(time.Second):
Fail("timeout")
case message := <-messages:
req, ok := message.(server.RequestWithResponder)
Expect(ok).To(BeTrue())
Expect(req.Request).To(Equal(request))
Expect(req.Responder).To(Not(BeNil()))
Eventually(req.Responder).ShouldNot(Receive())
}
}
})
})

Context("when receiving a request that has a response in the cache", func() {
It("Should return the cached response", func() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cacher, messages := initCacher(ctx, 10, 1*time.Second)

for method, _ := range jsonrpc.RPCs {
// TODO: This method is not supported right now, but when it is
// this case should be tested too.
if method == jsonrpc.MethodQueryEpoch {
continue
}

// We have disabled caching for these methods.
if method == jsonrpc.MethodSubmitTx || method == jsonrpc.MethodQueryTx {
continue
}

// Send the first request
request := testutils.ValidRequest(method)
reqWithRes := server.NewRequestWithResponder(request, "")
cacher.Send(reqWithRes)
forwardedReq := <-messages
res := testutils.ErrorResponse(request.ID)
forwardedReq.(server.RequestWithResponder).Responder <- res

select {
case <-time.After(time.Second):
Fail("timeout")
case response := <-reqWithRes.Responder:
Expect(response).To(Equal(res))
}

// Send the second request and expect a cached response
request = testutils.ValidRequest(method)
reqWithRes = server.NewRequestWithResponder(request, "")
cacher.Send(reqWithRes)

select {
case <-time.After(time.Second):
Fail("timeout")
case response := <-reqWithRes.Responder:
Expect(response).To(Equal(res))
Eventually(messages).ShouldNot(Receive())
}
}
})
})
})
67 changes: 67 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package client

import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"time"

"github.com/renproject/darknode/addr"
"github.com/renproject/darknode/jsonrpc"
)

// SendToDarknode sends a given request to the darknode at the given url. The
// timeout is the timeout for the request.
//
// NOTE: If err is not nil, it is expected that the caller will construct an
// appropriate error response message.
func SendToDarknode(url string, req jsonrpc.Request, timeout time.Duration) (jsonrpc.Response, error) {
httpClient := new(http.Client)
httpClient.Timeout = timeout

// Construct HTTP request.
body, err := json.Marshal(req)
if err != nil {
return jsonrpc.Response{}, fmt.Errorf("[client] could not marshal request: %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), httpClient.Timeout)
defer cancel()
r, err := http.NewRequest(http.MethodPost, url, bytes.NewBuffer(body))
if err != nil {
return jsonrpc.Response{}, fmt.Errorf("[client] could not create http request: %v", err)
}
r = r.WithContext(ctx)
r.Header.Set("Content-Type", "application/json")

endTime := time.Now().Add(timeout)
for {
// Retry until timeout.
if time.Now().After(endTime) {
return jsonrpc.Response{}, fmt.Errorf("timeout: %v", err)
}

// Send request.
response, err := httpClient.Do(r)
if err != nil {
continue
}

// Read response.
var resp jsonrpc.Response
if err := json.NewDecoder(response.Body).Decode(&resp); err != nil {
continue
}

return resp, nil
}
}

// URLFromMulti converts a `addr.MultiAddress` to a url string appropriate for
// sending a JSON-RPC request to a darknode. The port is defined in the multi
// address is incremented by one because of darknode specific logic about what
// the JSON-RPC port is.
func URLFromMulti(addr addr.MultiAddress) string {
return fmt.Sprintf("http://%s:%v", addr.IP4(), addr.Port()+1)
}
13 changes: 13 additions & 0 deletions client/client_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package client_test

import (
"testing"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)

func TestClient(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Client Suite")
}
Loading