Skip to content

Commit

Permalink
Merge branch 'imp/arch' of ssh://github.com/renproject/lightnode into…
Browse files Browse the repository at this point in the history
… imp/arch
  • Loading branch information
vinceau committed Aug 1, 2019
2 parents d3ded08 + 079689b commit 07f42d8
Show file tree
Hide file tree
Showing 10 changed files with 177 additions and 97 deletions.
23 changes: 22 additions & 1 deletion cacher/cacher.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cacher

import (
"fmt"
"time"

lru "github.com/hashicorp/golang-lru"
Expand Down Expand Up @@ -65,8 +66,9 @@ func (cacher *Cacher) Handle(_ phi.Task, message phi.Message) {
data := append(params, []byte(msg.Request.Method)...)
reqID := hash(data)

cachable := isCachable(msg.Request.Method)
response, cached := cacher.get(reqID)
if cached {
if cachable && cached {
msg.Responder <- response
} else {
responder := make(chan jsonrpc.Response, 1)
Expand Down Expand Up @@ -112,6 +114,25 @@ func (cacher *Cacher) get(id ID) (jsonrpc.Response, bool) {
return jsonrpc.Response{}, false
}

func isCachable(method string) bool {
switch method {
case jsonrpc.MethodQueryBlock,
jsonrpc.MethodQueryBlocks,
jsonrpc.MethodQueryNumPeers,
jsonrpc.MethodQueryPeers,
jsonrpc.MethodQueryEpoch,
jsonrpc.MethodQueryStat:
return true
case jsonrpc.MethodSubmitTx,
jsonrpc.MethodQueryTx:
// TODO: We need to make sure these are the only methods that we want to
// avoid caching.
return false
default:
panic(fmt.Sprintf("[cacher] unsupported method %s encountered which should have been rejected by the previous checks", method))
}
}

func hash(data []byte) ID {
return sha3.Sum256(data)
}
5 changes: 5 additions & 0 deletions cacher/cacher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ var _ = Describe("Cacher", func() {
continue
}

// We have disabled caching for these methods.
if method == jsonrpc.MethodSubmitTx || method == jsonrpc.MethodQueryTx {
continue
}

// Send the first request
request := testutils.ValidRequest(method)
reqWithRes := server.NewRequestWithResponder(request)
Expand Down
2 changes: 1 addition & 1 deletion cmd/lightnode/lightnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func main() {
var ttl time.Duration
ttlInt, err := strconv.Atoi(os.Getenv("TTL"))
if err != nil {
ttl = 5 * time.Minute
ttl = 1 * time.Minute
} else {
ttl = time.Duration(ttlInt) * time.Minute
}
Expand Down
82 changes: 61 additions & 21 deletions dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package dispatcher

import (
"encoding/hex"
"encoding/json"
"fmt"
"time"

"github.com/ethereum/go-ethereum/crypto"
"github.com/renproject/darknode/addr"
"github.com/renproject/darknode/jsonrpc"
"github.com/renproject/lightnode/client"
Expand Down Expand Up @@ -56,27 +58,17 @@ func (dispatcher *Dispatcher) Handle(_ phi.Task, message phi.Message) {
phi.ParForAll(addrs, func(i int) {
response, err := client.SendToDarknode(client.URLFromMulti(addrs[i]), msg.Request, dispatcher.timeout)
if err != nil {
errMsg := fmt.Sprintf("lightnode could not forward response to darknode: %v", err)
err := jsonrpc.NewError(server.ErrorCodeForwardingError, errMsg, json.RawMessage{})
response := jsonrpc.NewResponse(0, nil, &err)
responses <- response
} else {
responses <- response
errMsg := fmt.Errorf("lightnode could not forward request to darknode: %v", err)
jsonErr := jsonrpc.NewError(server.ErrorCodeForwardingError, errMsg.Error(), nil)
response = jsonrpc.NewResponse(msg.Request.ID, nil, &jsonErr)
}
responses <- response
})
close(responses)
}()

go func() {
i := 1
for res := range responses {
done, response := resIter.update(res, i == len(addrs))
if done {
msg.Responder <- response
return
}
i++
}
msg.Responder <- resIter.get(responses)
}()
}

Expand All @@ -96,7 +88,9 @@ func (dispatcher *Dispatcher) multiAddrs(method string) (addr.MultiAddresses, er
case jsonrpc.MethodQueryBlocks:
return dispatcher.multiStore.AddrsRandom(3)
case jsonrpc.MethodSubmitTx:
return dispatcher.multiStore.AddrsAll()
// TODO: Eventually, we would want a more sophisticated way of sending
// these messages.
return dispatcher.multiStore.AddrsFirst()
case jsonrpc.MethodQueryTx:
return dispatcher.multiStore.AddrsRandom(3)
case jsonrpc.MethodQueryNumPeers:
Expand All @@ -123,8 +117,6 @@ func newResponseIter(method string) responseIterator {
case jsonrpc.MethodQueryBlocks:
return newFirstResponseIterator()
case jsonrpc.MethodSubmitTx:
// TODO: This should instead return an iterator that will check for a
// threshold of consistent responses.
return newFirstResponseIterator()
case jsonrpc.MethodQueryTx:
return newFirstResponseIterator()
Expand All @@ -142,7 +134,7 @@ func newResponseIter(method string) responseIterator {
}

type responseIterator interface {
update(jsonrpc.Response, bool) (bool, jsonrpc.Response)
get(<-chan jsonrpc.Response) jsonrpc.Response
}

type firstResponseIterator struct{}
Expand All @@ -151,6 +143,54 @@ func newFirstResponseIterator() responseIterator {
return firstResponseIterator{}
}

func (firstResponseIterator) update(res jsonrpc.Response, final bool) (bool, jsonrpc.Response) {
return true, res
func (firstResponseIterator) get(responses <-chan jsonrpc.Response) jsonrpc.Response {
// Return the first response from the channel.
select {
case response := <-responses:
return response
}
}

type majorityResponseIterator struct{}

func newMajorityResponseIterator() responseIterator {
return majorityResponseIterator{}
}

func (iter majorityResponseIterator) get(responses <-chan jsonrpc.Response) jsonrpc.Response {
// The key in these maps is the hash of the result or error (depending on
// whether or not the error is nil).
responseCount := map[string]int{}
responseMap := map[string]jsonrpc.Response{}
for response := range responses {
// Hash the response/error.
var bytes []byte
var err error
if response.Error == nil {
bytes, err = json.Marshal(response.Result)
} else {
bytes, err = json.Marshal(response.Error)
}
if err != nil {
return response
}
hash := hex.EncodeToString(crypto.Keccak256(bytes))

// Increment the count of the hash and store the response in a map for
// easy access.
responseCount[hash]++
responseMap[hash] = response
}

var response jsonrpc.Response
for hash, count := range responseCount {
if count >= (len(responses)+1)*2/3 {
return responseMap[hash]
}
response = responseMap[hash]
}

// TODO: If the response is not consistent across the majority, we just
// return an arbitrary one. We may want to return an error instead?
return response
}
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.12

require (
github.com/btcsuite/btcd v0.0.0-20190629003639-c26ffa870fd8
github.com/ethereum/go-ethereum v1.9.0
github.com/evalphobia/logrus_sentry v0.8.2
github.com/go-kit/kit v0.9.0 // indirect
github.com/go-logfmt/logfmt v0.4.0 // indirect
Expand All @@ -20,11 +21,9 @@ require (
github.com/renproject/kv v0.0.0-20190710041758-5f4eb2a837a8
github.com/renproject/mercury v0.0.0-20190723012846-50264c6eb7d6 // indirect
github.com/renproject/phi v0.0.0-20190713013721-51f586bc4816
github.com/republicprotocol/co-go v0.0.0-20180723052914-4e299fdb0e80
github.com/rs/cors v1.6.0
github.com/sirupsen/logrus v1.4.2
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4
golang.org/x/net v0.0.0-20190628185345-da137c7871d7 // indirect
golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7 // indirect
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
)
11 changes: 3 additions & 8 deletions server/ratelimiter/ratelimiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,8 @@ func New() RateLimiter {
// been exceeded. It will also return false if the method is not supported
// (i.e. unsupported methods have rate limits of 0/s).
func (rl *RateLimiter) Allow(method, addr string) bool {
limiter, ok := rl.limiters[method]
if !ok {
// NOTE: This return value hides the fact that the method is not
// supported. The fact that the method is not supported should be
// checked elsewhere and suitable indication that this is the case
// should be provided.
return false
}
// NOTE: We assume it has been made sure that the method exists prior to
// this stage.
limiter := rl.limiters[method]
return limiter.IPAddressLimiter(addr).Allow()
}
39 changes: 31 additions & 8 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ func New(logger logrus.FieldLogger, port string, options Options, validator phi.
// Run starts the `Server` listening on its port. This function is blocking.
func (server *Server) Run() {
r := mux.NewRouter()
r.HandleFunc("/", server.handleFunc)
r.HandleFunc("/", server.handleFunc).Methods("POST")
r.Use(recoveryHandler)

httpHandler := cors.New(cors.Options{
AllowedOrigins: []string{"*"},
Expand All @@ -80,7 +81,7 @@ func (server *Server) handleFunc(w http.ResponseWriter, r *http.Request) {
if err := json.NewDecoder(r.Body).Decode(&rawMessage); err != nil {
err := jsonrpc.NewError(jsonrpc.ErrorCodeInvalidJSON, "lightnode could not decode JSON request", nil)
response := jsonrpc.NewResponse(0, nil, &err)
server.writeResponses(w, []jsonrpc.Response{response})
writeResponses(w, []jsonrpc.Response{response})
return
}
// Unmarshal requests with support for batching
Expand All @@ -93,7 +94,7 @@ func (server *Server) handleFunc(w http.ResponseWriter, r *http.Request) {
if err := json.Unmarshal(rawMessage, &req); err != nil {
err := jsonrpc.NewError(jsonrpc.ErrorCodeInvalidJSON, "lightnode could not parse JSON request", nil)
response := jsonrpc.NewResponse(0, nil, &err)
server.writeResponses(w, []jsonrpc.Response{response})
writeResponses(w, []jsonrpc.Response{response})
return
}
reqs = []jsonrpc.Request{req}
Expand All @@ -105,7 +106,7 @@ func (server *Server) handleFunc(w http.ResponseWriter, r *http.Request) {
errMsg := fmt.Sprintf("maximum batch size exceeded: maximum is %v but got %v", server.options.MaxBatchSize, batchSize)
err := jsonrpc.NewError(ErrorCodeMaxBatchSizeExceeded, errMsg, nil)
response := jsonrpc.NewResponse(0, nil, &err)
server.writeResponses(w, []jsonrpc.Response{response})
writeResponses(w, []jsonrpc.Response{response})
return
}

Expand All @@ -114,10 +115,20 @@ func (server *Server) handleFunc(w http.ResponseWriter, r *http.Request) {
responses := make([]jsonrpc.Response, len(reqs))
phi.ParForAll(reqs, func(i int) {
method := reqs[i].Method

// Ensure method exists prior to checking rate limit.
_, ok := jsonrpc.RPCs[method]
if !ok {
err := jsonrpc.NewError(ErrorCodeRateLimitExceeded, "unsupported method", nil)
response := jsonrpc.NewResponse(reqs[i].ID, nil, &err)
responses[i] = response
return
}

if !server.rateLimiter.Allow(method, r.RemoteAddr) {
err := jsonrpc.NewError(ErrorCodeRateLimitExceeded, "rate limit exceeded", nil)
response := jsonrpc.NewResponse(0, nil, &err)
server.writeResponses(w, []jsonrpc.Response{response})
response := jsonrpc.NewResponse(reqs[i].ID, nil, &err)
responses[i] = response
return
}

Expand All @@ -126,12 +137,12 @@ func (server *Server) handleFunc(w http.ResponseWriter, r *http.Request) {
responses[i] = <-reqWithResponder.Responder
})

if err := server.writeResponses(w, responses); err != nil {
if err := writeResponses(w, responses); err != nil {
server.logger.Errorf("error writing http response: %v", err)
}
}

func (server *Server) writeResponses(w http.ResponseWriter, responses []jsonrpc.Response) error {
func writeResponses(w http.ResponseWriter, responses []jsonrpc.Response) error {
w.Header().Set("Content-Type", "application/json")
if len(responses) == 1 {
// We use the `writeError` function to determine the relevant status code
Expand Down Expand Up @@ -159,6 +170,18 @@ func writeError(w http.ResponseWriter, id interface{}, err jsonrpc.Error) error
return json.NewEncoder(w).Encode(jsonrpc.NewResponse(id, nil, &err))
}

func recoveryHandler(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
defer func() {
if err := recover(); err != nil {
jsonErr := jsonrpc.NewError(ErrorCodeRateLimitExceeded, fmt.Sprintf("recovered from a panic in the lightnode: %v", err), nil)
writeError(w, 0, jsonErr)
}
}()
h.ServeHTTP(w, r)
})
}

// RequestWithResponder wraps a `jsonrpc.Request` with a responder channel that
// the response will be written to.
type RequestWithResponder struct {
Expand Down
17 changes: 17 additions & 0 deletions store/store.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package store

import (
"errors"
"math/rand"

"github.com/renproject/darknode/addr"
Expand Down Expand Up @@ -51,6 +52,22 @@ func (multiStore *MultiAddrStore) AddrsAll() (addr.MultiAddresses, error) {
return addrs, nil
}

// AddrsFirst returns the first multi addressses in the store.
func (multiStore *MultiAddrStore) AddrsFirst() (addr.MultiAddresses, error) {
for iter := multiStore.store.Iterator(); iter.Next(); {
str, err := iter.Key()
if err != nil {
return nil, err
}
address, err := addr.NewMultiAddressFromString(str)
if err != nil {
return nil, err
}
return addr.MultiAddresses{address}, nil
}
return nil, errors.New("no multi address in store")
}

// AddrsRandom returns a random number of addresses from the store.
func (multiStore *MultiAddrStore) AddrsRandom(n int) (addr.MultiAddresses, error) {
addrs, err := multiStore.AddrsAll()
Expand Down
Loading

0 comments on commit 07f42d8

Please sign in to comment.