Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Client code for ServiceRegistry with ability to set service URI in CLI #456

Merged
merged 5 commits into from Aug 2, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
47 changes: 35 additions & 12 deletions cmd/livepeer/livepeer.go
Expand Up @@ -11,6 +11,8 @@ import (
"fmt"
"io/ioutil"
"math/big"
"net/http"
"net/url"
"os"
"os/signal"
"os/user"
Expand Down Expand Up @@ -91,7 +93,7 @@ func main() {
ipfsPath := flag.String("ipfsPath", fmt.Sprintf("%v/.ipfs", usr.HomeDir), "IPFS path")
noIPFSLogFiles := flag.Bool("noIPFSLogFiles", false, "Set to true if log files should not be generated")
offchain := flag.Bool("offchain", false, "Set to true to start the node in offchain mode")
publicIP := flag.String("publicIP", "", "Explicit set node IP address so nodes that need a well-known address can advertise it to the network")
publicAddr := flag.String("publicAddr", "", "Public address that broadcasters can use to contact this node; may be an IP or hostname. If used, should match the on-chain ServiceURI set via livepeer_cli")
initializeRound := flag.Bool("initializeRound", false, "Set to true if running as a transcoder and the node should automatically initialize new rounds")
version := flag.Bool("version", false, "Print out the version")

Expand Down Expand Up @@ -191,15 +193,7 @@ func main() {
glog.Errorf("Error creating a new node: %v", err)
return
}
if *transcoder && *publicIP == "" {
glog.Errorf("Error - transcoder needs to specify publicIP")
return
}
if *transcoder && *publicIP == "" {
glog.Errorf("Error - transcoder needs to specify publicIP")
return
}
nw, err := bnet.NewBasicVideoNetwork(node, *publicIP, *port)
nw, err := bnet.NewBasicVideoNetwork(node, "127.0.0.1", *port)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we default to passing in 127.0.0.1 with the expectation that with the V2 networking changes, this basicnet construct won't be used anyway so it doesn't matter that a public IP isn't set for this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, the basicnet publicIP won't be used anymore with the v2 network. Technically this API could be cleaned up too, but I'm not sure how much further work the p2p layer will see; it depends what we do with the relay network (if anything).

if err != nil {
glog.Errorf("Cannot create network node: %v", err)
return
Expand Down Expand Up @@ -333,7 +327,7 @@ func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

if err := setupTranscoder(ctx, n, em, *ipfsPath, *initializeRound); err != nil {
if err := setupTranscoder(ctx, n, em, *ipfsPath, *initializeRound, *publicAddr); err != nil {
glog.Errorf("Error setting up transcoder: %v", err)
return
}
Expand Down Expand Up @@ -493,7 +487,7 @@ func getLPKeys(datadir string) (crypto.PrivKey, crypto.PubKey, error) {
return priv, pub, nil
}

func setupTranscoder(ctx context.Context, n *core.LivepeerNode, em eth.EventMonitor, ipfsPath string, initializeRound bool) error {
func setupTranscoder(ctx context.Context, n *core.LivepeerNode, em eth.EventMonitor, ipfsPath string, initializeRound bool, publicAddr string) error {
//Check if transcoder is active
active, err := n.Eth.IsActiveTranscoder()
if err != nil {
Expand All @@ -506,6 +500,35 @@ func setupTranscoder(ctx context.Context, n *core.LivepeerNode, em eth.EventMoni
glog.Infof("Transcoder %v is active", n.Eth.Account().Address.Hex())
}

if publicAddr == "" {
// TODO probably should put this (along w wizard GETs) into common code
resp, err := http.Get("https://api.ipify.org?format=text")
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
glog.Error("Could not look up public IP address")
return err
}
publicAddr = strings.TrimSpace(string(body))
}
uriStr, err := n.Eth.GetServiceURI(n.Eth.Account().Address)
if err != nil {
glog.Error("Could not get service URI")
return err
}
uri, err := url.ParseRequestURI(uriStr)
if err != nil {
glog.Error("Could not parse service URI")
uri, _ = url.ParseRequestURI("http://127.0.0.1")
}
if uri.Hostname() != publicAddr {
glog.Errorf("Service address %v did not match discovered address %v; set the correct address in livepeer_cli or use -publicAddr", uri.Hostname(), publicAddr)
// TODO remove '&& false' after all transcoders have set a service URI
if active && false {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm so after we remove the && false to kill the node if an active transcoder has mismatched service URI + publicAddr if it is still the case that an active transcoder did not properly set a serviceURI to match their publicAddr they won't be able to use the CLI to set a service URI anymore since the transcoder configuration options would only be available when the node is running with the -transcoder flag, but if the account for the node is already an active transcoder without a service URI then every time the node boots up with the -transcoder flag the node will be killed. I guess the workaround here is that the user could pass in -publicAddr 127.0.0.1 so that it matches the default service URI, successfully boot up the node, set an actual service URI and the reboot the node without the -publicAddr flag.

Not really a big concern IMO since the CLI makes you pick a service URI upon transcoder registration (the scenario would really only happen if someone registered as a transcoder outside of the CLI and chose to leave out the service URI), but just thought I would note the scenario.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, the &&false is to give currently active transcoders an opportunity to set up their service URIs; new transcoders registering after the network switchover wouldn't have this issue.

Also correct that -publicAddr 127.0.0.1 would be the fallback if current transcoders still don't set a service URI. Note that transcoders will still start up by default on port 4433 if there is no service URI set; see https://github.com/livepeer/go-livepeer/pull/447/files#diff-236217e7f473dfbde9134c6a391f3408R126

Since we have the fallback, we could still kill the node by default to force transcoders to make the adjustment, but I figured it's nicer to be a little less abrupt during the transition. We probably should monitor this though. The current transcoder pool is small enough that we could reach out to individual transcoders after a few weeks, if needed.

return fmt.Errorf("Mismatched service address")
}
}

// Set up IPFS
ipfsApi, err := ipfs.StartIpfs(ctx, ipfsPath)
if err != nil {
Expand Down
19 changes: 19 additions & 0 deletions cmd/livepeer_cli/wizard.go
Expand Up @@ -39,6 +39,25 @@ func (w *wizard) readString() string {
}
}

// readStringAndValidate reads a single line from stdin, trims spaces and
// checks that the string passes a condition defined by the provided validation function
func (w *wizard) readStringAndValidate(validate func(in string) (string, error)) string {
for {
fmt.Printf("> ")
text, err := w.in.ReadString('\n')
if err != nil {
log.Crit("Failed to read user input", "err", err)
}
text = strings.TrimSpace(text)
validText, err := validate(text)
if err != nil {
log.Error("Failed to validate input", "err", err)
continue
}
return validText
}
}

// readDefaultString reads a single line from stdin, trimming if from spaces. If
// an empty line is entered, the default value is returned.
func (w *wizard) readDefaultString(def string) string {
Expand Down
1 change: 1 addition & 0 deletions cmd/livepeer_cli/wizard_stats.go
Expand Up @@ -162,6 +162,7 @@ func (w *wizard) transcoderStats() {
data := [][]string{
[]string{"Status", t.Status},
[]string{"Active", strconv.FormatBool(t.Active)},
[]string{"Service URI", t.ServiceURI},
[]string{"Delegated Stake", eth.FormatUnits(t.DelegatedStake, "LPT")},
[]string{"Reward Cut (%)", eth.FormatPerc(t.RewardCut)},
[]string{"Fee Share (%)", eth.FormatPerc(t.FeeShare)},
Expand Down
44 changes: 40 additions & 4 deletions cmd/livepeer_cli/wizard_transcoder.go
Expand Up @@ -5,16 +5,29 @@ import (
"math/big"
"net/url"
"strconv"
"strings"

lpcommon "github.com/livepeer/go-livepeer/common"
)

const defaultRPCPort = "4433"

func (w *wizard) isTranscoder() bool {
isT := httpGet(fmt.Sprintf("http://%v:%v/IsTranscoder", w.host, w.httpPort))
return isT == "true"
}

func (w *wizard) promptTranscoderConfig() (float64, float64, *big.Int) {
func myHostPort() string {
// TODO Fall back to try other services if this one fails. Ask a peer?
// http://myexternalip.com
// http://api.ident.me
// http://whatismyipaddress.com/api
// http://ipinfo.io/ip
ip := strings.TrimSpace(httpGet("https://api.ipify.org/?format=text"))
return ip + ":" + defaultRPCPort
}

func (w *wizard) promptTranscoderConfig() (float64, float64, *big.Int, string) {
var (
blockRewardCut float64
feeShare float64
Expand All @@ -30,15 +43,32 @@ func (w *wizard) promptTranscoderConfig() (float64, float64, *big.Int) {
fmt.Printf("Enter price per segment in Wei (default: 1) - ")
pricePerSegment = w.readDefaultBigInt(big.NewInt(1))

return blockRewardCut, feeShare, pricePerSegment
addr := myHostPort()
fmt.Printf("Enter the public host:port of node (default: %v)", addr)
serviceURI := w.readStringAndValidate(func(in string) (string, error) {
if "" == in {
in = addr
}
in = "https://" + in
uri, err := url.ParseRequestURI(in)
if err != nil {
return "", err
}
if uri.Port() == "" {
return "", fmt.Errorf("Missing Port")
}
return in, nil
})

return blockRewardCut, feeShare, pricePerSegment, serviceURI
}

func (w *wizard) activateTranscoder() {
d, err := w.getDelegatorInfo()
fmt.Printf("Current token balance: %v\n", w.getTokenBalance())
fmt.Printf("Current bonded amount: %v\n", d.BondedAmount.String())

blockRewardCut, feeShare, pricePerSegment := w.promptTranscoderConfig()
blockRewardCut, feeShare, pricePerSegment, serviceURI := w.promptTranscoderConfig()

fmt.Printf("You must bond to yourself in order to become a transcoder\n")

Expand All @@ -64,24 +94,30 @@ func (w *wizard) activateTranscoder() {
"blockRewardCut": {fmt.Sprintf("%v", blockRewardCut)},
"feeShare": {fmt.Sprintf("%v", feeShare)},
"pricePerSegment": {fmt.Sprintf("%v", pricePerSegment.String())},
"serviceURI": {fmt.Sprintf("%v", serviceURI)},
"amount": {fmt.Sprintf("%v", amount.String())},
}

httpPostWithParams(fmt.Sprintf("http://%v:%v/activateTranscoder", w.host, w.httpPort), val)
// TODO we should confirm if the transaction was actually sent
fmt.Println("\nTransaction sent. Once confirmed, please restart your node.")
}

func (w *wizard) setTranscoderConfig() {
fmt.Printf("Current token balance: %v\n", w.getTokenBalance())

blockRewardCut, feeShare, pricePerSegment := w.promptTranscoderConfig()
blockRewardCut, feeShare, pricePerSegment, serviceURI := w.promptTranscoderConfig()

val := url.Values{
"blockRewardCut": {fmt.Sprintf("%v", blockRewardCut)},
"feeShare": {fmt.Sprintf("%v", feeShare)},
"pricePerSegment": {fmt.Sprintf("%v", pricePerSegment.String())},
"serviceURI": {fmt.Sprintf("%v", serviceURI)},
}

httpPostWithParams(fmt.Sprintf("http://%v:%v/setTranscoderConfig", w.host, w.httpPort), val)
// TODO we should confirm if the transaction was actually sent
fmt.Println("\nTransaction sent. Once confirmed, please restart your node if the ServiceURI has been reset")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reset is just to allow the node to check for you if your service URI matches your publicAddr before moving forward with any other operations?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The restart also allows the node to bind to the new port, if that has changed.

}

func (w *wizard) callReward() {
Expand Down
50 changes: 42 additions & 8 deletions eth/client.go
Expand Up @@ -5,6 +5,7 @@ package eth

//go:generate abigen --abi protocol/abi/Controller.abi --pkg contracts --type Controller --out contracts/controller.go
//go:generate abigen --abi protocol/abi/LivepeerToken.abi --pkg contracts --type LivepeerToken --out contracts/livepeerToken.go
//go:generate abigen --abi protocol/abi/ServiceRegistry.abi --pkg contracts --type ServiceRegistry --out contracts/serviceRegistry.go
//go:generate abigen --abi protocol/abi/BondingManager.abi --pkg contracts --type BondingManager --out contracts/bondingManager.go
//go:generate abigen --abi protocol/abi/JobsManager.abi --pkg contracts --type JobsManager --out contracts/jobsManager.go
//go:generate abigen --abi protocol/abi/RoundsManager.abi --pkg contracts --type RoundsManager --out contracts/roundsManager.go
Expand Down Expand Up @@ -59,6 +60,10 @@ type LivepeerEthClient interface {
BalanceOf(ethcommon.Address) (*big.Int, error)
TotalSupply() (*big.Int, error)

// Service Registry
SetServiceURI(serviceURI string) (*types.Transaction, error)
GetServiceURI(addr ethcommon.Address) (string, error)

// Staking
Transcoder(blockRewardCut *big.Int, feeShare *big.Int, pricePerSegment *big.Int) (*types.Transaction, error)
Reward() (*types.Transaction, error)
Expand Down Expand Up @@ -119,18 +124,20 @@ type client struct {
accountManager *AccountManager
backend *ethclient.Client

controllerAddr ethcommon.Address
tokenAddr ethcommon.Address
bondingManagerAddr ethcommon.Address
jobsManagerAddr ethcommon.Address
roundsManagerAddr ethcommon.Address
minterAddr ethcommon.Address
verifierAddr ethcommon.Address
faucetAddr ethcommon.Address
controllerAddr ethcommon.Address
tokenAddr ethcommon.Address
serviceRegistryAddr ethcommon.Address
bondingManagerAddr ethcommon.Address
jobsManagerAddr ethcommon.Address
roundsManagerAddr ethcommon.Address
minterAddr ethcommon.Address
verifierAddr ethcommon.Address
faucetAddr ethcommon.Address

// Embedded contract sessions
*contracts.ControllerSession
*contracts.LivepeerTokenSession
*contracts.ServiceRegistrySession
*contracts.BondingManagerSession
*contracts.JobsManagerSession
*contracts.RoundsManagerSession
Expand Down Expand Up @@ -228,6 +235,27 @@ func (c *client) setContracts(opts *bind.TransactOpts) error {

glog.V(common.SHORT).Infof("LivepeerToken: %v", c.tokenAddr.Hex())

serviceRegistryAddr, err := c.GetContract(crypto.Keccak256Hash([]byte("ServiceRegistry")))
if err != nil {
glog.Errorf("Error getting ServiceRegistry address: %v", err)
return err
}

c.serviceRegistryAddr = serviceRegistryAddr

serviceRegistry, err := contracts.NewServiceRegistry(serviceRegistryAddr, c.backend)
if err != nil {
glog.Errorf("Error creating ServiceRegistry binding: %v", err)
return err
}

c.ServiceRegistrySession = &contracts.ServiceRegistrySession{
Contract: serviceRegistry,
TransactOpts: *opts,
}

glog.V(common.SHORT).Infof("ServiceRegistry: %v", c.serviceRegistryAddr.Hex())

bondingManagerAddr, err := c.GetContract(crypto.Keccak256Hash([]byte("BondingManager")))
if err != nil {
glog.Errorf("Error getting BondingManager address: %v", err)
Expand Down Expand Up @@ -598,8 +626,14 @@ func (c *client) GetTranscoder(addr ethcommon.Address) (*lpTypes.Transcoder, err
return nil, err
}

serviceURI, err := c.GetServiceURI(addr)
if err != nil {
return nil, err
}

return &lpTypes.Transcoder{
Address: addr,
ServiceURI: serviceURI,
LastRewardRound: tInfo.LastRewardRound,
RewardCut: tInfo.RewardCut,
FeeShare: tInfo.FeeShare,
Expand Down