Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proxy discovery client. #318

Merged
merged 4 commits into from
Apr 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 197 additions & 0 deletions pkg/proxydisc/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
package proxydisc

import (
"bytes"
"context"
"encoding/json"
"net/http"
"time"

"github.com/SkycoinProject/dmsg/cipher"
"github.com/sirupsen/logrus"

"github.com/SkycoinProject/skywire-mainnet/internal/httpauth"
)

// Config configures the HTTPClient.
type Config struct {
PK cipher.PubKey
SK cipher.SecKey
Port uint16
DiscAddr string
}

// HTTPClient is responsible for interacting with the proxy-discovery
type HTTPClient struct {
log logrus.FieldLogger
conf Config
entry Proxy
auth *httpauth.Client
client http.Client
}

// NewClient creates a new HTTPClient.
func NewClient(log logrus.FieldLogger, conf Config) *HTTPClient {
return &HTTPClient{
log: log,
conf: conf,
entry: Proxy{
Addr: NewSWAddr(conf.PK, conf.Port),
},
client: http.Client{},
}
}

func (c *HTTPClient) addr(path string) string {
return c.conf.DiscAddr + path
}

// Auth returns the internal httpauth.Client
func (c *HTTPClient) Auth(ctx context.Context) (*httpauth.Client, error) {
if c.auth != nil {
return c.auth, nil
}
auth, err := httpauth.NewClient(ctx, c.conf.DiscAddr, c.conf.PK, c.conf.SK)
if err != nil {
return nil, err
}
c.auth = auth
return auth, nil
}

// Proxies calls 'GET /api/proxies'.
func (c *HTTPClient) Proxies(ctx context.Context) (out []Proxy, err error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.addr("/api/proxies"), nil)
if err != nil {
return nil, err
}

resp, err := c.client.Do(req)
if err != nil {
return nil, err
}
if resp != nil {
defer func() {
if cErr := resp.Body.Close(); cErr != nil && err == nil {
err = cErr
}
}()
}

if resp.StatusCode != http.StatusOK {
var hErr HTTPError
if err = json.NewDecoder(resp.Body).Decode(&hErr); err != nil {
return nil, err
}
return nil, &hErr
}
err = json.NewDecoder(resp.Body).Decode(&out)
return
}

// UpdateEntry calls 'POST /api/proxies'.
func (c *HTTPClient) UpdateEntry(ctx context.Context) (*Proxy, error) {
auth, err := c.Auth(ctx)
if err != nil {
return nil, err
}

c.entry.Addr = NewSWAddr(c.conf.PK, c.conf.Port) // Just in case.

raw, err := json.Marshal(&c.entry)
if err != nil {
return nil, err
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.addr("/api/proxies"), bytes.NewReader(raw))
if err != nil {
return nil, err
}

resp, err := auth.Do(req)
if err != nil {
return nil, err
}
if resp != nil {
defer func() {
if cErr := resp.Body.Close(); cErr != nil && err == nil {
err = cErr
}
}()
}

if resp.StatusCode != http.StatusOK {
var hErr HTTPError
if err = json.NewDecoder(resp.Body).Decode(&hErr); err != nil {
return nil, err
}
return nil, &hErr
}
err = json.NewDecoder(resp.Body).Decode(&c.entry)
return &c.entry, err
}

// DeleteEntry calls 'DELETE /api/proxies/{entry_addr}'.
func (c *HTTPClient) DeleteEntry(ctx context.Context) (err error) {
auth, err := c.Auth(ctx)
if err != nil {
return err
}

req, err := http.NewRequestWithContext(ctx, http.MethodDelete, c.addr("/api/proxies/"+c.entry.Addr.String()), nil)
if err != nil {
return err
}

resp, err := auth.Do(req)
if err != nil {
return err
}
if resp != nil {
defer func() {
if cErr := resp.Body.Close(); cErr != nil && err == nil {
err = cErr
}
}()
}

if resp.StatusCode != http.StatusOK {
var hErr HTTPError
if err = json.NewDecoder(resp.Body).Decode(&hErr); err != nil {
return err
}
return &hErr
}
return nil
}

// UpdateLoop repetitively calls 'POST /api/proxies' to update entry.
func (c *HTTPClient) UpdateLoop(ctx context.Context, updateInterval time.Duration) {
defer func() { _ = c.DeleteEntry(context.Background()) }() //nolint:errcheck

update := func() {
for {
entry, err := c.UpdateEntry(ctx)
if err != nil {
c.log.WithError(err).Warn("Failed to update proxy entry in discovery. Retrying...")
time.Sleep(time.Second * 10) // TODO(evanlinjin): Exponential backoff.
continue
}
c.log.WithField("entry", entry).Debug("Entry updated.")
break
}
}

// Run initial update.
update()

ticker := time.NewTicker(updateInterval)
for {
select {
case <-ctx.Done():
ticker.Stop()
return
case <-ticker.C:
update()
}
}
}
30 changes: 30 additions & 0 deletions pkg/proxydisc/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package proxydisc

import (
"fmt"
"net/http"

"github.com/sirupsen/logrus"
)

// HTTPError represents an HTTP error.
type HTTPError struct {
HTTPStatus int `json:"http_status"` // HTTP Status.
Msg string `json:"error"` // Message describing error intended for client.

// Actual error. This is hidden as it may be purposely obscured by the server.
Err error `json:"-"`
}

// Error implements error.
func (err *HTTPError) Error() string {
return fmt.Sprintf("%s: %s", http.StatusText(err.HTTPStatus), err.Msg)
}

// Log prints a log message for the HTTP error.
func (err *HTTPError) Log(log logrus.FieldLogger) {
log.WithError(err.Err).
WithField("msg", err.Msg).
WithField("http_status", http.StatusText(err.HTTPStatus)).
Warn()
}
100 changes: 100 additions & 0 deletions pkg/proxydisc/query.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package proxydisc

import (
"fmt"
"net/url"
"strconv"
)

// GeoQuery represents query values for a proxies by geo call.
type GeoQuery struct {
Lat float64
Lon float64
Radius float64 // Format: <value><unit>
RadiusUnit string
Count int64
}

// DefaultGeoQuery returns GeoQuery with default values.
func DefaultGeoQuery() GeoQuery {
return GeoQuery{
Lat: 0,
Lon: 0,
Radius: 2000,
RadiusUnit: "km",
Count: 1000,
}
}

// Fill fills GeoQuery with query values.
func (q *GeoQuery) Fill(v url.Values) error {
if latS := v.Get("lat"); latS != "" {
lat, err := strconv.ParseFloat(latS, 64)
if err != nil {
return fmt.Errorf("invalid 'lat' query: %w", err)
}
q.Lat = lat
}
if lonS := v.Get("lon"); lonS != "" {
lon, err := strconv.ParseFloat(lonS, 64)
if err != nil {
return fmt.Errorf("invalid 'lon' query: %w", err)
}
q.Lon = lon
}
if radS := v.Get("rad"); radS != "" {
rad, err := strconv.ParseFloat(radS, 64)
if err != nil {
return fmt.Errorf("invalid 'radius' query: %w", err)
}
q.Radius = rad
}
if unit := v.Get("radUnit"); unit != "" {
switch unit {
case "m", "km", "mi", "ft":
q.RadiusUnit = unit
default:
return fmt.Errorf("invalid 'radUnit' query: valid values include [%v]",
[]string{"m", "km", "mi", "ft"})
}
}
if countS := v.Get("count"); countS != "" {
count, err := strconv.ParseInt(countS, 10, 64)
if err != nil {
return fmt.Errorf("invalid 'count' query: %w", err)
}
if count < 0 {
count = 0
}
q.Count = count
}
return nil
}

// ProxiesQuery represents query values for a proxies call.
type ProxiesQuery struct {
Count int64 // <=0 : no limit
Cursor uint64 // <=0 : 0 offset
}

// Fill fills ProxiesQuery with query values.
func (q *ProxiesQuery) Fill(v url.Values) error {
if countS := v.Get("count"); countS != "" {
count, err := strconv.ParseInt(countS, 10, 64)
if err != nil {
return fmt.Errorf("invalid 'count' query: %w", err)
}
if count < 0 {
count = 0
}
q.Count = count
}
if cursorS := v.Get("cursor"); cursorS != "" {
cursor, err := strconv.ParseUint(cursorS, 10, 64)
if err != nil {
return fmt.Errorf("invalid 'cursor' query: %w", err)
}
q.Cursor = cursor
}
return nil
}
Loading