Skip to content

Commit

Permalink
Merge pull request #318 from SkycoinProject/feature/proxy-discovery
Browse files Browse the repository at this point in the history
Proxy discovery client.
  • Loading branch information
志宇 authored Apr 14, 2020
2 parents 2f93bb2 + ae23e84 commit a071c45
Show file tree
Hide file tree
Showing 5 changed files with 468 additions and 0 deletions.
197 changes: 197 additions & 0 deletions pkg/proxydisc/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
package proxydisc

import (
"bytes"
"context"
"encoding/json"
"net/http"
"time"

"github.com/SkycoinProject/dmsg/cipher"
"github.com/sirupsen/logrus"

"github.com/SkycoinProject/skywire-mainnet/internal/httpauth"
)

// Config configures the HTTPClient.
type Config struct {
PK cipher.PubKey
SK cipher.SecKey
Port uint16
DiscAddr string
}

// HTTPClient is responsible for interacting with the proxy-discovery
type HTTPClient struct {
log logrus.FieldLogger
conf Config
entry Proxy
auth *httpauth.Client
client http.Client
}

// NewClient creates a new HTTPClient.
func NewClient(log logrus.FieldLogger, conf Config) *HTTPClient {
return &HTTPClient{
log: log,
conf: conf,
entry: Proxy{
Addr: NewSWAddr(conf.PK, conf.Port),
},
client: http.Client{},
}
}

func (c *HTTPClient) addr(path string) string {
return c.conf.DiscAddr + path
}

// Auth returns the internal httpauth.Client
func (c *HTTPClient) Auth(ctx context.Context) (*httpauth.Client, error) {
if c.auth != nil {
return c.auth, nil
}
auth, err := httpauth.NewClient(ctx, c.conf.DiscAddr, c.conf.PK, c.conf.SK)
if err != nil {
return nil, err
}
c.auth = auth
return auth, nil
}

// Proxies calls 'GET /api/proxies'.
func (c *HTTPClient) Proxies(ctx context.Context) (out []Proxy, err error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.addr("/api/proxies"), nil)
if err != nil {
return nil, err
}

resp, err := c.client.Do(req)
if err != nil {
return nil, err
}
if resp != nil {
defer func() {
if cErr := resp.Body.Close(); cErr != nil && err == nil {
err = cErr
}
}()
}

if resp.StatusCode != http.StatusOK {
var hErr HTTPError
if err = json.NewDecoder(resp.Body).Decode(&hErr); err != nil {
return nil, err
}
return nil, &hErr
}
err = json.NewDecoder(resp.Body).Decode(&out)
return
}

// UpdateEntry calls 'POST /api/proxies'.
func (c *HTTPClient) UpdateEntry(ctx context.Context) (*Proxy, error) {
auth, err := c.Auth(ctx)
if err != nil {
return nil, err
}

c.entry.Addr = NewSWAddr(c.conf.PK, c.conf.Port) // Just in case.

raw, err := json.Marshal(&c.entry)
if err != nil {
return nil, err
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.addr("/api/proxies"), bytes.NewReader(raw))
if err != nil {
return nil, err
}

resp, err := auth.Do(req)
if err != nil {
return nil, err
}
if resp != nil {
defer func() {
if cErr := resp.Body.Close(); cErr != nil && err == nil {
err = cErr
}
}()
}

if resp.StatusCode != http.StatusOK {
var hErr HTTPError
if err = json.NewDecoder(resp.Body).Decode(&hErr); err != nil {
return nil, err
}
return nil, &hErr
}
err = json.NewDecoder(resp.Body).Decode(&c.entry)
return &c.entry, err
}

// DeleteEntry calls 'DELETE /api/proxies/{entry_addr}'.
func (c *HTTPClient) DeleteEntry(ctx context.Context) (err error) {
auth, err := c.Auth(ctx)
if err != nil {
return err
}

req, err := http.NewRequestWithContext(ctx, http.MethodDelete, c.addr("/api/proxies/"+c.entry.Addr.String()), nil)
if err != nil {
return err
}

resp, err := auth.Do(req)
if err != nil {
return err
}
if resp != nil {
defer func() {
if cErr := resp.Body.Close(); cErr != nil && err == nil {
err = cErr
}
}()
}

if resp.StatusCode != http.StatusOK {
var hErr HTTPError
if err = json.NewDecoder(resp.Body).Decode(&hErr); err != nil {
return err
}
return &hErr
}
return nil
}

// UpdateLoop repetitively calls 'POST /api/proxies' to update entry.
func (c *HTTPClient) UpdateLoop(ctx context.Context, updateInterval time.Duration) {
defer func() { _ = c.DeleteEntry(context.Background()) }() //nolint:errcheck

update := func() {
for {
entry, err := c.UpdateEntry(ctx)
if err != nil {
c.log.WithError(err).Warn("Failed to update proxy entry in discovery. Retrying...")
time.Sleep(time.Second * 10) // TODO(evanlinjin): Exponential backoff.
continue
}
c.log.WithField("entry", entry).Debug("Entry updated.")
break
}
}

// Run initial update.
update()

ticker := time.NewTicker(updateInterval)
for {
select {
case <-ctx.Done():
ticker.Stop()
return
case <-ticker.C:
update()
}
}
}
30 changes: 30 additions & 0 deletions pkg/proxydisc/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package proxydisc

import (
"fmt"
"net/http"

"github.com/sirupsen/logrus"
)

// HTTPError represents an HTTP error.
type HTTPError struct {
HTTPStatus int `json:"http_status"` // HTTP Status.
Msg string `json:"error"` // Message describing error intended for client.

// Actual error. This is hidden as it may be purposely obscured by the server.
Err error `json:"-"`
}

// Error implements error.
func (err *HTTPError) Error() string {
return fmt.Sprintf("%s: %s", http.StatusText(err.HTTPStatus), err.Msg)
}

// Log prints a log message for the HTTP error.
func (err *HTTPError) Log(log logrus.FieldLogger) {
log.WithError(err.Err).
WithField("msg", err.Msg).
WithField("http_status", http.StatusText(err.HTTPStatus)).
Warn()
}
100 changes: 100 additions & 0 deletions pkg/proxydisc/query.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package proxydisc

import (
"fmt"
"net/url"
"strconv"
)

// GeoQuery represents query values for a proxies by geo call.
type GeoQuery struct {
Lat float64
Lon float64
Radius float64 // Format: <value><unit>
RadiusUnit string
Count int64
}

// DefaultGeoQuery returns GeoQuery with default values.
func DefaultGeoQuery() GeoQuery {
return GeoQuery{
Lat: 0,
Lon: 0,
Radius: 2000,
RadiusUnit: "km",
Count: 1000,
}
}

// Fill fills GeoQuery with query values.
func (q *GeoQuery) Fill(v url.Values) error {
if latS := v.Get("lat"); latS != "" {
lat, err := strconv.ParseFloat(latS, 64)
if err != nil {
return fmt.Errorf("invalid 'lat' query: %w", err)
}
q.Lat = lat
}
if lonS := v.Get("lon"); lonS != "" {
lon, err := strconv.ParseFloat(lonS, 64)
if err != nil {
return fmt.Errorf("invalid 'lon' query: %w", err)
}
q.Lon = lon
}
if radS := v.Get("rad"); radS != "" {
rad, err := strconv.ParseFloat(radS, 64)
if err != nil {
return fmt.Errorf("invalid 'radius' query: %w", err)
}
q.Radius = rad
}
if unit := v.Get("radUnit"); unit != "" {
switch unit {
case "m", "km", "mi", "ft":
q.RadiusUnit = unit
default:
return fmt.Errorf("invalid 'radUnit' query: valid values include [%v]",
[]string{"m", "km", "mi", "ft"})
}
}
if countS := v.Get("count"); countS != "" {
count, err := strconv.ParseInt(countS, 10, 64)
if err != nil {
return fmt.Errorf("invalid 'count' query: %w", err)
}
if count < 0 {
count = 0
}
q.Count = count
}
return nil
}

// ProxiesQuery represents query values for a proxies call.
type ProxiesQuery struct {
Count int64 // <=0 : no limit
Cursor uint64 // <=0 : 0 offset
}

// Fill fills ProxiesQuery with query values.
func (q *ProxiesQuery) Fill(v url.Values) error {
if countS := v.Get("count"); countS != "" {
count, err := strconv.ParseInt(countS, 10, 64)
if err != nil {
return fmt.Errorf("invalid 'count' query: %w", err)
}
if count < 0 {
count = 0
}
q.Count = count
}
if cursorS := v.Get("cursor"); cursorS != "" {
cursor, err := strconv.ParseUint(cursorS, 10, 64)
if err != nil {
return fmt.Errorf("invalid 'cursor' query: %w", err)
}
q.Cursor = cursor
}
return nil
}
Loading

0 comments on commit a071c45

Please sign in to comment.