Skip to content

Commit

Permalink
pm: updates to Broker and SenderManager interface
Browse files Browse the repository at this point in the history
  • Loading branch information
Nico Vergauwen authored and Nico Vergauwen committed Sep 11, 2019
1 parent 57e2887 commit 5b74602
Show file tree
Hide file tree
Showing 11 changed files with 285 additions and 334 deletions.
1 change: 0 additions & 1 deletion cmd/livepeer/livepeer.go
Expand Up @@ -478,7 +478,6 @@ func main() {
n.Database,
gpm,
sm,
senderWatcher,
n.ErrorMonitor,
cfg,
)
Expand Down
1 change: 0 additions & 1 deletion eth/client.go
Expand Up @@ -94,7 +94,6 @@ type LivepeerEthClient interface {
RedeemWinningTicket(ticket *pm.Ticket, sig []byte, recipientRand *big.Int) (*types.Transaction, error)
IsUsedTicket(ticket *pm.Ticket) (bool, error)
GetSenderInfo(addr ethcommon.Address) (*pm.SenderInfo, error)
ClaimableReserve(reserveHolder, claimant ethcommon.Address) (*big.Int, error)
UnlockPeriod() (*big.Int, error)
ClaimedReserve(reserveHolder ethcommon.Address, claimant ethcommon.Address) (*big.Int, error)

Expand Down
2 changes: 1 addition & 1 deletion eth/watchers/roundswatcher.go
Expand Up @@ -89,7 +89,7 @@ func (rw *RoundsWatcher) Watch() error {
rw.setLastInitializedRound(lr, bh)

if err := rw.fetchAndSetTranscoderPoolSize(); err != nil {
glog.Errorf("error fetching initial transcoderPoolSize: %v", err)
return fmt.Errorf("error fetching initial transcoderPoolSize: %v", err)
}

events := make(chan []*blockwatch.Event, 10)
Expand Down
11 changes: 5 additions & 6 deletions eth/watchers/senderwatcher.go
Expand Up @@ -65,19 +65,19 @@ func (sw *SenderWatcher) setSenderInfo(addr ethcommon.Address, info *pm.SenderIn
}

// ClaimedReserve returns the amount claimed from a sender's reserve
func (sw *SenderWatcher) ClaimedReserve(sender ethcommon.Address, claimant ethcommon.Address) (*big.Int, error) {
func (sw *SenderWatcher) ClaimedReserve(reserveHolder ethcommon.Address, claimant ethcommon.Address) (*big.Int, error) {
sw.mu.RLock()
claimed := sw.claimedReserve[sender]
claimed := sw.claimedReserve[reserveHolder]
sw.mu.RUnlock()
if claimed != nil {
return claimed, nil
}
claimed, err := sw.lpEth.ClaimedReserve(claimant, sender)
claimed, err := sw.lpEth.ClaimedReserve(reserveHolder, claimant)
if err != nil {
return nil, fmt.Errorf("ClaimedReserve RPC call to remote node failed: %v", err)
}
sw.mu.Lock()
sw.claimedReserve[sender] = claimed
sw.claimedReserve[reserveHolder] = claimed
sw.mu.Unlock()
return claimed, nil
}
Expand Down Expand Up @@ -198,11 +198,10 @@ func (sw *SenderWatcher) handleLog(log types.Log) error {
// ReserveFrozen will be handled in it's own event log handler
diff := new(big.Int).Sub(amount, info.Deposit)
info.Deposit = big.NewInt(0)
// sw.senders[sender].Reserve.Sub(sw.senders[sender].Reserve, amount)
if claimed, ok := sw.claimedReserve[sender]; ok {
claimed.Add(claimed, diff)
} else {
claimed = diff
sw.claimedReserve[sender] = diff
}
} else {
// Draw from deposit
Expand Down
12 changes: 4 additions & 8 deletions pm/broker.go
Expand Up @@ -45,8 +45,6 @@ type SenderInfo struct {
// smart contract that handles the administrative tasks in a probabilistic micropayment protocol
// including processing deposits and pay outs
type Broker interface {
SenderManager

// FundDepositAndReserve funds a sender's deposit and reserve
FundDepositAndReserve(depositAmount, reserveAmount *big.Int) (*types.Transaction, error)

Expand Down Expand Up @@ -74,10 +72,6 @@ type Broker interface {
// IsUsedTicket checks if a ticket has been used
IsUsedTicket(ticket *Ticket) (bool, error)

// ClaimableReserve returns the amount from the reserveHolder's reserve that the claimant
// can claim
ClaimableReserve(reserveHolder, claimant ethcommon.Address) (*big.Int, error)

// CheckTx waits for a transaction to confirm on-chain and returns an error
// if the transaction failed
CheckTx(tx *types.Transaction) error
Expand All @@ -90,7 +84,7 @@ type RoundsManager interface {
LastInitializedRound() *big.Int
// LastInitializedBlockHash returns the blockhash of the block the last round was initiated in
LastInitializedBlockHash() [32]byte
// GetTranscoderPoolSize returns the size active transcoder set for a round
// GetTranscoderPoolSize returns the size of the active transcoder set for a round
GetTranscoderPoolSize() *big.Int
}

Expand All @@ -99,5 +93,7 @@ type SenderManager interface {
// GetSenderInfo returns a sender's information
GetSenderInfo(addr ethcommon.Address) (*SenderInfo, error)
// ClaimedReserve returns the amount claimed from a sender's reserve
ClaimedReserve(sender ethcommon.Address) *big.Int
ClaimedReserve(reserveHolder ethcommon.Address, claimant ethcommon.Address) (*big.Int, error)
// Clear clears the cached values for a sender
Clear(addr ethcommon.Address)
}
26 changes: 7 additions & 19 deletions pm/recipient.go
Expand Up @@ -329,35 +329,23 @@ func (r *recipient) redeemWinningTicket(ticket *Ticket, sig []byte, recipientRan
return err
}

// if max float is zero, there is no claimable reserve left or reserve is 0
if maxFloat.Cmp(big.NewInt(0)) == 0 {
return errors.Errorf("max float is zero")
}

// If max float is insufficient to cover the ticket face value, queue
// the ticket to be retried later
if maxFloat.Cmp(ticket.FaceValue) < 0 {
if err := r.sm.QueueTicket(ticket.Sender, &SignedTicket{ticket, sig, recipientRand}); err != nil {
return err
}

r.sm.QueueTicket(ticket.Sender, &SignedTicket{ticket, sig, recipientRand})
glog.Infof("Queued ticket sender=%x recipientRandHash=%x senderNonce=%v", ticket.Sender, ticket.RecipientRandHash, ticket.SenderNonce)

return nil
}

info, err := r.broker.GetSenderInfo(ticket.Sender)
if err != nil {
return err
}

// TODO: Consider a smarter strategy here in the future
// Ex. If deposit < transaction cost, do not try to redeem
if info.Deposit.Cmp(big.NewInt(0)) == 0 && info.Reserve.Cmp(big.NewInt(0)) == 0 {
return errors.Errorf("sender %v has zero deposit and reserve", ticket.Sender)
}

// Subtract the ticket face value from the sender's current max float
// This amount will be considered pending until the ticket redemption
// transaction confirms on-chain
if err := r.sm.SubFloat(ticket.Sender, ticket.FaceValue); err != nil {
return err
}
r.sm.SubFloat(ticket.Sender, ticket.FaceValue)

defer func() {
// Add the ticket face value back to the sender's current max float
Expand Down
80 changes: 6 additions & 74 deletions pm/recipient_test.go
Expand Up @@ -23,8 +23,6 @@ func newRecipientFixtureOrFatal(t *testing.T) (ethcommon.Address, *stubBroker, *
sender := RandAddress()

b := newStubBroker()
b.SetDeposit(sender, big.NewInt(500))
b.SetReserve(sender, big.NewInt(500))

v := &stubValidator{}
v.SetIsValidTicket(true)
Expand Down Expand Up @@ -596,7 +594,7 @@ func TestRedeemWinningTickets_InvalidSessionID(t *testing.T) {
}
}

func TestRedeemWinningTickets_SingleTicket_GetSenderInfoError(t *testing.T) {
func TestRedeemWinningTickets_SingleTicket_ZeroMaxFloat(t *testing.T) {
sender, b, v, ts, gm, sm, em, cfg, sig := newRecipientFixtureOrFatal(t)
r := newRecipientOrFatal(t, RandAddress(), b, v, ts, gm, sm, em, cfg)
params, err := r.TicketParams(sender)
Expand All @@ -605,7 +603,7 @@ func TestRedeemWinningTickets_SingleTicket_GetSenderInfoError(t *testing.T) {
// Config stub validator with valid winning tickets
v.SetIsWinningTicket(true)

// Test get deposit error
// Test zero maxfloat error
ticket := newTicket(sender, params, 0)

sessionID, won, err := r.ReceiveTicket(ticket, sig, params.Seed)
Expand All @@ -616,48 +614,13 @@ func TestRedeemWinningTickets_SingleTicket_GetSenderInfoError(t *testing.T) {
t.Fatal("expected valid winning ticket")
}

// Config stub broker to fail getting deposit
b.getSenderInfoShouldFail = true

err = r.RedeemWinningTickets([]string{sessionID})
if err == nil {
t.Error("expected broker GetSenderInfo error")
}
if err != nil && !strings.Contains(err.Error(), "broker GetSenderInfo error") {
t.Errorf("execpted broker GetSenderInfo error, got %v", err)
}
}

func TestRedeemWinningTickets_SingleTicket_ZeroDepositAndReserve(t *testing.T) {
sender, b, v, ts, gm, sm, em, cfg, sig := newRecipientFixtureOrFatal(t)
r := newRecipientOrFatal(t, RandAddress(), b, v, ts, gm, sm, em, cfg)
params, err := r.TicketParams(sender)
require.Nil(t, err)

// Config stub validator with valid winning tickets
v.SetIsWinningTicket(true)

// Test zero deposit and reserve error
ticket := newTicket(sender, params, 0)

sessionID, won, err := r.ReceiveTicket(ticket, sig, params.Seed)
if err != nil {
t.Fatal(err)
}
if !won {
t.Fatal("expected valid winning ticket")
}

// Config stub broker with zero deposit and reserve
b.SetDeposit(sender, big.NewInt(0))
b.SetReserve(sender, big.NewInt(0))

sm.maxFloat = big.NewInt(0)
err = r.RedeemWinningTickets([]string{sessionID})
if err == nil {
t.Error("expected zero deposit and reserve error")
t.Error("expected zero max float error")
}
if err != nil && !strings.Contains(err.Error(), "zero deposit and reserve") {
t.Errorf("expected zero deposit and reserve error, got %v", err)
if err != nil && !strings.Contains(err.Error(), "max float is zero") {
t.Errorf("expected zero max float error, got %v", err)
}
}

Expand Down Expand Up @@ -910,22 +873,6 @@ func TestRedeemWinningTicket_MaxFloatError(t *testing.T) {
assert.EqualError(err, sm.maxFloatErr.Error())
}

func TestRedeemWinningTicket_InsufficientMaxFloat_QueueTicketError(t *testing.T) {
assert := assert.New(t)

sender, b, v, ts, gm, sm, em, cfg, sig := newRecipientFixtureOrFatal(t)
secret := [32]byte{3}
r := NewRecipientWithSecret(RandAddress(), b, v, ts, gm, sm, em, secret, cfg)

params := ticketParamsOrFatal(t, r, sender)
ticket := newTicket(sender, params, 1)
ticket.FaceValue = big.NewInt(99999999999999)

sm.queueTicketErr = errors.New("QueueTicket error")
err := r.RedeemWinningTicket(ticket, sig, params.Seed)
assert.EqualError(err, sm.queueTicketErr.Error())
}

func TestRedeemWinningTicket_InsufficientMaxFloat_QueueTicket(t *testing.T) {
assert := assert.New(t)

Expand All @@ -945,21 +892,6 @@ func TestRedeemWinningTicket_InsufficientMaxFloat_QueueTicket(t *testing.T) {
assert.Equal(&SignedTicket{ticket, sig, recipientRand}, sm.queued[0])
}

func TestRedeemWinningTicket_SubFloatError(t *testing.T) {
assert := assert.New(t)

sender, b, v, ts, gm, sm, em, cfg, sig := newRecipientFixtureOrFatal(t)
secret := [32]byte{3}
r := NewRecipientWithSecret(RandAddress(), b, v, ts, gm, sm, em, secret, cfg)

params := ticketParamsOrFatal(t, r, sender)
ticket := newTicket(sender, params, 1)

sm.subFloatErr = errors.New("SubFloat error")
err := r.RedeemWinningTicket(ticket, sig, params.Seed)
assert.EqualError(err, sm.subFloatErr.Error())
}

func TestRedeemWinningTicket_AddFloatError(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
Expand Down
33 changes: 18 additions & 15 deletions pm/sender_test.go
Expand Up @@ -137,8 +137,9 @@ func TestCreateTicketBatch_EVTooHigh_ReturnsError(t *testing.T) {
func TestCreateTicketBatch_FaceValueTooHigh_ReturnsError(t *testing.T) {
// Test single ticket faceValue too high
sender := defaultSender(t)
senderAddr := sender.signer.Account().Address
sm := sender.senderManager.(*stubSenderManager)
sm.info = &SenderInfo{
sm.info[senderAddr] = &SenderInfo{
Deposit: big.NewInt(0),
}

Expand All @@ -155,7 +156,7 @@ func TestCreateTicketBatch_FaceValueTooHigh_ReturnsError(t *testing.T) {

// Test multiple tickets faceValue too high
sender.depositMultiplier = 2
sm.info.Deposit = big.NewInt(2224)
sm.info[senderAddr].Deposit = big.NewInt(2224)

_, err = sender.CreateTicketBatch(sessionID, 2)
assert.EqualError(t, err, "ticket faceValue higher than max faceValue")
Expand Down Expand Up @@ -324,8 +325,9 @@ func TestValidateTicketParams_FaceValueTooHigh_ReturnsError(t *testing.T) {

// Test when deposit = 0 and faceValue != 0
sender := defaultSender(t)
senderAddr := sender.signer.Account().Address
sm := sender.senderManager.(*stubSenderManager)
sm.info = &SenderInfo{
sm.info[senderAddr] = &SenderInfo{
Deposit: big.NewInt(0),
}

Expand All @@ -337,10 +339,10 @@ func TestValidateTicketParams_FaceValueTooHigh_ReturnsError(t *testing.T) {
assert.Contains("ticket faceValue higher than max faceValue", err.Error())

// Test when deposit / depositMultiplier < faceValue
sm.info.Deposit = big.NewInt(300)
sm.info[senderAddr].Deposit = big.NewInt(300)
sender.maxEV = big.NewRat(100, 1)
sender.depositMultiplier = 5
maxFaceValue := new(big.Int).Div(sm.info.Deposit, big.NewInt(int64(sender.depositMultiplier)))
maxFaceValue := new(big.Int).Div(sm.info[senderAddr].Deposit, big.NewInt(int64(sender.depositMultiplier)))

ticketParams.FaceValue = new(big.Int).Add(maxFaceValue, big.NewInt(1))
err = sender.ValidateTicketParams(ticketParams)
Expand All @@ -354,13 +356,14 @@ func TestValidateTicketParams_AcceptableParams_NoError(t *testing.T) {
// faceValue = 150 - 1 = 149
// ev = 149 * .5 = 74.5
sender := defaultSender(t)
senderAddr := sender.signer.Account().Address
sm := sender.senderManager.(*stubSenderManager)
sm.info = &SenderInfo{
sm.info[senderAddr] = &SenderInfo{
Deposit: big.NewInt(300),
}
sender.maxEV = big.NewRat(100, 1)
sender.depositMultiplier = 2
maxFaceValue := new(big.Int).Div(sm.info.Deposit, big.NewInt(int64(sender.depositMultiplier)))
maxFaceValue := new(big.Int).Div(sm.info[senderAddr].Deposit, big.NewInt(int64(sender.depositMultiplier)))

ticketParams := &TicketParams{
FaceValue: new(big.Int).Sub(maxFaceValue, big.NewInt(1)),
Expand All @@ -374,8 +377,8 @@ func TestValidateTicketParams_AcceptableParams_NoError(t *testing.T) {
// maxFaceValue = 402 / 2 = 201
// faceValue = 201 - 1 = 200
// ev = 200 * .5 = 100
sm.info.Deposit = big.NewInt(402)
maxFaceValue = new(big.Int).Div(sm.info.Deposit, big.NewInt(int64(sender.depositMultiplier)))
sm.info[senderAddr].Deposit = big.NewInt(402)
maxFaceValue = new(big.Int).Div(sm.info[senderAddr].Deposit, big.NewInt(int64(sender.depositMultiplier)))

ticketParams.FaceValue = new(big.Int).Sub(maxFaceValue, big.NewInt(1))
err = sender.ValidateTicketParams(ticketParams)
Expand All @@ -386,8 +389,8 @@ func TestValidateTicketParams_AcceptableParams_NoError(t *testing.T) {
// maxFaceValue = 399 / 2 = 199
// faceValue = 199
// ev = 199 * .5 = 99.5
sm.info.Deposit = big.NewInt(399)
maxFaceValue = new(big.Int).Div(sm.info.Deposit, big.NewInt(int64(sender.depositMultiplier)))
sm.info[senderAddr].Deposit = big.NewInt(399)
maxFaceValue = new(big.Int).Div(sm.info[senderAddr].Deposit, big.NewInt(int64(sender.depositMultiplier)))

ticketParams.FaceValue = maxFaceValue
err = sender.ValidateTicketParams(ticketParams)
Expand All @@ -398,8 +401,8 @@ func TestValidateTicketParams_AcceptableParams_NoError(t *testing.T) {
// maxFaceValue = 400 / 2 = 200
// faceValue = 200
// ev = 200 * .5 = 100
sm.info.Deposit = big.NewInt(400)
maxFaceValue = new(big.Int).Div(sm.info.Deposit, big.NewInt(int64(sender.depositMultiplier)))
sm.info[senderAddr].Deposit = big.NewInt(400)
maxFaceValue = new(big.Int).Div(sm.info[senderAddr].Deposit, big.NewInt(int64(sender.depositMultiplier)))

ticketParams.FaceValue = maxFaceValue
err = sender.ValidateTicketParams(ticketParams)
Expand All @@ -414,8 +417,8 @@ func defaultSender(t *testing.T) *sender {
account: account,
}
rm := &stubRoundsManager{round: big.NewInt(5), blkHash: [32]byte{5}}
sm := &stubSenderManager{}
sm.info = &SenderInfo{
sm := newStubSenderManager()
sm.info[account.Address] = &SenderInfo{
Deposit: big.NewInt(100000),
}
s := NewSender(am, rm, sm, big.NewRat(100, 1), 2)
Expand Down

0 comments on commit 5b74602

Please sign in to comment.