Skip to content

Commit

Permalink
Server list optimization (#1720)
Browse files Browse the repository at this point in the history
* re-implement proxy list ; cache ut and sd files

* update go.mod go.sum and vendor deps

* fix ci errors

* fix ci errors

* remove unused variable

* match changes to proxy list with vpn list

* match changes to proxy list with vpn list

* add missing variables

* fix ci errors

* fix ci errors

* use os.TmpDir() to get temporary files dir
  • Loading branch information
0pcom committed Jan 31, 2024
1 parent c2d1025 commit 1e7789c
Show file tree
Hide file tree
Showing 10 changed files with 1,053 additions and 247 deletions.
218 changes: 104 additions & 114 deletions cmd/skywire-cli/commands/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,24 @@ package skysocksc
import (
"bytes"
"context"
"encoding/json"
"fmt"
"math/rand"
"net/http"
"os"
"strings"
"text/tabwriter"
"time"

"github.com/bitfield/script"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"github.com/tidwall/pretty"

"github.com/skycoin/skywire-utilities/pkg/buildinfo"
"github.com/skycoin/skywire-utilities/pkg/cmdutil"
"github.com/skycoin/skywire-utilities/pkg/skyenv"
clirpc "github.com/skycoin/skywire/cmd/skywire-cli/commands/rpc"
"github.com/skycoin/skywire/cmd/skywire-cli/internal"
"github.com/skycoin/skywire/pkg/app/appserver"
"github.com/skycoin/skywire/pkg/routing"
"github.com/skycoin/skywire/pkg/servicedisc"
)

func init() {
Expand All @@ -35,24 +32,10 @@ func init() {
statusCmd,
listCmd,
)
version := buildinfo.Version()
if version == "unknown" {
version = ""
}
startCmd.Flags().StringVarP(&pk, "pk", "k", "", "server public key")
startCmd.Flags().StringVarP(&addr, "addr", "a", "", "address of proxy for use")
startCmd.Flags().StringVarP(&clientName, "name", "n", "", "name of skysocks client")
startCmd.Flags().IntVarP(&startingTimeout, "timeout", "t", 20, "starting timeout value in second")
stopCmd.Flags().BoolVar(&allClients, "all", false, "stop all skysocks client")
stopCmd.Flags().StringVar(&clientName, "name", "", "specific skysocks client that want stop")
listCmd.Flags().StringVarP(&sdURL, "url", "a", "", "service discovery url default:\n"+skyenv.ServiceDiscAddr)
listCmd.Flags().BoolVarP(&directQuery, "direct", "b", false, "query service discovery directly")
listCmd.Flags().StringVarP(&pk, "pk", "k", "", "check "+serviceType+" service discovery for public key")
listCmd.Flags().IntVarP(&count, "num", "n", 0, "number of results to return (0 = all)")
listCmd.Flags().BoolVarP(&isUnFiltered, "unfilter", "u", false, "provide unfiltered results")
listCmd.Flags().StringVarP(&ver, "ver", "v", version, "filter results by version")
listCmd.Flags().StringVarP(&country, "country", "c", "", "filter results by country")
listCmd.Flags().BoolVarP(&isStats, "stats", "s", false, "return only a count of the results")
}

var startCmd = &cobra.Command{
Expand Down Expand Up @@ -167,9 +150,14 @@ var startCmd = &cobra.Command{
},
}

func init() {
stopCmd.Flags().BoolVar(&allClients, "all", false, "stop all skysocks client")
stopCmd.Flags().StringVar(&clientName, "name", "", "specific skysocks client that want stop")
}

var stopCmd = &cobra.Command{
Use: "stop",
Short: "stop the " + serviceType + " client",
Short: "stop the " + serviceType + " client" + "\nstop the default instance with:\n stop --name skysocks-client",
Run: func(cmd *cobra.Command, args []string) {
rpcClient, err := clirpc.Client(cmd.Flags())
if err != nil {
Expand Down Expand Up @@ -251,121 +239,123 @@ var statusCmd = &cobra.Command{
},
}

var isLabel bool

func init() {
if version == "unknown" {
version = ""
}
version = strings.Split(version, "-")[0]
listCmd.Flags().StringVarP(&utURL, "uturl", "w", skyenv.UptimeTrackerAddr, "uptime tracker url")
listCmd.Flags().StringVarP(&sdURL, "sdurl", "a", skyenv.ServiceDiscAddr, "service discovery url")
listCmd.Flags().BoolVarP(&rawData, "raw", "r", false, "print raw data")
listCmd.Flags().BoolVarP(&noFilterOnline, "noton", "o", false, "do not filter by online status in UT")
listCmd.Flags().StringVar(&cacheFileSD, "cfs", os.TempDir()+"/proxysd.json", "SD cache file location")
listCmd.Flags().StringVar(&cacheFileUT, "cfu", os.TempDir()+"/ut.json", "UT cache file location.")
listCmd.Flags().IntVarP(&cacheFilesAge, "cfa", "m", 5, "update cache files if older than n minutes")
listCmd.Flags().StringVarP(&pk, "pk", "k", "", "check "+serviceType+" service discovery for public key")
listCmd.Flags().BoolVarP(&isUnFiltered, "unfilter", "u", false, "provide unfiltered results")
listCmd.Flags().StringVarP(&ver, "ver", "v", version, "filter results by version")
listCmd.Flags().StringVarP(&country, "country", "c", "", "filter results by country")
listCmd.Flags().BoolVarP(&isStats, "stats", "s", false, "return only a count of the results")
listCmd.Flags().BoolVarP(&isLabel, "label", "l", false, "label keys by country \033[91m(SLOW)\033[0m")
}

var listCmd = &cobra.Command{
Use: "list",
Short: "List servers",
Long: "List " + serviceType + " servers from service discovery\n " + skyenv.ServiceDiscAddr + "/api/services?type=" + serviceType + "\n " + skyenv.ServiceDiscAddr + "/api/services?type=" + serviceType + "&country=US",
Long: fmt.Sprintf("List %v servers from service discovery\n%v/api/services?type=%v\n%v/api/services?type=%v&country=US\n\nSet cache file location to \"\" to avoid using cache files", serviceType, skyenv.ServiceDiscAddr, serviceType, skyenv.ServiceDiscAddr, serviceType),
Run: func(cmd *cobra.Command, args []string) {
//validate any specified public key
sds := getData(cacheFileSD, sdURL+"/api/services?type="+serviceType)
if rawData {
script.Echo(string(pretty.Color(pretty.Pretty([]byte(sds)), nil))).Stdout() //nolint
return
}
if pk != "" {
err := pubkey.Set(pk)
if err != nil {
internal.PrintFatalError(cmd.Flags(), fmt.Errorf("Invalid or missing public key"))
if isStats {
count, _ := script.Echo(sds).JQ(`map(select(.address == "`+pk+`:3"))`).Replace("\"", "").Replace(":", " ").Column(1).CountLines() //nolint
script.Echo(fmt.Sprintf("%v\n", count)).Stdout() //nolint
return
}
jsonOut, _ := script.Echo(sds).JQ(`map(select(.address == "` + pk + `:3"))`).Bytes() //nolint
script.Echo(string(pretty.Color(pretty.Pretty(jsonOut), nil))).Stdout() //nolint
return
}
if sdURL == "" {
sdURL = skyenv.ServiceDiscAddr
}
if isUnFiltered {
ver = ""
country = ""
var sdJQ string
if !isUnFiltered {
if ver != "" && country == "" {
sdJQ = `select(.version == "` + ver + `")`
}
if country != "" && ver == "" {
sdJQ = `select(.geo.country == "` + country + `")`
}
if country != "" && ver != "" {
sdJQ = `select(.geo.country == "` + country + `" and .version == "` + ver + `")`
}
}
if directQuery {
servers = directQuerySD(cmd.Flags())
if sdJQ != "" {
sdJQ = `.[] | ` + sdJQ + ` | .address`
} else {
rpcClient, err := clirpc.Client(cmd.Flags())
if err != nil {
internal.PrintError(cmd.Flags(), fmt.Errorf("unable to create RPC client: %w", err))
internal.PrintOutput(cmd.Flags(), fmt.Sprintf("directly querying service discovery\n%s/api/services?type=%s\n", sdURL, serviceType), fmt.Sprintf("directly querying service discovery\n%s/api/services?type=%s\n", sdURL, serviceType))
servers = directQuerySD(cmd.Flags())
} else {
servers, err = rpcClient.ProxyServers(ver, country)
if err != nil {
internal.PrintError(cmd.Flags(), err)
internal.PrintOutput(cmd.Flags(), fmt.Sprintf("directly querying service discovery\n%s/api/services?type=%s\n", sdURL, serviceType), fmt.Sprintf("directly querying service discovery\n%s/api/services?type=%s\n", sdURL, serviceType))
servers = directQuerySD(cmd.Flags())
}
}
sdJQ = `.[] .address`
}
if len(servers) == 0 {
internal.PrintOutput(cmd.Flags(), "No Servers found", "No Servers found")
os.Exit(0)
var sdkeys string
sdkeys, _ = script.Echo(sds).JQ(sdJQ).Replace("\"", "").Replace(":", " ").Column(1).String() //nolint
if noFilterOnline {
if isStats {
count, _ := script.Echo(sdkeys).CountLines() //nolint
script.Echo(fmt.Sprintf("%v\n", count)).Stdout() //nolint
return
}
script.Echo(sdkeys).Stdout() //nolint
return
}
uts := getData(cacheFileUT, utURL+"/uptimes?v=v2")
utkeys, _ := script.Echo(uts).JQ(".[] | select(.on) | .pk").Replace("\"", "").String() //nolint
if isStats {
internal.PrintOutput(cmd.Flags(), fmt.Sprintf("%d Servers\n", len(servers)), fmt.Sprintf("%d Servers\n", len(servers)))
count, _ := script.Echo(sdkeys + utkeys).Freq().Match("2 ").Column(2).CountLines() //nolint
script.Echo(fmt.Sprintf("%v\n", count)).Stdout() //nolint
return
}
if !isLabel {
script.Echo(sdkeys + utkeys).Freq().Match("2 ").Column(2).Stdout() //nolint
} else {
var msg string
var results []string
limit := len(servers)
if count > 0 && count < limit {
limit = count
}
if pk != "" {
for _, server := range servers {
if strings.Replace(server.Addr.String(), servicePort, "", 1) == pk {
results = append(results, server.Addr.String())
}
}
} else {
for _, server := range servers {
results = append(results, server.Addr.String())
}
}

//randomize the order of the displayed results
rand.Shuffle(len(results), func(i, j int) {
results[i], results[j] = results[j], results[i]
})
for i := 0; i < limit && i < len(results); i++ {
msg += strings.Replace(results[i], servicePort, "", 1)
if server := findServerByPK(servers, results[i]); server != nil && server.Geo != nil {
if server.Geo.Country != "" {
msg += fmt.Sprintf(" | %s\n", server.Geo.Country)
} else {
msg += "\n"
}
} else {
msg += "\n"
filteredKeys, _ := script.Echo(sdkeys + utkeys).Freq().Match("2 ").Column(2).Slice() //nolint
formattedoutput, _ := script.Echo(sds).JQ(".[] | \"\\(.address) \\(.geo.country)\"").Replace("\"", "").Slice() //nolint
// Very slow!
for _, fo := range formattedoutput {
for _, fk := range filteredKeys {
script.Echo(fo).Match(fk).Stdout() //nolint
}
}
internal.PrintOutput(cmd.Flags(), servers, msg)
}

},
}

func directQuerySD(cmdFlags *pflag.FlagSet) (s []servicedisc.Service) {
//url/uri format
//https://sd.skycoin.com/api/services?type=proxy&country=US&version=v1.3.7
sdURL += "/api/services?type=" + serviceType
if country != "" {
sdURL += "&country=" + country
}
if ver != "" {
sdURL += "&version=" + ver
func getData(cachefile, thisurl string) (thisdata string) {
var shouldfetch bool
buf1 := new(bytes.Buffer)
cTime := time.Now()
if cachefile == "" {
thisdata, _ = script.NewPipe().WithHTTPClient(&http.Client{Timeout: 30 * time.Second}).Get(thisurl).String() //nolint
return thisdata
}
//preform http get request for the service discovery URL
resp, err := (&http.Client{Timeout: time.Duration(30 * time.Second)}).Get(sdURL)
if err != nil {
internal.PrintFatalError(cmdFlags, fmt.Errorf("error fetching servers from service discovery: %w", err))
}
defer func() {
if err := resp.Body.Close(); err != nil {
internal.PrintError(cmdFlags, fmt.Errorf("error closing http response body: %w", err))
if cachefile != "" {
if u, err := os.Stat(cachefile); err != nil {
shouldfetch = true
} else {
if cTime.Sub(u.ModTime()).Minutes() > float64(cacheFilesAge) {
shouldfetch = true
}
}
}()
// Decode JSON response into struct
err = json.NewDecoder(resp.Body).Decode(&s)
if err != nil {
internal.PrintFatalError(cmdFlags, fmt.Errorf("error decoding json to struct: %w", err))
}
return s
}

func findServerByPK(servers []servicedisc.Service, addr string) *servicedisc.Service {
for _, server := range servers {
if server.Addr.String() == addr {
return &server
if shouldfetch {
_, _ = script.NewPipe().WithHTTPClient(&http.Client{Timeout: 30 * time.Second}).Get(thisurl).Tee(buf1).WriteFile(cachefile) //nolint
thisdata = buf1.String()
} else {
thisdata, _ = script.File(cachefile).String() //nolint
}
} else {
thisdata, _ = script.NewPipe().WithHTTPClient(&http.Client{Timeout: 30 * time.Second}).Get(thisurl).String() //nolint
}
return nil
return thisdata
}
14 changes: 9 additions & 5 deletions cmd/skywire-cli/commands/proxy/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,30 @@ package skysocksc
import (
"github.com/spf13/cobra"

"github.com/skycoin/skywire-utilities/pkg/buildinfo"
"github.com/skycoin/skywire-utilities/pkg/cipher"
"github.com/skycoin/skywire/pkg/servicedisc"
)

var (
version = buildinfo.Version()
binaryName = "skysocks-client"
stateName = "skysocks-client"
serviceType = servicedisc.ServiceTypeProxy
servicePort = ":44"
isUnFiltered bool
rawData bool
utURL string
sdURL string
cacheFileSD string
cacheFileUT string
cacheFilesAge int
ver string
country string
isStats bool
pubkey cipher.PubKey
pk string
count int
sdURL string
directQuery bool
servers []servicedisc.Service
allClients bool
noFilterOnline bool
clientName string
addr string
startingTimeout int
Expand Down
16 changes: 10 additions & 6 deletions cmd/skywire-cli/commands/vpn/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,30 @@ package clivpn
import (
"github.com/spf13/cobra"

"github.com/skycoin/skywire-utilities/pkg/buildinfo"
"github.com/skycoin/skywire-utilities/pkg/cipher"
"github.com/skycoin/skywire/pkg/servicedisc"
)

var (
version = buildinfo.Version()
stateName = "vpn-client"
serviceType = servicedisc.ServiceTypeVPN
servicePort = ":3"
isUnFiltered bool
rawData bool
utURL string
sdURL string
cacheFileSD string
cacheFileUT string
cacheFilesAge int
noFilterOnline bool
path string
isPkg bool
isUnFiltered bool
ver string
country string
isStats bool
pubkey cipher.PubKey
pk string
count int
sdURL string
directQuery bool
servers []servicedisc.Service
startingTimeout int
)

Expand Down
Loading

0 comments on commit 1e7789c

Please sign in to comment.