Skip to content

Commit

Permalink
Improve request package and adjust bitfinex rate limiter
Browse files Browse the repository at this point in the history
  • Loading branch information
thrasher- committed Jul 17, 2018
1 parent 950d66e commit a5f5132
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 30 deletions.
4 changes: 2 additions & 2 deletions exchanges/bitfinex/bitfinex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ func TestSetup(t *testing.T) {
}
b.AuthenticatedAPISupport = true
// custom rate limit for testing
b.Requester.SetRateLimit(true, time.Second*20, 1)
b.Requester.SetRateLimit(false, time.Second*20, 1)
b.Requester.SetRateLimit(true, time.Millisecond*300, 1)
b.Requester.SetRateLimit(false, time.Millisecond*300, 1)
}

func TestGetPlatformStatus(t *testing.T) {
Expand Down
135 changes: 107 additions & 28 deletions exchanges/request/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,20 @@ import (

var supportedMethods = []string{"GET", "POST", "HEAD", "PUT", "DELETE", "OPTIONS", "CONNECT"}

const (
maxRequestJobs = 50
)

// Requester struct for the request client
type Requester struct {
HTTPClient *http.Client
UnauthLimit *RateLimit
AuthLimit *RateLimit
Name string
Cycle time.Time
m sync.Mutex
HTTPClient *http.Client
UnauthLimit *RateLimit
AuthLimit *RateLimit
Name string
Cycle time.Time
m sync.Mutex
Jobs chan Job
WorkerStarted bool
}

// RateLimit struct
Expand All @@ -33,6 +39,25 @@ type RateLimit struct {
Mutex sync.Mutex
}

// JobResult holds a request job result
type JobResult struct {
Error error
Result interface{}
}

// Job holds a request job
type Job struct {
Request *http.Request
Method string
Path string
Headers map[string]string
Body io.Reader
Result interface{}
JobResult chan *JobResult
AuthRequest bool
Verbose bool
}

// NewRateLimit creates a new RateLimit
func NewRateLimit(d time.Duration, rate int) *RateLimit {
return &RateLimit{Duration: d, Rate: rate}
Expand Down Expand Up @@ -124,7 +149,7 @@ func (r *Requester) IncrementRequests(auth bool) {
return
}

reqs := r.AuthLimit.GetRequests()
reqs := r.UnauthLimit.GetRequests()
reqs++
r.UnauthLimit.SetRequests(reqs)
}
Expand Down Expand Up @@ -170,6 +195,7 @@ func New(name string, authLimit, unauthLimit *RateLimit, httpRequester *http.Cli
UnauthLimit: unauthLimit,
AuthLimit: authLimit,
Name: name,
Jobs: make(chan Job, maxRequestJobs),
}
}

Expand All @@ -189,6 +215,8 @@ func (r *Requester) IsValidCycle(auth bool) bool {
return true
}
}

r.StartCycle()
return false
}

Expand All @@ -208,7 +236,7 @@ func (r *Requester) checkRequest(method, path string, body io.Reader, headers ma
// DoRequest performs a HTTP/HTTPS request with the supplied params
func (r *Requester) DoRequest(req *http.Request, method, path string, headers map[string]string, body io.Reader, result interface{}, authRequest, verbose bool) error {
if verbose {
log.Printf("%s exchange request path: %s", r.Name, path)
log.Printf("%s exchange request path: %s requires rate limiter: %v", r.Name, path, r.RequiresRateLimiter())
}

resp, err := r.HTTPClient.Do(req)
Expand Down Expand Up @@ -244,6 +272,46 @@ func (r *Requester) DoRequest(req *http.Request, method, path string, headers ma
return nil
}

func (r *Requester) worker() {
for {
for x := range r.Jobs {
if !r.IsRateLimited(x.AuthRequest) {
r.IncrementRequests(x.AuthRequest)

err := r.DoRequest(x.Request, x.Method, x.Path, x.Headers, x.Body, x.Result, x.AuthRequest, x.Verbose)
x.JobResult <- &JobResult{
Error: err,
Result: x.Result,
}
} else {
limit := r.GetRateLimit(x.AuthRequest)
diff := limit.GetDuration() - time.Since(r.Cycle)
if x.Verbose {
log.Printf("%s request. Rate limited! Sleeping for %v", r.Name, diff)
}
time.Sleep(diff)

for {
if !r.IsRateLimited(x.AuthRequest) {
r.IncrementRequests(x.AuthRequest)

if x.Verbose {
log.Printf("%s request. No longer rate limited! Doing request", r.Name)
}

err := r.DoRequest(x.Request, x.Method, x.Path, x.Headers, x.Body, x.Result, x.AuthRequest, x.Verbose)
x.JobResult <- &JobResult{
Error: err,
Result: x.Result,
}
break
}
}
}
}
}
}

// SendPayload handles sending HTTP/HTTPS requests
func (r *Requester) SendPayload(method, path string, headers map[string]string, body io.Reader, result interface{}, authRequest, verbose bool) error {
if r == nil || r.Name == "" {
Expand All @@ -267,33 +335,44 @@ func (r *Requester) SendPayload(method, path string, headers map[string]string,
return r.DoRequest(req, method, path, headers, body, result, authRequest, verbose)
}

if len(r.Jobs) == maxRequestJobs {
return errors.New("max request jobs reached")
}

r.m.Lock()
if r.Cycle.IsZero() || !r.IsValidCycle(authRequest) {
if !r.WorkerStarted {
r.StartCycle()
r.WorkerStarted = true
go r.worker()
}
r.m.Unlock()

if !r.IsRateLimited(authRequest) && r.IsValidCycle(authRequest) {
r.IncrementRequests(authRequest)
return r.DoRequest(req, method, path, headers, body, result, authRequest, verbose)
jobResult := make(chan *JobResult)

newJob := Job{
Request: req,
Method: method,
Path: path,
Headers: headers,
Body: body,
Result: result,
JobResult: jobResult,
AuthRequest: authRequest,
Verbose: verbose,
}

r.m.Lock()
for r.IsRateLimited(authRequest) {
limit := r.GetRateLimit(authRequest)
diff := limit.GetDuration() - time.Since(r.Cycle)
log.Printf("%s IS RATE LIMITED. SLEEPING FOR %v", r.Name, diff)
time.Sleep(diff)

if !r.IsValidCycle(authRequest) {
r.StartCycle()
}
if verbose {
log.Printf("%s request. Attaching new job.", r.Name)
}
r.Jobs <- newJob

if !r.IsRateLimited(authRequest) && r.IsValidCycle(authRequest) {
r.IncrementRequests(authRequest)
r.m.Unlock()
return r.DoRequest(req, method, path, headers, body, result, authRequest, verbose)
}
if verbose {
log.Printf("%s request. Waiting for job to complete.", r.Name)
}
return nil
resp := <-newJob.JobResult

if verbose {
log.Printf("%s request. Job complete.", r.Name)
}
return resp.Error
}

0 comments on commit a5f5132

Please sign in to comment.