Skip to content

Commit

Permalink
Merge pull request #649 from livepeer/yf/pmrecipient
Browse files Browse the repository at this point in the history
PM recipient
  • Loading branch information
yondonfu committed Dec 26, 2018
2 parents dd06592 + a72a3af commit 2afa993
Show file tree
Hide file tree
Showing 8 changed files with 1,089 additions and 64 deletions.
48 changes: 48 additions & 0 deletions pm/helpers.go
@@ -0,0 +1,48 @@
package pm

import (
"crypto/rand"
"testing"

ethcommon "github.com/ethereum/go-ethereum/common"
)

func randHashOrFatal(t *testing.T) ethcommon.Hash {
key, err := randBytes(32)

if err != nil {
t.Fatalf("failed generating random hash: %v", err)
return ethcommon.Hash{}
}

return ethcommon.BytesToHash(key[:])
}

func randAddressOrFatal(t *testing.T) ethcommon.Address {
key, err := randBytes(addressSize)

if err != nil {
t.Fatalf("failed generating random address: %v", err)
return ethcommon.Address{}
}

return ethcommon.BytesToAddress(key[:])
}

func randBytesOrFatal(size int, t *testing.T) []byte {
res, err := randBytes(size)

if err != nil {
t.Fatalf("failed generating random bytes: %v", err)
return nil
}

return res
}

func randBytes(size int) ([]byte, error) {
key := make([]byte, size)
_, err := rand.Read(key)

return key, err
}
225 changes: 225 additions & 0 deletions pm/recipient.go
@@ -0,0 +1,225 @@
package pm

import (
"crypto/hmac"
"crypto/rand"
"crypto/sha256"
"math/big"
"sync"

ethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/pkg/errors"
)

// Recipient is an interface which describes an object capable
// of receiving tickets
type Recipient interface {
// ReceiveTicket validates and processes a received ticket
ReceiveTicket(ticket *Ticket, sig []byte, seed *big.Int) (sessionID string, won bool, err error)

// RedeemWinningTicket redeems all winning tickets with the broker
// for a session ID
RedeemWinningTickets(sessionID string) error

// TicketParams returns the recipient's currently accepted ticket parameters
// for a provided sender ETH adddress
TicketParams(sender ethcommon.Address) (*TicketParams, error)
}

// recipient is an implementation of the Recipient interface that
// receives tickets and redeems winning tickets
type recipient struct {
val Validator
broker Broker
store TicketStore

secret [32]byte

invalidRands sync.Map

senderNonces map[string]uint64
senderNoncesLock sync.Mutex

faceValue *big.Int
winProb *big.Int
}

// NewRecipient creates an instance of a recipient with an
// automatically generated random secret
func NewRecipient(broker Broker, val Validator, store TicketStore, faceValue *big.Int, winProb *big.Int) (Recipient, error) {
randBytes := make([]byte, 32)
if _, err := rand.Read(randBytes); err != nil {
return nil, err
}

var secret [32]byte
copy(secret[:], randBytes[:32])

return NewRecipientWithSecret(broker, val, store, secret, faceValue, winProb), nil
}

// NewRecipientWithSecret creates an instance of a recipient with a user provided
// secret. In most cases, NewRecipient should be used instead which will
// automatically generate a random secret
func NewRecipientWithSecret(broker Broker, val Validator, store TicketStore, secret [32]byte, faceValue *big.Int, winProb *big.Int) Recipient {
return &recipient{
broker: broker,
val: val,
store: store,
secret: secret,
faceValue: faceValue,
senderNonces: make(map[string]uint64),
winProb: winProb,
}
}

// ReceiveTicket validates and processes a received ticket
func (r *recipient) ReceiveTicket(ticket *Ticket, sig []byte, seed *big.Int) (string, bool, error) {
recipientRand := r.rand(seed, ticket.Sender)

if crypto.Keccak256Hash(ethcommon.LeftPadBytes(recipientRand.Bytes(), uint256Size)) != ticket.RecipientRandHash {
return "", false, errors.Errorf("invalid recipientRand generated from seed %v", seed)
}

if ticket.FaceValue.Cmp(r.faceValue) != 0 {
return "", false, errors.Errorf("invalid ticket faceValue %v", ticket.FaceValue)
}

if ticket.WinProb.Cmp(r.winProb) != 0 {
return "", false, errors.Errorf("invalid ticket winProb %v", ticket.WinProb)
}

if err := r.val.ValidateTicket(ticket, sig, recipientRand); err != nil {
return "", false, err
}

if !r.validRand(recipientRand) {
return "", false, errors.Errorf("invalid already revealed recipientRand %v", recipientRand)
}

if err := r.updateSenderNonce(recipientRand, ticket.SenderNonce); err != nil {
return "", false, err
}

if r.val.IsWinningTicket(ticket, sig, recipientRand) {
sessionID := ticket.RecipientRandHash.Hex()

if err := r.store.Store(sessionID, ticket, sig, recipientRand); err != nil {
return "", true, err
}

return sessionID, true, nil
}

return "", false, nil
}

// RedeemWinningTicket redeems all winning tickets with the broker
// for a session ID
func (r *recipient) RedeemWinningTickets(sessionID string) error {
tickets, sigs, recipientRands, err := r.store.Load(sessionID)
if err != nil {
return err
}

for i := 0; i < len(tickets); i++ {
if err := r.redeemWinningTicket(tickets[i], sigs[i], recipientRands[i]); err != nil {
return err
}
}

return nil
}

// TicketParams returns the recipient's currently accepted ticket parameters
func (r *recipient) TicketParams(sender ethcommon.Address) (*TicketParams, error) {
randBytes := make([]byte, 32)
if _, err := rand.Read(randBytes); err != nil {
return nil, err
}

seed := new(big.Int).SetBytes(randBytes)
recipientRand := r.rand(seed, sender)
recipientRandHash := crypto.Keccak256Hash(ethcommon.LeftPadBytes(recipientRand.Bytes(), uint256Size))

return &TicketParams{
FaceValue: r.faceValue,
WinProb: r.winProb,
RecipientRandHash: recipientRandHash,
Seed: seed,
}, nil
}

func (r *recipient) redeemWinningTicket(ticket *Ticket, sig []byte, recipientRand *big.Int) error {
deposit, err := r.broker.GetDeposit(ticket.Sender)
if err != nil {
return err
}

penaltyEscrow, err := r.broker.GetPenaltyEscrow(ticket.Sender)
if err != nil {
return err
}

// TODO: Consider a smarter strategy here in the future
// Ex. If deposit < transaction cost, do not try to redeem
if deposit.Cmp(big.NewInt(0)) == 0 && penaltyEscrow.Cmp(big.NewInt(0)) == 0 {
return errors.Errorf("sender %v has zero deposit and penalty escrow", ticket.Sender)
}

// Assume that that this call will return immediately if there
// is an error in transaction submission. Else, the function will kick off
// a goroutine and then return to the caller
if err := r.broker.RedeemWinningTicket(ticket, sig, recipientRand); err != nil {
return err
}

// If there is no error, the transaction has been submitted. As a result,
// we assume that recipientRand has been revealed so we should invalidate it locally
r.updateInvalidRands(recipientRand)

// After we invalidate recipientRand we can clear the memory used to track
// its latest senderNonce
r.clearSenderNonce(recipientRand)

return nil
}

func (r *recipient) rand(seed *big.Int, sender ethcommon.Address) *big.Int {
h := hmac.New(sha256.New, r.secret[:])
h.Write(append(seed.Bytes(), sender.Bytes()...))

return new(big.Int).SetBytes(h.Sum(nil))
}

func (r *recipient) validRand(rand *big.Int) bool {
_, ok := r.invalidRands.Load(rand.String())
return !ok
}

func (r *recipient) updateInvalidRands(rand *big.Int) {
r.invalidRands.Store(rand.String(), true)
}

func (r *recipient) updateSenderNonce(rand *big.Int, senderNonce uint64) error {
r.senderNoncesLock.Lock()
defer r.senderNoncesLock.Unlock()

randStr := rand.String()
nonce, ok := r.senderNonces[randStr]
if ok && senderNonce <= nonce {
return errors.Errorf("invalid ticket senderNonce %v - highest seen is %v", senderNonce, nonce)
}

r.senderNonces[randStr] = senderNonce

return nil
}

func (r *recipient) clearSenderNonce(rand *big.Int) {
r.senderNoncesLock.Lock()
defer r.senderNoncesLock.Unlock()

delete(r.senderNonces, rand.String())
}

0 comments on commit 2afa993

Please sign in to comment.